1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Badiboy 2021-11-08 18:32:38 +03:00
commit 7f1497c5e9
13 changed files with 375 additions and 50 deletions

1
.gitignore vendored
View File

@ -25,6 +25,7 @@ var/
.idea/
venv/
.venv/
# PyInstaller
# Usually these files are written by a python script from a template

View File

@ -31,6 +31,7 @@
* [Poll Answer Handler](#poll-answer-handler)
* [My Chat Member Handler](#my-chat-member-handler)
* [Chat Member Handler](#chat-member-handler)
* [Chat Join request handler](#chat-join-request-handler)
* [Inline Mode](#inline-mode)
* [Inline handler](#inline-handler)
* [Chosen Inline handler](#chosen-inline-handler)
@ -272,6 +273,10 @@ Handle updates of a chat member's status in a chat
`@bot.chat_member_handler() # <- passes a ChatMemberUpdated type object to your function`
*Note: "chat_member" updates are not requested by default. If you want to allow all update types, set `allowed_updates` in `bot.polling()` / `bot.infinity_polling()` to `util.update_types`*
#### Chat Join Request Handler
Handle chat join requests using:
`@bot.chat_join_request_handler() # <- passes ChatInviteLink type object to your function`
### Inline Mode
More information about [Inline mode](https://core.telegram.org/bots/inline).
@ -787,6 +792,8 @@ Get help. Discuss. Chat.
* [oneIPO bot](https://github.com/aaditya2200/IPO-proj) by [Aadithya](https://github.com/aaditya2200) & [Amol Soans](https://github.com/AmolDerickSoans) This Telegram bot provides live updates , data and documents on current and upcoming IPOs(Initial Public Offerings)
* [CoronaGraphsBot](https://t.me/CovidGraph_bot) ([source](https://github.com/TrevorWinstral/CoronaGraphsBot)) by *TrevorWinstral* - Gets live COVID Country data, plots it, and briefs the user
* [ETHLectureBot](https://t.me/ETHLectureBot) ([source](https://github.com/TrevorWinstral/ETHLectureBot)) by *TrevorWinstral* - Notifies ETH students when their lectures have been uploaded
* [Vlun Finder Bot](https://github.com/resinprotein2333/Vlun-Finder-bot) by [Resinprotein2333](https://github.com/resinprotein2333). This bot can help you to find The information of CVE vulnerabilities.
* [ETHGasFeeTrackerBot](https://t.me/ETHGasFeeTrackerBot) ([Source](https://github.com/DevAdvik/ETHGasFeeTrackerBot]) by *DevAdvik* - Get Live Ethereum Gas Fees in GWEI

View File

@ -0,0 +1,11 @@
import telebot
bot = telebot.TeleBot('TOKEN')
@bot.chat_join_request_handler()
def make_some(message: telebot.types.ChatJoinRequest):
bot.send_message(message.chat.id, 'I accepted a new user!')
bot.approve_chat_join_request(message.chat.id, message.from_user.id)
bot.infinity_polling(allowed_updates=telebot.util.update_types)

View File

@ -5,13 +5,19 @@ from telebot import custom_filters
bot = telebot.TeleBot("")
class MyStates:
name = 1
surname = 2
age = 3
@bot.message_handler(commands=['start'])
def start_ex(message):
"""
Start command. Here we are starting state
"""
bot.set_state(message.chat.id, 1)
bot.set_state(message.from_user.id, MyStates.name)
bot.send_message(message.chat.id, 'Hi, write me a name')
@ -22,38 +28,38 @@ def any_state(message):
Cancel state
"""
bot.send_message(message.chat.id, "Your state was cancelled.")
bot.delete_state(message.chat.id)
bot.delete_state(message.from_user.id)
@bot.message_handler(state=1)
@bot.message_handler(state=MyStates.name)
def name_get(message):
"""
State 1. Will process when user's state is 1.
"""
bot.send_message(message.chat.id, f'Now write me a surname')
bot.set_state(message.chat.id, 2)
with bot.retrieve_data(message.chat.id) as data:
bot.set_state(message.from_user.id, MyStates.surname)
with bot.retrieve_data(message.from_user.id) as data:
data['name'] = message.text
@bot.message_handler(state=2)
@bot.message_handler(state=MyStates.surname)
def ask_age(message):
"""
State 2. Will process when user's state is 2.
"""
bot.send_message(message.chat.id, "What is your age?")
bot.set_state(message.chat.id, 3)
with bot.retrieve_data(message.chat.id) as data:
bot.set_state(message.from_user.id, MyStates.age)
with bot.retrieve_data(message.from_user.id) as data:
data['surname'] = message.text
# result
@bot.message_handler(state=3, is_digit=True)
@bot.message_handler(state=MyStates.age, is_digit=True)
def ready_for_answer(message):
with bot.retrieve_data(message.chat.id) as data:
with bot.retrieve_data(message.from_user.id) as data:
bot.send_message(message.chat.id, "Ready, take a look:\n<b>Name: {name}\nSurname: {surname}\nAge: {age}</b>".format(name=data['name'], surname=data['surname'], age=message.text), parse_mode="html")
bot.delete_state(message.chat.id)
bot.delete_state(message.from_user.id)
#incorrect number
@bot.message_handler(state=3, is_digit=False)
@bot.message_handler(state=MyStates.age, is_digit=False)
def age_incorrect(message):
bot.send_message(message.chat.id, 'Looks like you are submitting a string in the field age. Please enter a number')
@ -61,4 +67,8 @@ def age_incorrect(message):
bot.add_custom_filter(custom_filters.StateFilter(bot))
bot.add_custom_filter(custom_filters.IsDigitFilter())
# set saving states into file.
bot.enable_saving_states() # you can delete this if you do not need to save states
bot.infinity_polling(skip_pending=True)

View File

@ -27,7 +27,7 @@ logger.addHandler(console_output_handler)
logger.setLevel(logging.ERROR)
from telebot import apihelper, util, types
from telebot.handler_backends import MemoryHandlerBackend, FileHandlerBackend, State
from telebot.handler_backends import MemoryHandlerBackend, FileHandlerBackend, StateMemory, StateFile
REPLY_MARKUP_TYPES = Union[
@ -185,10 +185,12 @@ class TeleBot:
self.poll_answer_handlers = []
self.my_chat_member_handlers = []
self.chat_member_handlers = []
self.chat_join_request_handlers = []
self.custom_filters = {}
self.state_handlers = []
self.current_states = State()
self.current_states = StateMemory()
if apihelper.ENABLE_MIDDLEWARE:
self.typed_middleware_handlers = {
@ -204,7 +206,8 @@ class TeleBot:
'poll': [],
'poll_answer': [],
'my_chat_member': [],
'chat_member': []
'chat_member': [],
'chat_join_request': []
}
self.default_middleware_handlers = []
@ -237,6 +240,17 @@ class TeleBot:
"""
self.next_step_backend = FileHandlerBackend(self.next_step_backend.handlers, filename, delay)
def enable_saving_states(self, filename="./.state-save/states.pkl"):
"""
Enable saving states (by default saving disabled)
:param filename: Filename of saving file
"""
self.current_states = StateFile(filename=filename)
self.current_states._create_dir()
def enable_save_reply_handlers(self, delay=120, filename="./.handler-saves/reply.save"):
"""
Enable saving reply handlers (by default saving disable)
@ -345,7 +359,7 @@ class TeleBot:
"""
return apihelper.delete_webhook(self.token, drop_pending_updates, timeout)
def get_webhook_info(self, timeout=None):
def get_webhook_info(self, timeout: Optional[int]=None):
"""
Use this method to get current webhook status. Requires no parameters.
If the bot is using getUpdates, will return an object with the url field empty.
@ -414,6 +428,7 @@ class TeleBot:
new_poll_answers = None
new_my_chat_members = None
new_chat_members = None
chat_join_request = None
for update in updates:
if apihelper.ENABLE_MIDDLEWARE:
@ -468,6 +483,9 @@ class TeleBot:
if update.chat_member:
if new_chat_members is None: new_chat_members = []
new_chat_members.append(update.chat_member)
if update.chat_join_request:
if chat_join_request is None: chat_join_request = []
chat_join_request.append(update.chat_join_request)
if new_messages:
self.process_new_messages(new_messages)
@ -495,6 +513,9 @@ class TeleBot:
self.process_new_my_chat_member(new_my_chat_members)
if new_chat_members:
self.process_new_chat_member(new_chat_members)
if chat_join_request:
self.process_new_chat_join_request(chat_join_request)
def process_new_messages(self, new_messages):
self._notify_next_handlers(new_messages)
@ -538,6 +559,9 @@ class TeleBot:
def process_new_chat_member(self, chat_members):
self._notify_command_handlers(self.chat_member_handlers, chat_members)
def process_new_chat_join_request(self, chat_join_request):
self._notify_command_handlers(self.chat_join_request_handlers, chat_join_request)
def process_middlewares(self, update):
for update_type, middlewares in self.typed_middleware_handlers.items():
if getattr(update, update_type) is not None:
@ -1655,9 +1679,11 @@ class TeleBot:
return apihelper.set_chat_permissions(self.token, chat_id, permissions)
def create_chat_invite_link(
self, chat_id: Union[int, str],
self, chat_id: Union[int, str],
name: Optional[str]=None,
expire_date: Optional[Union[int, datetime]]=None,
member_limit: Optional[int]=None) -> types.ChatInviteLink:
member_limit: Optional[int]=None,
creates_join_request: Optional[bool]=None) -> types.ChatInviteLink:
"""
Use this method to create an additional invite link for a chat.
The bot must be an administrator in the chat for this to work and must have the appropriate admin rights.
@ -1669,13 +1695,15 @@ class TeleBot:
:return:
"""
return types.ChatInviteLink.de_json(
apihelper.create_chat_invite_link(self.token, chat_id, expire_date, member_limit)
apihelper.create_chat_invite_link(self.token, chat_id, name, expire_date, member_limit, creates_join_request)
)
def edit_chat_invite_link(
self, chat_id: Union[int, str], invite_link: str,
self, chat_id: Union[int, str], name: Optional[str]=None,
invite_link: Optional[str] = None,
expire_date: Optional[Union[int, datetime]]=None,
member_limit: Optional[int]=None) -> types.ChatInviteLink:
member_limit: Optional[int]=None ,
creates_join_request: Optional[bool]=None) -> types.ChatInviteLink:
"""
Use this method to edit a non-primary invite link created by the bot.
The bot must be an administrator in the chat for this to work and must have the appropriate admin rights.
@ -1689,7 +1717,7 @@ class TeleBot:
:return:
"""
return types.ChatInviteLink.de_json(
apihelper.edit_chat_invite_link(self.token, chat_id, invite_link, expire_date, member_limit)
apihelper.edit_chat_invite_link(self.token, chat_id, name, invite_link, expire_date, member_limit, creates_join_request)
)
def revoke_chat_invite_link(
@ -1719,6 +1747,32 @@ class TeleBot:
"""
return apihelper.export_chat_invite_link(self.token, chat_id)
def approve_chat_join_request(self, chat_id: Union[str, int], user_id: Union[int, str]) -> bool:
"""
Use this method to approve a chat join request.
The bot must be an administrator in the chat for this to work and must have
the can_invite_users administrator right. Returns True on success.
:param chat_id: Unique identifier for the target chat or username of the target supergroup
(in the format @supergroupusername)
:param user_id: Unique identifier of the target user
:return: True on success.
"""
return apihelper.approve_chat_join_request(self.token, chat_id, user_id)
def decline_chat_join_request(self, chat_id: Union[str, int], user_id: Union[int, str]) -> bool:
"""
Use this method to decline a chat join request.
The bot must be an administrator in the chat for this to work and must have
the can_invite_users administrator right. Returns True on success.
:param chat_id: Unique identifier for the target chat or username of the target supergroup
(in the format @supergroupusername)
:param user_id: Unique identifier of the target user
:return: True on success.
"""
return apihelper.decline_chat_join_request(self.token, chat_id, user_id)
def set_chat_photo(self, chat_id: Union[int, str], photo: Any) -> bool:
"""
Use this method to set a new profile photo for the chat. Photos can't be changed for private chats.
@ -2369,7 +2423,7 @@ class TeleBot:
chat_id = message.chat.id
self.register_next_step_handler_by_chat_id(chat_id, callback, *args, **kwargs)
def set_state(self, chat_id, state):
def set_state(self, chat_id: int, state: Union[int, str]):
"""
Sets a new state of a user.
:param chat_id:
@ -2377,7 +2431,7 @@ class TeleBot:
"""
self.current_states.add_state(chat_id, state)
def delete_state(self, chat_id):
def delete_state(self, chat_id: int):
"""
Delete the current state of a user.
:param chat_id:
@ -2385,10 +2439,10 @@ class TeleBot:
"""
self.current_states.delete_state(chat_id)
def retrieve_data(self, chat_id):
def retrieve_data(self, chat_id: int):
return self.current_states.retrieve_data(chat_id)
def get_state(self, chat_id):
def get_state(self, chat_id: int):
"""
Get current state of a user.
:param chat_id:
@ -2396,7 +2450,7 @@ class TeleBot:
"""
return self.current_states.current_state(chat_id)
def add_data(self, chat_id, **kwargs):
def add_data(self, chat_id: int, **kwargs):
"""
Add data to states.
:param chat_id:
@ -3136,6 +3190,39 @@ class TeleBot:
handler_dict = self._build_handler_dict(callback, func=func, **kwargs)
self.add_chat_member_handler(handler_dict)
def chat_join_request_handler(self, func=None, **kwargs):
"""
chat_join_request handler
:param func:
:param kwargs:
:return:
"""
def decorator(handler):
handler_dict = self._build_handler_dict(handler, func=func, **kwargs)
self.add_chat_join_request_handler(handler_dict)
return handler
return decorator
def add_chat_join_request_handler(self, handler_dict):
"""
Adds a chat_join_request handler
:param handler_dict:
:return:
"""
self.chat_join_request_handlers.append(handler_dict)
def register_chat_join_request_handler(self, callback, func=None, **kwargs):
"""
Registers chat join request handler.
:param callback: function to be called
:param func:
:return: decorated function
"""
handler_dict = self._build_handler_dict(callback, func=func, **kwargs)
self.add_chat_join_request_handler(handler_dict)
def _test_message_handler(self, message_handler, message):
"""
Test message handler

View File

@ -978,7 +978,7 @@ def set_chat_permissions(token, chat_id, permissions):
return _make_request(token, method_url, params=payload, method='post')
def create_chat_invite_link(token, chat_id, expire_date, member_limit):
def create_chat_invite_link(token, chat_id, name, expire_date, member_limit, creates_join_request):
method_url = 'createChatInviteLink'
payload = {
'chat_id': chat_id
@ -991,11 +991,15 @@ def create_chat_invite_link(token, chat_id, expire_date, member_limit):
payload['expire_date'] = expire_date
if member_limit:
payload['member_limit'] = member_limit
if creates_join_request is not None:
payload['creates_join_request'] = creates_join_request
if name:
payload['name'] = name
return _make_request(token, method_url, params=payload, method='post')
def edit_chat_invite_link(token, chat_id, invite_link, expire_date, member_limit):
def edit_chat_invite_link(token, chat_id, invite_link, name, expire_date, member_limit, creates_join_request):
method_url = 'editChatInviteLink'
payload = {
'chat_id': chat_id,
@ -1010,6 +1014,10 @@ def edit_chat_invite_link(token, chat_id, invite_link, expire_date, member_limit
if member_limit is not None:
payload['member_limit'] = member_limit
if name:
payload['name'] = name
if creates_join_request:
payload['creates_join_request'] = creates_join_request
return _make_request(token, method_url, params=payload, method='post')
@ -1028,7 +1036,20 @@ def export_chat_invite_link(token, chat_id):
payload = {'chat_id': chat_id}
return _make_request(token, method_url, params=payload, method='post')
def approve_chat_join_request(token, chat_id, user_id):
method_url = 'approveChatJoinRequest'
payload = {
'chat_id': chat_id,
'user_id': user_id
}
return _make_request(token, method_url, params=payload, method='post')
def decline_chat_join_request(token, chat_id, user_id):
method_url = 'declineChatJoinRequest'
payload = {
'chat_id': chat_id,
'user_id': user_id
}
return _make_request(token, method_url, params=payload, method='post')
def set_chat_photo(token, chat_id, photo):
method_url = 'setChatPhoto'
payload = {'chat_id': chat_id}
@ -1673,4 +1694,5 @@ class ApiTelegramException(ApiException):
result)
self.result_json = result_json
self.error_code = result_json['error_code']
self.description = result_json['description']

View File

@ -158,9 +158,9 @@ class StateFilter(AdvancedCustomFilter):
key = 'state'
def check(self, message, text):
if self.bot.current_states.current_state(message.from_user.id) is False:return False
elif text == '*':return True
elif type(text) is list:return self.bot.current_states.current_state(message.from_user.id) in text
if self.bot.current_states.current_state(message.from_user.id) is False: return False
elif text == '*': return True
elif type(text) is list: return self.bot.current_states.current_state(message.from_user.id) in text
return self.bot.current_states.current_state(message.from_user.id) == text
class IsDigitFilter(SimpleCustomFilter):

View File

@ -143,7 +143,7 @@ class RedisHandlerBackend(HandlerBackend):
return handlers
class State:
class StateMemory:
def __init__(self):
self._states = {}
@ -166,7 +166,7 @@ class State:
def delete_state(self, chat_id):
"""Delete a state"""
return self._states.pop(chat_id)
self._states.pop(chat_id)
def _get_data(self, chat_id):
return self._states[chat_id]['data']
@ -195,7 +195,7 @@ class State:
Save input text.
Usage:
with state.retrieve_data(message.chat.id) as data:
with bot.retrieve_data(message.chat.id) as data:
data['name'] = message.text
Also, at the end of your 'Form' you can get the name:
@ -203,11 +203,114 @@ class State:
"""
return StateContext(self, chat_id)
class StateFile:
"""
Class to save states in a file.
"""
def __init__(self, filename):
self.file_path = filename
def add_state(self, chat_id, state):
"""
Add a state.
:param chat_id:
:param state: new state
"""
states_data = self._read_data()
if chat_id in states_data:
states_data[chat_id]['state'] = state
return self._save_data(states_data)
else:
new_data = states_data[chat_id] = {'state': state,'data': {}}
return self._save_data(states_data)
def current_state(self, chat_id):
"""Current state."""
states_data = self._read_data()
if chat_id in states_data: return states_data[chat_id]['state']
else: return False
def delete_state(self, chat_id):
"""Delete a state"""
states_data = self._read_data()
states_data.pop(chat_id)
self._save_data(states_data)
def _read_data(self):
"""
Read the data from file.
"""
file = open(self.file_path, 'rb')
states_data = pickle.load(file)
file.close()
return states_data
def _create_dir(self):
"""
Create directory .save-handlers.
"""
dirs = self.file_path.rsplit('/', maxsplit=1)[0]
os.makedirs(dirs, exist_ok=True)
if not os.path.isfile(self.file_path):
with open(self.file_path,'wb') as file:
pickle.dump({}, file)
def _save_data(self, new_data):
"""
Save data after editing.
:param new_data:
"""
with open(self.file_path, 'wb+') as state_file:
pickle.dump(new_data, state_file, protocol=pickle.HIGHEST_PROTOCOL)
return True
def _get_data(self, chat_id):
return self._read_data()[chat_id]['data']
def set(self, chat_id, new_state):
"""
Set a new state for a user.
:param chat_id:
:param new_state: new_state of a user
"""
self.add_state(chat_id,new_state)
def _add_data(self, chat_id, key, value):
states_data = self._read_data()
result = states_data[chat_id]['data'][key] = value
self._save_data(result)
return result
def finish(self, chat_id):
"""
Finish(delete) state of a user.
:param chat_id:
"""
self.delete_state(chat_id)
def retrieve_data(self, chat_id):
"""
Save input text.
Usage:
with bot.retrieve_data(message.chat.id) as data:
data['name'] = message.text
Also, at the end of your 'Form' you can get the name:
data['name']
"""
return StateFileContext(self, chat_id)
class StateContext:
"""
Class for data.
"""
def __init__(self , obj: State, chat_id) -> None:
def __init__(self , obj: StateMemory, chat_id) -> None:
self.obj = obj
self.chat_id = chat_id
self.data = obj._get_data(chat_id)
@ -217,3 +320,23 @@ class StateContext:
def __exit__(self, exc_type, exc_val, exc_tb):
return
class StateFileContext:
"""
Class for data.
"""
def __init__(self , obj: StateFile, chat_id) -> None:
self.obj = obj
self.chat_id = chat_id
self.data = self.obj._get_data(self.chat_id)
def __enter__(self):
return self.data
def __exit__(self, exc_type, exc_val, exc_tb):
old_data = self.obj._read_data()
for i in self.data:
old_data[self.chat_id]['data'][i] = self.data.get(i)
self.obj._save_data(old_data)
return

View File

@ -107,13 +107,14 @@ class Update(JsonDeserializable):
poll_answer = PollAnswer.de_json(obj.get('poll_answer'))
my_chat_member = ChatMemberUpdated.de_json(obj.get('my_chat_member'))
chat_member = ChatMemberUpdated.de_json(obj.get('chat_member'))
chat_join_request = ChatJoinRequest.de_json(obj.get('chat_join_request'))
return cls(update_id, message, edited_message, channel_post, edited_channel_post, inline_query,
chosen_inline_result, callback_query, shipping_query, pre_checkout_query, poll, poll_answer,
my_chat_member, chat_member)
my_chat_member, chat_member, chat_join_request)
def __init__(self, update_id, message, edited_message, channel_post, edited_channel_post, inline_query,
chosen_inline_result, callback_query, shipping_query, pre_checkout_query, poll, poll_answer,
my_chat_member, chat_member):
my_chat_member, chat_member, chat_join_request):
self.update_id = update_id
self.message = message
self.edited_message = edited_message
@ -128,6 +129,7 @@ class Update(JsonDeserializable):
self.poll_answer = poll_answer
self.my_chat_member = my_chat_member
self.chat_member = chat_member
self.chat_join_request = chat_join_request
class ChatMemberUpdated(JsonDeserializable):
@ -166,6 +168,23 @@ class ChatMemberUpdated(JsonDeserializable):
dif[key] = [old[key], new[key]]
return dif
class ChatJoinRequest(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
if json_string is None: return None
obj = cls.check_json(json_string)
obj['chat'] = Chat.de_json(obj['chat'])
obj['from_user'] = User.de_json(obj['from'])
obj['invite_link'] = ChatInviteLink.de_json(obj['invite_link'])
return cls(**obj)
def __init__(self, chat, from_user, date, bio=None, invite_link=None, **kwargs):
self.chat = Chat = chat
self.from_user: User = from_user
self.date: int = date
self.bio: Optional[str] = bio
self.invite_link: Optional[ChatInviteLink] = invite_link
class WebhookInfo(JsonDeserializable):
@classmethod
@ -2752,14 +2771,17 @@ class ChatInviteLink(JsonSerializable, JsonDeserializable, Dictionaryable):
obj['creator'] = User.de_json(obj['creator'])
return cls(**obj)
def __init__(self, invite_link, creator, is_primary, is_revoked,
expire_date=None, member_limit=None, **kwargs):
def __init__(self, invite_link, creator, creates_join_request , is_primary, is_revoked,
name=None, expire_date=None, member_limit=None, pending_join_request_count=None, **kwargs):
self.invite_link: str = invite_link
self.creator: User = creator
self.creates_join_request: bool = creates_join_request
self.is_primary: bool = is_primary
self.is_revoked: bool = is_revoked
self.name: str = name
self.expire_date: int = expire_date
self.member_limit: int = member_limit
self.pending_join_request_count: int = pending_join_request_count
def to_json(self):
return json.dumps(self.to_dict())
@ -2769,12 +2791,17 @@ class ChatInviteLink(JsonSerializable, JsonDeserializable, Dictionaryable):
"invite_link": self.invite_link,
"creator": self.creator.to_dict(),
"is_primary": self.is_primary,
"is_revoked": self.is_revoked
"is_revoked": self.is_revoked,
"creates_join_request": self.creates_join_request
}
if self.expire_date:
json_dict["expire_date"] = self.expire_date
if self.member_limit:
json_dict["member_limit"] = self.member_limit
if self.pending_join_request_count:
json_dict["pending_join_request_count"] = self.pending_join_request_count
if self.name:
json_dict["name"] = self.name
return json_dict

View File

@ -46,7 +46,7 @@ content_type_service = [
update_types = [
"update_id", "message", "edited_message", "channel_post", "edited_channel_post", "inline_query",
"chosen_inline_result", "callback_query", "shipping_query", "pre_checkout_query", "poll", "poll_answer",
"my_chat_member", "chat_member"
"my_chat_member", "chat_member", "chat_join_request"
]
class WorkerThread(threading.Thread):
@ -470,3 +470,25 @@ def webhook_google_functions(bot, request):
return 'Bot FAIL', 400
else:
return 'Bot ON'
def antiflood(function, *args, **kwargs):
"""
Use this function inside loops in order to avoid getting TooManyRequests error.
Example:
from telebot.util import antiflood
for chat_id in chat_id_list:
msg = antiflood(bot.send_message, chat_id, text)
You want get the
"""
from telebot.apihelper import ApiTelegramException
from time import sleep
try:
msg = function(*args, **kwargs)
except ApiTelegramException as ex:
if ex.error_code == 429:
sleep(ex.result_json['parameters']['retry_after'])
msg = function(*args, **kwargs)
finally:
return msg

View File

@ -64,9 +64,10 @@ def update_type(message):
poll_answer = None
my_chat_member = None
chat_member = None
chat_join_request = None
return types.Update(1001234038283, message, edited_message, channel_post, edited_channel_post, inline_query,
chosen_inline_result, callback_query, shipping_query, pre_checkout_query, poll, poll_answer,
my_chat_member, chat_member)
my_chat_member, chat_member, chat_join_request)
@pytest.fixture()
@ -83,9 +84,10 @@ def reply_to_message_update_type(reply_to_message):
poll_answer = None
my_chat_member = None
chat_member = None
chat_join_request = None
return types.Update(1001234038284, reply_to_message, edited_message, channel_post, edited_channel_post,
inline_query, chosen_inline_result, callback_query, shipping_query, pre_checkout_query,
poll, poll_answer, my_chat_member, chat_member)
poll, poll_answer, my_chat_member, chat_member, chat_join_request)
def next_handler(message):

View File

@ -455,6 +455,13 @@ class TestTeleBot:
new_msg = tb.edit_message_reply_markup(chat_id=CHAT_ID, message_id=ret_msg.message_id, reply_markup=markup)
assert new_msg.message_id
def test_antiflood(self):
text = "Flooding"
tb = telebot.TeleBot(TOKEN)
for _ in range(0,100):
util.antiflood(tb.send_message, CHAT_ID, text)
assert _
@staticmethod
def create_text_message(text):
params = {'text': text}
@ -478,9 +485,10 @@ class TestTeleBot:
poll_answer = None
my_chat_member = None
chat_member = None
chat_join_request = None
return types.Update(-1001234038283, message, edited_message, channel_post, edited_channel_post, inline_query,
chosen_inline_result, callback_query, shipping_query, pre_checkout_query, poll, poll_answer,
my_chat_member, chat_member)
my_chat_member, chat_member, chat_join_request)
def test_is_string_unicode(self):
s1 = u'string'

View File

@ -222,14 +222,19 @@ def test_KeyboardButtonPollType():
def test_json_chat_invite_link():
json_string = r'{"invite_link": "https://t.me/joinchat/z-abCdEFghijKlMn", "creator": {"id": 329343347, "is_bot": false, "first_name": "Test", "username": "test_user", "last_name": "User", "language_code": "en"}, "is_primary": false, "is_revoked": false, "expire_date": 1624119999, "member_limit": 10}'
json_string = r'{"invite_link":"https://t.me/joinchat/MeASP-Wi...","creator":{"id":927266710,"is_bot":false,"first_name":">_run","username":"coder2020","language_code":"ru"},"pending_join_request_count":1,"creates_join_request":true,"is_primary":false,"is_revoked":false}'
invite_link = types.ChatInviteLink.de_json(json_string)
assert invite_link.invite_link == 'https://t.me/joinchat/z-abCdEFghijKlMn'
assert invite_link.invite_link == 'https://t.me/joinchat/MeASP-Wi...'
assert isinstance(invite_link.creator, types.User)
assert not invite_link.is_primary
assert not invite_link.is_revoked
assert invite_link.expire_date == 1624119999
assert invite_link.member_limit == 10
assert invite_link.expire_date is None
assert invite_link.member_limit is None
assert invite_link.name is None
assert invite_link.creator.id == 927266710
assert invite_link.pending_join_request_count == 1
assert invite_link.creates_join_request
def test_chat_member_updated():
json_string = r'{"chat": {"id": -1234567890123, "type": "supergroup", "title": "No Real Group", "username": "NoRealGroup"}, "from": {"id": 133869498, "is_bot": false, "first_name": "Vincent"}, "date": 1624119999, "old_chat_member": {"user": {"id": 77777777, "is_bot": false, "first_name": "Pepe"}, "status": "member"}, "new_chat_member": {"user": {"id": 77777777, "is_bot": false, "first_name": "Pepe"}, "status": "administrator"}}'