import os import pickle import threading from telebot import apihelper class HandlerBackend(object): """ Class for saving (next step|reply) handlers """ def __init__(self, handlers=None): if handlers is None: handlers = {} self.handlers = handlers def register_handler(self, handler_group_id, handler): raise NotImplementedError() def clear_handlers(self, handler_group_id): raise NotImplementedError() def get_handlers(self, handler_group_id): raise NotImplementedError() class MemoryHandlerBackend(HandlerBackend): def register_handler(self, handler_group_id, handler): if handler_group_id in self.handlers: self.handlers[handler_group_id].append(handler) else: self.handlers[handler_group_id] = [handler] def clear_handlers(self, handler_group_id): self.handlers.pop(handler_group_id, None) def get_handlers(self, handler_group_id): return self.handlers.pop(handler_group_id, None) def load_handlers(self, filename, del_file_after_loading): raise NotImplementedError() class FileHandlerBackend(HandlerBackend): def __init__(self, handlers=None, filename='./.handler-saves/handlers.save', delay=120): super(FileHandlerBackend, self).__init__(handlers) self.filename = filename self.delay = delay self.timer = threading.Timer(delay, self.save_handlers) def register_handler(self, handler_group_id, handler): if handler_group_id in self.handlers: self.handlers[handler_group_id].append(handler) else: self.handlers[handler_group_id] = [handler] self.start_save_timer() def clear_handlers(self, handler_group_id): self.handlers.pop(handler_group_id, None) self.start_save_timer() def get_handlers(self, handler_group_id): handlers = self.handlers.pop(handler_group_id, None) self.start_save_timer() return handlers def start_save_timer(self): if not self.timer.is_alive(): if self.delay <= 0: self.save_handlers() else: self.timer = threading.Timer(self.delay, self.save_handlers) self.timer.start() def save_handlers(self): self.dump_handlers(self.handlers, self.filename) def load_handlers(self, filename=None, del_file_after_loading=True): if not filename: filename = self.filename tmp = self.return_load_handlers(filename, del_file_after_loading=del_file_after_loading) if tmp is not None: self.handlers.update(tmp) @staticmethod def dump_handlers(handlers, filename, file_mode="wb"): dirs = filename.rsplit('/', maxsplit=1)[0] os.makedirs(dirs, exist_ok=True) with open(filename + ".tmp", file_mode) as file: if (apihelper.CUSTOM_SERIALIZER is None): pickle.dump(handlers, file) else: apihelper.CUSTOM_SERIALIZER.dump(handlers, file) if os.path.isfile(filename): os.remove(filename) os.rename(filename + ".tmp", filename) @staticmethod def return_load_handlers(filename, del_file_after_loading=True): if os.path.isfile(filename) and os.path.getsize(filename) > 0: with open(filename, "rb") as file: if (apihelper.CUSTOM_SERIALIZER is None): handlers = pickle.load(file) else: handlers = apihelper.CUSTOM_SERIALIZER.load(file) if del_file_after_loading: os.remove(filename) return handlers class RedisHandlerBackend(HandlerBackend): def __init__(self, handlers=None, host='localhost', port=6379, db=0, prefix='telebot', password=None): super(RedisHandlerBackend, self).__init__(handlers) from redis import Redis self.prefix = prefix self.redis = Redis(host, port, db, password) def _key(self, handle_group_id): return ':'.join((self.prefix, str(handle_group_id))) def register_handler(self, handler_group_id, handler): handlers = [] value = self.redis.get(self._key(handler_group_id)) if value: handlers = pickle.loads(value) handlers.append(handler) self.redis.set(self._key(handler_group_id), pickle.dumps(handlers)) def clear_handlers(self, handler_group_id): self.redis.delete(self._key(handler_group_id)) def get_handlers(self, handler_group_id): handlers = None value = self.redis.get(self._key(handler_group_id)) if value: handlers = pickle.loads(value) self.clear_handlers(handler_group_id) return handlers class StateMachine: def __init__(self): self._states = {} def add_state(self, chat_id, state): """ Add a state. :param chat_id: :param state: new state """ if chat_id in self._states: self._states[int(chat_id)]['state'] = state else: self._states[chat_id] = {'state': state,'data': {}} def current_state(self, chat_id): """Current state""" if chat_id in self._states: return self._states[chat_id]['state'] else: return False def delete_state(self, chat_id): """Delete a state""" return self._states.pop(chat_id) def _get_data(self, chat_id): return self._states[chat_id]['data'] class State: """ Base class for state managing. """ def __init__(self, obj: StateMachine) -> None: self.obj = obj def set(self, chat_id, new_state): """ Set a new state for a user. :param chat_id: :param new_state: new_state of a user """ self.obj._states[chat_id]['state'] = new_state def finish(self, chat_id): """ Finish(delete) state of a user. :param chat_id: """ self.obj._states.pop(chat_id) def retrieve_data(self, chat_id): """ Save input text. Usage: with state.retrieve_data(message.chat.id) as data: data['name'] = message.text Also, at the end of your 'Form' you can get the name: data['name'] """ return StateContext(self.obj, chat_id) class StateContext: """ Class for data. """ def __init__(self , obj: StateMachine, chat_id) -> None: self.obj = obj self.chat_id = chat_id self.data = obj._get_data(chat_id) def __enter__(self): return self.data def __exit__(self, exc_type, exc_val, exc_tb): return