"""Asynchronous helpers for talking to the Telegram Bot API through aiohttp.

NOTE(review): the reviewed copy of this file had lost its line structure and
part of its text. Indentation below was reconstructed from the syntax, and the
truncated region is flagged in place (see the NOTE before ``_convert_entites``).
"""
import asyncio  # for future uses
import os
from datetime import datetime

import aiohttp

import telebot
from telebot import types
from telebot import util, logger

try:
    import ujson as json
except ImportError:
    import json

# URL template for API calls. Presumably {0} is the bot token and {1} the API
# method name (the formatting call itself is not visible in this copy).
API_URL = 'https://api.telegram.org/bot{0}/{1}'

proxy = None
session = None

FILE_URL = None

CONNECT_TIMEOUT = 15
READ_TIMEOUT = 30

LONG_POLLING_TIMEOUT = 10  # Should be positive, short polling should be used for testing purposes only (https://core.telegram.org/bots/api#getupdates)
REQUEST_TIMEOUT = 10
MAX_RETRIES = 3
logger = telebot.logger

# Upper bound on simultaneous connections held by the shared TCP connector.
REQUEST_LIMIT = 50


class SessionManager:
    """Hold one shared ``aiohttp.ClientSession`` and hand it out on demand.

    The session is created lazily by :meth:`get_session` rather than in
    ``__init__``. A module-level instance is built at import time, when no
    event loop is running, and aiohttp expects sessions and connectors to be
    created from inside a coroutine.
    """

    def __init__(self) -> None:
        # No ClientSession here on purpose: see the class docstring.
        self.session = None

    async def create_session(self):
        """Create a new session limited to ``REQUEST_LIMIT`` connections.

        The new session is stored on ``self.session`` and also returned.
        """
        self.session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=REQUEST_LIMIT))
        return self.session

    async def get_session(self):
        """Return a usable session, (re)creating it when necessary.

        A new session is created when none exists yet, when the current one is
        closed, or when the event loop the current one is bound to is no
        longer running (in which case the stale session is closed first).
        """
        if self.session is None or self.session.closed:
            self.session = await self.create_session()

        # noinspection PyProtectedMember
        if not self.session._loop.is_running():
            await self.session.close()
            self.session = await self.create_session()
        return self.session


session_manager = SessionManager()


# NOTE(review): TRUNCATED SOURCE. In the reviewed copy the text is cut off
# inside `_process_request`, immediately after `current_try` in the `while`
# condition, and resumes at the tail of a later, unidentified function.
# Everything in between is missing and has NOT been reconstructed here, so
# `_process_request`, `prepare_data` and `_convert_markup` (all referenced
# elsewhere in this module) are not defined in this copy. Restore the region
# from the authoritative source. The visible fragments are preserved verbatim:
#
# async def _process_request(token, url, method='get', params=None, files=None, request_timeout=None):
#     params = prepare_data(params, files)
#     if request_timeout is None:
#         request_timeout = REQUEST_TIMEOUT
#     timeout = aiohttp.ClientTimeout(total=request_timeout)
#     got_result = False
#     current_try=0
#     session = await session_manager.get_session()
#     while not got_result and current_try        <-- text ends here
#
#     ... missing text ...
#
#     0: ret = ret[:-1] return '[' + ret + ']'    <-- text resumes here


async def _convert_entites(entites):
    """Convert a list of message entities for sending.

    Returns ``None`` for ``None`` and ``[]`` for an empty sequence. When the
    first element is a ``types.JsonSerializable``, every element is converted
    with ``to_json()``; otherwise the input is returned unchanged. Only the
    first element's type is inspected.
    """
    if entites is None:
        return None
    elif len(entites) == 0:
        return []
    elif isinstance(entites[0], types.JsonSerializable):
        return [entity.to_json() for entity in entites]
    else:
        return entites


async def _convert_poll_options(poll_options):
    """Convert poll options for sending.

    Returns ``None`` for ``None`` and ``[]`` for an empty sequence. A list
    whose first element is a ``str`` is returned unchanged; a list whose first
    element is a ``types.PollOption`` is mapped to each option's ``text``.
    Anything else is returned unchanged. Only the first element's type is
    inspected.
    """
    if poll_options is None:
        return None
    elif len(poll_options) == 0:
        return []
    elif isinstance(poll_options[0], str):
        # Compatibility mode with previous bug when only list of string was accepted as poll_options
        return poll_options
    elif isinstance(poll_options[0], types.PollOption):
        return [option.text for option in poll_options]
    else:
        return poll_options


