Skip to content

Commit 42ef727

Browse files
author
Ryan P
authored
Update built-in partitioner options to include murmur2 (#396)
1 parent f0921e3 commit 42ef727

File tree

4 files changed

+30
-172
lines changed

4 files changed

+30
-172
lines changed

confluent_kafka/src/Producer.c

Lines changed: 7 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
struct Producer_msgstate {
4949
Handle *self;
5050
PyObject *dr_cb;
51-
PyObject *partitioner_cb;
5251
};
5352

5453

@@ -58,32 +57,23 @@ struct Producer_msgstate {
5857
*/
5958
static __inline struct Producer_msgstate *
6059
Producer_msgstate_new (Handle *self,
61-
PyObject *dr_cb, PyObject *partitioner_cb) {
60+
PyObject *dr_cb) {
6261
struct Producer_msgstate *msgstate;
6362

64-
if (!dr_cb && !partitioner_cb)
65-
return NULL;
66-
6763
msgstate = calloc(1, sizeof(*msgstate));
6864
msgstate->self = self;
6965

7066
if (dr_cb) {
7167
msgstate->dr_cb = dr_cb;
7268
Py_INCREF(dr_cb);
7369
}
74-
if (partitioner_cb) {
75-
msgstate->partitioner_cb = partitioner_cb;
76-
Py_INCREF(partitioner_cb);
77-
}
7870
return msgstate;
7971
}
8072

8173
static __inline void
8274
Producer_msgstate_destroy (struct Producer_msgstate *msgstate) {
8375
if (msgstate->dr_cb)
8476
Py_DECREF(msgstate->dr_cb);
85-
if (msgstate->partitioner_cb)
86-
Py_DECREF(msgstate->partitioner_cb);
8777
free(msgstate);
8878
}
8979

@@ -93,10 +83,6 @@ static void Producer_clear0 (Handle *self) {
9383
Py_DECREF(self->u.Producer.default_dr_cb);
9484
self->u.Producer.default_dr_cb = NULL;
9585
}
96-
if (self->u.Producer.partitioner_cb) {
97-
Py_DECREF(self->u.Producer.partitioner_cb);
98-
self->u.Producer.partitioner_cb = NULL;
99-
}
10086
}
10187

10288
static int Producer_clear (Handle *self) {
@@ -128,8 +114,6 @@ static int Producer_traverse (Handle *self,
128114
visitproc visit, void *arg) {
129115
if (self->u.Producer.default_dr_cb)
130116
Py_VISIT(self->u.Producer.default_dr_cb);
131-
if (self->u.Producer.partitioner_cb)
132-
Py_VISIT(self->u.Producer.partitioner_cb);
133117

134118
Handle_traverse(self, visit, arg);
135119

@@ -191,71 +175,6 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
191175
}
192176

193177

194-
/**
195-
* FIXME: The partitioner is currently broken due to threading/GIL issues.
196-
*/
197-
int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
198-
const void *keydata,
199-
size_t keylen,
200-
int32_t partition_cnt,
201-
void *rkt_opaque, void *msg_opaque) {
202-
Handle *self = rkt_opaque;
203-
struct Producer_msgstate *msgstate = msg_opaque;
204-
PyGILState_STATE gstate;
205-
PyObject *result;
206-
PyObject *args;
207-
int32_t r = RD_KAFKA_PARTITION_UA;
208-
209-
if (!msgstate) {
210-
/* Fall back on default C partitioner if neither a per-msg
211-
* partitioner nor a default Python partitioner is available */
212-
return self->u.Producer.c_partitioner_cb(rkt, keydata, keylen,
213-
partition_cnt,
214-
rkt_opaque, msg_opaque);
215-
}
216-
217-
gstate = PyGILState_Ensure();
218-
219-
if (!msgstate->partitioner_cb) {
220-
/* Fall back on default C partitioner if neither a per-msg
221-
* partitioner nor a default Python partitioner is available */
222-
r = msgstate->self->u.Producer.c_partitioner_cb(rkt,
223-
keydata, keylen,
224-
partition_cnt,
225-
rkt_opaque,
226-
msg_opaque);
227-
goto done;
228-
}
229-
230-
args = Py_BuildValue("(s#l)",
231-
(const char *)keydata, (int)keylen,
232-
(long)partition_cnt);
233-
if (!args) {
234-
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
235-
"Unable to build callback args");
236-
goto done;
237-
}
238-
239-
240-
result = PyObject_CallObject(msgstate->partitioner_cb, args);
241-
Py_DECREF(args);
242-
243-
if (result) {
244-
r = (int32_t)cfl_PyInt_AsInt(result);
245-
if (PyErr_Occurred())
246-
printf("FIXME: partition_cb returned wrong type "
247-
"(expected long), how to propagate?\n");
248-
Py_DECREF(result);
249-
} else {
250-
printf("FIXME: partitioner_cb crashed, how to propagate?\n");
251-
}
252-
253-
done:
254-
PyGILState_Release(gstate);
255-
return r;
256-
}
257-
258-
259178
#if HAVE_PRODUCEV
260179
static rd_kafka_resp_err_t
261180
Producer_producev (Handle *self,
@@ -313,7 +232,7 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
313232
const char *topic, *value = NULL, *key = NULL;
314233
int value_len = 0, key_len = 0;
315234
int partition = RD_KAFKA_PARTITION_UA;
316-
PyObject *headers = NULL, *dr_cb = NULL, *dr_cb2 = NULL, *partitioner_cb = NULL;
235+
PyObject *headers = NULL, *dr_cb = NULL, *dr_cb2 = NULL;
317236
long long timestamp = 0;
318237
rd_kafka_resp_err_t err;
319238
struct Producer_msgstate *msgstate;
@@ -327,17 +246,16 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
327246
"partition",
328247
"callback",
329248
"on_delivery", /* Alias */
330-
"partitioner",
331249
"timestamp",
332250
"headers",
333251
NULL };
334252

