diff --git a/ciclopibot/__init__.py b/ciclopibot/__init__.py
index e69de29..616db1e 100644
--- a/ciclopibot/__init__.py
+++ b/ciclopibot/__init__.py
@@ -0,0 +1,8 @@
+"""Provide information about this package."""
+
+__author__ = "Davide Testa"
+__email__ = "davide@davte.it"
+__license__ = "GNU General Public License v3.0"
+__version__ = "1.0"
+__maintainer__ = "Davide Testa"
+__contact__ = "t.me/davte"
diff --git a/ciclopibot/bot.py b/ciclopibot/bot.py
new file mode 100644
index 0000000..6fe83ca
--- /dev/null
+++ b/ciclopibot/bot.py
@@ -0,0 +1,68 @@
+"""Provide bike sharing information via Telegram bot."""
+
+# Standard library modules
+import logging
+import os
+
+# Third party modules
+from davtelepot.bot import Bot
+
+# Project modules
+import ciclopi
+from data.passwords import bot_token
+import helper
+
+if __name__ == '__main__':
+ path = os.path.dirname(__file__)
+ try:
+ from data.config import log_file_name
+ except ImportError:
+ log_file_name = 'CicloPi.info.log'
+ try:
+ from data.config import errors_file_name
+ except ImportError:
+ errors_file_name = 'CicloPi.errors.log'
+ log_file = f"{path}/data/{log_file_name}"
+ errors_file = f"{path}/data/{errors_file_name}"
+
+ # Outputs the log in console, log_file and errors_file
+ # Log formatter: datetime, module name (filled with spaces up to 15
+ # characters), logging level name (filled to 8), message
+ log_formatter = logging.Formatter(
+ "%(asctime)s [%(module)-15s %(levelname)-8s] %(message)s",
+ style='%'
+ )
+ root_logger = logging.getLogger()
+ root_logger.setLevel(logging.DEBUG)
+
+ file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
+ file_handler.setFormatter(log_formatter)
+ file_handler.setLevel(logging.DEBUG)
+ root_logger.addHandler(file_handler)
+
+ file_handler = logging.FileHandler(errors_file, mode="a", encoding="utf-8")
+ file_handler.setFormatter(log_formatter)
+ file_handler.setLevel(logging.ERROR)
+ root_logger.addHandler(file_handler)
+
+ consoleHandler = logging.StreamHandler()
+ consoleHandler.setFormatter(log_formatter)
+ consoleHandler.setLevel(logging.DEBUG)
+ root_logger.addHandler(consoleHandler)
+
+ # Instantiate bot
+ bot = Bot(token=bot_token, database_url='ciclopibot/data/ciclopi.db')
+ # Assign commands to bot
+ ciclopi.init(bot)
+ helper.init(
+ bot=bot,
+ help_message="📖 Guida di {bot.name}\n\n"
+ "Benvenuto!\n"
+ "Per conoscere i comandi disponibili visita l'apposita "
+ "sezione della guida premendo il pulsante Comandi.\n\n"
+ "Autore e amministratore del bot: @davte",
+ help_sections_file='ciclopibot/data/help.json'
+ )
+ # Run bot(s)
+ logging.info("Presso ctrl+C to exit.")
+ Bot.run()
diff --git a/ciclopibot/ciclopi.py b/ciclopibot/ciclopi.py
new file mode 100644
index 0000000..cdb6110
--- /dev/null
+++ b/ciclopibot/ciclopi.py
@@ -0,0 +1,1490 @@
+"""Get information about bike sharing in Pisa.
+
+Available bikes in bike sharing stations.
+"""
+
+# Standard library modules
+import asyncio
+import datetime
+import math
+
+# Third party modules
+from davtelepot.utilities import (
+ async_wrapper, CachedPage, extract, get_cleaned_text,
+ line_drawing_unordered_list, make_button, make_inline_keyboard,
+ make_lines_of_buttons
+)
+
+_URL = "http://www.ciclopi.eu/frmLeStazioni.aspx"
+
+ciclopi_webpage = CachedPage.get(
+ _URL,
+ datetime.timedelta(seconds=15),
+ mode='html'
+)
+
+UNIT_TO_KM = {
+ 'km': 1,
+ 'm': 1000,
+ 'mi': 0.621371192,
+ 'nmi': 0.539956803,
+ 'ft': 3280.839895013,
+ 'in': 39370.078740158
+}
+
+CICLOPI_SETTINGS = {
+ 'sort': dict(
+ name="Ordina",
+ description="scegli in che ordine visualizzare le stazioni CicloPi.",
+ symbol="⏬"
+ ),
+ 'limit': dict(
+ name="Numero di stazioni",
+ description="scegli quante stazioni visualizzare.",
+ symbol="#️⃣"
+ ),
+ 'fav': dict(
+ name="Stazioni preferite",
+ description="cambia le tue stazioni preferite.",
+ symbol="⭐️"
+ ),
+ 'setpos': dict(
+ name="Cambia posizione",
+ description=(
+ "imposta una posizione da cui ordinare le stazioni per distanza."
+ ),
+ symbol='🧭'
+ )
+}
+
+CICLOPI_SORTING_CHOICES = {
+ 0: dict(
+ name='Scuola',
+ description='in ordine di distanza crescente da Scuola.',
+ short_description='per distanza da Scuola',
+ symbol='🏫'
+ ),
+ 1: dict(
+ name='Alfabetico',
+ description='in ordine alfabetico.',
+ short_description='per nome',
+ symbol='🔤'
+ ),
+ 2: dict(
+ name='Posizione',
+ description='in ordine di distanza crescente dall\'ultima posizione '
+ 'inviata. Di default sarà la posizione di Scuola.',
+ short_description='per distanza',
+ symbol='🧭'
+ ),
+ 3: dict(
+ name='Preferite',
+ description='nell\'ordine che hai scelto.',
+ short_description='in ordine personalizzato',
+ symbol='⭐️'
+ )
+}
+
+CICLOPI_STATIONS_TO_SHOW = {
+ -1: dict(
+ name="Solo le preferite",
+ symbol='⭐️'
+ ),
+ 0: dict(
+ name='Tutte',
+ symbol='💯'
+ ),
+ 3: dict(
+ name='3',
+ symbol='3️⃣'
+ ),
+ 5: dict(
+ name='5',
+ symbol='5️⃣'
+ ),
+ 10: dict(
+ name='10',
+ symbol='🔟'
+ )
+}
+
+
+def haversine_distance(lat1, lon1, lat2, lon2, degrees='dec', unit='m'):
+ """
+ Calculate the great circle distance between two points on Earth.
+
+ (specified in decimal degrees)
+ """
+ assert unit in UNIT_TO_KM, "Invalid distance unit of measurement!"
+ assert degrees in ['dec', 'rad'], "Invalid angle unit of measurement!"
+ # Convert decimal degrees to radians
+ if degrees == 'dec':
+ lon1, lat1, lon2, lat2 = map(
+ math.radians,
+ [lon1, lat1, lon2, lat2]
+ )
+ average_earth_radius = 6371.0088 * UNIT_TO_KM[unit]
+ return (
+ 2
+ * average_earth_radius
+ * math.asin(
+ math.sqrt(
+ math.sin((lat2 - lat1) * 0.5) ** 2
+ + math.cos(lat1)
+ * math.cos(lat2)
+ * math.sin((lon2 - lon1) * 0.5) ** 2
+ )
+ )
+ )
+
+
+class Location():
+ """Location in world map."""
+
+ def __init__(self, coordinates):
+ """Check and set instance attributes."""
+ assert type(coordinates) is tuple, "`coordinates` must be a tuple"
+ assert (
+ len(coordinates) == 2
+ and all(type(c) is float for c in coordinates)
+ ), "`coordinates` must be two floats"
+ self._coordinates = coordinates
+
+ @property
+ def coordinates(self):
+ """Return a tuple (latitude, longitude)."""
+ return self._coordinates
+
+ @property
+ def latitude(self):
+ """Return latitude."""
+ return self._coordinates[0]
+
+ @property
+ def longitude(self):
+ """Return longitude."""
+ return self._coordinates[1]
+
+ def get_distance(self, other, *args, **kwargs):
+ """Return the distance between two `Location`s."""
+ return haversine_distance(
+ self.latitude, self.longitude,
+ other.latitude, other.longitude,
+ *args, **kwargs
+ )
+
+
+default_location = Location(
+ (43.719821, 10.403021) # M. Libertà station
+)
+
+
+class Station(Location):
+ """CicloPi bike sharing station."""
+
+ stations = {
+ 1: dict(
+ name='Aeroporto',
+ coordinates=(43.699455, 10.400075),
+ ),
+ 2: dict(
+ name='Stazione F.S.',
+ coordinates=(43.708627, 10.399051),
+ ),
+ 3: dict(
+ name='Comune Palazzo Blu',
+ coordinates=(43.715541, 10.400505),
+ ),
+ 4: dict(
+ name='Teatro Tribunale',
+ coordinates=(43.716391, 10.405136),
+ ),
+ 5: dict(
+ name='Borgo Stretto',
+ coordinates=(43.718518, 10.402165),
+ ),
+ 6: dict(
+ name='Polo Marzotto',
+ coordinates=(43.719772, 10.407291),
+ ),
+ 7: dict(
+ name='Duomo',
+ coordinates=(43.722855, 10.391977),
+ ),
+ 8: dict(
+ name='Pietrasantina',
+ coordinates=(43.729020, 10.392726),
+ ),
+ 9: dict(
+ name='Paparelli',
+ coordinates=(43.724449, 10.410438),
+ ),
+ 10: dict(
+ name='Pratale',
+ coordinates=(43.7212554, 10.4180257),
+ ),
+ 11: dict(
+ name='Ospedale Cisanello',
+ coordinates=(43.705752, 10.441740),
+ ),
+ 12: dict(
+ name='Sms Biblioteca',
+ coordinates=(43.706565, 10.419136),
+ ),
+ 13: dict(
+ name='Vittorio Emanuele',
+ coordinates=(43.710182, 10.398751),
+ ),
+ 14: dict(
+ name='Palacongressi',
+ coordinates=(43.710014, 10.410232),
+ ),
+ 15: dict(
+ name='Porta a Lucca',
+ coordinates=(43.724247, 10.402269),
+ ),
+ 16: dict(
+ name='Solferino',
+ coordinates=(43.715698, 10.394999),
+ ),
+ 17: dict(
+ name='San Rossore F.S.',
+ coordinates=(43.718992, 10.384391),
+ ),
+ 18: dict(
+ name='Guerrazzi',
+ coordinates=(43.710358, 10.405337),
+ ),
+ 19: dict(
+ name='Livornese',
+ coordinates=(43.708114, 10.384021),
+ ),
+ 20: dict(
+ name='Cavalieri',
+ coordinates=(43.719856, 10.400194),
+ ),
+ 21: dict(
+ name='M. Libertà',
+ coordinates=(43.719821, 10.403021),
+ ),
+ 22: dict(
+ name='Galleria Gerace',
+ coordinates=(43.710791, 10.420456),
+ ),
+ 23: dict(
+ name='C. Marchesi',
+ coordinates=(43.714971, 10.419322),
+ ),
+ 24: dict(
+ name='CNR-Praticelli',
+ coordinates=(43.719256, 10.424012),
+ ),
+ 25: dict(
+ name='Sesta Porta',
+ coordinates=(43.709162, 10.395837),
+ ),
+ 26: dict(
+ name='Qualconia',
+ coordinates=(43.713011, 10.394458),
+ ),
+ 27: dict(
+ name='Donatello',
+ coordinates=(43.711715, 10.372480),
+ ),
+ 28: dict(
+ name='Spadoni',
+ coordinates=(43.716850, 10.391347),
+ ),
+ 29: dict(
+ name='Nievo',
+ coordinates=(43.738286, 10.400865),
+ ),
+ 30: dict(
+ name='Cisanello',
+ coordinates=(43.701159, 10.438863),
+ ),
+ 31: dict(
+ name='Edificio 3',
+ coordinates=(43.707869, 10.441698),
+ ),
+ 32: dict(
+ name='Edificio 6',
+ coordinates=(43.709046, 10.442541),
+ ),
+ 33: dict(
+ name='Frascani',
+ coordinates=(43.710157, 10.433339),
+ ),
+ 34: dict(
+ name='Chiarugi',
+ coordinates=(43.726244, 10.412882),
+ ),
+ 35: dict(
+ name='Praticelli 2',
+ coordinates=(43.719619, 10.427469),
+ ),
+ 36: dict(
+ name='Carducci',
+ coordinates=(43.726700, 10.420562),
+ ),
+ 37: dict(
+ name='Garibaldi',
+ coordinates=(43.718077, 10.418168),
+ ),
+ 38: dict(
+ name='Silvestro',
+ coordinates=(43.714128, 10.409065),
+ ),
+ 39: dict(
+ name='Pardi',
+ coordinates=(43.702273, 10.399793),
+ ),
+ }
+
+ def __init__(self, id=0, name='unknown', coordinates=(91.0, 181.0)):
+ """Check and set instance attributes."""
+ if id in self.__class__.stations:
+ coordinates = self.__class__.stations[id]['coordinates']
+ name = self.__class__.stations[id]['name']
+ Location.__init__(self, coordinates)
+ self._id = id
+ self._name = name
+ self._active = True
+ self._location = None
+ self._description = ''
+ self._distance = None
+ self._bikes = 0
+ self._free = 0
+
+ @property
+ def id(self):
+ """Return station identification number."""
+ return self._id
+
+ @property
+ def name(self):
+ """Return station name."""
+ return self._name
+
+ @property
+ def description(self):
+ """Return station description."""
+ return self._description
+
+ @property
+ def is_active(self):
+ """Return True if station is active."""
+ return self._active
+
+ @property
+ def location(self):
+ """Return location from which distance should be evaluated."""
+ if self._location is None:
+ return default_location
+ return self._location
+
+ @property
+ def distance(self):
+ """Return distance from `self.location`.
+
+ If distance is not evaluated yet, do it and store the result.
+ Otherwise, return stored value.
+ """
+ if self._distance is None:
+ self._distance = self.get_distance(self.location)
+ return self._distance
+
+ @property
+ def bikes(self):
+ """Return number of available bikes."""
+ return self._bikes
+
+ @property
+ def free(self):
+ """Return number of free slots."""
+ return self._free
+
+ def set_active(self, active):
+ """Change station status to `active`.
+
+ `active` should be either `True` or `False`.
+ """
+ assert type(active) is bool, "`active` should be a boolean."
+ self._active = active
+
+ def set_description(self, description):
+ """Change station description to `description`.
+
+ `description` should be a string.
+ """
+ assert type(description) is str, "`description` should be a boolean."
+ self._description = description
+
+ def set_location(self, location):
+ """Change station location to `location`.
+
+ `location` should be a Location object.
+ """
+ assert (
+ isinstance(location, Location)
+ ), "`location` should be a Location."
+ self._location = location
+
+ def set_bikes(self, bikes):
+ """Change number of available `bikes`.
+
+ `bikes` should be an int.
+ """
+ assert (
+ type(bikes) is int
+ ), "`bikes` should be an int."
+ self._bikes = bikes
+
+ def set_free(self, free):
+ """Change number of `free` bike parking slots.
+
+ `free` should be an int.
+ """
+ assert (
+ type(free) is int
+ ), "`free` should be an int."
+ self._free = free
+
+ @property
+ def status(self):
+ """Return station status to be shown to users.
+
+ It includes distance, location, available bikes and free stalls.
+ """
+ if self.bikes + self.free == 0:
+ bikes_and_stalls = "⚠️ Non disponibile"
+ else:
+ bikes_and_stalls = f"🚲 {self.bikes} | 🅿️ {self.free}"
+ return (
+ f"{self.name} | {self.description}\n"
+ f"
{bikes_and_stalls} | 📍 {self.distance:.0f} m"
+ ).format(
+ s=self
+ )
+
+
+def ciclopi_custom_sorter(custom_order):
+ """Return a function to sort stations by a `custom_order`."""
+ custom_values = {
+ record['station']: record['value']
+ for record in custom_order
+ }
+
+ def sorter(station):
+ """Take a station and return its queue value.
+
+ Stations will be sorted by queue value in ascending order.
+ """
+ if station.id in custom_values:
+ return (custom_values[station.id], station.name)
+ return (100, station.name)
+ return sorter
+
+
+def _get_stations(data, location):
+ stations = []
+ for _station in data.find_all(
+ "li",
+ attrs={"class": "rrItem"}
+ ):
+ station_name = _station.find(
+ "span",
+ attrs={"class": "Stazione"}
+ ).text
+ if 'Non operativa' in station_name:
+ active = False
+ else:
+ active = True
+ station_id = _station.find(
+ "div",
+ attrs={"class": "cssNumero"}
+ ).text
+ if (
+ station_id is None
+ or type(station_id) is not str
+ or not station_id.isnumeric()
+ ):
+ station_id = 0
+ else:
+ station_id = int(station_id)
+ station = Station(station_id)
+ station.set_active(active)
+ station.set_description(
+ _station.find(
+ "span",
+ attrs={"class": "TableComune"}
+ ).text.replace(
+ 'a`',
+ 'à'
+ )
+ )
+ bikes_text = _station.find(
+ "span",
+ attrs={"class": "Red"}
+ ).get_text('\t')
+ if bikes_text.count('\t') < 1:
+ bikes = 0
+ free = 0
+ else:
+ bikes, free, *other = [
+ int(
+ ''.join(
+ char
+ for char in s
+ if char.isnumeric()
+ )
+ )
+ for s in bikes_text.split('\t')
+ ]
+ station.set_bikes(bikes)
+ station.set_free(free)
+ station.set_location(location)
+ stations.append(
+ station
+ )
+ return stations
+
+
+async def set_ciclopi_location(bot, update, user_record):
+ """Take a location update and store it as CicloPi place.
+
+ CicloPi stations will be sorted by distance from this place.
+ """
+ location = update['location']
+ chat_id = update['chat']['id']
+ telegram_id = update['from']['id']
+ with bot.db as db:
+ db['ciclopi'].upsert(
+ dict(
+ chat_id=chat_id,
+ latitude=location['latitude'],
+ longitude=location['longitude']
+ ),
+ ['chat_id']
+ )
+ await bot.send_message(
+ chat_id=chat_id,
+ text=(
+ "Ho salvato questa posizione!\n"
+ "D'ora in poi ordinerò le stazioni dalla più vicina alla più "
+ "lontana da qui."
+ )
+ )
+ # Remove individual text message handler which was set to catch `/cancel`
+ bot.remove_individual_text_message_handler(telegram_id)
+ return await _ciclopi_command(bot, update, user_record)
+
+
+async def cancel_ciclopi_location(bot, update, user_record):
+ """Handle the situation in which a user does not send location on request.
+
+ This function is set as custom_parser when the bot requests user's location
+ and is removed if user does. If not, return a proper message.
+ """
+ text = get_cleaned_text(bot=bot, update=update)
+ if text.lower() == 'annulla':
+ return "Operazione annullata."
+ return (
+ "Non ho capito la tua posizione. Fai /ciclopi > Ordina... > "
+ "Posizione 🧭 per riprovare."
+ )
+
+
+async def _ciclopi_command(bot, update, user_record, sent_message=None,
+ show_all=False):
+ chat_id = update['chat']['id']
+ default_stations_to_show = 5
+ if sent_message is None:
+ await bot.sendChatAction(
+ chat_id=chat_id,
+ action='typing'
+ )
+ else:
+ await bot.edit_message_text(
+ update=sent_message,
+ text="Aggiornamento in corso...",
+ parse_mode='HTML',
+ reply_markup=None
+ )
+ ciclopi_data = await ciclopi_webpage.get_page()
+ if ciclopi_data is None or isinstance(ciclopi_data, Exception):
+ text = (
+ "Il sito del CicloPi è momentaneamente irraggiungibile, "
+ "riprova tra un po' :/"
+ )
+ else:
+ with bot.db as db:
+ ciclopi_record = db['ciclopi'].find_one(
+ chat_id=chat_id
+ )
+ custom_order = list(
+ db['ciclopi_custom_order'].find(
+ chat_id=chat_id
+ )
+ )
+ if (
+ ciclopi_record is not None
+ and isinstance(ciclopi_record, dict)
+ and 'sorting' in ciclopi_record
+ and ciclopi_record['sorting'] in CICLOPI_SORTING_CHOICES
+ ):
+ sorting_code = ciclopi_record['sorting']
+ if (
+ 'latitude' in ciclopi_record
+ and ciclopi_record['latitude'] is not None
+ and 'longitude' in ciclopi_record
+ and ciclopi_record['longitude'] is not None
+ ):
+ saved_place = Location(
+ (
+ ciclopi_record['latitude'],
+ ciclopi_record['longitude']
+ )
+ )
+ else:
+ saved_place = default_location
+ else:
+ sorting_code = 0
+ if (
+ ciclopi_record is not None
+ and isinstance(ciclopi_record, dict)
+ and 'stations_to_show' in ciclopi_record
+ and ciclopi_record[
+ 'stations_to_show'
+ ] in CICLOPI_STATIONS_TO_SHOW
+ ):
+ stations_to_show = ciclopi_record[
+ 'stations_to_show'
+ ]
+ else:
+ stations_to_show = default_stations_to_show
+ location = (
+ saved_place if sorting_code != 0
+ else default_location
+ )
+ sorting_method = (
+ (lambda station: station.distance) if sorting_code in [0, 2]
+ else (lambda station: station.name) if sorting_code == 1
+ else ciclopi_custom_sorter(custom_order) if sorting_code == 3
+ else (lambda station: 0)
+ )
+ stations = sorted(
+ _get_stations(
+ ciclopi_data,
+ location
+ ),
+ key=sorting_method
+ )
+ if (
+ stations_to_show == -1
+ and not show_all
+ ):
+ stations = list(
+ filter(
+ lambda station: station.id in [
+ record['station']
+ for record in custom_order
+ ],
+ stations
+ )
+ )
+ if (
+ stations_to_show > 0
+ and sorting_code != 1
+ and not show_all
+ ):
+ stations = stations[:stations_to_show]
+ text = (
+ "🚲 Stazioni ciclopi {sort[short_description]}"
+ "{lim} {sort[symbol]}\n"
+ "\n"
+ "{s}"
+ ).format(
+ s=(
+ '\n\n'.join(
+ station.status
+ for station in stations
+ ) if len(stations)
+ else "- Nessuna stazione -"
+ ),
+ sort=CICLOPI_SORTING_CHOICES[sorting_code],
+ lim=(
+ " ({adv} le preferite)".format(
+ adv='prima' if show_all else 'solo'
+ ) if stations_to_show == -1
+ else " (prime {n})".format(
+ n=stations_to_show
+ )
+ if len(stations) < len(Station.stations)
+ else ""
+ )
+ )
+ if not text:
+ return
+ reply_markup = make_inline_keyboard(
+ (
+ [
+ make_button(
+ "💯 Tutte",
+ prefix='ciclopi:///',
+ data=['show', 'all']
+ )
+ ] if len(stations) < len(Station.stations)
+ else [
+ make_button(
+ "{sy[symbol]} {t}".format(
+ t=(
+ "Solo preferite" if stations_to_show == -1
+ else "Prime {n}"
+ ).format(
+ n=stations_to_show
+ ),
+ sy=CICLOPI_STATIONS_TO_SHOW[stations_to_show]
+ ),
+ prefix='ciclopi:///',
+ data=['show']
+ )
+ ] if show_all
+ else []
+ ) + [
+ make_button(
+ "🔄 Aggiorna",
+ prefix='ciclopi:///',
+ data=(
+ ['show'] + (
+ [] if len(stations) < len(Station.stations)
+ else ['all']
+ )
+ )
+ ),
+ make_button(
+ "📜 Legenda",
+ prefix='ciclopi:///',
+ data=['legend']
+ ),
+ make_button(
+ "⚙️ Impostazioni",
+ prefix='ciclopi:///',
+ data=['main']
+ )
+ ],
+ 2
+ )
+ parameters = dict(
+ update=update,
+ text=text,
+ parse_mode='HTML',
+ reply_markup=reply_markup
+ )
+ method = (
+ bot.send_message
+ if sent_message is None
+ else bot.edit_message_text
+ )
+ await method(**parameters)
+ return
+
+
+async def _ciclopi_button_main(bot, update, user_record, arguments):
+ result, text, reply_markup = '', '', None
+ text = (
+ "⚙️ Impostazioni CicloPi 🚲\n"
+ "\n"
+ "{c}"
+ ).format(
+ c='\n'.join(
+ "- {s[symbol]} {s[name]}: {s[description]}".format(
+ s=setting
+ )
+ for setting in CICLOPI_SETTINGS.values()
+ )
+ )
+ reply_markup = make_inline_keyboard(
+ [
+ make_button(
+ text="{s[symbol]} {s[name]}".format(
+ s=setting
+ ),
+ prefix='ciclopi:///',
+ data=[code]
+ )
+ for code, setting in CICLOPI_SETTINGS.items()
+ ] + [
+ make_button(
+ text="🚲 Torna alle stazioni",
+ prefix='ciclopi:///',
+ data=['show']
+ )
+ ]
+ )
+ return result, text, reply_markup
+
+
+async def _ciclopi_button_sort(bot, update, user_record, arguments):
+ result, text, reply_markup = '', '', None
+ chat_id = (
+ update['message']['chat']['id'] if 'message' in update
+ else update['chat']['id'] if 'chat' in update
+ else 0
+ )
+ with bot.db as db:
+ ciclopi_record = db['ciclopi'].find_one(
+ chat_id=chat_id
+ )
+ if ciclopi_record is None:
+ ciclopi_record = dict(
+ chat_id=chat_id,
+ sorting=0
+ )
+ if len(arguments) == 1:
+ new_choice = (
+ int(arguments[0])
+ if arguments[0].isnumeric()
+ else 0
+ )
+ if new_choice == ciclopi_record['sorting']:
+ return "È già così!", '', None
+ elif new_choice not in CICLOPI_SORTING_CHOICES:
+ return "Opzione sconosciuta!", '', None
+ db['ciclopi'].upsert(
+ dict(
+ chat_id=chat_id,
+ sorting=new_choice
+ ),
+ ['chat_id'],
+ ensure=True
+ )
+ ciclopi_record['sorting'] = new_choice
+ result = "Fatto!"
+ text = (
+ "📜 Modalità di visualizzazione delle stazioni CicloPi 🚲\n\n"
+ "{options}\n\n"
+ "Scegli una nuova modalità o torna all'elenco delle stazioni."
+ ).format(
+ options='\n'.join(
+ "- {c[symbol]} {c[name]}: {c[description]}".format(
+ c=choice
+ )
+ for choice in CICLOPI_SORTING_CHOICES.values()
+ )
+ )
+ reply_markup = make_inline_keyboard(
+ [
+ make_button(
+ text="{s} {c[name]} {c[symbol]}".format(
+ c=choice,
+ s=(
+ '✅'
+ if code == ciclopi_record['sorting']
+ else '☑️'
+ )
+ ),
+ prefix='ciclopi:///',
+ data=['sort', code]
+ )
+ for code, choice in CICLOPI_SORTING_CHOICES.items()
+ ] + [
+ make_button(
+ text="⚙️ Torna alle impostazioni",
+ prefix='ciclopi:///',
+ data=['main']
+ ),
+ make_button(
+ text="🚲 Torna alle stazioni",
+ prefix='ciclopi:///',
+ data=['show']
+ )
+ ]
+ )
+ return result, text, reply_markup
+
+
+async def _ciclopi_button_limit(bot, update, user_record, arguments):
+ result, text, reply_markup = '', '', None
+ chat_id = (
+ update['message']['chat']['id'] if 'message' in update
+ else update['chat']['id'] if 'chat' in update
+ else 0
+ )
+ with bot.db as db:
+ ciclopi_record = db['ciclopi'].find_one(
+ chat_id=chat_id
+ )
+ if ciclopi_record is None or 'stations_to_show' not in ciclopi_record:
+ ciclopi_record = dict(
+ chat_id=chat_id,
+ stations_to_show=5
+ )
+ if len(arguments) == 1:
+ new_choice = (
+ int(arguments[0])
+ if arguments[0].lstrip('+-').isnumeric()
+ else 0
+ )
+ if new_choice == ciclopi_record['stations_to_show']:
+ return "È già così!", '', None
+ elif new_choice not in CICLOPI_STATIONS_TO_SHOW:
+ return "Opzione sconosciuta!", '', None
+ db['ciclopi'].upsert(
+ dict(
+ chat_id=chat_id,
+ stations_to_show=new_choice
+ ),
+ ['chat_id'],
+ ensure=True
+ )
+ ciclopi_record['stations_to_show'] = new_choice
+ result = "Fatto!"
+ text = (
+ "📜 Modalità di visualizzazione delle stazioni CicloPi 🚲\n\n"
+ "{options}\n\n"
+ "Scegli quante stazioni vedere (quando filtrate per distanza) o torna "
+ "alle impostazioni o all'elenco delle stazioni."
+ ).format(
+ options='\n'.join(
+ "- {c[symbol]} {c[name]}".format(
+ c=choice
+ )
+ for choice in CICLOPI_STATIONS_TO_SHOW.values()
+ )
+ )
+ reply_markup = make_inline_keyboard(
+ [
+ make_button(
+ text="{s} {c[name]} {c[symbol]}".format(
+ c=choice,
+ s=(
+ '✅'
+ if code == ciclopi_record['stations_to_show']
+ else '☑️'
+ )
+ ),
+ prefix='ciclopi:///',
+ data=['limit', code]
+ )
+ for code, choice in CICLOPI_STATIONS_TO_SHOW.items()
+ ] + [
+ make_button(
+ text="⚙️ Torna alle impostazioni",
+ prefix='ciclopi:///',
+ data=['main']
+ ),
+ make_button(
+ text="🚲 Torna alle stazioni",
+ prefix='ciclopi:///',
+ data=['show']
+ )
+ ]
+ )
+ return result, text, reply_markup
+
+
+async def _ciclopi_button_show(bot, update, user_record, arguments):
+ result, text, reply_markup = '', '', None
+ fake_update = update['message']
+ fake_update['from'] = update['from']
+ asyncio.ensure_future(
+ _ciclopi_command(
+ bot=bot,
+ update=fake_update,
+ user_record=user_record,
+ sent_message=fake_update,
+ show_all=(
+ True if len(arguments) == 1 and arguments[0] == 'all'
+ else False
+ )
+ )
+ )
+ return result, text, reply_markup
+
+
+async def _ciclopi_button_legend(bot, update, user_record, arguments):
+ result, text, reply_markup = '', '', None
+ text = (
+ "{s[name]} | {s[description]}\n"
+ "
🚲 {s[bikes]} | 🅿️ {s[free]} | 📍 {s[distance]}"
+ ).format(
+ s={
+ 'name': "Nome della stazione",
+ 'distance': "Distanza in m",
+ 'description': "Indirizzo della stazione",
+ 'bikes': "Bici disponibili",
+ 'free': "Posti liberi"
+ }
+ )
+ reply_markup = make_inline_keyboard(
+ [
+ make_button(
+ text="⚙️ Torna alle impostazioni",
+ prefix='ciclopi:///',
+ data=['main']
+ ),
+ make_button(
+ text="🚲 Torna alle stazioni",
+ prefix='ciclopi:///',
+ data=['show']
+ )
+ ]
+ )
+ return result, text, reply_markup
+
+
+async def _ciclopi_button_favorites_add(bot, update, user_record, arguments,
+ order_record, ordered_stations):
+ result, text, reply_markup = '', '', None
+ result = "Seleziona le stazioni da aggiungere"
+ if len(arguments) == 2 and arguments[1].isnumeric():
+ station_id = int(arguments[1])
+ chat_id = (
+ update['message']['chat']['id'] if 'message' in update
+ else update['chat']['id'] if 'chat' in update
+ else 0
+ )
+ with bot.db as db:
+ if station_id in (s.id for s in ordered_stations): # Remove
+ # Find `old_record` to be removed
+ for old_record in order_record:
+ if old_record['station'] == station_id:
+ break
+ db.query(
+ """UPDATE ciclopi_custom_order
+ SET value = value - 1
+ WHERE chat_id = {chat_id}
+ AND value > {val}
+ """.format(
+ chat_id=chat_id,
+ val=old_record['value']
+ )
+ )
+ db['ciclopi_custom_order'].delete(
+ id=old_record['id']
+ )
+ order_record = list(
+ filter(
+ (lambda r: r['station'] != station_id),
+ order_record
+ )
+ )
+ ordered_stations = list(
+ filter(
+ (lambda s: s.id != station_id),
+ ordered_stations
+ )
+ )
+ else: # Add
+ new_record = dict(
+ chat_id=chat_id,
+ station=station_id,
+ value=(len(order_record) + 1)
+ )
+ db['ciclopi_custom_order'].upsert(
+ new_record,
+ ['chat_id', 'station'],
+ ensure=True
+ )
+ order_record.append(new_record)
+ ordered_stations.append(
+ Station(station_id)
+ )
+ text = (
+ "🚲 Stazioni preferite ⭐️\n"
+ "{options}\n\n"
+ "Aggiungi o togli le tue stazioni preferite."
+ ).format(
+ options=line_drawing_unordered_list(
+ [
+ station.name
+ for station in ordered_stations
+ ]
+ )
+ )
+ reply_markup = dict(
+ inline_keyboard=make_lines_of_buttons(
+ [
+ make_button(
+ text=(
+ "{sy} {n}"
+ ).format(
+ sy=(
+ '✅' if station_id in [
+ s.id for s in ordered_stations
+ ]
+ else '☑️'
+ ),
+ n=station['name']
+ ),
+ prefix='ciclopi:///',
+ data=['fav', 'add', station_id]
+ )
+ for station_id, station in sorted(
+ Station.stations.items(),
+ key=lambda t: t[1]['name'] # Sort by station_name
+ )
+ ],
+ 3
+ ) + make_lines_of_buttons(
+ [
+ make_button(
+ text="🔃 Riordina",
+ prefix="ciclopi:///",
+ data=["fav"]
+ ),
+ make_button(
+ text="⚙️ Torna alle impostazioni",
+ prefix='ciclopi:///',
+ data=['main']
+ ),
+ make_button(
+ text="🚲 Torna alle stazioni",
+ prefix='ciclopi:///',
+ data=['show']
+ )
+ ],
+ 3
+ )
+ )
+ return result, text, reply_markup
+
+
+def move_favorite_station(
+ bot, chat_id, action, station_id,
+ order_record
+):
+ """Move a station in `chat_id`-associated custom order.
+
+ `bot`: Bot object, having a `.db` property.
+ `action`: should be `up` or `down`
+ `order_record`: list of records about `chat_id`-associated custom order.
+ """
+ assert action in ('up', 'down'), "Invalid action!"
+ for old_record in order_record:
+ if old_record['station'] == station_id:
+ break
+ with bot.db as db:
+ if action == 'down':
+ db.query(
+ """UPDATE ciclopi_custom_order
+ SET value = 500
+ WHERE chat_id = {chat_id}
+ AND value = {val} + 1
+ """.format(
+ chat_id=chat_id,
+ val=old_record['value']
+ )
+ )
+ db.query(
+ """UPDATE ciclopi_custom_order
+ SET value = value + 1
+ WHERE chat_id = {chat_id}
+ AND value = {val}
+ """.format(
+ chat_id=chat_id,
+ val=old_record['value']
+ )
+ )
+ db.query(
+ """UPDATE ciclopi_custom_order
+ SET value = {val}
+ WHERE chat_id = {chat_id}
+ AND value = 500
+ """.format(
+ chat_id=chat_id,
+ val=old_record['value']
+ )
+ )
+ elif action == 'up':
+ db.query(
+ """UPDATE ciclopi_custom_order
+ SET value = 500
+ WHERE chat_id = {chat_id}
+ AND value = {val} - 1
+ """.format(
+ chat_id=chat_id,
+ val=old_record['value']
+ )
+ )
+ db.query(
+ """UPDATE ciclopi_custom_order
+ SET value = value - 1
+ WHERE chat_id = {chat_id}
+ AND value = {val}
+ """.format(
+ chat_id=chat_id,
+ val=old_record['value']
+ )
+ )
+ db.query(
+ """UPDATE ciclopi_custom_order
+ SET value = {val}
+ WHERE chat_id = {chat_id}
+ AND value = 500
+ """.format(
+ chat_id=chat_id,
+ val=old_record['value']
+ )
+ )
+ order_record = list(
+ db['ciclopi_custom_order'].find(
+ chat_id=chat_id,
+ order_by=['value']
+ )
+ )
+ ordered_stations = [
+ Station(record['station'])
+ for record in order_record
+ ]
+ return order_record, ordered_stations
+
+
+async def _ciclopi_button_favorites(bot, update, user_record, arguments):
+ result, text, reply_markup = '', '', None
+ action = (
+ arguments[0] if len(arguments) > 0
+ else 'up'
+ )
+ chat_id = (
+ update['message']['chat']['id'] if 'message' in update
+ else update['chat']['id'] if 'chat' in update
+ else 0
+ )
+ with bot.db as db:
+ order_record = list(
+ db['ciclopi_custom_order'].find(
+ chat_id=chat_id,
+ order_by=['value']
+ )
+ )
+ ordered_stations = [
+ Station(record['station'])
+ for record in order_record
+ ]
+ if action == 'add':
+ return await _ciclopi_button_favorites_add(
+ bot, update, user_record, arguments,
+ order_record, ordered_stations
+ )
+ elif action == 'dummy':
+ return 'Capolinea!', '', None
+ elif action == 'set' and len(arguments) > 1:
+ action = arguments[1]
+ elif (
+ action in ['up', 'down']
+ and len(arguments) > 1
+ and arguments[1].isnumeric()
+ ):
+ station_id = int(arguments[1])
+ order_record, ordered_stations = move_favorite_station(
+ bot, chat_id, action, station_id,
+ order_record
+ )
+ text = (
+ "🚲 Stazioni preferite ⭐️\n"
+ "{options}\n\n"
+ "Aggiungi, togli o riordina le tue stazioni preferite."
+ ).format(
+ options=line_drawing_unordered_list(
+ [
+ station.name
+ for station in ordered_stations
+ ]
+ )
+ )
+ reply_markup = dict(
+ inline_keyboard=[
+ [
+ make_button(
+ text="{s.name} {sy}".format(
+ sy=(
+ '⬆️' if (
+ action == 'up'
+ and n != 1
+ ) else '⬇️' if (
+ action == 'down'
+ and n != len(ordered_stations)
+ ) else '⏹'
+ ),
+ s=station
+ ),
+ prefix='ciclopi:///',
+ data=[
+ 'fav',
+ (
+ action if (
+ action == 'up'
+ and n != 1
+ ) or (
+ action == 'down'
+ and n != len(ordered_stations)
+ )
+ else 'dummy'
+ ),
+ station.id
+ ]
+ )
+ ]
+ for n, station in enumerate(ordered_stations, 1)
+ ] + [
+ [
+ make_button(
+ text="➕ Aggiungi stazione preferita ⭐️",
+ prefix='ciclopi:///',
+ data=['fav', 'add']
+ )
+ ]
+ ] + [
+ [
+ (
+ make_button(
+ text='Sposta in basso ⬇️',
+ prefix='ciclopi:///',
+ data=['fav', 'set', 'down']
+ ) if action == 'up'
+ else make_button(
+ text='Sposta in alto ⬆️',
+ prefix='ciclopi:///',
+ data=['fav', 'set', 'up']
+ )
+ )
+ ]
+ ] + [
+ [
+ make_button(
+ text="⚙️ Torna alle impostazioni",
+ prefix='ciclopi:///',
+ data=['main']
+ ),
+ make_button(
+ text="🚲 Torna alle stazioni",
+ prefix='ciclopi:///',
+ data=['show']
+ )
+ ]
+ ]
+ )
+ return result, text, reply_markup
+
+
+async def _ciclopi_button_setpos(bot, update, user_record, arguments):
+ result, text, reply_markup = '', '', None
+ chat_id = (
+ update['message']['chat']['id'] if 'message' in update
+ else update['chat']['id'] if 'chat' in update
+ else 0
+ )
+ result = "Inviami una posizione!"
+ bot.set_individual_location_handler(
+ await async_wrapper(
+ set_ciclopi_location
+ ),
+ update
+ )
+ bot.set_individual_text_message_handler(
+ cancel_ciclopi_location,
+ update
+ )
+ asyncio.ensure_future(
+ bot.send_message(
+ chat_id=chat_id,
+ text=(
+ "Inviami una posizione.\n"
+ "Per inviare la tua posizione attuale, usa il "
+ "pulsante."
+ ),
+ reply_markup=dict(
+ keyboard=[
+ [
+ dict(
+ text="Invia la mia posizione",
+ request_location=True
+ )
+ ],
+ [
+ dict(
+ text="Annulla"
+ )
+ ]
+ ],
+ resize_keyboard=True
+ )
+ )
+ )
+ return result, text, reply_markup
+
+_ciclopi_button_routing_table = {
+ 'main': _ciclopi_button_main,
+ 'sort': _ciclopi_button_sort,
+ 'limit': _ciclopi_button_limit,
+ 'show': _ciclopi_button_show,
+ 'setpos': _ciclopi_button_setpos,
+ 'legend': _ciclopi_button_legend,
+ 'fav': _ciclopi_button_favorites
+}
+
+
+async def _ciclopi_button(bot, update, user_record):
+ data = update['data']
+ command, *arguments = extract(data, ':///').split('|')
+ if command in _ciclopi_button_routing_table:
+ result, text, reply_markup = await _ciclopi_button_routing_table[
+ command
+ ](
+ bot, update, user_record, arguments
+ )
+ else:
+ return
+ if text:
+ return dict(
+ text=result,
+ edit=dict(
+ text=text,
+ parse_mode='HTML',
+ reply_markup=reply_markup
+ )
+ )
+ return result
+
+
+def init(bot):
+ """Take a bot and assign commands to it."""
+ with bot.db as db:
+ if 'ciclopi_stations' not in db.tables:
+ db['ciclopi_stations'].insert_many(
+ sorted(
+ [
+ dict(
+ station_id=station_id,
+ name=station['name'],
+ latitude=station['coordinates'][0],
+ longitude=station['coordinates'][1]
+ )
+ for station_id, station in Station.stations.items()
+ ],
+ key=(lambda station: station['station_id'])
+ )
+ )
+ if 'ciclopi' not in db.tables:
+ db['ciclopi'].insert(
+ dict(
+ chat_id=0,
+ sorting=0,
+ latitude=0.0,
+ longitude=0.0,
+ stations_to_show=-1
+ )
+ )
+
+ @bot.command(command='/ciclopi', aliases=["CicloPi 🚲", "🚲 CicloPi 🔴"],
+ show_in_keyboard=True,
+ description="Stato delle stazioni CicloPi",
+ authorization_level='everybody')
+ async def ciclopi_command(bot, update, user_record):
+ return await _ciclopi_command(bot, update, user_record)
+
+ @bot.button(prefix='ciclopi:///', authorization_level='everybody')
+ async def ciclopi_button(bot, update, user_record):
+ return await _ciclopi_button(bot, update, user_record)
diff --git a/ciclopibot/data/help.json b/ciclopibot/data/help.json
new file mode 100644
index 0000000..f1c25a5
--- /dev/null
+++ b/ciclopibot/data/help.json
@@ -0,0 +1,8 @@
+[
+ {
+ "label": "CicloPi 🚲",
+ "abbr": "ciclopi",
+ "auth": "everybody",
+ "descr": "Vedi quante bici disponibili e quanti posti liberi ci sono in ogni stazione CicloPi."
+ }
+]
diff --git a/ciclopibot/helper.py b/ciclopibot/helper.py
new file mode 100644
index 0000000..371e864
--- /dev/null
+++ b/ciclopibot/helper.py
@@ -0,0 +1,190 @@
+"""Make a self-consistent bot help section."""
+
+# Third party modules
+from davtelepot.utilities import (
+ extract, get_cleaned_text, json_read, make_inline_keyboard,
+ make_lines_of_buttons, make_button, MyOD
+)
+
+# Project modules
+import roles
+
+DENY_MESSAGE = (
+ "Chiedi di essere autorizzato: se la tua richiesta verrà accolta, "
+ "ripeti il comando /help per leggere il messaggio di aiuto."
+)
+
+
+def get_command_description(bot, update, user_record):
+ """Get a string description of `bot` commands.
+
+ Show only commands available for `update` sender.
+ """
+ user_role = roles.get_role(bot=bot, update=update, user_record=user_record)
+ return "\n".join(
+ [
+ "/{}: {}".format(
+ command,
+ details['description']
+ )
+ for command, details in sorted(
+ bot.commands.items(),
+ key=lambda x:x[0]
+ )
+ if details['description']
+ and user_role <= roles.get_privilege_code(
+ details['authorization_level']
+ )
+ ]
+ )
+
+
+def _make_button(x, y):
+ if not y.startswith('help:///'):
+ y = 'help:///{}'.format(y)
+ return make_button(x, y)
+
+
+HELP_MENU_BUTTON = make_inline_keyboard(
+ [
+ _make_button(
+ 'Torna al menu Guida 📖',
+ 'menu'
+ )
+ ],
+ 1
+)
+
+
+def get_help_buttons(bot, update, user_record):
+ """Get `bot` help menu inline keyboard.
+
+ Show only buttons available for `update` sender.
+ """
+ user_role = roles.get_role(bot=bot, update=update, user_record=user_record)
+ buttons_list = [
+ _make_button(
+ section['label'],
+ section['abbr']
+ )
+ for section in bot.help_sections.values()
+ if 'auth' in section
+ and user_role <= roles.get_privilege_code(
+ section['auth']
+ )
+ ]
+ return dict(
+ inline_keyboard=(
+ make_lines_of_buttons(buttons_list, 3)
+ + make_lines_of_buttons(
+ [
+ _make_button('Comandi 🤖', 'commands')
+ ],
+ 1
+ )
+ + (
+ bot.help_buttons
+ if bot.authorization_function(update=update,
+ authorization_level='user')
+ else []
+ )
+ )
+ )
+
+
+async def _help_command(bot, update, user_record):
+ if not bot.authorization_function(update=update,
+ authorization_level='everybody'):
+ return DENY_MESSAGE
+ reply_markup = get_help_buttons(bot, update, user_record)
+ return dict(
+ text=bot.help_message.format(bot=bot),
+ parse_mode='HTML',
+ reply_markup=reply_markup,
+ disable_web_page_preview=True
+ )
+
+
+async def _help_button(bot, update, user_record):
+ data = update['data']
+ command = extract(data, ':///')
+ result, text, rm = '', '', None
+ if command == 'commands':
+ text = "Comandi di {bot.name}\n\n{cd}".format(
+ bot=bot,
+ cd=get_command_description(bot, update, user_record)
+ )
+ rm = HELP_MENU_BUTTON
+ elif command == 'menu':
+ text = bot.help_message.format(bot=bot)
+ rm = get_help_buttons(bot, update, user_record)
+ else:
+ for code, section in bot.help_sections.items():
+ if section['abbr'] == command:
+ if not bot.authorization_function(
+ update=update,
+ authorization_level=section['auth']
+ ):
+ return "Non sei autorizzato!"
+ rm = HELP_MENU_BUTTON
+ text = (
+ '{s[label]}\n\n{s[descr]}'
+ ).format(
+ s=section
+ ).format(
+ bot=bot
+ )
+ break
+ if text or rm:
+ return dict(
+ text=result,
+ edit=dict(
+ text=text,
+ parse_mode='HTML',
+ reply_markup=rm,
+ disable_web_page_preview=True
+ )
+ )
+ return result
+
+
+async def _start_command(bot, update, user_record):
+ text = get_cleaned_text(update=update, bot=bot, replace=['start'])
+ if not text:
+ return await _help_command(bot, update, user_record)
+ update['text'] = text
+ await bot.text_message_handler(
+ update=update,
+ user_record=None
+ )
+ return
+
+
+def init(bot, help_message="Guida",
+ help_sections_file='data/help.json', help_buttons=[]):
+ """Assign parsers, commands, buttons and queries to given `bot`."""
+ bot.help_message = help_message
+ bot.help_buttons = help_buttons
+ bot.help_sections = MyOD()
+ for code, section in enumerate(
+ json_read(
+ help_sections_file,
+ default=[]
+ )
+ ):
+ bot.help_sections[code] = section
+
+ @bot.command("/start", authorization_level='everybody')
+ async def start_command(bot, update, user_record):
+ return await _start_command(bot, update, user_record)
+
+ @bot.command(command='/help', aliases=['Guida 📖', '00help'],
+ show_in_keyboard=True, description="Aiuto",
+ authorization_level='everybody')
+ async def help_command(bot, update, user_record):
+ result = await _help_command(bot, update, user_record)
+ return result
+
+ @bot.button(prefix='help:///', authorization_level='everybody')
+ async def help_button(bot, update, user_record):
+ return await _help_button(bot, update, user_record)
diff --git a/ciclopibot/roles.py b/ciclopibot/roles.py
new file mode 100644
index 0000000..5c55e2f
--- /dev/null
+++ b/ciclopibot/roles.py
@@ -0,0 +1,450 @@
+"""Handle authorization-related functions."""
+
+# Standard library modules
+import datetime
+import json
+
+# Third party modules
+from davtelepot.utilities import (
+ Confirmator, extract, get_cleaned_text, make_button, make_inline_keyboard,
+ MyOD
+)
+
+ROLES = MyOD()
+ROLES[0] = {'abbr': 'banned',
+ 'symbol': '🚫',
+ 'plural': 'bannati',
+ 'singular': 'bannato',
+ 'can_appoint': [],
+ 'can_be_appointed': [1, 2, 3]
+ }
+ROLES[1] = {'abbr': 'founder',
+ 'symbol': '👑',
+ 'plural': 'fondatori',
+ 'singular': 'fondatore',
+ 'can_appoint': [0, 1, 2, 3, 4, 5, 7, 100],
+ 'can_be_appointed': []
+ }
+ROLES[2] = {'abbr': 'admin',
+ 'symbol': '⚜️',
+ 'plural': 'amministratori',
+ 'singular': 'amministratore',
+ 'can_appoint': [0, 3, 4, 5, 7, 100],
+ 'can_be_appointed': [1]
+ }
+ROLES[3] = {'abbr': 'moderator',
+ 'symbol': '🔰',
+ 'plural': 'moderatori',
+ 'singular': 'moderatore',
+ 'can_appoint': [0, 5, 7],
+ 'can_be_appointed': [1, 2]
+ }
+ROLES[5] = {'abbr': 'user',
+ 'symbol': '🎫',
+ 'plural': 'utenti registrati',
+ 'singular': 'utente registrato',
+ 'can_appoint': [],
+ 'can_be_appointed': [1, 2, 3]
+ }
+ROLES[100] = {'abbr': 'everybody',
+ 'symbol': '👤',
+ 'plural': 'chiunque',
+ 'singular': 'chiunque',
+ 'can_appoint': [],
+ 'can_be_appointed': [1, 2, 3]
+ }
+
+
+def _get_user_role_panel(user_record):
+ text = """👤 {u[username]}
+🔑 {r} {s}
+ """.format(
+ u=user_record,
+ r=ROLES[user_record['privileges']]['singular'].capitalize(),
+ s=ROLES[user_record['privileges']]['symbol'],
+ )
+ buttons = [
+ make_button(
+ '{s} {r}'.format(
+ s=role['symbol'],
+ r=role['singular'].capitalize()
+ ),
+ 'auth:///set|{a[id]}_{c}'.format(
+ c=code,
+ a=user_record
+ )
+ )
+ for code, role in ROLES.items()
+ ]
+ return text, buttons
+
+
+async def _authorization_command(bot, update, user_record):
+ text = get_cleaned_text(bot=bot, update=update, replace=['auth'])
+ reply_markup = None
+ result = 'Caso non previsto :/
'
+ if not text:
+ if 'reply_to_message' not in update:
+ result = "Usa questo comando in risposta a un utente registrato "\
+ "(oppure scrivi /auth username
) per "\
+ "cambiarne il grado di autorizzazione."
+ else:
+ with bot.db as db:
+ user_record = db['users'].find_one(
+ telegram_id=update['reply_to_message']['from']['id']
+ )
+ if not user_record:
+ result = "Chi ha inviato questo messaggio non è un utente "\
+ "registrato.\nDeve essere lui ad avviare il bot e "\
+ "inviare il comando /askauth\nPotrai allora "\
+ "modificare i suoi permessi rispondendo a un suo "\
+ "messaggio (come hai fatto ora)."
+ else:
+ result, buttons = _get_user_role_panel(user_record)
+ reply_markup = make_inline_keyboard(buttons, 1)
+ else:
+ with bot.db as db:
+ user_record = list(
+ db.query(
+ """SELECT *
+ FROM users
+ WHERE username LIKE '{}%'
+ """.format(
+ text
+ )
+ )
+ )
+ if not user_record:
+ result = "Utente sconosciuto"
+ else:
+ user_record = user_record[0]
+ result, buttons = _get_user_role_panel(user_record)
+ reply_markup = make_inline_keyboard(buttons, 1)
+ return dict(
+ text=result,
+ reply_markup=reply_markup,
+ parse_mode='HTML'
+ )
+
+
+async def _ask_for_authorization_command(bot, update, user_record):
+ chat_id = update['chat']['id']
+ username = (
+ update['from']['username']
+ if 'username' in update['from']
+ else None
+ )
+ if chat_id < 0:
+ return dict(
+ chat_id=chat_id,
+ text="Passa a una chat privata con @{} per questa funzione. "
+ "Dovrai prima fare /start, se non hai ancora mai "
+ "usato il bot.".format(
+ bot.name
+ )
+ )
+ user_id = update['from']['id']
+ with bot.db as db:
+ check = db['users'].find_one(telegram_id=user_id)
+ admins = db['users'].find(privileges=[1, 2])
+ if check:
+ if not check['privileges']:
+ return "Sei stato bannato!"
+ return "Sei già registrato"
+ for admin in admins:
+ await bot.send_message(
+ chat_id=admin['telegram_id'],
+ text="""Vuoi autorizzare il seguente """
+ """utente?\n"""
+ """{data}
""".format(
+ data=json.dumps(
+ update['from'],
+ indent=2
+ ),
+ user=user_id
+ ),
+ parse_mode="HTML",
+ reply_markup=dict(
+ inline_keyboard=[
+ [
+ make_button(
+ "Autorizza",
+ "auth:///auth|{i}_{n}".format(
+ i=user_id,
+ n=username
+ )
+ ),
+ make_button(
+ "Banna",
+ "auth:///ban|{i}_{n}".format(
+ i=user_id,
+ n=username
+ )
+ )
+ ]
+ ]
+ )
+ )
+ return "Richiesta di autorizzazione inoltrata."
+
+
+async def _ban_command(bot, update, user_record):
+ chat_id = update['chat']['id']
+ if 'reply_to_message' not in update:
+ return dict(
+ text="Questo comando va usato in risposta",
+ chat_id=chat_id
+ )
+ user_id = update['reply_to_message']['from']['id']
+ with bot.db as db:
+ record = db['users'].find_one(telegram_id=user_id)
+ if record and record['privileges'] == 0:
+ return dict(text="Questo utente è già bannato", chat_id=chat_id)
+ db['users'].upsert(
+ dict(
+ telegram_id=user_id,
+ privileges=0
+ ),
+ ['telegram_id']
+ )
+ return dict(text="Utente bannato.", chat_id=chat_id)
+
+
+async def _authorization_button(bot, update, user_record):
+ data = update['data']
+ command = extract(data, ':///', '|')
+ arguments = extract(data, "|").split('_')
+ user_id = update['from']['id']
+ other_user_id = int(arguments[0])
+ result, text, reply_markup = '', '', None
+ if command in ['auth', 'ban']:
+ username = arguments[1]
+ if command in ['auth']:
+ with bot.db as db:
+ record = db['users'].find_one(telegram_id=user_id)
+ if record:
+ return "Queste utente è già autorizzato."
+ db['users'].upsert(
+ dict(
+ telegram_id=user_id,
+ privileges=5,
+ username=username
+ ),
+ ['telegram_id']
+ )
+ await bot.send_message(
+ chat_id=user_id,
+ text="Sei stato autorizzato a usare il bot :D Per info: /help"
+ )
+ result = "Utente autorizzato."
+ elif command in ['ban']:
+ with bot.db as db:
+ record = db['users'].find_one(telegram_id=user_id)
+ if record and record['privileges'] == 0:
+ return "Questo utente è già bannato"
+ db['users'].upsert(
+ dict(
+ telegram_id=user_id,
+ privileges=0,
+ username=username
+ ),
+ ['telegram_id']
+ )
+ result = "Utente bannato."
+ elif command in ['set']:
+ other_user_id, other_user_privileges = (int(x) for x in arguments)
+ if not Confirmator.get(
+ key='{}_set_{}'.format(
+ user_id,
+ other_user_id
+ ),
+ confirm_timedelta=5
+ ).confirm:
+ return "Sicuro sicuro?"
+ with bot.db as db:
+ user_record = db['users'].find_one(telegram_id=user_id)
+ other_user_record = db['users'].find_one(id=other_user_id)
+ if other_user_record is None:
+ other_user_record = dict(privileges=100)
+ if (
+ other_user_privileges not in (
+ ROLES[user_record['privileges']]['can_appoint']
+ )
+ or user_record['privileges'] not in (
+ ROLES[other_user_record['privileges']]['can_be_appointed']
+ )
+ ):
+ result = "Permesso negato"
+ text = "Non hai l'autorità di conferire questo grado di "\
+ "autorizzazione a questo utente!"
+ buttons = [
+ make_button(
+ 'Torna all\'utente',
+ 'auth:///show|{}'.format(
+ other_user_id
+ )
+ )
+ ]
+ reply_markup = make_inline_keyboard(buttons, 1)
+ else:
+ with bot.db as db:
+ db['users'].update(
+ dict(
+ id=other_user_id,
+ privileges=other_user_privileges
+ ),
+ ['id']
+ )
+ other_user_record = db['users'].find_one(id=other_user_id)
+ result = "Permesso conferito"
+ text, buttons = _get_user_role_panel(other_user_record)
+ reply_markup = make_inline_keyboard(buttons, 1)
+ elif command in ['show']:
+ with bot.db as db:
+ other_user_record = db['users'].find_one(id=other_user_id)
+ text, buttons = _get_user_role_panel(other_user_record)
+ reply_markup = make_inline_keyboard(buttons, 1)
+ if text:
+ return dict(
+ text=result,
+ edit=dict(
+ text=text,
+ reply_markup=reply_markup,
+ parse_mode='HTML'
+ )
+ )
+ return result
+
+
+def init(bot):
+ """Assign parsers, commands, buttons and queries to given `bot`."""
+ @bot.command(command='/auth', aliases=[], show_in_keyboard=False,
+ description="Cambia il grado di autorizzazione di un utente "
+ "(in risposta o scrivendone l'utenza)",
+ authorization_level='moderator')
+ async def authorization_command(bot, update, user_record):
+ return await _authorization_command(bot, update, user_record)
+
+ @bot.button('auth:///', authorization_level='admin')
+ async def authorization_button(bot, update, user_record):
+ return await _authorization_button(bot, update, user_record)
+
+ @bot.command('/ban', description="Banna l'utente (da usare in risposta)",
+ authorization_level='admin')
+ async def ban_command(bot, update, user_record):
+ return await _ban_command(bot, update, user_record)
+
+
+def get_privilege_code(privileges):
+ """Get privilege code."""
+ if not privileges:
+ privileges = 'everybody'
+ if privileges in [x['abbr'] for x in ROLES.values()]:
+ privileges = ROLES.get_by_key_val('abbr', privileges)
+ assert type(privileges) is int, ("privileges must be either a ROLES "
+ "role abbreviation or a ROLES code")
+ return privileges
+
+
+def get_role(bot, update, user_record=None):
+ """Get role of `update` sender.
+
+ Update user record as well.
+ """
+ if type(update) is int:
+ user_id = update
+ # Mark this update as fake by adding a `notes` field
+ update = {'from': {'id': user_id, 'notes': 'Unavailable data'}}
+ else:
+ user_id = update['from']['id']
+ assert type(user_id) is int, "user_id must be a telegram user id, "\
+ "or an update object sent from it"
+ role = 100
+ with bot.db as db:
+ if user_record is None:
+ user_record = db['users'].find_one(
+ telegram_id=user_id
+ )
+ if user_record is None:
+ new_user = dict(telegram_id=user_id, privileges=100)
+ for key in [
+ 'first_name',
+ 'last_name',
+ 'username',
+ 'language_code'
+ ]:
+ new_user[key] = (
+ update['from'][key]
+ if key in update['from']
+ else None
+ )
+ db['users'].insert(new_user)
+ user_record = db['users'].find_one(telegram_id=user_id)
+ else:
+ new_user = dict()
+ for key in [
+ 'first_name',
+ 'last_name',
+ 'username',
+ 'language_code'
+ ]:
+ new_user[key] = (
+ update['from'][key]
+ if key in update['from']
+ else None
+ )
+ if (
+ (
+ key not in user_record
+ or new_user[key] != user_record[key]
+ )
+ and 'notes' not in update['from'] # Exclude fake updates
+ ):
+ db['users_history'].insert(
+ dict(
+ until=datetime.datetime.now(),
+ user_id=user_record['id'],
+ field=key,
+ value=(
+ user_record[key]
+ if key in user_record
+ else None
+ )
+ )
+ )
+ db['users'].update(
+ {
+ 'id': user_record['id'],
+ key: new_user[key]
+ },
+ ['id'],
+ ensure=True
+ )
+ if (
+ user_record is not None
+ and 'privileges' in user_record
+ and user_record['privileges'] is not None
+ ):
+ role = user_record['privileges']
+ return role
+
+
+def get_authorization_function(bot):
+ """Take a bot and return its authorization function."""
+ def is_authorized(update, user_record=None, authorization_level=2):
+ authorization_level = get_privilege_code(authorization_level)
+ # Channel posts will be considered as made by "anyone"
+ if (
+ isinstance(update, dict)
+ and 'from' not in update
+ ):
+ role = 100
+ else:
+ role = get_role(bot, update)
+ if any([
+ not role,
+ role > authorization_level
+ ]):
+ return False
+ return True
+ return is_authorized