Browse users via /auth
command
This commit is contained in:
parent
21f91fb07c
commit
e813d37c8f
@ -11,7 +11,7 @@ __author__ = "Davide Testa"
|
||||
__email__ = "davide@davte.it"
|
||||
__credits__ = ["Marco Origlia", "Nick Lee @Nickoala"]
|
||||
__license__ = "GNU General Public License v3.0"
|
||||
__version__ = "2.8.5"
|
||||
__version__ = "2.8.6"
|
||||
__maintainer__ = "Davide Testa"
|
||||
__contact__ = "t.me/davte"
|
||||
|
||||
|
@ -61,6 +61,8 @@ DEFAULT_ROLES[100] = {
|
||||
'can_be_appointed_by': [1, 2, 3]
|
||||
}
|
||||
|
||||
max_records_per_query = 15
|
||||
|
||||
|
||||
class Role:
|
||||
"""Authorization level for users of a bot."""
|
||||
@ -74,7 +76,7 @@ class Role:
|
||||
"""Instantiate Role object.
|
||||
|
||||
code : int
|
||||
The higher the code, the less privileges are connected to that
|
||||
The higher the code, the fewer privileges are connected to that
|
||||
role. Use 0 for banned users.
|
||||
name : str
|
||||
Short name for role.
|
||||
@ -198,8 +200,16 @@ class Role:
|
||||
"""
|
||||
if user_role is None:
|
||||
user_role = cls.get_user_role(user_record=user_record)
|
||||
long_name = ' '.join(
|
||||
[user_record[key]
|
||||
for key in ('first_name', 'last_name')
|
||||
if key in user_record
|
||||
and user_record[key]]
|
||||
)
|
||||
if long_name:
|
||||
long_name = f" ({long_name})"
|
||||
return (
|
||||
f"""👤 {get_user(record=user_record)}\n"""
|
||||
f"👤 {get_user(record=user_record)}{long_name}\n"
|
||||
f"🔑 <i>{user_role.singular.capitalize()}</i> {user_role.symbol}"
|
||||
)
|
||||
|
||||
@ -228,6 +238,7 @@ class Role:
|
||||
make_button(
|
||||
f"{role.symbol} {role.singular.capitalize()}",
|
||||
prefix='auth:///',
|
||||
delimiter='|',
|
||||
data=['set', user_record['id'], code]
|
||||
)
|
||||
for code, role in cls.roles.items()
|
||||
@ -252,7 +263,7 @@ class Role:
|
||||
return text, buttons
|
||||
|
||||
def __eq__(self, other: 'Role'):
|
||||
"""Return True if self is equal to other."""
|
||||
"""Return True if `self` is equal to `other`."""
|
||||
return self.code == other.code
|
||||
|
||||
def __gt__(self, other: 'Role'):
|
||||
@ -274,11 +285,11 @@ class Role:
|
||||
return not self.__ge__(other)
|
||||
|
||||
def __le__(self, other: 'Role'):
|
||||
"""Return True if self is superior or equal to other."""
|
||||
"""Return True if `self` is superior or equal to `other`."""
|
||||
return not self.__gt__(other)
|
||||
|
||||
def __ne__(self, other: 'Role'):
|
||||
"""Return True if self is not equal to other."""
|
||||
"""Return True if `self` is not equal to `other`."""
|
||||
return not self.__eq__(other)
|
||||
|
||||
def __str__(self):
|
||||
@ -312,6 +323,27 @@ def get_authorization_function(bot: Bot):
|
||||
return is_authorized
|
||||
|
||||
|
||||
def get_browse_buttons(bot: Bot, language: str):
|
||||
return [
|
||||
make_button(
|
||||
text=bot.get_message('authorization', 'auth_button',
|
||||
'browse', 'browse_button_az',
|
||||
language=language),
|
||||
prefix='auth:///',
|
||||
delimiter='|',
|
||||
data=['browse', 'az'],
|
||||
),
|
||||
make_button(
|
||||
text=bot.get_message('authorization', 'auth_button',
|
||||
'browse', 'browse_button_by_role',
|
||||
language=language),
|
||||
prefix='auth:///',
|
||||
delimiter='|',
|
||||
data=['browse', 'role'],
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
async def _authorization_command(bot: Bot,
|
||||
update: dict,
|
||||
user_record: OrderedDict,
|
||||
@ -321,6 +353,7 @@ async def _authorization_command(bot: Bot,
|
||||
text = get_cleaned_text(bot=bot, update=update, replace=[mode])
|
||||
reply_markup = None
|
||||
admin_record = user_record.copy()
|
||||
query_id = None
|
||||
user_record = None
|
||||
admin_role = bot.Role.get_user_role(user_record=admin_record)
|
||||
result = bot.get_message(
|
||||
@ -334,6 +367,9 @@ async def _authorization_command(bot: Bot,
|
||||
update=update, user_record=admin_record,
|
||||
command=mode
|
||||
)
|
||||
buttons = get_browse_buttons(bot=bot, language=language)
|
||||
reply_markup = make_inline_keyboard(buttons, 2)
|
||||
user_record = -1
|
||||
else: # No text, command used in reply to another message
|
||||
update = update['reply_to_message']
|
||||
# Forwarded message: get both the user who forwarded and the original author
|
||||
@ -350,6 +386,36 @@ async def _authorization_command(bot: Bot,
|
||||
telegram_id=update['from']['id']
|
||||
)
|
||||
else: # Get users matching the input text
|
||||
query_text = (
|
||||
"SELECT * "
|
||||
"FROM users "
|
||||
"WHERE COALESCE("
|
||||
" first_name || last_name || username,"
|
||||
" last_name || username,"
|
||||
" first_name || username,"
|
||||
" username,"
|
||||
" first_name || last_name,"
|
||||
" last_name,"
|
||||
" first_name"
|
||||
f") LIKE '%{text}%' "
|
||||
"ORDER BY COALESCE("
|
||||
" username || first_name || last_name,"
|
||||
" username || last_name,"
|
||||
" username || first_name,"
|
||||
" username,"
|
||||
" first_name || last_name,"
|
||||
" first_name,"
|
||||
" last_name"
|
||||
")"
|
||||
)
|
||||
query_id = bot.db['queries'].upsert(
|
||||
dict(query=query_text),
|
||||
['query']
|
||||
)
|
||||
if query_id is True:
|
||||
query_id = bot.db['queries'].find_one(
|
||||
query=query_text
|
||||
)['id']
|
||||
user_record = list(
|
||||
db.query(
|
||||
"SELECT * "
|
||||
@ -373,28 +439,21 @@ async def _authorization_command(bot: Bot,
|
||||
update=update, user_record=admin_record
|
||||
)
|
||||
elif type(user_record) is list and len(user_record) > 1: # If many users match
|
||||
result = bot.get_message(
|
||||
'authorization', 'auth_command', 'choose_user',
|
||||
update=update, user_record=admin_record,
|
||||
n=len(user_record)
|
||||
)
|
||||
reply_markup = make_inline_keyboard(
|
||||
[
|
||||
make_button(
|
||||
f"👤 {get_user(user, link_profile=False)}",
|
||||
prefix='auth:///',
|
||||
data=['show', user['id']]
|
||||
)
|
||||
for user in user_record[:30]
|
||||
],
|
||||
3
|
||||
)
|
||||
browse_panel = await _authorization_button(bot=bot,
|
||||
update=update,
|
||||
user_record=admin_record,
|
||||
language=language,
|
||||
data=['browse', query_id])
|
||||
if 'edit' in browse_panel:
|
||||
return browse_panel['edit']
|
||||
elif type(user_record) is list and len(user_record) == 0: # If query was provided but no user matches
|
||||
result = bot.get_message(
|
||||
'authorization', 'auth_command', 'no_match',
|
||||
update=update, user_record=admin_record,
|
||||
)
|
||||
elif isinstance(user_record, dict): # If 1 user matches
|
||||
elif (type(user_record) is list and len(user_record) == 1) or isinstance(user_record, dict): # If 1 user matches
|
||||
if type(user_record) is list:
|
||||
user_record = user_record[0]
|
||||
# Ban user if admin can do it
|
||||
user_role = bot.Role.get_user_role(user_record=user_record)
|
||||
if mode == 'ban' and admin_role > user_role:
|
||||
@ -420,6 +479,7 @@ async def _authorization_command(bot: Bot,
|
||||
)
|
||||
)
|
||||
reply_markup = make_inline_keyboard(buttons, 1)
|
||||
reply_markup['inline_keyboard'].append(get_browse_buttons(bot=bot, language=language))
|
||||
return dict(
|
||||
text=result,
|
||||
reply_markup=reply_markup,
|
||||
@ -436,14 +496,115 @@ async def _authorization_button(bot: Bot,
|
||||
data = ['']
|
||||
command, *arguments = data
|
||||
user_id = user_record['telegram_id']
|
||||
if len(arguments) > 0:
|
||||
if len(arguments) > 0 and command in ['show', 'set']:
|
||||
other_user_id = arguments[0]
|
||||
else:
|
||||
other_user_id = None
|
||||
result, text, reply_markup = '', '', None
|
||||
db = bot.db
|
||||
if command in ['show']:
|
||||
other_user_record = db['users'].find_one(id=other_user_id)
|
||||
query_text = None
|
||||
if command in ['browse'] and len(arguments) >= 1:
|
||||
mode = arguments[0]
|
||||
offset = arguments[1] if len(arguments) > 1 else 0
|
||||
user_records = []
|
||||
if mode == 'choose':
|
||||
update['text'] = ''
|
||||
answer_update = await _authorization_command(bot=bot,
|
||||
update=update,
|
||||
user_record=user_record,
|
||||
language=language,
|
||||
mode='auth')
|
||||
text = answer_update['text']
|
||||
reply_markup = answer_update['reply_markup']
|
||||
elif isinstance(mode, int) or (isinstance(mode, str) and mode.isnumeric()):
|
||||
query_record = bot.db['queries'].find_one(id=int(mode))
|
||||
if query_record:
|
||||
query_text = query_record['query']
|
||||
elif mode == 'az':
|
||||
query_text = (
|
||||
"SELECT * "
|
||||
"FROM users "
|
||||
"ORDER BY COALESCE("
|
||||
" username || first_name || last_name,"
|
||||
" username || last_name,"
|
||||
" username || first_name,"
|
||||
" username,"
|
||||
" first_name || last_name,"
|
||||
" first_name,"
|
||||
" last_name"
|
||||
")"
|
||||
)
|
||||
elif mode == 'role':
|
||||
query_text = (
|
||||
"SELECT * "
|
||||
"FROM users "
|
||||
"ORDER BY ("
|
||||
" CASE WHEN privileges = 0 THEN 5000 "
|
||||
" ELSE privileges END) "
|
||||
"ASC, COALESCE( "
|
||||
" username || first_name || last_name,"
|
||||
" username || last_name,"
|
||||
" username || first_name,"
|
||||
" username,"
|
||||
" first_name || last_name,"
|
||||
" first_name,"
|
||||
" last_name"
|
||||
") "
|
||||
)
|
||||
n = 0
|
||||
if query_text:
|
||||
user_records = list(
|
||||
bot.db.query(
|
||||
f"{query_text} "
|
||||
f"LIMIT {max_records_per_query + 1} "
|
||||
f"OFFSET {offset * max_records_per_query} "
|
||||
)
|
||||
)
|
||||
for record in bot.db.query("SELECT COUNT(*) n "
|
||||
f"FROM ({query_text})"):
|
||||
n = record['n']
|
||||
if user_records:
|
||||
text = bot.get_message(
|
||||
'authorization', 'auth_command', 'choose_user',
|
||||
update=update, user_record=user_record,
|
||||
n=n
|
||||
)
|
||||
reply_markup = make_inline_keyboard(
|
||||
[
|
||||
make_button(
|
||||
f"{bot.Role.get_user_role(user_record=user).symbol} {get_user(user, link_profile=False)}",
|
||||
prefix='auth:///',
|
||||
delimiter='|',
|
||||
data=['show', user['id'], command, mode, offset]
|
||||
)
|
||||
for user in user_records[:max_records_per_query]
|
||||
],
|
||||
3
|
||||
)
|
||||
if n > max_records_per_query:
|
||||
reply_markup['inline_keyboard'].append(
|
||||
[
|
||||
make_button(
|
||||
text='◀️',
|
||||
prefix='auth:///',
|
||||
delimiter='|',
|
||||
data=['browse', mode, offset - 1 if offset else n // max_records_per_query]
|
||||
),
|
||||
make_button(
|
||||
text='↩️',
|
||||
prefix='auth:///',
|
||||
delimiter='|',
|
||||
data=['browse', 'choose']
|
||||
),
|
||||
make_button(
|
||||
text='▶️',
|
||||
prefix='auth:///',
|
||||
delimiter='|',
|
||||
data=['browse', mode, offset + 1 if n > (offset + 1) * max_records_per_query else 0]
|
||||
)
|
||||
]
|
||||
)
|
||||
elif command in ['show']:
|
||||
other_user_record = bot.db['users'].find_one(id=other_user_id)
|
||||
text, buttons = bot.Role.get_user_role_text_and_buttons(
|
||||
user_record=other_user_record,
|
||||
admin_record=user_record
|
||||
@ -459,6 +620,15 @@ async def _authorization_button(bot: Bot,
|
||||
data=['picture', user_record['id']]
|
||||
)
|
||||
)
|
||||
if len(arguments) > 2:
|
||||
buttons.append(
|
||||
make_button(
|
||||
text='↩️',
|
||||
prefix='auth:///',
|
||||
delimiter='|',
|
||||
data=data[2:]
|
||||
)
|
||||
)
|
||||
reply_markup = make_inline_keyboard(buttons, 1)
|
||||
elif command in ['set'] and len(arguments) > 1:
|
||||
other_user_id, new_privileges, *_ = arguments
|
||||
@ -470,7 +640,7 @@ async def _authorization_button(bot: Bot,
|
||||
'authorization', 'auth_button', 'confirm',
|
||||
update=update, user_record=user_record,
|
||||
)
|
||||
other_user_record = db['users'].find_one(id=other_user_id)
|
||||
other_user_record = bot.db['users'].find_one(id=other_user_id)
|
||||
user_role = bot.Role.get_user_role(user_record=user_record)
|
||||
other_user_role = bot.Role.get_user_role(user_record=other_user_record)
|
||||
if other_user_role.code == new_privileges:
|
||||
@ -491,6 +661,7 @@ async def _authorization_button(bot: Bot,
|
||||
update=update, user_record=user_record
|
||||
),
|
||||
prefix='auth:///',
|
||||
delimiter='|',
|
||||
data=['show', other_user_id]
|
||||
)
|
||||
],
|
||||
@ -509,20 +680,21 @@ async def _authorization_button(bot: Bot,
|
||||
update=update, user_record=user_record
|
||||
),
|
||||
prefix='auth:///',
|
||||
delimiter='|',
|
||||
data=['show', other_user_id]
|
||||
)
|
||||
],
|
||||
1
|
||||
)
|
||||
else:
|
||||
db['users'].update(
|
||||
bot.db['users'].update(
|
||||
dict(
|
||||
id=other_user_id,
|
||||
privileges=new_privileges
|
||||
),
|
||||
['id']
|
||||
)
|
||||
other_user_record = db['users'].find_one(id=other_user_id)
|
||||
other_user_record = bot.db['users'].find_one(id=other_user_id)
|
||||
result = bot.get_message(
|
||||
'authorization', 'auth_button', 'appointed',
|
||||
update=update, user_record=user_record
|
||||
|
@ -713,6 +713,16 @@ default_authorization_messages = {
|
||||
'en': "Permission granted",
|
||||
'it': "Permesso conferito"
|
||||
},
|
||||
'browse': {
|
||||
'browse_button_az': {
|
||||
'en': "🆎 Users A-Z",
|
||||
'it': "🆎 Utenti A-Z",
|
||||
},
|
||||
'browse_button_by_role': {
|
||||
'en': "👑 Users by role",
|
||||
'it': "👑 Utenti per ruolo",
|
||||
},
|
||||
},
|
||||
'back_to_user': {
|
||||
'en': "Back to user",
|
||||
'it': "Torna all'utente"
|
||||
|
Loading…
x
Reference in New Issue
Block a user