2
2
3
3
namespace Enqueue \Client \Driver ;
4
4
5
- use Enqueue \Client \Config ;
6
- use Enqueue \Client \DriverInterface ;
7
5
use Enqueue \Client \Message ;
8
- use Enqueue \Client \Meta \QueueMetaRegistry ;
9
6
use Enqueue \Stomp \StompContext ;
10
7
use Enqueue \Stomp \StompDestination ;
11
8
use Enqueue \Stomp \StompMessage ;
12
9
use Interop \Queue \PsrMessage ;
13
10
use Interop \Queue \PsrQueue ;
11
+ use Interop \Queue \PsrTopic ;
14
12
use Psr \Log \LoggerInterface ;
15
13
use Psr \Log \NullLogger ;
16
14
17
- class StompDriver implements DriverInterface
15
+ /**
16
+ * @method StompContext getContext
17
+ */
18
+ class StompDriver extends GenericDriver
18
19
{
19
- /**
20
- * @var StompContext
21
- */
22
- private $ context ;
23
-
24
- /**
25
- * @var Config
26
- */
27
- private $ config ;
28
-
29
- /**
30
- * @var QueueMetaRegistry
31
- */
32
- private $ queueMetaRegistry ;
33
-
34
- public function __construct (StompContext $ context , Config $ config , QueueMetaRegistry $ queueMetaRegistry )
20
+ public function __construct (StompContext $ context , ...$ args )
35
21
{
36
- $ this ->context = $ context ;
37
- $ this ->config = $ config ;
38
- $ this ->queueMetaRegistry = $ queueMetaRegistry ;
39
- }
40
-
41
- public function sendToRouter (Message $ message ): void
42
- {
43
- if (false == $ message ->getProperty (Config::PARAMETER_TOPIC_NAME )) {
44
- throw new \LogicException ('Topic name parameter is required but is not set ' );
45
- }
46
-
47
- $ topic = $ this ->createRouterTopic ();
48
- $ transportMessage = $ this ->createTransportMessage ($ message );
49
-
50
- $ this ->context ->createProducer ()->send ($ topic , $ transportMessage );
51
- }
52
-
53
- public function sendToProcessor (Message $ message ): void
54
- {
55
- if (false == $ message ->getProperty (Config::PARAMETER_PROCESSOR_NAME )) {
56
- throw new \LogicException ('Processor name parameter is required but is not set ' );
57
- }
58
-
59
- if (false == $ queueName = $ message ->getProperty (Config::PARAMETER_PROCESSOR_QUEUE_NAME )) {
60
- throw new \LogicException ('Queue name parameter is required but is not set ' );
61
- }
62
-
63
- $ transportMessage = $ this ->createTransportMessage ($ message );
64
- $ destination = $ this ->createQueue ($ queueName );
65
-
66
- $ this ->context ->createProducer ()->send ($ destination , $ transportMessage );
22
+ parent ::__construct ($ context , ...$ args );
67
23
}
68
24
69
25
public function setupBroker (LoggerInterface $ logger = null ): void
@@ -77,89 +33,34 @@ public function setupBroker(LoggerInterface $logger = null): void
77
33
*/
78
34
public function createTransportMessage (Message $ message ): PsrMessage
79
35
{
80
- $ headers = $ message ->getHeaders ();
81
- $ headers ['content-type ' ] = $ message ->getContentType ();
82
-
83
- $ transportMessage = $ this ->context ->createMessage ();
84
- $ transportMessage ->setHeaders ($ headers );
36
+ /** @var StompMessage $transportMessage */
37
+ $ transportMessage = parent ::createTransportMessage ($ message );
85
38
$ transportMessage ->setPersistent (true );
86
- $ transportMessage ->setBody ($ message ->getBody ());
87
- $ transportMessage ->setProperties ($ message ->getProperties ());
88
-
89
- if ($ message ->getMessageId ()) {
90
- $ transportMessage ->setMessageId ($ message ->getMessageId ());
91
- }
92
-
93
- if ($ message ->getTimestamp ()) {
94
- $ transportMessage ->setTimestamp ($ message ->getTimestamp ());
95
- }
96
-
97
- if ($ message ->getReplyTo ()) {
98
- $ transportMessage ->setReplyTo ($ message ->getReplyTo ());
99
- }
100
-
101
- if ($ message ->getCorrelationId ()) {
102
- $ transportMessage ->setCorrelationId ($ message ->getCorrelationId ());
103
- }
104
39
105
40
return $ transportMessage ;
106
41
}
107
42
108
- /**
109
- * @param StompMessage $message
110
- */
111
- public function createClientMessage (PsrMessage $ message ): Message
112
- {
113
- $ clientMessage = new Message ();
114
-
115
- $ headers = $ message ->getHeaders ();
116
- unset(
117
- $ headers ['content-type ' ],
118
- $ headers ['message_id ' ],
119
- $ headers ['timestamp ' ],
120
- $ headers ['reply-to ' ],
121
- $ headers ['correlation_id ' ]
122
- );
123
-
124
- $ clientMessage ->setHeaders ($ headers );
125
- $ clientMessage ->setBody ($ message ->getBody ());
126
- $ clientMessage ->setProperties ($ message ->getProperties ());
127
-
128
- $ clientMessage ->setContentType ($ message ->getHeader ('content-type ' ));
129
-
130
- $ clientMessage ->setMessageId ($ message ->getMessageId ());
131
- $ clientMessage ->setTimestamp ($ message ->getTimestamp ());
132
- $ clientMessage ->setReplyTo ($ message ->getReplyTo ());
133
- $ clientMessage ->setCorrelationId ($ message ->getCorrelationId ());
134
-
135
- return $ clientMessage ;
136
- }
137
-
138
43
/**
139
44
* @return StompDestination
140
45
*/
141
46
public function createQueue (string $ queueName ): PsrQueue
142
47
{
143
- $ transportName = $ this ->queueMetaRegistry ->getQueueMeta ($ queueName )->getTransportName ();
144
-
145
- $ queue = $ this ->context ->createQueue ($ transportName );
48
+ /** @var StompDestination $queue */
49
+ $ queue = parent ::createQueue ($ queueName );
146
50
$ queue ->setDurable (true );
147
51
$ queue ->setAutoDelete (false );
148
52
$ queue ->setExclusive (false );
149
53
150
54
return $ queue ;
151
55
}
152
56
153
- public function getConfig (): Config
154
- {
155
- return $ this ->config ;
156
- }
157
-
158
- private function createRouterTopic (): StompDestination
57
+ /**
58
+ * @return StompDestination
59
+ */
60
+ protected function createRouterTopic (): PsrTopic
159
61
{
160
- $ topic = $ this ->context ->createTopic (
161
- $ this ->config ->createTransportRouterTopicName ($ this ->config ->getRouterTopicName ())
162
- );
62
+ /** @var StompDestination $topic */
63
+ $ topic = parent ::createRouterTopic ();
163
64
$ topic ->setDurable (true );
164
65
$ topic ->setAutoDelete (false );
165
66
0 commit comments