HTML parse_mode check upgraded.

All supported tags are now permitted; malformed tags are replaced with escaped characters.
This commit is contained in:
Davte 2022-12-12 22:43:44 +01:00
parent 55b47ed1f7
commit fafa639328
Signed by: Davte
GPG Key ID: 70336F92E6814706
5 changed files with 82 additions and 77 deletions

View File

@ -11,7 +11,7 @@ __author__ = "Davide Testa"
__email__ = "davide@davte.it" __email__ = "davide@davte.it"
__credits__ = ["Marco Origlia", "Nick Lee @Nickoala"] __credits__ = ["Marco Origlia", "Nick Lee @Nickoala"]
__license__ = "GNU General Public License v3.0" __license__ = "GNU General Public License v3.0"
__version__ = "2.8.9" __version__ = "2.8.10"
__maintainer__ = "Davide Testa" __maintainer__ = "Davide Testa"
__contact__ = "t.me/davte" __contact__ = "t.me/davte"

View File

@ -27,7 +27,7 @@ from davtelepot.messages import default_admin_messages, default_talk_messages
from davtelepot.bot import Bot from davtelepot.bot import Bot
from davtelepot.utilities import ( from davtelepot.utilities import (
async_wrapper, CachedPage, Confirmator, extract, get_cleaned_text, async_wrapper, CachedPage, Confirmator, extract, get_cleaned_text,
get_user, escape_html_chars, line_drawing_unordered_list, make_button, get_user, clean_html_string, line_drawing_unordered_list, make_button,
make_inline_keyboard, remove_html_tags, send_part_of_text_file, make_inline_keyboard, remove_html_tags, send_part_of_text_file,
send_csv_file, make_lines_of_buttons send_csv_file, make_lines_of_buttons
) )
@ -130,7 +130,7 @@ def get_talk_panel(bot: Bot,
'help_text', 'help_text',
update=update, update=update,
user_record=user_record, user_record=user_record,
q=escape_html_chars( q=clean_html_string(
remove_html_tags(text) remove_html_tags(text)
) )
) )
@ -155,7 +155,7 @@ def get_talk_panel(bot: Bot,
'user_not_found', 'user_not_found',
update=update, update=update,
user_record=user_record, user_record=user_record,
q=escape_html_chars( q=clean_html_string(
remove_html_tags(text) remove_html_tags(text)
) )
) )

View File

@ -382,7 +382,7 @@ class TelegramBot:
@staticmethod @staticmethod
def adapt_parameters(parameters, exclude=None): def adapt_parameters(parameters, exclude=None):
"""Build a aiohttp.FormData object from given `parameters`. """Build an aiohttp.FormData object from given `parameters`.
Exclude `self`, empty values and parameters in `exclude` list. Exclude `self`, empty values and parameters in `exclude` list.
Cast integers to string to avoid TypeError during json serialization. Cast integers to string to avoid TypeError during json serialization.
@ -1058,7 +1058,7 @@ class TelegramBot:
unbanned first. unbanned first.
Note: In regular groups (non-supergroups), this method will only work Note: In regular groups (non-supergroups), this method will only work
if the All Members Are Admins setting is off in the target group. if the All Members Are Admins setting is off in the target group.
Otherwise members may only be removed by the group's creator or by Otherwise, members may only be removed by the group's creator or by
the member that added them. the member that added them.
See https://core.telegram.org/bots/api#kickchatmember for details. See https://core.telegram.org/bots/api#kickchatmember for details.
""" """
@ -1245,7 +1245,7 @@ class TelegramBot:
) )
async def getChat(self, chat_id: Union[int, str]): async def getChat(self, chat_id: Union[int, str]):
"""Get up to date information about the chat. """Get up-to-date information about the chat.
Return a Chat object on success. Return a Chat object on success.
See https://core.telegram.org/bots/api#getchat for details. See https://core.telegram.org/bots/api#getchat for details.

View File

