1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

Starlette ASGI example

This commit is contained in:
Ananth Bhaskararaman 2022-08-30 16:11:58 +05:30
parent 095bf03227
commit d7770bf670
No known key found for this signature in database
GPG Key ID: 66A20525398E18D7

View File

@ -0,0 +1,85 @@
#!/usr/bin/env python
"""
Asynchronous Telegram Echo Bot example.
This is a simple bot that echoes each message that is received onto the chat.
It uses the Starlette ASGI framework to receive updates via webhook requests.
"""
import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import PlainTextResponse, Response
from starlette.routing import Route
from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message, Update
API_TOKEN = "TOKEN"
WEBHOOK_HOST = "<ip/domain>"
WEBHOOK_PORT = 8443 # 443, 80, 88 or 8443 (port need to be 'open')
WEBHOOK_LISTEN = "0.0.0.0"
WEBHOOK_URL = f"https://{WEBHOOK_HOST}:{WEBHOOK_PORT}/telegram"
WEBHOOK_SECRET_TOKEN = "SECRET_TOKEN"
logger = telebot.logger
telebot.logger.setLevel(logging.INFO)
bot = AsyncTeleBot(token=API_TOKEN)
# BOT HANDLERS
@bot.message_handler(commands=["help", "start"])
async def send_welcome(message: Message):
"""
Handle '/start' and '/help'
"""
await bot.reply_to(
message,
("Hi there, I am EchoBot.\n" "I am here to echo your kind words back to you."),
)
@bot.message_handler(func=lambda _: True, content_types=["text"])
async def echo_message(message: Message):
"""
Handle all other messages
"""
await bot.reply_to(message, message.text)
# WEBSERVER HANDLERS
async def telegram(request: Request) -> Response:
"""Handle incoming Telegram updates."""
token_header_name = "X-Telegram-Bot-Api-Secret-Token"
if request.headers.get(token_header_name) != WEBHOOK_SECRET_TOKEN:
return PlainTextResponse("Forbidden", status_code=403)
await bot.process_new_updates([Update.de_json(await request.json())])
return Response()
async def startup() -> None:
"""Register webhook for telegram updates."""
webhook_info = await bot.get_webhook_info(30)
if WEBHOOK_URL != webhook_info.url:
logger.debug(
f"updating webhook url, old: {webhook_info.url}, new: {WEBHOOK_URL}"
)
if not await bot.set_webhook(
url=WEBHOOK_URL, secret_token=WEBHOOK_SECRET_TOKEN
):
raise RuntimeError("unable to set webhook")
app = Starlette(
routes=[
Route("/telegram", telegram, methods=["POST"]),
],
on_startup=[startup],
)
uvicorn.run(
app,
host=WEBHOOK_HOST,
port=WEBHOOK_LISTEN,
)