2
2
3
3
namespace Enqueue \Client \Driver ;
4
4
5
- use Enqueue \Client \Config ;
6
- use Enqueue \Client \DriverInterface ;
7
- use Enqueue \Client \Message ;
8
- use Enqueue \Client \MessagePriority ;
9
- use Enqueue \Client \Meta \QueueMetaRegistry ;
10
5
use Enqueue \Mongodb \MongodbContext ;
11
- use Enqueue \Mongodb \MongodbDestination ;
12
- use Enqueue \Mongodb \MongodbMessage ;
13
- use Interop \Queue \PsrMessage ;
14
- use Interop \Queue \PsrQueue ;
15
6
use Psr \Log \LoggerInterface ;
16
7
use Psr \Log \NullLogger ;
17
8
18
- class MongodbDriver implements DriverInterface
9
+ /**
10
+ * @method MongodbContext getContext
11
+ */
12
+ class MongodbDriver extends GenericDriver
19
13
{
20
- /**
21
- * @var MongodbContext
22
- */
23
- private $ context ;
24
-
25
- /**
26
- * @var Config
27
- */
28
- private $ config ;
29
-
30
- /**
31
- * @var QueueMetaRegistry
32
- */
33
- private $ queueMetaRegistry ;
34
-
35
- /**
36
- * @var array
37
- */
38
- private static $ priorityMap = [
39
- MessagePriority::VERY_LOW => 0 ,
40
- MessagePriority::LOW => 1 ,
41
- MessagePriority::NORMAL => 2 ,
42
- MessagePriority::HIGH => 3 ,
43
- MessagePriority::VERY_HIGH => 4 ,
44
- ];
45
-
46
- public function __construct (MongodbContext $ context , Config $ config , QueueMetaRegistry $ queueMetaRegistry )
47
- {
48
- $ this ->context = $ context ;
49
- $ this ->config = $ config ;
50
- $ this ->queueMetaRegistry = $ queueMetaRegistry ;
51
- }
52
-
53
- /**
54
- * @return MongodbMessage
55
- */
56
- public function createTransportMessage (Message $ message ): PsrMessage
57
- {
58
- $ properties = $ message ->getProperties ();
59
-
60
- $ headers = $ message ->getHeaders ();
61
- $ headers ['content_type ' ] = $ message ->getContentType ();
62
-
63
- $ transportMessage = $ this ->context ->createMessage ();
64
- $ transportMessage ->setBody ($ message ->getBody ());
65
- $ transportMessage ->setHeaders ($ headers );
66
- $ transportMessage ->setProperties ($ properties );
67
- $ transportMessage ->setMessageId ($ message ->getMessageId ());
68
- $ transportMessage ->setTimestamp ($ message ->getTimestamp ());
69
- $ transportMessage ->setDeliveryDelay ($ message ->getDelay ());
70
- $ transportMessage ->setReplyTo ($ message ->getReplyTo ());
71
- $ transportMessage ->setCorrelationId ($ message ->getCorrelationId ());
72
- if (array_key_exists ($ message ->getPriority (), self ::$ priorityMap )) {
73
- $ transportMessage ->setPriority (self ::$ priorityMap [$ message ->getPriority ()]);
74
- }
75
-
76
- return $ transportMessage ;
77
- }
78
-
79
- /**
80
- * @param MongodbMessage $message
81
- */
82
- public function createClientMessage (PsrMessage $ message ): Message
14
+ public function __construct (MongodbContext $ context , ...$ args )
83
15
{
84
- $ clientMessage = new Message ();
85
-
86
- $ clientMessage ->setBody ($ message ->getBody ());
87
- $ clientMessage ->setHeaders ($ message ->getHeaders ());
88
- $ clientMessage ->setProperties ($ message ->getProperties ());
89
-
90
- $ clientMessage ->setContentType ($ message ->getHeader ('content_type ' ));
91
- $ clientMessage ->setMessageId ($ message ->getMessageId ());
92
- $ clientMessage ->setTimestamp ($ message ->getTimestamp ());
93
- $ clientMessage ->setDelay ($ message ->getDeliveryDelay ());
94
- $ clientMessage ->setReplyTo ($ message ->getReplyTo ());
95
- $ clientMessage ->setCorrelationId ($ message ->getCorrelationId ());
96
-
97
- $ priorityMap = array_flip (self ::$ priorityMap );
98
- $ priority = array_key_exists ($ message ->getPriority (), $ priorityMap ) ?
99
- $ priorityMap [$ message ->getPriority ()] :
100
- MessagePriority::NORMAL ;
101
- $ clientMessage ->setPriority ($ priority );
102
-
103
- return $ clientMessage ;
104
- }
105
-
106
- public function sendToRouter (Message $ message ): void
107
- {
108
- if (false == $ message ->getProperty (Config::PARAMETER_TOPIC_NAME )) {
109
- throw new \LogicException ('Topic name parameter is required but is not set ' );
110
- }
111
-
112
- $ queue = $ this ->createQueue ($ this ->config ->getRouterQueueName ());
113
- $ transportMessage = $ this ->createTransportMessage ($ message );
114
-
115
- $ this ->context ->createProducer ()->send ($ queue , $ transportMessage );
116
- }
117
-
118
- public function sendToProcessor (Message $ message ): void
119
- {
120
- if (false == $ message ->getProperty (Config::PARAMETER_PROCESSOR_NAME )) {
121
- throw new \LogicException ('Processor name parameter is required but is not set ' );
122
- }
123
-
124
- if (false == $ queueName = $ message ->getProperty (Config::PARAMETER_PROCESSOR_QUEUE_NAME )) {
125
- throw new \LogicException ('Queue name parameter is required but is not set ' );
126
- }
127
-
128
- $ transportMessage = $ this ->createTransportMessage ($ message );
129
- $ destination = $ this ->createQueue ($ queueName );
130
-
131
- $ this ->context ->createProducer ()->send ($ destination , $ transportMessage );
132
- }
133
-
134
- /**
135
- * @return MongodbDestination
136
- */
137
- public function createQueue (string $ queueName ): PsrQueue
138
- {
139
- $ transportName = $ this ->queueMetaRegistry ->getQueueMeta ($ queueName )->getTransportName ();
140
-
141
- return $ this ->context ->createQueue ($ transportName );
16
+ parent ::__construct ($ context , ...$ args );
142
17
}
143
18
144
19
public function setupBroker (LoggerInterface $ logger = null ): void
@@ -147,18 +22,9 @@ public function setupBroker(LoggerInterface $logger = null): void
147
22
$ log = function ($ text , ...$ args ) use ($ logger ) {
148
23
$ logger ->debug (sprintf ('[MongodbDriver] ' .$ text , ...$ args ));
149
24
};
150
- $ contextConfig = $ this ->context ->getConfig ();
151
- $ log ('Creating database and collection: "%s" "%s" ' , $ contextConfig ['dbname ' ], $ contextConfig ['collection_name ' ]);
152
- $ this ->context ->createCollection ();
153
- }
154
25
155
- public function getConfig (): Config
156
- {
157
- return $ this ->config ;
158
- }
159
-
160
- public static function getPriorityMap (): array
161
- {
162
- return self ::$ priorityMap ;
26
+ $ contextConfig = $ this ->getContext ()->getConfig ();
27
+ $ log ('Creating database and collection: "%s" "%s" ' , $ contextConfig ['dbname ' ], $ contextConfig ['collection_name ' ]);
28
+ $ this ->getContext ()->createCollection ();
163
29
}
164
30
}
0 commit comments