2306 lines
85 KiB
Python
2306 lines
85 KiB
Python
"""Subclass of third party telepot.aio.Bot, providing the following features.
|
|
|
|
- It prevents hitting Telegram flood limits by waiting
|
|
between text and photo messages.
|
|
- It provides command, parser, button and other decorators to associate
|
|
common Telegram actions with custom handlers.
|
|
- It supports multiple bots running in the same script
|
|
and allows communications between them
|
|
as well as complete independency from each other.
|
|
- Each bot is associated with a sqlite database
|
|
using dataset, a third party library.
|
|
|
|
Please note that you need Python3.5+ to run async code
|
|
Check requirements.txt for third party dependencies.
|
|
"""
|
|
|
|
# Standard library modules
|
|
import asyncio
|
|
import datetime
|
|
import io
|
|
import logging
|
|
import os
|
|
|
|
# Third party modules
|
|
import dataset
|
|
from davteutil.utilities import (
|
|
get_secure_key, Gettable, escape_html_chars, extract,
|
|
line_drawing_unordered_list, make_lines_of_buttons, markdown_check, MyOD,
|
|
pick_most_similar_from_list, remove_html_tags, sleep_until
|
|
)
|
|
import telepot
|
|
import telepot.aio
|
|
|
|
|
|
def split_text_gracefully(text, limit, parse_mode):
|
|
r"""Split text if it hits telegram limits for text messages.
|
|
|
|
Split at `\n` if possible.
|
|
Add a `[...]` at the end and beginning of split messages,
|
|
with proper code markdown.
|
|
"""
|
|
text = text.split("\n")[::-1]
|
|
result = []
|
|
while len(text) > 0:
|
|
temp = []
|
|
while len(text) > 0 and len("\n".join(temp + [text[-1]])) < limit:
|
|
temp.append(text.pop())
|
|
if len(temp) == 0:
|
|
temp.append(text[-1][:limit])
|
|
text[-1] = text[-1][limit:]
|
|
result.append("\n".join(temp))
|
|
if len(result) > 1:
|
|
for i in range(1, len(result)):
|
|
result[i] = "{tag[0]}[...]{tag[1]}\n{text}".format(
|
|
tag=(
|
|
('`', '`') if parse_mode == 'Markdown'
|
|
else ('<code>', '</code>') if parse_mode.lower() == 'html'
|
|
else ('', '')
|
|
),
|
|
text=result[i]
|
|
)
|
|
result[i-1] = "{text}\n{tag[0]}[...]{tag[1]}".format(
|
|
tag=(
|
|
('`', '`') if parse_mode == 'Markdown'
|
|
else ('<code>', '</code>') if parse_mode.lower() == 'html'
|
|
else ('', '')
|
|
),
|
|
text=result[i-1]
|
|
)
|
|
return result
|
|
|
|
|
|
def make_inline_query_answer(answer):
|
|
"""Return an article-type answer to inline query.
|
|
|
|
Takes either a string or a dictionary and returns a list.
|
|
"""
|
|
if type(answer) is str:
|
|
answer = dict(
|
|
type='article',
|
|
id=0,
|
|
title=remove_html_tags(answer),
|
|
input_message_content=dict(
|
|
message_text=answer,
|
|
parse_mode='HTML'
|
|
)
|
|
)
|
|
if type(answer) is dict:
|
|
answer = [answer]
|
|
return answer
|
|
|
|
|
|
class Bot(telepot.aio.Bot, Gettable):
|
|
"""telepot.aio.Bot (async Telegram bot framework) convenient subclass.
|
|
|
|
=== General functioning ===
|
|
- While Bot.run() coroutine is executed, HTTP get requests are made
|
|
to Telegram servers asking for new messages for each Bot instance.
|
|
- Each message causes the proper Bot instance method coroutine
|
|
to be awaited, according to its flavour (see routing_table)
|
|
-- For example, chat messages cause `Bot().on_chat_message(message)`
|
|
to be awaited.
|
|
- This even-processing coroutine ensures the proper handling function
|
|
a future and returns.
|
|
-- That means that simpler tasks are completed before slower ones,
|
|
since handling functions are not awaited but scheduled
|
|
by `asyncio.ensure_future(handling_function(...))`
|
|
-- For example, chat text messages are handled by
|
|
`handle_text_message`, which looks for the proper function
|
|
to elaborate the request (in bot's commands and parsers)
|
|
- The handling function evaluates an answer, depending on the message
|
|
content, and eventually provides a reply
|
|
-- For example, `handle_text_message` sends its
|
|
answer via `send_message`
|
|
- All Bot.instances run simultaneously and faster requests
|
|
are completed earlier.
|
|
- All uncaught events are ignored.
|
|
"""
|
|
|
|
instances = {}
|
|
stop = False
|
|
# Cooldown time between sent messages, to prevent hitting
|
|
# Telegram flood limits
|
|
# Current limits: 30 total messages sent per second,
|
|
# 1 message per second per chat, 20 messages per minute per group
|
|
COOLDOWN_TIME_ABSOLUTE = datetime.timedelta(seconds=1/30)
|
|
COOLDOWN_TIME_PER_CHAT = datetime.timedelta(seconds=1)
|
|
MAX_GROUP_MESSAGES_PER_MINUTE = 20
|
|
# Max length of text field for a Telegram message (UTF-8 text)
|
|
TELEGRAM_MESSAGES_MAX_LEN = 4096
|
|
_path = '.'
|
|
_unauthorized_message = None
|
|
_unknown_command_message = None
|
|
_maintenance_message = None
|
|
_default_inline_query_answer = [
|
|
dict(
|
|
type='article',
|
|
id=0,
|
|
title="I cannot answer this query, sorry",
|
|
input_message_content=dict(
|
|
message_text="I'm sorry "
|
|
"but I could not find an answer for your query."
|
|
)
|
|
)
|
|
]
|
|
|
|
def __init__(self, token, db_name=None):
|
|
"""Instantiate Bot instance, given a token and a db name."""
|
|
super().__init__(token)
|
|
self.routing_table = {
|
|
'chat': self.on_chat_message,
|
|
'inline_query': self.on_inline_query,
|
|
'chosen_inline_result': self.on_chosen_inline_result,
|
|
'callback_query': self.on_callback_query
|
|
}
|
|
self.chat_message_handlers = {
|
|
'text': self.handle_text_message,
|
|
'pinned_message': self.handle_pinned_message,
|
|
'photo': self.handle_photo_message,
|
|
'location': self.handle_location
|
|
}
|
|
if db_name:
|
|
self._db_url = 'sqlite:///{name}{ext}'.format(
|
|
name=db_name,
|
|
ext='.db' if not db_name.endswith('.db') else ''
|
|
)
|
|
else:
|
|
self._db_url = None
|
|
self._unauthorized_message = None
|
|
self.authorization_function = lambda update, authorization_level: True
|
|
self.get_chat_id = lambda update: (
|
|
update['message']['chat']['id']
|
|
if 'message' in update
|
|
else update['chat']['id']
|
|
)
|
|
self.commands = dict()
|
|
self.callback_handlers = dict()
|
|
self.inline_query_handlers = MyOD()
|
|
self._default_inline_query_answer = None
|
|
self.chosen_inline_result_handlers = dict()
|
|
self.aliases = MyOD()
|
|
self.parsers = MyOD()
|
|
self.custom_parsers = dict()
|
|
self.custom_photo_parsers = dict()
|
|
self.custom_location_parsers = dict()
|
|
self.bot_name = None
|
|
self.default_reply_keyboard_elements = []
|
|
self._default_keyboard = dict()
|
|
self.run_before_loop = []
|
|
self.run_after_loop = []
|
|
self.to_be_obscured = []
|
|
self.to_be_destroyed = []
|
|
self.last_sending_time = dict(
|
|
absolute=(
|
|
datetime.datetime.now()
|
|
- self.__class__.COOLDOWN_TIME_ABSOLUTE
|
|
)
|
|
)
|
|
self._maintenance = False
|
|
self._maintenance_message = None
|
|
self.chat_actions = dict(
|
|
pinned=MyOD()
|
|
)
|
|
self.messages = dict()
|
|
|
|
@property
|
|
def name(self):
|
|
"""Bot name."""
|
|
return self.bot_name
|
|
|
|
@property
|
|
def path(self):
|
|
"""custombot.py file path."""
|
|
return self.__class__._path
|
|
|
|
@property
|
|
def db_url(self):
|
|
"""Return complete path to database."""
|
|
return self._db_url
|
|
|
|
@property
|
|
def db(self):
|
|
"""Connect to bot's database.
|
|
|
|
It must be used inside a with statement: `with bot.db as db`
|
|
"""
|
|
if self.db_url:
|
|
return dataset.connect(self.db_url)
|
|
|
|
@property
|
|
def default_keyboard(self):
|
|
"""Get the default keyboard.
|
|
|
|
It is sent when reply_markup is left blank and chat is private.
|
|
"""
|
|
return self._default_keyboard
|
|
|
|
@property
|
|
def default_inline_query_answer(self):
|
|
"""Answer to be returned if inline query returned None."""
|
|
if self._default_inline_query_answer:
|
|
return self._default_inline_query_answer
|
|
return self.__class__._default_inline_query_answer
|
|
|
|
@property
|
|
def unauthorized_message(self):
|
|
"""Return this if user is unauthorized to make a request.
|
|
|
|
If instance message is not set, class message is returned.
|
|
"""
|
|
if self._unauthorized_message:
|
|
return self._unauthorized_message
|
|
return self.__class__._unauthorized_message
|
|
|
|
@property
|
|
def unknown_command_message(self):
|
|
"""Message to be returned if user sends an unknown command.
|
|
|
|
If instance message is not set, class message is returned.
|
|
"""
|
|
if self._unknown_command_message:
|
|
return self._unknown_command_message
|
|
return self.__class__._unknown_command_message
|
|
|
|
@property
|
|
def maintenance(self):
|
|
"""Check whether bot is under maintenance.
|
|
|
|
While under maintenance, bot will reply with
|
|
`self.maintenance_message` to any request, with few exceptions.
|
|
"""
|
|
return self._maintenance
|
|
|
|
@property
|
|
def maintenance_message(self):
|
|
"""Message to be returned if bot is under maintenance.
|
|
|
|
If instance message is not set, class message is returned.
|
|
"""
|
|
if self._maintenance_message:
|
|
return self._maintenance_message
|
|
if self.__class__.maintenance_message:
|
|
return self.__class__._maintenance_message
|
|
return "Bot is currently under maintenance! Retry later please."
|
|
|
|
@classmethod
|
|
def set_class_path(csl, path):
|
|
"""Set class path, where files will be looked for.
|
|
|
|
For example, if send_photo receives `photo='mypic.png'`,
|
|
it will parse it as `'{path}/mypic.png'.format(path=self.path)`
|
|
"""
|
|
csl._path = path
|
|
|
|
@classmethod
|
|
def set_class_unauthorized_message(csl, unauthorized_message):
|
|
"""Set class unauthorized message.
|
|
|
|
It will be returned if user is unauthorized to make a request.
|
|
"""
|
|
csl._unauthorized_message = unauthorized_message
|
|
|
|
@classmethod
|
|
def set_class_unknown_command_message(cls, unknown_command_message):
|
|
"""Set class unknown command message.
|
|
|
|
It will be returned if user sends an unknown command in private chat.
|
|
"""
|
|
cls._unknown_command_message = unknown_command_message
|
|
|
|
@classmethod
|
|
def set_class_maintenance_message(cls, maintenance_message):
|
|
"""Set class maintenance message.
|
|
|
|
It will be returned if bot is under maintenance.
|
|
"""
|
|
cls._maintenance_message = maintenance_message
|
|
|
|
@classmethod
|
|
def set_class_default_inline_query_answer(cls,
|
|
default_inline_query_answer):
|
|
"""Set class default inline query answer.
|
|
|
|
It will be returned if an inline query returned no answer.
|
|
"""
|
|
cls._default_inline_query_answer = default_inline_query_answer
|
|
|
|
def set_unauthorized_message(self, unauthorized_message):
|
|
"""Set instance unauthorized message.
|
|
|
|
If instance message is None, default class message is used.
|
|
"""
|
|
self._unauthorized_message = unauthorized_message
|
|
|
|
def set_unknown_command_message(self, unknown_command_message):
|
|
"""Set instance unknown command message.
|
|
|
|
It will be returned if user sends an unknown command in private chat.
|
|
If instance message is None, default class message is used.
|
|
"""
|
|
self._unknown_command_message = unknown_command_message
|
|
|
|
def set_maintenance_message(self, maintenance_message):
|
|
"""Set instance maintenance message.
|
|
|
|
It will be returned if bot is under maintenance.
|
|
If instance message is None, default class message is used.
|
|
"""
|
|
self._maintenance_message = maintenance_message
|
|
|
|
def set_default_inline_query_answer(self, default_inline_query_answer):
|
|
"""Set a custom default_inline_query_answer.
|
|
|
|
It will be returned when no answer is found for an inline query.
|
|
If instance answer is None, default class answer is used.
|
|
"""
|
|
if type(default_inline_query_answer) in (str, dict):
|
|
default_inline_query_answer = make_inline_query_answer(
|
|
default_inline_query_answer
|
|
)
|
|
if type(default_inline_query_answer) is not list:
|
|
return 1
|
|
self._default_inline_query_answer = default_inline_query_answer
|
|
return 0
|
|
|
|
def set_maintenance(self, maintenance_message):
|
|
"""Put the bot under maintenance or ends it.
|
|
|
|
While in maintenance, bot will reply to users with maintenance_message.
|
|
Bot will accept /coma, /stop and /restart commands from admins.
|
|
"""
|
|
self._maintenance = not self.maintenance
|
|
if maintenance_message:
|
|
self.set_maintenance_message(maintenance_message)
|
|
if self.maintenance:
|
|
return (
|
|
"<i>Bot has just been put under maintenance!</i>\n\n"
|
|
"Until further notice, it will reply to users "
|
|
"with the following message:\n\n{}"
|
|
).format(
|
|
self.maintenance_message
|
|
)
|
|
return "<i>Maintenance ended!</i>"
|
|
|
|
def set_authorization_function(self, authorization_function):
|
|
"""Set a custom authorization_function.
|
|
|
|
It should evaluate True if user is authorized to perform
|
|
a specific action and False otherwise.
|
|
It should take update and role and return a Boolean.
|
|
Default authorization_function always evaluates True.
|
|
"""
|
|
self.authorization_function = authorization_function
|
|
|
|
def set_get_chat_id_function(self, get_chat_id_function):
|
|
"""Set a custom get_chat_id function.
|
|
|
|
It should take and update and return the chat in which
|
|
a reply should be sent.
|
|
For instance, a bot could reply in private to group messages
|
|
as a default behaviour.
|
|
Default chat_id returned is current chat id.
|
|
"""
|
|
self.get_chat_id = get_chat_id_function
|
|
|
|
async def avoid_flooding(self, chat_id):
|
|
"""asyncio-sleep until COOLDOWN_TIME has passed.
|
|
|
|
To prevent hitting Telegram flood limits, send_message and
|
|
send_photo await this function.
|
|
Consider cooldown time per chat and absolute.
|
|
"""
|
|
if type(chat_id) is int and chat_id > 0:
|
|
while (
|
|
datetime.datetime.now() < (
|
|
self.last_sending_time['absolute']
|
|
+ self.__class__.COOLDOWN_TIME_ABSOLUTE
|
|
)
|
|
) or (
|
|
chat_id in self.last_sending_time
|
|
and (
|
|
datetime.datetime.now() < (
|
|
self.last_sending_time[chat_id]
|
|
+ self.__class__.COOLDOWN_TIME_PER_CHAT
|
|
)
|
|
)
|
|
):
|
|
await asyncio.sleep(
|
|
self.__class__.COOLDOWN_TIME_ABSOLUTE.seconds
|
|
)
|
|
self.last_sending_time[chat_id] = datetime.datetime.now()
|
|
else:
|
|
while (
|
|
datetime.datetime.now() < (
|
|
self.last_sending_time['absolute']
|
|
+ self.__class__.COOLDOWN_TIME_ABSOLUTE
|
|
)
|
|
) or (
|
|
chat_id in self.last_sending_time
|
|
and len(
|
|
[
|
|
sending_datetime
|
|
for sending_datetime in self.last_sending_time[chat_id]
|
|
if sending_datetime >= (
|
|
datetime.datetime.now()
|
|
- datetime.timedelta(minutes=1)
|
|
)
|
|
]
|
|
) >= self.__class__.MAX_GROUP_MESSAGES_PER_MINUTE
|
|
) or (
|
|
chat_id in self.last_sending_time
|
|
and len(self.last_sending_time[chat_id]) > 0
|
|
and datetime.datetime.now() < (
|
|
self.last_sending_time[chat_id][-1]
|
|
+ self.__class__.COOLDOWN_TIME_PER_CHAT
|
|
)
|
|
):
|
|
await asyncio.sleep(0.5)
|
|
if chat_id not in self.last_sending_time:
|
|
self.last_sending_time[chat_id] = []
|
|
self.last_sending_time[chat_id].append(datetime.datetime.now())
|
|
self.last_sending_time[chat_id] = [
|
|
sending_datetime
|
|
for sending_datetime in self.last_sending_time[chat_id]
|
|
if sending_datetime >= (
|
|
datetime.datetime.now()
|
|
- datetime.timedelta(minutes=1)
|
|
)
|
|
]
|
|
self.last_sending_time['absolute'] = datetime.datetime.now()
|
|
return
|
|
|
|
def get_message(self, *fields, update=None, user_record=None,
|
|
language=None, **format_kwargs):
|
|
"""Given a list of strings (`fields`), return proper message.
|
|
|
|
If `language` is not passed, it is extracted from `update`.
|
|
If `update` is not passed either, `language` is extracted from
|
|
`user_record`.
|
|
Fall back to English message if language is not available.
|
|
Pass `format_kwargs` to format function.
|
|
"""
|
|
if (
|
|
language is None
|
|
and isinstance(update, dict)
|
|
and 'from' in update
|
|
and 'language_code' in update['from']
|
|
):
|
|
language = update['from']['language_code']
|
|
if (
|
|
language is None
|
|
and isinstance(user_record, dict)
|
|
and 'language_code' in user_record
|
|
):
|
|
language = user_record['language_code']
|
|
if language is None:
|
|
language = 'en'
|
|
result = self.messages
|
|
for field in fields:
|
|
if field not in result:
|
|
logging.error(
|
|
"Please define self.message{f}".format(
|
|
f=''.join(
|
|
'[\'{field}\']'.format(
|
|
field=field
|
|
)
|
|
for field in fields
|
|
)
|
|
)
|
|
)
|
|
return "Invalid message!"
|
|
result = result[field]
|
|
if language not in result:
|
|
language = extract(
|
|
language,
|
|
ender='-'
|
|
)
|
|
if language not in result:
|
|
language = 'en'
|
|
if language not in result:
|
|
logging.error(
|
|
"Please define self.message{f}['en']".format(
|
|
f=''.join(
|
|
'[\'{field}\']'.format(
|
|
field=field
|
|
)
|
|
for field in fields
|
|
)
|
|
)
|
|
)
|
|
return "Invalid message!"
|
|
return result[language].format(
|
|
**format_kwargs
|
|
)
|
|
|
|
async def on_inline_query(self, update):
|
|
"""Schedule handling of received inline queries.
|
|
|
|
Notice that handling is only scheduled, not awaited.
|
|
This means that all Bot instances may now handle other requests
|
|
before this one is completed.
|
|
"""
|
|
asyncio.ensure_future(self.handle_inline_query(update))
|
|
return
|
|
|
|
async def on_chosen_inline_result(self, update):
|
|
"""Schedule handling of received chosen inline result events.
|
|
|
|
Notice that handling is only scheduled, not awaited.
|
|
This means that all Bot instances may now handle other requests
|
|
before this one is completed.
|
|
"""
|
|
asyncio.ensure_future(self.handle_chosen_inline_result(update))
|
|
return
|
|
|
|
async def on_callback_query(self, update):
|
|
"""Schedule handling of received callback queries.
|
|
|
|
A callback query is sent when users press inline keyboard buttons.
|
|
Bad clients may send malformed or deceiving callback queries:
|
|
never use secret keys in buttons and always check request validity!
|
|
Notice that handling is only scheduled, not awaited.
|
|
This means that all Bot instances may now handle other requests
|
|
before this one is completed.
|
|
"""
|
|
# Reject malformed updates lacking of data field
|
|
if 'data' not in update:
|
|
return
|
|
asyncio.ensure_future(self.handle_callback_query(update))
|
|
return
|
|
|
|
async def on_chat_message(self, update):
|
|
"""Schedule handling of received chat message.
|
|
|
|
Notice that handling is only scheduled, not awaited.
|
|
According to update type, the corresponding handler is
|
|
scheduled (see self.chat_message_handlers).
|
|
This means that all Bot instances may now handle other
|
|
requests before this one is completed.
|
|
"""
|
|
answer = None
|
|
content_type, chat_type, chat_id = telepot.glance(
|
|
update,
|
|
flavor='chat',
|
|
long=False
|
|
)
|
|
if content_type in self.chat_message_handlers:
|
|
answer = asyncio.ensure_future(
|
|
self.chat_message_handlers[content_type](update)
|
|
)
|
|
else:
|
|
answer = None
|
|
logging.debug("Unhandled message")
|
|
return answer
|
|
|
|
async def handle_inline_query(self, update):
|
|
"""Handle inline query and answer it with results, or log errors."""
|
|
query = update['query']
|
|
answer = None
|
|
switch_pm_text, switch_pm_parameter = None, None
|
|
if self.maintenance:
|
|
answer = self.maintenance_message
|
|
else:
|
|
for condition, handler in self.inline_query_handlers.items():
|
|
answerer = handler['function']
|
|
if condition(query):
|
|
if asyncio.iscoroutinefunction(answerer):
|
|
answer = await answerer(update)
|
|
else:
|
|
answer = answerer(update)
|
|
break
|
|
if not answer:
|
|
answer = self.default_inline_query_answer
|
|
if type(answer) is dict:
|
|
if 'switch_pm_text' in answer:
|
|
switch_pm_text = answer['switch_pm_text']
|
|
if 'switch_pm_parameter' in answer:
|
|
switch_pm_parameter = answer['switch_pm_parameter']
|
|
answer = answer['answer']
|
|
if type(answer) is str:
|
|
answer = make_inline_query_answer(answer)
|
|
try:
|
|
await self.answerInlineQuery(
|
|
update['id'],
|
|
answer,
|
|
cache_time=10,
|
|
is_personal=True,
|
|
switch_pm_text=switch_pm_text,
|
|
switch_pm_parameter=switch_pm_parameter
|
|
)
|
|
except Exception as e:
|
|
logging.info("Error answering inline query\n{}".format(e))
|
|
return
|
|
|
|
async def handle_chosen_inline_result(self, update):
|
|
"""When an inline query result is chosen, perform an action.
|
|
|
|
If chosen inline result id is in self.chosen_inline_result_handlers,
|
|
call the related function passing the update as argument.
|
|
"""
|
|
user_id = update['from']['id'] if 'from' in update else None
|
|
if self.maintenance:
|
|
return
|
|
if user_id in self.chosen_inline_result_handlers:
|
|
result_id = update['result_id']
|
|
handlers = self.chosen_inline_result_handlers[user_id]
|
|
if result_id in handlers:
|
|
func = handlers[result_id]
|
|
if asyncio.iscoroutinefunction(func):
|
|
await func(update)
|
|
else:
|
|
func(update)
|
|
return
|
|
|
|
def set_inline_result_handler(self, user_id, result_id, func):
|
|
"""Associate a func to a result_id.
|
|
|
|
When an inline result is chosen having that id, function will
|
|
be passed the update as argument.
|
|
"""
|
|
if type(user_id) is dict:
|
|
user_id = user_id['from']['id']
|
|
assert type(user_id) is int, "user_id must be int!"
|
|
# Query result ids are parsed as str by telegram
|
|
result_id = str(result_id)
|
|
assert callable(func), "func must be a callable"
|
|
if user_id not in self.chosen_inline_result_handlers:
|
|
self.chosen_inline_result_handlers[user_id] = {}
|
|
self.chosen_inline_result_handlers[user_id][result_id] = func
|
|
return
|
|
|
|
async def handle_callback_query(self, update):
|
|
"""Answer callback queries.
|
|
|
|
Call the callback handler associated to the query prefix.
|
|
The answer is used to edit the source message or send new ones
|
|
if text is longer than single message limit.
|
|
Anyway, the query is answered, otherwise the client would hang and
|
|
the bot would look like idle.
|
|
"""
|
|
answer = None
|
|
if self.maintenance:
|
|
answer = remove_html_tags(self.maintenance_message[:45])
|
|
else:
|
|
data = update['data']
|
|
for start_text, handler in self.callback_handlers.items():
|
|
answerer = handler['function']
|
|
if data.startswith(start_text):
|
|
if asyncio.iscoroutinefunction(answerer):
|
|
answer = await answerer(update)
|
|
else:
|
|
answer = answerer(update)
|
|
break
|
|
if answer:
|
|
if type(answer) is str:
|
|
answer = {'text': answer}
|
|
if type(answer) is not dict:
|
|
return
|
|
if 'edit' in answer:
|
|
if 'message' in update:
|
|
message_identifier = telepot.message_identifier(
|
|
update['message']
|
|
)
|
|
else:
|
|
message_identifier = telepot.message_identifier(update)
|
|
edit = answer['edit']
|
|
reply_markup = (
|
|
edit['reply_markup']
|
|
if 'reply_markup' in edit
|
|
else None
|
|
)
|
|
text = (
|
|
edit['text']
|
|
if 'text' in edit
|
|
else None
|
|
)
|
|
caption = (
|
|
edit['caption']
|
|
if 'caption' in edit
|
|
else None
|
|
)
|
|
parse_mode = (
|
|
edit['parse_mode']
|
|
if 'parse_mode' in edit
|
|
else None
|
|
)
|
|
disable_web_page_preview = (
|
|
edit['disable_web_page_preview']
|
|
if 'disable_web_page_preview' in edit
|
|
else None
|
|
)
|
|
try:
|
|
if 'text' in edit:
|
|
if (
|
|
len(text)
|
|
> self.__class__.TELEGRAM_MESSAGES_MAX_LEN - 200
|
|
):
|
|
if 'from' in update:
|
|
await self.send_message(
|
|
chat_id=update['from']['id'],
|
|
text=text,
|
|
reply_markup=reply_markup,
|
|
parse_mode=parse_mode,
|
|
disable_web_page_preview=(
|
|
disable_web_page_preview
|
|
)
|
|
)
|
|
else:
|
|
await self.editMessageText(
|
|
msg_identifier=message_identifier,
|
|
text=text,
|
|
parse_mode=parse_mode,
|
|
disable_web_page_preview=(
|
|
disable_web_page_preview
|
|
),
|
|
reply_markup=reply_markup
|
|
)
|
|
elif 'caption' in edit:
|
|
await self.editMessageCaption(
|
|
msg_identifier=message_identifier,
|
|
caption=caption,
|
|
reply_markup=reply_markup
|
|
)
|
|
elif 'reply_markup' in edit:
|
|
await self.editMessageReplyMarkup(
|
|
msg_identifier=message_identifier,
|
|
reply_markup=reply_markup
|
|
)
|
|
except Exception as e:
|
|
logging.info("Message was not modified:\n{}".format(e))
|
|
text = answer['text'][:180] if 'text' in answer else None
|
|
show_alert = (
|
|
answer['show_alert']
|
|
if 'show_alert' in answer
|
|
else None
|
|
)
|
|
cache_time = (
|
|
answer['cache_time']
|
|
if 'cache_time' in answer
|
|
else None
|
|
)
|
|
try:
|
|
await self.answerCallbackQuery(
|
|
callback_query_id=update['id'],
|
|
text=text,
|
|
show_alert=show_alert,
|
|
cache_time=cache_time
|
|
)
|
|
except telepot.exception.TelegramError as e:
|
|
logging.error(e)
|
|
else:
|
|
try:
|
|
await self.answerCallbackQuery(callback_query_id=update['id'])
|
|
except telepot.exception.TelegramError as e:
|
|
logging.error(e)
|
|
return
|
|
|
|
async def handle_text_message(self, update):
|
|
"""Answer to chat text messages.
|
|
|
|
1) Ignore bot name (case-insensitive) and search bot custom parsers,
|
|
commands, aliases and parsers for an answerer.
|
|
2) Get an answer from answerer(update).
|
|
3) Send it to the user.
|
|
"""
|
|
answerer, answer = None, None
|
|
# Lower text and replace only bot's tag,
|
|
# meaning that `/command@OtherBot` will be ignored.
|
|
text = update['text'].lower().replace(
|
|
'@{}'.format(
|
|
self.name.lower()
|
|
),
|
|
''
|
|
)
|
|
user_id = update['from']['id'] if 'from' in update else None
|
|
if self.maintenance and not any(
|
|
text.startswith(x)
|
|
for x in ('/coma', '/restart')
|
|
):
|
|
if update['chat']['id'] > 0:
|
|
answer = self.maintenance_message
|
|
elif user_id in self.custom_parsers:
|
|
answerer = self.custom_parsers[user_id]
|
|
del self.custom_parsers[user_id]
|
|
elif text.startswith('/'):
|
|
command = text.split()[0].strip(' /@')
|
|
if command in self.commands:
|
|
answerer = self.commands[command]['function']
|
|
elif update['chat']['id'] > 0:
|
|
answer = self.unknown_command_message
|
|
else:
|
|
# If text starts with an alias
|
|
# Aliases are case insensitive: text and alias are both .lower()
|
|
for alias, parser in self.aliases.items():
|
|
if text.startswith(alias.lower()):
|
|
answerer = parser
|
|
break
|
|
# If update matches any parser
|
|
for check_function, parser in self.parsers.items():
|
|
if (
|
|
parser['argument'] == 'text'
|
|
and check_function(text)
|
|
) or (
|
|
parser['argument'] == 'update'
|
|
and check_function(update)
|
|
):
|
|
answerer = parser['function']
|
|
break
|
|
if answerer:
|
|
if asyncio.iscoroutinefunction(answerer):
|
|
answer = await answerer(update)
|
|
else:
|
|
answer = answerer(update)
|
|
if answer:
|
|
try:
|
|
return await self.send_message(answer=answer, chat_id=update)
|
|
except Exception as e:
|
|
logging.error(
|
|
"Failed to process answer:\n{}".format(e),
|
|
exc_info=True
|
|
)
|
|
|
|
async def handle_pinned_message(self, update):
|
|
"""Handle pinned message chat action."""
|
|
if self.maintenance:
|
|
return
|
|
answerer = None
|
|
for criteria, handler in self.chat_actions['pinned'].items():
|
|
if criteria(update):
|
|
answerer = handler['function']
|
|
break
|
|
if answerer is None:
|
|
return
|
|
elif asyncio.iscoroutinefunction(answerer):
|
|
answer = await answerer(update)
|
|
else:
|
|
answer = answerer(update)
|
|
if answer:
|
|
try:
|
|
return await self.send_message(
|
|
answer=answer,
|
|
chat_id=update['chat']['id']
|
|
)
|
|
except Exception as e:
|
|
logging.error(
|
|
"Failed to process answer:\n{}".format(
|
|
e
|
|
),
|
|
exc_info=True
|
|
)
|
|
return
|
|
|
|
async def handle_photo_message(self, update):
|
|
"""Handle photo chat message."""
|
|
user_id = update['from']['id'] if 'from' in update else None
|
|
answerer, answer = None, None
|
|
if self.maintenance:
|
|
if update['chat']['id'] > 0:
|
|
answer = self.maintenance_message
|
|
elif user_id in self.custom_photo_parsers:
|
|
answerer = self.custom_photo_parsers[user_id]
|
|
del self.custom_photo_parsers[user_id]
|
|
if answerer:
|
|
if asyncio.iscoroutinefunction(answerer):
|
|
answer = await answerer(update)
|
|
else:
|
|
answer = answerer(update)
|
|
if answer:
|
|
try:
|
|
return await self.send_message(answer=answer, chat_id=update)
|
|
except Exception as e:
|
|
logging.error(
|
|
"Failed to process answer:\n{}".format(
|
|
e
|
|
),
|
|
exc_info=True
|
|
)
|
|
return
|
|
|
|
async def handle_location(self, update):
|
|
"""Handle location sent by user."""
|
|
user_id = update['from']['id'] if 'from' in update else None
|
|
answerer, answer = None, None
|
|
if self.maintenance:
|
|
if update['chat']['id'] > 0:
|
|
answer = self.maintenance_message
|
|
elif user_id in self.custom_location_parsers:
|
|
answerer = self.custom_location_parsers[user_id]
|
|
del self.custom_location_parsers[user_id]
|
|
if answerer:
|
|
if asyncio.iscoroutinefunction(answerer):
|
|
answer = await answerer(update)
|
|
else:
|
|
answer = answerer(update)
|
|
if answer:
|
|
try:
|
|
return await self.send_message(answer=answer, chat_id=update)
|
|
except Exception as e:
|
|
logging.error(
|
|
"Failed to process answer:\n{}".format(
|
|
e
|
|
),
|
|
exc_info=True
|
|
)
|
|
return
|
|
|
|
def set_custom_parser(self, parser, update=None, user=None):
|
|
"""Set a custom parser for the user.
|
|
|
|
Any chat message update coming from the user will be handled by
|
|
this custom parser instead of default parsers (commands, aliases
|
|
and text parsers).
|
|
Custom parsers last one single use, but their handler can call this
|
|
function to provide multiple tries.
|
|
"""
|
|
if user and type(user) is int:
|
|
pass
|
|
elif type(update) is int:
|
|
user = update
|
|
elif type(user) is dict:
|
|
user = (
|
|
user['from']['id']
|
|
if 'from' in user
|
|
and 'id' in user['from']
|
|
else None
|
|
)
|
|
elif not user and type(update) is dict:
|
|
user = (
|
|
update['from']['id']
|
|
if 'from' in update
|
|
and 'id' in update['from']
|
|
else None
|
|
)
|
|
else:
|
|
raise TypeError(
|
|
'Invalid user.\nuser: {}\nupdate: {}'.format(
|
|
user,
|
|
update
|
|
)
|
|
)
|
|
if not type(user) is int:
|
|
raise TypeError(
|
|
'User {} is not an int id'.format(
|
|
user
|
|
)
|
|
)
|
|
if not callable(parser):
|
|
raise TypeError(
|
|
'Parser {} is not a callable'.format(
|
|
parser.__name__
|
|
)
|
|
)
|
|
self.custom_parsers[user] = parser
|
|
return
|
|
|
|
def set_custom_photo_parser(self, parser, update=None, user=None):
|
|
"""Set a custom photo parser for the user.
|
|
|
|
Any photo chat update coming from the user will be handled by
|
|
this custom parser instead of default parsers.
|
|
Custom photo parsers last one single use, but their handler can
|
|
call this function to provide multiple tries.
|
|
"""
|
|
if user and type(user) is int:
|
|
pass
|
|
elif type(update) is int:
|
|
user = update
|
|
elif type(user) is dict:
|
|
user = (
|
|
user['from']['id']
|
|
if 'from' in user
|
|
and 'id' in user['from']
|
|
else None
|
|
)
|
|
elif not user and type(update) is dict:
|
|
user = (
|
|
update['from']['id']
|
|
if 'from' in update
|
|
and 'id' in update['from']
|
|
else None
|
|
)
|
|
else:
|
|
raise TypeError(
|
|
'Invalid user.\nuser: {}\nupdate: {}'.format(
|
|
user,
|
|
update
|
|
)
|
|
)
|
|
if not type(user) is int:
|
|
raise TypeError(
|
|
'User {} is not an int id'.format(
|
|
user
|
|
)
|
|
)
|
|
if not callable(parser):
|
|
raise TypeError(
|
|
'Parser {} is not a callable'.format(
|
|
parser.__name__
|
|
)
|
|
)
|
|
self.custom_photo_parsers[user] = parser
|
|
return
|
|
|
|
def set_custom_location_parser(self, parser, update=None, user=None):
|
|
"""Set a custom location parser for the user.
|
|
|
|
Any location chat update coming from the user will be handled by
|
|
this custom parser instead of default parsers.
|
|
Custom location parsers last one single use, but their handler can
|
|
call this function to provide multiple tries.
|
|
"""
|
|
if user and type(user) is int:
|
|
pass
|
|
elif type(update) is int:
|
|
user = update
|
|
elif type(user) is dict:
|
|
user = (
|
|
user['from']['id']
|
|
if 'from' in user
|
|
and 'id' in user['from']
|
|
else None
|
|
)
|
|
elif not user and type(update) is dict:
|
|
user = (
|
|
update['from']['id']
|
|
if 'from' in update
|
|
and 'id' in update['from']
|
|
else None
|
|
)
|
|
else:
|
|
raise TypeError(
|
|
'Invalid user.\nuser: {}\nupdate: {}'.format(
|
|
user,
|
|
update
|
|
)
|
|
)
|
|
if not type(user) is int:
|
|
raise TypeError(
|
|
'User {} is not an int id'.format(
|
|
user
|
|
)
|
|
)
|
|
if not callable(parser):
|
|
raise TypeError(
|
|
'Parser {} is not a callable'.format(
|
|
parser.__name__
|
|
)
|
|
)
|
|
self.custom_location_parsers[user] = parser
|
|
return
|
|
|
|
def command(self, command, aliases=None, show_in_keyboard=False,
|
|
descr="", auth='admin'):
|
|
"""Define a bot command.
|
|
|
|
Decorator: `@bot.command(*args)`
|
|
When a message text starts with `/command[@bot_name]`, or with an
|
|
alias, it gets passed to the decorated function.
|
|
`command` is the command name (with or without /)
|
|
`aliases` is a list of aliases
|
|
`show_in_keyboard`, if True, makes first alias appear
|
|
in default_keyboard
|
|
`descr` is a description
|
|
`auth` is the lowest authorization level needed to run the command
|
|
"""
|
|
command = command.replace('/', '').lower()
|
|
if not isinstance(command, str):
|
|
raise TypeError('Command {} is not a string'.format(command))
|
|
if aliases:
|
|
if not isinstance(aliases, list):
|
|
raise TypeError('Aliases is not a list: {}'.format(aliases))
|
|
for alias in aliases:
|
|
if not isinstance(alias, str):
|
|
raise TypeError('Alias {} is not a string'.format(alias))
|
|
|
|
def decorator(func):
|
|
if asyncio.iscoroutinefunction(func):
|
|
async def decorated(message):
|
|
logging.info(
|
|
"COMMAND({c}) @{n} FROM({f})".format(
|
|
c=command,
|
|
n=self.name,
|
|
f=(
|
|
message['from']
|
|
if 'from' in message
|
|
else message['chat']
|
|
)
|
|
)
|
|
)
|
|
if self.authorization_function(message, auth):
|
|
return await func(message)
|
|
return self.unauthorized_message
|
|
else:
|
|
def decorated(message):
|
|
logging.info(
|
|
"COMMAND({c}) @{n} FROM({f})".format(
|
|
c=command,
|
|
n=self.name,
|
|
f=(
|
|
message['from']
|
|
if 'from' in message
|
|
else message['chat']
|
|
)
|
|
)
|
|
)
|
|
if self.authorization_function(message, auth):
|
|
return func(message)
|
|
return self.unauthorized_message
|
|
self.commands[command] = dict(
|
|
function=decorated,
|
|
descr=descr,
|
|
auth=auth
|
|
)
|
|
if aliases:
|
|
for alias in aliases:
|
|
self.aliases[alias] = decorated
|
|
if show_in_keyboard:
|
|
self.default_reply_keyboard_elements.append(aliases[0])
|
|
return decorator
|
|
|
|
def parser(self, condition, descr='', auth='admin', argument='text'):
|
|
"""Define a message parser.
|
|
|
|
Decorator: `@bot.parser(condition)`
|
|
If condition evaluates True when run on a message text
|
|
(not starting with '/'), such decorated function gets
|
|
called on update.
|
|
Conditions of parsers are evaluated in order; when one is True,
|
|
others will be skipped.
|
|
`descr` is a description
|
|
`auth` is the lowest authorization level needed to run the command
|
|
"""
|
|
if not callable(condition):
|
|
raise TypeError(
|
|
'Condition {} is not a callable'.format(
|
|
condition.__name__
|
|
)
|
|
)
|
|
|
|
def decorator(func):
|
|
if asyncio.iscoroutinefunction(func):
|
|
async def decorated(message):
|
|
logging.info(
|
|
"TEXT MATCHING CONDITION({c}) @{n} FROM({f})".format(
|
|
c=condition.__name__,
|
|
n=self.name,
|
|
f=(
|
|
message['from']
|
|
if 'from' in message
|
|
else message['chat']
|
|
)
|
|
)
|
|
)
|
|
if self.authorization_function(message, auth):
|
|
return await func(message)
|
|
return self.unauthorized_message
|
|
else:
|
|
def decorated(message):
|
|
logging.info(
|
|
"TEXT MATCHING CONDITION({c}) @{n} FROM({f})".format(
|
|
c=condition.__name__,
|
|
n=self.name,
|
|
f=(
|
|
message['from']
|
|
if 'from' in message
|
|
else message['chat']
|
|
)
|
|
)
|
|
)
|
|
if self.authorization_function(message, auth):
|
|
return func(message)
|
|
return self.unauthorized_message
|
|
self.parsers[condition] = dict(
|
|
function=decorated,
|
|
descr=descr,
|
|
auth=auth,
|
|
argument=argument
|
|
)
|
|
return decorator
|
|
|
|
def pinned(self, condition, descr='', auth='admin'):
|
|
"""Handle pinned messages.
|
|
|
|
Decorator: `@bot.pinned(condition)`
|
|
If condition evaluates True when run on a pinned_message update,
|
|
such decorated function gets called on update.
|
|
Conditions are evaluated in order; when one is True,
|
|
others will be skipped.
|
|
`descr` is a description
|
|
`auth` is the lowest authorization level needed to run the command
|
|
"""
|
|
if not callable(condition):
|
|
raise TypeError(
|
|
'Condition {c} is not a callable'.format(
|
|
c=condition.__name__
|
|
)
|
|
)
|
|
|
|
def decorator(func):
|
|
if asyncio.iscoroutinefunction(func):
|
|
async def decorated(message):
|
|
logging.info(
|
|
"PINNED MESSAGE MATCHING({c}) @{n} FROM({f})".format(
|
|
c=condition.__name__,
|
|
n=self.name,
|
|
f=(
|
|
message['from']
|
|
if 'from' in message
|
|
else message['chat']
|
|
)
|
|
)
|
|
)
|
|
if self.authorization_function(message, auth):
|
|
return await func(message)
|
|
return
|
|
else:
|
|
def decorated(message):
|
|
logging.info(
|
|
"PINNED MESSAGE MATCHING({c}) @{n} FROM({f})".format(
|
|
c=condition.__name__,
|
|
n=self.name,
|
|
f=(
|
|
message['from']
|
|
if 'from' in message
|
|
else message['chat']
|
|
)
|
|
)
|
|
)
|
|
if self.authorization_function(message, auth):
|
|
return func(message)
|
|
return
|
|
self.chat_actions['pinned'][condition] = dict(
|
|
function=decorated,
|
|
descr=descr,
|
|
auth=auth
|
|
)
|
|
return decorator
|
|
|
|
def button(self, data, descr='', auth='admin'):
|
|
"""Define a bot button.
|
|
|
|
Decorator: `@bot.button('example:///')`
|
|
When a callback data text starts with <data>, it gets passed to the
|
|
decorated function
|
|
`descr` is a description
|
|
`auth` is the lowest authorization level needed to run the command
|
|
"""
|
|
if not isinstance(data, str):
|
|
raise TypeError(
|
|
'Inline button callback_data {d} is not a string'.format(
|
|
d=data
|
|
)
|
|
)
|
|
|
|
def decorator(func):
|
|
if asyncio.iscoroutinefunction(func):
|
|
async def decorated(message):
|
|
logging.info(
|
|
"INLINE BUTTON({d}) @{n} FROM({f})".format(
|
|
d=message['data'],
|
|
n=self.name,
|
|
f=(
|
|
message['from']
|
|
)
|
|
)
|
|
)
|
|
if self.authorization_function(message, auth):
|
|
return await func(message)
|
|
return self.unauthorized_message
|
|
else:
|
|
def decorated(message):
|
|
logging.info(
|
|
"INLINE BUTTON({d}) @{n} FROM({f})".format(
|
|
d=message['data'],
|
|
n=self.name,
|
|
f=(
|
|
message['from']
|
|
)
|
|
)
|
|
)
|
|
if self.authorization_function(message, auth):
|
|
return func(message)
|
|
return self.unauthorized_message
|
|
self.callback_handlers[data] = dict(
|
|
function=decorated,
|
|
descr=descr,
|
|
auth=auth
|
|
)
|
|
return decorator
|
|
|
|
def query(self, condition, descr='', auth='admin'):
|
|
"""Define an inline query.
|
|
|
|
Decorator: `@bot.query(example)`
|
|
When an inline query matches the `condition` function,
|
|
decorated function is called and passed the query update object
|
|
as argument.
|
|
`descr` is a description
|
|
`auth` is the lowest authorization level needed to run the command
|
|
"""
|
|
if not callable(condition):
|
|
raise TypeError(
|
|
'Condition {c} is not a callable'.format(
|
|
c=condition.__name__
|
|
)
|
|
)
|
|
|
|
def decorator(func):
|
|
if asyncio.iscoroutinefunction(func):
|
|
async def decorated(message):
|
|
logging.info(
|
|
"QUERY MATCHING CONDITION({c}) @{n} FROM({f})".format(
|
|
c=condition.__name__,
|
|
n=self.name,
|
|
f=message['from']
|
|
)
|
|
)
|
|
if self.authorization_function(message, auth):
|
|
return await func(message)
|
|
return self.unauthorized_message
|
|
else:
|
|
def decorated(message):
|
|
logging.info(
|
|
"QUERY MATCHING CONDITION({c}) @{n} FROM({f})".format(
|
|
c=condition.__name__,
|
|
n=self.name,
|
|
f=message['from']
|
|
)
|
|
)
|
|
if self.authorization_function(message, auth):
|
|
return func(message)
|
|
return self.unauthorized_message
|
|
self.inline_query_handlers[condition] = dict(
|
|
function=decorated,
|
|
descr=descr,
|
|
auth=auth
|
|
)
|
|
return decorator
|
|
|
|
def additional_task(self, when='BEFORE'):
|
|
"""Add a task before or after message_loop.
|
|
|
|
Decorator: such decorated async functions get awaited BEFORE or
|
|
AFTER messageloop
|
|
"""
|
|
when = when[0].lower()
|
|
|
|
def decorator(func):
|
|
if when == 'b':
|
|
self.run_before_loop.append(func)
|
|
elif when == 'a':
|
|
self.run_after_loop.append(func)
|
|
return decorator
|
|
|
|
def set_default_keyboard(self, keyboard='set_default'):
|
|
"""Set a default keyboard for the bot.
|
|
|
|
If a keyboard is not passed as argument, a default one is generated,
|
|
based on aliases of commands.
|
|
"""
|
|
if keyboard == 'set_default':
|
|
btns = [
|
|
dict(
|
|
text=x
|
|
)
|
|
for x in self.default_reply_keyboard_elements
|
|
]
|
|
row_len = 2 if len(btns) < 4 else 3
|
|
self._default_keyboard = dict(
|
|
keyboard=make_lines_of_buttons(
|
|
btns,
|
|
row_len
|
|
),
|
|
resize_keyboard=True
|
|
)
|
|
else:
|
|
self._default_keyboard = keyboard
|
|
return
|
|
|
|
async def edit_message(self, update, *args, **kwargs):
|
|
"""Edit given update with given *args and **kwargs.
|
|
|
|
Please note, that it is currently only possible to edit messages
|
|
without reply_markup or with inline keyboards.
|
|
"""
|
|
try:
|
|
return await self.editMessageText(
|
|
telepot.message_identifier(update),
|
|
*args,
|
|
**kwargs
|
|
)
|
|
except Exception as e:
|
|
logging.error("{}".format(e))
|
|
|
|
async def delete_message(self, update, *args, **kwargs):
|
|
"""Delete given update with given *args and **kwargs.
|
|
|
|
Please note, that a bot can delete only messages sent by itself
|
|
or sent in a group which it is administrator of.
|
|
"""
|
|
try:
|
|
return await self.deleteMessage(
|
|
telepot.message_identifier(update),
|
|
*args,
|
|
**kwargs
|
|
)
|
|
except Exception as e:
|
|
logging.error("{}".format(e))
|
|
|
|
async def send_message(self, answer=dict(), chat_id=None, text='',
|
|
parse_mode="HTML", disable_web_page_preview=None,
|
|
disable_notification=None, reply_to_message_id=None,
|
|
reply_markup=None):
|
|
"""Send a message.
|
|
|
|
Convenient method to call telepot.Bot(token).sendMessage
|
|
All sendMessage **kwargs can be either **kwargs of send_message
|
|
or key:val of answer argument.
|
|
Messages longer than telegram limit will be split properly.
|
|
Telegram flood limits won't be reached thanks to
|
|
`await avoid_flooding(chat_id)`
|
|
parse_mode will be checked and edited if necessary.
|
|
Arguments will be checked and adapted.
|
|
"""
|
|
if type(answer) is dict and 'chat_id' in answer:
|
|
chat_id = answer['chat_id']
|
|
# chat_id may simply be the update to which the bot should repy
|
|
if type(chat_id) is dict:
|
|
chat_id = self.get_chat_id(chat_id)
|
|
if type(answer) is str:
|
|
text = answer
|
|
if (
|
|
not reply_markup
|
|
and chat_id > 0
|
|
and text != self.unauthorized_message
|
|
):
|
|
reply_markup = self.default_keyboard
|
|
elif type(answer) is dict:
|
|
if 'text' in answer:
|
|
text = answer['text']
|
|
if 'parse_mode' in answer:
|
|
parse_mode = answer['parse_mode']
|
|
if 'disable_web_page_preview' in answer:
|
|
disable_web_page_preview = answer['disable_web_page_preview']
|
|
if 'disable_notification' in answer:
|
|
disable_notification = answer['disable_notification']
|
|
if 'reply_to_message_id' in answer:
|
|
reply_to_message_id = answer['reply_to_message_id']
|
|
if 'reply_markup' in answer:
|
|
reply_markup = answer['reply_markup']
|
|
elif (
|
|
not reply_markup
|
|
and type(chat_id) is int
|
|
and chat_id > 0
|
|
and text != self.unauthorized_message
|
|
):
|
|
reply_markup = self.default_keyboard
|
|
assert type(text) is str, "Text is not a string!"
|
|
assert (
|
|
type(chat_id) is int
|
|
or (type(chat_id) is str and chat_id.startswith('@'))
|
|
), "Invalid chat_id:\n\t\t{}".format(chat_id)
|
|
if not text:
|
|
return
|
|
parse_mode = str(parse_mode)
|
|
text_chunks = split_text_gracefully(
|
|
text=text,
|
|
limit=self.__class__.TELEGRAM_MESSAGES_MAX_LEN - 100,
|
|
parse_mode=parse_mode
|
|
)
|
|
n = len(text_chunks)
|
|
for text_chunk in text_chunks:
|
|
n -= 1
|
|
if parse_mode.lower() == "html":
|
|
this_parse_mode = "HTML"
|
|
# Check that all tags are well-formed
|
|
if not markdown_check(
|
|
text_chunk,
|
|
[
|
|
"<", ">",
|
|
"code>", "bold>", "italic>",
|
|
"b>", "i>", "a>", "pre>"
|
|
]
|
|
):
|
|
this_parse_mode = "None"
|
|
text_chunk = (
|
|
"!!![invalid markdown syntax]!!!\n\n"
|
|
+ text_chunk
|
|
)
|
|
elif parse_mode != "None":
|
|
this_parse_mode = "Markdown"
|
|
# Check that all markdowns are well-formed
|
|
if not markdown_check(
|
|
text_chunk,
|
|
[
|
|
"*", "_", "`"
|
|
]
|
|
):
|
|
this_parse_mode = "None"
|
|
text_chunk = (
|
|
"!!![invalid markdown syntax]!!!\n\n"
|
|
+ text_chunk
|
|
)
|
|
else:
|
|
this_parse_mode = parse_mode
|
|
this_reply_markup = reply_markup if n == 0 else None
|
|
try:
|
|
await self.avoid_flooding(chat_id)
|
|
result = await self.sendMessage(
|
|
chat_id=chat_id,
|
|
text=text_chunk,
|
|
parse_mode=this_parse_mode,
|
|
disable_web_page_preview=disable_web_page_preview,
|
|
disable_notification=disable_notification,
|
|
reply_to_message_id=reply_to_message_id,
|
|
reply_markup=this_reply_markup
|
|
)
|
|
except Exception as e:
|
|
logging.debug(
|
|
e,
|
|
exc_info=False # Set exc_info=True for more information
|
|
)
|
|
result = e
|
|
return result
|
|
|
|
async def send_photo(self, chat_id=None, answer={},
|
|
photo=None, caption='', parse_mode='HTML',
|
|
disable_notification=None, reply_to_message_id=None,
|
|
reply_markup=None, use_stored=True,
|
|
second_chance=False):
|
|
"""Send a photo.
|
|
|
|
Convenient method to call telepot.Bot(token).sendPhoto
|
|
All sendPhoto **kwargs can be either **kwargs of send_message
|
|
or key:val of answer argument.
|
|
Captions longer than telegram limit will be shortened gently.
|
|
Telegram flood limits won't be reached thanks to
|
|
`await avoid_flooding(chat_id)`
|
|
Most arguments will be checked and adapted.
|
|
If use_stored is set to True, the bot will store sent photo
|
|
telegram_id and use it for faster sending next times (unless
|
|
future errors).
|
|
Sending photos by their file_id already stored on telegram servers
|
|
is way faster: that's why bot stores and uses this info,
|
|
if required.
|
|
A second_chance is given to send photo on error.
|
|
"""
|
|
if 'chat_id' in answer:
|
|
chat_id = answer['chat_id']
|
|
# chat_id may simply be the update to which the bot should repy
|
|
if type(chat_id) is dict:
|
|
chat_id = self.get_chat_id(chat_id)
|
|
assert (
|
|
type(chat_id) is int
|
|
or (type(chat_id) is str and chat_id.startswith('@'))
|
|
), "Invalid chat_id:\n\t\t{}".format(chat_id)
|
|
if 'photo' in answer:
|
|
photo = answer['photo']
|
|
assert photo is not None, "Null photo!"
|
|
if 'caption' in answer:
|
|
caption = answer['caption']
|
|
if 'parse_mode' in answer:
|
|
parse_mode = answer['parse_mode']
|
|
if 'disable_notification' in answer:
|
|
disable_notification = answer['disable_notification']
|
|
if 'reply_to_message_id' in answer:
|
|
reply_to_message_id = answer['reply_to_message_id']
|
|
if 'reply_markup' in answer:
|
|
reply_markup = answer['reply_markup']
|
|
already_sent = False
|
|
if type(photo) is str:
|
|
photo_url = photo
|
|
with self.db as db:
|
|
already_sent = db['sent_pictures'].find_one(
|
|
url=photo_url,
|
|
errors=False
|
|
)
|
|
if already_sent and use_stored:
|
|
photo = already_sent['file_id']
|
|
already_sent = True
|
|
else:
|
|
already_sent = False
|
|
if not any(photo_url.startswith(x) for x in ['http', 'www']):
|
|
with io.BytesIO() as buffered_picture:
|
|
with open(
|
|
"{}/{}".format(
|
|
self.path,
|
|
photo_url
|
|
),
|
|
'rb'
|
|
) as photo_file:
|
|
buffered_picture.write(photo_file.read())
|
|
photo = buffered_picture.getvalue()
|
|
else:
|
|
use_stored = False
|
|
caption = escape_html_chars(caption)
|
|
if len(caption) > 199:
|
|
new_caption = ''
|
|
tag = False
|
|
tag_body = False
|
|
count = 0
|
|
temp = ''
|
|
for char in caption:
|
|
if tag and char == '>':
|
|
tag = False
|
|
elif char == '<':
|
|
tag = True
|
|
tag_body = not tag_body
|
|
elif not tag:
|
|
count += 1
|
|
if count == 199:
|
|
break
|
|
temp += char
|
|
if not tag_body:
|
|
new_caption += temp
|
|
temp = ''
|
|
caption = new_caption
|
|
sent = None
|
|
try:
|
|
await self.avoid_flooding(chat_id)
|
|
sent = await self.sendPhoto(
|
|
chat_id=chat_id,
|
|
photo=photo,
|
|
caption=caption,
|
|
parse_mode=parse_mode,
|
|
disable_notification=disable_notification,
|
|
reply_to_message_id=reply_to_message_id,
|
|
reply_markup=reply_markup
|
|
)
|
|
if isinstance(sent, Exception):
|
|
raise Exception("SendingFailed")
|
|
except Exception as e:
|
|
logging.error(
|
|
"Error sending photo\n{}".format(
|
|
e
|
|
),
|
|
exc_info=False # Set exc_info=True for more information
|
|
)
|
|
if already_sent:
|
|
with self.db as db:
|
|
db['sent_pictures'].update(
|
|
dict(
|
|
url=photo_url,
|
|
errors=True
|
|
),
|
|
['url']
|
|
)
|
|
if not second_chance:
|
|
logging.info("Trying again (only once)...")
|
|
sent = await self.send_photo(
|
|
chat_id=chat_id,
|
|
answer=answer,
|
|
photo=photo,
|
|
caption=caption,
|
|
parse_mode=parse_mode,
|
|
disable_notification=disable_notification,
|
|
reply_to_message_id=reply_to_message_id,
|
|
reply_markup=reply_markup,
|
|
second_chance=True
|
|
)
|
|
if (
|
|
sent is not None
|
|
and hasattr(sent, '__getitem__')
|
|
and 'photo' in sent
|
|
and len(sent['photo']) > 0
|
|
and 'file_id' in sent['photo'][0]
|
|
and (not already_sent)
|
|
and use_stored
|
|
):
|
|
with self.db as db:
|
|
db['sent_pictures'].insert(
|
|
dict(
|
|
url=photo_url,
|
|
file_id=sent['photo'][0]['file_id'],
|
|
errors=False
|
|
)
|
|
)
|
|
return sent
|
|
|
|
async def forward_message(self, chat_id, update=None, from_chat_id=None,
|
|
message_id=None, disable_notification=False):
|
|
"""Forward message from `from_chat_id` to `chat_id`.
|
|
|
|
Set disable_notification to True to avoid disturbing recipient.
|
|
"""
|
|
try:
|
|
if from_chat_id is None or message_id is None:
|
|
if (
|
|
type(update) is not dict
|
|
or 'chat' not in update
|
|
or 'id' not in update['chat']
|
|
or 'message_id' not in update
|
|
):
|
|
raise Exception("Wrong parameters, cannot forward.")
|
|
from_chat_id = update['chat']['id']
|
|
message_id = update['message_id']
|
|
await self.avoid_flooding(chat_id)
|
|
sent = await self.forwardMessage(
|
|
chat_id=chat_id,
|
|
from_chat_id=from_chat_id,
|
|
disable_notification=disable_notification,
|
|
message_id=message_id,
|
|
)
|
|
if isinstance(sent, Exception):
|
|
raise Exception("Forwarding failed.")
|
|
except Exception as e:
|
|
logging.error(
|
|
"Error forwarding message:\n{}".format(
|
|
e
|
|
),
|
|
exc_info=False # Set exc_info=True for more information
|
|
)
|
|
|
|
async def send_and_destroy(self, chat_id, answer,
|
|
timer=60, mode='text', **kwargs):
|
|
"""Send a message or photo and delete it after `timer` seconds."""
|
|
if mode == 'text':
|
|
sent_message = await self.send_message(
|
|
chat_id=chat_id,
|
|
answer=answer,
|
|
**kwargs
|
|
)
|
|
elif mode == 'pic':
|
|
sent_message = await self.send_photo(
|
|
chat_id=chat_id,
|
|
answer=answer,
|
|
**kwargs
|
|
)
|
|
if sent_message is None:
|
|
return
|
|
self.to_be_destroyed.append(sent_message)
|
|
await asyncio.sleep(timer)
|
|
if await self.delete_message(sent_message):
|
|
self.to_be_destroyed.remove(sent_message)
|
|
return
|
|
|
|
async def wait_and_obscure(self, update, when, inline_message_id):
|
|
"""Obscure messages which can't be deleted.
|
|
|
|
Obscure an inline_message `timer` seconds after sending it,
|
|
by editing its text or caption.
|
|
At the moment Telegram won't let bots delete sent inline query results.
|
|
"""
|
|
if type(when) is int:
|
|
when = datetime.datetime.now() + datetime.timedelta(seconds=when)
|
|
assert type(when) is datetime.datetime, (
|
|
"when must be a datetime instance or a number of seconds (int) "
|
|
"to be awaited"
|
|
)
|
|
if 'inline_message_id' not in update:
|
|
logging.info(
|
|
"This inline query result owns no inline_keyboard, so it "
|
|
"can't be modified"
|
|
)
|
|
return
|
|
inline_message_id = update['inline_message_id']
|
|
self.to_be_obscured.append(inline_message_id)
|
|
await sleep_until(when)
|
|
try:
|
|
await self.editMessageCaption(
|
|
inline_message_id,
|
|
text="Time over"
|
|
)
|
|
except Exception:
|
|
try:
|
|
await self.editMessageText(
|
|
inline_message_id,
|
|
text="Time over"
|
|
)
|
|
except Exception as e:
|
|
logging.error(
|
|
"Couldn't obscure message\n{}\n\n{}".format(
|
|
inline_message_id,
|
|
e
|
|
)
|
|
)
|
|
self.to_be_obscured.remove(inline_message_id)
|
|
return
|
|
|
|
async def save_picture(self, update, file_name=None, path='img/',
|
|
extension='jpg'):
|
|
"""Store `update` picture as `path`/`file_name`.`extension`."""
|
|
if not path.endswith('/'):
|
|
path = '{p}/'.format(
|
|
p=path
|
|
)
|
|
if not os.path.isdir(path):
|
|
path = '{path}/img/'.format(
|
|
path=self.path
|
|
)
|
|
if file_name is None:
|
|
file_name = get_secure_key(length=6)
|
|
if file_name.endswith('.'):
|
|
file_name = file_name[:-1]
|
|
complete_file_name = '{path}{name}.{ext}'.format(
|
|
path=self.path,
|
|
name=file_name,
|
|
ext=extension
|
|
)
|
|
while os.path.isfile(complete_file_name):
|
|
file_name += get_secure_key(length=1)
|
|
complete_file_name = '{path}{name}.{ext}'.format(
|
|
path=self.path,
|
|
name=file_name,
|
|
ext=extension
|
|
)
|
|
try:
|
|
await self.download_file(
|
|
update['photo'][-1]['file_id'],
|
|
complete_file_name
|
|
)
|
|
except Exception as e:
|
|
return dict(
|
|
result=1, # Error
|
|
file_name=None,
|
|
error=e
|
|
)
|
|
return dict(
|
|
result=0, # Success
|
|
file_name=complete_file_name,
|
|
error=None
|
|
)
|
|
|
|
async def get_me(self):
|
|
"""Get bot information.
|
|
|
|
Restart bots if bot can't be got.
|
|
"""
|
|
try:
|
|
me = await self.getMe()
|
|
self.bot_name = me["username"]
|
|
self.telegram_id = me['id']
|
|
except Exception as e:
|
|
logging.error(
|
|
"Could not get bot\n{e}".format(
|
|
e=e
|
|
)
|
|
)
|
|
await asyncio.sleep(5*60)
|
|
self.restart_bots()
|
|
return
|
|
|
|
async def continue_running(self):
|
|
"""Get updates.
|
|
|
|
If bot can be got, sets name and telegram_id,
|
|
awaits preliminary tasks and starts getting updates from telegram.
|
|
If bot can't be got, restarts all bots in 5 minutes.
|
|
"""
|
|
await self.get_me()
|
|
for task in self.run_before_loop:
|
|
await task()
|
|
self.set_default_keyboard()
|
|
asyncio.ensure_future(
|
|
self.message_loop(handler=self.routing_table)
|
|
)
|
|
return
|
|
|
|
def stop_bots(self):
|
|
"""Exit script with code 0."""
|
|
Bot.stop = True
|
|
|
|
def restart_bots(self):
|
|
"""Restart the script exiting with code 65.
|
|
|
|
Actually, you need to catch Bot.stop state when Bot.run() returns
|
|
and handle the situation yourself.
|
|
"""
|
|
Bot.stop = "Restart"
|
|
|
|
@classmethod
|
|
async def check_task(cls):
|
|
"""Await until cls.stop, then end session and return."""
|
|
for bot in cls.instances.values():
|
|
asyncio.ensure_future(bot.continue_running())
|
|
while not cls.stop:
|
|
await asyncio.sleep(10)
|
|
return await cls.end_session()
|
|
|
|
@classmethod
|
|
async def end_session(cls):
|
|
"""Run after stop, before the script exits.
|
|
|
|
Await final tasks, obscure and delete pending messages,
|
|
log current operation (stop/restart).
|
|
"""
|
|
for bot in cls.instances.values():
|
|
for task in bot.run_after_loop:
|
|
await task()
|
|
for message in bot.to_be_destroyed:
|
|
try:
|
|
await bot.delete_message(message)
|
|
except Exception as e:
|
|
logging.error(
|
|
"Couldn't delete message\n{}\n\n{}".format(
|
|
message,
|
|
e
|
|
)
|
|
)
|
|
for inline_message_id in bot.to_be_obscured:
|
|
try:
|
|
await bot.editMessageCaption(
|
|
inline_message_id,
|
|
text="Time over"
|
|
)
|
|
except Exception:
|
|
try:
|
|
await bot.editMessageText(
|
|
inline_message_id,
|
|
text="Time over"
|
|
)
|
|
except Exception as e:
|
|
logging.error(
|
|
"Couldn't obscure message\n{}\n\n{}".format(
|
|
inline_message_id,
|
|
e
|
|
)
|
|
)
|
|
if cls.stop == "Restart":
|
|
logging.info("\n\t\t---Restart!---")
|
|
elif cls.stop == "KeyboardInterrupt":
|
|
logging.info("Stopped by KeyboardInterrupt.")
|
|
else:
|
|
logging.info("Stopped gracefully by user.")
|
|
return
|
|
|
|
@classmethod
|
|
def run(cls, loop=None):
|
|
"""Call this method to run the async bots."""
|
|
if loop is None:
|
|
loop = asyncio.get_event_loop()
|
|
logging.info(
|
|
"{sep}{subjvb} STARTED{sep}".format(
|
|
sep='-'*10,
|
|
subjvb=('BOT HAS' if len(cls.instances) == 1 else 'BOTS HAVE')
|
|
)
|
|
)
|
|
try:
|
|
loop.run_until_complete(cls.check_task())
|
|
except KeyboardInterrupt:
|
|
logging.info(
|
|
(
|
|
"\n\t\tYour script received a KeyboardInterrupt signal, "
|
|
"your bot{} being stopped."
|
|
).format(
|
|
's are'
|
|
if len(cls.instances) > 1
|
|
else ' is'
|
|
)
|
|
)
|
|
|
|
cls.stop = "KeyboardInterrupt"
|
|
loop.run_until_complete(cls.end_session())
|
|
except Exception as e:
|
|
logging.error(
|
|
'\nYour bot{vb} been stopped. with error \'{e}\''.format(
|
|
e=e,
|
|
vb='s have' if len(cls.instances) > 1 else ' has'
|
|
),
|
|
exc_info=True
|
|
)
|
|
logging.info(
|
|
"{sep}{subjvb} STOPPED{sep}".format(
|
|
sep='-'*10,
|
|
subjvb='BOT HAS' if len(cls.instances) == 1 else 'BOTS HAVE'
|
|
)
|
|
)
|
|
return
|
|
|
|
@classmethod
|
|
async def _run_manual_mode(cls):
|
|
available_bots = MyOD()
|
|
for code, bot in enumerate(
|
|
cls.instances.values()
|
|
):
|
|
await bot.get_me()
|
|
available_bots[code] = dict(
|
|
bot=bot,
|
|
code=code,
|
|
name=bot.name
|
|
)
|
|
selected_bot = None
|
|
while selected_bot is None:
|
|
user_input = input(
|
|
"\n=============================================\n"
|
|
"Which bot would you like to control manually?\n"
|
|
"Available bots:\n{}\n\n\t\t".format(
|
|
line_drawing_unordered_list(
|
|
list(
|
|
"{b[code]:>3} - {b[bot].name}".format(
|
|
b=bot,
|
|
)
|
|
for bot in available_bots.values()
|
|
)
|
|
)
|
|
)
|
|
)
|
|
if (
|
|
user_input.isnumeric()
|
|
and int(user_input) in available_bots
|
|
):
|
|
selected_bot = available_bots[int(user_input)]
|
|
else:
|
|
selected_bot = pick_most_similar_from_list(
|
|
[
|
|
bot['name']
|
|
for bot in available_bots.values()
|
|
],
|
|
user_input
|
|
)
|
|
selected_bot = available_bots.get_by_key_val(
|
|
key='name',
|
|
val=selected_bot,
|
|
case_sensitive=False,
|
|
return_value=True
|
|
)
|
|
if selected_bot is None:
|
|
logging.error("Invalid selection.")
|
|
continue
|
|
logging.info(
|
|
"Bot `{b[name]}` selected.".format(
|
|
b=selected_bot
|
|
)
|
|
)
|
|
exit_code = await selected_bot['bot']._run_manually()
|
|
if exit_code == 0:
|
|
break
|
|
elif exit_code == 65:
|
|
selected_bot = None
|
|
return
|
|
|
|
@classmethod
|
|
def run_manual_mode(cls, loop=None):
|
|
"""Run in manual mode: send messages via bots."""
|
|
if loop is None:
|
|
loop = asyncio.get_event_loop()
|
|
logging.info(
|
|
"=== MANUAL MODE STARTED ==="
|
|
)
|
|
try:
|
|
loop.run_until_complete(
|
|
cls._run_manual_mode()
|
|
)
|
|
except KeyboardInterrupt:
|
|
logging.info(
|
|
(
|
|
"\n\t\tYour script received a KeyboardInterrupt signal, "
|
|
"your bot{} being stopped."
|
|
).format(
|
|
's are' if len(cls.instances) > 1 else ' is'
|
|
)
|
|
)
|
|
except Exception as e:
|
|
logging.error(
|
|
'\nYour bot{vb} been stopped. with error \'{e}\''.format(
|
|
e=e,
|
|
vb='s have' if len(cls.instances) > 1 else ' has'
|
|
),
|
|
exc_info=True
|
|
)
|
|
logging.info(
|
|
"=== MANUAL MODE STOPPED ==="
|
|
)
|
|
|
|
async def _run_manually(self):
|
|
user_input = ' choose_addressee'
|
|
selected_user = None
|
|
users = []
|
|
while user_input:
|
|
if user_input == ' choose_addressee':
|
|
try:
|
|
user_input = input(
|
|
"Choose an addressee.\n"
|
|
"Press enter to change bot.\n"
|
|
"\n\t\t"
|
|
)
|
|
if len(user_input) == 0:
|
|
return 65 # Let user select a different bot
|
|
except KeyboardInterrupt:
|
|
logging.error("Keyboard interrupt.")
|
|
return 0 # Stop running
|
|
if (
|
|
selected_user is None
|
|
and user_input.strip('-').isnumeric()
|
|
):
|
|
user_input = int(user_input)
|
|
users = list(
|
|
filter(
|
|
lambda user: user['telegram_id'] == user_input,
|
|
users
|
|
)
|
|
)
|
|
if len(users) == 0:
|
|
users = [
|
|
dict(
|
|
telegram_id=user_input,
|
|
name='Unknown user'
|
|
)
|
|
]
|
|
elif (
|
|
selected_user is None
|
|
and self.db_url is not None
|
|
):
|
|
with self.db as db:
|
|
if 'users' not in db.tables:
|
|
db['users'].insert(
|
|
dict(
|
|
telegram_id=0,
|
|
privileges=100,
|
|
username='username',
|
|
first_name='first_name',
|
|
last_name='last_name'
|
|
)
|
|
)
|
|
if 'contacts' not in db.tables:
|
|
db['contacts'].insert(
|
|
dict(
|
|
telegram_id=0,
|
|
name='ZZZXXXAAA',
|
|
)
|
|
)
|
|
users = list(
|
|
db.query(
|
|
"""SELECT telegram_id, MAX(name) name
|
|
FROM (
|
|
SELECT telegram_id,
|
|
COALESCE(
|
|
first_name || ' ' || last_name ||
|
|
' (' || username || ')',
|
|
username,
|
|
first_name || ' ' || last_name,
|
|
last_name,
|
|
first_name
|
|
) AS name
|
|
FROM users
|
|
WHERE COALESCE(
|
|
first_name || last_name || username,
|
|
first_name || username,
|
|
last_name || username,
|
|
first_name || last_name,
|
|
username,
|
|
last_name,
|
|
first_name
|
|
)
|
|
LIKE '%{u}%'
|
|
UNION ALL
|
|
SELECT telegram_id, name
|
|
FROM contacts
|
|
WHERE name LIKE '%{u}%'
|
|
)
|
|
GROUP BY telegram_id
|
|
""".format(
|
|
u=user_input
|
|
)
|
|
)
|
|
)
|
|
if len(users) == 0:
|
|
logging.info("Sorry, no user matches your query.")
|
|
user_input = ' choose_addressee'
|
|
elif len(users) > 1:
|
|
user_input = input(
|
|
"Please select one of the following users:\n"
|
|
"\n"
|
|
"{users}\n"
|
|
"\n"
|
|
"Paste their telegram_id\n"
|
|
"\t\t".format(
|
|
users=line_drawing_unordered_list(
|
|
sorted(
|
|
map(
|
|
lambda user: (
|
|
"{u[telegram_id]} - {u[name]}"
|
|
).format(
|
|
u=user
|
|
),
|
|
users
|
|
)
|
|
)
|
|
)
|
|
)
|
|
)
|
|
elif len(users) == 1:
|
|
selected_user = users[0]
|
|
while selected_user is not None:
|
|
sent = None
|
|
text = input(
|
|
"What would you like to send "
|
|
"{u[name]} ({u[telegram_id]})?\n"
|
|
"Leave it blank if you want to select another "
|
|
"addressee.\n"
|
|
"\t\t\t".format(
|
|
u=selected_user
|
|
)
|
|
)
|
|
if len(text) == 0:
|
|
selected_user = None
|
|
user_input = ' choose_addressee'
|
|
elif text.lower() == 'photo':
|
|
caption = input(
|
|
'Write a caption (you can leave it blank)\n'
|
|
'\t\t\t'
|
|
)
|
|
try:
|
|
with io.BytesIO() as buffered_picture:
|
|
with open(
|
|
'{path}/sendme.png'.format(
|
|
path=self.path
|
|
),
|
|
'rb' # Read bytes
|
|
) as photo_file:
|
|
buffered_picture.write(
|
|
photo_file.read()
|
|
)
|
|
photo = buffered_picture.getvalue()
|
|
sent = await self.send_photo(
|
|
chat_id=selected_user['telegram_id'],
|
|
photo=photo,
|
|
caption=caption,
|
|
parse_mode='HTML',
|
|
use_stored=False
|
|
)
|
|
except Exception as e:
|
|
logging.error(e)
|
|
else:
|
|
try:
|
|
sent = await self.send_message(
|
|
chat_id=selected_user['telegram_id'],
|
|
text=text,
|
|
parse_mode='HTML'
|
|
)
|
|
except Exception as e:
|
|
logging.error(e)
|
|
if (
|
|
sent is not None
|
|
and not isinstance(sent, Exception)
|
|
):
|
|
logging.info(
|
|
'\n'
|
|
'Sent message:\n'
|
|
'{s}\n'
|
|
'\n'.format(
|
|
s=sent
|
|
)
|
|
)
|
|
while (
|
|
self.db_url
|
|
and selected_user['name'] == 'Unknown user'
|
|
):
|
|
new_name = input(
|
|
"Please enter a nickname for this user.\n"
|
|
"Next time you may retrieve their telegram_id "
|
|
"by passing this nickname (or a part of it).\n"
|
|
"\t\t"
|
|
)
|
|
if len(new_name):
|
|
selected_user['name'] = new_name
|
|
with self.db as db:
|
|
db['contacts'].upsert(
|
|
selected_user,
|
|
['telegram_id'],
|
|
ensure=True
|
|
)
|
|
else:
|
|
logging.info("Invalid name, please try again.")
|
|
return 65 # Keep running, making user select another bot
|