diff --git a/davtelepot/__init__.py b/davtelepot/__init__.py index f2071a0..6c1a40d 100644 --- a/davtelepot/__init__.py +++ b/davtelepot/__init__.py @@ -11,7 +11,7 @@ __author__ = "Davide Testa" __email__ = "davide@davte.it" __credits__ = ["Marco Origlia", "Nick Lee @Nickoala"] __license__ = "GNU General Public License v3.0" -__version__ = "2.8.13" +__version__ = "2.9.1" __maintainer__ = "Davide Testa" __contact__ = "t.me/davte" diff --git a/davtelepot/__main__.py b/davtelepot/__main__.py new file mode 100644 index 0000000..1608a34 --- /dev/null +++ b/davtelepot/__main__.py @@ -0,0 +1,5 @@ +from davtelepot.cli import run_from_command_line + + +if __name__ == '__main__': + run_from_command_line() diff --git a/davtelepot/api.py b/davtelepot/api.py index 02902de..f17f5a5 100644 --- a/davtelepot/api.py +++ b/davtelepot/api.py @@ -10,6 +10,7 @@ import datetime import io import json import logging +import os.path from typing import Dict, Union, List, IO @@ -270,6 +271,79 @@ class InlineQueryResult(dict): self[key] = value +class MaskPosition(dict): + """This object describes the position on faces where a mask should be placed by default.""" + + def __init__(self, point: str, x_shift: float, y_shift: float, scale: float): + """This object describes the position on faces where a mask should be placed by default. + + @param point: The part of the face relative to which the mask should + be placed. One of “forehead”, “eyes”, “mouth”, or “chin”. + @param x_shift: Shift by X-axis measured in widths of the mask scaled + to the face size, from left to right. For example, choosing -1.0 + will place mask just to the left of the default mask position. + @param y_shift: Shift by Y-axis measured in heights of the mask scaled + to the face size, from top to bottom. For example, 1.0 will place + the mask just below the default mask position. + @param scale: Mask scaling coefficient. + For example, 2.0 means double size. + """ + super().__init__(self) + self['point'] = point + self['x_shift'] = x_shift + self['y_shift'] = y_shift + self['scale'] = scale + + +class InputSticker(dict): + """This object describes a sticker to be added to a sticker set.""" + + def __init__(self, sticker: Union[str, dict, IO], emoji_list: List[str], + mask_position: Union['MaskPosition', None] = None, + keywords: Union[List[str], None] = None): + """This object describes a sticker to be added to a sticker set. + + @param sticker: The added sticker. Pass a file_id as a String to send + a file that already exists on the Telegram servers, + pass an HTTP URL as a String for Telegram to get a file from the + Internet, upload a new one using multipart/form-data, + or pass “attach://” to upload a new one using + multipart/form-data under name. + Animated and video stickers can't be uploaded via HTTP URL. + More information on Sending Files: https://core.telegram.org/bots/api#sending-files + @param emoji_list: List of 1-20 emoji associated with the sticker + @param mask_position: Optional. Position where the mask should be + placed on faces. For “mask” stickers only. + @param keywords: Optional. List of 0-20 search keywords for the sticker + with total length of up to 64 characters. + For “regular” and “custom_emoji” stickers only. + """ + super().__init__(self) + self['sticker'] = sticker + self['emoji_list'] = emoji_list + self['mask_position'] = mask_position + self['keywords'] = keywords + + +class InlineQueryResultsButton(dict): + """Button to be shown above inline query results.""" + + def __init__(self, + text: str = None, + web_app: 'WebAppInfo' = None, + start_parameter: str = None): + super().__init__(self) + if sum(1 for e in (text, web_app, start_parameter) if e) != 1: + logging.error("You must provide exactly one parameter (`text` " + "or `web_app` or `start_parameter`).") + return + self['text'] = text + self['web_app'] = web_app + self['start_parameter'] = start_parameter + return + + + # This class needs to mirror Telegram API, so camelCase method are needed # noinspection PyPep8Naming class TelegramBot: @@ -389,7 +463,7 @@ class TelegramBot: """ if exclude is None: exclude = [] - exclude.append('self') + exclude += ['self', 'kwargs'] # quote_fields=False, otherwise some file names cause troubles data = aiohttp.FormData(quote_fields=False) for key, value in parameters.items(): @@ -402,8 +476,12 @@ class TelegramBot: @staticmethod def prepare_file_object(file: Union[str, IO, dict, None] - ) -> Union[Dict[str, IO], None]: - if type(file) is str: + ) -> Union[str, Dict[str, IO], None]: + """If `file` is a valid file path, return a dict for multipart/form-data. + + Other valid file identifiers are URLs and Telegram `file_id`s. + """ + if type(file) is str and os.path.isfile(file): try: file = open(file, 'r') except FileNotFoundError as e: @@ -443,6 +521,20 @@ class TelegramBot: """Wait `flood_wait` seconds before next request.""" self._flood_wait = flood_wait + def make_input_sticker(self, + sticker: Union[dict, str, IO], + emoji_list: Union[List[str], str], + mask_position: Union[MaskPosition, None] = None, + keywords: Union[List[str], None] = None) -> InputSticker: + if isinstance(emoji_list, str): + emoji_list = [c for c in emoji_list] + if isinstance(keywords, str): + keywords = [w for w in keywords] + if isinstance(sticker, str) and os.path.isfile(sticker): + sticker = self.prepare_file_object(sticker) + return InputSticker(sticker=sticker, emoji_list=emoji_list, + mask_position=mask_position, keywords=keywords) + async def prevent_flooding(self, chat_id): """Await until request may be sent safely. @@ -702,24 +794,30 @@ class TelegramBot: duration: int = None, performer: str = None, title: str = None, - thumb=None, + thumbnail=None, disable_notification: bool = None, reply_to_message_id: int = None, allow_sending_without_reply: bool = None, message_thread_id: int = None, protect_content: bool = None, - reply_markup=None): + reply_markup=None, + **kwargs): """Send an audio file from file_id, HTTP url or file. See https://core.telegram.org/bots/api#sendaudio for details. """ + if 'thumb' in kwargs: + thumbnail = kwargs['thumb'] + logging.error("DEPRECATION WARNING: `thumb` parameter of function" + "`sendAudio` has been deprecated since Bot API 6.6. " + "Use `thumbnail` instead.") return await self.api_request( 'sendAudio', parameters=locals() ) async def sendDocument(self, chat_id: Union[int, str], document, - thumb=None, + thumbnail=None, caption: str = None, parse_mode: str = None, caption_entities: List[dict] = None, @@ -729,11 +827,17 @@ class TelegramBot: allow_sending_without_reply: bool = None, message_thread_id: int = None, protect_content: bool = None, - reply_markup=None): + reply_markup=None, + **kwargs): """Send a document from file_id, HTTP url or file. See https://core.telegram.org/bots/api#senddocument for details. """ + if 'thumb' in kwargs: + thumbnail = kwargs['thumb'] + logging.error("DEPRECATION WARNING: `thumb` parameter of function" + "`sendDocument` has been deprecated since Bot API 6.6. " + "Use `thumbnail` instead.") return await self.api_request( 'sendDocument', parameters=locals() @@ -743,7 +847,7 @@ class TelegramBot: duration: int = None, width: int = None, height: int = None, - thumb=None, + thumbnail=None, caption: str = None, parse_mode: str = None, caption_entities: List[dict] = None, @@ -754,11 +858,17 @@ class TelegramBot: message_thread_id: int = None, protect_content: bool = None, has_spoiler: bool = None, - reply_markup=None): + reply_markup=None, + **kwargs): """Send a video from file_id, HTTP url or file. See https://core.telegram.org/bots/api#sendvideo for details. """ + if 'thumb' in kwargs: + thumbnail = kwargs['thumb'] + logging.error("DEPRECATION WARNING: `thumb` parameter of function" + "`sendVideo` has been deprecated since Bot API 6.6. " + "Use `thumbnail` instead.") return await self.api_request( 'sendVideo', parameters=locals() @@ -768,7 +878,7 @@ class TelegramBot: duration: int = None, width: int = None, height: int = None, - thumb=None, + thumbnail=None, caption: str = None, parse_mode: str = None, caption_entities: List[dict] = None, @@ -778,11 +888,17 @@ class TelegramBot: message_thread_id: int = None, protect_content: bool = None, has_spoiler: bool = None, - reply_markup=None): + reply_markup=None, + **kwargs): """Send animation files (GIF or H.264/MPEG-4 AVC video without sound). See https://core.telegram.org/bots/api#sendanimation for details. """ + if 'thumb' in kwargs: + thumbnail = kwargs['thumb'] + logging.error("DEPRECATION WARNING: `thumb` parameter of function" + "`sendAnimation` has been deprecated since Bot API 6.6. " + "Use `thumbnail` instead.") return await self.api_request( 'sendAnimation', parameters=locals() @@ -812,17 +928,23 @@ class TelegramBot: async def sendVideoNote(self, chat_id: Union[int, str], video_note, duration: int = None, length: int = None, - thumb=None, + thumbnail=None, disable_notification: bool = None, reply_to_message_id: int = None, allow_sending_without_reply: bool = None, message_thread_id: int = None, protect_content: bool = None, - reply_markup=None): + reply_markup=None, + **kwargs): """Send a rounded square mp4 video message of up to 1 minute long. See https://core.telegram.org/bots/api#sendvideonote for details. """ + if 'thumb' in kwargs: + thumbnail = kwargs['thumb'] + logging.error("DEPRECATION WARNING: `thumb` parameter of function" + "`sendVideoNote` has been deprecated since Bot API 6.6. " + "Use `thumbnail` instead.") return await self.api_request( 'sendVideoNote', parameters=locals() @@ -1466,9 +1588,12 @@ class TelegramBot: allow_sending_without_reply: bool = None, message_thread_id: int = None, protect_content: bool = None, + emoji: str = None, reply_markup=None): """Send `.webp` stickers. + `sticker` must be a file path, a URL, a file handle or a dict + {"file": io_file_handle}, to allow multipart/form-data encoding. On success, the sent Message is returned. See https://core.telegram.org/bots/api#sendsticker for details. """ @@ -1495,29 +1620,42 @@ class TelegramBot: parameters=locals() ) - async def uploadStickerFile(self, user_id, png_sticker): - """Upload a .png file as a sticker. + async def uploadStickerFile(self, user_id: int, sticker: Union[str, dict, IO], + sticker_format: str, **kwargs): + """Upload an image file for later use in sticker packs. - Use it later via `createNewStickerSet` and `addStickerToSet` methods - (can be used multiple times). - Return the uploaded File on success. - `png_sticker` must be a *.png image up to 512 kilobytes in size, - dimensions must not exceed 512px, and either width or height must - be exactly 512px. + Use this method to upload a file with a sticker for later use in the + createNewStickerSet and addStickerToSet methods + (the file can be used multiple times). + `sticker` must be a file path, a file handle or a dict + {"file": io_file_handle}, to allow multipart/form-data encoding. + Returns the uploaded File on success. See https://core.telegram.org/bots/api#uploadstickerfile for details. """ - return await self.api_request( + if 'png_sticker' in kwargs: + sticker = kwargs['png_sticker'] + logging.error("DEPRECATION WARNING: `png_sticker` parameter of function" + "`uploadStickerFile` has been deprecated since Bot API 6.6. " + "Use `sticker` instead.") + if sticker_format not in ("static", "animated", "video"): + logging.error(f"Unknown sticker format `{sticker_format}`.") + sticker = self.prepare_file_object(sticker) + if sticker is None: + logging.error("Invalid sticker provided!") + return + result = await self.api_request( 'uploadStickerFile', parameters=locals() ) + if type(sticker) is dict: # Close sticker file, if it was open + sticker['file'].close() + return result async def createNewStickerSet(self, user_id: int, name: str, title: str, - emojis: str, - png_sticker: Union[str, dict, IO] = None, - tgs_sticker: Union[str, dict, IO] = None, - webm_sticker: Union[str, dict, IO] = None, + stickers: List['InputSticker'], + sticker_format: str = 'static', sticker_type: str = 'regular', - mask_position: dict = None, + needs_repainting: bool = False, **kwargs): """Create new sticker set owned by a user. @@ -1525,58 +1663,72 @@ class TelegramBot: Returns True on success. See https://core.telegram.org/bots/api#createnewstickerset for details. """ + if stickers is None: + stickers = [] if 'contains_masks' in kwargs: logging.error("Parameter `contains_masks` of method " "`createNewStickerSet` has been deprecated. " "Use `sticker_type = 'mask'` instead.") sticker_type = 'mask' if kwargs['contains_masks'] else 'regular' - if sticker_type not in ('regular', 'mask'): - raise TypeError - png_sticker = self.prepare_file_object(png_sticker) - tgs_sticker = self.prepare_file_object(tgs_sticker) - webm_sticker = self.prepare_file_object(webm_sticker) - if png_sticker is None and tgs_sticker is None and webm_sticker is None: - logging.error("Invalid sticker provided!") - return + for old_sticker_format in ('png_sticker', 'tgs_sticker', 'webm_sticker'): + if old_sticker_format in kwargs: + if 'emojis' not in kwargs: + logging.error(f"No `emojis` provided together with " + f"`{old_sticker_format}`. To create new " + f"sticker set with some stickers in it, use " + f"the new `stickers` parameter.") + return + logging.error(f"Parameter `{old_sticker_format}` of method " + "`createNewStickerSet` has been deprecated since" + "Bot API 6.6. " + "Use `stickers` instead.") + stickers.append( + self.make_input_sticker( + sticker=kwargs[old_sticker_format], + emoji_list=kwargs['emojis'] + ) + ) + if sticker_type not in ('regular', 'mask', 'custom_emoji'): + raise TypeError(f"Unknown sticker type `{sticker_type}`.") result = await self.api_request( 'createNewStickerSet', - parameters=locals() + parameters=locals(), + exclude=['old_sticker_format'] ) - if type(png_sticker) is dict: # Close png_sticker file, if it was open - png_sticker['file'].close() - if type(tgs_sticker) is dict: # Close tgs_sticker file, if it was open - tgs_sticker['file'].close() - if type(webm_sticker) is dict: # Close webm_sticker file, if it was open - webm_sticker['file'].close() return result async def addStickerToSet(self, user_id: int, name: str, - emojis: str, - png_sticker: Union[str, dict, IO] = None, - tgs_sticker: Union[str, dict, IO] = None, - webm_sticker: Union[str, dict, IO] = None, - mask_position: dict = None): + sticker: InputSticker = None, + **kwargs): """Add a new sticker to a set created by the bot. Returns True on success. See https://core.telegram.org/bots/api#addstickertoset for details. """ - png_sticker = self.prepare_file_object(png_sticker) - tgs_sticker = self.prepare_file_object(tgs_sticker) - webm_sticker = self.prepare_file_object(webm_sticker) - if png_sticker is None and tgs_sticker is None and webm_sticker is None: - logging.error("Invalid sticker provided!") + for old_sticker_format in ('png_sticker', 'tgs_sticker', 'webm_sticker'): + if old_sticker_format in kwargs: + if 'emojis' not in kwargs: + logging.error(f"No `emojis` provided together with " + f"`{old_sticker_format}`.") + return + logging.error(f"Parameter `{old_sticker_format}` of method " + "`addStickerToSet` has been deprecated since" + "Bot API 6.6. " + "Use `sticker` instead.") + sticker = self.make_input_sticker( + sticker=kwargs[old_sticker_format], + emoji_list=kwargs['emojis'], + mask_position=kwargs['mask_position'] if 'mask_position' in kwargs else None + ) + if sticker is None: + logging.error("Must provide a sticker of type `InputSticker` to " + "`addStickerToSet` method.") return result = await self.api_request( 'addStickerToSet', - parameters=locals() + parameters=locals(), + exclude=['old_sticker_format'] ) - if type(png_sticker) is dict: # Close png_sticker file, if it was open - png_sticker['file'].close() - if type(tgs_sticker) is dict: # Close tgs_sticker file, if it was open - tgs_sticker['file'].close() - if type(webm_sticker) is dict: # Close webm_sticker file, if it was open - webm_sticker['file'].close() return result async def setStickerPositionInSet(self, sticker, position): @@ -1608,14 +1760,18 @@ class TelegramBot: cache_time=None, is_personal=None, next_offset=None, - switch_pm_text=None, - switch_pm_parameter=None): + button: Union['InlineQueryResultsButton', None] = None, + **kwargs): """Send answers to an inline query. On success, True is returned. No more than 50 results per query are allowed. See https://core.telegram.org/bots/api#answerinlinequery for details. """ + if 'switch_pm_text' in kwargs: + button = InlineQueryResultsButton(text=kwargs['switch_pm_text']) + if 'switch_pm_parameter' in kwargs: + button = InlineQueryResultsButton(start_parameter=kwargs['switch_pm_parameter']) return await self.api_request( 'answerInlineQuery', parameters=locals() @@ -2358,3 +2514,153 @@ class TelegramBot: 'unhideGeneralForumTopic', parameters=locals() ) + + async def setMyName(self, name: str, language_code: str): + """Change the bot's name. + + Returns True on success. + See https://core.telegram.org/bots/api#setmyname for details. + """ + return await self.api_request( + 'setMyName', + parameters=locals() + ) + + async def getMyName(self, language_code: str): + """Get the current bot name for the given user language. + + Returns BotName on success. + See https://core.telegram.org/bots/api#getmyname for details. + """ + return await self.api_request( + 'getMyName', + parameters=locals() + ) + + async def setMyDescription(self, description: str, language_code: str): + """Change the bot's description, which is shown in the chat with the bot if + the chat is empty. + + Returns True on success. + See https://core.telegram.org/bots/api#setmydescription for details. + """ + return await self.api_request( + 'setMyDescription', + parameters=locals() + ) + + async def getMyDescription(self, language_code: str): + """Get the current bot description for the given user language. + + Returns BotDescription on success. + See https://core.telegram.org/bots/api#getmydescription for details. + """ + return await self.api_request( + 'getMyDescription', + parameters=locals() + ) + + async def setMyShortDescription(self, short_description: str, language_code: str): + """Change the bot's short description, which is shown on the bot's profile + page and is sent together with the link when users share the bot. + + Returns True on success. + See https://core.telegram.org/bots/api#setmyshortdescription for details. + """ + return await self.api_request( + 'setMyShortDescription', + parameters=locals() + ) + + async def getMyShortDescription(self, language_code: str): + """Get the current bot short description for the given user language. + + Returns BotShortDescription on success. + See https://core.telegram.org/bots/api#getmyshortdescription for details. + """ + return await self.api_request( + 'getMyShortDescription', + parameters=locals() + ) + + async def setStickerEmojiList(self, sticker: str, emoji_list: List[str]): + """Change the list of emoji assigned to a regular or custom emoji sticker. + + The sticker must belong to a sticker set created by the bot. + Returns True on success. + See https://core.telegram.org/bots/api#setstickeremojilist for details. + """ + return await self.api_request( + 'setStickerEmojiList', + parameters=locals() + ) + + async def setStickerKeywords(self, sticker: str, keywords: List[str]): + """Change search keywords assigned to a regular or custom emoji sticker. + + The sticker must belong to a sticker set created by the bot. + Returns True on success. + See https://core.telegram.org/bots/api#setstickerkeywords for details. + """ + return await self.api_request( + 'setStickerKeywords', + parameters=locals() + ) + + async def setStickerMaskPosition(self, sticker: str, mask_position: 'MaskPosition'): + """Change the mask position of a mask sticker. + + The sticker must belong to a sticker set that was created by the bot. + Returns True on success. + See https://core.telegram.org/bots/api#setstickermaskposition for details. + """ + return await self.api_request( + 'setStickerMaskPosition', + parameters=locals() + ) + + async def setStickerSetTitle(self, name: str, title: str): + """Set the title of a created sticker set. + + Returns True on success. + See https://core.telegram.org/bots/api#setstickersettitle for details. + """ + return await self.api_request( + 'setStickerSetTitle', + parameters=locals() + ) + + async def setStickerSetThumbnail(self, name: str, user_id: int, thumbnail: 'InputFile or String'): + """Set the thumbnail of a regular or mask sticker set. + + The format of the thumbnail file must match the format of the stickers + in the set. + Returns True on success. + See https://core.telegram.org/bots/api#setstickersetthumbnail for details. + """ + return await self.api_request( + 'setStickerSetThumbnail', + parameters=locals() + ) + + async def setCustomEmojiStickerSetThumbnail(self, name: str, custom_emoji_id: str): + """Set the thumbnail of a custom emoji sticker set. + + Returns True on success. + See https://core.telegram.org/bots/api#setcustomemojistickersetthumbnail for details. + """ + return await self.api_request( + 'setCustomEmojiStickerSetThumbnail', + parameters=locals() + ) + + async def deleteStickerSet(self, name: str): + """Delete a sticker set that was created by the bot. + + Returns True on success. + See https://core.telegram.org/bots/api#deletestickerset for details. + """ + return await self.api_request( + 'deleteStickerSet', + parameters=locals() + ) diff --git a/davtelepot/bot.py b/davtelepot/bot.py index fa4fbed..e3b4234 100644 --- a/davtelepot/bot.py +++ b/davtelepot/bot.py @@ -99,7 +99,9 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): ) ] _log_file_name = None + _log_file_path = None _errors_file_name = None + _errors_file_path = None _documents_max_dimension = 50 * 1000 * 1000 # 50 MB def __init__( @@ -237,7 +239,9 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): self.default_reply_keyboard_elements = [] self.recent_users = OrderedDict() self._log_file_name = None + self._log_file_path = None self._errors_file_name = None + self._errors_file_path = None self.placeholder_requests = dict() self.shared_data = dict() self.Role = None @@ -321,10 +325,17 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): @property def log_file_path(self): - """Return log file path basing on self.path and `_log_file_name`. + """Return log file path. + If an instance file path is set, return it. + If not and a class file path is set, return that. + Otherwise, generate a file path basing on `self.path` and `_log_file_name` Fallback to class file if set, otherwise return None. """ + if self._log_file_path: + return self._log_file_path + if self.__class__._log_file_path: + return self.__class__._log_file_path if self.log_file_name: return f"{self.path}/data/{self.log_file_name}" @@ -337,6 +348,15 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): """Set class log file name.""" cls._log_file_name = file_name + def set_log_file_path(self, file_path): + """Set log file path.""" + self._log_file_path = file_path + + @classmethod + def set_class_log_file_path(cls, file_path): + """Set class log file path.""" + cls._log_file_path = file_path + @property def errors_file_name(self): """Return errors file name. @@ -347,10 +367,17 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): @property def errors_file_path(self): - """Return errors file path basing on `self.path` and `_errors_file_name`. + """Return errors file path. + If an instance file path is set, return it. + If not and a class file path is set, return that. + Otherwise, generate a file path basing on `self.path` and `_errors_file_name` Fallback to class file if set, otherwise return None. """ + if self.__class__._errors_file_path: + return self.__class__._errors_file_path + if self._errors_file_path: + return self._errors_file_path if self.errors_file_name: return f"{self.path}/data/{self.errors_file_name}" @@ -363,6 +390,15 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): """Set class errors file name.""" cls._errors_file_name = file_name + def set_errors_file_path(self, file_path): + """Set errors file path.""" + self._errors_file_path = file_path + + @classmethod + def set_class_errors_file_path(cls, file_path): + """Set class errors file path.""" + cls._errors_file_path = file_path + @classmethod def get(cls, token, *args, **kwargs): """Given a `token`, return class instance with that token. @@ -1658,7 +1694,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): duration: int = None, performer: str = None, title: str = None, - thumb=None, + thumbnail=None, disable_notification: bool = None, reply_to_message_id: int = None, allow_sending_without_reply: bool = None, @@ -1743,7 +1779,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): duration=duration, performer=performer, title=title, - thumb=thumb, + thumbnail=thumbnail, disable_notification=disable_notification, reply_to_message_id=reply_to_message_id, allow_sending_without_reply=allow_sending_without_reply, @@ -1902,7 +1938,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): return sent_update async def send_document(self, chat_id: Union[int, str] = None, document=None, - thumb=None, + thumbnail=None, caption: str = None, parse_mode: str = None, caption_entities: List[dict] = None, @@ -2021,7 +2057,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): sent_document = await self.send_document( chat_id=chat_id, document=buffered_file, - thumb=thumb, + thumbnail=thumbnail, caption=caption, parse_mode=parse_mode, disable_notification=disable_notification, @@ -2050,7 +2086,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): sent_update = await self.sendDocument( chat_id=chat_id, document=document, - thumb=thumb, + thumbnail=thumbnail, caption=caption, parse_mode=parse_mode, caption_entities=caption_entities, @@ -3111,8 +3147,9 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject): await session.close() async def send_one_message(self, *args, **kwargs): - await self.send_message(*args, **kwargs) + sent_message = await self.send_message(*args, **kwargs) await self.close_sessions() + return sent_message async def set_webhook(self, url=None, certificate=None, max_connections=None, allowed_updates=None): diff --git a/davtelepot/cli.py b/davtelepot/cli.py new file mode 100644 index 0000000..9e5ed3f --- /dev/null +++ b/davtelepot/cli.py @@ -0,0 +1,195 @@ +import argparse +import asyncio +import logging +import os.path +import sys +from typing import Any, Union + +import davtelepot.authorization as authorization +import davtelepot.administration_tools as administration_tools +import davtelepot.helper as helper +from davtelepot.bot import Bot +from davtelepot.utilities import get_cleaned_text, get_secure_key, get_user, json_read, json_write, \ + line_drawing_unordered_list + + +def join_path(*args): + return os.path.abspath(os.path.join(*args)) + +def dir_path(path): + if os.path.isdir(path) and os.access(path, os.W_OK): + return path + else: + raise argparse.ArgumentTypeError(f"`{path}` is not a valid path") + +def get_cli_arguments() -> dict[str, Any]: + default_path = join_path(os.path.dirname(__file__), 'data') + cli_parser = argparse.ArgumentParser( + description='Run a davtelepot-powered Telegram bot from command line.', + allow_abbrev=False, + ) + cli_parser.add_argument('-a', '--action', type=str, + default='run', + required=False, + help='Action to perform (currently supported: run).') + cli_parser.add_argument('-p', '--path', type=dir_path, + default=default_path, + required=False, + help='Folder to store secrets, data and log files.') + cli_parser.add_argument('-l', '--log_file', type=argparse.FileType('a'), + default=None, + required=False, + help='File path to store full log') + cli_parser.add_argument('-e', '--error_log_file', type=argparse.FileType('a'), + default=None, + required=False, + help='File path to store only error log') + cli_parser.add_argument('-t', '--token', type=str, + required=False, + help='Telegram bot token (you may get one from t.me/botfather)') + cli_parsed_arguments = vars(cli_parser.parse_args()) + for key in cli_parsed_arguments: + if key.endswith('_file') and cli_parsed_arguments[key]: + cli_parsed_arguments[key] = cli_parsed_arguments[key].name + for key, default in {'error_log_file': "davtelepot.errors", + 'log_file': "davtelepot.log"}.items(): + if cli_parsed_arguments[key] is None: + cli_parsed_arguments[key] = join_path(cli_parsed_arguments['path'], default) + return cli_parsed_arguments + +def set_loggers(log_file: str = 'davtelepot.log', + error_log_file: str = 'davtelepot.errors'): + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + log_formatter = logging.Formatter( + "%(asctime)s [%(module)-10s %(levelname)-8s] %(message)s", + style='%' + ) + + file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8") + file_handler.setFormatter(log_formatter) + file_handler.setLevel(logging.DEBUG) + root_logger.addHandler(file_handler) + + file_handler = logging.FileHandler(error_log_file, mode="a", encoding="utf-8") + file_handler.setFormatter(log_formatter) + file_handler.setLevel(logging.ERROR) + root_logger.addHandler(file_handler) + + console_handler = logging.StreamHandler() + console_handler.setFormatter(log_formatter) + console_handler.setLevel(logging.DEBUG) + root_logger.addHandler(console_handler) + + +async def elevate_to_admin(bot: Bot, update: dict, user_record: dict, + secret: str) -> Union[str, None]: + text = get_cleaned_text(update=update, bot=bot, + replace=['00elevate_', 'elevate ']) + if text == secret: + bot.db['users'].upsert(dict(id=user_record['id'], privileges=1), ['id']) + return "👑 You have been granted full powers! 👑" + else: + print(f"The secret entered (`{text}`) is wrong. Enter `{secret}` instead.") + + +def allow_elevation_to_admin(telegram_bot: Bot) -> None: + secret = get_secure_key(length=15) + @telegram_bot.additional_task('BEFORE') + async def print_secret(): + await telegram_bot.get_me() + logging.info(f"To get administration privileges, enter code {secret} " + f"or click here: https://t.me/{telegram_bot.name}?start=00elevate_{secret}") + @telegram_bot.command(command='/elevate', aliases=['00elevate_'], show_in_keyboard=False, + authorization_level='anybody') + async def _elevate_to_admin(bot, update, user_record): + return await elevate_to_admin(bot=bot, update=update, + user_record=user_record, + secret=secret) + return + + +def send_single_message(telegram_bot: Bot): + records = [] + text, last_text = '', '' + offset = 0 + max_shown = 3 + while True: + if text == '+' and len(records) > max_shown: + offset += 1 + elif offset > 0 and text == '-': + offset -= 1 + else: + offset = 0 + if text in ('+', '-'): + text = last_text + condition = (f"WHERE username LIKE '%{text}%' " + f"OR first_name LIKE '%{text}%' " + f"OR last_name LIKE '%{text}%' ") + records = list(telegram_bot.db.query("SELECT username, first_name, " + "last_name, telegram_id " + "FROM users " + f"{condition} " + f"LIMIT {max_shown+1} " + f"OFFSET {offset*max_shown} ")) + if len(records) == 1 and offset == 0: + break + last_text = text + print("=== Users ===", + line_drawing_unordered_list( + list(map(lambda x: get_user(x, False), + records[:max_shown])) + + (['...'] if len(records)>=max_shown else []) + ), + sep='\n') + text = input("Select a recipient: write part of their name.\t\t") + while True: + text = input(f"Write a message for {get_user(records[0], False)}\t\t") + if input("Should I send it? Y to send, anything else cancel\t\t").lower() == "y": + break + async def send_and_print_message(): + sent_message = await telegram_bot.send_one_message(chat_id=records[0]['telegram_id'], text=text) + print(sent_message) + asyncio.run(send_and_print_message()) + return + + +def run_from_command_line(): + arguments = get_cli_arguments() + stored_arguments_file = os.path.join(arguments['path'], + 'cli_args.json') + for key, value in json_read(file_=stored_arguments_file, + default={}).items(): + if key not in arguments or not arguments[key]: + arguments[key] = value + set_loggers(**{k: v + for k, v in arguments.items() + if k in ('log_file', 'error_log_file')}) + if 'error_log_file' in arguments: + Bot.set_class_errors_file_path(file_path=arguments['error_log_file']) + if 'log_file' in arguments: + Bot.set_class_log_file_path(file_path=arguments['log_file']) + if 'path' in arguments: + Bot.set_class_path(arguments['path']) + if 'token' in arguments and arguments['token']: + token = arguments['token'] + else: + token = input("Enter bot Token:\t\t") + arguments['token'] = token + json_write(arguments, stored_arguments_file) + bot = Bot(token=token, database_url=join_path(arguments['path'], 'bot.db')) + action = arguments['action'] if 'action' in arguments else 'run' + if action == 'run': + administration_tools.init(telegram_bot=bot) + authorization.init(telegram_bot=bot) + allow_elevation_to_admin(telegram_bot=bot) + helper.init(telegram_bot=bot) + exit_state = Bot.run(**{k: v + for k, v in arguments.items() + if k in ('local_host', 'port')}) + sys.exit(exit_state) + if action == 'send': + try: + send_single_message(telegram_bot=bot) + except KeyboardInterrupt: + print("\nExiting...")