1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

fix: some intendation

This commit is contained in:
uburuntu 2018-08-17 13:01:03 +03:00
parent 99466017c5
commit 36621bb22a
13 changed files with 61 additions and 38 deletions

View File

@ -34,30 +34,35 @@ import telebot
bot = telebot.TeleBot('TOKEN') bot = telebot.TeleBot('TOKEN')
def extract_unique_code(text): def extract_unique_code(text):
# Extracts the unique_code from the sent /start command. # Extracts the unique_code from the sent /start command.
return text.split()[1] if len(text.split()) > 1 else None return text.split()[1] if len(text.split()) > 1 else None
def in_storage(unique_code): def in_storage(unique_code):
# (pseudo-code) Should check if a unique code exists in storage # (pseudo-code) Should check if a unique code exists in storage
return True return True
def get_username_from_storage(unique_code): def get_username_from_storage(unique_code):
# (pseudo-code) Does a query to the storage, retrieving the associated username # (pseudo-code) Does a query to the storage, retrieving the associated username
# Should be replaced by a real database-lookup. # Should be replaced by a real database-lookup.
return "ABC" if in_storage(unique_code) else None return "ABC" if in_storage(unique_code) else None
def save_chat_id(chat_id, username): def save_chat_id(chat_id, username):
# (pseudo-code) Save the chat_id->username to storage # (pseudo-code) Save the chat_id->username to storage
# Should be replaced by a real database query. # Should be replaced by a real database query.
pass pass
@bot.message_handler(commands=['start']) @bot.message_handler(commands=['start'])
def send_welcome(message): def send_welcome(message):
unique_code = extract_unique_code(message.text) unique_code = extract_unique_code(message.text)
if unique_code: # if the '/start' command contains a unique_code if unique_code: # if the '/start' command contains a unique_code
username = get_username_from_storage(unique_code) username = get_username_from_storage(unique_code)
if username: # if the username exists in our database if username: # if the username exists in our database
save_chat_id(message.chat.id, username) save_chat_id(message.chat.id, username)
reply = "Hello {0}, how are you?".format(username) reply = "Hello {0}, how are you?".format(username)
else: else:
@ -66,4 +71,5 @@ def send_welcome(message):
reply = "Please visit me via a provided URL from the website." reply = "Please visit me via a provided URL from the website."
bot.reply_to(message, reply) bot.reply_to(message, reply)
bot.polling() bot.polling()

View File

@ -13,10 +13,10 @@ knownUsers = [] # todo: save these in a file,
userStep = {} # so they won't reset every time the bot restarts userStep = {} # so they won't reset every time the bot restarts
commands = { # command description used in the "help" command commands = { # command description used in the "help" command
'start': 'Get used to the bot', 'start' : 'Get used to the bot',
'help': 'Gives you information about the available commands', 'help' : 'Gives you information about the available commands',
'sendLongText': 'A test using the \'send_chat_action\' command', 'sendLongText': 'A test using the \'send_chat_action\' command',
'getImage': 'A test using multi-stage messages, custom keyboard, and media sending' 'getImage' : 'A test using multi-stage messages, custom keyboard, and media sending'
} }
imageSelect = types.ReplyKeyboardMarkup(one_time_keyboard=True) # create the image selection keyboard imageSelect = types.ReplyKeyboardMarkup(one_time_keyboard=True) # create the image selection keyboard
@ -129,4 +129,5 @@ def command_default(m):
# this is the standard reply to a normal message # this is the standard reply to a normal message
bot.send_message(m.chat.id, "I don't understand \"" + m.text + "\"\nMaybe try the help page at /help") bot.send_message(m.chat.id, "I don't understand \"" + m.text + "\"\nMaybe try the help page at /help")
bot.polling() bot.polling()

View File

@ -7,6 +7,7 @@ API_TOKEN = '<api_token>'
bot = telebot.TeleBot(API_TOKEN) bot = telebot.TeleBot(API_TOKEN)
# Handle '/start' and '/help' # Handle '/start' and '/help'
@bot.message_handler(commands=['help', 'start']) @bot.message_handler(commands=['help', 'start'])
def send_welcome(message): def send_welcome(message):
@ -21,4 +22,5 @@ I am here to echo your kind words back to you. Just say anything nice and I'll s
def echo_message(message): def echo_message(message):
bot.reply_to(message, message.text) bot.reply_to(message, message.text)
bot.polling() bot.polling()

View File

@ -83,5 +83,4 @@ bot.enable_save_next_step_handlers(delay=2)
# WARNING It will work only if enable_save_next_step_handlers was called! # WARNING It will work only if enable_save_next_step_handlers was called!
bot.load_next_step_handlers() bot.load_next_step_handlers()
bot.polling() bot.polling()

View File

@ -34,8 +34,10 @@ if "TELEBOT_BOT_TOKEN" not in os.environ or "GROUP_CHAT_ID" not in os.environ:
bot = telebot.AsyncTeleBot(os.environ["TELEBOT_BOT_TOKEN"]) bot = telebot.AsyncTeleBot(os.environ["TELEBOT_BOT_TOKEN"])
GROUP_CHAT_ID = int(os.environ["GROUP_CHAT_ID"]) GROUP_CHAT_ID = int(os.environ["GROUP_CHAT_ID"])
def is_api_group(chat_id): def is_api_group(chat_id):
return chat_id== GROUP_CHAT_ID return chat_id == GROUP_CHAT_ID
@bot.message_handler(func=lambda m: True, content_types=['new_chat_participant']) @bot.message_handler(func=lambda m: True, content_types=['new_chat_participant'])
def on_user_joins(message): def on_user_joins(message):
@ -51,6 +53,7 @@ def on_user_joins(message):
bot.reply_to(message, text_messages['welcome'].format(name=name)) bot.reply_to(message, text_messages['welcome'].format(name=name))
@bot.message_handler(commands=['info', 'help']) @bot.message_handler(commands=['info', 'help'])
def on_info(message): def on_info(message):
if not is_api_group(message.chat.id): if not is_api_group(message.chat.id):
@ -59,21 +62,23 @@ def on_info(message):
bot.reply_to(message, text_messages['info']) bot.reply_to(message, text_messages['info'])
@bot.message_handler(commands=["ping"]) @bot.message_handler(commands=["ping"])
def on_ping(message): def on_ping(message):
bot.reply_to(message, "Still alive and kicking!") bot.reply_to(message, "Still alive and kicking!")
@bot.message_handler(commands=['start']) @bot.message_handler(commands=['start'])
def on_start(message): def on_start(message):
if not is_api_group(message.chat.id): if not is_api_group(message.chat.id):
bot.reply_to(message, text_messages['wrong_chat']) bot.reply_to(message, text_messages['wrong_chat'])
return return
def listener(messages): def listener(messages):
for m in messages: for m in messages:
print(str(m)) print(str(m))
bot.set_update_listener(listener) bot.set_update_listener(listener)
bot.polling() bot.polling()

View File

@ -31,7 +31,6 @@ WEBHOOK_SSL_PRIV = './webhook_pkey.pem' # Path to the ssl private key
WEBHOOK_URL_BASE = "https://{}:{}".format(WEBHOOK_HOST, WEBHOOK_PORT) WEBHOOK_URL_BASE = "https://{}:{}".format(WEBHOOK_HOST, WEBHOOK_PORT)
WEBHOOK_URL_PATH = "/{}/".format(API_TOKEN) WEBHOOK_URL_PATH = "/{}/".format(API_TOKEN)
logger = telebot.logger logger = telebot.logger
telebot.logger.setLevel(logging.INFO) telebot.logger.setLevel(logging.INFO)
@ -50,6 +49,7 @@ async def handle(request):
else: else:
return web.Response(status=403) return web.Response(status=403)
app.router.add_post('/{token}/', handle) app.router.add_post('/{token}/', handle)
@ -71,7 +71,7 @@ def echo_message(message):
bot.remove_webhook() bot.remove_webhook()
# Set webhook # Set webhook
bot.set_webhook(url=WEBHOOK_URL_BASE+WEBHOOK_URL_PATH, bot.set_webhook(url=WEBHOOK_URL_BASE + WEBHOOK_URL_PATH,
certificate=open(WEBHOOK_SSL_CERT, 'r')) certificate=open(WEBHOOK_SSL_CERT, 'r'))
# Build ssl context # Build ssl context

