Manual mode implemented

This commit is contained in:
Davte 2018-11-21 19:18:39 +01:00
parent 121e3c370c
commit accdd373d8
2 changed files with 191 additions and 13 deletions

View File

@ -7,7 +7,7 @@ __author__ = "Davide Testa"
__email__ = "davte@libero.it"
__credits__ = "Marco Origlia"
__license__ = "GNU General Public License v3.0"
__version__ = "1.3.6"
__version__ = "1.4.0"
__maintainer__ = "Davide Testa"
__contact__ = "t.me/davte"

View File

@ -24,9 +24,9 @@ import os
# Third party modules
import dataset
from davteutil.utilities import (
Gettable, escape_html_chars, get_cleaned_text, line_drawing_unordered_list,
make_lines_of_buttons, markdown_check, MyOD, pick_most_similar_from_list,
remove_html_tags, sleep_until
Gettable, escape_html_chars, get_cleaned_text,
line_drawing_unordered_list, make_lines_of_buttons, markdown_check, MyOD,
pick_most_similar_from_list, remove_html_tags, sleep_until
)
import telepot
import telepot.aio
@ -159,10 +159,12 @@ class Bot(telepot.aio.Bot, Gettable):
'photo': self.handle_photo_message
}
if db_name:
self.db_url = 'sqlite:///{name}{ext}'.format(
self._db_url = 'sqlite:///{name}{ext}'.format(
name=db_name,
ext='.db' if not db_name.endswith('.db') else ''
)
else:
self._db_url = None
self._unauthorized_message = None
self.authorization_function = lambda update, authorization_level: True
self.get_chat_id = lambda update: (
@ -208,6 +210,11 @@ class Bot(telepot.aio.Bot, Gettable):
"""custombot.py file path."""
return self.__class__._path
@property
def db_url(self):
"""Return complete path to database."""
return self._db_url
@property
def db(self):
"""Connect to bot's database.
@ -1837,6 +1844,8 @@ class Bot(telepot.aio.Bot, Gettable):
exit_code = await selected_bot['bot']._run_manually()
if exit_code == 0:
break
elif exit_code == 65:
selected_bot = None
return
@classmethod
@ -1874,13 +1883,182 @@ class Bot(telepot.aio.Bot, Gettable):
async def _run_manually(self):
user_input = ' choose_addressee'
selected_user = None
users = []
while user_input:
try:
user_input = input(
"Choose an addressee."
"\n\t\t"
if user_input == ' choose_addressee':
try:
user_input = input(
"Choose an addressee."
"\n\t\t"
)
except KeyboardInterrupt:
logging.error("Keyboard interrupt.")
return 0 # Stop running
if (
selected_user is None
and user_input.strip('-').isnumeric()
):
user_input = int(user_input)
users = list(
filter(
lambda user: user['telegram_id'] == user_input,
users
)
)
except KeyboardInterrupt:
logging.error("Keyboard interrupt.")
break
logging.info(user_input)
if len(users) == 0:
users = [
dict(
telegram_id=user_input,
name='Unknown user'
)
]
elif (
selected_user is None
and self.db_url is not None
):
with self.db as db:
if 'users' not in db.tables:
db['users'].insert(
dict(
telegram_id=0,
privileges=100,
username='username',
first_name='first_name',
last_name='last_name'
)
)
if 'contacts' not in db.tables:
db['contacts'].insert(
dict(
telegram_id=0,
name='ZZZXXXAAA',
)
)
users = list(
db.query(
"""SELECT telegram_id, MAX(name) name
FROM (
SELECT telegram_id,
first_name || ' ' || last_name ||
' (' ||username || ')' AS name
FROM users
WHERE first_name || last_name || username
LIKE '%{u}%'
UNION ALL
SELECT telegram_id, name
FROM contacts
WHERE name LIKE '%{u}%'
)
GROUP BY telegram_id
""".format(
u=user_input
)
)
)
if len(users) == 0:
logging.info("Sorry, no user matches your query.")
user_input = ' choose_addressee'
elif len(users) > 1:
user_input = input(
"Please select one of the following users:\n"
"\n"
"{users}\n"
"\n"
"Paste their telegram_id\n"
"\t\t".format(
users=line_drawing_unordered_list(
sorted(
map(
lambda user: (
"{u[telegram_id]} - {u[name]}"
).format(
u=user
),
users
)
)
)
)
)
elif len(users) == 1:
selected_user = users[0]
while selected_user is not None:
sent = None
text = input(
"What would you like to send "
"{u[name]} ({u[telegram_id]})?\n"
"\t\t\t".format(
u=selected_user
)
)
if len(text) == 0:
selected_user = None
user_input = ' choose_addressee'
elif text.lower() == 'photo':
caption = input(
'Write a caption (you can leave it blank)\n'
'\t\t\t'
)
try:
with io.BytesIO() as buffered_picture:
with open(
'{path}/sendme.png'.format(
path=self.path
),
'rb' # Read bytes
) as photo_file:
buffered_picture.write(
photo_file.read()
)
photo = buffered_picture.getvalue()
sent = await self.send_photo(
chat_id=selected_user['telegram_id'],
photo=photo,
caption=caption,
parse_mode='HTML'
)
except Exception as e:
logging.error(e)
else:
try:
sent = await self.send_message(
chat_id=selected_user['telegram_id'],
text=text,
parse_mode='HTML'
)
except Exception as e:
logging.error(e)
if (
sent is not None
and not isinstance(sent, Exception)
):
logging.info(
'\n'
'Sent message:\n'
'{s}\n'
'\n'.format(
s=sent
)
)
while (
self.db_url
and selected_user['name'] == 'Unknown user'
):
new_name = input(
"Please enter a nickname for this user.\n"
"Next time you may retrieve their telegram_id "
"by passing this nickname (or a part of it).\n"
"\t\t"
)
if len(new_name):
selected_user['name'] = new_name
with self.db as db:
db['contacts'].upsert(
selected_user,
['telegram_id'],
ensure=True
)
else:
logging.info("Invalid name, please try again.")
return 65 # Keep running, making user select another bot