1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00
pyTelegramBotAPI/telebot/asyncio_handler_backends.py

220 lines
5.6 KiB
Python
Raw Normal View History

2021-11-20 13:47:55 +03:00
import os
import pickle
class StateMemory:
    """
    Keep per-chat states in an in-process dictionary.

    Every entry is keyed by chat id and has the shape
    ``{'state': <state>, 'data': {...}}``.
    """

    def __init__(self):
        self._states = {}

    async def add_state(self, chat_id, state):
        """
        Add a state.

        An existing entry keeps its data and only gets its state replaced;
        a missing entry is created with empty data.

        :param chat_id: key of the entry
        :param state: new state
        """
        if chat_id in self._states:
            self._states[chat_id]['state'] = state
        else:
            self._states[chat_id] = {'state': state, 'data': {}}

    async def current_state(self, chat_id):
        """
        Return the current state of ``chat_id``.

        :param chat_id: key of the entry
        :return: the stored state, or ``False`` when there is no entry
        """
        if chat_id in self._states:
            return self._states[chat_id]['state']
        return False

    async def delete_state(self, chat_id):
        """
        Delete the state (and its data) of ``chat_id``.

        Deleting a chat id that has no entry is a no-op, so ``finish`` can be
        called unconditionally.

        :param chat_id: key of the entry
        """
        self._states.pop(chat_id, None)

    def _get_data(self, chat_id):
        """
        Return the data dict stored for ``chat_id``.

        The stored dict itself is returned, not a copy.

        :raises KeyError: if ``chat_id`` has no entry
        """
        return self._states[chat_id]['data']

    async def set(self, chat_id, new_state):
        """
        Set a new state for a user.

        :param chat_id: key of the entry
        :param new_state: new_state of a user
        """
        await self.add_state(chat_id, new_state)

    async def _add_data(self, chat_id, key, value):
        """
        Store ``value`` under ``key`` in the data of ``chat_id``.

        :return: the stored ``value``
        :raises KeyError: if ``chat_id`` has no entry
        """
        self._states[chat_id]['data'][key] = value
        return value

    async def finish(self, chat_id):
        """
        Finish(delete) state of a user.

        :param chat_id: key of the entry
        """
        await self.delete_state(chat_id)

    def retrieve_data(self, chat_id):
        """
        Return an async context manager giving access to the data of ``chat_id``.

        Usage:
            async with bot.retrieve_data(message.chat.id) as data:
                data['name'] = message.text

        Also, at the end of your 'Form' you can get the name:
            data['name']

        :raises KeyError: if ``chat_id`` has no entry
        """
        return StateContext(self, chat_id)
