Skip to content

Commit 3676e5c

Browse files
committed
[rdkafka] Do not use deprecated classes.
1 parent a927539 commit 3676e5c

10 files changed

+67
-67
lines changed

pkg/rdkafka/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
[![Total Downloads](https://poser.pugx.org/enqueue/rdkafka/d/total.png)](https://packagist.org/packages/enqueue/rdkafka)
66
[![Latest Stable Version](https://poser.pugx.org/enqueue/rdkafka/version.png)](https://packagist.org/packages/enqueue/rdkafka)
77

8-
This is an implementation of PSR specification. It allows you to send and consume message via Kafka protocol.
8+
This is an implementation of Queue Interop specification. It allows you to send and consume message via Kafka protocol.
99

1010
## Resources
1111

pkg/rdkafka/RdKafkaConnectionFactory.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
namespace Enqueue\RdKafka;
66

7-
use Interop\Queue\PsrConnectionFactory;
8-
use Interop\Queue\PsrContext;
7+
use Interop\Queue\ConnectionFactory;
8+
use Interop\Queue\Context;
99

10-
class RdKafkaConnectionFactory implements PsrConnectionFactory
10+
class RdKafkaConnectionFactory implements ConnectionFactory
1111
{
1212
/**
1313
* @var array
@@ -53,7 +53,7 @@ public function __construct($config = 'kafka:')
5353
/**
5454
* @return RdKafkaContext
5555
*/
56-
public function createContext(): PsrContext
56+
public function createContext(): Context
5757
{
5858
return new RdKafkaContext($this->config);
5959
}

pkg/rdkafka/RdKafkaConsumer.php

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44

55
namespace Enqueue\RdKafka;
66

7-
use Interop\Queue\InvalidMessageException;
8-
use Interop\Queue\PsrConsumer;
9-
use Interop\Queue\PsrMessage;
10-
use Interop\Queue\PsrQueue;
7+
use Interop\Queue\Consumer;
8+
use Interop\Queue\Exception\InvalidMessageException;
9+
use Interop\Queue\Message;
10+
use Interop\Queue\Queue;
1111
use RdKafka\KafkaConsumer;
1212
use RdKafka\TopicPartition;
1313

14-
class RdKafkaConsumer implements PsrConsumer
14+
class RdKafkaConsumer implements Consumer
1515
{
1616
use SerializerAwareTrait;
1717

@@ -75,15 +75,15 @@ public function setOffset(int $offset = null): void
7575
$this->offset = $offset;
7676
}
7777

78-
public function getQueue(): PsrQueue
78+
public function getQueue(): Queue
7979
{
8080
return $this->topic;
8181
}
8282

8383
/**
8484
* @return RdKafkaMessage
8585
*/
86-
public function receive(int $timeout = 0): ?PsrMessage
86+
public function receive(int $timeout = 0): ?Message
8787
{
8888
if (false === $this->subscribed) {
8989
if (null === $this->offset) {
@@ -116,15 +116,15 @@ public function receive(int $timeout = 0): ?PsrMessage
116116
/**
117117
* @return RdKafkaMessage
118118
*/
119-
public function receiveNoWait(): ?PsrMessage
119+
public function receiveNoWait(): ?Message
120120
{
121121
throw new \LogicException('Not implemented');
122122
}
123123

124124
/**
125125
* @param RdKafkaMessage $message
126126
*/
127-
public function acknowledge(PsrMessage $message): void
127+
public function acknowledge(Message $message): void
128128
{
129129
InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);
130130

@@ -142,7 +142,7 @@ public function acknowledge(PsrMessage $message): void
142142
/**
143143
* @param RdKafkaMessage $message
144144
*/
145-
public function reject(PsrMessage $message, bool $requeue = false): void
145+
public function reject(Message $message, bool $requeue = false): void
146146
{
147147
$this->acknowledge($message);
148148

pkg/rdkafka/RdKafkaContext.php

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,24 @@
44

55
namespace Enqueue\RdKafka;
66

7-
use Interop\Queue\InvalidDestinationException;
8-
use Interop\Queue\PsrConsumer;
9-
use Interop\Queue\PsrContext;
10-
use Interop\Queue\PsrDestination;
11-
use Interop\Queue\PsrMessage;
12-
use Interop\Queue\PsrProducer;
13-
use Interop\Queue\PsrQueue;
14-
use Interop\Queue\PsrSubscriptionConsumer;
15-
use Interop\Queue\PsrTopic;
16-
use Interop\Queue\PurgeQueueNotSupportedException;
17-
use Interop\Queue\SubscriptionConsumerNotSupportedException;
18-
use Interop\Queue\TemporaryQueueNotSupportedException;
7+
use Interop\Queue\Consumer;
8+
use Interop\Queue\Context;
9+
use Interop\Queue\Destination;
10+
use Interop\Queue\Exception\InvalidDestinationException;
11+
use Interop\Queue\Exception\PurgeQueueNotSupportedException;
12+
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
13+
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
14+
use Interop\Queue\Message;
15+
use Interop\Queue\Producer;
16+
use Interop\Queue\Queue;
17+
use Interop\Queue\SubscriptionConsumer;
18+
use Interop\Queue\Topic;
1919
use RdKafka\Conf;
2020
use RdKafka\KafkaConsumer;
21-
use RdKafka\Producer;
21+
use RdKafka\Producer as VendorProducer;
2222
use RdKafka\TopicConf;
2323

24-
class RdKafkaContext implements PsrContext
24+
class RdKafkaContext implements Context
2525
{
2626
use SerializerAwareTrait;
2727

@@ -59,36 +59,36 @@ public function __construct(array $config)
5959
/**
6060
* @return RdKafkaMessage
6161
*/
62-
public function createMessage(string $body = '', array $properties = [], array $headers = []): PsrMessage
62+
public function createMessage(string $body = '', array $properties = [], array $headers = []): Message
6363
{
6464
return new RdKafkaMessage($body, $properties, $headers);
6565
}
6666

6767
/**
6868
* @return RdKafkaTopic
6969
*/
70-
public function createTopic(string $topicName): PsrTopic
70+
public function createTopic(string $topicName): Topic
7171
{
7272
return new RdKafkaTopic($topicName);
7373
}
7474

7575
/**
7676
* @return RdKafkaTopic
7777
*/
78-
public function createQueue(string $queueName): PsrQueue
78+
public function createQueue(string $queueName): Queue
7979
{
8080
return new RdKafkaTopic($queueName);
8181
}
8282

83-
public function createTemporaryQueue(): PsrQueue
83+
public function createTemporaryQueue(): Queue
8484
{
8585
throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
8686
}
8787

8888
/**
8989
* @return RdKafkaProducer
9090
*/
91-
public function createProducer(): PsrProducer
91+
public function createProducer(): Producer
9292
{
9393
return new RdKafkaProducer($this->getProducer(), $this->getSerializer());
9494
}
@@ -98,7 +98,7 @@ public function createProducer(): PsrProducer
9898
*
9999
* @return RdKafkaConsumer
100100
*/
101-
public function createConsumer(PsrDestination $destination): PsrConsumer
101+
public function createConsumer(Destination $destination): Consumer
102102
{
103103
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
104104

@@ -128,20 +128,20 @@ public function close(): void
128128
}
129129
}
130130

131-
public function createSubscriptionConsumer(): PsrSubscriptionConsumer
131+
public function createSubscriptionConsumer(): SubscriptionConsumer
132132
{
133133
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
134134
}
135135

136-
public function purgeQueue(PsrQueue $queue): void
136+
public function purgeQueue(Queue $queue): void
137137
{
138138
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
139139
}
140140

141-
private function getProducer(): Producer
141+
private function getProducer(): VendorProducer
142142
{
143143
if (null === $this->producer) {
144-
$this->producer = new Producer($this->getConf());
144+
$this->producer = new VendorProducer($this->getConf());
145145

146146
if (isset($this->config['log_level'])) {
147147
$this->producer->setLogLevel($this->config['log_level']);

pkg/rdkafka/RdKafkaMessage.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
namespace Enqueue\RdKafka;
66

7-
use Interop\Queue\PsrMessage;
8-
use RdKafka\Message;
7+
use Interop\Queue\Message;
8+
use RdKafka\Message as VendorMessage;
99

10-
class RdKafkaMessage implements PsrMessage
10+
class RdKafkaMessage implements Message
1111
{
1212
/**
1313
* @var string
@@ -174,12 +174,12 @@ public function setKey(string $key = null): void
174174
$this->key = $key;
175175
}
176176

177-
public function getKafkaMessage(): ?Message
177+
public function getKafkaMessage(): ?VendorMessage
178178
{
179179
return $this->kafkaMessage;
180180
}
181181

182-
public function setKafkaMessage(Message $message = null): void
182+
public function setKafkaMessage(VendorMessage $message = null): void
183183
{
184184
$this->kafkaMessage = $message;
185185
}

pkg/rdkafka/RdKafkaProducer.php

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,23 @@
44

55
namespace Enqueue\RdKafka;
66

7-
use Interop\Queue\InvalidDestinationException;
8-
use Interop\Queue\InvalidMessageException;
9-
use Interop\Queue\PsrDestination;
10-
use Interop\Queue\PsrMessage;
11-
use Interop\Queue\PsrProducer;
12-
use RdKafka\Producer;
13-
14-
class RdKafkaProducer implements PsrProducer
7+
use Interop\Queue\Destination;
8+
use Interop\Queue\Exception\InvalidDestinationException;
9+
use Interop\Queue\Exception\InvalidMessageException;
10+
use Interop\Queue\Message;
11+
use Interop\Queue\Producer;
12+
use RdKafka\Producer as VendorProducer;
13+
14+
class RdKafkaProducer implements Producer
1515
{
1616
use SerializerAwareTrait;
1717

1818
/**
19-
* @var Producer
19+
* @var VendorProducer
2020
*/
2121
private $producer;
2222

23-
public function __construct(Producer $producer, Serializer $serializer)
23+
public function __construct(VendorProducer $producer, Serializer $serializer)
2424
{
2525
$this->producer = $producer;
2626

@@ -31,7 +31,7 @@ public function __construct(Producer $producer, Serializer $serializer)
3131
* @param RdKafkaTopic $destination
3232
* @param RdKafkaMessage $message
3333
*/
34-
public function send(PsrDestination $destination, PsrMessage $message): void
34+
public function send(Destination $destination, Message $message): void
3535
{
3636
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
3737
InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);
@@ -47,7 +47,7 @@ public function send(PsrDestination $destination, PsrMessage $message): void
4747
/**
4848
* @return RdKafkaProducer
4949
*/
50-
public function setDeliveryDelay(int $deliveryDelay = null): PsrProducer
50+
public function setDeliveryDelay(int $deliveryDelay = null): Producer
5151
{
5252
if (null === $deliveryDelay) {
5353
return $this;
@@ -64,7 +64,7 @@ public function getDeliveryDelay(): ?int
6464
/**
6565
* @return RdKafkaProducer
6666
*/
67-
public function setPriority(int $priority = null): PsrProducer
67+
public function setPriority(int $priority = null): Producer
6868
{
6969
if (null === $priority) {
7070
return $this;
@@ -78,7 +78,7 @@ public function getPriority(): ?int
7878
return null;
7979
}
8080

81-
public function setTimeToLive(int $timeToLive = null): PsrProducer
81+
public function setTimeToLive(int $timeToLive = null): Producer
8282
{
8383
if (null === $timeToLive) {
8484
return $this;

pkg/rdkafka/RdKafkaTopic.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
namespace Enqueue\RdKafka;
66

7-
use Interop\Queue\PsrQueue;
8-
use Interop\Queue\PsrTopic;
7+
use Interop\Queue\Queue;
8+
use Interop\Queue\Topic;
99
use RdKafka\TopicConf;
1010

11-
class RdKafkaTopic implements PsrTopic, PsrQueue
11+
class RdKafkaTopic implements Topic, Queue
1212
{
1313
/**
1414
* @var string

pkg/rdkafka/Tests/RdKafkaContextTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
use Enqueue\RdKafka\JsonSerializer;
77
use Enqueue\RdKafka\RdKafkaContext;
88
use Enqueue\RdKafka\Serializer;
9-
use Interop\Queue\InvalidDestinationException;
10-
use Interop\Queue\TemporaryQueueNotSupportedException;
9+
use Interop\Queue\Exception\InvalidDestinationException;
10+
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
1111
use PHPUnit\Framework\TestCase;
1212

1313
/**

pkg/rdkafka/Tests/RdKafkaProducerTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
use Enqueue\RdKafka\RdKafkaProducer;
99
use Enqueue\RdKafka\RdKafkaTopic;
1010
use Enqueue\RdKafka\Serializer;
11-
use Interop\Queue\InvalidDestinationException;
12-
use Interop\Queue\InvalidMessageException;
11+
use Interop\Queue\Exception\InvalidDestinationException;
12+
use Interop\Queue\Exception\InvalidMessageException;
1313
use PHPUnit\Framework\TestCase;
1414
use RdKafka\Producer;
1515
use RdKafka\ProducerTopic;

pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace Enqueue\RdKafka\Tests\Spec;
44

55
use Enqueue\RdKafka\RdKafkaConnectionFactory;
6-
use Interop\Queue\PsrMessage;
6+
use Interop\Queue\Message;
77
use Interop\Queue\Spec\SendToAndReceiveFromTopicSpec;
88

99
/**
@@ -27,7 +27,7 @@ public function test()
2727

2828
$message = $consumer->receive(10000); // 10 sec
2929

30-
$this->assertInstanceOf(PsrMessage::class, $message);
30+
$this->assertInstanceOf(Message::class, $message);
3131
$consumer->acknowledge($message);
3232

3333
$this->assertSame($expectedBody, $message->getBody());

0 commit comments

Comments
 (0)