diff --git a/davtelepot/__init__.py b/davtelepot/__init__.py
index e6c8390..2a58146 100644
--- a/davtelepot/__init__.py
+++ b/davtelepot/__init__.py
@@ -11,7 +11,7 @@ __author__ = "Davide Testa"
__email__ = "davide@davte.it"
__credits__ = ["Marco Origlia", "Nick Lee @Nickoala"]
__license__ = "GNU General Public License v3.0"
-__version__ = "2.5.14"
+__version__ = "2.5.15"
__maintainer__ = "Davide Testa"
__contact__ = "t.me/davte"
diff --git a/davtelepot/bot.py b/davtelepot/bot.py
index f7bd50b..62f268c 100644
--- a/davtelepot/bot.py
+++ b/davtelepot/bot.py
@@ -2232,7 +2232,8 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
handler=decorated_command_handler,
description=description,
authorization_level=authorization_level,
- language_labelled_commands=language_labelled_commands
+ language_labelled_commands=language_labelled_commands,
+ aliases=aliases
)
if type(description) is dict:
self.messages['commands'][command] = dict(
diff --git a/davtelepot/messages.py b/davtelepot/messages.py
index 19d469f..b7b16cf 100644
--- a/davtelepot/messages.py
+++ b/davtelepot/messages.py
@@ -1008,6 +1008,75 @@ default_unknown_command_message = {
}
default_useful_tools_messages = {
+ 'calculate_command': {
+ 'description': {
+ 'en': "Do calculations",
+ 'it': "Calcola",
+ },
+ 'help_section': None,
+ 'instructions': {
+ 'en': "🔢 Calculator 🧮\n\n"
+ "Enter an algebraic expression after /calc to get its "
+ "result, or use the command in reply to a message containing "
+ "an expression, or use the keyboard below.\n\n"
+ "- ℹ️
: show information about special keys\n",
+ 'it': "🔢 Calcolatrice 🧮\n\n"
+ "Inserisci un'espressione algebrica dopo /calcola per "
+ "ottenerne il risultato, oppure usa il comando in risposta, "
+ "o ancora usa la tastiera qui sotto.\n\n"
+ "- ℹ️
: mostra informazioni sui tasti speciali\n",
+ },
+ 'invalid_expression': {
+ 'en': "Invalid expression: {error}",
+ 'it': "Espressione non valida: {error}",
+ },
+ 'language_labelled_commands': {
+ 'en': "calculate",
+ 'it': "calcola",
+ },
+ 'message_input': {
+ 'en': "🔢 Calculator 🧮\n\n"
+ "Enter an expression",
+ 'it': "🔢 Calcolatrice 🧮\n\n"
+ "Mandami l'espressione",
+ },
+ 'special_keys': {
+ 'en': "Special keys\n"
+ "- **
: exponentiation\n"
+ "- //
: floor division\n"
+ "- mod
: modulus (remainder of division)\n"
+ "- MR
: result of last expression\n"
+ "- ℹ️
: show this help message\n"
+ "- 💬
: write your expression in a message\n"
+ "- ⬅️
: delete last character\n"
+ "- ✅
: start a new line (and a new expression)\n",
+ 'it': "Tasti speciali\n"
+ "- **
: elevamento a potenza\n"
+ "- //
: quoziente della divisione\n"
+ "- mod
: resto della divisione\n"
+ "- MR
: risultato dell'espressione precedente\n"
+ "- ℹ️
: mostra questo messaggio\n"
+ "- 💬
: invia un messaggio con l'espressione\n"
+ "- ⬅️
: cancella ultimo carattere\n"
+ "- ✅
: vai a capo (inizia una nuova espressione)\n",
+ },
+ 'use_buttons': {
+ 'en': "Use buttons to enter an algebraic expression.\n\n"
+ "The input will be displayed after you stop typing for a "
+ "while.",
+ 'it': "Usa i pulsanti per comporre un'espressione algebrica.\n\n"
+ "L'espressione verrà mostrata quando smetterai di "
+ "digitare per un po'.",
+ },
+ 'result': {
+ 'en': "🔢 Calculator 🧮\n\n"
+ "Expressions evaluation:\n\n"
+ "{expressions}",
+ 'it': "🔢 Calcolatrice 🧮\n\n"
+ "Risultato delle espresisoni:\n\n"
+ "{expressions}",
+ },
+ },
'info_command': {
'description': {
'en': "Use this command in reply to get information about a message",
diff --git a/davtelepot/useful_tools.py b/davtelepot/useful_tools.py
index d5c4d38..29fdc79 100644
--- a/davtelepot/useful_tools.py
+++ b/davtelepot/useful_tools.py
@@ -1,43 +1,443 @@
"""General purpose functions for Telegram bots."""
# Standard library
+import ast
+import asyncio
import datetime
import json
+import logging
+import operator
from collections import OrderedDict
+from typing import List, Union
# Project modules
from .api import TelegramError
from .bot import Bot
from .messages import default_useful_tools_messages
-from .utilities import get_cleaned_text, recursive_dictionary_update, get_user
+from .utilities import (get_cleaned_text, get_user, make_button,
+ make_inline_keyboard, recursive_dictionary_update, )
-async def _message_info_command(bot: Bot, update: dict, language: str):
- """Provide information about selected update.
-
- Selected update: the message `update` is sent in reply to. If `update` is
- not a reply to anything, it gets selected.
- The update containing the command, if sent in reply, is deleted.
- """
- if 'reply_to_message' in update:
- selected_update = update['reply_to_message']
- else:
- selected_update = update
- await bot.send_message(
- text=bot.get_message(
- 'useful_tools', 'info_command', 'result',
- language=language,
- info=json.dumps(selected_update, indent=2)
- ),
- update=update,
- reply_to_message_id=selected_update['message_id'],
+def get_calc_buttons() -> OrderedDict:
+ buttons = OrderedDict()
+ buttons['**'] = dict(
+ value='**',
+ symbol='**',
+ order='A1',
)
- if selected_update != update:
+ buttons['//'] = dict(
+ value=' // ',
+ symbol='//',
+ order='A2',
+ )
+ buttons['%'] = dict(
+ value=' % ',
+ symbol='mod',
+ order='A3',
+ )
+ buttons['_'] = dict(
+ value='_',
+ symbol='MR',
+ order='B5',
+ )
+ buttons[0] = dict(
+ value='0',
+ symbol='0',
+ order='E1',
+ )
+ buttons[1] = dict(
+ value='1',
+ symbol='1',
+ order='D1',
+ )
+ buttons[2] = dict(
+ value='2',
+ symbol='2',
+ order='D2',
+ )
+ buttons[3] = dict(
+ value='3',
+ symbol='3',
+ order='D3',
+ )
+ buttons[4] = dict(
+ value='4',
+ symbol='4',
+ order='C1',
+ )
+ buttons[5] = dict(
+ value='5',
+ symbol='5',
+ order='C2',
+ )
+ buttons[6] = dict(
+ value='6',
+ symbol='6',
+ order='C3',
+ )
+ buttons[7] = dict(
+ value='7',
+ symbol='7',
+ order='B1',
+ )
+ buttons[8] = dict(
+ value='8',
+ symbol='8',
+ order='B2',
+ )
+ buttons[9] = dict(
+ value='9',
+ symbol='9',
+ order='B3',
+ )
+ buttons['+'] = dict(
+ value=' + ',
+ symbol='+',
+ order='B4',
+ )
+ buttons['-'] = dict(
+ value=' - ',
+ symbol='-',
+ order='C4',
+ )
+ buttons['*'] = dict(
+ value=' * ',
+ symbol='*',
+ order='D4',
+ )
+ buttons['/'] = dict(
+ value=' / ',
+ symbol='/',
+ order='E4',
+ )
+ buttons['.'] = dict(
+ value='.',
+ symbol='.',
+ order='E2',
+ )
+ buttons['thousands'] = dict(
+ value='000',
+ symbol='000',
+ order='E3',
+ )
+ buttons['end'] = dict(
+ value='\n',
+ symbol='✅',
+ order='F1',
+ )
+ buttons['del'] = dict(
+ value='del',
+ symbol='⬅️',
+ order='E5',
+ )
+ buttons['('] = dict(
+ value='(',
+ symbol='(️',
+ order='A4',
+ )
+ buttons[')'] = dict(
+ value=')',
+ symbol=')️',
+ order='A5',
+ )
+ buttons['info'] = dict(
+ value='info',
+ symbol='ℹ️️',
+ order='C5',
+ )
+
+ buttons['parser'] = dict(
+ value='parser',
+ symbol='💬️',
+ order='D5',
+ )
+
+ return buttons
+
+
+def get_operators() -> dict:
+ def multiply(a, b):
+ """Call operator.mul only if a and b are small enough."""
+ if abs(max(a, b)) > 10 ** 21:
+ raise Exception("Numbers were too large!")
+ return operator.mul(a, b)
+
+ def power(a, b):
+ """Call operator.pow only if a and b are small enough."""
+ if abs(a) > 1000 or abs(b) > 100:
+ raise Exception("Numbers were too large!")
+ return operator.pow(a, b)
+
+ return {
+ ast.Add: operator.add,
+ ast.Sub: operator.sub,
+ ast.Mult: multiply,
+ ast.Div: operator.truediv,
+ ast.Pow: power,
+ ast.FloorDiv: operator.floordiv,
+ ast.Mod: operator.mod
+ }
+
+
+calc_buttons = get_calc_buttons()
+operators = get_operators()
+
+
+def get_calculator_keyboard(additional_data: list = None):
+ if additional_data is None:
+ additional_data = []
+ return make_inline_keyboard(
+ [
+ make_button(
+ text=button['symbol'],
+ prefix='calc:///',
+ delimiter='|',
+ data=[*additional_data, code]
+ )
+ for code, button in sorted(calc_buttons.items(),
+ key=lambda b: b[1]['order'])
+ ],
+ 5
+ )
+
+
+async def _calculate_button(bot: Bot,
+ update: dict,
+ user_record: OrderedDict,
+ language: str,
+ data: List[Union[int, str]]):
+ text, reply_markup = '', None
+ if len(data) < 2:
+ record_id = bot.db['calculations'].insert(
+ dict(
+ user_id=user_record['id'],
+ created=datetime.datetime.now()
+ )
+ )
+ data = [record_id, *data]
+ text = bot.get_message(
+ 'useful_tools', 'calculate_command', 'use_buttons',
+ language=language
+ )
+ else:
+ record_id = data[0]
+ reply_markup = get_calculator_keyboard(
+ additional_data=([record_id] if record_id else None)
+ )
+ if record_id not in bot.shared_data['calc']:
+ bot.shared_data['calc'][record_id] = []
+ asyncio.ensure_future(
+ calculate_session(bot=bot,
+ record_id=record_id,
+ language=language)
+ )
+ update['data'] = data
+ if len(data) and data[-1] in ('info', 'parser'):
+ command = data[-1]
+ if command == 'parser':
+ reply_markup = None
+ bot.set_individual_text_message_handler(
+ handler=_calculate_command,
+ user_id=user_record['telegram_id']
+ )
+ elif command == 'info':
+ reply_markup = make_inline_keyboard(
+ [
+ make_button(
+ text='Ok',
+ prefix='calc:///',
+ delimiter='|',
+ data=[record_id, 'back']
+ )
+ ]
+ )
+ text = bot.get_message(
+ 'useful_tools', 'calculate_command', (
+ 'special_keys' if command == 'info'
+ else 'message_input' if command == 'parser'
+ else ''
+ ),
+ language=language
+ )
+ else:
+ bot.shared_data['calc'][record_id].append(update)
+ # Edit the update with the button if a new text is specified
+ if not text:
+ return
+ return dict(
+ text='',
+ edit=dict(
+ text=text,
+ reply_markup=reply_markup
+ )
+ )
+
+
+def eval_(node):
+ """Evaluate ast nodes."""
+ if isinstance(node, ast.Num): #
+ return node.n
+ elif isinstance(node, ast.BinOp): #
+ return operators[type(node.op)](eval_(node.left), eval_(node.right))
+ elif isinstance(node, ast.UnaryOp): # e.g., -1
+ # noinspection PyArgumentList
+ return operators[type(node.op)](eval_(node.operand))
+ else:
+ raise Exception("Invalid operator")
+
+
+def evaluate_expression(expr):
+ """Evaluate expressions in a safe way."""
+ return eval_(
+ ast.parse(
+ expr,
+ mode='eval'
+ ).body
+ )
+
+
+def evaluate_expressions(bot: Bot,
+ expressions: str,
+ language: str = None) -> str:
+ """Evaluate a string containing lines of expressions.
+
+ `expressions` must be a string containing one expression per line.
+ """
+ line_result, result = 0, []
+ for line in expressions.split('\n'):
+ if not line:
+ continue
try:
- await bot.delete_message(update=update)
- except TelegramError:
+ line_result = evaluate_expression(
+ line.replace('_', str(line_result))
+ )
+ except Exception as e:
+ line_result = bot.get_message(
+ 'useful_tools', 'calculate_command', 'invalid_expression',
+ language=language,
+ error=e
+ )
+ result.append(
+ f"{line}
\n= {line_result}"
+ )
+ return '\n\n'.join(result)
+
+
+async def calculate_session(bot: Bot,
+ record_id: int,
+ language: str,
+ buffer_seconds: Union[int, float] = 1):
+ # Wait until input ends
+ queue = bot.shared_data['calc'][record_id]
+ queue_len = None
+ while queue_len != len(queue):
+ queue_len = len(queue)
+ await asyncio.sleep(buffer_seconds)
+ last_entry = max(queue, key=lambda u: u['id'], default=None)
+ # Delete record-associated queue
+ queue = queue.copy()
+ del bot.shared_data['calc'][record_id]
+
+ record = bot.db['calculations'].find_one(
+ id=record_id
+ )
+ if record is None:
+ logging.error("Invalid record identifier!")
+ return
+ expression = record['expression'] or ''
+ reply_markup = get_calculator_keyboard(additional_data=[record['id']])
+
+ # It would be nice to do:
+ # for update in sorted(queue, key=lambda u: u['id'])
+ # Alas, 'id's are not progressive... Telegram's fault!
+ for i, update in enumerate(queue):
+ if i % 5 == 0:
+ await asyncio.sleep(.1)
+ data = update['data']
+ if len(data) != 2:
+ logging.error(f"Something went wrong: invalid data received.\n{data}")
+ return
+ input_value = data[1]
+ if input_value == 'del':
+ expression = expression[:-1]
+ elif input_value == 'back':
pass
+ elif input_value in calc_buttons:
+ expression += calc_buttons[input_value]['value']
+ else:
+ logging.error(f"Invalid input from calculator button: {input_value}")
+ if record:
+ bot.db['calculations'].update(
+ dict(
+ id=record['id'],
+ modified=datetime.datetime.now(),
+ expression=expression
+ ),
+ ['id']
+ )
+ if expression:
+ text = bot.get_message(
+ 'useful_tools', 'calculate_command', 'result',
+ language=language,
+ expressions=evaluate_expressions(bot=bot,
+ expressions=expression,
+ language=language)
+ )
+ else:
+ text = bot.get_message(
+ 'useful_tools', 'calculate_command', 'instructions',
+ language=language
+ )
+ if last_entry is None:
+ return
+ await bot.edit_message_text(
+ text=text,
+ update=last_entry,
+ reply_markup=reply_markup
+ )
+
+
+async def _calculate_command(bot: Bot,
+ update: dict,
+ user_record: OrderedDict,
+ language: str,
+ command_name: str = 'calc'):
+ if 'reply_to_message' in update:
+ update = update['reply_to_message']
+ command_aliases = [command_name]
+ if command_name in bot.commands:
+ command_aliases += list(
+ bot.commands[command_name]['language_labelled_commands'].values()
+ ) + bot.commands[command_name]['aliases']
+ text = get_cleaned_text(bot=bot,
+ update=update,
+ replace=command_aliases)
+ if not text:
+ text = bot.get_message(
+ 'useful_tools', 'calculate_command', 'instructions',
+ language=language
+ )
+ reply_markup = get_calculator_keyboard()
+ else:
+ record_id = bot.db['calculations'].insert(
+ dict(
+ user_id=user_record['id'],
+ created=datetime.datetime.now(),
+ expression=text
+ )
+ )
+ text = bot.get_message(
+ 'useful_tools', 'calculate_command', 'result',
+ language=language,
+ expressions=evaluate_expressions(bot=bot,
+ expressions=text,
+ language=language)
+ )
+ reply_markup = get_calculator_keyboard(additional_data=[record_id])
+ await bot.send_message(text=text,
+ update=update,
+ reply_markup=reply_markup)
async def _length_command(bot: Bot, update: dict, user_record: OrderedDict):
@@ -82,6 +482,33 @@ async def _length_command(bot: Bot, update: dict, user_record: OrderedDict):
)
+async def _message_info_command(bot: Bot, update: dict, language: str):
+ """Provide information about selected update.
+
+ Selected update: the message `update` is sent in reply to. If `update` is
+ not a reply to anything, it gets selected.
+ The update containing the command, if sent in reply, is deleted.
+ """
+ if 'reply_to_message' in update:
+ selected_update = update['reply_to_message']
+ else:
+ selected_update = update
+ await bot.send_message(
+ text=bot.get_message(
+ 'useful_tools', 'info_command', 'result',
+ language=language,
+ info=json.dumps(selected_update, indent=2)
+ ),
+ update=update,
+ reply_to_message_id=selected_update['message_id'],
+ )
+ if selected_update != update:
+ try:
+ await bot.delete_message(update=update)
+ except TelegramError:
+ pass
+
+
async def _ping_command(bot: Bot, update: dict):
"""Return `pong` only in private chat."""
chat_id = bot.get_chat_id(update=update)
@@ -111,7 +538,7 @@ async def _when_command(bot: Bot, update: dict, language: str):
when=date
)
if 'forward_date' in update:
- original_datetime= (
+ original_datetime = (
datetime.datetime.fromtimestamp(update['forward_date'])
if 'forward_from' in update
else None
@@ -150,6 +577,53 @@ def init(telegram_bot: Bot, useful_tools_messages=None):
useful_tools_messages
)
telegram_bot.messages['useful_tools'] = useful_tools_messages
+ telegram_bot.shared_data['calc'] = dict()
+
+ if 'calculations' not in telegram_bot.db.tables:
+ types = telegram_bot.db.types
+ table = telegram_bot.db.create_table(
+ table_name='calculations'
+ )
+ table.create_column(
+ 'user_id',
+ types.integer
+ )
+ table.create_column(
+ 'created',
+ types.datetime
+ )
+ table.create_column(
+ 'modified',
+ types.datetime
+ )
+ table.create_column(
+ 'expression',
+ types.string
+ )
+
+ @telegram_bot.command(command='/calc',
+ aliases=None,
+ reply_keyboard_button=None,
+ show_in_keyboard=False,
+ **{key: val for key, val
+ in useful_tools_messages['calculate_command'].items()
+ if key in ('description', 'help_section',
+ 'language_labelled_commands')},
+ authorization_level='everybody')
+ async def calculate_command(bot, update, user_record, language):
+ return await _calculate_command(bot=bot,
+ update=update,
+ user_record=user_record,
+ language=language,
+ command_name='calc')
+
+ @telegram_bot.button(prefix='calc:///',
+ separator='|',
+ authorization_level='everybody')
+ async def calculate_button(bot, update, user_record, language, data):
+ return await _calculate_button(bot=bot, user_record=user_record,
+ update=update,
+ language=language, data=data)
@telegram_bot.command(command='/info',
aliases=None,
@@ -159,7 +633,7 @@ def init(telegram_bot: Bot, useful_tools_messages=None):
in useful_tools_messages['info_command'].items()
if key in ('description', 'help_section',
'language_labelled_commands')},
- authorization_level='moderator')
+ authorization_level='everybody')
async def message_info_command(bot, update, language):
return await _message_info_command(bot=bot,
update=update,