From 04d4530efd66f0f25da9b7cc6647dbd437c15e15 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 5 Jan 2018 00:43:16 +0200 Subject: [PATCH 1/3] [kafka] add ability to set offset. --- pkg/rdkafka/RdKafkaConsumer.php | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index e99e0cf3b..9d30be2a7 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -6,6 +6,7 @@ use Interop\Queue\PsrConsumer; use Interop\Queue\PsrMessage; use RdKafka\KafkaConsumer; +use RdKafka\TopicPartition; class RdKafkaConsumer implements PsrConsumer { @@ -36,6 +37,11 @@ class RdKafkaConsumer implements PsrConsumer */ private $commitAsync; + /** + * @var int|null + */ + private $offset; + /** * @param KafkaConsumer $consumer * @param RdKafkaContext $context @@ -49,6 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd $this->topic = $topic; $this->subscribed = false; $this->commitAsync = false; + $this->offset = null; $this->setSerializer($serializer); } @@ -69,6 +76,15 @@ public function setCommitAsync($async) $this->commitAsync = (bool) $async; } + public function setOffset($offset) + { + if ($this->subscribed) { + throw new \LogicException('The consumer has already subscribed.'); + } + + $this->offset = $offset; + } + /** * {@inheritdoc} */ @@ -83,7 +99,11 @@ public function getQueue() public function receive($timeout = 0) { if (false == $this->subscribed) { - $this->consumer->subscribe([$this->topic->getTopicName()]); + $this->consumer->assign([new TopicPartition( + $this->getQueue()->getQueueName(), + $this->getQueue()->getPartition(), + $this->offset + )]); $this->subscribed = true; } From eded88e0504d1045b7513e4c1164be26e9bbef34 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 5 Jan 2018 01:02:51 +0200 Subject: [PATCH 2/3] [kafka] change offset. --- pkg/rdkafka/Tests/RdKafkaConsumerTest.php | 71 +++++++++++++++++++++-- 1 file changed, 65 insertions(+), 6 deletions(-) diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php index 5acd7f4dc..77a949c15 100644 --- a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php @@ -50,8 +50,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('subscribe') - ->with(['dest']) + ->method('assign') ; $kafkaConsumer ->expects($this->once()) @@ -70,6 +69,36 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue() $this->assertNull($consumer->receive(1000)); } + public function testShouldPassProperlyConfiguredTopicPartitionOnAssign() + { + $destination = new RdKafkaTopic('dest'); + + $kafkaMessage = new Message(); + $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; + + $kafkaConsumer = $this->createKafkaConsumerMock(); + $kafkaConsumer + ->expects($this->once()) + ->method('assign') + ; + $kafkaConsumer + ->expects($this->any()) + ->method('consume') + ->willReturn($kafkaMessage) + ; + + $consumer = new RdKafkaConsumer( + $kafkaConsumer, + $this->createContextMock(), + $destination, + $this->createSerializerMock() + ); + + $consumer->receive(1000); + $consumer->receive(1000); + $consumer->receive(1000); + } + public function testShouldSubscribeOnFirstReceiveOnly() { $destination = new RdKafkaTopic('dest'); @@ -80,8 +109,7 @@ public function testShouldSubscribeOnFirstReceiveOnly() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('subscribe') - ->with(['dest']) + ->method('assign') ; $kafkaConsumer ->expects($this->any()) @@ -101,6 +129,38 @@ public function testShouldSubscribeOnFirstReceiveOnly() $consumer->receive(1000); } + public function testThrowOnOffsetChangeAfterSubscribing() + { + $destination = new RdKafkaTopic('dest'); + + $kafkaMessage = new Message(); + $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; + + $kafkaConsumer = $this->createKafkaConsumerMock(); + $kafkaConsumer + ->expects($this->once()) + ->method('assign') + ; + $kafkaConsumer + ->expects($this->any()) + ->method('consume') + ->willReturn($kafkaMessage) + ; + + $consumer = new RdKafkaConsumer( + $kafkaConsumer, + $this->createContextMock(), + $destination, + $this->createSerializerMock() + ); + + $consumer->receive(1000); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The consumer has already subscribed.'); + $consumer->setOffset(123); + } + public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() { $destination = new RdKafkaTopic('dest'); @@ -114,8 +174,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('subscribe') - ->with(['dest']) + ->method('assign') ; $kafkaConsumer ->expects($this->once()) From 3c1fd491a90a5100df32877a371a9ce1a1a17a96 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 5 Jan 2018 01:07:01 +0200 Subject: [PATCH 3/3] [kafka][doc][skip ci] Add docs on how to change offset. --- docs/transport/kafka.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/docs/transport/kafka.md b/docs/transport/kafka.md index efc564fa2..cb36842a5 100644 --- a/docs/transport/kafka.md +++ b/docs/transport/kafka.md @@ -8,6 +8,7 @@ The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ * [Send message to queue](#send-message-to-queue) * [Consume message](#consume-message) * [Serialize message](#serialize-message) +* [Chnage offset](#change-offset) ## Installation @@ -84,6 +85,9 @@ $fooQueue = $psrContext->createQueue('foo'); $consumer = $psrContext->createConsumer($fooQueue); +// Enable async commit to gain better performance. +//$consumer->setCommitAsync(true); + $message = $consumer->receive(); // process a message @@ -115,4 +119,21 @@ class FooSerializer implements Serializer $psrContext->setSerializer(new FooSerializer()); ``` +## Change offset + +By default consumers starts from the beginning of the topic and updates the offset while you are processing messages. +There is an ability to change the current offset. + +```php +createQueue('foo'); + +$consumer = $psrContext->createConsumer($fooQueue); +$consumer->setOffset(123); + +$message = $consumer->receive(2000); +``` + [back to index](index.md) \ No newline at end of file