Description
A SystemError exception is raised when calling poll or close on a consumer if the on_assign callback raises an Exception.
    def log(self, level, msg, *args, **kwargs):
        """
        Log 'msg % args' with the integer severity 'level'.
        To pass exception information, use the keyword argument exc_info with
        a true value, e.g.
        logger.log(level, "We have a %s", "mysterious problem", exc_info=1)
        """
>       if not isinstance(level, int):
E       SystemError: <built-in function isinstance> returned a result with an error set
I think this happens because the C extension code does not clear the pending Python exception when a callback fails, so subsequent callbacks execute Python code with the error indicator still set.
Reading the docs, I get the impression that the librdkafka callbacks which execute Python code should clear the exception (PyErr_Clear) and restore it once control is back in Python code calling into the extension (e.g. check and set the error when consumer.poll is called). If this is right, we could probably replace the crashed counter with the exception info.
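To make the idea concrete, here is a minimal pure-Python sketch of the behaviour I have in mind. It only models the control flow and is not the extension's actual code: CallState, run_callback, finish and the simulated poll are hypothetical names chosen for illustration. The first exception raised by a callback is stashed instead of being left set, later callbacks still run cleanly, and the stashed exception is re-raised when the simulated poll returns.

```python
class CallState:
    """Hypothetical stand-in for the extension's per-call state."""

    def __init__(self):
        # Holds the first callback exception (the idea: use this
        # instead of a bare 'crashed' counter).
        self.pending_exc = None

    def run_callback(self, callback, *args):
        """Invoke a user callback and stash the first exception it raises."""
        try:
            callback(*args)
        except Exception as exc:
            # Keep only the first failure; the error is not left "set"
            # while later callbacks run.
            if self.pending_exc is None:
                self.pending_exc = exc

    def finish(self):
        """Run as poll()/close() returns: re-raise any stashed exception."""
        exc, self.pending_exc = self.pending_exc, None
        if exc is not None:
            raise exc


def poll(state, callbacks):
    """Simulated poll(): run the queued callbacks, then surface the first failure."""
    for cb in callbacks:
        state.run_callback(cb)
    state.finish()
```

With this shape, a failing on_assign would surface as its own exception from poll, and a logger callback that runs afterwards would not see a stale error.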
How to reproduce
import logging
logging.basicConfig(level=logging.DEBUG)
from confluent_kafka import Producer
from confluent_kafka import Consumer
prod = Producer(
    {
        'bootstrap.servers': servers,
        'debug': 'all'
    },
    logger=logging.getLogger('')
)
prod.produce('test', b'data')
assert prod.flush(10) == 0
cons = Consumer(
    {
        'bootstrap.servers': servers,
        'group.id': 'test',
        'auto.offset.reset': 'smallest',
        'debug': 'all'
    },
    logger=logging.getLogger('')
)
def _on_assign(self, *args, **kwargs):
    raise Exception("Oops")
cons.subscribe(['test'], on_assign=_on_assign, on_revoke=_on_assign)
msg = cons.poll(10)
assert msg is not None
cons.close()
I can provide the debug logs if necessary.
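Until this is fixed, a possible stopgap is to keep exceptions from leaving the callback at all. The sketch below is my own assumption about a reasonable workaround, not something the library provides: guarded is a hypothetical helper that logs the exception and returns None, so nothing propagates into the extension.

```python
import functools
import logging

log = logging.getLogger(__name__)


def guarded(callback):
    """Wrap a callback so its exceptions are logged rather than propagated."""
    @functools.wraps(callback)
    def wrapper(*args, **kwargs):
        try:
            return callback(*args, **kwargs)
        except Exception:
            # Record the traceback, then swallow the error so the caller
            # never sees an exception coming out of the callback.
            log.exception("%s raised; suppressing", callback.__name__)
            return None
    return wrapper


# Usage with the reproduction above (hypothetical, not verified against a broker):
# cons.subscribe(['test'], on_assign=guarded(_on_assign), on_revoke=guarded(_on_assign))
```

This hides the failure from poll, so it only makes sense where logging the error is enough.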
Checklist
Please provide the following information:
- confluent_kafka.version(), confluent_kafka.libversion() == (('1.2.2', 16908800), ('1.2.2', 16909055))
- Apache Kafka broker version: 0.10.2.1
- Client configuration: {...}
- Operating system:
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue