187 Commits
v1.5.0 ... main

Author SHA1 Message Date
AlberLC
6c2f54592d Delete useless code 2025-10-26 02:34:47 +02:00
AlberLC
eecb488ea9 Fix FlanaServer file_name extension 2025-10-18 01:05:05 +02:00
AlberLC
a44a71f3be Remove FormData quote_fields arg 2025-10-17 08:20:18 +02:00
AlberLC
aee14449ad Upload large videos to FlanaServer 2025-10-17 03:00:47 +02:00
AlberLC
e20a5135f2 Add minor changes 2025-10-17 02:58:29 +02:00
AlberLC
c392e7bdde Update Dockerfile 2025-10-17 02:57:44 +02:00
AlberLC
c95621ca7e Update requirements.txt 2025-10-17 02:57:30 +02:00
AlberLC
8efa12ad22 Update requirements.txt 2025-05-01 23:20:04 +02:00
AlberLC
b672cbf0a1 Fix imports 2025-05-01 23:18:42 +02:00
AlberLC
d377aae6ff Update requirements.txt 2025-05-01 23:17:58 +02:00
AlberLC
789034aa9f Fix BtcOffersBot._on_stop_btc_offers_notification (private or mentioned) 2025-04-18 00:14:04 +02:00
AlberLC
647b9288aa Add BtcOffersBot.stop_all_btc_offers_notification 2025-04-17 23:46:51 +02:00
AlberLC
ad2434ab67 Update type hints 2025-04-17 23:44:16 +02:00
AlberLC
fca648f8b5 Fix 0.00 premium sign 2025-04-17 17:23:15 +02:00
AlberLC
ada357fcb8 Add minor changes 2025-04-17 16:11:56 +02:00
AlberLC
b1c07ea251 Fix BtcOffersBot._on_btc_offers (server disconnected) 2025-04-17 16:11:01 +02:00
AlberLC
5ec3c1e81b Update BtcOffersBot (manage websocket reconnections) 2025-04-17 16:08:26 +02:00
AlberLC
39aea44803 Fix imports 2025-04-17 13:28:24 +02:00
AlberLC
4dbe9e0504 Update BtcOffersBot (manage websocket reconnections) 2025-04-16 20:47:36 +02:00
AlberLC
d977d5d882 Update BtcOffersBot (notifications in dollars and premiums) 2025-04-16 20:45:23 +02:00
AlberLC
aac5fe951f Fix BtcOffersBot (validate amount) 2025-04-16 20:42:19 +02:00
AlberLC
abc4462723 Fix BtcOffersBot (notify without amount) 2025-04-16 20:41:36 +02:00
AlberLC
bdf658ee9a Update requirements.txt 2025-04-14 23:03:36 +02:00
AlberLC
6aa735ddd5 Add BtcOffersBot 2025-04-14 23:03:28 +02:00
AlberLC
9f640014a3 Fix UberEatsBot._on_ready 2025-04-14 23:00:25 +02:00
AlberLC
edebc48b1f Update SteamBot._insert_conversion_rates 2025-04-14 22:58:08 +02:00
AlberLC
2babe0fee7 Update type hints 2025-04-14 22:57:35 +02:00
AlberLC
123498b1f1 Add minor changes 2025-04-14 22:56:48 +02:00
AlberLC
7f8cbd6e54 Add PenaltyBot._check_message_spam 2025-03-17 10:09:40 +01:00
AlberLC
37db1b3590 Add PenaltyBot._check_message_spam 2025-03-17 10:05:27 +01:00
AlberLC
aa6642ea26 Add PenaltyBot._check_message_spam 2025-03-17 09:05:01 +01:00
AlberLC
7d538316ad Add PenaltyBot._check_message_spam 2025-03-17 08:31:16 +01:00
AlberLC
20ca2b3ab9 Add SteamBot 2024-07-06 06:20:36 +02:00
AlberLC
6f0cd1fcf6 Add SteamBot 2024-07-06 06:06:55 +02:00
AlberLC
39bf560407 Add SteamBot 2024-07-06 06:01:49 +02:00
AlberLC
76f9920de4 Add SteamBot 2024-07-04 20:02:49 +02:00
AlberLC
2465755bc0 Add SteamBot 2024-07-04 04:18:12 +02:00
AlberLC
46a50767af Add SteamBot 2024-07-04 04:17:21 +02:00
AlberLC
05e53bdc87 Add SteamBot 2024-07-04 04:05:02 +02:00
AlberLC
4e44cdd112 Add SteamBot 2024-07-04 03:56:22 +02:00
AlberLC
848d354da2 Add SteamBot 2024-07-04 03:40:35 +02:00
AlberLC
083356ac0b Update requirements.txt 2024-07-04 03:18:39 +02:00
AlberLC
c33c04cba4 Add SteamBot 2024-07-04 03:10:10 +02:00
AlberLC
5f027e665a Add SteamBot 2024-07-04 03:03:54 +02:00
AlberLC
71a5cb74d9 Fix heating_context (models.__init__) 2024-07-04 03:02:00 +02:00
AlberLC
3ea232b183 Update requirements.txt 2024-07-04 02:58:42 +02:00
AlberLC
2419d36bbb Fix connect_4 (bots.__init__) 2024-07-04 02:55:19 +02:00
AlberLC
778500caac Fix channel heating (level 0 channel name) 2024-06-26 10:18:42 +02:00
AlberLC
4f4a05f0ff Update instagram scraper (to yt-dlp) 2024-06-11 01:05:42 +02:00
AlberLC
d6986d8b63 Update twitter scraper (to yt-dlp) 2024-06-08 15:25:25 +02:00
AlberLC
25c5391f5a Update requirements.txt 2024-05-30 15:06:14 +02:00
AlberLC
df72d752c1 Fix channel heating (level 0 channel name) 2024-05-26 09:07:00 +02:00
AlberLC
07ab104203 Add telegram restart 2024-05-17 16:24:31 +02:00
AlberLC
564a65c5a1 Add telegram restart 2024-05-17 16:06:09 +02:00
AlberLC
ebe87decf5 Add telegram restart 2024-05-17 15:57:24 +02:00
AlberLC
86a1beed72 Update requirements.txt 2024-05-17 15:57:11 +02:00
AlberLC
e6721dab8e Update FlanaBot._on_delete (user own messages) 2024-04-02 21:46:52 +02:00
AlberLC
f1f29a089f Update callback registration 2024-03-28 10:08:18 +01:00
AlberLC
2f0ca1e012 Update FlanaBot._on_delete 2024-03-28 10:04:50 +01:00
AlberLC
c57b76bf6d Add MultiBot._on_new_message_raw whitelist/blacklist 2024-03-28 01:42:36 +01:00
AlberLC
4c06b573ea Add MultiBot._on_new_message_raw whitelist/blacklist 2024-03-27 15:27:59 +01:00
AlberLC
00dbe52d84 Add MultiBot._on_new_message_raw whitelist/blacklist 2024-03-26 03:21:46 +01:00
AlberLC
473279d74a Update requirements.txt 2024-03-18 09:45:48 +01:00
AlberLC
d0bc075e59 Update FlanaBot._on_database_messages 2024-03-07 22:39:31 +01:00
AlberLC
7a0e6ba4d8 Update FlanaBot._on_database_messages 2024-03-07 22:36:33 +01:00
AlberLC
528324253c Fix channel heating (independent servers) 2024-02-15 20:54:38 +01:00
AlberLC
a45f0e76b9 Fix channel heating (independent servers) 2024-02-15 06:10:18 +01:00
AlberLC
fd12cf198a Fix channel heating (independent servers) 2024-02-14 10:19:23 +01:00
AlberLC
b9beaf1a36 Fix FlanaBot._on_database_messages (chat chat user user) 2024-02-06 01:05:35 +01:00
AlberLC
d57eae19b5 Fix FlanaBot._on_database_messages 2024-02-06 00:43:54 +01:00
AlberLC
505ec0cf51 Update FlanaBot._on_recover_message 2024-01-27 02:42:55 +01:00
AlberLC
fb7b155368 Update _add_handlers (reset polls) 2024-01-27 02:11:59 +01:00
AlberLC
0769b16dd6 Update changeable roles 2024-01-25 21:37:15 +01:00
AlberLC
024c01b46d Update constants.KEYWORDS 2024-01-24 08:20:11 +01:00
AlberLC
94883dac1e Update poll_bot._add_handlers 2024-01-24 08:19:50 +01:00
AlberLC
401769f5cd Update poll_bot._add_handlers 2024-01-24 08:01:40 +01:00
AlberLC
55eb293e11 Update constants 2024-01-24 07:59:32 +01:00
AlberLC
fed7212a3f Update requirements.txt 2024-01-22 04:12:48 +01:00
AlberLC
21367e0fc9 Fix send_negative after guard clauses 2023-12-07 22:04:10 +01:00
AlberLC
52086be24d Fix FlanaDiscBot.restore_channel_names n_fires 2023-11-12 14:00:09 +01:00
AlberLC
f0551265c4 Add FlanaDiscBot.restore_channel_names 2023-11-08 07:48:54 +01:00
AlberLC
889a6c19dd Add FlanaDiscBot.restore_channel_names 2023-11-07 09:03:56 +01:00
AlberLC
688f5fee4e Add FlanaDiscBot.restore_channel_names 2023-11-07 08:58:12 +01:00
AlberLC
4f5427093c Update FlanaBot._on_database_messages 2023-11-07 08:29:28 +01:00
AlberLC
e415516b00 Fix poll buttons text (max) 2023-10-12 14:56:09 +02:00
AlberLC
a154653e9a Fix poll buttons text 2023-09-29 00:46:02 +02:00
AlberLC
61e60fc312 Fix FlanaBot._on_database_messages 2023-09-09 06:17:36 +02:00
AlberLC
e4f5a6bab4 Update FlanaBot._on_database_messages 2023-09-05 05:00:53 +02:00
AlberLC
bf03b1c8e7 Update FlanaBot._on_database_messages 2023-09-05 01:15:11 +02:00
AlberLC
c8cea16e7d Update FlanaBot._on_database_messages 2023-09-05 00:47:14 +02:00
AlberLC
5961e1a679 Update FlanaBot._on_database_messages 2023-09-05 00:38:27 +02:00
AlberLC
b4e7287de9 Update FlanaBot._on_database_messages 2023-09-05 00:15:48 +02:00
AlberLC
818b385f2d Update FlanaBot._on_database_messages 2023-09-05 00:06:06 +02:00
AlberLC
9813064674 Fix PollBot._on_poll_button_press 2023-08-18 06:32:05 +02:00
AlberLC
20592332a2 Fix FlanaBot._on_delete n_messages with mentions 2023-08-17 19:03:33 +02:00
AlberLC
ebf58e039b Fix tunnel chats 2023-08-17 18:44:21 +02:00
AlberLC
dd12533a48 Fix ScraperBot._search_medias (restricted age results filtered) 2023-08-15 03:08:08 +02:00
AlberLC
dad3482096 Add minor changes 2023-08-06 07:23:49 +02:00
AlberLC
69e352d5ec Add FlanaBot._on_delete_until 2023-08-05 00:55:06 +02:00
AlberLC
7adfa819df Fix scraping codec and extension with audio_only 2023-07-10 17:22:43 +02:00
AlberLC
7b03f6db63 Fix punishments accumulation 2023-07-08 19:40:59 +02:00
AlberLC
72696397ec Fix punishments accumulation 2023-07-07 22:14:10 +02:00
AlberLC
c3a2d2e9f0 Fix context manager in _get_contacts_ids for telegram userbots 2023-06-27 07:20:55 +02:00
AlberLC
bb9b5caa55 Fix ScraperBot._search_medias 2023-06-11 22:05:52 +02:00
AlberLC
ce71095060 Update requirements.txt 2023-05-28 23:23:11 +02:00
AlberLC
b54dd8d944 Update requirements.txt 2023-05-19 07:07:03 +02:00
AlberLC
376da6f072 Update requirements.txt 2023-05-19 06:56:18 +02:00
AlberLC
f396d2b232 Improve legibility 2023-05-19 06:47:16 +02:00
AlberLC
6c3a8e5e4a Fix fake instagram ban (restricted age) 2023-05-19 06:43:13 +02:00
AlberLC
d9eb01e5c0 Add minor changes 2023-05-09 05:32:59 +02:00
AlberLC
a9a79b2705 Add minor changes 2023-05-06 05:36:29 +02:00
AlberLC
29ebfd5e26 Fix ScraperBot._search_medias (fake instagram ban) 2023-04-29 01:35:41 +02:00
AlberLC
cf6b39e262 Fix _on_reset_instagram_ban (private or mentioned) 2023-04-29 01:34:28 +02:00
AlberLC
225bad07cc Update requirements.txt 2023-04-27 07:28:09 +02:00
AlberLC
336d8f8b8a Fix connect_4 message data 2023-04-25 06:44:52 +02:00
AlberLC
ad98464446 Fix replied message date 2023-04-25 06:44:32 +02:00
AlberLC
94fe902d41 Update flanautils (do_later, do_every) 2023-04-16 09:59:41 +02:00
AlberLC
b86fc98377 Update ScraperBot.send_song_info 2023-04-16 09:58:11 +02:00
AlberLC
fa67733416 Update instagram scraper 2023-04-11 11:03:12 +02:00
AlberLC
63e972a774 Update instagram scraper 2023-04-11 10:52:46 +02:00
AlberLC
a1434e54d3 Update instagram scraper 2023-04-11 08:57:57 +02:00
AlberLC
88b61cd87f Update self.owner_chat initialization 2023-04-11 08:57:11 +02:00
AlberLC
4fc52128ec Update connect 4 background color 2023-04-07 06:22:22 +02:00
AlberLC
8788904d44 Update instagram scraper 2023-04-04 05:36:01 +02:00
AlberLC
fdb02e9d65 Update instagram scraper 2023-04-04 04:17:21 +02:00
AlberLC
8594351229 Update instagram scraper 2023-04-03 12:00:20 +02:00
AlberLC
ae7fb1b9b2 Update UberEatsBot._scrape_codes 2023-04-03 11:22:07 +02:00
AlberLC
d21978b2da Update FlanaBot._on_delete 2023-03-31 09:01:26 +02:00
AlberLC
35e8ec5090 Remove UberEatsBot pyperclip 2023-03-31 07:39:15 +02:00
AlberLC
8703eca5ff Update UberEatsBot._scrape_codes 2023-03-30 02:44:51 +02:00
AlberLC
6be120da17 Remove UberEatsBot pyperclip 2023-03-30 02:05:32 +02:00
AlberLC
aaf17c6877 Add FlanaBot._on_help antiflood 2023-03-24 05:07:48 +01:00
AlberLC
68a3fde4ed Update UberEatsBot.send_ubereats_code 2023-03-24 00:44:06 +01:00
AlberLC
edad87b683 Update self.owner_chat initialization 2023-03-23 07:13:40 +01:00
AlberLC
3be1cf5c47 Add FlanaBot._on_help 2023-03-23 07:13:03 +01:00
AlberLC
ff934325c5 Add FlanaBot._on_help 2023-03-23 06:33:48 +01:00
AlberLC
11ef06b1d3 Update flana_tele_bot whitelisted decorator 2023-03-23 06:33:26 +01:00
AlberLC
d5b51e3f44 Update FlanaBot._on_database_messages 2023-03-23 06:25:49 +01:00
AlberLC
a6b0e699ea Update FlanaBot._on_database_messages 2023-03-23 06:02:26 +01:00
AlberLC
e90e081b69 Update FlanaBot._on_database_messages 2023-03-23 06:01:40 +01:00
AlberLC
8a23baf626 Update FlanaBot send methods 2023-03-23 05:09:00 +01:00
AlberLC
7f795e0b73 Update setup.cfg 2023-03-23 03:43:30 +01:00
AlberLC
9fbc16df02 Update UberEatsBot.send_ubereats_code message spaces 2023-03-23 03:27:06 +01:00
AlberLC
9da95e5e1c Update ScraperBot._on_scraping 2023-03-18 06:21:07 +01:00
AlberLC
a27f86d1b9 Update FlanaBot._on_new_message_default 2023-03-15 01:17:07 +01:00
AlberLC
26b3d714d3 Add tunnel chats 2023-03-13 23:07:37 +01:00
AlberLC
afa96325fe Add tunnel chats 2023-03-13 09:35:00 +01:00
AlberLC
72a803daec Add tunnel chats 2023-03-13 09:30:15 +01:00
AlberLC
7ce6985fa9 Add tunnel chats 2023-03-13 09:24:43 +01:00
AlberLC
12d9dd6075 Add tunnel chats 2023-03-13 09:14:40 +01:00
AlberLC
cf8ec9182e Add tunnel chats 2023-03-13 08:29:37 +01:00
AlberLC
060484396e Update scraper_bot.py 2023-03-13 08:26:03 +01:00
AlberLC
94e8f72245 Add constants.KEYWORDS['tunnel'] 2023-03-13 07:07:45 +01:00
AlberLC
d359ccb39e Update song_info keywords 2023-03-12 23:26:51 +01:00
AlberLC
44dcb3d987 Improve UberEatsBot new code obtaining 2023-03-12 05:21:47 +01:00
AlberLC
4ebcade424 Improve UberEatsBot new code obtaining 2023-03-12 05:18:41 +01:00
AlberLC
52e981fc58 Improve UberEatsBot new code obtaining 2023-03-12 05:15:02 +01:00
AlberLC
722c2ffbac Fix UberEatsBot headless and new_context args 2023-03-08 23:49:49 +01:00
AlberLC
55e3fe53c2 Add user agent to UberEatsBot 2023-03-08 02:15:09 +01:00
AlberLC
34320079eb Update Chat.ubereats 2023-03-08 02:14:47 +01:00
AlberLC
61daaa387a Add ubereats_next_execution 2023-03-06 07:34:09 +01:00
AlberLC
7fee5f382c Add ubereats_next_execution 2023-03-06 07:28:37 +01:00
AlberLC
f8be4d3e5e Add ubereats_next_execution 2023-03-06 07:12:42 +01:00
AlberLC
89207de059 Fix concurrent ubereats 2023-03-06 07:12:08 +01:00
AlberLC
14793e5eb5 Fix scraping delete original in private 2023-03-05 20:45:57 +01:00
AlberLC
488db76710 Fix quick click buttons 2023-03-05 07:07:26 +01:00
AlberLC
a1cefe437d Add UberEatsBot support for multiple cookies 2023-03-05 06:41:45 +01:00
AlberLC
506af05ebb Add minor changes 2023-03-05 01:16:14 +01:00
AlberLC
c17ffe1010 Fix UberEats task stderr 2023-03-05 01:03:02 +01:00
AlberLC
81854375d1 Fix UberEats task stderr 2023-03-04 05:12:33 +01:00
AlberLC
154f02a1b6 Update requirements.txt 2023-03-03 23:03:58 +01:00
AlberLC
786bab4e04 Fix UberEats task cancellation 2023-03-03 22:07:53 +01:00
AlberLC
d045551db9 Fix UberEats task cancellation 2023-03-03 22:06:47 +01:00
AlberLC
5021bdd7ae Add UberEatsBot 2023-03-03 05:21:10 +01:00
AlberLC
e8742eca42 Add UberEatsBot 2023-03-03 05:20:20 +01:00
AlberLC
a24021c2bd Update FlanaBot.check_old_database_actions 2023-03-03 05:19:52 +01:00
AlberLC
d145f70a42 Update BotAction 2023-03-03 05:19:18 +01:00
AlberLC
73ad75e8b2 Update channel heating 2023-02-28 11:01:51 +01:00
AlberLC
1da383d5eb Update scraper_bot.py 2023-02-28 11:01:14 +01:00
AlberLC
a3893df34b Update requirements.txt 2023-02-20 23:34:13 +01:00
AlberLC
6645bef22f Update SCRAPING_TIMEOUT_SECONDS 2023-02-20 23:33:39 +01:00
AlberLC
d02d5d4df2 Update SCRAPING_TIMEOUT_SECONDS 2023-02-17 05:32:03 +01:00
AlberLC
85b181c598 Improve legibility 2023-02-01 07:15:19 +01:00
AlberLC
db5a1974b2 Fix FlanaDiscBot._changeable_roles 2023-02-01 02:30:13 +01:00
AlberLC
5e65b5a298 Update requirements.txt 2023-01-16 21:30:32 +01:00
AlberLC
4d60281a23 Add delay to FlanaBot._on_delete 2023-01-16 03:31:41 +01:00
AlberLC
0ede68d495 Add ScraperBot._on_no_scraping 2023-01-15 20:11:06 +01:00
23 changed files with 1915 additions and 458 deletions

