diff --git a/davtelepot/__init__.py b/davtelepot/__init__.py index 41489b8..e9bc80a 100644 --- a/davtelepot/__init__.py +++ b/davtelepot/__init__.py @@ -7,7 +7,7 @@ __author__ = "Davide Testa" __email__ = "davte@libero.it" __credits__ = "Marco Origlia" __license__ = "GNU General Public License v3.0" -__version__ = "1.3.6" +__version__ = "1.4.0" __maintainer__ = "Davide Testa" __contact__ = "t.me/davte" diff --git a/davtelepot/custombot.py b/davtelepot/custombot.py index fe75439..02a3c9b 100644 --- a/davtelepot/custombot.py +++ b/davtelepot/custombot.py @@ -24,9 +24,9 @@ import os # Third party modules import dataset from davteutil.utilities import ( - Gettable, escape_html_chars, get_cleaned_text, line_drawing_unordered_list, - make_lines_of_buttons, markdown_check, MyOD, pick_most_similar_from_list, - remove_html_tags, sleep_until + Gettable, escape_html_chars, get_cleaned_text, + line_drawing_unordered_list, make_lines_of_buttons, markdown_check, MyOD, + pick_most_similar_from_list, remove_html_tags, sleep_until ) import telepot import telepot.aio @@ -159,10 +159,12 @@ class Bot(telepot.aio.Bot, Gettable): 'photo': self.handle_photo_message } if db_name: - self.db_url = 'sqlite:///{name}{ext}'.format( + self._db_url = 'sqlite:///{name}{ext}'.format( name=db_name, ext='.db' if not db_name.endswith('.db') else '' ) + else: + self._db_url = None self._unauthorized_message = None self.authorization_function = lambda update, authorization_level: True self.get_chat_id = lambda update: ( @@ -208,6 +210,11 @@ class Bot(telepot.aio.Bot, Gettable): """custombot.py file path.""" return self.__class__._path + @property + def db_url(self): + """Return complete path to database.""" + return self._db_url + @property def db(self): """Connect to bot's database. @@ -1837,6 +1844,8 @@ class Bot(telepot.aio.Bot, Gettable): exit_code = await selected_bot['bot']._run_manually() if exit_code == 0: break + elif exit_code == 65: + selected_bot = None return @classmethod @@ -1874,13 +1883,182 @@ class Bot(telepot.aio.Bot, Gettable): async def _run_manually(self): user_input = ' choose_addressee' + selected_user = None + users = [] while user_input: - try: - user_input = input( - "Choose an addressee." - "\n\t\t" + if user_input == ' choose_addressee': + try: + user_input = input( + "Choose an addressee." + "\n\t\t" + ) + except KeyboardInterrupt: + logging.error("Keyboard interrupt.") + return 0 # Stop running + if ( + selected_user is None + and user_input.strip('-').isnumeric() + ): + user_input = int(user_input) + users = list( + filter( + lambda user: user['telegram_id'] == user_input, + users + ) ) - except KeyboardInterrupt: - logging.error("Keyboard interrupt.") - break - logging.info(user_input) + if len(users) == 0: + users = [ + dict( + telegram_id=user_input, + name='Unknown user' + ) + ] + elif ( + selected_user is None + and self.db_url is not None + ): + with self.db as db: + if 'users' not in db.tables: + db['users'].insert( + dict( + telegram_id=0, + privileges=100, + username='username', + first_name='first_name', + last_name='last_name' + ) + ) + if 'contacts' not in db.tables: + db['contacts'].insert( + dict( + telegram_id=0, + name='ZZZXXXAAA', + ) + ) + users = list( + db.query( + """SELECT telegram_id, MAX(name) name + FROM ( + SELECT telegram_id, + first_name || ' ' || last_name || + ' (' ||username || ')' AS name + FROM users + WHERE first_name || last_name || username + LIKE '%{u}%' + UNION ALL + SELECT telegram_id, name + FROM contacts + WHERE name LIKE '%{u}%' + ) + GROUP BY telegram_id + """.format( + u=user_input + ) + ) + ) + if len(users) == 0: + logging.info("Sorry, no user matches your query.") + user_input = ' choose_addressee' + elif len(users) > 1: + user_input = input( + "Please select one of the following users:\n" + "\n" + "{users}\n" + "\n" + "Paste their telegram_id\n" + "\t\t".format( + users=line_drawing_unordered_list( + sorted( + map( + lambda user: ( + "{u[telegram_id]} - {u[name]}" + ).format( + u=user + ), + users + ) + ) + ) + ) + ) + elif len(users) == 1: + selected_user = users[0] + while selected_user is not None: + sent = None + text = input( + "What would you like to send " + "{u[name]} ({u[telegram_id]})?\n" + "\t\t\t".format( + u=selected_user + ) + ) + if len(text) == 0: + selected_user = None + user_input = ' choose_addressee' + elif text.lower() == 'photo': + caption = input( + 'Write a caption (you can leave it blank)\n' + '\t\t\t' + ) + try: + with io.BytesIO() as buffered_picture: + with open( + '{path}/sendme.png'.format( + path=self.path + ), + 'rb' # Read bytes + ) as photo_file: + buffered_picture.write( + photo_file.read() + ) + photo = buffered_picture.getvalue() + sent = await self.send_photo( + chat_id=selected_user['telegram_id'], + photo=photo, + caption=caption, + parse_mode='HTML' + ) + except Exception as e: + logging.error(e) + else: + try: + sent = await self.send_message( + chat_id=selected_user['telegram_id'], + text=text, + parse_mode='HTML' + ) + except Exception as e: + logging.error(e) + if ( + sent is not None + and not isinstance(sent, Exception) + ): + logging.info( + '\n' + 'Sent message:\n' + '{s}\n' + '\n'.format( + s=sent + ) + ) + while ( + self.db_url + and selected_user['name'] == 'Unknown user' + ): + new_name = input( + "Please enter a nickname for this user.\n" + "Next time you may retrieve their telegram_id " + "by passing this nickname (or a part of it).\n" + "\t\t" + ) + if len(new_name): + selected_user['name'] = new_name + with self.db as db: + db['contacts'].upsert( + selected_user, + ['telegram_id'], + ensure=True + ) + else: + logging.info("Invalid name, please try again.") + return 65 # Keep running, making user select another bot