diff --git a/recorder/danmaku/douyin/dylr.py b/recorder/danmaku/douyin/dylr.py index 93ac07e..b8d222b 100644 --- a/recorder/danmaku/douyin/dylr.py +++ b/recorder/danmaku/douyin/dylr.py @@ -36,7 +36,13 @@ def get_danmu_ws_url(id_str): async def consumer_handler(websocket, room_id): room_id = str(room_id) - async for ws_msg in websocket: + while True: + try: + ws_msg = await asyncio.wait_for(websocket.recv(), timeout=60) + except asyncio.TimeoutError: + logging.warning(f'No message received in 60 seconds, reconnecting: {room_id}') + break + wss_package = PushFrame() wss_package.ParseFromString(ws_msg) decompressed = gzip.decompress(wss_package.payload) @@ -78,7 +84,7 @@ async def subscribe(room_id): assert 'id_str' in room_data, f'Failed to get id_str, room_data: {room_data}' ws_url = get_danmu_ws_url(room_data['id_str']) - logging.info(f'ws_url: {ws_url}') + logging.debug(f'ws_url: {ws_url}') # `ping_timeout=None` for not sending ping package and don't wait for pong response async for websocket in websockets.connect( @@ -91,6 +97,8 @@ async def subscribe(room_id): '%7C1680522049%7C280d802d6d478e3e78d0c807f7c487e7ffec0ae4e5fdd6a0fe74c3c6af149511', } ): + logging.info(f'Connected to websocket: {room_id}') + try: await consumer_handler(websocket, room_id) except websockets.WebSocketException as e: @@ -108,13 +116,7 @@ async def _main(room_id, interval): continue logging.info(f'Live started: {room_id}') - task = asyncio.create_task(subscribe(room_id)) - while True: - await asyncio.sleep(3600) - if not get_stream(room_id): - # not live anymore - task.cancel() - break + await subscribe(room_id) async def main():