class StateFile:
    """
    Class to save states in a file.

    The whole mapping ``{chat_id: {'state': <state>, 'data': {...}}}`` is
    pickled into ``file_path``; every operation re-reads the file and every
    mutation rewrites it completely. The constructor does not touch the
    disk: ``_create_dir`` creates the file, and reads fail until it exists.
    """

    def __init__(self, filename):
        self.file_path = filename

    async def add_state(self, chat_id, state):
        """
        Add a state.

        An existing entry keeps its data and only gets its state replaced;
        a missing entry is created with empty data.

        :param chat_id: key of the entry
        :param state: new state
        :return: ``True`` once the file has been rewritten
        """
        states_data = self._read_data()
        if chat_id in states_data:
            states_data[chat_id]['state'] = state
        else:
            states_data[chat_id] = {'state': state, 'data': {}}
        return await self._save_data(states_data)

    async def current_state(self, chat_id):
        """
        Current state.

        :param chat_id: key of the entry
        :return: the stored state, or ``False`` when there is no entry
        """
        states_data = self._read_data()
        if chat_id in states_data:
            return states_data[chat_id]['state']
        return False

    async def delete_state(self, chat_id):
        """
        Delete the state (and its data) of ``chat_id``.

        Deleting a chat id that has no entry is a no-op and leaves the file
        untouched.

        :param chat_id: key of the entry
        """
        states_data = self._read_data()
        if chat_id in states_data:
            del states_data[chat_id]
            await self._save_data(states_data)

    def _read_data(self):
        """
        Read the data from file.

        :return: the unpickled content of ``file_path``
        :raises FileNotFoundError: if the file has not been created yet
        """
        # NOTE(security): pickle.load can execute arbitrary code while
        # loading; file_path must only point at a file this process controls.
        with open(self.file_path, 'rb') as state_file:
            return pickle.load(state_file)

    def _create_dir(self):
        """
        Create the parent directory of ``file_path`` and, if the file does
        not exist yet, the file itself holding an empty dict.
        """
        # os.path.dirname returns '' for a bare filename; in that case there
        # is no directory to create and the file goes to the working dir.
        directory = os.path.dirname(self.file_path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        if not os.path.isfile(self.file_path):
            with open(self.file_path, 'wb') as state_file:
                pickle.dump({}, state_file)

    async def _save_data(self, new_data):
        """
        Save data after editing.

        :param new_data: the complete mapping to write; it replaces the
            previous file content
        :return: ``True``
        """
        with open(self.file_path, 'wb+') as state_file:
            pickle.dump(new_data, state_file, protocol=pickle.HIGHEST_PROTOCOL)
        return True

    def _get_data(self, chat_id):
        """
        Return the data dict of ``chat_id`` as freshly read from the file.

        :raises KeyError: if ``chat_id`` has no entry
        """
        return self._read_data()[chat_id]['data']

    async def set(self, chat_id, new_state):
        """
        Set a new state for a user.

        :param chat_id: key of the entry
        :param new_state: new_state of a user
        """
        await self.add_state(chat_id, new_state)

    async def _add_data(self, chat_id, key, value):
        """
        Store ``value`` under ``key`` in the data of ``chat_id`` and persist it.

        :return: the stored ``value``
        :raises KeyError: if ``chat_id`` has no entry
        """
        states_data = self._read_data()
        states_data[chat_id]['data'][key] = value
        # Persist the whole mapping, not just the value that was added.
        await self._save_data(states_data)
        return value

    async def finish(self, chat_id):
        """
        Finish(delete) state of a user.

        :param chat_id: key of the entry
        """
        await self.delete_state(chat_id)

    def retrieve_data(self, chat_id):
        """
        Return an async context manager giving access to the data of ``chat_id``.

        Usage:
            async with bot.retrieve_data(message.chat.id) as data:
                data['name'] = message.text

        Also, at the end of your 'Form' you can get the name:
            data['name']
        """
        return StateFileContext(self, chat_id)
class StateContext:
    """
    Async context manager handing out the data of one chat.

    The data is fetched from ``obj`` once, when the context object is
    constructed; entering the context only returns it.
    """

    def __init__(self, obj: StateMemory, chat_id) -> None:
        self.obj, self.chat_id = obj, chat_id
        # Fetched eagerly: a failing lookup surfaces here, not on enter.
        self.data = obj._get_data(chat_id)

    async def __aenter__(self):
        """Return the data fetched at construction time."""
        return self.data

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Do nothing on exit; exceptions from the body are not suppressed."""
        return None
class StateFileContext:
    """
    Async context manager handing out the data of one chat and writing it
    back through ``obj`` when the context is left.
    """

    def __init__(self, obj: StateFile, chat_id) -> None:
        self.obj, self.chat_id = obj, chat_id
        # Filled in by __aenter__.
        self.data = None

    async def __aenter__(self):
        """Fetch the data of the chat from ``obj``, remember and return it."""
        fetched = self.obj._get_data(self.chat_id)
        self.data = fetched
        return fetched

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Copy every key of the handed-out data into a fresh read of the
        stored mapping and save the result.

        Keys are only added or overwritten; a key removed from the handed-out
        dict is still present in the saved mapping.
        """
        stored = self.obj._read_data()
        for key in self.data:
            stored[self.chat_id]['data'][key] = self.data[key]
        await self.obj._save_data(stored)
        return None
2021-11-27 17:04:03 +03:00
class BaseMiddleware:
    """
    Base class for middleware.

    Your middlewares should be inherited from this class and override both
    hooks; the versions defined here only raise ``NotImplementedError``.
    """

    def __init__(self):
        return None

    async def pre_process(self, message, data):
        """
        Hook to be overridden by subclasses.

        NOTE(review): presumably called before the handler runs — confirm
        against the dispatcher.

        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError()

    async def post_process(self, message, data, exception):
        """
        Hook to be overridden by subclasses.

        NOTE(review): presumably called after the handler ran, with the
        exception it raised (if any) — confirm against the dispatcher.

        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError()