1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00
This commit is contained in:
zeph1997
2021-12-11 02:28:41 +08:00
64 changed files with 11350 additions and 2176 deletions

View File

@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
"""
This Example will show you how to use CallbackData
"""
from telebot.callback_data import CallbackData, CallbackDataFilter
from telebot import types, TeleBot
from telebot.custom_filters import AdvancedCustomFilter
API_TOKEN = ''
PRODUCTS = [
{'id': '0', 'name': 'xiaomi mi 10', 'price': 400},
{'id': '1', 'name': 'samsung s20', 'price': 800},
{'id': '2', 'name': 'iphone 13', 'price': 1300}
]
bot = TeleBot(API_TOKEN)
products_factory = CallbackData('product_id', prefix='products')
def products_keyboard():
return types.InlineKeyboardMarkup(
keyboard=[
[
types.InlineKeyboardButton(
text=product['name'],
callback_data=products_factory.new(product_id=product["id"])
)
]
for product in PRODUCTS
]
)
def back_keyboard():
return types.InlineKeyboardMarkup(
keyboard=[
[
types.InlineKeyboardButton(
text='',
callback_data='back'
)
]
]
)
class ProductsCallbackFilter(AdvancedCustomFilter):
key = 'config'
def check(self, call: types.CallbackQuery, config: CallbackDataFilter):
return config.check(query=call)
@bot.message_handler(commands=['products'])
def products_command_handler(message: types.Message):
bot.send_message(message.chat.id, 'Products:', reply_markup=products_keyboard())
# Only product with field - product_id = 2
@bot.callback_query_handler(func=None, config=products_factory.filter(product_id='2'))
def product_one_callback(call: types.CallbackQuery):
bot.answer_callback_query(callback_query_id=call.id, text='Not available :(', show_alert=True)
# Any other products
@bot.callback_query_handler(func=None, config=products_factory.filter())
def products_callback(call: types.CallbackQuery):
callback_data: dict = products_factory.parse(callback_data=call.data)
product_id = int(callback_data['product_id'])
product = PRODUCTS[product_id]
text = f"Product name: {product['name']}\n" \
f"Product price: {product['price']}"
bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
text=text, reply_markup=back_keyboard())
@bot.callback_query_handler(func=lambda c: c.data == 'back')
def back_callback(call: types.CallbackQuery):
bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
text='Products:', reply_markup=products_keyboard())
bot.add_custom_filter(ProductsCallbackFilter())
bot.infinity_polling()

117
examples/anonymous_bot.py Normal file
View File

@ -0,0 +1,117 @@
# This bot is needed to connect two people and their subsequent anonymous communication
#
# Avaiable commands:
# `/start` - Just send you a messsage how to start
# `/find` - Find a person you can contact
# `/stop` - Stop active conversation
import telebot
from telebot import types
# Initialize bot with your token
bot = telebot.TeleBot(TOKEN)
# The `users` variable is needed to contain chat ids that are either in the search or in the active dialog, like {chat_id, chat_id}
users = {}
# The `freeid` variable is needed to contain chat id, that want to start conversation
# Or, in other words: chat id of user in the search
freeid = None
# `/start` command handler
#
# That command only sends you 'Just use /find command!'
@bot.message_handler(commands=['start'])
def start(message: types.Message):
bot.send_message(message.chat.id, 'Just use /find command!')
# `/find` command handler
#
# That command finds opponent for you
#
# That command according to the following principle:
# 1. You have written `/find` command
# 2. If you are already in the search or have an active dialog, bot sends you 'Shut up!'
# 3. If not:
# 3.1. Bot sends you 'Finding...'
# 3.2. If there is no user in the search:
# 3.2.2. `freeid` updated with `your_chat_id`
# 3.3. If there is user in the search:
# 3.3.1. Both you and the user in the search recieve the message 'Founded!'
# 3.3.2. `users` updated with a {user_in_the_search_chat_id, your_chat_id}
# 3.3.3. `users` updated with a {your_chat_id, user_in_the_search_id}
# 3.3.4. `freeid` updated with `None`
@bot.message_handler(commands=['find'])
def find(message: types.Message):
global freeid
if message.chat.id not in users:
bot.send_message(message.chat.id, 'Finding...')
if freeid == None:
freeid = message.chat.id
else:
# Question:
# Is there any way to simplify this like `bot.send_message([message.chat.id, freeid], 'Founded!')`?
bot.send_message(message.chat.id, 'Founded!')
bot.send_message(freeid, 'Founded!')
users[freeid] = message.chat.id
users[message.chat.id] = freeid
freeid = None
print(users, freeid) # Debug purpose, you can remove that line
else:
bot.send_message(message.chat.id, 'Shut up!')
# `/stop` command handler
#
# That command stops your current conversation (if it exist)
#
# That command according to the following principle:
# 1. You have written `/stop` command
# 2. If you are not have active dialog or you are not in search, bot sends you 'You are not in search!'
# 3. If you are in active dialog:
# 3.1. Bot sends you 'Stopping...'
# 3.2. Bot sends 'Your opponent is leavin`...' to your opponent
# 3.3. {your_opponent_chat_id, your_chat_id} removes from `users`
# 3.4. {your_chat_id, your_opponent_chat_id} removes from `users`
# 4. If you are only in search:
# 4.1. Bot sends you 'Stopping...'
# 4.2. `freeid` updated with `None`
@bot.message_handler(commands=['stop'])
def stop(message: types.Message):
global freeid
if message.chat.id in users:
bot.send_message(message.chat.id, 'Stopping...')
bot.send_message(users[message.chat.id], 'Your opponent is leavin`...')
del users[users[message.chat.id]]
del users[message.chat.id]
print(users, freeid) # Debug purpose, you can remove that line
elif message.chat.id == freeid:
bot.send_message(message.chat.id, 'Stopping...')
freeid = None
print(users, freeid) # Debug purpose, you can remove that line
else:
bot.send_message(message.chat.id, 'You are not in search!')
# message handler for conversation
#
# That handler needed to send message from one opponent to another
# If you are not in `users`, you will recieve a message 'No one can hear you...'
# Otherwise all your messages are sent to your opponent
#
# Questions:
# 1. Is there any way to improve readability like `content_types=['all']`?
# 2. Is there any way to register this message handler only when i found the opponent?
@bot.message_handler(content_types=['animation', 'audio', 'contact', 'dice', 'document', 'location', 'photo', 'poll', 'sticker', 'text', 'venue', 'video', 'video_note', 'voice'])
def chatting(message: types.Message):
if message.chat.id in users:
bot.copy_message(users[message.chat.id], users[users[message.chat.id]], message.id)
else:
bot.send_message(message.chat.id, 'No one can hear you...')
bot.infinity_polling(skip_pending=True)