View File

@@ -3,7 +3,7 @@ FROM flanaganvaquero/flanawright
WORKDIR /application
COPY flanabot flanabot
COPY venv/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages
COPY .venv/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages
ENV PYTHONPATH=/application

View File

@@ -1,7 +1,10 @@
from flanabot.bots.connect_4_bot import *
from flanabot.bots.flana_bot import *
from flanabot.bots.flana_disc_bot import *
from flanabot.bots.flana_tele_bot import *
from flanabot.bots.penalty_bot import *
from flanabot.bots.poll_bot import *
from flanabot.bots.scraper_bot import *
from flanabot.bots.steam_bot import *
from flanabot.bots.ubereats_bot import *
from flanabot.bots.weather_bot import *

View File

@@ -0,0 +1,309 @@
from __future__ import annotations # todo0 remove when it's by default
__all__ = ['BtcOffersBot']
import asyncio
import functools
import json
import os
from abc import ABC
from collections.abc import Callable
from typing import Any
import aiohttp
import flanautils
import websockets
from multibot import MultiBot, constants as multibot_constants
from flanabot import constants
from flanabot.models import Chat, Message
# ---------------------------------------------------- #
# -------------------- DECORATORS -------------------- #
# ---------------------------------------------------- #
def preprocess_btc_offers(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(self: BtcOffersBot, message: Message) -> Any:
if message.chat.is_group and not self.is_bot_mentioned(message):
return
eur_mode = (
'' in message.text
or
bool(
flanautils.cartesian_product_string_matching(
message.text,
constants.KEYWORDS['eur'],
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
)
usd_mode = (
'$' in message.text
or
bool(
flanautils.cartesian_product_string_matching(
message.text,
constants.KEYWORDS['usd'],
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
)
premium_mode = (
'%' in message.text
or
bool(
flanautils.cartesian_product_string_matching(
message.text,
constants.KEYWORDS['premium'],
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
)
if len([arg for arg in (eur_mode, usd_mode, premium_mode) if arg]) > 1:
await self.send_error(
'Indica únicamente uno de los siguientes: precio en euros, precio en dólares o prima.',
message
)
return
parsed_number = flanautils.text_to_number(message.text)
if parsed_number and not any((eur_mode, usd_mode, premium_mode)):
eur_mode = True
if (eur_mode or usd_mode) and parsed_number < 0 or not flanautils.validate_mongodb_number(parsed_number):
await self.send_error('❌ Por favor, introduce un número válido.', message)
return
if eur_mode:
query = {'max_price_eur': parsed_number}
elif usd_mode:
query = {'max_price_usd': parsed_number}
elif premium_mode:
query = {'max_premium': parsed_number}
else:
query = {}
return await func(self, message, query)
return wrapper
# ---------------------------------------------------------------------------------------------------- #
# ------------------------------------------ BTC_OFFERS_BOT ------------------------------------------ #
# ---------------------------------------------------------------------------------------------------- #
class BtcOffersBot(MultiBot, ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._websocket: websockets.ClientConnection | None = None
self._notification_task: asyncio.Task[None] | None = None
self._api_endpoint = f"{os.environ['BTC_OFFERS_API_HOST']}:{os.environ['BTC_OFFERS_API_PORT']}/offers"
self._websocket_lock = asyncio.Lock()
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_btc_offers, keywords=constants.KEYWORDS['offer'])
self.register(self._on_btc_offers, keywords=constants.KEYWORDS['money'])
self.register(self._on_btc_offers, keywords=(constants.KEYWORDS['offer'], constants.KEYWORDS['money']))
self.register(self._on_notify_btc_offers, keywords=constants.KEYWORDS['notify'])
self.register(self._on_notify_btc_offers, keywords=(constants.KEYWORDS['notify'], constants.KEYWORDS['offer']))
self.register(self._on_notify_btc_offers, keywords=(constants.KEYWORDS['notify'], constants.KEYWORDS['money']))
self.register(self._on_notify_btc_offers, keywords=(constants.KEYWORDS['notify'], constants.KEYWORDS['offer'], constants.KEYWORDS['money']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['offer']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['money']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['notify']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['offer']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['money']))
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['notify']))
def _find_chats_to_notify(self) -> list[Chat]:
return self.Chat.find({'platform': self.platform.value, 'btc_offers_query': {'$exists': True, '$ne': {}}})
def _is_websocket_connected(self) -> bool:
return self._websocket and self._websocket.state in {websockets.State.CONNECTING, websockets.State.OPEN}
async def _send_offers(self, offers: list[dict], chat: Chat, notifications_disabled=False):
offers_parts = []
for i, offer in enumerate(offers, start=1):
offer_parts = [
f'<b>{i}.</b>',
f"<b>Plataforma:</b> <code>{offer['exchange']}</code>",
f"<b>Id:</b> <code>{offer['id']}</code>"
]
if offer['author']:
offer_parts.append(f"<b>Autor:</b> <code>{offer['author']}</code>")
payment_methods_text = ''.join(
f'\n <code>{payment_method}</code>' for payment_method in offer['payment_methods']
)
rounded_premium = round(offer['premium'], 2)
offer_parts.extend(
(
f"<b>Cantidad:</b> <code>{offer['amount']}</code>",
f"<b>Precio (EUR):</b> <code>{offer['price_eur']:.2f} €</code>",
f"<b>Precio (USD):</b> <code>{offer['price_usd']:.2f} $</code>",
f"<b>Prima:</b> <code>{rounded_premium if rounded_premium else '0.00'} %</code>",
f'<b>Métodos de pago:</b>{payment_methods_text}'
)
)
if offer['description']:
offer_parts.append(
f"<b>Descripción:</b>\n<code><code><code>{offer['description']}</code></code></code>"
)
offers_parts.append('\n'.join(offer_parts))
offers_parts_chunks = flanautils.chunks(offers_parts, 5)
messages_parts = [
[
'<b>💰💰💰 OFERTAS BTC 💰💰💰</b>',
'',
'\n\n'.join(offers_parts_chunks[0])
]
]
for offers_parts_chunk in offers_parts_chunks[1:]:
messages_parts.append(
[
'­',
'\n\n'.join(offers_parts_chunk)
]
)
if notifications_disabled:
messages_parts[-1].extend(
(
'',
'-' * 70,
'<b> Los avisos de ofertas BTC se han eliminado. Si quieres volver a recibirlos, no dudes en pedírmelo.</b>'
)
)
for message_parts in messages_parts:
await self.send('\n'.join(message_parts), chat)
async def _wait_btc_offers_notification(self):
while True:
while True:
try:
data = json.loads(await self._websocket.recv())
except websockets.ConnectionClosed:
await self.start_all_btc_offers_notifications()
else:
break
chat = await self.get_chat(data['chat_id'])
chat.btc_offers_query = {}
chat.save(pull_exclude_fields=('btc_offers_query',))
await self._send_offers(data['offers'], chat, notifications_disabled=True)
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
@preprocess_btc_offers
async def _on_btc_offers(self, message: Message, query: dict[str, float]):
bot_state_message = await self.send('Obteniendo ofertas BTC...', message)
try:
async with aiohttp.ClientSession() as session:
async with session.get(f'http://{self._api_endpoint}', params=query) as response:
offers = await response.json()
except aiohttp.ClientConnectorError:
await self.send_error('❌🌐 El servidor de ofertas BTC está desconectado.', bot_state_message, edit=True)
return
if offers:
await self._send_offers(offers, message.chat)
await self.delete_message(bot_state_message)
else:
await self.edit('No hay ofertas BTC actualmente que cumplan esa condición.', bot_state_message)
@preprocess_btc_offers
async def _on_notify_btc_offers(self, message: Message, query: dict[str, float]):
if not query:
await self.send_error('❌ Especifica una cantidad para poder avisarte.', message)
return
match query:
case {'max_price_eur': max_price_eur}:
response_text = f'✅ ¡Perfecto! Te avisaré cuando existan ofertas por {max_price_eur:.2f} € o menos.'
case {'max_price_usd': max_price_usd}:
response_text = f'✅ ¡Perfecto! Te avisaré cuando existan ofertas por {max_price_usd:.2f} $ o menos.'
case _:
rounded_max_premium = round(query['max_premium'], 2)
response_text = f"✅ ¡Perfecto! Te avisaré cuando existan ofertas con una prima del {rounded_max_premium if rounded_max_premium else '0.00'} % o menor."
await self.send(response_text, message)
await self.start_btc_offers_notification(message.chat, query)
async def _on_ready(self):
await super()._on_ready()
asyncio.create_task(self.start_all_btc_offers_notifications())
async def _on_stop_btc_offers_notification(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
previous_btc_offers_query = message.chat.btc_offers_query
await self.stop_btc_offers_notification(message.chat)
if previous_btc_offers_query:
await self.send('🛑 Los avisos de ofertas BTC se han eliminado.', message)
else:
await self.send('🤔 No existía ningún aviso de ofertas BTC configurado.', message)
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #
async def start_all_btc_offers_notifications(self):
if chats := self._find_chats_to_notify():
for chat in chats:
chat = await self.get_chat(chat.id)
chat.pull_from_database(overwrite_fields=('_id', 'btc_offers_query'))
await self.start_btc_offers_notification(chat, chat.btc_offers_query)
elif self._notification_task and not self._notification_task.done():
self._notification_task.cancel()
await asyncio.sleep(0)
async def start_btc_offers_notification(self, chat: Chat, query: dict[str, float]):
async with self._websocket_lock:
if not self._is_websocket_connected():
while True:
try:
self._websocket = await websockets.connect(f'ws://{self._api_endpoint}')
except ConnectionRefusedError:
await asyncio.sleep(constants.BTC_OFFERS_WEBSOCKET_RETRY_DELAY_SECONDS)
else:
break
if not self._notification_task or self._notification_task.done():
self._notification_task = asyncio.create_task(self._wait_btc_offers_notification())
chat.btc_offers_query = query
chat.save()
await self._websocket.send(json.dumps({'action': 'start', 'chat_id': chat.id, 'query': query}))
async def stop_all_btc_offers_notification(self):
for chat in self._find_chats_to_notify():
await self.stop_btc_offers_notification(chat)
async def stop_btc_offers_notification(self, chat: Chat):
if self._is_websocket_connected():
await self._websocket.send(json.dumps({'action': 'stop', 'chat_id': chat.id}))
chat.btc_offers_query = {}
chat.save(pull_exclude_fields=('btc_offers_query',))

View File

@@ -10,13 +10,12 @@ from typing import Iterable
from flanautils import Media, MediaType, Source
from multibot import MultiBot
import connect_4_frontend
from flanabot import constants
from flanabot import connect_4_frontend, constants
from flanabot.models import ButtonsGroup, Message, Player
# ----------------------------------------------------------------------------------------------------- #
# --------------------------------------------- CONNECT_4_BOT --------------------------------------------- #
# ------------------------------------------- CONNECT_4_BOT ------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class Connect4Bot(MultiBot, ABC):
# -------------------------------------------------------- #
@@ -25,11 +24,11 @@ class Connect4Bot(MultiBot, ABC):
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_connect_4, constants.KEYWORDS['connect_4'])
self.register(self._on_connect_4, keywords=constants.KEYWORDS['connect_4'])
self.register(self._on_connect_4_vs_itself, (*constants.KEYWORDS['connect_4'], *constants.KEYWORDS['self']))
self.register(self._on_connect_4_vs_itself, keywords=(*constants.KEYWORDS['connect_4'], *constants.KEYWORDS['self']))
self.register_button(self._on_connect_4_button_press, ButtonsGroup.CONNECT_4)
self.register_button(self._on_connect_4_button_press, key=ButtonsGroup.CONNECT_4)
def _ai_insert(
self,
@@ -284,8 +283,8 @@ class Connect4Bot(MultiBot, ABC):
return False
try:
message.data['is_active'] = False
except AttributeError:
message.data['connect_4']['is_active'] = False
except KeyError:
pass
await self.edit(
@@ -304,105 +303,105 @@ class Connect4Bot(MultiBot, ABC):
@staticmethod
def _check_winner_left(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < j and board[i][j - 3] == board[i][j - 2] == board[i][j - 1]
or
1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i][j - 2] == board[i][j - 1] == board[i][j + 1]
)
and
board[i][j - 1] is not None
(
2 < j and board[i][j - 3] == board[i][j - 2] == board[i][j - 1]
or
1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i][j - 2] == board[i][j - 1] == board[i][j + 1]
)
and
board[i][j - 1] is not None
):
return board[i][j - 1]
@staticmethod
def _check_winner_right(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
j < constants.CONNECT_4_N_COLUMNS - 3 and board[i][j + 1] == board[i][j + 2] == board[i][j + 3]
or
0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i][j - 1] == board[i][j + 1] == board[i][j + 2]
)
and
board[i][j + 1] is not None
(
j < constants.CONNECT_4_N_COLUMNS - 3 and board[i][j + 1] == board[i][j + 2] == board[i][j + 3]
or
0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i][j - 1] == board[i][j + 1] == board[i][j + 2]
)
and
board[i][j + 1] is not None
):
return board[i][j + 1]
@staticmethod
def _check_winner_up(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < i and board[i - 3][j] == board[i - 2][j] == board[i - 1][j]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and board[i - 2][j] == board[i - 1][j] == board[i + 1][j]
)
and
board[i - 1][j] is not None
(
2 < i and board[i - 3][j] == board[i - 2][j] == board[i - 1][j]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and board[i - 2][j] == board[i - 1][j] == board[i + 1][j]
)
and
board[i - 1][j] is not None
):
return board[i - 1][j]
@staticmethod
def _check_winner_down(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
i < constants.CONNECT_4_N_ROWS - 3 and board[i + 1][j] == board[i + 2][j] == board[i + 3][j]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and board[i - 1][j] == board[i + 1][j] == board[i + 2][j]
)
and
board[i + 1][j] is not None
(
i < constants.CONNECT_4_N_ROWS - 3 and board[i + 1][j] == board[i + 2][j] == board[i + 3][j]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and board[i - 1][j] == board[i + 1][j] == board[i + 2][j]
)
and
board[i + 1][j] is not None
):
return board[i + 1][j]
@staticmethod
def _check_winner_up_left(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < i and 2 < j and board[i - 3][j - 3] == board[i - 2][j - 2] == board[i - 1][j - 1]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and 1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i - 2][j - 2] == board[i - 1][j - 1] == board[i + 1][j + 1]
(
2 < i and 2 < j and board[i - 3][j - 3] == board[i - 2][j - 2] == board[i - 1][j - 1]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and 1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i - 2][j - 2] == board[i - 1][j - 1] == board[i + 1][j + 1]
)
and
board[i - 1][j - 1] is not None
)
and
board[i - 1][j - 1] is not None
):
return board[i - 1][j - 1]
@staticmethod
def _check_winner_up_right(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < i and j < constants.CONNECT_4_N_COLUMNS - 3 and board[i - 1][j + 1] == board[i - 2][j + 2] == board[i - 3][j + 3]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and 0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i + 1][j - 1] == board[i - 1][j + 1] == board[i - 2][j + 2]
)
and
board[i - 1][j + 1] is not None
(
2 < i and j < constants.CONNECT_4_N_COLUMNS - 3 and board[i - 1][j + 1] == board[i - 2][j + 2] == board[i - 3][j + 3]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and 0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i + 1][j - 1] == board[i - 1][j + 1] == board[i - 2][j + 2]
)
and
board[i - 1][j + 1] is not None
):
return board[i - 1][j + 1]
@staticmethod
def _check_winner_down_left(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
i < constants.CONNECT_4_N_ROWS - 3 and 2 < j and board[i + 3][j - 3] == board[i + 2][j - 2] == board[i + 1][j - 1]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and 1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i + 2][j - 2] == board[i + 1][j - 1] == board[i - 1][j + 1]
)
and
board[i + 1][j - 1] is not None
(
i < constants.CONNECT_4_N_ROWS - 3 and 2 < j and board[i + 3][j - 3] == board[i + 2][j - 2] == board[i + 1][j - 1]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and 1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i + 2][j - 2] == board[i + 1][j - 1] == board[i - 1][j + 1]
)
and
board[i + 1][j - 1] is not None
):
return board[i + 1][j - 1]
@staticmethod
def _check_winner_down_right(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
i < constants.CONNECT_4_N_ROWS - 3 and j < constants.CONNECT_4_N_COLUMNS - 3 and board[i + 1][j + 1] == board[i + 2][j + 2] == board[i + 3][j + 3]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and 0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i - 1][j - 1] == board[i + 1][j + 1] == board[i + 2][j + 2]
)
and
board[i + 1][j + 1] is not None
(
i < constants.CONNECT_4_N_ROWS - 3 and j < constants.CONNECT_4_N_COLUMNS - 3 and board[i + 1][j + 1] == board[i + 2][j + 2] == board[i + 3][j + 3]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and 0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i - 1][j - 1] == board[i + 1][j + 1] == board[i + 2][j + 2]
)
and
board[i + 1][j + 1] is not None
):
return board[i + 1][j + 1]
@@ -448,7 +447,7 @@ class Connect4Bot(MultiBot, ABC):
return winners
def _winning_positions(self, board: list[list[int | None]]) -> defaultdict[int, list[tuple[int, int]]]:
winning_positions = defaultdict(list)
winning_positions: defaultdict[int, list[tuple[int, int]]] = defaultdict(list)
for next_i, next_j in self._available_positions(board):
for player_number in self._check_winners(next_i, next_j, board):
winning_positions[player_number].append((next_i, next_j))
@@ -530,14 +529,14 @@ class Connect4Bot(MultiBot, ABC):
if player_2.id == self.id:
connect_4_data['turn'] += 1
if await self._ai_turn(
player_1,
player_2,
next_player,
current_player,
connect_4_data['turn'],
constants.CONNECT_4_AI_DELAY_SECONDS,
board,
message
player_1,
player_2,
next_player,
current_player,
connect_4_data['turn'],
constants.CONNECT_4_AI_DELAY_SECONDS,
board,
message
):
return
@@ -564,14 +563,14 @@ class Connect4Bot(MultiBot, ABC):
while True:
turn += 1
if await self._ai_turn(
player_1,
player_2,
current_player,
next_player,
turn,
constants.CONNECT_4_AI_DELAY_SECONDS / 2,
board,
bot_message
player_1,
player_2,
current_player,
next_player,
turn,
constants.CONNECT_4_AI_DELAY_SECONDS / 2,
board,
bot_message
):
break
current_player, next_player = next_player, current_player

