Compliance with Telegram Bot API 6.6 and new command line interface to run bot or send a single message

This commit is contained in:
Davte 2023-04-29 20:38:33 +02:00
parent 0c3ed2070d
commit 3bd1a9b679
Signed by: Davte
GPG Key ID: 70336F92E6814706
5 changed files with 613 additions and 70 deletions

View File

@ -11,7 +11,7 @@ __author__ = "Davide Testa"
__email__ = "davide@davte.it" __email__ = "davide@davte.it"
__credits__ = ["Marco Origlia", "Nick Lee @Nickoala"] __credits__ = ["Marco Origlia", "Nick Lee @Nickoala"]
__license__ = "GNU General Public License v3.0" __license__ = "GNU General Public License v3.0"
__version__ = "2.8.13" __version__ = "2.9.1"
__maintainer__ = "Davide Testa" __maintainer__ = "Davide Testa"
__contact__ = "t.me/davte" __contact__ = "t.me/davte"

5
davtelepot/__main__.py Normal file
View File

@ -0,0 +1,5 @@
from davtelepot.cli import run_from_command_line
if __name__ == '__main__':
run_from_command_line()

View File

@ -10,6 +10,7 @@ import datetime
import io import io
import json import json
import logging import logging
import os.path
from typing import Dict, Union, List, IO from typing import Dict, Union, List, IO
@ -270,6 +271,79 @@ class InlineQueryResult(dict):
self[key] = value self[key] = value
class MaskPosition(dict):
"""This object describes the position on faces where a mask should be placed by default."""
def __init__(self, point: str, x_shift: float, y_shift: float, scale: float):
"""This object describes the position on faces where a mask should be placed by default.
@param point: The part of the face relative to which the mask should
be placed. One of forehead, eyes, mouth, or chin.
@param x_shift: Shift by X-axis measured in widths of the mask scaled
to the face size, from left to right. For example, choosing -1.0
will place mask just to the left of the default mask position.
@param y_shift: Shift by Y-axis measured in heights of the mask scaled
to the face size, from top to bottom. For example, 1.0 will place
the mask just below the default mask position.
@param scale: Mask scaling coefficient.
For example, 2.0 means double size.
"""
super().__init__(self)
self['point'] = point
self['x_shift'] = x_shift
self['y_shift'] = y_shift
self['scale'] = scale
class InputSticker(dict):
"""This object describes a sticker to be added to a sticker set."""
def __init__(self, sticker: Union[str, dict, IO], emoji_list: List[str],
mask_position: Union['MaskPosition', None] = None,
keywords: Union[List[str], None] = None):
"""This object describes a sticker to be added to a sticker set.
@param sticker: The added sticker. Pass a file_id as a String to send
a file that already exists on the Telegram servers,
pass an HTTP URL as a String for Telegram to get a file from the
Internet, upload a new one using multipart/form-data,
or pass attach://<file_attach_name> to upload a new one using
multipart/form-data under <file_attach_name> name.
Animated and video stickers can't be uploaded via HTTP URL.
More information on Sending Files: https://core.telegram.org/bots/api#sending-files
@param emoji_list: List of 1-20 emoji associated with the sticker
@param mask_position: Optional. Position where the mask should be
placed on faces. For mask stickers only.
@param keywords: Optional. List of 0-20 search keywords for the sticker
with total length of up to 64 characters.
For regular and custom_emoji stickers only.
"""
super().__init__(self)
self['sticker'] = sticker
self['emoji_list'] = emoji_list
self['mask_position'] = mask_position
self['keywords'] = keywords
class InlineQueryResultsButton(dict):
"""Button to be shown above inline query results."""
def __init__(self,
text: str = None,
web_app: 'WebAppInfo' = None,
start_parameter: str = None):
super().__init__(self)
if sum(1 for e in (text, web_app, start_parameter) if e) != 1:
logging.error("You must provide exactly one parameter (`text` "
"or `web_app` or `start_parameter`).")
return
self['text'] = text
self['web_app'] = web_app
self['start_parameter'] = start_parameter
return
# This class needs to mirror Telegram API, so camelCase method are needed # This class needs to mirror Telegram API, so camelCase method are needed
# noinspection PyPep8Naming # noinspection PyPep8Naming
class TelegramBot: class TelegramBot:
@ -389,7 +463,7 @@ class TelegramBot:
""" """
if exclude is None: if exclude is None:
exclude = [] exclude = []
exclude.append('self') exclude += ['self', 'kwargs']
# quote_fields=False, otherwise some file names cause troubles # quote_fields=False, otherwise some file names cause troubles
data = aiohttp.FormData(quote_fields=False) data = aiohttp.FormData(quote_fields=False)
for key, value in parameters.items(): for key, value in parameters.items():
@ -402,8 +476,12 @@ class TelegramBot:
@staticmethod @staticmethod
def prepare_file_object(file: Union[str, IO, dict, None] def prepare_file_object(file: Union[str, IO, dict, None]
) -> Union[Dict[str, IO], None]: ) -> Union[str, Dict[str, IO], None]:
if type(file) is str: """If `file` is a valid file path, return a dict for multipart/form-data.
Other valid file identifiers are URLs and Telegram `file_id`s.
"""
if type(file) is str and os.path.isfile(file):
try: try:
file = open(file, 'r') file = open(file, 'r')
except FileNotFoundError as e: except FileNotFoundError as e:
@ -443,6 +521,20 @@ class TelegramBot:
"""Wait `flood_wait` seconds before next request.""" """Wait `flood_wait` seconds before next request."""
self._flood_wait = flood_wait self._flood_wait = flood_wait
def make_input_sticker(self,
sticker: Union[dict, str, IO],
emoji_list: Union[List[str], str],
mask_position: Union[MaskPosition, None] = None,
keywords: Union[List[str], None] = None) -> InputSticker:
if isinstance(emoji_list, str):
emoji_list = [c for c in emoji_list]
if isinstance(keywords, str):
keywords = [w for w in keywords]
if isinstance(sticker, str) and os.path.isfile(sticker):
sticker = self.prepare_file_object(sticker)
return InputSticker(sticker=sticker, emoji_list=emoji_list,
mask_position=mask_position, keywords=keywords)
async def prevent_flooding(self, chat_id): async def prevent_flooding(self, chat_id):
"""Await until request may be sent safely. """Await until request may be sent safely.
@ -702,24 +794,30 @@ class TelegramBot:
duration: int = None, duration: int = None,
performer: str = None, performer: str = None,
title: str = None, title: str = None,
thumb=None, thumbnail=None,
disable_notification: bool = None, disable_notification: bool = None,
reply_to_message_id: int = None, reply_to_message_id: int = None,
allow_sending_without_reply: bool = None, allow_sending_without_reply: bool = None,
message_thread_id: int = None, message_thread_id: int = None,
protect_content: bool = None, protect_content: bool = None,
reply_markup=None): reply_markup=None,
**kwargs):
"""Send an audio file from file_id, HTTP url or file. """Send an audio file from file_id, HTTP url or file.
See https://core.telegram.org/bots/api#sendaudio for details. See https://core.telegram.org/bots/api#sendaudio for details.
""" """
if 'thumb' in kwargs:
thumbnail = kwargs['thumb']
logging.error("DEPRECATION WARNING: `thumb` parameter of function"
"`sendAudio` has been deprecated since Bot API 6.6. "
"Use `thumbnail` instead.")
return await self.api_request( return await self.api_request(
'sendAudio', 'sendAudio',
parameters=locals() parameters=locals()
) )
async def sendDocument(self, chat_id: Union[int, str], document, async def sendDocument(self, chat_id: Union[int, str], document,
thumb=None, thumbnail=None,
caption: str = None, caption: str = None,
parse_mode: str = None, parse_mode: str = None,
caption_entities: List[dict] = None, caption_entities: List[dict] = None,
@ -729,11 +827,17 @@ class TelegramBot:
allow_sending_without_reply: bool = None, allow_sending_without_reply: bool = None,
message_thread_id: int = None, message_thread_id: int = None,
protect_content: bool = None, protect_content: bool = None,
reply_markup=None): reply_markup=None,
**kwargs):
"""Send a document from file_id, HTTP url or file. """Send a document from file_id, HTTP url or file.
See https://core.telegram.org/bots/api#senddocument for details. See https://core.telegram.org/bots/api#senddocument for details.
""" """
if 'thumb' in kwargs:
thumbnail = kwargs['thumb']
logging.error("DEPRECATION WARNING: `thumb` parameter of function"
"`sendDocument` has been deprecated since Bot API 6.6. "
"Use `thumbnail` instead.")
return await self.api_request( return await self.api_request(
'sendDocument', 'sendDocument',
parameters=locals() parameters=locals()
@ -743,7 +847,7 @@ class TelegramBot:
duration: int = None, duration: int = None,
width: int = None, width: int = None,
height: int = None, height: int = None,
thumb=None, thumbnail=None,
caption: str = None, caption: str = None,
parse_mode: str = None, parse_mode: str = None,
caption_entities: List[dict] = None, caption_entities: List[dict] = None,
@ -754,11 +858,17 @@ class TelegramBot:
message_thread_id: int = None, message_thread_id: int = None,
protect_content: bool = None, protect_content: bool = None,
has_spoiler: bool = None, has_spoiler: bool = None,
reply_markup=None): reply_markup=None,
**kwargs):
"""Send a video from file_id, HTTP url or file. """Send a video from file_id, HTTP url or file.
See https://core.telegram.org/bots/api#sendvideo for details. See https://core.telegram.org/bots/api#sendvideo for details.
""" """
if 'thumb' in kwargs:
thumbnail = kwargs['thumb']
logging.error("DEPRECATION WARNING: `thumb` parameter of function"
"`sendVideo` has been deprecated since Bot API 6.6. "
"Use `thumbnail` instead.")
return await self.api_request( return await self.api_request(
'sendVideo', 'sendVideo',
parameters=locals() parameters=locals()
@ -768,7 +878,7 @@ class TelegramBot:
duration: int = None, duration: int = None,
width: int = None, width: int = None,
height: int = None, height: int = None,
thumb=None, thumbnail=None,
caption: str = None, caption: str = None,
parse_mode: str = None, parse_mode: str = None,
caption_entities: List[dict] = None, caption_entities: List[dict] = None,
@ -778,11 +888,17 @@ class TelegramBot:
message_thread_id: int = None, message_thread_id: int = None,
protect_content: bool = None, protect_content: bool = None,
has_spoiler: bool = None, has_spoiler: bool = None,
reply_markup=None): reply_markup=None,
**kwargs):
"""Send animation files (GIF or H.264/MPEG-4 AVC video without sound). """Send animation files (GIF or H.264/MPEG-4 AVC video without sound).
See https://core.telegram.org/bots/api#sendanimation for details. See https://core.telegram.org/bots/api#sendanimation for details.
""" """
if 'thumb' in kwargs:
thumbnail = kwargs['thumb']
logging.error("DEPRECATION WARNING: `thumb` parameter of function"
"`sendAnimation` has been deprecated since Bot API 6.6. "
"Use `thumbnail` instead.")
return await self.api_request( return await self.api_request(
'sendAnimation', 'sendAnimation',
parameters=locals() parameters=locals()
@ -812,17 +928,23 @@ class TelegramBot:
async def sendVideoNote(self, chat_id: Union[int, str], video_note, async def sendVideoNote(self, chat_id: Union[int, str], video_note,
duration: int = None, duration: int = None,
length: int = None, length: int = None,
thumb=None, thumbnail=None,
disable_notification: bool = None, disable_notification: bool = None,
reply_to_message_id: int = None, reply_to_message_id: int = None,
allow_sending_without_reply: bool = None, allow_sending_without_reply: bool = None,
message_thread_id: int = None, message_thread_id: int = None,
protect_content: bool = None, protect_content: bool = None,
reply_markup=None): reply_markup=None,
**kwargs):
"""Send a rounded square mp4 video message of up to 1 minute long. """Send a rounded square mp4 video message of up to 1 minute long.
See https://core.telegram.org/bots/api#sendvideonote for details. See https://core.telegram.org/bots/api#sendvideonote for details.
""" """
if 'thumb' in kwargs:
thumbnail = kwargs['thumb']
logging.error("DEPRECATION WARNING: `thumb` parameter of function"
"`sendVideoNote` has been deprecated since Bot API 6.6. "
"Use `thumbnail` instead.")
return await self.api_request( return await self.api_request(
'sendVideoNote', 'sendVideoNote',
parameters=locals() parameters=locals()
@ -1466,9 +1588,12 @@ class TelegramBot:
allow_sending_without_reply: bool = None, allow_sending_without_reply: bool = None,
message_thread_id: int = None, message_thread_id: int = None,
protect_content: bool = None, protect_content: bool = None,
emoji: str = None,
reply_markup=None): reply_markup=None):
"""Send `.webp` stickers. """Send `.webp` stickers.
`sticker` must be a file path, a URL, a file handle or a dict
{"file": io_file_handle}, to allow multipart/form-data encoding.
On success, the sent Message is returned. On success, the sent Message is returned.
See https://core.telegram.org/bots/api#sendsticker for details. See https://core.telegram.org/bots/api#sendsticker for details.
""" """
@ -1495,29 +1620,42 @@ class TelegramBot:
parameters=locals() parameters=locals()
) )
async def uploadStickerFile(self, user_id, png_sticker): async def uploadStickerFile(self, user_id: int, sticker: Union[str, dict, IO],
"""Upload a .png file as a sticker. sticker_format: str, **kwargs):
"""Upload an image file for later use in sticker packs.
Use it later via `createNewStickerSet` and `addStickerToSet` methods Use this method to upload a file with a sticker for later use in the
(can be used multiple times). createNewStickerSet and addStickerToSet methods
Return the uploaded File on success. (the file can be used multiple times).
`png_sticker` must be a *.png image up to 512 kilobytes in size, `sticker` must be a file path, a file handle or a dict
dimensions must not exceed 512px, and either width or height must {"file": io_file_handle}, to allow multipart/form-data encoding.
be exactly 512px. Returns the uploaded File on success.
See https://core.telegram.org/bots/api#uploadstickerfile for details. See https://core.telegram.org/bots/api#uploadstickerfile for details.
""" """
return await self.api_request( if 'png_sticker' in kwargs:
sticker = kwargs['png_sticker']
logging.error("DEPRECATION WARNING: `png_sticker` parameter of function"
"`uploadStickerFile` has been deprecated since Bot API 6.6. "
"Use `sticker` instead.")
if sticker_format not in ("static", "animated", "video"):
logging.error(f"Unknown sticker format `{sticker_format}`.")
sticker = self.prepare_file_object(sticker)
if sticker is None:
logging.error("Invalid sticker provided!")
return
result = await self.api_request(
'uploadStickerFile', 'uploadStickerFile',
parameters=locals() parameters=locals()
) )
if type(sticker) is dict: # Close sticker file, if it was open
sticker['file'].close()
return result
async def createNewStickerSet(self, user_id: int, name: str, title: str, async def createNewStickerSet(self, user_id: int, name: str, title: str,
emojis: str, stickers: List['InputSticker'],
png_sticker: Union[str, dict, IO] = None, sticker_format: str = 'static',
tgs_sticker: Union[str, dict, IO] = None,
webm_sticker: Union[str, dict, IO] = None,
sticker_type: str = 'regular', sticker_type: str = 'regular',
mask_position: dict = None, needs_repainting: bool = False,
**kwargs): **kwargs):
"""Create new sticker set owned by a user. """Create new sticker set owned by a user.
@ -1525,58 +1663,72 @@ class TelegramBot:
Returns True on success. Returns True on success.
See https://core.telegram.org/bots/api#createnewstickerset for details. See https://core.telegram.org/bots/api#createnewstickerset for details.
""" """
if stickers is None:
stickers = []
if 'contains_masks' in kwargs: if 'contains_masks' in kwargs:
logging.error("Parameter `contains_masks` of method " logging.error("Parameter `contains_masks` of method "
"`createNewStickerSet` has been deprecated. " "`createNewStickerSet` has been deprecated. "
"Use `sticker_type = 'mask'` instead.") "Use `sticker_type = 'mask'` instead.")
sticker_type = 'mask' if kwargs['contains_masks'] else 'regular' sticker_type = 'mask' if kwargs['contains_masks'] else 'regular'
if sticker_type not in ('regular', 'mask'): for old_sticker_format in ('png_sticker', 'tgs_sticker', 'webm_sticker'):
raise TypeError if old_sticker_format in kwargs:
png_sticker = self.prepare_file_object(png_sticker) if 'emojis' not in kwargs:
tgs_sticker = self.prepare_file_object(tgs_sticker) logging.error(f"No `emojis` provided together with "
webm_sticker = self.prepare_file_object(webm_sticker) f"`{old_sticker_format}`. To create new "
if png_sticker is None and tgs_sticker is None and webm_sticker is None: f"sticker set with some stickers in it, use "
logging.error("Invalid sticker provided!") f"the new `stickers` parameter.")
return return
logging.error(f"Parameter `{old_sticker_format}` of method "
"`createNewStickerSet` has been deprecated since"
"Bot API 6.6. "
"Use `stickers` instead.")
stickers.append(
self.make_input_sticker(
sticker=kwargs[old_sticker_format],
emoji_list=kwargs['emojis']
)
)
if sticker_type not in ('regular', 'mask', 'custom_emoji'):
raise TypeError(f"Unknown sticker type `{sticker_type}`.")
result = await self.api_request( result = await self.api_request(
'createNewStickerSet', 'createNewStickerSet',
parameters=locals() parameters=locals(),
exclude=['old_sticker_format']
) )
if type(png_sticker) is dict: # Close png_sticker file, if it was open
png_sticker['file'].close()
if type(tgs_sticker) is dict: # Close tgs_sticker file, if it was open
tgs_sticker['file'].close()
if type(webm_sticker) is dict: # Close webm_sticker file, if it was open
webm_sticker['file'].close()
return result return result
async def addStickerToSet(self, user_id: int, name: str, async def addStickerToSet(self, user_id: int, name: str,
emojis: str, sticker: InputSticker = None,
png_sticker: Union[str, dict, IO] = None, **kwargs):
tgs_sticker: Union[str, dict, IO] = None,
webm_sticker: Union[str, dict, IO] = None,
mask_position: dict = None):
"""Add a new sticker to a set created by the bot. """Add a new sticker to a set created by the bot.
Returns True on success. Returns True on success.
See https://core.telegram.org/bots/api#addstickertoset for details. See https://core.telegram.org/bots/api#addstickertoset for details.
""" """
png_sticker = self.prepare_file_object(png_sticker) for old_sticker_format in ('png_sticker', 'tgs_sticker', 'webm_sticker'):
tgs_sticker = self.prepare_file_object(tgs_sticker) if old_sticker_format in kwargs:
webm_sticker = self.prepare_file_object(webm_sticker) if 'emojis' not in kwargs:
if png_sticker is None and tgs_sticker is None and webm_sticker is None: logging.error(f"No `emojis` provided together with "
logging.error("Invalid sticker provided!") f"`{old_sticker_format}`.")
return
logging.error(f"Parameter `{old_sticker_format}` of method "
"`addStickerToSet` has been deprecated since"
"Bot API 6.6. "
"Use `sticker` instead.")
sticker = self.make_input_sticker(
sticker=kwargs[old_sticker_format],
emoji_list=kwargs['emojis'],
mask_position=kwargs['mask_position'] if 'mask_position' in kwargs else None
)
if sticker is None:
logging.error("Must provide a sticker of type `InputSticker` to "
"`addStickerToSet` method.")
return return
result = await self.api_request( result = await self.api_request(
'addStickerToSet', 'addStickerToSet',
parameters=locals() parameters=locals(),
exclude=['old_sticker_format']
) )
if type(png_sticker) is dict: # Close png_sticker file, if it was open
png_sticker['file'].close()
if type(tgs_sticker) is dict: # Close tgs_sticker file, if it was open
tgs_sticker['file'].close()
if type(webm_sticker) is dict: # Close webm_sticker file, if it was open
webm_sticker['file'].close()
return result return result
async def setStickerPositionInSet(self, sticker, position): async def setStickerPositionInSet(self, sticker, position):
@ -1608,14 +1760,18 @@ class TelegramBot:
cache_time=None, cache_time=None,
is_personal=None, is_personal=None,
next_offset=None, next_offset=None,
switch_pm_text=None, button: Union['InlineQueryResultsButton', None] = None,
switch_pm_parameter=None): **kwargs):
"""Send answers to an inline query. """Send answers to an inline query.
On success, True is returned. On success, True is returned.
No more than 50 results per query are allowed. No more than 50 results per query are allowed.
See https://core.telegram.org/bots/api#answerinlinequery for details. See https://core.telegram.org/bots/api#answerinlinequery for details.
""" """
if 'switch_pm_text' in kwargs:
button = InlineQueryResultsButton(text=kwargs['switch_pm_text'])
if 'switch_pm_parameter' in kwargs:
button = InlineQueryResultsButton(start_parameter=kwargs['switch_pm_parameter'])
return await self.api_request( return await self.api_request(
'answerInlineQuery', 'answerInlineQuery',
parameters=locals() parameters=locals()
@ -2358,3 +2514,153 @@ class TelegramBot:
'unhideGeneralForumTopic', 'unhideGeneralForumTopic',
parameters=locals() parameters=locals()
) )
async def setMyName(self, name: str, language_code: str):
"""Change the bot's name.
Returns True on success.
See https://core.telegram.org/bots/api#setmyname for details.
"""
return await self.api_request(
'setMyName',
parameters=locals()
)
async def getMyName(self, language_code: str):
"""Get the current bot name for the given user language.
Returns BotName on success.
See https://core.telegram.org/bots/api#getmyname for details.
"""
return await self.api_request(
'getMyName',
parameters=locals()
)
async def setMyDescription(self, description: str, language_code: str):
"""Change the bot's description, which is shown in the chat with the bot if
the chat is empty.
Returns True on success.
See https://core.telegram.org/bots/api#setmydescription for details.
"""
return await self.api_request(
'setMyDescription',
parameters=locals()
)
async def getMyDescription(self, language_code: str):
"""Get the current bot description for the given user language.
Returns BotDescription on success.
See https://core.telegram.org/bots/api#getmydescription for details.
"""
return await self.api_request(
'getMyDescription',
parameters=locals()
)
async def setMyShortDescription(self, short_description: str, language_code: str):
"""Change the bot's short description, which is shown on the bot's profile
page and is sent together with the link when users share the bot.
Returns True on success.
See https://core.telegram.org/bots/api#setmyshortdescription for details.
"""
return await self.api_request(
'setMyShortDescription',
parameters=locals()
)
async def getMyShortDescription(self, language_code: str):
"""Get the current bot short description for the given user language.
Returns BotShortDescription on success.
See https://core.telegram.org/bots/api#getmyshortdescription for details.
"""
return await self.api_request(
'getMyShortDescription',
parameters=locals()
)
async def setStickerEmojiList(self, sticker: str, emoji_list: List[str]):
"""Change the list of emoji assigned to a regular or custom emoji sticker.
The sticker must belong to a sticker set created by the bot.
Returns True on success.
See https://core.telegram.org/bots/api#setstickeremojilist for details.
"""
return await self.api_request(
'setStickerEmojiList',
parameters=locals()
)
async def setStickerKeywords(self, sticker: str, keywords: List[str]):
"""Change search keywords assigned to a regular or custom emoji sticker.
The sticker must belong to a sticker set created by the bot.
Returns True on success.
See https://core.telegram.org/bots/api#setstickerkeywords for details.
"""
return await self.api_request(
'setStickerKeywords',
parameters=locals()
)
async def setStickerMaskPosition(self, sticker: str, mask_position: 'MaskPosition'):
"""Change the mask position of a mask sticker.
The sticker must belong to a sticker set that was created by the bot.
Returns True on success.
See https://core.telegram.org/bots/api#setstickermaskposition for details.
"""
return await self.api_request(
'setStickerMaskPosition',
parameters=locals()
)
async def setStickerSetTitle(self, name: str, title: str):
"""Set the title of a created sticker set.
Returns True on success.
See https://core.telegram.org/bots/api#setstickersettitle for details.
"""
return await self.api_request(
'setStickerSetTitle',
parameters=locals()
)
async def setStickerSetThumbnail(self, name: str, user_id: int, thumbnail: 'InputFile or String'):
"""Set the thumbnail of a regular or mask sticker set.
The format of the thumbnail file must match the format of the stickers
in the set.
Returns True on success.
See https://core.telegram.org/bots/api#setstickersetthumbnail for details.
"""
return await self.api_request(
'setStickerSetThumbnail',
parameters=locals()
)
async def setCustomEmojiStickerSetThumbnail(self, name: str, custom_emoji_id: str):
"""Set the thumbnail of a custom emoji sticker set.
Returns True on success.
See https://core.telegram.org/bots/api#setcustomemojistickersetthumbnail for details.
"""
return await self.api_request(
'setCustomEmojiStickerSetThumbnail',
parameters=locals()
)
async def deleteStickerSet(self, name: str):
"""Delete a sticker set that was created by the bot.
Returns True on success.
See https://core.telegram.org/bots/api#deletestickerset for details.
"""
return await self.api_request(
'deleteStickerSet',
parameters=locals()
)