View File

@ -30,7 +30,6 @@ WEBHOOK_SSL_PRIV = './webhook_pkey.pem' # Path to the ssl private key
WEBHOOK_URL_BASE = "https://%s:%s" % (WEBHOOK_HOST, WEBHOOK_PORT) WEBHOOK_URL_BASE = "https://%s:%s" % (WEBHOOK_HOST, WEBHOOK_PORT)
WEBHOOK_URL_PATH = "/%s/" % (API_TOKEN) WEBHOOK_URL_PATH = "/%s/" % (API_TOKEN)
logger = telebot.logger logger = telebot.logger
telebot.logger.setLevel(logging.INFO) telebot.logger.setLevel(logging.INFO)
@ -71,7 +70,7 @@ def echo_message(message):
bot.remove_webhook() bot.remove_webhook()
# Set webhook # Set webhook
bot.set_webhook(url=WEBHOOK_URL_BASE+WEBHOOK_URL_PATH, bot.set_webhook(url=WEBHOOK_URL_BASE + WEBHOOK_URL_PATH,
certificate=open(WEBHOOK_SSL_CERT, 'r')) certificate=open(WEBHOOK_SSL_CERT, 'r'))
# Disable CherryPy requests log # Disable CherryPy requests log
@ -81,9 +80,9 @@ for handler in tuple(access_log.handlers):
# Start cherrypy server # Start cherrypy server
cherrypy.config.update({ cherrypy.config.update({
'server.socket_host': WEBHOOK_LISTEN, 'server.socket_host' : WEBHOOK_LISTEN,
'server.socket_port': WEBHOOK_PORT, 'server.socket_port' : WEBHOOK_PORT,
'server.ssl_module': 'builtin', 'server.ssl_module' : 'builtin',
'server.ssl_certificate': WEBHOOK_SSL_CERT, 'server.ssl_certificate': WEBHOOK_SSL_CERT,
'server.ssl_private_key': WEBHOOK_SSL_PRIV 'server.ssl_private_key': WEBHOOK_SSL_PRIV
}) })

View File

