1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

Merge branch 'eternnoir:master' into master

This commit is contained in:
_run 2022-07-13 21:52:57 +05:00 committed by GitHub
commit 2fcfdc2584
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 516 additions and 59 deletions

View File

@ -0,0 +1,45 @@
#!/usr/bin/python
# This is a simple echo bot using the decorator mechanism.
# It echoes any incoming text messages.
# Example on built-in function to receive and process webhooks.
from telebot.async_telebot import AsyncTeleBot
import asyncio
bot = AsyncTeleBot('TOKEN')
WEBHOOK_SSL_CERT = './webhook_cert.pem' # Path to the ssl certificate
WEBHOOK_SSL_PRIV = './webhook_pkey.pem' # Path to the ssl private key
DOMAIN = '1.2.3.4' # either domain, or ip address of vps
# Quick'n'dirty SSL certificate generation:
#
# openssl genrsa -out webhook_pkey.pem 2048
# openssl req -new -x509 -days 3650 -key webhook_pkey.pem -out webhook_cert.pem
#
# When asked for "Common Name (e.g. server FQDN or YOUR name)" you should reply
# with the same value in you put in WEBHOOK_HOST
# Handle '/start' and '/help'
@bot.message_handler(commands=['help', 'start'])
async def send_welcome(message):
await bot.reply_to(message, """\
Hi there, I am EchoBot.
I am here to echo your kind words back to you. Just say anything nice and I'll say the exact same thing to you!\
""")
# Handle all other messages with content_type 'text' (content_types defaults to ['text'])
@bot.message_handler(func=lambda message: True)
async def echo_message(message):
await bot.reply_to(message, message.text)
# it uses fastapi + uvicorn
asyncio.run(bot.run_webhooks(
listen=DOMAIN,
certificate=WEBHOOK_SSL_CERT,
certificate_key=WEBHOOK_SSL_PRIV
))

View File

@ -0,0 +1,45 @@
#!/usr/bin/python
# This is a simple echo bot using the decorator mechanism.
# It echoes any incoming text messages.
# Example on built-in function to receive and process webhooks.
import telebot
API_TOKEN = 'TOKEN'
bot = telebot.TeleBot(API_TOKEN)
WEBHOOK_SSL_CERT = './webhook_cert.pem' # Path to the ssl certificate
WEBHOOK_SSL_PRIV = './webhook_pkey.pem' # Path to the ssl private key
DOMAIN = '1.2.3.4' # either domain, or ip address of vps
# Quick'n'dirty SSL certificate generation:
#
# openssl genrsa -out webhook_pkey.pem 2048
# openssl req -new -x509 -days 3650 -key webhook_pkey.pem -out webhook_cert.pem
#
# When asked for "Common Name (e.g. server FQDN or YOUR name)" you should reply
# with the same value in you put in WEBHOOK_HOST
# Handle '/start' and '/help'
@bot.message_handler(commands=['help', 'start'])
def send_welcome(message):
bot.reply_to(message, """\
Hi there, I am EchoBot.
I am here to echo your kind words back to you. Just say anything nice and I'll say the exact same thing to you!\
""")
# Handle all other messages with content_type 'text' (content_types defaults to ['text'])
@bot.message_handler(func=lambda message: True)
def echo_message(message):
bot.reply_to(message, message.text)
bot.run_webhooks(
listen=DOMAIN,
certificate=WEBHOOK_SSL_CERT,
certificate_key=WEBHOOK_SSL_PRIV
)

View File

@ -27,7 +27,10 @@ setup(name='pyTelegramBotAPI',
'json': 'ujson', 'json': 'ujson',
'PIL': 'Pillow', 'PIL': 'Pillow',
'redis': 'redis>=3.4.1', 'redis': 'redis>=3.4.1',
'aioredis': 'aioredis',
'aiohttp': 'aiohttp', 'aiohttp': 'aiohttp',
'fastapi': 'fastapi',
'uvicorn': 'uvicorn',
}, },
classifiers=[ classifiers=[
'Development Status :: 5 - Production/Stable', 'Development Status :: 5 - Production/Stable',

View File

@ -9,7 +9,6 @@ import time
import traceback import traceback
from typing import Any, Callable, List, Optional, Union from typing import Any, Callable, List, Optional, Union
# these imports are used to avoid circular import error # these imports are used to avoid circular import error
import telebot.util import telebot.util
import telebot.types import telebot.types
@ -17,7 +16,11 @@ import telebot.types
# storage # storage
from telebot.storage import StatePickleStorage, StateMemoryStorage from telebot.storage import StatePickleStorage, StateMemoryStorage
# random module to generate random string
import random
import string
import ssl
logger = logging.getLogger('TeleBot') logger = logging.getLogger('TeleBot')
@ -87,7 +90,6 @@ class TeleBot:
See more examples in examples/ directory: See more examples in examples/ directory:
https://github.com/eternnoir/pyTelegramBotAPI/tree/master/examples https://github.com/eternnoir/pyTelegramBotAPI/tree/master/examples
""" """
def __init__( def __init__(
@ -159,13 +161,11 @@ class TeleBot:
} }
self.default_middleware_handlers = [] self.default_middleware_handlers = []
if apihelper.ENABLE_MIDDLEWARE and use_class_middlewares: if apihelper.ENABLE_MIDDLEWARE and use_class_middlewares:
logger.warning( self.typed_middleware_handlers = None
'You are using class based middlewares, but you have ' logger.error(
'ENABLE_MIDDLEWARE set to True. This is not recommended.' 'You are using class based middlewares while having ENABLE_MIDDLEWARE set to True. This is not recommended.'
) )
self.middlewares = [] if use_class_middlewares else None self.middlewares = [] if use_class_middlewares else None
self.threaded = threaded self.threaded = threaded
if self.threaded: if self.threaded:
self.worker_pool = util.ThreadPool(self, num_threads=num_threads) self.worker_pool = util.ThreadPool(self, num_threads=num_threads)
@ -293,6 +293,81 @@ class TeleBot:
return apihelper.set_webhook(self.token, url, certificate, max_connections, allowed_updates, ip_address, return apihelper.set_webhook(self.token, url, certificate, max_connections, allowed_updates, ip_address,
drop_pending_updates, timeout, secret_token) drop_pending_updates, timeout, secret_token)
def run_webhooks(self,
listen: Optional[str]="127.0.0.1",
port: Optional[int]=443,
url_path: Optional[str]=None,
certificate: Optional[str]=None,
certificate_key: Optional[str]=None,
webhook_url: Optional[str]=None,
max_connections: Optional[int]=None,
allowed_updates: Optional[List]=None,
ip_address: Optional[str]=None,
drop_pending_updates: Optional[bool] = None,
timeout: Optional[int]=None,
secret_token: Optional[str]=None,
secret_token_length: Optional[int]=20,
debug: Optional[bool]=False):
"""
This class sets webhooks and listens to a given url and port.
:param listen: IP address to listen to. Defaults to
0.0.0.0
:param port: A port which will be used to listen to webhooks.
:param url_path: Path to the webhook. Defaults to /token
:param certificate: Path to the certificate file.
:param certificate_key: Path to the certificate key file.
:param webhook_url: Webhook URL.
:param max_connections: Maximum allowed number of simultaneous HTTPS connections to the webhook for update delivery, 1-100. Defaults to 40. Use lower values to limit the load on your bot's server, and higher values to increase your bot's throughput.
:param allowed_updates: A JSON-serialized list of the update types you want your bot to receive. For example, specify [message, edited_channel_post, callback_query] to only receive updates of these types. See Update for a complete list of available update types. Specify an empty list to receive all updates regardless of type (default). If not specified, the previous setting will be used.
:param ip_address: The fixed IP address which will be used to send webhook requests instead of the IP address resolved through DNS
:param drop_pending_updates: Pass True to drop all pending updates
:param timeout: Integer. Request connection timeout
:param secret_token: Secret token to be used to verify the webhook request.
:param secret_token_length:
:param debug:
:return:
"""
# generate secret token if not set
if not secret_token:
secret_token = ''.join(random.choices(string.ascii_uppercase + string.digits, k=secret_token_length))
if not url_path:
url_path = self.token + '/'
if url_path[-1] != '/': url_path += '/'
protocol = "https" if certificate else "http"
if not webhook_url:
webhook_url = "{}://{}:{}/{}".format(protocol, listen, port, url_path)
if certificate and certificate_key:
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_ctx.load_cert_chain(certificate, certificate_key)
# open certificate if it exists
cert_file = open(certificate, 'rb') if certificate else None
self.set_webhook(
url=webhook_url,
certificate=cert_file,
max_connections=max_connections,
allowed_updates=allowed_updates,
ip_address=ip_address,
drop_pending_updates=drop_pending_updates,
timeout=timeout,
secret_token=secret_token
)
if cert_file: cert_file.close()
ssl_context = (certificate, certificate_key) if certificate else (None, None)
# webhooks module
try:
from telebot.ext.sync import SyncWebhookListener
except (NameError, ImportError):
raise ImportError("Please install uvicorn and fastapi in order to use `run_webhooks` method.")
self.webhook_listener = SyncWebhookListener(self, secret_token, listen, port, ssl_context, '/'+url_path, debug)
self.webhook_listener.run_app()
def delete_webhook(self, drop_pending_updates=None, timeout=None): def delete_webhook(self, drop_pending_updates=None, timeout=None):
""" """
Use this method to remove webhook integration if you decide to switch back to getUpdates. Use this method to remove webhook integration if you decide to switch back to getUpdates.
@ -389,7 +464,7 @@ class TeleBot:
new_chat_join_request = None new_chat_join_request = None
for update in updates: for update in updates:
if apihelper.ENABLE_MIDDLEWARE: if apihelper.ENABLE_MIDDLEWARE and not self.use_class_middlewares:
try: try:
self.process_middlewares(update) self.process_middlewares(update)
except Exception as e: except Exception as e:
@ -400,7 +475,6 @@ class TeleBot:
if update.update_id > self.last_update_id: self.last_update_id = update.update_id if update.update_id > self.last_update_id: self.last_update_id = update.update_id
continue continue
if update.update_id > self.last_update_id: if update.update_id > self.last_update_id:
self.last_update_id = update.update_id self.last_update_id = update.update_id
if update.message: if update.message:
@ -521,6 +595,7 @@ class TeleBot:
self._notify_command_handlers(self.chat_join_request_handlers, chat_join_request, 'chat_join_request') self._notify_command_handlers(self.chat_join_request_handlers, chat_join_request, 'chat_join_request')
def process_middlewares(self, update): def process_middlewares(self, update):
if self.typed_middleware_handlers:
for update_type, middlewares in self.typed_middleware_handlers.items(): for update_type, middlewares in self.typed_middleware_handlers.items():
if getattr(update, update_type) is not None: if getattr(update, update_type) is not None:
for typed_middleware_handler in middlewares: for typed_middleware_handler in middlewares:
@ -530,7 +605,7 @@ class TeleBot:
e.args = e.args + (f'Typed middleware handler "{typed_middleware_handler.__qualname__}"',) e.args = e.args + (f'Typed middleware handler "{typed_middleware_handler.__qualname__}"',)
raise raise
if len(self.default_middleware_handlers) > 0: if self.default_middleware_handlers:
for default_middleware_handler in self.default_middleware_handlers: for default_middleware_handler in self.default_middleware_handlers:
try: try:
default_middleware_handler(self, update) default_middleware_handler(self, update)
@ -1432,9 +1507,6 @@ class TeleBot:
allow_sending_without_reply: Optional[bool]=None, allow_sending_without_reply: Optional[bool]=None,
protect_content: Optional[bool]=None) -> types.Message: protect_content: Optional[bool]=None) -> types.Message:
""" """
As of v.4.0, Telegram clients support rounded square mp4 videos of up to 1 minute long. Use this method to send
video messages.
Telegram documentation: https://core.telegram.org/bots/api#sendvideonote Telegram documentation: https://core.telegram.org/bots/api#sendvideonote
:param chat_id: Integer : Unique identifier for the message recipient User or GroupChat id :param chat_id: Integer : Unique identifier for the message recipient User or GroupChat id
@ -1655,7 +1727,6 @@ class TeleBot:
:param protect_content: :param protect_content:
:return: :return:
""" """
return types.Message.de_json( return types.Message.de_json(
apihelper.send_contact( apihelper.send_contact(
self.token, chat_id, phone_number, first_name, last_name, vcard, self.token, chat_id, phone_number, first_name, last_name, vcard,
@ -2071,11 +2142,9 @@ class TeleBot:
:param chat_id: Unique identifier for the target private chat. :param chat_id: Unique identifier for the target private chat.
If not specified, default bot's menu button will be changed. If not specified, default bot's menu button will be changed.
:param menu_button: A JSON-serialized object for the new bot's menu button. Defaults to MenuButtonDefault :param menu_button: A JSON-serialized object for the new bot's menu button. Defaults to MenuButtonDefault
""" """
return apihelper.set_chat_menu_button(self.token, chat_id, menu_button) return apihelper.set_chat_menu_button(self.token, chat_id, menu_button)
def get_chat_menu_button(self, chat_id: Union[int, str]=None) -> types.MenuButton: def get_chat_menu_button(self, chat_id: Union[int, str]=None) -> types.MenuButton:
""" """
Use this method to get the current value of the bot's menu button Use this method to get the current value of the bot's menu button
@ -2087,11 +2156,9 @@ class TeleBot:
:param chat_id: Unique identifier for the target private chat. :param chat_id: Unique identifier for the target private chat.
If not specified, default bot's menu button will be returned. If not specified, default bot's menu button will be returned.
:return: types.MenuButton :return: types.MenuButton
""" """
return types.MenuButton.de_json(apihelper.get_chat_menu_button(self.token, chat_id)) return types.MenuButton.de_json(apihelper.get_chat_menu_button(self.token, chat_id))
def set_my_default_administrator_rights(self, rights: types.ChatAdministratorRights=None, def set_my_default_administrator_rights(self, rights: types.ChatAdministratorRights=None,
for_channels: bool=None) -> bool: for_channels: bool=None) -> bool:
""" """
@ -2106,10 +2173,8 @@ class TeleBot:
:param rights: A JSON-serialized object describing new default administrator rights. If not specified, the default administrator rights will be cleared. :param rights: A JSON-serialized object describing new default administrator rights. If not specified, the default administrator rights will be cleared.
:param for_channels: Pass True to change the default administrator rights of the bot in channels. Otherwise, the default administrator rights of the bot for groups and supergroups will be changed. :param for_channels: Pass True to change the default administrator rights of the bot in channels. Otherwise, the default administrator rights of the bot for groups and supergroups will be changed.
""" """
return apihelper.set_my_default_administrator_rights(self.token, rights, for_channels) return apihelper.set_my_default_administrator_rights(self.token, rights, for_channels)
def get_my_default_administrator_rights(self, for_channels: bool=None) -> types.ChatAdministratorRights: def get_my_default_administrator_rights(self, for_channels: bool=None) -> types.ChatAdministratorRights:
""" """
Use this method to get the current default administrator rights of the bot. Use this method to get the current default administrator rights of the bot.
@ -2120,10 +2185,8 @@ class TeleBot:
:param for_channels: Pass True to get the default administrator rights of the bot in channels. Otherwise, the default administrator rights of the bot for groups and supergroups will be returned. :param for_channels: Pass True to get the default administrator rights of the bot in channels. Otherwise, the default administrator rights of the bot for groups and supergroups will be returned.
:return: types.ChatAdministratorRights :return: types.ChatAdministratorRights
""" """
return types.ChatAdministratorRights.de_json(apihelper.get_my_default_administrator_rights(self.token, for_channels)) return types.ChatAdministratorRights.de_json(apihelper.get_my_default_administrator_rights(self.token, for_channels))
def set_my_commands(self, commands: List[types.BotCommand], def set_my_commands(self, commands: List[types.BotCommand],
scope: Optional[types.BotCommandScope]=None, scope: Optional[types.BotCommandScope]=None,
language_code: Optional[str]=None) -> bool: language_code: Optional[str]=None) -> bool:
@ -2464,7 +2527,6 @@ class TeleBot:
max_tip_amount, suggested_tip_amounts, protect_content) max_tip_amount, suggested_tip_amounts, protect_content)
return types.Message.de_json(result) return types.Message.de_json(result)
def create_invoice_link(self, def create_invoice_link(self,
title: str, description: str, payload:str, provider_token: str, title: str, description: str, payload:str, provider_token: str,
currency: str, prices: List[types.LabeledPrice], currency: str, prices: List[types.LabeledPrice],
@ -2525,8 +2587,6 @@ class TeleBot:
send_email_to_provider, is_flexible) send_email_to_provider, is_flexible)
return result return result
# noinspection PyShadowingBuiltins # noinspection PyShadowingBuiltins
# TODO: rewrite this method like in API # TODO: rewrite this method like in API
def send_poll( def send_poll(
@ -2853,11 +2913,9 @@ class TeleBot:
:param result: A JSON-serialized object describing the message to be sent :param result: A JSON-serialized object describing the message to be sent
:return: :return:
""" """
return apihelper.answer_web_app_query(self.token, web_app_query_id, result) return apihelper.answer_web_app_query(self.token, web_app_query_id, result)
def register_for_reply( def register_for_reply(self, message: types.Message, callback: Callable, *args, **kwargs) -> None:
self, message: types.Message, callback: Callable, *args, **kwargs) -> None:
""" """
Registers a callback function to be notified when a reply to `message` arrives. Registers a callback function to be notified when a reply to `message` arrives.
@ -2897,8 +2955,7 @@ class TeleBot:
for handler in handlers: for handler in handlers:
self._exec_task(handler["callback"], message, *handler["args"], **handler["kwargs"]) self._exec_task(handler["callback"], message, *handler["args"], **handler["kwargs"])
def register_next_step_handler( def register_next_step_handler(self, message: types.Message, callback: Callable, *args, **kwargs) -> None:
self, message: types.Message, callback: Callable, *args, **kwargs) -> None:
""" """
Registers a callback function to be notified when new message arrives after `message`. Registers a callback function to be notified when new message arrives after `message`.
@ -2912,7 +2969,6 @@ class TeleBot:
chat_id = message.chat.id chat_id = message.chat.id
self.register_next_step_handler_by_chat_id(chat_id, callback, *args, **kwargs) self.register_next_step_handler_by_chat_id(chat_id, callback, *args, **kwargs)
def setup_middleware(self, middleware: BaseMiddleware): def setup_middleware(self, middleware: BaseMiddleware):
""" """
Register middleware Register middleware
@ -2925,8 +2981,6 @@ class TeleBot:
return return
self.middlewares.append(middleware) self.middlewares.append(middleware)
def set_state(self, user_id: int, state: Union[int, str, State], chat_id: int=None) -> None: def set_state(self, user_id: int, state: Union[int, str, State], chat_id: int=None) -> None:
""" """
Sets a new state of a user. Sets a new state of a user.
@ -2948,8 +3002,8 @@ class TeleBot:
""" """
if chat_id is None: if chat_id is None:
chat_id = user_id chat_id = user_id
self.current_states.reset_data(chat_id, user_id) self.current_states.reset_data(chat_id, user_id)
def delete_state(self, user_id: int, chat_id: int=None) -> None: def delete_state(self, user_id: int, chat_id: int=None) -> None:
""" """
Delete the current state of a user. Delete the current state of a user.
@ -3117,10 +3171,17 @@ class TeleBot:
if not apihelper.ENABLE_MIDDLEWARE: if not apihelper.ENABLE_MIDDLEWARE:
raise RuntimeError("Middleware is not enabled. Use apihelper.ENABLE_MIDDLEWARE before initialising TeleBot.") raise RuntimeError("Middleware is not enabled. Use apihelper.ENABLE_MIDDLEWARE before initialising TeleBot.")
if update_types: if self.use_class_middlewares:
logger.error("middleware_handler/register_middleware_handler/add_middleware_handler cannot be used with use_class_middlewares=True. Skipped.")
return
added = False
if update_types and self.typed_middleware_handlers:
for update_type in update_types: for update_type in update_types:
if update_type in self.typed_middleware_handlers:
added = True
self.typed_middleware_handlers[update_type].append(handler) self.typed_middleware_handlers[update_type].append(handler)
else: if not added:
self.default_middleware_handlers.append(handler) self.default_middleware_handlers.append(handler)
# function register_middleware_handler # function register_middleware_handler
@ -3362,7 +3423,6 @@ class TeleBot:
**kwargs) **kwargs)
self.add_edited_message_handler(handler_dict) self.add_edited_message_handler(handler_dict)
def channel_post_handler(self, commands=None, regexp=None, func=None, content_types=None, **kwargs): def channel_post_handler(self, commands=None, regexp=None, func=None, content_types=None, **kwargs):
""" """
Channel post handler decorator Channel post handler decorator
@ -4000,7 +4060,6 @@ class TeleBot:
elif isinstance(result, SkipHandler) and skip_handler is False: elif isinstance(result, SkipHandler) and skip_handler is False:
skip_handler = True skip_handler = True
try: try:
if handlers and not skip_handler: if handlers and not skip_handler:
for handler in handlers: for handler in handlers:
@ -4035,23 +4094,16 @@ class TeleBot:
return return
handler["function"](message, **data_copy) handler["function"](message, **data_copy)
except Exception as e: except Exception as e:
handler_error = e handler_error = e
if not middlewares:
if self.exception_handler: if self.exception_handler:
return self.exception_handler.handle(e) self.exception_handler.handle(e)
logging.error(str(e)) else: logging.error(str(e))
return
# remove the bot from data
if middlewares: if middlewares:
for middleware in middlewares: for middleware in middlewares:
middleware.post_process(message, data, handler_error) middleware.post_process(message, data, handler_error)
def _notify_command_handlers(self, handlers, new_messages, update_type): def _notify_command_handlers(self, handlers, new_messages, update_type):
""" """
Notifies command handlers. Notifies command handlers.

View File

@ -30,6 +30,11 @@ REPLY_MARKUP_TYPES = Union[
types.ReplyKeyboardRemove, types.ForceReply] types.ReplyKeyboardRemove, types.ForceReply]
import string
import random
import ssl
""" """
Module : telebot Module : telebot
""" """
@ -347,6 +352,7 @@ class AsyncTeleBot:
self.exception_handler.handle(e) self.exception_handler.handle(e)
else: logger.error(str(e)) else: logger.error(str(e))
if middlewares: if middlewares:
for middleware in middlewares: for middleware in middlewares:
await middleware.post_process(message, data, handler_error) await middleware.post_process(message, data, handler_error)
@ -1438,6 +1444,85 @@ class AsyncTeleBot:
drop_pending_updates, timeout, secret_token) drop_pending_updates, timeout, secret_token)
async def run_webhooks(self,
listen: Optional[str]="127.0.0.1",
port: Optional[int]=443,
url_path: Optional[str]=None,
certificate: Optional[str]=None,
certificate_key: Optional[str]=None,
webhook_url: Optional[str]=None,
max_connections: Optional[int]=None,
allowed_updates: Optional[List]=None,
ip_address: Optional[str]=None,
drop_pending_updates: Optional[bool] = None,
timeout: Optional[int]=None,
secret_token: Optional[str]=None,
secret_token_length: Optional[int]=20,
debug: Optional[bool]=False):
"""
This class sets webhooks and listens to a given url and port.
:param listen: IP address to listen to. Defaults to
0.0.0.0
:param port: A port which will be used to listen to webhooks.
:param url_path: Path to the webhook. Defaults to /token
:param certificate: Path to the certificate file.
:param certificate_key: Path to the certificate key file.
:param webhook_url: Webhook URL.
:param max_connections: Maximum allowed number of simultaneous HTTPS connections to the webhook for update delivery, 1-100. Defaults to 40. Use lower values to limit the load on your bot's server, and higher values to increase your bot's throughput.
:param allowed_updates: A JSON-serialized list of the update types you want your bot to receive. For example, specify [message, edited_channel_post, callback_query] to only receive updates of these types. See Update for a complete list of available update types. Specify an empty list to receive all updates regardless of type (default). If not specified, the previous setting will be used.
:param ip_address: The fixed IP address which will be used to send webhook requests instead of the IP address resolved through DNS
:param drop_pending_updates: Pass True to drop all pending updates
:param timeout: Integer. Request connection timeout
:param secret_token: Secret token to be used to verify the webhook request.
:return:
"""
# generate secret token if not set
if not secret_token:
secret_token = ''.join(random.choices(string.ascii_uppercase + string.digits, k=secret_token_length))
if not url_path:
url_path = self.token + '/'
if url_path[-1] != '/': url_path += '/'
protocol = "https" if certificate else "http"
if not webhook_url:
webhook_url = "{}://{}:{}/{}".format(protocol, listen, port, url_path)
if certificate and certificate_key:
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_ctx.load_cert_chain(certificate, certificate_key)
else:
ssl_ctx = None
# open certificate if it exists
cert_file = open(certificate, 'rb') if certificate else None
await self.set_webhook(
url=webhook_url,
certificate=cert_file,
max_connections=max_connections,
allowed_updates=allowed_updates,
ip_address=ip_address,
drop_pending_updates=drop_pending_updates,
timeout=timeout,
secret_token=secret_token
)
if cert_file: cert_file.close()
ssl_context = (certificate, certificate_key) if certificate else (None, None)
# for webhooks
try:
from telebot.ext.aio import AsyncWebhookListener
except (NameError, ImportError):
raise ImportError("Please install uvicorn and fastapi in order to use `run_webhooks` method.")
self.webhook_listener = AsyncWebhookListener(self, secret_token, listen, port, ssl_context, '/'+url_path, debug)
await self.webhook_listener.run_app()
async def delete_webhook(self, drop_pending_updates=None, timeout=None): async def delete_webhook(self, drop_pending_updates=None, timeout=None):
""" """