View File

@ -99,7 +99,9 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
) )
] ]
_log_file_name = None _log_file_name = None
_log_file_path = None
_errors_file_name = None _errors_file_name = None
_errors_file_path = None
_documents_max_dimension = 50 * 1000 * 1000 # 50 MB _documents_max_dimension = 50 * 1000 * 1000 # 50 MB
def __init__( def __init__(
@ -237,7 +239,9 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
self.default_reply_keyboard_elements = [] self.default_reply_keyboard_elements = []
self.recent_users = OrderedDict() self.recent_users = OrderedDict()
self._log_file_name = None self._log_file_name = None
self._log_file_path = None
self._errors_file_name = None self._errors_file_name = None
self._errors_file_path = None
self.placeholder_requests = dict() self.placeholder_requests = dict()
self.shared_data = dict() self.shared_data = dict()
self.Role = None self.Role = None
@ -321,10 +325,17 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
@property @property
def log_file_path(self): def log_file_path(self):
"""Return log file path basing on self.path and `_log_file_name`. """Return log file path.
If an instance file path is set, return it.
If not and a class file path is set, return that.
Otherwise, generate a file path basing on `self.path` and `_log_file_name`
Fallback to class file if set, otherwise return None. Fallback to class file if set, otherwise return None.
""" """
if self._log_file_path:
return self._log_file_path
if self.__class__._log_file_path:
return self.__class__._log_file_path
if self.log_file_name: if self.log_file_name:
return f"{self.path}/data/{self.log_file_name}" return f"{self.path}/data/{self.log_file_name}"
@ -337,6 +348,15 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
"""Set class log file name.""" """Set class log file name."""
cls._log_file_name = file_name cls._log_file_name = file_name
def set_log_file_path(self, file_path):
"""Set log file path."""
self._log_file_path = file_path
@classmethod
def set_class_log_file_path(cls, file_path):
"""Set class log file path."""
cls._log_file_path = file_path
@property @property
def errors_file_name(self): def errors_file_name(self):
"""Return errors file name. """Return errors file name.
@ -347,10 +367,17 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
@property @property
def errors_file_path(self): def errors_file_path(self):
"""Return errors file path basing on `self.path` and `_errors_file_name`. """Return errors file path.
If an instance file path is set, return it.
If not and a class file path is set, return that.
Otherwise, generate a file path basing on `self.path` and `_errors_file_name`
Fallback to class file if set, otherwise return None. Fallback to class file if set, otherwise return None.
""" """
if self.__class__._errors_file_path:
return self.__class__._errors_file_path
if self._errors_file_path:
return self._errors_file_path
if self.errors_file_name: if self.errors_file_name:
return f"{self.path}/data/{self.errors_file_name}" return f"{self.path}/data/{self.errors_file_name}"
@ -363,6 +390,15 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
"""Set class errors file name.""" """Set class errors file name."""
cls._errors_file_name = file_name cls._errors_file_name = file_name
def set_errors_file_path(self, file_path):
"""Set errors file path."""
self._errors_file_path = file_path
@classmethod
def set_class_errors_file_path(cls, file_path):
"""Set class errors file path."""
cls._errors_file_path = file_path
@classmethod @classmethod
def get(cls, token, *args, **kwargs): def get(cls, token, *args, **kwargs):
"""Given a `token`, return class instance with that token. """Given a `token`, return class instance with that token.
@ -1658,7 +1694,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
duration: int = None, duration: int = None,
performer: str = None, performer: str = None,
title: str = None, title: str = None,
thumb=None, thumbnail=None,
disable_notification: bool = None, disable_notification: bool = None,
reply_to_message_id: int = None, reply_to_message_id: int = None,
allow_sending_without_reply: bool = None, allow_sending_without_reply: bool = None,
@ -1743,7 +1779,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
duration=duration, duration=duration,
performer=performer, performer=performer,
title=title, title=title,
thumb=thumb, thumbnail=thumbnail,
disable_notification=disable_notification, disable_notification=disable_notification,
reply_to_message_id=reply_to_message_id, reply_to_message_id=reply_to_message_id,
allow_sending_without_reply=allow_sending_without_reply, allow_sending_without_reply=allow_sending_without_reply,
@ -1902,7 +1938,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
return sent_update return sent_update
async def send_document(self, chat_id: Union[int, str] = None, document=None, async def send_document(self, chat_id: Union[int, str] = None, document=None,
thumb=None, thumbnail=None,
caption: str = None, caption: str = None,
parse_mode: str = None, parse_mode: str = None,
caption_entities: List[dict] = None, caption_entities: List[dict] = None,
@ -2021,7 +2057,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
sent_document = await self.send_document( sent_document = await self.send_document(
chat_id=chat_id, chat_id=chat_id,
document=buffered_file, document=buffered_file,
thumb=thumb, thumbnail=thumbnail,
caption=caption, caption=caption,
parse_mode=parse_mode, parse_mode=parse_mode,
disable_notification=disable_notification, disable_notification=disable_notification,
@ -2050,7 +2086,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
sent_update = await self.sendDocument( sent_update = await self.sendDocument(
chat_id=chat_id, chat_id=chat_id,
document=document, document=document,
thumb=thumb, thumbnail=thumbnail,
caption=caption, caption=caption,
parse_mode=parse_mode, parse_mode=parse_mode,
caption_entities=caption_entities, caption_entities=caption_entities,
@ -3111,8 +3147,9 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
await session.close() await session.close()
async def send_one_message(self, *args, **kwargs): async def send_one_message(self, *args, **kwargs):
await self.send_message(*args, **kwargs) sent_message = await self.send_message(*args, **kwargs)
await self.close_sessions() await self.close_sessions()
return sent_message
async def set_webhook(self, url=None, certificate=None, async def set_webhook(self, url=None, certificate=None,
max_connections=None, allowed_updates=None): max_connections=None, allowed_updates=None):

195
davtelepot/cli.py Normal file
View File

@ -0,0 +1,195 @@
import argparse
import asyncio
import logging
import os.path
import sys
from typing import Any, Union
import davtelepot.authorization as authorization
import davtelepot.administration_tools as administration_tools
import davtelepot.helper as helper
from davtelepot.bot import Bot
from davtelepot.utilities import get_cleaned_text, get_secure_key, get_user, json_read, json_write, \
line_drawing_unordered_list
def join_path(*args):
return os.path.abspath(os.path.join(*args))
def dir_path(path):
if os.path.isdir(path) and os.access(path, os.W_OK):
return path
else:
raise argparse.ArgumentTypeError(f"`{path}` is not a valid path")
def get_cli_arguments() -> dict[str, Any]:
default_path = join_path(os.path.dirname(__file__), 'data')
cli_parser = argparse.ArgumentParser(
description='Run a davtelepot-powered Telegram bot from command line.',
allow_abbrev=False,
)
cli_parser.add_argument('-a', '--action', type=str,
default='run',
required=False,
help='Action to perform (currently supported: run).')
cli_parser.add_argument('-p', '--path', type=dir_path,
default=default_path,
required=False,
help='Folder to store secrets, data and log files.')
cli_parser.add_argument('-l', '--log_file', type=argparse.FileType('a'),
default=None,
required=False,
help='File path to store full log')
cli_parser.add_argument('-e', '--error_log_file', type=argparse.FileType('a'),
default=None,
required=False,
help='File path to store only error log')
cli_parser.add_argument('-t', '--token', type=str,
required=False,
help='Telegram bot token (you may get one from t.me/botfather)')
cli_parsed_arguments = vars(cli_parser.parse_args())
for key in cli_parsed_arguments:
if key.endswith('_file') and cli_parsed_arguments[key]:
cli_parsed_arguments[key] = cli_parsed_arguments[key].name
for key, default in {'error_log_file': "davtelepot.errors",
'log_file': "davtelepot.log"}.items():
if cli_parsed_arguments[key] is None:
cli_parsed_arguments[key] = join_path(cli_parsed_arguments['path'], default)
return cli_parsed_arguments
def set_loggers(log_file: str = 'davtelepot.log',
error_log_file: str = 'davtelepot.errors'):
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
log_formatter = logging.Formatter(
"%(asctime)s [%(module)-10s %(levelname)-8s] %(message)s",
style='%'
)
file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
file_handler.setFormatter(log_formatter)
file_handler.setLevel(logging.DEBUG)
root_logger.addHandler(file_handler)
file_handler = logging.FileHandler(error_log_file, mode="a", encoding="utf-8")
file_handler.setFormatter(log_formatter)
file_handler.setLevel(logging.ERROR)
root_logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
console_handler.setLevel(logging.DEBUG)
root_logger.addHandler(console_handler)
async def elevate_to_admin(bot: Bot, update: dict, user_record: dict,
secret: str) -> Union[str, None]:
text = get_cleaned_text(update=update, bot=bot,
replace=['00elevate_', 'elevate '])
if text == secret:
bot.db['users'].upsert(dict(id=user_record['id'], privileges=1), ['id'])
return "👑 You have been granted full powers! 👑"
else:
print(f"The secret entered (`{text}`) is wrong. Enter `{secret}` instead.")
def allow_elevation_to_admin(telegram_bot: Bot) -> None:
secret = get_secure_key(length=15)
@telegram_bot.additional_task('BEFORE')
async def print_secret():
await telegram_bot.get_me()
logging.info(f"To get administration privileges, enter code {secret} "
f"or click here: https://t.me/{telegram_bot.name}?start=00elevate_{secret}")
@telegram_bot.command(command='/elevate', aliases=['00elevate_'], show_in_keyboard=False,
authorization_level='anybody')
async def _elevate_to_admin(bot, update, user_record):
return await elevate_to_admin(bot=bot, update=update,
user_record=user_record,
secret=secret)
return
def send_single_message(telegram_bot: Bot):
records = []
text, last_text = '', ''
offset = 0
max_shown = 3
while True:
if text == '+' and len(records) > max_shown:
offset += 1
elif offset > 0 and text == '-':
offset -= 1
else:
offset = 0
if text in ('+', '-'):
text = last_text
condition = (f"WHERE username LIKE '%{text}%' "
f"OR first_name LIKE '%{text}%' "
f"OR last_name LIKE '%{text}%' ")
records = list(telegram_bot.db.query("SELECT username, first_name, "
"last_name, telegram_id "
"FROM users "
f"{condition} "
f"LIMIT {max_shown+1} "
f"OFFSET {offset*max_shown} "))
if len(records) == 1 and offset == 0:
break
last_text = text
print("=== Users ===",
line_drawing_unordered_list(
list(map(lambda x: get_user(x, False),
records[:max_shown]))
+ (['...'] if len(records)>=max_shown else [])
),
sep='\n')
text = input("Select a recipient: write part of their name.\t\t")
while True:
text = input(f"Write a message for {get_user(records[0], False)}\t\t")
if input("Should I send it? Y to send, anything else cancel\t\t").lower() == "y":
break
async def send_and_print_message():
sent_message = await telegram_bot.send_one_message(chat_id=records[0]['telegram_id'], text=text)
print(sent_message)
asyncio.run(send_and_print_message())
return
def run_from_command_line():
arguments = get_cli_arguments()
stored_arguments_file = os.path.join(arguments['path'],
'cli_args.json')
for key, value in json_read(file_=stored_arguments_file,
default={}).items():
if key not in arguments or not arguments[key]:
arguments[key] = value
set_loggers(**{k: v
for k, v in arguments.items()
if k in ('log_file', 'error_log_file')})
if 'error_log_file' in arguments:
Bot.set_class_errors_file_path(file_path=arguments['error_log_file'])
if 'log_file' in arguments:
Bot.set_class_log_file_path(file_path=arguments['log_file'])
if 'path' in arguments:
Bot.set_class_path(arguments['path'])
if 'token' in arguments and arguments['token']:
token = arguments['token']
else:
token = input("Enter bot Token:\t\t")
arguments['token'] = token
json_write(arguments, stored_arguments_file)
bot = Bot(token=token, database_url=join_path(arguments['path'], 'bot.db'))
action = arguments['action'] if 'action' in arguments else 'run'
if action == 'run':
administration_tools.init(telegram_bot=bot)
authorization.init(telegram_bot=bot)
allow_elevation_to_admin(telegram_bot=bot)
helper.init(telegram_bot=bot)
exit_state = Bot.run(**{k: v
for k, v in arguments.items()
if k in ('local_host', 'port')})
sys.exit(exit_state)
if action == 'send':
try:
send_single_message(telegram_bot=bot)
except KeyboardInterrupt:
print("\nExiting...")