improve platform support for ScraperBot
@@ -3,11 +3,12 @@ __all__ = ['ScraperBot']
 import asyncio
 import random
 from abc import ABC
+from collections import defaultdict
 from typing import Iterable
 
 import flanautils
-from flanaapis import instagram, tiktok, twitter, youtube
-from flanautils import Media, MediaType, OrderedSet, Source, return_if_first_empty
+from flanaapis import instagram, reddit, tiktok, twitter, yt_dlp_wrapper
+from flanautils import Media, MediaType, OrderedSet, return_if_first_empty
 from multibot import MultiBot, RegisteredCallback, SendError, constants as multibot_constants, reply
 
 from flanabot import constants
@@ -38,16 +39,12 @@ class ScraperBot(MultiBot, ABC):
 
     @staticmethod
     def _medias_sended_info(medias: Iterable[Media]) -> str:
-        medias_count = {
-            Source.TWITTER: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
-            Source.INSTAGRAM: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
-            Source.TIKTOK: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
-            Source.REDDIT: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
-            Source.YOUTUBE: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
-            None: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}
-        }
+        medias_count = defaultdict(lambda: defaultdict(int))
         for media in medias:
-            medias_count[media.source][media.type_] += 1
+            if not media.source or isinstance(media.source, str):
+                medias_count[media.source][media.type_] += 1
+            else:
+                medias_count[media.source.name][media.type_] += 1
 
         medias_sended_info = []
         for source, media_type_count in medias_count.items():
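
Note: the counting change above is the core of this refactor; instead of pre-declaring every Source/MediaType combination, missing keys are created on first access, so new platforms need no extra table entry. A minimal sketch of the pattern, using plain strings in place of the Source and MediaType enums (hypothetical sample data):

    # Nested defaultdict counter: unseen sources and media types start at 0.
    from collections import defaultdict

    medias = [('TWITTER', 'VIDEO'), ('TWITTER', 'IMAGE'), ('REDDIT', 'VIDEO'), (None, 'ERROR')]

    medias_count = defaultdict(lambda: defaultdict(int))
    for source, media_type in medias:
        medias_count[source][media_type] += 1

    print(medias_count['TWITTER']['VIDEO'])  # 1
    print(medias_count['YOUTUBE']['GIF'])    # 0 (created on access, no KeyError)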
@@ -70,7 +67,7 @@ class ScraperBot(MultiBot, ABC):
                              MediaType.ERROR: 'errores'}[media_type]
                 source_medias_sended_info.append(f'{count} {type_text}')
             if source_medias_sended_info:
-                medias_sended_info.append(f"{flanautils.join_last_separator(source_medias_sended_info, ', ', ' y ')} de {source.name if source else 'algún sitio'}")
+                medias_sended_info.append(f"{flanautils.join_last_separator(source_medias_sended_info, ', ', ' y ')} de {source if source else 'algún sitio'}")
 
         medias_sended_info_joined = flanautils.join_last_separator(medias_sended_info, ',\n', ' y\n')
         new_line = ' ' if len(medias_sended_info) == 1 else '\n'
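
For context, join_last_separator joins items with one separator and a different separator before the last item, as used above with ', ' and ' y '. An illustrative stand-in for that behavior (not flanautils' actual implementation), showing the kind of summary string the method builds:

    # Stand-in for flanautils.join_last_separator, assumed behavior only.
    def join_last_separator(items: list[str], separator: str, last_separator: str) -> str:
        items = [str(item) for item in items]
        if len(items) <= 1:
            return ''.join(items)
        return separator.join(items[:-1]) + last_separator + items[-1]

    source_medias_sended_info = ['3 imágenes', '1 gif', '2 vídeos']
    print(f"{join_last_separator(source_medias_sended_info, ', ', ' y ')} de TWITTER")
    # -> "3 imágenes, 1 gif y 2 vídeos de TWITTER"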
@@ -123,13 +120,17 @@ class ScraperBot(MultiBot, ABC):
     async def _search_medias(self, message: Message, audio_only=False, timeout_for_media: int | float = None) -> OrderedSet[Media]:
         medias = OrderedSet()
 
-        tweet_ids = twitter.find_tweet_ids(message.text)
-        instagram_ids = instagram.find_instagram_ids(message.text)
-        tiktok_ids = await tiktok.find_tiktok_ids(message.text)
+        tweet_ids = twitter.find_ids(message.text)
+        instagram_ids = instagram.find_ids(message.text)
+        reddit_ids = reddit.find_ids(message.text)
         tiktok_download_urls = tiktok.find_download_urls(message.text)
-        youtube_ids = youtube.find_youtube_ids(message.text)
+        media_urls = ()
 
-        if not any((tweet_ids, instagram_ids, tiktok_ids, tiktok_download_urls, youtube_ids)):
+        if (
+            not any((tweet_ids, instagram_ids, reddit_ids, tiktok_download_urls))
+            and
+            not (media_urls := flanautils.find_urls(message.text))
+        ):
             return medias
 
         bot_state_message = await self.send(random.choice(constants.SCRAPING_PHRASES), message)
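
The rewritten guard above uses an assignment expression so the generic URL scan runs only when no platform-specific id matched, while still binding the result for the yt_dlp_wrapper call later. A minimal sketch of that pattern, with a hypothetical regex-based find_urls standing in for flanautils.find_urls:

    import re

    # Hypothetical stand-in for flanautils.find_urls: a simple regex URL scan.
    def find_urls(text: str) -> list[str]:
        return re.findall(r'https?://\S+', text)

    def should_scrape(text: str, tweet_ids: list, reddit_ids: list) -> tuple[bool, list | tuple]:
        media_urls = ()
        if (
            not any((tweet_ids, reddit_ids))
            and
            not (media_urls := find_urls(text))  # scanned only when no platform id matched
        ):
            return False, media_urls             # nothing scrapeable in the message
        return True, media_urls

    print(should_scrape('look at https://example.com/video', [], []))  # (True, ['https://example.com/video'])
    print(should_scrape('just text', [], []))                          # (False, [])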
@@ -137,8 +138,9 @@ class ScraperBot(MultiBot, ABC):
         gather_result = asyncio.gather(
             twitter.get_medias(tweet_ids, audio_only),
             instagram.get_medias(instagram_ids, audio_only),
-            tiktok.get_medias(tiktok_ids, tiktok_download_urls, audio_only),
-            youtube.get_medias(youtube_ids, audio_only, timeout_for_media),
+            reddit.get_medias(reddit_ids, audio_only, 'h264', 'mp4', timeout_for_media),
+            tiktok.get_download_url_medias(tiktok_download_urls, audio_only),
+            yt_dlp_wrapper.get_medias(media_urls, audio_only, 'h264', 'mp4', timeout_for_media),
             return_exceptions=True
         )
 
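
With return_exceptions=True the platform scrapers run concurrently and a failure on one site is returned as a value instead of cancelling the rest, which is what lets the bot still report per-source results. A small self-contained sketch of that behavior with hypothetical stand-in coroutines (not the real flanaapis scrapers):

    import asyncio

    async def fake_scraper(name: str, fail: bool = False) -> list[str]:
        await asyncio.sleep(0.1)
        if fail:
            raise RuntimeError(f'{name} is down')
        return [f'{name}_media']

    async def main():
        results = await asyncio.gather(
            fake_scraper('twitter'),
            fake_scraper('reddit', fail=True),
            fake_scraper('yt_dlp'),
            return_exceptions=True,  # a failed scraper yields its exception as a result
        )
        for result in results:
            if isinstance(result, Exception):
                print(f'skipped: {result}')
            else:
                print(f'got: {result}')

    asyncio.run(main())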