View File

@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
"""
This Example will show you how to use CallbackData
"""
from telebot.callback_data import CallbackData, CallbackDataFilter
from telebot import types
from telebot.async_telebot import AsyncTeleBot
from telebot.asyncio_filters import AdvancedCustomFilter
API_TOKEN = 'TOKEN'
PRODUCTS = [
{'id': '0', 'name': 'xiaomi mi 10', 'price': 400},
{'id': '1', 'name': 'samsung s20', 'price': 800},
{'id': '2', 'name': 'iphone 13', 'price': 1300}
]
bot = AsyncTeleBot(API_TOKEN)
products_factory = CallbackData('product_id', prefix='products')
def products_keyboard():
return types.InlineKeyboardMarkup(
keyboard=[
[
types.InlineKeyboardButton(
text=product['name'],
callback_data=products_factory.new(product_id=product["id"])
)
]
for product in PRODUCTS
]
)
def back_keyboard():
return types.InlineKeyboardMarkup(
keyboard=[
[
types.InlineKeyboardButton(
text='',
callback_data='back'
)
]
]
)
class ProductsCallbackFilter(AdvancedCustomFilter):
key = 'config'
async def check(self, call: types.CallbackQuery, config: CallbackDataFilter):
return config.check(query=call)
@bot.message_handler(commands=['products'])
async def products_command_handler(message: types.Message):
await bot.send_message(message.chat.id, 'Products:', reply_markup=products_keyboard())
# Only product with field - product_id = 2
@bot.callback_query_handler(func=None, config=products_factory.filter(product_id='2'))
async def product_one_callback(call: types.CallbackQuery):
await bot.answer_callback_query(callback_query_id=call.id, text='Not available :(', show_alert=True)
# Any other products
@bot.callback_query_handler(func=None, config=products_factory.filter())
async def products_callback(call: types.CallbackQuery):
callback_data: dict = products_factory.parse(callback_data=call.data)
product_id = int(callback_data['product_id'])
product = PRODUCTS[product_id]
text = f"Product name: {product['name']}\n" \
f"Product price: {product['price']}"
await bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
text=text, reply_markup=back_keyboard())
@bot.callback_query_handler(func=lambda c: c.data == 'back')
async def back_callback(call: types.CallbackQuery):
await bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
text='Products:', reply_markup=products_keyboard())
bot.add_custom_filter(ProductsCallbackFilter())
bot.polling()

View File

@ -0,0 +1,11 @@
from telebot.async_telebot import AsyncTeleBot
import telebot
bot = AsyncTeleBot('TOKEN')
@bot.chat_join_request_handler()
async def make_some(message: telebot.types.ChatJoinRequest):
await bot.send_message(message.chat.id, 'I accepted a new user!')
await bot.approve_chat_join_request(message.chat.id, message.from_user.id)
bot.polling(skip_pending=True)

View File