View File

@@ -1,7 +1,9 @@
__all__ = ['FlanaBot']
import asyncio
import datetime
import random
import time
from abc import ABC
from typing import Iterable
@@ -10,13 +12,16 @@ import pymongo
import pytz
from flanaapis import InstagramLoginError, MediaNotFoundError, PlaceNotFoundError
from flanautils import return_if_first_empty
from multibot import BadRoleError, MultiBot, Role, bot_mentioned, constants as multibot_constants, group, inline, owner
from multibot import BadRoleError, MessagesFormat, MultiBot, Platform, RegisteredCallback, Role, User, bot_mentioned, constants as multibot_constants, group, ignore_self_message, inline, owner
from flanabot import constants
from flanabot.bots.btc_offers_bot import BtcOffersBot
from flanabot.bots.connect_4_bot import Connect4Bot
from flanabot.bots.penalty_bot import PenaltyBot
from flanabot.bots.poll_bot import PollBot
from flanabot.bots.scraper_bot import ScraperBot
from flanabot.bots.steam_bot import SteamBot
from flanabot.bots.ubereats_bot import UberEatsBot
from flanabot.bots.weather_bot import WeatherBot
from flanabot.models import Action, BotAction, ButtonsGroup, Chat, Message
@@ -24,48 +29,66 @@ from flanabot.models import Action, BotAction, ButtonsGroup, Chat, Message
# ----------------------------------------------------------------------------------------------------- #
# --------------------------------------------- FLANA_BOT --------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, WeatherBot, MultiBot, ABC):
class FlanaBot(Connect4Bot, BtcOffersBot, PenaltyBot, PollBot, ScraperBot, SteamBot, UberEatsBot, WeatherBot, MultiBot, ABC):
Chat = Chat
Message = Message
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.tunnel_chat: Chat | None = None
self.help_calls: dict[int, datetime.timedelta] = {}
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_bye, multibot_constants.KEYWORDS['bye'])
self.register(self._on_activate_tunnel, keywords=(multibot_constants.KEYWORDS['activate'], constants.KEYWORDS['tunnel']))
self.register(self._on_config, multibot_constants.KEYWORDS['config'])
self.register(self._on_config, (multibot_constants.KEYWORDS['show'], multibot_constants.KEYWORDS['config']))
self.register(self._on_bye, keywords=multibot_constants.KEYWORDS['bye'])
self.register(self._on_database_messages, (multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message']))
self.register(self._on_config, keywords=multibot_constants.KEYWORDS['config'])
self.register(self._on_config, keywords=(multibot_constants.KEYWORDS['show'], multibot_constants.KEYWORDS['config']))
self.register(self._on_database_messages_simple, (multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message'], multibot_constants.KEYWORDS['simple']))
self.register(self._on_database_messages, keywords=(multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message']))
self.register(self._on_database_messages, extra_kwargs={'format': MessagesFormat.SIMPLE}, keywords=(multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message'], multibot_constants.KEYWORDS['simple']))
self.register(self._on_database_messages, extra_kwargs={'format': MessagesFormat.COMPLETE}, keywords=(multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message'], multibot_constants.KEYWORDS['all']))
self.register(self._on_database_messages, extra_kwargs={'format': MessagesFormat.COMPLETE}, keywords=(multibot_constants.KEYWORDS['last'], multibot_constants.KEYWORDS['message'], multibot_constants.KEYWORDS['text']))
self.register(self._on_delete, multibot_constants.KEYWORDS['delete'])
self.register(self._on_delete, (multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_deactivate_tunnel, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['tunnel']))
self.register(self._on_hello, multibot_constants.KEYWORDS['hello'])
self.register(self._on_delete, keywords=multibot_constants.KEYWORDS['delete'])
self.register(self._on_delete, keywords=(multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_delete, extra_kwargs={'until': True}, keywords=(multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['until']))
self.register(self._on_delete, extra_kwargs={'until': True}, keywords=(multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['until'], multibot_constants.KEYWORDS['message']))
self.register(self._on_hello, keywords=multibot_constants.KEYWORDS['hello'])
self.register(self._on_help, keywords=multibot_constants.KEYWORDS['help'])
self.register(self._on_new_message_default, default=True)
self.register(self._on_recover_message, multibot_constants.KEYWORDS['reset'])
self.register(self._on_recover_message, multibot_constants.KEYWORDS['message'])
self.register(self._on_recover_message, (multibot_constants.KEYWORDS['reset'], multibot_constants.KEYWORDS['message']))
self.register(self._on_recover_message, keywords=multibot_constants.KEYWORDS['message'])
self.register(self._on_roles, multibot_constants.KEYWORDS['permission'])
self.register(self._on_roles, multibot_constants.KEYWORDS['role'])
self.register(self._on_roles, (multibot_constants.KEYWORDS['permission'], multibot_constants.KEYWORDS['role']))
self.register(self._on_roles, (multibot_constants.KEYWORDS['change'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_roles, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['role']))
self.register(self._on_roles, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['role']))
self.register(self._on_reset, keywords=multibot_constants.KEYWORDS['reset'])
self.register(self._on_reset, keywords=(multibot_constants.KEYWORDS['reset'], multibot_constants.KEYWORDS['message']))
self.register(self._on_users, multibot_constants.KEYWORDS['user'])
self.register(self._on_roles, keywords=multibot_constants.KEYWORDS['permission'])
self.register(self._on_roles, keywords=multibot_constants.KEYWORDS['role'])
self.register(self._on_roles, keywords=(multibot_constants.KEYWORDS['permission'], multibot_constants.KEYWORDS['role']))
self.register(self._on_roles, keywords=(multibot_constants.KEYWORDS['change'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_roles, keywords=(multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['role']))
self.register(self._on_roles, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['role']))
self.register_button(self._on_config_button_press, ButtonsGroup.CONFIG)
self.register_button(self._on_roles_button_press, ButtonsGroup.ROLES)
self.register_button(self._on_users_button_press, ButtonsGroup.USERS)
self.register(self._on_tunnel_message, always=True)
self.register(self._on_users, keywords=multibot_constants.KEYWORDS['user'])
self.register_button(self._on_config_button_press, key=ButtonsGroup.CONFIG)
self.register_button(self._on_roles_button_press, key=ButtonsGroup.ROLES)
self.register_button(self._on_users_button_press, key=ButtonsGroup.USERS)
async def _changeable_roles(self, group_: int | str | Chat | Message) -> list[Role]:
return []
@@ -74,7 +97,7 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, WeatherBot, MultiBo
async def _get_message(
self,
event: multibot_constants.MESSAGE_EVENT,
pull_overwrite_fields: Iterable[str] = ('_id', 'config')
pull_overwrite_fields: Iterable[str] = ('_id', 'config', 'date', 'ubereats')
) -> Message:
return await super()._get_message(event, pull_overwrite_fields)
@@ -120,129 +143,305 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, WeatherBot, MultiBo
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
async def _on_bye(self, message: Message):
if message.chat.is_private or self.is_bot_mentioned(message):
await self.send_bye(message)
@group
@bot_mentioned
async def _on_config(self, message: Message):
if not message.chat.config:
@owner
@group(False)
async def _on_activate_tunnel(self, message: Message):
keywords = (*multibot_constants.KEYWORDS['activate'], *constants.KEYWORDS['tunnel'])
text_parts = await self.filter_mention_ids(flanautils.remove_accents(message.text.lower()), message, delete_names=True)
try:
chat_id_or_name = next(part for part in text_parts if not flanautils.cartesian_product_string_matching(part, keywords, multibot_constants.PARSER_MIN_SCORE_DEFAULT))
except StopIteration:
return
buttons_texts = [(f"{'' if v else ''} {k}", v) for k, v in message.chat.config.items()]
chat_id_or_name = flanautils.cast_number(chat_id_or_name, raise_exception=False)
if (chat := await self.get_chat(chat_id_or_name)) or (chat := await self.get_chat(await self.get_user(chat_id_or_name))):
self.tunnel_chat = chat
await self.send(f"Túnel abierto con <b>{chat.name}{f' ({chat.group_name})' if chat.group_name else ''}</b>.", message)
else:
await self.send_error('Chat inválido.', message)
async def _on_bye(self, message: Message):
if message.chat.is_private or self.is_bot_mentioned(message):
message.is_inline = False
await self.send_bye(message)
async def _on_config(self, message: Message):
if message.chat.is_private:
config_names = ('auto_insult', 'auto_scraping', 'scraping_delete_original', 'ubereats')
elif self.is_bot_mentioned(message):
config_names = (
'auto_insult',
'auto_scraping',
'auto_weather_chart',
'check_flood',
'punish',
'scraping_delete_original'
)
else:
return
buttons_texts = []
for k, v in message.chat.config.items():
if k not in config_names:
continue
if k == 'ubereats':
k = f"ubereats (cada {flanautils.TimeUnits(seconds=message.chat.ubereats['seconds']).to_words()})"
buttons_texts.append((f"{'' if v else ''} {k}", v))
await self.send('<b>Estos son los ajustes del chat:</b>\n\n', flanautils.chunks(buttons_texts, 3), message, buttons_key=ButtonsGroup.CONFIG)
await self.delete_message(message)
async def _on_config_button_press(self, message: Message):
await self.accept_button_event(message)
if message.buttons_info.presser_user.is_admin is False:
if message.buttons_info.presser_user.is_admin is False or not message.buttons_info.pressed_button:
return
config_name = message.buttons_info.pressed_text.split()[1]
message.chat.config[config_name] = not message.chat.config[config_name]
message.buttons_info.pressed_button.text = f"{'' if message.chat.config[config_name] else ''} {config_name}"
if config_name == 'ubereats':
if message.chat.config[config_name]:
await self.start_ubereats(message.chat)
else:
await self.stop_ubereats(message.chat)
button_text = f"ubereats (cada {flanautils.TimeUnits(seconds=message.chat.ubereats['seconds']).to_words()})"
else:
button_text = config_name
message.buttons_info.pressed_button.text = f"{'' if message.chat.config[config_name] else ''} {button_text}"
await self.edit(message.buttons_info.buttons, message)
@owner
async def _on_database_messages(self, message: Message, simple=False):
@inline(False)
async def _on_database_messages(self, message: Message, format=MessagesFormat.NORMAL):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
if message.author.id != self.owner_id:
await self.send_negative(message)
return
n_messages = flanautils.text_to_number(message.text)
if not n_messages:
n_messages = 1
words = await self.filter_mention_ids(message.text, message, delete_names=True)
n_messages = 0
platforms = []
is_group = False
is_private = False
parsing_users = False
parsing_chats = False
users = []
chats = []
for word in words:
lower_word = word.lower()
await self.send(
self.get_formatted_last_database_messages(
n_messages,
timezone=pytz.timezone('Europe/Madrid'),
simple=simple
),
message
)
if (
not parsing_users
and
flanautils.cartesian_product_string_matching(
multibot_constants.KEYWORDS['user'],
lower_word,
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
):
parsing_users = True
parsing_chats = False
elif (
not parsing_chats
and
flanautils.cartesian_product_string_matching(
multibot_constants.KEYWORDS['chat'],
lower_word,
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
):
parsing_users = False
parsing_chats = True
elif parsing_users:
users.append(flanautils.cast_number(word, raise_exception=False))
elif parsing_chats:
chats.append(flanautils.cast_number(word, raise_exception=False))
elif platform_matches := flanautils.cartesian_product_string_matching(
(element.name.lower() for element in Platform),
lower_word,
multibot_constants.PARSER_MIN_SCORE_DEFAULT
):
platforms.extend(Platform[key.upper()] for key in platform_matches)
elif flanautils.cartesian_product_string_matching(
multibot_constants.KEYWORDS['group'],
lower_word,
multibot_constants.PARSER_MIN_SCORE_DEFAULT
):
is_group = True
elif flanautils.cartesian_product_string_matching(
multibot_constants.KEYWORDS['private'],
lower_word,
multibot_constants.PARSER_MIN_SCORE_DEFAULT
):
is_private = True
else:
try:
n_messages += flanautils.cast_number(word)
except ValueError:
pass
async def _on_database_messages_simple(self, message: Message):
await self._on_database_messages(message, simple=True)
if not is_group and not is_private:
is_group = True
is_private = True
if (
n_messages >= 0
and
(messages := await self.get_last_database_messages(
n_messages=max(1, n_messages),
platforms=platforms,
authors=[user for user in message.mentions if user.id != self.id] + users,
is_group=is_group,
is_private=is_private,
chats=chats
))
):
await self.send(
self.format_messages(messages, timezone=pytz.timezone('Europe/Madrid'), format=format),
message
)
await self.delete_message(message)
@owner
@group(False)
async def _on_deactivate_tunnel(self, message: Message):
self.tunnel_chat = None
await self.send('Túnel cerrado.', message)
@inline(False)
async def _on_delete(self, message: Message):
async def _on_delete(self, message: Message, until=False):
if message.replied_message:
if message.replied_message.author.id == self.id:
await self.delete_message(message.replied_message)
if message.chat.is_group:
await self.delete_message(message)
elif message.chat.is_group and self.is_bot_mentioned(message):
if not self.is_bot_mentioned(message):
return
if until and message.author.is_admin:
await self.clear(message.chat, until_message=message.replied_message)
elif not until and (message.author.is_admin or message.replied_message.author.id in {self.id, message.author.id}):
flanautils.do_later(flanautils.text_to_time(message.text).total_seconds(), self.delete_message, message.replied_message)
await self.delete_message(message)
elif message.chat.is_group:
await self.send_negative(message)
elif (
(message.chat.is_private or self.is_bot_mentioned(message))
and
(n_messages := flanautils.text_to_number(message.text))
(message.chat.is_private or self.is_bot_mentioned(message))
and
(n_messages := flanautils.text_to_number(' '.join(await self.filter_mention_ids(message.text, message))))
):
if message.author.is_admin is False:
await self.send_negative(message)
return
if n_messages <= 0:
await self.delete_message(message)
return
await self.clear(n_messages + 1, message.chat)
await self.clear(message.chat, n_messages + 1)
async def _on_hello(self, message: Message):
if message.chat.is_private or self.is_bot_mentioned(message):
message.is_inline = False
await self.send_hello(message)
async def _on_help(self, message: Message):
now = datetime.timedelta(seconds=time.time())
if (
message.chat.is_group
and
not self.is_bot_mentioned(message)
or
self.help_calls.get(message.chat.id)
and
now - self.help_calls[message.chat.id] <= datetime.timedelta(minutes=1)
):
return
self.help_calls[message.chat.id] = now
await self.send(
'<b>Necesita ayuda:</b>\n'
'<b>User:</b>\n'
f' <b>id:</b> <code>{message.author.id}</code>\n'
f' <b>name:</b> <code>{message.author.name}</code>\n'
f' <b>is_admin:<b> <code>{message.author.is_admin}</code>\n'
f' <b>is_bot:</b> <code>{message.author.is_bot}</code>\n'
'\n'
'<b>Chat:</b>\n'
f' <b>id:</b> <code>{message.chat.id}</code>\n'
f' <b>name:</b> <code>{message.chat.name}</code>\n'
f' <b>group_id:</b> <code>{message.chat.group_id}</code>\n'
f' <b>group_name:</b> <code>{message.chat.group_name}</code>',
await self.owner_chat
)
await self.send('Se ha notificado a Flanagan. Se pondrá en contacto contigo cuando pueda.', message)
async def _on_new_message_default(self, message: Message):
if message.is_inline:
await self._scrape_and_send(message)
await self._on_scraping(message)
elif (
(
message.chat.is_group
and
not self.is_bot_mentioned(message)
and
not message.chat.config['auto_scraping']
or
not await self._scrape_send_and_delete(message)
)
(
message.chat.is_group
and
(
message.author.id != self.owner_id
and
(
not message.replied_message
or
message.replied_message.author.id != self.id
or
not message.replied_message.medias
)
and
(
self.is_bot_mentioned(message)
or
(
message.chat.config['auto_insult']
and
random.random() < constants.INSULT_PROBABILITY
)
)
)
not self.is_bot_mentioned(message)
and
not message.chat.config['auto_scraping']
or
not await self._on_scraping(message, scrape_replied=False)
)
and
message.author.id != self.owner_id
and
(
self.is_bot_mentioned(message)
or
message.chat.config['auto_insult']
and
random.random() < constants.INSULT_PROBABILITY
)
):
await self.send_insult(message)
@ignore_self_message
async def _on_new_message_raw(
self,
message: Message,
whitelist_callbacks: set[RegisteredCallback] | None = None,
blacklist_callbacks: set[RegisteredCallback] | None = None
):
if (
message.replied_message
and
message.replied_message.author.id == self.id
and
message.replied_message.medias
):
whitelist_callbacks = (whitelist_callbacks or set()) | {
self._on_delete,
self._on_recover_message,
self._on_reset,
self._on_song_info
}
elif self.tunnel_chat and message.chat == await self.owner_chat:
whitelist_callbacks = (whitelist_callbacks or set()) | {self._on_deactivate_tunnel, self._on_tunnel_message}
await super()._on_new_message_raw(message, whitelist_callbacks, blacklist_callbacks)
async def _on_ready(self):
if not self._is_initialized:
flanautils.do_every(multibot_constants.CHECK_OLD_DATABASE_MESSAGES_EVERY_SECONDS, self.check_old_database_actions)
await super()._on_ready()
await flanautils.do_every(multibot_constants.CHECK_OLD_DATABASE_MESSAGES_EVERY_SECONDS, self.check_old_database_actions)
@inline(False)
async def _on_recover_message(self, message: Message):
if message.replied_message and message.replied_message.author.id == self.id:
message_deleted_bot_action = BotAction.find_one({'action': Action.MESSAGE_DELETED.value, 'chat': message.chat.object_id, 'affected_objects': message.replied_message.object_id})
elif self.is_bot_mentioned(message):
message_deleted_bot_action = BotAction.find_one({
'platform': self.platform.value,
'action': Action.MESSAGE_DELETED.value,
'chat': message.chat.object_id,
'affected_objects': message.replied_message.object_id
})
elif message.chat.is_private or self.is_bot_mentioned(message):
message_deleted_bot_action = BotAction.find_one({
'platform': self.platform.value,
'action': Action.MESSAGE_DELETED.value,
'chat': message.chat.object_id,
'date': {'$gt': datetime.datetime.now(datetime.timezone.utc) - constants.RECOVERY_DELETED_MESSAGE_BEFORE}
@@ -262,6 +461,12 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, WeatherBot, MultiBo
for deleted_message in deleted_messages:
await self.send(deleted_message.text, message)
async def _on_reset(self, message: Message):
if self._get_poll_message(message):
await self._on_delete_votes(message, all_=True)
else:
await self._on_recover_message(message)
@group
@bot_mentioned
async def _on_roles(self, message: Message):
@@ -298,6 +503,28 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, WeatherBot, MultiBo
message.buttons_info.presser_user.save()
async def _on_tunnel_message(self, message: Message):
if (
not self.tunnel_chat
or
self._parse_callbacks(
message.text,
[
RegisteredCallback(..., keywords=(multibot_constants.KEYWORDS['activate'], constants.KEYWORDS['tunnel'])),
RegisteredCallback(..., keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['tunnel']))
]
)
):
return
if message.chat == self.tunnel_chat:
await self.send(f"<b>{message.author.name.split('#')[0]}:</b> {message.text}", await self.owner_chat)
elif message.chat == await self.owner_chat:
if message.text:
await self.send(message.text, self.tunnel_chat)
else:
await self.send('No puedo enviar un mensaje sin texto.', message)
@group
@bot_mentioned
async def _on_users(self, message: Message):
@@ -345,17 +572,24 @@ class FlanaBot(Connect4Bot, PenaltyBot, PollBot, ScraperBot, WeatherBot, MultiBo
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #
@staticmethod
def check_old_database_actions():
def check_old_database_actions(self):
before_date = datetime.datetime.now(datetime.timezone.utc) - multibot_constants.DATABASE_MESSAGE_EXPIRATION_TIME
BotAction.delete_many_raw({'date': {'$lte': before_date}})
BotAction.delete_many_raw({'platform': self.platform.value, 'date': {'$lte': before_date}})
async def send_bye(self, message: Message) -> multibot_constants.ORIGINAL_MESSAGE:
return await self.send(random.choice((*constants.BYE_PHRASES, flanautils.CommonWords.random_time_greeting())), message)
async def send_bye(self, chat: int | str | User | Chat | Message) -> multibot_constants.ORIGINAL_MESSAGE:
return await self.send(
random.choice((*constants.BYE_PHRASES, flanautils.CommonWords.random_time_greeting())),
chat
)
async def send_hello(self, message: Message) -> multibot_constants.ORIGINAL_MESSAGE:
return await self.send(random.choice((*constants.HELLO_PHRASES, flanautils.CommonWords.random_time_greeting())), message)
async def send_hello(self, chat: int | str | User | Chat | Message) -> multibot_constants.ORIGINAL_MESSAGE:
return await self.send(
random.choice((*constants.HELLO_PHRASES, flanautils.CommonWords.random_time_greeting())),
chat
)
async def send_insult(self, message: Message) -> multibot_constants.ORIGINAL_MESSAGE | None:
await self.typing_delay(message)
return await self.send(random.choice(constants.INSULTS), message)
async def send_insult(self, chat: int | str | User | Chat | Message) -> multibot_constants.ORIGINAL_MESSAGE | None:
chat = await self.get_chat(chat)
async with await self.typing(chat):
await asyncio.sleep(random.randint(1, 3))
return await self.send(random.choice(constants.INSULTS), chat)

View File

@@ -5,16 +5,21 @@ import datetime
import math
import os
import random
import urllib.parse
import uuid
from collections import defaultdict
from pathlib import Path
import aiohttp
import discord
import flanautils
import pytz
from flanautils import Media, NotFoundError, OrderedSet
from multibot import BadRoleError, DiscordBot, Role, User, bot_mentioned, constants as multibot_constants, group
from flanautils import Media, MediaType, NotFoundError, OrderedSet
from multibot import BadRoleError, DiscordBot, LimitError, Platform, Role, User, admin, bot_mentioned, constants as multibot_constants, group
from flanabot import constants
from flanabot.bots.flana_bot import FlanaBot
from flanabot.models import Chat, Message, Punishment
from flanabot.models.heating_context import ChannelData, HeatingContext
# ---------------------------------------------------------------------------------------------------- #
@@ -23,8 +28,8 @@ from flanabot.models import Chat, Message, Punishment
class FlanaDiscBot(DiscordBot, FlanaBot):
def __init__(self):
super().__init__(os.environ['DISCORD_BOT_TOKEN'])
self.heating = False
self.heat_level = 0.0
self.heating_contexts: dict[int, HeatingContext] = defaultdict(HeatingContext)
self._flanaserver_api_base_url = f"http://{os.environ['FLANASERVER_API_HOST']}:{os.environ['FLANASERVER_API_PORT']}"
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
@@ -35,70 +40,81 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
self.client.add_listener(self._on_member_remove, 'on_member_remove')
self.client.add_listener(self._on_voice_state_update, 'on_voice_state_update')
self.register(self._on_audit_log, multibot_constants.KEYWORDS['audit'])
self.register(self._on_audit_log, keywords=multibot_constants.KEYWORDS['audit'])
self.register(self._on_restore_channel_names, keywords=(multibot_constants.KEYWORDS['reset'], multibot_constants.KEYWORDS['chat']))
async def _changeable_roles(self, group_: int | str | Chat | Message) -> list[Role]:
group_roles = await self.get_group_roles(group_)
group_id = self.get_group_id(group_)
return [role for role in await self.get_group_roles(group_) if role.id in constants.CHANGEABLE_ROLES[group_id]]
return [role for role in group_roles if role.id in constants.CHANGEABLE_ROLES[Platform.DISCORD][group_id]]
async def _heat_channel(self, channel: discord.VoiceChannel):
async def set_fire_to(channel_key: str, depends_on: str, firewall=0):
fire_score = random.randint(0, channels[depends_on]['n_fires'] - channels[channel_key]['n_fires']) - firewall // 2
fire_score = random.randint(0, channels_data[depends_on].n_fires - channels_data[channel_key].n_fires) - firewall // 2
if fire_score < 1:
if not channels[channel_key]['n_fires']:
if not channels_data[channel_key].n_fires:
return
channels[channel_key]['n_fires'] -= 1
channels_data[channel_key].n_fires -= 1
elif fire_score == 1:
return
else:
channels[channel_key]['n_fires'] += 1
channels_data[channel_key].n_fires += 1
if channels[channel_key]['n_fires']:
new_name_ = '🔥' * channels[channel_key]['n_fires']
if channels_data[channel_key].n_fires:
new_name_ = '🔥' * channels_data[channel_key].n_fires
else:
new_name_ = channels[channel_key]['original_name']
await channels[channel_key]['object'].edit(name=new_name_)
new_name_ = channels_data[channel_key].original_name
await channels_data[channel_key].channel.edit(name=new_name_)
channels = {}
for key in constants.DISCORD_HOT_CHANNEL_IDS:
channel_ = flanautils.find(channel.guild.voice_channels, condition=lambda c: c.id == constants.DISCORD_HOT_CHANNEL_IDS[key])
channels[key] = {
'object': channel_,
'original_name': channel_.name,
'n_fires': 0
}
voice_channels = {}
for voice_channel in channel.guild.voice_channels:
voice_channels[voice_channel.id] = voice_channel
channels_data = {}
for letter, channel_id in constants.DISCORD_HOT_CHANNEL_IDS.items():
channels_data[letter] = ChannelData(
channel=voice_channels[channel_id],
original_name=voice_channels[channel_id].name
)
heating_context = self.heating_contexts[channel.guild.id]
heating_context.channels_data = channels_data
while True:
await asyncio.sleep(constants.HEAT_PERIOD_SECONDS)
if channel.members:
self.heat_level += 0.5
heating_context.heat_level += 0.5
else:
if not self.heat_level:
if heating_context.heat_level == constants.HEAT_FIRST_LEVEL:
return
self.heat_level -= 0.5
if self.heat_level > len(constants.DISCORD_HEAT_NAMES) - 1:
self.heat_level = float(int(self.heat_level))
if not self.heat_level.is_integer():
heating_context.heat_level -= 0.5
if heating_context.heat_level > len(constants.DISCORD_HEAT_NAMES) - 1:
heating_context.heat_level = float(int(heating_context.heat_level))
if not heating_context.heat_level.is_integer():
continue
i = int(self.heat_level)
if i < len(constants.DISCORD_HEAT_NAMES):
i = int(heating_context.heat_level)
if i == constants.HEAT_FIRST_LEVEL:
n_fires = 0
new_name = channels_data['C'].original_name
elif i < len(constants.DISCORD_HEAT_NAMES):
n_fires = 0
new_name = constants.DISCORD_HEAT_NAMES[i]
else:
n_fires = i - len(constants.DISCORD_HEAT_NAMES) + 1
n_fires = round(math.log(n_fires + 4, 1.2) - 8)
new_name = '🔥' * n_fires
channels['C']['n_fires'] = n_fires
channels_data['C'].n_fires = n_fires
if channel.name != new_name:
await channel.edit(name=new_name)
await set_fire_to('B', depends_on='C', firewall=len(channels['B']['object'].members))
await set_fire_to('A', depends_on='B', firewall=len(channels['A']['object'].members))
await set_fire_to('D', depends_on='C', firewall=len(channels['C']['object'].members))
await set_fire_to('E', depends_on='D', firewall=len(channels['D']['object'].members))
await set_fire_to('B', depends_on='C', firewall=len(channels_data['B'].channel.members))
await set_fire_to('A', depends_on='B', firewall=len(channels_data['A'].channel.members))
await set_fire_to('D', depends_on='C', firewall=len(channels_data['C'].channel.members))
await set_fire_to('E', depends_on='D', firewall=len(channels_data['D'].channel.members))
async def _punish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
user_id = self.get_user_id(user)
@@ -113,10 +129,48 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
message: Message,
force=False,
audio_only=False,
timeout_for_media: int | float = 15
timeout_for_media: int | float = constants.SCRAPING_TIMEOUT_SECONDS
) -> OrderedSet[Media]:
return await super()._search_medias(message, force, audio_only, timeout_for_media)
async def _send_media(self, media: Media, bot_state_message: Message, message: Message) -> Message | None:
# noinspection PyBroadException
try:
return await self.send(media, message, reply_to=message.replied_message, raise_exceptions=True)
except LimitError:
if bot_state_message:
await self.edit('No cabe porque Discord es una mierda. Subiendo a FlanaServer...', bot_state_message)
async with aiohttp.ClientSession() as session:
form = aiohttp.FormData()
file_name = urllib.parse.unquote(media.title or Path(media.url).name or uuid.uuid4().hex)
if media.extension and not file_name.endswith(media.extension):
file_name = f'{file_name}.{media.extension}'
match media.type_:
case MediaType.AUDIO:
content_type = f"audio/{'mpeg' if media.extension == 'mp3' else media.extension}"
case MediaType.GIF:
content_type = 'image/gif'
case MediaType.IMAGE:
content_type = f'image/{media.extension}'
case MediaType.VIDEO:
content_type = f'video/{media.extension}'
form.add_field('file', media.bytes_, content_type=content_type, filename=file_name)
form.add_field('expires_in', str(constants.FLANASERVER_FILE_EXPIRATION_SECONDS))
async with session.post(f'{self._flanaserver_api_base_url}/files', data=form) as response:
if response.status != 201:
return
file_info = await response.json()
return await self.send(f"{constants.FLANASERVER_BASE_URL}{file_info['embed_url']}", message)
except Exception:
pass
async def _unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
user_id = self.get_user_id(user)
try:
@@ -165,6 +219,13 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
async def _on_member_remove(self, member: discord.Member):
(await self._create_user_from_discord_user(member)).save()
@group
@bot_mentioned
@admin(send_negative=True)
async def _on_restore_channel_names(self, message: Message):
await self.delete_message(message)
await self.restore_channel_names(self.get_group_id(message))
async def _on_voice_state_update(self, _: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
if getattr(before.channel, 'id', None) == constants.DISCORD_HOT_CHANNEL_IDS['C']:
channel = before.channel
@@ -173,10 +234,11 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
else:
return
if not self.heating:
self.heating = True
heating_context = self.heating_contexts[channel.guild.id]
if not heating_context.is_active:
heating_context.is_active = True
await self._heat_channel(channel)
self.heating = False
heating_context.is_active = False
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
@@ -191,3 +253,13 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
'group_id': group_id,
'is_active': True
}))
async def restore_channel_names(self, group_id: int):
heating_context = self.heating_contexts[group_id]
for channel_data in heating_context.channels_data.values():
if channel_data.channel.name != channel_data.original_name:
await channel_data.channel.edit(name=channel_data.original_name)
channel_data.n_fires = 0
heating_context.heat_level = constants.HEAT_FIRST_LEVEL

View File

@@ -4,12 +4,13 @@ __all__ = ['whitelisted', 'FlanaTeleBot']
import functools
import os
from typing import Callable
from typing import Any, Callable
import telethon.tl.functions
from flanautils import Media, OrderedSet
from multibot import TelegramBot, find_message, user_client
from multibot import RegisteredCallback, TelegramBot, find_message, use_user_client, user_client
from flanabot import constants
from flanabot.bots.flana_bot import FlanaBot
from flanabot.models import Message
@@ -20,11 +21,11 @@ from flanabot.models import Message
def whitelisted(func: Callable) -> Callable:
@functools.wraps(func)
@find_message
async def wrapper(self: FlanaTeleBot, message: Message):
async def wrapper(self: FlanaTeleBot, message: Message, *args, **kwargs) -> Any:
if message.author.id not in self.whitelist_ids:
return
return await func(self, message)
return await func(self, message, *args, **kwargs)
return wrapper
@@ -47,7 +48,7 @@ class FlanaTeleBot(TelegramBot, FlanaBot):
# -------------------------------------------------------- #
@user_client
async def _get_contacts_ids(self) -> list[int]:
async with self.user_client:
async with use_user_client(self):
contacts_data = await self.user_client(telethon.tl.functions.contacts.GetContactsRequest(hash=0))
return [contact.user_id for contact in contacts_data.contacts]
@@ -57,7 +58,7 @@ class FlanaTeleBot(TelegramBot, FlanaBot):
message: Message,
force=False,
audio_only=False,
timeout_for_media: int | float = 15
timeout_for_media: int | float = constants.SCRAPING_TIMEOUT_SECONDS
) -> OrderedSet[Media]:
return await super()._search_medias(message, force, audio_only, timeout_for_media)
@@ -73,8 +74,13 @@ class FlanaTeleBot(TelegramBot, FlanaBot):
await super()._on_inline_query_raw(message)
@whitelisted
async def _on_new_message_raw(self, message: Message):
await super()._on_new_message_raw(message)
async def _on_new_message_raw(
self,
message: Message,
whitelist_callbacks: set[RegisteredCallback] | None = None,
blacklist_callbacks: set[RegisteredCallback] | None = None
):
await super()._on_new_message_raw(message, whitelist_callbacks, blacklist_callbacks)
async def _on_ready(self):
await super()._on_ready()

View File

@@ -6,15 +6,15 @@ from abc import ABC
import flanautils
from flanautils import TimeUnits
from multibot import MultiBot, User, admin, bot_mentioned, constants as multibot_constants, group, ignore_self_message
from multibot import MultiBot, RegisteredCallback, User, admin, bot_mentioned, constants as multibot_constants, group, ignore_self_message
from flanabot import constants
from flanabot.models import Chat, Message, Punishment
# ------------------------------------------------------------------------------------------------------- #
# --------------------------------------------- PENALTY_BOT --------------------------------------------- #
# ------------------------------------------------------------------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
# -------------------------------------------- PENALTY_BOT -------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class PenaltyBot(MultiBot, ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -26,26 +26,26 @@ class PenaltyBot(MultiBot, ABC):
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_ban, multibot_constants.KEYWORDS['ban'])
self.register(self._on_ban, keywords=multibot_constants.KEYWORDS['ban'])
self.register(self._on_mute, multibot_constants.KEYWORDS['mute'])
self.register(self._on_mute, (('haz', 'se'), multibot_constants.KEYWORDS['mute']))
self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['unmute']))
self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['sound']))
self.register(self._on_mute, keywords=multibot_constants.KEYWORDS['mute'])
self.register(self._on_mute, keywords=(('haz', 'se'), multibot_constants.KEYWORDS['mute']))
self.register(self._on_mute, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['unmute']))
self.register(self._on_mute, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['sound']))
self.register(self._on_punish, constants.KEYWORDS['punish'])
self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['unpunish']))
self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_punish, keywords=constants.KEYWORDS['punish'])
self.register(self._on_punish, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['unpunish']))
self.register(self._on_punish, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_unban, multibot_constants.KEYWORDS['unban'])
self.register(self._on_unban, keywords=multibot_constants.KEYWORDS['unban'])
self.register(self._on_unmute, multibot_constants.KEYWORDS['unmute'])
self.register(self._on_unmute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['mute']))
self.register(self._on_unmute, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['sound']))
self.register(self._on_unmute, keywords=multibot_constants.KEYWORDS['unmute'])
self.register(self._on_unmute, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['mute']))
self.register(self._on_unmute, keywords=(multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['sound']))
self.register(self._on_unpunish, constants.KEYWORDS['unpunish'])
self.register(self._on_unpunish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['punish']))
self.register(self._on_unpunish, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_unpunish, keywords=constants.KEYWORDS['unpunish'])
self.register(self._on_unpunish, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['punish']))
self.register(self._on_unpunish, keywords=(multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission']))
@admin(False)
@group
@@ -73,9 +73,68 @@ class PenaltyBot(MultiBot, ABC):
'group_id': message.chat.group_id
})
punishment_seconds = (getattr(punishment, 'level', 0) + 2) ** constants.PUNISHMENT_INCREMENT_EXPONENT
await self.punish(message.author.id, message.chat.group_id, punishment_seconds, message)
await self.punish(message.author.id, message.chat.group_id, punishment_seconds, message, flood=True)
await self.send(f'Castigado durante {TimeUnits(seconds=punishment_seconds).to_words()}.', message)
@admin(False)
@group
async def _check_message_spam(self, message: Message) -> bool:
if await self.is_punished(message.author, message.chat):
return True
spam_messages = self.Message.find({
'text': message.text,
'platform': self.platform.value,
'author': message.author.object_id,
'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - constants.SPAM_TIME_RANGE},
})
chats = {message.chat for message in spam_messages}
if len(chats) <= constants.SPAM_CHANNELS_LIMIT:
return False
await self.punish(message.author.id, message.chat.group_id)
await asyncio.sleep(constants.SPAM_DELETION_DELAY.total_seconds()) # We make sure to also delete any messages they may have sent before the punishment
spam_messages = self.Message.find({
'text': message.text,
'platform': self.platform.value,
'author': message.author.object_id,
'date': {
'$gte': datetime.datetime.now(datetime.timezone.utc)
-
constants.SPAM_TIME_RANGE
-
constants.SPAM_DELETION_DELAY
},
'is_deleted': False
})
chats = {message.chat for message in spam_messages}
for message in spam_messages:
await self.delete_message(await self.get_message(message.id, message.chat.id))
groups_data = {chat.group_id: chat.group_name for chat in chats}
owner_message_parts = (
'<b>Spammer castigado:</b>',
'<b>User:</b>',
f' <b>id:</b> <code>{message.author.id}</code>',
f' <b>name:</b> <code>{message.author.name}</code>',
f' <b>is_admin:<b> <code>{message.author.is_admin}</code>',
f' <b>is_bot:</b> <code>{message.author.is_bot}</code>',
'',
f'<b>Chats: {len(chats)}</b>',
'',
'<b>Groups:</b>',
'\n\n'.join(
f' <b>group_id:</b> <code>{group_id}</code>\n'
f' <b>group_name:</b> <code>{group_name}</code>'
for group_id, group_name in groups_data.items()
)
)
await self.send('\n'.join(owner_message_parts), await self.owner_chat)
return True
async def _punish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
pass
@@ -100,11 +159,17 @@ class PenaltyBot(MultiBot, ABC):
await self.mute(user, message, flanautils.text_to_time(await self.filter_mention_ids(message.text, message)), message)
@ignore_self_message
async def _on_new_message_raw(self, message: Message):
await super()._on_new_message_raw(message)
async def _on_new_message_raw(
self,
message: Message,
whitelist_callbacks: set[RegisteredCallback] | None = None,
blacklist_callbacks: set[RegisteredCallback] | None = None
):
await super()._on_new_message_raw(message, whitelist_callbacks, blacklist_callbacks)
if message.chat.config['check_flood'] and message.chat.config['punish'] and not message.is_inline:
async with self.lock:
await self._check_message_flood(message)
if not await self._check_message_spam(message):
await self._check_message_flood(message)
@bot_mentioned
@group
@@ -117,8 +182,10 @@ class PenaltyBot(MultiBot, ABC):
await self.punish(user, message, flanautils.text_to_time(await self.filter_mention_ids(message.text, message)), message)
async def _on_ready(self):
if not self._is_initialized:
flanautils.do_every(constants.CHECK_PUNISHMENTS_EVERY_SECONDS, self.check_old_punishments)
await super()._on_ready()
await flanautils.do_every(constants.CHECK_PUNISHMENTS_EVERY_SECONDS, self.check_old_punishments)
@bot_mentioned
@group
@@ -155,19 +222,11 @@ class PenaltyBot(MultiBot, ABC):
if not punishment.until or now < punishment.until:
continue
await self._remove_penalty(punishment, self._unpunish, delete=False)
if punishment.is_active:
punishment.is_active = False
punishment.last_update = now
punishment.save()
await self._remove_penalty(punishment, self._unpunish)
if punishment.last_update + constants.PUNISHMENTS_RESET_TIME <= now:
if punishment.level == 1:
punishment.delete()
else:
punishment.level -= 1
punishment.last_update = now
punishment.save()
punishment.level -= 1
punishment.delete()
async def is_punished(self, user: int | str | User, group_: int | str | Chat | Message) -> bool:
pass
@@ -177,12 +236,14 @@ class PenaltyBot(MultiBot, ABC):
user: int | str | User,
group_: int | str | Chat | Message,
time: int | datetime.timedelta = None,
message: Message = None
message: Message = None,
flood=False
):
# noinspection PyTypeChecker
punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_), time)
punishment.pull_from_database(overwrite_fields=('level',), exclude_fields=('until',))
punishment.level += 1
if flood:
punishment.level += 1
await self._punish(punishment.user_id, punishment.group_id)
punishment.save(pull_exclude_fields=('until',))

