From 7e7e1cd5a7bf595ec97c9d3b6691f6eb09ce7e1c Mon Sep 17 00:00:00 2001 From: TiMESPLiNTER Date: Thu, 26 Sep 2019 13:48:55 +0200 Subject: [PATCH 1/3] Pass whole message for deserialization --- pkg/rdkafka/JsonSerializer.php | 6 ++++-- pkg/rdkafka/RdKafkaConsumer.php | 2 +- pkg/rdkafka/Serializer.php | 4 +++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/rdkafka/JsonSerializer.php b/pkg/rdkafka/JsonSerializer.php index ae161ca08..8ebfe897e 100644 --- a/pkg/rdkafka/JsonSerializer.php +++ b/pkg/rdkafka/JsonSerializer.php @@ -4,6 +4,8 @@ namespace Enqueue\RdKafka; +use RdKafka\Message as VendorMessage; + class JsonSerializer implements Serializer { public function toString(RdKafkaMessage $message): string @@ -25,9 +27,9 @@ public function toString(RdKafkaMessage $message): string return $json; } - public function toMessage(string $string): RdKafkaMessage + public function toMessage(VendorMessage $message): RdKafkaMessage { - $data = json_decode($string, true); + $data = json_decode($message->payload, true); if (JSON_ERROR_NONE !== json_last_error()) { throw new \InvalidArgumentException(sprintf( 'The malformed json given. Error %s and message %s', diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 241ee3841..c1a399742 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -164,7 +164,7 @@ private function doReceive(int $timeout): ?RdKafkaMessage case RD_KAFKA_RESP_ERR__TIMED_OUT: break; case RD_KAFKA_RESP_ERR_NO_ERROR: - $message = $this->serializer->toMessage($kafkaMessage->payload); + $message = $this->serializer->toMessage($kafkaMessage); $message->setKey($kafkaMessage->key); $message->setPartition($kafkaMessage->partition); $message->setKafkaMessage($kafkaMessage); diff --git a/pkg/rdkafka/Serializer.php b/pkg/rdkafka/Serializer.php index 7e2a116ed..8d1691561 100644 --- a/pkg/rdkafka/Serializer.php +++ b/pkg/rdkafka/Serializer.php @@ -4,9 +4,11 @@ namespace Enqueue\RdKafka; +use RdKafka\Message as VendorMessage; + interface Serializer { public function toString(RdKafkaMessage $message): string; - public function toMessage(string $string): RdKafkaMessage; + public function toMessage(VendorMessage $string): RdKafkaMessage; } From a6c1990cfada9f9bc6385683369dcaf018b66ea0 Mon Sep 17 00:00:00 2001 From: TiMESPLiNTER Date: Thu, 26 Sep 2019 14:52:40 +0200 Subject: [PATCH 2/3] New message transformation concept --- pkg/rdkafka/ConsumeMessageTransformer.php | 10 ++++++++++ pkg/rdkafka/JsonSerializer.php | 7 +++++-- pkg/rdkafka/NullMessageTransformer.php | 19 +++++++++++++++++++ pkg/rdkafka/ProduceMessageTransformer.php | 10 ++++++++++ pkg/rdkafka/RdKafkaConsumer.php | 19 ++++++++++++++++++- pkg/rdkafka/RdKafkaContext.php | 23 ++++++++++++++++++++++- pkg/rdkafka/RdKafkaProducer.php | 20 +++++++++++++++++++- pkg/rdkafka/Serializer.php | 7 ++++--- pkg/rdkafka/SerializerAwareTrait.php | 8 ++++---- 9 files changed, 111 insertions(+), 12 deletions(-) create mode 100644 pkg/rdkafka/ConsumeMessageTransformer.php create mode 100644 pkg/rdkafka/NullMessageTransformer.php create mode 100644 pkg/rdkafka/ProduceMessageTransformer.php diff --git a/pkg/rdkafka/ConsumeMessageTransformer.php b/pkg/rdkafka/ConsumeMessageTransformer.php new file mode 100644 index 000000000..1cc2e878e --- /dev/null +++ b/pkg/rdkafka/ConsumeMessageTransformer.php @@ -0,0 +1,10 @@ +payload, true); + $data = json_decode($string, true); if (JSON_ERROR_NONE !== json_last_error()) { throw new \InvalidArgumentException(sprintf( 'The malformed json given. Error %s and message %s', diff --git a/pkg/rdkafka/NullMessageTransformer.php b/pkg/rdkafka/NullMessageTransformer.php new file mode 100644 index 000000000..146ef4f8c --- /dev/null +++ b/pkg/rdkafka/NullMessageTransformer.php @@ -0,0 +1,19 @@ +consumer = $consumer; @@ -164,7 +169,12 @@ private function doReceive(int $timeout): ?RdKafkaMessage case RD_KAFKA_RESP_ERR__TIMED_OUT: break; case RD_KAFKA_RESP_ERR_NO_ERROR: - $message = $this->serializer->toMessage($kafkaMessage); + if (null !== $this->serializer) { + $message = $this->serializer->toMessage($kafkaMessage->payload); + } else { + $message = new RdKafkaMessage($kafkaMessage->payload); + } + $message->setKey($kafkaMessage->key); $message->setPartition($kafkaMessage->partition); $message->setKafkaMessage($kafkaMessage); @@ -175,6 +185,8 @@ private function doReceive(int $timeout): ?RdKafkaMessage $message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers)); } + $this->consumeMessageTransformer->transformConsumeMessage($message); + return $message; default: throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err); @@ -183,4 +195,9 @@ private function doReceive(int $timeout): ?RdKafkaMessage return null; } + + public function setMessageTransformer(ConsumeMessageTransformer $messageTransformer): void + { + $this->consumeMessageTransformer = $messageTransformer; + } } diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index 6270b2230..c9eaa8d52 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -50,6 +50,16 @@ class RdKafkaContext implements Context */ private $rdKafkaConsumers; + /** + * @var ConsumeMessageTransformer + */ + private $consumeMessageTransformer; + + /** + * @var ProduceMessageTransformer + */ + private $produceMessageTransformer; + /** * @param array $config */ @@ -60,6 +70,11 @@ public function __construct(array $config) $this->rdKafkaConsumers = []; $this->setSerializer(new JsonSerializer()); + + $messageTransformer = new NullMessageTransformer(); + + $this->consumeMessageTransformer = $messageTransformer; + $this->produceMessageTransformer = $messageTransformer; } /** @@ -96,7 +111,11 @@ public function createTemporaryQueue(): Queue */ public function createProducer(): Producer { - return new RdKafkaProducer($this->getProducer(), $this->getSerializer()); + $producer = new RdKafkaProducer($this->getProducer(), $this->getSerializer()); + + $producer->setMessageTransformer($this->produceMessageTransformer); + + return $producer; } /** @@ -120,6 +139,8 @@ public function createConsumer(Destination $destination): Consumer $this->getSerializer() ); + $consumer->setMessageTransformer($this->consumeMessageTransformer); + if (isset($this->config['commit_async'])) { $consumer->setCommitAsync($this->config['commit_async']); } diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 77ec9115b..64ae29139 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -21,6 +21,11 @@ class RdKafkaProducer implements Producer */ private $producer; + /** + * @var ProduceMessageTransformer + */ + private $produceMessageTransformer; + public function __construct(VendorProducer $producer, Serializer $serializer) { $this->producer = $producer; @@ -38,7 +43,15 @@ public function send(Destination $destination, Message $message): void InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class); $partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA; - $payload = $this->serializer->toString($message); + + if (null !== $this->serializer) { + $payload = $this->serializer->toString($message); + } else { + $payload = $message->getBody(); + } + + $this->produceMessageTransformer->transformProduceMessage($message); + $key = $message->getKey() ?: $destination->getKey() ?: null; $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); @@ -111,4 +124,9 @@ public function getTimeToLive(): ?int { return null; } + + public function setMessageTransformer(ProduceMessageTransformer $messageTransformer): void + { + $this->produceMessageTransformer = $messageTransformer; + } } diff --git a/pkg/rdkafka/Serializer.php b/pkg/rdkafka/Serializer.php index 8d1691561..918d4b5e8 100644 --- a/pkg/rdkafka/Serializer.php +++ b/pkg/rdkafka/Serializer.php @@ -4,11 +4,12 @@ namespace Enqueue\RdKafka; -use RdKafka\Message as VendorMessage; - +/** + * @deprecated Use ProduceMessageTransformer and/or ConsumeMessageTransformer instead + */ interface Serializer { public function toString(RdKafkaMessage $message): string; - public function toMessage(VendorMessage $string): RdKafkaMessage; + public function toMessage(string $string): RdKafkaMessage; } diff --git a/pkg/rdkafka/SerializerAwareTrait.php b/pkg/rdkafka/SerializerAwareTrait.php index d640f93cb..738e7c9bc 100644 --- a/pkg/rdkafka/SerializerAwareTrait.php +++ b/pkg/rdkafka/SerializerAwareTrait.php @@ -7,20 +7,20 @@ trait SerializerAwareTrait { /** - * @var Serializer + * @var Serializer|null */ private $serializer; /** - * @param Serializer $serializer + * @param Serializer|null $serializer */ - public function setSerializer(Serializer $serializer) + public function setSerializer(?Serializer $serializer) { $this->serializer = $serializer; } /** - * @return Serializer + * @return Serializer|null */ public function getSerializer() { From 3a1b758cfefbc2bc706739b22f20faf3d6df71bc Mon Sep 17 00:00:00 2001 From: TiMESPLiNTER Date: Thu, 26 Sep 2019 14:55:46 +0200 Subject: [PATCH 3/3] Remove unused use --- pkg/rdkafka/JsonSerializer.php | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/rdkafka/JsonSerializer.php b/pkg/rdkafka/JsonSerializer.php index 0fae23143..a407dc7f0 100644 --- a/pkg/rdkafka/JsonSerializer.php +++ b/pkg/rdkafka/JsonSerializer.php @@ -4,8 +4,6 @@ namespace Enqueue\RdKafka; -use RdKafka\Message as VendorMessage; - /** * @deprecated Use ProduceMessageTransformer and/or ConsumeMessageTransformer instead */