@ -0,0 +1,33 @@
from telebot import types,util
from telebot.async_telebot import AsyncTeleBot
bot = AsyncTeleBot('TOKEN')
#chat_member_handler. When status changes, telegram gives update. check status from old_chat_member and new_chat_member.
@bot.chat_member_handler()
async def chat_m(message: types.ChatMemberUpdated):
old = message.old_chat_member
new = message.new_chat_member
if new.status == "member":
await bot.send_message(message.chat.id,"Hello {name}!".format(name=new.user.first_name)) # Welcome message
#if bot is added to group, this handler will work
@bot.my_chat_member_handler()
async def my_chat_m(message: types.ChatMemberUpdated):
old = message.old_chat_member
new = message.new_chat_member
if new.status == "member":
await bot.send_message(message.chat.id,"Somebody added me to group") # Welcome message, if bot was added to group
await bot.leave_chat(message.chat.id)
#content_Type_service is:
#'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', 'group_chat_created',
#'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message',
#'proximity_alert_triggered', 'voice_chat_scheduled', 'voice_chat_started', 'voice_chat_ended',
#'voice_chat_participants_invited', 'message_auto_delete_timer_changed'
# this handler deletes service messages
@bot.message_handler(content_types=util.content_type_service)
async def delall(message: types.Message):
await bot.delete_message(message.chat.id,message.message_id)
bot.polling()

View File

@ -0,0 +1,12 @@
from telebot.async_telebot import AsyncTeleBot
from telebot import asyncio_filters
bot = AsyncTeleBot('TOKEN')
# Handler
@bot.message_handler(chat_types=['supergroup'], is_chat_admin=True)
async def answer_for_admin(message):
await bot.send_message(message.chat.id,"hello my admin")
# Register filter
bot.add_custom_filter(asyncio_filters.IsAdminFilter(bot))
bot.polling()

View File

@ -0,0 +1,43 @@
from telebot.async_telebot import AsyncTeleBot
import telebot
bot = AsyncTeleBot('TOKEN')
# AdvancedCustomFilter is for list, string filter values
class MainFilter(telebot.asyncio_filters.AdvancedCustomFilter):
key='text'
@staticmethod
async def check(message, text):
return message.text in text
# SimpleCustomFilter is for boolean values, such as is_admin=True
class IsAdmin(telebot.asyncio_filters.SimpleCustomFilter):
key='is_admin'
@staticmethod
async def check(message: telebot.types.Message):
result = await bot.get_chat_member(message.chat.id,message.from_user.id)
return result.status in ['administrator','creator']
@bot.message_handler(is_admin=True, commands=['admin']) # Check if user is admin
async def admin_rep(message):
await bot.send_message(message.chat.id, "Hi admin")
@bot.message_handler(is_admin=False, commands=['admin']) # If user is not admin
async def not_admin(message):
await bot.send_message(message.chat.id, "You are not admin")
@bot.message_handler(text=['hi']) # Response to hi message
async def welcome_hi(message):
await bot.send_message(message.chat.id, 'You said hi')
@bot.message_handler(text=['bye']) # Response to bye message
async def bye_user(message):
await bot.send_message(message.chat.id, 'You said bye')
# Do not forget to register filters
bot.add_custom_filter(MainFilter())
bot.add_custom_filter(IsAdmin())
bot.polling()

View File

@ -0,0 +1,17 @@
from telebot.async_telebot import AsyncTeleBot
import telebot
bot = AsyncTeleBot('TOKEN')
# Chat id can be private or supergroups.
@bot.message_handler(chat_id=[12345678], commands=['admin']) # chat_id checks id corresponds to your list or not.
async def admin_rep(message):
await bot.send_message(message.chat.id, "You are allowed to use this command.")
@bot.message_handler(commands=['admin'])
async def not_admin(message):
await bot.send_message(message.chat.id, "You are not allowed to use this command")
# Do not forget to register
bot.add_custom_filter(telebot.asyncio_filters.ChatFilter())
bot.polling()

View File

@ -0,0 +1,22 @@
from telebot.async_telebot import AsyncTeleBot
import telebot
bot = AsyncTeleBot('TOKEN')
# Check if message is a reply
@bot.message_handler(is_reply=True)
async def start_filter(message):
await bot.send_message(message.chat.id, "Looks like you replied to my message.")
# Check if message was forwarded
@bot.message_handler(is_forwarded=True)
async def text_filter(message):
await bot.send_message(message.chat.id, "I do not accept forwarded messages!")
# Do not forget to register filters
bot.add_custom_filter(telebot.asyncio_filters.IsReplyFilter())
bot.add_custom_filter(telebot.asyncio_filters.ForwardFilter())
bot.polling()

View File

@ -0,0 +1,20 @@
from telebot.async_telebot import AsyncTeleBot
import telebot
bot = AsyncTeleBot('TOKEN')
# Check if message starts with @admin tag
@bot.message_handler(text_startswith="@admin")
async def start_filter(message):
await bot.send_message(message.chat.id, "Looks like you are calling admin, wait...")
# Check if text is hi or hello
@bot.message_handler(text=['hi','hello'])
async def text_filter(message):
await bot.send_message(message.chat.id, "Hi, {name}!".format(name=message.from_user.first_name))
# Do not forget to register filters
bot.add_custom_filter(telebot.asyncio_filters.TextMatchFilter())
bot.add_custom_filter(telebot.asyncio_filters.TextStartsFilter())
bot.polling()

View File