async def convert_input_media(media):
    """Return ``media.convert_input_media()`` for a ``types.InputMedia``.

    Any other value yields ``(None, None)``.
    """
    if isinstance(media, types.InputMedia):
        return media.convert_input_media()
    return None, None


async def convert_input_media_array(array):
    """Serialize a sequence of ``types.InputMedia`` objects.

    Returns a ``(json_string, files)`` tuple: the JSON dump of each item's
    ``to_dict()`` and a dict of attachments. Items that are not
    ``types.InputMedia`` instances are skipped.
    """
    media = []
    files = {}
    for input_media in array:
        if isinstance(input_media, types.InputMedia):
            media_dict = input_media.to_dict()
            # An 'attach://<key>' reference means the payload travels as an
            # uploaded file stored under <key>.
            if media_dict['media'].startswith('attach://'):
                key = media_dict['media'].replace('attach://', '')
                files[key] = input_media.media
            media.append(media_dict)
    return json.dumps(media), files


async def _no_encode(func):
    """Wrap ``func(key, val)`` so the ``'filename'`` key bypasses it.

    The wrapper returns ``'filename=<val>'`` directly for that key and
    delegates to ``func`` for every other key.

    NOTE(review): this is an ``async def``, so calling it produces a coroutine
    that must be awaited to obtain the wrapper; it cannot be applied with plain
    ``@_no_encode`` syntax. Confirm that is intended.
    """
    def wrapper(key, val):
        if key == 'filename':
            return u'{0}={1}'.format(key, val)
        else:
            return func(key, val)
    return wrapper


async def stop_poll(token, chat_id, message_id, reply_markup=None):
    """Issue a ``stopPoll`` request and return the result of the call.

    ``chat_id`` is sent as a string. ``reply_markup`` is included only when
    truthy, after conversion with ``_convert_markup``. Both ``_convert_markup``
    and ``_process_request`` live in the part of the file that is missing from
    this copy.
    """
    method_url = r'stopPoll'
    payload = {'chat_id': str(chat_id), 'message_id': message_id}
    if reply_markup:
        payload['reply_markup'] = await _convert_markup(reply_markup)
    return await _process_request(token, method_url, params=payload)


# exceptions
class ApiException(Exception):
    """
    This class represents a base Exception thrown when a call to the Telegram API fails.
    In addition to an informative message, it has a `function_name` and a `result` attribute, which respectively
    contain the name of the failed function and the returned result that made the function to be considered as
    failed.
    """

    def __init__(self, msg, function_name, result):
        super().__init__("A request to the Telegram API was unsuccessful. {0}".format(msg))
        self.function_name = function_name
        self.result = result


class ApiHTTPException(ApiException):
    """
    This class represents an Exception thrown when a call to the
    Telegram API server returns HTTP code that is not 200.
    """

    def __init__(self, function_name, result):
        # aiohttp responses expose the HTTP code as `.status`. `.status_code`
        # is kept as a fallback for requests-style objects, so building the
        # exception never fails with AttributeError and hides the HTTP error.
        status = getattr(result, 'status', None)
        if status is None:
            status = getattr(result, 'status_code', None)
        reason = getattr(result, 'reason', None)
        super().__init__(
            "The server returned HTTP {0} {1}. Response body:\n[{2}]".format(status, reason, result),
            function_name,
            result)


class ApiInvalidJSONException(ApiException):
    """
    This class represents an Exception thrown when a call to the
    Telegram API server returns invalid json.
    """

    def __init__(self, function_name, result):
        super().__init__(
            "The server returned an invalid JSON response. Response body:\n[{0}]".format(result),
            function_name,
            result)


class ApiTelegramException(ApiException):
    """
    This class represents an Exception thrown when a Telegram API returns error code.

    `result_json` must contain the `error_code` and `description` keys; the
    parsed payload and its `error_code` are kept as attributes.
    """

    def __init__(self, function_name, result, result_json):
        super().__init__(
            "Error code: {0}. Description: {1}".format(result_json['error_code'], result_json['description']),
            function_name,
            result)
        self.result_json = result_json
        self.error_code = result_json['error_code']


class RequestTimeout(Exception):
    """
    This class represents a request timeout.
    """
    pass