1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

Asynchronous TeleBot version Alpha release

This commit is contained in:
Badiboy 2021-11-27 22:28:46 +03:00 committed by GitHub
commit a308ab12fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 5163 additions and 334 deletions

View File

@ -0,0 +1,11 @@
from telebot.async_telebot import AsyncTeleBot
import asyncio
import telebot
bot = AsyncTeleBot('TOKEN')
@bot.chat_join_request_handler()
async def make_some(message: telebot.types.ChatJoinRequest):
await bot.send_message(message.chat.id, 'I accepted a new user!')
await bot.approve_chat_join_request(message.chat.id, message.from_user.id)
asyncio.run(bot.polling(skip_pending=True))

View File

@ -0,0 +1,33 @@
from telebot import types,util
from telebot.async_telebot import AsyncTeleBot
import asyncio
bot = AsyncTeleBot('TOKEN')
#chat_member_handler. When status changes, telegram gives update. check status from old_chat_member and new_chat_member.
@bot.chat_member_handler()
async def chat_m(message: types.ChatMemberUpdated):
old = message.old_chat_member
new = message.new_chat_member
if new.status == "member":
await bot.send_message(message.chat.id,"Hello {name}!".format(name=new.user.first_name)) # Welcome message
#if bot is added to group, this handler will work
@bot.my_chat_member_handler()
async def my_chat_m(message: types.ChatMemberUpdated):
old = message.old_chat_member
new = message.new_chat_member
if new.status == "member":
await bot.send_message(message.chat.id,"Somebody added me to group") # Welcome message, if bot was added to group
await bot.leave_chat(message.chat.id)
#content_Type_service is:
#'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', 'group_chat_created',
#'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message',
#'proximity_alert_triggered', 'voice_chat_scheduled', 'voice_chat_started', 'voice_chat_ended',
#'voice_chat_participants_invited', 'message_auto_delete_timer_changed'
# this handler deletes service messages
@bot.message_handler(content_types=util.content_type_service)
async def delall(message: types.Message):
await bot.delete_message(message.chat.id,message.message_id)
asyncio.run(bot.polling())

View File

@ -0,0 +1,13 @@
import asyncio
from telebot.async_telebot import AsyncTeleBot
from telebot import asyncio_filters
bot = AsyncTeleBot('TOKEN')
# Handler
@bot.message_handler(chat_types=['supergroup'], is_chat_admin=True)
async def answer_for_admin(message):
await bot.send_message(message.chat.id,"hello my admin")
# Register filter
bot.add_custom_filter(asyncio_filters.IsAdminFilter(bot))
asyncio.run(bot.polling())

View File

@ -0,0 +1,44 @@
from telebot.async_telebot import AsyncTeleBot
import telebot
bot = AsyncTeleBot('TOKEN')
# AdvancedCustomFilter is for list, string filter values
class MainFilter(telebot.asyncio_filters.AdvancedCustomFilter):
key='text'
@staticmethod
async def check(message, text):
return message.text in text
# SimpleCustomFilter is for boolean values, such as is_admin=True
class IsAdmin(telebot.asyncio_filters.SimpleCustomFilter):
key='is_admin'
@staticmethod
async def check(message: telebot.types.Message):
result = await bot.get_chat_member(message.chat.id,message.from_user.id)
return result.status in ['administrator','creator']
@bot.message_handler(is_admin=True, commands=['admin']) # Check if user is admin
async def admin_rep(message):
await bot.send_message(message.chat.id, "Hi admin")
@bot.message_handler(is_admin=False, commands=['admin']) # If user is not admin
async def not_admin(message):
await bot.send_message(message.chat.id, "You are not admin")
@bot.message_handler(text=['hi']) # Response to hi message
async def welcome_hi(message):
await bot.send_message(message.chat.id, 'You said hi')
@bot.message_handler(text=['bye']) # Response to bye message
async def bye_user(message):
await bot.send_message(message.chat.id, 'You said bye')
# Do not forget to register filters
bot.add_custom_filter(MainFilter())
bot.add_custom_filter(IsAdmin())
import asyncio
asyncio.run(bot.polling())