@ -0,0 +1,74 @@
import telebot
from telebot import asyncio_filters
from telebot.async_telebot import AsyncTeleBot
bot = AsyncTeleBot('TOKEN')
class MyStates:
name = 1
surname = 2
age = 3
@bot.message_handler(commands=['start'])
async def start_ex(message):
"""
Start command. Here we are starting state
"""
await bot.set_state(message.from_user.id, MyStates.name)
await bot.send_message(message.chat.id, 'Hi, write me a name')
@bot.message_handler(state="*", commands='cancel')
async def any_state(message):
"""
Cancel state
"""
await bot.send_message(message.chat.id, "Your state was cancelled.")
await bot.delete_state(message.from_user.id)
@bot.message_handler(state=MyStates.name)
async def name_get(message):
"""
State 1. Will process when user's state is 1.
"""
await bot.send_message(message.chat.id, f'Now write me a surname')
await bot.set_state(message.from_user.id, MyStates.surname)
async with bot.retrieve_data(message.from_user.id) as data:
data['name'] = message.text
@bot.message_handler(state=MyStates.surname)
async def ask_age(message):
"""
State 2. Will process when user's state is 2.
"""
await bot.send_message(message.chat.id, "What is your age?")
await bot.set_state(message.from_user.id, MyStates.age)
async with bot.retrieve_data(message.from_user.id) as data:
data['surname'] = message.text
# result
@bot.message_handler(state=MyStates.age, is_digit=True)
async def ready_for_answer(message):
async with bot.retrieve_data(message.from_user.id) as data:
await bot.send_message(message.chat.id, "Ready, take a look:\n<b>Name: {name}\nSurname: {surname}\nAge: {age}</b>".format(name=data['name'], surname=data['surname'], age=message.text), parse_mode="html")
await bot.delete_state(message.from_user.id)
#incorrect number
@bot.message_handler(state=MyStates.age, is_digit=False)
async def age_incorrect(message):
await bot.send_message(message.chat.id, 'Looks like you are submitting a string in the field age. Please enter a number')
# register filters
bot.add_custom_filter(asyncio_filters.StateFilter(bot))
bot.add_custom_filter(asyncio_filters.IsDigitFilter())
# set saving states into file.
bot.enable_saving_states() # you can delete this if you do not need to save states
bot.polling()

View File

@ -0,0 +1,20 @@
import telebot
from telebot.async_telebot import AsyncTeleBot
bot = AsyncTeleBot('TOKEN')
@bot.message_handler(content_types=['photo'])
async def new_message(message: telebot.types.Message):
result_message = await bot.send_message(message.chat.id, '<i>Downloading your photo...</i>', parse_mode='HTML', disable_web_page_preview=True)
file_path = await bot.get_file(message.photo[-1].file_id)
downloaded_file = await bot.download_file(file_path.file_path)
with open('file.jpg', 'wb') as new_file:
new_file.write(downloaded_file)
await bot.edit_message_text(chat_id=message.chat.id, message_id=result_message.id, text='<i>Done!</i>', parse_mode='HTML')
bot.polling(skip_pending=True)

View File

@ -0,0 +1,26 @@
#!/usr/bin/python
# This is a simple echo bot using the decorator mechanism.
# It echoes any incoming text messages.
from telebot.async_telebot import AsyncTeleBot
bot = AsyncTeleBot('TOKEN')
# Handle '/start' and '/help'
@bot.message_handler(commands=['help', 'start'])
async def send_welcome(message):
await bot.reply_to(message, """\
Hi there, I am EchoBot.
I am here to echo your kind words back to you. Just say anything nice and I'll say the exact same thing to you!\
""")
# Handle all other messages with content_type 'text' (content_types defaults to ['text'])
@bot.message_handler(func=lambda message: True)
async def echo_message(message):
await bot.reply_to(message, message.text)
bot.polling()

View File

@ -0,0 +1,27 @@
import telebot
from telebot.async_telebot import AsyncTeleBot
import logging
logger = telebot.logger
telebot.logger.setLevel(logging.DEBUG) # Outputs debug messages to console.
class ExceptionHandler(telebot.ExceptionHandler):
def handle(self, exception):
logger.error(exception)
bot = AsyncTeleBot('TOKEN',exception_handler=ExceptionHandler())
@bot.message_handler(commands=['photo'])
async def photo_send(message: telebot.types.Message):
await bot.send_message(message.chat.id, 'Hi, this is an example of exception handlers.')
raise Exception('test') # Exception goes to ExceptionHandler if it is set
bot.polling(skip_pending=True)

View File