View File

@@ -8,7 +8,7 @@ from typing import Iterable
import flanautils
from flanautils import OrderedSet
from multibot import MultiBot, admin, constants as multibot_constants
from multibot import MultiBot, RegisteredCallback, constants as multibot_constants
from flanabot import constants
from flanabot.models import ButtonsGroup, Message
@@ -24,32 +24,32 @@ class PollBot(MultiBot, ABC):
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_choose, constants.KEYWORDS['choose'], priority=2)
self.register(self._on_choose, constants.KEYWORDS['random'], priority=2)
self.register(self._on_choose, (constants.KEYWORDS['choose'], constants.KEYWORDS['random']), priority=2)
self.register(self._on_choose, keywords=constants.KEYWORDS['choose'], priority=2)
self.register(self._on_choose, keywords=multibot_constants.KEYWORDS['random'], priority=2)
self.register(self._on_choose, keywords=(constants.KEYWORDS['choose'], multibot_constants.KEYWORDS['random']), priority=2)
self.register(self._on_delete_all_votes, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['all'], constants.KEYWORDS['vote']))
self.register(self._on_delete_all_votes, (multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['all'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, extra_kwargs={'all_': True}, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['all'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, extra_kwargs={'all_': True}, keywords=(multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['all'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, keywords=(multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['vote']))
self.register(self._on_dice, constants.KEYWORDS['dice'])
self.register(self._on_dice, keywords=constants.KEYWORDS['dice'])
self.register(self._on_poll, constants.KEYWORDS['poll'], priority=2)
self.register(self._on_poll, keywords=constants.KEYWORDS['poll'], priority=2)
self.register(self._on_poll_multi, (constants.KEYWORDS['poll'], constants.KEYWORDS['multiple_answer']), priority=2)
self.register(self._on_poll_multi, keywords=(constants.KEYWORDS['poll'], constants.KEYWORDS['multiple_answer']), priority=2)
self.register(self._on_stop_poll, multibot_constants.KEYWORDS['deactivate'])
self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['poll']))
self.register(self._on_stop_poll, multibot_constants.KEYWORDS['stop'])
self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['poll']))
self.register(self._on_stop_poll, keywords=multibot_constants.KEYWORDS['deactivate'])
self.register(self._on_stop_poll, keywords=(multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['poll']))
self.register(self._on_stop_poll, keywords=multibot_constants.KEYWORDS['stop'])
self.register(self._on_stop_poll, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['poll']))
self.register(self._on_voting_ban, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
self.register(self._on_voting_ban, keywords=(multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
self.register(self._on_voting_unban, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
self.register(self._on_voting_unban, keywords=(multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
self.register_button(self._on_poll_button_press, ButtonsGroup.POLL)
self.register_button(self._on_poll_button_press, key=ButtonsGroup.POLL)
@staticmethod
def _get_options(text: str, discarded_words: Iterable = ()) -> list[str]:
@@ -82,7 +82,7 @@ class PollBot(MultiBot, ABC):
for option, option_votes in poll_data['votes'].items():
ratio = f'{len(option_votes)}/{total_votes}'
names = f"({', '.join(option_vote[1] for option_vote in option_votes)})" if option_votes else ''
buttons.append(f'{option}{ratio} {names}')
buttons.append(f"{option}{ratio}{f' {names}' if names else ''}")
else:
buttons = list(poll_data['votes'].keys())
@@ -97,7 +97,7 @@ class PollBot(MultiBot, ABC):
discarded_words = {
*constants.KEYWORDS['choose'],
*constants.KEYWORDS['random'],
*multibot_constants.KEYWORDS['random'],
self.name.lower(), f'<@{self.id}>',
'entre', 'between'
}
@@ -125,23 +125,22 @@ class PollBot(MultiBot, ABC):
else:
await self.send(random.choice(('¿Que elija el qué?', '¿Y las opciones?', '?', '🤔')), message)
async def _on_delete_all_votes(self, message: Message):
await self._on_delete_votes(message, all_=True)
@admin(send_negative=True)
async def _on_delete_votes(self, message: Message, all_=False):
if not (poll_message := self._get_poll_message(message)):
return
if message.chat.is_group and not message.author.is_admin:
await self.send_negative(message)
return
poll_data = poll_message.data['poll']
if all_:
for option_name, option_votes in poll_data['votes'].items():
poll_data['votes'][option_name].clear()
for option_votes in poll_data['votes'].values():
option_votes.clear()
else:
for user in await self._find_users_to_punish(message):
for option_name, option_votes in poll_data['votes'].items():
poll_data['votes'][option_name] = [option_vote for option_vote in option_votes if option_vote[0] != user.id]
user_ids = [user.id for user in await self._find_users_to_punish(message)]
for option_votes in poll_data['votes'].values():
option_votes[:] = [option_vote for option_vote in option_votes if option_vote[0] not in user_ids]
await self.delete_message(message)
await self._update_poll_buttons(poll_message)
@@ -156,21 +155,30 @@ class PollBot(MultiBot, ABC):
await self.send(random.choice(('¿De cuántas caras?', '¿Y el número?', '?', '🤔')), message)
async def _on_poll(self, message: Message, is_multiple_answer=False):
if (
self._get_poll_message(message)
and
self._parse_callbacks(message.text, [RegisteredCallback(..., keywords=multibot_constants.KEYWORDS['reset'])])
):
await self._on_delete_votes(message, all_=True)
return
if message.chat.is_group and not self.is_bot_mentioned(message):
return
discarded_words = {*constants.KEYWORDS['poll'], *constants.KEYWORDS['multiple_answer'], self.name.lower(), f'<@{self.id}>'}
discarded_words = {*constants.KEYWORDS['poll'], *constants.KEYWORDS['vote'], *constants.KEYWORDS['multiple_answer'], self.name.lower(), f'<@{self.id}>'}
if final_options := [f'{option[0].upper()}{option[1:]}' for option in self._get_options(message.text, discarded_words)]:
buttons = self.distribute_buttons(final_options, vertically=True)
await self.send(
f"Encuesta {'multirespuesta ' if is_multiple_answer else ''}en curso...",
self.distribute_buttons(final_options, vertically=True),
buttons,
message,
buttons_key=ButtonsGroup.POLL,
data={
'poll': {
'is_active': True,
'is_multiple_answer': is_multiple_answer,
'votes': {option: [] for option in final_options},
'votes': {option: [] for option in (flanautils.flatten(buttons, lazy=True))},
'banned_users_tries': {}
}
}
@@ -185,7 +193,7 @@ class PollBot(MultiBot, ABC):
poll_data = message.data['poll']
if not poll_data['is_active']:
if not poll_data['is_active'] or not message.buttons_info.pressed_button or not message.buttons_info.presser_user:
return
presser_id = message.buttons_info.presser_user.id
@@ -193,14 +201,10 @@ class PollBot(MultiBot, ABC):
if (presser_id_str := str(presser_id)) in poll_data['banned_users_tries']:
poll_data['banned_users_tries'][presser_id_str] += 1
if poll_data['banned_users_tries'][presser_id_str] == 3:
await self.send(random.choice((
f'Deja de dar por culo {presser_name} que no puedes votar aqui',
f'No es pesao {presser_name}, que no tienes permitido votar aqui',
f'Deja de pulsar botones que no puedes votar aqui {presser_name}',
f'{presser_name} deja de intentar votar aqui que no puedes',
f'Te han prohibido votar aquí {presser_name}.',
f'No puedes votar aquí, {presser_name}.'
)), reply_to=message)
await self.send(
random.choice(constants.BANNED_POLL_PHRASES.format(presser_name=presser_name)),
reply_to=message
)
return
option_name = results[0] if (results := re.findall('(.*?) ➜.+', message.buttons_info.pressed_text)) else message.buttons_info.pressed_text
@@ -250,9 +254,11 @@ class PollBot(MultiBot, ABC):
await self.edit(text, poll_message)
@admin(send_negative=True)
async def _on_voting_ban(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := self._get_poll_message(message)):
if not (poll_message := self._get_poll_message(message)):
return
if message.chat.is_group and not message.author.is_admin:
await self.send_negative(message)
return
await self.delete_message(message)
@@ -261,9 +267,11 @@ class PollBot(MultiBot, ABC):
if str(user.id) not in poll_message.data['poll']['banned_users_tries']:
poll_message.data['poll']['banned_users_tries'][str(user.id)] = 0
@admin(send_negative=True)
async def _on_voting_unban(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := self._get_poll_message(message)):
if not (poll_message := self._get_poll_message(message)):
return
if message.chat.is_group and not message.author.is_admin:
await self.send_negative(message)
return
await self.delete_message(message)

View File

@@ -7,7 +7,7 @@ from collections import defaultdict
from typing import Iterable
import flanautils
from flanaapis import RedditMediaNotFoundError, instagram, reddit, tiktok, twitter, yt_dlp_wrapper
from flanaapis import RedditMediaNotFoundError, reddit, tiktok, yt_dlp_wrapper
from flanautils import Media, MediaType, OrderedSet, return_if_first_empty
from multibot import MultiBot, RegisteredCallback, SendError, constants as multibot_constants, reply
@@ -15,9 +15,9 @@ from flanabot import constants
from flanabot.models import Action, BotAction, Message
# ------------------------------------------------------------------------------------------------------- #
# --------------------------------------------- SCRAPER_BOT --------------------------------------------- #
# ------------------------------------------------------------------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
# -------------------------------------------- SCRAPER_BOT -------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class ScraperBot(MultiBot, ABC):
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
@@ -25,37 +25,51 @@ class ScraperBot(MultiBot, ABC):
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_force_scraping, constants.KEYWORDS['force'])
self.register(self._on_force_scraping, (constants.KEYWORDS['force'], constants.KEYWORDS['scraping']))
self.register(self._on_no_scraping, keywords=(multibot_constants.KEYWORDS['negate'], constants.KEYWORDS['scraping']))
self.register(self._on_force_scraping_audio, (constants.KEYWORDS['force'], multibot_constants.KEYWORDS['audio']))
self.register(self._on_force_scraping_audio, (constants.KEYWORDS['force'], multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping']))
self.register(self._on_scraping, keywords=constants.KEYWORDS['scraping'])
self.register(self._on_scraping, keywords=constants.KEYWORDS['force'])
self.register(self._on_scraping, keywords=multibot_constants.KEYWORDS['audio'])
self.register(self._on_scraping, extra_kwargs={'delete': False}, keywords=(multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete']))
self.register(self._on_scraping, extra_kwargs={'delete': False}, keywords=(multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message']))
self.register(self._on_scraping, extra_kwargs={'delete': False}, keywords=(multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete']))
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message']))
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_scraping, constants.KEYWORDS['scraping'])
self.register(self._on_scraping_audio, multibot_constants.KEYWORDS['audio'])
self.register(self._on_scraping_audio, (multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping']))
self.register(self._on_song_info, constants.KEYWORDS['song_info'])
self.register(self._on_song_info, keywords=constants.KEYWORDS['song_info'])
@staticmethod
async def _find_ids(text: str) -> tuple[OrderedSet[str], ...]:
return (
twitter.find_ids(text),
instagram.find_ids(text),
reddit.find_ids(text),
await tiktok.find_users_and_ids(text),
tiktok.find_download_urls(text)
)
@staticmethod
def _get_keywords(delete=True, force=False, full=False, audio_only=False) -> list[str]:
keywords = list(constants.KEYWORDS['scraping'])
if not delete:
keywords += [
*multibot_constants.KEYWORDS['negate'],
*multibot_constants.KEYWORDS['deactivate'],
*multibot_constants.KEYWORDS['delete'],
*multibot_constants.KEYWORDS['message']
]
if force:
keywords += constants.KEYWORDS['force']
if full:
keywords += ['sin', 'timeout', 'limite', *multibot_constants.KEYWORDS['all']]
if audio_only:
keywords += multibot_constants.KEYWORDS['audio']
return keywords
@staticmethod
def _medias_sended_info(medias: Iterable[Media]) -> str:
medias_count = defaultdict(lambda: defaultdict(int))
medias_count: dict = defaultdict(lambda: defaultdict(int))
for media in medias:
if not media.source or isinstance(media.source, str):
medias_count[media.source][media.type_] += 1
@@ -89,21 +103,28 @@ class ScraperBot(MultiBot, ABC):
new_line = ' ' if len(medias_sended_info) == 1 else '\n'
return f'{new_line}{medias_sended_info_joined}:'
async def _scrape_and_send(self, message: Message, force=False, audio_only=False) -> OrderedSet[Media]:
kwargs = {}
if self._parse_callbacks(
message.text,
[
RegisteredCallback(..., [['sin'], ['timeout', 'limite']]),
RegisteredCallback(..., 'completo entero full todo')
]
):
kwargs['timeout_for_media'] = None
async def _scrape_and_send(
self,
message: Message,
force=False,
full=False,
audio_only=False,
send_user_context=True,
keywords: list[str] = None,
sended_media_messages: OrderedSet[Message] = None
) -> OrderedSet[Message]:
if not keywords:
keywords = []
if sended_media_messages is None:
sended_media_messages = OrderedSet()
kwargs = {'timeout_for_media': None} if full else {}
if not (medias := await self._search_medias(message, force, audio_only, **kwargs)):
return OrderedSet()
sended_media_messages, _ = await self.send_medias(medias, message)
sended_media_messages = OrderedSet(sended_media_messages)
new_sended_media_messages, _ = await self.send_medias(medias, message, send_user_context=send_user_context, keywords=keywords)
sended_media_messages |= new_sended_media_messages
await self.send_inline_results(message)
@@ -113,23 +134,22 @@ class ScraperBot(MultiBot, ABC):
self,
message: Message,
force=False,
full=False,
audio_only=False,
sended_media_messages: OrderedSet[Media] = None
) -> OrderedSet[Media]:
send_user_context=True,
keywords: list[str] = None,
sended_media_messages: OrderedSet[Message] = None
) -> OrderedSet[Message]:
if not keywords:
keywords = []
if sended_media_messages is None:
sended_media_messages = OrderedSet()
sended_media_messages += await self._scrape_and_send(message, force, audio_only)
sended_media_messages += await self._scrape_and_send(message, force, full, audio_only, send_user_context, keywords)
if (
sended_media_messages
and
message.chat.is_group
and
message.chat.config['scraping_delete_original']
):
if sended_media_messages and message.chat.config['scraping_delete_original']:
# noinspection PyTypeChecker
BotAction(Action.MESSAGE_DELETED, message, affected_objects=[message, *sended_media_messages]).save()
BotAction(self.platform.value, Action.MESSAGE_DELETED, message, affected_objects=[message, *sended_media_messages]).save()
await self.delete_message(message)
return sended_media_messages
@@ -144,6 +164,13 @@ class ScraperBot(MultiBot, ABC):
medias = OrderedSet()
exceptions: list[Exception] = []
if audio_only:
preferred_video_codec = None
preferred_extension = None
else:
preferred_video_codec = 'h264'
preferred_extension = 'mp4'
ids = []
media_urls = []
for text_part in message.text.split():
@@ -152,31 +179,40 @@ class ScraperBot(MultiBot, ABC):
ids[i] |= platform_ids
except IndexError:
ids.append(platform_ids)
if not any(ids) and flanautils.find_urls(text_part):
if force:
media_urls.append(text_part)
else:
if not any(domain.lower() in text_part for domain in multibot_constants.GIF_DOMAINS):
media_urls.append(text_part)
if (
not any(ids)
and
flanautils.find_urls(text_part)
and
(
force
or
not any(domain.lower() in text_part for domain in multibot_constants.GIF_DOMAINS)
)
):
media_urls.append(text_part)
if not any(ids) and not media_urls:
return medias
bot_state_message = await self.send(random.choice(constants.SCRAPING_PHRASES), message)
tweet_ids, instagram_ids, reddit_ids, tiktok_users_and_ids, tiktok_download_urls = ids
reddit_ids, tiktok_users_and_ids, tiktok_download_urls = ids
try:
reddit_medias = await reddit.get_medias(reddit_ids, 'h264', 'mp4', force, audio_only, timeout_for_media)
reddit_medias = await reddit.get_medias(reddit_ids, preferred_video_codec, preferred_extension, force, audio_only, timeout_for_media)
except RedditMediaNotFoundError as e:
exceptions.append(e)
reddit_medias = ()
reddit_urls = []
for reddit_media in reddit_medias:
if reddit_media.source:
medias.add(reddit_media)
else:
reddit_urls.append(reddit_media.url)
if force:
media_urls.extend(reddit_urls)
else:
@@ -188,50 +224,100 @@ class ScraperBot(MultiBot, ABC):
else:
media_urls.append(reddit_url)
gather_result = asyncio.gather(
twitter.get_medias(tweet_ids, audio_only),
instagram.get_medias(instagram_ids, audio_only),
tiktok.get_medias(tiktok_users_and_ids, tiktok_download_urls, 'h264', 'mp4', force, audio_only, timeout_for_media),
yt_dlp_wrapper.get_medias(media_urls, 'h264', 'mp4', force, audio_only, timeout_for_media),
gather_results = await asyncio.gather(
tiktok.get_medias(tiktok_users_and_ids, tiktok_download_urls, preferred_video_codec, preferred_extension, force, audio_only, timeout_for_media),
yt_dlp_wrapper.get_medias(media_urls, preferred_video_codec, preferred_extension, force, audio_only, timeout_for_media),
return_exceptions=True
)
await gather_result
await self.delete_message(bot_state_message)
gather_medias, gather_exceptions = flanautils.filter_exceptions(gather_result.result())
gather_medias, gather_exceptions = flanautils.filter_exceptions(gather_results)
await self._manage_exceptions(exceptions + gather_exceptions, message, print_traceback=True)
return medias | gather_medias
async def _send_media(self, media: Media, bot_state_message: Message, message: Message) -> Message | None:
return await self.send(media, message, reply_to=message.replied_message)
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
async def _on_force_scraping(self, message: Message) -> OrderedSet[Media]:
return await self._on_scraping(message, force=True)
async def _on_force_scraping_audio(self, message: Message) -> OrderedSet[Media]:
return await self._on_scraping(message, force=True, audio_only=True)
async def _on_no_delete_original(self, message: Message):
if not await self._scrape_and_send(message):
await self._on_recover_message(message)
async def _on_no_scraping(self, message: Message):
pass
async def _on_recover_message(self, message: Message):
pass
async def _on_scraping(self, message: Message, force=False, audio_only=False) -> OrderedSet[Media]:
async def _on_scraping(
self,
message: Message,
delete=True,
force: bool = None,
full: bool = None,
audio_only: bool = None,
scrape_replied=True,
) -> OrderedSet[Message]:
sended_media_messages = OrderedSet()
if not message.chat.config['auto_scraping'] and not self.is_bot_mentioned(message):
return sended_media_messages
if message.replied_message:
sended_media_messages += await self._scrape_and_send(message.replied_message, force, audio_only)
if force is None:
force = bool(
flanautils.cartesian_product_string_matching(
message.text,
constants.KEYWORDS['force'],
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
return await self._scrape_send_and_delete(message, force, audio_only, sended_media_messages)
if full is None:
full = bool(
self._parse_callbacks(
message.text,
[
RegisteredCallback(..., keywords=(('sin',), ('timeout', 'limite'))),
RegisteredCallback(..., keywords=multibot_constants.KEYWORDS['all'])
]
)
)
async def _on_scraping_audio(self, message: Message) -> OrderedSet[Media]:
return await self._on_scraping(message, audio_only=True)
if audio_only is None:
audio_only = bool(
flanautils.cartesian_product_string_matching(
message.text,
multibot_constants.KEYWORDS['audio'],
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
keywords = self._get_keywords(delete, force, full, audio_only)
if scrape_replied and message.replied_message:
sended_media_messages += await self._scrape_and_send(
message.replied_message,
force,
full,
audio_only,
send_user_context=False
)
kwargs = {
'message': message,
'force': force,
'full': full,
'audio_only': audio_only,
'keywords': keywords,
'sended_media_messages': sended_media_messages
}
if delete:
sended_media_messages |= await self._scrape_send_and_delete(**kwargs)
else:
sended_media_messages |= await self._scrape_and_send(**kwargs)
if not sended_media_messages:
await self._on_recover_message(message)
return sended_media_messages
@reply
async def _on_song_info(self, message: Message):
@@ -247,7 +333,17 @@ class ScraperBot(MultiBot, ABC):
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #
@return_if_first_empty(([], 0), exclude_self_types='ScraperBot', globals_=globals())
async def send_medias(self, medias: OrderedSet[Media], message: Message, send_song_info=False) -> tuple[list[Message], int]:
async def send_medias(
self,
medias: OrderedSet[Media],
message: Message,
send_song_info=False,
send_user_context=True,
keywords: list[str] = None
) -> tuple[list[Message], int]:
if not keywords:
keywords = []
sended_media_messages = []
fails = 0
bot_state_message: Message | None = None
@@ -259,29 +355,22 @@ class ScraperBot(MultiBot, ABC):
if message.chat.is_group:
sended_info_message = await self.send(f"{message.author.name.split('#')[0]} compartió{self._medias_sended_info(medias)}", message, reply_to=message.replied_message)
user_text = ' '.join(
[word for word in message.text.split()
if (
if (
send_user_context
and
(user_text := ' '.join(
[word for word in message.text.split()
if (
not any(await self._find_ids(word))
and
not flanautils.find_urls(word)
and
not flanautils.cartesian_product_string_matching(
word,
(
*multibot_constants.KEYWORDS['audio'],
*multibot_constants.KEYWORDS['delete'],
*constants.KEYWORDS['force'],
*multibot_constants.KEYWORDS['negate'],
*constants.KEYWORDS['scraping']
),
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
not flanautils.cartesian_product_string_matching(word, keywords, multibot_constants.PARSER_MIN_SCORE_DEFAULT)
and
flanautils.remove_symbols(word).lower() not in (str(self.id), self.name.lower())
)]
)
if user_text:
)]
))
):
user_text_bot_message = await self.send(user_text, message, reply_to=message.replied_message)
for media in medias:
@@ -293,7 +382,7 @@ class ScraperBot(MultiBot, ABC):
message.song_infos.add(media.song_info)
message.save()
if bot_message := await self.send(media, message, reply_to=message.replied_message):
if bot_message := await self._send_media(media, bot_state_message, message):
sended_media_messages.append(bot_message)
if media.song_info and bot_message:
bot_message.song_infos.add(media.song_info)
@@ -318,10 +407,10 @@ class ScraperBot(MultiBot, ABC):
@return_if_first_empty(exclude_self_types='ScraperBot', globals_=globals())
async def send_song_info(self, song_info: Media, message: Message):
attributes = (
f'Título: {song_info.title}\n' if song_info.title else '',
f'Autor: {song_info.author}\n' if song_info.author else '',
f'Álbum: {song_info.album}\n' if song_info.album else '',
f'Previa:'
f'<b>Título:</b> {song_info.title}\n' if song_info.title else '',
f'<b>Autor:</b> {song_info.author}\n' if song_info.author else '',
f'<b>Álbum:</b> {song_info.album}\n' if song_info.album else '',
f'<b>Previa:</b>'
)
await self.send(''.join(attributes), message)
if song_info:

368
flanabot/bots/steam_bot.py Normal file
View File

@@ -0,0 +1,368 @@
__all__ = ['SteamBot']
import asyncio
import base64
import os
import random
import re
import urllib.parse
from abc import ABC
from collections import defaultdict
from collections.abc import Awaitable, Callable, Iterable
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator
import aiohttp
import flanautils
import playwright.async_api
import plotly
from flanautils import Media, MediaType, Source
from multibot import LimitError, MultiBot, RegisteredCallback, constants as multibot_constants
from flanabot import constants
from flanabot.models import Message, SteamRegion
# ----------------------------------------------------------------------------------------------------- #
# --------------------------------------------- STEAM_BOT --------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class SteamBot(MultiBot, ABC):
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_steam, keywords='steam', priority=3)
async def _add_app_price(
self,
app_prices: dict[str, dict[str, float]],
steam_region: SteamRegion,
app_ids: Iterable[str],
session: aiohttp.ClientSession
) -> None:
for ids_batch_batch in flanautils.chunks(
flanautils.chunks(list(app_ids), constants.STEAM_IDS_BATCH),
constants.STEAM_MAX_CONCURRENT_REQUESTS
):
gather_results = await asyncio.gather(
*(self._get_app_data(session, ids_batch, steam_region.code) for ids_batch in ids_batch_batch)
)
for gather_result in gather_results:
for app_id, app_data in gather_result.items():
if (
(data := app_data.get('data'))
and
(price := data.get('price_overview', {}).get('final')) is not None
):
app_prices[app_id][steam_region.code] = price / 100 / steam_region.eur_conversion_rate
@staticmethod
@asynccontextmanager
async def _create_browser_context(
browser: playwright.async_api.Browser
) -> AsyncIterator[playwright.async_api.BrowserContext]:
async with await browser.new_context(
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5615.49 Safari/537.36',
screen={
'width': 1920,
'height': 1080
},
viewport={
'width': 1920,
'height': 945
},
device_scale_factor=1,
is_mobile=False,
has_touch=False,
default_browser_type='chromium',
locale='es-ES'
) as context:
yield context
@staticmethod
async def _get_app_data(
session: aiohttp.ClientSession,
ids_batch: Iterable[str],
country_code: str
) -> dict[str, Any]:
async with session.get(
constants.STEAM_APP_ENDPOINT_TEMPLATE.format(ids=','.join(ids_batch), country_code=country_code)
) as response:
if response.status != 200:
raise LimitError('🚫 Steam ban: espera 5 minutos antes de consultar de nuevo.')
return await response.json()
async def _get_most_apps_ids(self, browser: playwright.async_api.Browser) -> set[str]:
app_ids = set()
re_pattern = fr'{urllib.parse.urlparse(constants.STEAM_MOST_URLS[0]).netloc}/\w+/(\d+)'
async with self._create_browser_context(browser) as context:
context.set_default_timeout(10000)
page = await context.new_page()
for url in constants.STEAM_MOST_URLS:
await page.goto(url)
try:
await page.wait_for_selector('tr td a[href]')
except playwright.async_api.TimeoutError:
raise LimitError('🚫 Steam ban: espera 5 minutos antes de consultar de nuevo.')
locator = page.locator('tr td a[href]')
for i in range(await locator.count()):
href = await locator.nth(i).get_attribute('href')
app_ids.add(re.search(re_pattern, href).group(1))
return app_ids
@staticmethod
async def _insert_conversion_rates(
session: aiohttp.ClientSession,
steam_regions: list[SteamRegion]
) -> list[SteamRegion]:
async with session.get(
constants.STEAM_EXCHANGERATE_API_ENDPOINT.format(api_key=os.environ['EXCHANGERATE_API_KEY'])
) as response:
exchange_data = await response.json()
for steam_region in steam_regions:
alpha_3_code = constants.STEAM_REGION_CODE_MAPPING[steam_region.code]
steam_region.eur_conversion_rate = exchange_data['conversion_rates'][alpha_3_code]
return steam_regions
async def _scrape_steam_data(
self,
update_state: Callable[[str], Awaitable[Message]],
update_steam_regions=False,
most_apps=True
) -> tuple[list[SteamRegion], set[str]]:
steam_regions = SteamRegion.find()
most_apps_ids = set()
if update_steam_regions or most_apps:
async with playwright.async_api.async_playwright() as playwright_:
async with await playwright_.chromium.launch() as browser:
if update_steam_regions:
await update_state('Actualizando las regiones de Steam...')
SteamRegion.delete_many_raw({})
steam_regions = await self._update_steam_regions(browser)
if most_apps:
bot_state_message = await update_state(
'Obteniendo los productos más vendidos y jugados de Steam...'
)
try:
most_apps_ids = await self._get_most_apps_ids(browser)
except LimitError:
await self.delete_message(bot_state_message)
raise
return steam_regions, most_apps_ids
else:
return steam_regions, most_apps_ids
async def _update_steam_regions(self, browser: playwright.async_api.Browser) -> list[SteamRegion]:
steam_regions = []
for app_id in constants.STEAM_APP_IDS_FOR_SCRAPE_COUNTRIES:
async with self._create_browser_context(browser) as context:
page = await context.new_page()
await page.goto(f'{constants.STEAM_DB_URL}/app/{app_id}/')
locator = page.locator("#prices td[class='price-line']")
for i in range(await locator.count()):
td = locator.nth(i)
src = (await td.locator('img').get_attribute('src'))
name = (await td.text_content()).strip()
flag_url = f'{constants.STEAM_DB_URL}{src}'
region_code = await td.get_attribute('data-cc')
steam_region = SteamRegion(region_code, name, flag_url)
steam_region.save()
steam_regions.append(steam_region)
await asyncio.sleep(2)
return steam_regions
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
async def _on_steam(
self,
message: Message,
update_steam_regions: bool | None = None,
most_apps: bool | None = None,
last_apps: bool | None = None,
random_apps: bool | None = None
):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
if update_steam_regions is None:
update_steam_regions = bool(
self._parse_callbacks(
message.text,
[
RegisteredCallback(
...,
keywords=(multibot_constants.KEYWORDS['update'], constants.KEYWORDS['region'])
)
]
)
)
if most_apps is None:
most_apps = bool(
flanautils.cartesian_product_string_matching(
message.text,
('jugados', 'played', 'sellers', 'selling', 'vendidos'),
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
if last_apps is None:
last_apps = bool(
flanautils.cartesian_product_string_matching(
message.text,
multibot_constants.KEYWORDS['last'] + ('new', 'novedades', 'nuevos'),
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
if random_apps is None:
random_apps = bool(
flanautils.cartesian_product_string_matching(
message.text,
multibot_constants.KEYWORDS['random'],
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
)
if not any((most_apps, last_apps, random_apps)):
most_apps = True
last_apps = True
random_apps = True
update_state = self.create_message_updater(message, delete_user_message=True)
chart_title_parts = []
steam_regions, selected_app_ids = await self._scrape_steam_data(update_state, update_steam_regions, most_apps)
if most_apps:
chart_title_parts.append(f'los {len(selected_app_ids)} más vendidos/jugados')
async with aiohttp.ClientSession() as session:
if last_apps or random_apps:
await update_state('Obteniendo todos los productos de Steam...')
async with session.get(constants.STEAM_ALL_APPS_ENDPOINT) as response:
all_apps_data = await response.json()
app_ids = [
str(app_id) for app_data in all_apps_data['applist']['apps'] if (app_id := app_data['appid'])
]
if last_apps:
selected_app_ids.update(app_ids[-constants.STEAM_LAST_APPS:])
chart_title_parts.append(f'los {constants.STEAM_LAST_APPS} más nuevos')
if random_apps:
selected_app_ids.update(random.sample(app_ids, constants.STEAM_RANDOM_APPS))
chart_title_parts.append(f'{constants.STEAM_RANDOM_APPS} aleatorios')
steam_regions = await self._insert_conversion_rates(session, steam_regions)
bot_state_message = await update_state('Obteniendo los precios para todas las regiones...')
apps_prices = defaultdict(dict)
try:
await asyncio.gather(
*(
self._add_app_price(apps_prices, steam_region, selected_app_ids, session)
for steam_region in steam_regions
)
)
except LimitError:
await self.delete_message(bot_state_message)
raise
apps_prices = {k: v for k, v in apps_prices.items() if len(v) == len(steam_regions)}
total_prices = defaultdict(float)
for app_prices in apps_prices.values():
for region_code, price in app_prices.items():
total_prices[region_code] += price
for steam_region in steam_regions:
steam_region.mean_price = total_prices[steam_region.code] / len(apps_prices)
await update_state('Creando gráfico...')
steam_regions = sorted(steam_regions, key=lambda steam_region: steam_region.mean_price)
region_names = []
region_total_prices = []
bar_colors = []
bar_line_colors = []
images = []
for i, steam_region in enumerate(steam_regions):
region_names.append(steam_region.name)
region_total_prices.append(steam_region.mean_price)
bar_colors.append(steam_region.bar_color)
bar_line_colors.append(steam_region.bar_line_color)
async with session.get(steam_region.flag_url) as response:
images.append({
'source': f'data:image/svg+xml;base64,{base64.b64encode(await response.read()).decode()}',
'xref': 'x',
'yref': 'paper',
'x': i,
'y': 0.04,
'sizex': 0.425,
'sizey': 0.0225,
'xanchor': 'center',
'opacity': 1,
'layer': 'above'
})
figure = plotly.graph_objs.Figure(
[
plotly.graph_objs.Bar(
x=region_names,
y=region_total_prices,
marker={'color': bar_colors},
)
]
)
figure.update_layout(
width=1920,
height=945,
margin={'t': 20, 'r': 20, 'b': 20, 'l': 20},
title={
'text': f"Media de {len(apps_prices)} productos de Steam<br><sup>(solo los comparables entre {flanautils.join_last_separator(chart_title_parts, ', ', ' y ')})</sup>",
'xref': 'paper',
'yref': 'paper',
'x': 0.5,
'y': 0.93,
'font': {
'size': 35
}
},
images=images,
xaxis={
'tickfont': {
'size': 14
}
},
yaxis={
'ticksuffix': '',
'tickfont': {
'size': 18
}
}
)
bot_state_message = await update_state('Enviando...')
await self.send(Media(figure.to_image(), MediaType.IMAGE, 'png', Source.LOCAL), message)
await self.delete_message(bot_state_message)
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #

View File

@@ -0,0 +1,195 @@
__all__ = ['UberEatsBot']
import asyncio
import datetime
import random
from abc import ABC
from collections import defaultdict
import flanautils
import playwright.async_api
from multibot import MultiBot, group
from flanabot import constants
from flanabot.models import Chat, Message
# ---------------------------------------------------------------------------------------------------- #
# --------------------------------------------- POLL_BOT --------------------------------------------- #
# ---------------------------------------------------------------------------------------------------- #
class UberEatsBot(MultiBot, ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._task_contexts: dict[int, dict] = defaultdict(lambda: defaultdict(lambda: None))
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_ubereats, keywords='ubereats', priority=2)
async def _cancel_scraping_task(self, chat: Chat):
if not (task := self._task_contexts[chat.id]['task']) or task.done():
return
await self._close_playwright(chat)
task.cancel()
del self._task_contexts[chat.id]
async def _close_playwright(self, chat: Chat):
if browser := self._task_contexts[chat.id]['browser']:
await browser.close()
if playwright_ := self._task_contexts[chat.id]['playwright']:
await playwright_.stop()
async def _scrape_codes(self, chat: Chat) -> list[str | None]:
async def get_code() -> str:
return await page.input_value("input[class='code toCopy']")
codes: list[str | None] = [None] * len(chat.ubereats['cookies'])
async with playwright.async_api.async_playwright() as playwright_:
self._task_contexts[chat.id]['playwright'] = playwright_
for i, cookies in enumerate(chat.ubereats['cookies']):
for _ in range(3):
try:
async with await playwright_.chromium.launch() as browser:
self._task_contexts[chat.id]['browser'] = browser
context: playwright.async_api.BrowserContext = await browser.new_context(
storage_state={'cookies': cookies},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5615.49 Safari/537.36',
screen={
'width': 1920,
'height': 1080
},
viewport={
'width': 1280,
'height': 720
},
device_scale_factor=1,
is_mobile=False,
has_touch=False,
default_browser_type='chromium',
locale='es-ES'
)
context.set_default_timeout(3000)
page = await context.new_page()
await page.goto('https://www.myunidays.com/ES/es-ES/partners/ubereats/access/online', timeout=30000)
if button := await page.query_selector("button[class='button highlight']"):
await button.click()
else:
await page.click("'Revelar código'")
for _ in range(5):
if len(context.pages) == 2:
break
await asyncio.sleep(0.5)
else:
continue
page = context.pages[1]
await page.wait_for_load_state()
code = await get_code()
if not (new_code_button := await page.query_selector("button[class='getNewCode button secondary']")):
new_code_button = page.locator("'Obtener nuevo código'")
if await new_code_button.is_enabled():
await new_code_button.click()
for _ in range(5):
if (new_code := await get_code()) != code:
code = new_code
break
await asyncio.sleep(0.5)
codes[i] = code
chat.ubereats['cookies'][i] = await context.cookies('https://www.myunidays.com')
except playwright.async_api.Error:
await asyncio.sleep(1)
else:
break
return codes
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
async def _on_ready(self):
await super()._on_ready()
for chat in self.Chat.find({
'platform': self.platform.value,
'config.ubereats': True,
'ubereats.cookies': {"$exists": True, "$ne": []}
}):
chat = await self.get_chat(chat.id)
chat.pull_from_database(overwrite_fields=('_id', 'config', 'ubereats'))
if (
chat.ubereats['next_execution']
and
(delta_time := chat.ubereats['next_execution'] - datetime.datetime.now(datetime.timezone.utc)) > datetime.timedelta()
):
flanautils.do_later(delta_time, self.start_ubereats, chat)
else:
await self.start_ubereats(chat)
@group(False)
async def _on_ubereats(self, message: Message):
if not message.chat.ubereats['cookies']:
return
time = flanautils.text_to_time(message.text)
if not time:
bot_state_message = await self.send(random.choice(constants.SCRAPING_PHRASES), message)
await self.send_ubereats_code(message.chat, update_next_execution=False)
await self.delete_message(bot_state_message)
return
if time < datetime.timedelta(days=1, minutes=5):
await self.send('El mínimo es 1 día y 5 minutos.', message)
return
seconds = int(time.total_seconds())
message.chat.ubereats['seconds'] = seconds
message.save()
period = flanautils.TimeUnits(seconds=seconds)
await self.send(f'A partir de ahora te enviaré un código de UberEats cada <b>{period.to_words()}</b>.', message)
await self.start_ubereats(message.chat)
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #
async def send_ubereats_code(self, chat: Chat, update_next_execution=True):
chat.pull_from_database(overwrite_fields=('ubereats',))
codes = await self._scrape_codes(chat)
for i, code in enumerate(codes):
if code:
if code in chat.ubereats['last_codes']:
warning_text = '<i>Código ya enviado anteriormente:</i> '
else:
warning_text = ''
await self.send(f'{warning_text}<code>{code}</code>', chat, silent=True)
else:
try:
codes[i] = chat.ubereats['last_codes'][i]
except IndexError:
codes[i] = None
chat.ubereats['last_codes'] = codes
if update_next_execution:
chat.ubereats['next_execution'] = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=chat.ubereats['seconds'])
chat.save()
async def start_ubereats(self, chat: Chat, send_code_now=True):
await self._cancel_scraping_task(chat)
chat.config['ubereats'] = True
chat.save(pull_overwrite_fields=('ubereats',))
self._task_contexts[chat.id]['task'] = flanautils.do_every(chat.ubereats['seconds'], self.send_ubereats_code, chat, do_first_now=send_code_now)
async def stop_ubereats(self, chat: Chat):
await self._cancel_scraping_task(chat)
chat.config['ubereats'] = False
chat.save(pull_overwrite_fields=('ubereats',))

View File

@@ -16,9 +16,9 @@ from flanabot import constants
from flanabot.models import Action, BotAction, ButtonsGroup, Message, WeatherChart
# ------------------------------------------------------------------------------------------------------- #
# --------------------------------------------- WEATHER_BOT --------------------------------------------- #
# ------------------------------------------------------------------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
# -------------------------------------------- WEATHER_BOT -------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class WeatherBot(MultiBot, ABC):
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
@@ -26,10 +26,10 @@ class WeatherBot(MultiBot, ABC):
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_weather, constants.KEYWORDS['weather'])
self.register(self._on_weather, (multibot_constants.KEYWORDS['show'], constants.KEYWORDS['weather']))
self.register(self._on_weather, keywords=constants.KEYWORDS['weather'])
self.register(self._on_weather, keywords=(multibot_constants.KEYWORDS['show'], constants.KEYWORDS['weather']))
self.register_button(self._on_weather_button_press, ButtonsGroup.WEATHER)
self.register_button(self._on_weather_button_press, key=ButtonsGroup.WEATHER)
# ---------------------------------------------- #
# HANDLERS #
@@ -40,7 +40,7 @@ class WeatherBot(MultiBot, ABC):
show_progress_state = False
elif message.chat.is_group and not self.is_bot_mentioned(message):
if message.chat.config['auto_weather_chart']:
if BotAction.find_one({'action': Action.AUTO_WEATHER_CHART.value, 'chat': message.chat.object_id, 'date': {'$gt': datetime.datetime.now(datetime.timezone.utc) - constants.AUTO_WEATHER_EVERY}}):
if BotAction.find_one({'platform': self.platform.value, 'action': Action.AUTO_WEATHER_CHART.value, 'chat': message.chat.object_id, 'date': {'$gt': datetime.datetime.now(datetime.timezone.utc) - constants.AUTO_WEATHER_EVERY}}):
return
show_progress_state = False
else:
@@ -54,12 +54,12 @@ class WeatherBot(MultiBot, ABC):
# noinspection PyTypeChecker
place_words = (
OrderedSet(original_text_words)
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['show'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, constants.KEYWORDS['weather'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['date'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['thanks'], min_score=0.85).keys()
- flanautils.CommonWords.get()
OrderedSet(original_text_words)
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['show'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, constants.KEYWORDS['weather'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['date'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['thanks'], min_score=0.85).keys()
- flanautils.CommonWords.get()
)
if not place_words:
if not message.is_inline:
@@ -161,7 +161,7 @@ class WeatherBot(MultiBot, ABC):
if bot_message and not self.is_bot_mentioned(message):
# noinspection PyTypeChecker
BotAction(Action.AUTO_WEATHER_CHART, message, affected_objects=[bot_message]).save()
BotAction(self.platform.value, Action.AUTO_WEATHER_CHART, message, affected_objects=[bot_message]).save()
async def _on_weather_button_press(self, message: Message):
await self.accept_button_event(message)

View File

@@ -26,7 +26,7 @@ FONT_SIZE = 32 * SIZE_MULTIPLIER
TABLE_LINE_WIDTH = 4 * SIZE_MULTIPLIER
BLUE = (66 / 255, 135 / 255, 245 / 255)
BACKGROUND_COLOR = (54 / 255, 57 / 255, 63 / 255)
BACKGROUND_COLOR = (49 / 255, 51 / 255, 56 / 255)
GRAY = (200 / 255, 200 / 255, 200 / 255)
HIGHLIGHT_COLOR = (104 / 255, 107 / 255, 113 / 255)
RED = (255 / 255, 70 / 255, 70 / 255)

View File

@@ -6,20 +6,58 @@ from multibot import Platform
AUDIT_LOG_AGE = datetime.timedelta(hours=1)
AUDIT_LOG_LIMIT = 5
AUTO_WEATHER_EVERY = datetime.timedelta(hours=6)
BTC_OFFERS_WEBSOCKET_RETRY_DELAY_SECONDS = datetime.timedelta(hours=1).total_seconds()
CHECK_PUNISHMENTS_EVERY_SECONDS = datetime.timedelta(hours=1).total_seconds()
CONNECT_4_AI_DELAY_SECONDS = 1
CONNECT_4_CENTER_COLUMN_POINTS = 2
CONNECT_4_N_COLUMNS = 7
CONNECT_4_N_ROWS = 6
FLANASERVER_BASE_URL = 'https://flanaserver.duckdns.org'
FLANASERVER_FILE_EXPIRATION_SECONDS = 3 * 24 * 60 * 60
FLOOD_2s_LIMIT = 2
FLOOD_7s_LIMIT = 4
HEAT_FIRST_LEVEL = -1
HEAT_PERIOD_SECONDS = datetime.timedelta(minutes=15).total_seconds()
HELP_MINUTES_LIMIT = 1
INSTAGRAM_BAN_SLEEP = datetime.timedelta(days=1)
INSULT_PROBABILITY = 0.00166666667
MAX_PLACE_QUERY_LENGTH = 50
PUNISHMENT_INCREMENT_EXPONENT = 6
PUNISHMENTS_RESET_TIME = datetime.timedelta(weeks=2)
RECOVERY_DELETED_MESSAGE_BEFORE = datetime.timedelta(hours=1)
SCRAPING_TIMEOUT_SECONDS = 20
SPAM_CHANNELS_LIMIT = 2
SPAM_DELETION_DELAY = datetime.timedelta(seconds=5)
SPAM_TIME_RANGE = datetime.timedelta(hours=1)
STEAM_ALL_APPS_ENDPOINT = 'https://api.steampowered.com/ISteamApps/GetAppList/v2'
STEAM_APP_ENDPOINT_TEMPLATE = 'https://store.steampowered.com/api/appdetails?appids={ids}&cc={country_code}&filters=price_overview'
STEAM_APP_IDS_FOR_SCRAPE_COUNTRIES = (400, 620, 730, 210970, 252490, 292030, 427520, 1712350)
STEAM_DB_URL = 'https://steamdb.info'
STEAM_EXCHANGERATE_API_ENDPOINT_TEMPLATE = 'https://v6.exchangerate-api.com/v6/{api_key}/latest/EUR'
STEAM_IDS_BATCH = 750
STEAM_LAST_APPS = 1500
STEAM_MAX_CONCURRENT_REQUESTS = 10
STEAM_MOST_URLS = (
'https://store.steampowered.com/charts/topselling/global',
'https://store.steampowered.com/charts/mostplayed'
)
STEAM_RANDOM_APPS = 1000
STEAM_REGION_CODE_MAPPING = {'eu': 'EUR', 'ru': 'RUB', 'pk': 'USD', 'ua': 'UAH', 'za': 'ZAR', 'vn': 'VND', 'tw': 'TWD',
'id': 'IDR', 'my': 'MYR', 'ar': 'USD', 'tr': 'USD', 'ph': 'PHP', 'in': 'INR', 'cn': 'CNY',
'br': 'BRL', 'sa': 'SAR', 'th': 'THB', 'pe': 'PEN', 'cl': 'CLP', 'kw': 'KWD', 'az': 'USD',
'kz': 'KZT', 'co': 'COP', 'mx': 'MXN', 'qa': 'QAR', 'sg': 'SGD', 'jp': 'JPY', 'uy': 'UYU',
'ae': 'AED', 'kr': 'KRW', 'hk': 'HKD', 'cr': 'CRC', 'nz': 'NZD', 'ca': 'CAD', 'au': 'AUD',
'il': 'ILS', 'us': 'USD', 'no': 'NOK', 'uk': 'GBP', 'pl': 'PLN', 'ch': 'CHF'}
YADIO_API_ENDPOINT = 'https://api.yadio.io/exrates/EUR'
BANNED_POLL_PHRASES = (
'Deja de dar por culo {presser_name} que no puedes votar aqui',
'No es pesao {presser_name}, que no tienes permitido votar aqui',
'Deja de pulsar botones que no puedes votar aqui {presser_name}',
'{presser_name} deja de intentar votar aqui que no puedes',
'Te han prohibido votar aquí {presser_name}.',
'No puedes votar aquí, {presser_name}.'
)
BYE_PHRASES = ('Adiós.', 'adio', 'adioh', 'adios', 'adió', 'adiós', 'agur', 'bye', 'byyeeee', 'chao', 'hasta la vista',
'hasta luego', 'hasta nunca', ' hasta pronto', 'hasta la próxima', 'nos vemos', 'taluego')
@@ -30,7 +68,7 @@ CHANGEABLE_ROLES = defaultdict(
Platform.DISCORD: defaultdict(
list,
{
360868977754505217: [881238165476741161, 991454395663401072, 1033098591725699222],
360868977754505217: [881238165476741161, 991454395663401072, 1033098591725699222, 1176639571677696173],
862823584670285835: [976660580939202610, 984269640752590868]
}
)
@@ -38,7 +76,6 @@ CHANGEABLE_ROLES = defaultdict(
)
DISCORD_HEAT_NAMES = [
'Canal Congelado',
'Canal Fresquito',
'Canal Templaillo',
'Canal Calentito',
@@ -81,29 +118,33 @@ INSULTS = ('._.', 'aha', 'Aléjate de mi.', 'Ante la duda mi dedo corazón te sa
KEYWORDS = {
'choose': ('choose', 'elige', 'escoge'),
'connect_4': (('conecta', 'connect', 'ralla', 'raya'), ('4', 'cuatro', 'four')),
'covid_chart': ('case', 'caso', 'contagiado', 'contagio', 'corona', 'coronavirus', 'covid', 'covid19', 'death',
'disease', 'enfermedad', 'enfermos', 'fallecido', 'incidencia', 'jacovid', 'mascarilla', 'muerte',
'muerto', 'pandemia', 'sick', 'virus'),
'currency_chart': ('argentina', 'bitcoin', 'cardano', 'cripto', 'crypto', 'criptodivisa', 'cryptodivisa',
'cryptomoneda', 'cryptocurrency', 'currency', 'dinero', 'divisa', 'ethereum', 'inversion',
'moneda', 'pasta'),
'dice': ('dado', 'dice'),
'eur': ('eur', 'euro', 'euros', ''),
'force': ('force', 'forzar', 'fuerza'),
'money': ('bitcoin', 'btc', 'cripto', 'criptomoneda', 'crypto', 'cryptocurrency', 'currency', 'currency', 'dinero',
'divisa', 'moneda', 'money', 'precio', 'price', 'satoshi'),
'multiple_answer': ('multi', 'multi-answer', 'multiple', 'multirespuesta'),
'poll': ('encuesta', 'quiz'),
'notify': ('alert', 'alertame', 'alertar', 'avisame', 'avisar', 'aviso', 'inform', 'informame', 'informar',
'notificacion', 'notificame', 'notificar', 'notification'),
'offer': ('oferta', 'offer', 'orden', 'order', 'post', 'publicacion'),
'poll': ('encuesta', 'quiz', 'votacion', 'votar', 'voting'),
'premium': ('%', 'premium', 'prima'),
'punish': ('acaba', 'aprende', 'ataca', 'atalo', 'azota', 'beating', 'boss', 'castiga', 'castigo', 'condena',
'controla', 'destroy', 'destroza', 'duro', 'ejecuta', 'enseña', 'escarmiento', 'execute', 'fuck',
'fusila', 'hell', 'humos', 'infierno', 'jefe', 'jode', 'learn', 'leccion', 'lesson', 'manda', 'paliza',
'purgatorio', 'purgatory', 'sancion', 'shoot', 'teach', 'whip'),
'random': ('aleatorio', 'azar', 'random'),
'scraping': ('api', 'aqui', 'busca', 'contenido', 'content', 'descarga', 'descargar', 'download', 'envia', 'habia',
'media', 'redes', 'scrap', 'scraping', 'search', 'send', 'social', 'sociales', 'tenia', 'video',
'videos'),
'region': ('countries', 'country', 'pais', 'paises', 'region', 'regiones', 'regions', 'zona', 'zonas', 'zone',
'zones'),
'scraping': ('busca', 'contenido', 'content', 'descarga', 'descargar', 'descargues', 'download', 'envia', 'scrap',
'scrapea', 'scrapees', 'scraping', 'search', 'send'),
'self': (('contigo', 'contra', 'ti'), ('mismo', 'ti')),
'song_info': ('aqui', 'cancion', 'data', 'datos', 'info', 'informacion', 'information', 'llama', 'media', 'name',
'nombre', 'sonaba', 'sonando', 'song', 'sono', 'sound', 'suena', 'title', 'titulo', 'video'),
'song_info': ('cancion', 'data', 'datos', 'info', 'informacion', 'information', 'sonaba', 'sonando', 'song', 'sono',
'sound', 'suena'),
'tunnel': ('canal', 'channel', 'tunel', 'tunnel'),
'unpunish': ('absolve', 'forgive', 'innocent', 'inocente', 'perdona', 'spare'),
'vote': ('votacion', 'votar', 'vote', 'voting', 'voto'),
'until': ('hasta', 'until'),
'usd': ('$', 'dolar', 'dolares', 'dollar', 'dollars', 'usd'),
'vote': ('vote', 'voto'),
'weather': ('atmosfera', 'atmosferico', 'calle', 'calor', 'caloret', 'clima', 'climatologia', 'cloud', 'cloudless',
'cloudy', 'cold', 'congelar', 'congelado', 'denbora', 'despejado', 'diluvio', 'frio', 'frost', 'hielo',
'humedad', 'llover', 'llueva', 'llueve', 'lluvia', 'nevada', 'nieva', 'nieve', 'nube', 'nubes',

View File

@@ -1,7 +1,9 @@
from flanabot.models.bot_action import *
from flanabot.models.chat import *
from flanabot.models.enums import *
from flanabot.models.heating_context import *
from flanabot.models.message import *
from flanabot.models.player import *
from flanabot.models.punishment import *
from flanabot.models.steam_region import *
from flanabot.models.weather_chart import *

View File

@@ -4,7 +4,7 @@ import datetime
from dataclasses import dataclass, field
from flanautils import DCMongoBase, FlanaBase
from multibot.models.user import User
from multibot import Platform, User
from flanabot.models.chat import Chat
from flanabot.models.enums import Action
@@ -17,6 +17,7 @@ class BotAction(DCMongoBase, FlanaBase):
unique_keys = 'message'
nullable_unique_keys = 'message'
platform: Platform = None
action: Action = None
message: Message = None
author: User = None

View File

@@ -7,17 +7,19 @@ from multibot import Chat as MultiBotChat
@dataclass(eq=False)
class Chat(MultiBotChat):
DEFAULT_CONFIG = {
config: dict = field(default_factory=lambda: {
'auto_insult': True,
'auto_scraping': True,
'auto_weather_chart': False,
'check_flood': False,
'punish': False,
'scraping_delete_original': True
}
config: dict[str, bool] = field(default_factory=dict)
def __post_init__(self):
super().__post_init__()
self.config = self.DEFAULT_CONFIG | self.config
'scraping_delete_original': True,
'ubereats': False
})
btc_offers_query: dict[str, float] = field(default_factory=lambda: {})
ubereats: dict = field(default_factory=lambda: {
'cookies': [],
'last_codes': [],
'seconds': 86700,
'next_execution': None
})

View File

@@ -0,0 +1,19 @@
__all__ = ['ChannelData', 'HeatingContext']
from dataclasses import dataclass, field
import discord
@dataclass
class ChannelData:
channel: discord.VoiceChannel
original_name: str
n_fires: int = 0
@dataclass
class HeatingContext:
channels_data: dict[str, ChannelData] = field(default_factory=dict)
is_active: bool = False
heat_level: float = -1

View File

@@ -1,5 +1,6 @@
__all__ = ['Punishment']
import datetime
from dataclasses import dataclass
from typing import Any
@@ -16,3 +17,11 @@ class Punishment(Penalty):
self_vars = super()._mongo_repr()
self_vars['level'] = self.level
return self_vars
def delete(self, cascade=False):
if self.level == 0:
super().delete(cascade)
elif self.is_active:
self.is_active = False
self.last_update = datetime.datetime.now(datetime.timezone.utc)
self.save()

View File

@@ -0,0 +1,40 @@
__all__ = ['SteamRegion']
from dataclasses import dataclass
from typing import Any
from flanautils import DCMongoBase, FlanaBase
@dataclass
class SteamRegion(DCMongoBase, FlanaBase):
collection_name = 'steam_region'
unique_keys = ('code',)
code: str
name: str
flag_url: str
eur_conversion_rate: float | None = None
mean_price: float = 0.0
def __post_init__(self):
super().__post_init__()
match self.code:
case 'eu':
self.bar_color = '#2b6ced'
self.bar_line_color = '#0055ff'
case 'in':
self.bar_color = '#ffc091'
self.bar_line_color = '#ff6600'
case 'ar':
self.bar_color = '#8adcff'
self.bar_line_color = '#00b3ff'
case 'tr':
self.bar_color = '#ff7878'
self.bar_line_color = '#ff0000'
case _:
self.bar_color = '#68a9f2'
self.bar_line_color = '#68a9f2'
def _mongo_repr(self) -> Any:
return {k: v for k, v in super()._mongo_repr().items() if k in ('code', 'name', 'flag_url')}

Binary file not shown.

View File

@@ -2,7 +2,6 @@
name = {project_name}
version = {project_version}
author = {author}
author_email = alberlc@outlook.com
description = {description}
long_description = file: README.rst
url = https://github.com/{author}/{project_name}