1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00
pyTelegramBotAPI/examples/asynchronous_telebot/webhooks/webhook_starlette_echo_bot.py

90 lines
2.6 KiB
Python
Raw Normal View History

2022-08-30 13:41:58 +03:00
#!/usr/bin/env python
"""
Asynchronous Telegram Echo Bot example.
This is a simple bot that echoes each message that is received onto the chat.
It uses the Starlette ASGI framework to receive updates via webhook requests.
"""
import logging

import telebot
import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import PlainTextResponse, Response
from starlette.routing import Route
from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message, Update
# Bot credentials; replace the placeholder with the real bot token.
API_TOKEN = "TOKEN"

# Webhook settings. WEBHOOK_HOST is only used to build WEBHOOK_URL, the
# address registered with Telegram.
WEBHOOK_HOST = "<ip/domain>"
WEBHOOK_PORT = 8443  # 443, 80, 88 or 8443 (port needs to be 'open')
WEBHOOK_LISTEN = "0.0.0.0"
WEBHOOK_SSL_CERT = "./webhook_cert.pem"  # Path to the ssl certificate
WEBHOOK_SSL_PRIV = "./webhook_pkey.pem"  # Path to the ssl private key
WEBHOOK_URL = f"https://{WEBHOOK_HOST}:{WEBHOOK_PORT}/telegram"
# Value compared against the "X-Telegram-Bot-Api-Secret-Token" request header.
WEBHOOK_SECRET_TOKEN = "SECRET_TOKEN"

# Reuse the library's logger; `telebot` and `logging` must be imported at the
# top of the file for these two lines to work.
logger = telebot.logger
logger.setLevel(logging.INFO)

bot = AsyncTeleBot(token=API_TOKEN)
# BOT HANDLERS
@bot.message_handler(commands=["help", "start"])
async def send_welcome(message: Message):
    """Reply to the '/start' and '/help' commands with a short greeting."""
    welcome_text = (
        "Hi there, I am EchoBot.\n"
        "I am here to echo your kind words back to you."
    )
    await bot.reply_to(message, welcome_text)
@bot.message_handler(func=lambda _: True, content_types=["text"])
async def echo_message(message: Message):
    """Reply to any other text message with that message's own text."""
    incoming_text = message.text
    await bot.reply_to(message, incoming_text)
# WEBSERVER HANDLERS
async def telegram(request: Request) -> Response:
    """Process one webhook request sent by Telegram.

    The request is accepted only when its ``X-Telegram-Bot-Api-Secret-Token``
    header equals ``WEBHOOK_SECRET_TOKEN``; otherwise a 403 "Forbidden"
    plain-text response is returned and the body is not read.
    """
    supplied_token = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
    if supplied_token == WEBHOOK_SECRET_TOKEN:
        payload = await request.json()
        update = Update.de_json(payload)
        await bot.process_new_updates([update])
        return Response()
    return PlainTextResponse("Forbidden", status_code=403)
async def startup() -> None:
    """Make sure the webhook registered with Telegram is ``WEBHOOK_URL``.

    Nothing is changed when the currently registered URL already matches.

    Raises:
        RuntimeError: if ``bot.set_webhook`` returns a falsy result.
    """
    # NOTE(review): 30 is passed positionally; presumably a request timeout
    # in seconds - confirm against the AsyncTeleBot API.
    current = await bot.get_webhook_info(30)
    if current.url == WEBHOOK_URL:
        return

    logger.debug(
        f"updating webhook url, old: {current.url}, new: {WEBHOOK_URL}"
    )
    registered = await bot.set_webhook(
        url=WEBHOOK_URL, secret_token=WEBHOOK_SECRET_TOKEN
    )
    if not registered:
        raise RuntimeError("unable to set webhook")
# ASGI application: POST /telegram is routed to the `telegram` handler, and
# `startup` is registered as a startup callback.
app = Starlette(
    on_startup=[startup],
    routes=[Route("/telegram", telegram, methods=["POST"])],
)
# Serve the app over HTTPS. The server binds to the local listen address and
# port; WEBHOOK_HOST is the public address used in WEBHOOK_URL and must not be
# used as the bind address, nor WEBHOOK_LISTEN as the port.
uvicorn.run(
    app,
    host=WEBHOOK_LISTEN,
    port=WEBHOOK_PORT,
    ssl_certfile=WEBHOOK_SSL_CERT,
    ssl_keyfile=WEBHOOK_SSL_PRIV,
)