diff --git a/README.md b/README.md
index 5eb412c..cfcc195 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,7 @@
##
Supported Bot API version: 6.4!
+
## Contents
diff --git a/telebot/service_utils.py b/telebot/service_utils.py
new file mode 100644
index 0000000..d4c1eb2
--- /dev/null
+++ b/telebot/service_utils.py
@@ -0,0 +1,85 @@
+import random
+import string
+
+try:
+ # noinspection PyPackageRequirements
+ from PIL import Image
+ from io import BytesIO
+
+ pil_imported = True
+except:
+ pil_imported = False
+
+
+def is_string(var) -> bool:
+ """
+ Returns True if the given object is a string.
+ """
+ return isinstance(var, str)
+
+
+def is_dict(var) -> bool:
+ """
+ Returns True if the given object is a dictionary.
+
+ :param var: object to be checked
+ :type var: :obj:`object`
+
+ :return: True if the given object is a dictionary.
+ :rtype: :obj:`bool`
+ """
+ return isinstance(var, dict)
+
+
+def is_bytes(var) -> bool:
+ """
+ Returns True if the given object is a bytes object.
+
+ :param var: object to be checked
+ :type var: :obj:`object`
+
+ :return: True if the given object is a bytes object.
+ :rtype: :obj:`bool`
+ """
+ return isinstance(var, bytes)
+
+
+def is_pil_image(var) -> bool:
+ """
+ Returns True if the given object is a PIL.Image.Image object.
+
+ :param var: object to be checked
+ :type var: :obj:`object`
+
+ :return: True if the given object is a PIL.Image.Image object.
+ :rtype: :obj:`bool`
+ """
+ return pil_imported and isinstance(var, Image.Image)
+
+
+def pil_image_to_file(image, extension='JPEG', quality='web_low'):
+ if pil_imported:
+ photoBuffer = BytesIO()
+ image.convert('RGB').save(photoBuffer, extension, quality=quality)
+ photoBuffer.seek(0)
+
+ return photoBuffer
+ else:
+ raise RuntimeError('PIL module is not imported')
+
+
+def chunks(lst, n):
+ """Yield successive n-sized chunks from lst."""
+ # https://stackoverflow.com/a/312464/9935473
+ for i in range(0, len(lst), n):
+ yield lst[i:i + n]
+
+
+def generate_random_token() -> str:
+ """
+ Generates a random token consisting of letters and digits, 16 characters long.
+
+ :return: a random token
+ :rtype: :obj:`str`
+ """
+ return ''.join(random.sample(string.ascii_letters, 16))
diff --git a/telebot/types.py b/telebot/types.py
index 2938674..fa52e2a 100644
--- a/telebot/types.py
+++ b/telebot/types.py
@@ -12,7 +12,7 @@ try:
except ImportError:
import json
-from telebot import util
+from telebot import service_utils
DISABLE_KEYLEN_ERROR = False
@@ -87,9 +87,9 @@ class JsonDeserializable(object):
:param dict_copy: if dict is passed and it is changed outside - should be True!
:return: Dictionary parsed from json or original dict
"""
- if util.is_dict(json_type):
+ if service_utils.is_dict(json_type):
return json_type.copy() if dict_copy else json_type
- elif util.is_string(json_type):
+ elif service_utils.is_string(json_type):
return json.loads(json_type)
else:
raise ValueError("json_type should be a json dict or string.")
@@ -2156,12 +2156,12 @@ class ReplyKeyboardMarkup(JsonSerializable):
logger.error('Telegram does not support reply keyboard row width over %d.' % self.max_row_keys)
row_width = self.max_row_keys
- for row in util.chunks(args, row_width):
+ for row in service_utils.chunks(args, row_width):
button_array = []
for button in row:
- if util.is_string(button):
+ if service_utils.is_string(button):
button_array.append({'text': button})
- elif util.is_bytes(button):
+ elif service_utils.is_bytes(button):
button_array.append({'text': button.decode('utf-8')})
else:
button_array.append(button.to_dict())
@@ -2278,12 +2278,12 @@ class InlineKeyboardMarkup(Dictionaryable, JsonSerializable, JsonDeserializable)
This object represents an inline keyboard that appears right next to the message it belongs to.
.. note::
- It is recommended to use :meth:`telebot.util.quick_markup` instead.
+ It is recommended to use :meth:`telebot.service_utils..quick_markup` instead.
.. code-block:: python3
:caption: Example of a custom keyboard with buttons.
- from telebot.util import quick_markup
+ from telebot.service_utils..import quick_markup
markup = quick_markup(
{'text': 'Press me', 'callback_data': 'press'},
@@ -2345,7 +2345,7 @@ class InlineKeyboardMarkup(Dictionaryable, JsonSerializable, JsonDeserializable)
logger.error('Telegram does not support inline keyboard row width over %d.' % self.max_row_keys)
row_width = self.max_row_keys
- for row in util.chunks(args, row_width):
+ for row in service_utils.chunks(args, row_width):
button_array = [button for button in row]
self.keyboard.append(button_array)
@@ -5786,11 +5786,11 @@ class InputMedia(Dictionaryable, JsonSerializable):
self.parse_mode: Optional[str] = parse_mode
self.caption_entities: Optional[List[MessageEntity]] = caption_entities
- if util.is_string(self.media):
+ if service_utils.is_string(self.media):
self._media_name = ''
self._media_dic = self.media
else:
- self._media_name = util.generate_random_token()
+ self._media_name = service_utils.generate_random_token()
self._media_dic = 'attach://{0}'.format(self._media_name)
def to_json(self):
@@ -5810,7 +5810,7 @@ class InputMedia(Dictionaryable, JsonSerializable):
"""
:meta private:
"""
- if util.is_string(self.media):
+ if service_utils.is_string(self.media):
return self.to_json(), None
return self.to_json(), {self._media_name: self.media}
@@ -5845,8 +5845,8 @@ class InputMediaPhoto(InputMedia):
:rtype: :class:`telebot.types.InputMediaPhoto`
"""
def __init__(self, media, caption=None, parse_mode=None, caption_entities=None, has_spoiler=None):
- if util.is_pil_image(media):
- media = util.pil_image_to_file(media)
+ if service_utils.is_pil_image(media):
+ media = service_utils.pil_image_to_file(media)
super(InputMediaPhoto, self).__init__(
type="photo", media=media, caption=caption, parse_mode=parse_mode, caption_entities=caption_entities)
diff --git a/telebot/util.py b/telebot/util.py
index e52ee83..3e76f19 100644
--- a/telebot/util.py
+++ b/telebot/util.py
@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
-import random
import re
-import string
import threading
import traceback
from typing import Any, Callable, List, Dict, Optional, Union
@@ -14,21 +12,13 @@ import queue as Queue
import logging
from telebot import types
+from telebot.service_utils import is_pil_image, is_dict, is_string, is_bytes, chunks, generate_random_token, pil_image_to_file
try:
import ujson as json
except ImportError:
import json
-try:
- # noinspection PyPackageRequirements
- from PIL import Image
- from io import BytesIO
-
- pil_imported = True
-except:
- pil_imported = False
-
MAX_MESSAGE_LENGTH = 4096
logger = logging.getLogger('TeleBot')
@@ -44,7 +34,8 @@ content_type_media = [
#: Contains all service content types such as `User joined the group`.
content_type_service = [
- 'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', 'group_chat_created',
+ 'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo',
+ 'group_chat_created',
'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message',
'proximity_alert_triggered', 'video_chat_scheduled', 'video_chat_started', 'video_chat_ended',
'video_chat_participants_invited', 'message_auto_delete_timer_changed', 'forum_topic_created', 'forum_topic_closed',
@@ -129,6 +120,7 @@ class ThreadPool:
"""
:meta private:
"""
+
def __init__(self, telebot, num_threads=2):
self.telebot = telebot
self.tasks = Queue.Queue()
@@ -169,6 +161,7 @@ class AsyncTask:
"""
:meta private:
"""
+
def __init__(self, target, *args, **kwargs):
self.target = target
self.args = args
@@ -198,7 +191,8 @@ class CustomRequestResponse():
"""
:meta private:
"""
- def __init__(self, json_text, status_code = 200, reason = ""):
+
+ def __init__(self, json_text, status_code=200, reason=""):
self.status_code = status_code
self.text = json_text
self.reason = reason
@@ -211,6 +205,7 @@ def async_dec():
"""
:meta private:
"""
+
def decorator(fn):
def wrapper(*args, **kwargs):
return AsyncTask(fn, *args, **kwargs)
@@ -220,63 +215,6 @@ def async_dec():
return decorator
-def is_string(var) -> bool:
- """
- Returns True if the given object is a string.
- """
- return isinstance(var, str)
-
-
-def is_dict(var) -> bool:
- """
- Returns True if the given object is a dictionary.
-
- :param var: object to be checked
- :type var: :obj:`object`
-
- :return: True if the given object is a dictionary.
- :rtype: :obj:`bool`
- """
- return isinstance(var, dict)
-
-
-def is_bytes(var) -> bool:
- """
- Returns True if the given object is a bytes object.
-
- :param var: object to be checked
- :type var: :obj:`object`
-
- :return: True if the given object is a bytes object.
- :rtype: :obj:`bool`
- """
- return isinstance(var, bytes)
-
-
-def is_pil_image(var) -> bool:
- """
- Returns True if the given object is a PIL.Image.Image object.
-
- :param var: object to be checked
- :type var: :obj:`object`
-
- :return: True if the given object is a PIL.Image.Image object.
- :rtype: :obj:`bool`
- """
- return pil_imported and isinstance(var, Image.Image)
-
-
-def pil_image_to_file(image, extension='JPEG', quality='web_low'):
- if pil_imported:
- photoBuffer = BytesIO()
- image.convert('RGB').save(photoBuffer, extension, quality=quality)
- photoBuffer.seek(0)
-
- return photoBuffer
- else:
- raise RuntimeError('PIL module is not imported')
-
-
def is_command(text: str) -> bool:
r"""
Checks if `text` is a command. Telegram chat commands start with the '/' character.
@@ -353,7 +291,7 @@ def split_string(text: str, chars_per_string: int) -> List[str]:
return [text[i:i + chars_per_string] for i in range(0, len(text), chars_per_string)]
-def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str]:
+def smart_split(text: str, chars_per_string: int = MAX_MESSAGE_LENGTH) -> List[str]:
r"""
Splits one string into multiple strings, with a maximum amount of `chars_per_string` characters per string.
This is very useful for splitting one giant message into multiples.
@@ -383,9 +321,12 @@ def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str
part = text[:chars_per_string]
- if "\n" in part: part = _text_before_last("\n")
- elif ". " in part: part = _text_before_last(". ")
- elif " " in part: part = _text_before_last(" ")
+ if "\n" in part:
+ part = _text_before_last("\n")
+ elif ". " in part:
+ part = _text_before_last(". ")
+ elif " " in part:
+ part = _text_before_last(" ")
parts.append(part)
text = text[len(part):]
@@ -401,12 +342,12 @@ def escape(text: str) -> str:
chars = {"&": "&", "<": "<", ">": ">"}
if text is None:
return None
- for old, new in chars.items():
+ for old, new in chars.items():
text = text.replace(old, new)
return text
-def user_link(user: types.User, include_id: bool=False) -> str:
+def user_link(user: types.User, include_id: bool = False) -> str:
"""
Returns an HTML user link. This is useful for reports.
Attention: Don't forget to set parse_mode to 'HTML'!
@@ -433,10 +374,10 @@ def user_link(user: types.User, include_id: bool=False) -> str:
"""
name = escape(user.first_name)
return (f"{name}"
- + (f" ({user.id}
)" if include_id else ""))
+ + (f" ({user.id}
)" if include_id else ""))
-def quick_markup(values: Dict[str, Dict[str, Any]], row_width: int=2) -> types.InlineKeyboardMarkup:
+def quick_markup(values: Dict[str, Dict[str, Any]], row_width: int = 2) -> types.InlineKeyboardMarkup:
"""
Returns a reply markup from a dict in this format: {'text': kwargs}
This is useful to avoid always typing 'btn1 = InlineKeyboardButton(...)' 'btn2 = InlineKeyboardButton(...)'
@@ -551,24 +492,7 @@ def per_thread(key, construct_value, reset=False):
return getattr(thread_local, key)
-def chunks(lst, n):
- """Yield successive n-sized chunks from lst."""
- # https://stackoverflow.com/a/312464/9935473
- for i in range(0, len(lst), n):
- yield lst[i:i + n]
-
-
-def generate_random_token() -> str:
- """
- Generates a random token consisting of letters and digits, 16 characters long.
-
- :return: a random token
- :rtype: :obj:`str`
- """
- return ''.join(random.sample(string.ascii_letters, 16))
-
-
-def deprecated(warn: bool=True, alternative: Optional[Callable]=None, deprecation_text=None):
+def deprecated(warn: bool = True, alternative: Optional[Callable] = None, deprecation_text=None):
"""
Use this decorator to mark functions as deprecated.
When the function is used, an info (or warning if `warn` is True) is logged.
@@ -586,6 +510,7 @@ def deprecated(warn: bool=True, alternative: Optional[Callable]=None, deprecatio
:return: The decorated function
"""
+
def decorator(function):
def wrapper(*args, **kwargs):
info = f"`{function.__name__}` is deprecated."
@@ -598,7 +523,9 @@ def deprecated(warn: bool=True, alternative: Optional[Callable]=None, deprecatio
else:
logger.warning(info)
return function(*args, **kwargs)
+
return wrapper
+
return decorator
@@ -661,8 +588,8 @@ def antiflood(function: Callable, *args, **kwargs):
return function(*args, **kwargs)
else:
raise
-
-
+
+
def parse_web_app_data(token: str, raw_init_data: str):
"""
Parses web app data.
@@ -715,4 +642,16 @@ def validate_web_app_data(token: str, raw_init_data: str):
return hmac.new(secret_key.digest(), data_check_string.encode(), sha256).hexdigest() == init_data_hash
-
+
+__all__ = (
+ "content_type_media", "content_type_service", "update_types",
+ "WorkerThread", "AsyncTask", "CustomRequestResponse",
+ "async_dec", "deprecated",
+ "is_bytes", "is_string", "is_dict", "is_pil_image",
+ "chunks", "generate_random_token", "pil_image_to_file",
+ "is_command", "extract_command", "extract_arguments",
+ "split_string", "smart_split", "escape", "user_link", "quick_markup",
+ "antiflood", "parse_web_app_data", "validate_web_app_data",
+ "or_set", "or_clear", "orify", "OrEvent", "per_thread",
+ "webhook_google_functions"
+)