Skip to content

Future is potentially leaked in basic_ack, basic_nack, basic_reject, basic_publish #176

Description

@axnsan12

In Channel methods that use the write_queue with the drain_future pattern, the drain_future will be leaked if self.write_queue.put throws an exception (for example, asyncio.CancelledError when shutting down the loop).

ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=Exception() created at C:\Python\Python310\lib\asyncio\base_events.py:424>
source_traceback: Object created at (most recent call last):
  File "main.py", line 277, in <module>
    asyncio.run(main())
  File "C:\Python\Python310\lib\asyncio\runners.py", line 47, in run
    _cancel_all_tasks(loop)
  File "C:\Python\Python310\lib\asyncio\runners.py", line 63, in _cancel_all_tasks
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
  File "C:\Python\Python310\lib\asyncio\base_events.py", line 628, in run_until_complete
    self.run_forever()
  File "C:\Python\Python310\lib\asyncio\windows_events.py", line 321, in run_forever
    super().run_forever()
  File "C:\Python\Python310\lib\asyncio\base_events.py", line 595, in run_forever
    self._run_once()
  File "C:\Python\Python310\lib\asyncio\base_events.py", line 1873, in _run_once
    handle._run()
  File "C:\Python\Python310\lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  [...]
  File "C:\Python\Python310\lib\site-packages\aio_pika\message.py", line 604, in __aexit__
    await self.message.reject(requeue=self.requeue)
  File "C:\Python\Python310\lib\site-packages\aio_pika\message.py", line 501, in reject
    await self.__channel.basic_reject(
  File "C:\Python\Python310\lib\site-packages\aiormq\channel.py", line 590, in basic_reject
    drain_future = self.create_future()
  File "C:\Python\Python310\lib\site-packages\aiormq\base.py", line 115, in create_future
    return self.__future_store.create_future()
  File "C:\Python\Python310\lib\site-packages\aiormq\base.py", line 72, in create_future
    future = self.loop.create_future()
  File "C:\Python\Python310\lib\asyncio\base_events.py", line 424, in create_future
    return futures.Future(loop=self)

Example offending code snippet:

aiormq/aiormq/channel.py

Lines 540 to 556 in 5b9c88d

drain_future = self.create_future() if wait else None
await self.write_queue.put(
ChannelFrame.marshall(
frames=[
spec.Basic.Ack(
delivery_tag=delivery_tag,
multiple=multiple,
),
],
channel_number=self.number,
drain_future=drain_future,
),
)
if drain_future is not None:
await drain_future

Not sure what a robust solution would be here, maybe try-except the queue op and drain_future.cancel() on execption.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions