From d7244426042f3e70a46f04a2a9ec436a86349d96 Mon Sep 17 00:00:00 2001 From: Davte Date: Wed, 13 May 2020 15:28:49 +0200 Subject: [PATCH] /ban command implemented --- davtelepot/__init__.py | 2 +- davtelepot/authorization.py | 270 +++++++++++++++++++++++------------- davtelepot/messages.py | 4 +- davtelepot/suggestions.py | 2 +- 4 files changed, 181 insertions(+), 97 deletions(-) diff --git a/davtelepot/__init__.py b/davtelepot/__init__.py index a190c2f..102665e 100644 --- a/davtelepot/__init__.py +++ b/davtelepot/__init__.py @@ -11,7 +11,7 @@ __author__ = "Davide Testa" __email__ = "davide@davte.it" __credits__ = ["Marco Origlia", "Nick Lee @Nickoala"] __license__ = "GNU General Public License v3.0" -__version__ = "2.5.9" +__version__ = "2.5.10" __maintainer__ = "Davide Testa" __contact__ = "t.me/davte" diff --git a/davtelepot/authorization.py b/davtelepot/authorization.py index eea4b4f..53ac7f4 100644 --- a/davtelepot/authorization.py +++ b/davtelepot/authorization.py @@ -2,7 +2,7 @@ # Standard library modules from collections import OrderedDict -from typing import Callable, Union +from typing import Callable, List, Union # Project modules from .bot import Bot @@ -68,14 +68,14 @@ class Role: roles = OrderedDict() default_role_code = 100 - def __init__(self, code, name, symbol, singular, plural, - can_appoint, can_be_appointed_by): + def __init__(self, code: int, name: str, symbol: str, + singular: str, plural: str, + can_appoint: List[int], can_be_appointed_by: List[int]): """Instantiate Role object. code : int The higher the code, the less privileges are connected to that - role. - Use 0 for banned users. + role. Use 0 for banned users. name : str Short name for role. symbol : str @@ -84,7 +84,7 @@ class Role: Singular full name of role. plural : str Plural full name of role. - can_appoint : lsit of int + can_appoint : list of int List of role codes that this role can appoint. can_be_appointed_by : list of int List of role codes this role can be appointed by. @@ -99,42 +99,42 @@ class Role: self.__class__.roles[self.code] = self @property - def code(self): + def code(self) -> int: """Return code.""" return self._code @property - def name(self): + def name(self) -> str: """Return name.""" return self._name @property - def symbol(self): + def symbol(self) -> str: """Return symbol.""" return self._symbol @property - def singular(self): + def singular(self) -> str: """Return singular.""" return self._singular @property - def plural(self): + def plural(self) -> str: """Return plural.""" return self._plural @property - def can_appoint(self): + def can_appoint(self) -> List[int]: """Return can_appoint.""" return self._can_appoint @property - def can_be_appointed_by(self): + def can_be_appointed_by(self) -> List[int]: """Return roles whom this role can be appointed by.""" return self._can_be_appointed_by @classmethod - def get_by_role_id(cls, role_id=100): + def get_by_role_id(cls, role_id=100) -> 'Role': """Given a `role_id`, return the corresponding `Role` instance.""" for code, role in cls.roles.items(): if code == role_id: @@ -142,7 +142,7 @@ class Role: raise IndexError(f"Unknown role id: {role_id}") @classmethod - def get_role_by_name(cls, name='everybody'): + def get_role_by_name(cls, name='everybody') -> 'Role': """Given a `name`, return the corresponding `Role` instance.""" for role in cls.roles.values(): if role.name == name: @@ -150,7 +150,9 @@ class Role: raise IndexError(f"Unknown role name: {name}") @classmethod - def get_user_role(cls, user_record=None, user_role_id=None): + def get_user_role(cls, + user_record: OrderedDict = None, + user_role_id: int = None) -> 'Role': """Given a `user_record`, return its `Role`. `role_id` may be passed as keyword argument or as user_record. @@ -170,7 +172,7 @@ class Role: return cls.get_by_role_id(role_id=user_role_id) @classmethod - def set_default_role_code(cls, role): + def set_default_role_code(cls, role: int) -> None: """Set class default role code. It will be returned if a specific role code cannot be evaluated. @@ -178,30 +180,82 @@ class Role: cls.default_role_code = role @classmethod - def get_user_role_panel(cls, user_record): - """Get text and buttons for user role panel.""" - user_role = cls.get_user_role(user_record=user_record) - text = ( - """👤 {u[username]}\n""" + def get_user_role_text(cls, + user_record: OrderedDict, + user_role: 'Role' = None) -> str: + """ + Get a string to describe the role of a user. + + @param user_record: record of table `users` about the user; it must + contain at least a [username | last_name | first_name] and a + telegram identifier. + @param user_role: Role instance about user permissions. + @return: String to describe the role of a user, like this: + ``` + 👤 LinkedUsername + 🔑 Admin ⚜️ + ``` + """ + if user_role is None: + user_role = cls.get_user_role(user_record=user_record) + return ( + f"""👤 {get_user(record=user_record)}\n""" f"🔑 {user_role.singular.capitalize()} {user_role.symbol}" - ).format( - u=user_record, ) - buttons = [ + + @classmethod + def get_user_role_buttons(cls, + user_record: OrderedDict, + admin_record: OrderedDict, + user_role: 'Role' = None, + admin_role: 'Role' = None) -> List[dict]: + """ Return buttons to edit user permissions. + @param user_record: record of table `users` about the user; it must + contain at least a [username | last_name | first_name] and a + telegram identifier. + @param admin_record: record of table `users` about the admin; it must + contain at least a [username | last_name | first_name] and a + telegram identifier. + @param user_role: Role instance about user permissions. + @param admin_role: Role instance about admin permissions. + @return: list of `InlineKeyboardButton`s. + """ + if admin_role is None: + admin_role = cls.get_user_role(user_record=admin_record) + if user_role is None: + user_role = cls.get_user_role(user_record=user_record) + return [ make_button( f"{role.symbol} {role.singular.capitalize()}", prefix='auth:///', data=['set', user_record['id'], code] ) for code, role in cls.roles.items() + if (admin_role > user_role + and code in admin_role.can_appoint + and code != user_role.code) ] + + @classmethod + def get_user_role_text_and_buttons(cls, + user_record: OrderedDict, + admin_record: OrderedDict): + """Get text and buttons for user role panel.""" + admin_role = cls.get_user_role(user_record=admin_record) + user_role = cls.get_user_role(user_record=user_record) + text = cls.get_user_role_text(user_record=user_record, + user_role=user_role) + buttons = cls.get_user_role_buttons(user_record=user_record, + user_role=user_role, + admin_record=admin_record, + admin_role=admin_role) return text, buttons - def __eq__(self, other): + def __eq__(self, other: 'Role'): """Return True if self is equal to other.""" return self.code == other.code - def __gt__(self, other): + def __gt__(self, other: 'Role'): """Return True if self can appoint other.""" return ( ( @@ -211,19 +265,19 @@ class Role: and self.code in other.can_be_appointed_by ) - def __ge__(self, other): + def __ge__(self, other: 'Role'): """Return True if self >= other.""" return self.__gt__(other) or self.__eq__(other) - def __lt__(self, other): + def __lt__(self, other: 'Role'): """Return True if self can not appoint other.""" return not self.__ge__(other) - def __le__(self, other): + def __le__(self, other: 'Role'): """Return True if self is superior or equal to other.""" return not self.__gt__(other) - def __ne__(self, other): + def __ne__(self, other: 'Role'): """Return True if self is not equal to other.""" return not self.__eq__(other) @@ -232,7 +286,7 @@ class Role: return f"" -def get_authorization_function(bot): +def get_authorization_function(bot: Bot): """Take a `bot` and return its authorization_function.""" def is_authorized(update, user_record=None, authorization_level=2): @@ -258,51 +312,69 @@ def get_authorization_function(bot): return is_authorized -async def _authorization_command(bot, update, user_record): - text = get_cleaned_text(bot=bot, update=update, replace=['auth']) +async def _authorization_command(bot: Bot, + update: dict, + user_record: OrderedDict, + mode: str = 'auth'): + db = bot.db + text = get_cleaned_text(bot=bot, update=update, replace=[mode]) reply_markup = None - # noinspection PyUnusedLocal + admin_record = user_record.copy() + user_record = None + admin_role = bot.Role.get_user_role(user_record=admin_record) result = bot.get_message( 'authorization', 'auth_command', 'unhandled_case', - update=update, user_record=user_record + update=update, user_record=admin_record ) - if not text: - if 'reply_to_message' not in update: - return bot.get_message( + if not text: # No text provided: command must be used in reply + if 'reply_to_message' not in update: # No text and not in reply + result = bot.get_message( 'authorization', 'auth_command', 'instructions', - update=update, user_record=user_record + update=update, user_record=admin_record, + command=mode ) - else: - with bot.db as db: + else: # No text, command used in reply to another message + update = update['reply_to_message'] + # Forwarded message: get both the user who forwarded and the original author + if ('forward_from' in update + and update['from']['id'] != update['forward_from']['id']): + user_record = list( + db['users'].find( + telegram_id=[update['from']['id'], + update['forward_from']['id']] + ) + ) + else: # Otherwise: get the author of the message user_record = db['users'].find_one( - telegram_id=update['reply_to_message']['from']['id'] - ) - else: - with bot.db as db: - user_record = list( - db.query( - "SELECT * " - "FROM users " - "WHERE COALESCE(" - " first_name || last_name || username," - " last_name || username," - " first_name || username," - " username," - " first_name || last_name," - " last_name," - " first_name" - f") LIKE '%{text}%'" + telegram_id=update['from']['id'] ) + else: # Get users matching the input text + user_record = list( + db.query( + "SELECT * " + "FROM users " + "WHERE COALESCE(" + " first_name || last_name || username," + " last_name || username," + " first_name || username," + " username," + " first_name || last_name," + " last_name," + " first_name" + f") LIKE '%{text}%'" ) - if user_record is None: + ) + if len(user_record) == 1: + user_record = user_record[0] + if user_record is None: # If query was not provided and user cannot be found result = bot.get_message( 'authorization', 'auth_command', 'unknown_user', - update=update, user_record=user_record + update=update, user_record=admin_record ) - elif type(user_record) is list and len(user_record) > 1: + elif type(user_record) is list and len(user_record) > 1: # If many users match result = bot.get_message( 'authorization', 'auth_command', 'choose_user', - update=update, user_record=user_record, + update=update, user_record=admin_record, n=len(user_record) ) reply_markup = make_inline_keyboard( @@ -316,15 +388,25 @@ async def _authorization_command(bot, update, user_record): ], 3 ) - elif type(user_record) is list and len(user_record) == 0: + elif type(user_record) is list and len(user_record) == 0: # If query was provided but no user matches result = bot.get_message( 'authorization', 'auth_command', 'no_match', - update=update, user_record=user_record, + update=update, user_record=admin_record, + ) + elif isinstance(user_record, dict): # If 1 user matches + # Ban user if admin can do it + user_role = bot.Role.get_user_role(user_record=user_record) + if mode == 'ban' and admin_role > user_role: + user_record['privileges'] = 0 + db['users'].update( + user_record, + ['id'] + ) + # Show user panel (text and buttons) to edit user permissions + result, buttons = bot.Role.get_user_role_text_and_buttons( + user_record=user_record, + admin_record=admin_record ) - else: - if type(user_record) is list: - user_record = user_record[0] - result, buttons = bot.Role.get_user_role_panel(user_record) reply_markup = make_inline_keyboard(buttons, 1) return dict( text=result, @@ -333,7 +415,10 @@ async def _authorization_command(bot, update, user_record): ) -async def _authorization_button(bot, update, user_record, data): +async def _authorization_button(bot: Bot, + update: dict, + user_record: OrderedDict, + data: Union[str, List[Union[int, str]]]): if len(data) == 0: data = [''] command, *arguments = data @@ -343,10 +428,13 @@ async def _authorization_button(bot, update, user_record, data): else: other_user_id = None result, text, reply_markup = '', '', None + db = bot.db if command in ['show']: - with bot.db as db: - other_user_record = db['users'].find_one(id=other_user_id) - text, buttons = bot.Role.get_user_role_panel(other_user_record) + other_user_record = db['users'].find_one(id=other_user_id) + text, buttons = bot.Role.get_user_role_text_and_buttons( + user_record=other_user_record, + admin_record=user_record + ) reply_markup = make_inline_keyboard(buttons, 1) elif command in ['set'] and len(arguments) > 1: other_user_id, new_privileges, *_ = arguments @@ -358,8 +446,7 @@ async def _authorization_button(bot, update, user_record, data): 'authorization', 'auth_button', 'confirm', update=update, user_record=user_record, ) - with bot.db as db: - other_user_record = db['users'].find_one(id=other_user_id) + other_user_record = db['users'].find_one(id=other_user_id) user_role = bot.Role.get_user_role(user_record=user_record) other_user_role = bot.Role.get_user_role(user_record=other_user_record) if other_user_role.code == new_privileges: @@ -404,20 +491,22 @@ async def _authorization_button(bot, update, user_record, data): 1 ) else: - with bot.db as db: - db['users'].update( - dict( - id=other_user_id, - privileges=new_privileges - ), - ['id'] - ) - other_user_record = db['users'].find_one(id=other_user_id) + db['users'].update( + dict( + id=other_user_id, + privileges=new_privileges + ), + ['id'] + ) + other_user_record = db['users'].find_one(id=other_user_id) result = bot.get_message( 'authorization', 'auth_button', 'appointed', update=update, user_record=user_record ) - text, buttons = bot.Role.get_user_role_panel(other_user_record) + text, buttons = bot.Role.get_user_role_text_and_buttons( + user_record=other_user_record, + admin_record=user_record + ) reply_markup = make_inline_keyboard(buttons, 1) if text: return dict( @@ -431,14 +520,9 @@ async def _authorization_button(bot, update, user_record, data): return result -async def _ban_command(bot, update, user_record): - # TODO define this function! - return - - def default_get_administrators_function(bot: Bot): return list( - bot.db['users'].find(privileges=[1,2]) + bot.db['users'].find(privileges=[1, 2]) ) @@ -498,6 +582,6 @@ def init(telegram_bot: Bot, @telegram_bot.command('/ban', aliases=[], show_in_keyboard=False, description=authorization_messages['ban_command']['description'], - authorization_level='admin') + authorization_level='moderator') async def ban_command(bot, update, user_record): - return await _ban_command(bot, update, user_record) + return await _authorization_command(bot, update, user_record, mode='ban') diff --git a/davtelepot/messages.py b/davtelepot/messages.py index 1d88067..7121f2f 100644 --- a/davtelepot/messages.py +++ b/davtelepot/messages.py @@ -649,9 +649,9 @@ default_authorization_messages = { }, 'instructions': { 'en': "Reply with this command to a user or write " - "/auth username to edit their permissions.", + "/{command} username to edit their permissions.", 'it': "Usa questo comando in risposta a un utente " - "oppure scrivi /auth username per " + "oppure scrivi /{command} username per " "cambiarne il grado di autorizzazione." }, 'unknown_user': { diff --git a/davtelepot/suggestions.py b/davtelepot/suggestions.py index de30225..63f72b8 100644 --- a/davtelepot/suggestions.py +++ b/davtelepot/suggestions.py @@ -162,7 +162,7 @@ async def _suggestions_button(bot: davtelepot.bot.Bot, update, user_record, data )['suggestion'] suggestion_message = bot.get_message( 'suggestions', 'suggestions_command', 'received_suggestion', 'text', - user=bot.Role.get_user_role_panel(registered_user)[0], + user=bot.Role.get_user_role_text(user_record=registered_user), suggestion=suggestion_text, bot=bot, update=update, user_record=user_record,