1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

Some small changes

* Fixed type warnings in some editors by changing `var: Type = None` to `var: Union[Type, None] = None`
* changed some args from `obj['arg']` to `obj.get('arg')` if arg is optional
* better PEP-8 compliance for less weak warnings
* added tests for the new type `ChatInviteLink`
This commit is contained in:
SwissCorePy 2021-06-19 17:59:55 +02:00
parent a9ae070256
commit 795f7fff7f
5 changed files with 245 additions and 156 deletions

View File

@ -9,6 +9,7 @@ import time
import traceback
from typing import List, Union
# this imports are used to avoid circular import error
import telebot.util
import telebot.types
@ -23,7 +24,7 @@ logger.addHandler(console_output_handler)
logger.setLevel(logging.ERROR)
from telebot import apihelper, types, util
from telebot import apihelper, util, types
from telebot.handler_backends import MemoryHandlerBackend, FileHandlerBackend
"""
@ -252,19 +253,30 @@ class TeleBot:
drop_pending_updates = None, timeout=None):
"""
Use this method to specify a url and receive incoming updates via an outgoing webhook. Whenever there is an
update for the bot, we will send an HTTPS POST request to the specified url, containing a JSON-serialized Update.
In case of an unsuccessful request, we will give up after a reasonable amount of attempts. Returns True on success.
update for the bot, we will send an HTTPS POST request to the specified url,
containing a JSON-serialized Update.
In case of an unsuccessful request, we will give up after a reasonable amount of attempts.
Returns True on success.
:param url: HTTPS url to send updates to. Use an empty string to remove webhook integration
:param certificate: Upload your public key certificate so that the root certificate in use can be checked. See our self-signed guide for details.
:param max_connections: Maximum allowed number of simultaneous HTTPS connections to the webhook for update delivery, 1-100. Defaults to 40. Use lower values to limit the load on your bot's server, and higher values to increase your bot's throughput.
:param allowed_updates: A JSON-serialized list of the update types you want your bot to receive. For example, specify [message, edited_channel_post, callback_query] to only receive updates of these types. See Update for a complete list of available update types. Specify an empty list to receive all updates regardless of type (default). If not specified, the previous setting will be used.
:param ip_address: The fixed IP address which will be used to send webhook requests instead of the IP address resolved through DNS
:param certificate: Upload your public key certificate so that the root certificate in use can be checked.
See our self-signed guide for details.
:param max_connections: Maximum allowed number of simultaneous HTTPS connections to the webhook
for update delivery, 1-100. Defaults to 40. Use lower values to limit the load on your bot's server,
and higher values to increase your bot's throughput.
:param allowed_updates: A JSON-serialized list of the update types you want your bot to receive.
For example, specify [message, edited_channel_post, callback_query] to only receive updates
of these types. See Update for a complete list of available update types.
Specify an empty list to receive all updates regardless of type (default).
If not specified, the previous setting will be used.
:param ip_address: The fixed IP address which will be used to send webhook requests instead of the IP address
resolved through DNS
:param drop_pending_updates: Pass True to drop all pending updates
:param timeout: Integer. Request connection timeout
:return:
"""
return apihelper.set_webhook(self.token, url, certificate, max_connections, allowed_updates, ip_address, drop_pending_updates, timeout)
return apihelper.set_webhook(self.token, url, certificate, max_connections, allowed_updates, ip_address,
drop_pending_updates, timeout)
def delete_webhook(self, drop_pending_updates=None, timeout=None):
"""
@ -330,14 +342,14 @@ class TeleBot:
if self.skip_pending:
logger.debug('Skipped {0} pending messages'.format(self.__skip_updates()))
self.skip_pending = False
updates = self.get_updates(offset=(self.last_update_id + 1), timeout=timeout, long_polling_timeout = long_polling_timeout)
updates = self.get_updates(offset=(self.last_update_id + 1),
timeout=timeout, long_polling_timeout=long_polling_timeout)
self.process_new_updates(updates)
def process_new_updates(self, updates):
upd_count = len(updates)
logger.debug('Received {0} new updates'.format(upd_count))
if (upd_count == 0):
return
if upd_count == 0: return
new_messages = None
new_edited_messages = None
@ -488,11 +500,13 @@ class TeleBot:
:param timeout: Request connection timeout
:param long_polling_timeout: Timeout in seconds for long polling (see API docs)
:param logger_level: Custom logging level for infinity_polling logging. Use logger levels from logging as a value. None/NOTSET = no error logging
:param logger_level: Custom logging level for infinity_polling logging.
Use logger levels from logging as a value. None/NOTSET = no error logging
"""
while not self.__stop_polling.is_set():
try:
self.polling(none_stop=True, timeout=timeout, long_polling_timeout=long_polling_timeout, *args, **kwargs)
self.polling(none_stop=True, timeout=timeout, long_polling_timeout=long_polling_timeout,
*args, **kwargs)
except Exception as e:
if logger_level and logger_level >= logging.ERROR:
logger.error("Infinity polling exception: %s", str(e))
@ -590,7 +604,7 @@ class TeleBot:
self.worker_pool.clear_exceptions() #*
logger.info('Stopped polling.')
def __non_threaded_polling(self, non_stop=False, interval=0, timeout = None, long_polling_timeout = None):
def __non_threaded_polling(self, non_stop=False, interval=0, timeout=None, long_polling_timeout=None):
logger.info('Started polling.')
self.__stop_polling.clear()
error_interval = 0.25
@ -675,7 +689,8 @@ class TeleBot:
def log_out(self) -> bool:
"""
Use this method to log out from the cloud Bot API server before launching the bot locally.
You MUST log out the bot before running it locally, otherwise there is no guarantee that the bot will receive updates.
You MUST log out the bot before running it locally, otherwise there is no guarantee
that the bot will receive updates.
After a successful call, you can immediately log in on a local server,
but will not be able to log in back to the cloud Bot API server for 10 minutes.
Returns True on success.
@ -685,13 +700,13 @@ class TeleBot:
def close(self) -> bool:
"""
Use this method to close the bot instance before moving it from one local server to another.
You need to delete the webhook before calling this method to ensure that the bot isn't launched again after server restart.
You need to delete the webhook before calling this method to ensure that the bot isn't launched again
after server restart.
The method will return error 429 in the first 10 minutes after the bot is launched.
Returns True on success.
"""
return apihelper.close(self.token)
def get_user_profile_photos(self, user_id, offset=None, limit=None) -> types.UserProfilePhotos:
"""
Retrieves the user profile photos of the person with 'user_id'
@ -807,7 +822,8 @@ class TeleBot:
apihelper.send_message(self.token, chat_id, text, disable_web_page_preview, reply_to_message_id,
reply_markup, parse_mode, disable_notification, timeout))
def forward_message(self, chat_id, from_chat_id, message_id, disable_notification=None, timeout=None) -> types.Message:
def forward_message(self, chat_id, from_chat_id, message_id,
disable_notification=None, timeout=None) -> types.Message:
"""
Use this method to forward messages of any kind.
:param disable_notification:
@ -897,7 +913,8 @@ class TeleBot:
reply_to_message_id=None, reply_markup=None, parse_mode=None, disable_notification=None,
timeout=None, thumb=None) -> types.Message:
"""
Use this method to send audio files, if you want Telegram clients to display them in the music player. Your audio must be in the .mp3 format.
Use this method to send audio files, if you want Telegram clients to display them in the music player.
Your audio must be in the .mp3 format.
:param chat_id:Unique identifier for the message recipient
:param audio:Audio file to send.
:param caption:
@ -921,7 +938,8 @@ class TeleBot:
def send_voice(self, chat_id, voice, caption=None, duration=None, reply_to_message_id=None, reply_markup=None,
parse_mode=None, disable_notification=None, timeout=None) -> types.Message:
"""
Use this method to send audio files, if you want Telegram clients to display the file as a playable voice message.
Use this method to send audio files, if you want Telegram clients to display the file
as a playable voice message.
:param chat_id:Unique identifier for the message recipient.
:param voice:
:param caption:
@ -984,7 +1002,8 @@ class TeleBot:
"""
Use this method to send video files, Telegram clients support mp4 videos.
:param chat_id: Integer : Unique identifier for the message recipient User or GroupChat id
:param data: InputFile or String : Video to send. You can either pass a file_id as String to resend a video that is already on the Telegram server
:param data: InputFile or String : Video to send. You can either pass a file_id as String to resend
a video that is already on the Telegram server
:param duration: Integer : Duration of sent video in seconds
:param caption: String : Video caption (may also be used when resending videos by file_id).
:param parse_mode:
@ -1011,7 +1030,8 @@ class TeleBot:
"""
Use this method to send animation files (GIF or H.264/MPEG-4 AVC video without sound).
:param chat_id: Integer : Unique identifier for the message recipient User or GroupChat id
:param animation: InputFile or String : Animation to send. You can either pass a file_id as String to resend an animation that is already on the Telegram server
:param animation: InputFile or String : Animation to send. You can either pass a file_id as String to resend an
animation that is already on the Telegram server
:param duration: Integer : Duration of sent video in seconds
:param caption: String : Animation caption (may also be used when resending animation by file_id).
:param parse_mode:
@ -1025,16 +1045,18 @@ class TeleBot:
parse_mode = self.parse_mode if (parse_mode is None) else parse_mode
return types.Message.de_json(
apihelper.send_animation(self.token, chat_id, animation, duration, caption, reply_to_message_id, reply_markup,
parse_mode, disable_notification, timeout, thumb))
apihelper.send_animation(self.token, chat_id, animation, duration, caption, reply_to_message_id,
reply_markup, parse_mode, disable_notification, timeout, thumb))
def send_video_note(self, chat_id, data, duration=None, length=None,
reply_to_message_id=None, reply_markup=None,
disable_notification=None, timeout=None, thumb=None) -> types.Message:
"""
As of v.4.0, Telegram clients support rounded square mp4 videos of up to 1 minute long. Use this method to send video messages.
As of v.4.0, Telegram clients support rounded square mp4 videos of up to 1 minute long. Use this method to send
video messages.
:param chat_id: Integer : Unique identifier for the message recipient User or GroupChat id
:param data: InputFile or String : Video note to send. You can either pass a file_id as String to resend a video that is already on the Telegram server
:param data: InputFile or String : Video note to send. You can either pass a file_id as String to resend
a video that is already on the Telegram server
:param duration: Integer : Duration of sent video in seconds
:param length: Integer : Video width and height, Can't be None and should be in range of (0, 640)
:param reply_to_message_id:
@ -1133,7 +1155,8 @@ class TeleBot:
:param title: String : Name of the venue
:param address: String : Address of the venue
:param foursquare_id: String : Foursquare identifier of the venue
:param foursquare_type: Foursquare type of the venue, if known. (For example, arts_entertainment/default, arts_entertainment/aquarium or food/icecream.)
:param foursquare_type: Foursquare type of the venue, if known. (For example, arts_entertainment/default,
arts_entertainment/aquarium or food/icecream.)
:param disable_notification:
:param reply_to_message_id:
:param reply_markup:
@ -1162,7 +1185,8 @@ class TeleBot:
its typing status).
:param chat_id:
:param action: One of the following strings: 'typing', 'upload_photo', 'record_video', 'upload_video',
'record_audio', 'upload_audio', 'upload_document', 'find_location', 'record_video_note', 'upload_video_note'.
'record_audio', 'upload_audio', 'upload_document', 'find_location', 'record_video_note',
'upload_video_note'.
:param timeout:
:return: API reply. :type: boolean
"""
@ -1187,7 +1211,8 @@ class TeleBot:
the user is not a member of the chat, but will be able to join it. So if the user is a member of the chat
they will also be removed from the chat. If you don't want this, use the parameter only_if_banned.
:param chat_id: Unique identifier for the target group or username of the target supergroup or channel (in the format @username)
:param chat_id: Unique identifier for the target group or username of the target supergroup or channel
(in the format @username)
:param user_id: Unique identifier of the target user
:param only_if_banned: Do nothing if the user is not banned
:return: True on success
@ -1219,7 +1244,8 @@ class TeleBot:
use inline bots, implies can_send_media_messages
:param can_add_web_page_previews: Pass True, if the user may add web page previews to their messages,
implies can_send_media_messages
:param can_change_info: Pass True, if the user is allowed to change the chat title, photo and other settings. Ignored in public supergroups
:param can_change_info: Pass True, if the user is allowed to change the chat title, photo and other settings.
Ignored in public supergroups
:param can_invite_users: Pass True, if the user is allowed to invite new users to the chat,
implies can_invite_users
:param can_pin_messages: Pass True, if the user is allowed to pin messages. Ignored in public supergroups
@ -1463,9 +1489,13 @@ class TeleBot:
return result
return types.Message.de_json(result)
def edit_message_media(self, media, chat_id=None, message_id=None, inline_message_id=None, reply_markup=None) -> Union[types.Message, bool]:
def edit_message_media(self, media, chat_id=None, message_id=None,
inline_message_id=None, reply_markup=None) -> Union[types.Message, bool]:
"""
Use this method to edit animation, audio, document, photo, or video messages. If a message is a part of a message album, then it can be edited only to a photo or a video. Otherwise, message type can be changed arbitrarily. When inline message is edited, new file can't be uploaded. Use previously uploaded file via its file_id or specify a URL.
Use this method to edit animation, audio, document, photo, or video messages.
If a message is a part of a message album, then it can be edited only to a photo or a video.
Otherwise, message type can be changed arbitrarily. When inline message is edited, new file can't be uploaded.
Use previously uploaded file via its file_id or specify a URL.
:param media:
:param chat_id:
:param message_id:
@ -1478,7 +1508,8 @@ class TeleBot:
return result
return types.Message.de_json(result)
def edit_message_reply_markup(self, chat_id=None, message_id=None, inline_message_id=None, reply_markup=None) -> Union[types.Message, bool]:
def edit_message_reply_markup(self, chat_id=None, message_id=None,
inline_message_id=None, reply_markup=None) -> Union[types.Message, bool]:
"""
Use this method to edit only the reply markup of messages.
:param chat_id:
@ -1523,13 +1554,14 @@ class TeleBot:
:param disable_edit_message:
:return:
"""
result = apihelper.set_game_score(self.token, user_id, score, force, disable_edit_message, chat_id, message_id,
inline_message_id)
result = apihelper.set_game_score(self.token, user_id, score, force, disable_edit_message, chat_id,
message_id, inline_message_id)
if type(result) == bool:
return result
return types.Message.de_json(result)
def get_game_high_scores(self, user_id, chat_id=None, message_id=None, inline_message_id=None) -> List[types.GameHighScore]:
def get_game_high_scores(self, user_id, chat_id=None,
message_id=None, inline_message_id=None) -> List[types.GameHighScore]:
"""
Gets top points and game play
:param user_id:
@ -1555,12 +1587,17 @@ class TeleBot:
:param chat_id: Unique identifier for the target private chat
:param title: Product name
:param description: Product description
:param invoice_payload: Bot-defined invoice payload, 1-128 bytes. This will not be displayed to the user, use for your internal processes.
:param invoice_payload: Bot-defined invoice payload, 1-128 bytes. This will not be displayed to the user,
use for your internal processes.
:param provider_token: Payments provider token, obtained via @Botfather
:param currency: Three-letter ISO 4217 currency code, see https://core.telegram.org/bots/payments#supported-currencies
:param prices: Price breakdown, a list of components (e.g. product price, tax, discount, delivery cost, delivery tax, bonus, etc.)
:param start_parameter: Unique deep-linking parameter that can be used to generate this invoice when used as a start parameter
:param photo_url: URL of the product photo for the invoice. Can be a photo of the goods or a marketing image for a service. People like it better when they see what they are paying for.
:param currency: Three-letter ISO 4217 currency code,
see https://core.telegram.org/bots/payments#supported-currencies
:param prices: Price breakdown, a list of components
(e.g. product price, tax, discount, delivery cost, delivery tax, bonus, etc.)
:param start_parameter: Unique deep-linking parameter that can be used to generate this invoice
when used as a start parameter
:param photo_url: URL of the product photo for the invoice. Can be a photo of the goods
or a marketing image for a service. People like it better when they see what they are paying for.
:param photo_size: Photo size
:param photo_width: Photo width
:param photo_height: Photo height
@ -1573,8 +1610,10 @@ class TeleBot:
:param send_email_to_provider: Pass True, if user's email address should be sent to provider
:param disable_notification: Sends the message silently. Users will receive a notification with no sound.
:param reply_to_message_id: If the message is a reply, ID of the original message
:param reply_markup: A JSON-serialized object for an inline keyboard. If empty, one 'Pay total price' button will be shown. If not empty, the first button must be a Pay button
:param provider_data: A JSON-serialized data about the invoice, which will be shared with the payment provider. A detailed description of required fields should be provided by the payment provider.
:param reply_markup: A JSON-serialized object for an inline keyboard. If empty,
one 'Pay total price' button will be shown. If not empty, the first button must be a Pay button
:param provider_data: A JSON-serialized data about the invoice, which will be shared with the payment provider.
A detailed description of required fields should be provided by the payment provider.
:param timeout:
:return:
"""
@ -1691,9 +1730,12 @@ class TeleBot:
No more than 50 results per query are allowed.
:param inline_query_id: Unique identifier for the answered query
:param results: Array of results for the inline query
:param cache_time: The maximum amount of time in seconds that the result of the inline query may be cached on the server.
:param is_personal: Pass True, if results may be cached on the server side only for the user that sent the query.
:param next_offset: Pass the offset that a client should send in the next query with the same text to receive more results.
:param cache_time: The maximum amount of time in seconds that the result of the inline query
may be cached on the server.
:param is_personal: Pass True, if results may be cached on the server side only for
the user that sent the query.
:param next_offset: Pass the offset that a client should send in the next query with the same text
to receive more results.
:param switch_pm_parameter: If passed, clients will display a button with specified text that switches the user
to a private chat with the bot and sends the bot a start message with the parameter switch_pm_parameter
:param switch_pm_text: Parameter for the start message sent to the bot when user presses the switch button
@ -1973,18 +2015,21 @@ class TeleBot:
bot.send_message(message.chat.id, 'Did someone call for help?')
# Handle all sent documents of type 'text/plain'.
@bot.message_handler(func=lambda message: message.document.mime_type == 'text/plain', content_types=['document'])
@bot.message_handler(func=lambda message: message.document.mime_type == 'text/plain',
content_types=['document'])
def command_handle_document(message):
bot.send_message(message.chat.id, 'Document received, sir!')
# Handle all other messages.
@bot.message_handler(func=lambda message: True, content_types=['audio', 'photo', 'voice', 'video', 'document', 'text', 'location', 'contact', 'sticker'])
@bot.message_handler(func=lambda message: True, content_types=['audio', 'photo', 'voice', 'video', 'document',
'text', 'location', 'contact', 'sticker'])
def default_command(message):
bot.send_message(message.chat.id, "This is the default command handler.")
:param commands: Optional list of strings (commands to handle).
:param regexp: Optional regular expression.
:param func: Optional lambda function. The lambda receives the message to test as the first parameter. It must return True if the command should handle the message.
:param func: Optional lambda function. The lambda receives the message to test as the first parameter.
It must return True if the command should handle the message.
:param content_types: Supported message content types. Must be a list. Defaults to ['text'].
"""

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import logging
from typing import Dict, List
from typing import Dict, List, Union
try:
import ujson as json
@ -92,7 +92,7 @@ class JsonDeserializable(object):
class Update(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
if (json_string is None): return None
if json_string is None: return None
obj = cls.check_json(json_string)
update_id = obj['update_id']
message = Message.de_json(obj.get('message'))
@ -128,7 +128,7 @@ class Update(JsonDeserializable):
class WebhookInfo(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
if (json_string is None): return None
if json_string is None: return None
obj = cls.check_json(json_string)
url = obj['url']
has_custom_certificate = obj['has_custom_certificate']
@ -156,7 +156,7 @@ class WebhookInfo(JsonDeserializable):
class User(JsonDeserializable, Dictionaryable, JsonSerializable):
@classmethod
def de_json(cls, json_string):
if (json_string is None): return None
if json_string is None: return None
obj = cls.check_json(json_string)
return cls(**obj)
@ -197,7 +197,7 @@ class User(JsonDeserializable, Dictionaryable, JsonSerializable):
class GroupChat(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
if (json_string is None): return None
if json_string is None: return None
obj = cls.check_json(json_string)
return cls(**obj)
@ -249,8 +249,7 @@ class Chat(JsonDeserializable):
class MessageID(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
if(json_string is None):
return None
if json_string is None: return None
obj = cls.check_json(json_string)
message_id = obj['message_id']
return cls(message_id)
@ -262,7 +261,7 @@ class MessageID(JsonDeserializable):
class Message(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
if (json_string is None): return None
if json_string is None: return None
obj = cls.check_json(json_string)
message_id = obj['message_id']
from_user = User.de_json(obj.get('from'))
@ -306,7 +305,8 @@ class Message(JsonDeserializable):
opts['document'] = Document.de_json(obj['document'])
content_type = 'document'
if 'animation' in obj:
# Document content type accompanies "animation", so "animation" should be checked below "document" to override it
# Document content type accompanies "animation",
# so "animation" should be checked below "document" to override it
opts['animation'] = Animation.de_json(obj['animation'])
content_type = 'animation'
if 'game' in obj:
@ -424,49 +424,49 @@ class Message(JsonDeserializable):
self.from_user: User = from_user
self.date: int = date
self.chat: Chat = chat
self.forward_from: User = None
self.forward_from_chat: Chat = None
self.forward_from_message_id: int = None
self.forward_signature: str = None
self.forward_sender_name: str = None
self.forward_date: int = None
self.reply_to_message: Message = None
self.via_bot: User = None
self.edit_date: int = None
self.media_group_id: str = None
self.author_signature: str = None
self.text: str = None
self.entities: List[MessageEntity] = None
self.caption_entities: List[MessageEntity] = None
self.audio: Audio = None
self.document: Document = None
self.photo: List[PhotoSize] = None
self.sticker: Sticker = None
self.video: Video = None
self.video_note: VideoNote = None
self.voice: Voice = None
self.caption: str = None
self.contact: Contact = None
self.location: Location = None
self.venue: Venue = None
self.animation: Animation = None
self.dice: Dice = None
self.new_chat_member: User = None # Deprecated since Bot API 3.0. Not processed anymore
self.new_chat_members: List[User] = None
self.left_chat_member: User = None
self.new_chat_title: str = None
self.new_chat_photo: List[PhotoSize] = None
self.delete_chat_photo: bool = None
self.group_chat_created: bool = None
self.supergroup_chat_created: bool = None
self.channel_chat_created: bool = None
self.migrate_to_chat_id: int = None
self.migrate_from_chat_id: int = None
self.pinned_message: Message = None
self.invoice: Invoice = None
self.successful_payment: SuccessfulPayment = None
self.connected_website: str = None
self.reply_markup: InlineKeyboardMarkup = None
self.forward_from: Union[User, None] = None
self.forward_from_chat: Union[Chat, None] = None
self.forward_from_message_id: Union[int, None] = None
self.forward_signature: Union[str, None] = None
self.forward_sender_name: Union[str, None] = None
self.forward_date: Union[int, None] = None
self.reply_to_message: Union[Message, None] = None
self.via_bot: Union[User, None] = None
self.edit_date: Union[int, None] = None
self.media_group_id: Union[str, None] = None
self.author_signature: Union[str, None] = None
self.text: Union[str, None] = None
self.entities: Union[List[MessageEntity], None] = None
self.caption_entities: Union[List[MessageEntity], None] = None
self.audio: Union[Audio, None] = None
self.document: Union[Document, None] = None
self.photo: Union[List[PhotoSize], None] = None
self.sticker: Union[Sticker, None] = None
self.video: Union[Video, None] = None
self.video_note: Union[VideoNote, None] = None
self.voice: Union[Voice, None] = None
self.caption: Union[str, None] = None
self.contact: Union[Contact, None] = None
self.location: Union[Location, None] = None
self.venue: Union[Venue, None] = None
self.animation: Union[Animation, None] = None
self.dice: Union[Dice, None] = None
self.new_chat_member: Union[User, None] = None # Deprecated since Bot API 3.0. Not processed anymore
self.new_chat_members: Union[List[User], None] = None
self.left_chat_member: Union[User, None] = None
self.new_chat_title: Union[str, None] = None
self.new_chat_photo: Union[List[PhotoSize], None] = None
self.delete_chat_photo: Union[bool, None] = None
self.group_chat_created: Union[bool, None] = None
self.supergroup_chat_created: Union[bool, None] = None
self.channel_chat_created: Union[bool, None] = None
self.migrate_to_chat_id: Union[int, None] = None
self.migrate_from_chat_id: Union[int, None] = None
self.pinned_message: Union[Message, None] = None
self.invoice: Union[Invoice, None] = None
self.successful_payment: Union[SuccessfulPayment, None] = None
self.connected_website: Union[str, None] = None
self.reply_markup: Union[InlineKeyboardMarkup, None] = None
for key in options:
setattr(self, key, options[key])
self.json = json_string
@ -481,7 +481,7 @@ class Message(JsonDeserializable):
message.html_text
>> "<b>Test</b> parse <i>formatting</i>, <a href=\"https://example.com\">url</a>, <a href=\"tg://user?id=123456\">text_mention</a> and mention @username"
Cusom subs:
Custom subs:
You can customize the substitutes. By default, there is no substitute for the entities: hashtag, bot_command, email. You can add or modify substitute an existing entity.
Example:
message.custom_subs = {"bold": "<strong class=\"example\">{text}</strong>", "italic": "<i class=\"example\">{text}</i>", "mention": "<a href={url}>{text}</a>"}
@ -493,11 +493,11 @@ class Message(JsonDeserializable):
return text
_subs = {
"bold" : "<b>{text}</b>",
"italic" : "<i>{text}</i>",
"pre" : "<pre>{text}</pre>",
"code" : "<code>{text}</code>",
#"url" : "<a href=\"{url}\">{text}</a>", # @badiboy plain URLs have no text and do not need tags
"bold": "<b>{text}</b>",
"italic": "<i>{text}</i>",
"pre": "<pre>{text}</pre>",
"code": "<code>{text}</code>",
# "url": "<a href=\"{url}\">{text}</a>", # @badiboy plain URLs have no text and do not need tags
"text_link": "<a href=\"{url}\">{text}</a>",
"strikethrough": "<s>{text}</s>",
"underline": "<u>{text}</u>"
@ -551,11 +551,12 @@ class Message(JsonDeserializable):
class MessageEntity(Dictionaryable, JsonSerializable, JsonDeserializable):
@staticmethod
def to_list_of_dicts(entity_list) -> List[Dict]:
def to_list_of_dicts(entity_list) -> Union[List[Dict], None]:
res = []
for e in entity_list:
res.append(MessageEntity.to_dict(e))
return res or None
@classmethod
def de_json(cls, json_string):
if json_string is None: return None
@ -587,7 +588,7 @@ class MessageEntity(Dictionaryable, JsonSerializable, JsonDeserializable):
class Dice(JsonSerializable, Dictionaryable, JsonDeserializable):
@classmethod
def de_json(cls, json_string):
if (json_string is None): return None
if json_string is None: return None
obj = cls.check_json(json_string)
return cls(**obj)
@ -606,7 +607,7 @@ class Dice(JsonSerializable, Dictionaryable, JsonDeserializable):
class PhotoSize(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
if (json_string is None): return None
if json_string is None: return None
obj = cls.check_json(json_string)
return cls(**obj)
@ -621,7 +622,7 @@ class PhotoSize(JsonDeserializable):
class Audio(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
if (json_string is None): return None
if json_string is None: return None
obj = cls.check_json(json_string)
if 'thumb' in obj and 'file_id' in obj['thumb']:
obj['thumb'] = PhotoSize.de_json(obj['thumb'])
@ -645,7 +646,7 @@ class Audio(JsonDeserializable):
class Voice(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
if (json_string is None): return None
if json_string is None: return None
obj = cls.check_json(json_string)
return cls(**obj)
@ -682,7 +683,7 @@ class Video(JsonDeserializable):
def de_json(cls, json_string):
if json_string is None: return None
obj = cls.check_json(json_string)
if ('thumb' in obj and 'file_id' in obj['thumb']):
if 'thumb' in obj and 'file_id' in obj['thumb']:
obj['thumb'] = PhotoSize.de_json(obj['thumb'])
return cls(**obj)
@ -703,7 +704,7 @@ class VideoNote(JsonDeserializable):
def de_json(cls, json_string):
if json_string is None: return None
obj = cls.check_json(json_string)
if ('thumb' in obj and 'file_id' in obj['thumb']):
if 'thumb' in obj and 'file_id' in obj['thumb']:
obj['thumb'] = PhotoSize.de_json(obj['thumb'])
return cls(**obj)
@ -738,8 +739,8 @@ class Location(JsonDeserializable, JsonSerializable, Dictionaryable):
obj = cls.check_json(json_string)
return cls(**obj)
def __init__(self, longitude: float, latitude: float, horizontal_accuracy:float=None,
live_period: int=None, heading: int=None, proximity_alert_radius: int=None, **kwargs):
def __init__(self, longitude, latitude, horizontal_accuracy=None,
live_period=None, heading=None, proximity_alert_radius=None, **kwargs):
self.longitude: float = longitude
self.latitude: float = latitude
self.horizontal_accuracy: float = horizontal_accuracy
@ -766,7 +767,7 @@ class Venue(JsonDeserializable):
def de_json(cls, json_string):
if json_string is None: return None
obj = cls.check_json(json_string)
obj['location'] = Location.de_json(obj['location'])
obj['location'] = Location.de_json(obj.get('location'))
return cls(**obj)
def __init__(self, location, title, address, foursquare_id=None, foursquare_type=None,
@ -785,11 +786,12 @@ class UserProfilePhotos(JsonDeserializable):
def de_json(cls, json_string):
if json_string is None: return None
obj = cls.check_json(json_string)
if 'photos' in obj:
photos = [[PhotoSize.de_json(y) for y in x] for x in obj['photos']]
obj['photos'] = photos
return cls(**obj)
def __init__(self, total_count, photos, **kwargs):
def __init__(self, total_count, photos=None, **kwargs):
self.total_count: int = total_count
self.photos: List[PhotoSize] = photos
@ -860,7 +862,6 @@ class ReplyKeyboardMarkup(JsonSerializable):
if row_width is None:
row_width = self.row_width
if row_width > self.max_row_keys:
# Todo: Will be replaced with Exception in future releases
if not DISABLE_KEYLEN_ERROR:
@ -946,7 +947,7 @@ class InlineKeyboardMarkup(Dictionaryable, JsonSerializable, JsonDeserializable)
keyboard = [[InlineKeyboardButton.de_json(button) for button in row] for row in obj['inline_keyboard']]
return cls(keyboard)
def __init__(self, keyboard=None ,row_width=3):
def __init__(self, keyboard=None, row_width=3):
"""
This object represents an inline keyboard that appears
right next to the message it belongs to.
@ -993,7 +994,7 @@ class InlineKeyboardMarkup(Dictionaryable, JsonSerializable, JsonDeserializable)
def row(self, *args):
"""
Adds a list of InlineKeyboardButton to the keyboard.
This metod does not consider row_width.
This method does not consider row_width.
InlineKeyboardMarkup.row("A").row("B", "C").to_json() outputs:
'{keyboard: [["A"], ["B", "C"]]}'
@ -1257,8 +1258,6 @@ class InlineQuery(JsonDeserializable):
self.location: Location = location
class InputTextMessageContent(Dictionaryable):
def __init__(self, message_text, parse_mode=None, entities=None, disable_web_page_preview=None):
self.message_text: str = message_text
@ -1337,7 +1336,7 @@ class InputContactMessageContent(Dictionaryable):
self.vcard: str = vcard
def to_dict(self):
json_dict = {'phone_numbe': self.phone_number, 'first_name': self.first_name}
json_dict = {'phone_number': self.phone_number, 'first_name': self.first_name}
if self.last_name:
json_dict['last_name'] = self.last_name
if self.vcard:
@ -2043,7 +2042,10 @@ class Animation(JsonDeserializable):
def de_json(cls, json_string):
if (json_string is None): return None
obj = cls.check_json(json_string)
if 'thumb' in obj and 'file_id' in obj['thumb']:
obj["thumb"] = PhotoSize.de_json(obj['thumb'])
else:
obj['thumb'] = None
return cls(**obj)
def __init__(self, file_id, file_unique_id, width=None, height=None, duration=None,
@ -2122,10 +2124,10 @@ class OrderInfo(JsonDeserializable):
def de_json(cls, json_string):
if (json_string is None): return None
obj = cls.check_json(json_string)
obj['shipping_address'] = ShippingAddress.de_json(obj['shipping_address'])
obj['shipping_address'] = ShippingAddress.de_json(obj.get('shipping_address'))
return cls(**obj)
def __init__(self, name, phone_number, email, shipping_address, **kwargs):
def __init__(self, name=None, phone_number=None, email=None, shipping_address=None, **kwargs):
self.name: str = name
self.phone_number: str = phone_number
self.email: str = email
@ -2160,8 +2162,7 @@ class SuccessfulPayment(JsonDeserializable):
def de_json(cls, json_string):
if (json_string is None): return None
obj = cls.check_json(json_string)
if 'order_info' in obj:
obj['order_info'] = OrderInfo.de_json(obj['order_info'])
obj['order_info'] = OrderInfo.de_json(obj.get('order_info'))
return cls(**obj)
def __init__(self, currency, total_amount, invoice_payload, shipping_option_id=None, order_info=None,
@ -2197,8 +2198,7 @@ class PreCheckoutQuery(JsonDeserializable):
if (json_string is None): return None
obj = cls.check_json(json_string)
obj['from_user'] = User.de_json(obj.pop('from'))
if 'order_info' in obj:
obj['order_info'] = OrderInfo.de_json(obj['order_info'])
obj['order_info'] = OrderInfo.de_json(obj.get('order_info'))
return cls(**obj)
def __init__(self, id, from_user, currency, total_amount, invoice_payload, shipping_option_id=None, order_info=None, **kwargs):
@ -2473,6 +2473,7 @@ class PollAnswer(JsonSerializable, JsonDeserializable, Dictionaryable):
if (json_string is None): return None
obj = cls.check_json(json_string)
obj['user'] = User.de_json(obj['user'])
# Strange name, i think it should be `option_ids` not `options_ids` maybe replace that
obj['options_ids'] = obj.pop('option_ids')
return cls(**obj)
@ -2487,6 +2488,7 @@ class PollAnswer(JsonSerializable, JsonDeserializable, Dictionaryable):
def to_dict(self):
return {'poll_id': self.poll_id,
'user': self.user.to_dict(),
#should be `option_ids` not `options_ids` could cause problems here
'options_ids': self.options_ids}
@ -2498,7 +2500,7 @@ class ChatLocation(JsonSerializable, JsonDeserializable, Dictionaryable):
obj['location'] = Location.de_json(obj['location'])
return cls(**obj)
def __init__(self, location: Location, address: str, **kwargs):
def __init__(self, location, address, **kwargs):
self.location: Location = location
self.address: str = address
@ -2520,8 +2522,8 @@ class ChatInviteLink(JsonSerializable, JsonDeserializable, Dictionaryable):
obj['creator'] = User.de_json(obj['creator'])
return cls(**obj)
def __init__(self, invite_link: str, creator: User, is_primary: bool, is_revoked: bool,
expire_date: int=None, member_limit: int=None, **kwargs):
def __init__(self, invite_link, creator, is_primary, is_revoked,
expire_date=None, member_limit=None, **kwargs):
self.invite_link: str = invite_link
self.creator: User = creator
self.is_primary: bool = is_primary

View File

@ -6,7 +6,7 @@ import threading
import traceback
import warnings
import functools
from typing import Any, List, Dict
from typing import Any, List, Dict, Union
import queue as Queue
import logging
@ -36,6 +36,7 @@ content_type_service = [
'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message'
]
class WorkerThread(threading.Thread):
count = 0
@ -170,15 +171,19 @@ def async_dec():
def is_string(var):
return isinstance(var, str)
def is_dict(var):
return isinstance(var, dict)
def is_bytes(var):
return isinstance(var, bytes)
def is_pil_image(var):
return pil_imported and isinstance(var, Image.Image)
def pil_image_to_file(image, extension='JPEG', quality='web_low'):
if pil_imported:
photoBuffer = BytesIO()
@ -189,17 +194,18 @@ def pil_image_to_file(image, extension='JPEG', quality='web_low'):
else:
raise RuntimeError('PIL module is not imported')
def is_command(text: str) -> bool:
"""
Checks if `text` is a command. Telegram chat commands start with the '/' character.
:param text: Text to check.
:return: True if `text` is a command, else False.
"""
if (text is None): return False
if text is None: return False
return text.startswith('/')
def extract_command(text: str) -> str:
def extract_command(text: str) -> Union[str, None]:
"""
Extracts the command from `text` (minus the '/') if `text` is a command (see is_command).
If `text` is not a command, this function returns None.
@ -213,7 +219,7 @@ def extract_command(text: str) -> str:
:param text: String to extract the command from
:return: the command if `text` is a command (according to is_command), else None.
"""
if (text is None): return None
if text is None: return None
return text.split()[0].split('@')[0][1:] if is_command(text) else None
@ -229,7 +235,7 @@ def extract_arguments(text: str) -> str:
:param text: String to extract the arguments from a command
:return: the arguments if `text` is a command (according to is_command), else None.
"""
regexp = re.compile(r"/\w*(@\w*)*\s*([\s\S]*)",re.IGNORECASE)
regexp = re.compile(r"/\w*(@\w*)*\s*([\s\S]*)", re.IGNORECASE)
result = regexp.match(text)
return result.group(2) if is_command(text) else None
@ -247,16 +253,17 @@ def split_string(text: str, chars_per_string: int) -> List[str]:
def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str]:
f"""
"""
Splits one string into multiple strings, with a maximum amount of `chars_per_string` characters per string.
This is very useful for splitting one giant message into multiples.
If `chars_per_string` > {MAX_MESSAGE_LENGTH}: `chars_per_string` = {MAX_MESSAGE_LENGTH}.
If `chars_per_string` > 4096: `chars_per_string` = 4096.
Splits by '\n', '. ' or ' ' in exactly this priority.
:param text: The text to split
:param chars_per_string: The number of maximum characters per part the text is split to.
:return: The splitted text as a list of strings.
"""
def _text_before_last(substr: str) -> str:
return substr.join(part.split(substr)[:-1]) + substr
@ -270,9 +277,9 @@ def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str
part = text[:chars_per_string]
if ("\n" in part): part = _text_before_last("\n")
elif (". " in part): part = _text_before_last(". ")
elif (" " in part): part = _text_before_last(" ")
if "\n" in part: part = _text_before_last("\n")
elif ". " in part: part = _text_before_last(". ")
elif " " in part: part = _text_before_last(" ")
parts.append(part)
text = text[len(part):]
@ -296,7 +303,7 @@ def user_link(user: types.User, include_id: bool=False) -> str:
Attention: Don't forget to set parse_mode to 'HTML'!
Example:
bot.send_message(your_user_id, user_link(message.from_user) + ' startet the bot!', parse_mode='HTML')
bot.send_message(your_user_id, user_link(message.from_user) + ' started the bot!', parse_mode='HTML')
:param user: the user (not the user_id)
:param include_id: include the user_id
@ -333,6 +340,7 @@ def quick_markup(values: Dict[str, Dict[str, Any]], row_width: int=2) -> types.I
}
:param values: a dict containing all buttons to create in this format: {text: kwargs} {str:}
:param row_width: int row width
:return: InlineKeyboardMarkup
"""
markup = types.InlineKeyboardMarkup(row_width=row_width)
@ -363,8 +371,10 @@ def orify(e, changed_callback):
e.set = lambda: or_set(e)
e.clear = lambda: or_clear(e)
def OrEvent(*events):
or_event = threading.Event()
def changed():
bools = [ev.is_set() for ev in events]
if any(bools):
@ -391,15 +401,18 @@ def per_thread(key, construct_value, reset=False):
return getattr(thread_local, key)
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
# https://stackoverflow.com/a/312464/9935473
for i in range(0, len(lst), n):
yield lst[i:i + n]
def generate_random_token():
return ''.join(random.sample(string.ascii_letters, 16))
def deprecated(func):
"""This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted

View File

@ -6,6 +6,7 @@ sys.path.append('../')
import time
import pytest
import os
from datetime import datetime, timedelta
import telebot
from telebot import types
@ -407,6 +408,23 @@ class TestTeleBot:
cn = tb.get_chat_members_count(GROUP_ID)
assert cn > 1
def test_export_chat_invite_link(self):
tb = telebot.TeleBot(TOKEN)
il = tb.export_chat_invite_link(GROUP_ID)
assert isinstance(il, str)
def test_create_revoke_detailed_chat_invite_link(self):
tb = telebot.TeleBot(TOKEN)
cil = tb.create_chat_invite_link(GROUP_ID,
(datetime.now() + timedelta(minutes=1)).timestamp(), member_limit=5)
assert isinstance(cil.invite_link, str)
assert cil.creator.id == tb.get_me().id
assert isinstance(cil.expire_date, (float, int))
assert cil.member_limit == 5
assert not cil.is_revoked
rcil = tb.revoke_chat_invite_link(GROUP_ID, cil.invite_link)
assert rcil.is_revoked
def test_edit_markup(self):
text = 'CI Test Message'
tb = telebot.TeleBot(TOKEN)

View File

@ -219,3 +219,14 @@ def test_KeyboardButtonPollType():
json_str = markup.to_json()
assert 'request_poll' in json_str
assert 'quiz' in json_str
def test_json_chat_invite_link():
json_string = r'{"invite_link": "https://t.me/joinchat/z-abCdEFghijKlMn", "creator": {"id": 329343347, "is_bot": false, "first_name": "Test", "username": "test_user", "last_name": "User", "language_code": "en"}, "is_primary": false, "is_revoked": false, "expire_date": 1624119999, "member_limit": 10}'
invite_link = types.ChatInviteLink.de_json(json_string)
assert invite_link.invite_link == 'https://t.me/joinchat/z-abCdEFghijKlMn'
assert isinstance(invite_link.creator, types.User)
assert not invite_link.is_primary
assert not invite_link.is_revoked
assert invite_link.expire_date == 1624119999
assert invite_link.member_limit == 10