3
telebot/ext/__init__.py Normal file
View File

@ -0,0 +1,3 @@
"""
A folder with asynchronous and synchronous extensions.
"""

View File

@ -0,0 +1,10 @@
"""
A folder with all the async extensions.
"""
from .webhooks import AsyncWebhookListener
__all__ = [
"AsyncWebhookListener"
]

103
telebot/ext/aio/webhooks.py Normal file
View File

@ -0,0 +1,103 @@
"""
This file is used by AsyncTeleBot.run_webhooks() function.
Fastapi and starlette(0.20.2+) libraries are required to run this script.
"""
# modules required for running this script
fastapi_installed = True
try:
import fastapi
from fastapi.responses import JSONResponse
from fastapi.requests import Request
from uvicorn import Server, Config
except ImportError:
fastapi_installed = False
import asyncio
from telebot.types import Update
from typing import Optional
class AsyncWebhookListener:
def __init__(self, bot,
secret_token: str, host: Optional[str]="127.0.0.1",
port: Optional[int]=443,
ssl_context: Optional[tuple]=None,
url_path: Optional[str]=None,
debug: Optional[bool]=False
) -> None:
"""
Aynchronous implementation of webhook listener
for asynchronous version of telebot.
:param bot: TeleBot instance
:param secret_token: Telegram secret token
:param host: Webhook host
:param port: Webhook port
:param ssl_context: SSL context
:param url_path: Webhook url path
:param debug: Debug mode
"""
self._check_dependencies()
self.app = fastapi.FastAPI()
self._secret_token = secret_token
self._bot = bot
self._port = port
self._host = host
self._ssl_context = ssl_context
self._url_path = url_path
self._debug = debug
self._prepare_endpoint_urls()
def _check_dependencies(self):
if not fastapi_installed:
raise ImportError('Fastapi or uvicorn is not installed. Please install it via pip.')
import starlette
if starlette.__version__ < '0.20.2':
raise ImportError('Starlette version is too old. Please upgrade it: `pip3 install starlette -U`')
return
def _prepare_endpoint_urls(self):
self.app.add_api_route(endpoint=self.process_update,path= self._url_path, methods=["POST"])
async def process_update(self, request: Request, update: dict):
"""
Processes updates.
"""
# header containsX-Telegram-Bot-Api-Secret-Token
if request.headers.get('X-Telegram-Bot-Api-Secret-Token') != self._secret_token:
# secret token didn't match
return JSONResponse(status_code=403, content={"error": "Forbidden"})
if request.headers.get('content-type') == 'application/json':
json_string = update
asyncio.create_task(self._bot.process_new_updates([Update.de_json(json_string)]))
return JSONResponse('', status_code=200)
return JSONResponse(status_code=403, content={"error": "Forbidden"})
async def run_app(self):
"""
Run app with the given parameters.
"""
config = Config(app=self.app,
host=self._host,
port=self._port,
debug=self._debug,
ssl_certfile=self._ssl_context[0],
ssl_keyfile=self._ssl_context[1]
)
server = Server(config)
await server.serve()
await self._bot.close_session()

