From b9f826a2d0f0d674128cbdeba91932bf947f3993 Mon Sep 17 00:00:00 2001 From: Davte Date: Tue, 28 May 2019 16:52:32 +0200 Subject: [PATCH] davteutil.utilities imported davteutil stand-alone repository will be deleted --- davtelepot/__init__.py | 2 +- davtelepot/utilities.py | 1459 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 1460 insertions(+), 1 deletion(-) create mode 100644 davtelepot/utilities.py diff --git a/davtelepot/__init__.py b/davtelepot/__init__.py index 82cea4f..852ca1e 100644 --- a/davtelepot/__init__.py +++ b/davtelepot/__init__.py @@ -7,7 +7,7 @@ __author__ = "Davide Testa" __email__ = "davte@libero.it" __credits__ = "Marco Origlia" __license__ = "GNU General Public License v3.0" -__version__ = "1.5.1" +__version__ = "1.5.2" __maintainer__ = "Davide Testa" __contact__ = "t.me/davte" diff --git a/davtelepot/utilities.py b/davtelepot/utilities.py new file mode 100644 index 0000000..348326c --- /dev/null +++ b/davtelepot/utilities.py @@ -0,0 +1,1459 @@ +"""Useful functions used by Davte when programming in python.""" + +# Standard library modules +import asyncio +import collections +import csv +import datetime +from difflib import SequenceMatcher +import json +import logging +import os +import random +import string +import time + +# Third party modules +import aiohttp +from bs4 import BeautifulSoup + + +def sumif(iterable, condition): + """Sum all `iterable` items matching `condition`.""" + return sum( + filter( + condition, + iterable + ) + ) + + +def markdown_check(text, symbols): + """Check that all `symbols` occur an even number of times in `text`.""" + for s in symbols: + if (len(text.replace(s, "")) - len(text)) % 2 != 0: + return False + return True + + +def shorten_text(text, limit, symbol="[...]"): + """Return a given text truncated at limit if longer than limit. + + On truncation, add symbol. + """ + assert type(text) is str and type(symbol) is str and type(limit) is int + if len(text) <= limit: + return text + return text[:limit-len(symbol)] + symbol + + +def extract(text, starter=None, ender=None): + """Return string in text between starter and ender. + + If starter is None, truncate at ender. + """ + if starter and starter in text: + text = text.partition(starter)[2] + if ender: + return text.partition(ender)[0] + return text + + +def make_button(text=None, callback_data='', + prefix='', delimiter='|', data=[]): + """Return a Telegram bot API-compliant button. + + callback_data can be either a ready-to-use string or a + prefix + delimiter-joined data. + If callback_data exceeds Telegram limits (currently 60 characters), + it gets truncated at the last delimiter before that limit. + If absent, text is the same as callback_data. + """ + if len(data): + callback_data += delimiter.join(map(str, data)) + callback_data = "{p}{c}".format( + p=prefix, + c=callback_data + ) + if len(callback_data) > 60: + callback_data = callback_data[:61] + callback_data = callback_data[:-1-callback_data[::-1].find(delimiter)] + if text is None: + text = callback_data + return dict( + text=text, + callback_data=callback_data + ) + + +def mkbtn(x, y): + """Backward compatibility. + + Warning: this function will be deprecated sooner or later, + stop using it and migrate to make_button. + """ + return make_button(text=x, callback_data=y) + + +def make_lines_of_buttons(buttons, row_len=1): + """Split `buttons` list in a list of lists having length = `row_len`.""" + return [ + buttons[i:i + row_len] + for i in range( + 0, + len(buttons), + row_len + ) + ] + + +def make_inline_keyboard(buttons, row_len=1): + """Make a Telegram API compliant inline keyboard.""" + return dict( + inline_keyboard=make_lines_of_buttons( + buttons, + row_len + ) + ) + + +async def async_get(url, mode='json', **kwargs): + """Make an async get request. + + `mode`s allowed: + * html + * json + * string + + Additional **kwargs may be passed. + """ + if 'mode' in kwargs: + mode = kwargs['mode'] + del kwargs['mode'] + return await async_request( + url, + type='get', + mode=mode, + **kwargs + ) + + +async def async_post(url, mode='html', **kwargs): + """Make an async post request. + + `mode`s allowed: + * html + * json + * string + + Additional **kwargs may be passed (will be converted to `data`). + """ + return await async_request( + url, + type='post', + mode=mode, + **kwargs + ) + + +async def async_request(url, type='get', mode='json', encoding='utf-8', + **kwargs): + """Make an async html request. + + `types` allowed + * get + * post + + `mode`s allowed: + * html + * json + * string + + Additional **kwargs may be passed. + """ + try: + async with aiohttp.ClientSession() as s: + async with ( + s.get(url, timeout=30) + if type == 'get' + else s.post(url, timeout=30, data=kwargs) + ) as r: + result = await r.read() + result = result.decode(encoding) + except Exception as e: + logging.error( + 'Error making async request to {}:\n{}'.format( + url, + e + ), + exc_info=False + ) # Set exc_info=True to debug + return e + if mode == 'json': + try: + result = json.loads( + result + ) + except Exception as e: + result = {} + elif mode == 'html': + result = BeautifulSoup(result, "html.parser") + elif mode == 'string': + result = result + return result + + +def json_read(file_, default={}, encoding='utf-8', **kwargs): + """Return json parsing of `file_`, or `default` if file does not exist. + + `encoding` refers to how the file should be read. + `kwargs` will be passed to json.load() + """ + if not os.path.isfile(file_): + return default + with open(file_, "r", encoding=encoding) as f: + return json.load(f, **kwargs) + + +def json_write(what, file_, encoding='utf-8', **kwargs): + """Store `what` in json `file_`. + + `encoding` refers to how the file should be written. + `kwargs` will be passed to json.dump() + """ + with open(file_, "w", encoding=encoding) as f: + return json.dump(what, f, indent=4, **kwargs) + + +def csv_read(file_, default=[], encoding='utf-8', + delimiter=',', quotechar='"', **kwargs): + """Return csv parsing of `file_`, or `default` if file does not exist. + + `encoding` refers to how the file should be read. + `delimiter` is the separator of fields. + `quotechar` is the string delimiter. + `kwargs` will be passed to csv.reader() + """ + if not os.path.isfile(file_): + return default + result = [] + keys = [] + with open(file_, newline='', encoding=encoding) as csv_file: + csv_reader = csv.reader( + csv_file, + delimiter=delimiter, + quotechar=quotechar, + **kwargs + ) + for row in csv_reader: + if not keys: + keys = row + continue + item = collections.OrderedDict() + for key, val in zip(keys, row): + item[key] = val + result.append(item) + return result + + +def csv_write(info=[], file_='output.csv', encoding='utf-8', + delimiter=',', quotechar='"', **kwargs): + """Store `info` in CSV `file_`. + + `encoding` refers to how the file should be read. + `delimiter` is the separator of fields. + `quotechar` is the string delimiter. + `encoding` refers to how the file should be written. + `kwargs` will be passed to csv.writer() + """ + assert ( + type(info) is list + and len(info) > 0 + ), "info must be a non-empty list" + assert all( + isinstance(row, dict) + for row in info + ), "Rows must be dictionaries!" + with open(file_, 'w', newline='', encoding=encoding) as csv_file: + csv_writer = csv.writer( + csv_file, + delimiter=delimiter, + quotechar=quotechar, + **kwargs + ) + csv_writer.writerow(info[0].keys()) + for row in info: + csv_writer.writerow(row.values()) + return + + +class MyOD(collections.OrderedDict): + """Subclass of OrderedDict. + + It features `get_by_val` and `get_by_key_val` methods. + """ + + def __init__(self, *args, **kwargs): + """Return a MyOD instance.""" + super().__init__(*args, **kwargs) + self._anti_list_casesensitive = None + self._anti_list_caseinsensitive = None + + @property + def anti_list_casesensitive(self): + """Case-sensitive reverse dictionary. + + Keys and values are swapped. + """ + if not self._anti_list_casesensitive: + self._anti_list_casesensitive = { + val: key + for key, val in self.items() + } + return self._anti_list_casesensitive + + @property + def anti_list_caseinsensitive(self): + """Case-sensitive reverse dictionary. + + Keys and values are swapped and lowered. + """ + if not self._anti_list_caseinsensitive: + self._anti_list_caseinsensitive = { + (val.lower() if type(val) is str else val): key + for key, val in self.items() + } + return self._anti_list_caseinsensitive + + def get_by_val(self, val, case_sensitive=True): + """Get key pointing to given val. + + Can be case-sensitive or insensitive. + MyOD[key] = val <-- MyOD.get(key) = val <--> MyOD.get_by_val(val) = key + """ + return ( + self.anti_list_casesensitive + if case_sensitive + else self.anti_list_caseinsensitive + )[val] + + def get_by_key_val(self, key, val, + case_sensitive=True, return_value=False): + """Get key (or val) of a dict-like object having key == val. + + Perform case-sensitive or insensitive search. + """ + for x, y in self.items(): + if ( + ( + y[key] == val + and case_sensitive + ) or ( + y[key].lower() == val.lower() + and not case_sensitive + ) + ): + return ( + y if return_value + else x + ) + return None + + +def line_drawing_unordered_list(l): + """Draw an old-fashioned unordered list. + + Unorderd list example + ├ An element + ├ Another element + └Last element + """ + result = "" + if l: + for x in l[:-1]: + result += "├ {}\n".format(x) + result += "└ {}".format(l[-1]) + return result + + +def str_to_datetime(d): + """Convert string to datetime. + + Dataset library often casts datetimes to str, this is a workaround. + """ + if isinstance(d, datetime.datetime): + return d + return datetime.datetime.strptime( + d, + '%Y-%m-%d %H:%M:%S.%f' + ) + + +def datetime_to_str(d): + """Cast datetime to string.""" + if isinstance(d, str): + d = str_to_datetime(d) + if not isinstance(d, datetime.datetime): + raise TypeError( + 'Input of datetime_to_str function must be a datetime.datetime ' + 'object. Output is a str' + ) + return '{:%Y-%m-%d %H:%M:%S.%f}'.format(d) + + +class MyCounter(): + """Counter object, with a `lvl` method incrementing `n` property.""" + + def __init__(self): + """Initialize and get MyCounter instance.""" + self._n = 0 + return + + def lvl(self): + """Increments and return self.n.""" + self._n += 1 + return self.n + + def reset(self): + """Set self.n = 0.""" + self._n = 0 + return self.n + + def n(self): + """Counter's value.""" + return self._n + + +def wrapper(func, *args, **kwargs): + """Wrap a function so that it can be later called with one argument.""" + def wrapped(update): + return func(update, *args, **kwargs) + return wrapped + + +async def async_wrapper(func, *args, **kwargs): + """Wrap a coroutine so that it can be later awaited with one argument.""" + async def wrapped(update): + return await func(update, *args, **kwargs) + return wrapped + + +def forwarded(by=None): + """Check that update is forwarded, optionally `by` someone in particular. + + Decorator: such decorated functions have effect only if update + is forwarded from someone (you can specify `by` whom). + """ + def is_forwarded_by(update, by): + if 'forward_from' not in update: + return False + if by and update['forward_from']['id'] != by: + return False + return True + + def decorator(view_func): + if asyncio.iscoroutinefunction(view_func): + async def decorated(update): + if is_forwarded_by(update, by): + return await view_func(update) + else: + def decorated(update): + if is_forwarded_by(update, by): + return view_func(update) + return decorated + return decorator + + +def chat_selective(chat_id=None): + """Check that update comes from a chat, optionally having `chat_id`. + + Such decorated functions have effect only if update comes from + a specific (if `chat_id` is given) or generic chat. + """ + def check_function(update, chat_id): + if 'chat' not in update: + return False + if chat_id: + if update['chat']['id'] != chat_id: + return False + return True + + def decorator(view_func): + if asyncio.iscoroutinefunction(view_func): + async def decorated(update): + if check_function(update, chat_id): + return await view_func(update) + else: + def decorated(update): + if check_function(update, chat_id): + return view_func(update) + return decorated + return decorator + + +async def sleep_until(when): + """Sleep until now > `when`. + + `when` could be a datetime.datetime or a datetime.timedelta instance. + """ + if not ( + isinstance(when, datetime.datetime) + or isinstance(when, datetime.timedelta) + ): + raise TypeError( + "sleep_until takes a datetime.datetime or datetime.timedelta " + "object as argument!" + ) + if isinstance(when, datetime.datetime): + delta = when - datetime.datetime.now() + elif isinstance(when, datetime.timedelta): + delta = when + if delta.days >= 0: + await asyncio.sleep( + delta.seconds + ) + return + + +async def wait_and_do(when, what, *args, **kwargs): + """Sleep until `when`, then call `what` passing `args` and `kwargs`.""" + await sleep_until(when) + return await what(*args, **kwargs) + + +def get_csv_string(list_, delimiter=',', quotechar='"'): + """Return a `delimiter`-delimited string of `list_` items. + + Wrap strings in `quotechar`s. + """ + return delimiter.join( + str(item) if type(item) is not str + else '{q}{i}{q}'.format( + i=item, + q=quotechar + ) + for item in list_ + ) + + +def case_accent_insensitive_sql(field): + """Get a SQL string to perform a case- and accent-insensitive query. + + Given a `field`, return a part of SQL string necessary to perform + a case- and accent-insensitive query. + """ + replacements = [ + (' ', ''), + ('à', 'a'), + ('è', 'e'), + ('é', 'e'), + ('ì', 'i'), + ('ò', 'o'), + ('ù', 'u'), + ] + return "{r}LOWER({f}){w}".format( + r="replace(".upper()*len(replacements), + f=field, + w=''.join( + ", '{w[0]}', '{w[1]}')".format(w=w) + for w in replacements + ) + ) + + +# Italian definite articles. +ARTICOLI = MyOD() +ARTICOLI[1] = { + 'ind': 'un', + 'dets': 'il', + 'detp': 'i', + 'dess': 'l', + 'desp': 'i' +} +ARTICOLI[2] = { + 'ind': 'una', + 'dets': 'la', + 'detp': 'le', + 'dess': 'lla', + 'desp': 'lle' +} +ARTICOLI[3] = { + 'ind': 'uno', + 'dets': 'lo', + 'detp': 'gli', + 'dess': 'llo', + 'desp': 'gli' +} +ARTICOLI[4] = { + 'ind': 'un', + 'dets': 'l\'', + 'detp': 'gli', + 'dess': 'll\'', + 'desp': 'gli' +} + + +class Gettable(): + """Gettable objects can be retrieved from memory without being duplicated. + + Key is the primary key. + Use classmethod get to instantiate (or retrieve) Gettable objects. + Assign SubClass.instances = {}, otherwise Gettable.instances will + contain SubClass objects. + """ + + instances = {} + + @classmethod + def get(cls, key, *args, **kwargs): + """Instantiate and/or retrieve Gettable object. + + SubClass.instances is searched if exists. + Gettable.instances is searched otherwise. + """ + if key not in cls.instances: + cls.instances[key] = cls(key, *args, **kwargs) + return cls.instances[key] + + +class Confirmable(): + """Confirmable objects are provided with a confirm instance method. + + It evaluates True if it was called within self._confirm_timedelta, + False otherwise. + When it returns True, timer is reset. + """ + + CONFIRM_TIMEDELTA = datetime.timedelta(seconds=10) + + def __init__(self, confirm_timedelta=None): + """Instantiate Confirmable instance. + + If `confirm_timedelta` is not passed, + `self.__class__.CONFIRM_TIMEDELTA` is used as default. + """ + if confirm_timedelta is None: + confirm_timedelta = self.__class__.CONFIRM_TIMEDELTA + self.set_confirm_timedelta(confirm_timedelta) + self._confirm_datetimes = {} + + @property + def confirm_timedelta(self): + """Maximum timedelta between two calls of `confirm`.""" + return self._confirm_timedelta + + def confirm_datetime(self, who='unique'): + """Get datetime of `who`'s last confirm. + + If `who` never called `confirm`, fake an expired call. + """ + if who not in self._confirm_datetimes: + self._confirm_datetimes[who] = ( + datetime.datetime.now() + - 2*self.confirm_timedelta + ) + return self._confirm_datetimes[who] + + def set_confirm_timedelta(self, confirm_timedelta): + """Change self._confirm_timedelta.""" + if type(confirm_timedelta) is int: + confirm_timedelta = datetime.timedelta( + seconds=confirm_timedelta + ) + assert isinstance( + confirm_timedelta, datetime.timedelta + ), "confirm_timedelta must be a datetime.timedelta instance!" + self._confirm_timedelta = confirm_timedelta + + def confirm(self, who='unique'): + """Return True if `confirm` was called by `who` recetly enough.""" + now = datetime.datetime.now() + if now >= self.confirm_datetime(who) + self.confirm_timedelta: + self._confirm_datetimes[who] = now + return False + self._confirm_datetimes[who] = now - 2*self.confirm_timedelta + return True + + +class HasBot(): + """Objects having a telepot.Bot subclass object as `.bot` attribute. + + HasBot objects have a .bot and .db properties for faster access. + """ + + bot = None + + @property + def bot(self): + """Class bot.""" + return self.__class__.bot + + @property + def db(self): + """Class bot db.""" + return self.bot.db + + @classmethod + def set_bot(cls, bot): + """Change class bot.""" + cls.bot = bot + + +class CachedPage(Gettable): + """Cache a webpage and return it during CACHE_TIME, otherwise refresh. + + Usage: + cached_page = CachedPage.get( + 'https://www.google.com', + datetime.timedelta(seconds=30), + **kwargs + ) + page = await cached_page.get_page() + """ + + CACHE_TIME = datetime.timedelta(minutes=5) + instances = {} + + def __init__(self, url, cache_time=None, **async_get_kwargs): + """Instantiate CachedPage object. + + `url`: the URL to be cached + `cache_time`: timedelta from last_update during which + page will be cached + `**kwargs` will be passed to async_get function + """ + self._url = url + if type(cache_time) is int: + cache_time = datetime.timedelta(seconds=cache_time) + if cache_time is None: + cache_time = self.__class__.CACHE_TIME + assert type(cache_time) is datetime.timedelta, ( + "Cache time must be a datetime.timedelta object!" + ) + self._cache_time = cache_time + self._page = None + self._last_update = datetime.datetime.now() - self.cache_time + self._async_get_kwargs = async_get_kwargs + + @property + def url(self): + """Geet cached page url.""" + return self._url + + @property + def cache_time(self): + """Get cache time.""" + return self._cache_time + + @property + def page(self): + """Get webpage.""" + return self._page + + @property + def last_update(self): + """Get datetime of last update.""" + return self._last_update + + @property + def async_get_kwargs(self): + """Get async get request keyword arguments.""" + return self._async_get_kwargs + + @property + def is_old(self): + """Evaluate True if `chache_time` has passed since last update.""" + return datetime.datetime.now() > self.last_update + self.cache_time + + async def refresh(self): + """Update cached webpage.""" + try: + self._page = await async_get(self.url, **self.async_get_kwargs) + self._last_update = datetime.datetime.now() + return 0 + except Exception as e: + self._page = None + logging.error( + '{e}'.format( + e=e + ), + exc_info=False + ) # Set exc_info=True to debug + return 1 + return 1 + + async def get_page(self): + """Refresh if necessary and return webpage.""" + if self.is_old: + await self.refresh() + return self.page + + +class Confirmator(Gettable, Confirmable): + """Gettable Confirmable object.""" + + instances = {} + + def __init__(self, key, *args, confirm_timedelta=None): + """Call Confirmable.__init__ passing `confirm_timedelta`.""" + Confirmable.__init__(self, confirm_timedelta) + + +def get_cleaned_text(update, bot=None, replace=[], strip='/ @'): + """Clean `update`['text'] and return it. + + Strip `bot`.name and items to be `replace`d from the beginning of text. + Strip `strip` characters from both ends. + """ + if bot is not None: + replace.append( + '@{.name}'.format( + bot + ) + ) + text = update['text'].strip(strip) + for s in replace: + while s and text.lower().startswith(s.lower()): + text = text[len(s):] + return text.strip(strip) + + +def get_user(record): + """Get an HTML Telegram tag for user `record`.""" + if not record: + return + from_ = {key: val for key, val in record.items()} + first_name, last_name, username, id_ = None, None, None, None + result = '{name}' + if 'telegram_id' in from_: + from_['id'] = from_['telegram_id'] + if ( + 'id' in from_ + and from_['id'] is not None + ): + result = '{{name}}'.format( + from_['id'] + ) + if 'username' in from_ and from_['username']: + result = result.format( + name=from_['username'] + ) + elif ( + 'first_name' in from_ + and from_['first_name'] + and 'last_name' in from_ + and from_['last_name'] + ): + result = result.format( + name='{} {}'.format( + from_['first_name'], + from_['last_name'] + ) + ) + elif 'first_name' in from_ and from_['first_name']: + result = result.format( + name=from_['first_name'] + ) + elif 'last_name' in from_ and from_['last_name']: + result = result.format( + name=from_['last_name'] + ) + else: + result = result.format( + name="Utente anonimo" + ) + return result + + +def datetime_from_utc_to_local(utc_datetime): + """Convert `utc_datetime` to local datetime.""" + now_timestamp = time.time() + offset = ( + datetime.datetime.fromtimestamp(now_timestamp) + - datetime.datetime.utcfromtimestamp(now_timestamp) + ) + return utc_datetime + offset + + +# TIME_SYMBOLS from more specific to less specific (avoid false positives!) +TIME_SYMBOLS = MyOD() +TIME_SYMBOLS["'"] = 'minutes' +TIME_SYMBOLS["settimana"] = 'weeks' +TIME_SYMBOLS["settimane"] = 'weeks' +TIME_SYMBOLS["weeks"] = 'weeks' +TIME_SYMBOLS["week"] = 'weeks' +TIME_SYMBOLS["giorno"] = 'days' +TIME_SYMBOLS["giorni"] = 'days' +TIME_SYMBOLS["secondi"] = 'seconds' +TIME_SYMBOLS["seconds"] = 'seconds' +TIME_SYMBOLS["secondo"] = 'seconds' +TIME_SYMBOLS["minuti"] = 'minutes' +TIME_SYMBOLS["minuto"] = 'minutes' +TIME_SYMBOLS["minute"] = 'minutes' +TIME_SYMBOLS["minutes"] = 'minutes' +TIME_SYMBOLS["day"] = 'days' +TIME_SYMBOLS["days"] = 'days' +TIME_SYMBOLS["ore"] = 'hours' +TIME_SYMBOLS["ora"] = 'hours' +TIME_SYMBOLS["sec"] = 'seconds' +TIME_SYMBOLS["min"] = 'minutes' +TIME_SYMBOLS["m"] = 'minutes' +TIME_SYMBOLS["h"] = 'hours' +TIME_SYMBOLS["d"] = 'days' +TIME_SYMBOLS["s"] = 'seconds' + + +def _interval_parser(text, result): + text = text.lower() + succeeded = False + if result is None: + result = [] + if len(result) == 0 or result[-1]['ok']: + text_part = '' + _text = text # I need to iterate through _text modifying text + for char in _text: + if not char.isnumeric(): + break + else: + text_part += char + text = text[1:] + if text_part.isnumeric(): + result.append( + dict( + unit=None, + value=int(text_part), + ok=False + ) + ) + succeeded = True, True + if text: + dummy, result = _interval_parser(text, result) + elif len(result) > 0 and not result[-1]['ok']: + text_part = '' + _text = text # I need to iterate through _text modifying text + for char in _text: + if char.isnumeric(): + break + else: + text_part += char + text = text[1:] + for time_symbol, unit in TIME_SYMBOLS.items(): + if time_symbol in text_part: + result[-1]['unit'] = unit + result[-1]['ok'] = True + succeeded = True, True + break + else: + result.pop() + if text: + dummy, result = _interval_parser(text, result) + return succeeded, result + + +def _date_parser(text, result): + succeeded = False + if 3 <= len(text) <= 10 and text.count('/') >= 1: + if 3 <= len(text) <= 5 and text.count('/') == 1: + text += '/{:%y}'.format(datetime.datetime.now()) + if 6 <= len(text) <= 10 and text.count('/') == 2: + day, month, year = [ + int(n) for n in [ + ''.join(char) + for char in text.split('/') + if char.isnumeric() + ] + ] + if year < 100: + year += 2000 + if result is None: + result = [] + result += [ + dict( + unit='day', + value=day, + ok=True + ), + dict( + unit='month', + value=month, + ok=True + ), + dict( + unit='year', + value=year, + ok=True + ) + ] + succeeded = True, True + return succeeded, result + + +def _time_parser(text, result): + succeeded = False + if (1 <= len(text) <= 8) and any(char.isnumeric() for char in text): + text = ''.join( + ':' if char == '.' else char + for char in text + if char.isnumeric() or char in (':', '.') + ) + if len(text) <= 2: + text = '{:02d}:00:00'.format(int(text)) + elif len(text) == 4 and ':' not in text: + text = '{:02d}:{:02d}:00'.format( + *[int(x) for x in (text[:2], text[2:])] + ) + elif text.count(':') == 1: + text = '{:02d}:{:02d}:00'.format( + *[int(x) for x in text.split(':')] + ) + if text.count(':') == 2: + hour, minute, second = (int(x) for x in text.split(':')) + if ( + 0 <= hour <= 24 + and 0 <= minute <= 60 + and 0 <= second <= 60 + ): + if result is None: + result = [] + result += [ + dict( + unit='hour', + value=hour, + ok=True + ), + dict( + unit='minute', + value=minute, + ok=True + ), + dict( + unit='second', + value=second, + ok=True + ) + ] + succeeded = True + return succeeded, result + + +WEEKDAY_NAMES_ITA = ["Lunedì", "Martedì", "Mercoledì", "Giovedì", + "Venerdì", "Sabato", "Domenica"] +WEEKDAY_NAMES_ENG = ["Monday", "Tuesday", "Wednesday", "Thursday", + "Friday", "Saturday", "Sunday"] + + +def _period_parser(text, result): + succeeded = False + if text in ('every', 'ogni',): + succeeded = True + if text.title() in WEEKDAY_NAMES_ITA + WEEKDAY_NAMES_ENG: + day_code = (WEEKDAY_NAMES_ITA + WEEKDAY_NAMES_ENG).index(text.title()) + if day_code > 6: + day_code -= 7 + today = datetime.date.today() + days = 1 + while (today + datetime.timedelta(days=days)).weekday() != day_code: + days += 1 + if result is None: + result = [] + result.append( + dict( + unit='days', + value=days, + ok=True, + weekly=True + ) + ) + succeeded = True + else: + succeeded, result = _interval_parser(text, result) + return succeeded, result + + +TIME_WORDS = { + 'tra': dict( + parser=_interval_parser, + recurring=False, + type_='delta' + ), + 'in': dict( + parser=_interval_parser, + recurring=False, + type_='delta' + ), + 'at': dict( + parser=_time_parser, + recurring=False, + type_='set' + ), + 'on': dict( + parser=_date_parser, + recurring=False, + type_='set' + ), + 'alle': dict( + parser=_time_parser, + recurring=False, + type_='set' + ), + 'il': dict( + parser=_date_parser, + recurring=False, + type_='set' + ), + 'every': dict( + parser=_period_parser, + recurring=True, + type_='delta' + ), + 'ogni': dict( + parser=_period_parser, + recurring=True, + type_='delta' + ), +} + + +def parse_datetime_interval_string(text): + """Parse `text` and return text, datetime and timedelta.""" + parsers = [] + result_text, result_datetime, result_timedelta = [], None, None + is_quoted_text = False + for word in text.split(' '): + if word.count('"') % 2: + is_quoted_text = not is_quoted_text + if is_quoted_text or '"' in word: + result_text.append( + word.replace('"', '') if 'href=' not in word else word + ) + continue + result_text.append(word) + word = word.lower() + succeeded = False + if len(parsers) > 0: + succeeded, result = parsers[-1]['parser']( + word, + parsers[-1]['result'] + ) + if succeeded: + parsers[-1]['result'] = result + if not succeeded and word in TIME_WORDS: + parsers.append( + dict( + result=None, + parser=TIME_WORDS[word]['parser'], + recurring=TIME_WORDS[word]['recurring'], + type_=TIME_WORDS[word]['type_'] + ) + ) + if succeeded: + result_text.pop() + if len(result_text) > 0 and result_text[-1].lower() in TIME_WORDS: + result_text.pop() + result_text = escape_html_chars( + ' '.join(result_text) + ) + parsers = list( + filter( + lambda x: 'result' in x and x['result'], + parsers + ) + ) + recurring_event = False + weekly = False + _timedelta = datetime.timedelta() + _datetime = None + _now = datetime.datetime.now() + for parser in parsers: + if parser['recurring']: + recurring_event = True + type_ = parser['type_'] + for result in parser['result']: + if not result['ok']: + continue + if recurring_event and 'weekly' in result and result['weekly']: + weekly = True + if type_ == 'set': + if _datetime is None: + _datetime = _now + _datetime = _datetime.replace( + **{ + result['unit']: result['value'] + } + ) + elif type_ == 'delta': + _timedelta += datetime.timedelta( + **{ + result['unit']: result['value'] + } + ) + if _datetime: + result_datetime = _datetime + if _timedelta: + if result_datetime is None: + result_datetime = _now + if recurring_event: + result_timedelta = _timedelta + if weekly: + result_timedelta = datetime.timedelta(days=7) + else: + result_datetime += _timedelta + while result_datetime and result_datetime < datetime.datetime.now(): + result_datetime += ( + result_timedelta + if result_timedelta + else datetime.timedelta(days=1) + ) + return result_text, result_datetime, result_timedelta + + +DAY_GAPS = { + -1: 'ieri', + -2: 'avantieri', + 0: 'oggi', + 1: 'domani', + 2: 'dopodomani' +} + +MONTH_NAMES_ITA = MyOD() +MONTH_NAMES_ITA[1] = "gennaio" +MONTH_NAMES_ITA[2] = "febbraio" +MONTH_NAMES_ITA[3] = "marzo" +MONTH_NAMES_ITA[4] = "aprile" +MONTH_NAMES_ITA[5] = "maggio" +MONTH_NAMES_ITA[6] = "giugno" +MONTH_NAMES_ITA[7] = "luglio" +MONTH_NAMES_ITA[8] = "agosto" +MONTH_NAMES_ITA[9] = "settembre" +MONTH_NAMES_ITA[10] = "ottobre" +MONTH_NAMES_ITA[11] = "novembre" +MONTH_NAMES_ITA[12] = "dicembre" + + +def beautytd(td): + """Format properly timedeltas.""" + result = '' + if type(td) is int: + td = datetime.timedelta(seconds=td) + assert isinstance( + td, + datetime.timedelta + ), "td must be a datetime.timedelta object!" + mtd = datetime.timedelta + if td < mtd(minutes=1): + result = "{:.0f} secondi".format( + td.total_seconds() + ) + elif td < mtd(minutes=60): + result = "{:.0f} min{}".format( + td.total_seconds()//60, + ( + " {:.0f} s".format( + td.total_seconds() % 60 + ) + ) if td.total_seconds() % 60 else '' + ) + elif td < mtd(days=1): + result = "{:.0f} h{}".format( + td.total_seconds()//3600, + ( + " {:.0f} min".format( + (td.total_seconds() % 3600) // 60 + ) + ) if td.total_seconds() % 3600 else '' + ) + elif td < mtd(days=30): + result = "{} giorni{}".format( + td.days, + ( + " {:.0f} h".format( + td.total_seconds() % (3600*24) // 3600 + ) + ) if td.total_seconds() % (3600*24) else '' + ) + return result + + +def beautydt(dt): + """Format a datetime in a smart way.""" + if type(dt) is str: + dt = str_to_datetime(dt) + assert isinstance( + dt, + datetime.datetime + ), "dt must be a datetime.datetime object!" + now = datetime.datetime.now() + gap = dt - now + gap_days = (dt.date() - now.date()).days + result = "{dt:alle %H:%M}".format( + dt=dt + ) + if abs(gap) < datetime.timedelta(minutes=30): + result += "{dt::%S}".format(dt=dt) + if -2 <= gap_days <= 2: + result += " di {dg}".format( + dg=DAY_GAPS[gap_days] + ) + elif gap.days not in (-1, 0): + result += " del {d}{m}".format( + d=dt.day, + m=( + "" if now.year == dt.year and now.month == dt.month + else " {m}{y}".format( + m=MONTH_NAMES_ITA[dt.month].title(), + y="" if now.year == dt.year + else " {}".format(dt.year) + ) + ) + ) + return result + + +HTML_SYMBOLS = MyOD() +HTML_SYMBOLS["&"] = "&" +HTML_SYMBOLS["<"] = "<" +HTML_SYMBOLS[">"] = ">" +HTML_SYMBOLS["\""] = """ +HTML_SYMBOLS["<b>"] = "" +HTML_SYMBOLS["</b>"] = "" +HTML_SYMBOLS["<i>"] = "" +HTML_SYMBOLS["</i>"] = "" +HTML_SYMBOLS["<code>"] = "" +HTML_SYMBOLS["</code>"] = "" +HTML_SYMBOLS["<pre>"] = "
"
+HTML_SYMBOLS["</pre>"] = "
" +HTML_SYMBOLS["<a href=""] = "" +HTML_SYMBOLS["</a>"] = "" + +HTML_TAGS = [ + None, "", "", + None, "", "", + None, "", "", + None, "
", "
", + None, "", "", + None +] + + +def remove_html_tags(text): + """Remove HTML tags from `text`.""" + for tag in HTML_TAGS: + if tag is None: + continue + text = text.replace(tag, '') + return text + + +def escape_html_chars(text): + """Escape HTML chars if not part of a tag.""" + for s, r in HTML_SYMBOLS.items(): + text = text.replace(s, r) + copy = text + expected_tag = None + while copy: + min_ = min( + ( + dict( + position=copy.find(tag) if tag in copy else len(copy), + tag=tag + ) + for tag in HTML_TAGS + if tag + ), + key=lambda x: x['position'], + default=0 + ) + if min_['position'] == len(copy): + break + if expected_tag and min_['tag'] != expected_tag: + return text.replace('<', '_').replace('>', '_') + expected_tag = HTML_TAGS[HTML_TAGS.index(min_['tag'])+1] + copy = extract(copy, min_['tag']) + return text + + +def accents_to_jolly(text, lower=True): + """Replace letters with Italian accents with SQL jolly character.""" + to_be_replaced = ('à', 'è', 'é', 'ì', 'ò', 'ù') + if lower: + text = text.lower() + else: + to_be_replaced += tuple(s.upper() for s in to_be_replaced) + for s in to_be_replaced: + text = text.replace(s, '_') + return text.replace("'", "''") + + +def get_secure_key(allowed_chars=None, length=6): + """Get a randomly-generate secure key. + + You can specify a set of `allowed_chars` and a `length`. + """ + if allowed_chars is None: + allowed_chars = string.ascii_uppercase + string.digits + return ''.join( + random.SystemRandom().choice( + allowed_chars + ) + for _ in range(length) + ) + + +def round_to_minute(datetime_): + """Round `datetime_` to closest minute.""" + return ( + datetime_ + datetime.timedelta(seconds=30) + ).replace(second=0, microsecond=0) + + +def get_line_by_content(text, key): + """Get line of `text` containing `key`.""" + for line in text.split('\n'): + if key in line: + return line + return + + +def str_to_int(string): + """Cast str to int, ignoring non-numeric characters.""" + string = ''.join( + char + for char in string + if char.isnumeric() + ) + if len(string) == 0: + string = '0' + return int(string) + + +def starting_with_or_similar_to(a, b): + """Return similarity between two strings. + + Least similar equals 0, most similar evaluates 1. + If similarity is less than 0.75, return 1 if one string starts with + the other and return 0.5 if one string is contained in the other. + """ + a = a.lower() + b = b.lower() + similarity = SequenceMatcher(None, a, b).ratio() + if similarity < 0.75: + if b.startswith(a) or a.startswith(b): + return 1 + if b in a or a in b: + return 0.5 + return similarity + + +def pick_most_similar_from_list(list_, item): + """Return element from `list_` which is most similar to `item`. + + Similarity is evaluated using `starting_with_or_similar_to`. + """ + return max( + list_, + key=lambda element: starting_with_or_similar_to( + item, + element + ) + )