|
|
|
|
@@ -1,3 +1,5 @@
|
|
|
|
|
__all__ = ['FlanaBot']
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
import datetime
|
|
|
|
|
import random
|
|
|
|
|
@@ -10,12 +12,13 @@ import flanaapis.geolocation.functions
|
|
|
|
|
import flanaapis.weather.functions
|
|
|
|
|
import flanautils
|
|
|
|
|
import plotly.graph_objects
|
|
|
|
|
import pymongo
|
|
|
|
|
from flanaapis import InstagramLoginError, MediaNotFoundError, Place, PlaceNotFoundError, WeatherEmoji, instagram, tiktok, twitter
|
|
|
|
|
from flanautils import Media, MediaType, NotFoundError, OrderedSet, Source, TimeUnits, TraceMetadata, return_if_first_empty
|
|
|
|
|
from multibot import Action, BadRoleError, BotAction, MultiBot, SendError, User, admin, bot_mentioned, constants as multibot_constants, group, ignore_self_message, inline, reply
|
|
|
|
|
|
|
|
|
|
from flanabot import constants
|
|
|
|
|
from flanabot.models import ButtonsMessageType, Chat, Message, Punishment, WeatherChart
|
|
|
|
|
from flanabot.models import Chat, Message, Punishment, WeatherChart
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------- #
|
|
|
|
|
@@ -128,7 +131,8 @@ class FlanaBot(MultiBot, ABC):
|
|
|
|
|
self.register(self._on_weather_chart_config_show, (constants.KEYWORDS['weather_chart'], multibot_constants.KEYWORDS['config']))
|
|
|
|
|
self.register(self._on_weather_chart_config_show, (multibot_constants.KEYWORDS['show'], constants.KEYWORDS['weather_chart'], multibot_constants.KEYWORDS['config']))
|
|
|
|
|
|
|
|
|
|
self.register_button(self._on_button_press)
|
|
|
|
|
self.register_button(self._on_config_button_press, list(Chat.DEFAULT_CONFIG.keys()))
|
|
|
|
|
self.register_button(self._on_weather_button_press, WeatherEmoji.values)
|
|
|
|
|
|
|
|
|
|
async def _change_config(self, config_name: str, message: Message, value: bool = None):
|
|
|
|
|
if value is None:
|
|
|
|
|
@@ -184,10 +188,11 @@ class FlanaBot(MultiBot, ABC):
|
|
|
|
|
config = message.chat.config
|
|
|
|
|
|
|
|
|
|
buttons_texts = [f"{'✔' if v else '❌'} {k}" for k, v in config.items()]
|
|
|
|
|
# noinspection PyTypeChecker
|
|
|
|
|
return flanautils.chunks(buttons_texts, 3)
|
|
|
|
|
|
|
|
|
|
@return_if_first_empty(exclude_self_types='FlanaBot', globals_=globals())
|
|
|
|
|
async def _manage_exceptions(self, exceptions: BaseException | Iterable[BaseException], message: Message):
|
|
|
|
|
async def _manage_exceptions(self, exceptions: BaseException | Iterable[BaseException], context: Chat | Message):
|
|
|
|
|
if not isinstance(exceptions, Iterable):
|
|
|
|
|
exceptions = (exceptions,)
|
|
|
|
|
|
|
|
|
|
@@ -195,15 +200,15 @@ class FlanaBot(MultiBot, ABC):
|
|
|
|
|
try:
|
|
|
|
|
raise exception
|
|
|
|
|
except BadRoleError as e:
|
|
|
|
|
await self.send_error(f'Rol no encontrado en {e}', message)
|
|
|
|
|
await self.send_error(f'Rol no encontrado en {e}', context)
|
|
|
|
|
except InstagramLoginError as e:
|
|
|
|
|
await self.send_error(f'No me puedo loguear en Instagram {random.choice(multibot_constants.SAD_EMOJIS)} 👉 {e}', message)
|
|
|
|
|
await self.send_error(f'No me puedo loguear en Instagram {random.choice(multibot_constants.SAD_EMOJIS)} 👉 {e}', context)
|
|
|
|
|
except MediaNotFoundError as e:
|
|
|
|
|
await self.send_error(f'No he podido sacar nada de {e.source} {random.choice(multibot_constants.SAD_EMOJIS)}', message)
|
|
|
|
|
await self.send_error(f'No he podido sacar nada de {e.source} {random.choice(multibot_constants.SAD_EMOJIS)}', context)
|
|
|
|
|
except PlaceNotFoundError as e:
|
|
|
|
|
await self.send_error(f'No he podido encontrar "{e}" {random.choice(multibot_constants.SAD_EMOJIS)}', message)
|
|
|
|
|
await self.send_error(f'No he podido encontrar "{e}" {random.choice(multibot_constants.SAD_EMOJIS)}', context)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
await super()._manage_exceptions(e, message)
|
|
|
|
|
await super()._manage_exceptions(e, context)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _medias_sended_info(medias: Iterable[Media]) -> str:
|
|
|
|
|
@@ -292,59 +297,23 @@ class FlanaBot(MultiBot, ABC):
|
|
|
|
|
# ---------------------------------------------- #
|
|
|
|
|
# HANDLERS #
|
|
|
|
|
# ---------------------------------------------- #
|
|
|
|
|
async def _on_button_press(self, message: Message):
|
|
|
|
|
await self._accept_button_event(message)
|
|
|
|
|
|
|
|
|
|
match message.button_pressed_text:
|
|
|
|
|
case WeatherEmoji.ZOOM_IN.value:
|
|
|
|
|
buttons_message_type = ButtonsMessageType.WEATHER
|
|
|
|
|
message.weather_chart.zoom_in()
|
|
|
|
|
case WeatherEmoji.ZOOM_OUT.value:
|
|
|
|
|
buttons_message_type = ButtonsMessageType.WEATHER
|
|
|
|
|
message.weather_chart.zoom_out()
|
|
|
|
|
case WeatherEmoji.LEFT.value:
|
|
|
|
|
buttons_message_type = ButtonsMessageType.WEATHER
|
|
|
|
|
message.weather_chart.move_left()
|
|
|
|
|
case WeatherEmoji.RIGHT.value:
|
|
|
|
|
buttons_message_type = ButtonsMessageType.WEATHER
|
|
|
|
|
message.weather_chart.move_right()
|
|
|
|
|
case WeatherEmoji.PRECIPITATION_VOLUME.value:
|
|
|
|
|
buttons_message_type = ButtonsMessageType.WEATHER
|
|
|
|
|
message.weather_chart.trace_metadatas['rain_volume'].show = not message.weather_chart.trace_metadatas['rain_volume'].show
|
|
|
|
|
message.weather_chart.trace_metadatas['snow_volume'].show = not message.weather_chart.trace_metadatas['snow_volume'].show
|
|
|
|
|
case emoji if emoji in WeatherEmoji.values:
|
|
|
|
|
buttons_message_type = ButtonsMessageType.WEATHER
|
|
|
|
|
trace_metadata_name = WeatherEmoji(emoji).name.lower()
|
|
|
|
|
message.weather_chart.trace_metadatas[trace_metadata_name].show = not message.weather_chart.trace_metadatas[trace_metadata_name].show
|
|
|
|
|
case _ if message.button_pressed_user.is_admin and 'auto_' in (config := message.button_pressed_text.split()[1]):
|
|
|
|
|
buttons_message_type = ButtonsMessageType.CONFIG
|
|
|
|
|
message.chat.config[config] = not message.chat.config[config]
|
|
|
|
|
message.save()
|
|
|
|
|
await self.edit('<b>Estos son los ajustes del grupo:</b>\n\n', self._get_config_buttons(message), message)
|
|
|
|
|
case _:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if buttons_message_type is ButtonsMessageType.WEATHER:
|
|
|
|
|
message.weather_chart.apply_zoom()
|
|
|
|
|
message.weather_chart.draw()
|
|
|
|
|
message.save()
|
|
|
|
|
|
|
|
|
|
image_bytes = message.weather_chart.to_image()
|
|
|
|
|
await self.edit(Media(image_bytes, MediaType.IMAGE), message)
|
|
|
|
|
|
|
|
|
|
async def _on_bye(self, message: Message):
|
|
|
|
|
if not message.chat.is_group or self.is_bot_mentioned(message):
|
|
|
|
|
await self.send_bye(message)
|
|
|
|
|
|
|
|
|
|
async def _on_config_button_press(self, message: Message):
|
|
|
|
|
await self._accept_button_event(message)
|
|
|
|
|
|
|
|
|
|
if not message.button_pressed_user.is_admin or 'auto_' not in (config := message.button_pressed_text.split()[1]):
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
message.chat.config[config] = not message.chat.config[config]
|
|
|
|
|
message.save()
|
|
|
|
|
await self.edit('<b>Estos son los ajustes del grupo:</b>\n\n', self._get_config_buttons(message), message)
|
|
|
|
|
|
|
|
|
|
@group
|
|
|
|
|
@bot_mentioned
|
|
|
|
|
async def _on_config_list_show(self, message: Message):
|
|
|
|
|
# config_info = pprint.pformat(message.chat.config)
|
|
|
|
|
# config_info = flanautils.translate(config_info, {'{': None, '}': None, ',': None, 'True': '', "'": None, 'False': '❌'})
|
|
|
|
|
# config_info = config_info.splitlines()
|
|
|
|
|
# config_info = '\n'.join(config_info_.strip() for config_info_ in config_info)
|
|
|
|
|
# await self.send(f'<b>Estos son los ajustes del grupo:</b>\n\n<code>{config_info}</code>', message)
|
|
|
|
|
|
|
|
|
|
await self.send('<b>Estos son los ajustes del grupo:</b>\n\n', self._get_config_buttons(message), message)
|
|
|
|
|
|
|
|
|
|
async def _on_covid_chart(self, message: Message): # todo2
|
|
|
|
|
@@ -460,14 +429,18 @@ class FlanaBot(MultiBot, ABC):
|
|
|
|
|
@inline(False)
|
|
|
|
|
async def _on_recover_message(self, message: Message):
|
|
|
|
|
if message.replied_message:
|
|
|
|
|
message_deleted_bot_action = BotAction.find_one({'action': bytes(Action.MESSAGE_DELETED), 'chat': message.chat.object_id, 'affected_objects': message.replied_message.object_id})
|
|
|
|
|
message_deleted_bot_action = BotAction.find_one({'action': Action.MESSAGE_DELETED.value, 'chat': message.chat.object_id, 'affected_objects': message.replied_message.object_id})
|
|
|
|
|
elif self.is_bot_mentioned(message):
|
|
|
|
|
message_deleted_bot_action = BotAction.find_one({'action': bytes(Action.MESSAGE_DELETED), 'chat': message.chat.object_id, 'date': {'$gt': datetime.datetime.now(datetime.timezone.utc) - constants.RECOVERY_DELETED_MESSAGE_BEFORE}})
|
|
|
|
|
message_deleted_bot_action = BotAction.find_one({
|
|
|
|
|
'action': Action.MESSAGE_DELETED.value,
|
|
|
|
|
'chat': message.chat.object_id,
|
|
|
|
|
'date': {'$gt': datetime.datetime.now(datetime.timezone.utc) - constants.RECOVERY_DELETED_MESSAGE_BEFORE}
|
|
|
|
|
}, sort_keys=(('date', pymongo.DESCENDING),))
|
|
|
|
|
else:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if not message_deleted_bot_action:
|
|
|
|
|
await self.send_error(random.choice(constants.RECOVER_PHRASES), message)
|
|
|
|
|
await self.send_error('No hay nada que recuperar.', message)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
affected_object_ids = [affected_message_object_id for affected_message_object_id in message_deleted_bot_action.affected_objects]
|
|
|
|
|
@@ -475,10 +448,12 @@ class FlanaBot(MultiBot, ABC):
|
|
|
|
|
for deleted_message in deleted_messages:
|
|
|
|
|
await self.send(deleted_message.text, message)
|
|
|
|
|
|
|
|
|
|
message_deleted_bot_action.delete()
|
|
|
|
|
|
|
|
|
|
async def _on_scraping(self, message: Message, delete_original: bool = None) -> OrderedSet[Media]:
|
|
|
|
|
sended_media_messages = await self._search_and_send_medias(message.replied_message) if message.replied_message else OrderedSet()
|
|
|
|
|
sended_media_messages = OrderedSet()
|
|
|
|
|
if message.replied_message:
|
|
|
|
|
word_matches = flanautils.cartesian_product_string_matching(message.text, constants.KEYWORDS['scraping'], min_ratio=multibot_constants.PARSE_CALLBACKS_MIN_RATIO_DEFAULT)
|
|
|
|
|
if sum(max(matches.values()) for matches in word_matches.values()):
|
|
|
|
|
sended_media_messages += await self._search_and_send_medias(message.replied_message)
|
|
|
|
|
sended_media_messages += await self._search_and_send_medias(message)
|
|
|
|
|
await self.send_inline_results(message)
|
|
|
|
|
|
|
|
|
|
@@ -491,6 +466,8 @@ class FlanaBot(MultiBot, ABC):
|
|
|
|
|
(
|
|
|
|
|
delete_original is None
|
|
|
|
|
and
|
|
|
|
|
not message.replied_message
|
|
|
|
|
and
|
|
|
|
|
message.chat.config['auto_delete_original']
|
|
|
|
|
)
|
|
|
|
|
or
|
|
|
|
|
@@ -548,6 +525,34 @@ class FlanaBot(MultiBot, ABC):
|
|
|
|
|
for user in await self._find_users_to_punish(message):
|
|
|
|
|
await self.unpunish(user, message, message)
|
|
|
|
|
|
|
|
|
|
async def _on_weather_button_press(self, message: Message):
|
|
|
|
|
await self._accept_button_event(message)
|
|
|
|
|
|
|
|
|
|
match message.button_pressed_text:
|
|
|
|
|
case WeatherEmoji.ZOOM_IN.value:
|
|
|
|
|
message.weather_chart.zoom_in()
|
|
|
|
|
case WeatherEmoji.ZOOM_OUT.value:
|
|
|
|
|
message.weather_chart.zoom_out()
|
|
|
|
|
case WeatherEmoji.LEFT.value:
|
|
|
|
|
message.weather_chart.move_left()
|
|
|
|
|
case WeatherEmoji.RIGHT.value:
|
|
|
|
|
message.weather_chart.move_right()
|
|
|
|
|
case WeatherEmoji.PRECIPITATION_VOLUME.value:
|
|
|
|
|
message.weather_chart.trace_metadatas['rain_volume'].show = not message.weather_chart.trace_metadatas['rain_volume'].show
|
|
|
|
|
message.weather_chart.trace_metadatas['snow_volume'].show = not message.weather_chart.trace_metadatas['snow_volume'].show
|
|
|
|
|
case emoji if emoji in WeatherEmoji.values:
|
|
|
|
|
trace_metadata_name = WeatherEmoji(emoji).name.lower()
|
|
|
|
|
message.weather_chart.trace_metadatas[trace_metadata_name].show = not message.weather_chart.trace_metadatas[trace_metadata_name].show
|
|
|
|
|
case _:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
message.weather_chart.apply_zoom()
|
|
|
|
|
message.weather_chart.draw()
|
|
|
|
|
message.save()
|
|
|
|
|
|
|
|
|
|
image_bytes = message.weather_chart.to_image()
|
|
|
|
|
await self.edit(Media(image_bytes, MediaType.IMAGE), message)
|
|
|
|
|
|
|
|
|
|
async def _on_weather_chart(self, message: Message):
|
|
|
|
|
bot_state_message: Message | None = None
|
|
|
|
|
if message.is_inline:
|
|
|
|
|
@@ -568,7 +573,7 @@ class FlanaBot(MultiBot, ABC):
|
|
|
|
|
possible_mentioned_ids.append(user.name.split('#')[0].lower())
|
|
|
|
|
possible_mentioned_ids.append(f'@{user.id}')
|
|
|
|
|
|
|
|
|
|
if roles := await self.get_group_roles(message):
|
|
|
|
|
if roles := await self.get_roles(message):
|
|
|
|
|
for role in roles:
|
|
|
|
|
possible_mentioned_ids.append(f'@{role.id}')
|
|
|
|
|
|
|
|
|
|
@@ -758,10 +763,7 @@ class FlanaBot(MultiBot, ABC):
|
|
|
|
|
message.song_infos.add(media.song_info)
|
|
|
|
|
message.save()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
bot_message = await self.send(media, message)
|
|
|
|
|
except SendError as e:
|
|
|
|
|
await self._manage_exceptions(e, message)
|
|
|
|
|
if not (bot_message := await self.send(media, message)):
|
|
|
|
|
fails += 1
|
|
|
|
|
else:
|
|
|
|
|
sended_media_messages.append(bot_message)
|
|
|
|
|
|