Add BtcOffersBot.stop_all_btc_offers_notification
This commit is contained in:
@@ -125,6 +125,12 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['money']))
|
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['money']))
|
||||||
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['notify']))
|
self.register(self._on_stop_btc_offers_notification, keywords=(multibot_constants.KEYWORDS['stop'], constants.KEYWORDS['notify']))
|
||||||
|
|
||||||
|
def _find_chats_to_notify(self) -> list[Chat]:
|
||||||
|
return self.Chat.find({'platform': self.platform.value, 'btc_offers_query': {'$exists': True, '$ne': {}}})
|
||||||
|
|
||||||
|
def _is_websocket_connected(self) -> bool:
|
||||||
|
return self._websocket and self._websocket.state in {websockets.State.CONNECTING, websockets.State.OPEN}
|
||||||
|
|
||||||
async def _send_offers(self, offers: list[dict], chat: Chat, notifications_disabled=False):
|
async def _send_offers(self, offers: list[dict], chat: Chat, notifications_disabled=False):
|
||||||
offers_parts = []
|
offers_parts = []
|
||||||
for i, offer in enumerate(offers, start=1):
|
for i, offer in enumerate(offers, start=1):
|
||||||
@@ -195,7 +201,7 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
try:
|
try:
|
||||||
data = json.loads(await self._websocket.recv())
|
data = json.loads(await self._websocket.recv())
|
||||||
except websockets.ConnectionClosed:
|
except websockets.ConnectionClosed:
|
||||||
await self.start_saved_btc_offers_notifications()
|
await self.start_all_btc_offers_notifications()
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
|
||||||
@@ -245,7 +251,7 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
|
|
||||||
async def _on_ready(self):
|
async def _on_ready(self):
|
||||||
await super()._on_ready()
|
await super()._on_ready()
|
||||||
asyncio.create_task(self.start_saved_btc_offers_notifications())
|
asyncio.create_task(self.start_all_btc_offers_notifications())
|
||||||
|
|
||||||
async def _on_stop_btc_offers_notification(self, message: Message):
|
async def _on_stop_btc_offers_notification(self, message: Message):
|
||||||
previous_btc_offers_query = message.chat.btc_offers_query
|
previous_btc_offers_query = message.chat.btc_offers_query
|
||||||
@@ -260,9 +266,19 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
# -------------------------------------------------------- #
|
# -------------------------------------------------------- #
|
||||||
# -------------------- PUBLIC METHODS -------------------- #
|
# -------------------- PUBLIC METHODS -------------------- #
|
||||||
# -------------------------------------------------------- #
|
# -------------------------------------------------------- #
|
||||||
|
async def start_all_btc_offers_notifications(self):
|
||||||
|
if chats := self._find_chats_to_notify():
|
||||||
|
for chat in chats:
|
||||||
|
chat = await self.get_chat(chat.id)
|
||||||
|
chat.pull_from_database(overwrite_fields=('_id', 'btc_offers_query'))
|
||||||
|
await self.start_btc_offers_notification(chat, chat.btc_offers_query)
|
||||||
|
elif self._notification_task and not self._notification_task.done():
|
||||||
|
self._notification_task.cancel()
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
async def start_btc_offers_notification(self, chat: Chat, query: dict[str, float]):
|
async def start_btc_offers_notification(self, chat: Chat, query: dict[str, float]):
|
||||||
async with self._websocket_lock:
|
async with self._websocket_lock:
|
||||||
if not self._websocket or self._websocket.state in {websockets.State.CLOSING, websockets.State.CLOSED}:
|
if not self._is_websocket_connected():
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
self._websocket = await websockets.connect(f'ws://{self._api_endpoint}')
|
self._websocket = await websockets.connect(f'ws://{self._api_endpoint}')
|
||||||
@@ -278,23 +294,13 @@ class BtcOffersBot(MultiBot, ABC):
|
|||||||
chat.save()
|
chat.save()
|
||||||
await self._websocket.send(json.dumps({'action': 'start', 'chat_id': chat.id, 'query': query}))
|
await self._websocket.send(json.dumps({'action': 'start', 'chat_id': chat.id, 'query': query}))
|
||||||
|
|
||||||
async def start_saved_btc_offers_notifications(self):
|
async def stop_all_btc_offers_notification(self):
|
||||||
if chats := self.Chat.find({
|
for chat in self._find_chats_to_notify():
|
||||||
'platform': self.platform.value,
|
await self.stop_btc_offers_notification(chat)
|
||||||
'btc_offers_query': {'$exists': True, '$ne': {}}
|
|
||||||
}):
|
|
||||||
for chat in chats:
|
|
||||||
chat = await self.get_chat(chat.id)
|
|
||||||
chat.pull_from_database(overwrite_fields=('_id', 'btc_offers_query'))
|
|
||||||
await self.start_btc_offers_notification(chat, chat.btc_offers_query)
|
|
||||||
elif self._notification_task and not self._notification_task.done():
|
|
||||||
self._notification_task.cancel()
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
|
|
||||||
async def stop_btc_offers_notification(self, chat: Chat):
|
async def stop_btc_offers_notification(self, chat: Chat):
|
||||||
if not self._websocket:
|
if self._is_websocket_connected():
|
||||||
return
|
await self._websocket.send(json.dumps({'action': 'stop', 'chat_id': chat.id}))
|
||||||
|
|
||||||
await self._websocket.send(json.dumps({'action': 'stop', 'chat_id': chat.id}))
|
|
||||||
chat.btc_offers_query = {}
|
chat.btc_offers_query = {}
|
||||||
chat.save(pull_exclude_fields=('btc_offers_query',))
|
chat.save(pull_exclude_fields=('btc_offers_query',))
|
||||||
|
|||||||
Reference in New Issue
Block a user