1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

Merge pull request #1315 from coder2020official/master

States, New filter, and more
This commit is contained in:
Badiboy 2021-09-25 20:43:33 +03:00 committed by GitHub
commit be7317cc86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 271 additions and 2 deletions

View File

@ -242,7 +242,7 @@ Handle edited channel post messages
Handle callback queries Handle callback queries
```python ```python
@bot.callback_query_handler(func=lambda call: True) @bot.callback_query_handler(func=lambda call: True)
def test_callback(call): # <- passes a CallbackQuery type object to your function def test_callback(call): # <- passes a CallbackQuery type object to your function
logger.info(call) logger.info(call)
``` ```

View File

@ -0,0 +1,12 @@
import telebot
from telebot import custom_filters
bot = telebot.TeleBot('TOKEN')
# Handler
@bot.message_handler(chat_types=['supergroup'], is_chat_admin=True)
def answer_for_admin(message):
bot.send_message(message.chat.id,"hello my admin")
# Register filter
bot.add_custom_filter(custom_filters.IsAdminFilter(bot))
bot.polling()

34
examples/custom_states.py Normal file
View File

@ -0,0 +1,34 @@
import telebot
from telebot.handler_backends import State
bot = telebot.TeleBot("")
@bot.message_handler(commands=['start'])
def start_ex(message):
bot.set_state(message.chat.id, 1)
bot.send_message(message.chat.id, 'Hi, write me a name')
@bot.state_handler(state=1)
def name_get(message, state:State):
bot.send_message(message.chat.id, f'Now write me a surname')
state.set(message.chat.id, 2)
with state.retrieve_data(message.chat.id) as data:
data['name'] = message.text
@bot.state_handler(state=2)
def ask_age(message, state:State):
bot.send_message(message.chat.id, "What is your age?")
state.set(message.chat.id, 3)
with state.retrieve_data(message.chat.id) as data:
data['surname'] = message.text
@bot.state_handler(state=3)
def ready_for_answer(message, state: State):
with state.retrieve_data(message.chat.id) as data:
bot.send_message(message.chat.id, "Ready, take a look:\n<b>Name: {name}\nSurname: {surname}\nAge: {age}</b>".format(name=data['name'], surname=data['surname'], age=message.text), parse_mode="html")
state.finish(message.chat.id)
bot.polling()

View File

@ -0,0 +1,21 @@
import telebot
api_token = 'token'
bot = telebot.TeleBot(api_token)
def start_executor(message):
bot.send_message(message.chat.id, 'Hello!')
bot.register_message_handler(start_executor, commands=['start']) # Start command executor
# See also
# bot.register_callback_query_handler(*args, **kwargs)
# bot.register_channel_post_handler(*args, **kwargs)
# bot.register_chat_member_handler(*args, **kwargs)
# bot.register_inline_handler(*args, **kwargs)
# bot.register_my_chat_member_handler(*args, **kwargs)
# bot.register_edited_message_handler(*args, **kwargs)
# And other functions..
bot.polling()

View File

