Update BtcOffersBot (manage websocket reconnections)

This commit is contained in:
AlberLC
2025-04-17 16:08:26 +02:00
parent 39aea44803
commit 5ec3c1e81b

View File

@@ -100,6 +100,7 @@ class BtcOffersBot(MultiBot, ABC):
self._websocket: websockets.ClientConnection | None = None self._websocket: websockets.ClientConnection | None = None
self._notification_task: asyncio.Task[None] | None = None self._notification_task: asyncio.Task[None] | None = None
self._api_endpoint = f"{os.environ['BTC_OFFERS_API_HOST']}:{os.environ['BTC_OFFERS_API_PORT']}/offers" self._api_endpoint = f"{os.environ['BTC_OFFERS_API_HOST']}:{os.environ['BTC_OFFERS_API_PORT']}/offers"
self._websocket_lock = asyncio.Lock()
# -------------------------------------------------------- # # -------------------------------------------------------- #
# ------------------- PROTECTED METHODS ------------------ # # ------------------- PROTECTED METHODS ------------------ #
@@ -238,7 +239,7 @@ class BtcOffersBot(MultiBot, ABC):
async def _on_ready(self): async def _on_ready(self):
await super()._on_ready() await super()._on_ready()
await self.start_saved_btc_offers_notification() asyncio.create_task(self.start_saved_btc_offers_notifications())
async def _on_stop_btc_offers_notification(self, message: Message): async def _on_stop_btc_offers_notification(self, message: Message):
previous_btc_offers_query = message.chat.btc_offers_query previous_btc_offers_query = message.chat.btc_offers_query
@@ -254,16 +255,17 @@ class BtcOffersBot(MultiBot, ABC):
# -------------------- PUBLIC METHODS -------------------- # # -------------------- PUBLIC METHODS -------------------- #
# -------------------------------------------------------- # # -------------------------------------------------------- #
async def start_btc_offers_notification(self, chat: Chat, query: dict[str, float]): async def start_btc_offers_notification(self, chat: Chat, query: dict[str, float]):
if not self._websocket or self._websocket.state in {websockets.State.CLOSING, websockets.State.CLOSED}: async with self._websocket_lock:
while True: if not self._websocket or self._websocket.state in {websockets.State.CLOSING, websockets.State.CLOSED}:
try: while True:
self._websocket = await websockets.connect(f'ws://{self._api_endpoint}') try:
except ConnectionRefusedError: self._websocket = await websockets.connect(f'ws://{self._api_endpoint}')
await asyncio.sleep(constants.BTC_OFFERS_WEBSOCKET_RETRY_DELAY_SECONDS) except ConnectionRefusedError:
else: await asyncio.sleep(constants.BTC_OFFERS_WEBSOCKET_RETRY_DELAY_SECONDS)
break else:
break
if not self._notification_task: if not self._notification_task or self._notification_task.done():
self._notification_task = asyncio.create_task(self._wait_btc_offers_notification()) self._notification_task = asyncio.create_task(self._wait_btc_offers_notification())
chat.btc_offers_query = query chat.btc_offers_query = query
@@ -272,12 +274,17 @@ class BtcOffersBot(MultiBot, ABC):
async def start_saved_btc_offers_notification(self): async def start_saved_btc_offers_notification(self):
for chat in self.Chat.find({ for chat in self.Chat.find({
if chats := self.Chat.find({
'platform': self.platform.value, 'platform': self.platform.value,
'btc_offers_query': {'$exists': True, '$ne': {}} 'btc_offers_query': {'$exists': True, '$ne': {}}
}): }):
chat = await self.get_chat(chat.id) for chat in chats:
chat.pull_from_database(overwrite_fields=('_id', 'btc_offers_query')) chat = await self.get_chat(chat.id)
await self.start_btc_offers_notification(chat, chat.btc_offers_query) chat.pull_from_database(overwrite_fields=('_id', 'btc_offers_query'))
await self.start_btc_offers_notification(chat, chat.btc_offers_query)
elif self._notification_task and not self._notification_task.done():
self._notification_task.cancel()
await asyncio.sleep(0)
async def stop_btc_offers_notification(self, chat: Chat): async def stop_btc_offers_notification(self, chat: Chat):
if not self._websocket: if not self._websocket: