From 5ec3c1e81b20da1a1bdd6ffe8c500bda60c938b1 Mon Sep 17 00:00:00 2001 From: AlberLC Date: Thu, 17 Apr 2025 16:08:26 +0200 Subject: [PATCH] Update BtcOffersBot (manage websocket reconnections) --- flanabot/bots/btc_offers_bot.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/flanabot/bots/btc_offers_bot.py b/flanabot/bots/btc_offers_bot.py index 0fe5458..9d14b84 100644 --- a/flanabot/bots/btc_offers_bot.py +++ b/flanabot/bots/btc_offers_bot.py @@ -100,6 +100,7 @@ class BtcOffersBot(MultiBot, ABC): self._websocket: websockets.ClientConnection | None = None self._notification_task: asyncio.Task[None] | None = None self._api_endpoint = f"{os.environ['BTC_OFFERS_API_HOST']}:{os.environ['BTC_OFFERS_API_PORT']}/offers" + self._websocket_lock = asyncio.Lock() # -------------------------------------------------------- # # ------------------- PROTECTED METHODS ------------------ # @@ -238,7 +239,7 @@ class BtcOffersBot(MultiBot, ABC): async def _on_ready(self): await super()._on_ready() - await self.start_saved_btc_offers_notification() + asyncio.create_task(self.start_saved_btc_offers_notifications()) async def _on_stop_btc_offers_notification(self, message: Message): previous_btc_offers_query = message.chat.btc_offers_query @@ -254,16 +255,17 @@ class BtcOffersBot(MultiBot, ABC): # -------------------- PUBLIC METHODS -------------------- # # -------------------------------------------------------- # async def start_btc_offers_notification(self, chat: Chat, query: dict[str, float]): - if not self._websocket or self._websocket.state in {websockets.State.CLOSING, websockets.State.CLOSED}: - while True: - try: - self._websocket = await websockets.connect(f'ws://{self._api_endpoint}') - except ConnectionRefusedError: - await asyncio.sleep(constants.BTC_OFFERS_WEBSOCKET_RETRY_DELAY_SECONDS) - else: - break + async with self._websocket_lock: + if not self._websocket or self._websocket.state in {websockets.State.CLOSING, websockets.State.CLOSED}: + while True: + try: + self._websocket = await websockets.connect(f'ws://{self._api_endpoint}') + except ConnectionRefusedError: + await asyncio.sleep(constants.BTC_OFFERS_WEBSOCKET_RETRY_DELAY_SECONDS) + else: + break - if not self._notification_task: + if not self._notification_task or self._notification_task.done(): self._notification_task = asyncio.create_task(self._wait_btc_offers_notification()) chat.btc_offers_query = query @@ -272,12 +274,17 @@ class BtcOffersBot(MultiBot, ABC): async def start_saved_btc_offers_notification(self): for chat in self.Chat.find({ + if chats := self.Chat.find({ 'platform': self.platform.value, 'btc_offers_query': {'$exists': True, '$ne': {}} }): - chat = await self.get_chat(chat.id) - chat.pull_from_database(overwrite_fields=('_id', 'btc_offers_query')) - await self.start_btc_offers_notification(chat, chat.btc_offers_query) + for chat in chats: + chat = await self.get_chat(chat.id) + chat.pull_from_database(overwrite_fields=('_id', 'btc_offers_query')) + await self.start_btc_offers_notification(chat, chat.btc_offers_query) + elif self._notification_task and not self._notification_task.done(): + self._notification_task.cancel() + await asyncio.sleep(0) async def stop_btc_offers_notification(self, chat: Chat): if not self._websocket: