diff --git a/davtelepot/custombot.py b/davtelepot/custombot.py
index 5720d1a..bc8133d 100644
--- a/davtelepot/custombot.py
+++ b/davtelepot/custombot.py
@@ -1,9 +1,10 @@
-"""WARNING: this is only a legacy module.
+"""WARNING: this is only a legacy module, written for backward compatibility.
For newer versions use `bot.py`.
-This module relies on third party `telepot` library by Nick Lee (@Nickoala).
+This module used to rely on third party `telepot` library by Nick Lee
+ (@Nickoala).
The `telepot` repository was archived in may 2019 and will no longer be listed
- in requirements. For legacy code, install telepot manually.
+ in requirements. To run legacy code, install telepot manually.
`pip install telepot`
Subclass of third party telepot.aio.Bot, providing the following features.
@@ -24,84 +25,26 @@ Check requirements.txt for third party dependencies.
# Standard library modules
import asyncio
+from collections import OrderedDict
import datetime
-import io
+import inspect
import logging
import os
# Third party modules
-import dataset
-import telepot
-import telepot.aio
+import davtelepot.bot
# Project modules
from davtelepot.utilities import (
- get_secure_key, Gettable, escape_html_chars, extract,
- line_drawing_unordered_list, make_lines_of_buttons, markdown_check, MyOD,
- pick_most_similar_from_list, remove_html_tags, sleep_until
+ get_secure_key, extract, sleep_until
)
-def split_text_gracefully(text, limit, parse_mode):
- r"""Split text if it hits telegram limits for text messages.
+class Bot(davtelepot.bot.Bot):
+ """Legacy adapter for backward compatibility.
- Split at `\n` if possible.
- Add a `[...]` at the end and beginning of split messages,
- with proper code markdown.
- """
- text = text.split("\n")[::-1]
- result = []
- while len(text) > 0:
- temp = []
- while len(text) > 0 and len("\n".join(temp + [text[-1]])) < limit:
- temp.append(text.pop())
- if len(temp) == 0:
- temp.append(text[-1][:limit])
- text[-1] = text[-1][limit:]
- result.append("\n".join(temp))
- if len(result) > 1:
- for i in range(1, len(result)):
- result[i] = "{tag[0]}[...]{tag[1]}\n{text}".format(
- tag=(
- ('`', '`') if parse_mode == 'Markdown'
- else ('', '
') if parse_mode.lower() == 'html'
- else ('', '')
- ),
- text=result[i]
- )
- result[i-1] = "{text}\n{tag[0]}[...]{tag[1]}".format(
- tag=(
- ('`', '`') if parse_mode == 'Markdown'
- else ('', '
') if parse_mode.lower() == 'html'
- else ('', '')
- ),
- text=result[i-1]
- )
- return result
-
-
-def make_inline_query_answer(answer):
- """Return an article-type answer to inline query.
-
- Takes either a string or a dictionary and returns a list.
- """
- if type(answer) is str:
- answer = dict(
- type='article',
- id=0,
- title=remove_html_tags(answer),
- input_message_content=dict(
- message_text=answer,
- parse_mode='HTML'
- )
- )
- if type(answer) is dict:
- answer = [answer]
- return answer
-
-
-class Bot(telepot.aio.Bot, Gettable):
- """telepot.aio.Bot (async Telegram bot framework) convenient subclass.
+ Old description:
+ telepot.aio.Bot (async Telegram bot framework) convenient subclass.
=== General functioning ===
- While Bot.run() coroutine is executed, HTTP get requests are made
@@ -127,272 +70,87 @@ class Bot(telepot.aio.Bot, Gettable):
- All uncaught events are ignored.
"""
- instances = {}
- stop = False
- # Cooldown time between sent messages, to prevent hitting
- # Telegram flood limits
- # Current limits: 30 total messages sent per second,
- # 1 message per second per chat, 20 messages per minute per group
- COOLDOWN_TIME_ABSOLUTE = datetime.timedelta(seconds=1/30)
- COOLDOWN_TIME_PER_CHAT = datetime.timedelta(seconds=1)
- MAX_GROUP_MESSAGES_PER_MINUTE = 20
- # Max length of text field for a Telegram message (UTF-8 text)
- TELEGRAM_MESSAGES_MAX_LEN = 4096
- _path = '.'
- _unauthorized_message = None
- _unknown_command_message = None
- _maintenance_message = None
- _default_inline_query_answer = [
- dict(
- type='article',
- id=0,
- title="I cannot answer this query, sorry",
- input_message_content=dict(
- message_text="I'm sorry "
- "but I could not find an answer for your query."
- )
- )
- ]
-
def __init__(self, token, db_name=None):
"""Instantiate Bot instance, given a token and a db name."""
- super().__init__(token)
- self.routing_table = {
- 'chat': self.on_chat_message,
- 'inline_query': self.on_inline_query,
- 'chosen_inline_result': self.on_chosen_inline_result,
- 'callback_query': self.on_callback_query
- }
- self.chat_message_handlers = {
- 'text': self.handle_text_message,
- 'pinned_message': self.handle_pinned_message,
- 'photo': self.handle_photo_message,
- 'location': self.handle_location
- }
- if db_name:
- self._db_url = 'sqlite:///{name}{ext}'.format(
- name=db_name,
- ext='.db' if not db_name.endswith('.db') else ''
- )
- self._database = dataset.connect(self.db_url)
- else:
- self._db_url = None
- self._database = None
- self._unauthorized_message = None
- self.authorization_function = lambda update, authorization_level: True
- self.get_chat_id = lambda update: (
- update['message']['chat']['id']
- if 'message' in update
- else update['chat']['id']
- )
- self.commands = dict()
- self.callback_handlers = dict()
- self.inline_query_handlers = MyOD()
- self._default_inline_query_answer = None
- self.chosen_inline_result_handlers = dict()
- self.aliases = MyOD()
- self.parsers = MyOD()
- self.custom_parsers = dict()
+ davtelepot.bot.Bot.__init__(self, token=token, database_url=db_name)
+ self.message_handlers['pinned_message'] = self.handle_pinned_message
+ self.message_handlers['photo'] = self.handle_photo_message
+ self.message_handlers['location'] = self.handle_location
self.custom_photo_parsers = dict()
self.custom_location_parsers = dict()
- self.bot_name = None
- self.default_reply_keyboard_elements = []
- self._default_keyboard = dict()
- self.run_before_loop = []
- self.run_after_loop = []
self.to_be_obscured = []
self.to_be_destroyed = []
- self.last_sending_time = dict(
- absolute=(
- datetime.datetime.now()
- - self.__class__.COOLDOWN_TIME_ABSOLUTE
- )
- )
- self._maintenance = False
- self._maintenance_message = None
self.chat_actions = dict(
- pinned=MyOD()
+ pinned=OrderedDict()
)
self.messages = dict()
- @property
- def name(self):
- """Bot name."""
- return self.bot_name
-
- @property
- def path(self):
- """custombot.py file path."""
- return self.__class__._path
-
- @property
- def db_url(self):
- """Return complete path to database."""
- return self._db_url
-
- @property
- def db(self):
- """Return the dataset.Database instance related to `self`.
-
- To start a transaction with bot's database, use a with statement:
- ```python3
- with bot.db as db:
- db['your_table'].insert(
- dict(
- name='John',
- age=45
- )
- )
- ```
- """
- return self._database
-
- @property
- def default_keyboard(self):
- """Get the default keyboard.
-
- It is sent when reply_markup is left blank and chat is private.
- """
- return self._default_keyboard
-
- @property
- def default_inline_query_answer(self):
- """Answer to be returned if inline query returned None."""
- if self._default_inline_query_answer:
- return self._default_inline_query_answer
- return self.__class__._default_inline_query_answer
-
@property
def unauthorized_message(self):
"""Return this if user is unauthorized to make a request.
- If instance message is not set, class message is returned.
+ This property is deprecated: use `authorization_denied_message`
+ instead.
"""
- if self._unauthorized_message:
- return self._unauthorized_message
- return self.__class__._unauthorized_message
-
- @property
- def unknown_command_message(self):
- """Message to be returned if user sends an unknown command.
-
- If instance message is not set, class message is returned.
- """
- if self._unknown_command_message:
- return self._unknown_command_message
- return self.__class__._unknown_command_message
+ return self.authorization_denied_message
@property
def maintenance(self):
"""Check whether bot is under maintenance.
- While under maintenance, bot will reply with
- `self.maintenance_message` to any request, with few exceptions.
+ This property is deprecated: use `under_maintenance` instead.
"""
- return self._maintenance
-
- @property
- def maintenance_message(self):
- """Message to be returned if bot is under maintenance.
-
- If instance message is not set, class message is returned.
- """
- if self._maintenance_message:
- return self._maintenance_message
- if self.__class__.maintenance_message:
- return self.__class__._maintenance_message
- return "Bot is currently under maintenance! Retry later please."
-
- @classmethod
- def set_class_path(csl, path):
- """Set class path, where files will be looked for.
-
- For example, if send_photo receives `photo='mypic.png'`,
- it will parse it as `'{path}/mypic.png'.format(path=self.path)`
- """
- csl._path = path
+ return self.under_maintenance
@classmethod
def set_class_unauthorized_message(csl, unauthorized_message):
"""Set class unauthorized message.
- It will be returned if user is unauthorized to make a request.
+ This method is deprecated: use `set_class_authorization_denied_message`
+ instead.
"""
- csl._unauthorized_message = unauthorized_message
-
- @classmethod
- def set_class_unknown_command_message(cls, unknown_command_message):
- """Set class unknown command message.
-
- It will be returned if user sends an unknown command in private chat.
- """
- cls._unknown_command_message = unknown_command_message
-
- @classmethod
- def set_class_maintenance_message(cls, maintenance_message):
- """Set class maintenance message.
-
- It will be returned if bot is under maintenance.
- """
- cls._maintenance_message = maintenance_message
-
- @classmethod
- def set_class_default_inline_query_answer(cls,
- default_inline_query_answer):
- """Set class default inline query answer.
-
- It will be returned if an inline query returned no answer.
- """
- cls._default_inline_query_answer = default_inline_query_answer
+ return csl.set_class_authorization_denied_message(unauthorized_message)
def set_unauthorized_message(self, unauthorized_message):
"""Set instance unauthorized message.
- If instance message is None, default class message is used.
+ This method is deprecated: use `set_authorization_denied_message`
+ instead.
"""
- self._unauthorized_message = unauthorized_message
+ return self.set_authorization_denied_message(unauthorized_message)
- def set_unknown_command_message(self, unknown_command_message):
- """Set instance unknown command message.
+ def set_authorization_function(self, authorization_function):
+ """Set a custom authorization_function.
- It will be returned if user sends an unknown command in private chat.
- If instance message is None, default class message is used.
+ It should evaluate True if user is authorized to perform a specific
+ action and False otherwise.
+ It should take update and role and return a Boolean.
+ Default authorization_function always evaluates True.
"""
- self._unknown_command_message = unknown_command_message
-
- def set_maintenance_message(self, maintenance_message):
- """Set instance maintenance message.
-
- It will be returned if bot is under maintenance.
- If instance message is None, default class message is used.
- """
- self._maintenance_message = maintenance_message
-
- def set_default_inline_query_answer(self, default_inline_query_answer):
- """Set a custom default_inline_query_answer.
-
- It will be returned when no answer is found for an inline query.
- If instance answer is None, default class answer is used.
- """
- if type(default_inline_query_answer) in (str, dict):
- default_inline_query_answer = make_inline_query_answer(
- default_inline_query_answer
+ def _authorization_function(update, authorization_level,
+ user_record=None):
+ privileges = authorization_level # noqa: W0612, this variable
+ # is used by locals()
+ return authorization_function(
+ **{
+ name: argument
+ for name, argument in locals().items()
+ if name in inspect.signature(
+ authorization_function
+ ).parameters
+ }
)
- if type(default_inline_query_answer) is not list:
- return 1
- self._default_inline_query_answer = default_inline_query_answer
- return 0
+ self.authorization_function = _authorization_function
def set_maintenance(self, maintenance_message):
- """Put the bot under maintenance or ends it.
+ """Put the bot under maintenance or end it.
- While in maintenance, bot will reply to users with maintenance_message.
- Bot will accept /coma, /stop and /restart commands from admins.
+ This method is deprecated: use `change_maintenance_status` instead.
"""
- self._maintenance = not self.maintenance
- if maintenance_message:
- self.set_maintenance_message(maintenance_message)
- if self.maintenance:
+ bot_in_maintenance = self.change_maintenance_status(
+ maintenance_message=maintenance_message
+ )
+ if bot_in_maintenance:
return (
"Bot has just been put under maintenance!\n\n"
"Until further notice, it will reply to users "
@@ -402,93 +160,12 @@ class Bot(telepot.aio.Bot, Gettable):
)
return "Maintenance ended!"
- def set_authorization_function(self, authorization_function):
- """Set a custom authorization_function.
-
- It should evaluate True if user is authorized to perform
- a specific action and False otherwise.
- It should take update and role and return a Boolean.
- Default authorization_function always evaluates True.
- """
- self.authorization_function = authorization_function
-
def set_get_chat_id_function(self, get_chat_id_function):
"""Set a custom get_chat_id function.
- It should take and update and return the chat in which
- a reply should be sent.
- For instance, a bot could reply in private to group messages
- as a default behaviour.
- Default chat_id returned is current chat id.
+ This method is deprecated: use `set_chat_id_getter` instead.
"""
- self.get_chat_id = get_chat_id_function
-
- async def avoid_flooding(self, chat_id):
- """asyncio-sleep until COOLDOWN_TIME has passed.
-
- To prevent hitting Telegram flood limits, send_message and
- send_photo await this function.
- Consider cooldown time per chat and absolute.
- """
- if type(chat_id) is int and chat_id > 0:
- while (
- datetime.datetime.now() < (
- self.last_sending_time['absolute']
- + self.__class__.COOLDOWN_TIME_ABSOLUTE
- )
- ) or (
- chat_id in self.last_sending_time
- and (
- datetime.datetime.now() < (
- self.last_sending_time[chat_id]
- + self.__class__.COOLDOWN_TIME_PER_CHAT
- )
- )
- ):
- await asyncio.sleep(
- self.__class__.COOLDOWN_TIME_ABSOLUTE.seconds
- )
- self.last_sending_time[chat_id] = datetime.datetime.now()
- else:
- while (
- datetime.datetime.now() < (
- self.last_sending_time['absolute']
- + self.__class__.COOLDOWN_TIME_ABSOLUTE
- )
- ) or (
- chat_id in self.last_sending_time
- and len(
- [
- sending_datetime
- for sending_datetime in self.last_sending_time[chat_id]
- if sending_datetime >= (
- datetime.datetime.now()
- - datetime.timedelta(minutes=1)
- )
- ]
- ) >= self.__class__.MAX_GROUP_MESSAGES_PER_MINUTE
- ) or (
- chat_id in self.last_sending_time
- and len(self.last_sending_time[chat_id]) > 0
- and datetime.datetime.now() < (
- self.last_sending_time[chat_id][-1]
- + self.__class__.COOLDOWN_TIME_PER_CHAT
- )
- ):
- await asyncio.sleep(0.5)
- if chat_id not in self.last_sending_time:
- self.last_sending_time[chat_id] = []
- self.last_sending_time[chat_id].append(datetime.datetime.now())
- self.last_sending_time[chat_id] = [
- sending_datetime
- for sending_datetime in self.last_sending_time[chat_id]
- if sending_datetime >= (
- datetime.datetime.now()
- - datetime.timedelta(minutes=1)
- )
- ]
- self.last_sending_time['absolute'] = datetime.datetime.now()
- return
+ return self.set_chat_id_getter(get_chat_id_function)
def get_message(self, *fields, update=None, user_record=None,
language=None, **format_kwargs):
@@ -553,334 +230,29 @@ class Bot(telepot.aio.Bot, Gettable):
**format_kwargs
)
- async def on_inline_query(self, update):
- """Schedule handling of received inline queries.
+ async def on_chat_message(self, update, user_record=None):
+ """Handle text message.
- Notice that handling is only scheduled, not awaited.
- This means that all Bot instances may now handle other requests
- before this one is completed.
+ This method is deprecated: use `text_message_handler` instead.
"""
- asyncio.ensure_future(self.handle_inline_query(update))
- return
-
- async def on_chosen_inline_result(self, update):
- """Schedule handling of received chosen inline result events.
-
- Notice that handling is only scheduled, not awaited.
- This means that all Bot instances may now handle other requests
- before this one is completed.
- """
- asyncio.ensure_future(self.handle_chosen_inline_result(update))
- return
-
- async def on_callback_query(self, update):
- """Schedule handling of received callback queries.
-
- A callback query is sent when users press inline keyboard buttons.
- Bad clients may send malformed or deceiving callback queries:
- never use secret keys in buttons and always check request validity!
- Notice that handling is only scheduled, not awaited.
- This means that all Bot instances may now handle other requests
- before this one is completed.
- """
- # Reject malformed updates lacking of data field
- if 'data' not in update:
- return
- asyncio.ensure_future(self.handle_callback_query(update))
- return
-
- async def on_chat_message(self, update):
- """Schedule handling of received chat message.
-
- Notice that handling is only scheduled, not awaited.
- According to update type, the corresponding handler is
- scheduled (see self.chat_message_handlers).
- This means that all Bot instances may now handle other
- requests before this one is completed.
- """
- answer = None
- content_type, chat_type, chat_id = telepot.glance(
- update,
- flavor='chat',
- long=False
+ return await self.text_message_handler(
+ update=update,
+ user_record=user_record
)
- if content_type in self.chat_message_handlers:
- answer = asyncio.ensure_future(
- self.chat_message_handlers[content_type](update)
- )
- else:
- answer = None
- logging.debug("Unhandled message")
- return answer
-
- async def handle_inline_query(self, update):
- """Handle inline query and answer it with results, or log errors."""
- query = update['query']
- answer = None
- switch_pm_text, switch_pm_parameter = None, None
- if self.maintenance:
- answer = self.maintenance_message
- else:
- for condition, handler in self.inline_query_handlers.items():
- answerer = handler['function']
- if condition(query):
- if asyncio.iscoroutinefunction(answerer):
- answer = await answerer(update)
- else:
- answer = answerer(update)
- break
- if not answer:
- answer = self.default_inline_query_answer
- if type(answer) is dict:
- if 'switch_pm_text' in answer:
- switch_pm_text = answer['switch_pm_text']
- if 'switch_pm_parameter' in answer:
- switch_pm_parameter = answer['switch_pm_parameter']
- answer = answer['answer']
- if type(answer) is str:
- answer = make_inline_query_answer(answer)
- try:
- await self.answerInlineQuery(
- update['id'],
- answer,
- cache_time=10,
- is_personal=True,
- switch_pm_text=switch_pm_text,
- switch_pm_parameter=switch_pm_parameter
- )
- except Exception as e:
- logging.info("Error answering inline query\n{}".format(e))
- return
-
- async def handle_chosen_inline_result(self, update):
- """When an inline query result is chosen, perform an action.
-
- If chosen inline result id is in self.chosen_inline_result_handlers,
- call the related function passing the update as argument.
- """
- user_id = update['from']['id'] if 'from' in update else None
- if self.maintenance:
- return
- if user_id in self.chosen_inline_result_handlers:
- result_id = update['result_id']
- handlers = self.chosen_inline_result_handlers[user_id]
- if result_id in handlers:
- func = handlers[result_id]
- if asyncio.iscoroutinefunction(func):
- await func(update)
- else:
- func(update)
- return
def set_inline_result_handler(self, user_id, result_id, func):
- """Associate a func to a result_id.
+ """Associate a `func` with a `result_id` for `user_id`.
- When an inline result is chosen having that id, function will
- be passed the update as argument.
+ This method is deprecated: use `set_chosen_inline_result_handler`
+ instead.
"""
- if type(user_id) is dict:
- user_id = user_id['from']['id']
- assert type(user_id) is int, "user_id must be int!"
- # Query result ids are parsed as str by telegram
- result_id = str(result_id)
- assert callable(func), "func must be a callable"
- if user_id not in self.chosen_inline_result_handlers:
- self.chosen_inline_result_handlers[user_id] = {}
- self.chosen_inline_result_handlers[user_id][result_id] = func
- return
-
- async def handle_callback_query(self, update):
- """Answer callback queries.
-
- Call the callback handler associated to the query prefix.
- The answer is used to edit the source message or send new ones
- if text is longer than single message limit.
- Anyway, the query is answered, otherwise the client would hang and
- the bot would look like idle.
- """
- answer = None
- if self.maintenance:
- answer = remove_html_tags(self.maintenance_message[:45])
- else:
- data = update['data']
- for start_text, handler in self.callback_handlers.items():
- answerer = handler['function']
- if data.startswith(start_text):
- if asyncio.iscoroutinefunction(answerer):
- answer = await answerer(update)
- else:
- answer = answerer(update)
- break
- if answer:
- if type(answer) is str:
- answer = {'text': answer}
- if type(answer) is not dict:
- return
- if 'edit' in answer:
- if 'message' in update:
- message_identifier = telepot.message_identifier(
- update['message']
- )
- else:
- message_identifier = telepot.message_identifier(update)
- edit = answer['edit']
- reply_markup = (
- edit['reply_markup']
- if 'reply_markup' in edit
- else None
- )
- text = (
- edit['text']
- if 'text' in edit
- else None
- )
- caption = (
- edit['caption']
- if 'caption' in edit
- else None
- )
- parse_mode = (
- edit['parse_mode']
- if 'parse_mode' in edit
- else None
- )
- disable_web_page_preview = (
- edit['disable_web_page_preview']
- if 'disable_web_page_preview' in edit
- else None
- )
- try:
- if 'text' in edit:
- if (
- len(text)
- > self.__class__.TELEGRAM_MESSAGES_MAX_LEN - 200
- ):
- if 'from' in update:
- await self.send_message(
- chat_id=update['from']['id'],
- text=text,
- reply_markup=reply_markup,
- parse_mode=parse_mode,
- disable_web_page_preview=(
- disable_web_page_preview
- )
- )
- else:
- await self.editMessageText(
- msg_identifier=message_identifier,
- text=text,
- parse_mode=parse_mode,
- disable_web_page_preview=(
- disable_web_page_preview
- ),
- reply_markup=reply_markup
- )
- elif 'caption' in edit:
- await self.editMessageCaption(
- msg_identifier=message_identifier,
- caption=caption,
- reply_markup=reply_markup
- )
- elif 'reply_markup' in edit:
- await self.editMessageReplyMarkup(
- msg_identifier=message_identifier,
- reply_markup=reply_markup
- )
- except Exception as e:
- logging.info("Message was not modified:\n{}".format(e))
- text = answer['text'][:180] if 'text' in answer else None
- show_alert = (
- answer['show_alert']
- if 'show_alert' in answer
- else None
- )
- cache_time = (
- answer['cache_time']
- if 'cache_time' in answer
- else None
- )
- try:
- await self.answerCallbackQuery(
- callback_query_id=update['id'],
- text=text,
- show_alert=show_alert,
- cache_time=cache_time
- )
- except telepot.exception.TelegramError as e:
- logging.error(e)
- else:
- try:
- await self.answerCallbackQuery(callback_query_id=update['id'])
- except telepot.exception.TelegramError as e:
- logging.error(e)
- return
-
- async def handle_text_message(self, update):
- """Answer to chat text messages.
-
- 1) Ignore bot name (case-insensitive) and search bot custom parsers,
- commands, aliases and parsers for an answerer.
- 2) Get an answer from answerer(update).
- 3) Send it to the user.
- """
- answerer, answer = None, None
- # Lower text and replace only bot's tag,
- # meaning that `/command@OtherBot` will be ignored.
- text = update['text'].lower().replace(
- '@{}'.format(
- self.name.lower()
- ),
- ''
+ return self.set_chosen_inline_result_handler(
+ user_id=user_id,
+ result_id=result_id,
+ handler=func
)
- user_id = update['from']['id'] if 'from' in update else None
- if self.maintenance and not any(
- text.startswith(x)
- for x in ('/coma', '/restart')
- ):
- if update['chat']['id'] > 0:
- answer = self.maintenance_message
- elif user_id in self.custom_parsers:
- answerer = self.custom_parsers[user_id]
- del self.custom_parsers[user_id]
- elif text.startswith('/'):
- command = text.split()[0].strip(' /@')
- if command in self.commands:
- answerer = self.commands[command]['function']
- elif update['chat']['id'] > 0:
- answer = self.unknown_command_message
- else:
- # If text starts with an alias
- # Aliases are case insensitive: text and alias are both .lower()
- for alias, parser in self.aliases.items():
- if text.startswith(alias.lower()):
- answerer = parser
- break
- # If update matches any parser
- for check_function, parser in self.parsers.items():
- if (
- parser['argument'] == 'text'
- and check_function(text)
- ) or (
- parser['argument'] == 'update'
- and check_function(update)
- ):
- answerer = parser['function']
- break
- if answerer:
- if asyncio.iscoroutinefunction(answerer):
- answer = await answerer(update)
- else:
- answer = answerer(update)
- if answer:
- try:
- return await self.send_message(answer=answer, chat_id=update)
- except Exception as e:
- logging.error(
- "Failed to process answer:\n{}".format(e),
- exc_info=True
- )
- async def handle_pinned_message(self, update):
+ async def handle_pinned_message(self, update, user_record=None):
"""Handle pinned message chat action."""
if self.maintenance:
return
@@ -910,7 +282,7 @@ class Bot(telepot.aio.Bot, Gettable):
)
return
- async def handle_photo_message(self, update):
+ async def handle_photo_message(self, update, user_record=None):
"""Handle photo chat message."""
user_id = update['from']['id'] if 'from' in update else None
answerer, answer = None, None
@@ -967,51 +339,14 @@ class Bot(telepot.aio.Bot, Gettable):
def set_custom_parser(self, parser, update=None, user=None):
"""Set a custom parser for the user.
- Any chat message update coming from the user will be handled by
- this custom parser instead of default parsers (commands, aliases
- and text parsers).
- Custom parsers last one single use, but their handler can call this
- function to provide multiple tries.
+ This method is deprecated: use `set_individual_text_message_handler`
+ instead.
"""
- if user and type(user) is int:
- pass
- elif type(update) is int:
- user = update
- elif type(user) is dict:
- user = (
- user['from']['id']
- if 'from' in user
- and 'id' in user['from']
- else None
- )
- elif not user and type(update) is dict:
- user = (
- update['from']['id']
- if 'from' in update
- and 'id' in update['from']
- else None
- )
- else:
- raise TypeError(
- 'Invalid user.\nuser: {}\nupdate: {}'.format(
- user,
- update
- )
- )
- if not type(user) is int:
- raise TypeError(
- 'User {} is not an int id'.format(
- user
- )
- )
- if not callable(parser):
- raise TypeError(
- 'Parser {} is not a callable'.format(
- parser.__name__
- )
- )
- self.custom_parsers[user] = parser
- return
+ return self.set_individual_text_message_handler(
+ handler=parser,
+ update=update,
+ user_id=user
+ )
def set_custom_photo_parser(self, parser, update=None, user=None):
"""Set a custom photo parser for the user.
@@ -1110,133 +445,39 @@ class Bot(telepot.aio.Bot, Gettable):
return
def command(self, command, aliases=None, show_in_keyboard=False,
- descr="", auth='admin'):
+ descr="", auth='admin', description=None,
+ authorization_level=None):
"""Define a bot command.
- Decorator: `@bot.command(*args)`
- When a message text starts with `/command[@bot_name]`, or with an
- alias, it gets passed to the decorated function.
- `command` is the command name (with or without /)
- `aliases` is a list of aliases
- `show_in_keyboard`, if True, makes first alias appear
- in default_keyboard
- `descr` is a description
- `auth` is the lowest authorization level needed to run the command
+ `descr` and `auth` parameters are deprecated: use `description` and
+ `authorization_level` instead.
"""
- command = command.replace('/', '').lower()
- if not isinstance(command, str):
- raise TypeError('Command {} is not a string'.format(command))
- if aliases:
- if not isinstance(aliases, list):
- raise TypeError('Aliases is not a list: {}'.format(aliases))
- for alias in aliases:
- if not isinstance(alias, str):
- raise TypeError('Alias {} is not a string'.format(alias))
+ authorization_level = authorization_level or auth
+ description = description or descr
+ return super().command(
+ command=command,
+ aliases=aliases,
+ show_in_keyboard=show_in_keyboard,
+ description=description,
+ authorization_level=authorization_level
+ )
- def decorator(func):
- if asyncio.iscoroutinefunction(func):
- async def decorated(message):
- logging.info(
- "COMMAND({c}) @{n} FROM({f})".format(
- c=command,
- n=self.name,
- f=(
- message['from']
- if 'from' in message
- else message['chat']
- )
- )
- )
- if self.authorization_function(message, auth):
- return await func(message)
- return self.unauthorized_message
- else:
- def decorated(message):
- logging.info(
- "COMMAND({c}) @{n} FROM({f})".format(
- c=command,
- n=self.name,
- f=(
- message['from']
- if 'from' in message
- else message['chat']
- )
- )
- )
- if self.authorization_function(message, auth):
- return func(message)
- return self.unauthorized_message
- self.commands[command] = dict(
- function=decorated,
- descr=descr,
- auth=auth
- )
- if aliases:
- for alias in aliases:
- self.aliases[alias] = decorated
- if show_in_keyboard:
- self.default_reply_keyboard_elements.append(aliases[0])
- return decorator
-
- def parser(self, condition, descr='', auth='admin', argument='text'):
+ def parser(self, condition, descr='', auth='admin', argument='text',
+ description=None,
+ authorization_level=None):
"""Define a message parser.
- Decorator: `@bot.parser(condition)`
- If condition evaluates True when run on a message text
- (not starting with '/'), such decorated function gets
- called on update.
- Conditions of parsers are evaluated in order; when one is True,
- others will be skipped.
- `descr` is a description
- `auth` is the lowest authorization level needed to run the command
+ `descr` and `auth` parameters are deprecated: use `description` and
+ `authorization_level` instead.
"""
- if not callable(condition):
- raise TypeError(
- 'Condition {} is not a callable'.format(
- condition.__name__
- )
- )
-
- def decorator(func):
- if asyncio.iscoroutinefunction(func):
- async def decorated(message):
- logging.info(
- "TEXT MATCHING CONDITION({c}) @{n} FROM({f})".format(
- c=condition.__name__,
- n=self.name,
- f=(
- message['from']
- if 'from' in message
- else message['chat']
- )
- )
- )
- if self.authorization_function(message, auth):
- return await func(message)
- return self.unauthorized_message
- else:
- def decorated(message):
- logging.info(
- "TEXT MATCHING CONDITION({c}) @{n} FROM({f})".format(
- c=condition.__name__,
- n=self.name,
- f=(
- message['from']
- if 'from' in message
- else message['chat']
- )
- )
- )
- if self.authorization_function(message, auth):
- return func(message)
- return self.unauthorized_message
- self.parsers[condition] = dict(
- function=decorated,
- descr=descr,
- auth=auth,
- argument=argument
- )
- return decorator
+ authorization_level = authorization_level or auth
+ description = description or descr
+ return super().parser(
+ condition=condition,
+ description=description,
+ authorization_level=authorization_level,
+ argument=argument
+ )
def pinned(self, condition, descr='', auth='admin'):
"""Handle pinned messages.
@@ -1296,480 +537,102 @@ class Bot(telepot.aio.Bot, Gettable):
)
return decorator
- def button(self, data, descr='', auth='admin'):
+ def button(self, data, descr='', auth='admin', authorization_level=None,
+ prefix=None, description=None, separator=None):
"""Define a bot button.
- Decorator: `@bot.button('example:///')`
- When a callback data text starts with , it gets passed to the
- decorated function
- `descr` is a description
- `auth` is the lowest authorization level needed to run the command
+ `descr` and `auth` parameters are deprecated: use `description` and
+ `authorization_level` instead.
+ `data` parameter renamed `prefix`.
"""
- if not isinstance(data, str):
- raise TypeError(
- 'Inline button callback_data {d} is not a string'.format(
- d=data
- )
- )
+ authorization_level = authorization_level or auth
+ description = description or descr
+ prefix = prefix or data
+ return super().button(
+ prefix=prefix,
+ separator=separator,
+ description=description,
+ authorization_level=authorization_level,
+ )
- def decorator(func):
- if asyncio.iscoroutinefunction(func):
- async def decorated(message):
- logging.info(
- "INLINE BUTTON({d}) @{n} FROM({f})".format(
- d=message['data'],
- n=self.name,
- f=(
- message['from']
- )
- )
- )
- if self.authorization_function(message, auth):
- return await func(message)
- return self.unauthorized_message
- else:
- def decorated(message):
- logging.info(
- "INLINE BUTTON({d}) @{n} FROM({f})".format(
- d=message['data'],
- n=self.name,
- f=(
- message['from']
- )
- )
- )
- if self.authorization_function(message, auth):
- return func(message)
- return self.unauthorized_message
- self.callback_handlers[data] = dict(
- function=decorated,
- descr=descr,
- auth=auth
- )
- return decorator
-
- def query(self, condition, descr='', auth='admin'):
+ def query(self, condition, descr='', auth='admin', description=None,
+ authorization_level=None):
"""Define an inline query.
- Decorator: `@bot.query(example)`
- When an inline query matches the `condition` function,
- decorated function is called and passed the query update object
- as argument.
- `descr` is a description
- `auth` is the lowest authorization level needed to run the command
+ `descr` and `auth` parameters are deprecated: use `description` and
+ `authorization_level` instead.
"""
- if not callable(condition):
- raise TypeError(
- 'Condition {c} is not a callable'.format(
- c=condition.__name__
- )
- )
-
- def decorator(func):
- if asyncio.iscoroutinefunction(func):
- async def decorated(message):
- logging.info(
- "QUERY MATCHING CONDITION({c}) @{n} FROM({f})".format(
- c=condition.__name__,
- n=self.name,
- f=message['from']
- )
- )
- if self.authorization_function(message, auth):
- return await func(message)
- return self.unauthorized_message
- else:
- def decorated(message):
- logging.info(
- "QUERY MATCHING CONDITION({c}) @{n} FROM({f})".format(
- c=condition.__name__,
- n=self.name,
- f=message['from']
- )
- )
- if self.authorization_function(message, auth):
- return func(message)
- return self.unauthorized_message
- self.inline_query_handlers[condition] = dict(
- function=decorated,
- descr=descr,
- auth=auth
- )
- return decorator
-
- def additional_task(self, when='BEFORE'):
- """Add a task before or after message_loop.
-
- Decorator: such decorated async functions get awaited BEFORE or
- AFTER messageloop
- """
- when = when[0].lower()
-
- def decorator(func):
- if when == 'b':
- self.run_before_loop.append(func)
- elif when == 'a':
- self.run_after_loop.append(func)
- return decorator
-
- def set_default_keyboard(self, keyboard='set_default'):
- """Set a default keyboard for the bot.
-
- If a keyboard is not passed as argument, a default one is generated,
- based on aliases of commands.
- """
- if keyboard == 'set_default':
- btns = [
- dict(
- text=x
- )
- for x in self.default_reply_keyboard_elements
- ]
- row_len = 2 if len(btns) < 4 else 3
- self._default_keyboard = dict(
- keyboard=make_lines_of_buttons(
- btns,
- row_len
- ),
- resize_keyboard=True
- )
- else:
- self._default_keyboard = keyboard
- return
+ authorization_level = authorization_level or auth
+ description = description or descr
+ return super().query(
+ condition=condition,
+ description=description,
+ authorization_level=authorization_level,
+ )
async def edit_message(self, update, *args, **kwargs):
"""Edit given update with given *args and **kwargs.
- Please note, that it is currently only possible to edit messages
- without reply_markup or with inline keyboards.
+ This method is deprecated: use `edit_message_text` instead.
"""
- try:
- return await self.editMessageText(
- telepot.message_identifier(update),
- *args,
- **kwargs
- )
- except Exception as e:
- logging.error("{}".format(e))
-
- async def delete_message(self, update, *args, **kwargs):
- """Delete given update with given *args and **kwargs.
-
- Please note, that a bot can delete only messages sent by itself
- or sent in a group which it is administrator of.
- """
- try:
- return await self.deleteMessage(
- telepot.message_identifier(update),
- *args,
- **kwargs
- )
- except Exception as e:
- logging.error("{}".format(e))
+ return await self.edit_message_text(
+ *args,
+ update=update,
+ **kwargs
+ )
async def send_message(self, answer=dict(), chat_id=None, text='',
parse_mode="HTML", disable_web_page_preview=None,
disable_notification=None, reply_to_message_id=None,
- reply_markup=None):
+ reply_markup=None, update=dict(),
+ reply_to_update=False, send_default_keyboard=True):
"""Send a message.
- Convenient method to call telepot.Bot(token).sendMessage
- All sendMessage **kwargs can be either **kwargs of send_message
- or key:val of answer argument.
- Messages longer than telegram limit will be split properly.
- Telegram flood limits won't be reached thanks to
- `await avoid_flooding(chat_id)`
- parse_mode will be checked and edited if necessary.
- Arguments will be checked and adapted.
+ This method is deprecated: use `super().send_message` instead.
"""
- if type(answer) is dict and 'chat_id' in answer:
- chat_id = answer['chat_id']
- # chat_id may simply be the update to which the bot should repy
- if type(chat_id) is dict:
- chat_id = self.get_chat_id(chat_id)
- if type(answer) is str:
- text = answer
- if (
- not reply_markup
- and chat_id > 0
- and text != self.unauthorized_message
- ):
- reply_markup = self.default_keyboard
- elif type(answer) is dict:
- if 'text' in answer:
- text = answer['text']
- if 'parse_mode' in answer:
- parse_mode = answer['parse_mode']
- if 'disable_web_page_preview' in answer:
- disable_web_page_preview = answer['disable_web_page_preview']
- if 'disable_notification' in answer:
- disable_notification = answer['disable_notification']
- if 'reply_to_message_id' in answer:
- reply_to_message_id = answer['reply_to_message_id']
- if 'reply_markup' in answer:
- reply_markup = answer['reply_markup']
- elif (
- not reply_markup
- and type(chat_id) is int
- and chat_id > 0
- and text != self.unauthorized_message
- ):
- reply_markup = self.default_keyboard
- assert type(text) is str, "Text is not a string!"
- assert (
- type(chat_id) is int
- or (type(chat_id) is str and chat_id.startswith('@'))
- ), "Invalid chat_id:\n\t\t{}".format(chat_id)
- if not text:
- return
- parse_mode = str(parse_mode)
- text_chunks = split_text_gracefully(
- text=text,
- limit=self.__class__.TELEGRAM_MESSAGES_MAX_LEN - 100,
- parse_mode=parse_mode
- )
- n = len(text_chunks)
- for text_chunk in text_chunks:
- n -= 1
- if parse_mode.lower() == "html":
- this_parse_mode = "HTML"
- # Check that all tags are well-formed
- if not markdown_check(
- text_chunk,
- [
- "<", ">",
- "code>", "bold>", "italic>",
- "b>", "i>", "a>", "pre>"
- ]
- ):
- this_parse_mode = "None"
- text_chunk = (
- "!!![invalid markdown syntax]!!!\n\n"
- + text_chunk
- )
- elif parse_mode != "None":
- this_parse_mode = "Markdown"
- # Check that all markdowns are well-formed
- if not markdown_check(
- text_chunk,
- [
- "*", "_", "`"
- ]
- ):
- this_parse_mode = "None"
- text_chunk = (
- "!!![invalid markdown syntax]!!!\n\n"
- + text_chunk
- )
+ if update is None:
+ update = dict()
+ parameters = dict()
+ for parameter, value in locals().items():
+ if parameter in ['self', 'answer', 'parameters', '__class__']:
+ continue
+ if parameter in answer:
+ parameters[parameter] = answer[parameter]
else:
- this_parse_mode = parse_mode
- this_reply_markup = reply_markup if n == 0 else None
- try:
- await self.avoid_flooding(chat_id)
- result = await self.sendMessage(
- chat_id=chat_id,
- text=text_chunk,
- parse_mode=this_parse_mode,
- disable_web_page_preview=disable_web_page_preview,
- disable_notification=disable_notification,
- reply_to_message_id=reply_to_message_id,
- reply_markup=this_reply_markup
- )
- except Exception as e:
- logging.debug(
- e,
- exc_info=False # Set exc_info=True for more information
- )
- result = e
- return result
+ parameters[parameter] = value
+ if type(parameters['chat_id']) is dict:
+ parameters['update'] = parameters['chat_id']
+ del parameters['chat_id']
+ return await super().send_message(**parameters)
- async def send_photo(self, chat_id=None, answer={},
+ async def send_photo(self, chat_id=None, answer=dict(),
photo=None, caption='', parse_mode='HTML',
disable_notification=None, reply_to_message_id=None,
reply_markup=None, use_stored=True,
- second_chance=False):
+ second_chance=False, use_stored_file_id=None,
+ update=dict(), reply_to_update=False,
+ send_default_keyboard=True):
"""Send a photo.
- Convenient method to call telepot.Bot(token).sendPhoto
- All sendPhoto **kwargs can be either **kwargs of send_message
- or key:val of answer argument.
- Captions longer than telegram limit will be shortened gently.
- Telegram flood limits won't be reached thanks to
- `await avoid_flooding(chat_id)`
- Most arguments will be checked and adapted.
- If use_stored is set to True, the bot will store sent photo
- telegram_id and use it for faster sending next times (unless
- future errors).
- Sending photos by their file_id already stored on telegram servers
- is way faster: that's why bot stores and uses this info,
- if required.
- A second_chance is given to send photo on error.
+ This method is deprecated: use `super().send_photo` instead.
"""
- if 'chat_id' in answer:
- chat_id = answer['chat_id']
- # chat_id may simply be the update to which the bot should repy
- if type(chat_id) is dict:
- chat_id = self.get_chat_id(chat_id)
- assert (
- type(chat_id) is int
- or (type(chat_id) is str and chat_id.startswith('@'))
- ), "Invalid chat_id:\n\t\t{}".format(chat_id)
- if 'photo' in answer:
- photo = answer['photo']
- assert photo is not None, "Null photo!"
- if 'caption' in answer:
- caption = answer['caption']
- if 'parse_mode' in answer:
- parse_mode = answer['parse_mode']
- if 'disable_notification' in answer:
- disable_notification = answer['disable_notification']
- if 'reply_to_message_id' in answer:
- reply_to_message_id = answer['reply_to_message_id']
- if 'reply_markup' in answer:
- reply_markup = answer['reply_markup']
- already_sent = False
- if type(photo) is str:
- photo_url = photo
- with self.db as db:
- already_sent = db['sent_pictures'].find_one(
- url=photo_url,
- errors=False
- )
- if already_sent and use_stored:
- photo = already_sent['file_id']
- already_sent = True
+ if update is None:
+ update = dict()
+ if use_stored is not None:
+ use_stored_file_id = use_stored
+ parameters = dict()
+ for parameter, value in locals().items():
+ if parameter in ['self', 'answer', 'parameters', '__class__',
+ 'second_chance', 'use_stored']:
+ continue
+ if parameter in answer:
+ parameters[parameter] = answer[parameter]
else:
- already_sent = False
- if not any(photo_url.startswith(x) for x in ['http', 'www']):
- with io.BytesIO() as buffered_picture:
- with open(
- "{}/{}".format(
- self.path,
- photo_url
- ),
- 'rb'
- ) as photo_file:
- buffered_picture.write(photo_file.read())
- photo = buffered_picture.getvalue()
- else:
- use_stored = False
- caption = escape_html_chars(caption)
- if len(caption) > 199:
- new_caption = ''
- tag = False
- tag_body = False
- count = 0
- temp = ''
- for char in caption:
- if tag and char == '>':
- tag = False
- elif char == '<':
- tag = True
- tag_body = not tag_body
- elif not tag:
- count += 1
- if count == 199:
- break
- temp += char
- if not tag_body:
- new_caption += temp
- temp = ''
- caption = new_caption
- sent = None
- try:
- await self.avoid_flooding(chat_id)
- sent = await self.sendPhoto(
- chat_id=chat_id,
- photo=photo,
- caption=caption,
- parse_mode=parse_mode,
- disable_notification=disable_notification,
- reply_to_message_id=reply_to_message_id,
- reply_markup=reply_markup
- )
- if isinstance(sent, Exception):
- raise Exception("SendingFailed")
- except Exception as e:
- logging.error(
- "Error sending photo\n{}".format(
- e
- ),
- exc_info=False # Set exc_info=True for more information
- )
- if already_sent:
- with self.db as db:
- db['sent_pictures'].update(
- dict(
- url=photo_url,
- errors=True
- ),
- ['url']
- )
- if not second_chance:
- logging.info("Trying again (only once)...")
- sent = await self.send_photo(
- chat_id=chat_id,
- answer=answer,
- photo=photo,
- caption=caption,
- parse_mode=parse_mode,
- disable_notification=disable_notification,
- reply_to_message_id=reply_to_message_id,
- reply_markup=reply_markup,
- second_chance=True
- )
- if (
- sent is not None
- and hasattr(sent, '__getitem__')
- and 'photo' in sent
- and len(sent['photo']) > 0
- and 'file_id' in sent['photo'][0]
- and (not already_sent)
- and use_stored
- ):
- with self.db as db:
- db['sent_pictures'].insert(
- dict(
- url=photo_url,
- file_id=sent['photo'][0]['file_id'],
- errors=False
- )
- )
- return sent
-
- async def forward_message(self, chat_id, update=None, from_chat_id=None,
- message_id=None, disable_notification=False):
- """Forward message from `from_chat_id` to `chat_id`.
-
- Set disable_notification to True to avoid disturbing recipient.
- """
- try:
- if from_chat_id is None or message_id is None:
- if (
- type(update) is not dict
- or 'chat' not in update
- or 'id' not in update['chat']
- or 'message_id' not in update
- ):
- raise Exception("Wrong parameters, cannot forward.")
- from_chat_id = update['chat']['id']
- message_id = update['message_id']
- await self.avoid_flooding(chat_id)
- sent = await self.forwardMessage(
- chat_id=chat_id,
- from_chat_id=from_chat_id,
- disable_notification=disable_notification,
- message_id=message_id,
- )
- if isinstance(sent, Exception):
- raise Exception("Forwarding failed.")
- except Exception as e:
- logging.error(
- "Error forwarding message:\n{}".format(
- e
- ),
- exc_info=False # Set exc_info=True for more information
- )
+ parameters[parameter] = value
+ if type(parameters['chat_id']) is dict:
+ parameters['update'] = parameters['chat_id']
+ del parameters['chat_id']
+ return await super().send_photo(**parameters)
async def send_and_destroy(self, chat_id, answer,
timer=60, mode='text', **kwargs):
@@ -1818,13 +681,13 @@ class Bot(telepot.aio.Bot, Gettable):
await sleep_until(when)
try:
await self.editMessageCaption(
- inline_message_id,
+ inline_message_id=inline_message_id,
text="Time over"
)
except Exception:
try:
await self.editMessageText(
- inline_message_id,
+ inline_message_id=inline_message_id,
text="Time over"
)
except Exception as e:
@@ -1881,472 +744,69 @@ class Bot(telepot.aio.Bot, Gettable):
error=None
)
- async def get_me(self):
- """Get bot information.
-
- Restart bots if bot can't be got.
- """
- try:
- me = await self.getMe()
- self.bot_name = me["username"]
- self.telegram_id = me['id']
- except Exception as e:
- logging.error(
- "Could not get bot\n{e}".format(
- e=e
- )
- )
- await asyncio.sleep(5*60)
- self.restart_bots()
- return
-
- async def continue_running(self):
- """Get updates.
-
- If bot can be got, sets name and telegram_id,
- awaits preliminary tasks and starts getting updates from telegram.
- If bot can't be got, restarts all bots in 5 minutes.
- """
- await self.get_me()
- for task in self.run_before_loop:
- await task()
- self.set_default_keyboard()
- asyncio.ensure_future(
- self.message_loop(handler=self.routing_table)
- )
- return
-
def stop_bots(self):
- """Exit script with code 0."""
- Bot.stop = True
+ """Exit script with code 0.
+
+ This method is deprecated: use `Bot.stop` instead.
+ """
+ self.__class__.stop(
+ message=f"Stopping bots via bot `@{self.name}` method.",
+ final_state=0
+ )
def restart_bots(self):
"""Restart the script exiting with code 65.
- Actually, you need to catch Bot.stop state when Bot.run() returns
- and handle the situation yourself.
+ This method is deprecated: use `Bot.stop` instead.
"""
- Bot.stop = "Restart"
+ self.__class__.stop(
+ message=f"Restarting bots via bot `@{self.name}` method.",
+ final_state=65
+ )
- @classmethod
- async def check_task(cls):
- """Await until cls.stop, then end session and return."""
- for bot in cls.instances.values():
- asyncio.ensure_future(bot.continue_running())
- while not cls.stop:
- await asyncio.sleep(10)
- return await cls.end_session()
-
- @classmethod
- async def end_session(cls):
+ async def delete_and_obscure_messages(self):
"""Run after stop, before the script exits.
Await final tasks, obscure and delete pending messages,
log current operation (stop/restart).
"""
- for bot in cls.instances.values():
- for task in bot.run_after_loop:
- await task()
- for message in bot.to_be_destroyed:
+ for message in self.to_be_destroyed:
+ try:
+ await self.delete_message(message)
+ except Exception as e:
+ logging.error(
+ "Couldn't delete message\n{}\n\n{}".format(
+ message,
+ e
+ )
+ )
+ for inline_message_id in self.to_be_obscured:
+ try:
+ await self.editMessageCaption(
+ inline_message_id,
+ text="Time over"
+ )
+ except Exception:
try:
- await bot.delete_message(message)
+ await self.editMessageText(
+ inline_message_id=inline_message_id,
+ text="Time over"
+ )
except Exception as e:
logging.error(
- "Couldn't delete message\n{}\n\n{}".format(
- message,
+ "Couldn't obscure message\n{}\n\n{}".format(
+ inline_message_id,
e
)
)
- for inline_message_id in bot.to_be_obscured:
- try:
- await bot.editMessageCaption(
- inline_message_id,
- text="Time over"
- )
- except Exception:
- try:
- await bot.editMessageText(
- inline_message_id,
- text="Time over"
- )
- except Exception as e:
- logging.error(
- "Couldn't obscure message\n{}\n\n{}".format(
- inline_message_id,
- e
- )
- )
- if cls.stop == "Restart":
- logging.info("\n\t\t---Restart!---")
- elif cls.stop == "KeyboardInterrupt":
- logging.info("Stopped by KeyboardInterrupt.")
- else:
- logging.info("Stopped gracefully by user.")
- return
@classmethod
- def run(cls, loop=None):
- """Call this method to run the async bots."""
- if loop is None:
- loop = asyncio.get_event_loop()
- logging.info(
- "{sep}{subjvb} STARTED{sep}".format(
- sep='-'*10,
- subjvb=('BOT HAS' if len(cls.instances) == 1 else 'BOTS HAVE')
- )
- )
- try:
- loop.run_until_complete(
- cls.check_task()
- )
- except KeyboardInterrupt:
- logging.info(
- (
- "\n\t\tYour script received a KeyboardInterrupt signal, "
- "your bot{} being stopped."
- ).format(
- 's are'
- if len(cls.instances) > 1
- else ' is'
- )
- )
+ def run(cls, loop=None, *args, **kwargs):
+ """Call this method to run the async bots.
- cls.stop = "KeyboardInterrupt"
- loop.run_until_complete(cls.end_session())
- except Exception as e:
- logging.error(
- '\nYour bot{vb} been stopped. with error \'{e}\''.format(
- e=e,
- vb='s have' if len(cls.instances) > 1 else ' has'
- ),
- exc_info=True
- )
- logging.info(
- "{sep}{subjvb} STOPPED{sep}".format(
- sep='-'*10,
- subjvb='BOT HAS' if len(cls.instances) == 1 else 'BOTS HAVE'
- )
- )
- return
-
- @classmethod
- async def _run_manual_mode(cls):
- available_bots = MyOD()
- for code, bot in enumerate(
- cls.instances.values()
- ):
- await bot.get_me()
- available_bots[code] = dict(
- bot=bot,
- code=code,
- name=bot.name
- )
- selected_bot = None
- while selected_bot is None:
- user_input = input(
- "\n=============================================\n"
- "Which bot would you like to control manually?\n"
- "Available bots:\n{}\n\n\t\t".format(
- line_drawing_unordered_list(
- list(
- "{b[code]:>3} - {b[bot].name}".format(
- b=bot,
- )
- for bot in available_bots.values()
- )
- )
- )
- )
- if (
- user_input.isnumeric()
- and int(user_input) in available_bots
- ):
- selected_bot = available_bots[int(user_input)]
- else:
- selected_bot = pick_most_similar_from_list(
- [
- bot['name']
- for bot in available_bots.values()
- ],
- user_input
- )
- selected_bot = available_bots.get_by_key_val(
- key='name',
- val=selected_bot,
- case_sensitive=False,
- return_value=True
- )
- if selected_bot is None:
- logging.error("Invalid selection.")
- continue
- logging.info(
- "Bot `{b[name]}` selected.".format(
- b=selected_bot
- )
- )
- exit_code = await selected_bot['bot']._run_manually()
- if exit_code == 0:
- break
- elif exit_code == 65:
- selected_bot = None
- return
-
- @classmethod
- def run_manual_mode(cls, loop=None):
- """Run in manual mode: send messages via bots."""
- if loop is None:
- loop = asyncio.get_event_loop()
- logging.info(
- "=== MANUAL MODE STARTED ==="
- )
- try:
- loop.run_until_complete(
- cls._run_manual_mode()
- )
- except KeyboardInterrupt:
- logging.info(
- (
- "\n\t\tYour script received a KeyboardInterrupt signal, "
- "your bot{} being stopped."
- ).format(
- 's are' if len(cls.instances) > 1 else ' is'
- )
- )
- except Exception as e:
- logging.error(
- '\nYour bot{vb} been stopped. with error \'{e}\''.format(
- e=e,
- vb='s have' if len(cls.instances) > 1 else ' has'
- ),
- exc_info=True
- )
- logging.info(
- "=== MANUAL MODE STOPPED ==="
- )
-
- async def _run_manually(self):
- user_input = ' choose_addressee'
- selected_user = None
- users = []
- while user_input:
- if user_input == ' choose_addressee':
- try:
- user_input = input(
- "Choose an addressee.\n"
- "Press enter to change bot.\n"
- "\n\t\t"
- )
- if len(user_input) == 0:
- return 65 # Let user select a different bot
- except KeyboardInterrupt:
- logging.error("Keyboard interrupt.")
- return 0 # Stop running
- if (
- selected_user is None
- and user_input.strip('-').isnumeric()
- ):
- user_input = int(user_input)
- users = list(
- filter(
- lambda user: user['telegram_id'] == user_input,
- users
- )
- )
- if len(users) == 0:
- users = [
- dict(
- telegram_id=user_input,
- name='Unknown user'
- )
- ]
- elif (
- selected_user is None
- and self.db_url is not None
- ):
- with self.db as db:
- if 'users' not in db.tables:
- db['users'].insert(
- dict(
- telegram_id=0,
- privileges=100,
- username='username',
- first_name='first_name',
- last_name='last_name'
- )
- )
- if 'contacts' not in db.tables:
- db['contacts'].insert(
- dict(
- telegram_id=0,
- name='ZZZXXXAAA',
- )
- )
- users = list(
- db.query(
- """SELECT telegram_id, MAX(name) name
- FROM (
- SELECT telegram_id,
- COALESCE(
- first_name || ' ' || last_name ||
- ' (' || username || ')',
- username,
- first_name || ' ' || last_name,
- last_name,
- first_name
- ) AS name
- FROM users
- WHERE COALESCE(
- first_name || last_name || username,
- first_name || username,
- last_name || username,
- first_name || last_name,
- username,
- last_name,
- first_name
- )
- LIKE '%{u}%'
- UNION ALL
- SELECT telegram_id, name
- FROM contacts
- WHERE name LIKE '%{u}%'
- )
- GROUP BY telegram_id
- """.format(
- u=user_input
- )
- )
- )
- if len(users) == 0:
- logging.info("Sorry, no user matches your query.")
- user_input = ' choose_addressee'
- elif len(users) > 1:
- user_input = input(
- "Please select one of the following users:\n"
- "\n"
- "{users}\n"
- "\n"
- "Paste their telegram_id\n"
- "\t\t".format(
- users=line_drawing_unordered_list(
- sorted(
- map(
- lambda user: (
- "{u[telegram_id]} - {u[name]}"
- ).format(
- u=user
- ),
- users
- )
- )
- )
- )
- )
- elif len(users) == 1:
- selected_user = users[0]
- while selected_user is not None:
- sent = None
- text = input(
- "What would you like to send "
- "{u[name]} ({u[telegram_id]})?\n"
- "Leave it blank if you want to select another "
- "addressee.\n"
- "\t\t\t".format(
- u=selected_user
- )
- )
- if len(text) == 0:
- selected_user = None
- user_input = ' choose_addressee'
- elif text.lower() == 'photo':
- caption = input(
- 'Write a caption (you can leave it blank)\n'
- '\t\t\t'
- )
- try:
- with io.BytesIO() as buffered_picture:
- with open(
- '{path}/sendme.png'.format(
- path=self.path
- ),
- 'rb' # Read bytes
- ) as photo_file:
- buffered_picture.write(
- photo_file.read()
- )
- photo = buffered_picture.getvalue()
- sent = await self.send_photo(
- chat_id=selected_user['telegram_id'],
- photo=photo,
- caption=caption,
- parse_mode='HTML',
- use_stored=False
- )
- except Exception as e:
- logging.error(e)
- else:
- try:
- sent = await self.send_message(
- chat_id=selected_user['telegram_id'],
- text=text,
- parse_mode='HTML'
- )
- except Exception as e:
- logging.error(e)
- if (
- sent is not None
- and not isinstance(sent, Exception)
- ):
- logging.info(
- '\n'
- 'Sent message:\n'
- '{s}\n'
- '\n'.format(
- s=sent
- )
- )
- while (
- self.db_url
- and selected_user['name'] == 'Unknown user'
- ):
- new_name = input(
- "Please enter a nickname for this user.\n"
- "Next time you may retrieve their telegram_id "
- "by passing this nickname (or a part of it).\n"
- "\t\t"
- )
- if len(new_name):
- selected_user['name'] = new_name
- with self.db as db:
- db['contacts'].upsert(
- selected_user,
- ['telegram_id'],
- ensure=True
- )
- else:
- logging.info("Invalid name, please try again.")
- return 65 # Keep running, making user select another bot
-
- def create_views(self, views, overwrite=False):
- """Take a list of `views` and add them to bot database.
-
- Each element of this list should have
- - a `name` field
- - a `query field`
+ This method is deprecated: use `super(Bot, cls).run` instead.
+ `loop` must not be determined outside that method.
"""
- with self.db as db:
- for view in views:
- try:
- if overwrite:
- db.query(
- f"""
- DROP VIEW IF EXISTS {view['name']}
- """
- )
- db.query(
- f"""
- CREATE VIEW IF NOT EXISTS {view['name']}
- AS {view['query']}
- """
- )
- except Exception as e:
- logging.error(f"{e}")
+ for bot in cls.bots:
+ bot.additional_task('AFTER')(bot.delete_and_obscure_messages)
+ return super(Bot, cls).run(*args, **kwargs)