Update scraper_bot.py
This commit is contained in:
@@ -212,7 +212,7 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, WeatherBot, MultiBo
|
|||||||
and
|
and
|
||||||
not message.chat.config['auto_scraping']
|
not message.chat.config['auto_scraping']
|
||||||
or
|
or
|
||||||
not await self._scrape_send_and_delete(message)
|
not await self._on_scraping(message, scrape_replied=False)
|
||||||
)
|
)
|
||||||
and
|
and
|
||||||
(
|
(
|
||||||
|
|||||||
@@ -25,11 +25,11 @@ class ScraperBot(MultiBot, ABC):
|
|||||||
def _add_handlers(self):
|
def _add_handlers(self):
|
||||||
super()._add_handlers()
|
super()._add_handlers()
|
||||||
|
|
||||||
self.register(self._on_force_scraping, constants.KEYWORDS['force'])
|
self.register(lambda message: self._on_scraping(message, force=True), constants.KEYWORDS['force'])
|
||||||
self.register(self._on_force_scraping, (constants.KEYWORDS['force'], constants.KEYWORDS['scraping']))
|
self.register(lambda message: self._on_scraping(message, force=True), (constants.KEYWORDS['force'], constants.KEYWORDS['scraping']))
|
||||||
|
|
||||||
self.register(self._on_force_scraping_audio, (constants.KEYWORDS['force'], multibot_constants.KEYWORDS['audio']))
|
self.register(lambda message: self._on_scraping(message, force=True, audio_only=True), (constants.KEYWORDS['force'], multibot_constants.KEYWORDS['audio']))
|
||||||
self.register(self._on_force_scraping_audio, (constants.KEYWORDS['force'], multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping']))
|
self.register(lambda message: self._on_scraping(message, force=True, audio_only=True), (constants.KEYWORDS['force'], multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping']))
|
||||||
|
|
||||||
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete']))
|
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete']))
|
||||||
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message']))
|
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message']))
|
||||||
@@ -40,8 +40,8 @@ class ScraperBot(MultiBot, ABC):
|
|||||||
|
|
||||||
self.register(self._on_scraping, constants.KEYWORDS['scraping'])
|
self.register(self._on_scraping, constants.KEYWORDS['scraping'])
|
||||||
|
|
||||||
self.register(self._on_scraping_audio, multibot_constants.KEYWORDS['audio'])
|
self.register(lambda message: self._on_scraping(message, audio_only=True), multibot_constants.KEYWORDS['audio'])
|
||||||
self.register(self._on_scraping_audio, (multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping']))
|
self.register(lambda message: self._on_scraping(message, audio_only=True), (multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping']))
|
||||||
|
|
||||||
self.register(self._on_song_info, constants.KEYWORDS['song_info'])
|
self.register(self._on_song_info, constants.KEYWORDS['song_info'])
|
||||||
|
|
||||||
@@ -55,6 +55,37 @@ class ScraperBot(MultiBot, ABC):
|
|||||||
tiktok.find_download_urls(text)
|
tiktok.find_download_urls(text)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_keywords(delete=True, force=False, audio_only=False) -> list[str]:
|
||||||
|
keywords = list(constants.KEYWORDS['scraping'])
|
||||||
|
|
||||||
|
if not delete:
|
||||||
|
keywords += [
|
||||||
|
*multibot_constants.KEYWORDS['negate'],
|
||||||
|
*multibot_constants.KEYWORDS['deactivate'],
|
||||||
|
*multibot_constants.KEYWORDS['delete'],
|
||||||
|
*multibot_constants.KEYWORDS['message']
|
||||||
|
]
|
||||||
|
|
||||||
|
if force:
|
||||||
|
keywords += constants.KEYWORDS['force']
|
||||||
|
|
||||||
|
if audio_only:
|
||||||
|
keywords += multibot_constants.KEYWORDS['audio']
|
||||||
|
|
||||||
|
return keywords
|
||||||
|
|
||||||
|
def _is_full_scraping(self, text: str) -> bool:
|
||||||
|
return bool(
|
||||||
|
self._parse_callbacks(
|
||||||
|
text,
|
||||||
|
[
|
||||||
|
RegisteredCallback(..., [['sin'], ['timeout', 'limite']]),
|
||||||
|
RegisteredCallback(..., multibot_constants.KEYWORDS['all'])
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _medias_sended_info(medias: Iterable[Media]) -> str:
|
def _medias_sended_info(medias: Iterable[Media]) -> str:
|
||||||
medias_count = defaultdict(lambda: defaultdict(int))
|
medias_count = defaultdict(lambda: defaultdict(int))
|
||||||
@@ -91,21 +122,28 @@ class ScraperBot(MultiBot, ABC):
|
|||||||
new_line = ' ' if len(medias_sended_info) == 1 else '\n'
|
new_line = ' ' if len(medias_sended_info) == 1 else '\n'
|
||||||
return f'{new_line}{medias_sended_info_joined}:'
|
return f'{new_line}{medias_sended_info_joined}:'
|
||||||
|
|
||||||
async def _scrape_and_send(self, message: Message, force=False, audio_only=False) -> OrderedSet[Media]:
|
async def _scrape_and_send(
|
||||||
kwargs = {}
|
self,
|
||||||
if self._parse_callbacks(
|
message: Message,
|
||||||
message.text,
|
force=False,
|
||||||
[
|
full=False,
|
||||||
RegisteredCallback(..., [['sin'], ['timeout', 'limite']]),
|
audio_only=False,
|
||||||
RegisteredCallback(..., 'completo entero full todo')
|
send_user_context=True,
|
||||||
]
|
keywords: list[str] = None,
|
||||||
):
|
sended_media_messages: OrderedSet[Message] = None
|
||||||
kwargs['timeout_for_media'] = None
|
) -> OrderedSet[Message]:
|
||||||
|
if not keywords:
|
||||||
|
keywords = []
|
||||||
|
if sended_media_messages is None:
|
||||||
|
sended_media_messages = OrderedSet()
|
||||||
|
|
||||||
|
kwargs = {'timeout_for_media': None} if full else {}
|
||||||
|
|
||||||
if not (medias := await self._search_medias(message, force, audio_only, **kwargs)):
|
if not (medias := await self._search_medias(message, force, audio_only, **kwargs)):
|
||||||
return OrderedSet()
|
return OrderedSet()
|
||||||
|
|
||||||
sended_media_messages, _ = await self.send_medias(medias, message)
|
new_sended_media_messages, _ = await self.send_medias(medias, message, send_user_context=send_user_context, keywords=keywords)
|
||||||
sended_media_messages = OrderedSet(sended_media_messages)
|
sended_media_messages |= new_sended_media_messages
|
||||||
|
|
||||||
await self.send_inline_results(message)
|
await self.send_inline_results(message)
|
||||||
|
|
||||||
@@ -115,13 +153,18 @@ class ScraperBot(MultiBot, ABC):
|
|||||||
self,
|
self,
|
||||||
message: Message,
|
message: Message,
|
||||||
force=False,
|
force=False,
|
||||||
|
full=False,
|
||||||
audio_only=False,
|
audio_only=False,
|
||||||
sended_media_messages: OrderedSet[Media] = None
|
send_user_context=True,
|
||||||
) -> OrderedSet[Media]:
|
keywords: list[str] = None,
|
||||||
|
sended_media_messages: OrderedSet[Message] = None
|
||||||
|
) -> OrderedSet[Message]:
|
||||||
|
if not keywords:
|
||||||
|
keywords = []
|
||||||
if sended_media_messages is None:
|
if sended_media_messages is None:
|
||||||
sended_media_messages = OrderedSet()
|
sended_media_messages = OrderedSet()
|
||||||
|
|
||||||
sended_media_messages += await self._scrape_and_send(message, force, audio_only)
|
sended_media_messages += await self._scrape_and_send(message, force, full, audio_only, send_user_context, keywords)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
sended_media_messages
|
sended_media_messages
|
||||||
@@ -208,14 +251,10 @@ class ScraperBot(MultiBot, ABC):
|
|||||||
# ---------------------------------------------- #
|
# ---------------------------------------------- #
|
||||||
# HANDLERS #
|
# HANDLERS #
|
||||||
# ---------------------------------------------- #
|
# ---------------------------------------------- #
|
||||||
async def _on_force_scraping(self, message: Message) -> OrderedSet[Media]:
|
async def _on_no_delete_original(self, message: Message) -> OrderedSet[Message] | None:
|
||||||
return await self._on_scraping(message, force=True)
|
if sended_media_messages := await self._on_scraping(message, delete=False):
|
||||||
|
return sended_media_messages
|
||||||
|
|
||||||
async def _on_force_scraping_audio(self, message: Message) -> OrderedSet[Media]:
|
|
||||||
return await self._on_scraping(message, force=True, audio_only=True)
|
|
||||||
|
|
||||||
async def _on_no_delete_original(self, message: Message):
|
|
||||||
if not await self._scrape_and_send(message):
|
|
||||||
await self._on_recover_message(message)
|
await self._on_recover_message(message)
|
||||||
|
|
||||||
async def _on_no_scraping(self, message: Message):
|
async def _on_no_scraping(self, message: Message):
|
||||||
@@ -224,18 +263,43 @@ class ScraperBot(MultiBot, ABC):
|
|||||||
async def _on_recover_message(self, message: Message):
|
async def _on_recover_message(self, message: Message):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def _on_scraping(self, message: Message, force=False, audio_only=False) -> OrderedSet[Media]:
|
async def _on_scraping(
|
||||||
|
self,
|
||||||
|
message: Message,
|
||||||
|
delete=True,
|
||||||
|
force=False,
|
||||||
|
audio_only=False,
|
||||||
|
scrape_replied=True,
|
||||||
|
) -> OrderedSet[Message]:
|
||||||
sended_media_messages = OrderedSet()
|
sended_media_messages = OrderedSet()
|
||||||
if not message.chat.config['auto_scraping'] and not self.is_bot_mentioned(message):
|
if not message.chat.config['auto_scraping'] and not self.is_bot_mentioned(message):
|
||||||
return sended_media_messages
|
return sended_media_messages
|
||||||
|
|
||||||
if message.replied_message:
|
keywords = self._get_keywords(delete, force, audio_only)
|
||||||
sended_media_messages += await self._scrape_and_send(message.replied_message, force, audio_only)
|
if full := self._is_full_scraping(message.text):
|
||||||
|
keywords += ['sin', 'timeout', 'limite', *multibot_constants.KEYWORDS['all']]
|
||||||
|
|
||||||
return await self._scrape_send_and_delete(message, force, audio_only, sended_media_messages)
|
if scrape_replied and message.replied_message:
|
||||||
|
sended_media_messages += await self._scrape_and_send(
|
||||||
|
message.replied_message,
|
||||||
|
force,
|
||||||
|
full,
|
||||||
|
audio_only,
|
||||||
|
send_user_context=False
|
||||||
|
)
|
||||||
|
|
||||||
async def _on_scraping_audio(self, message: Message) -> OrderedSet[Media]:
|
kwargs = {
|
||||||
return await self._on_scraping(message, audio_only=True)
|
'message': message,
|
||||||
|
'force': force,
|
||||||
|
'full': full,
|
||||||
|
'audio_only': audio_only,
|
||||||
|
'keywords': keywords,
|
||||||
|
'sended_media_messages': sended_media_messages
|
||||||
|
}
|
||||||
|
if delete:
|
||||||
|
return await self._scrape_send_and_delete(**kwargs)
|
||||||
|
else:
|
||||||
|
return await self._scrape_and_send(**kwargs)
|
||||||
|
|
||||||
@reply
|
@reply
|
||||||
async def _on_song_info(self, message: Message):
|
async def _on_song_info(self, message: Message):
|
||||||
@@ -251,7 +315,17 @@ class ScraperBot(MultiBot, ABC):
|
|||||||
# -------------------- PUBLIC METHODS -------------------- #
|
# -------------------- PUBLIC METHODS -------------------- #
|
||||||
# -------------------------------------------------------- #
|
# -------------------------------------------------------- #
|
||||||
@return_if_first_empty(([], 0), exclude_self_types='ScraperBot', globals_=globals())
|
@return_if_first_empty(([], 0), exclude_self_types='ScraperBot', globals_=globals())
|
||||||
async def send_medias(self, medias: OrderedSet[Media], message: Message, send_song_info=False) -> tuple[list[Message], int]:
|
async def send_medias(
|
||||||
|
self,
|
||||||
|
medias: OrderedSet[Media],
|
||||||
|
message: Message,
|
||||||
|
send_song_info=False,
|
||||||
|
send_user_context=True,
|
||||||
|
keywords: list[str] = None
|
||||||
|
) -> tuple[list[Message], int]:
|
||||||
|
if not keywords:
|
||||||
|
keywords = []
|
||||||
|
|
||||||
sended_media_messages = []
|
sended_media_messages = []
|
||||||
fails = 0
|
fails = 0
|
||||||
bot_state_message: Message | None = None
|
bot_state_message: Message | None = None
|
||||||
@@ -263,29 +337,22 @@ class ScraperBot(MultiBot, ABC):
|
|||||||
|
|
||||||
if message.chat.is_group:
|
if message.chat.is_group:
|
||||||
sended_info_message = await self.send(f"{message.author.name.split('#')[0]} compartió{self._medias_sended_info(medias)}", message, reply_to=message.replied_message)
|
sended_info_message = await self.send(f"{message.author.name.split('#')[0]} compartió{self._medias_sended_info(medias)}", message, reply_to=message.replied_message)
|
||||||
user_text = ' '.join(
|
if (
|
||||||
|
send_user_context
|
||||||
|
and
|
||||||
|
(user_text := ' '.join(
|
||||||
[word for word in message.text.split()
|
[word for word in message.text.split()
|
||||||
if (
|
if (
|
||||||
not any(await self._find_ids(word))
|
not any(await self._find_ids(word))
|
||||||
and
|
and
|
||||||
not flanautils.find_urls(word)
|
not flanautils.find_urls(word)
|
||||||
and
|
and
|
||||||
not flanautils.cartesian_product_string_matching(
|
not flanautils.cartesian_product_string_matching(word, keywords, multibot_constants.PARSER_MIN_SCORE_DEFAULT)
|
||||||
word,
|
|
||||||
(
|
|
||||||
*multibot_constants.KEYWORDS['audio'],
|
|
||||||
*multibot_constants.KEYWORDS['delete'],
|
|
||||||
*constants.KEYWORDS['force'],
|
|
||||||
*multibot_constants.KEYWORDS['negate'],
|
|
||||||
*constants.KEYWORDS['scraping']
|
|
||||||
),
|
|
||||||
multibot_constants.PARSER_MIN_SCORE_DEFAULT
|
|
||||||
)
|
|
||||||
and
|
and
|
||||||
flanautils.remove_symbols(word).lower() not in (str(self.id), self.name.lower())
|
flanautils.remove_symbols(word).lower() not in (str(self.id), self.name.lower())
|
||||||
)]
|
)]
|
||||||
)
|
))
|
||||||
if user_text:
|
):
|
||||||
user_text_bot_message = await self.send(user_text, message, reply_to=message.replied_message)
|
user_text_bot_message = await self.send(user_text, message, reply_to=message.replied_message)
|
||||||
|
|
||||||
for media in medias:
|
for media in medias:
|
||||||
|
|||||||
@@ -38,7 +38,6 @@ CHANGEABLE_ROLES = defaultdict(
|
|||||||
)
|
)
|
||||||
|
|
||||||
DISCORD_HEAT_NAMES = [
|
DISCORD_HEAT_NAMES = [
|
||||||
'Canal Congelado',
|
|
||||||
'Canal Fresquito',
|
'Canal Fresquito',
|
||||||
'Canal Templaillo',
|
'Canal Templaillo',
|
||||||
'Canal Calentito',
|
'Canal Calentito',
|
||||||
@@ -97,7 +96,7 @@ KEYWORDS = {
|
|||||||
'purgatorio', 'purgatory', 'sancion', 'shoot', 'teach', 'whip'),
|
'purgatorio', 'purgatory', 'sancion', 'shoot', 'teach', 'whip'),
|
||||||
'random': ('aleatorio', 'azar', 'random'),
|
'random': ('aleatorio', 'azar', 'random'),
|
||||||
'scraping': ('busca', 'contenido', 'content', 'descarga', 'descargar', 'descargues', 'download', 'envia', 'scrap',
|
'scraping': ('busca', 'contenido', 'content', 'descarga', 'descargar', 'descargues', 'download', 'envia', 'scrap',
|
||||||
'scrapea', 'scraping', 'search', 'send'),
|
'scrapea', 'scrapees', 'scraping', 'search', 'send'),
|
||||||
'self': (('contigo', 'contra', 'ti'), ('mismo', 'ti')),
|
'self': (('contigo', 'contra', 'ti'), ('mismo', 'ti')),
|
||||||
'song_info': ('aqui', 'cancion', 'data', 'datos', 'info', 'informacion', 'information', 'llama', 'media', 'name',
|
'song_info': ('aqui', 'cancion', 'data', 'datos', 'info', 'informacion', 'information', 'llama', 'media', 'name',
|
||||||
'nombre', 'sonaba', 'sonando', 'song', 'sono', 'sound', 'suena', 'title', 'titulo', 'video'),
|
'nombre', 'sonaba', 'sonando', 'song', 'sono', 'sound', 'suena', 'title', 'titulo', 'video'),
|
||||||
|
|||||||
Reference in New Issue
Block a user