Module refactoring

This commit is contained in:
Davte 2020-02-18 19:14:55 +01:00
parent 3d28e59d46
commit 50872e9ca7
2 changed files with 166 additions and 168 deletions

View File

@ -3,7 +3,7 @@
__author__ = "Davide Testa" __author__ = "Davide Testa"
__email__ = "davide@davte.it" __email__ = "davide@davte.it"
__license__ = "GNU General Public License v3.0" __license__ = "GNU General Public License v3.0"
__version__ = "1.1.7" __version__ = "1.1.8"
__maintainer__ = "Davide Testa" __maintainer__ = "Davide Testa"
__contact__ = "t.me/davte" __contact__ = "t.me/davte"

View File

@ -15,6 +15,8 @@ from davtelepot.utilities import (
make_lines_of_buttons make_lines_of_buttons
) )
default_location = None
_URL = "http://www.ciclopi.eu/frmLeStazioni.aspx" _URL = "http://www.ciclopi.eu/frmLeStazioni.aspx"
ciclopi_webpage = CachedPage.get( ciclopi_webpage = CachedPage.get(
@ -104,15 +106,15 @@ def haversine_distance(lat1, lon1, lat2, lon2, degrees='dec', unit='m'):
) )
class Location(): class Location:
"""Location in world map.""" """Location in world map."""
def __init__(self, coordinates): def __init__(self, coordinates):
"""Check and set instance attributes.""" """Check and set instance attributes."""
assert type(coordinates) is tuple, "`coordinates` must be a tuple" assert type(coordinates) is tuple, "`coordinates` must be a tuple"
assert ( assert (
len(coordinates) == 2 len(coordinates) == 2
and all(type(c) is float for c in coordinates) and all(type(c) is float for c in coordinates)
), "`coordinates` must be two floats" ), "`coordinates` must be two floats"
self._coordinates = coordinates self._coordinates = coordinates
@ -302,13 +304,13 @@ class Station(Location):
), ),
} }
def __init__(self, id=0, name='unknown', coordinates=(91.0, 181.0)): def __init__(self, id_=0, name='unknown', coordinates=(91.0, 181.0)):
"""Check and set instance attributes.""" """Check and set instance attributes."""
if id in self.__class__.stations: if id_ in self.__class__.stations:
coordinates = self.__class__.stations[id]['coordinates'] coordinates = self.__class__.stations[id_]['coordinates']
name = self.__class__.stations[id]['name'] name = self.__class__.stations[id_]['name']
Location.__init__(self, coordinates) Location.__init__(self, coordinates)
self._id = id self._id = id_
self._name = name self._name = name
self._active = True self._active = True
self._location = None self._location = None
@ -397,7 +399,7 @@ class Station(Location):
`bikes` should be an int. `bikes` should be an int.
""" """
assert ( assert (
type(bikes) is int type(bikes) is int
), "`bikes` should be an int." ), "`bikes` should be an int."
self._bikes = bikes self._bikes = bikes
@ -407,7 +409,7 @@ class Station(Location):
`free` should be an int. `free` should be an int.
""" """
assert ( assert (
type(free) is int type(free) is int
), "`free` should be an int." ), "`free` should be an int."
self._free = free self._free = free
@ -442,16 +444,17 @@ def ciclopi_custom_sorter(custom_order):
Stations will be sorted by queue value in ascending order. Stations will be sorted by queue value in ascending order.
""" """
if station.id in custom_values: if station.id in custom_values:
return (custom_values[station.id], station.name) return custom_values[station.id], station.name
return (100, station.name) return 100, station.name
return sorter return sorter
def _get_stations(data, location): def _get_stations(data, location):
stations = [] stations = []
for _station in data.find_all( for _station in data.find_all(
"li", "li",
attrs={"class": "rrItem"} attrs={"class": "rrItem"}
): ):
station_name = _station.find( station_name = _station.find(
"span", "span",
@ -466,9 +469,9 @@ def _get_stations(data, location):
attrs={"class": "cssNumero"} attrs={"class": "cssNumero"}
).text ).text
if ( if (
station_id is None station_id is None
or type(station_id) is not str or type(station_id) is not str
or not station_id.isnumeric() or not station_id.isnumeric()
): ):
station_id = 0 station_id = 0
else: else:
@ -568,6 +571,7 @@ async def _ciclopi_command(bot, update, user_record, sent_message=None,
show_all=False): show_all=False):
chat_id = update['chat']['id'] chat_id = update['chat']['id']
default_stations_to_show = 5 default_stations_to_show = 5
stations = []
placeholder_id = bot.set_placeholder( placeholder_id = bot.set_placeholder(
timeout=datetime.timedelta(seconds=1), timeout=datetime.timedelta(seconds=1),
sent_message=sent_message, sent_message=sent_message,
@ -596,17 +600,17 @@ async def _ciclopi_command(bot, update, user_record, sent_message=None,
) )
) )
if ( if (
ciclopi_record is not None ciclopi_record is not None
and isinstance(ciclopi_record, dict) and isinstance(ciclopi_record, dict)
and 'sorting' in ciclopi_record and 'sorting' in ciclopi_record
and ciclopi_record['sorting'] in CICLOPI_SORTING_CHOICES and ciclopi_record['sorting'] in CICLOPI_SORTING_CHOICES
): ):
sorting_code = ciclopi_record['sorting'] sorting_code = ciclopi_record['sorting']
if ( if (
'latitude' in ciclopi_record 'latitude' in ciclopi_record
and ciclopi_record['latitude'] is not None and ciclopi_record['latitude'] is not None
and 'longitude' in ciclopi_record and 'longitude' in ciclopi_record
and ciclopi_record['longitude'] is not None and ciclopi_record['longitude'] is not None
): ):
saved_place = Location( saved_place = Location(
( (
@ -619,12 +623,10 @@ async def _ciclopi_command(bot, update, user_record, sent_message=None,
else: else:
sorting_code = 0 sorting_code = 0
if ( if (
ciclopi_record is not None ciclopi_record is not None
and isinstance(ciclopi_record, dict) and isinstance(ciclopi_record, dict)
and 'stations_to_show' in ciclopi_record and 'stations_to_show' in ciclopi_record
and ciclopi_record[ and ciclopi_record['stations_to_show'] in CICLOPI_STATIONS_TO_SHOW
'stations_to_show'
] in CICLOPI_STATIONS_TO_SHOW
): ):
stations_to_show = ciclopi_record[ stations_to_show = ciclopi_record[
'stations_to_show' 'stations_to_show'
@ -649,8 +651,8 @@ async def _ciclopi_command(bot, update, user_record, sent_message=None,
key=sorting_method key=sorting_method
) )
if ( if (
stations_to_show == -1 stations_to_show == -1
and not show_all and not show_all
): ):
stations = list( stations = list(
filter( filter(
@ -662,9 +664,9 @@ async def _ciclopi_command(bot, update, user_record, sent_message=None,
) )
) )
if ( if (
stations_to_show > 0 stations_to_show > 0
and sorting_code != 1 and sorting_code != 1
and not show_all and not show_all
): ):
stations = stations[:stations_to_show] stations = stations[:stations_to_show]
filter_label = "" filter_label = ""
@ -843,7 +845,7 @@ def get_menu_back_buttons(bot, update, user_record,
return buttons return buttons
async def _ciclopi_button_main(bot, update, user_record, arguments): async def _ciclopi_button_main(bot, update, user_record):
result, text, reply_markup = '', '', None result, text, reply_markup = '', '', None
text = ( text = (
"⚙️ {settings_title} 🚲\n" "⚙️ {settings_title} 🚲\n"
@ -1094,7 +1096,7 @@ async def _ciclopi_button_show(bot, update, user_record, arguments):
return result, text, reply_markup return result, text, reply_markup
async def _ciclopi_button_legend(bot, update, user_record, arguments): async def _ciclopi_button_legend(bot, update, user_record):
result, text, reply_markup = '', '', None result, text, reply_markup = '', '', None
text = ( text = (
"<b>{s[name]}</b> | <i>{s[description]}</i>\n" "<b>{s[name]}</b> | <i>{s[description]}</i>\n"
@ -1119,7 +1121,6 @@ async def _ciclopi_button_legend(bot, update, user_record, arguments):
async def _ciclopi_button_favourites_add(bot, update, user_record, arguments, async def _ciclopi_button_favourites_add(bot, update, user_record, arguments,
order_record, ordered_stations): order_record, ordered_stations):
result, text, reply_markup = '', '', None
result = bot.get_message( result = bot.get_message(
'ciclopi', 'button', 'favourites', 'popup', 'ciclopi', 'button', 'favourites', 'popup',
update=update, user_record=user_record update=update, user_record=user_record
@ -1150,12 +1151,6 @@ async def _ciclopi_button_favourites_add(bot, update, user_record, arguments,
db['ciclopi_custom_order'].delete( db['ciclopi_custom_order'].delete(
id=old_record['id'] id=old_record['id']
) )
order_record = list(
filter(
(lambda r: r['station'] != station_id),
order_record
)
)
ordered_stations = list( ordered_stations = list(
filter( filter(
(lambda s: s.id != station_id), (lambda s: s.id != station_id),
@ -1233,8 +1228,8 @@ async def _ciclopi_button_favourites_add(bot, update, user_record, arguments,
def move_favorite_station( def move_favorite_station(
bot, chat_id, action, station_id, bot, chat_id, action, station_id,
order_record order_record
): ):
"""Move a station in `chat_id`-associated custom order. """Move a station in `chat_id`-associated custom order.
@ -1246,6 +1241,8 @@ def move_favorite_station(
for old_record in order_record: for old_record in order_record:
if old_record['station'] == station_id: if old_record['station'] == station_id:
break break
else: # Error: no record found
return
with bot.db as db: with bot.db as db:
if action == 'down': if action == 'down':
db.query( db.query(
@ -1311,9 +1308,9 @@ def move_favorite_station(
) )
order_record = list( order_record = list(
db['ciclopi_custom_order'].find( db['ciclopi_custom_order'].find(
chat_id=chat_id, chat_id=chat_id,
order_by=['value'] order_by=['value']
) )
) )
ordered_stations = [ ordered_stations = [
Station(record['station']) Station(record['station'])
@ -1336,9 +1333,9 @@ async def _ciclopi_button_favourites(bot, update, user_record, arguments):
with bot.db as db: with bot.db as db:
order_record = list( order_record = list(
db['ciclopi_custom_order'].find( db['ciclopi_custom_order'].find(
chat_id=chat_id, chat_id=chat_id,
order_by=['value'] order_by=['value']
) )
) )
ordered_stations = [ ordered_stations = [
Station(record['station']) Station(record['station'])
@ -1357,9 +1354,9 @@ async def _ciclopi_button_favourites(bot, update, user_record, arguments):
elif action == 'set' and len(arguments) > 1: elif action == 'set' and len(arguments) > 1:
action = arguments[1] action = arguments[1]
elif ( elif (
action in ['up', 'down'] action in ['up', 'down']
and len(arguments) > 1 and len(arguments) > 1
and arguments[1].isnumeric() and arguments[1].isnumeric()
): ):
station_id = int(arguments[1]) station_id = int(arguments[1])
order_record, ordered_stations = move_favorite_station( order_record, ordered_stations = move_favorite_station(
@ -1378,84 +1375,84 @@ async def _ciclopi_button_favourites(bot, update, user_record, arguments):
) )
reply_markup = dict( reply_markup = dict(
inline_keyboard=[ inline_keyboard=[
[ [
make_button( make_button(
text="{s.name} {sy}".format( text="{s.name} {sy}".format(
sy=( sy=(
'⬆️' if ( '⬆️' if (
action == 'up' action == 'up'
and n != 1 and n != 1
) else '⬇️' if ( ) else '⬇️' if (
action == 'down' action == 'down'
and n != len(ordered_stations) and n != len(ordered_stations)
) else '' ) else ''
), ),
s=station s=station
), ),
prefix='ciclopi:///', prefix='ciclopi:///',
data=[ data=[
'fav', 'fav',
( (
action if ( action if (
action == 'up' action == 'up'
and n != 1 and n != 1
) or ( ) or (
action == 'down' action == 'down'
and n != len(ordered_stations) and n != len(ordered_stations)
)
else 'dummy'
),
station.id
]
)
]
for n, station in enumerate(ordered_stations, 1)
] + [
[
make_button(
text=bot.get_message(
'ciclopi', 'button', 'favourites', 'sort', 'buttons',
'edit',
update=update, user_record=user_record
),
prefix='ciclopi:///',
data=['fav', 'add']
)
]
] + [
[
(
make_button(
text=bot.get_message(
'ciclopi', 'button', 'favourites', 'sort',
'buttons', 'move_down',
update=update, user_record=user_record
),
prefix='ciclopi:///',
data=['fav', 'set', 'down']
) if action == 'up'
else make_button(
text=bot.get_message(
'ciclopi', 'button', 'favourites', 'sort',
'buttons', 'move_up',
update=update, user_record=user_record
),
prefix='ciclopi:///',
data=['fav', 'set', 'up']
)
)
]
] + [
get_menu_back_buttons(
bot=bot, update=update, user_record=user_record,
include_back_to_settings=True
) )
else 'dummy' ]
),
station.id
]
)
]
for n, station in enumerate(ordered_stations, 1)
] + [
[
make_button(
text=bot.get_message(
'ciclopi', 'button', 'favourites', 'sort', 'buttons',
'edit',
update=update, user_record=user_record
),
prefix='ciclopi:///',
data=['fav', 'add']
)
]
] + [
[
(
make_button(
text=bot.get_message(
'ciclopi', 'button', 'favourites', 'sort',
'buttons', 'move_down',
update=update, user_record=user_record
),
prefix='ciclopi:///',
data=['fav', 'set', 'down']
) if action == 'up'
else make_button(
text=bot.get_message(
'ciclopi', 'button', 'favourites', 'sort',
'buttons', 'move_up',
update=update, user_record=user_record
),
prefix='ciclopi:///',
data=['fav', 'set', 'up']
)
)
]
] + [
get_menu_back_buttons(
bot=bot, update=update, user_record=user_record,
include_back_to_settings=True
)
]
) )
return result, text, reply_markup return result, text, reply_markup
async def _ciclopi_button_setpos(bot, update, user_record, arguments): async def _ciclopi_button_setpos(bot, update, user_record):
result, text, reply_markup = '', '', None result, text, reply_markup = '', '', None
chat_id = ( chat_id = (
update['message']['chat']['id'] if 'message' in update update['message']['chat']['id'] if 'message' in update
@ -1510,6 +1507,7 @@ async def _ciclopi_button_setpos(bot, update, user_record, arguments):
) )
return result, text, reply_markup return result, text, reply_markup
_ciclopi_button_routing_table = { _ciclopi_button_routing_table = {
'main': _ciclopi_button_main, 'main': _ciclopi_button_main,
'sort': _ciclopi_button_sort, 'sort': _ciclopi_button_sort,
@ -1544,7 +1542,7 @@ async def _ciclopi_button(bot, update, user_record):
return result return result
def init(bot, ciclopi_messages=None, ciclopi_messages_json=None, def init(telegram_bot, ciclopi_messages=None,
_default_location=(43.718518, 10.402165)): _default_location=(43.718518, 10.402165)):
"""Take a bot and assign CicloPi-related commands to it. """Take a bot and assign CicloPi-related commands to it.
@ -1558,55 +1556,55 @@ def init(bot, ciclopi_messages=None, ciclopi_messages_json=None,
# Define a global `default_location` variable holding default location # Define a global `default_location` variable holding default location
global default_location global default_location
default_location = Location(_default_location) default_location = Location(_default_location)
bot.ciclopi_default_location = default_location telegram_bot.ciclopi_default_location = default_location
with bot.db as db: db = telegram_bot.db
if 'ciclopi_stations' not in db.tables: if 'ciclopi_stations' not in db.tables:
db['ciclopi_stations'].insert_many( db['ciclopi_stations'].insert_many(
sorted( sorted(
[ [
dict( dict(
station_id=station_id, station_id=station_id,
name=station['name'], name=station['name'],
latitude=station['coordinates'][0], latitude=station['coordinates'][0],
longitude=station['coordinates'][1] longitude=station['coordinates'][1]
) )
for station_id, station in Station.stations.items() for station_id, station in Station.stations.items()
], ],
key=(lambda station: station['station_id']) key=(lambda station: station['station_id'])
)
) )
if 'ciclopi' not in db.tables: )
db['ciclopi'].insert( if 'ciclopi' not in db.tables:
dict( db['ciclopi'].insert(
chat_id=0, dict(
sorting=0, chat_id=0,
latitude=0.0, sorting=0,
longitude=0.0, latitude=0.0,
stations_to_show=-1 longitude=0.0,
) stations_to_show=-1
) )
)
if ciclopi_messages is None: if ciclopi_messages is None:
try: try:
from .messages import default_ciclopi_messages as ciclopi_messages from .messages import default_ciclopi_messages as ciclopi_messages
except ImportError: except ImportError:
ciclopi_messages = {} ciclopi_messages = {}
bot.messages['ciclopi'] = ciclopi_messages telegram_bot.messages['ciclopi'] = ciclopi_messages
@bot.command(command='/ciclopi', aliases=["CicloPi 🚲", "🚲 CicloPi 🔴"], @telegram_bot.command(command='/ciclopi', aliases=["CicloPi 🚲", "🚲 CicloPi 🔴"],
reply_keyboard_button=( reply_keyboard_button=(
bot.messages['ciclopi']['command']['reply_keyboard_button'] telegram_bot.messages['ciclopi']['command']['reply_keyboard_button']
), ),
show_in_keyboard=True, show_in_keyboard=True,
description=( description=(
bot.messages['ciclopi']['command']['description'] telegram_bot.messages['ciclopi']['command']['description']
), ),
help_section=bot.messages['ciclopi']['help'], help_section=telegram_bot.messages['ciclopi']['help'],
authorization_level='everybody') authorization_level='everybody')
async def ciclopi_command(bot, update, user_record): async def ciclopi_command(bot, update, user_record):
return await _ciclopi_command(bot, update, user_record) return await _ciclopi_command(bot, update, user_record)
@bot.button(prefix='ciclopi:///', authorization_level='everybody') @telegram_bot.button(prefix='ciclopi:///', authorization_level='everybody')
async def ciclopi_button(bot, update, user_record): async def ciclopi_button(bot, update, user_record):
return await _ciclopi_button(bot, update, user_record) return await _ciclopi_button(bot, update, user_record)