From 11d07b45d671d82e06f4dba538b7465536344abf Mon Sep 17 00:00:00 2001 From: Davte Date: Mon, 18 May 2020 15:08:15 +0200 Subject: [PATCH] /calc command implemented --- davtelepot/__init__.py | 2 +- davtelepot/bot.py | 3 +- davtelepot/messages.py | 69 +++++ davtelepot/useful_tools.py | 524 +++++++++++++++++++++++++++++++++++-- 4 files changed, 571 insertions(+), 27 deletions(-) diff --git a/davtelepot/__init__.py b/davtelepot/__init__.py index e6c8390..2a58146 100644 --- a/davtelepot/__init__.py +++ b/davtelepot/__init__.py @@ -11,7 +11,7 @@ __author__ = "Davide Testa" __email__ = "davide@davte.it" __credits__ = ["Marco Origlia", "Nick Lee @Nickoala"] __license__ = "GNU General Public License v3.0" -__version__ = "2.5.14" +__version__ = "2.5.15" __maintainer__ = "Davide Testa" __contact__ = "t.me/davte" diff --git a/davtelepot/bot.py b/davtelepot/bot.py index f7bd50b..62f268c 100644 --- a/davtelepot/bot.py +++ b/davtelepot/bot.py @@ -2232,7 +2232,8 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): handler=decorated_command_handler, description=description, authorization_level=authorization_level, - language_labelled_commands=language_labelled_commands + language_labelled_commands=language_labelled_commands, + aliases=aliases ) if type(description) is dict: self.messages['commands'][command] = dict( diff --git a/davtelepot/messages.py b/davtelepot/messages.py index 19d469f..b7b16cf 100644 --- a/davtelepot/messages.py +++ b/davtelepot/messages.py @@ -1008,6 +1008,75 @@ default_unknown_command_message = { } default_useful_tools_messages = { + 'calculate_command': { + 'description': { + 'en': "Do calculations", + 'it': "Calcola", + }, + 'help_section': None, + 'instructions': { + 'en': "🔢 Calculator 🧮\n\n" + "Enter an algebraic expression after /calc to get its " + "result, or use the command in reply to a message containing " + "an expression, or use the keyboard below.\n\n" + "- ℹ️: show information about special keys\n", + 'it': "🔢 Calcolatrice 🧮\n\n" + "Inserisci un'espressione algebrica dopo /calcola per " + "ottenerne il risultato, oppure usa il comando in risposta, " + "o ancora usa la tastiera qui sotto.\n\n" + "- ℹ️: mostra informazioni sui tasti speciali\n", + }, + 'invalid_expression': { + 'en': "Invalid expression: {error}", + 'it': "Espressione non valida: {error}", + }, + 'language_labelled_commands': { + 'en': "calculate", + 'it': "calcola", + }, + 'message_input': { + 'en': "🔢 Calculator 🧮\n\n" + "Enter an expression", + 'it': "🔢 Calcolatrice 🧮\n\n" + "Mandami l'espressione", + }, + 'special_keys': { + 'en': "Special keys\n" + "- **: exponentiation\n" + "- //: floor division\n" + "- mod: modulus (remainder of division)\n" + "- MR: result of last expression\n" + "- ℹ️: show this help message\n" + "- 💬: write your expression in a message\n" + "- ⬅️: delete last character\n" + "- : start a new line (and a new expression)\n", + 'it': "Tasti speciali\n" + "- **: elevamento a potenza\n" + "- //: quoziente della divisione\n" + "- mod: resto della divisione\n" + "- MR: risultato dell'espressione precedente\n" + "- ℹ️: mostra questo messaggio\n" + "- 💬: invia un messaggio con l'espressione\n" + "- ⬅️: cancella ultimo carattere\n" + "- : vai a capo (inizia una nuova espressione)\n", + }, + 'use_buttons': { + 'en': "Use buttons to enter an algebraic expression.\n\n" + "The input will be displayed after you stop typing for a " + "while.", + 'it': "Usa i pulsanti per comporre un'espressione algebrica.\n\n" + "L'espressione verrà mostrata quando smetterai di " + "digitare per un po'.", + }, + 'result': { + 'en': "🔢 Calculator 🧮\n\n" + "Expressions evaluation:\n\n" + "{expressions}", + 'it': "🔢 Calcolatrice 🧮\n\n" + "Risultato delle espresisoni:\n\n" + "{expressions}", + }, + }, 'info_command': { 'description': { 'en': "Use this command in reply to get information about a message", diff --git a/davtelepot/useful_tools.py b/davtelepot/useful_tools.py index d5c4d38..29fdc79 100644 --- a/davtelepot/useful_tools.py +++ b/davtelepot/useful_tools.py @@ -1,43 +1,443 @@ """General purpose functions for Telegram bots.""" # Standard library +import ast +import asyncio import datetime import json +import logging +import operator from collections import OrderedDict +from typing import List, Union # Project modules from .api import TelegramError from .bot import Bot from .messages import default_useful_tools_messages -from .utilities import get_cleaned_text, recursive_dictionary_update, get_user +from .utilities import (get_cleaned_text, get_user, make_button, + make_inline_keyboard, recursive_dictionary_update, ) -async def _message_info_command(bot: Bot, update: dict, language: str): - """Provide information about selected update. - - Selected update: the message `update` is sent in reply to. If `update` is - not a reply to anything, it gets selected. - The update containing the command, if sent in reply, is deleted. - """ - if 'reply_to_message' in update: - selected_update = update['reply_to_message'] - else: - selected_update = update - await bot.send_message( - text=bot.get_message( - 'useful_tools', 'info_command', 'result', - language=language, - info=json.dumps(selected_update, indent=2) - ), - update=update, - reply_to_message_id=selected_update['message_id'], +def get_calc_buttons() -> OrderedDict: + buttons = OrderedDict() + buttons['**'] = dict( + value='**', + symbol='**', + order='A1', ) - if selected_update != update: + buttons['//'] = dict( + value=' // ', + symbol='//', + order='A2', + ) + buttons['%'] = dict( + value=' % ', + symbol='mod', + order='A3', + ) + buttons['_'] = dict( + value='_', + symbol='MR', + order='B5', + ) + buttons[0] = dict( + value='0', + symbol='0', + order='E1', + ) + buttons[1] = dict( + value='1', + symbol='1', + order='D1', + ) + buttons[2] = dict( + value='2', + symbol='2', + order='D2', + ) + buttons[3] = dict( + value='3', + symbol='3', + order='D3', + ) + buttons[4] = dict( + value='4', + symbol='4', + order='C1', + ) + buttons[5] = dict( + value='5', + symbol='5', + order='C2', + ) + buttons[6] = dict( + value='6', + symbol='6', + order='C3', + ) + buttons[7] = dict( + value='7', + symbol='7', + order='B1', + ) + buttons[8] = dict( + value='8', + symbol='8', + order='B2', + ) + buttons[9] = dict( + value='9', + symbol='9', + order='B3', + ) + buttons['+'] = dict( + value=' + ', + symbol='+', + order='B4', + ) + buttons['-'] = dict( + value=' - ', + symbol='-', + order='C4', + ) + buttons['*'] = dict( + value=' * ', + symbol='*', + order='D4', + ) + buttons['/'] = dict( + value=' / ', + symbol='/', + order='E4', + ) + buttons['.'] = dict( + value='.', + symbol='.', + order='E2', + ) + buttons['thousands'] = dict( + value='000', + symbol='000', + order='E3', + ) + buttons['end'] = dict( + value='\n', + symbol='✅', + order='F1', + ) + buttons['del'] = dict( + value='del', + symbol='⬅️', + order='E5', + ) + buttons['('] = dict( + value='(', + symbol='(️', + order='A4', + ) + buttons[')'] = dict( + value=')', + symbol=')️', + order='A5', + ) + buttons['info'] = dict( + value='info', + symbol='ℹ️️', + order='C5', + ) + + buttons['parser'] = dict( + value='parser', + symbol='💬️', + order='D5', + ) + + return buttons + + +def get_operators() -> dict: + def multiply(a, b): + """Call operator.mul only if a and b are small enough.""" + if abs(max(a, b)) > 10 ** 21: + raise Exception("Numbers were too large!") + return operator.mul(a, b) + + def power(a, b): + """Call operator.pow only if a and b are small enough.""" + if abs(a) > 1000 or abs(b) > 100: + raise Exception("Numbers were too large!") + return operator.pow(a, b) + + return { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: multiply, + ast.Div: operator.truediv, + ast.Pow: power, + ast.FloorDiv: operator.floordiv, + ast.Mod: operator.mod + } + + +calc_buttons = get_calc_buttons() +operators = get_operators() + + +def get_calculator_keyboard(additional_data: list = None): + if additional_data is None: + additional_data = [] + return make_inline_keyboard( + [ + make_button( + text=button['symbol'], + prefix='calc:///', + delimiter='|', + data=[*additional_data, code] + ) + for code, button in sorted(calc_buttons.items(), + key=lambda b: b[1]['order']) + ], + 5 + ) + + +async def _calculate_button(bot: Bot, + update: dict, + user_record: OrderedDict, + language: str, + data: List[Union[int, str]]): + text, reply_markup = '', None + if len(data) < 2: + record_id = bot.db['calculations'].insert( + dict( + user_id=user_record['id'], + created=datetime.datetime.now() + ) + ) + data = [record_id, *data] + text = bot.get_message( + 'useful_tools', 'calculate_command', 'use_buttons', + language=language + ) + else: + record_id = data[0] + reply_markup = get_calculator_keyboard( + additional_data=([record_id] if record_id else None) + ) + if record_id not in bot.shared_data['calc']: + bot.shared_data['calc'][record_id] = [] + asyncio.ensure_future( + calculate_session(bot=bot, + record_id=record_id, + language=language) + ) + update['data'] = data + if len(data) and data[-1] in ('info', 'parser'): + command = data[-1] + if command == 'parser': + reply_markup = None + bot.set_individual_text_message_handler( + handler=_calculate_command, + user_id=user_record['telegram_id'] + ) + elif command == 'info': + reply_markup = make_inline_keyboard( + [ + make_button( + text='Ok', + prefix='calc:///', + delimiter='|', + data=[record_id, 'back'] + ) + ] + ) + text = bot.get_message( + 'useful_tools', 'calculate_command', ( + 'special_keys' if command == 'info' + else 'message_input' if command == 'parser' + else '' + ), + language=language + ) + else: + bot.shared_data['calc'][record_id].append(update) + # Edit the update with the button if a new text is specified + if not text: + return + return dict( + text='', + edit=dict( + text=text, + reply_markup=reply_markup + ) + ) + + +def eval_(node): + """Evaluate ast nodes.""" + if isinstance(node, ast.Num): # + return node.n + elif isinstance(node, ast.BinOp): # + return operators[type(node.op)](eval_(node.left), eval_(node.right)) + elif isinstance(node, ast.UnaryOp): # e.g., -1 + # noinspection PyArgumentList + return operators[type(node.op)](eval_(node.operand)) + else: + raise Exception("Invalid operator") + + +def evaluate_expression(expr): + """Evaluate expressions in a safe way.""" + return eval_( + ast.parse( + expr, + mode='eval' + ).body + ) + + +def evaluate_expressions(bot: Bot, + expressions: str, + language: str = None) -> str: + """Evaluate a string containing lines of expressions. + + `expressions` must be a string containing one expression per line. + """ + line_result, result = 0, [] + for line in expressions.split('\n'): + if not line: + continue try: - await bot.delete_message(update=update) - except TelegramError: + line_result = evaluate_expression( + line.replace('_', str(line_result)) + ) + except Exception as e: + line_result = bot.get_message( + 'useful_tools', 'calculate_command', 'invalid_expression', + language=language, + error=e + ) + result.append( + f"{line}\n= {line_result}" + ) + return '\n\n'.join(result) + + +async def calculate_session(bot: Bot, + record_id: int, + language: str, + buffer_seconds: Union[int, float] = 1): + # Wait until input ends + queue = bot.shared_data['calc'][record_id] + queue_len = None + while queue_len != len(queue): + queue_len = len(queue) + await asyncio.sleep(buffer_seconds) + last_entry = max(queue, key=lambda u: u['id'], default=None) + # Delete record-associated queue + queue = queue.copy() + del bot.shared_data['calc'][record_id] + + record = bot.db['calculations'].find_one( + id=record_id + ) + if record is None: + logging.error("Invalid record identifier!") + return + expression = record['expression'] or '' + reply_markup = get_calculator_keyboard(additional_data=[record['id']]) + + # It would be nice to do: + # for update in sorted(queue, key=lambda u: u['id']) + # Alas, 'id's are not progressive... Telegram's fault! + for i, update in enumerate(queue): + if i % 5 == 0: + await asyncio.sleep(.1) + data = update['data'] + if len(data) != 2: + logging.error(f"Something went wrong: invalid data received.\n{data}") + return + input_value = data[1] + if input_value == 'del': + expression = expression[:-1] + elif input_value == 'back': pass + elif input_value in calc_buttons: + expression += calc_buttons[input_value]['value'] + else: + logging.error(f"Invalid input from calculator button: {input_value}") + if record: + bot.db['calculations'].update( + dict( + id=record['id'], + modified=datetime.datetime.now(), + expression=expression + ), + ['id'] + ) + if expression: + text = bot.get_message( + 'useful_tools', 'calculate_command', 'result', + language=language, + expressions=evaluate_expressions(bot=bot, + expressions=expression, + language=language) + ) + else: + text = bot.get_message( + 'useful_tools', 'calculate_command', 'instructions', + language=language + ) + if last_entry is None: + return + await bot.edit_message_text( + text=text, + update=last_entry, + reply_markup=reply_markup + ) + + +async def _calculate_command(bot: Bot, + update: dict, + user_record: OrderedDict, + language: str, + command_name: str = 'calc'): + if 'reply_to_message' in update: + update = update['reply_to_message'] + command_aliases = [command_name] + if command_name in bot.commands: + command_aliases += list( + bot.commands[command_name]['language_labelled_commands'].values() + ) + bot.commands[command_name]['aliases'] + text = get_cleaned_text(bot=bot, + update=update, + replace=command_aliases) + if not text: + text = bot.get_message( + 'useful_tools', 'calculate_command', 'instructions', + language=language + ) + reply_markup = get_calculator_keyboard() + else: + record_id = bot.db['calculations'].insert( + dict( + user_id=user_record['id'], + created=datetime.datetime.now(), + expression=text + ) + ) + text = bot.get_message( + 'useful_tools', 'calculate_command', 'result', + language=language, + expressions=evaluate_expressions(bot=bot, + expressions=text, + language=language) + ) + reply_markup = get_calculator_keyboard(additional_data=[record_id]) + await bot.send_message(text=text, + update=update, + reply_markup=reply_markup) async def _length_command(bot: Bot, update: dict, user_record: OrderedDict): @@ -82,6 +482,33 @@ async def _length_command(bot: Bot, update: dict, user_record: OrderedDict): ) +async def _message_info_command(bot: Bot, update: dict, language: str): + """Provide information about selected update. + + Selected update: the message `update` is sent in reply to. If `update` is + not a reply to anything, it gets selected. + The update containing the command, if sent in reply, is deleted. + """ + if 'reply_to_message' in update: + selected_update = update['reply_to_message'] + else: + selected_update = update + await bot.send_message( + text=bot.get_message( + 'useful_tools', 'info_command', 'result', + language=language, + info=json.dumps(selected_update, indent=2) + ), + update=update, + reply_to_message_id=selected_update['message_id'], + ) + if selected_update != update: + try: + await bot.delete_message(update=update) + except TelegramError: + pass + + async def _ping_command(bot: Bot, update: dict): """Return `pong` only in private chat.""" chat_id = bot.get_chat_id(update=update) @@ -111,7 +538,7 @@ async def _when_command(bot: Bot, update: dict, language: str): when=date ) if 'forward_date' in update: - original_datetime= ( + original_datetime = ( datetime.datetime.fromtimestamp(update['forward_date']) if 'forward_from' in update else None @@ -150,6 +577,53 @@ def init(telegram_bot: Bot, useful_tools_messages=None): useful_tools_messages ) telegram_bot.messages['useful_tools'] = useful_tools_messages + telegram_bot.shared_data['calc'] = dict() + + if 'calculations' not in telegram_bot.db.tables: + types = telegram_bot.db.types + table = telegram_bot.db.create_table( + table_name='calculations' + ) + table.create_column( + 'user_id', + types.integer + ) + table.create_column( + 'created', + types.datetime + ) + table.create_column( + 'modified', + types.datetime + ) + table.create_column( + 'expression', + types.string + ) + + @telegram_bot.command(command='/calc', + aliases=None, + reply_keyboard_button=None, + show_in_keyboard=False, + **{key: val for key, val + in useful_tools_messages['calculate_command'].items() + if key in ('description', 'help_section', + 'language_labelled_commands')}, + authorization_level='everybody') + async def calculate_command(bot, update, user_record, language): + return await _calculate_command(bot=bot, + update=update, + user_record=user_record, + language=language, + command_name='calc') + + @telegram_bot.button(prefix='calc:///', + separator='|', + authorization_level='everybody') + async def calculate_button(bot, update, user_record, language, data): + return await _calculate_button(bot=bot, user_record=user_record, + update=update, + language=language, data=data) @telegram_bot.command(command='/info', aliases=None, @@ -159,7 +633,7 @@ def init(telegram_bot: Bot, useful_tools_messages=None): in useful_tools_messages['info_command'].items() if key in ('description', 'help_section', 'language_labelled_commands')}, - authorization_level='moderator') + authorization_level='everybody') async def message_info_command(bot, update, language): return await _message_info_command(bot=bot, update=update,