1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

Merge pull request #20 from pevdh/develop

Some new features and minor improvements.
This commit is contained in:
FrankWang 2015-07-02 21:21:57 +08:00
commit 144fa64604
5 changed files with 187 additions and 107 deletions

View File

@ -165,4 +165,4 @@ def listener1(*messages):
- [x] sendLocation
- [x] sendChatAction
- [x] getUserProfilePhotos
- [ ] getUpdat(contact and chat message not yet)
- [ ] getUpdat(chat message not yet)

View File

@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import json
import time
import threading
import re
from telebot import apihelper, types
@ -41,18 +41,25 @@ class TeleBot:
self.last_update_id = 0
self.commands = []
def get_update(self):
result = apihelper.get_updates(self.token, offset=(self.last_update_id + 1))
updates = result['result']
"""
Retrieves any updates from the Telegram API.
Registered listeners and applicable message handlers will be notified when a new message arrives.
:raises ApiException when a call has failed.
"""
updates = apihelper.get_updates(self.token, offset=(self.last_update_id + 1))
new_messages = []
for update in updates:
if update['update_id'] > self.last_update_id:
self.last_update_id = update['update_id']
msg = types.Message.de_json(json.dumps(update['message']))
msg = types.Message.de_json(update['message'])
new_messages.append(msg)
if len(new_messages) > 0:
self.__notify_update(new_messages)
self._notify_command_handlers(new_messages)
def __notify_update(self, new_messages):
for listener in self.update_listener:
@ -93,7 +100,7 @@ class TeleBot:
def get_me(self):
result = apihelper.get_me(self.token)
return types.User.de_json(json.dumps(result['result']))
return types.User.de_json(result)
def get_user_profile_photos(self, user_id, offset=None, limit=None):
"""
@ -105,7 +112,7 @@ class TeleBot:
:return:
"""
result = apihelper.get_user_profile_photos(self.token, user_id, offset, limit)
return types.UserProfilePhotos.de_json(json.dumps(result['result']))
return types.UserProfilePhotos.de_json(result)
def send_message(self, chat_id, text, disable_web_page_preview=None, reply_to_message_id=None, reply_markup=None):
"""
@ -211,3 +218,55 @@ class TeleBot:
"""
return apihelper.send_chat_action(self.token, chat_id, action)
def message_handler(self, regexp=None, func=None, content_types=['text']):
"""
Message handler decorator.
This decorator can be used to decorate functions that must handle certain types of messages.
All message handlers are tested in the order they were added.
Example:
bot = TeleBot('TOKEN')
# Handles all messages which text matches regexp.
@bot.message_handler(regexp='someregexp')
def command_help(message):
bot.send_message(message.chat.id, 'Did someone call for help?')
# Handle all sent documents of type 'text/plain'.
@bot.message_handler(func=lambda message: message.document.mime_type == 'text/plain', content_types=['document'])
def command_handle_document(message):
bot.send_message(message.chat.id, 'Document received, sir!')
# Handle all other commands.
@bot.message_handler(func=lambda message: True, content_types=['audio', 'video', 'document', 'text', 'location', 'contact', 'sticker'])
def default_command(message):
bot.send_message(message.chat.id, "This is the default command handler.")
:param regexp: Optional regular expression.
:param func: Optional lambda function. The lambda receives the message to test as the first parameter. It must return True if the command should handle the message.
:param content_types: This commands' supported content types. Must be a list. Defaults to ['text'].
:return:
"""
def decorator(fn):
self.commands.append([fn, regexp, func, content_types])
return fn
return decorator
@staticmethod
def _test_command(command, message):
if message.content_type not in command[3]:
return False
if command[1] is not None and message.content_type == 'text' and re.search(command[1], message.text):
return True
if command[2] is not None:
return command[2](message)
return False
def _notify_command_handlers(self, new_messages):
for message in new_messages:
for command in self.commands:
if self._test_command(command, message):
t = threading.Thread(target=command[0], args=(message,))
t.start()
break

View File

