1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

Merge pull request #1868 from Cub11k/master

Fix circular import
This commit is contained in:
Badiboy 2023-01-06 22:51:52 +03:00 committed by GitHub
commit a781929a2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 138 additions and 113 deletions

View File

@ -14,6 +14,7 @@
## <p align="center">Supported Bot API version: <a href="https://core.telegram.org/bots/api#december-30-2022">6.4</a>! ## <p align="center">Supported Bot API version: <a href="https://core.telegram.org/bots/api#december-30-2022">6.4</a>!
<h2><a href='https://pytba.readthedocs.io/en/latest/index.html'>Official documentation</a></h2> <h2><a href='https://pytba.readthedocs.io/en/latest/index.html'>Official documentation</a></h2>
<h2><a href='https://pytba.readthedocs.io/ru/latest/index.html'>Official ru documentation</a></h2>
## Contents ## Contents

85
telebot/service_utils.py Normal file
View File

@ -0,0 +1,85 @@
import random
import string
try:
# noinspection PyPackageRequirements
from PIL import Image
from io import BytesIO
pil_imported = True
except:
pil_imported = False
def is_string(var) -> bool:
"""
Returns True if the given object is a string.
"""
return isinstance(var, str)
def is_dict(var) -> bool:
"""
Returns True if the given object is a dictionary.
:param var: object to be checked
:type var: :obj:`object`
:return: True if the given object is a dictionary.
:rtype: :obj:`bool`
"""
return isinstance(var, dict)
def is_bytes(var) -> bool:
"""
Returns True if the given object is a bytes object.
:param var: object to be checked
:type var: :obj:`object`
:return: True if the given object is a bytes object.
:rtype: :obj:`bool`
"""
return isinstance(var, bytes)
def is_pil_image(var) -> bool:
"""
Returns True if the given object is a PIL.Image.Image object.
:param var: object to be checked
:type var: :obj:`object`
:return: True if the given object is a PIL.Image.Image object.
:rtype: :obj:`bool`
"""
return pil_imported and isinstance(var, Image.Image)
def pil_image_to_file(image, extension='JPEG', quality='web_low'):
if pil_imported:
photoBuffer = BytesIO()
image.convert('RGB').save(photoBuffer, extension, quality=quality)
photoBuffer.seek(0)
return photoBuffer
else:
raise RuntimeError('PIL module is not imported')
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
# https://stackoverflow.com/a/312464/9935473
for i in range(0, len(lst), n):
yield lst[i:i + n]
def generate_random_token() -> str:
"""
Generates a random token consisting of letters and digits, 16 characters long.
:return: a random token
:rtype: :obj:`str`
"""
return ''.join(random.sample(string.ascii_letters, 16))

View File

