From 50872e9ca7d30b722facae50bb17da6893f21d5d Mon Sep 17 00:00:00 2001 From: Davte Date: Tue, 18 Feb 2020 19:14:55 +0100 Subject: [PATCH] Module refactoring --- ciclopibot/__init__.py | 2 +- ciclopibot/ciclopi.py | 332 ++++++++++++++++++++--------------------- 2 files changed, 166 insertions(+), 168 deletions(-) diff --git a/ciclopibot/__init__.py b/ciclopibot/__init__.py index e0a29f2..749cf18 100644 --- a/ciclopibot/__init__.py +++ b/ciclopibot/__init__.py @@ -3,7 +3,7 @@ __author__ = "Davide Testa" __email__ = "davide@davte.it" __license__ = "GNU General Public License v3.0" -__version__ = "1.1.7" +__version__ = "1.1.8" __maintainer__ = "Davide Testa" __contact__ = "t.me/davte" diff --git a/ciclopibot/ciclopi.py b/ciclopibot/ciclopi.py index ad433b0..f46f921 100644 --- a/ciclopibot/ciclopi.py +++ b/ciclopibot/ciclopi.py @@ -15,6 +15,8 @@ from davtelepot.utilities import ( make_lines_of_buttons ) +default_location = None + _URL = "http://www.ciclopi.eu/frmLeStazioni.aspx" ciclopi_webpage = CachedPage.get( @@ -104,15 +106,15 @@ def haversine_distance(lat1, lon1, lat2, lon2, degrees='dec', unit='m'): ) -class Location(): +class Location: """Location in world map.""" def __init__(self, coordinates): """Check and set instance attributes.""" assert type(coordinates) is tuple, "`coordinates` must be a tuple" assert ( - len(coordinates) == 2 - and all(type(c) is float for c in coordinates) + len(coordinates) == 2 + and all(type(c) is float for c in coordinates) ), "`coordinates` must be two floats" self._coordinates = coordinates @@ -302,13 +304,13 @@ class Station(Location): ), } - def __init__(self, id=0, name='unknown', coordinates=(91.0, 181.0)): + def __init__(self, id_=0, name='unknown', coordinates=(91.0, 181.0)): """Check and set instance attributes.""" - if id in self.__class__.stations: - coordinates = self.__class__.stations[id]['coordinates'] - name = self.__class__.stations[id]['name'] + if id_ in self.__class__.stations: + coordinates = self.__class__.stations[id_]['coordinates'] + name = self.__class__.stations[id_]['name'] Location.__init__(self, coordinates) - self._id = id + self._id = id_ self._name = name self._active = True self._location = None @@ -397,7 +399,7 @@ class Station(Location): `bikes` should be an int. """ assert ( - type(bikes) is int + type(bikes) is int ), "`bikes` should be an int." self._bikes = bikes @@ -407,7 +409,7 @@ class Station(Location): `free` should be an int. """ assert ( - type(free) is int + type(free) is int ), "`free` should be an int." self._free = free @@ -442,16 +444,17 @@ def ciclopi_custom_sorter(custom_order): Stations will be sorted by queue value in ascending order. """ if station.id in custom_values: - return (custom_values[station.id], station.name) - return (100, station.name) + return custom_values[station.id], station.name + return 100, station.name + return sorter def _get_stations(data, location): stations = [] for _station in data.find_all( - "li", - attrs={"class": "rrItem"} + "li", + attrs={"class": "rrItem"} ): station_name = _station.find( "span", @@ -466,9 +469,9 @@ def _get_stations(data, location): attrs={"class": "cssNumero"} ).text if ( - station_id is None - or type(station_id) is not str - or not station_id.isnumeric() + station_id is None + or type(station_id) is not str + or not station_id.isnumeric() ): station_id = 0 else: @@ -568,6 +571,7 @@ async def _ciclopi_command(bot, update, user_record, sent_message=None, show_all=False): chat_id = update['chat']['id'] default_stations_to_show = 5 + stations = [] placeholder_id = bot.set_placeholder( timeout=datetime.timedelta(seconds=1), sent_message=sent_message, @@ -596,17 +600,17 @@ async def _ciclopi_command(bot, update, user_record, sent_message=None, ) ) if ( - ciclopi_record is not None - and isinstance(ciclopi_record, dict) - and 'sorting' in ciclopi_record - and ciclopi_record['sorting'] in CICLOPI_SORTING_CHOICES + ciclopi_record is not None + and isinstance(ciclopi_record, dict) + and 'sorting' in ciclopi_record + and ciclopi_record['sorting'] in CICLOPI_SORTING_CHOICES ): sorting_code = ciclopi_record['sorting'] if ( - 'latitude' in ciclopi_record - and ciclopi_record['latitude'] is not None - and 'longitude' in ciclopi_record - and ciclopi_record['longitude'] is not None + 'latitude' in ciclopi_record + and ciclopi_record['latitude'] is not None + and 'longitude' in ciclopi_record + and ciclopi_record['longitude'] is not None ): saved_place = Location( ( @@ -619,12 +623,10 @@ async def _ciclopi_command(bot, update, user_record, sent_message=None, else: sorting_code = 0 if ( - ciclopi_record is not None - and isinstance(ciclopi_record, dict) - and 'stations_to_show' in ciclopi_record - and ciclopi_record[ - 'stations_to_show' - ] in CICLOPI_STATIONS_TO_SHOW + ciclopi_record is not None + and isinstance(ciclopi_record, dict) + and 'stations_to_show' in ciclopi_record + and ciclopi_record['stations_to_show'] in CICLOPI_STATIONS_TO_SHOW ): stations_to_show = ciclopi_record[ 'stations_to_show' @@ -649,8 +651,8 @@ async def _ciclopi_command(bot, update, user_record, sent_message=None, key=sorting_method ) if ( - stations_to_show == -1 - and not show_all + stations_to_show == -1 + and not show_all ): stations = list( filter( @@ -662,9 +664,9 @@ async def _ciclopi_command(bot, update, user_record, sent_message=None, ) ) if ( - stations_to_show > 0 - and sorting_code != 1 - and not show_all + stations_to_show > 0 + and sorting_code != 1 + and not show_all ): stations = stations[:stations_to_show] filter_label = "" @@ -843,7 +845,7 @@ def get_menu_back_buttons(bot, update, user_record, return buttons -async def _ciclopi_button_main(bot, update, user_record, arguments): +async def _ciclopi_button_main(bot, update, user_record): result, text, reply_markup = '', '', None text = ( "⚙️ {settings_title} 🚲\n" @@ -1094,7 +1096,7 @@ async def _ciclopi_button_show(bot, update, user_record, arguments): return result, text, reply_markup -async def _ciclopi_button_legend(bot, update, user_record, arguments): +async def _ciclopi_button_legend(bot, update, user_record): result, text, reply_markup = '', '', None text = ( "{s[name]} | {s[description]}\n" @@ -1119,7 +1121,6 @@ async def _ciclopi_button_legend(bot, update, user_record, arguments): async def _ciclopi_button_favourites_add(bot, update, user_record, arguments, order_record, ordered_stations): - result, text, reply_markup = '', '', None result = bot.get_message( 'ciclopi', 'button', 'favourites', 'popup', update=update, user_record=user_record @@ -1150,12 +1151,6 @@ async def _ciclopi_button_favourites_add(bot, update, user_record, arguments, db['ciclopi_custom_order'].delete( id=old_record['id'] ) - order_record = list( - filter( - (lambda r: r['station'] != station_id), - order_record - ) - ) ordered_stations = list( filter( (lambda s: s.id != station_id), @@ -1233,8 +1228,8 @@ async def _ciclopi_button_favourites_add(bot, update, user_record, arguments, def move_favorite_station( - bot, chat_id, action, station_id, - order_record + bot, chat_id, action, station_id, + order_record ): """Move a station in `chat_id`-associated custom order. @@ -1246,6 +1241,8 @@ def move_favorite_station( for old_record in order_record: if old_record['station'] == station_id: break + else: # Error: no record found + return with bot.db as db: if action == 'down': db.query( @@ -1311,9 +1308,9 @@ def move_favorite_station( ) order_record = list( db['ciclopi_custom_order'].find( - chat_id=chat_id, - order_by=['value'] - ) + chat_id=chat_id, + order_by=['value'] + ) ) ordered_stations = [ Station(record['station']) @@ -1336,9 +1333,9 @@ async def _ciclopi_button_favourites(bot, update, user_record, arguments): with bot.db as db: order_record = list( db['ciclopi_custom_order'].find( - chat_id=chat_id, - order_by=['value'] - ) + chat_id=chat_id, + order_by=['value'] + ) ) ordered_stations = [ Station(record['station']) @@ -1357,9 +1354,9 @@ async def _ciclopi_button_favourites(bot, update, user_record, arguments): elif action == 'set' and len(arguments) > 1: action = arguments[1] elif ( - action in ['up', 'down'] - and len(arguments) > 1 - and arguments[1].isnumeric() + action in ['up', 'down'] + and len(arguments) > 1 + and arguments[1].isnumeric() ): station_id = int(arguments[1]) order_record, ordered_stations = move_favorite_station( @@ -1378,84 +1375,84 @@ async def _ciclopi_button_favourites(bot, update, user_record, arguments): ) reply_markup = dict( inline_keyboard=[ - [ - make_button( - text="{s.name} {sy}".format( - sy=( - '⬆️' if ( - action == 'up' - and n != 1 - ) else '⬇️' if ( - action == 'down' - and n != len(ordered_stations) - ) else '⏹' - ), - s=station - ), - prefix='ciclopi:///', - data=[ - 'fav', - ( - action if ( - action == 'up' - and n != 1 - ) or ( - action == 'down' - and n != len(ordered_stations) + [ + make_button( + text="{s.name} {sy}".format( + sy=( + '⬆️' if ( + action == 'up' + and n != 1 + ) else '⬇️' if ( + action == 'down' + and n != len(ordered_stations) + ) else '⏹' + ), + s=station + ), + prefix='ciclopi:///', + data=[ + 'fav', + ( + action if ( + action == 'up' + and n != 1 + ) or ( + action == 'down' + and n != len(ordered_stations) + ) + else 'dummy' + ), + station.id + ] + ) + ] + for n, station in enumerate(ordered_stations, 1) + ] + [ + [ + make_button( + text=bot.get_message( + 'ciclopi', 'button', 'favourites', 'sort', 'buttons', + 'edit', + update=update, user_record=user_record + ), + prefix='ciclopi:///', + data=['fav', 'add'] + ) + ] + ] + [ + [ + ( + make_button( + text=bot.get_message( + 'ciclopi', 'button', 'favourites', 'sort', + 'buttons', 'move_down', + update=update, user_record=user_record + ), + prefix='ciclopi:///', + data=['fav', 'set', 'down'] + ) if action == 'up' + else make_button( + text=bot.get_message( + 'ciclopi', 'button', 'favourites', 'sort', + 'buttons', 'move_up', + update=update, user_record=user_record + ), + prefix='ciclopi:///', + data=['fav', 'set', 'up'] + ) + ) + ] + ] + [ + get_menu_back_buttons( + bot=bot, update=update, user_record=user_record, + include_back_to_settings=True ) - else 'dummy' - ), - station.id - ] - ) - ] - for n, station in enumerate(ordered_stations, 1) - ] + [ - [ - make_button( - text=bot.get_message( - 'ciclopi', 'button', 'favourites', 'sort', 'buttons', - 'edit', - update=update, user_record=user_record - ), - prefix='ciclopi:///', - data=['fav', 'add'] - ) - ] - ] + [ - [ - ( - make_button( - text=bot.get_message( - 'ciclopi', 'button', 'favourites', 'sort', - 'buttons', 'move_down', - update=update, user_record=user_record - ), - prefix='ciclopi:///', - data=['fav', 'set', 'down'] - ) if action == 'up' - else make_button( - text=bot.get_message( - 'ciclopi', 'button', 'favourites', 'sort', - 'buttons', 'move_up', - update=update, user_record=user_record - ), - prefix='ciclopi:///', - data=['fav', 'set', 'up'] - ) - ) - ] - ] + [ - get_menu_back_buttons( - bot=bot, update=update, user_record=user_record, - include_back_to_settings=True - ) - ] + ] ) return result, text, reply_markup -async def _ciclopi_button_setpos(bot, update, user_record, arguments): +async def _ciclopi_button_setpos(bot, update, user_record): result, text, reply_markup = '', '', None chat_id = ( update['message']['chat']['id'] if 'message' in update @@ -1510,6 +1507,7 @@ async def _ciclopi_button_setpos(bot, update, user_record, arguments): ) return result, text, reply_markup + _ciclopi_button_routing_table = { 'main': _ciclopi_button_main, 'sort': _ciclopi_button_sort, @@ -1544,7 +1542,7 @@ async def _ciclopi_button(bot, update, user_record): return result -def init(bot, ciclopi_messages=None, ciclopi_messages_json=None, +def init(telegram_bot, ciclopi_messages=None, _default_location=(43.718518, 10.402165)): """Take a bot and assign CicloPi-related commands to it. @@ -1558,55 +1556,55 @@ def init(bot, ciclopi_messages=None, ciclopi_messages_json=None, # Define a global `default_location` variable holding default location global default_location default_location = Location(_default_location) - bot.ciclopi_default_location = default_location + telegram_bot.ciclopi_default_location = default_location - with bot.db as db: - if 'ciclopi_stations' not in db.tables: - db['ciclopi_stations'].insert_many( - sorted( - [ - dict( - station_id=station_id, - name=station['name'], - latitude=station['coordinates'][0], - longitude=station['coordinates'][1] - ) - for station_id, station in Station.stations.items() - ], - key=(lambda station: station['station_id']) - ) + db = telegram_bot.db + if 'ciclopi_stations' not in db.tables: + db['ciclopi_stations'].insert_many( + sorted( + [ + dict( + station_id=station_id, + name=station['name'], + latitude=station['coordinates'][0], + longitude=station['coordinates'][1] + ) + for station_id, station in Station.stations.items() + ], + key=(lambda station: station['station_id']) ) - if 'ciclopi' not in db.tables: - db['ciclopi'].insert( - dict( - chat_id=0, - sorting=0, - latitude=0.0, - longitude=0.0, - stations_to_show=-1 - ) + ) + if 'ciclopi' not in db.tables: + db['ciclopi'].insert( + dict( + chat_id=0, + sorting=0, + latitude=0.0, + longitude=0.0, + stations_to_show=-1 ) + ) if ciclopi_messages is None: try: from .messages import default_ciclopi_messages as ciclopi_messages except ImportError: ciclopi_messages = {} - bot.messages['ciclopi'] = ciclopi_messages + telegram_bot.messages['ciclopi'] = ciclopi_messages - @bot.command(command='/ciclopi', aliases=["CicloPi 🚲", "🚲 CicloPi 🔴"], - reply_keyboard_button=( - bot.messages['ciclopi']['command']['reply_keyboard_button'] - ), - show_in_keyboard=True, - description=( - bot.messages['ciclopi']['command']['description'] - ), - help_section=bot.messages['ciclopi']['help'], - authorization_level='everybody') + @telegram_bot.command(command='/ciclopi', aliases=["CicloPi 🚲", "🚲 CicloPi 🔴"], + reply_keyboard_button=( + telegram_bot.messages['ciclopi']['command']['reply_keyboard_button'] + ), + show_in_keyboard=True, + description=( + telegram_bot.messages['ciclopi']['command']['description'] + ), + help_section=telegram_bot.messages['ciclopi']['help'], + authorization_level='everybody') async def ciclopi_command(bot, update, user_record): return await _ciclopi_command(bot, update, user_record) - @bot.button(prefix='ciclopi:///', authorization_level='everybody') + @telegram_bot.button(prefix='ciclopi:///', authorization_level='everybody') async def ciclopi_button(bot, update, user_record): return await _ciclopi_button(bot, update, user_record)