110 Commits
v1.5.1 ... main

Author SHA1 Message Date
AlberLC
6c2f54592d Delete useless code 2025-10-26 02:34:47 +02:00
AlberLC
eecb488ea9 Fix FlanaServer file_name extension 2025-10-18 01:05:05 +02:00
AlberLC
a44a71f3be Remove FormData quote_fields arg 2025-10-17 08:20:18 +02:00
AlberLC
aee14449ad Upload large videos to FlanaServer 2025-10-17 03:00:47 +02:00
AlberLC
e20a5135f2 Add minor changes 2025-10-17 02:58:29 +02:00
AlberLC
c392e7bdde Update Dockerfile 2025-10-17 02:57:44 +02:00
AlberLC
c95621ca7e Update requirements.txt 2025-10-17 02:57:30 +02:00
AlberLC
8efa12ad22 Update requirements.txt 2025-05-01 23:20:04 +02:00
AlberLC
b672cbf0a1 Fix imports 2025-05-01 23:18:42 +02:00
AlberLC
d377aae6ff Update requirements.txt 2025-05-01 23:17:58 +02:00
AlberLC
789034aa9f Fix BtcOffersBot._on_stop_btc_offers_notification (private or mentioned) 2025-04-18 00:14:04 +02:00
AlberLC
647b9288aa Add BtcOffersBot.stop_all_btc_offers_notification 2025-04-17 23:46:51 +02:00
AlberLC
ad2434ab67 Update type hints 2025-04-17 23:44:16 +02:00
AlberLC
fca648f8b5 Fix 0.00 premium sign 2025-04-17 17:23:15 +02:00
AlberLC
ada357fcb8 Add minor changes 2025-04-17 16:11:56 +02:00
AlberLC
b1c07ea251 Fix BtcOffersBot._on_btc_offers (server disconnected) 2025-04-17 16:11:01 +02:00
AlberLC
5ec3c1e81b Update BtcOffersBot (manage websocket reconnections) 2025-04-17 16:08:26 +02:00
AlberLC
39aea44803 Fix imports 2025-04-17 13:28:24 +02:00
AlberLC
4dbe9e0504 Update BtcOffersBot (manage websocket reconnections) 2025-04-16 20:47:36 +02:00
AlberLC
d977d5d882 Update BtcOffersBot (notifications in dollars and premiums) 2025-04-16 20:45:23 +02:00
AlberLC
aac5fe951f Fix BtcOffersBot (validate amount) 2025-04-16 20:42:19 +02:00
AlberLC
abc4462723 Fix BtcOffersBot (notify without amount) 2025-04-16 20:41:36 +02:00
AlberLC
bdf658ee9a Update requirements.txt 2025-04-14 23:03:36 +02:00
AlberLC
6aa735ddd5 Add BtcOffersBot 2025-04-14 23:03:28 +02:00
AlberLC
9f640014a3 Fix UberEatsBot._on_ready 2025-04-14 23:00:25 +02:00
AlberLC
edebc48b1f Update SteamBot._insert_conversion_rates 2025-04-14 22:58:08 +02:00
AlberLC
2babe0fee7 Update type hints 2025-04-14 22:57:35 +02:00
AlberLC
123498b1f1 Add minor changes 2025-04-14 22:56:48 +02:00
AlberLC
7f8cbd6e54 Add PenaltyBot._check_message_spam 2025-03-17 10:09:40 +01:00
AlberLC
37db1b3590 Add PenaltyBot._check_message_spam 2025-03-17 10:05:27 +01:00
AlberLC
aa6642ea26 Add PenaltyBot._check_message_spam 2025-03-17 09:05:01 +01:00
AlberLC
7d538316ad Add PenaltyBot._check_message_spam 2025-03-17 08:31:16 +01:00
AlberLC
20ca2b3ab9 Add SteamBot 2024-07-06 06:20:36 +02:00
AlberLC
6f0cd1fcf6 Add SteamBot 2024-07-06 06:06:55 +02:00
AlberLC
39bf560407 Add SteamBot 2024-07-06 06:01:49 +02:00
AlberLC
76f9920de4 Add SteamBot 2024-07-04 20:02:49 +02:00
AlberLC
2465755bc0 Add SteamBot 2024-07-04 04:18:12 +02:00
AlberLC
46a50767af Add SteamBot 2024-07-04 04:17:21 +02:00
AlberLC
05e53bdc87 Add SteamBot 2024-07-04 04:05:02 +02:00
AlberLC
4e44cdd112 Add SteamBot 2024-07-04 03:56:22 +02:00
AlberLC
848d354da2 Add SteamBot 2024-07-04 03:40:35 +02:00
AlberLC
083356ac0b Update requirements.txt 2024-07-04 03:18:39 +02:00
AlberLC
c33c04cba4 Add SteamBot 2024-07-04 03:10:10 +02:00
AlberLC
5f027e665a Add SteamBot 2024-07-04 03:03:54 +02:00
AlberLC
71a5cb74d9 Fix heating_context (models.__init__) 2024-07-04 03:02:00 +02:00
AlberLC
3ea232b183 Update requirements.txt 2024-07-04 02:58:42 +02:00
AlberLC
2419d36bbb Fix connect_4 (bots.__init__) 2024-07-04 02:55:19 +02:00
AlberLC
778500caac Fix channel heating (level 0 channel name) 2024-06-26 10:18:42 +02:00
AlberLC
4f4a05f0ff Update instagram scraper (to yt-dlp) 2024-06-11 01:05:42 +02:00
AlberLC
d6986d8b63 Update twitter scraper (to yt-dlp) 2024-06-08 15:25:25 +02:00
AlberLC
25c5391f5a Update requirements.txt 2024-05-30 15:06:14 +02:00
AlberLC
df72d752c1 Fix channel heating (level 0 channel name) 2024-05-26 09:07:00 +02:00
AlberLC
07ab104203 Add telegram restart 2024-05-17 16:24:31 +02:00
AlberLC
564a65c5a1 Add telegram restart 2024-05-17 16:06:09 +02:00
AlberLC
ebe87decf5 Add telegram restart 2024-05-17 15:57:24 +02:00
AlberLC
86a1beed72 Update requirements.txt 2024-05-17 15:57:11 +02:00
AlberLC
e6721dab8e Update FlanaBot._on_delete (user own messages) 2024-04-02 21:46:52 +02:00
AlberLC
f1f29a089f Update callback registration 2024-03-28 10:08:18 +01:00
AlberLC
2f0ca1e012 Update FlanaBot._on_delete 2024-03-28 10:04:50 +01:00
AlberLC
c57b76bf6d Add MultiBot._on_new_message_raw whitelist/blacklist 2024-03-28 01:42:36 +01:00
AlberLC
4c06b573ea Add MultiBot._on_new_message_raw whitelist/blacklist 2024-03-27 15:27:59 +01:00
AlberLC
00dbe52d84 Add MultiBot._on_new_message_raw whitelist/blacklist 2024-03-26 03:21:46 +01:00
AlberLC
473279d74a Update requirements.txt 2024-03-18 09:45:48 +01:00
AlberLC
d0bc075e59 Update FlanaBot._on_database_messages 2024-03-07 22:39:31 +01:00
AlberLC
7a0e6ba4d8 Update FlanaBot._on_database_messages 2024-03-07 22:36:33 +01:00
AlberLC
528324253c Fix channel heating (independent servers) 2024-02-15 20:54:38 +01:00
AlberLC
a45f0e76b9 Fix channel heating (independent servers) 2024-02-15 06:10:18 +01:00
AlberLC
fd12cf198a Fix channel heating (independent servers) 2024-02-14 10:19:23 +01:00
AlberLC
b9beaf1a36 Fix FlanaBot._on_database_messages (chat chat user user) 2024-02-06 01:05:35 +01:00
AlberLC
d57eae19b5 Fix FlanaBot._on_database_messages 2024-02-06 00:43:54 +01:00
AlberLC
505ec0cf51 Update FlanaBot._on_recover_message 2024-01-27 02:42:55 +01:00
AlberLC
fb7b155368 Update _add_handlers (reset polls) 2024-01-27 02:11:59 +01:00
AlberLC
0769b16dd6 Update changeable roles 2024-01-25 21:37:15 +01:00
AlberLC
024c01b46d Update constants.KEYWORDS 2024-01-24 08:20:11 +01:00
AlberLC
94883dac1e Update poll_bot._add_handlers 2024-01-24 08:19:50 +01:00
AlberLC
401769f5cd Update poll_bot._add_handlers 2024-01-24 08:01:40 +01:00
AlberLC
55eb293e11 Update constants 2024-01-24 07:59:32 +01:00
AlberLC
fed7212a3f Update requirements.txt 2024-01-22 04:12:48 +01:00
AlberLC
21367e0fc9 Fix send_negative after guard clauses 2023-12-07 22:04:10 +01:00
AlberLC
52086be24d Fix FlanaDiscBot.restore_channel_names n_fires 2023-11-12 14:00:09 +01:00
AlberLC
f0551265c4 Add FlanaDiscBot.restore_channel_names 2023-11-08 07:48:54 +01:00
AlberLC
889a6c19dd Add FlanaDiscBot.restore_channel_names 2023-11-07 09:03:56 +01:00
AlberLC
688f5fee4e Add FlanaDiscBot.restore_channel_names 2023-11-07 08:58:12 +01:00
AlberLC
4f5427093c Update FlanaBot._on_database_messages 2023-11-07 08:29:28 +01:00
AlberLC
e415516b00 Fix poll buttons text (max) 2023-10-12 14:56:09 +02:00
AlberLC
a154653e9a Fix poll buttons text 2023-09-29 00:46:02 +02:00
AlberLC
61e60fc312 Fix FlanaBot._on_database_messages 2023-09-09 06:17:36 +02:00
AlberLC
e4f5a6bab4 Update FlanaBot._on_database_messages 2023-09-05 05:00:53 +02:00
AlberLC
bf03b1c8e7 Update FlanaBot._on_database_messages 2023-09-05 01:15:11 +02:00
AlberLC
c8cea16e7d Update FlanaBot._on_database_messages 2023-09-05 00:47:14 +02:00
AlberLC
5961e1a679 Update FlanaBot._on_database_messages 2023-09-05 00:38:27 +02:00
AlberLC
b4e7287de9 Update FlanaBot._on_database_messages 2023-09-05 00:15:48 +02:00
AlberLC
818b385f2d Update FlanaBot._on_database_messages 2023-09-05 00:06:06 +02:00
AlberLC
9813064674 Fix PollBot._on_poll_button_press 2023-08-18 06:32:05 +02:00
AlberLC
20592332a2 Fix FlanaBot._on_delete n_messages with mentions 2023-08-17 19:03:33 +02:00
AlberLC
ebf58e039b Fix tunnel chats 2023-08-17 18:44:21 +02:00
AlberLC
dd12533a48 Fix ScraperBot._search_medias (restricted age results filtered) 2023-08-15 03:08:08 +02:00
AlberLC
dad3482096 Add minor changes 2023-08-06 07:23:49 +02:00
AlberLC
69e352d5ec Add FlanaBot._on_delete_until 2023-08-05 00:55:06 +02:00
AlberLC
7adfa819df Fix scraping codec and extension with audio_only 2023-07-10 17:22:43 +02:00
AlberLC
7b03f6db63 Fix punishments accumulation 2023-07-08 19:40:59 +02:00
AlberLC
72696397ec Fix punishments accumulation 2023-07-07 22:14:10 +02:00
AlberLC
c3a2d2e9f0 Fix context manager in _get_contacts_ids for telegram userbots 2023-06-27 07:20:55 +02:00
AlberLC
bb9b5caa55 Fix ScraperBot._search_medias 2023-06-11 22:05:52 +02:00
AlberLC
ce71095060 Update requirements.txt 2023-05-28 23:23:11 +02:00
AlberLC
b54dd8d944 Update requirements.txt 2023-05-19 07:07:03 +02:00
AlberLC
376da6f072 Update requirements.txt 2023-05-19 06:56:18 +02:00
AlberLC
f396d2b232 Improve legibility 2023-05-19 06:47:16 +02:00
AlberLC
6c3a8e5e4a Fix fake instagram ban (restricted age) 2023-05-19 06:43:13 +02:00
AlberLC
d9eb01e5c0 Add minor changes 2023-05-09 05:32:59 +02:00
20 changed files with 1438 additions and 415 deletions

View File

@@ -3,7 +3,7 @@ FROM flanaganvaquero/flanawright
WORKDIR /application
COPY flanabot flanabot
COPY venv/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages
COPY .venv/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages
ENV PYTHONPATH=/application

View File

@@ -1,8 +1,10 @@
from flanabot.bots.connect_4_bot import *
from flanabot.bots.flana_bot import *
from flanabot.bots.flana_disc_bot import *
from flanabot.bots.flana_tele_bot import *
from flanabot.bots.penalty_bot import *
from flanabot.bots.poll_bot import *
from flanabot.bots.scraper_bot import *
from flanabot.bots.steam_bot import *
from flanabot.bots.ubereats_bot import *
from flanabot.bots.weather_bot import *

View File

