Restored the non-threaded variant

This commit is contained in:
pieter 2015-10-02 00:00:54 +02:00
parent b801728924
commit 29a42a398b
2 changed files with 57 additions and 10 deletions

View File

@ -44,7 +44,7 @@ class TeleBot:
getUpdates
"""
def __init__(self, token):
def __init__(self, token, threaded=True):
"""
:param token: bot API token
:return: Telebot object.
@ -64,13 +64,16 @@ class TeleBot:
self.message_subscribers_next_step = {}
self.message_handlers = []
self.worker_pool = util.ThreadPool()
self.threaded = threaded
if self.threaded:
self.worker_pool = util.ThreadPool()
def set_webhook(self, url=None, certificate=None):
return apihelper.set_webhook(self.token, url, certificate)
def remove_webhook(self):
return self.set_webhook() # No params resets webhook
return self.set_webhook() # No params resets webhook
def get_updates(self, offset=None, limit=None, timeout=20):
"""
@ -110,9 +113,9 @@ class TeleBot:
def __notify_update(self, new_messages):
for listener in self.update_listener:
self.worker_pool.put(listener, new_messages)
self.__exec_task(listener, new_messages)
def polling(self, none_stop=False, interval=0, timeout=20):
def polling(self, none_stop=False, interval=0, timeout=3):
"""
This function creates a new Thread that calls an internal __retrieve_updates function.
This allows the bot to retrieve Updates automagically and notify listeners and message handlers accordingly.
@ -124,8 +127,14 @@ class TeleBot:
:param timeout: Timeout in seconds for long polling.
:return:
"""
logger.info('Started polling.')
if self.threaded:
self.__threaded_polling(none_stop, interval, timeout)
else:
self.__non_threaded_polling(none_stop, interval, timeout)
def __threaded_polling(self, none_stop=False, interval=0, timeout=3):
logger.info('Started polling.')
self.__stop_polling.clear()
error_interval = .25
polling_thread = util.WorkerThread(name="PollingThread")
@ -139,7 +148,9 @@ class TeleBot:
or_event.clear()
try:
polling_thread.put(self.__retrieve_updates, timeout)
or_event.wait()
while not or_event.is_set():
time.sleep(.05) # wait for polling thread finish, polling thread error or thread pool error
polling_thread.raise_exceptions()
self.worker_pool.raise_exceptions()
@ -156,9 +167,45 @@ class TeleBot:
logger.info("Waiting for {0} seconds until retry".format(error_interval))
time.sleep(error_interval)
error_interval *= 2
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received.")
self.__stop_polling.set()
polling_thread.stop()
break
logger.info('Stopped polling.')
def __non_threaded_polling(self, none_stop=False, interval=0, timeout=3):
logger.info('Started polling.')
self.__stop_polling.clear()
error_interval = .25
while not self.__stop_polling.wait(interval):
try:
self.__retrieve_updates(timeout)
error_interval = .25
except apihelper.ApiException as e:
logger.error(e)
if not none_stop:
self.__stop_polling.set()
logger.info("Exception occurred. Stopping.")
else:
logger.info("Waiting for {0} seconds until retry".format(error_interval))
time.sleep(error_interval)
error_interval *= 2
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received.")
self.__stop_polling.set()
break
logger.info('Stopped polling.')
def __exec_task(self, task, *args, **kwargs):
if self.threaded:
self.worker_pool.put(task, *args, **kwargs)
else:
task(*args, **kwargs)
def stop_polling(self):
self.__stop_polling.set()
@ -381,7 +428,7 @@ class TeleBot:
if chat_id in self.message_subscribers_next_step:
handlers = self.message_subscribers_next_step[chat_id]
for handler in handlers:
self.worker_pool.put(handler, message)
self.__exec_task(handler, message)
self.message_subscribers_next_step.pop(chat_id, None)
def message_handler(self, commands=None, regexp=None, func=None, content_types=['text']):
@ -452,7 +499,7 @@ class TeleBot:
for message in new_messages:
for message_handler in self.message_handlers:
if self._test_message_handler(message_handler, message):
self.worker_pool.put(message_handler['function'], message)
self.__exec_task(message_handler['function'], message)
break

View File

@ -43,7 +43,7 @@ class WorkerThread(threading.Thread):
self.exception_event.clear()
try:
task, args, kwargs = self.queue.get(block=True, timeout=.01)
task, args, kwargs = self.queue.get(block=True, timeout=.5)
logger.debug("Received task")
self.received_task_event.set()