2
2
3
3
namespace Enqueue \Client \Driver ;
4
4
5
- use Enqueue \Client \Config ;
6
- use Enqueue \Client \DriverInterface ;
7
- use Enqueue \Client \Message ;
8
- use Enqueue \Client \Meta \QueueMetaRegistry ;
9
5
use Enqueue \RdKafka \RdKafkaContext ;
10
- use Enqueue \RdKafka \RdKafkaMessage ;
11
6
use Enqueue \RdKafka \RdKafkaTopic ;
12
- use Interop \Queue \PsrMessage ;
13
- use Interop \Queue \PsrQueue ;
14
7
use Psr \Log \LoggerInterface ;
15
8
use Psr \Log \NullLogger ;
16
9
17
- class RdKafkaDriver implements DriverInterface
10
+ /**
11
+ * @method RdKafkaContext getContext()
12
+ */
13
+ class RdKafkaDriver extends GenericDriver
18
14
{
19
- /**
20
- * @var RdKafkaContext
21
- */
22
- private $ context ;
23
-
24
- /**
25
- * @var Config
26
- */
27
- private $ config ;
28
-
29
- /**
30
- * @var QueueMetaRegistry
31
- */
32
- private $ queueMetaRegistry ;
33
-
34
- public function __construct (RdKafkaContext $ context , Config $ config , QueueMetaRegistry $ queueMetaRegistry )
35
- {
36
- $ this ->context = $ context ;
37
- $ this ->config = $ config ;
38
- $ this ->queueMetaRegistry = $ queueMetaRegistry ;
39
- }
40
-
41
- /**
42
- * @return RdKafkaMessage
43
- */
44
- public function createTransportMessage (Message $ message ): PsrMessage
45
- {
46
- $ headers = $ message ->getHeaders ();
47
- $ headers ['content_type ' ] = $ message ->getContentType ();
48
-
49
- $ transportMessage = $ this ->context ->createMessage ();
50
- $ transportMessage ->setBody ($ message ->getBody ());
51
- $ transportMessage ->setHeaders ($ headers );
52
- $ transportMessage ->setProperties ($ message ->getProperties ());
53
- $ transportMessage ->setMessageId ($ message ->getMessageId ());
54
- $ transportMessage ->setTimestamp ($ message ->getTimestamp ());
55
- $ transportMessage ->setReplyTo ($ message ->getReplyTo ());
56
- $ transportMessage ->setCorrelationId ($ message ->getCorrelationId ());
57
-
58
- return $ transportMessage ;
59
- }
60
-
61
- public function createClientMessage (PsrMessage $ message ): Message
62
- {
63
- $ clientMessage = new Message ();
64
- $ clientMessage ->setBody ($ message ->getBody ());
65
- $ clientMessage ->setHeaders ($ message ->getHeaders ());
66
- $ clientMessage ->setProperties ($ message ->getProperties ());
67
-
68
- $ clientMessage ->setContentType ($ message ->getHeader ('content_type ' ));
69
-
70
- $ clientMessage ->setTimestamp ($ message ->getTimestamp ());
71
- $ clientMessage ->setMessageId ($ message ->getMessageId ());
72
- $ clientMessage ->setReplyTo ($ message ->getReplyTo ());
73
- $ clientMessage ->setCorrelationId ($ message ->getCorrelationId ());
74
-
75
- return $ clientMessage ;
76
- }
77
-
78
- public function sendToRouter (Message $ message ): void
79
- {
80
- if (false == $ message ->getProperty (Config::PARAMETER_TOPIC_NAME )) {
81
- throw new \LogicException ('Topic name parameter is required but is not set ' );
82
- }
83
-
84
- $ topic = $ this ->createRouterTopic ();
85
- $ transportMessage = $ this ->createTransportMessage ($ message );
86
-
87
- $ this ->context ->createProducer ()->send ($ topic , $ transportMessage );
88
- }
89
-
90
- public function sendToProcessor (Message $ message ): void
91
- {
92
- if (false == $ message ->getProperty (Config::PARAMETER_PROCESSOR_NAME )) {
93
- throw new \LogicException ('Processor name parameter is required but is not set ' );
94
- }
95
-
96
- if (false == $ queueName = $ message ->getProperty (Config::PARAMETER_PROCESSOR_QUEUE_NAME )) {
97
- throw new \LogicException ('Queue name parameter is required but is not set ' );
98
- }
99
-
100
- $ transportMessage = $ this ->createTransportMessage ($ message );
101
- $ destination = $ this ->createQueue ($ queueName );
102
-
103
- $ this ->context ->createProducer ()->send ($ destination , $ transportMessage );
104
- }
105
-
106
- /**
107
- * @return RdKafkaTopic
108
- */
109
- public function createQueue (string $ queueName ): PsrQueue
15
+ public function __construct (RdKafkaContext $ context , ...$ args )
110
16
{
111
- $ transportName = $ this ->queueMetaRegistry ->getQueueMeta ($ queueName )->getTransportName ();
112
-
113
- return $ this ->context ->createQueue ($ transportName );
17
+ parent ::__construct ($ context , ...$ args );
114
18
}
115
19
116
20
public function setupBroker (LoggerInterface $ logger = null ): void
@@ -122,29 +26,21 @@ public function setupBroker(LoggerInterface $logger = null): void
122
26
};
123
27
124
28
// setup router
125
- $ routerQueue = $ this ->createQueue ($ this ->config ->getRouterQueueName ());
29
+ $ routerQueue = $ this ->createQueue ($ this ->getConfig () ->getRouterQueueName ());
126
30
$ log ('Create router queue: %s ' , $ routerQueue ->getQueueName ());
127
- $ this ->context ->createConsumer ($ routerQueue );
31
+ $ this ->getContext () ->createConsumer ($ routerQueue );
128
32
129
33
// setup queues
130
- foreach ($ this ->queueMetaRegistry ->getQueuesMeta () as $ meta ) {
131
- $ queue = $ this ->createQueue ($ meta ->getClientName ());
34
+ $ declaredQueues = [];
35
+ foreach ($ this ->getRouteCollection ()->all () as $ route ) {
36
+ /** @var RdKafkaTopic $queue */
37
+ $ queue = $ this ->createRouteQueue ($ route );
38
+ if (array_key_exists ($ queue ->getQueueName (), $ declaredQueues )) {
39
+ continue ;
40
+ }
41
+
132
42
$ log ('Create processor queue: %s ' , $ queue ->getQueueName ());
133
- $ this ->context ->createConsumer ($ queue );
43
+ $ this ->getContext () ->createConsumer ($ queue );
134
44
}
135
45
}
136
-
137
- public function getConfig (): Config
138
- {
139
- return $ this ->config ;
140
- }
141
-
142
- private function createRouterTopic (): RdKafkaTopic
143
- {
144
- $ topic = $ this ->context ->createTopic (
145
- $ this ->config ->createTransportRouterTopicName ($ this ->config ->getRouterTopicName ())
146
- );
147
-
148
- return $ topic ;
149
- }
150
46
}
0 commit comments