View File

@ -0,0 +1,10 @@
"""
A folder with all the sync extensions.
"""
from .webhooks import SyncWebhookListener
__all__ = [
"SyncWebhookListener"
]

View File

@ -0,0 +1,101 @@
"""
This file is used by TeleBot.run_webhooks() &
AsyncTeleBot.run_webhooks() functions.
Flask/fastapi is required to run this script.
"""
# modules required for running this script
fastapi_installed = True
try:
import fastapi
from fastapi.responses import JSONResponse
from fastapi.requests import Request
import uvicorn
except ImportError:
fastapi_installed = False
from telebot.types import Update
from typing import Optional
class SyncWebhookListener:
def __init__(self, bot,
secret_token: str, host: Optional[str]="127.0.0.1",
port: Optional[int]=443,
ssl_context: Optional[tuple]=None,
url_path: Optional[str]=None,
debug: Optional[bool]=False
) -> None:
"""
Synchronous implementation of webhook listener
for synchronous version of telebot.
:param bot: TeleBot instance
:param secret_token: Telegram secret token
:param host: Webhook host
:param port: Webhook port
:param ssl_context: SSL context
:param url_path: Webhook url path
:param debug: Debug mode
"""
self._check_dependencies()
self.app = fastapi.FastAPI()
self._secret_token = secret_token
self._bot = bot
self._port = port
self._host = host
self._ssl_context = ssl_context
self._url_path = url_path
self._debug = debug
self._prepare_endpoint_urls()
def _check_dependencies(self):
if not fastapi_installed:
raise ImportError('Fastapi or uvicorn is not installed. Please install it via pip.')
import starlette
if starlette.__version__ < '0.20.2':
raise ImportError('Starlette version is too old. Please upgrade it: `pip3 install starlette -U`')
return
def _prepare_endpoint_urls(self):
self.app.add_api_route(endpoint=self.process_update,path= self._url_path, methods=["POST"])
def process_update(self, request: Request, update: dict):
"""
Processes updates.
"""
# header containsX-Telegram-Bot-Api-Secret-Token
if request.headers.get('X-Telegram-Bot-Api-Secret-Token') != self._secret_token:
# secret token didn't match
return JSONResponse(status_code=403, content={"error": "Forbidden"})
if request.headers.get('content-type') == 'application/json':
self._bot.process_new_updates([Update.de_json(update)])
return JSONResponse('', status_code=200)
return JSONResponse(status_code=403, content={"error": "Forbidden"})
def run_app(self):
"""
Run app with the given parameters.
"""
uvicorn.run(app=self.app,
host=self._host,
port=self._port,
debug=self._debug,
ssl_certfile=self._ssl_context[0],
ssl_keyfile=self._ssl_context[1]
)