@@ -0,0 +1,309 @@
from __future__ import annotations # todo0 remove when it's by default
__all__ = ['BtcOffersBot']
import asyncio
import functools
import json
import os
from abc import ABC
from collections.abc import Callable
from typing import Any
import aiohttp
import flanautils
import websockets
from multibot import MultiBot, constants as multibot_constants
from flanabot import constants
from flanabot.models import Chat, Message
# ---------------------------------------------------- #
# -------------------- DECORATORS -------------------- #
# ---------------------------------------------------- #
def preprocess_btc_offers(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(self: BtcOffersBot, message: Message) -> Any:
if message.chat.is_group and not self.is_bot_mentioned(message):
return
eur_mode = (
'' in message.text
or
bool(
flanautils.cartesian_product_string_matching(
message.text,
constants.KEYWORDS['eur'],
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
)
usd_mode = (
'$' in message.text
or
bool(
flanautils.cartesian_product_string_matching(
message.text,
constants.KEYWORDS['usd'],
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
)
premium_mode = (
'%' in message.text
or
bool(
flanautils.cartesian_product_string_matching(
message.text,
constants.KEYWORDS['premium'],
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
)
if len([arg for arg in (eur_mode, usd_mode, premium_mode) if arg]) > 1:
await self.send_error(
'Indica únicamente uno de los siguientes: precio en euros, precio en dólares o prima.',
message
)
return
parsed_number = flanautils.text_to_number(message.text)
if parsed_number and not any((eur_mode, usd_mode, premium_mode)):
eur_mode = True
if (eur_mode or usd_mode) and parsed_number < 0 or not flanautils.validate_mongodb_number(parsed_number):
await self.send_error('❌ Por favor, introduce un número válido.', message)
return
if eur_mode:
query = {'max_price_eur': parsed_number}
elif usd_mode:
query = {'max_price_usd': parsed_number}
elif premium_mode:
query = {'max_premium': parsed_number}
else:
query = {}
return await func(self, message, query)
return wrapper
# ---------------------------------------------------------------------------------------------------- #
# ------------------------------------------ BTC_OFFERS_BOT ------------------------------------------ #
# ---------------------------------------------------------------------------------------------------- #
class BtcOffersBot(MultiBot, ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._websocket: websockets.ClientConnection | None = None
self._notification_task: asyncio.Task[None] | None = None
self._api_endpoint = f"{os.environ['BTC_OFFERS_API_HOST']}:{os.environ['BTC_OFFERS_API_PORT']}/offers"
self._websocket_lock = asyncio.Lock()
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_btc_offers, keywords=constants.KEYWORDS['offer'])
self.register(self._on_btc_offers, keywords=constants.KEYWORDS['money'])
self.register(self._on_btc_offers, keywords=(constants.KEYWORDS['offer'], constants.KEYWORDS['money']))
self.register(self._on_notify_btc_offers, keywords=constants.KEYWORDS['notify'])
self.register(self._on_notify_btc_offers, keywords=(constants.KEYWORDS['notify'], constants.KEYWORDS['offer']))
self.register(self._on_notify_btc_offers, keywords=(constants.KEYWORDS['notify'], constants.KEYWORDS['money']))
self.register(self._on_notify_btc_offers, keywords=(constants.KEYWORDS['notify'], constants.KEYWORDS['offer'], constants.KEYWORDS['money']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['offer']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['money']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['notify']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['offer']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['money']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['notify']))
def _find_chats_to_notify(self) -> list[Chat]:
return self.Chat.find({'platform': self.platform.value, 'btc_offers_query': {'$exists': True, '$ne': {}}})
def _is_websocket_connected(self) -> bool:
return self._websocket and self._websocket.state in {websockets.State.CONNECTING, websockets.State.OPEN}
async def _send_offers(self, offers: list[dict], chat: Chat, notifications_disabled=False):
offers_parts = []
for i, offer in enumerate(offers, start=1):
offer_parts = [
f'<b>{i}.</b>',
f"<b>Plataforma:</b> <code>{offer['exchange']}</code>",
f"<b>Id:</b> <code>{offer['id']}</code>"
]
if offer['author']:
offer_parts.append(f"<b>Autor:</b> <code>{offer['author']}</code>")
payment_methods_text = ''.join(
f'\n <code>{payment_method}</code>' for payment_method in offer['payment_methods']
)
rounded_premium = round(offer['premium'], 2)
offer_parts.extend(
(
f"<b>Cantidad:</b> <code>{offer['amount']}</code>",
f"<b>Precio (EUR):</b> <code>{offer['price_eur']:.2f} €</code>",
f"<b>Precio (USD):</b> <code>{offer['price_usd']:.2f} $</code>",
f"<b>Prima:</b> <code>{rounded_premium if rounded_premium else '0.00'} %</code>",
f'<b>Métodos de pago:</b>{payment_methods_text}'
)
)
if offer['description']:
offer_parts.append(
f"<b>Descripción:</b>\n<code><code><code>{offer['description']}</code></code></code>"
)
offers_parts.append('\n'.join(offer_parts))
offers_parts_chunks = flanautils.chunks(offers_parts, 5)
messages_parts = [
[
'<b>💰💰💰 OFERTAS BTC 💰💰💰</b>',
'',
'\n\n'.join(offers_parts_chunks[0])
]
]
for offers_parts_chunk in offers_parts_chunks[1:]:
messages_parts.append(
[
'­',
'\n\n'.join(offers_parts_chunk)
]
)
if notifications_disabled:
messages_parts[-1].extend(
(
'',
'-' * 70,
'<b> Los avisos de ofertas BTC se han eliminado. Si quieres volver a recibirlos, no dudes en pedírmelo.</b>'
)
)
for message_parts in messages_parts:
await self.send('\n'.join(message_parts), chat)
async def _wait_btc_offers_notification(self):
while True:
while True:
try:
data = json.loads(await self._websocket.recv())
except websockets.ConnectionClosed:
await self.start_all_btc_offers_notifications()
else:
break
chat = await self.get_chat(data['chat_id'])
chat.btc_offers_query = {}
chat.save(pull_exclude_fields=('btc_offers_query',))
await self._send_offers(data['offers'], chat, notifications_disabled=True)
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
@preprocess_btc_offers
async def _on_btc_offers(self, message: Message, query: dict[str, float]):
bot_state_message = await self.send('Obteniendo ofertas BTC...', message)
try:
async with aiohttp.ClientSession() as session:
async with session.get(f'http://{self._api_endpoint}', params=query) as response:
offers = await response.json()
except aiohttp.ClientConnectorError:
await self.send_error('❌🌐 El servidor de ofertas BTC está desconectado.', bot_state_message, edit=True)
return
if offers:
await self._send_offers(offers, message.chat)
await self.delete_message(bot_state_message)
else:
await self.edit('No hay ofertas BTC actualmente que cumplan esa condición.', bot_state_message)
@preprocess_btc_offers
async def _on_notify_btc_offers(self, message: Message, query: dict[str, float]):
if not query:
await self.send_error('❌ Especifica una cantidad para poder avisarte.', message)
return
match query:
case {'max_price_eur': max_price_eur}:
response_text = f'✅ ¡Perfecto! Te avisaré cuando existan ofertas por {max_price_eur:.2f} € o menos.'
case {'max_price_usd': max_price_usd}:
response_text = f'✅ ¡Perfecto! Te avisaré cuando existan ofertas por {max_price_usd:.2f} $ o menos.'
case _:
rounded_max_premium = round(query['max_premium'], 2)
response_text = f"✅ ¡Perfecto! Te avisaré cuando existan ofertas con una prima del {rounded_max_premium if rounded_max_premium else '0.00'} % o menor."
await self.send(response_text, message)
await self.start_btc_offers_notification(message.chat, query)
async def _on_ready(self):
await super()._on_ready()
asyncio.create_task(self.start_all_btc_offers_notifications())
async def _on_stop_btc_offers_notification(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
previous_btc_offers_query = message.chat.btc_offers_query
await self.stop_btc_offers_notification(message.chat)
if previous_btc_offers_query:
await self.send('🛑 Los avisos de ofertas BTC se han eliminado.', message)
else:
await self.send('🤔 No existía ningún aviso de ofertas BTC configurado.', message)
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #
async def start_all_btc_offers_notifications(self):
if chats := self._find_chats_to_notify():
for chat in chats:
chat = await self.get_chat(chat.id)
chat.pull_from_database(overwrite_fields=('_id', 'btc_offers_query'))
await self.start_btc_offers_notification(chat, chat.btc_offers_query)
elif self._notification_task and not self._notification_task.done():
self._notification_task.cancel()
await asyncio.sleep(0)
async def start_btc_offers_notification(self, chat: Chat, query: dict[str, float]):
async with self._websocket_lock:
if not self._is_websocket_connected():
while True:
try:
self._websocket = await websockets.connect(f'ws://{self._api_endpoint}')
except ConnectionRefusedError:
await asyncio.sleep(constants.BTC_OFFERS_WEBSOCKET_RETRY_DELAY_SECONDS)
else:
break
if not self._notification_task or self._notification_task.done():
self._notification_task = asyncio.create_task(self._wait_btc_offers_notification())
chat.btc_offers_query = query
chat.save()
await self._websocket.send(json.dumps({'action': 'start', 'chat_id': chat.id, 'query': query}))
async def stop_all_btc_offers_notification(self):
for chat in self._find_chats_to_notify():
await self.stop_btc_offers_notification(chat)
async def stop_btc_offers_notification(self, chat: Chat):
if self._is_websocket_connected():
await self._websocket.send(json.dumps({'action': 'stop', 'chat_id': chat.id}))
chat.btc_offers_query = {}
chat.save(pull_exclude_fields=('btc_offers_query',))

View File

@@ -10,13 +10,12 @@ from typing import Iterable
from flanautils import Media, MediaType, Source
from multibot import MultiBot
import connect_4_frontend
from flanabot import constants
from flanabot import connect_4_frontend, constants
from flanabot.models import ButtonsGroup, Message, Player
# ----------------------------------------------------------------------------------------------------- #
# --------------------------------------------- CONNECT_4_BOT --------------------------------------------- #
# ------------------------------------------- CONNECT_4_BOT ------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class Connect4Bot(MultiBot, ABC):
# -------------------------------------------------------- #
@@ -25,11 +24,11 @@ class Connect4Bot(MultiBot, ABC):
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_connect_4, constants.KEYWORDS['connect_4'])
self.register(self._on_connect_4, keywords=constants.KEYWORDS['connect_4'])
self.register(self._on_connect_4_vs_itself, (*constants.KEYWORDS['connect_4'], *constants.KEYWORDS['self']))
self.register(self._on_connect_4_vs_itself, keywords=(*constants.KEYWORDS['connect_4'], *constants.KEYWORDS['self']))
self.register_button(self._on_connect_4_button_press, ButtonsGroup.CONNECT_4)
self.register_button(self._on_connect_4_button_press, key=ButtonsGroup.CONNECT_4)
def _ai_insert(
self,
@@ -304,105 +303,105 @@ class Connect4Bot(MultiBot, ABC):
@staticmethod
def _check_winner_left(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < j and board[i][j - 3] == board[i][j - 2] == board[i][j - 1]
or
1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i][j - 2] == board[i][j - 1] == board[i][j + 1]
)
and
board[i][j - 1] is not None
(
2 < j and board[i][j - 3] == board[i][j - 2] == board[i][j - 1]
or
1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i][j - 2] == board[i][j - 1] == board[i][j + 1]
)
and
board[i][j - 1] is not None
):
return board[i][j - 1]
@staticmethod
def _check_winner_right(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
j < constants.CONNECT_4_N_COLUMNS - 3 and board[i][j + 1] == board[i][j + 2] == board[i][j + 3]
or
0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i][j - 1] == board[i][j + 1] == board[i][j + 2]
)
and
board[i][j + 1] is not None
(
j < constants.CONNECT_4_N_COLUMNS - 3 and board[i][j + 1] == board[i][j + 2] == board[i][j + 3]
or
0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i][j - 1] == board[i][j + 1] == board[i][j + 2]
)
and
board[i][j + 1] is not None
):
return board[i][j + 1]
@staticmethod
def _check_winner_up(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < i and board[i - 3][j] == board[i - 2][j] == board[i - 1][j]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and board[i - 2][j] == board[i - 1][j] == board[i + 1][j]
)
and
board[i - 1][j] is not None
(
2 < i and board[i - 3][j] == board[i - 2][j] == board[i - 1][j]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and board[i - 2][j] == board[i - 1][j] == board[i + 1][j]
)
and
board[i - 1][j] is not None
):
return board[i - 1][j]
@staticmethod
def _check_winner_down(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
i < constants.CONNECT_4_N_ROWS - 3 and board[i + 1][j] == board[i + 2][j] == board[i + 3][j]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and board[i - 1][j] == board[i + 1][j] == board[i + 2][j]
)
and
board[i + 1][j] is not None
(
i < constants.CONNECT_4_N_ROWS - 3 and board[i + 1][j] == board[i + 2][j] == board[i + 3][j]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and board[i - 1][j] == board[i + 1][j] == board[i + 2][j]
)
and
board[i + 1][j] is not None
):
return board[i + 1][j]
@staticmethod
def _check_winner_up_left(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < i and 2 < j and board[i - 3][j - 3] == board[i - 2][j - 2] == board[i - 1][j - 1]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and 1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i - 2][j - 2] == board[i - 1][j - 1] == board[i + 1][j + 1]
(
2 < i and 2 < j and board[i - 3][j - 3] == board[i - 2][j - 2] == board[i - 1][j - 1]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and 1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i - 2][j - 2] == board[i - 1][j - 1] == board[i + 1][j + 1]
)
and
board[i - 1][j - 1] is not None
)
and
board[i - 1][j - 1] is not None
):
return board[i - 1][j - 1]
@staticmethod
def _check_winner_up_right(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < i and j < constants.CONNECT_4_N_COLUMNS - 3 and board[i - 1][j + 1] == board[i - 2][j + 2] == board[i - 3][j + 3]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and 0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i + 1][j - 1] == board[i - 1][j + 1] == board[i - 2][j + 2]
)
and
board[i - 1][j + 1] is not None
(
2 < i and j < constants.CONNECT_4_N_COLUMNS - 3 and board[i - 1][j + 1] == board[i - 2][j + 2] == board[i - 3][j + 3]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and 0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i + 1][j - 1] == board[i - 1][j + 1] == board[i - 2][j + 2]
)
and
board[i - 1][j + 1] is not None
):
return board[i - 1][j + 1]
@staticmethod
def _check_winner_down_left(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
i < constants.CONNECT_4_N_ROWS - 3 and 2 < j and board[i + 3][j - 3] == board[i + 2][j - 2] == board[i + 1][j - 1]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and 1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i + 2][j - 2] == board[i + 1][j - 1] == board[i - 1][j + 1]
)
and
board[i + 1][j - 1] is not None
(
i < constants.CONNECT_4_N_ROWS - 3 and 2 < j and board[i + 3][j - 3] == board[i + 2][j - 2] == board[i + 1][j - 1]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and 1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i + 2][j - 2] == board[i + 1][j - 1] == board[i - 1][j + 1]
)
and
board[i + 1][j - 1] is not None
):
return board[i + 1][j - 1]
@staticmethod
def _check_winner_down_right(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
i < constants.CONNECT_4_N_ROWS - 3 and j < constants.CONNECT_4_N_COLUMNS - 3 and board[i + 1][j + 1] == board[i + 2][j + 2] == board[i + 3][j + 3]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and 0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i - 1][j - 1] == board[i + 1][j + 1] == board[i + 2][j + 2]
)
and
board[i + 1][j + 1] is not None
(
i < constants.CONNECT_4_N_ROWS - 3 and j < constants.CONNECT_4_N_COLUMNS - 3 and board[i + 1][j + 1] == board[i + 2][j + 2] == board[i + 3][j + 3]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and 0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i - 1][j - 1] == board[i + 1][j + 1] == board[i + 2][j + 2]
)
and
board[i + 1][j + 1] is not None
):
return board[i + 1][j + 1]
@@ -530,14 +529,14 @@ class Connect4Bot(MultiBot, ABC):
if player_2.id == self.id:
connect_4_data['turn'] += 1
if await self._ai_turn(
player_1,
player_2,
next_player,
current_player,
connect_4_data['turn'],
constants.CONNECT_4_AI_DELAY_SECONDS,
board,
message
player_1,
player_2,
next_player,
current_player,
connect_4_data['turn'],
constants.CONNECT_4_AI_DELAY_SECONDS,
board,
message
):
return
@@ -564,14 +563,14 @@ class Connect4Bot(MultiBot, ABC):
while True:
turn += 1
if await self._ai_turn(
player_1,
player_2,
current_player,
next_player,
turn,
constants.CONNECT_4_AI_DELAY_SECONDS / 2,
board,
bot_message
player_1,
player_2,
current_player,
next_player,
turn,
constants.CONNECT_4_AI_DELAY_SECONDS / 2,
board,
bot_message
):
break
current_player, next_player = next_player, current_player

View File

@@ -12,13 +12,15 @@ import pymongo
import pytz
from flanaapis import InstagramLoginError, MediaNotFoundError, PlaceNotFoundError
from flanautils import return_if_first_empty
from multibot import BadRoleError, MultiBot, RegisteredCallback, Role, User, bot_mentioned, constants as multibot_constants, group, inline, owner
from multibot import BadRoleError, MessagesFormat, MultiBot, Platform, RegisteredCallback, Role, User, bot_mentioned, constants as multibot_constants, group, ignore_self_message, inline, owner
from flanabot import constants
from flanabot.bots.btc_offers_bot import BtcOffersBot
from flanabot.bots.connect_4_bot import Connect4Bot
from flanabot.bots.penalty_bot import PenaltyBot
from flanabot.bots.poll_bot import PollBot
from flanabot.bots.scraper_bot import ScraperBot
from flanabot.bots.steam_bot import SteamBot
from flanabot.bots.ubereats_bot import UberEatsBot
from flanabot.bots.weather_bot import WeatherBot
from flanabot.models import Action, BotAction, ButtonsGroup, Chat, Message
@@ -27,14 +29,14 @@ from flanabot.models import Action, BotAction, ButtonsGroup, Chat, Message
# ----------------------------------------------------------------------------------------------------- #
# --------------------------------------------- FLANA_BOT --------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, WeatherBot, MultiBot, ABC):
class FlanaBot(Connect4Bot, BtcOffersBot, PenaltyBot, PollBot, ScraperBot, SteamBot, UberEatsBot, WeatherBot, MultiBot, ABC):
Chat = Chat
Message = Message
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.tunnel_chat = None
self.help_calls = {}
self.tunnel_chat: Chat | None = None
self.help_calls: dict[int, datetime.timedelta] = {}
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
@@ -42,45 +44,51 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_activate_tunnel, (multibot_constants.KEYWORDS['activate'], constants.KEYWORDS['tunnel']))
self.register(self._on_activate_tunnel, keywords=(multibot_constants.KEYWORDS['activate'], constants.KEYWORDS['tunnel']))
self.register(self._on_bye, multibot_constants.KEYWORDS['bye'])
self.register(self._on_bye, keywords=multibot_constants.KEYWORDS['bye'])
self.register(self._on_config, multibot_constants.KEYWORDS['config'])
self.register(self._on_config, (multibot_constants.KEYWORDS['show'], multibot_constants.KEYWORDS['config']))
self.register(self._on_config, keywords=multibot_constants.KEYWORDS['config'])
self.register(self._on_config, keywords=(multibot_constants.KEYWORDS['show'], multibot_constants.KEYWORDS['config']))
self.register(self._on_database_messages, (multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message']))
self.register(lambda message: self._on_database_messages(message, simple=True), (multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message'], multibot_constants.KEYWORDS['simple']))
self.register(self._on_database_messages, keywords=(multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message']))
self.register(self._on_database_messages, extra_kwargs={'format': MessagesFormat.SIMPLE}, keywords=(multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message'], multibot_constants.KEYWORDS['simple']))
self.register(self._on_database_messages, extra_kwargs={'format': MessagesFormat.COMPLETE}, keywords=(multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message'], multibot_constants.KEYWORDS['all']))
self.register(self._on_database_messages, extra_kwargs={'format': MessagesFormat.COMPLETE}, keywords=(multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message'], multibot_constants.KEYWORDS['text']))
self.register(self._on_deactivate_tunnel, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['tunnel']))
self.register(self._on_deactivate_tunnel, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['tunnel']))
self.register(self._on_delete, multibot_constants.KEYWORDS['delete'])
self.register(self._on_delete, (multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_delete, keywords=multibot_constants.KEYWORDS['delete'])
self.register(self._on_delete, keywords=(multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_hello, multibot_constants.KEYWORDS['hello'])
self.register(self._on_delete, extra_kwargs={'until': True}, keywords=(multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['until']))
self.register(self._on_delete, extra_kwargs={'until': True}, keywords=(multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['until'], multibot_constants.KEYWORDS['message']))
self.register(self._on_help, multibot_constants.KEYWORDS['help'])
self.register(self._on_hello, keywords=multibot_constants.KEYWORDS['hello'])
self.register(self._on_help, keywords=multibot_constants.KEYWORDS['help'])
self.register(self._on_new_message_default, default=True)
self.register(self._on_recover_message, multibot_constants.KEYWORDS['reset'])
self.register(self._on_recover_message, multibot_constants.KEYWORDS['message'])
self.register(self._on_recover_message, (multibot_constants.KEYWORDS['reset'], multibot_constants.KEYWORDS['message']))
self.register(self._on_recover_message, keywords=multibot_constants.KEYWORDS['message'])
self.register(self._on_roles, multibot_constants.KEYWORDS['permission'])
self.register(self._on_roles, multibot_constants.KEYWORDS['role'])
self.register(self._on_roles, (multibot_constants.KEYWORDS['permission'], multibot_constants.KEYWORDS['role']))
self.register(self._on_roles, (multibot_constants.KEYWORDS['change'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_roles, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['role']))
self.register(self._on_roles, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['role']))
self.register(self._on_reset, keywords=multibot_constants.KEYWORDS['reset'])
self.register(self._on_reset, keywords=(multibot_constants.KEYWORDS['reset'], multibot_constants.KEYWORDS['message']))
self.register(self._on_roles, keywords=multibot_constants.KEYWORDS['permission'])
self.register(self._on_roles, keywords=multibot_constants.KEYWORDS['role'])
self.register(self._on_roles, keywords=(multibot_constants.KEYWORDS['permission'], multibot_constants.KEYWORDS['role']))
self.register(self._on_roles, keywords=(multibot_constants.KEYWORDS['change'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_roles, keywords=(multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['role']))
self.register(self._on_roles, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['role']))
self.register(self._on_tunnel_message, always=True)
self.register(self._on_users, multibot_constants.KEYWORDS['user'])
self.register(self._on_users, keywords=multibot_constants.KEYWORDS['user'])
self.register_button(self._on_config_button_press, ButtonsGroup.CONFIG)
self.register_button(self._on_roles_button_press, ButtonsGroup.ROLES)
self.register_button(self._on_users_button_press, ButtonsGroup.USERS)
self.register_button(self._on_config_button_press, key=ButtonsGroup.CONFIG)
self.register_button(self._on_roles_button_press, key=ButtonsGroup.ROLES)
self.register_button(self._on_users_button_press, key=ButtonsGroup.USERS)
async def _changeable_roles(self, group_: int | str | Chat | Message) -> list[Role]:
return []
@@ -135,10 +143,13 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
@owner
@group(False)
async def _on_activate_tunnel(self, message: Message):
keywords = (*multibot_constants.KEYWORDS['activate'], *constants.KEYWORDS['tunnel'])
text_parts = await self.filter_mention_ids(flanautils.remove_accents(message.text.lower()), message, delete_names=True)
try:
chat_id_or_name = next(part for part in flanautils.remove_accents(message.text.lower()).split() if not flanautils.cartesian_product_string_matching(part, keywords, multibot_constants.PARSER_MIN_SCORE_DEFAULT))
chat_id_or_name = next(part for part in text_parts if not flanautils.cartesian_product_string_matching(part, keywords, multibot_constants.PARSER_MIN_SCORE_DEFAULT))
except StopIteration:
return
@@ -151,6 +162,7 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
async def _on_bye(self, message: Message):
if message.chat.is_private or self.is_bot_mentioned(message):
message.is_inline = False
await self.send_bye(message)
async def _on_config(self, message: Message):
@@ -199,58 +211,134 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
await self.edit(message.buttons_info.buttons, message)
@owner
async def _on_database_messages(self, message: Message, simple=False):
@inline(False)
async def _on_database_messages(self, message: Message, format=MessagesFormat.NORMAL):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
if message.author.id != self.owner_id:
await self.send_negative(message)
return
n_messages = flanautils.text_to_number(message.text)
if not n_messages:
n_messages = 1
words = await self.filter_mention_ids(message.text, message, delete_names=True)
n_messages = 0
platforms = []
is_group = False
is_private = False
parsing_users = False
parsing_chats = False
users = []
chats = []
for word in words:
lower_word = word.lower()
await self.send(
self.get_formatted_last_database_messages(
n_messages,
timezone=pytz.timezone('Europe/Madrid'),
simple=simple
),
message
)
await self.delete_message(message)
if (
not parsing_users
and
flanautils.cartesian_product_string_matching(
multibot_constants.KEYWORDS['user'],
lower_word,
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
):
parsing_users = True
parsing_chats = False
elif (
not parsing_chats
and
flanautils.cartesian_product_string_matching(
multibot_constants.KEYWORDS['chat'],
lower_word,
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
):
parsing_users = False
parsing_chats = True
elif parsing_users:
users.append(flanautils.cast_number(word, raise_exception=False))
elif parsing_chats:
chats.append(flanautils.cast_number(word, raise_exception=False))
elif platform_matches := flanautils.cartesian_product_string_matching(
(element.name.lower() for element in Platform),
lower_word,
multibot_constants.PARSER_MIN_SCORE_DEFAULT
):
platforms.extend(Platform[key.upper()] for key in platform_matches)
elif flanautils.cartesian_product_string_matching(
multibot_constants.KEYWORDS['group'],
lower_word,
multibot_constants.PARSER_MIN_SCORE_DEFAULT
):
is_group = True
elif flanautils.cartesian_product_string_matching(
multibot_constants.KEYWORDS['private'],
lower_word,
multibot_constants.PARSER_MIN_SCORE_DEFAULT
):
is_private = True
else:
try:
n_messages += flanautils.cast_number(word)
except ValueError:
pass
if not is_group and not is_private:
is_group = True
is_private = True
if (
n_messages >= 0
and
(messages := await self.get_last_database_messages(
n_messages=max(1, n_messages),
platforms=platforms,
authors=[user for user in message.mentions if user.id != self.id] + users,
is_group=is_group,
is_private=is_private,
chats=chats
))
):
await self.send(
self.format_messages(messages, timezone=pytz.timezone('Europe/Madrid'), format=format),
message
)
await self.delete_message(message)
@owner
@group(False)
async def _on_deactivate_tunnel(self, message: Message):
self.tunnel_chat = None
await self.send('Túnel cerrado.', message)
@inline(False)
async def _on_delete(self, message: Message):
async def _on_delete(self, message: Message, until=False):
if message.replied_message:
if (
self.is_bot_mentioned(message)
and
(message.author.is_admin or message.replied_message.author.id == self.id)
):
if not self.is_bot_mentioned(message):
return
if until and message.author.is_admin:
await self.clear(message.chat, until_message=message.replied_message)
elif not until and (message.author.is_admin or message.replied_message.author.id in {self.id, message.author.id}):
flanautils.do_later(flanautils.text_to_time(message.text).total_seconds(), self.delete_message, message.replied_message)
await self.delete_message(message)
elif message.chat.is_group and self.is_bot_mentioned(message):
elif message.chat.is_group:
await self.send_negative(message)
elif (
(message.chat.is_private or self.is_bot_mentioned(message))
and
(n_messages := flanautils.text_to_number(message.text))
(n_messages := flanautils.text_to_number(' '.join(await self.filter_mention_ids(message.text, message))))
):
if message.author.is_admin is False:
await self.send_negative(message)
return
if n_messages <= 0:
await self.delete_message(message)
return
await self.clear(n_messages + 1, message.chat)
await self.clear(message.chat, n_messages + 1)
async def _on_hello(self, message: Message):
if message.chat.is_private or self.is_bot_mentioned(message):
message.is_inline = False
await self.send_hello(message)
async def _on_help(self, message: Message):
@@ -301,14 +389,6 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
and
message.author.id != self.owner_id
and
(
not message.replied_message
or
message.replied_message.author.id != self.id
or
not message.replied_message.medias
)
and
(
self.is_bot_mentioned(message)
or
@@ -316,33 +396,39 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
and
random.random() < constants.INSULT_PROBABILITY
)
and
(
not self.tunnel_chat
or
self.tunnel_chat != message.chat
)
):
await self.send_insult(message)
@ignore_self_message
async def _on_new_message_raw(
self,
message: Message,
whitelist_callbacks: set[RegisteredCallback] | None = None,
blacklist_callbacks: set[RegisteredCallback] | None = None
):
if (
message.replied_message
and
message.replied_message.author.id == self.id
and
message.replied_message.medias
):
whitelist_callbacks = (whitelist_callbacks or set()) | {
self._on_delete,
self._on_recover_message,
self._on_reset,
self._on_song_info
}
elif self.tunnel_chat and message.chat == await self.owner_chat:
whitelist_callbacks = (whitelist_callbacks or set()) | {self._on_deactivate_tunnel, self._on_tunnel_message}
await super()._on_new_message_raw(message, whitelist_callbacks, blacklist_callbacks)
async def _on_ready(self):
if not self._is_initialized:
flanautils.do_every(multibot_constants.CHECK_OLD_DATABASE_MESSAGES_EVERY_SECONDS, self.check_old_database_actions)
await super()._on_ready()
flanautils.do_every(multibot_constants.CHECK_OLD_DATABASE_MESSAGES_EVERY_SECONDS, self.check_old_database_actions)
for chat in Chat.find({
'platform': self.platform.value,
'config.ubereats': {"$exists": True, "$eq": True},
'ubereats.cookies': {"$exists": True, "$ne": []}
}):
chat = await self.get_chat(chat.id)
chat.pull_from_database(overwrite_fields=('_id', 'config', 'ubereats'))
if (
chat.ubereats['next_execution']
and
(delta_time := chat.ubereats['next_execution'] - datetime.datetime.now(datetime.timezone.utc)) > datetime.timedelta()
):
flanautils.do_later(delta_time, self.start_ubereats, chat)
else:
await self.start_ubereats(chat)
@inline(False)
async def _on_recover_message(self, message: Message):
@@ -353,7 +439,7 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
'chat': message.chat.object_id,
'affected_objects': message.replied_message.object_id
})
elif self.is_bot_mentioned(message):
elif message.chat.is_private or self.is_bot_mentioned(message):
message_deleted_bot_action = BotAction.find_one({
'platform': self.platform.value,
'action': Action.MESSAGE_DELETED.value,
@@ -375,6 +461,12 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
for deleted_message in deleted_messages:
await self.send(deleted_message.text, message)
async def _on_reset(self, message: Message):
if self._get_poll_message(message):
await self._on_delete_votes(message, all_=True)
else:
await self._on_recover_message(message)
@group
@bot_mentioned
async def _on_roles(self, message: Message):
@@ -418,8 +510,8 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
self._parse_callbacks(
message.text,
[
RegisteredCallback(..., (multibot_constants.KEYWORDS['activate'], constants.KEYWORDS['tunnel'])),
RegisteredCallback(..., (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['tunnel']))
RegisteredCallback(..., keywords=(multibot_constants.KEYWORDS['activate'], constants.KEYWORDS['tunnel'])),
RegisteredCallback(..., keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['tunnel']))
]
)
):
@@ -427,7 +519,7 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
if message.chat == self.tunnel_chat:
await self.send(f"<b>{message.author.name.split('#')[0]}:</b> {message.text}", await self.owner_chat)
elif message.author.id == self.owner_id and message.chat.is_private:
elif message.chat == await self.owner_chat:
if message.text:
await self.send(message.text, self.tunnel_chat)
else:
@@ -485,12 +577,16 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, UberEatsBot, Weathe
BotAction.delete_many_raw({'platform': self.platform.value, 'date': {'$lte': before_date}})
async def send_bye(self, chat: int | str | User | Chat | Message) -> multibot_constants.ORIGINAL_MESSAGE:
chat = await self.get_chat(chat)
return await self.send(random.choice((*constants.BYE_PHRASES, flanautils.CommonWords.random_time_greeting())), chat)
return await self.send(
random.choice((*constants.BYE_PHRASES, flanautils.CommonWords.random_time_greeting())),
chat
)
async def send_hello(self, chat: int | str | User | Chat | Message) -> multibot_constants.ORIGINAL_MESSAGE:
chat = await self.get_chat(chat)
return await self.send(random.choice((*constants.HELLO_PHRASES, flanautils.CommonWords.random_time_greeting())), chat)
return await self.send(
random.choice((*constants.HELLO_PHRASES, flanautils.CommonWords.random_time_greeting())),
chat
)
async def send_insult(self, chat: int | str | User | Chat | Message) -> multibot_constants.ORIGINAL_MESSAGE | None:
chat = await self.get_chat(chat)

View File

@@ -5,16 +5,21 @@ import datetime
import math
import os
import random
import urllib.parse
import uuid
from collections import defaultdict
from pathlib import Path
import aiohttp
import discord
import flanautils
import pytz
from flanautils import Media, NotFoundError, OrderedSet
from multibot import BadRoleError, DiscordBot, Platform, Role, User, bot_mentioned, constants as multibot_constants, group
from flanautils import Media, MediaType, NotFoundError, OrderedSet
from multibot import BadRoleError, DiscordBot, LimitError, Platform, Role, User, admin, bot_mentioned, constants as multibot_constants, group
from flanabot import constants
from flanabot.bots.flana_bot import FlanaBot
from flanabot.models import Chat, Message, Punishment
from flanabot.models.heating_context import ChannelData, HeatingContext
# ---------------------------------------------------------------------------------------------------- #
@@ -23,8 +28,8 @@ from flanabot.models import Chat, Message, Punishment
class FlanaDiscBot(DiscordBot, FlanaBot):
def __init__(self):
super().__init__(os.environ['DISCORD_BOT_TOKEN'])
self.heating = False
self.heat_level = 0.0
self.heating_contexts: dict[int, HeatingContext] = defaultdict(HeatingContext)
self._flanaserver_api_base_url = f"http://{os.environ['FLANASERVER_API_HOST']}:{os.environ['FLANASERVER_API_PORT']}"
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
@@ -35,59 +40,66 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
self.client.add_listener(self._on_member_remove, 'on_member_remove')
self.client.add_listener(self._on_voice_state_update, 'on_voice_state_update')
self.register(self._on_audit_log, multibot_constants.KEYWORDS['audit'])
self.register(self._on_audit_log, keywords=multibot_constants.KEYWORDS['audit'])
self.register(self._on_restore_channel_names, keywords=(multibot_constants.KEYWORDS['reset'], multibot_constants.KEYWORDS['chat']))
async def _changeable_roles(self, group_: int | str | Chat | Message) -> list[Role]:
group_roles = await self.get_group_roles(group_)
group_id = self.get_group_id(group_)
r = await self.get_group_roles(group_)
return [role for role in r if role.id in constants.CHANGEABLE_ROLES[Platform.DISCORD][group_id]]
return [role for role in group_roles if role.id in constants.CHANGEABLE_ROLES[Platform.DISCORD][group_id]]
async def _heat_channel(self, channel: discord.VoiceChannel):
async def set_fire_to(channel_key: str, depends_on: str, firewall=0):
fire_score = random.randint(0, channels[depends_on]['n_fires'] - channels[channel_key]['n_fires']) - firewall // 2
fire_score = random.randint(0, channels_data[depends_on].n_fires - channels_data[channel_key].n_fires) - firewall // 2
if fire_score < 1:
if not channels[channel_key]['n_fires']:
if not channels_data[channel_key].n_fires:
return
channels[channel_key]['n_fires'] -= 1
channels_data[channel_key].n_fires -= 1
elif fire_score == 1:
return
else:
channels[channel_key]['n_fires'] += 1
channels_data[channel_key].n_fires += 1
if channels[channel_key]['n_fires']:
new_name_ = '🔥' * channels[channel_key]['n_fires']
if channels_data[channel_key].n_fires:
new_name_ = '🔥' * channels_data[channel_key].n_fires
else:
new_name_ = channels[channel_key]['original_name']
await channels[channel_key]['object'].edit(name=new_name_)
new_name_ = channels_data[channel_key].original_name
await channels_data[channel_key].channel.edit(name=new_name_)
channels = {}
voice_channels = {}
for voice_channel in channel.guild.voice_channels:
voice_channels[voice_channel.id] = voice_channel
channels_data = {}
for letter, channel_id in constants.DISCORD_HOT_CHANNEL_IDS.items():
channel_ = flanautils.find(channel.guild.voice_channels, condition=lambda c: c.id == channel_id)
channels[letter] = {
'object': channel_,
'original_name': channel_.name,
'n_fires': 0
}
channels_data[letter] = ChannelData(
channel=voice_channels[channel_id],
original_name=voice_channels[channel_id].name
)
heating_context = self.heating_contexts[channel.guild.id]
heating_context.channels_data = channels_data
while True:
await asyncio.sleep(constants.HEAT_PERIOD_SECONDS)
if channel.members:
self.heat_level += 0.5
heating_context.heat_level += 0.5
else:
if not self.heat_level:
if heating_context.heat_level == constants.HEAT_FIRST_LEVEL:
return
self.heat_level -= 0.5
if self.heat_level > len(constants.DISCORD_HEAT_NAMES) - 1:
self.heat_level = float(int(self.heat_level))
if not self.heat_level.is_integer():
heating_context.heat_level -= 0.5
if heating_context.heat_level > len(constants.DISCORD_HEAT_NAMES) - 1:
heating_context.heat_level = float(int(heating_context.heat_level))
if not heating_context.heat_level.is_integer():
continue
i = int(self.heat_level)
if not i:
i = int(heating_context.heat_level)
if i == constants.HEAT_FIRST_LEVEL:
n_fires = 0
new_name = channels['C']['original_name']
new_name = channels_data['C'].original_name
elif i < len(constants.DISCORD_HEAT_NAMES):
n_fires = 0
new_name = constants.DISCORD_HEAT_NAMES[i]
@@ -95,14 +107,14 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
n_fires = i - len(constants.DISCORD_HEAT_NAMES) + 1
n_fires = round(math.log(n_fires + 4, 1.2) - 8)
new_name = '🔥' * n_fires
channels['C']['n_fires'] = n_fires
channels_data['C'].n_fires = n_fires
if channel.name != new_name:
await channel.edit(name=new_name)
await set_fire_to('B', depends_on='C', firewall=len(channels['B']['object'].members))
await set_fire_to('A', depends_on='B', firewall=len(channels['A']['object'].members))
await set_fire_to('D', depends_on='C', firewall=len(channels['C']['object'].members))
await set_fire_to('E', depends_on='D', firewall=len(channels['D']['object'].members))
await set_fire_to('B', depends_on='C', firewall=len(channels_data['B'].channel.members))
await set_fire_to('A', depends_on='B', firewall=len(channels_data['A'].channel.members))
await set_fire_to('D', depends_on='C', firewall=len(channels_data['C'].channel.members))
await set_fire_to('E', depends_on='D', firewall=len(channels_data['D'].channel.members))
async def _punish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
user_id = self.get_user_id(user)
@@ -121,6 +133,44 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
) -> OrderedSet[Media]:
return await super()._search_medias(message, force, audio_only, timeout_for_media)
async def _send_media(self, media: Media, bot_state_message: Message, message: Message) -> Message | None:
# noinspection PyBroadException
try:
return await self.send(media, message, reply_to=message.replied_message, raise_exceptions=True)
except LimitError:
if bot_state_message:
await self.edit('No cabe porque Discord es una mierda. Subiendo a FlanaServer...', bot_state_message)
async with aiohttp.ClientSession() as session:
form = aiohttp.FormData()
file_name = urllib.parse.unquote(media.title or Path(media.url).name or uuid.uuid4().hex)
if media.extension and not file_name.endswith(media.extension):
file_name = f'{file_name}.{media.extension}'
match media.type_:
case MediaType.AUDIO:
content_type = f"audio/{'mpeg' if media.extension == 'mp3' else media.extension}"
case MediaType.GIF:
content_type = 'image/gif'
case MediaType.IMAGE:
content_type = f'image/{media.extension}'
case MediaType.VIDEO:
content_type = f'video/{media.extension}'
form.add_field('file', media.bytes_, content_type=content_type, filename=file_name)
form.add_field('expires_in', str(constants.FLANASERVER_FILE_EXPIRATION_SECONDS))
async with session.post(f'{self._flanaserver_api_base_url}/files', data=form) as response:
if response.status != 201:
return
file_info = await response.json()
return await self.send(f"{constants.FLANASERVER_BASE_URL}{file_info['embed_url']}", message)
except Exception:
pass
async def _unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
user_id = self.get_user_id(user)
try:
@@ -169,6 +219,13 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
async def _on_member_remove(self, member: discord.Member):
(await self._create_user_from_discord_user(member)).save()
@group
@bot_mentioned
@admin(send_negative=True)
async def _on_restore_channel_names(self, message: Message):
await self.delete_message(message)
await self.restore_channel_names(self.get_group_id(message))
async def _on_voice_state_update(self, _: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
if getattr(before.channel, 'id', None) == constants.DISCORD_HOT_CHANNEL_IDS['C']:
channel = before.channel
@@ -177,10 +234,11 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
else:
return
if not self.heating:
self.heating = True
heating_context = self.heating_contexts[channel.guild.id]
if not heating_context.is_active:
heating_context.is_active = True
await self._heat_channel(channel)
self.heating = False
heating_context.is_active = False
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
@@ -195,3 +253,13 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
'group_id': group_id,
'is_active': True
}))
async def restore_channel_names(self, group_id: int):
heating_context = self.heating_contexts[group_id]
for channel_data in heating_context.channels_data.values():
if channel_data.channel.name != channel_data.original_name:
await channel_data.channel.edit(name=channel_data.original_name)
channel_data.n_fires = 0
heating_context.heat_level = constants.HEAT_FIRST_LEVEL

View File

@@ -4,11 +4,11 @@ __all__ = ['whitelisted', 'FlanaTeleBot']
import functools
import os
from typing import Callable
from typing import Any, Callable
import telethon.tl.functions
from flanautils import Media, OrderedSet
from multibot import TelegramBot, find_message, user_client
from multibot import RegisteredCallback, TelegramBot, find_message, use_user_client, user_client
from flanabot import constants
from flanabot.bots.flana_bot import FlanaBot
@@ -21,7 +21,7 @@ from flanabot.models import Message
def whitelisted(func: Callable) -> Callable:
@functools.wraps(func)
@find_message
async def wrapper(self: FlanaTeleBot, message: Message, *args, **kwargs):
async def wrapper(self: FlanaTeleBot, message: Message, *args, **kwargs) -> Any:
if message.author.id not in self.whitelist_ids:
return
@@ -48,7 +48,7 @@ class FlanaTeleBot(TelegramBot, FlanaBot):
# -------------------------------------------------------- #
@user_client
async def _get_contacts_ids(self) -> list[int]:
async with self.user_client:
async with use_user_client(self):
contacts_data = await self.user_client(telethon.tl.functions.contacts.GetContactsRequest(hash=0))
return [contact.user_id for contact in contacts_data.contacts]
@@ -74,8 +74,13 @@ class FlanaTeleBot(TelegramBot, FlanaBot):
await super()._on_inline_query_raw(message)
@whitelisted
async def _on_new_message_raw(self, message: Message):
await super()._on_new_message_raw(message)
async def _on_new_message_raw(
self,
message: Message,
whitelist_callbacks: set[RegisteredCallback] | None = None,
blacklist_callbacks: set[RegisteredCallback] | None = None
):
await super()._on_new_message_raw(message, whitelist_callbacks, blacklist_callbacks)
async def _on_ready(self):
await super()._on_ready()

View File

@@ -6,15 +6,15 @@ from abc import ABC
import flanautils
from flanautils import TimeUnits
from multibot import MultiBot, User, admin, bot_mentioned, constants as multibot_constants, group, ignore_self_message
from multibot import MultiBot, RegisteredCallback, User, admin, bot_mentioned, constants as multibot_constants, group, ignore_self_message
from flanabot import constants
from flanabot.models import Chat, Message, Punishment
# ------------------------------------------------------------------------------------------------------- #
# --------------------------------------------- PENALTY_BOT --------------------------------------------- #
# ------------------------------------------------------------------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
# -------------------------------------------- PENALTY_BOT -------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class PenaltyBot(MultiBot, ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -26,26 +26,26 @@ class PenaltyBot(MultiBot, ABC):
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_ban, multibot_constants.KEYWORDS['ban'])
self.register(self._on_ban, keywords=multibot_constants.KEYWORDS['ban'])
self.register(self._on_mute, multibot_constants.KEYWORDS['mute'])
self.register(self._on_mute, (('haz', 'se'), multibot_constants.KEYWORDS['mute']))
self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['unmute']))
self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['sound']))
self.register(self._on_mute, keywords=multibot_constants.KEYWORDS['mute'])
self.register(self._on_mute, keywords=(('haz', 'se'), multibot_constants.KEYWORDS['mute']))
self.register(self._on_mute, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['unmute']))
self.register(self._on_mute, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['sound']))
self.register(self._on_punish, constants.KEYWORDS['punish'])
self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['unpunish']))
self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_punish, keywords=constants.KEYWORDS['punish'])
self.register(self._on_punish, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['unpunish']))
self.register(self._on_punish, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_unban, multibot_constants.KEYWORDS['unban'])
self.register(self._on_unban, keywords=multibot_constants.KEYWORDS['unban'])
self.register(self._on_unmute, multibot_constants.KEYWORDS['unmute'])
self.register(self._on_unmute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['mute']))
self.register(self._on_unmute, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['sound']))
self.register(self._on_unmute, keywords=multibot_constants.KEYWORDS['unmute'])
self.register(self._on_unmute, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['mute']))
self.register(self._on_unmute, keywords=(multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['sound']))
self.register(self._on_unpunish, constants.KEYWORDS['unpunish'])
self.register(self._on_unpunish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['punish']))
self.register(self._on_unpunish, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_unpunish, keywords=constants.KEYWORDS['unpunish'])
self.register(self._on_unpunish, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['punish']))
self.register(self._on_unpunish, keywords=(multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission']))
@admin(False)
@group
@@ -73,9 +73,68 @@ class PenaltyBot(MultiBot, ABC):
'group_id': message.chat.group_id
})
punishment_seconds = (getattr(punishment, 'level', 0) + 2) ** constants.PUNISHMENT_INCREMENT_EXPONENT
await self.punish(message.author.id, message.chat.group_id, punishment_seconds, message)
await self.punish(message.author.id, message.chat.group_id, punishment_seconds, message, flood=True)
await self.send(f'Castigado durante {TimeUnits(seconds=punishment_seconds).to_words()}.', message)
@admin(False)
@group
async def _check_message_spam(self, message: Message) -> bool:
if await self.is_punished(message.author, message.chat):
return True
spam_messages = self.Message.find({
'text': message.text,
'platform': self.platform.value,
'author': message.author.object_id,
'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - constants.SPAM_TIME_RANGE},
})
chats = {message.chat for message in spam_messages}
if len(chats) <= constants.SPAM_CHANNELS_LIMIT:
return False
await self.punish(message.author.id, message.chat.group_id)
await asyncio.sleep(constants.SPAM_DELETION_DELAY.total_seconds()) # We make sure to also delete any messages they may have sent before the punishment
spam_messages = self.Message.find({
'text': message.text,
'platform': self.platform.value,
'author': message.author.object_id,
'date': {
'$gte': datetime.datetime.now(datetime.timezone.utc)
-
constants.SPAM_TIME_RANGE
-
constants.SPAM_DELETION_DELAY
},
'is_deleted': False
})
chats = {message.chat for message in spam_messages}
for message in spam_messages:
await self.delete_message(await self.get_message(message.id, message.chat.id))
groups_data = {chat.group_id: chat.group_name for chat in chats}
owner_message_parts = (
'<b>Spammer castigado:</b>',
'<b>User:</b>',
f' <b>id:</b> <code>{message.author.id}</code>',
f' <b>name:</b> <code>{message.author.name}</code>',
f' <b>is_admin:<b> <code>{message.author.is_admin}</code>',
f' <b>is_bot:</b> <code>{message.author.is_bot}</code>',
'',
f'<b>Chats: {len(chats)}</b>',
'',
'<b>Groups:</b>',
'\n\n'.join(
f' <b>group_id:</b> <code>{group_id}</code>\n'
f' <b>group_name:</b> <code>{group_name}</code>'
for group_id, group_name in groups_data.items()
)
)
await self.send('\n'.join(owner_message_parts), await self.owner_chat)
return True
async def _punish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
pass
@@ -100,11 +159,17 @@ class PenaltyBot(MultiBot, ABC):
await self.mute(user, message, flanautils.text_to_time(await self.filter_mention_ids(message.text, message)), message)
@ignore_self_message
async def _on_new_message_raw(self, message: Message):
await super()._on_new_message_raw(message)
async def _on_new_message_raw(
self,
message: Message,
whitelist_callbacks: set[RegisteredCallback] | None = None,
blacklist_callbacks: set[RegisteredCallback] | None = None
):
await super()._on_new_message_raw(message, whitelist_callbacks, blacklist_callbacks)
if message.chat.config['check_flood'] and message.chat.config['punish'] and not message.is_inline:
async with self.lock:
await self._check_message_flood(message)
if not await self._check_message_spam(message):
await self._check_message_flood(message)
@bot_mentioned
@group
@@ -117,8 +182,10 @@ class PenaltyBot(MultiBot, ABC):
await self.punish(user, message, flanautils.text_to_time(await self.filter_mention_ids(message.text, message)), message)
async def _on_ready(self):
if not self._is_initialized:
flanautils.do_every(constants.CHECK_PUNISHMENTS_EVERY_SECONDS, self.check_old_punishments)
await super()._on_ready()
flanautils.do_every(constants.CHECK_PUNISHMENTS_EVERY_SECONDS, self.check_old_punishments)
@bot_mentioned
@group
@@ -155,19 +222,11 @@ class PenaltyBot(MultiBot, ABC):
if not punishment.until or now < punishment.until:
continue
await self._remove_penalty(punishment, self._unpunish, delete=False)
if punishment.is_active:
punishment.is_active = False
punishment.last_update = now
punishment.save()
await self._remove_penalty(punishment, self._unpunish)
if punishment.last_update + constants.PUNISHMENTS_RESET_TIME <= now:
if punishment.level == 1:
punishment.delete()
else:
punishment.level -= 1
punishment.last_update = now
punishment.save()
punishment.level -= 1
punishment.delete()
async def is_punished(self, user: int | str | User, group_: int | str | Chat | Message) -> bool:
pass
@@ -177,12 +236,14 @@ class PenaltyBot(MultiBot, ABC):
user: int | str | User,
group_: int | str | Chat | Message,
time: int | datetime.timedelta = None,
message: Message = None
message: Message = None,
flood=False
):
# noinspection PyTypeChecker
punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_), time)
punishment.pull_from_database(overwrite_fields=('level',), exclude_fields=('until',))
punishment.level += 1
if flood:
punishment.level += 1
await self._punish(punishment.user_id, punishment.group_id)
punishment.save(pull_exclude_fields=('until',))

View File

@@ -8,7 +8,7 @@ from typing import Iterable
import flanautils
from flanautils import OrderedSet
from multibot import MultiBot, admin, constants as multibot_constants
from multibot import MultiBot, RegisteredCallback, constants as multibot_constants
from flanabot import constants
from flanabot.models import ButtonsGroup, Message
@@ -24,32 +24,32 @@ class PollBot(MultiBot, ABC):
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_choose, constants.KEYWORDS['choose'], priority=2)
self.register(self._on_choose, constants.KEYWORDS['random'], priority=2)
self.register(self._on_choose, (constants.KEYWORDS['choose'], constants.KEYWORDS['random']), priority=2)
self.register(self._on_choose, keywords=constants.KEYWORDS['choose'], priority=2)
self.register(self._on_choose, keywords=multibot_constants.KEYWORDS['random'], priority=2)
self.register(self._on_choose, keywords=(constants.KEYWORDS['choose'], multibot_constants.KEYWORDS['random']), priority=2)
self.register(self._on_delete_all_votes, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['all'], constants.KEYWORDS['vote']))
self.register(self._on_delete_all_votes, (multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['all'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, extra_kwargs={'all_': True}, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['all'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, extra_kwargs={'all_': True}, keywords=(multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['all'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, keywords=(multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['vote']))
self.register(self._on_dice, constants.KEYWORDS['dice'])
self.register(self._on_dice, keywords=constants.KEYWORDS['dice'])
self.register(self._on_poll, constants.KEYWORDS['poll'], priority=2)
self.register(self._on_poll, keywords=constants.KEYWORDS['poll'], priority=2)
self.register(self._on_poll_multi, (constants.KEYWORDS['poll'], constants.KEYWORDS['multiple_answer']), priority=2)
self.register(self._on_poll_multi, keywords=(constants.KEYWORDS['poll'], constants.KEYWORDS['multiple_answer']), priority=2)
self.register(self._on_stop_poll, multibot_constants.KEYWORDS['deactivate'])
self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['poll']))
self.register(self._on_stop_poll, multibot_constants.KEYWORDS['stop'])
self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['poll']))
self.register(self._on_stop_poll, keywords=multibot_constants.KEYWORDS['deactivate'])
self.register(self._on_stop_poll, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['poll']))
self.register(self._on_stop_poll, keywords=multibot_constants.KEYWORDS['stop'])
self.register(self._on_stop_poll, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['poll']))
self.register(self._on_voting_ban, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
self.register(self._on_voting_ban, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
self.register(self._on_voting_unban, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
self.register(self._on_voting_unban, keywords=(multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
self.register_button(self._on_poll_button_press, ButtonsGroup.POLL)
self.register_button(self._on_poll_button_press, key=ButtonsGroup.POLL)
@staticmethod
def _get_options(text: str, discarded_words: Iterable = ()) -> list[str]:
@@ -82,7 +82,7 @@ class PollBot(MultiBot, ABC):
for option, option_votes in poll_data['votes'].items():
ratio = f'{len(option_votes)}/{total_votes}'
names = f"({', '.join(option_vote[1] for option_vote in option_votes)})" if option_votes else ''
buttons.append(f'{option}{ratio} {names}')
buttons.append(f"{option}{ratio}{f' {names}' if names else ''}")
else:
buttons = list(poll_data['votes'].keys())
@@ -97,7 +97,7 @@ class PollBot(MultiBot, ABC):
discarded_words = {
*constants.KEYWORDS['choose'],
*constants.KEYWORDS['random'],
*multibot_constants.KEYWORDS['random'],
self.name.lower(), f'<@{self.id}>',
'entre', 'between'
}
@@ -125,23 +125,22 @@ class PollBot(MultiBot, ABC):
else:
await self.send(random.choice(('¿Que elija el qué?', '¿Y las opciones?', '?', '🤔')), message)
async def _on_delete_all_votes(self, message: Message):
await self._on_delete_votes(message, all_=True)
@admin(send_negative=True)
async def _on_delete_votes(self, message: Message, all_=False):
if not (poll_message := self._get_poll_message(message)):
return
if message.chat.is_group and not message.author.is_admin:
await self.send_negative(message)
return
poll_data = poll_message.data['poll']
if all_:
for option_name, option_votes in poll_data['votes'].items():
poll_data['votes'][option_name].clear()
for option_votes in poll_data['votes'].values():
option_votes.clear()
else:
for user in await self._find_users_to_punish(message):
for option_name, option_votes in poll_data['votes'].items():
poll_data['votes'][option_name] = [option_vote for option_vote in option_votes if option_vote[0] != user.id]
user_ids = [user.id for user in await self._find_users_to_punish(message)]
for option_votes in poll_data['votes'].values():
option_votes[:] = [option_vote for option_vote in option_votes if option_vote[0] not in user_ids]
await self.delete_message(message)
await self._update_poll_buttons(poll_message)
@@ -156,21 +155,30 @@ class PollBot(MultiBot, ABC):
await self.send(random.choice(('¿De cuántas caras?', '¿Y el número?', '?', '🤔')), message)
async def _on_poll(self, message: Message, is_multiple_answer=False):
if (
self._get_poll_message(message)
and
self._parse_callbacks(message.text, [RegisteredCallback(..., keywords=multibot_constants.KEYWORDS['reset'])])
):
await self._on_delete_votes(message, all_=True)
return
if message.chat.is_group and not self.is_bot_mentioned(message):
return
discarded_words = {*constants.KEYWORDS['poll'], *constants.KEYWORDS['multiple_answer'], self.name.lower(), f'<@{self.id}>'}
discarded_words = {*constants.KEYWORDS['poll'], *constants.KEYWORDS['vote'], *constants.KEYWORDS['multiple_answer'], self.name.lower(), f'<@{self.id}>'}
if final_options := [f'{option[0].upper()}{option[1:]}' for option in self._get_options(message.text, discarded_words)]:
buttons = self.distribute_buttons(final_options, vertically=True)
await self.send(
f"Encuesta {'multirespuesta ' if is_multiple_answer else ''}en curso...",
self.distribute_buttons(final_options, vertically=True),
buttons,
message,
buttons_key=ButtonsGroup.POLL,
data={
'poll': {
'is_active': True,
'is_multiple_answer': is_multiple_answer,
'votes': {option: [] for option in final_options},
'votes': {option: [] for option in (flanautils.flatten(buttons, lazy=True))},
'banned_users_tries': {}
}
}
@@ -185,7 +193,7 @@ class PollBot(MultiBot, ABC):
poll_data = message.data['poll']
if not poll_data['is_active']:
if not poll_data['is_active'] or not message.buttons_info.pressed_button or not message.buttons_info.presser_user:
return
presser_id = message.buttons_info.presser_user.id
@@ -193,14 +201,10 @@ class PollBot(MultiBot, ABC):
if (presser_id_str := str(presser_id)) in poll_data['banned_users_tries']:
poll_data['banned_users_tries'][presser_id_str] += 1
if poll_data['banned_users_tries'][presser_id_str] == 3:
await self.send(random.choice((
f'Deja de dar por culo {presser_name} que no puedes votar aqui',
f'No es pesao {presser_name}, que no tienes permitido votar aqui',
f'Deja de pulsar botones que no puedes votar aqui {presser_name}',
f'{presser_name} deja de intentar votar aqui que no puedes',
f'Te han prohibido votar aquí {presser_name}.',
f'No puedes votar aquí, {presser_name}.'
)), reply_to=message)
await self.send(
random.choice(constants.BANNED_POLL_PHRASES.format(presser_name=presser_name)),
reply_to=message
)
return
option_name = results[0] if (results := re.findall('(.*?) ➜.+', message.buttons_info.pressed_text)) else message.buttons_info.pressed_text
@@ -250,9 +254,11 @@ class PollBot(MultiBot, ABC):
await self.edit(text, poll_message)
@admin(send_negative=True)
async def _on_voting_ban(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := self._get_poll_message(message)):
if not (poll_message := self._get_poll_message(message)):
return
if message.chat.is_group and not message.author.is_admin:
await self.send_negative(message)
return
await self.delete_message(message)
@@ -261,9 +267,11 @@ class PollBot(MultiBot, ABC):
if str(user.id) not in poll_message.data['poll']['banned_users_tries']:
poll_message.data['poll']['banned_users_tries'][str(user.id)] = 0
@admin(send_negative=True)
async def _on_voting_unban(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := self._get_poll_message(message)):
if not (poll_message := self._get_poll_message(message)):
return
if message.chat.is_group and not message.author.is_admin:
await self.send_negative(message)
return
await self.delete_message(message)

View File

@@ -1,54 +1,44 @@
__all__ = ['ScraperBot']
import asyncio
import datetime
import random
from abc import ABC
from collections import defaultdict
from typing import Iterable
import flanautils
from flanaapis import InstagramMediaNotFoundError, RedditMediaNotFoundError, instagram, reddit, tiktok, twitter, yt_dlp_wrapper
from flanaapis import RedditMediaNotFoundError, reddit, tiktok, yt_dlp_wrapper
from flanautils import Media, MediaType, OrderedSet, return_if_first_empty
from multibot import MultiBot, RegisteredCallback, SendError, constants as multibot_constants, owner, reply
from multibot import MultiBot, RegisteredCallback, SendError, constants as multibot_constants, reply
from flanabot import constants
from flanabot.models import Action, BotAction, Message
# ------------------------------------------------------------------------------------------------------- #
# --------------------------------------------- SCRAPER_BOT --------------------------------------------- #
# ------------------------------------------------------------------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
# -------------------------------------------- SCRAPER_BOT -------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class ScraperBot(MultiBot, ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.instagram_ban_date = None
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_no_scraping, (multibot_constants.KEYWORDS['negate'], constants.KEYWORDS['scraping']))
self.register(self._on_no_scraping, keywords=(multibot_constants.KEYWORDS['negate'], constants.KEYWORDS['scraping']))
self.register(self._on_reset_instagram_ban, (multibot_constants.KEYWORDS['delete'], 'instagram'))
self.register(self._on_reset_instagram_ban, (multibot_constants.KEYWORDS['reset'], 'instagram'))
self.register(self._on_scraping, keywords=constants.KEYWORDS['scraping'])
self.register(self._on_scraping, keywords=constants.KEYWORDS['force'])
self.register(self._on_scraping, keywords=multibot_constants.KEYWORDS['audio'])
self.register(self._on_scraping, extra_kwargs={'delete': False}, keywords=(multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete']))
self.register(self._on_scraping, extra_kwargs={'delete': False}, keywords=(multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message']))
self.register(self._on_scraping, extra_kwargs={'delete': False}, keywords=(multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_scraping, constants.KEYWORDS['scraping'])
self.register(self._on_scraping, constants.KEYWORDS['force'])
self.register(self._on_scraping, multibot_constants.KEYWORDS['audio'])
self.register(lambda message: self._on_scraping(message, delete=False), (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete']))
self.register(lambda message: self._on_scraping(message, delete=False), (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message']))
self.register(lambda message: self._on_scraping(message, delete=False), (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_song_info, constants.KEYWORDS['song_info'])
self.register(self._on_song_info, keywords=constants.KEYWORDS['song_info'])
@staticmethod
async def _find_ids(text: str) -> tuple[OrderedSet[str], ...]:
return (
twitter.find_ids(text),
instagram.find_ids(text),
reddit.find_ids(text),
await tiktok.find_users_and_ids(text),
tiktok.find_download_urls(text)
@@ -174,6 +164,13 @@ class ScraperBot(MultiBot, ABC):
medias = OrderedSet()
exceptions: list[Exception] = []
if audio_only:
preferred_video_codec = None
preferred_extension = None
else:
preferred_video_codec = 'h264'
preferred_extension = 'mp4'
ids = []
media_urls = []
for text_part in message.text.split():
@@ -182,30 +179,40 @@ class ScraperBot(MultiBot, ABC):
ids[i] |= platform_ids
except IndexError:
ids.append(platform_ids)
if not any(ids) and flanautils.find_urls(text_part):
if force:
media_urls.append(text_part)
elif not any(domain.lower() in text_part for domain in multibot_constants.GIF_DOMAINS):
media_urls.append(text_part)
if (
not any(ids)
and
flanautils.find_urls(text_part)
and
(
force
or
not any(domain.lower() in text_part for domain in multibot_constants.GIF_DOMAINS)
)
):
media_urls.append(text_part)
if not any(ids) and not media_urls:
return medias
bot_state_message = await self.send(random.choice(constants.SCRAPING_PHRASES), message)
tweet_ids, instagram_ids, reddit_ids, tiktok_users_and_ids, tiktok_download_urls = ids
reddit_ids, tiktok_users_and_ids, tiktok_download_urls = ids
try:
reddit_medias = await reddit.get_medias(reddit_ids, 'h264', 'mp4', force, audio_only, timeout_for_media)
reddit_medias = await reddit.get_medias(reddit_ids, preferred_video_codec, preferred_extension, force, audio_only, timeout_for_media)
except RedditMediaNotFoundError as e:
exceptions.append(e)
reddit_medias = ()
reddit_urls = []
for reddit_media in reddit_medias:
if reddit_media.source:
medias.add(reddit_media)
else:
reddit_urls.append(reddit_media.url)
if force:
media_urls.extend(reddit_urls)
else:
@@ -217,40 +224,22 @@ class ScraperBot(MultiBot, ABC):
else:
media_urls.append(reddit_url)
gather_future = asyncio.gather(
twitter.get_medias(tweet_ids, audio_only),
tiktok.get_medias(tiktok_users_and_ids, tiktok_download_urls, 'h264', 'mp4', force, audio_only, timeout_for_media),
yt_dlp_wrapper.get_medias(media_urls, 'h264', 'mp4', force, audio_only, timeout_for_media),
gather_results = await asyncio.gather(
tiktok.get_medias(tiktok_users_and_ids, tiktok_download_urls, preferred_video_codec, preferred_extension, force, audio_only, timeout_for_media),
yt_dlp_wrapper.get_medias(media_urls, preferred_video_codec, preferred_extension, force, audio_only, timeout_for_media),
return_exceptions=True
)
instagram_results = []
if instagram_ids:
can_instagram_v1 = not self.instagram_ban_date or datetime.datetime.now(datetime.timezone.utc) - self.instagram_ban_date >= constants.INSTAGRAM_BAN_SLEEP
if can_instagram_v1:
try:
instagram_results = await instagram.get_medias(instagram_ids, audio_only)
except InstagramMediaNotFoundError:
pass
if not instagram_results:
try:
instagram_results = await instagram.get_medias_v2(instagram_ids, audio_only)
except InstagramMediaNotFoundError as e:
if not (instagram_results := await yt_dlp_wrapper.get_medias(instagram.make_urls(instagram_ids), 'h264', 'mp4', force, audio_only, timeout_for_media)):
exceptions.append(e)
if instagram_results and can_instagram_v1:
self.instagram_ban_date = datetime.datetime.now(datetime.timezone.utc)
await self.send('Límite de Instagram excedido.', await self.owner_chat)
gather_results = await gather_future
await self.delete_message(bot_state_message)
gather_medias, gather_exceptions = flanautils.filter_exceptions(gather_results + instagram_results)
gather_medias, gather_exceptions = flanautils.filter_exceptions(gather_results)
await self._manage_exceptions(exceptions + gather_exceptions, message, print_traceback=True)
return medias | gather_medias
async def _send_media(self, media: Media, bot_state_message: Message, message: Message) -> Message | None:
return await self.send(media, message, reply_to=message.replied_message)
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
@@ -260,16 +249,6 @@ class ScraperBot(MultiBot, ABC):
async def _on_recover_message(self, message: Message):
pass
@owner
async def _on_reset_instagram_ban(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
self.instagram_ban_date = None
bot_message = await self.send('Ban de Instagram reseteado.', message)
await self.delete_message(message)
flanautils.do_later(multibot_constants.COMMAND_MESSAGE_DURATION, self.delete_message, bot_message)
async def _on_scraping(
self,
message: Message,
@@ -297,8 +276,8 @@ class ScraperBot(MultiBot, ABC):
self._parse_callbacks(
message.text,
[
RegisteredCallback(..., (('sin',), ('timeout', 'limite'))),
RegisteredCallback(..., multibot_constants.KEYWORDS['all'])
RegisteredCallback(..., keywords=(('sin',), ('timeout', 'limite'))),
RegisteredCallback(..., keywords=multibot_constants.KEYWORDS['all'])
]
)
)
@@ -377,20 +356,20 @@ class ScraperBot(MultiBot, ABC):
if message.chat.is_group:
sended_info_message = await self.send(f"{message.author.name.split('#')[0]} compartió{self._medias_sended_info(medias)}", message, reply_to=message.replied_message)
if (
send_user_context
and
(user_text := ' '.join(
[word for word in message.text.split()
if (
not any(await self._find_ids(word))
and
not flanautils.find_urls(word)
and
not flanautils.cartesian_product_string_matching(word, keywords, multibot_constants.PARSER_MIN_SCORE_DEFAULT)
and
flanautils.remove_symbols(word).lower() not in (str(self.id), self.name.lower())
)]
))
send_user_context
and
(user_text := ' '.join(
[word for word in message.text.split()
if (
not any(await self._find_ids(word))
and
not flanautils.find_urls(word)
and
not flanautils.cartesian_product_string_matching(word, keywords, multibot_constants.PARSER_MIN_SCORE_DEFAULT)
and
flanautils.remove_symbols(word).lower() not in (str(self.id), self.name.lower())
)]
))
):
user_text_bot_message = await self.send(user_text, message, reply_to=message.replied_message)
@@ -403,7 +382,7 @@ class ScraperBot(MultiBot, ABC):
message.song_infos.add(media.song_info)
message.save()
if bot_message := await self.send(media, message, reply_to=message.replied_message):
if bot_message := await self._send_media(media, bot_state_message, message):
sended_media_messages.append(bot_message)
if media.song_info and bot_message:
bot_message.song_infos.add(media.song_info)

368
flanabot/bots/steam_bot.py Normal file
View File

@@ -0,0 +1,368 @@
__all__ = ['SteamBot']
import asyncio
import base64
import os
import random
import re
import urllib.parse
from abc import ABC
from collections import defaultdict
from collections.abc import Awaitable, Callable, Iterable
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator
import aiohttp
import flanautils
import playwright.async_api
import plotly
from flanautils import Media, MediaType, Source
from multibot import LimitError, MultiBot, RegisteredCallback, constants as multibot_constants
from flanabot import constants
from flanabot.models import Message, SteamRegion
# ----------------------------------------------------------------------------------------------------- #
# --------------------------------------------- STEAM_BOT --------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class SteamBot(MultiBot, ABC):
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_steam, keywords='steam', priority=3)
async def _add_app_price(
self,
app_prices: dict[str, dict[str, float]],
steam_region: SteamRegion,
app_ids: Iterable[str],
session: aiohttp.ClientSession
) -> None:
for ids_batch_batch in flanautils.chunks(
flanautils.chunks(list(app_ids), constants.STEAM_IDS_BATCH),
constants.STEAM_MAX_CONCURRENT_REQUESTS
):
gather_results = await asyncio.gather(
*(self._get_app_data(session, ids_batch, steam_region.code) for ids_batch in ids_batch_batch)
)
for gather_result in gather_results:
for app_id, app_data in gather_result.items():
if (
(data := app_data.get('data'))
and
(price := data.get('price_overview', {}).get('final')) is not None
):
app_prices[app_id][steam_region.code] = price / 100 / steam_region.eur_conversion_rate
@staticmethod
@asynccontextmanager
async def _create_browser_context(
browser: playwright.async_api.Browser
) -> AsyncIterator[playwright.async_api.BrowserContext]:
async with await browser.new_context(
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5615.49 Safari/537.36',
screen={
'width': 1920,
'height': 1080
},
viewport={
'width': 1920,
'height': 945
},
device_scale_factor=1,
is_mobile=False,
has_touch=False,
default_browser_type='chromium',
locale='es-ES'
) as context:
yield context
@staticmethod
async def _get_app_data(
session: aiohttp.ClientSession,
ids_batch: Iterable[str],
country_code: str
) -> dict[str, Any]:
async with session.get(
constants.STEAM_APP_ENDPOINT_TEMPLATE.format(ids=','.join(ids_batch), country_code=country_code)
) as response:
if response.status != 200:
raise LimitError('🚫 Steam ban: espera 5 minutos antes de consultar de nuevo.')
return await response.json()
async def _get_most_apps_ids(self, browser: playwright.async_api.Browser) -> set[str]:
app_ids = set()
re_pattern = fr'{urllib.parse.urlparse(constants.STEAM_MOST_URLS[0]).netloc}/\w+/(\d+)'
async with self._create_browser_context(browser) as context:
context.set_default_timeout(10000)
page = await context.new_page()
for url in constants.STEAM_MOST_URLS:
await page.goto(url)
try:
await page.wait_for_selector('tr td a[href]')
except playwright.async_api.TimeoutError:
raise LimitError('🚫 Steam ban: espera 5 minutos antes de consultar de nuevo.')
locator = page.locator('tr td a[href]')
for i in range(await locator.count()):
href = await locator.nth(i).get_attribute('href')
app_ids.add(re.search(re_pattern, href).group(1))
return app_ids
@staticmethod
async def _insert_conversion_rates(
session: aiohttp.ClientSession,
steam_regions: list[SteamRegion]
) -> list[SteamRegion]:
async with session.get(
constants.STEAM_EXCHANGERATE_API_ENDPOINT.format(api_key=os.environ['EXCHANGERATE_API_KEY'])
) as response:
exchange_data = await response.json()
for steam_region in steam_regions:
alpha_3_code = constants.STEAM_REGION_CODE_MAPPING[steam_region.code]
steam_region.eur_conversion_rate = exchange_data['conversion_rates'][alpha_3_code]
return steam_regions
async def _scrape_steam_data(
self,
update_state: Callable[[str], Awaitable[Message]],
update_steam_regions=False,
most_apps=True
) -> tuple[list[SteamRegion], set[str]]:
steam_regions = SteamRegion.find()
most_apps_ids = set()
if update_steam_regions or most_apps:
async with playwright.async_api.async_playwright() as playwright_:
async with await playwright_.chromium.launch() as browser:
if update_steam_regions:
await update_state('Actualizando las regiones de Steam...')
SteamRegion.delete_many_raw({})
steam_regions = await self._update_steam_regions(browser)
if most_apps:
bot_state_message = await update_state(
'Obteniendo los productos más vendidos y jugados de Steam...'
)
try:
most_apps_ids = await self._get_most_apps_ids(browser)
except LimitError:
await self.delete_message(bot_state_message)
raise
return steam_regions, most_apps_ids
else:
return steam_regions, most_apps_ids
async def _update_steam_regions(self, browser: playwright.async_api.Browser) -> list[SteamRegion]:
steam_regions = []
for app_id in constants.STEAM_APP_IDS_FOR_SCRAPE_COUNTRIES:
async with self._create_browser_context(browser) as context:
page = await context.new_page()
await page.goto(f'{constants.STEAM_DB_URL}/app/{app_id}/')
locator = page.locator("#prices td[class='price-line']")
for i in range(await locator.count()):
td = locator.nth(i)
src = (await td.locator('img').get_attribute('src'))
name = (await td.text_content()).strip()
flag_url = f'{constants.STEAM_DB_URL}{src}'
region_code = await td.get_attribute('data-cc')
steam_region = SteamRegion(region_code, name, flag_url)
steam_region.save()
steam_regions.append(steam_region)
await asyncio.sleep(2)
return steam_regions
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
async def _on_steam(
self,
message: Message,
update_steam_regions: bool | None = None,
most_apps: bool | None = None,
last_apps: bool | None = None,
random_apps: bool | None = None
):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
if update_steam_regions is None:
update_steam_regions = bool(
self._parse_callbacks(
message.text,
[
RegisteredCallback(
...,
keywords=(multibot_constants.KEYWORDS['update'], constants.KEYWORDS['region'])
)
]
)
)
if most_apps is None:
most_apps = bool(
flanautils.cartesian_product_string_matching(
message.text,
('jugados', 'played', 'sellers', 'selling', 'vendidos'),
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
if last_apps is None:
last_apps = bool(
flanautils.cartesian_product_string_matching(
message.text,
multibot_constants.KEYWORDS['last'] + ('new', 'novedades', 'nuevos'),
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
if random_apps is None:
random_apps = bool(
flanautils.cartesian_product_string_matching(
message.text,
multibot_constants.KEYWORDS['random'],
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
if not any((most_apps, last_apps, random_apps)):
most_apps = True
last_apps = True
random_apps = True
update_state = self.create_message_updater(message, delete_user_message=True)
chart_title_parts = []
steam_regions, selected_app_ids = await self._scrape_steam_data(update_state, update_steam_regions, most_apps)
if most_apps:
chart_title_parts.append(f'los {len(selected_app_ids)} más vendidos/jugados')
async with aiohttp.ClientSession() as session:
if last_apps or random_apps:
await update_state('Obteniendo todos los productos de Steam...')
async with session.get(constants.STEAM_ALL_APPS_ENDPOINT) as response:
all_apps_data = await response.json()
app_ids = [
str(app_id) for app_data in all_apps_data['applist']['apps'] if (app_id := app_data['appid'])
]
if last_apps:
selected_app_ids.update(app_ids[-constants.STEAM_LAST_APPS:])
chart_title_parts.append(f'los {constants.STEAM_LAST_APPS} más nuevos')
if random_apps:
selected_app_ids.update(random.sample(app_ids, constants.STEAM_RANDOM_APPS))
chart_title_parts.append(f'{constants.STEAM_RANDOM_APPS} aleatorios')
steam_regions = await self._insert_conversion_rates(session, steam_regions)
bot_state_message = await update_state('Obteniendo los precios para todas las regiones...')
apps_prices = defaultdict(dict)
try:
await asyncio.gather(
*(
self._add_app_price(apps_prices, steam_region, selected_app_ids, session)
for steam_region in steam_regions
)
)
except LimitError:
await self.delete_message(bot_state_message)
raise
apps_prices = {k: v for k, v in apps_prices.items() if len(v) == len(steam_regions)}
total_prices = defaultdict(float)
for app_prices in apps_prices.values():
for region_code, price in app_prices.items():
total_prices[region_code] += price
for steam_region in steam_regions:
steam_region.mean_price = total_prices[steam_region.code] / len(apps_prices)
await update_state('Creando gráfico...')
steam_regions = sorted(steam_regions, key=lambda steam_region: steam_region.mean_price)
region_names = []
region_total_prices = []
bar_colors = []
bar_line_colors = []
images = []
for i, steam_region in enumerate(steam_regions):
region_names.append(steam_region.name)
region_total_prices.append(steam_region.mean_price)
bar_colors.append(steam_region.bar_color)
bar_line_colors.append(steam_region.bar_line_color)
async with session.get(steam_region.flag_url) as response:
images.append({
'source': f'data:image/svg+xml;base64,{base64.b64encode(await response.read()).decode()}',
'xref': 'x',
'yref': 'paper',
'x': i,
'y': 0.04,
'sizex': 0.425,
'sizey': 0.0225,
'xanchor': 'center',
'opacity': 1,
'layer': 'above'
})
figure = plotly.graph_objs.Figure(
[
plotly.graph_objs.Bar(
x=region_names,
y=region_total_prices,
marker={'color': bar_colors},
)
]
)
figure.update_layout(
width=1920,
height=945,
margin={'t': 20, 'r': 20, 'b': 20, 'l': 20},
title={
'text': f"Media de {len(apps_prices)} productos de Steam<br><sup>(solo los comparables entre {flanautils.join_last_separator(chart_title_parts, ', ', ' y ')})</sup>",
'xref': 'paper',
'yref': 'paper',
'x': 0.5,
'y': 0.93,
'font': {
'size': 35
}
},
images=images,
xaxis={
'tickfont': {
'size': 14
}
},
yaxis={
'ticksuffix': '',
'tickfont': {
'size': 18
}
}
)
bot_state_message = await update_state('Enviando...')
await self.send(Media(figure.to_image(), MediaType.IMAGE, 'png', Source.LOCAL), message)
await self.delete_message(bot_state_message)
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #

View File

@@ -10,7 +10,7 @@ import flanautils
import playwright.async_api
from multibot import MultiBot, group
import constants
from flanabot import constants
from flanabot.models import Chat, Message
@@ -20,9 +20,7 @@ from flanabot.models import Chat, Message
class UberEatsBot(MultiBot, ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.playwright: playwright.async_api.Playwright | None = None
self.browser: playwright.async_api.Browser | None = None
self.task_contexts: dict[int, dict] = defaultdict(lambda: defaultdict(lambda: None))
self._task_contexts: dict[int, dict] = defaultdict(lambda: defaultdict(lambda: None))
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
@@ -30,20 +28,20 @@ class UberEatsBot(MultiBot, ABC):
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_ubereats, 'ubereats', priority=2)
self.register(self._on_ubereats, keywords='ubereats', priority=2)
async def _cancel_scraping_task(self, chat: Chat):
if not (task := self.task_contexts[chat.id]['task']) or task.done():
if not (task := self._task_contexts[chat.id]['task']) or task.done():
return
await self._close_playwright(chat)
task.cancel()
del self.task_contexts[chat.id]
del self._task_contexts[chat.id]
async def _close_playwright(self, chat: Chat):
if browser := self.task_contexts[chat.id]['browser']:
if browser := self._task_contexts[chat.id]['browser']:
await browser.close()
if playwright_ := self.task_contexts[chat.id]['playwright']:
if playwright_ := self._task_contexts[chat.id]['playwright']:
await playwright_.stop()
async def _scrape_codes(self, chat: Chat) -> list[str | None]:
@@ -53,12 +51,12 @@ class UberEatsBot(MultiBot, ABC):
codes: list[str | None] = [None] * len(chat.ubereats['cookies'])
async with playwright.async_api.async_playwright() as playwright_:
self.task_contexts[chat.id]['playwright'] = playwright_
self._task_contexts[chat.id]['playwright'] = playwright_
for i, cookies in enumerate(chat.ubereats['cookies']):
for _ in range(3):
try:
async with await playwright_.chromium.launch() as browser:
self.task_contexts[chat.id]['browser'] = browser
self._task_contexts[chat.id]['browser'] = browser
context: playwright.async_api.BrowserContext = await browser.new_context(
storage_state={'cookies': cookies},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5615.49 Safari/537.36',
@@ -117,6 +115,25 @@ class UberEatsBot(MultiBot, ABC):
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
async def _on_ready(self):
await super()._on_ready()
for chat in self.Chat.find({
'platform': self.platform.value,
'config.ubereats': True,
'ubereats.cookies': {"$exists": True, "$ne": []}
}):
chat = await self.get_chat(chat.id)
chat.pull_from_database(overwrite_fields=('_id', 'config', 'ubereats'))
if (
chat.ubereats['next_execution']
and
(delta_time := chat.ubereats['next_execution'] - datetime.datetime.now(datetime.timezone.utc)) > datetime.timedelta()
):
flanautils.do_later(delta_time, self.start_ubereats, chat)
else:
await self.start_ubereats(chat)
@group(False)
async def _on_ubereats(self, message: Message):
if not message.chat.ubereats['cookies']:
@@ -170,7 +187,7 @@ class UberEatsBot(MultiBot, ABC):
await self._cancel_scraping_task(chat)
chat.config['ubereats'] = True
chat.save(pull_overwrite_fields=('ubereats',))
self.task_contexts[chat.id]['task'] = flanautils.do_every(chat.ubereats['seconds'], self.send_ubereats_code, chat, do_first_now=send_code_now)
self._task_contexts[chat.id]['task'] = flanautils.do_every(chat.ubereats['seconds'], self.send_ubereats_code, chat, do_first_now=send_code_now)
async def stop_ubereats(self, chat: Chat):
await self._cancel_scraping_task(chat)

View File

@@ -16,9 +16,9 @@ from flanabot import constants
from flanabot.models import Action, BotAction, ButtonsGroup, Message, WeatherChart
# ------------------------------------------------------------------------------------------------------- #
# --------------------------------------------- WEATHER_BOT --------------------------------------------- #
# ------------------------------------------------------------------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
# -------------------------------------------- WEATHER_BOT -------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class WeatherBot(MultiBot, ABC):
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
@@ -26,10 +26,10 @@ class WeatherBot(MultiBot, ABC):
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_weather, constants.KEYWORDS['weather'])
self.register(self._on_weather, (multibot_constants.KEYWORDS['show'], constants.KEYWORDS['weather']))
self.register(self._on_weather, keywords=constants.KEYWORDS['weather'])
self.register(self._on_weather, keywords=(multibot_constants.KEYWORDS['show'], constants.KEYWORDS['weather']))
self.register_button(self._on_weather_button_press, ButtonsGroup.WEATHER)
self.register_button(self._on_weather_button_press, key=ButtonsGroup.WEATHER)
# ---------------------------------------------- #
# HANDLERS #
@@ -54,12 +54,12 @@ class WeatherBot(MultiBot, ABC):
# noinspection PyTypeChecker
place_words = (
OrderedSet(original_text_words)
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['show'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, constants.KEYWORDS['weather'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['date'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['thanks'], min_score=0.85).keys()
- flanautils.CommonWords.get()
OrderedSet(original_text_words)
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['show'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, constants.KEYWORDS['weather'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['date'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['thanks'], min_score=0.85).keys()
- flanautils.CommonWords.get()
)
if not place_words:
if not message.is_inline:

View File

@@ -6,13 +6,17 @@ from multibot import Platform
AUDIT_LOG_AGE = datetime.timedelta(hours=1)
AUDIT_LOG_LIMIT = 5
AUTO_WEATHER_EVERY = datetime.timedelta(hours=6)
BTC_OFFERS_WEBSOCKET_RETRY_DELAY_SECONDS = datetime.timedelta(hours=1).total_seconds()
CHECK_PUNISHMENTS_EVERY_SECONDS = datetime.timedelta(hours=1).total_seconds()
CONNECT_4_AI_DELAY_SECONDS = 1
CONNECT_4_CENTER_COLUMN_POINTS = 2
CONNECT_4_N_COLUMNS = 7
CONNECT_4_N_ROWS = 6
FLANASERVER_BASE_URL = 'https://flanaserver.duckdns.org'
FLANASERVER_FILE_EXPIRATION_SECONDS = 3 * 24 * 60 * 60
FLOOD_2s_LIMIT = 2
FLOOD_7s_LIMIT = 4
HEAT_FIRST_LEVEL = -1
HEAT_PERIOD_SECONDS = datetime.timedelta(minutes=15).total_seconds()
HELP_MINUTES_LIMIT = 1
INSTAGRAM_BAN_SLEEP = datetime.timedelta(days=1)
@@ -21,7 +25,39 @@ MAX_PLACE_QUERY_LENGTH = 50
PUNISHMENT_INCREMENT_EXPONENT = 6
PUNISHMENTS_RESET_TIME = datetime.timedelta(weeks=2)
RECOVERY_DELETED_MESSAGE_BEFORE = datetime.timedelta(hours=1)
SCRAPING_TIMEOUT_SECONDS = 10
SCRAPING_TIMEOUT_SECONDS = 20
SPAM_CHANNELS_LIMIT = 2
SPAM_DELETION_DELAY = datetime.timedelta(seconds=5)
SPAM_TIME_RANGE = datetime.timedelta(hours=1)
STEAM_ALL_APPS_ENDPOINT = 'https://api.steampowered.com/ISteamApps/GetAppList/v2'
STEAM_APP_ENDPOINT_TEMPLATE = 'https://store.steampowered.com/api/appdetails?appids={ids}&cc={country_code}&filters=price_overview'
STEAM_APP_IDS_FOR_SCRAPE_COUNTRIES = (400, 620, 730, 210970, 252490, 292030, 427520, 1712350)
STEAM_DB_URL = 'https://steamdb.info'
STEAM_EXCHANGERATE_API_ENDPOINT_TEMPLATE = 'https://v6.exchangerate-api.com/v6/{api_key}/latest/EUR'
STEAM_IDS_BATCH = 750
STEAM_LAST_APPS = 1500
STEAM_MAX_CONCURRENT_REQUESTS = 10
STEAM_MOST_URLS = (
'https://store.steampowered.com/charts/topselling/global',
'https://store.steampowered.com/charts/mostplayed'
)
STEAM_RANDOM_APPS = 1000
STEAM_REGION_CODE_MAPPING = {'eu': 'EUR', 'ru': 'RUB', 'pk': 'USD', 'ua': 'UAH', 'za': 'ZAR', 'vn': 'VND', 'tw': 'TWD',
'id': 'IDR', 'my': 'MYR', 'ar': 'USD', 'tr': 'USD', 'ph': 'PHP', 'in': 'INR', 'cn': 'CNY',
'br': 'BRL', 'sa': 'SAR', 'th': 'THB', 'pe': 'PEN', 'cl': 'CLP', 'kw': 'KWD', 'az': 'USD',
'kz': 'KZT', 'co': 'COP', 'mx': 'MXN', 'qa': 'QAR', 'sg': 'SGD', 'jp': 'JPY', 'uy': 'UYU',
'ae': 'AED', 'kr': 'KRW', 'hk': 'HKD', 'cr': 'CRC', 'nz': 'NZD', 'ca': 'CAD', 'au': 'AUD',
'il': 'ILS', 'us': 'USD', 'no': 'NOK', 'uk': 'GBP', 'pl': 'PLN', 'ch': 'CHF'}
YADIO_API_ENDPOINT = 'https://api.yadio.io/exrates/EUR'
BANNED_POLL_PHRASES = (
'Deja de dar por culo {presser_name} que no puedes votar aqui',
'No es pesao {presser_name}, que no tienes permitido votar aqui',
'Deja de pulsar botones que no puedes votar aqui {presser_name}',
'{presser_name} deja de intentar votar aqui que no puedes',
'Te han prohibido votar aquí {presser_name}.',
'No puedes votar aquí, {presser_name}.'
)
BYE_PHRASES = ('Adiós.', 'adio', 'adioh', 'adios', 'adió', 'adiós', 'agur', 'bye', 'byyeeee', 'chao', 'hasta la vista',
'hasta luego', 'hasta nunca', ' hasta pronto', 'hasta la próxima', 'nos vemos', 'taluego')
@@ -32,7 +68,7 @@ CHANGEABLE_ROLES = defaultdict(
Platform.DISCORD: defaultdict(
list,
{
360868977754505217: [881238165476741161, 991454395663401072, 1033098591725699222],
360868977754505217: [881238165476741161, 991454395663401072, 1033098591725699222, 1176639571677696173],
862823584670285835: [976660580939202610, 984269640752590868]
}
)
@@ -82,21 +118,23 @@ INSULTS = ('._.', 'aha', 'Aléjate de mi.', 'Ante la duda mi dedo corazón te sa
KEYWORDS = {
'choose': ('choose', 'elige', 'escoge'),
'connect_4': (('conecta', 'connect', 'ralla', 'raya'), ('4', 'cuatro', 'four')),
'covid_chart': ('case', 'caso', 'contagiado', 'contagio', 'corona', 'coronavirus', 'covid', 'covid19', 'death',
'disease', 'enfermedad', 'enfermos', 'fallecido', 'incidencia', 'jacovid', 'mascarilla', 'muerte',
'muerto', 'pandemia', 'sick', 'virus'),
'currency_chart': ('argentina', 'bitcoin', 'cardano', 'cripto', 'crypto', 'criptodivisa', 'cryptodivisa',
'cryptomoneda', 'cryptocurrency', 'currency', 'dinero', 'divisa', 'ethereum', 'inversion',
'moneda', 'pasta'),
'dice': ('dado', 'dice'),
'eur': ('eur', 'euro', 'euros', ''),
'force': ('force', 'forzar', 'fuerza'),
'money': ('bitcoin', 'btc', 'cripto', 'criptomoneda', 'crypto', 'cryptocurrency', 'currency', 'currency', 'dinero',
'divisa', 'moneda', 'money', 'precio', 'price', 'satoshi'),
'multiple_answer': ('multi', 'multi-answer', 'multiple', 'multirespuesta'),
'poll': ('encuesta', 'quiz'),
'notify': ('alert', 'alertame', 'alertar', 'avisame', 'avisar', 'aviso', 'inform', 'informame', 'informar',
'notificacion', 'notificame', 'notificar', 'notification'),
'offer': ('oferta', 'offer', 'orden', 'order', 'post', 'publicacion'),
'poll': ('encuesta', 'quiz', 'votacion', 'votar', 'voting'),
'premium': ('%', 'premium', 'prima'),
'punish': ('acaba', 'aprende', 'ataca', 'atalo', 'azota', 'beating', 'boss', 'castiga', 'castigo', 'condena',
'controla', 'destroy', 'destroza', 'duro', 'ejecuta', 'enseña', 'escarmiento', 'execute', 'fuck',
'fusila', 'hell', 'humos', 'infierno', 'jefe', 'jode', 'learn', 'leccion', 'lesson', 'manda', 'paliza',
'purgatorio', 'purgatory', 'sancion', 'shoot', 'teach', 'whip'),
'random': ('aleatorio', 'azar', 'random'),
'region': ('countries', 'country', 'pais', 'paises', 'region', 'regiones', 'regions', 'zona', 'zonas', 'zone',
'zones'),
'scraping': ('busca', 'contenido', 'content', 'descarga', 'descargar', 'descargues', 'download', 'envia', 'scrap',
'scrapea', 'scrapees', 'scraping', 'search', 'send'),
'self': (('contigo', 'contra', 'ti'), ('mismo', 'ti')),
@@ -104,7 +142,9 @@ KEYWORDS = {
'sound', 'suena'),
'tunnel': ('canal', 'channel', 'tunel', 'tunnel'),
'unpunish': ('absolve', 'forgive', 'innocent', 'inocente', 'perdona', 'spare'),
'vote': ('votacion', 'votar', 'vote', 'voting', 'voto'),
'until': ('hasta', 'until'),
'usd': ('$', 'dolar', 'dolares', 'dollar', 'dollars', 'usd'),
'vote': ('vote', 'voto'),
'weather': ('atmosfera', 'atmosferico', 'calle', 'calor', 'caloret', 'clima', 'climatologia', 'cloud', 'cloudless',
'cloudy', 'cold', 'congelar', 'congelado', 'denbora', 'despejado', 'diluvio', 'frio', 'frost', 'hielo',
'humedad', 'llover', 'llueva', 'llueve', 'lluvia', 'nevada', 'nieva', 'nieve', 'nube', 'nubes',

View File

@@ -1,7 +1,9 @@
from flanabot.models.bot_action import *
from flanabot.models.chat import *
from flanabot.models.enums import *
from flanabot.models.heating_context import *
from flanabot.models.message import *
from flanabot.models.player import *
from flanabot.models.punishment import *
from flanabot.models.steam_region import *
from flanabot.models.weather_chart import *

View File

@@ -16,6 +16,7 @@ class Chat(MultiBotChat):
'scraping_delete_original': True,
'ubereats': False
})
btc_offers_query: dict[str, float] = field(default_factory=lambda: {})
ubereats: dict = field(default_factory=lambda: {
'cookies': [],
'last_codes': [],

View File

@@ -0,0 +1,19 @@
__all__ = ['ChannelData', 'HeatingContext']
from dataclasses import dataclass, field
import discord
@dataclass
class ChannelData:
channel: discord.VoiceChannel
original_name: str
n_fires: int = 0
@dataclass
class HeatingContext:
channels_data: dict[str, ChannelData] = field(default_factory=dict)
is_active: bool = False
heat_level: float = -1

View File

@@ -1,5 +1,6 @@
__all__ = ['Punishment']
import datetime
from dataclasses import dataclass
from typing import Any
@@ -16,3 +17,11 @@ class Punishment(Penalty):
self_vars = super()._mongo_repr()
self_vars['level'] = self.level
return self_vars
def delete(self, cascade=False):
if self.level == 0:
super().delete(cascade)
elif self.is_active:
self.is_active = False
self.last_update = datetime.datetime.now(datetime.timezone.utc)
self.save()

View File

@@ -0,0 +1,40 @@
__all__ = ['SteamRegion']
from dataclasses import dataclass
from typing import Any
from flanautils import DCMongoBase, FlanaBase
@dataclass
class SteamRegion(DCMongoBase, FlanaBase):
collection_name = 'steam_region'
unique_keys = ('code',)
code: str
name: str
flag_url: str
eur_conversion_rate: float | None = None
mean_price: float = 0.0
def __post_init__(self):
super().__post_init__()
match self.code:
case 'eu':
self.bar_color = '#2b6ced'
self.bar_line_color = '#0055ff'
case 'in':
self.bar_color = '#ffc091'
self.bar_line_color = '#ff6600'
case 'ar':
self.bar_color = '#8adcff'
self.bar_line_color = '#00b3ff'
case 'tr':
self.bar_color = '#ff7878'
self.bar_line_color = '#ff0000'
case _:
self.bar_color = '#68a9f2'
self.bar_line_color = '#68a9f2'
def _mongo_repr(self) -> Any:
return {k: v for k, v in super()._mongo_repr().items() if k in ('code', 'name', 'flag_url')}

Binary file not shown.