From 7233f9e478a121e219394b36f0f9004a390c95d4 Mon Sep 17 00:00:00 2001 From: Davte Date: Mon, 17 Aug 2020 23:51:50 +0200 Subject: [PATCH] General handler implemented and applied to contact updates --- davtelepot/__init__.py | 2 +- davtelepot/bot.py | 144 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 141 insertions(+), 5 deletions(-) diff --git a/davtelepot/__init__.py b/davtelepot/__init__.py index e956684..f3f4791 100644 --- a/davtelepot/__init__.py +++ b/davtelepot/__init__.py @@ -11,7 +11,7 @@ __author__ = "Davide Testa" __email__ = "davide@davte.it" __credits__ = ["Marco Origlia", "Nick Lee @Nickoala"] __license__ = "GNU General Public License v3.0" -__version__ = "2.6.10" +__version__ = "2.6.11" __maintainer__ = "Davide Testa" __contact__ = "t.me/davte" diff --git a/davtelepot/bot.py b/davtelepot/bot.py index dd5665a..235eb49 100644 --- a/davtelepot/bot.py +++ b/davtelepot/bot.py @@ -195,6 +195,9 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): self.text_message_parsers = OrderedDict() self.document_handlers = OrderedDict() self.individual_document_message_handlers = dict() + # General handlers + self.individual_handlers = dict() + self.handlers = OrderedDict() # Support for /help command self.messages['help_sections'] = OrderedDict() # Handle location messages @@ -990,10 +993,47 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): async def contact_message_handler(self, update, user_record, language=None): """Handle `contact` message update.""" - logging.info( - "A contact message update was received, " - "but this handler does nothing yet." - ) + return await self.general_handler(update=update, + user_record=user_record, + language=language, + update_type='contact') + + async def general_handler(self, update: dict, user_record: OrderedDict, + language: str, update_type: str): + replier, reply = None, None + user_id = update['from']['id'] if 'from' in update else None + if update_type not in self.individual_handlers: + self.individual_handlers[update_type] = dict() + if user_id in self.individual_handlers[update_type]: + replier = self.individual_handlers[update_type][user_id] + del self.individual_handlers[update_type][user_id] + else: + for check_function, handler in self.handlers[update_type].items(): + if check_function(update): + replier = handler['handler'] + break + if replier: + reply = await replier( + bot=self, + **{ + name: argument + for name, argument in locals().items() + if name in inspect.signature( + replier + ).parameters + } + ) + if reply: + if type(reply) is str: + reply = dict(text=reply) + try: + return await self.reply(update=update, **reply) + except Exception as e: + logging.error( + f"Failed to handle document:\n{e}", + exc_info=True + ) + return async def location_message_handler(self, update, user_record, language=None): """Handle `location` message update.""" @@ -2481,6 +2521,50 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): ) return parser_decorator + def handler(self, update_type: str, condition: Callable[[dict], bool], + description: str = '', + authorization_level: str = 'admin'): + """Decorator: define a handler for updates matching `condition`. + + You may provide a description and a minimum authorization level. + The first handler matching condition is called (other matching handlers + are ignored). + """ + if not callable(condition): + raise TypeError( + f'Condition {condition.__name__} is not a callable' + ) + + def parser_decorator(parser): + async def decorated_parser(bot, update, user_record, language=None): + logging.info( + f"{update_type} matching condition " + f"`{condition.__name__}@{bot.name}` from " + f"`{update['from'] if 'from' in update else update['chat']}`" + ) + if bot.authorization_function( + update=update, + user_record=user_record, + authorization_level=authorization_level + ): + # Pass supported arguments from locals() to parser + return await parser( + **{ + name: arg + for name, arg in locals().items() + if name in inspect.signature(parser).parameters + } + ) + return dict(text=bot.authorization_denied_message) + if update_type not in self.handlers: + self.handlers[update_type] = OrderedDict() + self.handlers[update_type][condition] = dict( + handler=decorated_parser, + description=description, + authorization_level=authorization_level, + ) + return parser_decorator + def set_command(self, command, handler, aliases=None, reply_keyboard_button=None, show_in_keyboard=False, description="", @@ -2810,6 +2894,58 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): del self.individual_voice_handlers[identifier] return + def set_individual_handler(self, handler, update_type: str, + update=None, user_id=None,): + """Set a custom `update_type` handler for the user. + + Any update of given type from the user will be handled by this custom + handler instead of default handler. + Custom handlers last one single use, but they can call this method and + set themselves as next custom handler. + """ + identifier = self.get_user_identifier( + user_id=user_id, + update=update + ) + assert callable(handler), (f"Handler `{handler.name}` is not " + "callable. Custom handler " + "could not be set.") + if update_type not in self.individual_handlers: + self.individual_handlers[update_type] = dict() + self.individual_handlers[update_type][identifier] = handler + return + + def remove_individual_handler(self, update_type: str, + update=None, user_id=None): + """Remove a custom `update_type` handler for the user. + + Any update of given type from the user will be handled by default + handler for its type. + """ + identifier = self.get_user_identifier( + user_id=user_id, + update=update + ) + if ( + update_type in self.individual_handlers + and identifier in self.individual_handlers[update_type] + ): + del self.individual_handlers[update_type][identifier] + return + + def set_individual_contact_message_handler(self, handler, + update: dict, + user_id: OrderedDict): + return self.set_individual_handler(handler=handler, + update_type='contact', + update=update, + user_id=user_id) + + def remove_individual_contact_handler(self, update=None, user_id=None): + return self.remove_individual_handler(update_type='contact', + update=update, + user_id=user_id) + def set_placeholder(self, chat_id, text=None, sent_message=None, timeout=1): """Set a placeholder chat action or text message.