Split FlanaBot into multiple bots
This commit is contained in:
@@ -1,3 +1,7 @@
|
|||||||
from flanabot.bots.flana_bot import *
|
from flanabot.bots.flana_bot import *
|
||||||
from flanabot.bots.flana_disc_bot import *
|
from flanabot.bots.flana_disc_bot import *
|
||||||
from flanabot.bots.flana_tele_bot import *
|
from flanabot.bots.flana_tele_bot import *
|
||||||
|
from flanabot.bots.penalty_bot import *
|
||||||
|
from flanabot.bots.poll_bot import *
|
||||||
|
from flanabot.bots.scraper_bot import *
|
||||||
|
from flanabot.bots.weather_bot import *
|
||||||
|
|||||||
@@ -1,83 +1,49 @@
|
|||||||
__all__ = ['FlanaBot']
|
__all__ = ['FlanaBot']
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import datetime
|
import datetime
|
||||||
import math
|
|
||||||
import random
|
import random
|
||||||
import re
|
|
||||||
from abc import ABC
|
from abc import ABC
|
||||||
from typing import Iterable, Sequence
|
from typing import Iterable
|
||||||
|
|
||||||
import flanaapis.geolocation.functions
|
|
||||||
import flanaapis.weather.functions
|
|
||||||
import flanautils
|
import flanautils
|
||||||
import plotly.graph_objects
|
|
||||||
import pymongo
|
import pymongo
|
||||||
from flanaapis import InstagramLoginError, MediaNotFoundError, Place, PlaceNotFoundError, WeatherEmoji, instagram, tiktok, twitter, youtube
|
from flanaapis import InstagramLoginError, MediaNotFoundError, PlaceNotFoundError
|
||||||
from flanautils import Media, MediaType, NotFoundError, OrderedSet, Source, TimeUnits, TraceMetadata, return_if_first_empty
|
from flanautils import return_if_first_empty
|
||||||
from multibot import BadRoleError, LimitError, MultiBot, RegisteredCallback, Role, SendError, User, admin, bot_mentioned, constants as multibot_constants, group, ignore_self_message, inline, reply
|
from multibot import BadRoleError, LimitError, MultiBot, Role, bot_mentioned, constants as multibot_constants, group, inline
|
||||||
|
|
||||||
from flanabot import constants
|
from flanabot import constants
|
||||||
from flanabot.models import Action, BotAction, ButtonsGroup, Chat, Message, Punishment, WeatherChart
|
from flanabot.bots.penalty_bot import PenaltyBot
|
||||||
|
from flanabot.bots.poll_bot import PollBot
|
||||||
|
from flanabot.bots.scraper_bot import ScraperBot
|
||||||
|
from flanabot.bots.weather_bot import WeatherBot
|
||||||
|
from flanabot.models import Action, BotAction, ButtonsGroup, Chat, Message
|
||||||
|
|
||||||
|
|
||||||
# ----------------------------------------------------------------------------------------------------- #
|
# ----------------------------------------------------------------------------------------------------- #
|
||||||
# --------------------------------------------- FLANA_BOT --------------------------------------------- #
|
# --------------------------------------------- FLANA_BOT --------------------------------------------- #
|
||||||
# ----------------------------------------------------------------------------------------------------- #
|
# ----------------------------------------------------------------------------------------------------- #
|
||||||
class FlanaBot(MultiBot, ABC):
|
class FlanaBot(PenaltyBot, PollBot, ScraperBot, WeatherBot, MultiBot, ABC):
|
||||||
Chat = Chat
|
Chat = Chat
|
||||||
Message = Message
|
Message = Message
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.lock = asyncio.Lock()
|
|
||||||
|
|
||||||
# ----------------------------------------------------------- #
|
# ----------------------------------------------------------- #
|
||||||
# -------------------- PROTECTED METHODS -------------------- #
|
# -------------------- PROTECTED METHODS -------------------- #
|
||||||
# ----------------------------------------------------------- #
|
# ----------------------------------------------------------- #
|
||||||
def _add_handlers(self):
|
def _add_handlers(self):
|
||||||
super()._add_handlers()
|
super()._add_handlers()
|
||||||
self.register(self._on_ban, multibot_constants.KEYWORDS['ban'])
|
|
||||||
|
|
||||||
self.register(self._on_bye, multibot_constants.KEYWORDS['bye'])
|
self.register(self._on_bye, multibot_constants.KEYWORDS['bye'])
|
||||||
|
|
||||||
self.register(self._on_choose, constants.KEYWORDS['choose'], priority=2)
|
|
||||||
self.register(self._on_choose, constants.KEYWORDS['random'], priority=2)
|
|
||||||
self.register(self._on_choose, (constants.KEYWORDS['choose'], constants.KEYWORDS['random']), priority=2)
|
|
||||||
|
|
||||||
self.register(self._on_config, multibot_constants.KEYWORDS['config'])
|
self.register(self._on_config, multibot_constants.KEYWORDS['config'])
|
||||||
self.register(self._on_config, (multibot_constants.KEYWORDS['show'], multibot_constants.KEYWORDS['config']))
|
self.register(self._on_config, (multibot_constants.KEYWORDS['show'], multibot_constants.KEYWORDS['config']))
|
||||||
|
|
||||||
self.register(self._on_delete, multibot_constants.KEYWORDS['delete'])
|
self.register(self._on_delete, multibot_constants.KEYWORDS['delete'])
|
||||||
self.register(self._on_delete, (multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
|
self.register(self._on_delete, (multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
|
||||||
|
|
||||||
self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['vote']))
|
|
||||||
self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['vote']))
|
|
||||||
|
|
||||||
self.register(self._on_dice, constants.KEYWORDS['dice'])
|
|
||||||
|
|
||||||
self.register(self._on_hello, multibot_constants.KEYWORDS['hello'])
|
self.register(self._on_hello, multibot_constants.KEYWORDS['hello'])
|
||||||
|
|
||||||
self.register(self._on_mute, multibot_constants.KEYWORDS['mute'])
|
|
||||||
self.register(self._on_mute, (('haz', 'se'), multibot_constants.KEYWORDS['mute']))
|
|
||||||
self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['unmute']))
|
|
||||||
self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['sound']))
|
|
||||||
|
|
||||||
self.register(self._on_new_message_default, default=True)
|
self.register(self._on_new_message_default, default=True)
|
||||||
|
|
||||||
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete']))
|
|
||||||
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message']))
|
|
||||||
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
|
|
||||||
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
|
|
||||||
|
|
||||||
self.register(self._on_poll, constants.KEYWORDS['poll'], priority=2)
|
|
||||||
|
|
||||||
self.register(self._on_poll_multi, (constants.KEYWORDS['poll'], constants.KEYWORDS['multiple_answer']), priority=2)
|
|
||||||
|
|
||||||
self.register(self._on_punish, constants.KEYWORDS['punish'])
|
|
||||||
self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['unpunish']))
|
|
||||||
self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission']))
|
|
||||||
|
|
||||||
self.register(self._on_recover_message, multibot_constants.KEYWORDS['reset'])
|
self.register(self._on_recover_message, multibot_constants.KEYWORDS['reset'])
|
||||||
self.register(self._on_recover_message, (multibot_constants.KEYWORDS['reset'], multibot_constants.KEYWORDS['message']))
|
self.register(self._on_recover_message, (multibot_constants.KEYWORDS['reset'], multibot_constants.KEYWORDS['message']))
|
||||||
|
|
||||||
@@ -88,131 +54,15 @@ class FlanaBot(MultiBot, ABC):
|
|||||||
self.register(self._on_roles, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['role']))
|
self.register(self._on_roles, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['role']))
|
||||||
self.register(self._on_roles, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['role']))
|
self.register(self._on_roles, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['role']))
|
||||||
|
|
||||||
self.register(self._on_scraping, constants.KEYWORDS['scraping'])
|
|
||||||
|
|
||||||
self.register(self._on_scraping_audio, multibot_constants.KEYWORDS['audio'])
|
|
||||||
self.register(self._on_scraping_audio, (multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping']))
|
|
||||||
|
|
||||||
self.register(self._on_song_info, constants.KEYWORDS['song_info'])
|
|
||||||
|
|
||||||
self.register(self._on_stop_poll, multibot_constants.KEYWORDS['deactivate'])
|
|
||||||
self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['poll']))
|
|
||||||
self.register(self._on_stop_poll, multibot_constants.KEYWORDS['stop'])
|
|
||||||
self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['poll']))
|
|
||||||
|
|
||||||
self.register(self._on_unban, multibot_constants.KEYWORDS['unban'])
|
|
||||||
|
|
||||||
self.register(self._on_unmute, multibot_constants.KEYWORDS['unmute'])
|
|
||||||
self.register(self._on_unmute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['mute']))
|
|
||||||
self.register(self._on_unmute, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['sound']))
|
|
||||||
|
|
||||||
self.register(self._on_unpunish, constants.KEYWORDS['unpunish'])
|
|
||||||
self.register(self._on_unpunish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['punish']))
|
|
||||||
self.register(self._on_unpunish, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission']))
|
|
||||||
|
|
||||||
self.register(self._on_users, multibot_constants.KEYWORDS['user'])
|
self.register(self._on_users, multibot_constants.KEYWORDS['user'])
|
||||||
|
|
||||||
self.register(self._on_voting_ban, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
|
|
||||||
|
|
||||||
self.register(self._on_voting_unban, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
|
|
||||||
|
|
||||||
self.register(self._on_weather, constants.KEYWORDS['weather_chart'])
|
|
||||||
self.register(self._on_weather, (multibot_constants.KEYWORDS['show'], constants.KEYWORDS['weather_chart']))
|
|
||||||
|
|
||||||
self.register_button(self._on_config_button_press, ButtonsGroup.CONFIG)
|
self.register_button(self._on_config_button_press, ButtonsGroup.CONFIG)
|
||||||
self.register_button(self._on_poll_button_press, ButtonsGroup.POLL)
|
|
||||||
self.register_button(self._on_roles_button_press, ButtonsGroup.ROLES)
|
self.register_button(self._on_roles_button_press, ButtonsGroup.ROLES)
|
||||||
self.register_button(self._on_users_button_press, ButtonsGroup.USERS)
|
self.register_button(self._on_users_button_press, ButtonsGroup.USERS)
|
||||||
self.register_button(self._on_weather_button_press, ButtonsGroup.WEATHER)
|
|
||||||
|
|
||||||
async def _changeable_roles(self, group_: int | str | Chat | Message) -> list[Role]:
|
async def _changeable_roles(self, group_: int | str | Chat | Message) -> list[Role]:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@admin(False)
|
|
||||||
@group
|
|
||||||
async def _check_message_flood(self, message: Message):
|
|
||||||
if await self.is_punished(message.author, message.chat):
|
|
||||||
return
|
|
||||||
|
|
||||||
last_2s_messages = self.Message.find({
|
|
||||||
'platform': self.platform.value,
|
|
||||||
'author': message.author.object_id,
|
|
||||||
'chat': message.chat.object_id,
|
|
||||||
'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=2)}
|
|
||||||
})
|
|
||||||
last_7s_messages = self.Message.find({
|
|
||||||
'platform': self.platform.value,
|
|
||||||
'author': message.author.object_id,
|
|
||||||
'chat': message.chat.object_id,
|
|
||||||
'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=7)}
|
|
||||||
})
|
|
||||||
|
|
||||||
if len(last_2s_messages) > constants.FLOOD_2s_LIMIT or len(last_7s_messages) > constants.FLOOD_7s_LIMIT:
|
|
||||||
punishment = Punishment.find_one({
|
|
||||||
'platform': self.platform.value,
|
|
||||||
'user_id': message.author.id,
|
|
||||||
'group_id': message.chat.group_id
|
|
||||||
})
|
|
||||||
punishment_seconds = (getattr(punishment, 'level', 0) + 2) ** constants.PUNISHMENT_INCREMENT_EXPONENT
|
|
||||||
try:
|
|
||||||
await self.punish(message.author.id, message.chat.group_id, punishment_seconds, message)
|
|
||||||
except BadRoleError as e:
|
|
||||||
await self._manage_exceptions(e, message)
|
|
||||||
else:
|
|
||||||
await self.send(f'Castigado durante {TimeUnits(seconds=punishment_seconds).to_words()}.', message)
|
|
||||||
|
|
||||||
def _distribute_buttons(self, texts: Sequence[str]) -> list[list[str]]:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def _filter_mention_ids(self, text: str | Iterable[str], message: Message, delete_names=False) -> list[str]:
|
|
||||||
if isinstance(text, str):
|
|
||||||
words = text.split()
|
|
||||||
else:
|
|
||||||
words = text
|
|
||||||
|
|
||||||
ids = []
|
|
||||||
if delete_names:
|
|
||||||
for user in message.mentions:
|
|
||||||
ids.append(user.name.lower())
|
|
||||||
ids.append(user.name.split('#')[0].lower())
|
|
||||||
ids.append(str(user.id))
|
|
||||||
else:
|
|
||||||
for user in message.mentions:
|
|
||||||
ids.append(str(user.id))
|
|
||||||
for role in await self.get_group_roles(message):
|
|
||||||
ids.append(str(role.id))
|
|
||||||
|
|
||||||
return [word for word in words if flanautils.remove_symbols(word).strip() not in ids]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_options(text: str, discarded_words: Iterable = ()) -> list[str]:
|
|
||||||
options = (option for option in text.split() if not flanautils.cartesian_product_string_matching(option.lower(), discarded_words, min_score=multibot_constants.PARSE_CALLBACKS_MIN_SCORE_DEFAULT))
|
|
||||||
text = ' '.join(options)
|
|
||||||
|
|
||||||
conjunctions = [f' {conjunction} ' for conjunction in flanautils.CommonWords.get('conjunctions')]
|
|
||||||
if any(char in text for char in (',', ';', *conjunctions)):
|
|
||||||
conjunction_parts = [f'(?:[,;]*{conjunction}[,;]*)+' for conjunction in conjunctions]
|
|
||||||
options = re.split(f"{'|'.join(conjunction_parts)}|[,;]+", text)
|
|
||||||
return [option.strip() for option in options if option]
|
|
||||||
else:
|
|
||||||
return text.split()
|
|
||||||
|
|
||||||
async def _get_poll_message(self, message: Message) -> Message | None:
|
|
||||||
if poll_message := message.replied_message:
|
|
||||||
if poll_message.contents.get('poll') is None:
|
|
||||||
return
|
|
||||||
return poll_message
|
|
||||||
elif (
|
|
||||||
(message.chat.is_private or self.is_bot_mentioned(message))
|
|
||||||
and
|
|
||||||
flanautils.cartesian_product_string_matching(message.text, constants.KEYWORDS['poll'], min_score=multibot_constants.PARSE_CALLBACKS_MIN_SCORE_DEFAULT)
|
|
||||||
and
|
|
||||||
(poll_message := self.Message.find_one({'contents.poll.is_active': True}, sort_keys=(('date', pymongo.DESCENDING),)))
|
|
||||||
):
|
|
||||||
return await self.get_message(poll_message.chat.id, poll_message.id)
|
|
||||||
else:
|
|
||||||
return
|
|
||||||
|
|
||||||
@return_if_first_empty(exclude_self_types='FlanaBot', globals_=globals())
|
@return_if_first_empty(exclude_self_types='FlanaBot', globals_=globals())
|
||||||
async def _manage_exceptions(self, exceptions: BaseException | Iterable[BaseException], context: Chat | Message):
|
async def _manage_exceptions(self, exceptions: BaseException | Iterable[BaseException], context: Chat | Message):
|
||||||
if not isinstance(exceptions, Iterable):
|
if not isinstance(exceptions, Iterable):
|
||||||
@@ -232,49 +82,6 @@ class FlanaBot(MultiBot, ABC):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
await super()._manage_exceptions(e, context)
|
await super()._manage_exceptions(e, context)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _medias_sended_info(medias: Iterable[Media]) -> str:
|
|
||||||
medias_count = {
|
|
||||||
Source.TWITTER: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
|
|
||||||
Source.INSTAGRAM: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
|
|
||||||
Source.TIKTOK: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
|
|
||||||
Source.REDDIT: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
|
|
||||||
Source.YOUTUBE: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
|
|
||||||
None: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}
|
|
||||||
}
|
|
||||||
for media in medias:
|
|
||||||
medias_count[media.source][media.type_] += 1
|
|
||||||
|
|
||||||
medias_sended_info = []
|
|
||||||
for source, media_type_count in medias_count.items():
|
|
||||||
source_medias_sended_info = []
|
|
||||||
for media_type, count in media_type_count.items():
|
|
||||||
if count:
|
|
||||||
if count == 1:
|
|
||||||
type_text = {MediaType.IMAGE: 'imagen',
|
|
||||||
MediaType.AUDIO: 'audio',
|
|
||||||
MediaType.GIF: 'gif',
|
|
||||||
MediaType.VIDEO: 'vídeo',
|
|
||||||
None: 'cosa que no sé que tipo de archivo es',
|
|
||||||
MediaType.ERROR: 'error'}[media_type]
|
|
||||||
else:
|
|
||||||
type_text = {MediaType.IMAGE: 'imágenes',
|
|
||||||
MediaType.AUDIO: 'audios',
|
|
||||||
MediaType.GIF: 'gifs',
|
|
||||||
MediaType.VIDEO: 'vídeos',
|
|
||||||
None: 'cosas que no sé que tipos de archivos son',
|
|
||||||
MediaType.ERROR: 'errores'}[media_type]
|
|
||||||
source_medias_sended_info.append(f'{count} {type_text}')
|
|
||||||
if source_medias_sended_info:
|
|
||||||
medias_sended_info.append(f"{flanautils.join_last_separator(source_medias_sended_info, ', ', ' y ')} de {source.name if source else 'algún sitio'}")
|
|
||||||
|
|
||||||
medias_sended_info_joined = flanautils.join_last_separator(medias_sended_info, ',\n', ' y\n')
|
|
||||||
new_line = ' ' if len(medias_sended_info) == 1 else '\n'
|
|
||||||
return f'{new_line}{medias_sended_info_joined}:'
|
|
||||||
|
|
||||||
async def _punish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def _role_state_options(self, group_: int | str | Chat | Message, activated_user_role_names: list[str]) -> list[str]:
|
async def _role_state_options(self, group_: int | str | Chat | Message, activated_user_role_names: list[str]) -> list[str]:
|
||||||
options = []
|
options = []
|
||||||
for role in await self._changeable_roles(group_):
|
for role in await self._changeable_roles(group_):
|
||||||
@@ -289,144 +96,14 @@ class FlanaBot(MultiBot, ABC):
|
|||||||
|
|
||||||
return options
|
return options
|
||||||
|
|
||||||
async def _scrape_and_send(self, message: Message, audio_only=False) -> OrderedSet[Media]:
|
|
||||||
kwargs = {}
|
|
||||||
if self._parse_callbacks(message.text, [RegisteredCallback(..., [['sin'], ['timeout', 'limite']])]):
|
|
||||||
kwargs['timeout_for_media'] = None
|
|
||||||
if not (medias := await self._search_medias(message, audio_only, **kwargs)):
|
|
||||||
return OrderedSet()
|
|
||||||
|
|
||||||
sended_media_messages, _ = await self.send_medias(medias, message)
|
|
||||||
sended_media_messages = OrderedSet(sended_media_messages)
|
|
||||||
|
|
||||||
await self.send_inline_results(message)
|
|
||||||
|
|
||||||
return sended_media_messages
|
|
||||||
|
|
||||||
async def _scrape_send_and_delete(
|
|
||||||
self,
|
|
||||||
message: Message,
|
|
||||||
audio_only=False,
|
|
||||||
sended_media_messages: OrderedSet[Media] = None
|
|
||||||
) -> OrderedSet[Media]:
|
|
||||||
if sended_media_messages is None:
|
|
||||||
sended_media_messages = OrderedSet()
|
|
||||||
|
|
||||||
sended_media_messages += await self._scrape_and_send(message, audio_only)
|
|
||||||
|
|
||||||
if (
|
|
||||||
sended_media_messages
|
|
||||||
and
|
|
||||||
message.chat.is_group
|
|
||||||
and
|
|
||||||
not message.replied_message
|
|
||||||
and
|
|
||||||
message.chat.config['delete_original']
|
|
||||||
):
|
|
||||||
# noinspection PyTypeChecker
|
|
||||||
BotAction(Action.MESSAGE_DELETED, message, affected_objects=[message, *sended_media_messages]).save()
|
|
||||||
await self.delete_message(message)
|
|
||||||
|
|
||||||
return sended_media_messages
|
|
||||||
|
|
||||||
async def _search_medias(self, message: Message, audio_only=False, timeout_for_media: int | float = None) -> OrderedSet[Media]:
|
|
||||||
medias = OrderedSet()
|
|
||||||
|
|
||||||
tweet_ids = twitter.find_tweet_ids(message.text)
|
|
||||||
instagram_ids = instagram.find_instagram_ids(message.text)
|
|
||||||
tiktok_ids = await tiktok.find_tiktok_ids(message.text)
|
|
||||||
tiktok_download_urls = tiktok.find_download_urls(message.text)
|
|
||||||
youtube_ids = youtube.find_youtube_ids(message.text)
|
|
||||||
|
|
||||||
if not any((tweet_ids, instagram_ids, tiktok_ids, tiktok_download_urls, youtube_ids)):
|
|
||||||
return medias
|
|
||||||
|
|
||||||
bot_state_message = await self.send(random.choice(constants.SCRAPING_PHRASES), message)
|
|
||||||
|
|
||||||
gather_result = asyncio.gather(
|
|
||||||
twitter.get_medias(tweet_ids, audio_only),
|
|
||||||
instagram.get_medias(instagram_ids, audio_only),
|
|
||||||
tiktok.get_medias(tiktok_ids, tiktok_download_urls, audio_only),
|
|
||||||
youtube.get_medias(youtube_ids, audio_only, timeout_for_media),
|
|
||||||
return_exceptions=True
|
|
||||||
)
|
|
||||||
|
|
||||||
await gather_result
|
|
||||||
await self.delete_message(bot_state_message)
|
|
||||||
|
|
||||||
medias, exceptions = flanautils.filter_exceptions(gather_result.result())
|
|
||||||
await self._manage_exceptions(exceptions, message)
|
|
||||||
|
|
||||||
return OrderedSet(*medias)
|
|
||||||
|
|
||||||
async def _unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def _update_poll_buttons(self, message: Message):
|
|
||||||
if message.contents['poll']['is_multiple_answer']:
|
|
||||||
total_votes = len({option_vote[0] for option_votes in message.contents['poll']['votes'].values() if option_votes for option_vote in option_votes})
|
|
||||||
else:
|
|
||||||
total_votes = sum(len(option_votes) for option_votes in message.contents['poll']['votes'].values())
|
|
||||||
|
|
||||||
if total_votes:
|
|
||||||
buttons = []
|
|
||||||
for option, option_votes in message.contents['poll']['votes'].items():
|
|
||||||
ratio = f'{len(option_votes)}/{total_votes}'
|
|
||||||
names = f"({', '.join(option_vote[1] for option_vote in option_votes)})" if option_votes else ''
|
|
||||||
buttons.append(f'{option} ➜ {ratio} {names}')
|
|
||||||
else:
|
|
||||||
buttons = list(message.contents['poll']['votes'].keys())
|
|
||||||
|
|
||||||
await self.edit(self._distribute_buttons(buttons), message)
|
|
||||||
|
|
||||||
# ---------------------------------------------- #
|
# ---------------------------------------------- #
|
||||||
# HANDLERS #
|
# HANDLERS #
|
||||||
# ---------------------------------------------- #
|
# ---------------------------------------------- #
|
||||||
@bot_mentioned
|
|
||||||
@group
|
|
||||||
@admin(send_negative=True)
|
|
||||||
async def _on_ban(self, message: Message):
|
|
||||||
for user in await self._find_users_to_punish(message):
|
|
||||||
await self.ban(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message)
|
|
||||||
|
|
||||||
async def _on_bye(self, message: Message):
|
async def _on_bye(self, message: Message):
|
||||||
if message.chat.is_private or self.is_bot_mentioned(message):
|
if message.chat.is_private or self.is_bot_mentioned(message):
|
||||||
await self.send_bye(message)
|
await self.send_bye(message)
|
||||||
|
|
||||||
async def _on_choose(self, message: Message):
|
|
||||||
if message.chat.is_group and not self.is_bot_mentioned(message):
|
|
||||||
return
|
|
||||||
|
|
||||||
discarded_words = {
|
|
||||||
*constants.KEYWORDS['choose'],
|
|
||||||
*constants.KEYWORDS['random'],
|
|
||||||
self.name.lower(), f'<@{self.id}>',
|
|
||||||
'entre', 'between'
|
|
||||||
}
|
|
||||||
|
|
||||||
if options := self._get_options(message.text, discarded_words):
|
|
||||||
for i in range(1, len(options) - 1):
|
|
||||||
try:
|
|
||||||
n1 = flanautils.cast_number(options[i - 1])
|
|
||||||
except ValueError:
|
|
||||||
try:
|
|
||||||
n1 = flanautils.words_to_numbers(options[i - 1], ignore_no_numbers=False)
|
|
||||||
except KeyError:
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
n2 = flanautils.cast_number(options[i + 1])
|
|
||||||
except ValueError:
|
|
||||||
try:
|
|
||||||
n2 = flanautils.words_to_numbers(options[i + 1], ignore_no_numbers=False)
|
|
||||||
except KeyError:
|
|
||||||
continue
|
|
||||||
if options[i] in ('al', 'to'):
|
|
||||||
await self.send(random.randint(math.ceil(n1), math.floor(n2)), message)
|
|
||||||
return
|
|
||||||
await self.send(random.choice(options), message)
|
|
||||||
else:
|
|
||||||
await self.send(random.choice(('¿Que elija el qué?', '¿Y las opciones?', '?', '🤔')), message)
|
|
||||||
|
|
||||||
@group
|
@group
|
||||||
@bot_mentioned
|
@bot_mentioned
|
||||||
async def _on_config(self, message: Message):
|
async def _on_config(self, message: Message):
|
||||||
@@ -472,39 +149,10 @@ class FlanaBot(MultiBot, ABC):
|
|||||||
except LimitError as e:
|
except LimitError as e:
|
||||||
await self._manage_exceptions(e, message)
|
await self._manage_exceptions(e, message)
|
||||||
|
|
||||||
@admin
|
|
||||||
async def _on_delete_votes(self, message: Message):
|
|
||||||
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)):
|
|
||||||
return
|
|
||||||
|
|
||||||
await self.delete_message(message)
|
|
||||||
|
|
||||||
for user in await self._find_users_to_punish(message):
|
|
||||||
for option_name, option_votes in poll_message.contents['poll']['votes'].items():
|
|
||||||
poll_message.contents['poll']['votes'][option_name] = [option_vote for option_vote in option_votes if option_vote[0] != user.id]
|
|
||||||
|
|
||||||
await self._update_poll_buttons(poll_message)
|
|
||||||
|
|
||||||
async def _on_dice(self, message: Message):
|
|
||||||
if message.chat.is_group and not self.is_bot_mentioned(message):
|
|
||||||
return
|
|
||||||
|
|
||||||
if top_number := flanautils.sum_numbers_in_text(message.text):
|
|
||||||
await self.send(random.randint(1, math.floor(top_number)), message)
|
|
||||||
else:
|
|
||||||
await self.send(random.choice(('¿De cuántas caras?', '¿Y el número?', '?', '🤔')), message)
|
|
||||||
|
|
||||||
async def _on_hello(self, message: Message):
|
async def _on_hello(self, message: Message):
|
||||||
if message.chat.is_private or self.is_bot_mentioned(message):
|
if message.chat.is_private or self.is_bot_mentioned(message):
|
||||||
await self.send_hello(message)
|
await self.send_hello(message)
|
||||||
|
|
||||||
@group
|
|
||||||
@bot_mentioned
|
|
||||||
@admin(send_negative=True)
|
|
||||||
async def _on_mute(self, message: Message):
|
|
||||||
for user in await self._find_users_to_punish(message):
|
|
||||||
await self.mute(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message)
|
|
||||||
|
|
||||||
async def _on_new_message_default(self, message: Message):
|
async def _on_new_message_default(self, message: Message):
|
||||||
if message.is_inline:
|
if message.is_inline:
|
||||||
await self._scrape_and_send(message)
|
await self._scrape_and_send(message)
|
||||||
@@ -543,96 +191,6 @@ class FlanaBot(MultiBot, ABC):
|
|||||||
):
|
):
|
||||||
await self.send_insult(message)
|
await self.send_insult(message)
|
||||||
|
|
||||||
@ignore_self_message
|
|
||||||
async def _on_new_message_raw(self, message: Message):
|
|
||||||
await super()._on_new_message_raw(message)
|
|
||||||
if message.chat.config['check_flood'] and message.chat.config['punish'] and not message.is_inline:
|
|
||||||
async with self.lock:
|
|
||||||
await self._check_message_flood(message)
|
|
||||||
|
|
||||||
async def _on_no_delete_original(self, message: Message):
|
|
||||||
if not await self._scrape_and_send(message):
|
|
||||||
await self._on_recover_message(message)
|
|
||||||
|
|
||||||
async def _on_poll(self, message: Message, is_multiple_answer=False):
|
|
||||||
if message.chat.is_group and not self.is_bot_mentioned(message):
|
|
||||||
return
|
|
||||||
|
|
||||||
await self.delete_message(message)
|
|
||||||
|
|
||||||
discarded_words = {*constants.KEYWORDS['poll'], *constants.KEYWORDS['multiple_answer'], self.name.lower(), f'<@{self.id}>'}
|
|
||||||
if final_options := [f'{option[0].upper()}{option[1:]}' for option in self._get_options(message.text, discarded_words)]:
|
|
||||||
await self.send(
|
|
||||||
f"Encuesta {'multirespuesta ' if is_multiple_answer else ''}en curso...",
|
|
||||||
self._distribute_buttons(final_options),
|
|
||||||
message,
|
|
||||||
buttons_key=ButtonsGroup.POLL,
|
|
||||||
contents={'poll': {
|
|
||||||
'is_active': True,
|
|
||||||
'is_multiple_answer': is_multiple_answer,
|
|
||||||
'votes': {option: [] for option in final_options},
|
|
||||||
'banned_users_tries': {}
|
|
||||||
}}
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
await self.send(random.choice(('¿Y las opciones?', '?', '🤔')), message)
|
|
||||||
|
|
||||||
async def _on_poll_button_press(self, message: Message):
|
|
||||||
await self.accept_button_event(message)
|
|
||||||
if not message.contents['poll']['is_active']:
|
|
||||||
return
|
|
||||||
|
|
||||||
presser_id = message.buttons_info.presser_user.id
|
|
||||||
presser_name = message.buttons_info.presser_user.name.split('#')[0]
|
|
||||||
if (presser_id_str := str(presser_id)) in message.contents['poll']['banned_users_tries']:
|
|
||||||
message.contents['poll']['banned_users_tries'][presser_id_str] += 1
|
|
||||||
message.save()
|
|
||||||
if message.contents['poll']['banned_users_tries'][presser_id_str] == 3:
|
|
||||||
await self.send(random.choice((
|
|
||||||
f'Deja de dar por culo {presser_name}, que no puedes votar aqui.',
|
|
||||||
f'No es pesao {presser_name}, que no tienes permitido votar aqui',
|
|
||||||
f'Deja de pulsar botones que no puedes votar aqui {presser_name}',
|
|
||||||
f'{presser_name} deja de intentar votar aqui que no puedes',
|
|
||||||
f'Te han prohibido votar aqui {presser_name}',
|
|
||||||
f'No puedes votar aqui {presser_name}'
|
|
||||||
)), reply_to=message)
|
|
||||||
return
|
|
||||||
|
|
||||||
option_name = results[0] if (results := re.findall('(.*?) ➜.+', message.buttons_info.pressed_text)) else message.buttons_info.pressed_text
|
|
||||||
selected_option_votes = message.contents['poll']['votes'][option_name]
|
|
||||||
|
|
||||||
if [presser_id, presser_name] in selected_option_votes:
|
|
||||||
selected_option_votes.remove([presser_id, presser_name])
|
|
||||||
else:
|
|
||||||
if not message.contents['poll']['is_multiple_answer']:
|
|
||||||
for option_votes in message.contents['poll']['votes'].values():
|
|
||||||
try:
|
|
||||||
option_votes.remove([presser_id, presser_name])
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
selected_option_votes.append((presser_id, presser_name))
|
|
||||||
|
|
||||||
await self._update_poll_buttons(message)
|
|
||||||
|
|
||||||
async def _on_poll_multi(self, message: Message):
|
|
||||||
await self._on_poll(message, is_multiple_answer=True)
|
|
||||||
|
|
||||||
@bot_mentioned
|
|
||||||
@group
|
|
||||||
@admin(send_negative=True)
|
|
||||||
async def _on_punish(self, message: Message):
|
|
||||||
if not message.chat.config['punish']:
|
|
||||||
return
|
|
||||||
|
|
||||||
for user in await self._find_users_to_punish(message):
|
|
||||||
await self.punish(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message)
|
|
||||||
|
|
||||||
async def _on_ready(self):
|
|
||||||
await super()._on_ready()
|
|
||||||
await flanautils.do_every(constants.CHECK_PUNISHMENTS_EVERY_SECONDS, self.check_old_punishments)
|
|
||||||
|
|
||||||
@inline(False)
|
@inline(False)
|
||||||
async def _on_recover_message(self, message: Message):
|
async def _on_recover_message(self, message: Message):
|
||||||
if message.replied_message and message.replied_message.author.id == self.id:
|
if message.replied_message and message.replied_message.author.id == self.id:
|
||||||
@@ -694,79 +252,6 @@ class FlanaBot(MultiBot, ABC):
|
|||||||
|
|
||||||
message.buttons_info.presser_user.save()
|
message.buttons_info.presser_user.save()
|
||||||
|
|
||||||
async def _on_scraping(self, message: Message, audio_only=False) -> OrderedSet[Media]:
|
|
||||||
sended_media_messages = OrderedSet()
|
|
||||||
|
|
||||||
if message.replied_message:
|
|
||||||
sended_media_messages += await self._scrape_and_send(message.replied_message, audio_only)
|
|
||||||
|
|
||||||
return await self._scrape_send_and_delete(message, audio_only, sended_media_messages)
|
|
||||||
|
|
||||||
async def _on_scraping_audio(self, message: Message) -> OrderedSet[Media]:
|
|
||||||
return await self._on_scraping(message, audio_only=True)
|
|
||||||
|
|
||||||
@reply
|
|
||||||
async def _on_song_info(self, message: Message):
|
|
||||||
song_infos = message.replied_message.song_infos if message.replied_message else []
|
|
||||||
|
|
||||||
if song_infos:
|
|
||||||
for song_info in song_infos:
|
|
||||||
await self.send_song_info(song_info, message)
|
|
||||||
elif message.chat.is_private or self.is_bot_mentioned(message):
|
|
||||||
await self._manage_exceptions(SendError('No hay información musical en ese mensaje.'), message)
|
|
||||||
|
|
||||||
async def _on_stop_poll(self, message: Message):
|
|
||||||
if not (poll_message := await self._get_poll_message(message)):
|
|
||||||
return
|
|
||||||
|
|
||||||
winners = []
|
|
||||||
max_votes = 1
|
|
||||||
for option, votes in poll_message.contents['poll']['votes'].items():
|
|
||||||
if len(votes) > max_votes:
|
|
||||||
winners = [option]
|
|
||||||
max_votes = len(votes)
|
|
||||||
elif len(votes) == max_votes:
|
|
||||||
winners.append(option)
|
|
||||||
|
|
||||||
match winners:
|
|
||||||
case [_, _, *_]:
|
|
||||||
winners = [f'<b>{winner}</b>' for winner in winners]
|
|
||||||
text = f"Encuesta finalizada. Los ganadores son: {flanautils.join_last_separator(winners, ', ', ' y ')}."
|
|
||||||
case [winner]:
|
|
||||||
text = f'Encuesta finalizada. Ganador: <b>{winner}</b>.'
|
|
||||||
case _:
|
|
||||||
text = 'Encuesta finalizada.'
|
|
||||||
|
|
||||||
poll_message.contents['poll']['is_active'] = False
|
|
||||||
|
|
||||||
await self.edit(text, poll_message)
|
|
||||||
if not message.replied_message:
|
|
||||||
await self.send(text, reply_to=poll_message)
|
|
||||||
|
|
||||||
@bot_mentioned
|
|
||||||
@group
|
|
||||||
@admin(send_negative=True)
|
|
||||||
async def _on_unban(self, message: Message):
|
|
||||||
for user in await self._find_users_to_punish(message):
|
|
||||||
await self.unban(user, message, message)
|
|
||||||
|
|
||||||
@group
|
|
||||||
@bot_mentioned
|
|
||||||
@admin(send_negative=True)
|
|
||||||
async def _on_unmute(self, message: Message):
|
|
||||||
for user in await self._find_users_to_punish(message):
|
|
||||||
await self.unmute(user, message, message)
|
|
||||||
|
|
||||||
@group
|
|
||||||
@bot_mentioned
|
|
||||||
@admin(send_negative=True)
|
|
||||||
async def _on_unpunish(self, message: Message):
|
|
||||||
if not message.chat.config['punish']:
|
|
||||||
return
|
|
||||||
|
|
||||||
for user in await self._find_users_to_punish(message):
|
|
||||||
await self.unpunish(user, message, message)
|
|
||||||
|
|
||||||
@group
|
@group
|
||||||
@bot_mentioned
|
@bot_mentioned
|
||||||
async def _on_users(self, message: Message):
|
async def _on_users(self, message: Message):
|
||||||
@@ -803,248 +288,15 @@ class FlanaBot(MultiBot, ABC):
|
|||||||
message
|
message
|
||||||
)
|
)
|
||||||
|
|
||||||
@admin
|
|
||||||
async def _on_voting_ban(self, message: Message):
|
|
||||||
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)):
|
|
||||||
return
|
|
||||||
|
|
||||||
await self.delete_message(message)
|
|
||||||
|
|
||||||
for user in await self._find_users_to_punish(message):
|
|
||||||
if str(user.id) not in poll_message.contents['poll']['banned_users_tries']:
|
|
||||||
poll_message.contents['poll']['banned_users_tries'][str(user.id)] = 0
|
|
||||||
poll_message.save()
|
|
||||||
|
|
||||||
@admin
|
|
||||||
async def _on_voting_unban(self, message: Message):
|
|
||||||
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)):
|
|
||||||
return
|
|
||||||
|
|
||||||
await self.delete_message(message)
|
|
||||||
|
|
||||||
for user in await self._find_users_to_punish(message):
|
|
||||||
try:
|
|
||||||
del poll_message.contents['poll']['banned_users_tries'][str(user.id)]
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
poll_message.save()
|
|
||||||
|
|
||||||
async def _on_weather(self, message: Message):
|
|
||||||
bot_state_message: Message | None = None
|
|
||||||
if message.is_inline:
|
|
||||||
show_progress_state = False
|
|
||||||
elif message.chat.is_group and not self.is_bot_mentioned(message):
|
|
||||||
if message.chat.config['weather_chart']:
|
|
||||||
if BotAction.find_one({'action': Action.AUTO_WEATHER_CHART.value, 'chat': message.chat.object_id, 'date': {'$gt': datetime.datetime.now(datetime.timezone.utc) - constants.AUTO_WEATHER_EVERY}}):
|
|
||||||
return
|
|
||||||
show_progress_state = False
|
|
||||||
else:
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
show_progress_state = True
|
|
||||||
|
|
||||||
original_text_words = flanautils.remove_accents(message.text.lower())
|
|
||||||
original_text_words = flanautils.remove_symbols(original_text_words, ignore=('-', '.'), replace_with=' ').split()
|
|
||||||
original_text_words = await self._filter_mention_ids(original_text_words, message, delete_names=True)
|
|
||||||
|
|
||||||
# noinspection PyTypeChecker
|
|
||||||
place_words = (
|
|
||||||
OrderedSet(original_text_words)
|
|
||||||
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['show'], min_score=0.85).keys()
|
|
||||||
- flanautils.cartesian_product_string_matching(original_text_words, constants.KEYWORDS['weather_chart'], min_score=0.85).keys()
|
|
||||||
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['date'], min_score=0.85).keys()
|
|
||||||
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['thanks'], min_score=0.85).keys()
|
|
||||||
- flanautils.CommonWords.get()
|
|
||||||
)
|
|
||||||
if not place_words:
|
|
||||||
if not message.is_inline:
|
|
||||||
await self.send_error(random.choice(('¿Tiempo dónde?', 'Indica el sitio.', 'Y el sitio?', 'y el sitio? me lo invento?')), message)
|
|
||||||
return
|
|
||||||
|
|
||||||
if 'calle' in original_text_words:
|
|
||||||
place_words.insert(0, 'calle')
|
|
||||||
place_query = ' '.join(place_words)
|
|
||||||
if len(place_query) >= constants.MAX_PLACE_QUERY_LENGTH:
|
|
||||||
if show_progress_state:
|
|
||||||
await self.send_error(Media(str(flanautils.resolve_path('resources/mucho_texto.png')), MediaType.IMAGE, 'jpg', Source.LOCAL), message, send_as_file=False)
|
|
||||||
return
|
|
||||||
if show_progress_state:
|
|
||||||
bot_state_message = await self.send(f'Buscando "{place_query}" en el mapa 🧐...', message)
|
|
||||||
|
|
||||||
result: str | Place | None = None
|
|
||||||
async for result in flanaapis.geolocation.functions.find_place_showing_progress(place_query):
|
|
||||||
if isinstance(result, str) and bot_state_message:
|
|
||||||
await self.edit(result, bot_state_message)
|
|
||||||
|
|
||||||
place: Place = result
|
|
||||||
if not place:
|
|
||||||
if bot_state_message:
|
|
||||||
await self.delete_message(bot_state_message)
|
|
||||||
await self._manage_exceptions(PlaceNotFoundError(place_query), message)
|
|
||||||
return
|
|
||||||
|
|
||||||
if bot_state_message:
|
|
||||||
bot_state_message = await self.edit(f'Obteniendo datos del tiempo para "{place_query}"...', bot_state_message)
|
|
||||||
current_weather, day_weathers = await flanaapis.weather.functions.get_day_weathers_by_place(place)
|
|
||||||
|
|
||||||
if bot_state_message:
|
|
||||||
bot_state_message = await self.edit('Creando gráficas del tiempo...', bot_state_message)
|
|
||||||
|
|
||||||
weather_chart = WeatherChart(
|
|
||||||
_font={'size': 30},
|
|
||||||
_title={
|
|
||||||
'text': place.name[:40].strip(' ,-'),
|
|
||||||
'xref': 'paper',
|
|
||||||
'yref': 'paper',
|
|
||||||
'xanchor': 'left',
|
|
||||||
'yanchor': 'top',
|
|
||||||
'x': 0.025,
|
|
||||||
'y': 0.975,
|
|
||||||
'font': {
|
|
||||||
'size': 50,
|
|
||||||
'family': 'open sans'
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_legend={'x': 0.99, 'y': 0.99, 'xanchor': 'right', 'yanchor': 'top', 'bgcolor': 'rgba(0,0,0,0)'},
|
|
||||||
_margin={'l': 20, 'r': 20, 't': 20, 'b': 20},
|
|
||||||
trace_metadatas={
|
|
||||||
'temperature': TraceMetadata(name='temperature', group='temperature', legend='Temperatura', show=False, color='#ff8400', default_min=0, default_max=40, y_tick_suffix=' ºC', y_axis_width=130),
|
|
||||||
'temperature_feel': TraceMetadata(name='temperature_feel', group='temperature', legend='Sensación de temperatura', show=True, color='red', default_min=0, default_max=40, y_tick_suffix=' ºC', y_axis_width=130),
|
|
||||||
'clouds': TraceMetadata(name='clouds', legend='Nubes', show=False, color='#86abe3', default_min=-100, default_max=100, y_tick_suffix=' %', hide_y_ticks_if='{tick} < 0'),
|
|
||||||
'visibility': TraceMetadata(name='visibility', legend='Visibilidad', show=False, color='#c99a34', default_min=0, default_max='{max_y_data} * 2', y_tick_suffix=' km', y_delta_tick=2, hide_y_ticks_if='{tick} > {max_y_data}'),
|
|
||||||
'uvi': TraceMetadata(name='uvi', legend='UVI', show=False, color='#ffd000', default_min=-12, default_max=12, hide_y_ticks_if='{tick} < 0', y_delta_tick=1, y_axis_width=75),
|
|
||||||
'humidity': TraceMetadata(name='humidity', legend='Humedad', show=False, color='#2baab5', default_min=0, default_max=100, y_tick_suffix=' %'),
|
|
||||||
'precipitation_probability': TraceMetadata(name='precipitation_probability', legend='Probabilidad de precipitaciones', show=True, color='#0033ff', default_min=-100, default_max=100, y_tick_suffix=' %', hide_y_ticks_if='{tick} < 0'),
|
|
||||||
'rain_volume': TraceMetadata(plotly.graph_objects.Histogram, name='rain_volume', group='precipitation', legend='Volumen de lluvia', show=True, color='#34a4eb', opacity=0.3, default_min=-10, default_max=10, y_tick_suffix=' mm', y_delta_tick=1, hide_y_ticks_if='{tick} < 0', y_axis_width=130),
|
|
||||||
'snow_volume': TraceMetadata(plotly.graph_objects.Histogram, name='snow_volume', group='precipitation', legend='Volumen de nieve', show=True, color='#34a4eb', opacity=0.8, pattern={'shape': '.', 'fgcolor': '#ffffff', 'bgcolor': '#b0d6f3', 'solidity': 0.5, 'size': 14}, default_min=-10, default_max=10, y_tick_suffix=' mm', y_delta_tick=1, hide_y_ticks_if='{tick} < 0', y_axis_width=130),
|
|
||||||
'pressure': TraceMetadata(name='pressure', legend='Presión', show=False, color='#31a339', default_min=1013.25 - 90, default_max=1013.25 + 90, y_tick_suffix=' hPa', y_axis_width=225),
|
|
||||||
'wind_speed': TraceMetadata(name='wind_speed', legend='Velocidad del viento', show=False, color='#d8abff', default_min=-120, default_max=120, y_tick_suffix=' km/h', hide_y_ticks_if='{tick} < 0', y_axis_width=165)
|
|
||||||
},
|
|
||||||
x_data=[instant_weather.date_time for day_weather in day_weathers for instant_weather in day_weather.instant_weathers],
|
|
||||||
all_y_data=[],
|
|
||||||
current_weather=current_weather,
|
|
||||||
day_weathers=day_weathers,
|
|
||||||
timezone=(timezone := day_weathers[0].timezone),
|
|
||||||
place=place,
|
|
||||||
view_position=datetime.datetime.now(timezone)
|
|
||||||
)
|
|
||||||
|
|
||||||
weather_chart.apply_zoom()
|
|
||||||
weather_chart.draw()
|
|
||||||
if not (image_bytes := weather_chart.to_image()):
|
|
||||||
if bot_state_message:
|
|
||||||
await self.delete_message(bot_state_message)
|
|
||||||
raise NotFoundError('No hay suficientes datos del tiempo.')
|
|
||||||
|
|
||||||
if bot_state_message:
|
|
||||||
bot_state_message = await self.edit('Enviando...', bot_state_message)
|
|
||||||
bot_message: Message = await self.send(
|
|
||||||
Media(image_bytes, MediaType.IMAGE, 'jpg'),
|
|
||||||
[
|
|
||||||
[WeatherEmoji.ZOOM_IN.value, WeatherEmoji.ZOOM_OUT.value, WeatherEmoji.LEFT.value, WeatherEmoji.RIGHT.value],
|
|
||||||
[WeatherEmoji.TEMPERATURE.value, WeatherEmoji.TEMPERATURE_FEEL.value, WeatherEmoji.CLOUDS.value, WeatherEmoji.VISIBILITY.value, WeatherEmoji.UVI.value],
|
|
||||||
[WeatherEmoji.HUMIDITY.value, WeatherEmoji.PRECIPITATION_PROBABILITY.value, WeatherEmoji.PRECIPITATION_VOLUME.value, WeatherEmoji.PRESSURE.value, WeatherEmoji.WIND_SPEED.value]
|
|
||||||
],
|
|
||||||
message,
|
|
||||||
buttons_key=ButtonsGroup.WEATHER,
|
|
||||||
send_as_file=False
|
|
||||||
)
|
|
||||||
await self.send_inline_results(message)
|
|
||||||
|
|
||||||
if bot_state_message:
|
|
||||||
await self.delete_message(bot_state_message)
|
|
||||||
|
|
||||||
if bot_message:
|
|
||||||
bot_message.weather_chart = weather_chart
|
|
||||||
bot_message.save()
|
|
||||||
if not self.is_bot_mentioned(message):
|
|
||||||
# noinspection PyTypeChecker
|
|
||||||
BotAction(Action.AUTO_WEATHER_CHART, message, affected_objects=[bot_message]).save()
|
|
||||||
|
|
||||||
async def _on_weather_button_press(self, message: Message):
|
|
||||||
await self.accept_button_event(message)
|
|
||||||
|
|
||||||
match message.buttons_info.pressed_text:
|
|
||||||
case WeatherEmoji.ZOOM_IN.value:
|
|
||||||
message.weather_chart.zoom_in()
|
|
||||||
case WeatherEmoji.ZOOM_OUT.value:
|
|
||||||
message.weather_chart.zoom_out()
|
|
||||||
case WeatherEmoji.LEFT.value:
|
|
||||||
message.weather_chart.move_left()
|
|
||||||
case WeatherEmoji.RIGHT.value:
|
|
||||||
message.weather_chart.move_right()
|
|
||||||
case WeatherEmoji.PRECIPITATION_VOLUME.value:
|
|
||||||
message.weather_chart.trace_metadatas['rain_volume'].show = not message.weather_chart.trace_metadatas['rain_volume'].show
|
|
||||||
message.weather_chart.trace_metadatas['snow_volume'].show = not message.weather_chart.trace_metadatas['snow_volume'].show
|
|
||||||
case emoji if emoji in WeatherEmoji.values:
|
|
||||||
trace_metadata_name = WeatherEmoji(emoji).name.lower()
|
|
||||||
message.weather_chart.trace_metadatas[trace_metadata_name].show = not message.weather_chart.trace_metadatas[trace_metadata_name].show
|
|
||||||
case _:
|
|
||||||
return
|
|
||||||
|
|
||||||
message.weather_chart.apply_zoom()
|
|
||||||
message.weather_chart.draw()
|
|
||||||
|
|
||||||
image_bytes = message.weather_chart.to_image()
|
|
||||||
await self.edit(Media(image_bytes, MediaType.IMAGE, 'jpg'), message)
|
|
||||||
|
|
||||||
# -------------------------------------------------------- #
|
# -------------------------------------------------------- #
|
||||||
# -------------------- PUBLIC METHODS -------------------- #
|
# -------------------- PUBLIC METHODS -------------------- #
|
||||||
# -------------------------------------------------------- #
|
# -------------------------------------------------------- #
|
||||||
async def check_old_punishments(self):
|
|
||||||
punishments = Punishment.find({'platform': self.platform.value}, lazy=True)
|
|
||||||
|
|
||||||
for punishment in punishments:
|
|
||||||
now = datetime.datetime.now(datetime.timezone.utc)
|
|
||||||
if not punishment.until or now < punishment.until:
|
|
||||||
continue
|
|
||||||
|
|
||||||
await self._remove_penalty(punishment, self._unpunish, delete=False)
|
|
||||||
if punishment.is_active:
|
|
||||||
punishment.is_active = False
|
|
||||||
punishment.last_update = now
|
|
||||||
punishment.save()
|
|
||||||
|
|
||||||
if punishment.last_update + constants.PUNISHMENTS_RESET_TIME <= now:
|
|
||||||
if punishment.level == 1:
|
|
||||||
punishment.delete()
|
|
||||||
else:
|
|
||||||
punishment.level -= 1
|
|
||||||
punishment.last_update = now
|
|
||||||
punishment.save()
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def clear_old_database_items(cls):
|
async def clear_old_database_items(cls):
|
||||||
await super().clear_old_database_items()
|
await super().clear_old_database_items()
|
||||||
before_date = datetime.datetime.now(datetime.timezone.utc) - multibot_constants.MESSAGE_EXPIRATION_TIME
|
before_date = datetime.datetime.now(datetime.timezone.utc) - multibot_constants.MESSAGE_EXPIRATION_TIME
|
||||||
BotAction.collection.delete_many({'date': {'$lte': before_date}})
|
BotAction.collection.delete_many({'date': {'$lte': before_date}})
|
||||||
|
|
||||||
async def is_punished(self, user: int | str | User, group_: int | str | Chat | Message) -> bool:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def punish(
|
|
||||||
self,
|
|
||||||
user: int | str | User,
|
|
||||||
group_: int | str | Chat | Message,
|
|
||||||
time: int | datetime.timedelta = None,
|
|
||||||
message: Message = None
|
|
||||||
):
|
|
||||||
# noinspection PyTypeChecker
|
|
||||||
punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_), time)
|
|
||||||
punishment.pull_from_database(overwrite_fields=('level',), exclude_fields=('until',))
|
|
||||||
punishment.level += 1
|
|
||||||
|
|
||||||
try:
|
|
||||||
await self._punish(punishment.user_id, punishment.group_id)
|
|
||||||
except BadRoleError as e:
|
|
||||||
if message and message.chat.original_object:
|
|
||||||
await self._manage_exceptions(e, message)
|
|
||||||
else:
|
|
||||||
raise e
|
|
||||||
else:
|
|
||||||
punishment.save(pull_exclude_fields=('until',))
|
|
||||||
await self._unpenalize_later(punishment, self._unpunish, message)
|
|
||||||
|
|
||||||
async def send_bye(self, message: Message) -> multibot_constants.ORIGINAL_MESSAGE:
|
async def send_bye(self, message: Message) -> multibot_constants.ORIGINAL_MESSAGE:
|
||||||
return await self.send(random.choice((*constants.BYE_PHRASES, flanautils.CommonWords.random_time_greeting())), message)
|
return await self.send(random.choice((*constants.BYE_PHRASES, flanautils.CommonWords.random_time_greeting())), message)
|
||||||
|
|
||||||
@@ -1054,60 +306,3 @@ class FlanaBot(MultiBot, ABC):
|
|||||||
async def send_insult(self, message: Message) -> multibot_constants.ORIGINAL_MESSAGE | None:
|
async def send_insult(self, message: Message) -> multibot_constants.ORIGINAL_MESSAGE | None:
|
||||||
await self.typing_delay(message)
|
await self.typing_delay(message)
|
||||||
return await self.send(random.choice(constants.INSULTS), message)
|
return await self.send(random.choice(constants.INSULTS), message)
|
||||||
|
|
||||||
async def send_medias(self, medias: OrderedSet[Media], message: Message, send_song_info=False) -> tuple[list[Message], int]:
|
|
||||||
sended_media_messages = []
|
|
||||||
fails = 0
|
|
||||||
bot_state_message: Message | None = None
|
|
||||||
sended_info_message: Message | None = None
|
|
||||||
|
|
||||||
if not message.is_inline:
|
|
||||||
bot_state_message: Message = await self.send('Enviando...', message)
|
|
||||||
|
|
||||||
if message.chat.is_group:
|
|
||||||
sended_info_message = await self.send(f"{message.author.name.split('#')[0]} compartió{self._medias_sended_info(medias)}", message)
|
|
||||||
|
|
||||||
for media in medias:
|
|
||||||
if not media.content:
|
|
||||||
fails += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
if media.song_info:
|
|
||||||
message.song_infos.add(media.song_info)
|
|
||||||
message.save()
|
|
||||||
|
|
||||||
if bot_message := await self.send(media, message):
|
|
||||||
sended_media_messages.append(bot_message)
|
|
||||||
if media.song_info and bot_message:
|
|
||||||
bot_message.song_infos.add(media.song_info)
|
|
||||||
bot_message.save()
|
|
||||||
else:
|
|
||||||
fails += 1
|
|
||||||
|
|
||||||
if send_song_info and media.song_info:
|
|
||||||
await self.send_song_info(media.song_info, message)
|
|
||||||
|
|
||||||
if fails and sended_info_message:
|
|
||||||
if fails == len(medias):
|
|
||||||
await self.delete_message(sended_info_message)
|
|
||||||
if bot_state_message:
|
|
||||||
await self.delete_message(bot_state_message)
|
|
||||||
|
|
||||||
return sended_media_messages, fails
|
|
||||||
|
|
||||||
@return_if_first_empty(exclude_self_types='FlanaBot', globals_=globals())
|
|
||||||
async def send_song_info(self, song_info: Media, message: Message):
|
|
||||||
attributes = (
|
|
||||||
f'Título: {song_info.title}\n' if song_info.title else '',
|
|
||||||
f'Autor: {song_info.author}\n' if song_info.author else '',
|
|
||||||
f'Álbum: {song_info.album}\n' if song_info.album else '',
|
|
||||||
f'Previa:'
|
|
||||||
)
|
|
||||||
await self.send(''.join(attributes), message)
|
|
||||||
if song_info:
|
|
||||||
await self.send(song_info, message)
|
|
||||||
|
|
||||||
async def unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
|
|
||||||
# noinspection PyTypeChecker
|
|
||||||
punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_))
|
|
||||||
await self._remove_penalty(punishment, self._unpunish, message)
|
|
||||||
|
|||||||
205
flanabot/bots/penalty_bot.py
Normal file
205
flanabot/bots/penalty_bot.py
Normal file
@@ -0,0 +1,205 @@
|
|||||||
|
__all__ = ['PenaltyBot']
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import datetime
|
||||||
|
from abc import ABC
|
||||||
|
|
||||||
|
import flanautils
|
||||||
|
from flanautils import TimeUnits
|
||||||
|
from multibot import BadRoleError, MultiBot, User, admin, bot_mentioned, constants as multibot_constants, group, ignore_self_message
|
||||||
|
|
||||||
|
from flanabot import constants
|
||||||
|
from flanabot.models import Chat, Message, Punishment
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------------------------------------- #
|
||||||
|
# --------------------------------------------- PENALTY_BOT --------------------------------------------- #
|
||||||
|
# ----------------------------------------------------------------------------------------------------- #
|
||||||
|
class PenaltyBot(MultiBot, ABC):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.lock = asyncio.Lock()
|
||||||
|
|
||||||
|
# ----------------------------------------------------------- #
|
||||||
|
# -------------------- PROTECTED METHODS -------------------- #
|
||||||
|
# ----------------------------------------------------------- #
|
||||||
|
def _add_handlers(self):
|
||||||
|
super()._add_handlers()
|
||||||
|
|
||||||
|
self.register(self._on_ban, multibot_constants.KEYWORDS['ban'])
|
||||||
|
|
||||||
|
self.register(self._on_mute, multibot_constants.KEYWORDS['mute'])
|
||||||
|
self.register(self._on_mute, (('haz', 'se'), multibot_constants.KEYWORDS['mute']))
|
||||||
|
self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['unmute']))
|
||||||
|
self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['sound']))
|
||||||
|
|
||||||
|
self.register(self._on_punish, constants.KEYWORDS['punish'])
|
||||||
|
self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['unpunish']))
|
||||||
|
self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission']))
|
||||||
|
|
||||||
|
self.register(self._on_unban, multibot_constants.KEYWORDS['unban'])
|
||||||
|
|
||||||
|
self.register(self._on_unmute, multibot_constants.KEYWORDS['unmute'])
|
||||||
|
self.register(self._on_unmute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['mute']))
|
||||||
|
self.register(self._on_unmute, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['sound']))
|
||||||
|
|
||||||
|
self.register(self._on_unpunish, constants.KEYWORDS['unpunish'])
|
||||||
|
self.register(self._on_unpunish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['punish']))
|
||||||
|
self.register(self._on_unpunish, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission']))
|
||||||
|
|
||||||
|
@admin(False)
|
||||||
|
@group
|
||||||
|
async def _check_message_flood(self, message: Message):
|
||||||
|
if await self.is_punished(message.author, message.chat):
|
||||||
|
return
|
||||||
|
|
||||||
|
last_2s_messages = self.Message.find({
|
||||||
|
'platform': self.platform.value,
|
||||||
|
'author': message.author.object_id,
|
||||||
|
'chat': message.chat.object_id,
|
||||||
|
'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=2)}
|
||||||
|
})
|
||||||
|
last_7s_messages = self.Message.find({
|
||||||
|
'platform': self.platform.value,
|
||||||
|
'author': message.author.object_id,
|
||||||
|
'chat': message.chat.object_id,
|
||||||
|
'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=7)}
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(last_2s_messages) > constants.FLOOD_2s_LIMIT or len(last_7s_messages) > constants.FLOOD_7s_LIMIT:
|
||||||
|
punishment = Punishment.find_one({
|
||||||
|
'platform': self.platform.value,
|
||||||
|
'user_id': message.author.id,
|
||||||
|
'group_id': message.chat.group_id
|
||||||
|
})
|
||||||
|
punishment_seconds = (getattr(punishment, 'level', 0) + 2) ** constants.PUNISHMENT_INCREMENT_EXPONENT
|
||||||
|
try:
|
||||||
|
await self.punish(message.author.id, message.chat.group_id, punishment_seconds, message)
|
||||||
|
except BadRoleError as e:
|
||||||
|
await self._manage_exceptions(e, message)
|
||||||
|
else:
|
||||||
|
await self.send(f'Castigado durante {TimeUnits(seconds=punishment_seconds).to_words()}.', message)
|
||||||
|
|
||||||
|
async def _punish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def _unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# ---------------------------------------------- #
|
||||||
|
# HANDLERS #
|
||||||
|
# ---------------------------------------------- #
|
||||||
|
@bot_mentioned
|
||||||
|
@group
|
||||||
|
@admin(send_negative=True)
|
||||||
|
async def _on_ban(self, message: Message):
|
||||||
|
for user in await self._find_users_to_punish(message):
|
||||||
|
await self.ban(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message)
|
||||||
|
|
||||||
|
@group
|
||||||
|
@bot_mentioned
|
||||||
|
@admin(send_negative=True)
|
||||||
|
async def _on_mute(self, message: Message):
|
||||||
|
for user in await self._find_users_to_punish(message):
|
||||||
|
await self.mute(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message)
|
||||||
|
|
||||||
|
@ignore_self_message
|
||||||
|
async def _on_new_message_raw(self, message: Message):
|
||||||
|
await super()._on_new_message_raw(message)
|
||||||
|
if message.chat.config['check_flood'] and message.chat.config['punish'] and not message.is_inline:
|
||||||
|
async with self.lock:
|
||||||
|
await self._check_message_flood(message)
|
||||||
|
|
||||||
|
@bot_mentioned
|
||||||
|
@group
|
||||||
|
@admin(send_negative=True)
|
||||||
|
async def _on_punish(self, message: Message):
|
||||||
|
if not message.chat.config['punish']:
|
||||||
|
return
|
||||||
|
|
||||||
|
for user in await self._find_users_to_punish(message):
|
||||||
|
await self.punish(user, message, flanautils.words_to_time(await self._filter_mention_ids(message.text, message)), message)
|
||||||
|
|
||||||
|
async def _on_ready(self):
|
||||||
|
await super()._on_ready()
|
||||||
|
await flanautils.do_every(constants.CHECK_PUNISHMENTS_EVERY_SECONDS, self.check_old_punishments)
|
||||||
|
|
||||||
|
@bot_mentioned
|
||||||
|
@group
|
||||||
|
@admin(send_negative=True)
|
||||||
|
async def _on_unban(self, message: Message):
|
||||||
|
for user in await self._find_users_to_punish(message):
|
||||||
|
await self.unban(user, message, message)
|
||||||
|
|
||||||
|
@group
|
||||||
|
@bot_mentioned
|
||||||
|
@admin(send_negative=True)
|
||||||
|
async def _on_unmute(self, message: Message):
|
||||||
|
for user in await self._find_users_to_punish(message):
|
||||||
|
await self.unmute(user, message, message)
|
||||||
|
|
||||||
|
@group
|
||||||
|
@bot_mentioned
|
||||||
|
@admin(send_negative=True)
|
||||||
|
async def _on_unpunish(self, message: Message):
|
||||||
|
if not message.chat.config['punish']:
|
||||||
|
return
|
||||||
|
|
||||||
|
for user in await self._find_users_to_punish(message):
|
||||||
|
await self.unpunish(user, message, message)
|
||||||
|
|
||||||
|
# -------------------------------------------------------- #
|
||||||
|
# -------------------- PUBLIC METHODS -------------------- #
|
||||||
|
# -------------------------------------------------------- #
|
||||||
|
async def check_old_punishments(self):
|
||||||
|
punishments = Punishment.find({'platform': self.platform.value}, lazy=True)
|
||||||
|
|
||||||
|
for punishment in punishments:
|
||||||
|
now = datetime.datetime.now(datetime.timezone.utc)
|
||||||
|
if not punishment.until or now < punishment.until:
|
||||||
|
continue
|
||||||
|
|
||||||
|
await self._remove_penalty(punishment, self._unpunish, delete=False)
|
||||||
|
if punishment.is_active:
|
||||||
|
punishment.is_active = False
|
||||||
|
punishment.last_update = now
|
||||||
|
punishment.save()
|
||||||
|
|
||||||
|
if punishment.last_update + constants.PUNISHMENTS_RESET_TIME <= now:
|
||||||
|
if punishment.level == 1:
|
||||||
|
punishment.delete()
|
||||||
|
else:
|
||||||
|
punishment.level -= 1
|
||||||
|
punishment.last_update = now
|
||||||
|
punishment.save()
|
||||||
|
|
||||||
|
async def is_punished(self, user: int | str | User, group_: int | str | Chat | Message) -> bool:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def punish(
|
||||||
|
self,
|
||||||
|
user: int | str | User,
|
||||||
|
group_: int | str | Chat | Message,
|
||||||
|
time: int | datetime.timedelta = None,
|
||||||
|
message: Message = None
|
||||||
|
):
|
||||||
|
# noinspection PyTypeChecker
|
||||||
|
punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_), time)
|
||||||
|
punishment.pull_from_database(overwrite_fields=('level',), exclude_fields=('until',))
|
||||||
|
punishment.level += 1
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._punish(punishment.user_id, punishment.group_id)
|
||||||
|
except BadRoleError as e:
|
||||||
|
if message and message.chat.original_object:
|
||||||
|
await self._manage_exceptions(e, message)
|
||||||
|
else:
|
||||||
|
raise e
|
||||||
|
else:
|
||||||
|
punishment.save(pull_exclude_fields=('until',))
|
||||||
|
await self._unpenalize_later(punishment, self._unpunish, message)
|
||||||
|
|
||||||
|
async def unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
|
||||||
|
# noinspection PyTypeChecker
|
||||||
|
punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_))
|
||||||
|
await self._remove_penalty(punishment, self._unpunish, message)
|
||||||
277
flanabot/bots/poll_bot.py
Normal file
277
flanabot/bots/poll_bot.py
Normal file
@@ -0,0 +1,277 @@
|
|||||||
|
__all__ = ['PollBot']
|
||||||
|
|
||||||
|
import math
|
||||||
|
import random
|
||||||
|
import re
|
||||||
|
from abc import ABC
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
|
import flanautils
|
||||||
|
import pymongo
|
||||||
|
from multibot import MultiBot, admin, constants as multibot_constants
|
||||||
|
|
||||||
|
from flanabot import constants
|
||||||
|
from flanabot.models import ButtonsGroup, Message
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------------------------------------- #
|
||||||
|
# --------------------------------------------- POLL_BOT --------------------------------------------- #
|
||||||
|
# ----------------------------------------------------------------------------------------------------- #
|
||||||
|
class PollBot(MultiBot, ABC):
|
||||||
|
# ----------------------------------------------------------- #
|
||||||
|
# -------------------- PROTECTED METHODS -------------------- #
|
||||||
|
# ----------------------------------------------------------- #
|
||||||
|
def _add_handlers(self):
|
||||||
|
super()._add_handlers()
|
||||||
|
|
||||||
|
self.register(self._on_choose, constants.KEYWORDS['choose'], priority=2)
|
||||||
|
self.register(self._on_choose, constants.KEYWORDS['random'], priority=2)
|
||||||
|
self.register(self._on_choose, (constants.KEYWORDS['choose'], constants.KEYWORDS['random']), priority=2)
|
||||||
|
|
||||||
|
self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['vote']))
|
||||||
|
self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['vote']))
|
||||||
|
|
||||||
|
self.register(self._on_dice, constants.KEYWORDS['dice'])
|
||||||
|
|
||||||
|
self.register(self._on_poll, constants.KEYWORDS['poll'], priority=2)
|
||||||
|
|
||||||
|
self.register(self._on_poll_multi, (constants.KEYWORDS['poll'], constants.KEYWORDS['multiple_answer']), priority=2)
|
||||||
|
|
||||||
|
self.register(self._on_stop_poll, multibot_constants.KEYWORDS['deactivate'])
|
||||||
|
self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['poll']))
|
||||||
|
self.register(self._on_stop_poll, multibot_constants.KEYWORDS['stop'])
|
||||||
|
self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['poll']))
|
||||||
|
|
||||||
|
self.register(self._on_voting_ban, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
|
||||||
|
|
||||||
|
self.register(self._on_voting_unban, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
|
||||||
|
|
||||||
|
self.register_button(self._on_poll_button_press, ButtonsGroup.POLL)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_options(text: str, discarded_words: Iterable = ()) -> list[str]:
|
||||||
|
options = (option for option in text.split() if not flanautils.cartesian_product_string_matching(option.lower(), discarded_words, min_score=multibot_constants.PARSE_CALLBACKS_MIN_SCORE_DEFAULT))
|
||||||
|
text = ' '.join(options)
|
||||||
|
|
||||||
|
conjunctions = [f' {conjunction} ' for conjunction in flanautils.CommonWords.get('conjunctions')]
|
||||||
|
if any(char in text for char in (',', ';', *conjunctions)):
|
||||||
|
conjunction_parts = [f'(?:[,;]*{conjunction}[,;]*)+' for conjunction in conjunctions]
|
||||||
|
options = re.split(f"{'|'.join(conjunction_parts)}|[,;]+", text)
|
||||||
|
return [option.strip() for option in options if option]
|
||||||
|
else:
|
||||||
|
return text.split()
|
||||||
|
|
||||||
|
async def _get_poll_message(self, message: Message) -> Message | None:
|
||||||
|
if poll_message := message.replied_message:
|
||||||
|
if poll_message.contents.get('poll') is None:
|
||||||
|
return
|
||||||
|
return poll_message
|
||||||
|
elif (
|
||||||
|
(message.chat.is_private or self.is_bot_mentioned(message))
|
||||||
|
and
|
||||||
|
flanautils.cartesian_product_string_matching(message.text, constants.KEYWORDS['poll'], min_score=multibot_constants.PARSE_CALLBACKS_MIN_SCORE_DEFAULT)
|
||||||
|
and
|
||||||
|
(poll_message := self.Message.find_one({'contents.poll.is_active': True}, sort_keys=(('date', pymongo.DESCENDING),)))
|
||||||
|
):
|
||||||
|
return await self.get_message(poll_message.chat.id, poll_message.id)
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
async def _update_poll_buttons(self, message: Message):
|
||||||
|
if message.contents['poll']['is_multiple_answer']:
|
||||||
|
total_votes = len({option_vote[0] for option_votes in message.contents['poll']['votes'].values() if option_votes for option_vote in option_votes})
|
||||||
|
else:
|
||||||
|
total_votes = sum(len(option_votes) for option_votes in message.contents['poll']['votes'].values())
|
||||||
|
|
||||||
|
if total_votes:
|
||||||
|
buttons = []
|
||||||
|
for option, option_votes in message.contents['poll']['votes'].items():
|
||||||
|
ratio = f'{len(option_votes)}/{total_votes}'
|
||||||
|
names = f"({', '.join(option_vote[1] for option_vote in option_votes)})" if option_votes else ''
|
||||||
|
buttons.append(f'{option} ➜ {ratio} {names}')
|
||||||
|
else:
|
||||||
|
buttons = list(message.contents['poll']['votes'].keys())
|
||||||
|
|
||||||
|
await self.edit(self._distribute_buttons(buttons), message)
|
||||||
|
|
||||||
|
# ---------------------------------------------- #
|
||||||
|
# HANDLERS #
|
||||||
|
# ---------------------------------------------- #
|
||||||
|
async def _on_choose(self, message: Message):
|
||||||
|
if message.chat.is_group and not self.is_bot_mentioned(message):
|
||||||
|
return
|
||||||
|
|
||||||
|
discarded_words = {
|
||||||
|
*constants.KEYWORDS['choose'],
|
||||||
|
*constants.KEYWORDS['random'],
|
||||||
|
self.name.lower(), f'<@{self.id}>',
|
||||||
|
'entre', 'between'
|
||||||
|
}
|
||||||
|
|
||||||
|
if options := self._get_options(message.text, discarded_words):
|
||||||
|
for i in range(1, len(options) - 1):
|
||||||
|
try:
|
||||||
|
n1 = flanautils.cast_number(options[i - 1])
|
||||||
|
except ValueError:
|
||||||
|
try:
|
||||||
|
n1 = flanautils.words_to_numbers(options[i - 1], ignore_no_numbers=False)
|
||||||
|
except KeyError:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
n2 = flanautils.cast_number(options[i + 1])
|
||||||
|
except ValueError:
|
||||||
|
try:
|
||||||
|
n2 = flanautils.words_to_numbers(options[i + 1], ignore_no_numbers=False)
|
||||||
|
except KeyError:
|
||||||
|
continue
|
||||||
|
if options[i] in ('al', 'to'):
|
||||||
|
await self.send(random.randint(math.ceil(n1), math.floor(n2)), message)
|
||||||
|
return
|
||||||
|
await self.send(random.choice(options), message)
|
||||||
|
else:
|
||||||
|
await self.send(random.choice(('¿Que elija el qué?', '¿Y las opciones?', '?', '🤔')), message)
|
||||||
|
|
||||||
|
@admin
|
||||||
|
async def _on_delete_votes(self, message: Message):
|
||||||
|
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)):
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.delete_message(message)
|
||||||
|
|
||||||
|
for user in await self._find_users_to_punish(message):
|
||||||
|
for option_name, option_votes in poll_message.contents['poll']['votes'].items():
|
||||||
|
poll_message.contents['poll']['votes'][option_name] = [option_vote for option_vote in option_votes if option_vote[0] != user.id]
|
||||||
|
|
||||||
|
await self._update_poll_buttons(poll_message)
|
||||||
|
|
||||||
|
async def _on_dice(self, message: Message):
|
||||||
|
if message.chat.is_group and not self.is_bot_mentioned(message):
|
||||||
|
return
|
||||||
|
|
||||||
|
if top_number := flanautils.sum_numbers_in_text(message.text):
|
||||||
|
await self.send(random.randint(1, math.floor(top_number)), message)
|
||||||
|
else:
|
||||||
|
await self.send(random.choice(('¿De cuántas caras?', '¿Y el número?', '?', '🤔')), message)
|
||||||
|
|
||||||
|
async def _on_poll(self, message: Message, is_multiple_answer=False):
|
||||||
|
if message.chat.is_group and not self.is_bot_mentioned(message):
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.delete_message(message)
|
||||||
|
|
||||||
|
discarded_words = {*constants.KEYWORDS['poll'], *constants.KEYWORDS['multiple_answer'], self.name.lower(), f'<@{self.id}>'}
|
||||||
|
if final_options := [f'{option[0].upper()}{option[1:]}' for option in self._get_options(message.text, discarded_words)]:
|
||||||
|
await self.send(
|
||||||
|
f"Encuesta {'multirespuesta ' if is_multiple_answer else ''}en curso...",
|
||||||
|
self._distribute_buttons(final_options),
|
||||||
|
message,
|
||||||
|
buttons_key=ButtonsGroup.POLL,
|
||||||
|
contents={'poll': {
|
||||||
|
'is_active': True,
|
||||||
|
'is_multiple_answer': is_multiple_answer,
|
||||||
|
'votes': {option: [] for option in final_options},
|
||||||
|
'banned_users_tries': {}
|
||||||
|
}}
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await self.send(random.choice(('¿Y las opciones?', '?', '🤔')), message)
|
||||||
|
|
||||||
|
async def _on_poll_button_press(self, message: Message):
|
||||||
|
await self.accept_button_event(message)
|
||||||
|
if not message.contents['poll']['is_active']:
|
||||||
|
return
|
||||||
|
|
||||||
|
presser_id = message.buttons_info.presser_user.id
|
||||||
|
presser_name = message.buttons_info.presser_user.name.split('#')[0]
|
||||||
|
if (presser_id_str := str(presser_id)) in message.contents['poll']['banned_users_tries']:
|
||||||
|
message.contents['poll']['banned_users_tries'][presser_id_str] += 1
|
||||||
|
message.save()
|
||||||
|
if message.contents['poll']['banned_users_tries'][presser_id_str] == 3:
|
||||||
|
await self.send(random.choice((
|
||||||
|
f'Deja de dar por culo {presser_name} que no puedes votar aqui',
|
||||||
|
f'No es pesao {presser_name}, que no tienes permitido votar aqui',
|
||||||
|
f'Deja de pulsar botones que no puedes votar aqui {presser_name}',
|
||||||
|
f'{presser_name} deja de intentar votar aqui que no puedes',
|
||||||
|
f'Te han prohibido votar aquì {presser_name}.',
|
||||||
|
f'No puedes votar aquí, {presser_name}.'
|
||||||
|
)), reply_to=message)
|
||||||
|
return
|
||||||
|
|
||||||
|
option_name = results[0] if (results := re.findall('(.*?) ➜.+', message.buttons_info.pressed_text)) else message.buttons_info.pressed_text
|
||||||
|
selected_option_votes = message.contents['poll']['votes'][option_name]
|
||||||
|
|
||||||
|
if [presser_id, presser_name] in selected_option_votes:
|
||||||
|
selected_option_votes.remove([presser_id, presser_name])
|
||||||
|
else:
|
||||||
|
if not message.contents['poll']['is_multiple_answer']:
|
||||||
|
for option_votes in message.contents['poll']['votes'].values():
|
||||||
|
try:
|
||||||
|
option_votes.remove([presser_id, presser_name])
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
selected_option_votes.append((presser_id, presser_name))
|
||||||
|
|
||||||
|
await self._update_poll_buttons(message)
|
||||||
|
|
||||||
|
async def _on_poll_multi(self, message: Message):
|
||||||
|
await self._on_poll(message, is_multiple_answer=True)
|
||||||
|
|
||||||
|
async def _on_stop_poll(self, message: Message):
|
||||||
|
if not (poll_message := await self._get_poll_message(message)):
|
||||||
|
return
|
||||||
|
|
||||||
|
winners = []
|
||||||
|
max_votes = 1
|
||||||
|
for option, votes in poll_message.contents['poll']['votes'].items():
|
||||||
|
if len(votes) > max_votes:
|
||||||
|
winners = [option]
|
||||||
|
max_votes = len(votes)
|
||||||
|
elif len(votes) == max_votes:
|
||||||
|
winners.append(option)
|
||||||
|
|
||||||
|
match winners:
|
||||||
|
case [_, _, *_]:
|
||||||
|
winners = [f'<b>{winner}</b>' for winner in winners]
|
||||||
|
text = f"Encuesta finalizada. Los ganadores son: {flanautils.join_last_separator(winners, ', ', ' y ')}."
|
||||||
|
case [winner]:
|
||||||
|
text = f'Encuesta finalizada. Ganador: <b>{winner}</b>.'
|
||||||
|
case _:
|
||||||
|
text = 'Encuesta finalizada.'
|
||||||
|
|
||||||
|
poll_message.contents['poll']['is_active'] = False
|
||||||
|
|
||||||
|
await self.edit(text, poll_message)
|
||||||
|
if not message.replied_message:
|
||||||
|
await self.send(text, reply_to=poll_message)
|
||||||
|
|
||||||
|
@admin
|
||||||
|
async def _on_voting_ban(self, message: Message):
|
||||||
|
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)):
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.delete_message(message)
|
||||||
|
|
||||||
|
for user in await self._find_users_to_punish(message):
|
||||||
|
if str(user.id) not in poll_message.contents['poll']['banned_users_tries']:
|
||||||
|
poll_message.contents['poll']['banned_users_tries'][str(user.id)] = 0
|
||||||
|
poll_message.save()
|
||||||
|
|
||||||
|
@admin
|
||||||
|
async def _on_voting_unban(self, message: Message):
|
||||||
|
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := await self._get_poll_message(message)):
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.delete_message(message)
|
||||||
|
|
||||||
|
for user in await self._find_users_to_punish(message):
|
||||||
|
try:
|
||||||
|
del poll_message.contents['poll']['banned_users_tries'][str(user.id)]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
poll_message.save()
|
||||||
|
|
||||||
|
# -------------------------------------------------------- #
|
||||||
|
# -------------------- PUBLIC METHODS -------------------- #
|
||||||
|
# -------------------------------------------------------- #
|
||||||
232
flanabot/bots/scraper_bot.py
Normal file
232
flanabot/bots/scraper_bot.py
Normal file
@@ -0,0 +1,232 @@
|
|||||||
|
__all__ = ['ScraperBot']
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import random
|
||||||
|
from abc import ABC
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
|
import flanautils
|
||||||
|
from flanaapis import instagram, tiktok, twitter, youtube
|
||||||
|
from flanautils import Media, MediaType, OrderedSet, Source, return_if_first_empty
|
||||||
|
from multibot import MultiBot, RegisteredCallback, SendError, constants as multibot_constants, reply
|
||||||
|
|
||||||
|
from flanabot import constants
|
||||||
|
from flanabot.models import Action, BotAction, Message
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------------------------------------- #
|
||||||
|
# --------------------------------------------- SCRAPER_BOT --------------------------------------------- #
|
||||||
|
# ----------------------------------------------------------------------------------------------------- #
|
||||||
|
class ScraperBot(MultiBot, ABC):
|
||||||
|
# ----------------------------------------------------------- #
|
||||||
|
# -------------------- PROTECTED METHODS -------------------- #
|
||||||
|
# ----------------------------------------------------------- #
|
||||||
|
def _add_handlers(self):
|
||||||
|
super()._add_handlers()
|
||||||
|
|
||||||
|
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete']))
|
||||||
|
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message']))
|
||||||
|
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
|
||||||
|
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
|
||||||
|
self.register(self._on_scraping, constants.KEYWORDS['scraping'])
|
||||||
|
|
||||||
|
self.register(self._on_scraping_audio, multibot_constants.KEYWORDS['audio'])
|
||||||
|
self.register(self._on_scraping_audio, (multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping']))
|
||||||
|
|
||||||
|
self.register(self._on_song_info, constants.KEYWORDS['song_info'])
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _medias_sended_info(medias: Iterable[Media]) -> str:
|
||||||
|
medias_count = {
|
||||||
|
Source.TWITTER: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
|
||||||
|
Source.INSTAGRAM: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
|
||||||
|
Source.TIKTOK: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
|
||||||
|
Source.REDDIT: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
|
||||||
|
Source.YOUTUBE: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0},
|
||||||
|
None: {MediaType.IMAGE: 0, MediaType.AUDIO: 0, MediaType.GIF: 0, MediaType.VIDEO: 0, None: 0, MediaType.ERROR: 0}
|
||||||
|
}
|
||||||
|
for media in medias:
|
||||||
|
medias_count[media.source][media.type_] += 1
|
||||||
|
|
||||||
|
medias_sended_info = []
|
||||||
|
for source, media_type_count in medias_count.items():
|
||||||
|
source_medias_sended_info = []
|
||||||
|
for media_type, count in media_type_count.items():
|
||||||
|
if count:
|
||||||
|
if count == 1:
|
||||||
|
type_text = {MediaType.IMAGE: 'imagen',
|
||||||
|
MediaType.AUDIO: 'audio',
|
||||||
|
MediaType.GIF: 'gif',
|
||||||
|
MediaType.VIDEO: 'vídeo',
|
||||||
|
None: 'cosa que no sé que tipo de archivo es',
|
||||||
|
MediaType.ERROR: 'error'}[media_type]
|
||||||
|
else:
|
||||||
|
type_text = {MediaType.IMAGE: 'imágenes',
|
||||||
|
MediaType.AUDIO: 'audios',
|
||||||
|
MediaType.GIF: 'gifs',
|
||||||
|
MediaType.VIDEO: 'vídeos',
|
||||||
|
None: 'cosas que no sé que tipos de archivos son',
|
||||||
|
MediaType.ERROR: 'errores'}[media_type]
|
||||||
|
source_medias_sended_info.append(f'{count} {type_text}')
|
||||||
|
if source_medias_sended_info:
|
||||||
|
medias_sended_info.append(f"{flanautils.join_last_separator(source_medias_sended_info, ', ', ' y ')} de {source.name if source else 'algún sitio'}")
|
||||||
|
|
||||||
|
medias_sended_info_joined = flanautils.join_last_separator(medias_sended_info, ',\n', ' y\n')
|
||||||
|
new_line = ' ' if len(medias_sended_info) == 1 else '\n'
|
||||||
|
return f'{new_line}{medias_sended_info_joined}:'
|
||||||
|
|
||||||
|
# ---------------------------------------------- #
|
||||||
|
# HANDLERS #
|
||||||
|
# ---------------------------------------------- #
|
||||||
|
async def _on_no_delete_original(self, message: Message):
|
||||||
|
if not await self._scrape_and_send(message):
|
||||||
|
await self._on_recover_message(message)
|
||||||
|
|
||||||
|
async def _on_recover_message(self, message: Message):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def _on_scraping(self, message: Message, audio_only=False) -> OrderedSet[Media]:
|
||||||
|
sended_media_messages = OrderedSet()
|
||||||
|
|
||||||
|
if message.replied_message:
|
||||||
|
sended_media_messages += await self._scrape_and_send(message.replied_message, audio_only)
|
||||||
|
|
||||||
|
return await self._scrape_send_and_delete(message, audio_only, sended_media_messages)
|
||||||
|
|
||||||
|
async def _on_scraping_audio(self, message: Message) -> OrderedSet[Media]:
|
||||||
|
return await self._on_scraping(message, audio_only=True)
|
||||||
|
|
||||||
|
@reply
|
||||||
|
async def _on_song_info(self, message: Message):
|
||||||
|
song_infos = message.replied_message.song_infos if message.replied_message else []
|
||||||
|
|
||||||
|
if song_infos:
|
||||||
|
for song_info in song_infos:
|
||||||
|
await self.send_song_info(song_info, message)
|
||||||
|
elif message.chat.is_private or self.is_bot_mentioned(message):
|
||||||
|
await self._manage_exceptions(SendError('No hay información musical en ese mensaje.'), message)
|
||||||
|
|
||||||
|
async def _scrape_and_send(self, message: Message, audio_only=False) -> OrderedSet[Media]:
|
||||||
|
kwargs = {}
|
||||||
|
if self._parse_callbacks(message.text, [RegisteredCallback(..., [['sin'], ['timeout', 'limite']])]):
|
||||||
|
kwargs['timeout_for_media'] = None
|
||||||
|
if not (medias := await self._search_medias(message, audio_only, **kwargs)):
|
||||||
|
return OrderedSet()
|
||||||
|
|
||||||
|
sended_media_messages, _ = await self.send_medias(medias, message)
|
||||||
|
sended_media_messages = OrderedSet(sended_media_messages)
|
||||||
|
|
||||||
|
await self.send_inline_results(message)
|
||||||
|
|
||||||
|
return sended_media_messages
|
||||||
|
|
||||||
|
async def _scrape_send_and_delete(
|
||||||
|
self,
|
||||||
|
message: Message,
|
||||||
|
audio_only=False,
|
||||||
|
sended_media_messages: OrderedSet[Media] = None
|
||||||
|
) -> OrderedSet[Media]:
|
||||||
|
if sended_media_messages is None:
|
||||||
|
sended_media_messages = OrderedSet()
|
||||||
|
|
||||||
|
sended_media_messages += await self._scrape_and_send(message, audio_only)
|
||||||
|
|
||||||
|
if (
|
||||||
|
sended_media_messages
|
||||||
|
and
|
||||||
|
message.chat.is_group
|
||||||
|
and
|
||||||
|
not message.replied_message
|
||||||
|
and
|
||||||
|
message.chat.config['delete_original']
|
||||||
|
):
|
||||||
|
# noinspection PyTypeChecker
|
||||||
|
BotAction(Action.MESSAGE_DELETED, message, affected_objects=[message, *sended_media_messages]).save()
|
||||||
|
await self.delete_message(message)
|
||||||
|
|
||||||
|
return sended_media_messages
|
||||||
|
|
||||||
|
async def _search_medias(self, message: Message, audio_only=False, timeout_for_media: int | float = None) -> OrderedSet[Media]:
|
||||||
|
medias = OrderedSet()
|
||||||
|
|
||||||
|
tweet_ids = twitter.find_tweet_ids(message.text)
|
||||||
|
instagram_ids = instagram.find_instagram_ids(message.text)
|
||||||
|
tiktok_ids = await tiktok.find_tiktok_ids(message.text)
|
||||||
|
tiktok_download_urls = tiktok.find_download_urls(message.text)
|
||||||
|
youtube_ids = youtube.find_youtube_ids(message.text)
|
||||||
|
|
||||||
|
if not any((tweet_ids, instagram_ids, tiktok_ids, tiktok_download_urls, youtube_ids)):
|
||||||
|
return medias
|
||||||
|
|
||||||
|
bot_state_message = await self.send(random.choice(constants.SCRAPING_PHRASES), message)
|
||||||
|
|
||||||
|
gather_result = asyncio.gather(
|
||||||
|
twitter.get_medias(tweet_ids, audio_only),
|
||||||
|
instagram.get_medias(instagram_ids, audio_only),
|
||||||
|
tiktok.get_medias(tiktok_ids, tiktok_download_urls, audio_only),
|
||||||
|
youtube.get_medias(youtube_ids, audio_only, timeout_for_media),
|
||||||
|
return_exceptions=True
|
||||||
|
)
|
||||||
|
|
||||||
|
await gather_result
|
||||||
|
await self.delete_message(bot_state_message)
|
||||||
|
|
||||||
|
medias, exceptions = flanautils.filter_exceptions(gather_result.result())
|
||||||
|
await self._manage_exceptions(exceptions, message)
|
||||||
|
|
||||||
|
return OrderedSet(*medias)
|
||||||
|
|
||||||
|
# -------------------------------------------------------- #
|
||||||
|
# -------------------- PUBLIC METHODS -------------------- #
|
||||||
|
# -------------------------------------------------------- #
|
||||||
|
async def send_medias(self, medias: OrderedSet[Media], message: Message, send_song_info=False) -> tuple[list[Message], int]:
|
||||||
|
sended_media_messages = []
|
||||||
|
fails = 0
|
||||||
|
bot_state_message: Message | None = None
|
||||||
|
sended_info_message: Message | None = None
|
||||||
|
|
||||||
|
if not message.is_inline:
|
||||||
|
bot_state_message: Message = await self.send('Enviando...', message)
|
||||||
|
|
||||||
|
if message.chat.is_group:
|
||||||
|
sended_info_message = await self.send(f"{message.author.name.split('#')[0]} compartió{self._medias_sended_info(medias)}", message)
|
||||||
|
|
||||||
|
for media in medias:
|
||||||
|
if not media.content:
|
||||||
|
fails += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
if media.song_info:
|
||||||
|
message.song_infos.add(media.song_info)
|
||||||
|
message.save()
|
||||||
|
|
||||||
|
if bot_message := await self.send(media, message):
|
||||||
|
sended_media_messages.append(bot_message)
|
||||||
|
if media.song_info and bot_message:
|
||||||
|
bot_message.song_infos.add(media.song_info)
|
||||||
|
bot_message.save()
|
||||||
|
else:
|
||||||
|
fails += 1
|
||||||
|
|
||||||
|
if send_song_info and media.song_info:
|
||||||
|
await self.send_song_info(media.song_info, message)
|
||||||
|
|
||||||
|
if fails and sended_info_message:
|
||||||
|
if fails == len(medias):
|
||||||
|
await self.delete_message(sended_info_message)
|
||||||
|
if bot_state_message:
|
||||||
|
await self.delete_message(bot_state_message)
|
||||||
|
|
||||||
|
return sended_media_messages, fails
|
||||||
|
|
||||||
|
@return_if_first_empty(exclude_self_types='FlanaBot', globals_=globals())
|
||||||
|
async def send_song_info(self, song_info: Media, message: Message):
|
||||||
|
attributes = (
|
||||||
|
f'Título: {song_info.title}\n' if song_info.title else '',
|
||||||
|
f'Autor: {song_info.author}\n' if song_info.author else '',
|
||||||
|
f'Álbum: {song_info.album}\n' if song_info.album else '',
|
||||||
|
f'Previa:'
|
||||||
|
)
|
||||||
|
await self.send(''.join(attributes), message)
|
||||||
|
if song_info:
|
||||||
|
await self.send(song_info, message)
|
||||||
194
flanabot/bots/weather_bot.py
Normal file
194
flanabot/bots/weather_bot.py
Normal file
@@ -0,0 +1,194 @@
|
|||||||
|
__all__ = ['WeatherBot']
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
import random
|
||||||
|
from abc import ABC
|
||||||
|
|
||||||
|
import flanaapis.geolocation.functions
|
||||||
|
import flanaapis.weather.functions
|
||||||
|
import flanautils
|
||||||
|
import plotly.graph_objects
|
||||||
|
from flanaapis import Place, PlaceNotFoundError, WeatherEmoji
|
||||||
|
from flanautils import Media, MediaType, NotFoundError, OrderedSet, Source, TraceMetadata
|
||||||
|
from multibot import MultiBot, constants as multibot_constants
|
||||||
|
|
||||||
|
from flanabot import constants
|
||||||
|
from flanabot.models import Action, BotAction, ButtonsGroup, Message, WeatherChart
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------------------------------------- #
|
||||||
|
# --------------------------------------------- WEATHER_BOT --------------------------------------------- #
|
||||||
|
# ----------------------------------------------------------------------------------------------------- #
|
||||||
|
class WeatherBot(MultiBot, ABC):
|
||||||
|
# ----------------------------------------------------------- #
|
||||||
|
# -------------------- PROTECTED METHODS -------------------- #
|
||||||
|
# ----------------------------------------------------------- #
|
||||||
|
def _add_handlers(self):
|
||||||
|
super()._add_handlers()
|
||||||
|
|
||||||
|
self.register(self._on_weather, constants.KEYWORDS['weather_chart'])
|
||||||
|
self.register(self._on_weather, (multibot_constants.KEYWORDS['show'], constants.KEYWORDS['weather_chart']))
|
||||||
|
|
||||||
|
self.register_button(self._on_weather_button_press, ButtonsGroup.WEATHER)
|
||||||
|
|
||||||
|
# ---------------------------------------------- #
|
||||||
|
# HANDLERS #
|
||||||
|
# ---------------------------------------------- #
|
||||||
|
async def _on_weather(self, message: Message):
|
||||||
|
bot_state_message: Message | None = None
|
||||||
|
if message.is_inline:
|
||||||
|
show_progress_state = False
|
||||||
|
elif message.chat.is_group and not self.is_bot_mentioned(message):
|
||||||
|
if message.chat.config['weather_chart']:
|
||||||
|
if BotAction.find_one({'action': Action.AUTO_WEATHER_CHART.value, 'chat': message.chat.object_id, 'date': {'$gt': datetime.datetime.now(datetime.timezone.utc) - constants.AUTO_WEATHER_EVERY}}):
|
||||||
|
return
|
||||||
|
show_progress_state = False
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
show_progress_state = True
|
||||||
|
|
||||||
|
original_text_words = flanautils.remove_accents(message.text.lower())
|
||||||
|
original_text_words = flanautils.remove_symbols(original_text_words, ignore=('-', '.'), replace_with=' ').split()
|
||||||
|
original_text_words = await self._filter_mention_ids(original_text_words, message, delete_names=True)
|
||||||
|
|
||||||
|
# noinspection PyTypeChecker
|
||||||
|
place_words = (
|
||||||
|
OrderedSet(original_text_words)
|
||||||
|
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['show'], min_score=0.85).keys()
|
||||||
|
- flanautils.cartesian_product_string_matching(original_text_words, constants.KEYWORDS['weather_chart'], min_score=0.85).keys()
|
||||||
|
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['date'], min_score=0.85).keys()
|
||||||
|
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['thanks'], min_score=0.85).keys()
|
||||||
|
- flanautils.CommonWords.get()
|
||||||
|
)
|
||||||
|
if not place_words:
|
||||||
|
if not message.is_inline:
|
||||||
|
await self.send_error(random.choice(('¿Tiempo dónde?', 'Indica el sitio.', 'Y el sitio?', 'y el sitio? me lo invento?')), message)
|
||||||
|
return
|
||||||
|
|
||||||
|
if 'calle' in original_text_words:
|
||||||
|
place_words.insert(0, 'calle')
|
||||||
|
place_query = ' '.join(place_words)
|
||||||
|
if len(place_query) >= constants.MAX_PLACE_QUERY_LENGTH:
|
||||||
|
if show_progress_state:
|
||||||
|
await self.send_error(Media(str(flanautils.resolve_path('resources/mucho_texto.png')), MediaType.IMAGE, 'jpg', Source.LOCAL), message, send_as_file=False)
|
||||||
|
return
|
||||||
|
if show_progress_state:
|
||||||
|
bot_state_message = await self.send(f'Buscando "{place_query}" en el mapa 🧐...', message)
|
||||||
|
|
||||||
|
result: str | Place | None = None
|
||||||
|
async for result in flanaapis.geolocation.functions.find_place_showing_progress(place_query):
|
||||||
|
if isinstance(result, str) and bot_state_message:
|
||||||
|
await self.edit(result, bot_state_message)
|
||||||
|
|
||||||
|
place: Place = result
|
||||||
|
if not place:
|
||||||
|
if bot_state_message:
|
||||||
|
await self.delete_message(bot_state_message)
|
||||||
|
await self._manage_exceptions(PlaceNotFoundError(place_query), message)
|
||||||
|
return
|
||||||
|
|
||||||
|
if bot_state_message:
|
||||||
|
bot_state_message = await self.edit(f'Obteniendo datos del tiempo para "{place_query}"...', bot_state_message)
|
||||||
|
current_weather, day_weathers = await flanaapis.weather.functions.get_day_weathers_by_place(place)
|
||||||
|
|
||||||
|
if bot_state_message:
|
||||||
|
bot_state_message = await self.edit('Creando gráficas del tiempo...', bot_state_message)
|
||||||
|
|
||||||
|
weather_chart = WeatherChart(
|
||||||
|
_font={'size': 30},
|
||||||
|
_title={
|
||||||
|
'text': place.name[:40].strip(' ,-'),
|
||||||
|
'xref': 'paper',
|
||||||
|
'yref': 'paper',
|
||||||
|
'xanchor': 'left',
|
||||||
|
'yanchor': 'top',
|
||||||
|
'x': 0.025,
|
||||||
|
'y': 0.975,
|
||||||
|
'font': {
|
||||||
|
'size': 50,
|
||||||
|
'family': 'open sans'
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_legend={'x': 0.99, 'y': 0.99, 'xanchor': 'right', 'yanchor': 'top', 'bgcolor': 'rgba(0,0,0,0)'},
|
||||||
|
_margin={'l': 20, 'r': 20, 't': 20, 'b': 20},
|
||||||
|
trace_metadatas={
|
||||||
|
'temperature': TraceMetadata(name='temperature', group='temperature', legend='Temperatura', show=False, color='#ff8400', default_min=0, default_max=40, y_tick_suffix=' ºC', y_axis_width=130),
|
||||||
|
'temperature_feel': TraceMetadata(name='temperature_feel', group='temperature', legend='Sensación de temperatura', show=True, color='red', default_min=0, default_max=40, y_tick_suffix=' ºC', y_axis_width=130),
|
||||||
|
'clouds': TraceMetadata(name='clouds', legend='Nubes', show=False, color='#86abe3', default_min=-100, default_max=100, y_tick_suffix=' %', hide_y_ticks_if='{tick} < 0'),
|
||||||
|
'visibility': TraceMetadata(name='visibility', legend='Visibilidad', show=False, color='#c99a34', default_min=0, default_max='{max_y_data} * 2', y_tick_suffix=' km', y_delta_tick=2, hide_y_ticks_if='{tick} > {max_y_data}'),
|
||||||
|
'uvi': TraceMetadata(name='uvi', legend='UVI', show=False, color='#ffd000', default_min=-12, default_max=12, hide_y_ticks_if='{tick} < 0', y_delta_tick=1, y_axis_width=75),
|
||||||
|
'humidity': TraceMetadata(name='humidity', legend='Humedad', show=False, color='#2baab5', default_min=0, default_max=100, y_tick_suffix=' %'),
|
||||||
|
'precipitation_probability': TraceMetadata(name='precipitation_probability', legend='Probabilidad de precipitaciones', show=True, color='#0033ff', default_min=-100, default_max=100, y_tick_suffix=' %', hide_y_ticks_if='{tick} < 0'),
|
||||||
|
'rain_volume': TraceMetadata(plotly.graph_objects.Histogram, name='rain_volume', group='precipitation', legend='Volumen de lluvia', show=True, color='#34a4eb', opacity=0.3, default_min=-10, default_max=10, y_tick_suffix=' mm', y_delta_tick=1, hide_y_ticks_if='{tick} < 0', y_axis_width=130),
|
||||||
|
'snow_volume': TraceMetadata(plotly.graph_objects.Histogram, name='snow_volume', group='precipitation', legend='Volumen de nieve', show=True, color='#34a4eb', opacity=0.8, pattern={'shape': '.', 'fgcolor': '#ffffff', 'bgcolor': '#b0d6f3', 'solidity': 0.5, 'size': 14}, default_min=-10, default_max=10, y_tick_suffix=' mm', y_delta_tick=1, hide_y_ticks_if='{tick} < 0', y_axis_width=130),
|
||||||
|
'pressure': TraceMetadata(name='pressure', legend='Presión', show=False, color='#31a339', default_min=1013.25 - 90, default_max=1013.25 + 90, y_tick_suffix=' hPa', y_axis_width=225),
|
||||||
|
'wind_speed': TraceMetadata(name='wind_speed', legend='Velocidad del viento', show=False, color='#d8abff', default_min=-120, default_max=120, y_tick_suffix=' km/h', hide_y_ticks_if='{tick} < 0', y_axis_width=165)
|
||||||
|
},
|
||||||
|
x_data=[instant_weather.date_time for day_weather in day_weathers for instant_weather in day_weather.instant_weathers],
|
||||||
|
all_y_data=[],
|
||||||
|
current_weather=current_weather,
|
||||||
|
day_weathers=day_weathers,
|
||||||
|
timezone=(timezone := day_weathers[0].timezone),
|
||||||
|
place=place,
|
||||||
|
view_position=datetime.datetime.now(timezone)
|
||||||
|
)
|
||||||
|
|
||||||
|
weather_chart.apply_zoom()
|
||||||
|
weather_chart.draw()
|
||||||
|
if not (image_bytes := weather_chart.to_image()):
|
||||||
|
if bot_state_message:
|
||||||
|
await self.delete_message(bot_state_message)
|
||||||
|
raise NotFoundError('No hay suficientes datos del tiempo.')
|
||||||
|
|
||||||
|
if bot_state_message:
|
||||||
|
bot_state_message = await self.edit('Enviando...', bot_state_message)
|
||||||
|
bot_message: Message = await self.send(
|
||||||
|
Media(image_bytes, MediaType.IMAGE, 'jpg'),
|
||||||
|
[
|
||||||
|
[WeatherEmoji.ZOOM_IN.value, WeatherEmoji.ZOOM_OUT.value, WeatherEmoji.LEFT.value, WeatherEmoji.RIGHT.value],
|
||||||
|
[WeatherEmoji.TEMPERATURE.value, WeatherEmoji.TEMPERATURE_FEEL.value, WeatherEmoji.CLOUDS.value, WeatherEmoji.VISIBILITY.value, WeatherEmoji.UVI.value],
|
||||||
|
[WeatherEmoji.HUMIDITY.value, WeatherEmoji.PRECIPITATION_PROBABILITY.value, WeatherEmoji.PRECIPITATION_VOLUME.value, WeatherEmoji.PRESSURE.value, WeatherEmoji.WIND_SPEED.value]
|
||||||
|
],
|
||||||
|
message,
|
||||||
|
buttons_key=ButtonsGroup.WEATHER,
|
||||||
|
send_as_file=False
|
||||||
|
)
|
||||||
|
await self.send_inline_results(message)
|
||||||
|
|
||||||
|
if bot_state_message:
|
||||||
|
await self.delete_message(bot_state_message)
|
||||||
|
|
||||||
|
if bot_message:
|
||||||
|
bot_message.weather_chart = weather_chart
|
||||||
|
bot_message.save()
|
||||||
|
if not self.is_bot_mentioned(message):
|
||||||
|
# noinspection PyTypeChecker
|
||||||
|
BotAction(Action.AUTO_WEATHER_CHART, message, affected_objects=[bot_message]).save()
|
||||||
|
|
||||||
|
async def _on_weather_button_press(self, message: Message):
|
||||||
|
await self.accept_button_event(message)
|
||||||
|
|
||||||
|
match message.buttons_info.pressed_text:
|
||||||
|
case WeatherEmoji.ZOOM_IN.value:
|
||||||
|
message.weather_chart.zoom_in()
|
||||||
|
case WeatherEmoji.ZOOM_OUT.value:
|
||||||
|
message.weather_chart.zoom_out()
|
||||||
|
case WeatherEmoji.LEFT.value:
|
||||||
|
message.weather_chart.move_left()
|
||||||
|
case WeatherEmoji.RIGHT.value:
|
||||||
|
message.weather_chart.move_right()
|
||||||
|
case WeatherEmoji.PRECIPITATION_VOLUME.value:
|
||||||
|
message.weather_chart.trace_metadatas['rain_volume'].show = not message.weather_chart.trace_metadatas['rain_volume'].show
|
||||||
|
message.weather_chart.trace_metadatas['snow_volume'].show = not message.weather_chart.trace_metadatas['snow_volume'].show
|
||||||
|
case emoji if emoji in WeatherEmoji.values:
|
||||||
|
trace_metadata_name = WeatherEmoji(emoji).name.lower()
|
||||||
|
message.weather_chart.trace_metadatas[trace_metadata_name].show = not message.weather_chart.trace_metadatas[trace_metadata_name].show
|
||||||
|
case _:
|
||||||
|
return
|
||||||
|
|
||||||
|
message.weather_chart.apply_zoom()
|
||||||
|
message.weather_chart.draw()
|
||||||
|
|
||||||
|
image_bytes = message.weather_chart.to_image()
|
||||||
|
await self.edit(Media(image_bytes, MediaType.IMAGE, 'jpg'), message)
|
||||||
Reference in New Issue
Block a user