Split text in several messages if it hits Telegram limits

This commit is contained in:
Davte 2019-07-01 23:36:45 +02:00
parent af64241d81
commit e05b0ebe46

View File

@ -15,7 +15,7 @@ from aiohttp import web
# Project modules
from api import TelegramBot, TelegramError
from utilities import get_secure_key, make_lines_of_buttons
from utilities import escape_html_chars, get_secure_key, make_lines_of_buttons
# Do not log aiohttp `INFO` and `DEBUG` levels
logging.getLogger('aiohttp').setLevel(logging.WARNING)
@ -36,6 +36,7 @@ class Bot(TelegramBot):
"Please retry later...")
_authorization_denied_message = None
_unknown_command_message = None
TELEGRAM_MESSAGES_MAX_LEN = 4096
def __init__(
self, token, hostname='', certificate=None, max_connections=40,
@ -604,8 +605,58 @@ class Bot(TelegramBot):
"but this handler does nothing yet."
)
@staticmethod
def split_message_text(text, limit=None, parse_mode='HTML'):
r"""Split text if it hits telegram limits for text messages.
Split at `\n` if possible.
Add a `[...]` at the end and beginning of split messages,
with proper code markdown.
"""
if parse_mode == 'HTML':
text = escape_html_chars(text)
tags = (
('`', '`')
if parse_mode == 'Markdown'
else ('<code>', '</code>')
if parse_mode.lower() == 'html'
else ('', '')
)
if limit is None:
limit = Bot.TELEGRAM_MESSAGES_MAX_LEN - 100
# Example text: "lines\nin\nreversed\order"
text = text.split("\n")[::-1] # ['order', 'reversed', 'in', 'lines']
text_part_number = 0
while len(text) > 0:
temp = []
text_part_number += 1
while (
len(text) > 0
and len(
"\n".join(temp + [text[-1]])
) < limit
):
# Append lines of `text` in order (`.pop` returns the last
# line in text) until the addition of the next line would hit
# the `limit`.
temp.append(text.pop())
# If graceful split failed (last line was longer than limit)
if len(temp) == 0:
# Force split last line
temp.append(text[-1][:limit])
text[-1] = text[-1][limit:]
text_chunk = "\n".join(temp) # Re-join this group of lines
prefix, suffix = '', ''
is_last = len(text) > 0
if text_part_number > 1:
prefix = f"{tags[0]}[...]{tags[1]}\n"
if is_last:
suffix = f"\n{tags[0]}[...]{tags[1]}"
yield (prefix + text_chunk + suffix), is_last
return
async def send_message(self, chat_id=None, text=None,
parse_mode=None,
parse_mode='HTML',
disable_web_page_preview=None,
disable_notification=None,
reply_to_message_id=None,
@ -636,15 +687,26 @@ class Bot(TelegramBot):
and text != self.authorization_denied_message
):
reply_markup = self.default_keyboard
return await self.sendMessage(
chat_id=chat_id,
if not text:
return
parse_mode = str(parse_mode)
text_chunks = self.split_message_text(
text=text,
parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview,
disable_notification=disable_notification,
reply_to_message_id=reply_to_message_id,
reply_markup=reply_markup
limit=self.__class__.TELEGRAM_MESSAGES_MAX_LEN - 100,
parse_mode=parse_mode
)
for text_chunk, is_last in text_chunks:
_reply_markup = (reply_markup if is_last else None)
sent_message_update = await self.sendMessage(
chat_id=chat_id,
text=text_chunk,
parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview,
disable_notification=disable_notification,
reply_to_message_id=reply_to_message_id,
reply_markup=_reply_markup
)
return sent_message_update
async def answer_inline_query(self,
inline_query_id=None,