1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

CallbackData class added

This commit is contained in:
abdullaev
2021-11-23 18:01:51 +05:00
parent 98d63d235f
commit 714ae7d67f
2 changed files with 186 additions and 0 deletions

100
telebot/callback_data.py Normal file
View File

@@ -0,0 +1,100 @@
import typing
class CallbackDataFilter:
def __init__(self, factory, config: typing.Dict[str, str]):
self.config = config
self.factory = factory
def check(self, query):
try:
data = self.factory.parse(query.data)
except ValueError:
return False
for key, value in self.config.items():
if isinstance(value, (list, tuple, set, frozenset)):
if data.get(key) not in value:
return False
elif data.get(key) != value:
return False
return True
class CallbackData:
"""
Callback data factory
"""
def __init__(self, *parts, prefix: str, sep=':'):
if not isinstance(prefix, str):
raise TypeError(f'Prefix must be instance of str not {type(prefix).__name__}')
if not prefix:
raise ValueError("Prefix can't be empty")
if sep in prefix:
raise ValueError(f"Separator {sep!r} can't be used in prefix")
self.prefix = prefix
self.sep = sep
self._part_names = parts
def new(self, *args, **kwargs) -> str:
"""
Generate callback data
"""
args = list(args)
data = [self.prefix]
for part in self._part_names:
value = kwargs.pop(part, None)
if value is None:
if args:
value = args.pop(0)
else:
raise ValueError(f'Value for {part!r} was not passed!')
if value is not None and not isinstance(value, str):
value = str(value)
if self.sep in value:
raise ValueError(f"Symbol {self.sep!r} is defined as the separator and can't be used in parts' values")
data.append(value)
if args or kwargs:
raise TypeError('Too many arguments were passed!')
callback_data = self.sep.join(data)
if len(callback_data.encode()) > 64:
raise ValueError('Resulted callback data is too long!')
return callback_data
def parse(self, callback_data: str) -> typing.Dict[str, str]:
"""
Parse data from the callback data
"""
prefix, *parts = callback_data.split(self.sep)
if prefix != self.prefix:
raise ValueError("Passed callback data can't be parsed with that prefix.")
elif len(parts) != len(self._part_names):
raise ValueError('Invalid parts count!')
result = {'@': prefix}
result.update(zip(self._part_names, parts))
return result
def filter(self, **config) -> CallbackDataFilter:
"""
Generate filter
"""
for key in config.keys():
if key not in self._part_names:
raise ValueError(f'Invalid field name {key!r}')
return CallbackDataFilter(self, config)