@ -0,0 +1,39 @@
# Just a little example of middleware handlers
import telebot
from telebot.asyncio_handler_backends import BaseMiddleware
from telebot.async_telebot import AsyncTeleBot
from telebot.async_telebot import CancelUpdate
bot = AsyncTeleBot('TOKEN')
class SimpleMiddleware(BaseMiddleware):
def __init__(self, limit) -> None:
self.last_time = {}
self.limit = limit
self.update_types = ['message']
# Always specify update types, otherwise middlewares won't work
async def pre_process(self, message, data):
if not message.from_user.id in self.last_time:
# User is not in a dict, so lets add and cancel this function
self.last_time[message.from_user.id] = message.date
return
if message.date - self.last_time[message.from_user.id] < self.limit:
# User is flooding
await bot.send_message(message.chat.id, 'You are making request too often')
return CancelUpdate()
self.last_time[message.from_user.id] = message.date
async def post_process(self, message, data, exception):
pass
bot.setup_middleware(SimpleMiddleware(2))
@bot.message_handler(commands=['start'])
async def start(message):
await bot.send_message(message.chat.id, 'Hello!')
bot.polling()

View File

@ -0,0 +1,48 @@
#!/usr/bin/python
# This example shows how to implement i18n (internationalization) l10n (localization) to create
# multi-language bots with middleware handler.
#
# Also, you could check language code in handler itself too.
# But this example just to show the work of middlewares.
import telebot
from telebot.async_telebot import AsyncTeleBot
from telebot import asyncio_handler_backends
import logging
logger = telebot.logger
telebot.logger.setLevel(logging.DEBUG) # Outputs debug messages to console.
TRANSLATIONS = {
'hello': {
'en': 'hello',
'ru': 'привет',
'uz': 'salom'
}
}
bot = AsyncTeleBot('TOKEN')
class LanguageMiddleware(asyncio_handler_backends.BaseMiddleware):
def __init__(self):
self.update_types = ['message'] # Update types that will be handled by this middleware.
async def pre_process(self, message, data):
data['response'] = TRANSLATIONS['hello'][message.from_user.language_code]
async def post_process(self, message, data, exception):
if exception: # You can get exception occured in handler.
logger.exception(str(exception))
bot.setup_middleware(LanguageMiddleware()) # do not forget to setup
@bot.message_handler(commands=['start'])
async def start(message, data: dict):
# you can get the data in handler too.
# Not necessary to create data parameter in handler function.
await bot.send_message(message.chat.id, data['response'])
bot.polling()

View File

@ -0,0 +1,18 @@
from telebot.async_telebot import AsyncTeleBot
bot = AsyncTeleBot('TOKEN')
async def start_executor(message):
await bot.send_message(message.chat.id, 'Hello!')
bot.register_message_handler(start_executor, commands=['start']) # Start command executor
# See also
# bot.register_callback_query_handler(*args, **kwargs)
# bot.register_channel_post_handler(*args, **kwargs)
# bot.register_chat_member_handler(*args, **kwargs)
# bot.register_inline_handler(*args, **kwargs)
# bot.register_my_chat_member_handler(*args, **kwargs)
# bot.register_edited_message_handler(*args, **kwargs)
# And other functions..
bot.polling(skip_pending=True)

View File

@ -0,0 +1,27 @@
import telebot
from telebot.async_telebot import AsyncTeleBot
bot = AsyncTeleBot('TOKEN')
@bot.message_handler(commands=['photo'])
async def photo_send(message: telebot.types.Message):
with open('test.png', 'rb') as new_file:
await bot.send_photo(message.chat.id, new_file)
@bot.message_handler(commands=['document'])
async def document_send(message: telebot.types.Message):
with open('test.docx', 'rb') as new_file:
await bot.send_document(message.chat.id, new_file)
@bot.message_handler(commands=['photos'])
async def photos_send(message: telebot.types.Message):
with open('test.png', 'rb') as new_file, open('test2.png', 'rb') as new_file2:
await bot.send_media_group(message.chat.id, [telebot.types.InputMediaPhoto(new_file), telebot.types.InputMediaPhoto(new_file2)])
bot.polling(skip_pending=True)

View File

@ -0,0 +1,13 @@
from telebot.async_telebot import AsyncTeleBot
bot = AsyncTeleBot('TOKEN')
@bot.message_handler(commands=['start', 'help'])
async def send_welcome(message):
await bot.reply_to(message, "Howdy, how are you doing?")
@bot.message_handler(func=lambda message: True)
async def echo_all(message):
await bot.reply_to(message, message.text)
bot.polling(skip_pending=True)# Skip pending skips old updates

View File

@ -0,0 +1,14 @@
from telebot.async_telebot import AsyncTeleBot
# Update listeners are functions that are called when any update is received.
bot = AsyncTeleBot(token='TOKEN')
async def update_listener(messages):
for message in messages:
if message.text == '/start':
await bot.send_message(message.chat.id, 'Hello!')
bot.set_update_listener(update_listener)
bot.polling()

View File

@ -0,0 +1,11 @@
import telebot
bot = telebot.TeleBot('TOKEN')
@bot.chat_join_request_handler()
def make_some(message: telebot.types.ChatJoinRequest):
bot.send_message(message.chat.id, 'I accepted a new user!')
bot.approve_chat_join_request(message.chat.id, message.from_user.id)
bot.infinity_polling(allowed_updates=telebot.util.update_types)

View File

