/calc command implemented

This commit is contained in:
Davte 2020-05-18 15:08:15 +02:00
parent 9ab3fb3616
commit 11d07b45d6
4 changed files with 571 additions and 27 deletions

View File

@ -11,7 +11,7 @@ __author__ = "Davide Testa"
__email__ = "davide@davte.it" __email__ = "davide@davte.it"
__credits__ = ["Marco Origlia", "Nick Lee @Nickoala"] __credits__ = ["Marco Origlia", "Nick Lee @Nickoala"]
__license__ = "GNU General Public License v3.0" __license__ = "GNU General Public License v3.0"
__version__ = "2.5.14" __version__ = "2.5.15"
__maintainer__ = "Davide Testa" __maintainer__ = "Davide Testa"
__contact__ = "t.me/davte" __contact__ = "t.me/davte"

View File

@ -2232,7 +2232,8 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
handler=decorated_command_handler, handler=decorated_command_handler,
description=description, description=description,
authorization_level=authorization_level, authorization_level=authorization_level,
language_labelled_commands=language_labelled_commands language_labelled_commands=language_labelled_commands,
aliases=aliases
) )
if type(description) is dict: if type(description) is dict:
self.messages['commands'][command] = dict( self.messages['commands'][command] = dict(

View File

@ -1008,6 +1008,75 @@ default_unknown_command_message = {
} }
default_useful_tools_messages = { default_useful_tools_messages = {
'calculate_command': {
'description': {
'en': "Do calculations",
'it': "Calcola",
},
'help_section': None,
'instructions': {
'en': "🔢 <b>Calculator</b> 🧮\n\n"
"Enter an algebraic expression after /calc to get its "
"result, or use the command in reply to a message containing "
"an expression, or use the keyboard below.\n\n"
"- <code></code>: show information about special keys\n",
'it': "🔢 <b>Calcolatrice</b> 🧮\n\n"
"Inserisci un'espressione algebrica dopo /calcola per "
"ottenerne il risultato, oppure usa il comando in risposta, "
"o ancora usa la tastiera qui sotto.\n\n"
"- <code></code>: mostra informazioni sui tasti speciali\n",
},
'invalid_expression': {
'en': "Invalid expression: {error}",
'it': "Espressione non valida: {error}",
},
'language_labelled_commands': {
'en': "calculate",
'it': "calcola",
},
'message_input': {
'en': "🔢 <b>Calculator</b> 🧮\n\n"
"<i>Enter an expression</i>",
'it': "🔢 <b>Calcolatrice</b> 🧮\n\n"
"<i>Mandami l'espressione</i>",
},
'special_keys': {
'en': "<b>Special keys</b>\n"
"- <code>**</code>: exponentiation\n"
"- <code>//</code>: floor division\n"
"- <code>mod</code>: modulus (remainder of division)\n"
"- <code>MR</code>: result of last expression\n"
"- <code></code>: show this help message\n"
"- <code>💬</code>: write your expression in a message\n"
"- <code>⬅️</code>: delete last character\n"
"- <code>✅</code>: start a new line (and a new expression)\n",
'it': "<b>Tasti speciali</b>\n"
"- <code>**</code>: elevamento a potenza\n"
"- <code>//</code>: quoziente della divisione\n"
"- <code>mod</code>: resto della divisione\n"
"- <code>MR</code>: risultato dell'espressione precedente\n"
"- <code></code>: mostra questo messaggio\n"
"- <code>💬</code>: invia un messaggio con l'espressione\n"
"- <code>⬅️</code>: cancella ultimo carattere\n"
"- <code>✅</code>: vai a capo (inizia una nuova espressione)\n",
},
'use_buttons': {
'en': "Use buttons to enter an algebraic expression.\n\n"
"<i>The input will be displayed after you stop typing for a "
"while.</i>",
'it': "Usa i pulsanti per comporre un'espressione algebrica.\n\n"
"<i>L'espressione verrà mostrata quando smetterai di "
"digitare per un po'.</i>",
},
'result': {
'en': "🔢 <b>Calculator</b> 🧮\n\n"
"<i>Expressions evaluation:</i>\n\n"
"{expressions}",
'it': "🔢 <b>Calcolatrice</b> 🧮\n\n"
"<i>Risultato delle espresisoni:</i>\n\n"
"{expressions}",
},
},
'info_command': { 'info_command': {
'description': { 'description': {
'en': "Use this command in reply to get information about a message", 'en': "Use this command in reply to get information about a message",

View File

@ -1,43 +1,443 @@
"""General purpose functions for Telegram bots.""" """General purpose functions for Telegram bots."""
# Standard library # Standard library
import ast
import asyncio
import datetime import datetime
import json import json
import logging
import operator
from collections import OrderedDict from collections import OrderedDict
from typing import List, Union
# Project modules # Project modules
from .api import TelegramError from .api import TelegramError
from .bot import Bot from .bot import Bot
from .messages import default_useful_tools_messages from .messages import default_useful_tools_messages
from .utilities import get_cleaned_text, recursive_dictionary_update, get_user from .utilities import (get_cleaned_text, get_user, make_button,
make_inline_keyboard, recursive_dictionary_update, )
async def _message_info_command(bot: Bot, update: dict, language: str): def get_calc_buttons() -> OrderedDict:
"""Provide information about selected update. buttons = OrderedDict()
buttons['**'] = dict(
Selected update: the message `update` is sent in reply to. If `update` is value='**',
not a reply to anything, it gets selected. symbol='**',
The update containing the command, if sent in reply, is deleted. order='A1',
"""
if 'reply_to_message' in update:
selected_update = update['reply_to_message']
else:
selected_update = update
await bot.send_message(
text=bot.get_message(
'useful_tools', 'info_command', 'result',
language=language,
info=json.dumps(selected_update, indent=2)
),
update=update,
reply_to_message_id=selected_update['message_id'],
) )
if selected_update != update: buttons['//'] = dict(
value=' // ',
symbol='//',
order='A2',
)
buttons['%'] = dict(
value=' % ',
symbol='mod',
order='A3',
)
buttons['_'] = dict(
value='_',
symbol='MR',
order='B5',
)
buttons[0] = dict(
value='0',
symbol='0',
order='E1',
)
buttons[1] = dict(
value='1',
symbol='1',
order='D1',
)
buttons[2] = dict(
value='2',
symbol='2',
order='D2',
)
buttons[3] = dict(
value='3',
symbol='3',
order='D3',
)
buttons[4] = dict(
value='4',
symbol='4',
order='C1',
)
buttons[5] = dict(
value='5',
symbol='5',
order='C2',
)
buttons[6] = dict(
value='6',
symbol='6',
order='C3',
)
buttons[7] = dict(
value='7',
symbol='7',
order='B1',
)
buttons[8] = dict(
value='8',
symbol='8',
order='B2',
)
buttons[9] = dict(
value='9',
symbol='9',
order='B3',
)
buttons['+'] = dict(
value=' + ',
symbol='+',
order='B4',
)
buttons['-'] = dict(
value=' - ',
symbol='-',
order='C4',
)
buttons['*'] = dict(
value=' * ',
symbol='*',
order='D4',
)
buttons['/'] = dict(
value=' / ',
symbol='/',
order='E4',
)
buttons['.'] = dict(
value='.',
symbol='.',
order='E2',
)
buttons['thousands'] = dict(
value='000',
symbol='000',
order='E3',
)
buttons['end'] = dict(
value='\n',
symbol='',
order='F1',
)
buttons['del'] = dict(
value='del',
symbol='⬅️',
order='E5',
)
buttons['('] = dict(
value='(',
symbol='(',
order='A4',
)
buttons[')'] = dict(
value=')',
symbol=')',
order='A5',
)
buttons['info'] = dict(
value='info',
symbol='',
order='C5',
)
buttons['parser'] = dict(
value='parser',
symbol='💬️',
order='D5',
)
return buttons
def get_operators() -> dict:
def multiply(a, b):
"""Call operator.mul only if a and b are small enough."""
if abs(max(a, b)) > 10 ** 21:
raise Exception("Numbers were too large!")
return operator.mul(a, b)
def power(a, b):
"""Call operator.pow only if a and b are small enough."""
if abs(a) > 1000 or abs(b) > 100:
raise Exception("Numbers were too large!")
return operator.pow(a, b)
return {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: multiply,
ast.Div: operator.truediv,
ast.Pow: power,
ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod
}
calc_buttons = get_calc_buttons()
operators = get_operators()
def get_calculator_keyboard(additional_data: list = None):
if additional_data is None:
additional_data = []
return make_inline_keyboard(
[
make_button(
text=button['symbol'],
prefix='calc:///',
delimiter='|',
data=[*additional_data, code]
)
for code, button in sorted(calc_buttons.items(),
key=lambda b: b[1]['order'])
],
5
)
async def _calculate_button(bot: Bot,
update: dict,
user_record: OrderedDict,
language: str,
data: List[Union[int, str]]):
text, reply_markup = '', None
if len(data) < 2:
record_id = bot.db['calculations'].insert(
dict(
user_id=user_record['id'],
created=datetime.datetime.now()
)
)
data = [record_id, *data]
text = bot.get_message(
'useful_tools', 'calculate_command', 'use_buttons',
language=language
)
else:
record_id = data[0]
reply_markup = get_calculator_keyboard(
additional_data=([record_id] if record_id else None)
)
if record_id not in bot.shared_data['calc']:
bot.shared_data['calc'][record_id] = []
asyncio.ensure_future(
calculate_session(bot=bot,
record_id=record_id,
language=language)
)
update['data'] = data
if len(data) and data[-1] in ('info', 'parser'):
command = data[-1]
if command == 'parser':
reply_markup = None
bot.set_individual_text_message_handler(
handler=_calculate_command,
user_id=user_record['telegram_id']
)
elif command == 'info':
reply_markup = make_inline_keyboard(
[
make_button(
text='Ok',
prefix='calc:///',
delimiter='|',
data=[record_id, 'back']
)
]
)
text = bot.get_message(
'useful_tools', 'calculate_command', (
'special_keys' if command == 'info'
else 'message_input' if command == 'parser'
else ''
),
language=language
)
else:
bot.shared_data['calc'][record_id].append(update)
# Edit the update with the button if a new text is specified
if not text:
return
return dict(
text='',
edit=dict(
text=text,
reply_markup=reply_markup
)
)
def eval_(node):
"""Evaluate ast nodes."""
if isinstance(node, ast.Num): # <number>
return node.n
elif isinstance(node, ast.BinOp): # <left> <operator> <right>
return operators[type(node.op)](eval_(node.left), eval_(node.right))
elif isinstance(node, ast.UnaryOp): # <operator> <operand> e.g., -1
# noinspection PyArgumentList
return operators[type(node.op)](eval_(node.operand))
else:
raise Exception("Invalid operator")
def evaluate_expression(expr):
"""Evaluate expressions in a safe way."""
return eval_(
ast.parse(
expr,
mode='eval'
).body
)
def evaluate_expressions(bot: Bot,
expressions: str,
language: str = None) -> str:
"""Evaluate a string containing lines of expressions.
`expressions` must be a string containing one expression per line.
"""
line_result, result = 0, []
for line in expressions.split('\n'):
if not line:
continue
try: try:
await bot.delete_message(update=update) line_result = evaluate_expression(
except TelegramError: line.replace('_', str(line_result))
)
except Exception as e:
line_result = bot.get_message(
'useful_tools', 'calculate_command', 'invalid_expression',
language=language,
error=e
)
result.append(
f"<code>{line}</code>\n<b>= {line_result}</b>"
)
return '\n\n'.join(result)
async def calculate_session(bot: Bot,
record_id: int,
language: str,
buffer_seconds: Union[int, float] = 1):
# Wait until input ends
queue = bot.shared_data['calc'][record_id]
queue_len = None
while queue_len != len(queue):
queue_len = len(queue)
await asyncio.sleep(buffer_seconds)
last_entry = max(queue, key=lambda u: u['id'], default=None)
# Delete record-associated queue
queue = queue.copy()
del bot.shared_data['calc'][record_id]
record = bot.db['calculations'].find_one(
id=record_id
)
if record is None:
logging.error("Invalid record identifier!")
return
expression = record['expression'] or ''
reply_markup = get_calculator_keyboard(additional_data=[record['id']])
# It would be nice to do:
# for update in sorted(queue, key=lambda u: u['id'])
# Alas, 'id's are not progressive... Telegram's fault!
for i, update in enumerate(queue):
if i % 5 == 0:
await asyncio.sleep(.1)
data = update['data']
if len(data) != 2:
logging.error(f"Something went wrong: invalid data received.\n{data}")
return
input_value = data[1]
if input_value == 'del':
expression = expression[:-1]
elif input_value == 'back':
pass pass
elif input_value in calc_buttons:
expression += calc_buttons[input_value]['value']
else:
logging.error(f"Invalid input from calculator button: {input_value}")
if record:
bot.db['calculations'].update(
dict(
id=record['id'],
modified=datetime.datetime.now(),
expression=expression
),
['id']
)
if expression:
text = bot.get_message(
'useful_tools', 'calculate_command', 'result',
language=language,
expressions=evaluate_expressions(bot=bot,
expressions=expression,
language=language)
)
else:
text = bot.get_message(
'useful_tools', 'calculate_command', 'instructions',
language=language
)
if last_entry is None:
return
await bot.edit_message_text(
text=text,
update=last_entry,
reply_markup=reply_markup
)
async def _calculate_command(bot: Bot,
update: dict,
user_record: OrderedDict,
language: str,
command_name: str = 'calc'):
if 'reply_to_message' in update:
update = update['reply_to_message']
command_aliases = [command_name]
if command_name in bot.commands:
command_aliases += list(
bot.commands[command_name]['language_labelled_commands'].values()
) + bot.commands[command_name]['aliases']
text = get_cleaned_text(bot=bot,
update=update,
replace=command_aliases)
if not text:
text = bot.get_message(
'useful_tools', 'calculate_command', 'instructions',
language=language
)
reply_markup = get_calculator_keyboard()
else:
record_id = bot.db['calculations'].insert(
dict(
user_id=user_record['id'],
created=datetime.datetime.now(),
expression=text
)
)
text = bot.get_message(
'useful_tools', 'calculate_command', 'result',
language=language,
expressions=evaluate_expressions(bot=bot,
expressions=text,
language=language)
)
reply_markup = get_calculator_keyboard(additional_data=[record_id])
await bot.send_message(text=text,
update=update,
reply_markup=reply_markup)
async def _length_command(bot: Bot, update: dict, user_record: OrderedDict): async def _length_command(bot: Bot, update: dict, user_record: OrderedDict):
@ -82,6 +482,33 @@ async def _length_command(bot: Bot, update: dict, user_record: OrderedDict):
) )
async def _message_info_command(bot: Bot, update: dict, language: str):
"""Provide information about selected update.
Selected update: the message `update` is sent in reply to. If `update` is
not a reply to anything, it gets selected.
The update containing the command, if sent in reply, is deleted.
"""
if 'reply_to_message' in update:
selected_update = update['reply_to_message']
else:
selected_update = update
await bot.send_message(
text=bot.get_message(
'useful_tools', 'info_command', 'result',
language=language,
info=json.dumps(selected_update, indent=2)
),
update=update,
reply_to_message_id=selected_update['message_id'],
)
if selected_update != update:
try:
await bot.delete_message(update=update)
except TelegramError:
pass
async def _ping_command(bot: Bot, update: dict): async def _ping_command(bot: Bot, update: dict):
"""Return `pong` only in private chat.""" """Return `pong` only in private chat."""
chat_id = bot.get_chat_id(update=update) chat_id = bot.get_chat_id(update=update)
@ -150,6 +577,53 @@ def init(telegram_bot: Bot, useful_tools_messages=None):
useful_tools_messages useful_tools_messages
) )
telegram_bot.messages['useful_tools'] = useful_tools_messages telegram_bot.messages['useful_tools'] = useful_tools_messages
telegram_bot.shared_data['calc'] = dict()
if 'calculations' not in telegram_bot.db.tables:
types = telegram_bot.db.types
table = telegram_bot.db.create_table(
table_name='calculations'
)
table.create_column(
'user_id',
types.integer
)
table.create_column(
'created',
types.datetime
)
table.create_column(
'modified',
types.datetime
)
table.create_column(
'expression',
types.string
)
@telegram_bot.command(command='/calc',
aliases=None,
reply_keyboard_button=None,
show_in_keyboard=False,
**{key: val for key, val
in useful_tools_messages['calculate_command'].items()
if key in ('description', 'help_section',
'language_labelled_commands')},
authorization_level='everybody')
async def calculate_command(bot, update, user_record, language):
return await _calculate_command(bot=bot,
update=update,
user_record=user_record,
language=language,
command_name='calc')
@telegram_bot.button(prefix='calc:///',
separator='|',
authorization_level='everybody')
async def calculate_button(bot, update, user_record, language, data):
return await _calculate_button(bot=bot, user_record=user_record,
update=update,
language=language, data=data)
@telegram_bot.command(command='/info', @telegram_bot.command(command='/info',
aliases=None, aliases=None,
@ -159,7 +633,7 @@ def init(telegram_bot: Bot, useful_tools_messages=None):
in useful_tools_messages['info_command'].items() in useful_tools_messages['info_command'].items()
if key in ('description', 'help_section', if key in ('description', 'help_section',
'language_labelled_commands')}, 'language_labelled_commands')},
authorization_level='moderator') authorization_level='everybody')
async def message_info_command(bot, update, language): async def message_info_command(bot, update, language):
return await _message_info_command(bot=bot, return await _message_info_command(bot=bot,
update=update, update=update,