mirror of
https://github.com/eternnoir/pyTelegramBotAPI.git
synced 2023-08-10 21:12:57 +03:00
Webhook processing function using flask for sync
This commit is contained in:
87
telebot/extensions/webhooks.py
Normal file
87
telebot/extensions/webhooks.py
Normal file
@ -0,0 +1,87 @@
|
||||
"""
|
||||
This file is used by TeleBot.run_webhooks() &
|
||||
AsyncTeleBot.run_webhooks() functions.
|
||||
|
||||
Flask/Aiohttp is required to run this script.
|
||||
"""
|
||||
|
||||
|
||||
flask_installed = True
|
||||
try:
|
||||
import flask
|
||||
from werkzeug.serving import _TSSLContextArg
|
||||
except ImportError:
|
||||
flask_installed = False
|
||||
|
||||
|
||||
from telebot.types import Update
|
||||
|
||||
|
||||
from typing import Optional
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class SyncWebhookListener:
|
||||
def __init__(self, bot,
|
||||
secret_token: str, host: Optional[str]="127.0.0.1",
|
||||
port: Optional[int]=8000,
|
||||
ssl_context: Optional[_TSSLContextArg]=None,
|
||||
url_path: Optional[str]=None,
|
||||
debug: Optional[bool]=False
|
||||
) -> None:
|
||||
"""
|
||||
Synchronous implementation of webhook listener
|
||||
for synchronous version of telebot.
|
||||
|
||||
:param bot: TeleBot instance
|
||||
:param secret_token: Telegram secret token
|
||||
:param host: Webhook host
|
||||
:param port: Webhook port
|
||||
:param ssl_context: SSL context
|
||||
"""
|
||||
if not flask_installed:
|
||||
raise ImportError('Flask is not installed. Please install it via pip.')
|
||||
self.app = flask.Flask(__name__)
|
||||
self._secret_token = secret_token
|
||||
self._bot = bot
|
||||
self._port = port
|
||||
self._host = host
|
||||
self._ssl_context = ssl_context
|
||||
self._url_path = url_path
|
||||
self._debug = debug
|
||||
self._prepare_endpoint_urls()
|
||||
|
||||
|
||||
def _prepare_endpoint_urls(self):
|
||||
self.app.add_url_rule(self._url_path, 'index', self.process_update, methods=['POST'])
|
||||
|
||||
|
||||
def process_update(self):
|
||||
"""
|
||||
Processes updates.
|
||||
"""
|
||||
# header containsX-Telegram-Bot-Api-Secret-Token
|
||||
if flask.request.headers.get('X-Telegram-Bot-Api-Secret-Token') != self._secret_token:
|
||||
# secret token didn't match
|
||||
flask.abort(403)
|
||||
if flask.request.headers.get('content-type') == 'application/json':
|
||||
json_string = flask.request.get_data().decode('utf-8')
|
||||
self._bot.process_new_updates([Update.de_json(json_string)])
|
||||
return ''
|
||||
|
||||
flask.abort(403)
|
||||
|
||||
|
||||
def run_app(self):
|
||||
"""
|
||||
Run app with the given parameters.
|
||||
"""
|
||||
self.app.run(
|
||||
host=self._host,
|
||||
port=self._port,
|
||||
ssl_context=self._ssl_context,
|
||||
debug=self._debug
|
||||
)
|
||||
return self
|
Reference in New Issue
Block a user