@ -0,0 +1,33 @@
import telebot
from telebot import types,util
bot = telebot.TeleBot("token")
#chat_member_handler. When status changes, telegram gives update. check status from old_chat_member and new_chat_member.
@bot.chat_member_handler()
def chat_m(message: types.ChatMemberUpdated):
old = message.old_chat_member
new = message.new_chat_member
if new.status == "member":
bot.send_message(message.chat.id,"Hello {name}!".format(name=new.user.first_name)) # Welcome message
#if bot is added to group, this handler will work
@bot.my_chat_member_handler()
def my_chat_m(message: types.ChatMemberUpdated):
old = message.old_chat_member
new = message.new_chat_member
if new.status == "member":
bot.send_message(message.chat.id,"Somebody added me to group") # Welcome message, if bot was added to group
bot.leave_chat(message.chat.id)
#content_Type_service is:
#'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', 'group_chat_created',
#'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message',
#'proximity_alert_triggered', 'voice_chat_scheduled', 'voice_chat_started', 'voice_chat_ended',
#'voice_chat_participants_invited', 'message_auto_delete_timer_changed'
# this handler deletes service messages
@bot.message_handler(content_types=util.content_type_service)
def delall(message: types.Message):
bot.delete_message(message.chat.id,message.message_id)
bot.infinity_polling(allowed_updates=util.update_types)

View File

@ -0,0 +1,12 @@
import telebot
from telebot import custom_filters
bot = telebot.TeleBot('TOKEN')
# Handler
@bot.message_handler(chat_types=['supergroup'], is_chat_admin=True)
def answer_for_admin(message):
bot.send_message(message.chat.id,"hello my admin")
# Register filter
bot.add_custom_filter(custom_filters.IsAdminFilter(bot))
bot.infinity_polling()

View File

@ -0,0 +1,42 @@
import telebot
bot = telebot.TeleBot('TOKEN')
# AdvancedCustomFilter is for list, string filter values
class MainFilter(telebot.custom_filters.AdvancedCustomFilter):
key='text'
@staticmethod
def check(message, text):
return message.text in text
# SimpleCustomFilter is for boolean values, such as is_admin=True
class IsAdmin(telebot.custom_filters.SimpleCustomFilter):
key='is_admin'
@staticmethod
def check(message: telebot.types.Message):
return bot.get_chat_member(message.chat.id,message.from_user.id).status in ['administrator','creator']
@bot.message_handler(is_admin=True, commands=['admin']) # Check if user is admin
def admin_rep(message):
bot.send_message(message.chat.id, "Hi admin")
@bot.message_handler(is_admin=False, commands=['admin']) # If user is not admin
def not_admin(message):
bot.send_message(message.chat.id, "You are not admin")
@bot.message_handler(text=['hi']) # Response to hi message
def welcome_hi(message):
bot.send_message(message.chat.id, 'You said hi')
@bot.message_handler(text=['bye']) # Response to bye message
def bye_user(message):
bot.send_message(message.chat.id, 'You said bye')
# Do not forget to register filters
bot.add_custom_filter(MainFilter())
bot.add_custom_filter(IsAdmin())
bot.infinity_polling(skip_pending=True) # Skip old updates

View File

@ -0,0 +1,19 @@
import telebot
from telebot import custom_filters
bot = telebot.TeleBot('token')
# Chat id can be private or supergroups.
@bot.message_handler(chat_id=[12345678], commands=['admin']) # chat_id checks id corresponds to your list or not.
def admin_rep(message):
bot.send_message(message.chat.id, "You are allowed to use this command.")
@bot.message_handler(commands=['admin'])
def not_admin(message):
bot.send_message(message.chat.id, "You are not allowed to use this command")
# Do not forget to register
bot.add_custom_filter(custom_filters.ChatFilter())
bot.infinity_polling()

View File

@ -0,0 +1,21 @@
import telebot
from telebot import custom_filters
bot = telebot.TeleBot('TOKEN')
# Check if message is a reply
@bot.message_handler(is_reply=True)
def start_filter(message):
bot.send_message(message.chat.id, "Looks like you replied to my message.")
# Check if message was forwarded
@bot.message_handler(is_forwarded=True)
def text_filter(message):
bot.send_message(message.chat.id, "I do not accept forwarded messages!")
# Do not forget to register filters
bot.add_custom_filter(custom_filters.IsReplyFilter())
bot.add_custom_filter(custom_filters.ForwardFilter())
bot.infinity_polling()

View File

@ -0,0 +1,21 @@
import telebot
from telebot import custom_filters
bot = telebot.TeleBot('TOKEN')
# Check if message starts with @admin tag
@bot.message_handler(text_startswith="@admin")
def start_filter(message):
bot.send_message(message.chat.id, "Looks like you are calling admin, wait...")
# Check if text is hi or hello
@bot.message_handler(text=['hi','hello'])
def text_filter(message):
bot.send_message(message.chat.id, "Hi, {name}!".format(name=message.from_user.first_name))
# Do not forget to register filters
bot.add_custom_filter(custom_filters.TextMatchFilter())
bot.add_custom_filter(custom_filters.TextStartsFilter())
bot.infinity_polling()

74
examples/custom_states.py Normal file
View File

@ -0,0 +1,74 @@
import telebot
from telebot import custom_filters
bot = telebot.TeleBot("")
class MyStates:
name = 1
surname = 2
age = 3
@bot.message_handler(commands=['start'])
def start_ex(message):
"""
Start command. Here we are starting state
"""
bot.set_state(message.from_user.id, MyStates.name)
bot.send_message(message.chat.id, 'Hi, write me a name')
@bot.message_handler(state="*", commands='cancel')
def any_state(message):
"""
Cancel state
"""
bot.send_message(message.chat.id, "Your state was cancelled.")
bot.delete_state(message.from_user.id)
@bot.message_handler(state=MyStates.name)
def name_get(message):
"""
State 1. Will process when user's state is 1.
"""
bot.send_message(message.chat.id, f'Now write me a surname')
bot.set_state(message.from_user.id, MyStates.surname)
with bot.retrieve_data(message.from_user.id) as data:
data['name'] = message.text
@bot.message_handler(state=MyStates.surname)
def ask_age(message):
"""
State 2. Will process when user's state is 2.
"""
bot.send_message(message.chat.id, "What is your age?")
bot.set_state(message.from_user.id, MyStates.age)
with bot.retrieve_data(message.from_user.id) as data:
data['surname'] = message.text
# result
@bot.message_handler(state=MyStates.age, is_digit=True)
def ready_for_answer(message):
with bot.retrieve_data(message.from_user.id) as data:
bot.send_message(message.chat.id, "Ready, take a look:\n<b>Name: {name}\nSurname: {surname}\nAge: {age}</b>".format(name=data['name'], surname=data['surname'], age=message.text), parse_mode="html")
bot.delete_state(message.from_user.id)
#incorrect number
@bot.message_handler(state=MyStates.age, is_digit=False)
def age_incorrect(message):
bot.send_message(message.chat.id, 'Looks like you are submitting a string in the field age. Please enter a number')
# register filters
bot.add_custom_filter(custom_filters.StateFilter(bot))
bot.add_custom_filter(custom_filters.IsDigitFilter())
# set saving states into file.
bot.enable_saving_states() # you can delete this if you do not need to save states
bot.infinity_polling(skip_pending=True)

View File

@ -74,4 +74,4 @@ def send_welcome(message):
bot.reply_to(message, reply)
bot.polling()
bot.infinity_polling()

View File

@ -130,4 +130,4 @@ def command_default(m):
bot.send_message(m.chat.id, "I don't understand \"" + m.text + "\"\nMaybe try the help page at /help")
bot.polling()
bot.infinity_polling()

View File

@ -25,4 +25,4 @@ def echo_message(message):
bot.reply_to(message, message.text)
bot.polling()
bot.infinity_polling()

View File

@ -61,7 +61,7 @@ def default_query(inline_query):
def main_loop():
bot.polling(True)
bot.infinity_polling()
while 1:
time.sleep(3)

View File

@ -24,4 +24,4 @@ def callback_query(call):
def message_handler(message):
bot.send_message(message.chat.id, "Yes/no?", reply_markup=gen_markup())
bot.polling(none_stop=True)
bot.infinity_polling()

View File

@ -50,4 +50,4 @@ def start(message):
bot.send_message(message.chat.id, _('hello'))
bot.polling()
bot.infinity_polling()

View File

@ -58,4 +58,4 @@ def start(message):
bot.send_message(message.chat.id, bot.session['state'])
bot.polling()
bot.infinity_polling()

View File

@ -78,5 +78,4 @@ def got_payment(message):
parse_mode='Markdown')
bot.skip_pending = True
bot.polling(none_stop=True, interval=0)
bot.infinity_polling(skip_pending = True)

View File

@ -0,0 +1,21 @@
import telebot
api_token = 'token'
bot = telebot.TeleBot(api_token)
def start_executor(message):
bot.send_message(message.chat.id, 'Hello!')
bot.register_message_handler(start_executor, commands=['start']) # Start command executor
# See also
# bot.register_callback_query_handler(*args, **kwargs)
# bot.register_channel_post_handler(*args, **kwargs)
# bot.register_chat_member_handler(*args, **kwargs)
# bot.register_inline_handler(*args, **kwargs)
# bot.register_my_chat_member_handler(*args, **kwargs)
# bot.register_edited_message_handler(*args, **kwargs)
# And other functions..
bot.infinity_polling()

View File

@ -0,0 +1,13 @@
import telebot
bot = telebot.TeleBot("TOKEN")
@bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
bot.reply_to(message, "Howdy, how are you doing?")
@bot.message_handler(func=lambda message: True)
def echo_all(message):
bot.reply_to(message, message.text)
bot.infinity_polling(skip_pending=True)# Skip pending skips old updates

View File