@ -12,7 +12,7 @@ try:
except ImportError: except ImportError:
import json import json
from telebot import util from telebot import service_utils
DISABLE_KEYLEN_ERROR = False DISABLE_KEYLEN_ERROR = False
@ -87,9 +87,9 @@ class JsonDeserializable(object):
:param dict_copy: if dict is passed and it is changed outside - should be True! :param dict_copy: if dict is passed and it is changed outside - should be True!
:return: Dictionary parsed from json or original dict :return: Dictionary parsed from json or original dict
""" """
if util.is_dict(json_type): if service_utils.is_dict(json_type):
return json_type.copy() if dict_copy else json_type return json_type.copy() if dict_copy else json_type
elif util.is_string(json_type): elif service_utils.is_string(json_type):
return json.loads(json_type) return json.loads(json_type)
else: else:
raise ValueError("json_type should be a json dict or string.") raise ValueError("json_type should be a json dict or string.")
@ -2156,12 +2156,12 @@ class ReplyKeyboardMarkup(JsonSerializable):
logger.error('Telegram does not support reply keyboard row width over %d.' % self.max_row_keys) logger.error('Telegram does not support reply keyboard row width over %d.' % self.max_row_keys)
row_width = self.max_row_keys row_width = self.max_row_keys
for row in util.chunks(args, row_width): for row in service_utils.chunks(args, row_width):
button_array = [] button_array = []
for button in row: for button in row:
if util.is_string(button): if service_utils.is_string(button):
button_array.append({'text': button}) button_array.append({'text': button})
elif util.is_bytes(button): elif service_utils.is_bytes(button):
button_array.append({'text': button.decode('utf-8')}) button_array.append({'text': button.decode('utf-8')})
else: else:
button_array.append(button.to_dict()) button_array.append(button.to_dict())
@ -2278,12 +2278,12 @@ class InlineKeyboardMarkup(Dictionaryable, JsonSerializable, JsonDeserializable)
This object represents an inline keyboard that appears right next to the message it belongs to. This object represents an inline keyboard that appears right next to the message it belongs to.
.. note:: .. note::
It is recommended to use :meth:`telebot.util.quick_markup` instead. It is recommended to use :meth:`telebot.service_utils..quick_markup` instead.
.. code-block:: python3 .. code-block:: python3
:caption: Example of a custom keyboard with buttons. :caption: Example of a custom keyboard with buttons.
from telebot.util import quick_markup from telebot.service_utils..import quick_markup
markup = quick_markup( markup = quick_markup(
{'text': 'Press me', 'callback_data': 'press'}, {'text': 'Press me', 'callback_data': 'press'},
@ -2345,7 +2345,7 @@ class InlineKeyboardMarkup(Dictionaryable, JsonSerializable, JsonDeserializable)
logger.error('Telegram does not support inline keyboard row width over %d.' % self.max_row_keys) logger.error('Telegram does not support inline keyboard row width over %d.' % self.max_row_keys)
row_width = self.max_row_keys row_width = self.max_row_keys
for row in util.chunks(args, row_width): for row in service_utils.chunks(args, row_width):
button_array = [button for button in row] button_array = [button for button in row]
self.keyboard.append(button_array) self.keyboard.append(button_array)
@ -5786,11 +5786,11 @@ class InputMedia(Dictionaryable, JsonSerializable):
self.parse_mode: Optional[str] = parse_mode self.parse_mode: Optional[str] = parse_mode
self.caption_entities: Optional[List[MessageEntity]] = caption_entities self.caption_entities: Optional[List[MessageEntity]] = caption_entities
if util.is_string(self.media): if service_utils.is_string(self.media):
self._media_name = '' self._media_name = ''
self._media_dic = self.media self._media_dic = self.media
else: else:
self._media_name = util.generate_random_token() self._media_name = service_utils.generate_random_token()
self._media_dic = 'attach://{0}'.format(self._media_name) self._media_dic = 'attach://{0}'.format(self._media_name)
def to_json(self): def to_json(self):
@ -5810,7 +5810,7 @@ class InputMedia(Dictionaryable, JsonSerializable):
""" """
:meta private: :meta private:
""" """
if util.is_string(self.media): if service_utils.is_string(self.media):
return self.to_json(), None return self.to_json(), None
return self.to_json(), {self._media_name: self.media} return self.to_json(), {self._media_name: self.media}
@ -5845,8 +5845,8 @@ class InputMediaPhoto(InputMedia):
:rtype: :class:`telebot.types.InputMediaPhoto` :rtype: :class:`telebot.types.InputMediaPhoto`
""" """
def __init__(self, media, caption=None, parse_mode=None, caption_entities=None, has_spoiler=None): def __init__(self, media, caption=None, parse_mode=None, caption_entities=None, has_spoiler=None):
if util.is_pil_image(media): if service_utils.is_pil_image(media):
media = util.pil_image_to_file(media) media = service_utils.pil_image_to_file(media)
super(InputMediaPhoto, self).__init__( super(InputMediaPhoto, self).__init__(
type="photo", media=media, caption=caption, parse_mode=parse_mode, caption_entities=caption_entities) type="photo", media=media, caption=caption, parse_mode=parse_mode, caption_entities=caption_entities)

View File

@ -1,7 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import random
import re import re
import string
import threading import threading
import traceback import traceback
from typing import Any, Callable, List, Dict, Optional, Union from typing import Any, Callable, List, Dict, Optional, Union
@ -14,21 +12,13 @@ import queue as Queue
import logging import logging
from telebot import types from telebot import types
from telebot.service_utils import is_pil_image, is_dict, is_string, is_bytes, chunks, generate_random_token, pil_image_to_file
try: try:
import ujson as json import ujson as json
except ImportError: except ImportError:
import json import json
try:
# noinspection PyPackageRequirements
from PIL import Image
from io import BytesIO
pil_imported = True
except:
pil_imported = False
MAX_MESSAGE_LENGTH = 4096 MAX_MESSAGE_LENGTH = 4096
logger = logging.getLogger('TeleBot') logger = logging.getLogger('TeleBot')
@ -44,7 +34,8 @@ content_type_media = [
#: Contains all service content types such as `User joined the group`. #: Contains all service content types such as `User joined the group`.
content_type_service = [ content_type_service = [
'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', 'group_chat_created', 'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo',
'group_chat_created',
'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message', 'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message',
'proximity_alert_triggered', 'video_chat_scheduled', 'video_chat_started', 'video_chat_ended', 'proximity_alert_triggered', 'video_chat_scheduled', 'video_chat_started', 'video_chat_ended',
'video_chat_participants_invited', 'message_auto_delete_timer_changed', 'forum_topic_created', 'forum_topic_closed', 'video_chat_participants_invited', 'message_auto_delete_timer_changed', 'forum_topic_created', 'forum_topic_closed',
@ -129,6 +120,7 @@ class ThreadPool:
""" """
:meta private: :meta private:
""" """
def __init__(self, telebot, num_threads=2): def __init__(self, telebot, num_threads=2):
self.telebot = telebot self.telebot = telebot
self.tasks = Queue.Queue() self.tasks = Queue.Queue()
@ -169,6 +161,7 @@ class AsyncTask:
""" """
:meta private: :meta private:
""" """
def __init__(self, target, *args, **kwargs): def __init__(self, target, *args, **kwargs):
self.target = target self.target = target
self.args = args self.args = args
@ -198,7 +191,8 @@ class CustomRequestResponse():
""" """
:meta private: :meta private:
""" """
def __init__(self, json_text, status_code = 200, reason = ""):
def __init__(self, json_text, status_code=200, reason=""):
self.status_code = status_code self.status_code = status_code
self.text = json_text self.text = json_text
self.reason = reason self.reason = reason
@ -211,6 +205,7 @@ def async_dec():
""" """
:meta private: :meta private:
""" """
def decorator(fn): def decorator(fn):
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
return AsyncTask(fn, *args, **kwargs) return AsyncTask(fn, *args, **kwargs)
@ -220,63 +215,6 @@ def async_dec():
return decorator return decorator
def is_string(var) -> bool:
"""
Returns True if the given object is a string.
"""
return isinstance(var, str)
def is_dict(var) -> bool:
"""
Returns True if the given object is a dictionary.
:param var: object to be checked
:type var: :obj:`object`
:return: True if the given object is a dictionary.
:rtype: :obj:`bool`
"""
return isinstance(var, dict)
def is_bytes(var) -> bool:
"""
Returns True if the given object is a bytes object.
:param var: object to be checked
:type var: :obj:`object`
:return: True if the given object is a bytes object.
:rtype: :obj:`bool`
"""
return isinstance(var, bytes)
def is_pil_image(var) -> bool:
"""
Returns True if the given object is a PIL.Image.Image object.
:param var: object to be checked
:type var: :obj:`object`
:return: True if the given object is a PIL.Image.Image object.
:rtype: :obj:`bool`
"""
return pil_imported and isinstance(var, Image.Image)
def pil_image_to_file(image, extension='JPEG', quality='web_low'):
if pil_imported:
photoBuffer = BytesIO()
image.convert('RGB').save(photoBuffer, extension, quality=quality)
photoBuffer.seek(0)
return photoBuffer
else:
raise RuntimeError('PIL module is not imported')
def is_command(text: str) -> bool: def is_command(text: str) -> bool:
r""" r"""
Checks if `text` is a command. Telegram chat commands start with the '/' character. Checks if `text` is a command. Telegram chat commands start with the '/' character.
@ -353,7 +291,7 @@ def split_string(text: str, chars_per_string: int) -> List[str]:
return [text[i:i + chars_per_string] for i in range(0, len(text), chars_per_string)] return [text[i:i + chars_per_string] for i in range(0, len(text), chars_per_string)]
def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str]: def smart_split(text: str, chars_per_string: int = MAX_MESSAGE_LENGTH) -> List[str]:
r""" r"""
Splits one string into multiple strings, with a maximum amount of `chars_per_string` characters per string. Splits one string into multiple strings, with a maximum amount of `chars_per_string` characters per string.
This is very useful for splitting one giant message into multiples. This is very useful for splitting one giant message into multiples.
@ -383,9 +321,12 @@ def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str
part = text[:chars_per_string] part = text[:chars_per_string]
if "\n" in part: part = _text_before_last("\n") if "\n" in part:
elif ". " in part: part = _text_before_last(". ") part = _text_before_last("\n")
elif " " in part: part = _text_before_last(" ") elif ". " in part:
part = _text_before_last(". ")
elif " " in part:
part = _text_before_last(" ")
parts.append(part) parts.append(part)
text = text[len(part):] text = text[len(part):]
@ -401,12 +342,12 @@ def escape(text: str) -> str:
chars = {"&": "&amp;", "<": "&lt;", ">": "&gt;"} chars = {"&": "&amp;", "<": "&lt;", ">": "&gt;"}
if text is None: if text is None:
return None return None
for old, new in chars.items(): for old, new in chars.items():
text = text.replace(old, new) text = text.replace(old, new)
return text return text
def user_link(user: types.User, include_id: bool=False) -> str: def user_link(user: types.User, include_id: bool = False) -> str:
""" """
Returns an HTML user link. This is useful for reports. Returns an HTML user link. This is useful for reports.
Attention: Don't forget to set parse_mode to 'HTML'! Attention: Don't forget to set parse_mode to 'HTML'!
@ -433,10 +374,10 @@ def user_link(user: types.User, include_id: bool=False) -> str:
""" """
name = escape(user.first_name) name = escape(user.first_name)
return (f"<a href='tg://user?id={user.id}'>{name}</a>" return (f"<a href='tg://user?id={user.id}'>{name}</a>"
+ (f" (<pre>{user.id}</pre>)" if include_id else "")) + (f" (<pre>{user.id}</pre>)" if include_id else ""))
def quick_markup(values: Dict[str, Dict[str, Any]], row_width: int=2) -> types.InlineKeyboardMarkup: def quick_markup(values: Dict[str, Dict[str, Any]], row_width: int = 2) -> types.InlineKeyboardMarkup:
""" """
Returns a reply markup from a dict in this format: {'text': kwargs} Returns a reply markup from a dict in this format: {'text': kwargs}
This is useful to avoid always typing 'btn1 = InlineKeyboardButton(...)' 'btn2 = InlineKeyboardButton(...)' This is useful to avoid always typing 'btn1 = InlineKeyboardButton(...)' 'btn2 = InlineKeyboardButton(...)'
@ -551,24 +492,7 @@ def per_thread(key, construct_value, reset=False):
return getattr(thread_local, key) return getattr(thread_local, key)
def chunks(lst, n): def deprecated(warn: bool = True, alternative: Optional[Callable] = None, deprecation_text=None):
"""Yield successive n-sized chunks from lst."""
# https://stackoverflow.com/a/312464/9935473
for i in range(0, len(lst), n):
yield lst[i:i + n]
def generate_random_token() -> str:
"""
Generates a random token consisting of letters and digits, 16 characters long.
:return: a random token
:rtype: :obj:`str`
"""
return ''.join(random.sample(string.ascii_letters, 16))
def deprecated(warn: bool=True, alternative: Optional[Callable]=None, deprecation_text=None):
""" """
Use this decorator to mark functions as deprecated. Use this decorator to mark functions as deprecated.
When the function is used, an info (or warning if `warn` is True) is logged. When the function is used, an info (or warning if `warn` is True) is logged.
@ -586,6 +510,7 @@ def deprecated(warn: bool=True, alternative: Optional[Callable]=None, deprecatio
:return: The decorated function :return: The decorated function
""" """
def decorator(function): def decorator(function):
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
info = f"`{function.__name__}` is deprecated." info = f"`{function.__name__}` is deprecated."
@ -598,7 +523,9 @@ def deprecated(warn: bool=True, alternative: Optional[Callable]=None, deprecatio
else: else:
logger.warning(info) logger.warning(info)
return function(*args, **kwargs) return function(*args, **kwargs)
return wrapper return wrapper
return decorator return decorator
@ -661,8 +588,8 @@ def antiflood(function: Callable, *args, **kwargs):
return function(*args, **kwargs) return function(*args, **kwargs)
else: else:
raise raise
def parse_web_app_data(token: str, raw_init_data: str): def parse_web_app_data(token: str, raw_init_data: str):
""" """
Parses web app data. Parses web app data.
@ -715,4 +642,16 @@ def validate_web_app_data(token: str, raw_init_data: str):
return hmac.new(secret_key.digest(), data_check_string.encode(), sha256).hexdigest() == init_data_hash return hmac.new(secret_key.digest(), data_check_string.encode(), sha256).hexdigest() == init_data_hash
__all__ = (
"content_type_media", "content_type_service", "update_types",
"WorkerThread", "AsyncTask", "CustomRequestResponse",
"async_dec", "deprecated",
"is_bytes", "is_string", "is_dict", "is_pil_image",
"chunks", "generate_random_token", "pil_image_to_file",
"is_command", "extract_command", "extract_arguments",
"split_string", "smart_split", "escape", "user_link", "quick_markup",
"antiflood", "parse_web_app_data", "validate_web_app_data",
"or_set", "or_clear", "orify", "OrEvent", "per_thread",
"webhook_google_functions"
)