diff --git a/telebot/_util.py b/telebot/_util.py new file mode 100644 index 0000000..d4c1eb2 --- /dev/null +++ b/telebot/_util.py @@ -0,0 +1,85 @@ +import random +import string + +try: + # noinspection PyPackageRequirements + from PIL import Image + from io import BytesIO + + pil_imported = True +except: + pil_imported = False + + +def is_string(var) -> bool: + """ + Returns True if the given object is a string. + """ + return isinstance(var, str) + + +def is_dict(var) -> bool: + """ + Returns True if the given object is a dictionary. + + :param var: object to be checked + :type var: :obj:`object` + + :return: True if the given object is a dictionary. + :rtype: :obj:`bool` + """ + return isinstance(var, dict) + + +def is_bytes(var) -> bool: + """ + Returns True if the given object is a bytes object. + + :param var: object to be checked + :type var: :obj:`object` + + :return: True if the given object is a bytes object. + :rtype: :obj:`bool` + """ + return isinstance(var, bytes) + + +def is_pil_image(var) -> bool: + """ + Returns True if the given object is a PIL.Image.Image object. + + :param var: object to be checked + :type var: :obj:`object` + + :return: True if the given object is a PIL.Image.Image object. + :rtype: :obj:`bool` + """ + return pil_imported and isinstance(var, Image.Image) + + +def pil_image_to_file(image, extension='JPEG', quality='web_low'): + if pil_imported: + photoBuffer = BytesIO() + image.convert('RGB').save(photoBuffer, extension, quality=quality) + photoBuffer.seek(0) + + return photoBuffer + else: + raise RuntimeError('PIL module is not imported') + + +def chunks(lst, n): + """Yield successive n-sized chunks from lst.""" + # https://stackoverflow.com/a/312464/9935473 + for i in range(0, len(lst), n): + yield lst[i:i + n] + + +def generate_random_token() -> str: + """ + Generates a random token consisting of letters and digits, 16 characters long. + + :return: a random token + :rtype: :obj:`str` + """ + return ''.join(random.sample(string.ascii_letters, 16)) diff --git a/telebot/types.py b/telebot/types.py index 2938674..6ab520e 100644 --- a/telebot/types.py +++ b/telebot/types.py @@ -12,7 +12,7 @@ try: except ImportError: import json -from telebot import util +from telebot import _util DISABLE_KEYLEN_ERROR = False @@ -87,9 +87,9 @@ class JsonDeserializable(object): :param dict_copy: if dict is passed and it is changed outside - should be True! :return: Dictionary parsed from json or original dict """ - if util.is_dict(json_type): + if _util.is_dict(json_type): return json_type.copy() if dict_copy else json_type - elif util.is_string(json_type): + elif _util.is_string(json_type): return json.loads(json_type) else: raise ValueError("json_type should be a json dict or string.") @@ -2156,12 +2156,12 @@ class ReplyKeyboardMarkup(JsonSerializable): logger.error('Telegram does not support reply keyboard row width over %d.' % self.max_row_keys) row_width = self.max_row_keys - for row in util.chunks(args, row_width): + for row in _util.chunks(args, row_width): button_array = [] for button in row: - if util.is_string(button): + if _util.is_string(button): button_array.append({'text': button}) - elif util.is_bytes(button): + elif _util.is_bytes(button): button_array.append({'text': button.decode('utf-8')}) else: button_array.append(button.to_dict()) @@ -2278,12 +2278,12 @@ class InlineKeyboardMarkup(Dictionaryable, JsonSerializable, JsonDeserializable) This object represents an inline keyboard that appears right next to the message it belongs to. .. note:: - It is recommended to use :meth:`telebot.util.quick_markup` instead. + It is recommended to use :meth:`telebot._util.quick_markup` instead. .. code-block:: python3 :caption: Example of a custom keyboard with buttons. - from telebot.util import quick_markup + from telebot._util.import quick_markup markup = quick_markup( {'text': 'Press me', 'callback_data': 'press'}, @@ -2345,7 +2345,7 @@ class InlineKeyboardMarkup(Dictionaryable, JsonSerializable, JsonDeserializable) logger.error('Telegram does not support inline keyboard row width over %d.' % self.max_row_keys) row_width = self.max_row_keys - for row in util.chunks(args, row_width): + for row in _util.chunks(args, row_width): button_array = [button for button in row] self.keyboard.append(button_array) @@ -5786,11 +5786,11 @@ class InputMedia(Dictionaryable, JsonSerializable): self.parse_mode: Optional[str] = parse_mode self.caption_entities: Optional[List[MessageEntity]] = caption_entities - if util.is_string(self.media): + if _util.is_string(self.media): self._media_name = '' self._media_dic = self.media else: - self._media_name = util.generate_random_token() + self._media_name = _util.generate_random_token() self._media_dic = 'attach://{0}'.format(self._media_name) def to_json(self): @@ -5810,7 +5810,7 @@ class InputMedia(Dictionaryable, JsonSerializable): """ :meta private: """ - if util.is_string(self.media): + if _util.is_string(self.media): return self.to_json(), None return self.to_json(), {self._media_name: self.media} @@ -5845,8 +5845,8 @@ class InputMediaPhoto(InputMedia): :rtype: :class:`telebot.types.InputMediaPhoto` """ def __init__(self, media, caption=None, parse_mode=None, caption_entities=None, has_spoiler=None): - if util.is_pil_image(media): - media = util.pil_image_to_file(media) + if _util.is_pil_image(media): + media = _util.pil_image_to_file(media) super(InputMediaPhoto, self).__init__( type="photo", media=media, caption=caption, parse_mode=parse_mode, caption_entities=caption_entities) diff --git a/telebot/util.py b/telebot/util.py index e52ee83..4c4efa9 100644 --- a/telebot/util.py +++ b/telebot/util.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- -import random import re -import string import threading import traceback from typing import Any, Callable, List, Dict, Optional, Union @@ -14,21 +12,13 @@ import queue as Queue import logging from telebot import types +from telebot._util import is_pil_image, is_dict, is_string, is_bytes, chunks, generate_random_token, pil_image_to_file try: import ujson as json except ImportError: import json -try: - # noinspection PyPackageRequirements - from PIL import Image - from io import BytesIO - - pil_imported = True -except: - pil_imported = False - MAX_MESSAGE_LENGTH = 4096 logger = logging.getLogger('TeleBot') @@ -44,7 +34,8 @@ content_type_media = [ #: Contains all service content types such as `User joined the group`. content_type_service = [ - 'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', 'group_chat_created', + 'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', + 'group_chat_created', 'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message', 'proximity_alert_triggered', 'video_chat_scheduled', 'video_chat_started', 'video_chat_ended', 'video_chat_participants_invited', 'message_auto_delete_timer_changed', 'forum_topic_created', 'forum_topic_closed', @@ -129,6 +120,7 @@ class ThreadPool: """ :meta private: """ + def __init__(self, telebot, num_threads=2): self.telebot = telebot self.tasks = Queue.Queue() @@ -169,6 +161,7 @@ class AsyncTask: """ :meta private: """ + def __init__(self, target, *args, **kwargs): self.target = target self.args = args @@ -198,7 +191,8 @@ class CustomRequestResponse(): """ :meta private: """ - def __init__(self, json_text, status_code = 200, reason = ""): + + def __init__(self, json_text, status_code=200, reason=""): self.status_code = status_code self.text = json_text self.reason = reason @@ -211,6 +205,7 @@ def async_dec(): """ :meta private: """ + def decorator(fn): def wrapper(*args, **kwargs): return AsyncTask(fn, *args, **kwargs) @@ -220,63 +215,6 @@ def async_dec(): return decorator -def is_string(var) -> bool: - """ - Returns True if the given object is a string. - """ - return isinstance(var, str) - - -def is_dict(var) -> bool: - """ - Returns True if the given object is a dictionary. - - :param var: object to be checked - :type var: :obj:`object` - - :return: True if the given object is a dictionary. - :rtype: :obj:`bool` - """ - return isinstance(var, dict) - - -def is_bytes(var) -> bool: - """ - Returns True if the given object is a bytes object. - - :param var: object to be checked - :type var: :obj:`object` - - :return: True if the given object is a bytes object. - :rtype: :obj:`bool` - """ - return isinstance(var, bytes) - - -def is_pil_image(var) -> bool: - """ - Returns True if the given object is a PIL.Image.Image object. - - :param var: object to be checked - :type var: :obj:`object` - - :return: True if the given object is a PIL.Image.Image object. - :rtype: :obj:`bool` - """ - return pil_imported and isinstance(var, Image.Image) - - -def pil_image_to_file(image, extension='JPEG', quality='web_low'): - if pil_imported: - photoBuffer = BytesIO() - image.convert('RGB').save(photoBuffer, extension, quality=quality) - photoBuffer.seek(0) - - return photoBuffer - else: - raise RuntimeError('PIL module is not imported') - - def is_command(text: str) -> bool: r""" Checks if `text` is a command. Telegram chat commands start with the '/' character. @@ -353,7 +291,7 @@ def split_string(text: str, chars_per_string: int) -> List[str]: return [text[i:i + chars_per_string] for i in range(0, len(text), chars_per_string)] -def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str]: +def smart_split(text: str, chars_per_string: int = MAX_MESSAGE_LENGTH) -> List[str]: r""" Splits one string into multiple strings, with a maximum amount of `chars_per_string` characters per string. This is very useful for splitting one giant message into multiples. @@ -383,9 +321,12 @@ def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str part = text[:chars_per_string] - if "\n" in part: part = _text_before_last("\n") - elif ". " in part: part = _text_before_last(". ") - elif " " in part: part = _text_before_last(" ") + if "\n" in part: + part = _text_before_last("\n") + elif ". " in part: + part = _text_before_last(". ") + elif " " in part: + part = _text_before_last(" ") parts.append(part) text = text[len(part):] @@ -401,12 +342,12 @@ def escape(text: str) -> str: chars = {"&": "&", "<": "<", ">": ">"} if text is None: return None - for old, new in chars.items(): + for old, new in chars.items(): text = text.replace(old, new) return text -def user_link(user: types.User, include_id: bool=False) -> str: +def user_link(user: types.User, include_id: bool = False) -> str: """ Returns an HTML user link. This is useful for reports. Attention: Don't forget to set parse_mode to 'HTML'! @@ -433,10 +374,10 @@ def user_link(user: types.User, include_id: bool=False) -> str: """ name = escape(user.first_name) return (f"{name}" - + (f" (
{user.id}
)" if include_id else "")) + + (f" (
{user.id}
)" if include_id else "")) -def quick_markup(values: Dict[str, Dict[str, Any]], row_width: int=2) -> types.InlineKeyboardMarkup: +def quick_markup(values: Dict[str, Dict[str, Any]], row_width: int = 2) -> types.InlineKeyboardMarkup: """ Returns a reply markup from a dict in this format: {'text': kwargs} This is useful to avoid always typing 'btn1 = InlineKeyboardButton(...)' 'btn2 = InlineKeyboardButton(...)' @@ -551,24 +492,7 @@ def per_thread(key, construct_value, reset=False): return getattr(thread_local, key) -def chunks(lst, n): - """Yield successive n-sized chunks from lst.""" - # https://stackoverflow.com/a/312464/9935473 - for i in range(0, len(lst), n): - yield lst[i:i + n] - - -def generate_random_token() -> str: - """ - Generates a random token consisting of letters and digits, 16 characters long. - - :return: a random token - :rtype: :obj:`str` - """ - return ''.join(random.sample(string.ascii_letters, 16)) - - -def deprecated(warn: bool=True, alternative: Optional[Callable]=None, deprecation_text=None): +def deprecated(warn: bool = True, alternative: Optional[Callable] = None, deprecation_text=None): """ Use this decorator to mark functions as deprecated. When the function is used, an info (or warning if `warn` is True) is logged. @@ -586,6 +510,7 @@ def deprecated(warn: bool=True, alternative: Optional[Callable]=None, deprecatio :return: The decorated function """ + def decorator(function): def wrapper(*args, **kwargs): info = f"`{function.__name__}` is deprecated." @@ -598,7 +523,9 @@ def deprecated(warn: bool=True, alternative: Optional[Callable]=None, deprecatio else: logger.warning(info) return function(*args, **kwargs) + return wrapper + return decorator @@ -661,8 +588,8 @@ def antiflood(function: Callable, *args, **kwargs): return function(*args, **kwargs) else: raise - - + + def parse_web_app_data(token: str, raw_init_data: str): """ Parses web app data. @@ -715,4 +642,16 @@ def validate_web_app_data(token: str, raw_init_data: str): return hmac.new(secret_key.digest(), data_check_string.encode(), sha256).hexdigest() == init_data_hash - + +__all__ = ( + "content_type_media", "content_type_service", "update_types", + "WorkerThread", "AsyncTask", "CustomRequestResponse", + "async_dec", "deprecated", + "is_bytes", "is_string", "is_dict", "is_pil_image", + "chunks", "generate_random_token", "pil_image_to_file", + "is_command", "extract_command", "extract_arguments", + "split_string", "smart_split", "escape", "user_link", "quick_markup", + "antiflood", "parse_web_app_data", "validate_web_app_data", + "or_set", "or_clear", "orify", "OrEvent", "per_thread", + "webhook_google_functions" +)