diff --git a/telebot/util.py b/telebot/util.py index 24f6b55..ebf7466 100644 --- a/telebot/util.py +++ b/telebot/util.py @@ -1,4 +1,6 @@ # -*- coding: utf-8 -*- +import gettext +import os import random import re import string @@ -21,6 +23,7 @@ try: # noinspection PyPackageRequirements from PIL import Image from io import BytesIO + pil_imported = True except: pil_imported = False @@ -32,23 +35,26 @@ logger = logging.getLogger('TeleBot') thread_local = threading.local() content_type_media = [ - 'text', 'audio', 'animation', 'document', 'photo', 'sticker', 'video', 'video_note', 'voice', 'contact', 'dice', 'poll', + 'text', 'audio', 'animation', 'document', 'photo', 'sticker', 'video', 'video_note', 'voice', 'contact', 'dice', + 'poll', 'venue', 'location' ] content_type_service = [ - 'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', 'group_chat_created', + 'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', + 'group_chat_created', 'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message', - 'proximity_alert_triggered', 'voice_chat_scheduled', 'voice_chat_started', 'voice_chat_ended', + 'proximity_alert_triggered', 'voice_chat_scheduled', 'voice_chat_started', 'voice_chat_ended', 'voice_chat_participants_invited', 'message_auto_delete_timer_changed' ] update_types = [ - "update_id", "message", "edited_message", "channel_post", "edited_channel_post", "inline_query", - "chosen_inline_result", "callback_query", "shipping_query", "pre_checkout_query", "poll", "poll_answer", + "update_id", "message", "edited_message", "channel_post", "edited_channel_post", "inline_query", + "chosen_inline_result", "callback_query", "shipping_query", "pre_checkout_query", "poll", "poll_answer", "my_chat_member", "chat_member", "chat_join_request" ] + class WorkerThread(threading.Thread): count = 0 @@ -177,7 +183,7 @@ class AsyncTask: class CustomRequestResponse(): - def __init__(self, json_text, status_code = 200, reason = ""): + def __init__(self, json_text, status_code=200, reason=""): self.status_code = status_code self.text = json_text self.reason = reason @@ -217,7 +223,7 @@ def pil_image_to_file(image, extension='JPEG', quality='web_low'): photoBuffer = BytesIO() image.convert('RGB').save(photoBuffer, extension, quality=quality) photoBuffer.seek(0) - + return photoBuffer else: raise RuntimeError('PIL module is not imported') @@ -280,7 +286,7 @@ def split_string(text: str, chars_per_string: int) -> List[str]: return [text[i:i + chars_per_string] for i in range(0, len(text), chars_per_string)] -def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str]: +def smart_split(text: str, chars_per_string: int = MAX_MESSAGE_LENGTH) -> List[str]: """ Splits one string into multiple strings, with a maximum amount of `chars_per_string` characters per string. This is very useful for splitting one giant message into multiples. @@ -305,9 +311,12 @@ def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str part = text[:chars_per_string] - if "\n" in part: part = _text_before_last("\n") - elif ". " in part: part = _text_before_last(". ") - elif " " in part: part = _text_before_last(" ") + if "\n" in part: + part = _text_before_last("\n") + elif ". " in part: + part = _text_before_last(". ") + elif " " in part: + part = _text_before_last(" ") parts.append(part) text = text[len(part):] @@ -325,7 +334,7 @@ def escape(text: str) -> str: return text -def user_link(user: types.User, include_id: bool=False) -> str: +def user_link(user: types.User, include_id: bool = False) -> str: """ Returns an HTML user link. This is useful for reports. Attention: Don't forget to set parse_mode to 'HTML'! @@ -338,11 +347,11 @@ def user_link(user: types.User, include_id: bool=False) -> str: :return: HTML user link """ name = escape(user.first_name) - return (f"{name}" - + (f" (
{user.id}
)" if include_id else "")) + return (f"{name}" + + (f" (
{user.id}
)" if include_id else "")) -def quick_markup(values: Dict[str, Dict[str, Any]], row_width: int=2) -> types.InlineKeyboardMarkup: +def quick_markup(values: Dict[str, Dict[str, Any]], row_width: int = 2) -> types.InlineKeyboardMarkup: """ Returns a reply markup from a dict in this format: {'text': kwargs} This is useful to avoid always typing 'btn1 = InlineKeyboardButton(...)' 'btn2 = InlineKeyboardButton(...)' @@ -443,22 +452,26 @@ def generate_random_token(): return ''.join(random.sample(string.ascii_letters, 16)) -def deprecated(warn: bool=True, alternative: Optional[Callable]=None): +def deprecated(warn: bool = True, alternative: Optional[Callable] = None): """ Use this decorator to mark functions as deprecated. When the function is used, an info (or warning if `warn` is True) is logged. :param warn: If True a warning is logged else an info :param alternative: The new function to use instead """ + def decorator(function): def wrapper(*args, **kwargs): - info = f"`{function.__name__}` is deprecated." + (f" Use `{alternative.__name__}` instead" if alternative else "") + info = f"`{function.__name__}` is deprecated." + ( + f" Use `{alternative.__name__}` instead" if alternative else "") if not warn: logger.info(info) else: logger.warning(info) return function(*args, **kwargs) + return wrapper + return decorator @@ -477,6 +490,7 @@ def webhook_google_functions(bot, request): else: return 'Bot ON' + def antiflood(function, *args, **kwargs): """ Use this function inside loops in order to avoid getting TooManyRequests error. @@ -499,3 +513,68 @@ def antiflood(function, *args, **kwargs): msg = function(*args, **kwargs) finally: return msg + + +def find_translations(path, domain): + """ + Looks for translations with passed 'domain' in passed 'path' + """ + if not os.path.exists(path): + raise RuntimeError(f"Translations directory by path: {path!r} was not found") + + result = {} + + for name in os.listdir(path): + translations_path = os.path.join(path, name, 'LC_MESSAGES') + + if not os.path.isdir(translations_path): + continue + + po_file = os.path.join(translations_path, domain + '.po') + mo_file = po_file[:-2] + 'mo' + + if os.path.isfile(po_file) and not os.path.isfile(mo_file): + raise FileNotFoundError(f"Translations for: {name!r} were not compiled!") + + with open(mo_file, 'rb') as file: + result[name] = gettext.GNUTranslations(file) + + return result + + +class I18N: + """ + This class provides high-level tool for internationalization + It is based on gettext util. + """ + + def __init__(self, translations_path, domain_name: str): + self.path = translations_path + self.domain = domain_name + self.translations = find_translations(self.path, self.domain) + + @property + def available_translations(self): + return list(self.translations) + + def gettext(self, text: str, lang: str = None): + """ + Singular translations + """ + if not lang or lang not in self.translations: + return text + + translator = self.translations[lang] + return translator.gettext(text) + + def ngettext(self, singular: str, plural: str, lang: str = None, n=1): + """ + Plural translations + """ + if not lang or lang not in self.translations: + if n == 1: + return singular + return plural + + translator = self.translations[lang] + return translator.ngettext(singular, plural, n)