335253
if (!PyArg_ParseTupleAndKeywords(args, kwargs,
336-
"s|z#z#iOOOLO"
254+
"s|z#z#iOOLO"
337255
, kws,
338256
&topic, &value, &value_len,
339257
&key, &key_len, &partition,
340-
&dr_cb, &dr_cb2, &partitioner_cb,
258+
&dr_cb, &dr_cb2,
341259
&timestamp, &headers))
342260
return NULL;
343261

@@ -376,13 +294,10 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
376294

377295
if (!dr_cb || dr_cb == Py_None)
378296
dr_cb = self->u.Producer.default_dr_cb;
379-
if (!partitioner_cb || partitioner_cb == Py_None)
380-
partitioner_cb = self->u.Producer.partitioner_cb;
381-
382297

383298
/* Create msgstate for this message; it holds a reference to the
384299
* delivery report callback when one is set. */
385-
msgstate = Producer_msgstate_new(self, dr_cb, partitioner_cb);
300+
msgstate = Producer_msgstate_new(self, dr_cb);
386301

387302
/* Produce message */
388303
#if HAVE_PRODUCEV
@@ -503,8 +418,8 @@ static PyMethodDef Producer_methods[] = {
503418
" :param str topic: Topic to produce message to\n"
504419
" :param str|bytes value: Message payload\n"
505420
" :param str|bytes key: Message key\n"
506-
" :param int partition: Partition to produce to, elses uses the "
507-
"configured partitioner.\n"
421+
" :param int partition: Partition to produce to, else uses the "
422+
"configured built-in partitioner.\n"
508423
" :param func on_delivery(err,msg): Delivery report callback to call "
509424
"(from :py:func:`poll()` or :py:func:`flush()`) on successful or "
510425
"failed delivery\n"

confluent_kafka/src/confluent_kafka.c

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1451,70 +1451,6 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
14511451

14521452
return 1;
14531453

1454-
} else if (!strcasecmp(name, "partitioner") ||
1455-
!strcasecmp(name, "partitioner_callback")) {
1456-
1457-
if ((vs = cfl_PyObject_Unistr(valobj))) {
1458-
/* Use built-in C partitioners,
1459-
* based on their name. */
1460-
PyObject *vs8;
1461-
val = cfl_PyUnistr_AsUTF8(vs, &vs8);
1462-
1463-
if (!strcmp(val, "random"))
1464-
rd_kafka_topic_conf_set_partitioner_cb(
1465-
tconf, rd_kafka_msg_partitioner_random);
1466-
else if (!strcmp(val, "consistent"))
1467-
rd_kafka_topic_conf_set_partitioner_cb(
1468-
tconf, rd_kafka_msg_partitioner_consistent);
1469-
else if (!strcmp(val, "consistent_random"))
1470-
rd_kafka_topic_conf_set_partitioner_cb(
1471-
tconf, rd_kafka_msg_partitioner_consistent_random);
1472-
else {
1473-
cfl_PyErr_Format(
1474-
RD_KAFKA_RESP_ERR__INVALID_ARG,
1475-
"unknown builtin partitioner: %s "
1476-
"(available: random, consistent, consistent_random)",
1477-
val);
1478-
Py_XDECREF(vs8);
1479-
Py_DECREF(vs);
1480-
return -1;
1481-
}
1482-
1483-
Py_XDECREF(vs8);
1484-
Py_DECREF(vs);
1485-
1486-
} else {
1487-
/* Custom partitioner (Python callback) */
1488-
1489-
if (!PyCallable_Check(valobj)) {
1490-
cfl_PyErr_Format(
1491-
RD_KAFKA_RESP_ERR__INVALID_ARG,
1492-
"%s requires a callable "
1493-
"object", name);
1494-
return -1;
1495-
}
1496-
1497-
/* FIXME: Error out until GIL+rdkafka lock-ordering is fixed. */
1498-
if (1) {
1499-
cfl_PyErr_Format(
1500-
RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
1501-
"custom partitioner support not yet implemented");
1502-
return -1;
1503-
}
1504-
1505-
if (self->u.Producer.partitioner_cb)
1506-
Py_DECREF(self->u.Producer.partitioner_cb);
1507-
1508-
self->u.Producer.partitioner_cb = valobj;
1509-
Py_INCREF(self->u.Producer.partitioner_cb);
1510-
1511-
/* Use trampoline to call Python code. */
1512-
rd_kafka_topic_conf_set_partitioner_cb(tconf,
1513-
Producer_partitioner_cb);
1514-
}
1515-
1516-
return 1;
1517-
15181454
} else if (!strcmp(name, "delivery.report.only.error")) {
15191455
/* Since we allocate msgstate for each produced message
15201456
* with a callback we can't use delivery.report.only.error
@@ -1577,9 +1513,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15771513
Py_ssize_t pos = 0;
15781514
PyObject *ko, *vo;
15791515
PyObject *confdict = NULL;
1580-
int32_t (*partitioner_cb) (const rd_kafka_topic_t *,
1581-
const void *, size_t, int32_t,
1582-
void *, void *) = partitioner_cb;
15831516

15841517
if (rd_kafka_version() < MIN_RD_KAFKA_VERSION) {
15851518
PyErr_Format(PyExc_RuntimeError,

confluent_kafka/src/confluent_kafka.h

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,6 @@ typedef struct {
201201
*/
202202
struct {
203203
PyObject *default_dr_cb;
204-
PyObject *partitioner_cb; /**< Registered Python partitioner */
205-
int32_t (*c_partitioner_cb) (
206-
const rd_kafka_topic_t *,
207-
const void *, size_t, int32_t,
208-
void *, void *); /**< Fallback C partitioner*/
209-
210204
int dr_only_error; /**< delivery.report.only.error */
211205
} Producer;
212206

@@ -396,13 +390,6 @@ PyObject *Message_error (Message *self, PyObject *ignore);
396390
extern PyTypeObject ProducerType;
397391

398392

399-
int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
400-
const void *keydata,
401-
size_t keylen,
402-
int32_t partition_cnt,
403-
void *rkt_opaque, void *msg_opaque);
404-
405-
406393
/****************************************************************************
407394
*
408395
*

tests/test_Producer.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,26 @@ def handle_dr(err, msg):
168168
p.produce('mytopic', "\xc2\xc2", on_delivery=handle_dr)
169169

170170
p.flush()
171+
172+
173+
def test_set_partitioner_murmur2():
174+
"""
175+
Test ability to set built-in partitioner type murmur2
176+
"""
177+
Producer({'partitioner': 'murmur2'})
178+
179+
180+
def test_set_partitioner_murmur2_random():
181+
"""
182+
Test ability to set built-in partitioner type murmur2_random
183+
"""
184+
Producer({'partitioner': 'murmur2_random'})
185+
186+
187+
def test_set_invalid_partitioner_murmur():
188+
"""
189+
Assert invalid partitioner raises KafkaException
190+
"""
191+
with pytest.raises(KafkaException) as e:
192+
Producer({'partitioner': 'murmur'})
193+
assert 'Invalid value for configuration property "partitioner": murmur' in str(e)

0 commit comments

Comments
 (0)