@ -6,12 +6,32 @@ import telebot
from telebot import types
def _make_request(token, method_name, method='get', params=None, files=None):
"""
Makes a request to the Telegram API.
:param token: The bot's API token. (Created with @BotFather)
:param method_name: Name of the API method to be called. (E.g. 'getUpdates')
:param method: HTTP method to be used. Defaults to 'get'.
:param params: Optional parameters. Should be a dictionary with key-value pairs.
:param files: Optional files.
:return:
"""
request_url = telebot.API_URL + 'bot' + token + '/' + method_name
result = requests.request(method, request_url, params=params, files=files)
if result.status_code != 200:
raise ApiException(method_name, result)
try:
result_json = result.json()
if not result_json['ok']:
raise Exception()
except:
raise ApiException(method_name, result)
return result_json['result']
def get_me(token):
api_url = telebot.API_URL
method_url = r'getMe'
request_url = api_url + 'bot' + token + '/' + method_url
req = requests.get(request_url)
return check_result(method_url, req)
method_url = 'getMe'
return _make_request(token, method_url)
def send_message(token, chat_id, text, disable_web_page_preview=None, reply_to_message_id=None, reply_markup=None):
@ -25,56 +45,43 @@ def send_message(token, chat_id, text, disable_web_page_preview=None, reply_to_m
:param reply_markup:
:return:
"""
api_url = telebot.API_URL
method_url = r'sendMessage'
request_url = api_url + 'bot' + token + '/' + method_url
payload = {'chat_id': str(chat_id), 'text': text}
if disable_web_page_preview:
payload['disable_web_page_preview'] = disable_web_page_preview
if reply_to_message_id:
payload['reply_to_message_id'] = reply_to_message_id
if reply_markup:
payload['reply_markup'] = convert_markup(reply_markup)
req = requests.get(request_url, params=payload)
return check_result(method_url, req)
payload['reply_markup'] = _convert_markup(reply_markup)
return _make_request(token, method_url, params=payload)
def get_updates(token, offset=None):
api_url = telebot.API_URL
method_url = r'getUpdates'
if offset is not None:
request_url = api_url + 'bot' + token + '/' + method_url + '?offset=' + str(offset)
return _make_request(token, method_url, params={'offset': offset})
else:
request_url = api_url + 'bot' + token + '/' + method_url
req = requests.get(request_url)
return check_result(method_url, req)
return _make_request(token, method_url)
def get_user_profile_photos(token, user_id, offset=None, limit=None):
api_url = telebot.API_URL
method_url = r'getUserProfilePhotos'
request_url = api_url + 'bot' + token + '/' + method_url
payload = {'user_id': user_id}
if offset:
payload['offset'] = offset
if limit:
payload['limit'] = limit
req = requests.get(request_url, params=payload)
return check_result(method_url, req)
return _make_request(token, method_url, params=payload)
def forward_message(token, chat_id, from_chat_id, message_id):
api_url = telebot.API_URL
method_url = r'forwardMessage'
request_url = api_url + 'bot' + token + '/' + method_url
payload = {'chat_id': chat_id, 'from_chat_id': from_chat_id, 'message_id': message_id}
req = requests.get(request_url, params=payload)
return check_result(method_url, req)
return _make_request(token, method_url, params=payload)
def send_photo(token, chat_id, photo, caption=None, reply_to_message_id=None, reply_markup=None):
api_url = telebot.API_URL
method_url = r'sendPhoto'
request_url = api_url + 'bot' + token + '/' + method_url
payload = {'chat_id': chat_id}
files = {'photo': photo}
if caption:
@ -82,45 +89,35 @@ def send_photo(token, chat_id, photo, caption=None, reply_to_message_id=None, re
if reply_to_message_id:
payload['reply_to_message_id'] = reply_to_message_id
if reply_markup:
payload['reply_markup'] = convert_markup(reply_markup)
req = requests.post(request_url, params=payload, files=files)
return check_result(method_url, req)
payload['reply_markup'] = _convert_markup(reply_markup)
return _make_request(token, method_url, params=payload, files=files, method='post')
def send_location(token, chat_id, latitude, longitude, reply_to_message_id=None, reply_markup=None):
api_url = telebot.API_URL
method_url = r'sendLocation'
request_url = api_url + 'bot' + token + '/' + method_url
payload = {'chat_id': chat_id, 'latitude': latitude, 'longitude': longitude}
if reply_to_message_id:
payload['reply_to_message_id'] = reply_to_message_id
if reply_markup:
payload['reply_markup'] = convert_markup(reply_markup)
req = requests.get(request_url, params=payload)
return check_result(method_url, req)
payload['reply_markup'] = _convert_markup(reply_markup)
return _make_request(token, method_url, params=payload)
def send_chat_action(token, chat_id, action):
api_url = telebot.API_URL
method_url = r'sendChatAction'
request_url = api_url + 'bot' + token + '/' + method_url
payload = {'chat_id': chat_id, 'action': action}
req = requests.get(request_url, params=payload)
return check_result(method_url, req)
return _make_request(token, method_url, params=payload)
def send_data(token, chat_id, data, data_type, reply_to_message_id=None, reply_markup=None):
api_url = telebot.API_URL
method_url = get_method_by_type(data_type)
request_url = api_url + 'bot' + token + '/' + method_url
payload = {'chat_id': chat_id}
files = {data_type: data}
if reply_to_message_id:
payload['reply_to_message_id'] = reply_to_message_id
if reply_markup:
payload['reply_markup'] = convert_markup(reply_markup)
req = requests.post(request_url, params=payload, files=files)
return check_result(method_url, req)
payload['reply_markup'] = _convert_markup(reply_markup)
return _make_request(token, method_url, params=payload, files=files, method='post')
def get_method_by_type(data_type):
@ -134,25 +131,17 @@ def get_method_by_type(data_type):
return 'sendVideo'
def check_result(func_name, result):
if result.status_code != 200:
raise ApiError(func_name + r' error.', result)
try:
result_json = result.json()
if not result_json['ok']:
raise Exception(func_name, ' failed, result=' + result_json)
except:
raise ApiError(func_name + r' error.', result)
return result_json
def convert_markup(markup):
if isinstance(markup, types.Jsonable):
def _convert_markup(markup):
if isinstance(markup, types.JsonSerializable):
return markup.to_json()
return markup
class ApiError(Exception):
def __init__(self, message, result):
super(ApiError, self).__init__(message)
class ApiException(Exception):
"""
This class represents an Exception thrown when a call to the Telegram API fails.
"""
def __init__(self, function_name, result):
super(ApiException, self).__init__('{0} failed. Returned result: {1}'.format(function_name, result))
self.function_name = function_name
self.result = result

View File

@ -23,7 +23,7 @@ ForceReply
import json
class Jsonable:
class JsonSerializable:
"""
Subclasses of this class are guaranteed to be able to be converted to JSON format.
All subclasses of this class must override to_json.
@ -39,10 +39,41 @@ class Jsonable:
raise NotImplementedError
class User:
class JsonDeserializable:
"""
Subclasses of this class are guaranteed to be able to be created from a json-style dict or json formatted string.
All subclasses of this class must override de_json.
"""
@classmethod
def de_json(cls, json_type):
"""
Returns an instance of this class from the given json dict or string.
This function must be overridden by subclasses.
:return: an instance of this class created from the given json dict or string.
"""
raise NotImplementedError
@staticmethod
def check_json(json_type):
"""
Checks whether json_type is a dict or a string. If it is already a dict, it is returned as-is.
If it is not, it is converted to a dict by means of json.loads(json_type)
:param json_type:
:return:
"""
if type(json_type) == dict:
return json_type
elif type(json_type) == str:
return json.loads(json_type)
else:
raise ValueError("json_type should be a json dict or string.")
class User(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
id = obj['id']
first_name = obj['first_name']
last_name = None
@ -60,10 +91,10 @@ class User:
self.last_name = last_name
class GroupChat:
class GroupChat(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
id = obj['id']
title = obj['title']
return GroupChat(id, title)
@ -73,12 +104,12 @@ class GroupChat:
self.title = title
class Message:
class Message(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
message_id = obj['message_id']
from_user = User.de_json(json.dumps(obj['from']))
from_user = User.de_json(obj['from'])
chat = Message.parse_chat(obj['chat'])
date = obj['date']
content_type = None
@ -87,22 +118,22 @@ class Message:
opts['text'] = obj['text']
content_type = 'text'
if 'audio' in obj:
opts['audio'] = Audio.de_json(json.dumps(obj['audio']))
opts['audio'] = Audio.de_json(obj['audio'])
content_type = 'audio'
if 'document' in obj:
opts['document'] = Document.de_json(json.dumps(obj['document']))
opts['document'] = Document.de_json(obj['document'])
content_type = 'document'
if 'photo' in obj:
opts['photo'] = Message.parse_photo(obj['photo'])
content_type = 'photo'
if 'sticker' in obj:
opts['sticker'] = Sticker.de_json(json.dumps(obj['sticker']))
opts['sticker'] = Sticker.de_json(obj['sticker'])
content_type = 'sticker'
if 'video' in obj:
opts['video'] = Video.de_json(json.dumps(obj['video']))
opts['video'] = Video.de_json(obj['video'])
content_type = 'video'
if 'location' in obj:
opts['location'] = Location.de_json(json.dumps(obj['location']))
opts['location'] = Location.de_json(obj['location'])
content_type = 'location'
if 'contact' in obj:
opts['contact'] = Contact.de_json(json.dumps(obj['contact']))
@ -112,15 +143,15 @@ class Message:
@classmethod
def parse_chat(cls, chat):
if 'first_name' not in chat:
return GroupChat.de_json(json.dumps(chat))
return GroupChat.de_json(chat)
else:
return User.de_json(json.dumps(chat))
return User.de_json(chat)
@classmethod
def parse_photo(cls, photo_size_array):
ret = []
for ps in photo_size_array:
ret.append(PhotoSize.de_json(json.dumps(ps)))
ret.append(PhotoSize.de_json(ps))
return ret
def __init__(self, message_id, from_user, date, chat, content_type, options):
@ -133,10 +164,10 @@ class Message:
setattr(self, key, options[key])
class PhotoSize:
class PhotoSize(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
file_id = obj['file_id']
width = obj['width']
height = obj['height']
@ -152,10 +183,10 @@ class PhotoSize:
self.file_id = file_id
class Audio:
class Audio(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
file_id = obj['file_id']
duration = obj['duration']
mime_type = None
@ -173,14 +204,14 @@ class Audio:
self.file_size = file_size
class Document:
class Document(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
file_id = obj['file_id']
thumb = None
if 'file_id' in obj['thumb']:
thumb = PhotoSize.de_json(json.dumps(obj['thumb']))
thumb = PhotoSize.de_json(obj['thumb'])
file_name = None
mime_type = None
file_size = None
@ -200,14 +231,14 @@ class Document:
self.file_size = file_size
class Sticker:
class Sticker(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
file_id = obj['file_id']
width = obj['width']
height = obj['height']
thumb = PhotoSize.de_json(json.dumps(obj['thumb']))
thumb = PhotoSize.de_json(obj['thumb'])
file_size = None
if 'file_size' in obj:
file_size = obj['file_size']
@ -221,16 +252,15 @@ class Sticker:
self.file_size = file_size
class Video:
class Video(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
file_id = obj['file_id']
width = obj['width']
height = obj['height']
duration = obj['duration']
if 'file_id' in obj['thumb']:
thumb = PhotoSize.de_json(json.dumps(obj['thumb']))
thumb = PhotoSize.de_json(obj['thumb'])
caption = None
mime_type = None
file_size = None
@ -253,10 +283,10 @@ class Video:
self.caption = caption
class Contact:
class Contact(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
phone_number = obj['phone_number']
first_name = obj['first_name']
last_name = None
@ -273,10 +303,10 @@ class Contact:
self.user_id = user_id
class Location:
class Location(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
longitude = obj['longitude']
latitude = obj['latitude']
return Location(longitude, latitude)
@ -286,12 +316,12 @@ class Location:
self.latitude = latitude
class UserProfilePhotos:
class UserProfilePhotos(JsonDeserializable):
@classmethod
def de_json(cls, json_string):
obj = json.loads(json_string)
obj = cls.check_json(json_string)
total_count = obj['total_count']
photos = [[PhotoSize.de_json(json.dumps(y)) for y in x] for x in obj['photos']]
photos = [[PhotoSize.de_json(y) for y in x] for x in obj['photos']]
return UserProfilePhotos(total_count, photos)
def __init__(self, total_count, photos):
@ -299,7 +329,7 @@ class UserProfilePhotos:
self.photos = photos
class ForceReply(Jsonable):
class ForceReply(JsonSerializable):
def __init__(self, selective=None):
self.selective = selective
@ -310,7 +340,7 @@ class ForceReply(Jsonable):
return json.dumps(json_dict)
class ReplyKeyboardHide(Jsonable):
class ReplyKeyboardHide(JsonSerializable):
def __init__(self, selective=None):
self.selective = selective
@ -321,7 +351,7 @@ class ReplyKeyboardHide(Jsonable):
return json.dumps(json_dict)
class ReplyKeyboardMarkup(Jsonable):
class ReplyKeyboardMarkup(JsonSerializable):
def __init__(self, resize_keyboard=None, one_time_keyboard=None, selective=None, row_width=3):
self.resize_keyboard = resize_keyboard
self.one_time_keyboard = one_time_keyboard

View File

@ -82,14 +82,16 @@ def test_json_Message_Location():
assert msg.location.latitude == 26.090577
assert msg.content_type == 'location'
def test_json_UserProfilePhotos():
json_string = r'{"total_count":1,"photos":[[{"file_id":"AgADAgADqacxG6wpRwABvEB6fpeIcKS4HAIkAATZH_SpyZjzIwdVAAIC","file_size":6150,"width":160,"height":160},{"file_id":"AgADAgADqacxG6wpRwABvEB6fpeIcKS4HAIkAATOiTNi_YoJMghVAAIC","file_size":13363,"width":320,"height":320},{"file_id":"AgADAgADqacxG6wpRwABvEB6fpeIcKS4HAIkAAQW4DyFv0-lhglVAAIC","file_size":28347,"width":640,"height":640},{"file_id":"AgADAgADqacxG6wpRwABvEB6fpeIcKS4HAIkAAT50RvJCg0GQApVAAIC","file_size":33953,"width":800,"height":800}]]}'
upp = types.UserProfilePhotos.de_json(json_string)
assert upp.photos[0][0].width == 160
assert upp.photos[0][-1].height == 800
def test_json_contact():
json_string = r'{"phone_number":"00011111111","first_name":"dd","last_name":"ddl","user_id":8633}'
contact = types.Contact.de_json(json_string)
assert contact.first_name == 'dd'
assert contact.last_name == 'ddl'
assert contact.last_name == 'ddl'