From da639dd1f66adf1e2c4b6fcf41f2b44418fffb92 Mon Sep 17 00:00:00 2001 From: Badiboy Date: Sat, 17 Sep 2022 11:57:12 +0300 Subject: [PATCH 1/4] Handlers and Middlewares processing union Call for handlers now union in a single function for future extension. Plus minor fixes in storages. --- telebot/__init__.py | 172 ++++++++++++++++-------------- telebot/ext/sync/webhooks.py | 18 ++-- telebot/storage/base_storage.py | 5 +- telebot/storage/memory_storage.py | 1 + telebot/storage/pickle_storage.py | 2 +- telebot/storage/redis_storage.py | 1 + 6 files changed, 106 insertions(+), 93 deletions(-) diff --git a/telebot/__init__.py b/telebot/__init__.py index 026d32e..2e8caa9 100644 --- a/telebot/__init__.py +++ b/telebot/__init__.py @@ -469,6 +469,7 @@ class TeleBot: webhook_url = "{}://{}:{}/{}".format(protocol, listen, port, url_path) if certificate and certificate_key: + # noinspection PyTypeChecker ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_ctx.load_cert_chain(certificate, certificate_key) @@ -5875,7 +5876,7 @@ class TeleBot: return False # middleware check-up method - def _check_middleware(self, update_type): + def _check_middlewares(self, update_type): """ Check middleware @@ -5889,100 +5890,111 @@ class TeleBot: def _run_middlewares_and_handler(self, message, handlers, middlewares, update_type): """ - This class is made to run handler and middleware in queue. + This class is made to run handlers and middlewares in queue. - :param handler: handler that should be executed. - :param middleware: middleware that should be executed. + :param message: received message (update part) to process with handlers and/or middlewares + :param handlers: all created handlers (not filtered) + :param middlewares: middlewares that should be executed (already filtered) + :param update_type: handler/update type (Update field name) :return: """ - data = {} - params =[] - handler_error = None - skip_handlers = False - if middlewares: - for middleware in middlewares: - if middleware.update_sensitive: - if hasattr(middleware, f'pre_process_{update_type}'): - result = getattr(middleware, f'pre_process_{update_type}')(message, data) - else: - logger.error('Middleware {} does not have pre_process_{} method. pre_process function execution was skipped.'.format(middleware.__class__.__name__, update_type)) - result = None - else: - result = middleware.pre_process(message, data) - # We will break this loop if CancelUpdate is returned - # Also, we will not run other middlewares - if isinstance(result, CancelUpdate): - return - elif isinstance(result, SkipHandler): - skip_handlers = True - - if handlers and not(skip_handlers): - try: - for handler in handlers: - process_handler = self._test_message_handler(handler, message) - if not process_handler: continue - for i in inspect.signature(handler['function']).parameters: - params.append(i) - if len(params) == 1: - handler['function'](message) - elif "data" in params: - if len(params) == 2: - handler['function'](message, data) - elif len(params) == 3: - handler['function'](message, data=data, bot=self) - else: - logger.error("It is not allowed to pass data and values inside data to the handler. Check your handler: {}".format(handler['function'])) - return - else: - data_copy = data.copy() - for key in list(data_copy): - # remove data from data_copy if handler does not accept it - if key not in params: - del data_copy[key] - if handler.get('pass_bot'): - data_copy["bot"] = self - if len(data_copy) > len(params) - 1: # remove the message parameter - logger.error("You are passing more parameters than the handler needs. Check your handler: {}".format(handler['function'])) - return - handler["function"](message, **data_copy) + if not self.use_class_middlewares: + for message_handler in handlers: + if self._test_message_handler(message_handler, message): + self._exec_task(message_handler['function'], message, pass_bot=message_handler['pass_bot'], task_type='handler') break - except Exception as e: - handler_error = e - if self.exception_handler: - self.exception_handler.handle(e) - else: - logging.error(str(e)) - logger.debug("Exception traceback:\n%s", traceback.format_exc()) + else: + data = {} + params =[] + handler_error = None + skip_handlers = False - if middlewares: - for middleware in middlewares: - if middleware.update_sensitive: - if hasattr(middleware, f'post_process_{update_type}'): - getattr(middleware, f'post_process_{update_type}')(message, data, handler_error) + if middlewares: + for middleware in middlewares: + if middleware.update_sensitive: + if hasattr(middleware, f'pre_process_{update_type}'): + result = getattr(middleware, f'pre_process_{update_type}')(message, data) + else: + logger.error('Middleware {} does not have pre_process_{} method. pre_process function execution was skipped.'.format(middleware.__class__.__name__, update_type)) + result = None else: - logger.error("Middleware: {} does not have post_process_{} method. Post process function was not executed.".format(middleware.__class__.__name__, update_type)) - else: - middleware.post_process(message, data, handler_error) + result = middleware.pre_process(message, data) + # We will break this loop if CancelUpdate is returned + # Also, we will not run other middlewares + if isinstance(result, CancelUpdate): + return + elif isinstance(result, SkipHandler): + skip_handlers = True + + if handlers and not(skip_handlers): + try: + for handler in handlers: + process_handler = self._test_message_handler(handler, message) + if not process_handler: continue + for i in inspect.signature(handler['function']).parameters: + params.append(i) + if len(params) == 1: + handler['function'](message) + elif "data" in params: + if len(params) == 2: + handler['function'](message, data) + elif len(params) == 3: + handler['function'](message, data=data, bot=self) + else: + logger.error("It is not allowed to pass data and values inside data to the handler. Check your handler: {}".format(handler['function'])) + return + else: + data_copy = data.copy() + for key in list(data_copy): + # remove data from data_copy if handler does not accept it + if key not in params: + del data_copy[key] + if handler.get('pass_bot'): + data_copy["bot"] = self + if len(data_copy) > len(params) - 1: # remove the message parameter + logger.error("You are passing more parameters than the handler needs. Check your handler: {}".format(handler['function'])) + return + handler["function"](message, **data_copy) + break + except Exception as e: + handler_error = e + if self.exception_handler: + self.exception_handler.handle(e) + else: + logging.error(str(e)) + logger.debug("Exception traceback:\n%s", traceback.format_exc()) + + if middlewares: + for middleware in middlewares: + if middleware.update_sensitive: + if hasattr(middleware, f'post_process_{update_type}'): + getattr(middleware, f'post_process_{update_type}')(message, data, handler_error) + else: + logger.error("Middleware: {} does not have post_process_{} method. Post process function was not executed.".format(middleware.__class__.__name__, update_type)) + else: + middleware.post_process(message, data, handler_error) def _notify_command_handlers(self, handlers, new_messages, update_type): """ Notifies command handlers. - :param handlers: - :param new_messages: + :param handlers: all created handlers + :param new_messages: received messages to proceed + :param update_type: handler/update type (Update fields) :return: """ if not(handlers) and not(self.use_class_middlewares): return + if self.use_class_middlewares: + middlewares = self._check_middlewares(update_type) + else: + middlewares = None for message in new_messages: - if not self.use_class_middlewares: - for message_handler in handlers: - if self._test_message_handler(message_handler, message): - self._exec_task(message_handler['function'], message, pass_bot=message_handler['pass_bot'], task_type='handler') - break - else: - middleware = self._check_middleware(update_type) - self._exec_task(self._run_middlewares_and_handler, message, handlers=handlers, middlewares=middleware, update_type=update_type) - return + self._exec_task( + self._run_middlewares_and_handler, + message, + handlers=handlers, + middlewares=middlewares, + update_type=update_type) diff --git a/telebot/ext/sync/webhooks.py b/telebot/ext/sync/webhooks.py index 6e1714b..a73c16c 100644 --- a/telebot/ext/sync/webhooks.py +++ b/telebot/ext/sync/webhooks.py @@ -1,6 +1,5 @@ """ This file is used by TeleBot.run_webhooks() function. - Fastapi is required to run this script. """ @@ -15,15 +14,11 @@ try: except ImportError: fastapi_installed = False - from telebot.types import Update - from typing import Optional - - class SyncWebhookListener: def __init__(self, bot, secret_token: str, host: Optional[str]="127.0.0.1", @@ -33,13 +28,13 @@ class SyncWebhookListener: debug: Optional[bool]=False ) -> None: """ - Aynchronous implementation of webhook listener - for asynchronous version of telebot. + Synchronous implementation of webhook listener + for synchronous version of telebot. Not supposed to be used manually by user. - Use AsyncTeleBot.run_webhooks() instead. + Use TeleBot.run_webhooks() instead. - :param bot: AsyncTeleBot instance. - :type bot: telebot.async_telebot.AsyncTeleBot + :param bot: TeleBot instance. + :type bot: telebot.TeleBot :param secret_token: Telegram secret token :type secret_token: str @@ -77,7 +72,8 @@ class SyncWebhookListener: self._prepare_endpoint_urls() - def _check_dependencies(self): + @staticmethod + def _check_dependencies(): if not fastapi_installed: raise ImportError('Fastapi or uvicorn is not installed. Please install it via pip.') diff --git a/telebot/storage/base_storage.py b/telebot/storage/base_storage.py index bafd9a1..92b31ba 100644 --- a/telebot/storage/base_storage.py +++ b/telebot/storage/base_storage.py @@ -41,7 +41,10 @@ class StateStorageBase: def get_state(self, chat_id, user_id): raise NotImplementedError - + + def get_interactive_data(self, chat_id, user_id): + raise NotImplementedError + def save(self, chat_id, user_id, data): raise NotImplementedError diff --git a/telebot/storage/memory_storage.py b/telebot/storage/memory_storage.py index 67cd984..7d71c7c 100644 --- a/telebot/storage/memory_storage.py +++ b/telebot/storage/memory_storage.py @@ -3,6 +3,7 @@ from telebot.storage.base_storage import StateStorageBase, StateContext class StateMemoryStorage(StateStorageBase): def __init__(self) -> None: + super().__init__() self.data = {} # # {chat_id: {user_id: {'state': None, 'data': {}}, ...}, ...} diff --git a/telebot/storage/pickle_storage.py b/telebot/storage/pickle_storage.py index ff72ac3..dfffcf8 100644 --- a/telebot/storage/pickle_storage.py +++ b/telebot/storage/pickle_storage.py @@ -5,8 +5,8 @@ import pickle class StatePickleStorage(StateStorageBase): - # noinspection PyMissingConstructor def __init__(self, file_path="./.state-save/states.pkl") -> None: + super().__init__() self.file_path = file_path self.create_dir() self.data = self.read() diff --git a/telebot/storage/redis_storage.py b/telebot/storage/redis_storage.py index 2a5fb4a..a104948 100644 --- a/telebot/storage/redis_storage.py +++ b/telebot/storage/redis_storage.py @@ -16,6 +16,7 @@ class StateRedisStorage(StateStorageBase): TeleBot(storage=StateRedisStorage()) """ def __init__(self, host='localhost', port=6379, db=0, password=None, prefix='telebot_'): + super().__init__() self.redis = ConnectionPool(host=host, port=port, db=db, password=password) #self.con = Redis(connection_pool=self.redis) -> use this when necessary # From 598de25b6dded3b71423242663c993951e5c13cb Mon Sep 17 00:00:00 2001 From: Badiboy Date: Sat, 17 Sep 2022 12:55:55 +0300 Subject: [PATCH 2/4] Rename _check_middlewares to _get_middlewares --- telebot/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/telebot/__init__.py b/telebot/__init__.py index 2e8caa9..7a57766 100644 --- a/telebot/__init__.py +++ b/telebot/__init__.py @@ -5876,7 +5876,7 @@ class TeleBot: return False # middleware check-up method - def _check_middlewares(self, update_type): + def _get_middlewares(self, update_type): """ Check middleware @@ -5988,7 +5988,7 @@ class TeleBot: return if self.use_class_middlewares: - middlewares = self._check_middlewares(update_type) + middlewares = self._get_middlewares(update_type) else: middlewares = None for message in new_messages: From e7a96ec2edd41409e7ebe9036fc2931da97fbac4 Mon Sep 17 00:00:00 2001 From: Badiboy Date: Sat, 17 Sep 2022 14:09:05 +0300 Subject: [PATCH 3/4] Rename also in Async --- telebot/async_telebot.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/telebot/async_telebot.py b/telebot/async_telebot.py index 68e1768..784479d 100644 --- a/telebot/async_telebot.py +++ b/telebot/async_telebot.py @@ -339,7 +339,8 @@ class AsyncTeleBot: await self.close_session() logger.warning('Polling is stopped.') - def _loop_create_task(self, coro): + @staticmethod + def _loop_create_task(coro): return asyncio.create_task(coro) async def _process_updates(self, handlers, messages, update_type): @@ -351,12 +352,12 @@ class AsyncTeleBot: :return: """ tasks = [] + middlewares = await self._get_middlewares(update_type) for message in messages: - middleware = await self.process_middlewares(update_type) - tasks.append(self._run_middlewares_and_handlers(handlers, message, middleware, update_type)) + tasks.append(self._run_middlewares_and_handlers(message, handlers, middlewares, update_type)) await asyncio.gather(*tasks) - async def _run_middlewares_and_handlers(self, handlers, message, middlewares, update_type): + async def _run_middlewares_and_handlers(self, message, handlers, middlewares, update_type): handler_error = None data = {} skip_handlers = False @@ -426,7 +427,7 @@ class AsyncTeleBot: else: logger.error('Middleware {} does not have post_process_{} method. post_process function execution was skipped.'.format(middleware.__class__.__name__, update_type)) else: await middleware.post_process(message, data, handler_error) - # update handling + async def process_new_updates(self, updates: List[types.Update]): """ Process new updates. @@ -615,7 +616,7 @@ class AsyncTeleBot: """ await self._process_updates(self.chat_join_request_handlers, chat_join_request, 'chat_join_request') - async def process_middlewares(self, update_type): + async def _get_middlewares(self, update_type): """ :meta private: """ From 52e09637c2f360d7f99d5599be1c92e8b515b21b Mon Sep 17 00:00:00 2001 From: Badiboy Date: Sat, 17 Sep 2022 23:17:07 +0300 Subject: [PATCH 4/4] Fix: do not call handler in one more task --- telebot/__init__.py | 31 ++++++++++++++++++------------- telebot/async_telebot.py | 10 ++++++++++ 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/telebot/__init__.py b/telebot/__init__.py index 7a57766..539bf7e 100644 --- a/telebot/__init__.py +++ b/telebot/__init__.py @@ -1088,11 +1088,6 @@ class TeleBot: def _exec_task(self, task, *args, **kwargs): - if kwargs: - if kwargs.pop('task_type', "") == 'handler': - if kwargs.pop('pass_bot', False): - kwargs['bot'] = self - if self.threaded: self.worker_pool.put(task, *args, **kwargs) else: @@ -4791,8 +4786,14 @@ class TeleBot: if not isinstance(regexp, str): logger.error(f"{method_name}: Regexp filter should be string. Not able to use the supplied type.") - def message_handler(self, commands: Optional[List[str]]=None, regexp: Optional[str]=None, func: Optional[Callable]=None, - content_types: Optional[List[str]]=None, chat_types: Optional[List[str]]=None, **kwargs): + def message_handler( + self, + commands: Optional[List[str]]=None, + regexp: Optional[str]=None, + func: Optional[Callable]=None, + content_types: Optional[List[str]]=None, + chat_types: Optional[List[str]]=None, + **kwargs): """ Handles New incoming message of any kind - text, photo, sticker, etc. As a parameter to the decorator function, it passes :class:`telebot.types.Message` object. @@ -5890,7 +5891,7 @@ class TeleBot: def _run_middlewares_and_handler(self, message, handlers, middlewares, update_type): """ - This class is made to run handlers and middlewares in queue. + This method is made to run handlers and middlewares in queue. :param message: received message (update part) to process with handlers and/or middlewares :param handlers: all created handlers (not filtered) @@ -5900,10 +5901,14 @@ class TeleBot: """ if not self.use_class_middlewares: - for message_handler in handlers: - if self._test_message_handler(message_handler, message): - self._exec_task(message_handler['function'], message, pass_bot=message_handler['pass_bot'], task_type='handler') - break + if handlers: + for handler in handlers: + if self._test_message_handler(handler, message): + if handler.get('pass_bot', False): + handler['function'](message, bot = self) + else: + handler['function'](message) + break else: data = {} params =[] @@ -5962,7 +5967,7 @@ class TeleBot: if self.exception_handler: self.exception_handler.handle(e) else: - logging.error(str(e)) + logger.error(str(e)) logger.debug("Exception traceback:\n%s", traceback.format_exc()) if middlewares: diff --git a/telebot/async_telebot.py b/telebot/async_telebot.py index 784479d..abdaa45 100644 --- a/telebot/async_telebot.py +++ b/telebot/async_telebot.py @@ -358,6 +358,16 @@ class AsyncTeleBot: await asyncio.gather(*tasks) async def _run_middlewares_and_handlers(self, message, handlers, middlewares, update_type): + """ + This method is made to run handlers and middlewares in queue. + + :param message: received message (update part) to process with handlers and/or middlewares + :param handlers: all created handlers (not filtered) + :param middlewares: middlewares that should be executed (already filtered) + :param update_type: handler/update type (Update field name) + :return: + """ + handler_error = None data = {} skip_handlers = False