1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

SyncWebhookListener was rewritten under fastapi. Extensions folder was divided into 2(namings are long, might be changed)

This commit is contained in:
_run 2022-07-08 21:13:07 +05:00
parent 0cf2a0fe77
commit 970b9d6be4
7 changed files with 238 additions and 16 deletions

View File

@ -23,9 +23,6 @@ import random
import string
# webhooks module
from telebot.extensions import SyncWebhookListener
import ssl
logger = logging.getLogger('TeleBot')
@ -372,7 +369,12 @@ class TeleBot:
)
if cert_file: cert_file.close()
ssl_context = (certificate, certificate_key) if certificate else None
ssl_context = (certificate, certificate_key) if certificate else (None, None)
# webhooks module
try:
from telebot.extensions.synchronous import SyncWebhookListener
except NameError:
raise ImportError("Please install uvicorn and fastapi in order to use `run_webhooks` method.")
self.webhook_listener = SyncWebhookListener(self, secret_token, listen, port, ssl_context, '/'+url_path, debug)
self.webhook_listener.run_app()

View File

@ -30,8 +30,6 @@ REPLY_MARKUP_TYPES = Union[
types.ReplyKeyboardRemove, types.ForceReply]
# for webhooks
from telebot.extensions import AsyncWebhookListener
import string
import random
import ssl
@ -1515,9 +1513,12 @@ class AsyncTeleBot:
if cert_file: cert_file.close()
ssl_context = (certificate, certificate_key) if certificate else (None, None)
# for webhooks
try:
from telebot.extensions.asynchronous import AsyncWebhookListener
except NameError:
raise ImportError("Please install uvicorn and fastapi in order to use `run_webhooks` method.")
self.webhook_listener = AsyncWebhookListener(self, secret_token, listen, port, ssl_context, '/'+url_path, debug)
# create a new loop, set it, and pass it
asyncio.set_event_loop(asyncio.new_event_loop())
await self.webhook_listener.run_app()

View File

@ -1,8 +1,3 @@
from .webhooks import SyncWebhookListener, AsyncWebhookListener
__all__ = [
'SyncWebhookListener',
'AsyncWebhookListener'
]
"""
A folder with asynchronous and synchronous extensions.
"""

View File

@ -0,0 +1,10 @@
"""
A folder with all the async extensions.
"""
from .webhooks import AsyncWebhookListener
__all__ = [
"AsyncWebhookListener"
]

View File

@ -0,0 +1,103 @@
"""
This file is used by AsyncTeleBot.run_webhooks() function.
Fastapi and starlette(0.20.2+) libraries are required to run this script.
"""
# modules required for running this script
fastapi_installed = True
try:
import fastapi
from fastapi.responses import JSONResponse
from fastapi.requests import Request
from uvicorn import Server, Config
except ImportError:
fastapi_installed = False
import asyncio
from telebot.types import Update
from typing import Optional
class AsyncWebhookListener:
def __init__(self, bot,
secret_token: str, host: Optional[str]="127.0.0.1",
port: Optional[int]=443,
ssl_context: Optional[tuple]=None,
url_path: Optional[str]=None,
debug: Optional[bool]=False
) -> None:
"""
Aynchronous implementation of webhook listener
for asynchronous version of telebot.
:param bot: TeleBot instance
:param secret_token: Telegram secret token
:param host: Webhook host
:param port: Webhook port
:param ssl_context: SSL context
:param url_path: Webhook url path
:param debug: Debug mode
"""
self._check_dependencies()
self.app = fastapi.FastAPI()
self._secret_token = secret_token
self._bot = bot
self._port = port
self._host = host
self._ssl_context = ssl_context
self._url_path = url_path
self._debug = debug
self._prepare_endpoint_urls()
def _check_dependencies(self):
if not fastapi_installed:
raise ImportError('Fastapi or uvicorn is not installed. Please install it via pip.')
import starlette
if starlette.__version__ < '0.20.2':
raise ImportError('Starlette version is too old. Please upgrade it: `pip3 install starlette -U`')
return
def _prepare_endpoint_urls(self):
self.app.add_api_route(endpoint=self.process_update,path= self._url_path, methods=["POST"])
async def process_update(self, request: Request, update: dict):
"""
Processes updates.
"""
# header containsX-Telegram-Bot-Api-Secret-Token
if request.headers.get('X-Telegram-Bot-Api-Secret-Token') != self._secret_token:
# secret token didn't match
return JSONResponse(status_code=403, content={"error": "Forbidden"})
if request.headers.get('content-type') == 'application/json':
json_string = update
asyncio.create_task(self._bot.process_new_updates([Update.de_json(json_string)]))
return JSONResponse('', status_code=200)
return JSONResponse(status_code=403, content={"error": "Forbidden"})
async def run_app(self):
"""
Run app with the given parameters.
"""
config = Config(app=self.app,
host=self._host,
port=self._port,
debug=self._debug,
ssl_certfile=self._ssl_context[0],
ssl_keyfile=self._ssl_context[1]
)
server = Server(config)
await server.serve()
await self._bot.close_session()

