General handler implemented and applied to contact updates

This commit is contained in:
Davte 2020-08-17 23:51:50 +02:00
parent e481623a75
commit 7233f9e478
Signed by: Davte
GPG Key ID: 209AE674A0007425
2 changed files with 141 additions and 5 deletions

View File

@ -11,7 +11,7 @@ __author__ = "Davide Testa"
__email__ = "davide@davte.it"
__credits__ = ["Marco Origlia", "Nick Lee @Nickoala"]
__license__ = "GNU General Public License v3.0"
__version__ = "2.6.10"
__version__ = "2.6.11"
__maintainer__ = "Davide Testa"
__contact__ = "t.me/davte"

View File

@ -195,6 +195,9 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
self.text_message_parsers = OrderedDict()
self.document_handlers = OrderedDict()
self.individual_document_message_handlers = dict()
# General handlers
self.individual_handlers = dict()
self.handlers = OrderedDict()
# Support for /help command
self.messages['help_sections'] = OrderedDict()
# Handle location messages
@ -990,10 +993,47 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
async def contact_message_handler(self, update, user_record, language=None):
"""Handle `contact` message update."""
logging.info(
"A contact message update was received, "
"but this handler does nothing yet."
return await self.general_handler(update=update,
user_record=user_record,
language=language,
update_type='contact')
async def general_handler(self, update: dict, user_record: OrderedDict,
language: str, update_type: str):
replier, reply = None, None
user_id = update['from']['id'] if 'from' in update else None
if update_type not in self.individual_handlers:
self.individual_handlers[update_type] = dict()
if user_id in self.individual_handlers[update_type]:
replier = self.individual_handlers[update_type][user_id]
del self.individual_handlers[update_type][user_id]
else:
for check_function, handler in self.handlers[update_type].items():
if check_function(update):
replier = handler['handler']
break
if replier:
reply = await replier(
bot=self,
**{
name: argument
for name, argument in locals().items()
if name in inspect.signature(
replier
).parameters
}
)
if reply:
if type(reply) is str:
reply = dict(text=reply)
try:
return await self.reply(update=update, **reply)
except Exception as e:
logging.error(
f"Failed to handle document:\n{e}",
exc_info=True
)
return
async def location_message_handler(self, update, user_record, language=None):
"""Handle `location` message update."""
@ -2481,6 +2521,50 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
)
return parser_decorator
def handler(self, update_type: str, condition: Callable[[dict], bool],
description: str = '',
authorization_level: str = 'admin'):
"""Decorator: define a handler for updates matching `condition`.
You may provide a description and a minimum authorization level.
The first handler matching condition is called (other matching handlers
are ignored).
"""
if not callable(condition):
raise TypeError(
f'Condition {condition.__name__} is not a callable'
)
def parser_decorator(parser):
async def decorated_parser(bot, update, user_record, language=None):
logging.info(
f"{update_type} matching condition "
f"`{condition.__name__}@{bot.name}` from "
f"`{update['from'] if 'from' in update else update['chat']}`"
)
if bot.authorization_function(
update=update,
user_record=user_record,
authorization_level=authorization_level
):
# Pass supported arguments from locals() to parser
return await parser(
**{
name: arg
for name, arg in locals().items()
if name in inspect.signature(parser).parameters
}
)
return dict(text=bot.authorization_denied_message)
if update_type not in self.handlers:
self.handlers[update_type] = OrderedDict()
self.handlers[update_type][condition] = dict(
handler=decorated_parser,
description=description,
authorization_level=authorization_level,
)
return parser_decorator
def set_command(self, command, handler, aliases=None,
reply_keyboard_button=None, show_in_keyboard=False,
description="",
@ -2810,6 +2894,58 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
del self.individual_voice_handlers[identifier]
return
def set_individual_handler(self, handler, update_type: str,
update=None, user_id=None,):
"""Set a custom `update_type` handler for the user.
Any update of given type from the user will be handled by this custom
handler instead of default handler.
Custom handlers last one single use, but they can call this method and
set themselves as next custom handler.
"""
identifier = self.get_user_identifier(
user_id=user_id,
update=update
)
assert callable(handler), (f"Handler `{handler.name}` is not "
"callable. Custom handler "
"could not be set.")
if update_type not in self.individual_handlers:
self.individual_handlers[update_type] = dict()
self.individual_handlers[update_type][identifier] = handler
return
def remove_individual_handler(self, update_type: str,
update=None, user_id=None):
"""Remove a custom `update_type` handler for the user.
Any update of given type from the user will be handled by default
handler for its type.
"""
identifier = self.get_user_identifier(
user_id=user_id,
update=update
)
if (
update_type in self.individual_handlers
and identifier in self.individual_handlers[update_type]
):
del self.individual_handlers[update_type][identifier]
return
def set_individual_contact_message_handler(self, handler,
update: dict,
user_id: OrderedDict):
return self.set_individual_handler(handler=handler,
update_type='contact',
update=update,
user_id=user_id)
def remove_individual_contact_handler(self, update=None, user_id=None):
return self.remove_individual_handler(update_type='contact',
update=update,
user_id=user_id)
def set_placeholder(self, chat_id,
text=None, sent_message=None, timeout=1):
"""Set a placeholder chat action or text message.