Add force_gif_download
This commit is contained in:
@@ -8,6 +8,7 @@ from typing import Iterable
|
||||
|
||||
import flanautils
|
||||
from flanaapis import instagram, reddit, tiktok, twitter, yt_dlp_wrapper
|
||||
from flanaapis.scraping import constants as flanaapis_constants
|
||||
from flanautils import Media, MediaType, OrderedSet, return_if_first_empty
|
||||
from multibot import MultiBot, RegisteredCallback, SendError, constants as multibot_constants, reply
|
||||
|
||||
@@ -25,6 +26,10 @@ class ScraperBot(MultiBot, ABC):
|
||||
def _add_handlers(self):
|
||||
super()._add_handlers()
|
||||
|
||||
self.register(self._on_force_scraping, constants.KEYWORDS['force'])
|
||||
self.register(self._on_force_scraping, constants.KEYWORDS['scraping'])
|
||||
self.register(self._on_force_scraping, (constants.KEYWORDS['force'], constants.KEYWORDS['scraping']))
|
||||
|
||||
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete']))
|
||||
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message']))
|
||||
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
|
||||
@@ -83,7 +88,7 @@ class ScraperBot(MultiBot, ABC):
|
||||
new_line = ' ' if len(medias_sended_info) == 1 else '\n'
|
||||
return f'{new_line}{medias_sended_info_joined}:'
|
||||
|
||||
async def _scrape_and_send(self, message: Message, audio_only=False) -> OrderedSet[Media]:
|
||||
async def _scrape_and_send(self, message: Message, audio_only=False, force_gif_download=False) -> OrderedSet[Media]:
|
||||
kwargs = {}
|
||||
if self._parse_callbacks(
|
||||
message.text,
|
||||
@@ -93,7 +98,7 @@ class ScraperBot(MultiBot, ABC):
|
||||
]
|
||||
):
|
||||
kwargs['timeout_for_media'] = None
|
||||
if not (medias := await self._search_medias(message, audio_only, **kwargs)):
|
||||
if not (medias := await self._search_medias(message, audio_only, force_gif_download, **kwargs)):
|
||||
return OrderedSet()
|
||||
|
||||
sended_media_messages, _ = await self.send_medias(medias, message)
|
||||
@@ -107,12 +112,13 @@ class ScraperBot(MultiBot, ABC):
|
||||
self,
|
||||
message: Message,
|
||||
audio_only=False,
|
||||
force_gif_download=False,
|
||||
sended_media_messages: OrderedSet[Media] = None
|
||||
) -> OrderedSet[Media]:
|
||||
if sended_media_messages is None:
|
||||
sended_media_messages = OrderedSet()
|
||||
|
||||
sended_media_messages += await self._scrape_and_send(message, audio_only)
|
||||
sended_media_messages += await self._scrape_and_send(message, audio_only, force_gif_download)
|
||||
|
||||
if (
|
||||
sended_media_messages
|
||||
@@ -127,13 +133,31 @@ class ScraperBot(MultiBot, ABC):
|
||||
|
||||
return sended_media_messages
|
||||
|
||||
async def _search_medias(self, message: Message, audio_only=False, timeout_for_media: int | float = None) -> OrderedSet[Media]:
|
||||
async def _search_medias(
|
||||
self,
|
||||
message: Message,
|
||||
audio_only=False,
|
||||
force_gif_download=False,
|
||||
timeout_for_media: int | float = None
|
||||
) -> OrderedSet[Media]:
|
||||
medias = OrderedSet()
|
||||
|
||||
ids = await self._find_ids(message.text)
|
||||
media_urls = ()
|
||||
|
||||
if not any(ids) and not (media_urls := flanautils.find_urls(message.text)):
|
||||
if (
|
||||
not any(ids)
|
||||
and
|
||||
(
|
||||
not (media_urls := flanautils.find_urls(message.text))
|
||||
or
|
||||
(
|
||||
not force_gif_download
|
||||
and
|
||||
any(domain in url for url in media_urls for domain in flanaapis_constants.YT_DLP_WRAPPER_DISCARDED_DOMAINS)
|
||||
)
|
||||
)
|
||||
):
|
||||
return medias
|
||||
|
||||
bot_state_message = await self.send(random.choice(constants.SCRAPING_PHRASES), message)
|
||||
@@ -142,9 +166,9 @@ class ScraperBot(MultiBot, ABC):
|
||||
gather_result = asyncio.gather(
|
||||
twitter.get_medias(tweet_ids, audio_only),
|
||||
instagram.get_medias(instagram_ids, audio_only),
|
||||
reddit.get_medias(reddit_ids, audio_only, 'h264', 'mp4', timeout_for_media),
|
||||
reddit.get_medias(reddit_ids, 'h264', 'mp4', audio_only, force_gif_download, timeout_for_media),
|
||||
tiktok.get_medias(tiktok_users_and_ids, tiktok_download_urls, audio_only, 'h264', 'mp4', timeout_for_media),
|
||||
yt_dlp_wrapper.get_medias(media_urls, audio_only, 'h264', 'mp4', timeout_for_media),
|
||||
yt_dlp_wrapper.get_medias(media_urls, 'h264', 'mp4', audio_only, force_gif_download, timeout_for_media),
|
||||
return_exceptions=True
|
||||
)
|
||||
|
||||
@@ -159,6 +183,9 @@ class ScraperBot(MultiBot, ABC):
|
||||
# ---------------------------------------------- #
|
||||
# HANDLERS #
|
||||
# ---------------------------------------------- #
|
||||
async def _on_force_scraping(self, message: Message) -> OrderedSet[Media]:
|
||||
return await self._on_scraping(message, force_gif_download=True)
|
||||
|
||||
async def _on_no_delete_original(self, message: Message):
|
||||
if not await self._scrape_and_send(message):
|
||||
await self._on_recover_message(message)
|
||||
@@ -166,15 +193,15 @@ class ScraperBot(MultiBot, ABC):
|
||||
async def _on_recover_message(self, message: Message):
|
||||
pass
|
||||
|
||||
async def _on_scraping(self, message: Message, audio_only=False) -> OrderedSet[Media]:
|
||||
async def _on_scraping(self, message: Message, audio_only=False, force_gif_download=False) -> OrderedSet[Media]:
|
||||
sended_media_messages = OrderedSet()
|
||||
if not message.chat.config['auto_scraping'] and not self.is_bot_mentioned(message):
|
||||
return sended_media_messages
|
||||
|
||||
if message.replied_message:
|
||||
sended_media_messages += await self._scrape_and_send(message.replied_message, audio_only)
|
||||
sended_media_messages += await self._scrape_and_send(message.replied_message, audio_only, force_gif_download)
|
||||
|
||||
return await self._scrape_send_and_delete(message, audio_only, sended_media_messages)
|
||||
return await self._scrape_send_and_delete(message, audio_only, force_gif_download, sended_media_messages)
|
||||
|
||||
async def _on_scraping_audio(self, message: Message) -> OrderedSet[Media]:
|
||||
return await self._on_scraping(message, audio_only=True)
|
||||
@@ -216,6 +243,7 @@ class ScraperBot(MultiBot, ABC):
|
||||
(
|
||||
*multibot_constants.KEYWORDS['audio'],
|
||||
*multibot_constants.KEYWORDS['delete'],
|
||||
*constants.KEYWORDS['force'],
|
||||
*multibot_constants.KEYWORDS['negate'],
|
||||
*constants.KEYWORDS['scraping']
|
||||
),
|
||||
|
||||
Reference in New Issue
Block a user