|
|
@ -35,7 +35,7 @@ class A2SProtocol(asyncio.DatagramProtocol): |
|
|
|
reassembled = b"".join( |
|
|
|
fragment.payload for fragment in self.fragment_buf) |
|
|
|
logger.debug("Received %s part packet with content: %r", |
|
|
|
len(fragments), reassembled) |
|
|
|
len(self.fragment_buf), reassembled) |
|
|
|
self.recv_queue.put_nowait(reassembled) |
|
|
|
self.fragment_buf = [] |
|
|
|
else: |
|
|
@ -64,7 +64,8 @@ class A2SStreamAsync: |
|
|
|
|
|
|
|
@classmethod |
|
|
|
async def create(cls, address, timeout): |
|
|
|
transport, protocol = await asyncio.create_datagram_endpoint( |
|
|
|
loop = asyncio.get_running_loop() |
|
|
|
transport, protocol = await loop.create_datagram_endpoint( |
|
|
|
lambda: A2SProtocol(), remote_addr=address) |
|
|
|
return cls(transport, protocol, timeout) |
|
|
|
|
|
|
@ -76,7 +77,7 @@ class A2SStreamAsync: |
|
|
|
queue_task = asyncio.create_task(self.protocol.recv_queue.get()) |
|
|
|
error_task = asyncio.create_task(self.protocol.error_event.wait()) |
|
|
|
done, pending = await asyncio.wait({queue_task, error_task}, |
|
|
|
timeout=self.timeout, return_when=FIRST_COMPLETED) |
|
|
|
timeout=self.timeout, return_when=asyncio.FIRST_COMPLETED) |
|
|
|
|
|
|
|
for task in pending: task.cancel() |
|
|
|
if error_task in done: |
|
|
@ -86,7 +87,7 @@ class A2SStreamAsync: |
|
|
|
|
|
|
|
return queue_task.result() |
|
|
|
|
|
|
|
async def request(payload): |
|
|
|
async def request(self, payload): |
|
|
|
self.send(payload) |
|
|
|
return await self.recv() |
|
|
|
|
|
|
|