From af9d284fa13cd834a24423c032c573b19d406512 Mon Sep 17 00:00:00 2001 From: AlberLC Date: Tue, 8 Nov 2022 03:20:31 +0100 Subject: [PATCH] Split FlanaBot into multiple bots --- flanabot/bots/__init__.py | 4 + flanabot/bots/flana_bot.py | 825 +---------------------------------- flanabot/bots/penalty_bot.py | 205 +++++++++ flanabot/bots/poll_bot.py | 277 ++++++++++++ flanabot/bots/scraper_bot.py | 232 ++++++++++ flanabot/bots/weather_bot.py | 194 ++++++++ 6 files changed, 922 insertions(+), 815 deletions(-) create mode 100644 flanabot/bots/penalty_bot.py create mode 100644 flanabot/bots/poll_bot.py create mode 100644 flanabot/bots/scraper_bot.py create mode 100644 flanabot/bots/weather_bot.py diff --git a/flanabot/bots/__init__.py b/flanabot/bots/__init__.py index 1a75b72..0a3ebc7 100644 --- a/flanabot/bots/__init__.py +++ b/flanabot/bots/__init__.py @@ -1,3 +1,7 @@ from flanabot.bots.flana_bot import * from flanabot.bots.flana_disc_bot import * from flanabot.bots.flana_tele_bot import * +from flanabot.bots.penalty_bot import * +from flanabot.bots.poll_bot import * +from flanabot.bots.scraper_bot import * +from flanabot.bots.weather_bot import * diff --git a/flanabot/bots/flana_bot.py b/flanabot/bots/flana_bot.py index 7ca70cd..80869fa 100644 --- a/flanabot/bots/flana_bot.py +++ b/flanabot/bots/flana_bot.py @@ -1,83 +1,49 @@ __all__ = ['FlanaBot'] -import asyncio import datetime -import math import random -import re from abc import ABC -from typing import Iterable, Sequence +from typing import Iterable -import flanaapis.geolocation.functions -import flanaapis.weather.functions import flanautils -import plotly.graph_objects import pymongo -from flanaapis import InstagramLoginError, MediaNotFoundError, Place, PlaceNotFoundError, WeatherEmoji, instagram, tiktok, twitter, youtube -from flanautils import Media, MediaType, NotFoundError, OrderedSet, Source, TimeUnits, TraceMetadata, return_if_first_empty -from multibot import BadRoleError, LimitError, MultiBot, RegisteredCallback, Role, SendError, User, admin, bot_mentioned, constants as multibot_constants, group, ignore_self_message, inline, reply +from flanaapis import InstagramLoginError, MediaNotFoundError, PlaceNotFoundError +from flanautils import return_if_first_empty +from multibot import BadRoleError, LimitError, MultiBot, Role, bot_mentioned, constants as multibot_constants, group, inline from flanabot import constants -from flanabot.models import Action, BotAction, ButtonsGroup, Chat, Message, Punishment, WeatherChart +from flanabot.bots.penalty_bot import PenaltyBot +from flanabot.bots.poll_bot import PollBot +from flanabot.bots.scraper_bot import ScraperBot +from flanabot.bots.weather_bot import WeatherBot +from flanabot.models import Action, BotAction, ButtonsGroup, Chat, Message # ----------------------------------------------------------------------------------------------------- # # --------------------------------------------- FLANA_BOT --------------------------------------------- # # ----------------------------------------------------------------------------------------------------- # -class FlanaBot(MultiBot, ABC): +class FlanaBot(PenaltyBot, PollBot, ScraperBot, WeatherBot, MultiBot, ABC): Chat = Chat Message = Message - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.lock = asyncio.Lock() - # ----------------------------------------------------------- # # -------------------- PROTECTED METHODS -------------------- # # ----------------------------------------------------------- # def _add_handlers(self): super()._add_handlers() - self.register(self._on_ban, multibot_constants.KEYWORDS['ban']) self.register(self._on_bye, multibot_constants.KEYWORDS['bye']) - self.register(self._on_choose, constants.KEYWORDS['choose'], priority=2) - self.register(self._on_choose, constants.KEYWORDS['random'], priority=2) - self.register(self._on_choose, (constants.KEYWORDS['choose'], constants.KEYWORDS['random']), priority=2) - self.register(self._on_config, multibot_constants.KEYWORDS['config']) self.register(self._on_config, (multibot_constants.KEYWORDS['show'], multibot_constants.KEYWORDS['config'])) self.register(self._on_delete, multibot_constants.KEYWORDS['delete']) self.register(self._on_delete, (multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message'])) - self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['vote'])) - self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['vote'])) - - self.register(self._on_dice, constants.KEYWORDS['dice']) - self.register(self._on_hello, multibot_constants.KEYWORDS['hello']) - self.register(self._on_mute, multibot_constants.KEYWORDS['mute']) - self.register(self._on_mute, (('haz', 'se'), multibot_constants.KEYWORDS['mute'])) - self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['unmute'])) - self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['sound'])) - self.register(self._on_new_message_default, default=True) - self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'])) - self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message'])) - self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message'])) - self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message'])) - - self.register(self._on_poll, constants.KEYWORDS['poll'], priority=2) - - self.register(self._on_poll_multi, (constants.KEYWORDS['poll'], constants.KEYWORDS['multiple_answer']), priority=2) - - self.register(self._on_punish, constants.KEYWORDS['punish']) - self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['unpunish'])) - self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'])) - self.register(self._on_recover_message, multibot_constants.KEYWORDS['reset']) self.register(self._on_recover_message, (multibot_constants.KEYWORDS['reset'], multibot_constants.KEYWORDS['message'])) @@ -88,131 +54,15 @@ class FlanaBot(MultiBot, ABC): self.register(self._on_roles, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['role'])) self.register(self._on_roles, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['role'])) - self.register(self._on_scraping, constants.KEYWORDS['scraping']) - - self.register(self._on_scraping_audio, multibot_constants.KEYWORDS['audio']) - self.register(self._on_scraping_audio, (multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping'])) - - self.register(self._on_song_info, constants.KEYWORDS['song_info']) - - self.register(self._on_stop_poll, multibot_constants.KEYWORDS['deactivate']) - self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['poll'])) - self.register(self._on_stop_poll, multibot_constants.KEYWORDS['stop']) - self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['poll'])) - - self.register(self._on_unban, multibot_constants.KEYWORDS['unban']) - - self.register(self._on_unmute, multibot_constants.KEYWORDS['unmute']) - self.register(self._on_unmute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['mute'])) - self.register(self._on_unmute, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['sound'])) - - self.register(self._on_unpunish, constants.KEYWORDS['unpunish']) - self.register(self._on_unpunish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['punish'])) - self.register(self._on_unpunish, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'])) - self.register(self._on_users, multibot_constants.KEYWORDS['user']) - self.register(self._on_voting_ban, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote'])) - - self.register(self._on_voting_unban, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote'])) - - self.register(self._on_weather, constants.KEYWORDS['weather_chart']) - self.register(self._on_weather, (multibot_constants.KEYWORDS['show'], constants.KEYWORDS['weather_chart'])) - self.register_button(self._on_config_button_press, ButtonsGroup.CONFIG) - self.register_button(self._on_poll_button_press, ButtonsGroup.POLL) self.register_button(self._on_roles_button_press, ButtonsGroup.ROLES) self.register_button(self._on_users_button_press, ButtonsGroup.USERS) - self.register_button(self._on_weather_button_press, ButtonsGroup.WEATHER) async def _changeable_roles(self, group_: int | str | Chat | Message) -> list[Role]: pass - @admin(False) - @group - async def _check_message_flood(self, message: Message): - if await self.is_punished(message.author, message.chat): - return - - last_2s_messages = self.Message.find({ - 'platform': self.platform.value, - 'author': message.author.object_id, - 'chat': message.chat.object_id, - 'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=2)} - }) - last_7s_messages = self.Message.find({ - 'platform': self.platform.value, - 'author': message.author.object_id, - 'chat': message.chat.object_id, - 'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=7)} - }) - - if len(last_2s_messages) > constants.FLOOD_2s_LIMIT or len(last_7s_messages) > constants.FLOOD_7s_LIMIT: - punishment = Punishment.find_one({ - 'platform': self.platform.value, - 'user_id': message.author.id, - 'group_id': message.chat.group_id - }) - punishment_seconds = (getattr(punishment, 'level', 0) + 2) ** constants.PUNISHMENT_INCREMENT_EXPONENT - try: - await self.punish(message.author.id, message.chat.group_id, punishment_seconds, message) - except BadRoleError as e: - await self._manage_exceptions(e, message) - else: - await self.send(f'Castigado durante {TimeUnits(seconds=punishment_seconds).to_words()}.', message) - - def _distribute_buttons(self, texts: Sequence[str]) -> list[list[str]]: - pass - - async def _filter_mention_ids(self, text: str | Iterable[str], message: Message, delete_names=False) -> list[str]: - if isinstance(text, str): - words = text.split() - else: - words = text - - ids = [] - if delete_names: - for user in message.mentions: - ids.append(user.name.lower()) - ids.append(user.name.split('#')[0].lower()) - ids.append(str(user.id)) - else: - for user in message.mentions: - ids.append(str(user.id)) - for role in await self.get_group_roles(message): - ids.append(str(role.id)) - - return [word for word in words if flanautils.remove_symbols(word).strip() not in ids] - - @staticmethod - def _get_options(text: str, discarded_words: Iterable = ()) -> list[str]: - options = (option for option in text.split() if not flanautils.cartesian_product_string_matching(option.lower(), discarded_words, min_score=multibot_constants.PARSE_CALLBACKS_MIN_SCORE_DEFAULT)) - text = ' '.join(options) - - conjunctions = [f' {conjunction} ' for conjunction in flanautils.CommonWords.get('conjunctions')] - if any(char in text for char in (',', ';', *conjunctions)): - conjunction_parts = [f'(?:[,;]*{conjunction}[,;]*)+' for conjunction in conjunctions] - options = re.split(f"{'|'.join(conjunction_parts)}|[,;]+", text) - return [option.strip() for option in options if option] - else: - return text.split() - - async def _get_poll_message(self, message: Message) -> Message | None: - if poll_message := message.replied_message: - if poll_message.contents.get('poll') is None: - return - return poll_message - elif ( - (message.chat.is_private or self.is_bot_mentioned(message)) - and - flanautils.cartesian_product_string_matching(message.text, constants.KEYWORDS['poll'], min_score=multibot_constants.PARSE_CALLBACKS_MIN_SCORE_DEFAULT) - and - (poll_message := self.Message.find_one({'contents.poll.is_active': True}, sort_keys=(('date', pymongo.DESCENDING),))) - ): - return await self.get_message(poll_message.chat.id, poll_message.id) - else: - return - @return_if_first_empty(exclude_self_types='FlanaBot', globals_=globals()) async def _manage_exceptions(self, exceptions: BaseException | Iterable[BaseException], context: Chat | Message): if not isinstance(exceptions, Iterable): @@ -232,49 +82,6 @@ class FlanaBot(MultiBot, ABC): except Exception as e: await super()._manage_exceptions(e, context) - @staticmethod - def _medias_sended_info(medias: Iterable[Media]) -> str: - medias_count = { - Source.TWITTER: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}, - Source.INSTAGRAM: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}, - Source.TIKTOK: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}, - Source.REDDIT: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}, - Source.YOUTUBE: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}, - None: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0} - } - for media in medias: - medias_count[media.source][media.type_] += 1 - - medias_sended_info = [] - for source, media_type_count in medias_count.items(): - source_medias_sended_info = [] - for media_type, count in media_type_count.items(): - if count: - if count == 1: - type_text = {MediaType.IMAGE: 'imagen', - MediaType.AUDIO: 'audio', - MediaType.GIF: 'gif', - MediaType.VIDEO: 'vídeo', - None: 'cosa que no sé que tipo de archivo es', - MediaType.ERROR: 'error'}[media_type] - else: - type_text = {MediaType.IMAGE: 'imágenes', - MediaType.AUDIO: 'audios', - MediaType.GIF: 'gifs', - MediaType.VIDEO: 'vídeos', - None: 'cosas que no sé que tipos de archivos son', - MediaType.ERROR: 'errores'}[media_type] - source_medias_sended_info.append(f'{count} {type_text}') - if source_medias_sended_info: - medias_sended_info.append(f"{flanautils.join_last_separator(source_medias_sended_info, ', ', ' y ')} de {source.name if source else 'algún sitio'}") - - medias_sended_info_joined = flanautils.join_last_separator(medias_sended_info, ',\n', ' y\n') - new_line = ' ' if len(medias_sended_info) == 1 else '\n' - return f'{new_line}{medias_sended_info_joined}:' - - async def _punish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None): - pass - async def _role_state_options(self, group_: int | str | Chat | Message, activated_user_role_names: list[str]) -> list[str]: options = [] for role in await self._changeable_roles(group_): @@ -289,144 +96,14 @@ class FlanaBot(MultiBot, ABC): return options - async def _scrape_and_send(self, message: Message, audio_only=False) -> OrderedSet[Media]: - kwargs = {} - if self._parse_callbacks(message.text, [RegisteredCallback(..., [['sin'], ['timeout', 'limite']])]): - kwargs['timeout_for_media'] = None - if not (medias := await self._search_medias(message, audio_only, **kwargs)): - return OrderedSet() - - sended_media_messages, _ = await self.send_medias(medias, message) - sended_media_messages = OrderedSet(sended_media_messages) - - await self.send_inline_results(message) - - return sended_media_messages - - async def _scrape_send_and_delete( - self, - message: Message, - audio_only=False, - sended_media_messages: OrderedSet[Media] = None - ) -> OrderedSet[Media]: - if sended_media_messages is None: - sended_media_messages = OrderedSet() - - sended_media_messages += await self._scrape_and_send(message, audio_only) - - if ( - sended_media_messages - and - message.chat.is_group - and - not message.replied_message - and - message.chat.config['delete_original'] - ): - # noinspection PyTypeChecker - BotAction(Action.MESSAGE_DELETED, message, affected_objects=[message, *sended_media_messages]).save() - await self.delete_message(message) - - return sended_media_messages - - async def _search_medias(self, message: Message, audio_only=False, timeout_for_media: int | float = None) -> OrderedSet[Media]: - medias = OrderedSet() - - tweet_ids = twitter.find_tweet_ids(message.text) - instagram_ids = instagram.find_instagram_ids(message.text) - tiktok_ids = await tiktok.find_tiktok_ids(message.text) - tiktok_download_urls = tiktok.find_download_urls(message.text) - youtube_ids = youtube.find_youtube_ids(message.text) - - if not any((tweet_ids, instagram_ids, tiktok_ids, tiktok_download_urls, youtube_ids)): - return medias - - bot_state_message = await self.send(random.choice(constants.SCRAPING_PHRASES), message) - - gather_result = asyncio.gather( - twitter.get_medias(tweet_ids, audio_only), - instagram.get_medias(instagram_ids, audio_only), - tiktok.get_medias(tiktok_ids, tiktok_download_urls, audio_only), - youtube.get_medias(youtube_ids, audio_only, timeout_for_media), - return_exceptions=True - ) - - await gather_result - await self.delete_message(bot_state_message) - - medias, exceptions = flanautils.filter_exceptions(gather_result.result()) - await self._manage_exceptions(exceptions, message) - - return OrderedSet(*medias) - - async def _unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None): - pass - - async def _update_poll_buttons(self, message: Message): - if message.contents['poll']['is_multiple_answer']: - total_votes = len({option_vote[0] for option_votes in message.contents['poll']['votes'].values() if option_votes for option_vote in option_votes}) - else: - total_votes = sum(len(option_votes) for option_votes in message.contents['poll']['votes'].values()) - - if total_votes: - buttons = [] - for option, option_votes in message.contents['poll']['votes'].items(): - ratio = f'{len(option_votes)}/{total_votes}' - names = f"({', '.join(option_vote[1] for option_vote in option_votes)})" if option_votes else '' - buttons.append(f'{option} ➜ {ratio} {names}') - else: - buttons = list(message.contents['poll']['votes'].keys()) - - await self.edit(self._distribute_buttons(buttons), message) - # ---------------------------------------------- # # HANDLERS # # ---------------------------------------------- # - @bot_mentioned - @group - @admin(send_negative=True) - async def _on_ban(self, message: Message): - for user in await self._find_users_to_punish(message): - await self.ban(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message) async def _on_bye(self, message: Message): if message.chat.is_private or self.is_bot_mentioned(message): await self.send_bye(message) - async def _on_choose(self, message: Message): - if message.chat.is_group and not self.is_bot_mentioned(message): - return - - discarded_words = { - *constants.KEYWORDS['choose'], - *constants.KEYWORDS['random'], - self.name.lower(), f'<@{self.id}>', - 'entre', 'between' - } - - if options := self._get_options(message.text, discarded_words): - for i in range(1, len(options) - 1): - try: - n1 = flanautils.cast_number(options[i - 1]) - except ValueError: - try: - n1 = flanautils.words_to_numbers(options[i - 1], ignore_no_numbers=False) - except KeyError: - continue - try: - n2 = flanautils.cast_number(options[i + 1]) - except ValueError: - try: - n2 = flanautils.words_to_numbers(options[i + 1], ignore_no_numbers=False) - except KeyError: - continue - if options[i] in ('al', 'to'): - await self.send(random.randint(math.ceil(n1), math.floor(n2)), message) - return - await self.send(random.choice(options), message) - else: - await self.send(random.choice(('¿Que elija el qué?', '¿Y las opciones?', '?', '🤔')), message) - @group @bot_mentioned async def _on_config(self, message: Message): @@ -472,39 +149,10 @@ class FlanaBot(MultiBot, ABC): except LimitError as e: await self._manage_exceptions(e, message) - @admin - async def _on_delete_votes(self, message: Message): - if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)): - return - - await self.delete_message(message) - - for user in await self._find_users_to_punish(message): - for option_name, option_votes in poll_message.contents['poll']['votes'].items(): - poll_message.contents['poll']['votes'][option_name] = [option_vote for option_vote in option_votes if option_vote[0] != user.id] - - await self._update_poll_buttons(poll_message) - - async def _on_dice(self, message: Message): - if message.chat.is_group and not self.is_bot_mentioned(message): - return - - if top_number := flanautils.sum_numbers_in_text(message.text): - await self.send(random.randint(1, math.floor(top_number)), message) - else: - await self.send(random.choice(('¿De cuántas caras?', '¿Y el número?', '?', '🤔')), message) - async def _on_hello(self, message: Message): if message.chat.is_private or self.is_bot_mentioned(message): await self.send_hello(message) - @group - @bot_mentioned - @admin(send_negative=True) - async def _on_mute(self, message: Message): - for user in await self._find_users_to_punish(message): - await self.mute(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message) - async def _on_new_message_default(self, message: Message): if message.is_inline: await self._scrape_and_send(message) @@ -543,96 +191,6 @@ class FlanaBot(MultiBot, ABC): ): await self.send_insult(message) - @ignore_self_message - async def _on_new_message_raw(self, message: Message): - await super()._on_new_message_raw(message) - if message.chat.config['check_flood'] and message.chat.config['punish'] and not message.is_inline: - async with self.lock: - await self._check_message_flood(message) - - async def _on_no_delete_original(self, message: Message): - if not await self._scrape_and_send(message): - await self._on_recover_message(message) - - async def _on_poll(self, message: Message, is_multiple_answer=False): - if message.chat.is_group and not self.is_bot_mentioned(message): - return - - await self.delete_message(message) - - discarded_words = {*constants.KEYWORDS['poll'], *constants.KEYWORDS['multiple_answer'], self.name.lower(), f'<@{self.id}>'} - if final_options := [f'{option[0].upper()}{option[1:]}' for option in self._get_options(message.text, discarded_words)]: - await self.send( - f"Encuesta {'multirespuesta ' if is_multiple_answer else ''}en curso...", - self._distribute_buttons(final_options), - message, - buttons_key=ButtonsGroup.POLL, - contents={'poll': { - 'is_active': True, - 'is_multiple_answer': is_multiple_answer, - 'votes': {option: [] for option in final_options}, - 'banned_users_tries': {} - }} - ) - else: - await self.send(random.choice(('¿Y las opciones?', '?', '🤔')), message) - - async def _on_poll_button_press(self, message: Message): - await self.accept_button_event(message) - if not message.contents['poll']['is_active']: - return - - presser_id = message.buttons_info.presser_user.id - presser_name = message.buttons_info.presser_user.name.split('#')[0] - if (presser_id_str := str(presser_id)) in message.contents['poll']['banned_users_tries']: - message.contents['poll']['banned_users_tries'][presser_id_str] += 1 - message.save() - if message.contents['poll']['banned_users_tries'][presser_id_str] == 3: - await self.send(random.choice(( - f'Deja de dar por culo {presser_name}, que no puedes votar aqui.', - f'No es pesao {presser_name}, que no tienes permitido votar aqui', - f'Deja de pulsar botones que no puedes votar aqui {presser_name}', - f'{presser_name} deja de intentar votar aqui que no puedes', - f'Te han prohibido votar aqui {presser_name}', - f'No puedes votar aqui {presser_name}' - )), reply_to=message) - return - - option_name = results[0] if (results := re.findall('(.*?) ➜.+', message.buttons_info.pressed_text)) else message.buttons_info.pressed_text - selected_option_votes = message.contents['poll']['votes'][option_name] - - if [presser_id, presser_name] in selected_option_votes: - selected_option_votes.remove([presser_id, presser_name]) - else: - if not message.contents['poll']['is_multiple_answer']: - for option_votes in message.contents['poll']['votes'].values(): - try: - option_votes.remove([presser_id, presser_name]) - except ValueError: - pass - else: - break - selected_option_votes.append((presser_id, presser_name)) - - await self._update_poll_buttons(message) - - async def _on_poll_multi(self, message: Message): - await self._on_poll(message, is_multiple_answer=True) - - @bot_mentioned - @group - @admin(send_negative=True) - async def _on_punish(self, message: Message): - if not message.chat.config['punish']: - return - - for user in await self._find_users_to_punish(message): - await self.punish(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message) - - async def _on_ready(self): - await super()._on_ready() - await flanautils.do_every(constants.CHECK_PUNISHMENTS_EVERY_SECONDS, self.check_old_punishments) - @inline(False) async def _on_recover_message(self, message: Message): if message.replied_message and message.replied_message.author.id == self.id: @@ -694,79 +252,6 @@ class FlanaBot(MultiBot, ABC): message.buttons_info.presser_user.save() - async def _on_scraping(self, message: Message, audio_only=False) -> OrderedSet[Media]: - sended_media_messages = OrderedSet() - - if message.replied_message: - sended_media_messages += await self._scrape_and_send(message.replied_message, audio_only) - - return await self._scrape_send_and_delete(message, audio_only, sended_media_messages) - - async def _on_scraping_audio(self, message: Message) -> OrderedSet[Media]: - return await self._on_scraping(message, audio_only=True) - - @reply - async def _on_song_info(self, message: Message): - song_infos = message.replied_message.song_infos if message.replied_message else [] - - if song_infos: - for song_info in song_infos: - await self.send_song_info(song_info, message) - elif message.chat.is_private or self.is_bot_mentioned(message): - await self._manage_exceptions(SendError('No hay información musical en ese mensaje.'), message) - - async def _on_stop_poll(self, message: Message): - if not (poll_message := await self._get_poll_message(message)): - return - - winners = [] - max_votes = 1 - for option, votes in poll_message.contents['poll']['votes'].items(): - if len(votes) > max_votes: - winners = [option] - max_votes = len(votes) - elif len(votes) == max_votes: - winners.append(option) - - match winners: - case [_, _, *_]: - winners = [f'{winner}' for winner in winners] - text = f"Encuesta finalizada. Los ganadores son: {flanautils.join_last_separator(winners, ', ', ' y ')}." - case [winner]: - text = f'Encuesta finalizada. Ganador: {winner}.' - case _: - text = 'Encuesta finalizada.' - - poll_message.contents['poll']['is_active'] = False - - await self.edit(text, poll_message) - if not message.replied_message: - await self.send(text, reply_to=poll_message) - - @bot_mentioned - @group - @admin(send_negative=True) - async def _on_unban(self, message: Message): - for user in await self._find_users_to_punish(message): - await self.unban(user, message, message) - - @group - @bot_mentioned - @admin(send_negative=True) - async def _on_unmute(self, message: Message): - for user in await self._find_users_to_punish(message): - await self.unmute(user, message, message) - - @group - @bot_mentioned - @admin(send_negative=True) - async def _on_unpunish(self, message: Message): - if not message.chat.config['punish']: - return - - for user in await self._find_users_to_punish(message): - await self.unpunish(user, message, message) - @group @bot_mentioned async def _on_users(self, message: Message): @@ -803,248 +288,15 @@ class FlanaBot(MultiBot, ABC): message ) - @admin - async def _on_voting_ban(self, message: Message): - if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)): - return - - await self.delete_message(message) - - for user in await self._find_users_to_punish(message): - if str(user.id) not in poll_message.contents['poll']['banned_users_tries']: - poll_message.contents['poll']['banned_users_tries'][str(user.id)] = 0 - poll_message.save() - - @admin - async def _on_voting_unban(self, message: Message): - if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)): - return - - await self.delete_message(message) - - for user in await self._find_users_to_punish(message): - try: - del poll_message.contents['poll']['banned_users_tries'][str(user.id)] - except KeyError: - pass - poll_message.save() - - async def _on_weather(self, message: Message): - bot_state_message: Message | None = None - if message.is_inline: - show_progress_state = False - elif message.chat.is_group and not self.is_bot_mentioned(message): - if message.chat.config['weather_chart']: - if BotAction.find_one({'action': Action.AUTO_WEATHER_CHART.value, 'chat': message.chat.object_id, 'date': {'$gt': datetime.datetime.now(datetime.timezone.utc) - constants.AUTO_WEATHER_EVERY}}): - return - show_progress_state = False - else: - return - else: - show_progress_state = True - - original_text_words = flanautils.remove_accents(message.text.lower()) - original_text_words = flanautils.remove_symbols(original_text_words, ignore=('-', '.'), replace_with=' ').split() - original_text_words = await self._filter_mention_ids(original_text_words, message, delete_names=True) - - # noinspection PyTypeChecker - place_words = ( - OrderedSet(original_text_words) - - flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['show'], min_score=0.85).keys() - - flanautils.cartesian_product_string_matching(original_text_words, constants.KEYWORDS['weather_chart'], min_score=0.85).keys() - - flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['date'], min_score=0.85).keys() - - flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['thanks'], min_score=0.85).keys() - - flanautils.CommonWords.get() - ) - if not place_words: - if not message.is_inline: - await self.send_error(random.choice(('¿Tiempo dónde?', 'Indica el sitio.', 'Y el sitio?', 'y el sitio? me lo invento?')), message) - return - - if 'calle' in original_text_words: - place_words.insert(0, 'calle') - place_query = ' '.join(place_words) - if len(place_query) >= constants.MAX_PLACE_QUERY_LENGTH: - if show_progress_state: - await self.send_error(Media(str(flanautils.resolve_path('resources/mucho_texto.png')), MediaType.IMAGE, 'jpg', Source.LOCAL), message, send_as_file=False) - return - if show_progress_state: - bot_state_message = await self.send(f'Buscando "{place_query}" en el mapa 🧐...', message) - - result: str | Place | None = None - async for result in flanaapis.geolocation.functions.find_place_showing_progress(place_query): - if isinstance(result, str) and bot_state_message: - await self.edit(result, bot_state_message) - - place: Place = result - if not place: - if bot_state_message: - await self.delete_message(bot_state_message) - await self._manage_exceptions(PlaceNotFoundError(place_query), message) - return - - if bot_state_message: - bot_state_message = await self.edit(f'Obteniendo datos del tiempo para "{place_query}"...', bot_state_message) - current_weather, day_weathers = await flanaapis.weather.functions.get_day_weathers_by_place(place) - - if bot_state_message: - bot_state_message = await self.edit('Creando gráficas del tiempo...', bot_state_message) - - weather_chart = WeatherChart( - _font={'size': 30}, - _title={ - 'text': place.name[:40].strip(' ,-'), - 'xref': 'paper', - 'yref': 'paper', - 'xanchor': 'left', - 'yanchor': 'top', - 'x': 0.025, - 'y': 0.975, - 'font': { - 'size': 50, - 'family': 'open sans' - } - }, - _legend={'x': 0.99, 'y': 0.99, 'xanchor': 'right', 'yanchor': 'top', 'bgcolor': 'rgba(0,0,0,0)'}, - _margin={'l': 20, 'r': 20, 't': 20, 'b': 20}, - trace_metadatas={ - 'temperature': TraceMetadata(name='temperature', group='temperature', legend='Temperatura', show=False, color='#ff8400', default_min=0, default_max=40, y_tick_suffix=' ºC', y_axis_width=130), - 'temperature_feel': TraceMetadata(name='temperature_feel', group='temperature', legend='Sensación de temperatura', show=True, color='red', default_min=0, default_max=40, y_tick_suffix=' ºC', y_axis_width=130), - 'clouds': TraceMetadata(name='clouds', legend='Nubes', show=False, color='#86abe3', default_min=-100, default_max=100, y_tick_suffix=' %', hide_y_ticks_if='{tick} < 0'), - 'visibility': TraceMetadata(name='visibility', legend='Visibilidad', show=False, color='#c99a34', default_min=0, default_max='{max_y_data} * 2', y_tick_suffix=' km', y_delta_tick=2, hide_y_ticks_if='{tick} > {max_y_data}'), - 'uvi': TraceMetadata(name='uvi', legend='UVI', show=False, color='#ffd000', default_min=-12, default_max=12, hide_y_ticks_if='{tick} < 0', y_delta_tick=1, y_axis_width=75), - 'humidity': TraceMetadata(name='humidity', legend='Humedad', show=False, color='#2baab5', default_min=0, default_max=100, y_tick_suffix=' %'), - 'precipitation_probability': TraceMetadata(name='precipitation_probability', legend='Probabilidad de precipitaciones', show=True, color='#0033ff', default_min=-100, default_max=100, y_tick_suffix=' %', hide_y_ticks_if='{tick} < 0'), - 'rain_volume': TraceMetadata(plotly.graph_objects.Histogram, name='rain_volume', group='precipitation', legend='Volumen de lluvia', show=True, color='#34a4eb', opacity=0.3, default_min=-10, default_max=10, y_tick_suffix=' mm', y_delta_tick=1, hide_y_ticks_if='{tick} < 0', y_axis_width=130), - 'snow_volume': TraceMetadata(plotly.graph_objects.Histogram, name='snow_volume', group='precipitation', legend='Volumen de nieve', show=True, color='#34a4eb', opacity=0.8, pattern={'shape': '.', 'fgcolor': '#ffffff', 'bgcolor': '#b0d6f3', 'solidity': 0.5, 'size': 14}, default_min=-10, default_max=10, y_tick_suffix=' mm', y_delta_tick=1, hide_y_ticks_if='{tick} < 0', y_axis_width=130), - 'pressure': TraceMetadata(name='pressure', legend='Presión', show=False, color='#31a339', default_min=1013.25 - 90, default_max=1013.25 + 90, y_tick_suffix=' hPa', y_axis_width=225), - 'wind_speed': TraceMetadata(name='wind_speed', legend='Velocidad del viento', show=False, color='#d8abff', default_min=-120, default_max=120, y_tick_suffix=' km/h', hide_y_ticks_if='{tick} < 0', y_axis_width=165) - }, - x_data=[instant_weather.date_time for day_weather in day_weathers for instant_weather in day_weather.instant_weathers], - all_y_data=[], - current_weather=current_weather, - day_weathers=day_weathers, - timezone=(timezone := day_weathers[0].timezone), - place=place, - view_position=datetime.datetime.now(timezone) - ) - - weather_chart.apply_zoom() - weather_chart.draw() - if not (image_bytes := weather_chart.to_image()): - if bot_state_message: - await self.delete_message(bot_state_message) - raise NotFoundError('No hay suficientes datos del tiempo.') - - if bot_state_message: - bot_state_message = await self.edit('Enviando...', bot_state_message) - bot_message: Message = await self.send( - Media(image_bytes, MediaType.IMAGE, 'jpg'), - [ - [WeatherEmoji.ZOOM_IN.value, WeatherEmoji.ZOOM_OUT.value, WeatherEmoji.LEFT.value, WeatherEmoji.RIGHT.value], - [WeatherEmoji.TEMPERATURE.value, WeatherEmoji.TEMPERATURE_FEEL.value, WeatherEmoji.CLOUDS.value, WeatherEmoji.VISIBILITY.value, WeatherEmoji.UVI.value], - [WeatherEmoji.HUMIDITY.value, WeatherEmoji.PRECIPITATION_PROBABILITY.value, WeatherEmoji.PRECIPITATION_VOLUME.value, WeatherEmoji.PRESSURE.value, WeatherEmoji.WIND_SPEED.value] - ], - message, - buttons_key=ButtonsGroup.WEATHER, - send_as_file=False - ) - await self.send_inline_results(message) - - if bot_state_message: - await self.delete_message(bot_state_message) - - if bot_message: - bot_message.weather_chart = weather_chart - bot_message.save() - if not self.is_bot_mentioned(message): - # noinspection PyTypeChecker - BotAction(Action.AUTO_WEATHER_CHART, message, affected_objects=[bot_message]).save() - - async def _on_weather_button_press(self, message: Message): - await self.accept_button_event(message) - - match message.buttons_info.pressed_text: - case WeatherEmoji.ZOOM_IN.value: - message.weather_chart.zoom_in() - case WeatherEmoji.ZOOM_OUT.value: - message.weather_chart.zoom_out() - case WeatherEmoji.LEFT.value: - message.weather_chart.move_left() - case WeatherEmoji.RIGHT.value: - message.weather_chart.move_right() - case WeatherEmoji.PRECIPITATION_VOLUME.value: - message.weather_chart.trace_metadatas['rain_volume'].show = not message.weather_chart.trace_metadatas['rain_volume'].show - message.weather_chart.trace_metadatas['snow_volume'].show = not message.weather_chart.trace_metadatas['snow_volume'].show - case emoji if emoji in WeatherEmoji.values: - trace_metadata_name = WeatherEmoji(emoji).name.lower() - message.weather_chart.trace_metadatas[trace_metadata_name].show = not message.weather_chart.trace_metadatas[trace_metadata_name].show - case _: - return - - message.weather_chart.apply_zoom() - message.weather_chart.draw() - - image_bytes = message.weather_chart.to_image() - await self.edit(Media(image_bytes, MediaType.IMAGE, 'jpg'), message) - # -------------------------------------------------------- # # -------------------- PUBLIC METHODS -------------------- # # -------------------------------------------------------- # - async def check_old_punishments(self): - punishments = Punishment.find({'platform': self.platform.value}, lazy=True) - - for punishment in punishments: - now = datetime.datetime.now(datetime.timezone.utc) - if not punishment.until or now < punishment.until: - continue - - await self._remove_penalty(punishment, self._unpunish, delete=False) - if punishment.is_active: - punishment.is_active = False - punishment.last_update = now - punishment.save() - - if punishment.last_update + constants.PUNISHMENTS_RESET_TIME <= now: - if punishment.level == 1: - punishment.delete() - else: - punishment.level -= 1 - punishment.last_update = now - punishment.save() - @classmethod async def clear_old_database_items(cls): await super().clear_old_database_items() before_date = datetime.datetime.now(datetime.timezone.utc) - multibot_constants.MESSAGE_EXPIRATION_TIME BotAction.collection.delete_many({'date': {'$lte': before_date}}) - async def is_punished(self, user: int | str | User, group_: int | str | Chat | Message) -> bool: - pass - - async def punish( - self, - user: int | str | User, - group_: int | str | Chat | Message, - time: int | datetime.timedelta = None, - message: Message = None - ): - # noinspection PyTypeChecker - punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_), time) - punishment.pull_from_database(overwrite_fields=('level',), exclude_fields=('until',)) - punishment.level += 1 - - try: - await self._punish(punishment.user_id, punishment.group_id) - except BadRoleError as e: - if message and message.chat.original_object: - await self._manage_exceptions(e, message) - else: - raise e - else: - punishment.save(pull_exclude_fields=('until',)) - await self._unpenalize_later(punishment, self._unpunish, message) - async def send_bye(self, message: Message) -> multibot_constants.ORIGINAL_MESSAGE: return await self.send(random.choice((*constants.BYE_PHRASES, flanautils.CommonWords.random_time_greeting())), message) @@ -1054,60 +306,3 @@ class FlanaBot(MultiBot, ABC): async def send_insult(self, message: Message) -> multibot_constants.ORIGINAL_MESSAGE | None: await self.typing_delay(message) return await self.send(random.choice(constants.INSULTS), message) - - async def send_medias(self, medias: OrderedSet[Media], message: Message, send_song_info=False) -> tuple[list[Message], int]: - sended_media_messages = [] - fails = 0 - bot_state_message: Message | None = None - sended_info_message: Message | None = None - - if not message.is_inline: - bot_state_message: Message = await self.send('Enviando...', message) - - if message.chat.is_group: - sended_info_message = await self.send(f"{message.author.name.split('#')[0]} compartió{self._medias_sended_info(medias)}", message) - - for media in medias: - if not media.content: - fails += 1 - continue - - if media.song_info: - message.song_infos.add(media.song_info) - message.save() - - if bot_message := await self.send(media, message): - sended_media_messages.append(bot_message) - if media.song_info and bot_message: - bot_message.song_infos.add(media.song_info) - bot_message.save() - else: - fails += 1 - - if send_song_info and media.song_info: - await self.send_song_info(media.song_info, message) - - if fails and sended_info_message: - if fails == len(medias): - await self.delete_message(sended_info_message) - if bot_state_message: - await self.delete_message(bot_state_message) - - return sended_media_messages, fails - - @return_if_first_empty(exclude_self_types='FlanaBot', globals_=globals()) - async def send_song_info(self, song_info: Media, message: Message): - attributes = ( - f'Título: {song_info.title}\n' if song_info.title else '', - f'Autor: {song_info.author}\n' if song_info.author else '', - f'Álbum: {song_info.album}\n' if song_info.album else '', - f'Previa:' - ) - await self.send(''.join(attributes), message) - if song_info: - await self.send(song_info, message) - - async def unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None): - # noinspection PyTypeChecker - punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_)) - await self._remove_penalty(punishment, self._unpunish, message) diff --git a/flanabot/bots/penalty_bot.py b/flanabot/bots/penalty_bot.py new file mode 100644 index 0000000..adb7794 --- /dev/null +++ b/flanabot/bots/penalty_bot.py @@ -0,0 +1,205 @@ +__all__ = ['PenaltyBot'] + +import asyncio +import datetime +from abc import ABC + +import flanautils +from flanautils import TimeUnits +from multibot import BadRoleError, MultiBot, User, admin, bot_mentioned, constants as multibot_constants, group, ignore_self_message + +from flanabot import constants +from flanabot.models import Chat, Message, Punishment + + +# ----------------------------------------------------------------------------------------------------- # +# --------------------------------------------- PENALTY_BOT --------------------------------------------- # +# ----------------------------------------------------------------------------------------------------- # +class PenaltyBot(MultiBot, ABC): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.lock = asyncio.Lock() + + # ----------------------------------------------------------- # + # -------------------- PROTECTED METHODS -------------------- # + # ----------------------------------------------------------- # + def _add_handlers(self): + super()._add_handlers() + + self.register(self._on_ban, multibot_constants.KEYWORDS['ban']) + + self.register(self._on_mute, multibot_constants.KEYWORDS['mute']) + self.register(self._on_mute, (('haz', 'se'), multibot_constants.KEYWORDS['mute'])) + self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['unmute'])) + self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['sound'])) + + self.register(self._on_punish, constants.KEYWORDS['punish']) + self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['unpunish'])) + self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'])) + + self.register(self._on_unban, multibot_constants.KEYWORDS['unban']) + + self.register(self._on_unmute, multibot_constants.KEYWORDS['unmute']) + self.register(self._on_unmute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['mute'])) + self.register(self._on_unmute, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['sound'])) + + self.register(self._on_unpunish, constants.KEYWORDS['unpunish']) + self.register(self._on_unpunish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['punish'])) + self.register(self._on_unpunish, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'])) + + @admin(False) + @group + async def _check_message_flood(self, message: Message): + if await self.is_punished(message.author, message.chat): + return + + last_2s_messages = self.Message.find({ + 'platform': self.platform.value, + 'author': message.author.object_id, + 'chat': message.chat.object_id, + 'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=2)} + }) + last_7s_messages = self.Message.find({ + 'platform': self.platform.value, + 'author': message.author.object_id, + 'chat': message.chat.object_id, + 'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=7)} + }) + + if len(last_2s_messages) > constants.FLOOD_2s_LIMIT or len(last_7s_messages) > constants.FLOOD_7s_LIMIT: + punishment = Punishment.find_one({ + 'platform': self.platform.value, + 'user_id': message.author.id, + 'group_id': message.chat.group_id + }) + punishment_seconds = (getattr(punishment, 'level', 0) + 2) ** constants.PUNISHMENT_INCREMENT_EXPONENT + try: + await self.punish(message.author.id, message.chat.group_id, punishment_seconds, message) + except BadRoleError as e: + await self._manage_exceptions(e, message) + else: + await self.send(f'Castigado durante {TimeUnits(seconds=punishment_seconds).to_words()}.', message) + + async def _punish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None): + pass + + async def _unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None): + pass + + # ---------------------------------------------- # + # HANDLERS # + # ---------------------------------------------- # + @bot_mentioned + @group + @admin(send_negative=True) + async def _on_ban(self, message: Message): + for user in await self._find_users_to_punish(message): + await self.ban(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message) + + @group + @bot_mentioned + @admin(send_negative=True) + async def _on_mute(self, message: Message): + for user in await self._find_users_to_punish(message): + await self.mute(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message) + + @ignore_self_message + async def _on_new_message_raw(self, message: Message): + await super()._on_new_message_raw(message) + if message.chat.config['check_flood'] and message.chat.config['punish'] and not message.is_inline: + async with self.lock: + await self._check_message_flood(message) + + @bot_mentioned + @group + @admin(send_negative=True) + async def _on_punish(self, message: Message): + if not message.chat.config['punish']: + return + + for user in await self._find_users_to_punish(message): + await self.punish(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message) + + async def _on_ready(self): + await super()._on_ready() + await flanautils.do_every(constants.CHECK_PUNISHMENTS_EVERY_SECONDS, self.check_old_punishments) + + @bot_mentioned + @group + @admin(send_negative=True) + async def _on_unban(self, message: Message): + for user in await self._find_users_to_punish(message): + await self.unban(user, message, message) + + @group + @bot_mentioned + @admin(send_negative=True) + async def _on_unmute(self, message: Message): + for user in await self._find_users_to_punish(message): + await self.unmute(user, message, message) + + @group + @bot_mentioned + @admin(send_negative=True) + async def _on_unpunish(self, message: Message): + if not message.chat.config['punish']: + return + + for user in await self._find_users_to_punish(message): + await self.unpunish(user, message, message) + + # -------------------------------------------------------- # + # -------------------- PUBLIC METHODS -------------------- # + # -------------------------------------------------------- # + async def check_old_punishments(self): + punishments = Punishment.find({'platform': self.platform.value}, lazy=True) + + for punishment in punishments: + now = datetime.datetime.now(datetime.timezone.utc) + if not punishment.until or now < punishment.until: + continue + + await self._remove_penalty(punishment, self._unpunish, delete=False) + if punishment.is_active: + punishment.is_active = False + punishment.last_update = now + punishment.save() + + if punishment.last_update + constants.PUNISHMENTS_RESET_TIME <= now: + if punishment.level == 1: + punishment.delete() + else: + punishment.level -= 1 + punishment.last_update = now + punishment.save() + + async def is_punished(self, user: int | str | User, group_: int | str | Chat | Message) -> bool: + pass + + async def punish( + self, + user: int | str | User, + group_: int | str | Chat | Message, + time: int | datetime.timedelta = None, + message: Message = None + ): + # noinspection PyTypeChecker + punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_), time) + punishment.pull_from_database(overwrite_fields=('level',), exclude_fields=('until',)) + punishment.level += 1 + + try: + await self._punish(punishment.user_id, punishment.group_id) + except BadRoleError as e: + if message and message.chat.original_object: + await self._manage_exceptions(e, message) + else: + raise e + else: + punishment.save(pull_exclude_fields=('until',)) + await self._unpenalize_later(punishment, self._unpunish, message) + + async def unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None): + # noinspection PyTypeChecker + punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_)) + await self._remove_penalty(punishment, self._unpunish, message) diff --git a/flanabot/bots/poll_bot.py b/flanabot/bots/poll_bot.py new file mode 100644 index 0000000..51eac01 --- /dev/null +++ b/flanabot/bots/poll_bot.py @@ -0,0 +1,277 @@ +__all__ = ['PollBot'] + +import math +import random +import re +from abc import ABC +from typing import Iterable + +import flanautils +import pymongo +from multibot import MultiBot, admin, constants as multibot_constants + +from flanabot import constants +from flanabot.models import ButtonsGroup, Message + + +# ----------------------------------------------------------------------------------------------------- # +# --------------------------------------------- POLL_BOT --------------------------------------------- # +# ----------------------------------------------------------------------------------------------------- # +class PollBot(MultiBot, ABC): + # ----------------------------------------------------------- # + # -------------------- PROTECTED METHODS -------------------- # + # ----------------------------------------------------------- # + def _add_handlers(self): + super()._add_handlers() + + self.register(self._on_choose, constants.KEYWORDS['choose'], priority=2) + self.register(self._on_choose, constants.KEYWORDS['random'], priority=2) + self.register(self._on_choose, (constants.KEYWORDS['choose'], constants.KEYWORDS['random']), priority=2) + + self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['vote'])) + self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['vote'])) + + self.register(self._on_dice, constants.KEYWORDS['dice']) + + self.register(self._on_poll, constants.KEYWORDS['poll'], priority=2) + + self.register(self._on_poll_multi, (constants.KEYWORDS['poll'], constants.KEYWORDS['multiple_answer']), priority=2) + + self.register(self._on_stop_poll, multibot_constants.KEYWORDS['deactivate']) + self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['poll'])) + self.register(self._on_stop_poll, multibot_constants.KEYWORDS['stop']) + self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['poll'])) + + self.register(self._on_voting_ban, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote'])) + + self.register(self._on_voting_unban, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote'])) + + self.register_button(self._on_poll_button_press, ButtonsGroup.POLL) + + @staticmethod + def _get_options(text: str, discarded_words: Iterable = ()) -> list[str]: + options = (option for option in text.split() if not flanautils.cartesian_product_string_matching(option.lower(), discarded_words, min_score=multibot_constants.PARSE_CALLBACKS_MIN_SCORE_DEFAULT)) + text = ' '.join(options) + + conjunctions = [f' {conjunction} ' for conjunction in flanautils.CommonWords.get('conjunctions')] + if any(char in text for char in (',', ';', *conjunctions)): + conjunction_parts = [f'(?:[,;]*{conjunction}[,;]*)+' for conjunction in conjunctions] + options = re.split(f"{'|'.join(conjunction_parts)}|[,;]+", text) + return [option.strip() for option in options if option] + else: + return text.split() + + async def _get_poll_message(self, message: Message) -> Message | None: + if poll_message := message.replied_message: + if poll_message.contents.get('poll') is None: + return + return poll_message + elif ( + (message.chat.is_private or self.is_bot_mentioned(message)) + and + flanautils.cartesian_product_string_matching(message.text, constants.KEYWORDS['poll'], min_score=multibot_constants.PARSE_CALLBACKS_MIN_SCORE_DEFAULT) + and + (poll_message := self.Message.find_one({'contents.poll.is_active': True}, sort_keys=(('date', pymongo.DESCENDING),))) + ): + return await self.get_message(poll_message.chat.id, poll_message.id) + else: + return + + async def _update_poll_buttons(self, message: Message): + if message.contents['poll']['is_multiple_answer']: + total_votes = len({option_vote[0] for option_votes in message.contents['poll']['votes'].values() if option_votes for option_vote in option_votes}) + else: + total_votes = sum(len(option_votes) for option_votes in message.contents['poll']['votes'].values()) + + if total_votes: + buttons = [] + for option, option_votes in message.contents['poll']['votes'].items(): + ratio = f'{len(option_votes)}/{total_votes}' + names = f"({', '.join(option_vote[1] for option_vote in option_votes)})" if option_votes else '' + buttons.append(f'{option} ➜ {ratio} {names}') + else: + buttons = list(message.contents['poll']['votes'].keys()) + + await self.edit(self._distribute_buttons(buttons), message) + + # ---------------------------------------------- # + # HANDLERS # + # ---------------------------------------------- # + async def _on_choose(self, message: Message): + if message.chat.is_group and not self.is_bot_mentioned(message): + return + + discarded_words = { + *constants.KEYWORDS['choose'], + *constants.KEYWORDS['random'], + self.name.lower(), f'<@{self.id}>', + 'entre', 'between' + } + + if options := self._get_options(message.text, discarded_words): + for i in range(1, len(options) - 1): + try: + n1 = flanautils.cast_number(options[i - 1]) + except ValueError: + try: + n1 = flanautils.words_to_numbers(options[i - 1], ignore_no_numbers=False) + except KeyError: + continue + try: + n2 = flanautils.cast_number(options[i + 1]) + except ValueError: + try: + n2 = flanautils.words_to_numbers(options[i + 1], ignore_no_numbers=False) + except KeyError: + continue + if options[i] in ('al', 'to'): + await self.send(random.randint(math.ceil(n1), math.floor(n2)), message) + return + await self.send(random.choice(options), message) + else: + await self.send(random.choice(('¿Que elija el qué?', '¿Y las opciones?', '?', '🤔')), message) + + @admin + async def _on_delete_votes(self, message: Message): + if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)): + return + + await self.delete_message(message) + + for user in await self._find_users_to_punish(message): + for option_name, option_votes in poll_message.contents['poll']['votes'].items(): + poll_message.contents['poll']['votes'][option_name] = [option_vote for option_vote in option_votes if option_vote[0] != user.id] + + await self._update_poll_buttons(poll_message) + + async def _on_dice(self, message: Message): + if message.chat.is_group and not self.is_bot_mentioned(message): + return + + if top_number := flanautils.sum_numbers_in_text(message.text): + await self.send(random.randint(1, math.floor(top_number)), message) + else: + await self.send(random.choice(('¿De cuántas caras?', '¿Y el número?', '?', '🤔')), message) + + async def _on_poll(self, message: Message, is_multiple_answer=False): + if message.chat.is_group and not self.is_bot_mentioned(message): + return + + await self.delete_message(message) + + discarded_words = {*constants.KEYWORDS['poll'], *constants.KEYWORDS['multiple_answer'], self.name.lower(), f'<@{self.id}>'} + if final_options := [f'{option[0].upper()}{option[1:]}' for option in self._get_options(message.text, discarded_words)]: + await self.send( + f"Encuesta {'multirespuesta ' if is_multiple_answer else ''}en curso...", + self._distribute_buttons(final_options), + message, + buttons_key=ButtonsGroup.POLL, + contents={'poll': { + 'is_active': True, + 'is_multiple_answer': is_multiple_answer, + 'votes': {option: [] for option in final_options}, + 'banned_users_tries': {} + }} + ) + else: + await self.send(random.choice(('¿Y las opciones?', '?', '🤔')), message) + + async def _on_poll_button_press(self, message: Message): + await self.accept_button_event(message) + if not message.contents['poll']['is_active']: + return + + presser_id = message.buttons_info.presser_user.id + presser_name = message.buttons_info.presser_user.name.split('#')[0] + if (presser_id_str := str(presser_id)) in message.contents['poll']['banned_users_tries']: + message.contents['poll']['banned_users_tries'][presser_id_str] += 1 + message.save() + if message.contents['poll']['banned_users_tries'][presser_id_str] == 3: + await self.send(random.choice(( + f'Deja de dar por culo {presser_name} que no puedes votar aqui', + f'No es pesao {presser_name}, que no tienes permitido votar aqui', + f'Deja de pulsar botones que no puedes votar aqui {presser_name}', + f'{presser_name} deja de intentar votar aqui que no puedes', + f'Te han prohibido votar aquì {presser_name}.', + f'No puedes votar aquí, {presser_name}.' + )), reply_to=message) + return + + option_name = results[0] if (results := re.findall('(.*?) ➜.+', message.buttons_info.pressed_text)) else message.buttons_info.pressed_text + selected_option_votes = message.contents['poll']['votes'][option_name] + + if [presser_id, presser_name] in selected_option_votes: + selected_option_votes.remove([presser_id, presser_name]) + else: + if not message.contents['poll']['is_multiple_answer']: + for option_votes in message.contents['poll']['votes'].values(): + try: + option_votes.remove([presser_id, presser_name]) + except ValueError: + pass + else: + break + selected_option_votes.append((presser_id, presser_name)) + + await self._update_poll_buttons(message) + + async def _on_poll_multi(self, message: Message): + await self._on_poll(message, is_multiple_answer=True) + + async def _on_stop_poll(self, message: Message): + if not (poll_message := await self._get_poll_message(message)): + return + + winners = [] + max_votes = 1 + for option, votes in poll_message.contents['poll']['votes'].items(): + if len(votes) > max_votes: + winners = [option] + max_votes = len(votes) + elif len(votes) == max_votes: + winners.append(option) + + match winners: + case [_, _, *_]: + winners = [f'{winner}' for winner in winners] + text = f"Encuesta finalizada. Los ganadores son: {flanautils.join_last_separator(winners, ', ', ' y ')}." + case [winner]: + text = f'Encuesta finalizada. Ganador: {winner}.' + case _: + text = 'Encuesta finalizada.' + + poll_message.contents['poll']['is_active'] = False + + await self.edit(text, poll_message) + if not message.replied_message: + await self.send(text, reply_to=poll_message) + + @admin + async def _on_voting_ban(self, message: Message): + if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)): + return + + await self.delete_message(message) + + for user in await self._find_users_to_punish(message): + if str(user.id) not in poll_message.contents['poll']['banned_users_tries']: + poll_message.contents['poll']['banned_users_tries'][str(user.id)] = 0 + poll_message.save() + + @admin + async def _on_voting_unban(self, message: Message): + if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)): + return + + await self.delete_message(message) + + for user in await self._find_users_to_punish(message): + try: + del poll_message.contents['poll']['banned_users_tries'][str(user.id)] + except KeyError: + pass + poll_message.save() + + # -------------------------------------------------------- # + # -------------------- PUBLIC METHODS -------------------- # + # -------------------------------------------------------- # diff --git a/flanabot/bots/scraper_bot.py b/flanabot/bots/scraper_bot.py new file mode 100644 index 0000000..cc235ea --- /dev/null +++ b/flanabot/bots/scraper_bot.py @@ -0,0 +1,232 @@ +__all__ = ['ScraperBot'] + +import asyncio +import random +from abc import ABC +from typing import Iterable + +import flanautils +from flanaapis import instagram, tiktok, twitter, youtube +from flanautils import Media, MediaType, OrderedSet, Source, return_if_first_empty +from multibot import MultiBot, RegisteredCallback, SendError, constants as multibot_constants, reply + +from flanabot import constants +from flanabot.models import Action, BotAction, Message + + +# ----------------------------------------------------------------------------------------------------- # +# --------------------------------------------- SCRAPER_BOT --------------------------------------------- # +# ----------------------------------------------------------------------------------------------------- # +class ScraperBot(MultiBot, ABC): + # ----------------------------------------------------------- # + # -------------------- PROTECTED METHODS -------------------- # + # ----------------------------------------------------------- # + def _add_handlers(self): + super()._add_handlers() + + self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'])) + self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message'])) + self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message'])) + self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message'])) + self.register(self._on_scraping, constants.KEYWORDS['scraping']) + + self.register(self._on_scraping_audio, multibot_constants.KEYWORDS['audio']) + self.register(self._on_scraping_audio, (multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping'])) + + self.register(self._on_song_info, constants.KEYWORDS['song_info']) + + @staticmethod + def _medias_sended_info(medias: Iterable[Media]) -> str: + medias_count = { + Source.TWITTER: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}, + Source.INSTAGRAM: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}, + Source.TIKTOK: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}, + Source.REDDIT: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}, + Source.YOUTUBE: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}, + None: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0} + } + for media in medias: + medias_count[media.source][media.type_] += 1 + + medias_sended_info = [] + for source, media_type_count in medias_count.items(): + source_medias_sended_info = [] + for media_type, count in media_type_count.items(): + if count: + if count == 1: + type_text = {MediaType.IMAGE: 'imagen', + MediaType.AUDIO: 'audio', + MediaType.GIF: 'gif', + MediaType.VIDEO: 'vídeo', + None: 'cosa que no sé que tipo de archivo es', + MediaType.ERROR: 'error'}[media_type] + else: + type_text = {MediaType.IMAGE: 'imágenes', + MediaType.AUDIO: 'audios', + MediaType.GIF: 'gifs', + MediaType.VIDEO: 'vídeos', + None: 'cosas que no sé que tipos de archivos son', + MediaType.ERROR: 'errores'}[media_type] + source_medias_sended_info.append(f'{count} {type_text}') + if source_medias_sended_info: + medias_sended_info.append(f"{flanautils.join_last_separator(source_medias_sended_info, ', ', ' y ')} de {source.name if source else 'algún sitio'}") + + medias_sended_info_joined = flanautils.join_last_separator(medias_sended_info, ',\n', ' y\n') + new_line = ' ' if len(medias_sended_info) == 1 else '\n' + return f'{new_line}{medias_sended_info_joined}:' + + # ---------------------------------------------- # + # HANDLERS # + # ---------------------------------------------- # + async def _on_no_delete_original(self, message: Message): + if not await self._scrape_and_send(message): + await self._on_recover_message(message) + + async def _on_recover_message(self, message: Message): + pass + + async def _on_scraping(self, message: Message, audio_only=False) -> OrderedSet[Media]: + sended_media_messages = OrderedSet() + + if message.replied_message: + sended_media_messages += await self._scrape_and_send(message.replied_message, audio_only) + + return await self._scrape_send_and_delete(message, audio_only, sended_media_messages) + + async def _on_scraping_audio(self, message: Message) -> OrderedSet[Media]: + return await self._on_scraping(message, audio_only=True) + + @reply + async def _on_song_info(self, message: Message): + song_infos = message.replied_message.song_infos if message.replied_message else [] + + if song_infos: + for song_info in song_infos: + await self.send_song_info(song_info, message) + elif message.chat.is_private or self.is_bot_mentioned(message): + await self._manage_exceptions(SendError('No hay información musical en ese mensaje.'), message) + + async def _scrape_and_send(self, message: Message, audio_only=False) -> OrderedSet[Media]: + kwargs = {} + if self._parse_callbacks(message.text, [RegisteredCallback(..., [['sin'], ['timeout', 'limite']])]): + kwargs['timeout_for_media'] = None + if not (medias := await self._search_medias(message, audio_only, **kwargs)): + return OrderedSet() + + sended_media_messages, _ = await self.send_medias(medias, message) + sended_media_messages = OrderedSet(sended_media_messages) + + await self.send_inline_results(message) + + return sended_media_messages + + async def _scrape_send_and_delete( + self, + message: Message, + audio_only=False, + sended_media_messages: OrderedSet[Media] = None + ) -> OrderedSet[Media]: + if sended_media_messages is None: + sended_media_messages = OrderedSet() + + sended_media_messages += await self._scrape_and_send(message, audio_only) + + if ( + sended_media_messages + and + message.chat.is_group + and + not message.replied_message + and + message.chat.config['delete_original'] + ): + # noinspection PyTypeChecker + BotAction(Action.MESSAGE_DELETED, message, affected_objects=[message, *sended_media_messages]).save() + await self.delete_message(message) + + return sended_media_messages + + async def _search_medias(self, message: Message, audio_only=False, timeout_for_media: int | float = None) -> OrderedSet[Media]: + medias = OrderedSet() + + tweet_ids = twitter.find_tweet_ids(message.text) + instagram_ids = instagram.find_instagram_ids(message.text) + tiktok_ids = await tiktok.find_tiktok_ids(message.text) + tiktok_download_urls = tiktok.find_download_urls(message.text) + youtube_ids = youtube.find_youtube_ids(message.text) + + if not any((tweet_ids, instagram_ids, tiktok_ids, tiktok_download_urls, youtube_ids)): + return medias + + bot_state_message = await self.send(random.choice(constants.SCRAPING_PHRASES), message) + + gather_result = asyncio.gather( + twitter.get_medias(tweet_ids, audio_only), + instagram.get_medias(instagram_ids, audio_only), + tiktok.get_medias(tiktok_ids, tiktok_download_urls, audio_only), + youtube.get_medias(youtube_ids, audio_only, timeout_for_media), + return_exceptions=True + ) + + await gather_result + await self.delete_message(bot_state_message) + + medias, exceptions = flanautils.filter_exceptions(gather_result.result()) + await self._manage_exceptions(exceptions, message) + + return OrderedSet(*medias) + + # -------------------------------------------------------- # + # -------------------- PUBLIC METHODS -------------------- # + # -------------------------------------------------------- # + async def send_medias(self, medias: OrderedSet[Media], message: Message, send_song_info=False) -> tuple[list[Message], int]: + sended_media_messages = [] + fails = 0 + bot_state_message: Message | None = None + sended_info_message: Message | None = None + + if not message.is_inline: + bot_state_message: Message = await self.send('Enviando...', message) + + if message.chat.is_group: + sended_info_message = await self.send(f"{message.author.name.split('#')[0]} compartió{self._medias_sended_info(medias)}", message) + + for media in medias: + if not media.content: + fails += 1 + continue + + if media.song_info: + message.song_infos.add(media.song_info) + message.save() + + if bot_message := await self.send(media, message): + sended_media_messages.append(bot_message) + if media.song_info and bot_message: + bot_message.song_infos.add(media.song_info) + bot_message.save() + else: + fails += 1 + + if send_song_info and media.song_info: + await self.send_song_info(media.song_info, message) + + if fails and sended_info_message: + if fails == len(medias): + await self.delete_message(sended_info_message) + if bot_state_message: + await self.delete_message(bot_state_message) + + return sended_media_messages, fails + + @return_if_first_empty(exclude_self_types='FlanaBot', globals_=globals()) + async def send_song_info(self, song_info: Media, message: Message): + attributes = ( + f'Título: {song_info.title}\n' if song_info.title else '', + f'Autor: {song_info.author}\n' if song_info.author else '', + f'Álbum: {song_info.album}\n' if song_info.album else '', + f'Previa:' + ) + await self.send(''.join(attributes), message) + if song_info: + await self.send(song_info, message) diff --git a/flanabot/bots/weather_bot.py b/flanabot/bots/weather_bot.py new file mode 100644 index 0000000..1c65869 --- /dev/null +++ b/flanabot/bots/weather_bot.py @@ -0,0 +1,194 @@ +__all__ = ['WeatherBot'] + +import datetime +import random +from abc import ABC + +import flanaapis.geolocation.functions +import flanaapis.weather.functions +import flanautils +import plotly.graph_objects +from flanaapis import Place, PlaceNotFoundError, WeatherEmoji +from flanautils import Media, MediaType, NotFoundError, OrderedSet, Source, TraceMetadata +from multibot import MultiBot, constants as multibot_constants + +from flanabot import constants +from flanabot.models import Action, BotAction, ButtonsGroup, Message, WeatherChart + + +# ----------------------------------------------------------------------------------------------------- # +# --------------------------------------------- WEATHER_BOT --------------------------------------------- # +# ----------------------------------------------------------------------------------------------------- # +class WeatherBot(MultiBot, ABC): + # ----------------------------------------------------------- # + # -------------------- PROTECTED METHODS -------------------- # + # ----------------------------------------------------------- # + def _add_handlers(self): + super()._add_handlers() + + self.register(self._on_weather, constants.KEYWORDS['weather_chart']) + self.register(self._on_weather, (multibot_constants.KEYWORDS['show'], constants.KEYWORDS['weather_chart'])) + + self.register_button(self._on_weather_button_press, ButtonsGroup.WEATHER) + + # ---------------------------------------------- # + # HANDLERS # + # ---------------------------------------------- # + async def _on_weather(self, message: Message): + bot_state_message: Message | None = None + if message.is_inline: + show_progress_state = False + elif message.chat.is_group and not self.is_bot_mentioned(message): + if message.chat.config['weather_chart']: + if BotAction.find_one({'action': Action.AUTO_WEATHER_CHART.value, 'chat': message.chat.object_id, 'date': {'$gt': datetime.datetime.now(datetime.timezone.utc) - constants.AUTO_WEATHER_EVERY}}): + return + show_progress_state = False + else: + return + else: + show_progress_state = True + + original_text_words = flanautils.remove_accents(message.text.lower()) + original_text_words = flanautils.remove_symbols(original_text_words, ignore=('-', '.'), replace_with=' ').split() + original_text_words = await self._filter_mention_ids(original_text_words, message, delete_names=True) + + # noinspection PyTypeChecker + place_words = ( + OrderedSet(original_text_words) + - flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['show'], min_score=0.85).keys() + - flanautils.cartesian_product_string_matching(original_text_words, constants.KEYWORDS['weather_chart'], min_score=0.85).keys() + - flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['date'], min_score=0.85).keys() + - flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['thanks'], min_score=0.85).keys() + - flanautils.CommonWords.get() + ) + if not place_words: + if not message.is_inline: + await self.send_error(random.choice(('¿Tiempo dónde?', 'Indica el sitio.', 'Y el sitio?', 'y el sitio? me lo invento?')), message) + return + + if 'calle' in original_text_words: + place_words.insert(0, 'calle') + place_query = ' '.join(place_words) + if len(place_query) >= constants.MAX_PLACE_QUERY_LENGTH: + if show_progress_state: + await self.send_error(Media(str(flanautils.resolve_path('resources/mucho_texto.png')), MediaType.IMAGE, 'jpg', Source.LOCAL), message, send_as_file=False) + return + if show_progress_state: + bot_state_message = await self.send(f'Buscando "{place_query}" en el mapa 🧐...', message) + + result: str | Place | None = None + async for result in flanaapis.geolocation.functions.find_place_showing_progress(place_query): + if isinstance(result, str) and bot_state_message: + await self.edit(result, bot_state_message) + + place: Place = result + if not place: + if bot_state_message: + await self.delete_message(bot_state_message) + await self._manage_exceptions(PlaceNotFoundError(place_query), message) + return + + if bot_state_message: + bot_state_message = await self.edit(f'Obteniendo datos del tiempo para "{place_query}"...', bot_state_message) + current_weather, day_weathers = await flanaapis.weather.functions.get_day_weathers_by_place(place) + + if bot_state_message: + bot_state_message = await self.edit('Creando gráficas del tiempo...', bot_state_message) + + weather_chart = WeatherChart( + _font={'size': 30}, + _title={ + 'text': place.name[:40].strip(' ,-'), + 'xref': 'paper', + 'yref': 'paper', + 'xanchor': 'left', + 'yanchor': 'top', + 'x': 0.025, + 'y': 0.975, + 'font': { + 'size': 50, + 'family': 'open sans' + } + }, + _legend={'x': 0.99, 'y': 0.99, 'xanchor': 'right', 'yanchor': 'top', 'bgcolor': 'rgba(0,0,0,0)'}, + _margin={'l': 20, 'r': 20, 't': 20, 'b': 20}, + trace_metadatas={ + 'temperature': TraceMetadata(name='temperature', group='temperature', legend='Temperatura', show=False, color='#ff8400', default_min=0, default_max=40, y_tick_suffix=' ºC', y_axis_width=130), + 'temperature_feel': TraceMetadata(name='temperature_feel', group='temperature', legend='Sensación de temperatura', show=True, color='red', default_min=0, default_max=40, y_tick_suffix=' ºC', y_axis_width=130), + 'clouds': TraceMetadata(name='clouds', legend='Nubes', show=False, color='#86abe3', default_min=-100, default_max=100, y_tick_suffix=' %', hide_y_ticks_if='{tick} < 0'), + 'visibility': TraceMetadata(name='visibility', legend='Visibilidad', show=False, color='#c99a34', default_min=0, default_max='{max_y_data} * 2', y_tick_suffix=' km', y_delta_tick=2, hide_y_ticks_if='{tick} > {max_y_data}'), + 'uvi': TraceMetadata(name='uvi', legend='UVI', show=False, color='#ffd000', default_min=-12, default_max=12, hide_y_ticks_if='{tick} < 0', y_delta_tick=1, y_axis_width=75), + 'humidity': TraceMetadata(name='humidity', legend='Humedad', show=False, color='#2baab5', default_min=0, default_max=100, y_tick_suffix=' %'), + 'precipitation_probability': TraceMetadata(name='precipitation_probability', legend='Probabilidad de precipitaciones', show=True, color='#0033ff', default_min=-100, default_max=100, y_tick_suffix=' %', hide_y_ticks_if='{tick} < 0'), + 'rain_volume': TraceMetadata(plotly.graph_objects.Histogram, name='rain_volume', group='precipitation', legend='Volumen de lluvia', show=True, color='#34a4eb', opacity=0.3, default_min=-10, default_max=10, y_tick_suffix=' mm', y_delta_tick=1, hide_y_ticks_if='{tick} < 0', y_axis_width=130), + 'snow_volume': TraceMetadata(plotly.graph_objects.Histogram, name='snow_volume', group='precipitation', legend='Volumen de nieve', show=True, color='#34a4eb', opacity=0.8, pattern={'shape': '.', 'fgcolor': '#ffffff', 'bgcolor': '#b0d6f3', 'solidity': 0.5, 'size': 14}, default_min=-10, default_max=10, y_tick_suffix=' mm', y_delta_tick=1, hide_y_ticks_if='{tick} < 0', y_axis_width=130), + 'pressure': TraceMetadata(name='pressure', legend='Presión', show=False, color='#31a339', default_min=1013.25 - 90, default_max=1013.25 + 90, y_tick_suffix=' hPa', y_axis_width=225), + 'wind_speed': TraceMetadata(name='wind_speed', legend='Velocidad del viento', show=False, color='#d8abff', default_min=-120, default_max=120, y_tick_suffix=' km/h', hide_y_ticks_if='{tick} < 0', y_axis_width=165) + }, + x_data=[instant_weather.date_time for day_weather in day_weathers for instant_weather in day_weather.instant_weathers], + all_y_data=[], + current_weather=current_weather, + day_weathers=day_weathers, + timezone=(timezone := day_weathers[0].timezone), + place=place, + view_position=datetime.datetime.now(timezone) + ) + + weather_chart.apply_zoom() + weather_chart.draw() + if not (image_bytes := weather_chart.to_image()): + if bot_state_message: + await self.delete_message(bot_state_message) + raise NotFoundError('No hay suficientes datos del tiempo.') + + if bot_state_message: + bot_state_message = await self.edit('Enviando...', bot_state_message) + bot_message: Message = await self.send( + Media(image_bytes, MediaType.IMAGE, 'jpg'), + [ + [WeatherEmoji.ZOOM_IN.value, WeatherEmoji.ZOOM_OUT.value, WeatherEmoji.LEFT.value, WeatherEmoji.RIGHT.value], + [WeatherEmoji.TEMPERATURE.value, WeatherEmoji.TEMPERATURE_FEEL.value, WeatherEmoji.CLOUDS.value, WeatherEmoji.VISIBILITY.value, WeatherEmoji.UVI.value], + [WeatherEmoji.HUMIDITY.value, WeatherEmoji.PRECIPITATION_PROBABILITY.value, WeatherEmoji.PRECIPITATION_VOLUME.value, WeatherEmoji.PRESSURE.value, WeatherEmoji.WIND_SPEED.value] + ], + message, + buttons_key=ButtonsGroup.WEATHER, + send_as_file=False + ) + await self.send_inline_results(message) + + if bot_state_message: + await self.delete_message(bot_state_message) + + if bot_message: + bot_message.weather_chart = weather_chart + bot_message.save() + if not self.is_bot_mentioned(message): + # noinspection PyTypeChecker + BotAction(Action.AUTO_WEATHER_CHART, message, affected_objects=[bot_message]).save() + + async def _on_weather_button_press(self, message: Message): + await self.accept_button_event(message) + + match message.buttons_info.pressed_text: + case WeatherEmoji.ZOOM_IN.value: + message.weather_chart.zoom_in() + case WeatherEmoji.ZOOM_OUT.value: + message.weather_chart.zoom_out() + case WeatherEmoji.LEFT.value: + message.weather_chart.move_left() + case WeatherEmoji.RIGHT.value: + message.weather_chart.move_right() + case WeatherEmoji.PRECIPITATION_VOLUME.value: + message.weather_chart.trace_metadatas['rain_volume'].show = not message.weather_chart.trace_metadatas['rain_volume'].show + message.weather_chart.trace_metadatas['snow_volume'].show = not message.weather_chart.trace_metadatas['snow_volume'].show + case emoji if emoji in WeatherEmoji.values: + trace_metadata_name = WeatherEmoji(emoji).name.lower() + message.weather_chart.trace_metadatas[trace_metadata_name].show = not message.weather_chart.trace_metadatas[trace_metadata_name].show + case _: + return + + message.weather_chart.apply_zoom() + message.weather_chart.draw() + + image_bytes = message.weather_chart.to_image() + await self.edit(Media(image_bytes, MediaType.IMAGE, 'jpg'), message)