From 714ae7d67f26dcc223d3fc7d9ddce0f5c5c743d1 Mon Sep 17 00:00:00 2001 From: abdullaev Date: Tue, 23 Nov 2021 18:01:51 +0500 Subject: [PATCH] CallbackData class added --- examples/CallbackData_example.py | 86 ++++++++++++++++++++++++++ telebot/callback_data.py | 100 +++++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+) create mode 100644 examples/CallbackData_example.py create mode 100644 telebot/callback_data.py diff --git a/examples/CallbackData_example.py b/examples/CallbackData_example.py new file mode 100644 index 0000000..a0d70ea --- /dev/null +++ b/examples/CallbackData_example.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +""" +This Example will show you how to use CallbackData +""" + +from telebot.callback_data import CallbackData, CallbackDataFilter +from telebot import types, TeleBot +from telebot.custom_filters import AdvancedCustomFilter + +API_TOKEN = '1802978815:AAG6ju32eC-O3s8WRg57BRCT64VkJhQiHNk' +PRODUCTS = [ + {'id': '0', 'name': 'xiaomi mi 10', 'price': 400}, + {'id': '1', 'name': 'samsung s20', 'price': 800}, + {'id': '2', 'name': 'iphone 13', 'price': 1300} +] + +bot = TeleBot(API_TOKEN) +products_factory = CallbackData('product_id', prefix='products') + + +def products_keyboard(): + return types.InlineKeyboardMarkup( + keyboard=[ + [ + types.InlineKeyboardButton( + text=product['name'], + callback_data=products_factory.new(product_id=product["id"]) + ) + ] + for product in PRODUCTS + ] + ) + + +def back_keyboard(): + return types.InlineKeyboardMarkup( + keyboard=[ + [ + types.InlineKeyboardButton( + text='⬅', + callback_data='back' + ) + ] + ] + ) + + +class ProductsCallbackFilter(AdvancedCustomFilter): + key = 'config' + + def check(self, call: types.CallbackQuery, config: CallbackDataFilter): + return config.check(query=call) + + +@bot.message_handler(commands=['products']) +def products_command_handler(message: types.Message): + bot.send_message(message.chat.id, 'Products:', reply_markup=products_keyboard()) + + +# Only product with field - product_id = 2 +@bot.callback_query_handler(func=None, config=products_factory.filter(product_id='2')) +def product_one_callback(call: types.CallbackQuery): + bot.answer_callback_query(callback_query_id=call.id, text='Not available :(', show_alert=True) + + +# Any other products +@bot.callback_query_handler(func=None, config=products_factory.filter()) +def products_callback(call: types.CallbackQuery): + callback_data: dict = products_factory.parse(callback_data=call.data) + product_id = int(callback_data['product_id']) + product = PRODUCTS[product_id] + + text = f"Product name: {product['name']}\n" \ + f"Product price: {product['price']}" + bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, + text=text, reply_markup=back_keyboard()) + + +@bot.callback_query_handler(func=lambda c: c.data == 'back') +def back_callback(call: types.CallbackQuery): + bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, + text='Products:', reply_markup=products_keyboard()) + + +bot.add_custom_filter(ProductsCallbackFilter()) +bot.infinity_polling() diff --git a/telebot/callback_data.py b/telebot/callback_data.py new file mode 100644 index 0000000..2897c40 --- /dev/null +++ b/telebot/callback_data.py @@ -0,0 +1,100 @@ +import typing + + +class CallbackDataFilter: + + def __init__(self, factory, config: typing.Dict[str, str]): + self.config = config + self.factory = factory + + def check(self, query): + try: + data = self.factory.parse(query.data) + except ValueError: + return False + + for key, value in self.config.items(): + if isinstance(value, (list, tuple, set, frozenset)): + if data.get(key) not in value: + return False + elif data.get(key) != value: + return False + return True + + +class CallbackData: + """ + Callback data factory + """ + + def __init__(self, *parts, prefix: str, sep=':'): + if not isinstance(prefix, str): + raise TypeError(f'Prefix must be instance of str not {type(prefix).__name__}') + if not prefix: + raise ValueError("Prefix can't be empty") + if sep in prefix: + raise ValueError(f"Separator {sep!r} can't be used in prefix") + + self.prefix = prefix + self.sep = sep + + self._part_names = parts + + def new(self, *args, **kwargs) -> str: + """ + Generate callback data + """ + args = list(args) + + data = [self.prefix] + + for part in self._part_names: + value = kwargs.pop(part, None) + if value is None: + if args: + value = args.pop(0) + else: + raise ValueError(f'Value for {part!r} was not passed!') + + if value is not None and not isinstance(value, str): + value = str(value) + + if self.sep in value: + raise ValueError(f"Symbol {self.sep!r} is defined as the separator and can't be used in parts' values") + + data.append(value) + + if args or kwargs: + raise TypeError('Too many arguments were passed!') + + callback_data = self.sep.join(data) + + if len(callback_data.encode()) > 64: + raise ValueError('Resulted callback data is too long!') + + return callback_data + + def parse(self, callback_data: str) -> typing.Dict[str, str]: + """ + Parse data from the callback data + """ + + prefix, *parts = callback_data.split(self.sep) + if prefix != self.prefix: + raise ValueError("Passed callback data can't be parsed with that prefix.") + elif len(parts) != len(self._part_names): + raise ValueError('Invalid parts count!') + + result = {'@': prefix} + result.update(zip(self._part_names, parts)) + return result + + def filter(self, **config) -> CallbackDataFilter: + """ + Generate filter + """ + + for key in config.keys(): + if key not in self._part_names: + raise ValueError(f'Invalid field name {key!r}') + return CallbackDataFilter(self, config)