Update BtcOffersBot (manage websocket reconnections)
This commit is contained in:
@@ -11,8 +11,8 @@ from collections.abc import Callable
|
|||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import flanautils
|
import flanautils
|
||||||
|
import websockets
|
||||||
from multibot import MultiBot, constants as multibot_constants
|
from multibot import MultiBot, constants as multibot_constants
|
||||||
from websockets.asyncio import client
|
|
||||||
|
|
||||||
import constants
|
import constants
|
||||||
from flanabot.models import Chat, Message
|
from flanabot.models import Chat, Message
|
||||||
@@ -97,7 +97,7 @@ def preprocess_btc_offers(func: Callable) -> Callable:
|
|||||||
class BtcOffersBot(MultiBot, ABC):
|
class BtcOffersBot(MultiBot, ABC):
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self._websocket: client.ClientConnection | None = None
|
self._websocket: websockets.ClientConnection | None = None
|
||||||
self._notification_task: asyncio.Task[None] | None = None
|
self._notification_task: asyncio.Task[None] | None = None
|
||||||
self._api_endpoint = f"{os.environ['BTC_OFFERS_API_HOST']}:{os.environ['BTC_OFFERS_API_PORT']}/offers"
|
self._api_endpoint = f"{os.environ['BTC_OFFERS_API_HOST']}:{os.environ['BTC_OFFERS_API_PORT']}/offers"
|
||||||
|
|
||||||
@@ -135,7 +135,9 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
if offer['author']:
|
if offer['author']:
|
||||||
offer_parts.append(f"<b>Autor:</b> <code>{offer['author']}</code>")
|
offer_parts.append(f"<b>Autor:</b> <code>{offer['author']}</code>")
|
||||||
|
|
||||||
payment_methods_text = ''.join(f'\n <code>{payment_method}</code>' for payment_method in offer['payment_methods'])
|
payment_methods_text = ''.join(
|
||||||
|
f'\n <code>{payment_method}</code>' for payment_method in offer['payment_methods']
|
||||||
|
)
|
||||||
|
|
||||||
offer_parts.extend(
|
offer_parts.extend(
|
||||||
(
|
(
|
||||||
@@ -148,7 +150,9 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if offer['description']:
|
if offer['description']:
|
||||||
offer_parts.append(f"<b>Descripción:</b>\n<code><code><code>{offer['description']}</code></code></code>")
|
offer_parts.append(
|
||||||
|
f"<b>Descripción:</b>\n<code><code><code>{offer['description']}</code></code></code>"
|
||||||
|
)
|
||||||
|
|
||||||
offers_parts.append('\n'.join(offer_parts))
|
offers_parts.append('\n'.join(offer_parts))
|
||||||
|
|
||||||
@@ -175,7 +179,7 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
(
|
(
|
||||||
'',
|
'',
|
||||||
'-' * 70,
|
'-' * 70,
|
||||||
'<b>ℹ️ Los avisos de ofertas de BTC se han desactivado. Si quieres volver a recibirlos, no dudes en pedírmelo.</b>'
|
'<b>ℹ️ Los avisos de ofertas de BTC se han eliminado. Si quieres volver a recibirlos, no dudes en pedírmelo.</b>'
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -184,7 +188,14 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
|
|
||||||
async def _wait_btc_offers_notification(self):
|
async def _wait_btc_offers_notification(self):
|
||||||
while True:
|
while True:
|
||||||
data = json.loads(await self._websocket.recv())
|
while True:
|
||||||
|
try:
|
||||||
|
data = json.loads(await self._websocket.recv())
|
||||||
|
except websockets.ConnectionClosed:
|
||||||
|
await self.start_saved_btc_offers_notification()
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
chat = await self.get_chat(data['chat_id'])
|
chat = await self.get_chat(data['chat_id'])
|
||||||
chat.btc_offers_query = {}
|
chat.btc_offers_query = {}
|
||||||
chat.save(pull_exclude_fields=('btc_offers_query',))
|
chat.save(pull_exclude_fields=('btc_offers_query',))
|
||||||
@@ -227,14 +238,7 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
|
|
||||||
async def _on_ready(self):
|
async def _on_ready(self):
|
||||||
await super()._on_ready()
|
await super()._on_ready()
|
||||||
|
await self.start_saved_btc_offers_notification()
|
||||||
for chat in self.Chat.find({
|
|
||||||
'platform': self.platform.value,
|
|
||||||
'btc_offers_max_eur': {'$exists': True, '$ne': None}
|
|
||||||
}):
|
|
||||||
chat = await self.get_chat(chat.id)
|
|
||||||
chat.pull_from_database(overwrite_fields=('_id', 'btc_offers_max_eur'))
|
|
||||||
await self.start_btc_offers_notification(chat, chat.btc_offers_max_eur)
|
|
||||||
|
|
||||||
async def _on_stop_btc_offers_notification(self, message: Message):
|
async def _on_stop_btc_offers_notification(self, message: Message):
|
||||||
previous_btc_offers_query = message.chat.btc_offers_query
|
previous_btc_offers_query = message.chat.btc_offers_query
|
||||||
@@ -249,14 +253,31 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
# -------------------------------------------------------- #
|
# -------------------------------------------------------- #
|
||||||
# -------------------- PUBLIC METHODS -------------------- #
|
# -------------------- PUBLIC METHODS -------------------- #
|
||||||
# -------------------------------------------------------- #
|
# -------------------------------------------------------- #
|
||||||
async def start_btc_offers_notification(self, chat: Chat, max_price_eur: float):
|
async def start_btc_offers_notification(self, chat: Chat, query: dict[str, float]):
|
||||||
if not self._websocket:
|
if not self._websocket or self._websocket.state in {websockets.State.CLOSING, websockets.State.CLOSED}:
|
||||||
self._websocket = await client.connect(f'ws://{self._api_endpoint}')
|
while True:
|
||||||
|
try:
|
||||||
|
self._websocket = await websockets.connect(f'ws://{self._api_endpoint}')
|
||||||
|
except ConnectionRefusedError:
|
||||||
|
await asyncio.sleep(constants.BTC_OFFERS_WEBSOCKET_RETRY_DELAY_SECONDS)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
if not self._notification_task:
|
||||||
self._notification_task = asyncio.create_task(self._wait_btc_offers_notification())
|
self._notification_task = asyncio.create_task(self._wait_btc_offers_notification())
|
||||||
|
|
||||||
chat.btc_offers_query = query
|
chat.btc_offers_query = query
|
||||||
chat.save()
|
chat.save()
|
||||||
await self._websocket.send(json.dumps({'action': 'start', 'chat_id': chat.id, 'max_price_eur': max_price_eur}))
|
await self._websocket.send(json.dumps({'action': 'start', 'chat_id': chat.id, 'query': query}))
|
||||||
|
|
||||||
|
async def start_saved_btc_offers_notification(self):
|
||||||
|
for chat in self.Chat.find({
|
||||||
|
'platform': self.platform.value,
|
||||||
|
'btc_offers_query': {'$exists': True, '$ne': {}}
|
||||||
|
}):
|
||||||
|
chat = await self.get_chat(chat.id)
|
||||||
|
chat.pull_from_database(overwrite_fields=('_id', 'btc_offers_query'))
|
||||||
|
await self.start_btc_offers_notification(chat, chat.btc_offers_query)
|
||||||
|
|
||||||
async def stop_btc_offers_notification(self, chat: Chat):
|
async def stop_btc_offers_notification(self, chat: Chat):
|
||||||
if not self._websocket:
|
if not self._websocket:
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ from multibot import Platform
|
|||||||
AUDIT_LOG_AGE = datetime.timedelta(hours=1)
|
AUDIT_LOG_AGE = datetime.timedelta(hours=1)
|
||||||
AUDIT_LOG_LIMIT = 5
|
AUDIT_LOG_LIMIT = 5
|
||||||
AUTO_WEATHER_EVERY = datetime.timedelta(hours=6)
|
AUTO_WEATHER_EVERY = datetime.timedelta(hours=6)
|
||||||
|
BTC_OFFERS_WEBSOCKET_RETRY_DELAY_SECONDS = datetime.timedelta(hours=1).total_seconds()
|
||||||
CHECK_PUNISHMENTS_EVERY_SECONDS = datetime.timedelta(hours=1).total_seconds()
|
CHECK_PUNISHMENTS_EVERY_SECONDS = datetime.timedelta(hours=1).total_seconds()
|
||||||
CONNECT_4_AI_DELAY_SECONDS = 1
|
CONNECT_4_AI_DELAY_SECONDS = 1
|
||||||
CONNECT_4_CENTER_COLUMN_POINTS = 2
|
CONNECT_4_CENTER_COLUMN_POINTS = 2
|
||||||
|
|||||||
Reference in New Issue
Block a user