229 Commits

Author SHA1 Message Date
AlberLC
cba5a5a289 Update requirements.txt 2023-01-11 00:47:05 +01:00
AlberLC
db5d4459d7 Update README.rst 2023-01-11 00:35:43 +01:00
AlberLC
697e6e7fdf Update requirements.txt 2023-01-10 23:39:48 +01:00
AlberLC
fccb675c3d Improve legibility 2023-01-03 04:03:05 +01:00
AlberLC
4795c0c286 Improve Connect4Bot ai 2023-01-03 03:56:06 +01:00
AlberLC
409b3b4864 Refactor Message.data system 2023-01-01 02:35:11 +01:00
AlberLC
94d29dafd2 Add minor changes 2022-12-22 09:36:20 +01:00
AlberLC
bdd5bd6059 Refactor Message.data system 2022-12-22 09:29:25 +01:00
AlberLC
e2298bdb41 Fix _on_delete in private is_admin 2022-12-22 09:28:29 +01:00
AlberLC
cf6844e7a1 Refactor ScraperBot.send_medias 2022-12-21 05:25:38 +01:00
AlberLC
9344aa2d16 Update force parameter 2022-12-21 05:23:09 +01:00
AlberLC
293a5e46f1 Dont call self._manage_exceptions 2022-12-20 04:47:26 +01:00
AlberLC
1bc1e1ecde Add force_gif_download 2022-12-19 07:30:26 +01:00
AlberLC
a99a185830 Add force_gif_download 2022-12-19 06:29:11 +01:00
AlberLC
bf68225d61 Fix ScraperBot.send_medias message 2022-12-19 02:12:16 +01:00
AlberLC
fb98afed79 Fix ScraperBot.send_medias message 2022-12-18 04:31:52 +01:00
AlberLC
ca9546c567 Improve legibility 2022-12-18 04:31:22 +01:00
AlberLC
9bb1e480f7 Improve ScraperBot.send_medias message 2022-12-17 10:16:02 +01:00
AlberLC
770e599eca Update tiktok scraping 2022-12-17 06:45:47 +01:00
AlberLC
64f9ebee45 Update tiktok scraping 2022-12-17 06:26:35 +01:00
AlberLC
333f494a20 Update _on_scraping bot_mentioned 2022-12-17 04:39:34 +01:00
AlberLC
ef5acc3f26 Update _on_scraping bot_mentioned 2022-12-16 11:04:24 +01:00
AlberLC
93cf015260 Update requirements.txt 2022-12-16 10:41:18 +01:00
AlberLC
2c3fbab7a8 Update requirements.txt 2022-12-16 10:33:13 +01:00
AlberLC
070d3bf572 Improve ScraperBot.send_medias message 2022-12-16 10:10:53 +01:00
AlberLC
58cdee8ccc Improve ScraperBot.send_medias message 2022-12-16 10:10:20 +01:00
AlberLC
1cb92bf3da Update requirements.txt 2022-12-16 09:50:16 +01:00
AlberLC
d42a20ef14 improve platforms support for ScraperBot 2022-12-16 09:50:09 +01:00
AlberLC
b1f3eb9ebe Update constants.KEYWORDS 2022-12-16 06:44:34 +01:00
AlberLC
3bc40c0b64 Add new keywords to no timeout_for_media 2022-12-13 22:39:37 +01:00
AlberLC
3b6d62b9e1 Improve legibility 2022-12-12 06:31:55 +01:00
AlberLC
77a44a0f9c Move flana_disct_bot constants 2022-12-11 22:20:27 +01:00
AlberLC
572800571d Fix handlers in Telegram 2022-12-11 22:08:08 +01:00
AlberLC
16b38ae257 Update requirements.txt 2022-12-08 01:29:36 +01:00
AlberLC
98f5376670 Update setup.cfg 2022-12-07 23:16:06 +01:00
AlberLC
bd38c094a4 Update requirements.txt 2022-12-07 23:16:00 +01:00
AlberLC
5e502a7c15 Update requirements.txt 2022-12-07 23:09:00 +01:00
AlberLC
203c198a84 Update FlanaBot._on_delete 2022-12-06 03:25:13 +01:00
AlberLC
e2427fa3c8 Improve legibility 2022-12-04 09:49:06 +01:00
AlberLC
73bedd9b52 Update FlanaDiscBot.is_punished 2022-12-03 06:54:52 +01:00
AlberLC
597bab672e Update FlanaDiscBot.is_punished 2022-12-03 06:51:09 +01:00
AlberLC
9c45647793 Update multibot_constants 2022-12-03 06:44:58 +01:00
AlberLC
8c65182c39 Update flanautils function names 2022-12-03 06:44:22 +01:00
AlberLC
0275cb3f84 Update flanautils function names 2022-12-03 06:42:45 +01:00
AlberLC
430a141dd7 Update PollBot delete_message order 2022-12-03 06:26:26 +01:00
AlberLC
96d117a9fe Fix PollBot._on_poll_button_press 2022-12-03 06:25:31 +01:00
AlberLC
9925084ef4 Update PollBot._get_poll_message 2022-12-03 06:25:07 +01:00
AlberLC
4ae42f386d Update PollBot._get_poll_message 2022-12-03 06:24:27 +01:00
AlberLC
b8483b4d54 Update PollBot._get_options 2022-12-03 06:23:29 +01:00
AlberLC
1751bf37e1 Update FlanaBot.check_old_database_actions 2022-12-03 06:22:50 +01:00
AlberLC
db8e020977 Update flanautils function names 2022-12-03 06:21:39 +01:00
AlberLC
90c15950fb Update _on_recover_message registration 2022-12-03 06:20:31 +01:00
AlberLC
90c2e16d5d Update buttons data system
Move data persistence from database to primary memory.
2022-12-03 06:19:30 +01:00
AlberLC
d370909e68 Update _on_users 2022-11-30 01:48:08 +01:00
AlberLC
113dee6bd8 Use delete_many_raw 2022-11-30 01:11:03 +01:00
AlberLC
83bb9b1168 Improve legibility 2022-11-30 00:55:58 +01:00
AlberLC
5fb995201a Add loser to connect 4 2022-11-29 06:31:57 +01:00
AlberLC
7e127f55a7 Add _on_database_messages_simple 2022-11-29 05:30:27 +01:00
AlberLC
09b2c7db7e Add _on_database_messages_simple 2022-11-29 05:24:34 +01:00
AlberLC
300132fb84 Add _on_database_messages_simple 2022-11-29 05:22:32 +01:00
AlberLC
032fcf7801 Fix use BaseException 2022-11-28 00:23:51 +01:00
AlberLC
9916f2b2c9 Delete Connect4Bot user messages and buttons when finished 2022-11-26 22:07:09 +01:00
AlberLC
7f4ca8c7a5 Update config names 2022-11-26 21:29:18 +01:00
AlberLC
7f5ba7cd09 Update config names 2022-11-26 21:26:36 +01:00
AlberLC
025460828b Update config names 2022-11-26 21:23:58 +01:00
AlberLC
32b582c5d9 Update constants.KEYWORDS 2022-11-26 21:23:39 +01:00
AlberLC
008de3b43e Update constants.KEYWORDS 2022-11-26 21:22:57 +01:00
AlberLC
dc29922e7c Improve legibility 2022-11-26 20:21:56 +01:00
AlberLC
79e93c01fd Add reply to send_medias 2022-11-26 04:25:01 +01:00
AlberLC
bf3bf23285 Rename whitelisted decorator 2022-11-26 02:17:23 +01:00
AlberLC
8f395d3406 Fix connect 4 edit if not found 2022-11-26 02:03:50 +01:00
AlberLC
fce458d5ed Improve legibility 2022-11-26 01:48:21 +01:00
AlberLC
60351ab574 Improve legibility 2022-11-26 01:46:12 +01:00
AlberLC
e883909769 Move config logic to FlanaBot 2022-11-26 01:39:37 +01:00
AlberLC
04eb2ff491 Fix _on_users 2022-11-25 07:09:19 +01:00
AlberLC
7a04a699d7 Fix ScraperBot.send_song_info 2022-11-25 04:39:37 +01:00
AlberLC
3f8b58437a Rename Message.contents to data 2022-11-25 03:45:48 +01:00
AlberLC
9e94d8c770 Add Connect4Bot vs itself 2022-11-23 05:28:35 +01:00
AlberLC
f0babe7c85 Improve Connect4Bot ai 2022-11-22 00:41:08 +01:00
AlberLC
c3ab1be08e Fix Connect4Bot ai patterns 2022-11-21 22:16:31 +01:00
AlberLC
28637ad684 Update constants 2022-11-18 19:54:11 +01:00
AlberLC
9e8ec86e96 Fix connect 4 ai 2022-11-17 05:10:52 +01:00
AlberLC
9ca7ea035a Improve connect 4 ai 2022-11-17 02:26:37 +01:00
AlberLC
3bf4f16aa4 Improve connect 4 ai 2022-11-17 01:49:40 +01:00
AlberLC
86d15d6b9a Improve connect 4 frontend 2022-11-16 06:12:04 +01:00
AlberLC
95d9272e39 Fix Connect4Bot last decision 2022-11-16 05:54:41 +01:00
AlberLC
47874735ec Improve connect 4 frontend 2022-11-16 02:59:03 +01:00
AlberLC
781cb8cefa Improve connect 4 frontend 2022-11-16 02:58:39 +01:00
AlberLC
6f09869a92 Make filter_mention_ids and distribute_buttons public 2022-11-14 04:44:03 +01:00
AlberLC
9b1e9f6f2f Fix Connect4Bot ai 2022-11-13 01:36:59 +01:00
AlberLC
9d21855fa1 Update Connect4Bot type hints 2022-11-13 01:36:32 +01:00
AlberLC
86d84e8fe4 Improve legibility 2022-11-12 05:10:19 +01:00
AlberLC
77e21bb068 Improve legibility 2022-11-12 05:01:53 +01:00
AlberLC
19202391ac Add PollBot._on_delete_all_votes 2022-11-10 22:02:07 +01:00
AlberLC
1358018cb0 Add PollBot._on_delete_all_votes 2022-11-10 20:00:46 +01:00
AlberLC
97efbec6a4 Add PollBot._on_delete_all_votes 2022-11-10 19:55:20 +01:00
AlberLC
af6cab6166 Add Connect4Bot 2022-11-09 04:54:43 +01:00
AlberLC
1900ba3167 Add Connect4Bot 2022-11-09 04:29:15 +01:00
AlberLC
1adfd8febf Move _distribute_buttons to MultiBot 2022-11-09 00:40:13 +01:00
AlberLC
5507236a73 Split FlanaBot into multiple bots 2022-11-08 20:33:01 +01:00
AlberLC
569bed91ad Split FlanaBot into multiple bots 2022-11-08 04:17:27 +01:00
AlberLC
7f1e596243 Fix typing 2022-11-08 03:50:09 +01:00
AlberLC
af9d284fa1 Split FlanaBot into multiple bots 2022-11-08 03:20:31 +01:00
AlberLC
36a3592a84 Add minor changes 2022-11-07 18:41:41 +01:00
AlberLC
910074d3aa Rename RatioMatch to ScoreMatch 2022-11-07 18:41:20 +01:00
AlberLC
bb30c10867 Fix calculate time of mention id 2022-11-06 05:00:05 +01:00
AlberLC
f62d224e27 Fix calculate time of mention id 2022-11-05 08:48:48 +01:00
AlberLC
4cc610d579 Update flood checking 2022-11-03 21:09:49 +01:00
AlberLC
a967f7d594 Move logic to FlanaBot 2022-11-02 03:45:23 +01:00
AlberLC
6e3f014e24 Fix CHANGEABLE_ROLES defaultdict(list, ... 2022-10-31 19:18:08 +01:00
AlberLC
a88ca55b04 Update Chat.DEFAULT_CONFIG 2022-10-29 21:13:05 +02:00
AlberLC
496b4e0898 Refactor new models import 2022-10-29 06:15:32 +02:00
AlberLC
16a009fe9a Add bots.__init__ 2022-10-29 04:29:52 +02:00
AlberLC
113c37ee2c Refactor new models import 2022-10-29 04:23:31 +02:00
AlberLC
bdbc4d5e0d Add delete votes and voting ban 2022-10-29 03:16:54 +02:00
AlberLC
967439846f Refactor new models import 2022-10-29 03:15:43 +02:00
AlberLC
24ec450de6 Refactor new models import 2022-10-28 21:58:22 +02:00
AlberLC
d8374b7d73 Move _on_config to MultiBot 2022-10-28 21:58:03 +02:00
AlberLC
ef5b156873 Refactor penalty system 2022-10-28 08:10:39 +02:00
AlberLC
7f1f275f08 Update _on_roles 2022-10-25 09:46:04 +02:00
AlberLC
da41602456 Update config system 2022-10-25 09:44:22 +02:00
AlberLC
ca6373d490 Update database management 2022-10-25 09:42:57 +02:00
AlberLC
082ae6733b Delete useless code 2022-10-25 09:42:03 +02:00
AlberLC
18ef15d031 Update todo annotations 2022-10-25 06:42:08 +02:00
AlberLC
81c9868460 Fix _on_choose 2022-10-22 21:09:07 +02:00
AlberLC
81c8c5b6d9 Update changeable roles 2022-10-21 21:54:29 +02:00
AlberLC
7006041076 Fix multiple answer polls 2022-10-20 20:06:52 +02:00
AlberLC
3347b52bab Fix super call to _on_inline_query_raw 2022-10-17 20:20:20 +02:00
AlberLC
e291711ed4 Update config message 2022-10-13 22:34:37 +02:00
AlberLC
0819b4f183 Update config message 2022-10-13 22:32:43 +02:00
AlberLC
efaf90e217 Add multiple answer polls 2022-10-13 22:15:49 +02:00
AlberLC
b4d7541851 Fix discord limit button text 2022-10-10 02:39:06 +02:00
AlberLC
c05e52eda2 Add priority to choose 2022-10-05 20:08:00 +02:00
AlberLC
b426286a23 Add priority to polls 2022-10-05 20:02:57 +02:00
AlberLC
45be09354c Update BYE_PHRASES 2022-10-05 07:43:51 +02:00
AlberLC
1623aac3c8 Minor change 2022-09-29 19:35:16 +02:00
AlberLC
bb9cf53673 Update timeout_for_media 2022-09-22 22:22:24 +02:00
AlberLC
ff887b2cd7 Update timeout_for_media 2022-09-21 19:03:01 +02:00
AlberLC
68c0558f65 Add timeout_for_media 2022-09-21 05:10:44 +02:00
AlberLC
25a1a1b601 Fix typo 2022-09-08 20:29:18 +02:00
AlberLC
d9f4a39c98 Update punishments system 2022-09-08 19:55:47 +02:00
AlberLC
30d9393de4 Update Update fire algorithm 2022-09-08 19:55:22 +02:00
AlberLC
1a410144f4 Improve _on_config_button_press button states 2022-08-26 23:34:46 +02:00
AlberLC
4a70a77e94 Improve _on_scraping_audio keywords 2022-08-26 19:46:51 +02:00
AlberLC
3ab840dd2f Use pytz timezones in _on_audit_log 2022-08-26 05:31:13 +02:00
AlberLC
a5996fa2ca Fix _create_user_from_discord_user async 2022-08-25 21:11:11 +02:00
AlberLC
50feac35ce Update _on_roles 2022-08-25 20:20:16 +02:00
AlberLC
40ddd71bee Update _on_roles 2022-08-25 06:58:05 +02:00
AlberLC
8c49b24024 Update _on_roles 2022-08-25 06:56:15 +02:00
AlberLC
499c3162ae Update _on_roles 2022-08-25 06:30:25 +02:00
AlberLC
a86251e902 Update _on_audit_log error message 2022-08-25 06:08:09 +02:00
AlberLC
cf0837536d Add audit log 2022-08-25 05:15:33 +02:00
AlberLC
6dc8da51f5 Fix fire algorithm 2022-08-19 13:45:26 +02:00
AlberLC
44e751870a Fix fire algorithm 2022-08-17 13:46:30 +02:00
AlberLC
70b5103a41 Update fire algorithm 2022-08-16 21:04:53 +02:00
AlberLC
66d555e20f Update fire algorithm 2022-08-16 21:02:11 +02:00
AlberLC
57f94aaa4e Fix scraping with reply 2022-08-16 20:55:58 +02:00
AlberLC
613a16327c Fix channel heating 2022-08-14 22:50:57 +02:00
AlberLC
65970b78e5 Fix channel heating 2022-08-14 22:31:34 +02:00
AlberLC
700123a962 Fix channel heating 2022-08-14 20:22:44 +02:00
AlberLC
f9216e1746 Update medias with extension argument 2022-08-12 17:26:38 +02:00
AlberLC
19d22c6d74 Update requirements.txt 2022-08-12 17:08:16 +02:00
AlberLC
f0e0606479 Add support for scraping audio_only 2022-08-12 16:47:44 +02:00
AlberLC
1fbe76b733 Update scraping structure 2022-08-12 16:47:17 +02:00
AlberLC
a68b779e1a Update fire algorithm 2022-08-12 16:46:25 +02:00
AlberLC
ebdfe14667 Update requirements.txt 2022-08-10 18:50:12 +02:00
AlberLC
8f7aa3698c Fix channel fire 2022-08-10 04:19:54 +02:00
AlberLC
237a7e93b6 Fix channel fire 2022-08-09 18:49:26 +02:00
AlberLC
a38932fe81 Add channel fire 2022-08-09 17:41:33 +02:00
AlberLC
eee74684dc Improve legibility 2022-08-08 10:29:49 +02:00
AlberLC
33764e9642 Add timeout_for_media to _search_medias 2022-08-07 22:29:12 +02:00
AlberLC
9fba7fd13c Update spaces between text and emojis 2022-08-07 22:14:51 +02:00
AlberLC
1c1709bea3 Fix asyncio.gather return_exceptions 2022-08-07 20:02:44 +02:00
AlberLC
b64e279eaf Fix tiktok.find_tiktok_ids async asynchronous call 2022-08-06 04:09:45 +02:00
AlberLC
f1c0e1179e Upgrade scrapers structures 2022-08-06 03:57:21 +02:00
AlberLC
1eb019d5ff Upgrade bot_state_message 2022-08-05 16:08:37 +02:00
AlberLC
7d5bff9b89 Update README.rst 2022-08-05 06:09:05 +02:00
AlberLC
5db2cc0ae2 Update README.rst 2022-08-05 05:53:00 +02:00
AlberLC
8a69110fee Update README.rst 2022-08-05 05:51:29 +02:00
AlberLC
3be66803f7 Update Dockerfile 2022-08-04 23:04:09 +02:00
AlberLC
b816b2f26d Add YouTube scraper 2022-08-04 11:21:07 +02:00
AlberLC
fabd8ba375 Fix _on_roles bot-mentioned and in groups 2022-07-29 20:02:33 +02:00
AlberLC
9fe9fe5757 Add changeable roles 2022-07-29 07:15:58 +02:00
AlberLC
b58d7c2e56 Fix parse_callback test 2022-07-29 07:15:16 +02:00
AlberLC
a2db01fd46 Fix polls capitalize 2022-07-19 23:44:02 +02:00
AlberLC
e005bb3670 Add delete message at config show 2022-07-18 21:55:59 +02:00
AlberLC
e13796273d Improve a minor legibility error 2022-07-17 09:55:11 +02:00
AlberLC
866599261f Update HEAT_NAMES 2022-07-17 09:29:50 +02:00
AlberLC
5892cc1579 Fix get_group_roles call 2022-07-17 09:26:56 +02:00
AlberLC
5a923b01c7 Add remembered user roles at rejoin 2022-07-16 06:29:27 +02:00
AlberLC
4ecf847686 Fix bot used to delete some messages 2022-07-14 20:08:50 +02:00
AlberLC
db9ccbf81a Use str.capitalize 2022-07-07 07:39:42 +02:00
AlberLC
b26724b0a5 Fix regex pattern warning 2022-07-07 07:16:36 +02:00
AlberLC
bb93ffbf0d Add _get_options 2022-07-07 07:13:45 +02:00
AlberLC
1cc7464059 Update polls 2022-07-07 03:59:29 +02:00
AlberLC
86fcb0e0b6 Fix typo 2022-07-06 05:48:42 +02:00
AlberLC
ebeecbff5d Fix _check_message_flood
Check messages in its chat
2022-06-29 22:17:58 +02:00
AlberLC
d137f220cc Fix message find by date 2022-06-29 06:48:06 +02:00
AlberLC
00323f99fb Update HEAT_NAMES 2022-06-27 05:08:08 +02:00
AlberLC
6237ccf8b6 Fix _on_choose 2022-06-27 04:59:14 +02:00
AlberLC
f402293c99 Upgrade _on_choose 2022-06-27 04:22:47 +02:00
AlberLC
99effad710 Fix constants accents 2022-06-23 04:51:49 +02:00
AlberLC
54cada0649 Fix _on_choose 2022-06-23 04:31:45 +02:00
AlberLC
0f5d3c333a Fix commands at private 2022-06-23 04:13:36 +02:00
AlberLC
5fb55404cf Fix commands at private 2022-06-23 03:10:49 +02:00
AlberLC
2768d8e949 Fix commands at private 2022-06-23 02:17:59 +02:00
AlberLC
b0ca5a2ded Add ports to db local access 2022-06-23 02:17:33 +02:00
AlberLC
697ba9e89e Add polls 2022-06-22 09:18:42 +02:00
AlberLC
09df75ea0b Update buttons logic (add ButtonsInfo) and add choose and dice functionalities 2022-06-19 13:34:12 +02:00
AlberLC
c2e7e68619 Fix _check_message_flood 2022-06-15 00:46:31 +02:00
AlberLC
f12d0c18c1 Fix insults 2022-06-14 04:41:26 +02:00
AlberLC
b55f933a32 Add new insults 2022-06-14 04:01:16 +02:00
AlberLC
5fa102b157 Fix _on_scraping with replied message 2022-06-12 00:13:33 +02:00
AlberLC
1e8a33c0c4 Improve button registration 2022-06-08 22:05:02 +02:00
AlberLC
daef039ce5 Change _manage_exceptions context arg 2022-06-06 04:22:26 +02:00
AlberLC
7ad4c4052e Remove twitch bot at main 2022-06-06 02:31:16 +02:00
AlberLC
641dc72738 Update requirements.txt 2022-06-04 03:33:59 +02:00
AlberLC
cafa7ce1c5 Fix recover original message 2022-06-04 02:58:32 +02:00
AlberLC
c9a46b2b07 Fix roles mentions in weather 2022-06-01 03:16:45 +02:00
AlberLC
92ee0e405d Fix env variable names 2022-05-31 23:08:36 +02:00
AlberLC
dc6bf7accb Refactor punishments and optimize chat objects 2022-05-31 22:11:16 +02:00
AlberLC
9e5f7a81ff Add user who pressed the button 2022-05-26 05:45:16 +02:00
AlberLC
28ff804d5a Improve discord user name printing 2022-05-25 05:30:09 +02:00
AlberLC
d47fdfb57e Fix bot insults ratio 2022-05-25 04:00:47 +02:00
AlberLC
20fb6e3223 Fix bot insults ratio 2022-05-25 02:55:28 +02:00
AlberLC
952672946b Add Discord bot 2022-05-25 00:46:32 +02:00
AlberLC
7c43db0867 Fix warning at implicit long weather request 2022-03-31 20:06:06 +02:00
AlberLC
f6093ec01e Fix resource file resolution 2022-03-17 03:50:33 +01:00
AlberLC
99d84a5d35 Fix query place with coordinates 2022-02-21 00:58:56 +01:00
29 changed files with 2518 additions and 1196 deletions

View File

@@ -7,4 +7,4 @@ COPY venv/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages
ENV PYTHONPATH=/application
CMD python3.10 flanabot/main.py
CMD python3.10 -u flanabot/main.py

View File

@@ -30,7 +30,7 @@ Features
- Mute users temporarily or forever.
- Ban users temporarily or forever.
- Configurable default behavior for each chat, just talk to him to configure it.
- Get media from twitter, instagram and tiktok and send it to the chat. From tiktok also obtains data about the song that is playing in the video.
- Get media from Instagram, Reddit, TikTok, Twitter, YouTube and others and send it to the chat. From TikTok also obtains data about the song that is playing in the video.
.. |license| image:: https://img.shields.io/github/license/AlberLC/flanabot?style=flat

View File

@@ -7,6 +7,8 @@ services:
mongodb:
image: mongo
ports:
- "27017:27017"
environment:
- MONGO_INITDB_ROOT_USERNAME=${MONGO_USER}
- MONGO_INITDB_ROOT_PASSWORD=${MONGO_PASSWORD}

View File

@@ -0,0 +1,7 @@
from flanabot.bots.flana_bot import *
from flanabot.bots.flana_disc_bot import *
from flanabot.bots.flana_tele_bot import *
from flanabot.bots.penalty_bot import *
from flanabot.bots.poll_bot import *
from flanabot.bots.scraper_bot import *
from flanabot.bots.weather_bot import *

View File

@@ -0,0 +1,587 @@
__all__ = ['Connect4Bot']
import asyncio
import copy
import random
from abc import ABC
from collections import defaultdict
from typing import Iterable
from flanautils import Media, MediaType, Source
from multibot import MultiBot
import connect_4_frontend
from flanabot import constants
from flanabot.models import ButtonsGroup, Message, Player
# ----------------------------------------------------------------------------------------------------- #
# --------------------------------------------- CONNECT_4_BOT --------------------------------------------- #
# ----------------------------------------------------------------------------------------------------- #
class Connect4Bot(MultiBot, ABC):
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_connect_4, constants.KEYWORDS['connect_4'])
self.register(self._on_connect_4_vs_itself, (*constants.KEYWORDS['connect_4'], *constants.KEYWORDS['self']))
self.register_button(self._on_connect_4_button_press, ButtonsGroup.CONNECT_4)
def _ai_insert(
self,
current_player_num: int,
next_player_num: int,
board: list[list[int | None]]
) -> tuple[int, int]:
available_positions_ = self._available_positions(board)
# check if current player can win
for i, j in available_positions_:
if current_player_num in self._check_winners(i, j, board):
return self.insert_piece(j, current_player_num, board)
# check if next player can win
for i, j in available_positions_:
if next_player_num in self._check_winners(i, j, board):
return self.insert_piece(j, current_player_num, board)
# future possibility (above the move)
next_player_winning_positions_above = []
current_player_winning_positions_above = []
for i, j in available_positions_:
if i < 1:
continue
board_copy = copy.deepcopy(board)
board_copy[i][j] = current_player_num
winners = self._check_winners(i - 1, j, board_copy)
if next_player_num in winners:
next_player_winning_positions_above.append((i, j))
elif current_player_num in winners:
current_player_winning_positions_above.append((i, j))
# check if after the current player moves, it will have 2 positions to win
for i, j in available_positions_:
if (i, j) in next_player_winning_positions_above:
continue
board_copy = copy.deepcopy(board)
board_copy[i][j] = current_player_num
if len(self._winning_positions(board_copy)[current_player_num]) >= 2:
return self.insert_piece(j, current_player_num, board)
# check if after the next player moves, he will have 2 positions to win
for i, j in available_positions_:
if (i, j) in current_player_winning_positions_above:
continue
board_copy = copy.deepcopy(board)
board_copy[i][j] = next_player_num
future_winning_positions = self._winning_positions(board_copy)[next_player_num]
if len(future_winning_positions) < 2:
continue
if (i, j) not in next_player_winning_positions_above:
return self.insert_piece(j, current_player_num, board)
for i_2, j_2 in future_winning_positions:
if (i_2, j_2) in available_positions_ and (i_2, j_2) not in next_player_winning_positions_above:
return self.insert_piece(j_2, current_player_num, board)
good_positions = [pos for pos in available_positions_ if pos not in next_player_winning_positions_above and pos not in current_player_winning_positions_above]
if good_positions:
j = random.choice(self._best_moves(good_positions, current_player_num, board))[1]
elif current_player_winning_positions_above:
j = random.choice(self._best_moves(current_player_winning_positions_above, current_player_num, board))[1]
else:
j = random.choice(self._best_moves(next_player_winning_positions_above, current_player_num, board))[1]
return self.insert_piece(j, current_player_num, board)
async def _ai_turn(
self,
player_1: Player,
player_2: Player,
current_player: Player,
next_player: Player,
next_turn: int,
delay: float,
board: list[list[int | None]],
message: Message
) -> bool:
await asyncio.sleep(delay)
i, j = self._ai_insert(current_player.number, next_player.number, board)
if await self._check_game_finished(i, j, player_1, player_2, next_turn, board, message):
return True
return not await self.edit(
Media(
connect_4_frontend.make_image(board, next_player, highlight=(i, j)),
MediaType.IMAGE,
'png',
Source.LOCAL
),
message
)
@staticmethod
def _available_positions(board: list[list[int | None]]) -> list[tuple[int, int]]:
available_positions = []
for j in range(constants.CONNECT_4_N_COLUMNS):
for i in range(constants.CONNECT_4_N_ROWS - 1, -1, -1):
if board[i][j] is None:
available_positions.append((i, j))
break
return available_positions
# noinspection DuplicatedCode
@staticmethod
def _best_moves(
possible_positions: Iterable[tuple[int, int]],
player_num: int,
board: list[list[int | None]]
) -> list[tuple[int, int]]:
best_moves = []
max_points = float('-inf')
for i, j in possible_positions:
if 3 <= j <= constants.CONNECT_4_N_COLUMNS - 4:
points = constants.CONNECT_4_CENTER_COLUMN_POINTS
else:
points = 0
# left
for j_left in range(j - 1, j - 4, -1):
if j_left < 0:
points -= 1
break
if board[i][j_left] is not None:
if board[i][j_left] == player_num:
points += 1
else:
points -= 1
break
# right
for j_right in range(j + 1, j + 4):
if j_right >= constants.CONNECT_4_N_COLUMNS:
points -= 1
break
if board[i][j_right] is not None:
if board[i][j_right] == player_num:
points += 1
else:
points -= 1
break
# up
for i_up in range(i - 1, i - 4, -1):
if i_up < 0:
points -= 1
break
if board[i_up][j] is not None:
if board[i_up][j] == player_num:
points += 1
else:
points -= 1
break
# down
for i_down in range(i + 1, i + 4):
if i_down >= constants.CONNECT_4_N_ROWS:
points -= 1
break
if board[i_down][j] is not None:
if board[i_down][j] == player_num:
points += 1
else:
points -= 1
break
# up left
for n in range(1, 4):
i_up = i - n
j_left = j - n
if i_up < 0 or j_left < 0:
points -= 1
break
if board[i_up][j_left] is not None:
if board[i_up][j_left] == player_num:
points += 1
else:
points -= 1
break
# up right
for n in range(1, 4):
i_up = i - n
j_right = j + n
if i_up < 0 or j_right >= constants.CONNECT_4_N_COLUMNS:
points -= 1
break
if board[i_up][j_right] is not None:
if board[i_up][j_right] == player_num:
points += 1
else:
points -= 1
break
# down left
for n in range(1, 4):
i_down = i + n
j_left = j - n
if i_down >= constants.CONNECT_4_N_ROWS or j_left < 0:
points -= 1
break
if board[i_down][j_left] is not None:
if board[i_down][j_left] == player_num:
points += 1
else:
points -= 1
break
# down right
for n in range(1, 4):
i_down = i + n
j_right = j + n
if i_down >= constants.CONNECT_4_N_ROWS or j_right >= constants.CONNECT_4_N_COLUMNS:
points -= 1
break
if board[i_down][j_right] is not None:
if board[i_down][j_right] == player_num:
points += 1
else:
points -= 1
break
if points > max_points:
best_moves = [(i, j)]
max_points = points
elif points == max_points:
best_moves.append((i, j))
return best_moves
async def _check_game_finished(
self,
i: int,
j: int,
player_1: Player,
player_2: Player,
turn: int,
board: list[list[int | None]],
message: Message
) -> bool:
if board[i][j] in self._check_winners(i, j, board):
winner, loser = (player_1, player_2) if board[i][j] == player_1.number else (player_2, player_1)
edit_kwargs = {'winner': winner, 'loser': loser, 'win_position': (i, j)}
elif turn >= constants.CONNECT_4_N_ROWS * constants.CONNECT_4_N_COLUMNS:
edit_kwargs = {'tie': True}
else:
return False
try:
message.data['is_active'] = False
except AttributeError:
pass
await self.edit(
Media(
connect_4_frontend.make_image(board, highlight=(i, j), **edit_kwargs),
MediaType.IMAGE,
'png',
Source.LOCAL
),
message,
buttons=[]
)
return True
@staticmethod
def _check_winner_left(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < j and board[i][j - 3] == board[i][j - 2] == board[i][j - 1]
or
1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i][j - 2] == board[i][j - 1] == board[i][j + 1]
)
and
board[i][j - 1] is not None
):
return board[i][j - 1]
@staticmethod
def _check_winner_right(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
j < constants.CONNECT_4_N_COLUMNS - 3 and board[i][j + 1] == board[i][j + 2] == board[i][j + 3]
or
0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i][j - 1] == board[i][j + 1] == board[i][j + 2]
)
and
board[i][j + 1] is not None
):
return board[i][j + 1]
@staticmethod
def _check_winner_up(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < i and board[i - 3][j] == board[i - 2][j] == board[i - 1][j]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and board[i - 2][j] == board[i - 1][j] == board[i + 1][j]
)
and
board[i - 1][j] is not None
):
return board[i - 1][j]
@staticmethod
def _check_winner_down(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
i < constants.CONNECT_4_N_ROWS - 3 and board[i + 1][j] == board[i + 2][j] == board[i + 3][j]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and board[i - 1][j] == board[i + 1][j] == board[i + 2][j]
)
and
board[i + 1][j] is not None
):
return board[i + 1][j]
@staticmethod
def _check_winner_up_left(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < i and 2 < j and board[i - 3][j - 3] == board[i - 2][j - 2] == board[i - 1][j - 1]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and 1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i - 2][j - 2] == board[i - 1][j - 1] == board[i + 1][j + 1]
)
and
board[i - 1][j - 1] is not None
):
return board[i - 1][j - 1]
@staticmethod
def _check_winner_up_right(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
2 < i and j < constants.CONNECT_4_N_COLUMNS - 3 and board[i - 1][j + 1] == board[i - 2][j + 2] == board[i - 3][j + 3]
or
1 < i < constants.CONNECT_4_N_ROWS - 1 and 0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i + 1][j - 1] == board[i - 1][j + 1] == board[i - 2][j + 2]
)
and
board[i - 1][j + 1] is not None
):
return board[i - 1][j + 1]
@staticmethod
def _check_winner_down_left(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
i < constants.CONNECT_4_N_ROWS - 3 and 2 < j and board[i + 3][j - 3] == board[i + 2][j - 2] == board[i + 1][j - 1]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and 1 < j < constants.CONNECT_4_N_COLUMNS - 1 and board[i + 2][j - 2] == board[i + 1][j - 1] == board[i - 1][j + 1]
)
and
board[i + 1][j - 1] is not None
):
return board[i + 1][j - 1]
@staticmethod
def _check_winner_down_right(i: int, j: int, board: list[list[int | None]]) -> int | None:
if (
(
i < constants.CONNECT_4_N_ROWS - 3 and j < constants.CONNECT_4_N_COLUMNS - 3 and board[i + 1][j + 1] == board[i + 2][j + 2] == board[i + 3][j + 3]
or
0 < i < constants.CONNECT_4_N_ROWS - 2 and 0 < j < constants.CONNECT_4_N_COLUMNS - 2 and board[i - 1][j - 1] == board[i + 1][j + 1] == board[i + 2][j + 2]
)
and
board[i + 1][j + 1] is not None
):
return board[i + 1][j + 1]
def _check_winners(self, i: int, j: int, board: list[list[int | None]]) -> set[int]:
winners = set()
if winner := self._check_winner_left(i, j, board):
winners.add(winner)
if winner := self._check_winner_up(i, j, board):
winners.add(winner)
if len(winners) == 2:
return winners
if winner := self._check_winner_right(i, j, board):
winners.add(winner)
if len(winners) == 2:
return winners
if winner := self._check_winner_down(i, j, board):
winners.add(winner)
if len(winners) == 2:
return winners
if winner := self._check_winner_up_left(i, j, board):
winners.add(winner)
if len(winners) == 2:
return winners
if winner := self._check_winner_up_right(i, j, board):
winners.add(winner)
if len(winners) == 2:
return winners
if winner := self._check_winner_down_right(i, j, board):
winners.add(winner)
if len(winners) == 2:
return winners
if winner := self._check_winner_down_left(i, j, board):
winners.add(winner)
return winners
def _winning_positions(self, board: list[list[int | None]]) -> defaultdict[int, list[tuple[int, int]]]:
winning_positions = defaultdict(list)
for next_i, next_j in self._available_positions(board):
for player_number in self._check_winners(next_i, next_j, board):
winning_positions[player_number].append((next_i, next_j))
return winning_positions
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
async def _on_connect_4(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
board = [[None for _ in range(constants.CONNECT_4_N_COLUMNS)] for _ in range(constants.CONNECT_4_N_ROWS)]
player_1 = Player(message.author.id, message.author.name.split('#')[0], 1)
try:
user_2 = next(user for user in message.mentions if user.id != self.id)
except StopIteration:
player_2 = Player(self.id, self.name.split('#')[0], 2)
else:
player_2 = Player(user_2.id, user_2.name.split('#')[0], 2)
await self.send(
media=Media(connect_4_frontend.make_image(board, player_1), MediaType.IMAGE, 'png', Source.LOCAL),
message=message,
buttons=self.distribute_buttons([str(n) for n in range(1, constants.CONNECT_4_N_COLUMNS + 1)]),
buttons_key=ButtonsGroup.CONNECT_4,
data={
'connect_4': {
'is_active': True,
'board': board,
'player_1': player_1.to_dict(),
'player_2': player_2.to_dict(),
'turn': 0
}
}
)
await self.delete_message(message)
async def _on_connect_4_button_press(self, message: Message):
await self.accept_button_event(message)
connect_4_data = message.data['connect_4']
is_active = connect_4_data['is_active']
board = connect_4_data['board']
player_1 = Player.from_dict(connect_4_data['player_1'])
player_2 = Player.from_dict(connect_4_data['player_2'])
if connect_4_data['turn'] % 2 == 0:
current_player = player_1
next_player = player_2
else:
current_player = player_2
next_player = player_1
presser_id = message.buttons_info.presser_user.id
move_column = int(message.buttons_info.pressed_text) - 1
if not is_active or current_player.id != presser_id or board[0][move_column] is not None:
return
connect_4_data['is_active'] = False
i, j = self.insert_piece(move_column, current_player.number, board)
connect_4_data['turn'] += 1
if await self._check_game_finished(i, j, player_1, player_2, connect_4_data['turn'], board, message):
return
await self.edit(
Media(
connect_4_frontend.make_image(board, next_player, highlight=(i, j)),
MediaType.IMAGE,
'png',
Source.LOCAL
),
message
)
if player_2.id == self.id:
connect_4_data['turn'] += 1
if await self._ai_turn(
player_1,
player_2,
next_player,
current_player,
connect_4_data['turn'],
constants.CONNECT_4_AI_DELAY_SECONDS,
board,
message
):
return
connect_4_data['is_active'] = True
async def _on_connect_4_vs_itself(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
board = [[None for _ in range(constants.CONNECT_4_N_COLUMNS)] for _ in range(constants.CONNECT_4_N_ROWS)]
player_1 = Player(self.id, self.name.split('#')[0], 1)
player_2 = Player(self.id, self.name.split('#')[0], 2)
current_player = player_1
next_player = player_2
turn = 0
bot_message = await self.send(
media=Media(connect_4_frontend.make_image(board, current_player), MediaType.IMAGE, 'png', Source.LOCAL),
message=message
)
await self.delete_message(message)
while True:
turn += 1
if await self._ai_turn(
player_1,
player_2,
current_player,
next_player,
turn,
constants.CONNECT_4_AI_DELAY_SECONDS / 2,
board,
bot_message
):
break
current_player, next_player = next_player, current_player
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #
@staticmethod
def insert_piece(j: int, player_number: int, board: list[list[int | None]]) -> tuple[int, int] | None:
for i in range(constants.CONNECT_4_N_ROWS - 1, -1, -1):
if board[i][j] is None:
board[i][j] = player_number
return i, j

File diff suppressed because it is too large Load Diff

View File

@@ -1,37 +1,20 @@
__all__ = ['FlanaDiscBot']
import asyncio
import datetime
import math
import os
import random
import discord
from multibot import DiscordBot
import flanautils
import pytz
from flanautils import Media, NotFoundError, OrderedSet
from multibot import BadRoleError, DiscordBot, Role, User, bot_mentioned, constants as multibot_constants, group
from flanabot import constants
from flanabot.bots.flana_bot import FlanaBot
from flanabot.exceptions import BadRoleError, UserDisconnectedError
from flanabot.models import User
HEAT_NAMES = [
'Canal Congelado',
'Canal Fresquito',
'Canal Templaillo',
'Canal Calentito',
'Canal Caloret',
'Canal Caliente',
'Canal Olor a Vasco',
'Verano Cordobés al Sol',
'Canal Ardiendo',
'abrid las putas ventanas y traed el extintor',
'Canal INFIERNO',
'La Palma 🌋'
]
HOT_CHANNEL_ID = 493530483045564417
ROLES = {
'Administrador': 387344390030360587,
'Carroñero': 493523298429435905,
'al lol': 881238165476741161,
'Persona': 866046517998387220,
'Castigado': 877662459568209921,
'Bot': 493784221085597706
}
from flanabot.models import Chat, Message, Punishment
# ---------------------------------------------------------------------------------------------------- #
@@ -41,69 +24,151 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
def __init__(self):
super().__init__(os.environ['DISCORD_BOT_TOKEN'])
self.heating = False
self.heat_level = 0
self.heat_level = 0.0
# ----------------------------------------------------------- #
# -------------------- PROTECTED METHODS -------------------- #
# ----------------------------------------------------------- #
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.bot_client.add_listener(self._on_voice_state_update, 'on_voice_state_update')
self.client.add_listener(self._on_member_join, 'on_member_join')
self.client.add_listener(self._on_member_remove, 'on_member_remove')
self.client.add_listener(self._on_voice_state_update, 'on_voice_state_update')
self.register(self._on_audit_log, multibot_constants.KEYWORDS['audit'])
async def _changeable_roles(self, group_: int | str | Chat | Message) -> list[Role]:
group_id = self.get_group_id(group_)
return [role for role in await self.get_group_roles(group_) if role.id in constants.CHANGEABLE_ROLES[group_id]]
async def _heat_channel(self, channel: discord.VoiceChannel):
async def set_fire_to(channel_key: str, depends_on: str, firewall=0):
fire_score = random.randint(0, channels[depends_on]['n_fires'] - channels[channel_key]['n_fires']) - firewall // 2
if fire_score < 1:
if not channels[channel_key]['n_fires']:
return
channels[channel_key]['n_fires'] -= 1
elif fire_score == 1:
return
else:
channels[channel_key]['n_fires'] += 1
if channels[channel_key]['n_fires']:
new_name_ = '🔥' * channels[channel_key]['n_fires']
else:
new_name_ = channels[channel_key]['original_name']
await channels[channel_key]['object'].edit(name=new_name_)
channels = {}
for key in constants.DISCORD_HOT_CHANNEL_IDS:
channel_ = flanautils.find(channel.guild.voice_channels, condition=lambda c: c.id == constants.DISCORD_HOT_CHANNEL_IDS[key])
channels[key] = {
'object': channel_,
'original_name': channel_.name,
'n_fires': 0
}
while True:
await asyncio.sleep(constants.HEAT_PERIOD_SECONDS)
if channel.members:
if self.heat_level == len(HEAT_NAMES) - 1:
return
self.heat_level += 0.5
elif not channel.members:
else:
if not self.heat_level:
return
self.heat_level -= 0.5
else:
if self.heat_level > len(constants.DISCORD_HEAT_NAMES) - 1:
self.heat_level = float(int(self.heat_level))
if not self.heat_level.is_integer():
continue
await channel.edit(name=HEAT_NAMES[int(self.heat_level)])
i = int(self.heat_level)
if i < len(constants.DISCORD_HEAT_NAMES):
n_fires = 0
new_name = constants.DISCORD_HEAT_NAMES[i]
else:
n_fires = i - len(constants.DISCORD_HEAT_NAMES) + 1
n_fires = round(math.log(n_fires + 4, 1.2) - 8)
new_name = '🔥' * n_fires
channels['C']['n_fires'] = n_fires
if channel.name != new_name:
await channel.edit(name=new_name)
async def _mute(self, user_id: int, group_id: int):
user = await self.get_user(user_id, group_id)
try:
await user.original_object.edit(mute=True)
except discord.errors.HTTPException:
raise UserDisconnectedError
await set_fire_to('B', depends_on='C', firewall=len(channels['B']['object'].members))
await set_fire_to('A', depends_on='B', firewall=len(channels['A']['object'].members))
await set_fire_to('D', depends_on='C', firewall=len(channels['C']['object'].members))
await set_fire_to('E', depends_on='D', firewall=len(channels['D']['object'].members))
async def _punish(self, user_id: int, group_id: int):
user = await self.get_user(user_id, group_id)
async def _punish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
user_id = self.get_user_id(user)
try:
await user.original_object.remove_roles(self._find_role_by_id(ROLES['Persona'], user.original_object.guild.roles))
await user.original_object.add_roles(self._find_role_by_id(ROLES['Castigado'], user.original_object.guild.roles))
await self.add_role(user_id, group_, 'Castigado')
await self.remove_role(user_id, group_, 'Persona')
except AttributeError:
raise BadRoleError(str(self._punish))
async def _unmute(self, user_id: int, group_id: int):
user = await self.get_user(user_id, group_id)
try:
await user.original_object.edit(mute=False)
except discord.errors.HTTPException:
raise UserDisconnectedError
async def _search_medias(
self,
message: Message,
force=False,
audio_only=False,
timeout_for_media: int | float = 15
) -> OrderedSet[Media]:
return await super()._search_medias(message, force, audio_only, timeout_for_media)
async def _unpunish(self, user_id: int, group_id: int):
user = await self.get_user(user_id, group_id)
async def _unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
user_id = self.get_user_id(user)
try:
await user.original_object.remove_roles(self._find_role_by_id(ROLES['Castigado'], user.original_object.guild.roles))
await user.original_object.add_roles(self._find_role_by_id(ROLES['Persona'], user.original_object.guild.roles))
await self.add_role(user_id, group_, 'Persona')
await self.remove_role(user_id, group_, 'Castigado')
except AttributeError:
raise BadRoleError(str(self._unpunish))
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
@group
@bot_mentioned
async def _on_audit_log(self, message: Message):
audit_entries = await self.find_audit_entries(
message,
limit=constants.AUDIT_LOG_LIMIT,
actions=(discord.AuditLogAction.member_disconnect, discord.AuditLogAction.member_move),
after=datetime.datetime.now(datetime.timezone.utc) - constants.AUDIT_LOG_AGE
)
await self.delete_message(message)
if not audit_entries:
await self.send_error(f'No hay entradas en el registro de auditoría <i>(desconectar y mover)</i> en la última hora.', message)
return
message_parts = ['<b>Registro de auditoría (solo desconectar y mover):</b>', '']
for entry in audit_entries:
author = await self._create_user_from_discord_user(entry.user)
date_string = entry.created_at.astimezone(pytz.timezone('Europe/Madrid')).strftime('%d/%m/%Y %H:%M:%S')
if entry.action is discord.AuditLogAction.member_disconnect:
message_parts.append(f"<b>{author.name}</b> ha <b>desconectado</b> {entry.extra.count} {'usuario' if entry.extra.count == 1 else 'usuarios'} <i>({date_string})</i>")
elif entry.action is discord.AuditLogAction.member_move:
message_parts.append(f"<b>{author.name}</b> ha <b>movido</b> {entry.extra.count} {'usuario' if entry.extra.count == 1 else 'usuarios'} a {entry.extra.channel.name} <i>({date_string})</i>")
await self.send('\n'.join(message_parts), message)
async def _on_member_join(self, member: discord.Member):
user = await self._create_user_from_discord_user(member)
user.pull_from_database(overwrite_fields=('roles',))
for role in user.roles:
try:
await self.add_role(user, member.guild.id, role.id)
except NotFoundError:
pass
async def _on_member_remove(self, member: discord.Member):
(await self._create_user_from_discord_user(member)).save()
async def _on_voice_state_update(self, _: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
if getattr(before.channel, 'id', None) == HOT_CHANNEL_ID:
if getattr(before.channel, 'id', None) == constants.DISCORD_HOT_CHANNEL_IDS['C']:
channel = before.channel
elif getattr(after.channel, 'id', None) == HOT_CHANNEL_ID:
elif getattr(after.channel, 'id', None) == constants.DISCORD_HOT_CHANNEL_IDS['C']:
channel = after.channel
else:
return
@@ -116,18 +181,13 @@ class FlanaDiscBot(DiscordBot, FlanaBot):
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #
async def is_deaf(self, user_id: int, group_id: int) -> bool:
user = await self.get_user(user_id, group_id)
return user.original_object.voice.deaf
async def is_punished(self, user: int | str | User, group_: int | str | Chat | Message) -> bool:
user = await self.get_user(user, group_)
group_id = self.get_group_id(group_)
async def is_muted(self, user_id: int, group_id: int) -> bool:
user = await self.get_user(user_id, group_id)
return user.original_object.voice.mute
@staticmethod
def is_self_deaf(user: User) -> bool:
return user.original_object.voice.self_deaf
@staticmethod
def is_self_muted(user: User) -> bool:
return user.original_object.voice.self_mute
return bool(Punishment.find({
'platform': self.platform.value,
'user_id': user.id,
'group_id': group_id,
'is_active': True
}))

View File

@@ -1,26 +1,23 @@
from __future__ import annotations # todo0 remove in 3.11
from __future__ import annotations # todo0 remove when it's by default
__all__ = ['whitelisted', 'FlanaTeleBot']
import functools
import os
from typing import Callable
import telethon.tl.functions
from flanaapis.weather.constants import WeatherEmoji
from flanautils import Media, MediaType, return_if_first_empty
from multibot import TelegramBot, constants as multibot_constants, find_message, user_client
from flanautils import Media, OrderedSet
from multibot import TelegramBot, find_message, user_client
from flanabot.bots.flana_bot import FlanaBot
from flanabot.models.chat import Chat
from flanabot.models.message import Message
from flanabot.models.user import User
from flanabot.models import Message
# ---------------------------------------------------------- #
# ----------------------- DECORATORS ----------------------- #
# ---------------------------------------------------------- #
def whitelisted_event(func: Callable) -> Callable:
# ---------------------------------------------------- #
# -------------------- DECORATORS -------------------- #
# ---------------------------------------------------- #
def whitelisted(func: Callable) -> Callable:
@functools.wraps(func)
@find_message
async def wrapper(self: FlanaTeleBot, message: Message):
@@ -45,23 +42,9 @@ class FlanaTeleBot(TelegramBot, FlanaBot):
)
self.whitelist_ids = []
# ----------------------------------------------------------- #
# -------------------- PROTECTED METHODS -------------------- #
# ----------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register_button(self._on_button_press)
@return_if_first_empty(exclude_self_types='FlanaTeleBot', globals_=globals())
async def _create_chat_from_telegram_chat(self, telegram_chat: multibot_constants.TELEGRAM_CHAT) -> Chat | None:
chat = await super()._create_chat_from_telegram_chat(telegram_chat)
chat.config = Chat.DEFAULT_CONFIG
return Chat.from_dict(chat.to_dict())
@return_if_first_empty(exclude_self_types='FlanaTeleBot', globals_=globals())
async def _create_user_from_telegram_user(self, original_user: multibot_constants.TELEGRAM_USER, group_id: int = None) -> User | None:
return User.from_dict((await super()._create_user_from_telegram_user(original_user, group_id)).to_dict())
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
@user_client
async def _get_contacts_ids(self) -> list[int]:
async with self.user_client:
@@ -69,50 +52,27 @@ class FlanaTeleBot(TelegramBot, FlanaBot):
return [contact.user_id for contact in contacts_data.contacts]
async def _search_medias(
self,
message: Message,
force=False,
audio_only=False,
timeout_for_media: int | float = 15
) -> OrderedSet[Media]:
return await super()._search_medias(message, force, audio_only, timeout_for_media)
@user_client
async def _update_whitelist(self):
self.whitelist_ids = [self.owner_id, self.bot_id] + await self._get_contacts_ids()
self.whitelist_ids = [self.owner_id, self.id] + await self._get_contacts_ids()
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
@whitelisted_event
async def _on_button_press(self, message: Message):
await message.original_event.answer()
match message.button_text:
case WeatherEmoji.ZOOM_IN.value:
message.weather_chart.zoom_in()
case WeatherEmoji.ZOOM_OUT.value:
message.weather_chart.zoom_out()
case WeatherEmoji.LEFT.value:
message.weather_chart.move_left()
case WeatherEmoji.RIGHT.value:
message.weather_chart.move_right()
case WeatherEmoji.PRECIPITATION_VOLUME.value:
message.weather_chart.trace_metadatas['rain_volume'].show = not message.weather_chart.trace_metadatas['rain_volume'].show
message.weather_chart.trace_metadatas['snow_volume'].show = not message.weather_chart.trace_metadatas['snow_volume'].show
case emoji if emoji in WeatherEmoji.values:
trace_metadata_name = WeatherEmoji(emoji).name.lower()
message.weather_chart.trace_metadatas[trace_metadata_name].show = not message.weather_chart.trace_metadatas[trace_metadata_name].show
message.weather_chart.apply_zoom()
message.weather_chart.draw()
message.save()
image_bytes = message.weather_chart.to_image()
file = await self._prepare_media_to_send(Media(image_bytes, MediaType.IMAGE))
try:
await message.original_object.edit(file=file)
except telethon.errors.rpcerrorlist.MessageNotModifiedError:
pass
@whitelisted_event
@whitelisted
async def _on_inline_query_raw(self, message: Message):
await super()._on_new_message_raw(message)
await super()._on_inline_query_raw(message)
@whitelisted_event
@whitelisted
async def _on_new_message_raw(self, message: Message):
await super()._on_new_message_raw(message)

View File

@@ -0,0 +1,194 @@
__all__ = ['PenaltyBot']
import asyncio
import datetime
from abc import ABC
import flanautils
from flanautils import TimeUnits
from multibot import MultiBot, User, admin, bot_mentioned, constants as multibot_constants, group, ignore_self_message
from flanabot import constants
from flanabot.models import Chat, Message, Punishment
# ------------------------------------------------------------------------------------------------------- #
# --------------------------------------------- PENALTY_BOT --------------------------------------------- #
# ------------------------------------------------------------------------------------------------------- #
class PenaltyBot(MultiBot, ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.lock = asyncio.Lock()
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_ban, multibot_constants.KEYWORDS['ban'])
self.register(self._on_mute, multibot_constants.KEYWORDS['mute'])
self.register(self._on_mute, (('haz', 'se'), multibot_constants.KEYWORDS['mute']))
self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['unmute']))
self.register(self._on_mute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['sound']))
self.register(self._on_punish, constants.KEYWORDS['punish'])
self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['unpunish']))
self.register(self._on_punish, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission']))
self.register(self._on_unban, multibot_constants.KEYWORDS['unban'])
self.register(self._on_unmute, multibot_constants.KEYWORDS['unmute'])
self.register(self._on_unmute, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['mute']))
self.register(self._on_unmute, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['sound']))
self.register(self._on_unpunish, constants.KEYWORDS['unpunish'])
self.register(self._on_unpunish, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['punish']))
self.register(self._on_unpunish, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission']))
@admin(False)
@group
async def _check_message_flood(self, message: Message):
if await self.is_punished(message.author, message.chat):
return
last_2s_messages = self.Message.find({
'platform': self.platform.value,
'author': message.author.object_id,
'chat': message.chat.object_id,
'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=2)}
})
last_7s_messages = self.Message.find({
'platform': self.platform.value,
'author': message.author.object_id,
'chat': message.chat.object_id,
'date': {'$gte': datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=7)}
})
if len(last_2s_messages) > constants.FLOOD_2s_LIMIT or len(last_7s_messages) > constants.FLOOD_7s_LIMIT:
punishment = Punishment.find_one({
'platform': self.platform.value,
'user_id': message.author.id,
'group_id': message.chat.group_id
})
punishment_seconds = (getattr(punishment, 'level', 0) + 2) ** constants.PUNISHMENT_INCREMENT_EXPONENT
await self.punish(message.author.id, message.chat.group_id, punishment_seconds, message)
await self.send(f'Castigado durante {TimeUnits(seconds=punishment_seconds).to_words()}.', message)
async def _punish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
pass
async def _unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
pass
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
@bot_mentioned
@group
@admin(send_negative=True)
async def _on_ban(self, message: Message):
for user in await self._find_users_to_punish(message):
await self.ban(user, message, flanautils.text_to_time(await self.filter_mention_ids(message.text, message)), message)
@group
@bot_mentioned
@admin(send_negative=True)
async def _on_mute(self, message: Message):
for user in await self._find_users_to_punish(message):
await self.mute(user, message, flanautils.text_to_time(await self.filter_mention_ids(message.text, message)), message)
@ignore_self_message
async def _on_new_message_raw(self, message: Message):
await super()._on_new_message_raw(message)
if message.chat.config['check_flood'] and message.chat.config['punish'] and not message.is_inline:
async with self.lock:
await self._check_message_flood(message)
@bot_mentioned
@group
@admin(send_negative=True)
async def _on_punish(self, message: Message):
if not message.chat.config['punish']:
return
for user in await self._find_users_to_punish(message):
await self.punish(user, message, flanautils.text_to_time(await self.filter_mention_ids(message.text, message)), message)
async def _on_ready(self):
await super()._on_ready()
await flanautils.do_every(constants.CHECK_PUNISHMENTS_EVERY_SECONDS, self.check_old_punishments)
@bot_mentioned
@group
@admin(send_negative=True)
async def _on_unban(self, message: Message):
for user in await self._find_users_to_punish(message):
await self.unban(user, message, message)
@group
@bot_mentioned
@admin(send_negative=True)
async def _on_unmute(self, message: Message):
for user in await self._find_users_to_punish(message):
await self.unmute(user, message, message)
@group
@bot_mentioned
@admin(send_negative=True)
async def _on_unpunish(self, message: Message):
if not message.chat.config['punish']:
return
for user in await self._find_users_to_punish(message):
await self.unpunish(user, message, message)
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #
async def check_old_punishments(self):
punishments = Punishment.find({'platform': self.platform.value}, lazy=True)
for punishment in punishments:
now = datetime.datetime.now(datetime.timezone.utc)
if not punishment.until or now < punishment.until:
continue
await self._remove_penalty(punishment, self._unpunish, delete=False)
if punishment.is_active:
punishment.is_active = False
punishment.last_update = now
punishment.save()
if punishment.last_update + constants.PUNISHMENTS_RESET_TIME <= now:
if punishment.level == 1:
punishment.delete()
else:
punishment.level -= 1
punishment.last_update = now
punishment.save()
async def is_punished(self, user: int | str | User, group_: int | str | Chat | Message) -> bool:
pass
async def punish(
self,
user: int | str | User,
group_: int | str | Chat | Message,
time: int | datetime.timedelta = None,
message: Message = None
):
# noinspection PyTypeChecker
punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_), time)
punishment.pull_from_database(overwrite_fields=('level',), exclude_fields=('until',))
punishment.level += 1
await self._punish(punishment.user_id, punishment.group_id)
punishment.save(pull_exclude_fields=('until',))
await self._unpenalize_later(punishment, self._unpunish, message)
async def unpunish(self, user: int | str | User, group_: int | str | Chat | Message, message: Message = None):
# noinspection PyTypeChecker
punishment = Punishment(self.platform, self.get_user_id(user), self.get_group_id(group_))
await self._remove_penalty(punishment, self._unpunish, message)

