From 26896f1c49add6fe4aeb5eb5ac38b5167347c6db Mon Sep 17 00:00:00 2001 From: Davte Date: Thu, 18 Jul 2019 20:28:53 +0200 Subject: [PATCH] Version 1.0 /ciclopi, /start and /help commands implemented. /ciclopi system includes buttons. --- ciclopibot/__init__.py | 8 + ciclopibot/bot.py | 68 ++ ciclopibot/ciclopi.py | 1490 +++++++++++++++++++++++++++++++++++++ ciclopibot/data/help.json | 8 + ciclopibot/helper.py | 190 +++++ ciclopibot/roles.py | 450 +++++++++++ 6 files changed, 2214 insertions(+) create mode 100644 ciclopibot/bot.py create mode 100644 ciclopibot/ciclopi.py create mode 100644 ciclopibot/data/help.json create mode 100644 ciclopibot/helper.py create mode 100644 ciclopibot/roles.py diff --git a/ciclopibot/__init__.py b/ciclopibot/__init__.py index e69de29..616db1e 100644 --- a/ciclopibot/__init__.py +++ b/ciclopibot/__init__.py @@ -0,0 +1,8 @@ +"""Provide information about this package.""" + +__author__ = "Davide Testa" +__email__ = "davide@davte.it" +__license__ = "GNU General Public License v3.0" +__version__ = "1.0" +__maintainer__ = "Davide Testa" +__contact__ = "t.me/davte" diff --git a/ciclopibot/bot.py b/ciclopibot/bot.py new file mode 100644 index 0000000..6fe83ca --- /dev/null +++ b/ciclopibot/bot.py @@ -0,0 +1,68 @@ +"""Provide bike sharing information via Telegram bot.""" + +# Standard library modules +import logging +import os + +# Third party modules +from davtelepot.bot import Bot + +# Project modules +import ciclopi +from data.passwords import bot_token +import helper + +if __name__ == '__main__': + path = os.path.dirname(__file__) + try: + from data.config import log_file_name + except ImportError: + log_file_name = 'CicloPi.info.log' + try: + from data.config import errors_file_name + except ImportError: + errors_file_name = 'CicloPi.errors.log' + log_file = f"{path}/data/{log_file_name}" + errors_file = f"{path}/data/{errors_file_name}" + + # Outputs the log in console, log_file and errors_file + # Log formatter: datetime, module name (filled with spaces up to 15 + # characters), logging level name (filled to 8), message + log_formatter = logging.Formatter( + "%(asctime)s [%(module)-15s %(levelname)-8s] %(message)s", + style='%' + ) + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + + file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8") + file_handler.setFormatter(log_formatter) + file_handler.setLevel(logging.DEBUG) + root_logger.addHandler(file_handler) + + file_handler = logging.FileHandler(errors_file, mode="a", encoding="utf-8") + file_handler.setFormatter(log_formatter) + file_handler.setLevel(logging.ERROR) + root_logger.addHandler(file_handler) + + consoleHandler = logging.StreamHandler() + consoleHandler.setFormatter(log_formatter) + consoleHandler.setLevel(logging.DEBUG) + root_logger.addHandler(consoleHandler) + + # Instantiate bot + bot = Bot(token=bot_token, database_url='ciclopibot/data/ciclopi.db') + # Assign commands to bot + ciclopi.init(bot) + helper.init( + bot=bot, + help_message="📖 Guida di {bot.name}\n\n" + "Benvenuto!\n" + "Per conoscere i comandi disponibili visita l'apposita " + "sezione della guida premendo il pulsante Comandi.\n\n" + "Autore e amministratore del bot: @davte", + help_sections_file='ciclopibot/data/help.json' + ) + # Run bot(s) + logging.info("Presso ctrl+C to exit.") + Bot.run() diff --git a/ciclopibot/ciclopi.py b/ciclopibot/ciclopi.py new file mode 100644 index 0000000..cdb6110 --- /dev/null +++ b/ciclopibot/ciclopi.py @@ -0,0 +1,1490 @@ +"""Get information about bike sharing in Pisa. + +Available bikes in bike sharing stations. +""" + +# Standard library modules +import asyncio +import datetime +import math + +# Third party modules +from davtelepot.utilities import ( + async_wrapper, CachedPage, extract, get_cleaned_text, + line_drawing_unordered_list, make_button, make_inline_keyboard, + make_lines_of_buttons +) + +_URL = "http://www.ciclopi.eu/frmLeStazioni.aspx" + +ciclopi_webpage = CachedPage.get( + _URL, + datetime.timedelta(seconds=15), + mode='html' +) + +UNIT_TO_KM = { + 'km': 1, + 'm': 1000, + 'mi': 0.621371192, + 'nmi': 0.539956803, + 'ft': 3280.839895013, + 'in': 39370.078740158 +} + +CICLOPI_SETTINGS = { + 'sort': dict( + name="Ordina", + description="scegli in che ordine visualizzare le stazioni CicloPi.", + symbol="⏬" + ), + 'limit': dict( + name="Numero di stazioni", + description="scegli quante stazioni visualizzare.", + symbol="#️⃣" + ), + 'fav': dict( + name="Stazioni preferite", + description="cambia le tue stazioni preferite.", + symbol="⭐️" + ), + 'setpos': dict( + name="Cambia posizione", + description=( + "imposta una posizione da cui ordinare le stazioni per distanza." + ), + symbol='🧭' + ) +} + +CICLOPI_SORTING_CHOICES = { + 0: dict( + name='Scuola', + description='in ordine di distanza crescente da Scuola.', + short_description='per distanza da Scuola', + symbol='🏫' + ), + 1: dict( + name='Alfabetico', + description='in ordine alfabetico.', + short_description='per nome', + symbol='🔤' + ), + 2: dict( + name='Posizione', + description='in ordine di distanza crescente dall\'ultima posizione ' + 'inviata. Di default sarà la posizione di Scuola.', + short_description='per distanza', + symbol='🧭' + ), + 3: dict( + name='Preferite', + description='nell\'ordine che hai scelto.', + short_description='in ordine personalizzato', + symbol='⭐️' + ) +} + +CICLOPI_STATIONS_TO_SHOW = { + -1: dict( + name="Solo le preferite", + symbol='⭐️' + ), + 0: dict( + name='Tutte', + symbol='💯' + ), + 3: dict( + name='3', + symbol='3️⃣' + ), + 5: dict( + name='5', + symbol='5️⃣' + ), + 10: dict( + name='10', + symbol='🔟' + ) +} + + +def haversine_distance(lat1, lon1, lat2, lon2, degrees='dec', unit='m'): + """ + Calculate the great circle distance between two points on Earth. + + (specified in decimal degrees) + """ + assert unit in UNIT_TO_KM, "Invalid distance unit of measurement!" + assert degrees in ['dec', 'rad'], "Invalid angle unit of measurement!" + # Convert decimal degrees to radians + if degrees == 'dec': + lon1, lat1, lon2, lat2 = map( + math.radians, + [lon1, lat1, lon2, lat2] + ) + average_earth_radius = 6371.0088 * UNIT_TO_KM[unit] + return ( + 2 + * average_earth_radius + * math.asin( + math.sqrt( + math.sin((lat2 - lat1) * 0.5) ** 2 + + math.cos(lat1) + * math.cos(lat2) + * math.sin((lon2 - lon1) * 0.5) ** 2 + ) + ) + ) + + +class Location(): + """Location in world map.""" + + def __init__(self, coordinates): + """Check and set instance attributes.""" + assert type(coordinates) is tuple, "`coordinates` must be a tuple" + assert ( + len(coordinates) == 2 + and all(type(c) is float for c in coordinates) + ), "`coordinates` must be two floats" + self._coordinates = coordinates + + @property + def coordinates(self): + """Return a tuple (latitude, longitude).""" + return self._coordinates + + @property + def latitude(self): + """Return latitude.""" + return self._coordinates[0] + + @property + def longitude(self): + """Return longitude.""" + return self._coordinates[1] + + def get_distance(self, other, *args, **kwargs): + """Return the distance between two `Location`s.""" + return haversine_distance( + self.latitude, self.longitude, + other.latitude, other.longitude, + *args, **kwargs + ) + + +default_location = Location( + (43.719821, 10.403021) # M. Libertà station +) + + +class Station(Location): + """CicloPi bike sharing station.""" + + stations = { + 1: dict( + name='Aeroporto', + coordinates=(43.699455, 10.400075), + ), + 2: dict( + name='Stazione F.S.', + coordinates=(43.708627, 10.399051), + ), + 3: dict( + name='Comune Palazzo Blu', + coordinates=(43.715541, 10.400505), + ), + 4: dict( + name='Teatro Tribunale', + coordinates=(43.716391, 10.405136), + ), + 5: dict( + name='Borgo Stretto', + coordinates=(43.718518, 10.402165), + ), + 6: dict( + name='Polo Marzotto', + coordinates=(43.719772, 10.407291), + ), + 7: dict( + name='Duomo', + coordinates=(43.722855, 10.391977), + ), + 8: dict( + name='Pietrasantina', + coordinates=(43.729020, 10.392726), + ), + 9: dict( + name='Paparelli', + coordinates=(43.724449, 10.410438), + ), + 10: dict( + name='Pratale', + coordinates=(43.7212554, 10.4180257), + ), + 11: dict( + name='Ospedale Cisanello', + coordinates=(43.705752, 10.441740), + ), + 12: dict( + name='Sms Biblioteca', + coordinates=(43.706565, 10.419136), + ), + 13: dict( + name='Vittorio Emanuele', + coordinates=(43.710182, 10.398751), + ), + 14: dict( + name='Palacongressi', + coordinates=(43.710014, 10.410232), + ), + 15: dict( + name='Porta a Lucca', + coordinates=(43.724247, 10.402269), + ), + 16: dict( + name='Solferino', + coordinates=(43.715698, 10.394999), + ), + 17: dict( + name='San Rossore F.S.', + coordinates=(43.718992, 10.384391), + ), + 18: dict( + name='Guerrazzi', + coordinates=(43.710358, 10.405337), + ), + 19: dict( + name='Livornese', + coordinates=(43.708114, 10.384021), + ), + 20: dict( + name='Cavalieri', + coordinates=(43.719856, 10.400194), + ), + 21: dict( + name='M. Libertà', + coordinates=(43.719821, 10.403021), + ), + 22: dict( + name='Galleria Gerace', + coordinates=(43.710791, 10.420456), + ), + 23: dict( + name='C. Marchesi', + coordinates=(43.714971, 10.419322), + ), + 24: dict( + name='CNR-Praticelli', + coordinates=(43.719256, 10.424012), + ), + 25: dict( + name='Sesta Porta', + coordinates=(43.709162, 10.395837), + ), + 26: dict( + name='Qualconia', + coordinates=(43.713011, 10.394458), + ), + 27: dict( + name='Donatello', + coordinates=(43.711715, 10.372480), + ), + 28: dict( + name='Spadoni', + coordinates=(43.716850, 10.391347), + ), + 29: dict( + name='Nievo', + coordinates=(43.738286, 10.400865), + ), + 30: dict( + name='Cisanello', + coordinates=(43.701159, 10.438863), + ), + 31: dict( + name='Edificio 3', + coordinates=(43.707869, 10.441698), + ), + 32: dict( + name='Edificio 6', + coordinates=(43.709046, 10.442541), + ), + 33: dict( + name='Frascani', + coordinates=(43.710157, 10.433339), + ), + 34: dict( + name='Chiarugi', + coordinates=(43.726244, 10.412882), + ), + 35: dict( + name='Praticelli 2', + coordinates=(43.719619, 10.427469), + ), + 36: dict( + name='Carducci', + coordinates=(43.726700, 10.420562), + ), + 37: dict( + name='Garibaldi', + coordinates=(43.718077, 10.418168), + ), + 38: dict( + name='Silvestro', + coordinates=(43.714128, 10.409065), + ), + 39: dict( + name='Pardi', + coordinates=(43.702273, 10.399793), + ), + } + + def __init__(self, id=0, name='unknown', coordinates=(91.0, 181.0)): + """Check and set instance attributes.""" + if id in self.__class__.stations: + coordinates = self.__class__.stations[id]['coordinates'] + name = self.__class__.stations[id]['name'] + Location.__init__(self, coordinates) + self._id = id + self._name = name + self._active = True + self._location = None + self._description = '' + self._distance = None + self._bikes = 0 + self._free = 0 + + @property + def id(self): + """Return station identification number.""" + return self._id + + @property + def name(self): + """Return station name.""" + return self._name + + @property + def description(self): + """Return station description.""" + return self._description + + @property + def is_active(self): + """Return True if station is active.""" + return self._active + + @property + def location(self): + """Return location from which distance should be evaluated.""" + if self._location is None: + return default_location + return self._location + + @property + def distance(self): + """Return distance from `self.location`. + + If distance is not evaluated yet, do it and store the result. + Otherwise, return stored value. + """ + if self._distance is None: + self._distance = self.get_distance(self.location) + return self._distance + + @property + def bikes(self): + """Return number of available bikes.""" + return self._bikes + + @property + def free(self): + """Return number of free slots.""" + return self._free + + def set_active(self, active): + """Change station status to `active`. + + `active` should be either `True` or `False`. + """ + assert type(active) is bool, "`active` should be a boolean." + self._active = active + + def set_description(self, description): + """Change station description to `description`. + + `description` should be a string. + """ + assert type(description) is str, "`description` should be a boolean." + self._description = description + + def set_location(self, location): + """Change station location to `location`. + + `location` should be a Location object. + """ + assert ( + isinstance(location, Location) + ), "`location` should be a Location." + self._location = location + + def set_bikes(self, bikes): + """Change number of available `bikes`. + + `bikes` should be an int. + """ + assert ( + type(bikes) is int + ), "`bikes` should be an int." + self._bikes = bikes + + def set_free(self, free): + """Change number of `free` bike parking slots. + + `free` should be an int. + """ + assert ( + type(free) is int + ), "`free` should be an int." + self._free = free + + @property + def status(self): + """Return station status to be shown to users. + + It includes distance, location, available bikes and free stalls. + """ + if self.bikes + self.free == 0: + bikes_and_stalls = "⚠️ Non disponibile" + else: + bikes_and_stalls = f"🚲 {self.bikes} | 🅿️ {self.free}" + return ( + f"{self.name} | {self.description}\n" + f" {bikes_and_stalls} | 📍 {self.distance:.0f} m" + ).format( + s=self + ) + + +def ciclopi_custom_sorter(custom_order): + """Return a function to sort stations by a `custom_order`.""" + custom_values = { + record['station']: record['value'] + for record in custom_order + } + + def sorter(station): + """Take a station and return its queue value. + + Stations will be sorted by queue value in ascending order. + """ + if station.id in custom_values: + return (custom_values[station.id], station.name) + return (100, station.name) + return sorter + + +def _get_stations(data, location): + stations = [] + for _station in data.find_all( + "li", + attrs={"class": "rrItem"} + ): + station_name = _station.find( + "span", + attrs={"class": "Stazione"} + ).text + if 'Non operativa' in station_name: + active = False + else: + active = True + station_id = _station.find( + "div", + attrs={"class": "cssNumero"} + ).text + if ( + station_id is None + or type(station_id) is not str + or not station_id.isnumeric() + ): + station_id = 0 + else: + station_id = int(station_id) + station = Station(station_id) + station.set_active(active) + station.set_description( + _station.find( + "span", + attrs={"class": "TableComune"} + ).text.replace( + 'a`', + 'à' + ) + ) + bikes_text = _station.find( + "span", + attrs={"class": "Red"} + ).get_text('\t') + if bikes_text.count('\t') < 1: + bikes = 0 + free = 0 + else: + bikes, free, *other = [ + int( + ''.join( + char + for char in s + if char.isnumeric() + ) + ) + for s in bikes_text.split('\t') + ] + station.set_bikes(bikes) + station.set_free(free) + station.set_location(location) + stations.append( + station + ) + return stations + + +async def set_ciclopi_location(bot, update, user_record): + """Take a location update and store it as CicloPi place. + + CicloPi stations will be sorted by distance from this place. + """ + location = update['location'] + chat_id = update['chat']['id'] + telegram_id = update['from']['id'] + with bot.db as db: + db['ciclopi'].upsert( + dict( + chat_id=chat_id, + latitude=location['latitude'], + longitude=location['longitude'] + ), + ['chat_id'] + ) + await bot.send_message( + chat_id=chat_id, + text=( + "Ho salvato questa posizione!\n" + "D'ora in poi ordinerò le stazioni dalla più vicina alla più " + "lontana da qui." + ) + ) + # Remove individual text message handler which was set to catch `/cancel` + bot.remove_individual_text_message_handler(telegram_id) + return await _ciclopi_command(bot, update, user_record) + + +async def cancel_ciclopi_location(bot, update, user_record): + """Handle the situation in which a user does not send location on request. + + This function is set as custom_parser when the bot requests user's location + and is removed if user does. If not, return a proper message. + """ + text = get_cleaned_text(bot=bot, update=update) + if text.lower() == 'annulla': + return "Operazione annullata." + return ( + "Non ho capito la tua posizione. Fai /ciclopi > Ordina... > " + "Posizione 🧭 per riprovare." + ) + + +async def _ciclopi_command(bot, update, user_record, sent_message=None, + show_all=False): + chat_id = update['chat']['id'] + default_stations_to_show = 5 + if sent_message is None: + await bot.sendChatAction( + chat_id=chat_id, + action='typing' + ) + else: + await bot.edit_message_text( + update=sent_message, + text="Aggiornamento in corso...", + parse_mode='HTML', + reply_markup=None + ) + ciclopi_data = await ciclopi_webpage.get_page() + if ciclopi_data is None or isinstance(ciclopi_data, Exception): + text = ( + "Il sito del CicloPi è momentaneamente irraggiungibile, " + "riprova tra un po' :/" + ) + else: + with bot.db as db: + ciclopi_record = db['ciclopi'].find_one( + chat_id=chat_id + ) + custom_order = list( + db['ciclopi_custom_order'].find( + chat_id=chat_id + ) + ) + if ( + ciclopi_record is not None + and isinstance(ciclopi_record, dict) + and 'sorting' in ciclopi_record + and ciclopi_record['sorting'] in CICLOPI_SORTING_CHOICES + ): + sorting_code = ciclopi_record['sorting'] + if ( + 'latitude' in ciclopi_record + and ciclopi_record['latitude'] is not None + and 'longitude' in ciclopi_record + and ciclopi_record['longitude'] is not None + ): + saved_place = Location( + ( + ciclopi_record['latitude'], + ciclopi_record['longitude'] + ) + ) + else: + saved_place = default_location + else: + sorting_code = 0 + if ( + ciclopi_record is not None + and isinstance(ciclopi_record, dict) + and 'stations_to_show' in ciclopi_record + and ciclopi_record[ + 'stations_to_show' + ] in CICLOPI_STATIONS_TO_SHOW + ): + stations_to_show = ciclopi_record[ + 'stations_to_show' + ] + else: + stations_to_show = default_stations_to_show + location = ( + saved_place if sorting_code != 0 + else default_location + ) + sorting_method = ( + (lambda station: station.distance) if sorting_code in [0, 2] + else (lambda station: station.name) if sorting_code == 1 + else ciclopi_custom_sorter(custom_order) if sorting_code == 3 + else (lambda station: 0) + ) + stations = sorted( + _get_stations( + ciclopi_data, + location + ), + key=sorting_method + ) + if ( + stations_to_show == -1 + and not show_all + ): + stations = list( + filter( + lambda station: station.id in [ + record['station'] + for record in custom_order + ], + stations + ) + ) + if ( + stations_to_show > 0 + and sorting_code != 1 + and not show_all + ): + stations = stations[:stations_to_show] + text = ( + "🚲 Stazioni ciclopi {sort[short_description]}" + "{lim} {sort[symbol]}\n" + "\n" + "{s}" + ).format( + s=( + '\n\n'.join( + station.status + for station in stations + ) if len(stations) + else "- Nessuna stazione -" + ), + sort=CICLOPI_SORTING_CHOICES[sorting_code], + lim=( + " ({adv} le preferite)".format( + adv='prima' if show_all else 'solo' + ) if stations_to_show == -1 + else " (prime {n})".format( + n=stations_to_show + ) + if len(stations) < len(Station.stations) + else "" + ) + ) + if not text: + return + reply_markup = make_inline_keyboard( + ( + [ + make_button( + "💯 Tutte", + prefix='ciclopi:///', + data=['show', 'all'] + ) + ] if len(stations) < len(Station.stations) + else [ + make_button( + "{sy[symbol]} {t}".format( + t=( + "Solo preferite" if stations_to_show == -1 + else "Prime {n}" + ).format( + n=stations_to_show + ), + sy=CICLOPI_STATIONS_TO_SHOW[stations_to_show] + ), + prefix='ciclopi:///', + data=['show'] + ) + ] if show_all + else [] + ) + [ + make_button( + "🔄 Aggiorna", + prefix='ciclopi:///', + data=( + ['show'] + ( + [] if len(stations) < len(Station.stations) + else ['all'] + ) + ) + ), + make_button( + "📜 Legenda", + prefix='ciclopi:///', + data=['legend'] + ), + make_button( + "⚙️ Impostazioni", + prefix='ciclopi:///', + data=['main'] + ) + ], + 2 + ) + parameters = dict( + update=update, + text=text, + parse_mode='HTML', + reply_markup=reply_markup + ) + method = ( + bot.send_message + if sent_message is None + else bot.edit_message_text + ) + await method(**parameters) + return + + +async def _ciclopi_button_main(bot, update, user_record, arguments): + result, text, reply_markup = '', '', None + text = ( + "⚙️ Impostazioni CicloPi 🚲\n" + "\n" + "{c}" + ).format( + c='\n'.join( + "- {s[symbol]} {s[name]}: {s[description]}".format( + s=setting + ) + for setting in CICLOPI_SETTINGS.values() + ) + ) + reply_markup = make_inline_keyboard( + [ + make_button( + text="{s[symbol]} {s[name]}".format( + s=setting + ), + prefix='ciclopi:///', + data=[code] + ) + for code, setting in CICLOPI_SETTINGS.items() + ] + [ + make_button( + text="🚲 Torna alle stazioni", + prefix='ciclopi:///', + data=['show'] + ) + ] + ) + return result, text, reply_markup + + +async def _ciclopi_button_sort(bot, update, user_record, arguments): + result, text, reply_markup = '', '', None + chat_id = ( + update['message']['chat']['id'] if 'message' in update + else update['chat']['id'] if 'chat' in update + else 0 + ) + with bot.db as db: + ciclopi_record = db['ciclopi'].find_one( + chat_id=chat_id + ) + if ciclopi_record is None: + ciclopi_record = dict( + chat_id=chat_id, + sorting=0 + ) + if len(arguments) == 1: + new_choice = ( + int(arguments[0]) + if arguments[0].isnumeric() + else 0 + ) + if new_choice == ciclopi_record['sorting']: + return "È già così!", '', None + elif new_choice not in CICLOPI_SORTING_CHOICES: + return "Opzione sconosciuta!", '', None + db['ciclopi'].upsert( + dict( + chat_id=chat_id, + sorting=new_choice + ), + ['chat_id'], + ensure=True + ) + ciclopi_record['sorting'] = new_choice + result = "Fatto!" + text = ( + "📜 Modalità di visualizzazione delle stazioni CicloPi 🚲\n\n" + "{options}\n\n" + "Scegli una nuova modalità o torna all'elenco delle stazioni." + ).format( + options='\n'.join( + "- {c[symbol]} {c[name]}: {c[description]}".format( + c=choice + ) + for choice in CICLOPI_SORTING_CHOICES.values() + ) + ) + reply_markup = make_inline_keyboard( + [ + make_button( + text="{s} {c[name]} {c[symbol]}".format( + c=choice, + s=( + '✅' + if code == ciclopi_record['sorting'] + else '☑️' + ) + ), + prefix='ciclopi:///', + data=['sort', code] + ) + for code, choice in CICLOPI_SORTING_CHOICES.items() + ] + [ + make_button( + text="⚙️ Torna alle impostazioni", + prefix='ciclopi:///', + data=['main'] + ), + make_button( + text="🚲 Torna alle stazioni", + prefix='ciclopi:///', + data=['show'] + ) + ] + ) + return result, text, reply_markup + + +async def _ciclopi_button_limit(bot, update, user_record, arguments): + result, text, reply_markup = '', '', None + chat_id = ( + update['message']['chat']['id'] if 'message' in update + else update['chat']['id'] if 'chat' in update + else 0 + ) + with bot.db as db: + ciclopi_record = db['ciclopi'].find_one( + chat_id=chat_id + ) + if ciclopi_record is None or 'stations_to_show' not in ciclopi_record: + ciclopi_record = dict( + chat_id=chat_id, + stations_to_show=5 + ) + if len(arguments) == 1: + new_choice = ( + int(arguments[0]) + if arguments[0].lstrip('+-').isnumeric() + else 0 + ) + if new_choice == ciclopi_record['stations_to_show']: + return "È già così!", '', None + elif new_choice not in CICLOPI_STATIONS_TO_SHOW: + return "Opzione sconosciuta!", '', None + db['ciclopi'].upsert( + dict( + chat_id=chat_id, + stations_to_show=new_choice + ), + ['chat_id'], + ensure=True + ) + ciclopi_record['stations_to_show'] = new_choice + result = "Fatto!" + text = ( + "📜 Modalità di visualizzazione delle stazioni CicloPi 🚲\n\n" + "{options}\n\n" + "Scegli quante stazioni vedere (quando filtrate per distanza) o torna " + "alle impostazioni o all'elenco delle stazioni." + ).format( + options='\n'.join( + "- {c[symbol]} {c[name]}".format( + c=choice + ) + for choice in CICLOPI_STATIONS_TO_SHOW.values() + ) + ) + reply_markup = make_inline_keyboard( + [ + make_button( + text="{s} {c[name]} {c[symbol]}".format( + c=choice, + s=( + '✅' + if code == ciclopi_record['stations_to_show'] + else '☑️' + ) + ), + prefix='ciclopi:///', + data=['limit', code] + ) + for code, choice in CICLOPI_STATIONS_TO_SHOW.items() + ] + [ + make_button( + text="⚙️ Torna alle impostazioni", + prefix='ciclopi:///', + data=['main'] + ), + make_button( + text="🚲 Torna alle stazioni", + prefix='ciclopi:///', + data=['show'] + ) + ] + ) + return result, text, reply_markup + + +async def _ciclopi_button_show(bot, update, user_record, arguments): + result, text, reply_markup = '', '', None + fake_update = update['message'] + fake_update['from'] = update['from'] + asyncio.ensure_future( + _ciclopi_command( + bot=bot, + update=fake_update, + user_record=user_record, + sent_message=fake_update, + show_all=( + True if len(arguments) == 1 and arguments[0] == 'all' + else False + ) + ) + ) + return result, text, reply_markup + + +async def _ciclopi_button_legend(bot, update, user_record, arguments): + result, text, reply_markup = '', '', None + text = ( + "{s[name]} | {s[description]}\n" + " 🚲 {s[bikes]} | 🅿️ {s[free]} | 📍 {s[distance]}" + ).format( + s={ + 'name': "Nome della stazione", + 'distance': "Distanza in m", + 'description': "Indirizzo della stazione", + 'bikes': "Bici disponibili", + 'free': "Posti liberi" + } + ) + reply_markup = make_inline_keyboard( + [ + make_button( + text="⚙️ Torna alle impostazioni", + prefix='ciclopi:///', + data=['main'] + ), + make_button( + text="🚲 Torna alle stazioni", + prefix='ciclopi:///', + data=['show'] + ) + ] + ) + return result, text, reply_markup + + +async def _ciclopi_button_favorites_add(bot, update, user_record, arguments, + order_record, ordered_stations): + result, text, reply_markup = '', '', None + result = "Seleziona le stazioni da aggiungere" + if len(arguments) == 2 and arguments[1].isnumeric(): + station_id = int(arguments[1]) + chat_id = ( + update['message']['chat']['id'] if 'message' in update + else update['chat']['id'] if 'chat' in update + else 0 + ) + with bot.db as db: + if station_id in (s.id for s in ordered_stations): # Remove + # Find `old_record` to be removed + for old_record in order_record: + if old_record['station'] == station_id: + break + db.query( + """UPDATE ciclopi_custom_order + SET value = value - 1 + WHERE chat_id = {chat_id} + AND value > {val} + """.format( + chat_id=chat_id, + val=old_record['value'] + ) + ) + db['ciclopi_custom_order'].delete( + id=old_record['id'] + ) + order_record = list( + filter( + (lambda r: r['station'] != station_id), + order_record + ) + ) + ordered_stations = list( + filter( + (lambda s: s.id != station_id), + ordered_stations + ) + ) + else: # Add + new_record = dict( + chat_id=chat_id, + station=station_id, + value=(len(order_record) + 1) + ) + db['ciclopi_custom_order'].upsert( + new_record, + ['chat_id', 'station'], + ensure=True + ) + order_record.append(new_record) + ordered_stations.append( + Station(station_id) + ) + text = ( + "🚲 Stazioni preferite ⭐️\n" + "{options}\n\n" + "Aggiungi o togli le tue stazioni preferite." + ).format( + options=line_drawing_unordered_list( + [ + station.name + for station in ordered_stations + ] + ) + ) + reply_markup = dict( + inline_keyboard=make_lines_of_buttons( + [ + make_button( + text=( + "{sy} {n}" + ).format( + sy=( + '✅' if station_id in [ + s.id for s in ordered_stations + ] + else '☑️' + ), + n=station['name'] + ), + prefix='ciclopi:///', + data=['fav', 'add', station_id] + ) + for station_id, station in sorted( + Station.stations.items(), + key=lambda t: t[1]['name'] # Sort by station_name + ) + ], + 3 + ) + make_lines_of_buttons( + [ + make_button( + text="🔃 Riordina", + prefix="ciclopi:///", + data=["fav"] + ), + make_button( + text="⚙️ Torna alle impostazioni", + prefix='ciclopi:///', + data=['main'] + ), + make_button( + text="🚲 Torna alle stazioni", + prefix='ciclopi:///', + data=['show'] + ) + ], + 3 + ) + ) + return result, text, reply_markup + + +def move_favorite_station( + bot, chat_id, action, station_id, + order_record +): + """Move a station in `chat_id`-associated custom order. + + `bot`: Bot object, having a `.db` property. + `action`: should be `up` or `down` + `order_record`: list of records about `chat_id`-associated custom order. + """ + assert action in ('up', 'down'), "Invalid action!" + for old_record in order_record: + if old_record['station'] == station_id: + break + with bot.db as db: + if action == 'down': + db.query( + """UPDATE ciclopi_custom_order + SET value = 500 + WHERE chat_id = {chat_id} + AND value = {val} + 1 + """.format( + chat_id=chat_id, + val=old_record['value'] + ) + ) + db.query( + """UPDATE ciclopi_custom_order + SET value = value + 1 + WHERE chat_id = {chat_id} + AND value = {val} + """.format( + chat_id=chat_id, + val=old_record['value'] + ) + ) + db.query( + """UPDATE ciclopi_custom_order + SET value = {val} + WHERE chat_id = {chat_id} + AND value = 500 + """.format( + chat_id=chat_id, + val=old_record['value'] + ) + ) + elif action == 'up': + db.query( + """UPDATE ciclopi_custom_order + SET value = 500 + WHERE chat_id = {chat_id} + AND value = {val} - 1 + """.format( + chat_id=chat_id, + val=old_record['value'] + ) + ) + db.query( + """UPDATE ciclopi_custom_order + SET value = value - 1 + WHERE chat_id = {chat_id} + AND value = {val} + """.format( + chat_id=chat_id, + val=old_record['value'] + ) + ) + db.query( + """UPDATE ciclopi_custom_order + SET value = {val} + WHERE chat_id = {chat_id} + AND value = 500 + """.format( + chat_id=chat_id, + val=old_record['value'] + ) + ) + order_record = list( + db['ciclopi_custom_order'].find( + chat_id=chat_id, + order_by=['value'] + ) + ) + ordered_stations = [ + Station(record['station']) + for record in order_record + ] + return order_record, ordered_stations + + +async def _ciclopi_button_favorites(bot, update, user_record, arguments): + result, text, reply_markup = '', '', None + action = ( + arguments[0] if len(arguments) > 0 + else 'up' + ) + chat_id = ( + update['message']['chat']['id'] if 'message' in update + else update['chat']['id'] if 'chat' in update + else 0 + ) + with bot.db as db: + order_record = list( + db['ciclopi_custom_order'].find( + chat_id=chat_id, + order_by=['value'] + ) + ) + ordered_stations = [ + Station(record['station']) + for record in order_record + ] + if action == 'add': + return await _ciclopi_button_favorites_add( + bot, update, user_record, arguments, + order_record, ordered_stations + ) + elif action == 'dummy': + return 'Capolinea!', '', None + elif action == 'set' and len(arguments) > 1: + action = arguments[1] + elif ( + action in ['up', 'down'] + and len(arguments) > 1 + and arguments[1].isnumeric() + ): + station_id = int(arguments[1]) + order_record, ordered_stations = move_favorite_station( + bot, chat_id, action, station_id, + order_record + ) + text = ( + "🚲 Stazioni preferite ⭐️\n" + "{options}\n\n" + "Aggiungi, togli o riordina le tue stazioni preferite." + ).format( + options=line_drawing_unordered_list( + [ + station.name + for station in ordered_stations + ] + ) + ) + reply_markup = dict( + inline_keyboard=[ + [ + make_button( + text="{s.name} {sy}".format( + sy=( + '⬆️' if ( + action == 'up' + and n != 1 + ) else '⬇️' if ( + action == 'down' + and n != len(ordered_stations) + ) else '⏹' + ), + s=station + ), + prefix='ciclopi:///', + data=[ + 'fav', + ( + action if ( + action == 'up' + and n != 1 + ) or ( + action == 'down' + and n != len(ordered_stations) + ) + else 'dummy' + ), + station.id + ] + ) + ] + for n, station in enumerate(ordered_stations, 1) + ] + [ + [ + make_button( + text="➕ Aggiungi stazione preferita ⭐️", + prefix='ciclopi:///', + data=['fav', 'add'] + ) + ] + ] + [ + [ + ( + make_button( + text='Sposta in basso ⬇️', + prefix='ciclopi:///', + data=['fav', 'set', 'down'] + ) if action == 'up' + else make_button( + text='Sposta in alto ⬆️', + prefix='ciclopi:///', + data=['fav', 'set', 'up'] + ) + ) + ] + ] + [ + [ + make_button( + text="⚙️ Torna alle impostazioni", + prefix='ciclopi:///', + data=['main'] + ), + make_button( + text="🚲 Torna alle stazioni", + prefix='ciclopi:///', + data=['show'] + ) + ] + ] + ) + return result, text, reply_markup + + +async def _ciclopi_button_setpos(bot, update, user_record, arguments): + result, text, reply_markup = '', '', None + chat_id = ( + update['message']['chat']['id'] if 'message' in update + else update['chat']['id'] if 'chat' in update + else 0 + ) + result = "Inviami una posizione!" + bot.set_individual_location_handler( + await async_wrapper( + set_ciclopi_location + ), + update + ) + bot.set_individual_text_message_handler( + cancel_ciclopi_location, + update + ) + asyncio.ensure_future( + bot.send_message( + chat_id=chat_id, + text=( + "Inviami una posizione.\n" + "Per inviare la tua posizione attuale, usa il " + "pulsante." + ), + reply_markup=dict( + keyboard=[ + [ + dict( + text="Invia la mia posizione", + request_location=True + ) + ], + [ + dict( + text="Annulla" + ) + ] + ], + resize_keyboard=True + ) + ) + ) + return result, text, reply_markup + +_ciclopi_button_routing_table = { + 'main': _ciclopi_button_main, + 'sort': _ciclopi_button_sort, + 'limit': _ciclopi_button_limit, + 'show': _ciclopi_button_show, + 'setpos': _ciclopi_button_setpos, + 'legend': _ciclopi_button_legend, + 'fav': _ciclopi_button_favorites +} + + +async def _ciclopi_button(bot, update, user_record): + data = update['data'] + command, *arguments = extract(data, ':///').split('|') + if command in _ciclopi_button_routing_table: + result, text, reply_markup = await _ciclopi_button_routing_table[ + command + ]( + bot, update, user_record, arguments + ) + else: + return + if text: + return dict( + text=result, + edit=dict( + text=text, + parse_mode='HTML', + reply_markup=reply_markup + ) + ) + return result + + +def init(bot): + """Take a bot and assign commands to it.""" + with bot.db as db: + if 'ciclopi_stations' not in db.tables: + db['ciclopi_stations'].insert_many( + sorted( + [ + dict( + station_id=station_id, + name=station['name'], + latitude=station['coordinates'][0], + longitude=station['coordinates'][1] + ) + for station_id, station in Station.stations.items() + ], + key=(lambda station: station['station_id']) + ) + ) + if 'ciclopi' not in db.tables: + db['ciclopi'].insert( + dict( + chat_id=0, + sorting=0, + latitude=0.0, + longitude=0.0, + stations_to_show=-1 + ) + ) + + @bot.command(command='/ciclopi', aliases=["CicloPi 🚲", "🚲 CicloPi 🔴"], + show_in_keyboard=True, + description="Stato delle stazioni CicloPi", + authorization_level='everybody') + async def ciclopi_command(bot, update, user_record): + return await _ciclopi_command(bot, update, user_record) + + @bot.button(prefix='ciclopi:///', authorization_level='everybody') + async def ciclopi_button(bot, update, user_record): + return await _ciclopi_button(bot, update, user_record) diff --git a/ciclopibot/data/help.json b/ciclopibot/data/help.json new file mode 100644 index 0000000..f1c25a5 --- /dev/null +++ b/ciclopibot/data/help.json @@ -0,0 +1,8 @@ +[ + { + "label": "CicloPi 🚲", + "abbr": "ciclopi", + "auth": "everybody", + "descr": "Vedi quante bici disponibili e quanti posti liberi ci sono in ogni stazione CicloPi." + } +] diff --git a/ciclopibot/helper.py b/ciclopibot/helper.py new file mode 100644 index 0000000..371e864 --- /dev/null +++ b/ciclopibot/helper.py @@ -0,0 +1,190 @@ +"""Make a self-consistent bot help section.""" + +# Third party modules +from davtelepot.utilities import ( + extract, get_cleaned_text, json_read, make_inline_keyboard, + make_lines_of_buttons, make_button, MyOD +) + +# Project modules +import roles + +DENY_MESSAGE = ( + "Chiedi di essere autorizzato: se la tua richiesta verrà accolta, " + "ripeti il comando /help per leggere il messaggio di aiuto." +) + + +def get_command_description(bot, update, user_record): + """Get a string description of `bot` commands. + + Show only commands available for `update` sender. + """ + user_role = roles.get_role(bot=bot, update=update, user_record=user_record) + return "\n".join( + [ + "/{}: {}".format( + command, + details['description'] + ) + for command, details in sorted( + bot.commands.items(), + key=lambda x:x[0] + ) + if details['description'] + and user_role <= roles.get_privilege_code( + details['authorization_level'] + ) + ] + ) + + +def _make_button(x, y): + if not y.startswith('help:///'): + y = 'help:///{}'.format(y) + return make_button(x, y) + + +HELP_MENU_BUTTON = make_inline_keyboard( + [ + _make_button( + 'Torna al menu Guida 📖', + 'menu' + ) + ], + 1 +) + + +def get_help_buttons(bot, update, user_record): + """Get `bot` help menu inline keyboard. + + Show only buttons available for `update` sender. + """ + user_role = roles.get_role(bot=bot, update=update, user_record=user_record) + buttons_list = [ + _make_button( + section['label'], + section['abbr'] + ) + for section in bot.help_sections.values() + if 'auth' in section + and user_role <= roles.get_privilege_code( + section['auth'] + ) + ] + return dict( + inline_keyboard=( + make_lines_of_buttons(buttons_list, 3) + + make_lines_of_buttons( + [ + _make_button('Comandi 🤖', 'commands') + ], + 1 + ) + + ( + bot.help_buttons + if bot.authorization_function(update=update, + authorization_level='user') + else [] + ) + ) + ) + + +async def _help_command(bot, update, user_record): + if not bot.authorization_function(update=update, + authorization_level='everybody'): + return DENY_MESSAGE + reply_markup = get_help_buttons(bot, update, user_record) + return dict( + text=bot.help_message.format(bot=bot), + parse_mode='HTML', + reply_markup=reply_markup, + disable_web_page_preview=True + ) + + +async def _help_button(bot, update, user_record): + data = update['data'] + command = extract(data, ':///') + result, text, rm = '', '', None + if command == 'commands': + text = "Comandi di {bot.name}\n\n{cd}".format( + bot=bot, + cd=get_command_description(bot, update, user_record) + ) + rm = HELP_MENU_BUTTON + elif command == 'menu': + text = bot.help_message.format(bot=bot) + rm = get_help_buttons(bot, update, user_record) + else: + for code, section in bot.help_sections.items(): + if section['abbr'] == command: + if not bot.authorization_function( + update=update, + authorization_level=section['auth'] + ): + return "Non sei autorizzato!" + rm = HELP_MENU_BUTTON + text = ( + '{s[label]}\n\n{s[descr]}' + ).format( + s=section + ).format( + bot=bot + ) + break + if text or rm: + return dict( + text=result, + edit=dict( + text=text, + parse_mode='HTML', + reply_markup=rm, + disable_web_page_preview=True + ) + ) + return result + + +async def _start_command(bot, update, user_record): + text = get_cleaned_text(update=update, bot=bot, replace=['start']) + if not text: + return await _help_command(bot, update, user_record) + update['text'] = text + await bot.text_message_handler( + update=update, + user_record=None + ) + return + + +def init(bot, help_message="Guida", + help_sections_file='data/help.json', help_buttons=[]): + """Assign parsers, commands, buttons and queries to given `bot`.""" + bot.help_message = help_message + bot.help_buttons = help_buttons + bot.help_sections = MyOD() + for code, section in enumerate( + json_read( + help_sections_file, + default=[] + ) + ): + bot.help_sections[code] = section + + @bot.command("/start", authorization_level='everybody') + async def start_command(bot, update, user_record): + return await _start_command(bot, update, user_record) + + @bot.command(command='/help', aliases=['Guida 📖', '00help'], + show_in_keyboard=True, description="Aiuto", + authorization_level='everybody') + async def help_command(bot, update, user_record): + result = await _help_command(bot, update, user_record) + return result + + @bot.button(prefix='help:///', authorization_level='everybody') + async def help_button(bot, update, user_record): + return await _help_button(bot, update, user_record) diff --git a/ciclopibot/roles.py b/ciclopibot/roles.py new file mode 100644 index 0000000..5c55e2f --- /dev/null +++ b/ciclopibot/roles.py @@ -0,0 +1,450 @@ +"""Handle authorization-related functions.""" + +# Standard library modules +import datetime +import json + +# Third party modules +from davtelepot.utilities import ( + Confirmator, extract, get_cleaned_text, make_button, make_inline_keyboard, + MyOD +) + +ROLES = MyOD() +ROLES[0] = {'abbr': 'banned', + 'symbol': '🚫', + 'plural': 'bannati', + 'singular': 'bannato', + 'can_appoint': [], + 'can_be_appointed': [1, 2, 3] + } +ROLES[1] = {'abbr': 'founder', + 'symbol': '👑', + 'plural': 'fondatori', + 'singular': 'fondatore', + 'can_appoint': [0, 1, 2, 3, 4, 5, 7, 100], + 'can_be_appointed': [] + } +ROLES[2] = {'abbr': 'admin', + 'symbol': '⚜️', + 'plural': 'amministratori', + 'singular': 'amministratore', + 'can_appoint': [0, 3, 4, 5, 7, 100], + 'can_be_appointed': [1] + } +ROLES[3] = {'abbr': 'moderator', + 'symbol': '🔰', + 'plural': 'moderatori', + 'singular': 'moderatore', + 'can_appoint': [0, 5, 7], + 'can_be_appointed': [1, 2] + } +ROLES[5] = {'abbr': 'user', + 'symbol': '🎫', + 'plural': 'utenti registrati', + 'singular': 'utente registrato', + 'can_appoint': [], + 'can_be_appointed': [1, 2, 3] + } +ROLES[100] = {'abbr': 'everybody', + 'symbol': '👤', + 'plural': 'chiunque', + 'singular': 'chiunque', + 'can_appoint': [], + 'can_be_appointed': [1, 2, 3] + } + + +def _get_user_role_panel(user_record): + text = """👤 {u[username]} +🔑 {r} {s} + """.format( + u=user_record, + r=ROLES[user_record['privileges']]['singular'].capitalize(), + s=ROLES[user_record['privileges']]['symbol'], + ) + buttons = [ + make_button( + '{s} {r}'.format( + s=role['symbol'], + r=role['singular'].capitalize() + ), + 'auth:///set|{a[id]}_{c}'.format( + c=code, + a=user_record + ) + ) + for code, role in ROLES.items() + ] + return text, buttons + + +async def _authorization_command(bot, update, user_record): + text = get_cleaned_text(bot=bot, update=update, replace=['auth']) + reply_markup = None + result = 'Caso non previsto :/' + if not text: + if 'reply_to_message' not in update: + result = "Usa questo comando in risposta a un utente registrato "\ + "(oppure scrivi /auth username) per "\ + "cambiarne il grado di autorizzazione." + else: + with bot.db as db: + user_record = db['users'].find_one( + telegram_id=update['reply_to_message']['from']['id'] + ) + if not user_record: + result = "Chi ha inviato questo messaggio non è un utente "\ + "registrato.\nDeve essere lui ad avviare il bot e "\ + "inviare il comando /askauth\nPotrai allora "\ + "modificare i suoi permessi rispondendo a un suo "\ + "messaggio (come hai fatto ora)." + else: + result, buttons = _get_user_role_panel(user_record) + reply_markup = make_inline_keyboard(buttons, 1) + else: + with bot.db as db: + user_record = list( + db.query( + """SELECT * + FROM users + WHERE username LIKE '{}%' + """.format( + text + ) + ) + ) + if not user_record: + result = "Utente sconosciuto" + else: + user_record = user_record[0] + result, buttons = _get_user_role_panel(user_record) + reply_markup = make_inline_keyboard(buttons, 1) + return dict( + text=result, + reply_markup=reply_markup, + parse_mode='HTML' + ) + + +async def _ask_for_authorization_command(bot, update, user_record): + chat_id = update['chat']['id'] + username = ( + update['from']['username'] + if 'username' in update['from'] + else None + ) + if chat_id < 0: + return dict( + chat_id=chat_id, + text="Passa a una chat privata con @{} per questa funzione. " + "Dovrai prima fare /start, se non hai ancora mai " + "usato il bot.".format( + bot.name + ) + ) + user_id = update['from']['id'] + with bot.db as db: + check = db['users'].find_one(telegram_id=user_id) + admins = db['users'].find(privileges=[1, 2]) + if check: + if not check['privileges']: + return "Sei stato bannato!" + return "Sei già registrato" + for admin in admins: + await bot.send_message( + chat_id=admin['telegram_id'], + text="""Vuoi autorizzare il seguente """ + """utente?\n""" + """{data}""".format( + data=json.dumps( + update['from'], + indent=2 + ), + user=user_id + ), + parse_mode="HTML", + reply_markup=dict( + inline_keyboard=[ + [ + make_button( + "Autorizza", + "auth:///auth|{i}_{n}".format( + i=user_id, + n=username + ) + ), + make_button( + "Banna", + "auth:///ban|{i}_{n}".format( + i=user_id, + n=username + ) + ) + ] + ] + ) + ) + return "Richiesta di autorizzazione inoltrata." + + +async def _ban_command(bot, update, user_record): + chat_id = update['chat']['id'] + if 'reply_to_message' not in update: + return dict( + text="Questo comando va usato in risposta", + chat_id=chat_id + ) + user_id = update['reply_to_message']['from']['id'] + with bot.db as db: + record = db['users'].find_one(telegram_id=user_id) + if record and record['privileges'] == 0: + return dict(text="Questo utente è già bannato", chat_id=chat_id) + db['users'].upsert( + dict( + telegram_id=user_id, + privileges=0 + ), + ['telegram_id'] + ) + return dict(text="Utente bannato.", chat_id=chat_id) + + +async def _authorization_button(bot, update, user_record): + data = update['data'] + command = extract(data, ':///', '|') + arguments = extract(data, "|").split('_') + user_id = update['from']['id'] + other_user_id = int(arguments[0]) + result, text, reply_markup = '', '', None + if command in ['auth', 'ban']: + username = arguments[1] + if command in ['auth']: + with bot.db as db: + record = db['users'].find_one(telegram_id=user_id) + if record: + return "Queste utente è già autorizzato." + db['users'].upsert( + dict( + telegram_id=user_id, + privileges=5, + username=username + ), + ['telegram_id'] + ) + await bot.send_message( + chat_id=user_id, + text="Sei stato autorizzato a usare il bot :D Per info: /help" + ) + result = "Utente autorizzato." + elif command in ['ban']: + with bot.db as db: + record = db['users'].find_one(telegram_id=user_id) + if record and record['privileges'] == 0: + return "Questo utente è già bannato" + db['users'].upsert( + dict( + telegram_id=user_id, + privileges=0, + username=username + ), + ['telegram_id'] + ) + result = "Utente bannato." + elif command in ['set']: + other_user_id, other_user_privileges = (int(x) for x in arguments) + if not Confirmator.get( + key='{}_set_{}'.format( + user_id, + other_user_id + ), + confirm_timedelta=5 + ).confirm: + return "Sicuro sicuro?" + with bot.db as db: + user_record = db['users'].find_one(telegram_id=user_id) + other_user_record = db['users'].find_one(id=other_user_id) + if other_user_record is None: + other_user_record = dict(privileges=100) + if ( + other_user_privileges not in ( + ROLES[user_record['privileges']]['can_appoint'] + ) + or user_record['privileges'] not in ( + ROLES[other_user_record['privileges']]['can_be_appointed'] + ) + ): + result = "Permesso negato" + text = "Non hai l'autorità di conferire questo grado di "\ + "autorizzazione a questo utente!" + buttons = [ + make_button( + 'Torna all\'utente', + 'auth:///show|{}'.format( + other_user_id + ) + ) + ] + reply_markup = make_inline_keyboard(buttons, 1) + else: + with bot.db as db: + db['users'].update( + dict( + id=other_user_id, + privileges=other_user_privileges + ), + ['id'] + ) + other_user_record = db['users'].find_one(id=other_user_id) + result = "Permesso conferito" + text, buttons = _get_user_role_panel(other_user_record) + reply_markup = make_inline_keyboard(buttons, 1) + elif command in ['show']: + with bot.db as db: + other_user_record = db['users'].find_one(id=other_user_id) + text, buttons = _get_user_role_panel(other_user_record) + reply_markup = make_inline_keyboard(buttons, 1) + if text: + return dict( + text=result, + edit=dict( + text=text, + reply_markup=reply_markup, + parse_mode='HTML' + ) + ) + return result + + +def init(bot): + """Assign parsers, commands, buttons and queries to given `bot`.""" + @bot.command(command='/auth', aliases=[], show_in_keyboard=False, + description="Cambia il grado di autorizzazione di un utente " + "(in risposta o scrivendone l'utenza)", + authorization_level='moderator') + async def authorization_command(bot, update, user_record): + return await _authorization_command(bot, update, user_record) + + @bot.button('auth:///', authorization_level='admin') + async def authorization_button(bot, update, user_record): + return await _authorization_button(bot, update, user_record) + + @bot.command('/ban', description="Banna l'utente (da usare in risposta)", + authorization_level='admin') + async def ban_command(bot, update, user_record): + return await _ban_command(bot, update, user_record) + + +def get_privilege_code(privileges): + """Get privilege code.""" + if not privileges: + privileges = 'everybody' + if privileges in [x['abbr'] for x in ROLES.values()]: + privileges = ROLES.get_by_key_val('abbr', privileges) + assert type(privileges) is int, ("privileges must be either a ROLES " + "role abbreviation or a ROLES code") + return privileges + + +def get_role(bot, update, user_record=None): + """Get role of `update` sender. + + Update user record as well. + """ + if type(update) is int: + user_id = update + # Mark this update as fake by adding a `notes` field + update = {'from': {'id': user_id, 'notes': 'Unavailable data'}} + else: + user_id = update['from']['id'] + assert type(user_id) is int, "user_id must be a telegram user id, "\ + "or an update object sent from it" + role = 100 + with bot.db as db: + if user_record is None: + user_record = db['users'].find_one( + telegram_id=user_id + ) + if user_record is None: + new_user = dict(telegram_id=user_id, privileges=100) + for key in [ + 'first_name', + 'last_name', + 'username', + 'language_code' + ]: + new_user[key] = ( + update['from'][key] + if key in update['from'] + else None + ) + db['users'].insert(new_user) + user_record = db['users'].find_one(telegram_id=user_id) + else: + new_user = dict() + for key in [ + 'first_name', + 'last_name', + 'username', + 'language_code' + ]: + new_user[key] = ( + update['from'][key] + if key in update['from'] + else None + ) + if ( + ( + key not in user_record + or new_user[key] != user_record[key] + ) + and 'notes' not in update['from'] # Exclude fake updates + ): + db['users_history'].insert( + dict( + until=datetime.datetime.now(), + user_id=user_record['id'], + field=key, + value=( + user_record[key] + if key in user_record + else None + ) + ) + ) + db['users'].update( + { + 'id': user_record['id'], + key: new_user[key] + }, + ['id'], + ensure=True + ) + if ( + user_record is not None + and 'privileges' in user_record + and user_record['privileges'] is not None + ): + role = user_record['privileges'] + return role + + +def get_authorization_function(bot): + """Take a bot and return its authorization function.""" + def is_authorized(update, user_record=None, authorization_level=2): + authorization_level = get_privilege_code(authorization_level) + # Channel posts will be considered as made by "anyone" + if ( + isinstance(update, dict) + and 'from' not in update + ): + role = 100 + else: + role = get_role(bot, update) + if any([ + not role, + role > authorization_level + ]): + return False + return True + return is_authorized