View File

@ -0,0 +1,19 @@
from telebot.async_telebot import AsyncTeleBot
import telebot
import asyncio
bot = AsyncTeleBot('TOKEN')
# Chat id can be private or supergroups.
@bot.message_handler(chat_id=[12345678], commands=['admin']) # chat_id checks id corresponds to your list or not.
async def admin_rep(message):
await bot.send_message(message.chat.id, "You are allowed to use this command.")
@bot.message_handler(commands=['admin'])
async def not_admin(message):
await bot.send_message(message.chat.id, "You are not allowed to use this command")
# Do not forget to register
bot.add_custom_filter(telebot.asyncio_filters.ChatFilter())
asyncio.run(bot.polling())

View File

@ -0,0 +1,22 @@
from telebot.async_telebot import AsyncTeleBot
import telebot
import asyncio
bot = AsyncTeleBot('TOKEN')
# Check if message is a reply
@bot.message_handler(is_reply=True)
async def start_filter(message):
await bot.send_message(message.chat.id, "Looks like you replied to my message.")
# Check if message was forwarded
@bot.message_handler(is_forwarded=True)
async def text_filter(message):
await bot.send_message(message.chat.id, "I do not accept forwarded messages!")
# Do not forget to register filters
bot.add_custom_filter(telebot.asyncio_filters.IsReplyFilter())
bot.add_custom_filter(telebot.asyncio_filters.ForwardFilter())
asyncio.run(bot.polling())

View File

@ -0,0 +1,21 @@
from telebot.async_telebot import AsyncTeleBot
import telebot
import asyncio
bot = AsyncTeleBot('TOKEN')
# Check if message starts with @admin tag
@bot.message_handler(text_startswith="@admin")
async def start_filter(message):
await bot.send_message(message.chat.id, "Looks like you are calling admin, wait...")
# Check if text is hi or hello
@bot.message_handler(text=['hi','hello'])
async def text_filter(message):
await bot.send_message(message.chat.id, "Hi, {name}!".format(name=message.from_user.first_name))
# Do not forget to register filters
bot.add_custom_filter(telebot.asyncio_filters.TextMatchFilter())
bot.add_custom_filter(telebot.asyncio_filters.TextStartsFilter())
asyncio.run(bot.polling())

View File

@ -0,0 +1,75 @@
import telebot
from telebot import asyncio_filters
from telebot.async_telebot import AsyncTeleBot
import asyncio
bot = AsyncTeleBot('TOKEN')
class MyStates:
name = 1
surname = 2
age = 3
@bot.message_handler(commands=['start'])
async def start_ex(message):
"""
Start command. Here we are starting state
"""
await bot.set_state(message.from_user.id, MyStates.name)
await bot.send_message(message.chat.id, 'Hi, write me a name')
@bot.message_handler(state="*", commands='cancel')
async def any_state(message):
"""
Cancel state
"""
await bot.send_message(message.chat.id, "Your state was cancelled.")
await bot.delete_state(message.from_user.id)
@bot.message_handler(state=MyStates.name)
async def name_get(message):
"""
State 1. Will process when user's state is 1.
"""
await bot.send_message(message.chat.id, f'Now write me a surname')
await bot.set_state(message.from_user.id, MyStates.surname)
async with bot.retrieve_data(message.from_user.id) as data:
data['name'] = message.text
@bot.message_handler(state=MyStates.surname)
async def ask_age(message):
"""
State 2. Will process when user's state is 2.
"""
await bot.send_message(message.chat.id, "What is your age?")
await bot.set_state(message.from_user.id, MyStates.age)
async with bot.retrieve_data(message.from_user.id) as data:
data['surname'] = message.text
# result
@bot.message_handler(state=MyStates.age, is_digit=True)
async def ready_for_answer(message):
async with bot.retrieve_data(message.from_user.id) as data:
await bot.send_message(message.chat.id, "Ready, take a look:\n<b>Name: {name}\nSurname: {surname}\nAge: {age}</b>".format(name=data['name'], surname=data['surname'], age=message.text), parse_mode="html")
await bot.delete_state(message.from_user.id)
#incorrect number
@bot.message_handler(state=MyStates.age, is_digit=False)
async def age_incorrect(message):
await bot.send_message(message.chat.id, 'Looks like you are submitting a string in the field age. Please enter a number')
# register filters
bot.add_custom_filter(asyncio_filters.StateFilter(bot))
bot.add_custom_filter(asyncio_filters.IsDigitFilter())
# set saving states into file.
bot.enable_saving_states() # you can delete this if you do not need to save states
asyncio.run(bot.polling())

