diff --git a/davtelepot/__init__.py b/davtelepot/__init__.py
index a190c2f..102665e 100644
--- a/davtelepot/__init__.py
+++ b/davtelepot/__init__.py
@@ -11,7 +11,7 @@ __author__ = "Davide Testa"
__email__ = "davide@davte.it"
__credits__ = ["Marco Origlia", "Nick Lee @Nickoala"]
__license__ = "GNU General Public License v3.0"
-__version__ = "2.5.9"
+__version__ = "2.5.10"
__maintainer__ = "Davide Testa"
__contact__ = "t.me/davte"
diff --git a/davtelepot/authorization.py b/davtelepot/authorization.py
index eea4b4f..53ac7f4 100644
--- a/davtelepot/authorization.py
+++ b/davtelepot/authorization.py
@@ -2,7 +2,7 @@
# Standard library modules
from collections import OrderedDict
-from typing import Callable, Union
+from typing import Callable, List, Union
# Project modules
from .bot import Bot
@@ -68,14 +68,14 @@ class Role:
roles = OrderedDict()
default_role_code = 100
- def __init__(self, code, name, symbol, singular, plural,
- can_appoint, can_be_appointed_by):
+ def __init__(self, code: int, name: str, symbol: str,
+ singular: str, plural: str,
+ can_appoint: List[int], can_be_appointed_by: List[int]):
"""Instantiate Role object.
code : int
The higher the code, the less privileges are connected to that
- role.
- Use 0 for banned users.
+ role. Use 0 for banned users.
name : str
Short name for role.
symbol : str
@@ -84,7 +84,7 @@ class Role:
Singular full name of role.
plural : str
Plural full name of role.
- can_appoint : lsit of int
+ can_appoint : list of int
List of role codes that this role can appoint.
can_be_appointed_by : list of int
List of role codes this role can be appointed by.
@@ -99,42 +99,42 @@ class Role:
self.__class__.roles[self.code] = self
@property
- def code(self):
+ def code(self) -> int:
"""Return code."""
return self._code
@property
- def name(self):
+ def name(self) -> str:
"""Return name."""
return self._name
@property
- def symbol(self):
+ def symbol(self) -> str:
"""Return symbol."""
return self._symbol
@property
- def singular(self):
+ def singular(self) -> str:
"""Return singular."""
return self._singular
@property
- def plural(self):
+ def plural(self) -> str:
"""Return plural."""
return self._plural
@property
- def can_appoint(self):
+ def can_appoint(self) -> List[int]:
"""Return can_appoint."""
return self._can_appoint
@property
- def can_be_appointed_by(self):
+ def can_be_appointed_by(self) -> List[int]:
"""Return roles whom this role can be appointed by."""
return self._can_be_appointed_by
@classmethod
- def get_by_role_id(cls, role_id=100):
+ def get_by_role_id(cls, role_id=100) -> 'Role':
"""Given a `role_id`, return the corresponding `Role` instance."""
for code, role in cls.roles.items():
if code == role_id:
@@ -142,7 +142,7 @@ class Role:
raise IndexError(f"Unknown role id: {role_id}")
@classmethod
- def get_role_by_name(cls, name='everybody'):
+ def get_role_by_name(cls, name='everybody') -> 'Role':
"""Given a `name`, return the corresponding `Role` instance."""
for role in cls.roles.values():
if role.name == name:
@@ -150,7 +150,9 @@ class Role:
raise IndexError(f"Unknown role name: {name}")
@classmethod
- def get_user_role(cls, user_record=None, user_role_id=None):
+ def get_user_role(cls,
+ user_record: OrderedDict = None,
+ user_role_id: int = None) -> 'Role':
"""Given a `user_record`, return its `Role`.
`role_id` may be passed as keyword argument or as user_record.
@@ -170,7 +172,7 @@ class Role:
return cls.get_by_role_id(role_id=user_role_id)
@classmethod
- def set_default_role_code(cls, role):
+ def set_default_role_code(cls, role: int) -> None:
"""Set class default role code.
It will be returned if a specific role code cannot be evaluated.
@@ -178,30 +180,82 @@ class Role:
cls.default_role_code = role
@classmethod
- def get_user_role_panel(cls, user_record):
- """Get text and buttons for user role panel."""
- user_role = cls.get_user_role(user_record=user_record)
- text = (
- """👤 {u[username]}\n"""
+ def get_user_role_text(cls,
+ user_record: OrderedDict,
+ user_role: 'Role' = None) -> str:
+ """
+ Get a string to describe the role of a user.
+
+ @param user_record: record of table `users` about the user; it must
+ contain at least a [username | last_name | first_name] and a
+ telegram identifier.
+ @param user_role: Role instance about user permissions.
+ @return: String to describe the role of a user, like this:
+ ```
+ 👤 LinkedUsername
+ 🔑 Admin ⚜️
+ ```
+ """
+ if user_role is None:
+ user_role = cls.get_user_role(user_record=user_record)
+ return (
+ f"""👤 {get_user(record=user_record)}\n"""
f"🔑 {user_role.singular.capitalize()} {user_role.symbol}"
- ).format(
- u=user_record,
)
- buttons = [
+
+ @classmethod
+ def get_user_role_buttons(cls,
+ user_record: OrderedDict,
+ admin_record: OrderedDict,
+ user_role: 'Role' = None,
+ admin_role: 'Role' = None) -> List[dict]:
+ """ Return buttons to edit user permissions.
+ @param user_record: record of table `users` about the user; it must
+ contain at least a [username | last_name | first_name] and a
+ telegram identifier.
+ @param admin_record: record of table `users` about the admin; it must
+ contain at least a [username | last_name | first_name] and a
+ telegram identifier.
+ @param user_role: Role instance about user permissions.
+ @param admin_role: Role instance about admin permissions.
+ @return: list of `InlineKeyboardButton`s.
+ """
+ if admin_role is None:
+ admin_role = cls.get_user_role(user_record=admin_record)
+ if user_role is None:
+ user_role = cls.get_user_role(user_record=user_record)
+ return [
make_button(
f"{role.symbol} {role.singular.capitalize()}",
prefix='auth:///',
data=['set', user_record['id'], code]
)
for code, role in cls.roles.items()
+ if (admin_role > user_role
+ and code in admin_role.can_appoint
+ and code != user_role.code)
]
+
+ @classmethod
+ def get_user_role_text_and_buttons(cls,
+ user_record: OrderedDict,
+ admin_record: OrderedDict):
+ """Get text and buttons for user role panel."""
+ admin_role = cls.get_user_role(user_record=admin_record)
+ user_role = cls.get_user_role(user_record=user_record)
+ text = cls.get_user_role_text(user_record=user_record,
+ user_role=user_role)
+ buttons = cls.get_user_role_buttons(user_record=user_record,
+ user_role=user_role,
+ admin_record=admin_record,
+ admin_role=admin_role)
return text, buttons
- def __eq__(self, other):
+ def __eq__(self, other: 'Role'):
"""Return True if self is equal to other."""
return self.code == other.code
- def __gt__(self, other):
+ def __gt__(self, other: 'Role'):
"""Return True if self can appoint other."""
return (
(
@@ -211,19 +265,19 @@ class Role:
and self.code in other.can_be_appointed_by
)
- def __ge__(self, other):
+ def __ge__(self, other: 'Role'):
"""Return True if self >= other."""
return self.__gt__(other) or self.__eq__(other)
- def __lt__(self, other):
+ def __lt__(self, other: 'Role'):
"""Return True if self can not appoint other."""
return not self.__ge__(other)
- def __le__(self, other):
+ def __le__(self, other: 'Role'):
"""Return True if self is superior or equal to other."""
return not self.__gt__(other)
- def __ne__(self, other):
+ def __ne__(self, other: 'Role'):
"""Return True if self is not equal to other."""
return not self.__eq__(other)
@@ -232,7 +286,7 @@ class Role:
return f""
-def get_authorization_function(bot):
+def get_authorization_function(bot: Bot):
"""Take a `bot` and return its authorization_function."""
def is_authorized(update, user_record=None, authorization_level=2):
@@ -258,51 +312,69 @@ def get_authorization_function(bot):
return is_authorized
-async def _authorization_command(bot, update, user_record):
- text = get_cleaned_text(bot=bot, update=update, replace=['auth'])
+async def _authorization_command(bot: Bot,
+ update: dict,
+ user_record: OrderedDict,
+ mode: str = 'auth'):
+ db = bot.db
+ text = get_cleaned_text(bot=bot, update=update, replace=[mode])
reply_markup = None
- # noinspection PyUnusedLocal
+ admin_record = user_record.copy()
+ user_record = None
+ admin_role = bot.Role.get_user_role(user_record=admin_record)
result = bot.get_message(
'authorization', 'auth_command', 'unhandled_case',
- update=update, user_record=user_record
+ update=update, user_record=admin_record
)
- if not text:
- if 'reply_to_message' not in update:
- return bot.get_message(
+ if not text: # No text provided: command must be used in reply
+ if 'reply_to_message' not in update: # No text and not in reply
+ result = bot.get_message(
'authorization', 'auth_command', 'instructions',
- update=update, user_record=user_record
+ update=update, user_record=admin_record,
+ command=mode
)
- else:
- with bot.db as db:
+ else: # No text, command used in reply to another message
+ update = update['reply_to_message']
+ # Forwarded message: get both the user who forwarded and the original author
+ if ('forward_from' in update
+ and update['from']['id'] != update['forward_from']['id']):
+ user_record = list(
+ db['users'].find(
+ telegram_id=[update['from']['id'],
+ update['forward_from']['id']]
+ )
+ )
+ else: # Otherwise: get the author of the message
user_record = db['users'].find_one(
- telegram_id=update['reply_to_message']['from']['id']
- )
- else:
- with bot.db as db:
- user_record = list(
- db.query(
- "SELECT * "
- "FROM users "
- "WHERE COALESCE("
- " first_name || last_name || username,"
- " last_name || username,"
- " first_name || username,"
- " username,"
- " first_name || last_name,"
- " last_name,"
- " first_name"
- f") LIKE '%{text}%'"
+ telegram_id=update['from']['id']
)
+ else: # Get users matching the input text
+ user_record = list(
+ db.query(
+ "SELECT * "
+ "FROM users "
+ "WHERE COALESCE("
+ " first_name || last_name || username,"
+ " last_name || username,"
+ " first_name || username,"
+ " username,"
+ " first_name || last_name,"
+ " last_name,"
+ " first_name"
+ f") LIKE '%{text}%'"
)
- if user_record is None:
+ )
+ if len(user_record) == 1:
+ user_record = user_record[0]
+ if user_record is None: # If query was not provided and user cannot be found
result = bot.get_message(
'authorization', 'auth_command', 'unknown_user',
- update=update, user_record=user_record
+ update=update, user_record=admin_record
)
- elif type(user_record) is list and len(user_record) > 1:
+ elif type(user_record) is list and len(user_record) > 1: # If many users match
result = bot.get_message(
'authorization', 'auth_command', 'choose_user',
- update=update, user_record=user_record,
+ update=update, user_record=admin_record,
n=len(user_record)
)
reply_markup = make_inline_keyboard(
@@ -316,15 +388,25 @@ async def _authorization_command(bot, update, user_record):
],
3
)
- elif type(user_record) is list and len(user_record) == 0:
+ elif type(user_record) is list and len(user_record) == 0: # If query was provided but no user matches
result = bot.get_message(
'authorization', 'auth_command', 'no_match',
- update=update, user_record=user_record,
+ update=update, user_record=admin_record,
+ )
+ elif isinstance(user_record, dict): # If 1 user matches
+ # Ban user if admin can do it
+ user_role = bot.Role.get_user_role(user_record=user_record)
+ if mode == 'ban' and admin_role > user_role:
+ user_record['privileges'] = 0
+ db['users'].update(
+ user_record,
+ ['id']
+ )
+ # Show user panel (text and buttons) to edit user permissions
+ result, buttons = bot.Role.get_user_role_text_and_buttons(
+ user_record=user_record,
+ admin_record=admin_record
)
- else:
- if type(user_record) is list:
- user_record = user_record[0]
- result, buttons = bot.Role.get_user_role_panel(user_record)
reply_markup = make_inline_keyboard(buttons, 1)
return dict(
text=result,
@@ -333,7 +415,10 @@ async def _authorization_command(bot, update, user_record):
)
-async def _authorization_button(bot, update, user_record, data):
+async def _authorization_button(bot: Bot,
+ update: dict,
+ user_record: OrderedDict,
+ data: Union[str, List[Union[int, str]]]):
if len(data) == 0:
data = ['']
command, *arguments = data
@@ -343,10 +428,13 @@ async def _authorization_button(bot, update, user_record, data):
else:
other_user_id = None
result, text, reply_markup = '', '', None
+ db = bot.db
if command in ['show']:
- with bot.db as db:
- other_user_record = db['users'].find_one(id=other_user_id)
- text, buttons = bot.Role.get_user_role_panel(other_user_record)
+ other_user_record = db['users'].find_one(id=other_user_id)
+ text, buttons = bot.Role.get_user_role_text_and_buttons(
+ user_record=other_user_record,
+ admin_record=user_record
+ )
reply_markup = make_inline_keyboard(buttons, 1)
elif command in ['set'] and len(arguments) > 1:
other_user_id, new_privileges, *_ = arguments
@@ -358,8 +446,7 @@ async def _authorization_button(bot, update, user_record, data):
'authorization', 'auth_button', 'confirm',
update=update, user_record=user_record,
)
- with bot.db as db:
- other_user_record = db['users'].find_one(id=other_user_id)
+ other_user_record = db['users'].find_one(id=other_user_id)
user_role = bot.Role.get_user_role(user_record=user_record)
other_user_role = bot.Role.get_user_role(user_record=other_user_record)
if other_user_role.code == new_privileges:
@@ -404,20 +491,22 @@ async def _authorization_button(bot, update, user_record, data):
1
)
else:
- with bot.db as db:
- db['users'].update(
- dict(
- id=other_user_id,
- privileges=new_privileges
- ),
- ['id']
- )
- other_user_record = db['users'].find_one(id=other_user_id)
+ db['users'].update(
+ dict(
+ id=other_user_id,
+ privileges=new_privileges
+ ),
+ ['id']
+ )
+ other_user_record = db['users'].find_one(id=other_user_id)
result = bot.get_message(
'authorization', 'auth_button', 'appointed',
update=update, user_record=user_record
)
- text, buttons = bot.Role.get_user_role_panel(other_user_record)
+ text, buttons = bot.Role.get_user_role_text_and_buttons(
+ user_record=other_user_record,
+ admin_record=user_record
+ )
reply_markup = make_inline_keyboard(buttons, 1)
if text:
return dict(
@@ -431,14 +520,9 @@ async def _authorization_button(bot, update, user_record, data):
return result
-async def _ban_command(bot, update, user_record):
- # TODO define this function!
- return
-
-
def default_get_administrators_function(bot: Bot):
return list(
- bot.db['users'].find(privileges=[1,2])
+ bot.db['users'].find(privileges=[1, 2])
)
@@ -498,6 +582,6 @@ def init(telegram_bot: Bot,
@telegram_bot.command('/ban', aliases=[], show_in_keyboard=False,
description=authorization_messages['ban_command']['description'],
- authorization_level='admin')
+ authorization_level='moderator')
async def ban_command(bot, update, user_record):
- return await _ban_command(bot, update, user_record)
+ return await _authorization_command(bot, update, user_record, mode='ban')
diff --git a/davtelepot/messages.py b/davtelepot/messages.py
index 1d88067..7121f2f 100644
--- a/davtelepot/messages.py
+++ b/davtelepot/messages.py
@@ -649,9 +649,9 @@ default_authorization_messages = {
},
'instructions': {
'en': "Reply with this command to a user or write "
- "/auth username
to edit their permissions.",
+ "/{command} username
to edit their permissions.",
'it': "Usa questo comando in risposta a un utente "
- "oppure scrivi /auth username
per "
+ "oppure scrivi /{command} username
per "
"cambiarne il grado di autorizzazione."
},
'unknown_user': {
diff --git a/davtelepot/suggestions.py b/davtelepot/suggestions.py
index de30225..63f72b8 100644
--- a/davtelepot/suggestions.py
+++ b/davtelepot/suggestions.py
@@ -162,7 +162,7 @@ async def _suggestions_button(bot: davtelepot.bot.Bot, update, user_record, data
)['suggestion']
suggestion_message = bot.get_message(
'suggestions', 'suggestions_command', 'received_suggestion', 'text',
- user=bot.Role.get_user_role_panel(registered_user)[0],
+ user=bot.Role.get_user_role_text(user_record=registered_user),
suggestion=suggestion_text,
bot=bot,
update=update, user_record=user_record,