View File

@ -0,0 +1,10 @@
"""
A folder with all the sync extensions.
"""
from .webhooks import SyncWebhookListener
__all__ = [
"SyncWebhookListener"
]

View File

@ -0,0 +1,101 @@
"""
This file is used by TeleBot.run_webhooks() &
AsyncTeleBot.run_webhooks() functions.
Flask/fastapi is required to run this script.
"""
# modules required for running this script
fastapi_installed = True
try:
import fastapi
from fastapi.responses import JSONResponse
from fastapi.requests import Request
import uvicorn
except ImportError:
fastapi_installed = False
from telebot.types import Update
from typing import Optional
class SyncWebhookListener:
def __init__(self, bot,
secret_token: str, host: Optional[str]="127.0.0.1",
port: Optional[int]=443,
ssl_context: Optional[tuple]=None,
url_path: Optional[str]=None,
debug: Optional[bool]=False
) -> None:
"""
Synchronous implementation of webhook listener
for synchronous version of telebot.
:param bot: TeleBot instance
:param secret_token: Telegram secret token
:param host: Webhook host
:param port: Webhook port
:param ssl_context: SSL context
:param url_path: Webhook url path
:param debug: Debug mode
"""
self._check_dependencies()
self.app = fastapi.FastAPI()
self._secret_token = secret_token
self._bot = bot
self._port = port
self._host = host
self._ssl_context = ssl_context
self._url_path = url_path
self._debug = debug
self._prepare_endpoint_urls()
def _check_dependencies(self):
if not fastapi_installed:
raise ImportError('Fastapi or uvicorn is not installed. Please install it via pip.')
import starlette
if starlette.__version__ < '0.20.2':
raise ImportError('Starlette version is too old. Please upgrade it: `pip3 install starlette -U`')
return
def _prepare_endpoint_urls(self):
self.app.add_api_route(endpoint=self.process_update,path= self._url_path, methods=["POST"])
def process_update(self, request: Request, update: dict):
"""
Processes updates.
"""
# header containsX-Telegram-Bot-Api-Secret-Token
if request.headers.get('X-Telegram-Bot-Api-Secret-Token') != self._secret_token:
# secret token didn't match
return JSONResponse(status_code=403, content={"error": "Forbidden"})
if request.headers.get('content-type') == 'application/json':
self._bot.process_new_updates([Update.de_json(update)])
return JSONResponse('', status_code=200)
return JSONResponse(status_code=403, content={"error": "Forbidden"})
def run_app(self):
"""
Run app with the given parameters.
"""
uvicorn.run(app=self.app,
host=self._host,
port=self._port,
debug=self._debug,
ssl_certfile=self._ssl_context[0],
ssl_keyfile=self._ssl_context[1]
)