279
flanabot/bots/poll_bot.py Normal file
View File

@@ -0,0 +1,279 @@
__all__ = ['PollBot']
import math
import random
import re
from abc import ABC
from typing import Iterable
import flanautils
from flanautils import OrderedSet
from multibot import MultiBot, admin, constants as multibot_constants
from flanabot import constants
from flanabot.models import ButtonsGroup, Message
# ---------------------------------------------------------------------------------------------------- #
# --------------------------------------------- POLL_BOT --------------------------------------------- #
# ---------------------------------------------------------------------------------------------------- #
class PollBot(MultiBot, ABC):
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_choose, constants.KEYWORDS['choose'], priority=2)
self.register(self._on_choose, constants.KEYWORDS['random'], priority=2)
self.register(self._on_choose, (constants.KEYWORDS['choose'], constants.KEYWORDS['random']), priority=2)
self.register(self._on_delete_all_votes, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['all'], constants.KEYWORDS['vote']))
self.register(self._on_delete_all_votes, (multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['all'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['vote']))
self.register(self._on_delete_votes, (multibot_constants.KEYWORDS['delete'], constants.KEYWORDS['vote']))
self.register(self._on_dice, constants.KEYWORDS['dice'])
self.register(self._on_poll, constants.KEYWORDS['poll'], priority=2)
self.register(self._on_poll_multi, (constants.KEYWORDS['poll'], constants.KEYWORDS['multiple_answer']), priority=2)
self.register(self._on_stop_poll, multibot_constants.KEYWORDS['deactivate'])
self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['deactivate'], constants.KEYWORDS['poll']))
self.register(self._on_stop_poll, multibot_constants.KEYWORDS['stop'])
self.register(self._on_stop_poll, (multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['poll']))
self.register(self._on_voting_ban, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
self.register(self._on_voting_unban, (multibot_constants.KEYWORDS['activate'], multibot_constants.KEYWORDS['permission'], constants.KEYWORDS['vote']))
self.register_button(self._on_poll_button_press, ButtonsGroup.POLL)
@staticmethod
def _get_options(text: str, discarded_words: Iterable = ()) -> list[str]:
options = (option for option in text.split() if not flanautils.cartesian_product_string_matching(option.lower(), discarded_words, multibot_constants.PARSER_MIN_SCORE_DEFAULT))
text = ' '.join(options)
conjunctions = [f' {conjunction} ' for conjunction in flanautils.CommonWords.get('conjunctions')]
if any(char in text for char in (',', ';', *conjunctions)):
conjunction_parts = [f'(?:[,;]*{conjunction}[,;]*)+' for conjunction in conjunctions]
options = re.split(f"{'|'.join(conjunction_parts)}|[,;]+", text)
return list(OrderedSet(stripped_option for option in options if (stripped_option := option.strip())))
else:
return list(OrderedSet(text.split()))
@staticmethod
def _get_poll_message(message: Message) -> Message | None:
if (poll_message := message.replied_message) and poll_message.buttons_info and poll_message.buttons_info.key == ButtonsGroup.POLL:
return poll_message
async def _update_poll_buttons(self, message: Message):
poll_data = message.data['poll']
if poll_data['is_multiple_answer']:
total_votes = len({option_vote[0] for option_votes in poll_data['votes'].values() if option_votes for option_vote in option_votes})
else:
total_votes = sum(len(option_votes) for option_votes in poll_data['votes'].values())
if total_votes:
buttons = []
for option, option_votes in poll_data['votes'].items():
ratio = f'{len(option_votes)}/{total_votes}'
names = f"({', '.join(option_vote[1] for option_vote in option_votes)})" if option_votes else ''
buttons.append(f'{option}{ratio} {names}')
else:
buttons = list(poll_data['votes'].keys())
await self.edit(self.distribute_buttons(buttons, vertically=True), message)
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
async def _on_choose(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
discarded_words = {
*constants.KEYWORDS['choose'],
*constants.KEYWORDS['random'],
self.name.lower(), f'<@{self.id}>',
'entre', 'between'
}
if options := self._get_options(message.text, discarded_words):
for i in range(1, len(options) - 1):
try:
n1 = flanautils.cast_number(options[i - 1])
except ValueError:
try:
n1 = flanautils.text_to_number(options[i - 1], ignore_no_numbers=False)
except KeyError:
continue
try:
n2 = flanautils.cast_number(options[i + 1])
except ValueError:
try:
n2 = flanautils.text_to_number(options[i + 1], ignore_no_numbers=False)
except KeyError:
continue
if options[i] in ('al', 'to'):
await self.send(random.randint(math.ceil(n1), math.floor(n2)), message)
return
await self.send(random.choice(options), message)
else:
await self.send(random.choice(('¿Que elija el qué?', '¿Y las opciones?', '?', '🤔')), message)
async def _on_delete_all_votes(self, message: Message):
await self._on_delete_votes(message, all_=True)
@admin(send_negative=True)
async def _on_delete_votes(self, message: Message, all_=False):
if not (poll_message := self._get_poll_message(message)):
return
poll_data = poll_message.data['poll']
if all_:
for option_name, option_votes in poll_data['votes'].items():
poll_data['votes'][option_name].clear()
else:
for user in await self._find_users_to_punish(message):
for option_name, option_votes in poll_data['votes'].items():
poll_data['votes'][option_name] = [option_vote for option_vote in option_votes if option_vote[0] != user.id]
await self.delete_message(message)
await self._update_poll_buttons(poll_message)
async def _on_dice(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
if top_number := flanautils.text_to_number(message.text):
await self.send(random.randint(1, math.floor(top_number)), message)
else:
await self.send(random.choice(('¿De cuántas caras?', '¿Y el número?', '?', '🤔')), message)
async def _on_poll(self, message: Message, is_multiple_answer=False):
if message.chat.is_group and not self.is_bot_mentioned(message):
return
discarded_words = {*constants.KEYWORDS['poll'], *constants.KEYWORDS['multiple_answer'], self.name.lower(), f'<@{self.id}>'}
if final_options := [f'{option[0].upper()}{option[1:]}' for option in self._get_options(message.text, discarded_words)]:
await self.send(
f"Encuesta {'multirespuesta ' if is_multiple_answer else ''}en curso...",
self.distribute_buttons(final_options, vertically=True),
message,
buttons_key=ButtonsGroup.POLL,
data={
'poll': {
'is_active': True,
'is_multiple_answer': is_multiple_answer,
'votes': {option: [] for option in final_options},
'banned_users_tries': {}
}
}
)
else:
await self.send(random.choice(('¿Y las opciones?', '?', '🤔')), message)
await self.delete_message(message)
async def _on_poll_button_press(self, message: Message):
await self.accept_button_event(message)
poll_data = message.data['poll']
if not poll_data['is_active']:
return
presser_id = message.buttons_info.presser_user.id
presser_name = message.buttons_info.presser_user.name.split('#')[0]
if (presser_id_str := str(presser_id)) in poll_data['banned_users_tries']:
poll_data['banned_users_tries'][presser_id_str] += 1
if poll_data['banned_users_tries'][presser_id_str] == 3:
await self.send(random.choice((
f'Deja de dar por culo {presser_name} que no puedes votar aqui',
f'No es pesao {presser_name}, que no tienes permitido votar aqui',
f'Deja de pulsar botones que no puedes votar aqui {presser_name}',
f'{presser_name} deja de intentar votar aqui que no puedes',
f'Te han prohibido votar aquí {presser_name}.',
f'No puedes votar aquí, {presser_name}.'
)), reply_to=message)
return
option_name = results[0] if (results := re.findall('(.*?) ➜.+', message.buttons_info.pressed_text)) else message.buttons_info.pressed_text
selected_option_votes = poll_data['votes'][option_name]
if [presser_id, presser_name] in selected_option_votes:
selected_option_votes.remove([presser_id, presser_name])
else:
if not poll_data['is_multiple_answer']:
for option_votes in poll_data['votes'].values():
try:
option_votes.remove([presser_id, presser_name])
except ValueError:
pass
else:
break
selected_option_votes.append([presser_id, presser_name])
await self._update_poll_buttons(message)
async def _on_poll_multi(self, message: Message):
await self._on_poll(message, is_multiple_answer=True)
async def _on_stop_poll(self, message: Message):
if not (poll_message := self._get_poll_message(message)):
return
winners = []
max_votes = 1
for option, votes in poll_message.data['poll']['votes'].items():
if len(votes) > max_votes:
winners = [option]
max_votes = len(votes)
elif len(votes) == max_votes:
winners.append(option)
match winners:
case [_, _, *_]:
winners = [f'<b>{winner}</b>' for winner in winners]
text = f"Encuesta finalizada. Los ganadores son: {flanautils.join_last_separator(winners, ', ', ' y ')}."
case [winner]:
text = f'Encuesta finalizada. Ganador: <b>{winner}</b>.'
case _:
text = 'Encuesta finalizada.'
poll_message.data['poll']['is_active'] = False
await self.edit(text, poll_message)
@admin(send_negative=True)
async def _on_voting_ban(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := self._get_poll_message(message)):
return
await self.delete_message(message)
for user in await self._find_users_to_punish(message):
if str(user.id) not in poll_message.data['poll']['banned_users_tries']:
poll_message.data['poll']['banned_users_tries'][str(user.id)] = 0
@admin(send_negative=True)
async def _on_voting_unban(self, message: Message):
if message.chat.is_group and not self.is_bot_mentioned(message) or not (poll_message := self._get_poll_message(message)):
return
await self.delete_message(message)
for user in await self._find_users_to_punish(message):
try:
del poll_message.data['poll']['banned_users_tries'][str(user.id)]
except KeyError:
pass
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #

View File

@@ -0,0 +1,328 @@
__all__ = ['ScraperBot']
import asyncio
import random
from abc import ABC
from collections import defaultdict
from typing import Iterable
import flanautils
from flanaapis import RedditMediaNotFoundError, instagram, reddit, tiktok, twitter, yt_dlp_wrapper
from flanautils import Media, MediaType, OrderedSet, return_if_first_empty
from multibot import MultiBot, RegisteredCallback, SendError, constants as multibot_constants, reply
from flanabot import constants
from flanabot.models import Action, BotAction, Message
# ------------------------------------------------------------------------------------------------------- #
# --------------------------------------------- SCRAPER_BOT --------------------------------------------- #
# ------------------------------------------------------------------------------------------------------- #
class ScraperBot(MultiBot, ABC):
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_force_scraping, constants.KEYWORDS['force'])
self.register(self._on_force_scraping, (constants.KEYWORDS['force'], constants.KEYWORDS['scraping']))
self.register(self._on_force_scraping_audio, (constants.KEYWORDS['force'], multibot_constants.KEYWORDS['audio']))
self.register(self._on_force_scraping_audio, (constants.KEYWORDS['force'], multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping']))
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete']))
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['message']))
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['negate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_no_delete_original, (multibot_constants.KEYWORDS['deactivate'], multibot_constants.KEYWORDS['delete'], multibot_constants.KEYWORDS['message']))
self.register(self._on_scraping, constants.KEYWORDS['scraping'])
self.register(self._on_scraping_audio, multibot_constants.KEYWORDS['audio'])
self.register(self._on_scraping_audio, (multibot_constants.KEYWORDS['audio'], constants.KEYWORDS['scraping']))
self.register(self._on_song_info, constants.KEYWORDS['song_info'])
@staticmethod
async def _find_ids(text: str) -> tuple[OrderedSet[str], ...]:
return (
twitter.find_ids(text),
instagram.find_ids(text),
reddit.find_ids(text),
await tiktok.find_users_and_ids(text),
tiktok.find_download_urls(text)
)
@staticmethod
def _medias_sended_info(medias: Iterable[Media]) -> str:
medias_count = defaultdict(lambda: defaultdict(int))
for media in medias:
if not media.source or isinstance(media.source, str):
medias_count[media.source][media.type_] += 1
else:
medias_count[media.source.name][media.type_] += 1
medias_sended_info = []
for source, media_type_count in medias_count.items():
source_medias_sended_info = []
for media_type, count in media_type_count.items():
if count:
if count == 1:
type_text = {MediaType.IMAGE: 'imagen',
MediaType.AUDIO: 'audio',
MediaType.GIF: 'gif',
MediaType.VIDEO: 'vídeo',
None: 'cosa que no sé que tipo de archivo es',
MediaType.ERROR: 'error'}[media_type]
else:
type_text = {MediaType.IMAGE: 'imágenes',
MediaType.AUDIO: 'audios',
MediaType.GIF: 'gifs',
MediaType.VIDEO: 'vídeos',
None: 'cosas que no sé que tipos de archivos son',
MediaType.ERROR: 'errores'}[media_type]
source_medias_sended_info.append(f'{count} {type_text}')
if source_medias_sended_info:
medias_sended_info.append(f"{flanautils.join_last_separator(source_medias_sended_info, ', ', ' y ')} de {source if source else 'algún sitio'}")
medias_sended_info_joined = flanautils.join_last_separator(medias_sended_info, ',\n', ' y\n')
new_line = ' ' if len(medias_sended_info) == 1 else '\n'
return f'{new_line}{medias_sended_info_joined}:'
async def _scrape_and_send(self, message: Message, force=False, audio_only=False) -> OrderedSet[Media]:
kwargs = {}
if self._parse_callbacks(
message.text,
[
RegisteredCallback(..., [['sin'], ['timeout', 'limite']]),
RegisteredCallback(..., 'completo entero full todo')
]
):
kwargs['timeout_for_media'] = None
if not (medias := await self._search_medias(message, force, audio_only, **kwargs)):
return OrderedSet()
sended_media_messages, _ = await self.send_medias(medias, message)
sended_media_messages = OrderedSet(sended_media_messages)
await self.send_inline_results(message)
return sended_media_messages
async def _scrape_send_and_delete(
self,
message: Message,
force=False,
audio_only=False,
sended_media_messages: OrderedSet[Media] = None
) -> OrderedSet[Media]:
if sended_media_messages is None:
sended_media_messages = OrderedSet()
sended_media_messages += await self._scrape_and_send(message, force, audio_only)
if (
sended_media_messages
and
message.chat.is_group
and
message.chat.config['scraping_delete_original']
):
# noinspection PyTypeChecker
BotAction(Action.MESSAGE_DELETED, message, affected_objects=[message, *sended_media_messages]).save()
await self.delete_message(message)
return sended_media_messages
async def _search_medias(
self,
message: Message,
force=False,
audio_only=False,
timeout_for_media: int | float = None
) -> OrderedSet[Media]:
medias = OrderedSet()
exceptions: list[Exception] = []
ids = []
media_urls = []
for text_part in message.text.split():
for i, platform_ids in enumerate(await self._find_ids(text_part)):
try:
ids[i] |= platform_ids
except IndexError:
ids.append(platform_ids)
if not any(ids) and flanautils.find_urls(text_part):
if force:
media_urls.append(text_part)
else:
if not any(domain.lower() in text_part for domain in multibot_constants.GIF_DOMAINS):
media_urls.append(text_part)
if not any(ids) and not media_urls:
return medias
bot_state_message = await self.send(random.choice(constants.SCRAPING_PHRASES), message)
tweet_ids, instagram_ids, reddit_ids, tiktok_users_and_ids, tiktok_download_urls = ids
try:
reddit_medias = await reddit.get_medias(reddit_ids, 'h264', 'mp4', force, audio_only, timeout_for_media)
except RedditMediaNotFoundError as e:
exceptions.append(e)
reddit_medias = ()
reddit_urls = []
for reddit_media in reddit_medias:
if reddit_media.source:
medias.add(reddit_media)
else:
reddit_urls.append(reddit_media.url)
if force:
media_urls.extend(reddit_urls)
else:
for reddit_url in reddit_urls:
for domain in multibot_constants.GIF_DOMAINS:
if domain.lower() in reddit_url:
medias.add(Media(reddit_url, MediaType.GIF, source=domain))
break
else:
media_urls.append(reddit_url)
gather_result = asyncio.gather(
twitter.get_medias(tweet_ids, audio_only),
instagram.get_medias(instagram_ids, audio_only),
tiktok.get_medias(tiktok_users_and_ids, tiktok_download_urls, 'h264', 'mp4', force, audio_only, timeout_for_media),
yt_dlp_wrapper.get_medias(media_urls, 'h264', 'mp4', force, audio_only, timeout_for_media),
return_exceptions=True
)
await gather_result
await self.delete_message(bot_state_message)
gather_medias, gather_exceptions = flanautils.filter_exceptions(gather_result.result())
await self._manage_exceptions(exceptions + gather_exceptions, message, print_traceback=True)
return medias | gather_medias
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
async def _on_force_scraping(self, message: Message) -> OrderedSet[Media]:
return await self._on_scraping(message, force=True)
async def _on_force_scraping_audio(self, message: Message) -> OrderedSet[Media]:
return await self._on_scraping(message, force=True, audio_only=True)
async def _on_no_delete_original(self, message: Message):
if not await self._scrape_and_send(message):
await self._on_recover_message(message)
async def _on_recover_message(self, message: Message):
pass
async def _on_scraping(self, message: Message, force=False, audio_only=False) -> OrderedSet[Media]:
sended_media_messages = OrderedSet()
if not message.chat.config['auto_scraping'] and not self.is_bot_mentioned(message):
return sended_media_messages
if message.replied_message:
sended_media_messages += await self._scrape_and_send(message.replied_message, force, audio_only)
return await self._scrape_send_and_delete(message, force, audio_only, sended_media_messages)
async def _on_scraping_audio(self, message: Message) -> OrderedSet[Media]:
return await self._on_scraping(message, audio_only=True)
@reply
async def _on_song_info(self, message: Message):
song_infos = message.replied_message.song_infos if message.replied_message else []
if song_infos:
for song_info in song_infos:
await self.send_song_info(song_info, message)
elif message.chat.is_private or self.is_bot_mentioned(message):
raise SendError('No hay información musical en ese mensaje.')
# -------------------------------------------------------- #
# -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- #
@return_if_first_empty(([], 0), exclude_self_types='ScraperBot', globals_=globals())
async def send_medias(self, medias: OrderedSet[Media], message: Message, send_song_info=False) -> tuple[list[Message], int]:
sended_media_messages = []
fails = 0
bot_state_message: Message | None = None
sended_info_message: Message | None = None
user_text_bot_message: Message | None = None
if not message.is_inline:
bot_state_message: Message = await self.send('Enviando...', message)
if message.chat.is_group:
sended_info_message = await self.send(f"{message.author.name.split('#')[0]} compartió{self._medias_sended_info(medias)}", message, reply_to=message.replied_message)
user_text = ' '.join(
[word for word in message.text.split()
if (
not any(await self._find_ids(word))
and
not flanautils.find_urls(word)
and
not flanautils.cartesian_product_string_matching(
word,
(
*multibot_constants.KEYWORDS['audio'],
*multibot_constants.KEYWORDS['delete'],
*constants.KEYWORDS['force'],
*multibot_constants.KEYWORDS['negate'],
*constants.KEYWORDS['scraping']
),
multibot_constants.PARSER_MIN_SCORE_DEFAULT
)
and
flanautils.remove_symbols(word).lower() not in (str(self.id), self.name.lower())
)]
)
if user_text:
user_text_bot_message = await self.send(user_text, message, reply_to=message.replied_message)
for media in medias:
if not media.content:
fails += 1
continue
if media.song_info:
message.song_infos.add(media.song_info)
message.save()
if bot_message := await self.send(media, message, reply_to=message.replied_message):
sended_media_messages.append(bot_message)
if media.song_info and bot_message:
bot_message.song_infos.add(media.song_info)
bot_message.save()
else:
fails += 1
if send_song_info and media.song_info:
await self.send_song_info(media.song_info, message)
if fails == len(medias):
if sended_info_message:
await self.delete_message(sended_info_message)
if user_text_bot_message:
await self.delete_message(user_text_bot_message)
if bot_state_message:
await self.delete_message(bot_state_message)
return sended_media_messages, fails
@return_if_first_empty(exclude_self_types='ScraperBot', globals_=globals())
async def send_song_info(self, song_info: Media, message: Message):
attributes = (
f'Título: {song_info.title}\n' if song_info.title else '',
f'Autor: {song_info.author}\n' if song_info.author else '',
f'Álbum: {song_info.album}\n' if song_info.album else '',
f'Previa:'
)
await self.send(''.join(attributes), message)
if song_info:
await self.send(song_info, message)

View File

@@ -0,0 +1,193 @@
__all__ = ['WeatherBot']
import datetime
import random
from abc import ABC
import flanaapis.geolocation.functions
import flanaapis.weather.functions
import flanautils
import plotly.graph_objects
from flanaapis import Place, PlaceNotFoundError, WeatherEmoji
from flanautils import Media, MediaType, NotFoundError, OrderedSet, Source, TraceMetadata
from multibot import MultiBot, constants as multibot_constants
from flanabot import constants
from flanabot.models import Action, BotAction, ButtonsGroup, Message, WeatherChart
# ------------------------------------------------------------------------------------------------------- #
# --------------------------------------------- WEATHER_BOT --------------------------------------------- #
# ------------------------------------------------------------------------------------------------------- #
class WeatherBot(MultiBot, ABC):
# -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ #
# -------------------------------------------------------- #
def _add_handlers(self):
super()._add_handlers()
self.register(self._on_weather, constants.KEYWORDS['weather'])
self.register(self._on_weather, (multibot_constants.KEYWORDS['show'], constants.KEYWORDS['weather']))
self.register_button(self._on_weather_button_press, ButtonsGroup.WEATHER)
# ---------------------------------------------- #
# HANDLERS #
# ---------------------------------------------- #
async def _on_weather(self, message: Message):
bot_state_message: Message | None = None
if message.is_inline:
show_progress_state = False
elif message.chat.is_group and not self.is_bot_mentioned(message):
if message.chat.config['auto_weather_chart']:
if BotAction.find_one({'action': Action.AUTO_WEATHER_CHART.value, 'chat': message.chat.object_id, 'date': {'$gt': datetime.datetime.now(datetime.timezone.utc) - constants.AUTO_WEATHER_EVERY}}):
return
show_progress_state = False
else:
return
else:
show_progress_state = True
original_text_words = flanautils.remove_accents(message.text.lower())
original_text_words = flanautils.remove_symbols(original_text_words, ignore=('-', '.'), replace_with=' ').split()
original_text_words = await self.filter_mention_ids(original_text_words, message, delete_names=True)
# noinspection PyTypeChecker
place_words = (
OrderedSet(original_text_words)
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['show'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, constants.KEYWORDS['weather'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['date'], min_score=0.85).keys()
- flanautils.cartesian_product_string_matching(original_text_words, multibot_constants.KEYWORDS['thanks'], min_score=0.85).keys()
- flanautils.CommonWords.get()
)
if not place_words:
if not message.is_inline:
await self.send_error(random.choice(('¿Tiempo dónde?', 'Indica el sitio.', 'Y el sitio?', 'y el sitio? me lo invento?')), message)
return
if 'calle' in original_text_words:
place_words.insert(0, 'calle')
place_query = ' '.join(place_words)
if len(place_query) >= constants.MAX_PLACE_QUERY_LENGTH:
if show_progress_state:
await self.send_error(Media(str(flanautils.resolve_path('resources/mucho_texto.png')), MediaType.IMAGE, 'jpg', Source.LOCAL), message, send_as_file=False)
return
if show_progress_state:
bot_state_message = await self.send(f'Buscando "{place_query}" en el mapa 🧐...', message)
result: str | Place | None = None
async for result in flanaapis.geolocation.functions.find_place_showing_progress(place_query):
if isinstance(result, str) and bot_state_message:
await self.edit(result, bot_state_message)
place: Place = result
if not place:
if bot_state_message:
await self.delete_message(bot_state_message)
raise PlaceNotFoundError(place_query)
if bot_state_message:
bot_state_message = await self.edit(f'Obteniendo datos del tiempo para "{place_query}"...', bot_state_message)
current_weather, day_weathers = await flanaapis.weather.functions.get_day_weathers_by_place(place)
if bot_state_message:
bot_state_message = await self.edit('Creando gráficas del tiempo...', bot_state_message)
weather_chart = WeatherChart(
_font={'size': 30},
_title={
'text': place.name[:40].strip(' ,-'),
'xref': 'paper',
'yref': 'paper',
'xanchor': 'left',
'yanchor': 'top',
'x': 0.025,
'y': 0.975,
'font': {
'size': 50,
'family': 'open sans'
}
},
_legend={'x': 0.99, 'y': 0.99, 'xanchor': 'right', 'yanchor': 'top', 'bgcolor': 'rgba(0,0,0,0)'},
_margin={'l': 20, 'r': 20, 't': 20, 'b': 20},
trace_metadatas={
'temperature': TraceMetadata(name='temperature', group='temperature', legend='Temperatura', show=False, color='#ff8400', default_min=0, default_max=40, y_tick_suffix=' ºC', y_axis_width=130),
'temperature_feel': TraceMetadata(name='temperature_feel', group='temperature', legend='Sensación de temperatura', show=True, color='red', default_min=0, default_max=40, y_tick_suffix=' ºC', y_axis_width=130),
'clouds': TraceMetadata(name='clouds', legend='Nubes', show=False, color='#86abe3', default_min=-100, default_max=100, y_tick_suffix=' %', hide_y_ticks_if='{tick} < 0'),
'visibility': TraceMetadata(name='visibility', legend='Visibilidad', show=False, color='#c99a34', default_min=0, default_max='{max_y_data} * 2', y_tick_suffix=' km', y_delta_tick=2, hide_y_ticks_if='{tick} > {max_y_data}'),
'uvi': TraceMetadata(name='uvi', legend='UVI', show=False, color='#ffd000', default_min=-12, default_max=12, hide_y_ticks_if='{tick} < 0', y_delta_tick=1, y_axis_width=75),
'humidity': TraceMetadata(name='humidity', legend='Humedad', show=False, color='#2baab5', default_min=0, default_max=100, y_tick_suffix=' %'),
'precipitation_probability': TraceMetadata(name='precipitation_probability', legend='Probabilidad de precipitaciones', show=True, color='#0033ff', default_min=-100, default_max=100, y_tick_suffix=' %', hide_y_ticks_if='{tick} < 0'),
'rain_volume': TraceMetadata(plotly.graph_objects.Histogram, name='rain_volume', group='precipitation', legend='Volumen de lluvia', show=True, color='#34a4eb', opacity=0.3, default_min=-10, default_max=10, y_tick_suffix=' mm', y_delta_tick=1, hide_y_ticks_if='{tick} < 0', y_axis_width=130),
'snow_volume': TraceMetadata(plotly.graph_objects.Histogram, name='snow_volume', group='precipitation', legend='Volumen de nieve', show=True, color='#34a4eb', opacity=0.8, pattern={'shape': '.', 'fgcolor': '#ffffff', 'bgcolor': '#b0d6f3', 'solidity': 0.5, 'size': 14}, default_min=-10, default_max=10, y_tick_suffix=' mm', y_delta_tick=1, hide_y_ticks_if='{tick} < 0', y_axis_width=130),
'pressure': TraceMetadata(name='pressure', legend='Presión', show=False, color='#31a339', default_min=1013.25 - 90, default_max=1013.25 + 90, y_tick_suffix=' hPa', y_axis_width=225),
'wind_speed': TraceMetadata(name='wind_speed', legend='Velocidad del viento', show=False, color='#d8abff', default_min=-120, default_max=120, y_tick_suffix=' km/h', hide_y_ticks_if='{tick} < 0', y_axis_width=165)
},
x_data=[instant_weather.date_time for day_weather in day_weathers for instant_weather in day_weather.instant_weathers],
all_y_data=[],
current_weather=current_weather,
day_weathers=day_weathers,
timezone=(timezone := day_weathers[0].timezone),
place=place,
view_position=datetime.datetime.now(timezone)
)
weather_chart.apply_zoom()
weather_chart.draw()
if not (image_bytes := weather_chart.to_image()):
if bot_state_message:
await self.delete_message(bot_state_message)
raise NotFoundError('No hay suficientes datos del tiempo.')
if bot_state_message:
bot_state_message = await self.edit('Enviando...', bot_state_message)
bot_message: Message = await self.send(
Media(image_bytes, MediaType.IMAGE, 'jpg'),
[
[WeatherEmoji.ZOOM_IN.value, WeatherEmoji.ZOOM_OUT.value, WeatherEmoji.LEFT.value, WeatherEmoji.RIGHT.value],
[WeatherEmoji.TEMPERATURE.value, WeatherEmoji.TEMPERATURE_FEEL.value, WeatherEmoji.CLOUDS.value, WeatherEmoji.VISIBILITY.value, WeatherEmoji.UVI.value],
[WeatherEmoji.HUMIDITY.value, WeatherEmoji.PRECIPITATION_PROBABILITY.value, WeatherEmoji.PRECIPITATION_VOLUME.value, WeatherEmoji.PRESSURE.value, WeatherEmoji.WIND_SPEED.value]
],
message,
buttons_key=ButtonsGroup.WEATHER,
data={'weather_chart': weather_chart},
send_as_file=False
)
await self.send_inline_results(message)
if bot_state_message:
await self.delete_message(bot_state_message)
if bot_message and not self.is_bot_mentioned(message):
# noinspection PyTypeChecker
BotAction(Action.AUTO_WEATHER_CHART, message, affected_objects=[bot_message]).save()
async def _on_weather_button_press(self, message: Message):
await self.accept_button_event(message)
weather_chart = message.data['weather_chart']
match message.buttons_info.pressed_text:
case WeatherEmoji.ZOOM_IN.value:
weather_chart.zoom_in()
case WeatherEmoji.ZOOM_OUT.value:
weather_chart.zoom_out()
case WeatherEmoji.LEFT.value:
weather_chart.move_left()
case WeatherEmoji.RIGHT.value:
weather_chart.move_right()
case WeatherEmoji.PRECIPITATION_VOLUME.value:
weather_chart.trace_metadatas['rain_volume'].show = not weather_chart.trace_metadatas['rain_volume'].show
weather_chart.trace_metadatas['snow_volume'].show = not weather_chart.trace_metadatas['snow_volume'].show
case emoji if emoji in WeatherEmoji.values:
trace_metadata_name = WeatherEmoji(emoji).name.lower()
weather_chart.trace_metadatas[trace_metadata_name].show = not weather_chart.trace_metadatas[trace_metadata_name].show
case _:
return
weather_chart.apply_zoom()
weather_chart.draw()
image_bytes = weather_chart.to_image()
await self.edit(Media(image_bytes, MediaType.IMAGE, 'jpg'), message)

View File

@@ -0,0 +1,261 @@
import io
import math
import random
from typing import Sequence
import cairo
from flanabot import constants
from flanabot.models.player import Player
SIZE_MULTIPLIER = 1
LEFT_MARGIN = 20
TOP_MARGIN = 20
CELL_LENGTH = 100 * SIZE_MULTIPLIER
NUMBERS_HEIGHT = 30 * SIZE_MULTIPLIER
TEXT_HEIGHT = 70 * SIZE_MULTIPLIER
NUMBERS_X_INITIAL_POSITION = LEFT_MARGIN + CELL_LENGTH / 2 - 8 * SIZE_MULTIPLIER
NUMBERS_Y_POSITION = TOP_MARGIN + CELL_LENGTH * constants.CONNECT_4_N_ROWS + 40 * SIZE_MULTIPLIER
TEXT_POSITION = (LEFT_MARGIN, TOP_MARGIN + CELL_LENGTH * constants.CONNECT_4_N_ROWS + NUMBERS_HEIGHT + 70 * SIZE_MULTIPLIER)
SURFACE_WIDTH = LEFT_MARGIN * 2 + CELL_LENGTH * constants.CONNECT_4_N_COLUMNS
SURFACE_HEIGHT = TOP_MARGIN * 2 + CELL_LENGTH * constants.CONNECT_4_N_ROWS + NUMBERS_HEIGHT + TEXT_HEIGHT
CIRCLE_RADIUS = 36 * SIZE_MULTIPLIER
CROSS_LINE_WIDTH = 24 * SIZE_MULTIPLIER
FONT_SIZE = 32 * SIZE_MULTIPLIER
TABLE_LINE_WIDTH = 4 * SIZE_MULTIPLIER
BLUE = (66 / 255, 135 / 255, 245 / 255)
BACKGROUND_COLOR = (54 / 255, 57 / 255, 63 / 255)
GRAY = (200 / 255, 200 / 255, 200 / 255)
HIGHLIGHT_COLOR = (104 / 255, 107 / 255, 113 / 255)
RED = (255 / 255, 70 / 255, 70 / 255)
PLAYER_1_COLOR = BLUE
PLAYER_2_COLOR = RED
def center_point(board_position: Sequence[int]) -> tuple[float, float]:
return LEFT_MARGIN + (board_position[1] + 0.5) * CELL_LENGTH, TOP_MARGIN + (board_position[0] + 0.5) * CELL_LENGTH
def draw_circle(board_position: Sequence[int], radius: float, color: tuple[float, float, float], context: cairo.Context):
context.set_source_rgba(*color)
context.arc(*center_point(board_position), radius, 0, 2 * math.pi)
context.fill()
def draw_line(
board_position_start: Sequence[int],
board_position_end: Sequence[int],
line_width: float,
color: tuple[float, float, float],
context: cairo.Context
):
context.set_line_width(line_width)
context.set_line_cap(cairo.LINE_CAP_ROUND)
context.set_source_rgba(*color)
context.move_to(*center_point(board_position_start))
context.line_to(*center_point(board_position_end))
context.stroke()
def draw_table(line_width: float, color: tuple[float, float, float], context: cairo.Context):
context.set_line_width(line_width)
context.set_line_cap(cairo.LINE_CAP_ROUND)
context.set_source_rgba(*color)
x = LEFT_MARGIN
y = TOP_MARGIN
for _ in range(constants.CONNECT_4_N_ROWS + 1):
context.move_to(x, y)
context.line_to(x + CELL_LENGTH * constants.CONNECT_4_N_COLUMNS, y)
context.stroke()
y += CELL_LENGTH
x = LEFT_MARGIN
y = TOP_MARGIN
for _ in range(constants.CONNECT_4_N_COLUMNS + 1):
context.move_to(x, y)
context.line_to(x, y + CELL_LENGTH * constants.CONNECT_4_N_ROWS)
context.stroke()
x += CELL_LENGTH
def draw_text(
text: str,
point: Sequence[float],
color: tuple[float, float, float],
font_size: float,
italic: bool,
context: cairo.Context
):
context.move_to(*point)
context.set_source_rgba(*color)
context.select_font_face("Sans", cairo.FONT_SLANT_ITALIC if italic else cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_BOLD)
context.set_font_size(font_size)
context.show_text(text)
def draw_winner_lines(
win_position: Sequence[int],
board: list[list[int | None]],
color: tuple[float, float, float],
context: cairo.Context
):
i, j = win_position
player_number = board[i][j]
# horizontal
j_a = j - 1
while j_a >= 0 and board[i][j_a] == player_number:
j_a -= 1
j_b = j + 1
while j_b < constants.CONNECT_4_N_COLUMNS and board[i][j_b] == player_number:
j_b += 1
if abs(j_a - j) + abs(j_b - j) - 1 >= 4:
draw_line((i, j_a + 1), (i, j_b - 1), CROSS_LINE_WIDTH, color, context)
# vertical
i_a = i - 1
while i_a >= 0 and board[i_a][j] == player_number:
i_a -= 1
i_b = i + 1
while i_b < constants.CONNECT_4_N_ROWS and board[i_b][j] == player_number:
i_b += 1
if abs(i_a - i) + abs(i_b - i) - 1 >= 4:
draw_line((i_a + 1, j), (i_b - 1, j), CROSS_LINE_WIDTH, color, context)
# diagonal 1
i_a = i - 1
j_a = j - 1
while i_a >= 0 and j_a >= 0 and board[i_a][j_a] == player_number:
i_a -= 1
j_a -= 1
i_b = i + 1
j_b = j + 1
while i_b < constants.CONNECT_4_N_ROWS and j_b < constants.CONNECT_4_N_COLUMNS and board[i_b][j_b] == player_number:
i_b += 1
j_b += 1
if abs(i_a - i) + abs(i_b - i) - 1 >= 4:
draw_line((i_a + 1, j_a + 1), (i_b - 1, j_b - 1), CROSS_LINE_WIDTH, color, context)
# diagonal 2
i_a = i - 1
j_a = j + 1
while i_a >= 0 and j_a < constants.CONNECT_4_N_COLUMNS and board[i_a][j_a] == player_number:
i_a -= 1
j_a += 1
i_b = i + 1
j_b = j - 1
while i_b < constants.CONNECT_4_N_ROWS and j_b >= 0 and board[i_b][j_b] == player_number:
i_b += 1
j_b -= 1
if abs(i_a - i) + abs(i_b - i) - 1 >= 4:
draw_line((i_a + 1, j_a - 1), (i_b - 1, j_b + 1), CROSS_LINE_WIDTH, color, context)
def highlight_cell(
board_position: Sequence[int],
color: tuple[float, float, float],
context: cairo.Context
):
x, y = top_left_point(board_position)
context.move_to(x, y)
x += CELL_LENGTH
context.line_to(x, y)
y += CELL_LENGTH
context.line_to(x, y)
x -= CELL_LENGTH
context.line_to(x, y)
context.close_path()
context.set_source_rgba(*color)
context.fill()
def make_image(
board: list[list[int | None]],
next_turn_player: Player = None,
winner: Player = None,
loser: Player = None,
highlight=None,
win_position: Sequence[int] = None,
tie=False
) -> bytes:
surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, SURFACE_WIDTH, SURFACE_HEIGHT)
context = cairo.Context(surface)
paint_background(BACKGROUND_COLOR, context)
if highlight:
highlight_cell(highlight, HIGHLIGHT_COLOR, context)
draw_table(TABLE_LINE_WIDTH, GRAY, context)
write_numbers(GRAY, context)
for i in range(constants.CONNECT_4_N_ROWS):
for j in range(constants.CONNECT_4_N_COLUMNS):
match board[i][j]:
case 1:
draw_circle((i, j), CIRCLE_RADIUS, PLAYER_1_COLOR, context)
case 2:
draw_circle((i, j), CIRCLE_RADIUS, PLAYER_2_COLOR, context)
if tie:
write_tie(context)
elif winner:
player_color = PLAYER_1_COLOR if winner.number == 1 else PLAYER_2_COLOR
draw_winner_lines(win_position, board, player_color, context)
write_winner(winner, loser, context)
else:
write_player_turn(next_turn_player.name, PLAYER_1_COLOR if next_turn_player.number == 1 else PLAYER_2_COLOR, context)
buffer = io.BytesIO()
surface.write_to_png(buffer)
return buffer.getvalue()
def paint_background(color: tuple[float, float, float], context: cairo.Context):
context.set_source_rgba(*color)
context.paint()
def top_left_point(board_position: Sequence[int]) -> tuple[float, float]:
return LEFT_MARGIN + board_position[1] * CELL_LENGTH, TOP_MARGIN + board_position[0] * CELL_LENGTH
def write_numbers(color: tuple[float, float, float], context: cairo.Context):
x = NUMBERS_X_INITIAL_POSITION
for j in range(constants.CONNECT_4_N_COLUMNS):
draw_text(str(j + 1), (x, NUMBERS_Y_POSITION), color, FONT_SIZE, italic=False, context=context)
x += CELL_LENGTH
def write_player_turn(name: str, color: tuple[float, float, float], context: cairo.Context):
text = 'Turno de '
point = TEXT_POSITION
draw_text(text, point, GRAY, FONT_SIZE, True, context)
point = (point[0] + context.text_extents(text).width + 9 * SIZE_MULTIPLIER, point[1])
draw_text(name, point, color, FONT_SIZE, True, context)
point = (point[0] + context.text_extents(name).width, point[1])
draw_text('.', point, GRAY, FONT_SIZE, True, context)
def write_tie(context: cairo.Context):
draw_text(f"Empate{random.choice(('.', ' :c', ' :/', ' :s'))}", TEXT_POSITION, GRAY, FONT_SIZE, True, context)
def write_winner(winner: Player, loser: Player, context: cairo.Context):
winner_color, loser_color = (PLAYER_1_COLOR, PLAYER_2_COLOR) if winner.number == 1 else (PLAYER_2_COLOR, PLAYER_1_COLOR)
point = TEXT_POSITION
draw_text(winner.name, point, winner_color, FONT_SIZE, True, context)
text = ' le ha ganado a '
point = (point[0] + context.text_extents(winner.name).width + 3 * SIZE_MULTIPLIER, point[1])
draw_text(text, point, GRAY, FONT_SIZE, True, context)
point = (point[0] + context.text_extents(text).width + 10 * SIZE_MULTIPLIER, point[1])
draw_text(loser.name, point, loser_color, FONT_SIZE, True, context)
point = (point[0] + context.text_extents(loser.name).width + 3 * SIZE_MULTIPLIER, point[1])
draw_text('!!!', point, GRAY, FONT_SIZE, True, context)

View File

@@ -1,113 +1,115 @@
import datetime
from collections import defaultdict
import flanautils
from multibot import Platform
AUDIT_LOG_AGE = datetime.timedelta(hours=1)
AUDIT_LOG_LIMIT = 5
AUTO_WEATHER_EVERY = datetime.timedelta(hours=6)
CHECK_MESSAGE_EVERY_SECONDS = datetime.timedelta(days=1).total_seconds()
CHECK_MUTES_EVERY_SECONDS = datetime.timedelta(hours=1).total_seconds()
CHECK_PUNISHMENTS_EVERY_SECONDS = datetime.timedelta(hours=1).total_seconds()
CONNECT_4_AI_DELAY_SECONDS = 1
CONNECT_4_CENTER_COLUMN_POINTS = 2
CONNECT_4_N_COLUMNS = 7
CONNECT_4_N_ROWS = 6
FLOOD_2s_LIMIT = 2
FLOOD_7s_LIMIT = 4
HEAT_PERIOD_SECONDS = datetime.timedelta(minutes=15).total_seconds()
INSULT_PROBABILITY = 0.00166666667
MAX_PLACE_QUERY_LENGTH = 50
PUNISHMENT_INCREMENT_EXPONENT = 6
PUNISHMENTS_RESET = datetime.timedelta(weeks=6 * flanautils.WEEKS_IN_A_MONTH)
PUNISHMENTS_RESET_TIME = datetime.timedelta(weeks=2)
RECOVERY_DELETED_MESSAGE_BEFORE = datetime.timedelta(hours=1)
TIME_THRESHOLD_TO_MANUAL_UNMUTE = datetime.timedelta(days=3)
TIME_THRESHOLD_TO_MANUAL_UNPUNISH = datetime.timedelta(days=3)
SCRAPING_MESSAGE_WAITING_TIME = 0.1
BAD_PHRASES = (
'Cállate ya anda.',
'¿Quién te ha preguntado?',
'¿Tú eres así o te dan apagones cerebrales?',
'Ante la duda mi dedo corazón te saluda.',
'Enjoy cancer brain.',
'Calla noob.',
'Hablas tanta mierda que tu culo tiene envidia de tu boca.',
'jAJjajAJjajAJjajAJajJAJajJA',
'enjoy xd',
'Reported.',
'Baneito pa ti en breve.',
'Despídete de tu cuenta.',
'Flanagan es más guapo que tú.',
'jajaj',
'xd',
'Hay un concurso de hostias y tienes todas las papeletas.',
'¿Por qué no te callas?',
'Das penilla.',
'Deberían hacerte la táctica del C4.',
'Te voy romper las pelotas.',
'Más tonto y no naces.',
'Eres más tonto que peinar bombillas.',
'Eres más tonto que pellizcar cristales.',
'Eres más malo que pegarle a un padre.'
BYE_PHRASES = ('Adiós.', 'adio', 'adioh', 'adios', 'adió', 'adiós', 'agur', 'bye', 'byyeeee', 'chao', 'hasta la vista',
'hasta luego', 'hasta nunca', ' hasta pronto', 'hasta la próxima', 'nos vemos', 'taluego')
CHANGEABLE_ROLES = defaultdict(
lambda: defaultdict(list),
{
Platform.DISCORD: defaultdict(
list,
{
360868977754505217: [881238165476741161, 991454395663401072, 1033098591725699222],
862823584670285835: [976660580939202610, 984269640752590868]
}
)
}
)
BYE_PHRASES = ('Adiós.', 'adieu', 'adio', 'adioh', 'adios', 'adió', 'adiós', 'agur', 'bye', 'byyeeee', 'chao',
'hasta la vista', 'hasta luego', 'hasta nunca', ' hasta pronto', 'hasta la próxima',
'nos vemos', 'taluego')
DISCORD_HEAT_NAMES = [
'Canal Congelado',
'Canal Fresquito',
'Canal Templaillo',
'Canal Calentito',
'Canal Caloret',
'Canal Caliente',
'Canal Olor a Vasco',
'Canal Verano Cordobés al Sol',
'Canal Al rojo vivo',
'Canal Ardiendo',
'Canal INFIERNO'
]
DISCORD_HOT_CHANNEL_IDS = {
'A': 493529846027386900,
'B': 493529881125060618,
'C': 493530483045564417,
'D': 829032476949217302,
'E': 829032505645596742
}
HELLO_PHRASES = ('alo', 'aloh', 'buenas', 'Hola.', 'hello', 'hey', 'hi', 'hola', 'holaaaa', 'holaaaaaaa', 'ola',
'ola k ase', 'pa ti mi cola', 'saludos')
INSULTS = ('._.', 'aha', 'Aléjate de mi.', 'Ante la duda mi dedo corazón te saluda.', 'Baneito pa ti en breve.',
'Calla noob.', 'Cansino.', 'Cuéntame menos.', 'Cuéntame más.', 'Cállate ya anda.', 'Cállate.',
'Das penilla.', 'De verdad. Estás para encerrarte.', 'Deberían hacerte la táctica del C4.',
'Despídete de tu cuenta.', 'Déjame tranquilo.', 'Enjoy cancer brain.', 'Eres cortito, ¿eh?',
'Eres más malo que pegarle a un padre.', 'Eres más tonto que peinar bombillas.',
'Eres más tonto que pellizcar cristales.', 'Estás mal de la azotea.', 'Estás mal de la cabeza.',
'Flanagan es más guapo que tú.', 'Hablas tanta mierda que tu culo tiene envidia de tu boca.',
'Hay un concurso de hostias y tienes todas las papeletas.', 'Loco.', 'Más tonto y no naces.',
'No eres muy avispado tú...', 'Pesado.', 'Qué bien, ¿eh?', 'Que me dejes en paz.', 'Qué pesado.',
'Quita bicho.', 'Reportaito mi arma.', 'Reported.', 'Retard.', 'Te voy romper las pelotas.',
'Tú... no estás muy bien, ¿no?', 'Ya estamos otra vez...', 'Ya estamos...', 'enjoy xd',
'jAJjajAJjajAJjajAJajJAJajJA', 'jajaj', 'o_O', 'xd', '¿Otra vez tú?', '¿Pero cuándo te vas a callar?',
'¿Por qué no te callas?', '¿Quién te ha preguntado?', '¿Qué quieres?', '¿Te callas o te callo?',
'¿Te imaginas que me interesa?', '¿Te quieres callar?', '¿Todo bien?',
'¿Tú eres así o te dan apagones cerebrales?', '🖕', '😑', '🙄', '🤔', '🤨')
KEYWORDS = {
'activate': ('activa', 'activar', 'activate', 'deja', 'dejale', 'devuelve', 'devuelvele', 'enable', 'encender',
'enciende', 'habilita', 'habilitar'),
'bye': ('adieu', 'adio', 'adiooooo', 'adios', 'agur', 'buenas', 'bye', 'cama', 'chao', 'farewell', 'goodbye',
'hasta', 'luego', 'noches', 'pronto', 'taluego', 'taluegorl', 'tenga', 'vemos', 'vista', 'voy'),
'change': ('alter', 'alternar', 'alternate', 'cambiar', 'change', 'default', 'defecto', 'edit', 'editar',
'exchange', 'modificar', 'modify', 'permutar', 'predeterminado', 'shift', 'swap', 'switch', 'turn',
'vary'),
'config': ('ajustar', 'ajuste', 'ajustes', 'automatico', 'automatic', 'config', 'configs', 'configuracion',
'configuration', 'default', 'defecto', 'setting', 'settings'),
'choose': ('choose', 'elige', 'escoge'),
'connect_4': (('conecta', 'connect', 'ralla', 'raya'), ('4', 'cuatro', 'four')),
'covid_chart': ('case', 'caso', 'contagiado', 'contagio', 'corona', 'coronavirus', 'covid', 'covid19', 'death',
'disease', 'enfermedad', 'enfermos', 'fallecido', 'incidencia', 'jacovid', 'mascarilla', 'muerte',
'muerto', 'pandemia', 'sick', 'virus'),
'currency_chart': ('argentina', 'bitcoin', 'cardano', 'cripto', 'crypto', 'criptodivisa', 'cryptodivisa',
'cryptomoneda', 'cryptocurrency', 'currency', 'dinero', 'divisa', 'ethereum', 'inversion',
'moneda', 'pasta'),
'date': ('ayer', 'de', 'domingo', 'fin', 'finde', 'friday', 'hoy', 'jueves', 'lunes', 'martes', 'mañana',
'miercoles', 'monday', 'pasado', 'sabado', 'saturday', 'semana', 'sunday', 'thursday', 'today', 'tomorrow',
'tuesday', 'viernes', 'wednesday', 'week', 'weekend', 'yesterday'),
'deactivate': ('apaga', 'apagar', 'deactivate', 'deactivate', 'desactivar', 'deshabilita', 'deshabilitar',
'disable', 'forbids', 'prohibe', 'quita', 'remove', 'return'),
'hello': ('alo', 'aloh', 'buenas', 'dias', 'hello', 'hey', 'hi', 'hola', 'holaaaaaa', 'ola', 'saludos', 'tardes'),
'help': ('ayuda', 'help'),
'mute': ('calla', 'calle', 'cierra', 'close', 'mute', 'mutea', 'mutealo', 'noise', 'ruido', 'shut', 'silence',
'silencia'),
'negate': ('no', 'ocurra', 'ocurre'),
'permission': ('permiso', 'permission'),
'punish': ('acaba', 'aprende', 'ataca', 'atalo', 'azota', 'boss', 'castiga', 'castigo', 'condena', 'controla',
'destroy', 'destroza', 'duro', 'ejecuta', 'enseña', 'escarmiento', 'execute', 'finish', 'fuck', 'fusila',
'hell', 'humos', 'infierno', 'jefe', 'jode', 'learn', 'leccion', 'lesson', 'manda', 'purgatorio',
'sancion', 'shoot', 'teach', 'termina', 'whip'),
'reset': ('recover', 'recovery', 'recupera', 'reinicia', 'reset', 'resetea', 'restart'),
'dice': ('dado', 'dice'),
'force': ('force', 'forzar', 'fuerza'),
'multiple_answer': ('multi', 'multi-answer', 'multiple', 'multirespuesta'),
'poll': ('encuesta', 'quiz'),
'punish': ('acaba', 'aprende', 'ataca', 'atalo', 'azota', 'beating', 'boss', 'castiga', 'castigo', 'condena',
'controla', 'destroy', 'destroza', 'duro', 'ejecuta', 'enseña', 'escarmiento', 'execute', 'fuck',
'fusila', 'hell', 'humos', 'infierno', 'jefe', 'jode', 'learn', 'leccion', 'lesson', 'manda', 'paliza',
'purgatorio', 'purgatory', 'sancion', 'shoot', 'teach', 'whip'),
'random': ('aleatorio', 'azar', 'random'),
'scraping': ('api', 'aqui', 'busca', 'contenido', 'content', 'descarga', 'descargar', 'download', 'envia', 'habia',
'media', 'redes', 'scrap', 'scraping', 'search', 'send', 'social', 'sociales', 'tenia', 'video',
'videos'),
'show': ('actual', 'enseña', 'estado', 'how', 'muestra', 'show', 'como'),
'self': (('contigo', 'contra', 'ti'), ('mismo', 'ti')),
'song_info': ('aqui', 'cancion', 'data', 'datos', 'info', 'informacion', 'information', 'llama', 'media', 'name',
'nombre', 'sonaba', 'sonando', 'song', 'sono', 'sound', 'suena', 'title', 'titulo',
'video'),
'sound': ('hablar', 'hable', 'micro', 'microfono', 'microphone', 'sonido', 'sound', 'talk', 'volumen'),
'thanks': ('gracia', 'gracias', 'grasia', 'grasias', 'grax', 'thank', 'thanks', 'ty'),
'unmute': ('desilencia', 'desmutea', 'desmutealo', 'unmute'),
'nombre', 'sonaba', 'sonando', 'song', 'sono', 'sound', 'suena', 'title', 'titulo', 'video'),
'unpunish': ('absolve', 'forgive', 'innocent', 'inocente', 'perdona', 'spare'),
'weather_chart': ('atmosfera', 'atmosferico', 'calle', 'calor', 'caloret', 'clima', 'climatologia', 'cloud',
'cloudless', 'cloudy', 'cold', 'congelar', 'congelado', 'denbora', 'despejado', 'diluvio', 'frio',
'frost', 'hielo', 'humedad', 'llover', 'llueva', 'llueve', 'lluvia', 'nevada', 'nieva', 'nieve',
'nube', 'nubes', 'nublado', 'meteorologia', 'rain', 'snow', 'snowfall', 'snowstorm', 'sol',
'solano', 'storm', 'sun', 'temperatura', 'tempo', 'tiempo', 'tormenta', 'ventisca', 'weather',
'wetter')
'vote': ('votacion', 'votar', 'vote', 'voting', 'voto'),
'weather': ('atmosfera', 'atmosferico', 'calle', 'calor', 'caloret', 'clima', 'climatologia', 'cloud', 'cloudless',
'cloudy', 'cold', 'congelar', 'congelado', 'denbora', 'despejado', 'diluvio', 'frio', 'frost', 'hielo',
'humedad', 'llover', 'llueva', 'llueve', 'lluvia', 'nevada', 'nieva', 'nieve', 'nube', 'nubes',
'nublado', 'meteorologia', 'rain', 'snow', 'snowfall', 'snowstorm', 'sol', 'solano', 'storm', 'sun',
'temperatura', 'tempo', 'tiempo', 'tormenta', 'ventisca', 'weather', 'wetter')
}
RECOVER_PHRASES = (
'No hay nada que recuperar.',
'Ya lo he recuperado y enviado, así que callate ya.',
'Ya lo he recuperado y enviado, así que mejor estás antento antes de dar por culo.',
'Ya lo he recuperado y enviado, no lo voy a hacer dos veces.',
'Ya lo he recuperado y enviado. A ver si leemos más y jodemos menos.',
'Ya lo he reenviado.'
)
SCRAPING_PHRASES = ('Analizando...', 'Buscando...', 'Hackeando internet... 👀', 'Rebuscando en la web...',
'Robando cosas...', 'Scrapeando...', 'Scraping...')

View File

@@ -1,6 +0,0 @@
class BadRoleError(Exception):
pass
class UserDisconnectedError(Exception):
pass

View File

@@ -1,18 +1,19 @@
import asyncio
import os
import flanautils
os.environ |= flanautils.find_environment_variables('../.env')
import asyncio
from flanabot.bots.flana_disc_bot import FlanaDiscBot
from flanabot.bots.flana_tele_bot import FlanaTeleBot
async def main():
os.environ |= flanautils.find_environment_variables('../.env')
flana_disc_bot = FlanaDiscBot()
flana_tele_bot = FlanaTeleBot()
await asyncio.gather(
flana_disc_bot.start(),
flana_tele_bot.start()
)

View File

@@ -1,5 +1,7 @@
from flanabot.models.bot_action import *
from flanabot.models.chat import *
from flanabot.models.enums import *
from flanabot.models.message import *
from flanabot.models.punishments import *
from flanabot.models.user import *
from flanabot.models.player import *
from flanabot.models.punishment import *
from flanabot.models.weather_chart import *

View File

@@ -0,0 +1,30 @@
__all__ = ['BotAction']
import datetime
from dataclasses import dataclass, field
from flanautils import DCMongoBase, FlanaBase
from multibot.models.user import User
from flanabot.models.chat import Chat
from flanabot.models.enums import Action
from flanabot.models.message import Message
@dataclass(eq=False)
class BotAction(DCMongoBase, FlanaBase):
collection_name = 'bot_action'
unique_keys = 'message'
nullable_unique_keys = 'message'
action: Action = None
message: Message = None
author: User = None
chat: Chat = None
affected_objects: list = field(default_factory=list)
date: datetime.datetime = field(default_factory=datetime.datetime.now)
def __post_init__(self):
super().__post_init__()
self.author = self.author or getattr(self.message, 'author', None)
self.chat = self.chat or getattr(self.message, 'chat', None)

View File

@@ -1,21 +1,23 @@
__all__ = ['Chat']
from dataclasses import dataclass, field
from multibot import Chat as MultiBotChat
from flanabot.models.user import User
@dataclass(eq=False)
class Chat(MultiBotChat):
DEFAULT_CONFIG = {'auto_clear': False,
'auto_covid_chart': True,
'auto_currency_chart': True,
'auto_delete_original': True,
DEFAULT_CONFIG = {
'auto_insult': True,
'auto_scraping': True,
'auto_weather_chart': True}
users: list[User] = field(default_factory=list)
'auto_weather_chart': False,
'check_flood': False,
'punish': False,
'scraping_delete_original': True
}
config: dict[str, bool] = field(default_factory=dict)
def __post_init__(self):
super().__post_init__()
if not self.config:
self.config = self.DEFAULT_CONFIG
self.config = self.DEFAULT_CONFIG | self.config

19
flanabot/models/enums.py Normal file
View File

@@ -0,0 +1,19 @@
__all__ = ['Action', 'ButtonsGroup']
from enum import auto
from flanautils import FlanaEnum
class Action(FlanaEnum):
AUTO_WEATHER_CHART = auto()
MESSAGE_DELETED = auto()
class ButtonsGroup(FlanaEnum):
CONFIG = auto()
CONNECT_4 = auto()
POLL = auto()
ROLES = auto()
USERS = auto()
WEATHER = auto()

View File

@@ -1,21 +1,19 @@
from __future__ import annotations # todo0 remove in 3.11
from __future__ import annotations # todo0 remove when it's by default
__all__ = ['Message']
from dataclasses import dataclass, field
from typing import Iterable
from flanautils import Media, OrderedSet
from multibot import Message as MultiBotMessage
from multibot import Message as MultiBotMessage, User
from flanabot.models.chat import Chat
from flanabot.models.user import User
from flanabot.models.weather_chart import WeatherChart
@dataclass(eq=False)
class Message(MultiBotMessage):
author: User = None
mentions: Iterable[User] = field(default_factory=list)
mentions: list[User] = field(default_factory=list)
chat: Chat = None
replied_message: Message = None
weather_chart: WeatherChart = None
song_infos: OrderedSet[Media] = field(default_factory=OrderedSet)

12
flanabot/models/player.py Normal file
View File

@@ -0,0 +1,12 @@
__all__ = ['Player']
from dataclasses import dataclass
from flanautils import FlanaBase
@dataclass
class Player(FlanaBase):
id: int
name: str
number: int

View File

@@ -0,0 +1,18 @@
__all__ = ['Punishment']
from dataclasses import dataclass
from typing import Any
from multibot import Penalty
@dataclass(eq=False)
class Punishment(Penalty):
collection_name = 'punishment'
level: int = 0
def _mongo_repr(self) -> Any:
self_vars = super()._mongo_repr()
self_vars['level'] = self.level
return self_vars

View File

@@ -1,23 +0,0 @@
import datetime
from dataclasses import dataclass
from flanautils import DCMongoBase, FlanaBase
from multibot.models.database import db
@dataclass(eq=False)
class PunishmentBase(DCMongoBase, FlanaBase):
user_id: int = None
group_id: int = None
until: datetime.datetime = None
is_active: bool = True
@dataclass(eq=False)
class Punishment(PunishmentBase):
collection = db.punishment
@dataclass(eq=False)
class Mute(PunishmentBase):
collection = db.mute

View File

@@ -1,22 +0,0 @@
from dataclasses import dataclass
from multibot import User as MultiBotUser
from flanabot.models.punishments import Mute, Punishment
@dataclass(eq=False)
class User(MultiBotUser):
def is_muted_on(self, group_id: int):
return group_id in self.muted_on
def is_punished_on(self, group_id: int):
return group_id in self.punished_on
@property
def muted_on(self):
return {mute.group_id for mute in Mute.find({'user_id': self.id, 'is_active': True})}
@property
def punished_on(self):
return {punishment for punishment in Punishment.find({'user_id': self.id, 'is_active': True})}

View File

@@ -1,3 +1,5 @@
__all__ = ['Direction', 'WeatherChart']
import datetime
from dataclasses import dataclass, field
@@ -44,7 +46,7 @@ class WeatherChart(DateChart):
if self.show_now_vertical_line and first_dt <= now <= last_dt:
self.figure.add_vline(x=now, line_width=1, line_dash='dot')
self.figure.add_annotation(text="Ahora", yref="paper", x=now, y=0.01, showarrow=False)
self.figure.add_annotation(text='Ahora', yref='paper', x=now, y=0.01, showarrow=False)
for day_weather in self.day_weathers:
date_time = datetime.datetime(year=day_weather.date.year, month=day_weather.date.month, day=day_weather.date.day, tzinfo=self.timezone)

View File

@@ -1,65 +1,57 @@
aiohttp==3.7.4.post0
aiosignal==1.2.0
anyio==3.5.0
asgiref==3.4.1
async-generator==1.10
async-timeout==3.0.1
attrs==21.4.0
beautifulsoup4==4.10.0
certifi==2021.10.8
cffi==1.15.0
chardet==4.0.0
charset-normalizer==2.0.10
click==8.0.3
colorama==0.4.4
cryptg==0.2.post4
cryptography==36.0.1
discord.py==1.7.3
fastapi==0.71.0
aiohttp==3.8.3
aiosignal==1.3.1
anyio==3.6.2
async-timeout==4.0.2
attrs==22.2.0
beautifulsoup4==4.11.1
Brotli==1.0.9
certifi==2022.12.7
charset-normalizer==2.1.1
click==8.1.3
colorama==0.4.6
cryptg==0.4.0
discord.py==2.1.0
dnspython==2.2.1
fastapi==0.89.1
flanaapis
flanautils
frozenlist==1.2.0
greenlet==1.1.2
h11==0.12.0
hachoir==3.1.2
idna==3.3
iso8601==1.0.2
frozenlist==1.3.3
greenlet==2.0.1
h11==0.14.0
hachoir==3.2.0
idna==3.4
iso8601==1.1.0
jellyfish==0.9.0
kaleido==0.2.1
mpmath==1.2.1
multibot
multidict==5.2.0
outcome==1.1.0
Pillow==9.0.0
playwright==1.17.2
plotly==5.5.0
multidict==6.0.4
mutagen==1.46.0
Pillow==9.4.0
playwright==1.29.1
plotly==5.11.0
pyaes==1.6.1
pyasn1==0.4.8
pycparser==2.21
pydantic==1.9.0
pyee==8.2.2
pymongo==4.0.1
pyOpenSSL==21.0.0
requests==2.27.1
rsa==4.8
selenium==4.1.0
selenium-stealth==1.0.6
six==1.16.0
sniffio==1.2.0
sortedcontainers==2.4.0
starlette==0.17.1
sympy==1.9
Telethon==1.24.0
tenacity==8.0.1
TikTokApi==4.1.0
trio==0.19.0
trio-websocket==0.9.2
twitchio==2.1.4
typing_extensions==4.0.1
ujson==5.1.0
urllib3==1.26.8
uvicorn==0.17.0
websockets==10.1
ws4py==0.5.1
wsproto==1.0.0
yarl==1.7.2
pycairo==1.23.0
pycryptodomex==3.16.0
pydantic==1.10.4
pyee==9.0.4
pymongo==4.3.3
pytube==12.1.2
pytz==2022.7
requests==2.28.1
rsa==4.9
sniffio==1.3.0
soupsieve==2.3.2.post1
starlette==0.22.0
sympy==1.11.1
Telethon==1.26.1
tenacity==8.1.0
twitchio==2.5.0
typing_extensions==4.4.0
ujson==5.7.0
urllib3==1.26.13
uvicorn==0.20.0
websockets==10.4
yarl==1.8.2
yt-dlp==2023.1.6

View File

@@ -22,6 +22,8 @@ install_requires =
flanautils
multibot
plotly
pycairo
pytz
[options.packages.find]
include = {project_name}*

View File

@@ -7,7 +7,6 @@ os.environ |= flanautils.find_environment_variables('../.env')
import unittest
from typing import Iterable
from multibot import constants as multibot_constants
from flanabot.bots.flana_tele_bot import FlanaTeleBot
@@ -15,7 +14,7 @@ class TestParseCallbacks(unittest.TestCase):
def _test_no_always_callbacks(self, phrases: Iterable[str], callback: callable):
for i, phrase in enumerate(phrases):
with self.subTest(phrase):
callbacks = [registered_callback.callback for registered_callback in self.flana_tele_bot._parse_callbacks(phrase, multibot_constants.RATIO_REWARD_EXPONENT, multibot_constants.KEYWORDS_LENGHT_PENALTY, multibot_constants.MINIMUM_RATIO_TO_MATCH)
callbacks = [registered_callback.callback for registered_callback in self.flana_tele_bot._parse_callbacks(phrase, self.flana_tele_bot._registered_callbacks)
if not registered_callback.always]
self.assertEqual(1, len(callbacks))
self.assertEqual(callback, callbacks[0], f'\n\nExpected: {callback.__name__}\nActual: {callbacks[0].__name__}')
@@ -27,98 +26,31 @@ class TestParseCallbacks(unittest.TestCase):
phrases = ['adios', 'taluego', 'adiooo', 'hasta la proxima', 'nos vemos', 'hasta la vista', 'hasta pronto']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_bye)
def test_on_config_list_show(self):
def test_on_config(self):
phrases = [
'flanabot ajustes',
'Flanabot ajustes',
'Flanabot qué puedo ajustar?',
'flanabot ayuda'
'config',
'configuracion',
'configuración',
'configuration'
]
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_config_list_show)
def test_on_covid_chart(self):
phrases = [
'cuantos contagios',
'casos',
'enfermos',
'muerte',
'pandemia',
'enfermedad',
'fallecidos',
'mascarillas',
'virus',
'covid-19',
'como va el covid',
'lo peta el corona'
]
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_covid_chart)
def test_on_currency_chart(self):
phrases = [
'como van esos dineros',
'critodivisa',
'esas cryptos',
'inversion',
'moneda',
'mas caro en argentina?',
'el puto bitcoin',
'divisa'
]
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_currency_chart)
def test_on_currency_chart_config_activate(self):
phrases = ['activa el bitcoin automatico']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_currency_chart_config_activate)
def test_on_currency_chart_config_change(self):
phrases = ['cambia la config del bitcoin automatico']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_currency_chart_config_change)
def test_on_currency_chart_config_deactivate(self):
phrases = ['desactiva el bitcoin automatico']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_currency_chart_config_deactivate)
def test_on_currency_chart_config_show(self):
phrases = ['enseña el bitcoin automatico', 'como esta el bitcoin automatico', 'flanabot ajustes bitcoin']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_currency_chart_config_show)
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_config)
def test_on_delete(self):
phrases = ['borra ese mensaje', 'borra ese mensaje puto', 'borra', 'borra el mensaje', 'borra eso', 'borres']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_delete)
def test_on_delete_original_config_activate(self):
phrases = [
'activa el borrado automatico',
'flanabot pon el auto delete activado',
'flanabot activa el autodelete'
]
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_delete_original_config_activate)
def test_on_delete_original_config_change(self):
phrases = ['cambia la config del borrado automatico']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_delete_original_config_change)
def test_on_delete_original_config_deactivate(self):
phrases = [
'desactiva el borrado automatico',
'flanabot pon el auto delete desactivado',
'flanabot desactiva el autodelete'
]
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_delete_original_config_deactivate)
def test_on_delete_original_config_show(self):
phrases = ['enseña el borrado automatico', 'como esta el borrado automatico', 'flanabot ajustes delete']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_delete_original_config_show)
def test_on_hello(self):
phrases = ['hola', 'hello', 'buenos dias', 'holaaaaaa', 'hi', 'holaaaaa', 'saludos', 'ola k ase']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_hello)
def test_on_mute(self):
phrases = [
# 'silencia',
# 'silencia al pavo ese',
# 'calla a ese pesao',
'silencia',
'silencia al pavo ese',
'calla a ese pesao',
'haz que se calle',
'quitale el microfono a ese',
'quitale el micro',
@@ -160,7 +92,6 @@ class TestParseCallbacks(unittest.TestCase):
'ataca',
'acaba',
'acaba con',
'termina con el',
'acabaq con su sufri,iento',
'acaba con ese apvo',
'castigalo',
@@ -187,22 +118,6 @@ class TestParseCallbacks(unittest.TestCase):
]
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_scraping)
def test_on_scraping_config_activate(self):
phrases = ['activa el scraping automatico', 'activa el scraping']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_scraping_config_activate)
def test_on_scraping_config_change(self):
phrases = ['cambia la config del scraping']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_scraping_config_change)
def test_on_scraping_config_deactivate(self):
phrases = ['desactiva el scraping automatico']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_scraping_config_deactivate)
def test_on_scraping_config_show(self):
phrases = ['enseña el scraping automatico', 'como esta el scraping automatico', 'flanabot ajustes scraping']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_scraping_config_show)
def test_on_song_info(self):
phrases = [
'que sonaba ahi',
@@ -267,20 +182,4 @@ class TestParseCallbacks(unittest.TestCase):
'hara mucho calor en egipto este fin de semana?',
'pfff no ve que frio ahi en oviedo este finde'
]
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_weather_chart)
def test_on_weather_chart_config_activate(self):
phrases = ['activa el tiempo automatico', 'activa el tiempo']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_weather_chart_config_activate)
def test_on_weather_chart_config_change(self):
phrases = ['cambia la config del tiempo']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_weather_chart_config_change)
def test_on_weather_chart_config_deactivate(self):
phrases = ['desactiva el tiempo automatico']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_weather_chart_config_deactivate)
def test_on_weather_chart_config_show(self):
phrases = ['enseña el tiempo automatico', 'como esta el tiempo automatico', 'flanabot ajustes tiempo']
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_weather_chart_config_show)
self._test_no_always_callbacks(phrases, self.flana_tele_bot._on_weather)