@ -54,7 +54,7 @@ from davtelepot.database import ObjectWithDatabase
from davtelepot.languages import MultiLanguageObject from davtelepot.languages import MultiLanguageObject
from davtelepot.messages import davtelepot_messages from davtelepot.messages import davtelepot_messages
from davtelepot.utilities import ( from davtelepot.utilities import (
async_get, escape_html_chars, extract, get_secure_key, async_get, clean_html_string, extract, get_secure_key,
make_inline_query_answer, make_lines_of_buttons, remove_html_tags make_inline_query_answer, make_lines_of_buttons, remove_html_tags
) )
@ -69,7 +69,7 @@ logging.getLogger('chardet').setLevel(logging.WARNING)
class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
"""Simple Bot object, providing methods corresponding to Telegram bot API. """Simple Bot object, providing methods corresponding to Telegram bot API.
Multiple Bot() instances may be run together, along with a aiohttp web app. Multiple Bot() instances may be run together, along with an aiohttp web app.
""" """
bots = [] bots = []
@ -347,7 +347,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
@property @property
def errors_file_path(self): def errors_file_path(self):
"""Return errors file path basing on self.path and `_errors_file_name`. """Return errors file path basing on `self.path` and `_errors_file_name`.
Fallback to class file if set, otherwise return None. Fallback to class file if set, otherwise return None.
""" """
@ -417,7 +417,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
"""Maximum number of simultaneous HTTPS connections allowed. """Maximum number of simultaneous HTTPS connections allowed.
Telegram will open as many connections as possible to boost bots Telegram will open as many connections as possible to boost bots
throughput, lower values limit the load on bots server. throughput, lower values limit the load on bot's server.
""" """
return self._max_connections return self._max_connections
@ -477,7 +477,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
def allowed_during_maintenance(self): def allowed_during_maintenance(self):
"""Return the list of criteria to allow an update during maintenance. """Return the list of criteria to allow an update during maintenance.
If any of this criteria returns True on an update, that update will be If any of these criteria returns True on an update, that update will be
handled even during maintenance. handled even during maintenance.
""" """
return self._allowed_during_maintenance return self._allowed_during_maintenance
@ -858,7 +858,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
elif 'chat' in update and update['chat']['id'] > 0: elif 'chat' in update and update['chat']['id'] > 0:
reply = dict(text=self.unknown_command_message) reply = dict(text=self.unknown_command_message)
else: # Handle command aliases and text parsers else: # Handle command aliases and text parsers
# Aliases are case insensitive: text and alias are both .lower() # Aliases are case-insensitive: text and alias are both .lower()
for alias, function in self.command_aliases.items(): for alias, function in self.command_aliases.items():
if lowered_text.startswith(alias.lower()): if lowered_text.startswith(alias.lower()):
replier = function replier = function
@ -1222,7 +1222,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
with proper code markdown. with proper code markdown.
""" """
if parse_mode == 'HTML': if parse_mode == 'HTML':
text = escape_html_chars(text) text = clean_html_string(text)
tags = ( tags = (
('`', '`') ('`', '`')
if parse_mode == 'Markdown' if parse_mode == 'Markdown'
@ -1591,7 +1591,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
photo.startswith(url_starter) photo.startswith(url_starter)
for url_starter in ('http', 'www',) for url_starter in ('http', 'www',)
] ]
): # If `photo` is not a url but a local file path ): # If `photo` is not a URL but a local file path
try: try:
with io.BytesIO() as buffered_picture: with io.BytesIO() as buffered_picture:
with open( with open(
@ -1716,7 +1716,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
audio.startswith(url_starter) audio.startswith(url_starter)
for url_starter in ('http', 'www',) for url_starter in ('http', 'www',)
] ]
): # If `audio` is not a url but a local file path ): # If `audio` is not a URL but a local file path
try: try:
with io.BytesIO() as buffered_picture: with io.BytesIO() as buffered_picture:
with open( with open(
@ -1841,7 +1841,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
voice.startswith(url_starter) voice.startswith(url_starter)
for url_starter in ('http', 'www',) for url_starter in ('http', 'www',)
] ]
): # If `voice` is not a url but a local file path ): # If `voice` is not a URL but a local file path
try: try:
with io.BytesIO() as buffered_picture: with io.BytesIO() as buffered_picture:
with open( with open(
@ -1977,7 +1977,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
document_path.startswith(url_starter) document_path.startswith(url_starter)
for url_starter in ('http', 'www',) for url_starter in ('http', 'www',)
] ]
): # If `document_path` is not a url but a local file path ): # If `document_path` is not a URL but a local file path
try: try:
with open( with open(
document_path.format( document_path.format(

View File

@ -16,9 +16,9 @@ import string
import time import time
from difflib import SequenceMatcher from difflib import SequenceMatcher
from typing import Tuple, Union
# Third party modules # Third party modules
from typing import Tuple, Union
import aiohttp import aiohttp
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
@ -1251,7 +1251,7 @@ def parse_datetime_interval_string(text):
result_text.pop() result_text.pop()
if len(result_text) > 0 and result_text[-1].lower() in TIME_WORDS: if len(result_text) > 0 and result_text[-1].lower() in TIME_WORDS:
result_text.pop() result_text.pop()
result_text = escape_html_chars( result_text = clean_html_string(
' '.join(result_text) ' '.join(result_text)
) )
parsers = list( parsers = list(
@ -1330,6 +1330,22 @@ MONTH_NAMES_ITA[10] = "ottobre"
MONTH_NAMES_ITA[11] = "novembre" MONTH_NAMES_ITA[11] = "novembre"
MONTH_NAMES_ITA[12] = "dicembre" MONTH_NAMES_ITA[12] = "dicembre"
allowed_html_tags = ['b', 'strong',
'i', 'em',
'u', 'ins',
's', 'strike', 'del',
'span', 'tg-spoiler',
'a',
'code', 'pre']
HTML_SYMBOLS = collections.OrderedDict()
HTML_SYMBOLS["&"] = "&amp;"
HTML_SYMBOLS["<"] = "&lt;"
HTML_SYMBOLS[">"] = "&gt;"
HTML_SYMBOLS["\""] = "&quot;"
html_numeric_code_regex = re.compile(r'&amp;(?P<code>#\d{2,3};)')
def beautytd(td): def beautytd(td):
"""Format properly timedeltas.""" """Format properly timedeltas."""
@ -1410,67 +1426,56 @@ def beautydt(dt):
return result return result
HTML_SYMBOLS = MyOD() def clean_html_string(text: str) -> str:
HTML_SYMBOLS["&"] = "&amp;" """Escape HTML symbols, unless part of a valid tag or numeric code character.
HTML_SYMBOLS["<"] = "&lt;"
HTML_SYMBOLS[">"] = "&gt;"
HTML_SYMBOLS["\""] = "&quot;"
HTML_SYMBOLS["&lt;b&gt;"] = "<b>"
HTML_SYMBOLS["&lt;/b&gt;"] = "</b>"
HTML_SYMBOLS["&lt;i&gt;"] = "<i>"
HTML_SYMBOLS["&lt;/i&gt;"] = "</i>"
HTML_SYMBOLS["&lt;code&gt;"] = "<code>"
HTML_SYMBOLS["&lt;/code&gt;"] = "</code>"
HTML_SYMBOLS["&lt;pre&gt;"] = "<pre>"
HTML_SYMBOLS["&lt;/pre&gt;"] = "</pre>"
HTML_SYMBOLS["&lt;a href=&quot;"] = "<a href=\""
HTML_SYMBOLS["&quot;&gt;"] = "\">"
HTML_SYMBOLS["&lt;/a&gt;"] = "</a>"
HTML_TAGS = [ Find valid HTML tags;
None, "<b>", "</b>", if there are any, choose the first occurring and call the function
None, "<i>", "</i>", recursively on what comes before the tag, inside the tag and after the
None, "<code>", "</code>", tag, preserving the tag opening and close as they are;
None, "<pre>", "</pre>", if there aren't any, escape HTML symbols except for `&` in HTML numeric code
None, "<a href=\"", "\">", "</a>", characters (`&#` followed by 2 or 3 digits followed by `;`).
None """
] first_match = None
for tag in allowed_html_tags:
if tag in ('a', ): # <a> must have href attribute
def remove_html_tags(text): attribute = r" href=\".*\""
"""Remove HTML tags from `text`.""" elif tag in ('span', ): # <span> must have class attribute with "tg-spoiler" value
for tag in HTML_TAGS: attribute = r" class=\"tg-spoiler\""
if tag is None: elif tag in ('code',): # <code> may have a class with a programming language as value
continue attribute = r"( class=\".*\")?"
text = text.replace(tag, '') else:
attribute = ""
match = re.search(
rf'(?P<opening><{tag}{attribute}>)'
rf'(?P<body>.*?)'
rf'(?P<close></{tag}>)',
text
)
if match and (first_match is None or match.start() < first_match.start()):
first_match = match
if first_match is not None:
groups = first_match.groupdict()
text = (f"{clean_html_string(text[:first_match.start()])}"
f"{groups['opening']}{clean_html_string(groups['body'])}{groups['close']}"
f"{clean_html_string(text[first_match.end():])}")
else:
for key, value in HTML_SYMBOLS.items():
text = text.replace(key, value)
if re.search(html_numeric_code_regex, text):
text = re.sub(html_numeric_code_regex, r'&\g<code>', text)
return text return text
def escape_html_chars(text): def escape_html_chars(text):
"""Escape HTML chars if not part of a tag.""" logging.error("`escape_html_chars` function deprecated, use `clean_html_string` instead.")
for s, r in HTML_SYMBOLS.items(): return clean_html_string(text)
text = text.replace(s, r)
copy = text
expected_tag = None def remove_html_tags(text):
while copy: """Remove HTML tags from `text`."""
min_ = min( for tag in allowed_html_tags:
( text = re.sub(rf'</?{tag}( (href|class)=\".*\")?>', '', text)
dict(
position=copy.find(tag) if tag in copy else len(copy),
tag=tag
)
for tag in HTML_TAGS
if tag
),
key=lambda x: x['position'],
default=0
)
if min_['position'] == len(copy):
break
if expected_tag and min_['tag'] != expected_tag:
return text.replace('<', '_').replace('>', '_')
expected_tag = HTML_TAGS[HTML_TAGS.index(min_['tag'])+1]
copy = extract(copy, min_['tag'])
return text return text