Skip to content

ChannelInvalidStateError with manual message ack #222

Description

@davgia

Hi, I have the following python class that wraps rabbitmq interactions:

class Consumer():

    def __init__(self):
        self.rabbitmq_url = "[REDACTED]"
        self.queue_name = "[REDACTED]"

        self.connection = None
        self.channel = None
        self.queue = None

        self.thread = None
        self.loop = None
        self.executor = None

    def _start_loop(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        self.executor = ThreadPoolExecutor()

        try:
            self.loop.run_until_complete(self._consume_messages())
        except Exception:
           print("Fatal error occurred in event loop")
        finally:
            if not self.loop.is_closed():
                self.loop.close()

    async def _consume_messages(self):

        try:
            self.connection = await aio_pika.connect_robust(self.rabbitmq_url, loop=self.loop)

            self.channel = await self.connection.channel()

            await self.channel.set_qos(prefetch_count=1)

            self.queue = await self.channel.declare_queue(self.queue_name, durable=True)

            async with self.queue.iterator() as queue_iter:
                async for message in queue_iter:
                    await self._accept_message(message)
        except asyncio.CancelledError:
            print("Consumer task cancelled")
        except Exception:
            print("Fatal error occurred in the consumer")

    async def _accept_message(self, message: aio_pika.abc.AbstractIncomingMessage):
        async with message.process(ignore_processed=True):
            try:
                body = message.body.decode("utf-8")

                await self._process_message(body)

                await message.ack()
            except Exception:
                await message.reject()

What happens is the following error:

Traceback (most recent call last):
  File \"/app/modules/rabbitmq/consumer.py\", line 201, in _accept_message
    await message.ack()
  File \"/usr/local/lib/python3.13/site-packages/aio_pika/message.py\", line 453, in ack
    await self.channel.basic_ack(
          ^^^^^^^^^^^^
  File \"/usr/local/lib/python3.13/site-packages/aio_pika/message.py\", line 391, in channel
    raise ChannelInvalidStateError
aiormq.exceptions.ChannelInvalidStateError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File \"/app/modules/rabbitmq/consumer.py\", line 207, in _accept_message
    await message.reject()
  File \"/usr/local/lib/python3.13/site-packages/aio_pika/message.py\", line 480, in reject
    await self.channel.basic_reject(
          ^^^^^^^^^^^^
  File \"/usr/local/lib/python3.13/site-packages/aio_pika/message.py\", line 391, in channel
    raise ChannelInvalidStateError
aiormq.exceptions.ChannelInvalidStateError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File \"/app/modules/rabbitmq/consumer.py\", line 165, in _consume_messages
    await self._accept_message(message)
  File \"/app/modules/rabbitmq/consumer.py\", line 181, in _accept_message
    async with message.process(ignore_processed=True):
               ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^
  File \"/usr/local/lib/python3.13/site-packages/aio_pika/message.py\", line 582, in __aexit__
    if not self.message.channel.is_closed:
           ^^^^^^^^^^^^^^^^^^^^
  File \"/usr/local/lib/python3.13/site-packages/aio_pika/message.py\", line 391, in channel
    raise ChannelInvalidStateError
aiormq.exceptions.ChannelInvalidStateError

But I don't understand why... Basically I am sure that self._process_message returns without throwing error but when I try to ack the message after its call the library throws an error saying that the channel is in invalid state (closed??). With ignore_processed=True I should be able to manually ack the messages. Is there anything I can do to debug it further or it is a known error and my code is broken?

My last guess is that there is a timeout in rabbitmq somewhere that automatically closes the channel. I know that the self._process_message is very slow, like 1h. Is my guess correct?

Thanks in advance,
Davide

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions