diff --git a/davtelepot/__init__.py b/davtelepot/__init__.py index 506eb13..eb32be4 100644 --- a/davtelepot/__init__.py +++ b/davtelepot/__init__.py @@ -14,12 +14,12 @@ __author__ = "Davide Testa" __email__ = "davide@davte.it" __credits__ = ["Marco Origlia", "Nick Lee @Nickoala"] __license__ = "GNU General Public License v3.0" -__version__ = "2.3.30" +__version__ = "2.4.0" __maintainer__ = "Davide Testa" __contact__ = "t.me/davte" # Legacy module; please use `from davtelepot.bot import Bot` from now on from .custombot import Bot -from . import administration_tools, authorization, bot, helper, utilities +from . import administration_tools, authorization, bot, helper, suggestions, utilities -__all__ = [administration_tools, authorization, Bot, bot, helper, utilities] +__all__ = [administration_tools, authorization, Bot, bot, helper, suggestions, utilities] diff --git a/davtelepot/authorization.py b/davtelepot/authorization.py index a1005cb..5234df2 100644 --- a/davtelepot/authorization.py +++ b/davtelepot/authorization.py @@ -4,6 +4,7 @@ from collections import OrderedDict # Project modules +from .bot import Bot from .utilities import ( Confirmator, get_cleaned_text, get_user, make_button, make_inline_keyboard ) @@ -132,12 +133,20 @@ class Role(): @classmethod def get_by_role_id(cls, role_id=100): - """Give a `role_id`, return the corresponding `Role` instance.""" + """Given a `role_id`, return the corresponding `Role` instance.""" for code, role in cls.roles.items(): if code == role_id: return role raise IndexError(f"Unknown role id: {role_id}") + @classmethod + def get_role_by_name(cls, name='everybody'): + """Given a `name`, return the corresponding `Role` instance.""" + for role in cls.roles.values(): + if role.name == name: + return role + raise IndexError(f"Unknown role name: {name}") + @classmethod def get_user_role(cls, user_record=None, user_role_id=None): """Given a `user_record`, return its `Role`. @@ -488,7 +497,7 @@ async def _ban_command(bot, update, user_record): return -def init(bot, roles=None, authorization_messages=None): +def init(bot: Bot, roles=None, authorization_messages=None): """Set bot roles and assign role-related commands. Pass an OrderedDict of `roles` to get them set. @@ -496,7 +505,7 @@ def init(bot, roles=None, authorization_messages=None): class _Role(Role): roles = OrderedDict() - bot.Role = _Role + bot.set_role_class(_Role) if roles is None: roles = DEFAULT_ROLES # Cast roles to OrderedDict diff --git a/davtelepot/bot.py b/davtelepot/bot.py index 06910f3..3819743 100644 --- a/davtelepot/bot.py +++ b/davtelepot/bot.py @@ -219,6 +219,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): self._errors_file_name = None self.placeholder_requests = dict() self.shared_data = dict() + self.Role = None # Add `users` table with its fields if missing self.db['users'].upsert( dict( @@ -2814,3 +2815,10 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): finally: cls.loop.run_until_complete(cls.stop_app()) return cls.final_state + + def set_role_class(self, role): + """Set a Role class for bot. + + `role` must be an instance of `authorization.Role`. + """ + self.Role = role diff --git a/davtelepot/messages.py b/davtelepot/messages.py index c180f89..725d4f2 100644 --- a/davtelepot/messages.py +++ b/davtelepot/messages.py @@ -37,3 +37,140 @@ default_help_messages = { 'it': "Comandi πŸ€–", }, } + +default_suggestion_messages = { + 'suggestions_command': { + 'command': "/suggestion", + 'aliases': [ + "/suggestions", "/ideas", + "/suggerimento", "/suggerimenti", "idee" + ], + 'reply_keyboard_button': { + 'en': "Ideas πŸ’‘", + 'it': "Idee πŸ’‘" + }, + 'description': { + 'en': "Send a suggestion to help improve the bot", + 'it': "Invia un suggerimento per aiutare a migliorare il bot" + }, + 'prompt_text': { + 'en': ( + "Send a suggestion to bot administrator.\n\n" + "Maximum 1500 characters (extra ones will be ignored).\n" + "If you need more space, you may create a telegra.ph topic and link it here.\n\n" + "/cancel if you misclicked." + ), + 'it': ( + "Inserisci un suggerimento da inviare agli amministratori.\n\n" + "Massimo 1500 caratteri (quelli in piΓΉ non verranno registrati).\n" + "Se ti serve maggiore libertΓ , puoi per esempio creare un topic " + "su telegra.ph e linkarlo qui!\n\n" + "/annulla se hai clickato per errore." + ), + }, + 'prompt_popup': { + 'en': ( + "Send a suggestion" + ), + 'it': ( + "Inserisci un suggerimento" + ), + }, + 'entered_suggestion': { + 'text': { + 'en': ( + "Entered suggestions:\n\n" + "{suggestion}\n\n" + "Do you want to send it to bot administrators?" + ), + 'it': ( + "Suggerimento inserito:\n\n" + "{suggestion}\n\n" + "Vuoi inviarlo agli amministratori?" + ), + }, + 'buttons': { + 'send': { + 'en': "Send it! πŸ“§", + 'it': "Invia! πŸ“§", + }, + 'cancel': { + 'en': "Cancel ❌", + 'it': "Annulla ❌", + }, + } + }, + 'received_suggestion': { + 'text': { + 'en': ( + "πŸ’‘ We received a new suggestion! πŸ’‘\n\n" + "{user}\n\n" + "{suggestion}\n\n" + "#suggestions #{bot.name}" + ), + 'it': ( + "πŸ’‘ Abbiamo ricevuto un nuovo suggerimento! πŸ’‘\n\n" + "{user}\n\n" + "{suggestion}\n\n" + "#suggestions #{bot.name}" + ), + }, + 'buttons': { + 'new': { + 'en': "New suggestion πŸ’‘", + 'it': "Nuovo suggerimento πŸ’‘", + }, + }, + }, + 'invalid_suggestion': { + 'en': "Invalid suggestion.", + 'it': "Suggerimento non valido." + }, + 'cancel_messages': { + 'en': ['cancel'], + 'it': ['annulla', 'cancella'], + }, + 'operation_cancelled': { + 'en': "Operation cancelled.", + 'it': "Operazione annullata con successo.", + }, + 'suggestion_sent': { + 'popup': { + 'en': "Thanks!", + 'it': "Grazie!", + }, + 'text': { + 'en': ( + "πŸ’‘ Suggestion sent, thank you! πŸ’‘\n\n" + "{suggestion}\n\n" + "#suggestions #{bot.name}" + ), + 'it': ( + "πŸ’‘ Suggerimento inviato, grazie! πŸ’‘\n\n" + "{suggestion}\n\n" + "#suggerimenti #{bot.name}" + ), + }, + } + }, + 'suggestions_button': { + 'file_name': { + 'en': "Suggestions.csv", + 'it': "Suggerimenti.csv", + }, + 'file_caption': { + 'en': "Here is the suggestions file.", + 'it': "Ecco il file dei suggerimenti.", + } + }, + 'see_suggestions': { + 'command': "/getsuggestions", + 'aliases': [ + "/vedisuggerimenti", + ], + 'description': { + 'en': "Get a file containing all suggestions", + 'it': "Richiedi un file con tutti i suggerimenti" + }, + } +} diff --git a/davtelepot/suggestions.py b/davtelepot/suggestions.py new file mode 100644 index 0000000..381b7c8 --- /dev/null +++ b/davtelepot/suggestions.py @@ -0,0 +1,286 @@ +"""Receive structured suggestions from bot users.""" + +# Standard library modules +import asyncio +import datetime + +# Third party modules +import davtelepot + +# Project modules +from .messages import default_suggestion_messages +from .utilities import ( + async_wrapper, get_cleaned_text, make_button, + make_inline_keyboard, send_csv_file +) + + +async def _handle_suggestion_message(bot: davtelepot.bot.Bot, update, user_record, try_no=1, + suggestion_prefixes=None): + if suggestion_prefixes is None: + suggestion_prefixes = [] + suggestion_prefixes = [prefix.strip('/') for prefix in suggestion_prefixes] + user_id = user_record['id'] + telegram_id = user_record['telegram_id'] + text = get_cleaned_text( + update, + bot, + suggestion_prefixes + ) + text = text.strip(' /')[:1500] + if not text: + if try_no < 2: + bot.set_individual_text_message_handler( + await async_wrapper( + _handle_suggestion_message, + bot=bot, + update=update, + user_record=user_record, + try_no=(try_no + 1), + suggestion_prefixes=suggestion_prefixes + ), + user_id=telegram_id + ) + return dict( + chat_id=telegram_id, + reply_markup=dict( + force_reply=True + ), + text=bot.get_message( + 'suggestions', 'suggestions_command', 'prompt_text', + update=update, user_record=user_record + ) + ) + return bot.get_message( + 'suggestions', 'suggestions_command', 'invalid_suggestion', + update=update, user_record=user_record + ) + if text.lower() in bot.messages['suggestions']['suggestions_command']['cancel_messages']: + return bot.get_message( + 'suggestions', 'suggestions_command', 'operation_cancelled', + update=update, user_record=user_record + ) + created = datetime.datetime.now() + with bot.db as db: + db['suggestions'].insert( + dict( + user_id=user_id, + suggestion=text, + created=created + ), + ensure=True + ) + suggestion_id = db['suggestions'].find_one( + user_id=user_id, + created=created + )['id'] + text = bot.get_message( + 'suggestions', 'suggestions_command', 'entered_suggestion', 'text', + suggestion=text, + update=update, user_record=user_record + ) + reply_markup = make_inline_keyboard( + [ + make_button( + bot.get_message( + 'suggestions', 'suggestions_command', 'entered_suggestion', 'buttons', 'send', + update=update, user_record=user_record + ), + prefix='suggest:///', + delimiter='|', + data=['confirm', suggestion_id] + ), + make_button( + bot.get_message( + 'suggestions', 'suggestions_command', 'entered_suggestion', 'buttons', 'cancel', + update=update, user_record=user_record + ), + prefix='suggest:///', + delimiter='|', + data=['cancel'] + ) + ] + ) + return dict( + chat_id=telegram_id, + text=text, + parse_mode='HTML', + reply_markup=reply_markup + ) + + +async def _suggestions_button(bot: davtelepot.bot.Bot, update, user_record, data): + command = data[0] + user_id = update['from']['id'] + result, text, reply_markup = '', '', None + if command in ['new']: + bot.set_individual_text_message_handler( + _handle_suggestion_message, + user_id=user_id + ) + asyncio.ensure_future( + bot.send_message( + chat_id=user_id, + reply_markup=dict( + force_reply=True + ), + text=bot.get_message( + 'suggestions', 'suggestions_command', 'prompt_text', + update=update, user_record=user_record + ) + ) + ) + result = bot.get_message( + 'suggestions', 'suggestions_command', 'prompt_popup', + update=update, user_record=user_record + ) + elif command in ['cancel']: + result = 'Operazione annullata' + text = 'Operazione annullata con successo.' + reply_markup = None + elif command in ['confirm'] and len(data) > 1: + suggestion_id = data[1] + when = datetime.datetime.now() + with bot.db as db: + registered_user = db['users'].find_one(telegram_id=user_id) + admins = [ + x['telegram_id'] + for x in db['users'].find( + privileges=[ + bot.Role.get_role_by_name('admin').code, + bot.Role.get_role_by_name('founder').code + ] + ) + ] + db['suggestions'].update( + dict( + id=suggestion_id, + sent=when + ), + ['id'], + ensure=True + ) + suggestion_text = db['suggestions'].find_one( + id=suggestion_id + )['suggestion'] + suggestion_message = bot.get_message( + 'suggestions', 'suggestions_command', 'received_suggestion', 'text', + user=bot.Role.get_user_role_panel(registered_user)[0], + suggestion=suggestion_text, + bot=bot, + update=update, user_record=user_record, + ) + for admin in admins: + when += datetime.timedelta(seconds=1) + asyncio.ensure_future( + bot.send_message( + chat_id=admin, + text=suggestion_message, + parse_mode='HTML' + ) + ) + reply_markup = make_inline_keyboard( + [ + make_button( + text=bot.get_message( + 'suggestions', 'suggestions_command', 'received_suggestion', 'buttons', 'new', + bot=bot, + update=update, user_record=user_record, + ), + prefix='suggest:///', + delimiter='|', + data=['new'] + ) + ], + 1 + ) + result = bot.get_message( + 'suggestions', 'suggestions_command', 'suggestion_sent', 'popup', + suggestion=suggestion_text, bot=bot, + update=update, user_record=user_record, + ) + text = bot.get_message( + 'suggestions', 'suggestions_command', 'suggestion_sent', 'text', + suggestion=suggestion_text, bot=bot, + update=update, user_record=user_record, + ) + if text: + return dict( + text=result, + edit=dict( + text=text, + reply_markup=reply_markup, + parse_mode='HTML' + ) + ) + return result + + +async def _see_suggestions(bot: davtelepot.bot.Bot, update, user_record): + chat_id = update['from']['id'] + query = ( + "SELECT u.username, u.privileges, s.created, s.sent, s.suggestion " + "FROM suggestions s " + "LEFT JOIN users u " + "ON u.id = s.user_id " + "ORDER BY s.created" + ) + await send_csv_file( + bot=bot, + chat_id=chat_id, + query=query, + caption=bot.get_message( + 'suggestions', 'suggestions_button', 'file_caption', + user_record=user_record, update=update + ), + file_name=bot.get_message( + 'suggestions', 'suggestions_button', 'file_name', + user_record=user_record, update=update + ), + update=update, + user_record=user_record + ) + + +def init(telegram_bot: davtelepot.bot.Bot, suggestion_messages=default_suggestion_messages): + """Set suggestion handling for `bot`.""" + telegram_bot.messages['suggestions'] = suggestion_messages + suggestion_prefixes = ( + list(suggestion_messages['suggestions_command']['reply_keyboard_button'].values()) + + [suggestion_messages['suggestions_command']['command']] + + suggestion_messages['suggestions_command']['aliases'] + ) + + @telegram_bot.command(command=suggestion_messages['suggestions_command']['command'], + aliases=suggestion_messages['suggestions_command']['aliases'], + reply_keyboard_button=( + suggestion_messages['suggestions_command']['reply_keyboard_button'] + ), + show_in_keyboard=True, + description=suggestion_messages['suggestions_command']['description'], + authorization_level='everybody') + async def suggestions_command(bot, update, user_record): + return await _handle_suggestion_message( + bot=bot, + update=update, + user_record=user_record, + try_no=1, + suggestion_prefixes=suggestion_prefixes + ) + + @telegram_bot.button(prefix='suggest:///', separator='|', + authorization_level='everybody') + async def suggestions_button(bot, update, user_record, data): + return await _suggestions_button( + bot=bot, update=update, + user_record=user_record, data=data + ) + + @telegram_bot.command(command=suggestion_messages['see_suggestions']['command'], + aliases=suggestion_messages['see_suggestions']['aliases'], + description=( + suggestion_messages['see_suggestions']['description'] + ), + authorization_level='admin') + async def see_suggestions(bot, update, user_record): + return await _see_suggestions(bot, update, user_record)