@ -27,7 +27,7 @@ logger.addHandler(console_output_handler)
logger.setLevel(logging.ERROR) logger.setLevel(logging.ERROR)
from telebot import apihelper, util, types from telebot import apihelper, util, types
from telebot.handler_backends import MemoryHandlerBackend, FileHandlerBackend from telebot.handler_backends import MemoryHandlerBackend, FileHandlerBackend, StateMachine, State
REPLY_MARKUP_TYPES = Union[ REPLY_MARKUP_TYPES = Union[
@ -186,6 +186,9 @@ class TeleBot:
self.my_chat_member_handlers = [] self.my_chat_member_handlers = []
self.chat_member_handlers = [] self.chat_member_handlers = []
self.custom_filters = {} self.custom_filters = {}
self.state_handlers = []
self.current_states = StateMachine()
if apihelper.ENABLE_MIDDLEWARE: if apihelper.ENABLE_MIDDLEWARE:
self.typed_middleware_handlers = { self.typed_middleware_handlers = {
@ -495,6 +498,7 @@ class TeleBot:
def process_new_messages(self, new_messages): def process_new_messages(self, new_messages):
self._notify_next_handlers(new_messages) self._notify_next_handlers(new_messages)
self._notify_state_handlers(new_messages)
self._notify_reply_handlers(new_messages) self._notify_reply_handlers(new_messages)
self.__notify_update(new_messages) self.__notify_update(new_messages)
self._notify_command_handlers(self.message_handlers, new_messages) self._notify_command_handlers(self.message_handlers, new_messages)
@ -2362,6 +2366,31 @@ class TeleBot:
chat_id = message.chat.id chat_id = message.chat.id
self.register_next_step_handler_by_chat_id(chat_id, callback, *args, **kwargs) self.register_next_step_handler_by_chat_id(chat_id, callback, *args, **kwargs)
def set_state(self, chat_id, state):
"""
Sets a new state of a user.
:param chat_id:
:param state: new state. can be string or integer.
"""
self.current_states.add_state(chat_id, state)
def delete_state(self, chat_id):
"""
Delete the current state of a user.
:param chat_id:
:return:
"""
self.current_states.delete_state(chat_id)
def get_state(self, chat_id):
"""
Get current state of a user.
:param chat_id:
:return: state of a user
"""
return self.current_states.current_state(chat_id)
def register_next_step_handler_by_chat_id( def register_next_step_handler_by_chat_id(
self, chat_id: Union[int, str], callback: Callable, *args, **kwargs) -> None: self, chat_id: Union[int, str], callback: Callable, *args, **kwargs) -> None:
""" """
@ -2426,6 +2455,31 @@ class TeleBot:
if need_pop: if need_pop:
new_messages.pop(i) # removing message that was detected with next_step_handler new_messages.pop(i) # removing message that was detected with next_step_handler
def _notify_state_handlers(self, new_messages):
"""
Description: TBD
:param new_messages:
:return:
"""
for i, message in enumerate(new_messages):
need_pop = False
user_state = self.current_states.current_state(message.from_user.id)
if user_state:
for handler in self.state_handlers:
if handler['filters']['state'] == user_state:
for message_filter, filter_value in handler['filters'].items():
if filter_value is None:
continue
if not self._test_filter(message_filter, filter_value, message):
return False
need_pop = True
state = State(self.current_states)
self._exec_task(handler["function"], message, state)
if need_pop:
new_messages.pop(i) # removing message that was detected by states
@staticmethod @staticmethod
def _build_handler_dict(handler, **filters): def _build_handler_dict(handler, **filters):
""" """
@ -2661,6 +2715,55 @@ class TeleBot:
**kwargs) **kwargs)
self.add_edited_message_handler(handler_dict) self.add_edited_message_handler(handler_dict)
def state_handler(self, state, content_types=None, regexp=None, func=None, chat_types=None, **kwargs):
"""
State handler for getting input from a user.
:param state: state of a user
:param content_types:
:param regexp:
:param func:
:param chat_types:
"""
def decorator(handler):
handler_dict = self._build_handler_dict(handler,
state=state,
content_types=content_types,
regexp=regexp,
chat_types=chat_types,
func=func,
**kwargs)
self.add_state_handler(handler_dict)
return handler
return decorator
def add_state_handler(self, handler_dict):
"""
Adds the edit message handler
:param handler_dict:
:return:
"""
self.state_handlers.append(handler_dict)
def register_state_handler(self, callback, state, content_types=None, regexp=None, func=None, chat_types=None, **kwargs):
"""
Register a state handler.
:param callback: function to be called
:param state: state to be checked
:param content_types:
:param func:
"""
handler_dict = self._build_handler_dict(callback=callback,
state=state,
content_types=content_types,
regexp=regexp,
chat_types=chat_types,
func=func,
**kwargs)
self.add_state_handler(handler_dict)
def channel_post_handler(self, commands=None, regexp=None, func=None, content_types=None, **kwargs): def channel_post_handler(self, commands=None, regexp=None, func=None, content_types=None, **kwargs):
""" """
Channel post handler decorator Channel post handler decorator
@ -3139,6 +3242,8 @@ class TeleBot:
return message.content_type == 'text' and util.extract_command(message.text) in filter_value return message.content_type == 'text' and util.extract_command(message.text) in filter_value
elif message_filter == 'chat_types': elif message_filter == 'chat_types':
return message.chat.type in filter_value return message.chat.type in filter_value
elif message_filter == 'state':
return True
elif message_filter == 'func': elif message_filter == 'func':
return filter_value(message) return filter_value(message)
elif self.custom_filters and message_filter in self.custom_filters: elif self.custom_filters and message_filter in self.custom_filters:

View File

@ -130,3 +130,19 @@ class LanguageFilter(AdvancedCustomFilter):
if type(text) is list:return message.from_user.language_code in text if type(text) is list:return message.from_user.language_code in text
else: return message.from_user.language_code == text else: return message.from_user.language_code == text
class IsAdminFilter(SimpleCustomFilter):
"""
Check whether the user is administrator / owner of the chat.
Example:
@bot.message_handler(chat_types=['supergroup'], is_chat_admin=True)
"""
key = 'is_chat_admin'
def __init__(self, bot):
self._bot = bot
def check(self, message):
return self._bot.get_chat_member(message.chat.id, message.from_user.id).status in ['creator', 'administrator']

View File

@ -141,3 +141,84 @@ class RedisHandlerBackend(HandlerBackend):
handlers = pickle.loads(value) handlers = pickle.loads(value)
self.clear_handlers(handler_group_id) self.clear_handlers(handler_group_id)
return handlers return handlers
class StateMachine:
def __init__(self):
self._states = {}
def add_state(self, chat_id, state):
"""
Add a state.
:param chat_id:
:param state: new state
"""
if chat_id in self._states:
self._states[int(chat_id)]['state'] = state
else:
self._states[chat_id] = {'state': state,'data': {}}
def current_state(self, chat_id):
"""Current state"""
if chat_id in self._states: return self._states[chat_id]['state']
else: return False
def delete_state(self, chat_id):
"""Delete a state"""
return self._states.pop(chat_id)
def _get_data(self, chat_id):
return self._states[chat_id]['data']
class State:
"""
Base class for state managing.
"""
def __init__(self, obj: StateMachine) -> None:
self.obj = obj
def set(self, chat_id, new_state):
"""
Set a new state for a user.
:param chat_id:
:param new_state: new_state of a user
"""
self.obj._states[chat_id]['state'] = new_state
def finish(self, chat_id):
"""
Finish(delete) state of a user.
:param chat_id:
"""
self.obj._states.pop(chat_id)
def retrieve_data(self, chat_id):
"""
Save input text.
Usage:
with state.retrieve_data(message.chat.id) as data:
data['name'] = message.text
Also, at the end of your 'Form' you can get the name:
data['name']
"""
return StateContext(self.obj, chat_id)
class StateContext:
"""
Class for data.
"""
def __init__(self , obj: StateMachine, chat_id) -> None:
self.obj = obj
self.chat_id = chat_id
self.data = obj._get_data(chat_id)
def __enter__(self):
return self.data
def __exit__(self, exc_type, exc_val, exc_tb):
return