SystemError if assign_cb raises Exception #729

Open
@coldeasy

Description

A SystemError is raised when calling poll() or close() on a consumer if the on_assign callback raises an exception.

    def log(self, level, msg, *args, **kwargs):
        """
        Log 'msg % args' with the integer severity 'level'.

        To pass exception information, use the keyword argument exc_info with
        a true value, e.g.

        logger.log(level, "We have a %s", "mysterious problem", exc_info=1)
        """
>       if not isinstance(level, int):
E       SystemError: <built-in function isinstance> returned a result with an error set

I think this happens because the C extension code does not clear the Python exception raised during a callback, so subsequent callbacks execute Python code while an error is still set.
Reading the docs, I get the impression that the librdkafka callbacks which execute Python code should clear the exception (PyErr_Clear) and restore it once control returns to the Python-facing C function (e.g. check for and set the error when consumer.poll is called). If this is right, we could probably replace the crashed counter with the exception info.
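To make the idea concrete, here is a minimal pure-Python sketch of the "store the exception, re-raise it later" pattern described above. It is only a model of the suggestion, not the library's code: `CallbackErrorSlot`, `wrap` and `raise_pending` are hypothetical names, and the real fix would live in the C extension.

```python
class CallbackErrorSlot:
    """Holds the first exception raised by a callback until the caller collects it."""

    def __init__(self):
        self._pending = None

    def wrap(self, callback):
        """Return a callback that records exceptions instead of propagating them."""
        def guarded(*args, **kwargs):
            if self._pending is not None:
                # An earlier callback already failed: do not run more Python code.
                return None
            try:
                return callback(*args, **kwargs)
            except Exception as exc:
                # Keep the exception itself rather than just counting a crash.
                self._pending = exc
                return None
        return guarded

    def raise_pending(self):
        """Re-raise the stored exception, if any, and clear the slot."""
        exc, self._pending = self._pending, None
        if exc is not None:
            raise exc


def on_assign(consumer, partitions):
    raise Exception("Oops")


slot = CallbackErrorSlot()
guarded_on_assign = slot.wrap(on_assign)

# Stands in for the rebalance callback firing while poll() is running.
guarded_on_assign(None, [])

# Stands in for the check that poll() would perform before returning.
try:
    slot.raise_pending()
except Exception as exc:
    print("poll() would raise:", exc)  # prints: poll() would raise: Oops
```

The same wrapper might also serve as a user-side workaround until the binding is fixed: pass `slot.wrap(_on_assign)` to `subscribe` and call `slot.raise_pending()` after each `poll`. I have not verified that against a broker.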

How to reproduce

    import logging
    logging.basicConfig(level=logging.DEBUG)
    from confluent_kafka import Producer
    from confluent_kafka import Consumer
    servers = 'localhost:9092'  # assumed broker address, not part of the original report
    prod = Producer(
        {
            'bootstrap.servers': servers,
            'debug': 'all'
        },
        logger=logging.getLogger('')
    )
    prod.produce('test', b'data')
    assert prod.flush(10) == 0
    cons = Consumer(
        {
            'bootstrap.servers': servers,
            'group.id': 'test',
            'auto.offset.reset': 'smallest',
            'debug': 'all'
        },
        logger=logging.getLogger('')
    )

    def _on_assign(consumer, *args, **kwargs):
        # Any exception raised from a rebalance callback triggers the problem.
        raise Exception("Oops")

    cons.subscribe(['test'], on_assign=_on_assign, on_revoke=_on_assign)
    msg = cons.poll(10)
    assert msg is not None
    cons.close()

I can provide the debug logs if necessary.

Checklist

Please provide the following information:

  • confluent_kafka.version(), confluent_kafka.libversion() == (('1.2.2', 16908800), ('1.2.2', 16909055))
  • Apache Kafka broker version: 0.10.2.1
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