@ -68,7 +68,7 @@ def process_sex_step(message):
if (sex == u'Male') or (sex == u'Female'):
user.sex = sex
else:
raise Exception()
raise Exception("Unknown sex")
bot.send_message(chat_id, 'Nice to meet you ' + user.name + '\n Age:' + str(user.age) + '\n Sex:' + user.sex)
except Exception as e:
bot.reply_to(message, 'oooops')
@ -83,4 +83,4 @@ bot.enable_save_next_step_handlers(delay=2)
# WARNING It will work only if enable_save_next_step_handlers was called!
bot.load_next_step_handlers()
bot.polling()
bot.infinity_polling()

View File

@ -81,4 +81,4 @@ def listener(messages):
bot.set_update_listener(listener)
bot.polling()
bot.infinity_polling()

View File

@ -1,6 +1,6 @@
# Webhook examples using pyTelegramBotAPI
There are 4 examples in this directory using different libraries:
There are 5 examples in this directory using different libraries:
* **Python (CPython):** *webhook_cpython_echo_bot.py*
* **Pros:**
@ -37,9 +37,18 @@ There are 4 examples in this directory using different libraries:
* **Pros:**
* It's a web application framework
* Python 3 compatible
* Asynchronous, excellent perfomance
* Asynchronous, excellent performance
* Utilizes new async/await syntax
* **Cons:**
* Requires Python 3.4.2+, don't work with Python 2
*Latest update of this document: 2017-01-30*
* **Twisted (20.3.0):** *webhook_twisted_echo_bot.py*
* **Pros:**
* Asynchronous event-driven networking engine
* Very high performance
* Built-in support for many internet protocols
* **Cons:**
* Twisted is low-level, which may be good or bad depending on use case
* Considerable learning curve - reading docs is a must.
*Latest update of this document: 2020-12-17*

View File

@ -21,7 +21,9 @@ def echo_message(message):
@server.route('/' + TOKEN, methods=['POST'])
def getMessage():
bot.process_new_updates([telebot.types.Update.de_json(request.stream.read().decode("utf-8"))])
json_string = request.get_data().decode('utf-8')
update = telebot.types.Update.de_json(json_string)
bot.process_new_updates([update])
return "!", 200

View File

@ -0,0 +1,79 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This is an example echo bot using webhook with Twisted network framework.
# Updates are received with Twisted web server and processed in reactor thread pool.
# Relevant docs:
# https://twistedmatrix.com/documents/current/core/howto/reactor-basics.html
# https://twistedmatrix.com/documents/current/web/howto/using-twistedweb.html
import logging
import telebot
import json
from twisted.internet import ssl, reactor
from twisted.web.resource import Resource, ErrorPage
from twisted.web.server import Site, Request
API_TOKEN = '<api_token>'
WEBHOOK_HOST = '<ip or hostname>'
WEBHOOK_PORT = 8443 # 443, 80, 88 or 8443 (port need to be 'open')
WEBHOOK_LISTEN = '0.0.0.0' # In some VPS you may need to put here the IP addr
WEBHOOK_SSL_CERT = './webhook_cert.pem' # Path to the ssl certificate
WEBHOOK_SSL_PRIV = './webhook_pkey.pem' # Path to the ssl private key
# Quick'n'dirty SSL certificate generation:
#
# openssl genrsa -out webhook_pkey.pem 2048
# openssl req -new -x509 -days 3650 -key webhook_pkey.pem -out webhook_cert.pem
#
# When asked for "Common Name (e.g. server FQDN or YOUR name)" you should reply
# with the same value in you put in WEBHOOK_HOST
WEBHOOK_URL_BASE = "https://{}:{}".format(WEBHOOK_HOST, WEBHOOK_PORT)
WEBHOOK_URL_PATH = "/{}/".format(API_TOKEN)
logger = telebot.logger
telebot.logger.setLevel(logging.INFO)
bot = telebot.TeleBot(API_TOKEN)
# Handle '/start' and '/help'
@bot.message_handler(commands=['help', 'start'])
def send_welcome(message):
bot.reply_to(message,
("Hi there, I am EchoBot.\n"
"I am here to echo your kind words back to you."))
# Handle all other messages
@bot.message_handler(func=lambda message: True, content_types=['text'])
def echo_message(message):
bot.reply_to(message, message.text)
# Remove webhook, it fails sometimes the set if there is a previous webhook
bot.remove_webhook()
# Set webhook
bot.set_webhook(url=WEBHOOK_URL_BASE + WEBHOOK_URL_PATH,
certificate=open(WEBHOOK_SSL_CERT, 'r'))
# Process webhook calls
class WebhookHandler(Resource):
isLeaf = True
def render_POST(self, request: Request):
request_body_dict = json.load(request.content)
update = telebot.types.Update.de_json(request_body_dict)
reactor.callInThread(lambda: bot.process_new_updates([update]))
return b''
root = ErrorPage(403, 'Forbidden', '')
root.putChild(API_TOKEN.encode(), WebhookHandler())
site = Site(root)
sslcontext = ssl.DefaultOpenSSLContextFactory(WEBHOOK_SSL_PRIV, WEBHOOK_SSL_CERT)
reactor.listenSSL(8443, site, sslcontext)
reactor.run()