diff --git a/pkg/rdkafka/ConsumeMessageTransformer.php b/pkg/rdkafka/ConsumeMessageTransformer.php new file mode 100644 index 000000000..1cc2e878e --- /dev/null +++ b/pkg/rdkafka/ConsumeMessageTransformer.php @@ -0,0 +1,10 @@ +consumer = $consumer; @@ -164,7 +169,12 @@ private function doReceive(int $timeout): ?RdKafkaMessage case RD_KAFKA_RESP_ERR__TIMED_OUT: break; case RD_KAFKA_RESP_ERR_NO_ERROR: - $message = $this->serializer->toMessage($kafkaMessage->payload); + if (null !== $this->serializer) { + $message = $this->serializer->toMessage($kafkaMessage->payload); + } else { + $message = new RdKafkaMessage($kafkaMessage->payload); + } + $message->setKey($kafkaMessage->key); $message->setPartition($kafkaMessage->partition); $message->setKafkaMessage($kafkaMessage); @@ -175,6 +185,8 @@ private function doReceive(int $timeout): ?RdKafkaMessage $message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers)); } + $this->consumeMessageTransformer->transformConsumeMessage($message); + return $message; default: throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err); @@ -183,4 +195,9 @@ private function doReceive(int $timeout): ?RdKafkaMessage return null; } + + public function setMessageTransformer(ConsumeMessageTransformer $messageTransformer): void + { + $this->consumeMessageTransformer = $messageTransformer; + } } diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index 6270b2230..c9eaa8d52 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -50,6 +50,16 @@ class RdKafkaContext implements Context */ private $rdKafkaConsumers; + /** + * @var ConsumeMessageTransformer + */ + private $consumeMessageTransformer; + + /** + * @var ProduceMessageTransformer + */ + private $produceMessageTransformer; + /** * @param array $config */ @@ -60,6 +70,11 @@ public function __construct(array $config) $this->rdKafkaConsumers = []; $this->setSerializer(new JsonSerializer()); + + $messageTransformer = new NullMessageTransformer(); + + $this->consumeMessageTransformer = $messageTransformer; + $this->produceMessageTransformer = $messageTransformer; } /** @@ -96,7 +111,11 @@ public function createTemporaryQueue(): Queue */ public function createProducer(): Producer { - return new RdKafkaProducer($this->getProducer(), $this->getSerializer()); + $producer = new RdKafkaProducer($this->getProducer(), $this->getSerializer()); + + $producer->setMessageTransformer($this->produceMessageTransformer); + + return $producer; } /** @@ -120,6 +139,8 @@ public function createConsumer(Destination $destination): Consumer $this->getSerializer() ); + $consumer->setMessageTransformer($this->consumeMessageTransformer); + if (isset($this->config['commit_async'])) { $consumer->setCommitAsync($this->config['commit_async']); } diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 77ec9115b..64ae29139 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -21,6 +21,11 @@ class RdKafkaProducer implements Producer */ private $producer; + /** + * @var ProduceMessageTransformer + */ + private $produceMessageTransformer; + public function __construct(VendorProducer $producer, Serializer $serializer) { $this->producer = $producer; @@ -38,7 +43,15 @@ public function send(Destination $destination, Message $message): void InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class); $partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA; - $payload = $this->serializer->toString($message); + + if (null !== $this->serializer) { + $payload = $this->serializer->toString($message); + } else { + $payload = $message->getBody(); + } + + $this->produceMessageTransformer->transformProduceMessage($message); + $key = $message->getKey() ?: $destination->getKey() ?: null; $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); @@ -111,4 +124,9 @@ public function getTimeToLive(): ?int { return null; } + + public function setMessageTransformer(ProduceMessageTransformer $messageTransformer): void + { + $this->produceMessageTransformer = $messageTransformer; + } } diff --git a/pkg/rdkafka/Serializer.php b/pkg/rdkafka/Serializer.php index 7e2a116ed..918d4b5e8 100644 --- a/pkg/rdkafka/Serializer.php +++ b/pkg/rdkafka/Serializer.php @@ -4,6 +4,9 @@ namespace Enqueue\RdKafka; +/** + * @deprecated Use ProduceMessageTransformer and/or ConsumeMessageTransformer instead + */ interface Serializer { public function toString(RdKafkaMessage $message): string; diff --git a/pkg/rdkafka/SerializerAwareTrait.php b/pkg/rdkafka/SerializerAwareTrait.php index d640f93cb..738e7c9bc 100644 --- a/pkg/rdkafka/SerializerAwareTrait.php +++ b/pkg/rdkafka/SerializerAwareTrait.php @@ -7,20 +7,20 @@ trait SerializerAwareTrait { /** - * @var Serializer + * @var Serializer|null */ private $serializer; /** - * @param Serializer $serializer + * @param Serializer|null $serializer */ - public function setSerializer(Serializer $serializer) + public function setSerializer(?Serializer $serializer) { $this->serializer = $serializer; } /** - * @return Serializer + * @return Serializer|null */ public function getSerializer() {