@ -36,7 +36,6 @@ WEBHOOK_SSL_PRIV = './webhook_pkey.pem' # Path to the ssl private key
WEBHOOK_URL_BASE = "https://%s:%s" % (WEBHOOK_HOST, WEBHOOK_PORT) WEBHOOK_URL_BASE = "https://%s:%s" % (WEBHOOK_HOST, WEBHOOK_PORT)
WEBHOOK_URL_PATH = "/%s/" % (API_TOKEN) WEBHOOK_URL_PATH = "/%s/" % (API_TOKEN)
logger = telebot.logger logger = telebot.logger
telebot.logger.setLevel(logging.INFO) telebot.logger.setLevel(logging.INFO)
@ -90,12 +89,12 @@ def echo_message(message):
bot.remove_webhook() bot.remove_webhook()
# Set webhook # Set webhook
bot.set_webhook(url=WEBHOOK_URL_BASE+WEBHOOK_URL_PATH, bot.set_webhook(url=WEBHOOK_URL_BASE + WEBHOOK_URL_PATH,
certificate=open(WEBHOOK_SSL_CERT, 'r')) certificate=open(WEBHOOK_SSL_CERT, 'r'))
# Start server # Start server
httpd = HTTPServer((WEBHOOK_LISTEN, WEBHOOK_PORT), httpd = HTTPServer((WEBHOOK_LISTEN, WEBHOOK_PORT),
WebhookHandler) WebhookHandler)
httpd.socket = ssl.wrap_socket(httpd.socket, httpd.socket = ssl.wrap_socket(httpd.socket,
certfile=WEBHOOK_SSL_CERT, certfile=WEBHOOK_SSL_CERT,

View File

@ -31,7 +31,6 @@ WEBHOOK_SSL_PRIV = './webhook_pkey.pem' # Path to the ssl private key
WEBHOOK_URL_BASE = "https://%s:%s" % (WEBHOOK_HOST, WEBHOOK_PORT) WEBHOOK_URL_BASE = "https://%s:%s" % (WEBHOOK_HOST, WEBHOOK_PORT)
WEBHOOK_URL_PATH = "/%s/" % (API_TOKEN) WEBHOOK_URL_PATH = "/%s/" % (API_TOKEN)
logger = telebot.logger logger = telebot.logger
telebot.logger.setLevel(logging.INFO) telebot.logger.setLevel(logging.INFO)
@ -78,7 +77,7 @@ bot.remove_webhook()
time.sleep(0.1) time.sleep(0.1)
# Set webhook # Set webhook
bot.set_webhook(url=WEBHOOK_URL_BASE+WEBHOOK_URL_PATH, bot.set_webhook(url=WEBHOOK_URL_BASE + WEBHOOK_URL_PATH,
certificate=open(WEBHOOK_SSL_CERT, 'r')) certificate=open(WEBHOOK_SSL_CERT, 'r'))
# Start flask server # Start flask server

View File

@ -31,15 +31,18 @@ WEBHOOK_URL_BASE = "https://{0}:{1}/{2}".format(WEBHOOK_HOST, str(WEBHOOK_PORT),
bot = telebot.TeleBot(API_TOKEN) bot = telebot.TeleBot(API_TOKEN)
class Root(tornado.web.RequestHandler): class Root(tornado.web.RequestHandler):
def get(self): def get(self):
self.write("Hi! This is webhook example!") self.write("Hi! This is webhook example!")
self.finish() self.finish()
class webhook_serv(tornado.web.RequestHandler): class webhook_serv(tornado.web.RequestHandler):
def get(self): def get(self):
self.write("What are you doing here?") self.write("What are you doing here?")
self.finish() self.finish()
def post(self): def post(self):
if "Content-Length" in self.request.headers and \ if "Content-Length" in self.request.headers and \
"Content-Type" in self.request.headers and \ "Content-Type" in self.request.headers and \
@ -54,21 +57,26 @@ class webhook_serv(tornado.web.RequestHandler):
else: else:
self.write("What are you doing here?") self.write("What are you doing here?")
self.finish() self.finish()
tornado.options.define("port", default=WEBHOOK_PORT, help="run on the given port", type=int) tornado.options.define("port", default=WEBHOOK_PORT, help="run on the given port", type=int)
is_closing = False is_closing = False
def signal_handler(signum, frame): def signal_handler(signum, frame):
global is_closing global is_closing
print("Exiting...") print("Exiting...")
is_closing = True is_closing = True
def try_exit(): def try_exit():
global is_closing global is_closing
if is_closing: if is_closing:
# clean up here # clean up here
tornado.ioloop.IOLoop.instance().stop() tornado.ioloop.IOLoop.instance().stop()
print("Exit success!") print("Exit success!")
# Handle '/start' and '/help' # Handle '/start' and '/help'
@bot.message_handler(commands=['help', 'start']) @bot.message_handler(commands=['help', 'start'])
def send_welcome(message): def send_welcome(message):
@ -76,6 +84,7 @@ def send_welcome(message):
("Hi there, I am EchoBot.\n" ("Hi there, I am EchoBot.\n"
"I am here to echo your kind words back to you.")) "I am here to echo your kind words back to you."))
bot.remove_webhook() bot.remove_webhook()
bot.set_webhook(url=WEBHOOK_URL_BASE, bot.set_webhook(url=WEBHOOK_URL_BASE,
certificate=open(WEBHOOK_CERT, 'r')) certificate=open(WEBHOOK_CERT, 'r'))
@ -88,9 +97,9 @@ application = tornado.web.Application([
]) ])
http_server = tornado.httpserver.HTTPServer(application, ssl_options={ http_server = tornado.httpserver.HTTPServer(application, ssl_options={
"certfile": WEBHOOK_CERT, "certfile": WEBHOOK_CERT,
"keyfile": WEBHOOK_PKEY, "keyfile" : WEBHOOK_PKEY,
}) })
http_server.listen(tornado.options.options.port) http_server.listen(tornado.options.options.port)
tornado.ioloop.PeriodicCallback(try_exit, 100).start() tornado.ioloop.PeriodicCallback(try_exit, 100).start()
tornado.ioloop.IOLoop.instance().start() tornado.ioloop.IOLoop.instance().start()

View File

@ -33,6 +33,7 @@ class Handler:
""" """
Class for (next step|reply) handlers Class for (next step|reply) handlers
""" """
def __init__(self, callback, *args, **kwargs): def __init__(self, callback, *args, **kwargs):
self.callback = callback self.callback = callback
self.args = args self.args = args
@ -46,6 +47,7 @@ class Saver:
""" """
Class for saving (next step|reply) handlers Class for saving (next step|reply) handlers
""" """
def __init__(self, handlers, filename, delay): def __init__(self, handlers, filename, delay):
self.handlers = handlers self.handlers = handlers
self.filename = filename self.filename = filename
@ -1303,12 +1305,11 @@ class TeleBot:
if not was_poped: if not was_poped:
i += 1 i += 1
@staticmethod @staticmethod
def _build_handler_dict(handler, **filters): def _build_handler_dict(handler, **filters):
return { return {
'function': handler, 'function': handler,
'filters': filters 'filters' : filters
} }
def message_handler(self, commands=None, regexp=None, func=None, content_types=['text'], **kwargs): def message_handler(self, commands=None, regexp=None, func=None, content_types=['text'], **kwargs):
@ -1518,8 +1519,6 @@ class AsyncTeleBot(TeleBot):
def load_reply_handlers(self, filename="./.handler-saves/reply.save", del_file_after_loading=True): def load_reply_handlers(self, filename="./.handler-saves/reply.save", del_file_after_loading=True):
return TeleBot.load_reply_handlers(self, filename, del_file_after_loading) return TeleBot.load_reply_handlers(self, filename, del_file_after_loading)
@util.async_dec()
@util.async_dec() @util.async_dec()
def get_me(self): def get_me(self):
return TeleBot.get_me(self) return TeleBot.get_me(self)

View File

@ -457,11 +457,11 @@ class Message(JsonDeserializable):
if not entities: if not entities:
return text return text
_subs = { _subs = {
"bold": "<b>{text}</b>", "bold" : "<b>{text}</b>",
"italic": "<i>{text}</i>", "italic" : "<i>{text}</i>",
"pre": "<pre>{text}</pre>", "pre" : "<pre>{text}</pre>",
"code": "<code>{text}</code>", "code" : "<code>{text}</code>",
"url": "<a href=\"{url}\">{text}</a>", "url" : "<a href=\"{url}\">{text}</a>",
"text_link": "<a href=\"{url}\">{text}</a>" "text_link": "<a href=\"{url}\">{text}</a>"
} }
if hasattr(self, "custom_subs"): if hasattr(self, "custom_subs"):
@ -469,6 +469,7 @@ class Message(JsonDeserializable):
_subs[type] = self.custom_subs[type] _subs[type] = self.custom_subs[type]
utf16_text = text.encode("utf-16-le") utf16_text = text.encode("utf-16-le")
html_text = "" html_text = ""
def func(text, type=None, url=None, user=None): def func(text, type=None, url=None, user=None):
text = text.decode("utf-16-le") text = text.decode("utf-16-le")
if type == "text_mention": if type == "text_mention":
@ -501,6 +502,7 @@ class Message(JsonDeserializable):
def html_caption(self): def html_caption(self):
return self.__html_text(self.caption, self.caption_entities) return self.__html_text(self.caption, self.caption_entities)
class MessageEntity(JsonDeserializable): class MessageEntity(JsonDeserializable):
@classmethod @classmethod
def de_json(cls, json_string): def de_json(cls, json_string):
@ -1069,7 +1071,7 @@ class InputVenueMessageContent(Dictionaryable):
def to_dic(self): def to_dic(self):
json_dic = {'latitude': self.latitude, 'longitude': self.longitude, 'title': self.title, json_dic = {'latitude': self.latitude, 'longitude': self.longitude, 'title': self.title,
'address': self.address} 'address' : self.address}
if self.foursquare_id: if self.foursquare_id:
json_dic['foursquare_id'] = self.foursquare_id json_dic['foursquare_id'] = self.foursquare_id
return json_dic return json_dic

View File

@ -113,6 +113,7 @@ def test_json_voice():
assert voice.duration == 0 assert voice.duration == 0
assert voice.file_size == 10481 assert voice.file_size == 10481
def test_json_update(): def test_json_update():
json_string = r'{"update_id":938203,"message":{"message_id":241,"from":{"is_bot":true,"id":9734,"first_name":"Fk","last_name":"Wg","username":"nir"},"chat":{"id":1111,"first_name":"Fk","type":"private","last_name":"Wg","username":"oir"},"date":1441447009,"text":"HIHI"}}' json_string = r'{"update_id":938203,"message":{"message_id":241,"from":{"is_bot":true,"id":9734,"first_name":"Fk","last_name":"Wg","username":"nir"},"chat":{"id":1111,"first_name":"Fk","type":"private","last_name":"Wg","username":"oir"},"date":1441447009,"text":"HIHI"}}'
update = types.Update.de_json(json_string) update = types.Update.de_json(json_string)
@ -120,6 +121,7 @@ def test_json_update():
assert update.message.message_id == 241 assert update.message.message_id == 241
assert update.message.from_user.id == 9734 assert update.message.from_user.id == 9734
def test_json_chat(): def test_json_chat():
json_string = r'{"id": -111111,"title": "Test Title","type": "group"}' json_string = r'{"id": -111111,"title": "Test Title","type": "group"}'
chat = types.Chat.de_json(json_string) chat = types.Chat.de_json(json_string)
@ -127,6 +129,7 @@ def test_json_chat():
assert chat.type == 'group' assert chat.type == 'group'
assert chat.title == 'Test Title' assert chat.title == 'Test Title'
def test_InlineQueryResultCachedPhoto(): def test_InlineQueryResultCachedPhoto():
iq = types.InlineQueryResultCachedPhoto('aaa', 'Fileid') iq = types.InlineQueryResultCachedPhoto('aaa', 'Fileid')
json_str = iq.to_json() json_str = iq.to_json()
@ -143,6 +146,7 @@ def test_InlineQueryResultCachedPhoto_with_title():
assert 'Title' in json_str assert 'Title' in json_str
assert 'caption' not in json_str assert 'caption' not in json_str
def test_InlineQueryResultCachedPhoto_with_markup(): def test_InlineQueryResultCachedPhoto_with_markup():
markup = types.InlineKeyboardMarkup() markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton("Google", url="http://www.google.com")) markup.add(types.InlineKeyboardButton("Google", url="http://www.google.com"))
@ -154,4 +158,3 @@ def test_InlineQueryResultCachedPhoto_with_markup():
assert 'Title' in json_str assert 'Title' in json_str
assert 'caption' not in json_str assert 'caption' not in json_str
assert 'reply_markup' in json_str assert 'reply_markup' in json_str