View File

@ -0,0 +1,27 @@
#!/usr/bin/python
# This is a simple echo bot using the decorator mechanism.
# It echoes any incoming text messages.
from telebot.async_telebot import AsyncTeleBot
import asyncio
bot = AsyncTeleBot('TOKEN')
# Handle '/start' and '/help'
@bot.message_handler(commands=['help', 'start'])
async def send_welcome(message):
await bot.reply_to(message, """\
Hi there, I am EchoBot.
I am here to echo your kind words back to you. Just say anything nice and I'll say the exact same thing to you!\
""")
# Handle all other messages with content_type 'text' (content_types defaults to ['text'])
@bot.message_handler(func=lambda message: True)
async def echo_message(message):
await bot.reply_to(message, message.text)
asyncio.run(bot.polling())

View File

@ -0,0 +1,19 @@
from telebot.async_telebot import AsyncTeleBot
import asyncio
bot = AsyncTeleBot('TOKEN')
async def start_executor(message):
await bot.send_message(message.chat.id, 'Hello!')
bot.register_message_handler(start_executor, commands=['start']) # Start command executor
# See also
# bot.register_callback_query_handler(*args, **kwargs)
# bot.register_channel_post_handler(*args, **kwargs)
# bot.register_chat_member_handler(*args, **kwargs)
# bot.register_inline_handler(*args, **kwargs)
# bot.register_my_chat_member_handler(*args, **kwargs)
# bot.register_edited_message_handler(*args, **kwargs)
# And other functions..
asyncio.run(bot.polling(skip_pending=True))

View File

@ -0,0 +1,13 @@
from telebot.async_telebot import AsyncTeleBot
import asyncio
bot = AsyncTeleBot('TOKEN')
@bot.message_handler(commands=['start', 'help'])
async def send_welcome(message):
await bot.reply_to(message, "Howdy, how are you doing?")
@bot.message_handler(func=lambda message: True)
async def echo_all(message):
await bot.reply_to(message, message.text)
asyncio.run(bot.polling(skip_pending=True))# Skip pending skips old updates

View File

@ -1,3 +1,4 @@
pytest==3.0.2 pytest==3.0.2
requests==2.20.0 requests==2.20.0
wheel==0.24.0 wheel==0.24.0
aiohttp>=3.8.0,<3.9.0

View File

@ -12,10 +12,11 @@ from typing import Any, Callable, List, Optional, Union
# this imports are used to avoid circular import error # this imports are used to avoid circular import error
import telebot.util import telebot.util
import telebot.types import telebot.types
from telebot.custom_filters import SimpleCustomFilter, AdvancedCustomFilter
logger = logging.getLogger('TeleBot') logger = logging.getLogger('TeleBot')
formatter = logging.Formatter( formatter = logging.Formatter(
'%(asctime)s (%(filename)s:%(lineno)d %(threadName)s) %(levelname)s - %(name)s: "%(message)s"' '%(asctime)s (%(filename)s:%(lineno)d %(threadName)s) %(levelname)s - %(name)s: "%(message)s"'
) )
@ -28,6 +29,8 @@ logger.setLevel(logging.ERROR)
from telebot import apihelper, util, types from telebot import apihelper, util, types
from telebot.handler_backends import MemoryHandlerBackend, FileHandlerBackend, StateMemory, StateFile from telebot.handler_backends import MemoryHandlerBackend, FileHandlerBackend, StateMemory, StateFile
from telebot.custom_filters import SimpleCustomFilter, AdvancedCustomFilter
REPLY_MARKUP_TYPES = Union[ REPLY_MARKUP_TYPES = Union[
@ -35,6 +38,7 @@ REPLY_MARKUP_TYPES = Union[
types.ReplyKeyboardRemove, types.ForceReply] types.ReplyKeyboardRemove, types.ForceReply]
""" """
Module : telebot Module : telebot
""" """
@ -3308,338 +3312,6 @@ class TeleBot:
break break
class AsyncTeleBot(TeleBot):
def __init__(self, *args, **kwargs):
TeleBot.__init__(self, *args, **kwargs)
# I'm not sure if `get_updates` should be added here too
@util.async_dec()
def enable_save_next_step_handlers(self, delay=120, filename="./.handler-saves/step.save"):
return TeleBot.enable_save_next_step_handlers(self, delay, filename)
@util.async_dec()
def enable_save_reply_handlers(self, delay=120, filename="./.handler-saves/reply.save"):
return TeleBot.enable_save_reply_handlers(self, delay, filename)
@util.async_dec()
def disable_save_next_step_handlers(self):
return TeleBot.disable_save_next_step_handlers(self)
@util.async_dec()
def disable_save_reply_handlers(self):
return TeleBot.enable_save_reply_handlers(self)
@util.async_dec()
def load_next_step_handlers(self, filename="./.handler-saves/step.save", del_file_after_loading=True):
return TeleBot.load_next_step_handlers(self, filename, del_file_after_loading)
@util.async_dec()
def load_reply_handlers(self, filename="./.handler-saves/reply.save", del_file_after_loading=True):
return TeleBot.load_reply_handlers(self, filename, del_file_after_loading)
@util.async_dec()
def get_me(self):
return TeleBot.get_me(self)
@util.async_dec()
def log_out(self):
return TeleBot.log_out(self)
@util.async_dec()
def close(self):
return TeleBot.close(self)
@util.async_dec()
def get_my_commands(self, *args, **kwargs): # needed args because new scope and language_code
return TeleBot.get_my_commands(self, *args, **kwargs)
@util.async_dec()
def set_my_commands(self, *args, **kwargs):
return TeleBot.set_my_commands(self, *args, **kwargs)
@util.async_dec()
def delete_my_commands(self, *args, **kwargs):
return TeleBot.delete_my_commands(self, *args, **kwargs)
@util.async_dec()
def get_file(self, *args):
return TeleBot.get_file(self, *args)
@util.async_dec()
def download_file(self, *args):
return TeleBot.download_file(self, *args)
@util.async_dec()
def get_user_profile_photos(self, *args, **kwargs):
return TeleBot.get_user_profile_photos(self, *args, **kwargs)
@util.async_dec()
def get_chat(self, *args):
return TeleBot.get_chat(self, *args)
@util.async_dec()
def leave_chat(self, *args):
return TeleBot.leave_chat(self, *args)
@util.async_dec()
def get_chat_administrators(self, *args):
return TeleBot.get_chat_administrators(self, *args)
@util.async_dec()
def get_chat_members_count(self, *args):
logger.info('get_chat_members_count is deprecated. Use get_chat_member_count instead')
return TeleBot.get_chat_member_count(self, *args)
@util.async_dec()
def get_chat_member_count(self, *args):
return TeleBot.get_chat_member_count(self, *args)
@util.async_dec()
def set_chat_sticker_set(self, *args):
return TeleBot.set_chat_sticker_set(self, *args)
@util.async_dec()
def delete_chat_sticker_set(self, *args):
return TeleBot.delete_chat_sticker_set(self, *args)
@util.async_dec()
def get_chat_member(self, *args):
return TeleBot.get_chat_member(self, *args)
@util.async_dec()
def send_message(self, *args, **kwargs):
return TeleBot.send_message(self, *args, **kwargs)
@util.async_dec()
def send_dice(self, *args, **kwargs):
return TeleBot.send_dice(self, *args, **kwargs)
@util.async_dec()
def send_animation(self, *args, **kwargs):
return TeleBot.send_animation(self, *args, **kwargs)
@util.async_dec()
def forward_message(self, *args, **kwargs):
return TeleBot.forward_message(self, *args, **kwargs)
@util.async_dec()
def copy_message(self, *args, **kwargs):
return TeleBot.copy_message(self, *args, **kwargs)
@util.async_dec()
def delete_message(self, *args):
return TeleBot.delete_message(self, *args)
@util.async_dec()
def send_photo(self, *args, **kwargs):
return TeleBot.send_photo(self, *args, **kwargs)
@util.async_dec()
def send_audio(self, *args, **kwargs):
return TeleBot.send_audio(self, *args, **kwargs)
@util.async_dec()
def send_voice(self, *args, **kwargs):
return TeleBot.send_voice(self, *args, **kwargs)
@util.async_dec()
def send_document(self, *args, **kwargs):
return TeleBot.send_document(self, *args, **kwargs)
@util.async_dec()
def send_sticker(self, *args, **kwargs):
return TeleBot.send_sticker(self, *args, **kwargs)
@util.async_dec()
def send_video(self, *args, **kwargs):
return TeleBot.send_video(self, *args, **kwargs)
@util.async_dec()
def send_video_note(self, *args, **kwargs):
return TeleBot.send_video_note(self, *args, **kwargs)
@util.async_dec()
def send_media_group(self, *args, **kwargs):
return TeleBot.send_media_group(self, *args, **kwargs)
@util.async_dec()
def send_location(self, *args, **kwargs):
return TeleBot.send_location(self, *args, **kwargs)
@util.async_dec()
def edit_message_live_location(self, *args, **kwargs):
return TeleBot.edit_message_live_location(self, *args, **kwargs)
@util.async_dec()
def stop_message_live_location(self, *args, **kwargs):
return TeleBot.stop_message_live_location(self, *args, **kwargs)
@util.async_dec()
def send_venue(self, *args, **kwargs):
return TeleBot.send_venue(self, *args, **kwargs)
@util.async_dec()
def send_contact(self, *args, **kwargs):
return TeleBot.send_contact(self, *args, **kwargs)
@util.async_dec()
def send_chat_action(self, *args, **kwargs):
return TeleBot.send_chat_action(self, *args, **kwargs)
@util.async_dec()
def kick_chat_member(self, *args, **kwargs):
logger.info('kick_chat_member is deprecated. Use ban_chat_member instead.')
return TeleBot.ban_chat_member(self, *args, **kwargs)
@util.async_dec()
def ban_chat_member(self, *args, **kwargs):
return TeleBot.ban_chat_member(self, *args, **kwargs)
@util.async_dec()
def unban_chat_member(self, *args, **kwargs):
return TeleBot.unban_chat_member(self, *args, **kwargs)
@util.async_dec()
def restrict_chat_member(self, *args, **kwargs):
return TeleBot.restrict_chat_member(self, *args, **kwargs)
@util.async_dec()
def promote_chat_member(self, *args, **kwargs):
return TeleBot.promote_chat_member(self, *args, **kwargs)
@util.async_dec()
def set_chat_administrator_custom_title(self, *args, **kwargs):
return TeleBot.set_chat_administrator_custom_title(self, *args, **kwargs)
@util.async_dec()
def set_chat_permissions(self, *args, **kwargs):
return TeleBot.set_chat_permissions(self, *args, **kwargs)
@util.async_dec()
def create_chat_invite_link(self, *args, **kwargs):
return TeleBot.create_chat_invite_link(self, *args, **kwargs)
@util.async_dec()
def edit_chat_invite_link(self, *args, **kwargs):
return TeleBot.edit_chat_invite_link(self, *args, **kwargs)
@util.async_dec()
def revoke_chat_invite_link(self, *args, **kwargs):
return TeleBot.revoke_chat_invite_link(self, *args, **kwargs)
@util.async_dec()
def export_chat_invite_link(self, *args):
return TeleBot.export_chat_invite_link(self, *args)
@util.async_dec()
def set_chat_photo(self, *args):
return TeleBot.set_chat_photo(self, *args)
@util.async_dec()
def delete_chat_photo(self, *args):
return TeleBot.delete_chat_photo(self, *args)
@util.async_dec()
def set_chat_title(self, *args):
return TeleBot.set_chat_title(self, *args)
@util.async_dec()
def set_chat_description(self, *args):
return TeleBot.set_chat_description(self, *args)
@util.async_dec()
def pin_chat_message(self, *args, **kwargs):
return TeleBot.pin_chat_message(self, *args, **kwargs)
@util.async_dec()
def unpin_chat_message(self, *args):
return TeleBot.unpin_chat_message(self, *args)
@util.async_dec()
def unpin_all_chat_messages(self, *args):
return TeleBot.unpin_all_chat_messages(self, *args)
@util.async_dec()
def edit_message_text(self, *args, **kwargs):
return TeleBot.edit_message_text(self, *args, **kwargs)
@util.async_dec()
def edit_message_media(self, *args, **kwargs):
return TeleBot.edit_message_media(self, *args, **kwargs)
@util.async_dec()
def edit_message_reply_markup(self, *args, **kwargs):
return TeleBot.edit_message_reply_markup(self, *args, **kwargs)
@util.async_dec()
def send_game(self, *args, **kwargs):
return TeleBot.send_game(self, *args, **kwargs)
@util.async_dec()
def set_game_score(self, *args, **kwargs):
return TeleBot.set_game_score(self, *args, **kwargs)
@util.async_dec()
def get_game_high_scores(self, *args, **kwargs):
return TeleBot.get_game_high_scores(self, *args, **kwargs)
@util.async_dec()
def send_invoice(self, *args, **kwargs):
return TeleBot.send_invoice(self, *args, **kwargs)
@util.async_dec()
def answer_shipping_query(self, *args, **kwargs):
return TeleBot.answer_shipping_query(self, *args, **kwargs)
@util.async_dec()
def answer_pre_checkout_query(self, *args, **kwargs):
return TeleBot.answer_pre_checkout_query(self, *args, **kwargs)
@util.async_dec()
def edit_message_caption(self, *args, **kwargs):
return TeleBot.edit_message_caption(self, *args, **kwargs)
@util.async_dec()
def answer_inline_query(self, *args, **kwargs):
return TeleBot.answer_inline_query(self, *args, **kwargs)
@util.async_dec()
def answer_callback_query(self, *args, **kwargs):
return TeleBot.answer_callback_query(self, *args, **kwargs)
@util.async_dec()
def get_sticker_set(self, *args, **kwargs):
return TeleBot.get_sticker_set(self, *args, **kwargs)
@util.async_dec()
def upload_sticker_file(self, *args, **kwargs):
return TeleBot.upload_sticker_file(self, *args, **kwargs)
@util.async_dec()
def create_new_sticker_set(self, *args, **kwargs):
return TeleBot.create_new_sticker_set(self, *args, **kwargs)
@util.async_dec()
def add_sticker_to_set(self, *args, **kwargs):
return TeleBot.add_sticker_to_set(self, *args, **kwargs)
@util.async_dec()
def set_sticker_position_in_set(self, *args, **kwargs):
return TeleBot.set_sticker_position_in_set(self, *args, **kwargs)
@util.async_dec()
def delete_sticker_from_set(self, *args, **kwargs):
return TeleBot.delete_sticker_from_set(self, *args, **kwargs)
@util.async_dec()
def set_sticker_set_thumb(self, *args, **kwargs):
return TeleBot.set_sticker_set_thumb(self, *args, **kwargs)
@util.async_dec()
def send_poll(self, *args, **kwargs):
return TeleBot.send_poll(self, *args, **kwargs)
@util.async_dec()
def stop_poll(self, *args, **kwargs):
return TeleBot.stop_poll(self, *args, **kwargs)

2868
telebot/async_telebot.py Normal file

File diff suppressed because it is too large Load Diff

178
telebot/asyncio_filters.py Normal file
View File

@ -0,0 +1,178 @@
from abc import ABC
class SimpleCustomFilter(ABC):
"""
Simple Custom Filter base class.
Create child class with check() method.
Accepts only message, returns bool value, that is compared with given in handler.
"""
async def check(self, message):
"""
Perform a check.
"""
pass
class AdvancedCustomFilter(ABC):
"""
Simple Custom Filter base class.
Create child class with check() method.
Accepts two parameters, returns bool: True - filter passed, False - filter failed.
message: Message class
text: Filter value given in handler
"""
async def check(self, message, text):
"""
Perform a check.
"""
pass
class TextMatchFilter(AdvancedCustomFilter):
"""
Filter to check Text message.
key: text
Example:
@bot.message_handler(text=['account'])
"""
key = 'text'
async def check(self, message, text):
if type(text) is list:return message.text in text
else: return text == message.text
class TextContainsFilter(AdvancedCustomFilter):
"""
Filter to check Text message.
key: text
Example:
# Will respond if any message.text contains word 'account'
@bot.message_handler(text_contains=['account'])
"""
key = 'text_contains'
async def check(self, message, text):
return text in message.text
class TextStartsFilter(AdvancedCustomFilter):
"""
Filter to check whether message starts with some text.
Example:
# Will work if message.text starts with 'Sir'.
@bot.message_handler(text_startswith='Sir')
"""
key = 'text_startswith'
async def check(self, message, text):
return message.text.startswith(text)
class ChatFilter(AdvancedCustomFilter):
"""
Check whether chat_id corresponds to given chat_id.
Example:
@bot.message_handler(chat_id=[99999])
"""
key = 'chat_id'
async def check(self, message, text):
return message.chat.id in text
class ForwardFilter(SimpleCustomFilter):
"""
Check whether message was forwarded from channel or group.
Example:
@bot.message_handler(is_forwarded=True)
"""
key = 'is_forwarded'
async def check(self, message):
return message.forward_from_chat is not None
class IsReplyFilter(SimpleCustomFilter):
"""
Check whether message is a reply.
Example:
@bot.message_handler(is_reply=True)
"""
key = 'is_reply'
async def check(self, message):
return message.reply_to_message is not None
class LanguageFilter(AdvancedCustomFilter):
"""
Check users language_code.
Example:
@bot.message_handler(language_code=['ru'])
"""
key = 'language_code'
async def check(self, message, text):
if type(text) is list:return message.from_user.language_code in text
else: return message.from_user.language_code == text
class IsAdminFilter(SimpleCustomFilter):
"""
Check whether the user is administrator / owner of the chat.
Example:
@bot.message_handler(chat_types=['supergroup'], is_chat_admin=True)
"""
key = 'is_chat_admin'
def __init__(self, bot):
self._bot = bot
async def check(self, message):
result = await self._bot.get_chat_member(message.chat.id, message.from_user.id)
return result.status in ['creator', 'administrator']
class StateFilter(AdvancedCustomFilter):
"""
Filter to check state.
Example:
@bot.message_handler(state=1)
"""
def __init__(self, bot):
self.bot = bot
key = 'state'
async def check(self, message, text):
result = await self.bot.current_states.current_state(message.from_user.id)
if result is False: return False
elif text == '*': return True
elif type(text) is list: return result in text
return result == text
class IsDigitFilter(SimpleCustomFilter):
"""
Filter to check whether the string is made up of only digits.
Example:
@bot.message_handler(is_digit=True)
"""
key = 'is_digit'
async def check(self, message):
return message.text.isdigit()

View File

@ -0,0 +1,219 @@
import os
import pickle
class StateMemory:
def __init__(self):
self._states = {}
async def add_state(self, chat_id, state):
"""
Add a state.
:param chat_id:
:param state: new state
"""
if chat_id in self._states:
self._states[chat_id]['state'] = state
else:
self._states[chat_id] = {'state': state,'data': {}}
async def current_state(self, chat_id):
"""Current state"""
if chat_id in self._states: return self._states[chat_id]['state']
else: return False
async def delete_state(self, chat_id):
"""Delete a state"""
self._states.pop(chat_id)
def _get_data(self, chat_id):
return self._states[chat_id]['data']
async def set(self, chat_id, new_state):
"""
Set a new state for a user.
:param chat_id:
:param new_state: new_state of a user
"""
await self.add_state(chat_id,new_state)
async def _add_data(self, chat_id, key, value):
result = self._states[chat_id]['data'][key] = value
return result
async def finish(self, chat_id):
"""
Finish(delete) state of a user.
:param chat_id:
"""
await self.delete_state(chat_id)
def retrieve_data(self, chat_id):
"""
Save input text.
Usage:
with bot.retrieve_data(message.chat.id) as data:
data['name'] = message.text
Also, at the end of your 'Form' you can get the name:
data['name']
"""
return StateContext(self, chat_id)
class StateFile:
"""
Class to save states in a file.
"""
def __init__(self, filename):
self.file_path = filename
async def add_state(self, chat_id, state):
"""
Add a state.
:param chat_id:
:param state: new state
"""
states_data = self._read_data()
if chat_id in states_data:
states_data[chat_id]['state'] = state
return await self._save_data(states_data)
else:
new_data = states_data[chat_id] = {'state': state,'data': {}}
return await self._save_data(states_data)
async def current_state(self, chat_id):
"""Current state."""
states_data = self._read_data()
if chat_id in states_data: return states_data[chat_id]['state']
else: return False
async def delete_state(self, chat_id):
"""Delete a state"""
states_data = self._read_data()
states_data.pop(chat_id)
await self._save_data(states_data)
def _read_data(self):
"""
Read the data from file.
"""
file = open(self.file_path, 'rb')
states_data = pickle.load(file)
file.close()
return states_data
def _create_dir(self):
"""
Create directory .save-handlers.
"""
dirs = self.file_path.rsplit('/', maxsplit=1)[0]
os.makedirs(dirs, exist_ok=True)
if not os.path.isfile(self.file_path):
with open(self.file_path,'wb') as file:
pickle.dump({}, file)
async def _save_data(self, new_data):
"""
Save data after editing.
:param new_data:
"""
with open(self.file_path, 'wb+') as state_file:
pickle.dump(new_data, state_file, protocol=pickle.HIGHEST_PROTOCOL)
return True
def _get_data(self, chat_id):
return self._read_data()[chat_id]['data']
async def set(self, chat_id, new_state):
"""
Set a new state for a user.
:param chat_id:
:param new_state: new_state of a user
"""
await self.add_state(chat_id,new_state)
async def _add_data(self, chat_id, key, value):
states_data = self._read_data()
result = states_data[chat_id]['data'][key] = value
await self._save_data(result)
return result
async def finish(self, chat_id):
"""
Finish(delete) state of a user.
:param chat_id:
"""
await self.delete_state(chat_id)
def retrieve_data(self, chat_id):
"""
Save input text.
Usage:
with bot.retrieve_data(message.chat.id) as data:
data['name'] = message.text
Also, at the end of your 'Form' you can get the name:
data['name']
"""
return StateFileContext(self, chat_id)
class StateContext:
"""
Class for data.
"""
def __init__(self , obj: StateMemory, chat_id) -> None:
self.obj = obj
self.chat_id = chat_id
self.data = obj._get_data(chat_id)
async def __aenter__(self):
return self.data
async def __aexit__(self, exc_type, exc_val, exc_tb):
return
class StateFileContext:
"""
Class for data.
"""
def __init__(self , obj: StateFile, chat_id) -> None:
self.obj = obj
self.chat_id = chat_id
self.data = None
async def __aenter__(self):
self.data = self.obj._get_data(self.chat_id)
return self.data
async def __aexit__(self, exc_type, exc_val, exc_tb):
old_data = self.obj._read_data()
for i in self.data:
old_data[self.chat_id]['data'][i] = self.data.get(i)
await self.obj._save_data(old_data)
return
class BaseMiddleware:
"""
Base class for middleware.
Your middlewares should be inherited from this class.
"""
def __init__(self):
pass
async def pre_process(self, message, data):
raise NotImplementedError
async def post_process(self, message, data, exception):
raise NotImplementedError

1594
telebot/asyncio_helper.py Normal file

File diff suppressed because it is too large Load Diff