Skip to content

Commit 7f06a71

Browse files
committed
initial commit
1 parent 9adbf01 commit 7f06a71

32 files changed

+1980
-2
lines changed

composer.json

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"enqueue/fs": "*@dev",
1717
"enqueue/null": "*@dev",
1818
"enqueue/dbal": "*@dev",
19+
"enqueue/mongodb": "*@dev",
1920
"enqueue/sqs": "*@dev",
2021
"enqueue/pheanstalk": "*@dev",
2122
"enqueue/gearman": "*@dev",
@@ -41,7 +42,8 @@
4142
"symfony/console": "^2.8|^3|^4",
4243
"friendsofphp/php-cs-fixer": "^2",
4344
"empi89/php-amqp-stubs": "*@dev",
44-
"php-http/client-common": "^1.7@dev"
45+
"php-http/client-common": "^1.7@dev",
46+
"mongodb/mongodb": "^1.3"
4547
},
4648
"autoload": {
4749
"files": [
@@ -144,5 +146,9 @@
144146
"type": "path",
145147
"url": "pkg/async-event-dispatcher"
146148
}
149+
{
150+
"type": "path",
151+
"url": "pkg/mongodb"
152+
}
147153
]
148154
}

docker-compose.yml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ services:
1313
- zookeeper
1414
- google-pubsub
1515
- rabbitmqssl
16+
- mongo
1617
volumes:
1718
- './:/mqdev'
1819
environment:
@@ -24,7 +25,7 @@ services:
2425
- RABBITMQ_PASSWORD=guest
2526
- RABBITMQ_VHOST=mqdev
2627
- RABBITMQ_AMQP__PORT=5672
27-
- RABBITMQ_STOMP_PORT=61613
28+
- RABBITMQ_STOMP_PORT=61613
2829
- DOCTRINE_DRIVER=pdo_mysql
2930
- DOCTRINE_HOST=mysql
3031
- DOCTRINE_PORT=3306
@@ -44,6 +45,7 @@ services:
4445
- RDKAFKA_PORT=9092
4546
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
4647
- GCLOUD_PROJECT=mqdev
48+
- MONGO_CONNECTION_STRING=mongodb://127.0.0.1/
4749

4850
rabbitmq:
4951
image: 'enqueue/rabbitmq:latest'
@@ -102,6 +104,10 @@ services:
102104
image: 'google/cloud-sdk:latest'
103105
entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085'
104106

107+
mongo:
108+
image: mongo
109+
ports:
110+
- "27017:27017"
105111
volumes:
106112
mysql-data:
107113
driver: local

pkg/mongodb/.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
namespace Enqueue\Mongodb;
4+
5+
use Interop\Queue\PsrConnectionFactory;
6+
use MongoDB\Client;
7+
8+
class MongodbConnectionFactory implements PsrConnectionFactory
9+
{
10+
/**
11+
* @var string
12+
*/
13+
private $uri;
14+
15+
/**
16+
* @var array
17+
*/
18+
private $config;
19+
20+
/**
21+
* @var array
22+
*/
23+
private $uriOptions;
24+
25+
/**
26+
* @var array
27+
*/
28+
private $driverOptions;
29+
30+
public function __construct($uri = 'mongodb://127.0.0.1/', array $config = [], array $uriOptions = [], array $driverOptions = [])
31+
{
32+
$this->uri = $uri;
33+
$this->config = $config;
34+
$this->uriOptions = $uriOptions;
35+
$this->driverOptions = $driverOptions;
36+
}
37+
38+
public function createContext()
39+
{
40+
$client = new Client($this->uri, $this->uriOptions, $this->driverOptions);
41+
42+
return new MongodbContext($client, $this->config);
43+
}
44+
}

pkg/mongodb/MongodbConsumer.php

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
<?php
2+
3+
namespace Enqueue\Mongodb;
4+
5+
use Enqueue\Util\JSON;
6+
use Interop\Queue\InvalidMessageException;
7+
use Interop\Queue\PsrConsumer;
8+
use Interop\Queue\PsrMessage;
9+
10+
class MongodbConsumer implements PsrConsumer
11+
{
12+
/**
13+
* @var MongodbContext
14+
*/
15+
private $context;
16+
17+
/**
18+
* @var MongodbDestination
19+
*/
20+
private $queue;
21+
22+
/**
23+
* @var int microseconds
24+
*/
25+
private $pollingInterval = 1000000;
26+
27+
/**
28+
* @param MongodbContext $context
29+
* @param MongodbDestination $queue
30+
*/
31+
public function __construct(MongodbContext $context, MongodbDestination $queue)
32+
{
33+
$this->context = $context;
34+
$this->queue = $queue;
35+
}
36+
37+
/**
38+
* Set polling interval in milliseconds.
39+
*
40+
* @param int $msec
41+
*/
42+
public function setPollingInterval($msec)
43+
{
44+
$this->pollingInterval = $msec * 1000;
45+
}
46+
47+
/**
48+
* Get polling interval in milliseconds.
49+
*
50+
* @return int
51+
*/
52+
public function getPollingInterval()
53+
{
54+
return (int) $this->pollingInterval / 1000;
55+
}
56+
57+
/**
58+
* {@inheritdoc}
59+
*
60+
* @return MongodbDestination
61+
*/
62+
public function getQueue()
63+
{
64+
return $this->queue;
65+
}
66+
67+
/**
68+
* {@inheritdoc}
69+
*
70+
* @return MongodbMessage|null
71+
*/
72+
public function receive($timeout = 0)
73+
{
74+
$timeout /= 1000;
75+
$startAt = microtime(true);
76+
77+
while (true) {
78+
$message = $this->receiveMessage();
79+
80+
if ($message) {
81+
return $message;
82+
}
83+
84+
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
85+
return;
86+
}
87+
88+
usleep($this->pollingInterval);
89+
90+
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
91+
return;
92+
}
93+
}
94+
}
95+
96+
/**
97+
* {@inheritdoc}
98+
*
99+
* @return MongodbMessage|null
100+
*/
101+
public function receiveNoWait()
102+
{
103+
return $this->receiveMessage();
104+
}
105+
106+
/**
107+
* {@inheritdoc}
108+
*
109+
* @param MongodbMessage $message
110+
*/
111+
public function acknowledge(PsrMessage $message)
112+
{
113+
// does nothing
114+
}
115+
116+
/**
117+
* {@inheritdoc}
118+
*
119+
* @param MongodbMessage $message
120+
*/
121+
public function reject(PsrMessage $message, $requeue = false)
122+
{
123+
InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class);
124+
125+
if ($requeue) {
126+
$this->context->createProducer()->send($this->queue, $message);
127+
128+
return;
129+
}
130+
}
131+
132+
/**
133+
* @return MongodbMessage|null
134+
*/
135+
protected function receiveMessage()
136+
{
137+
try {
138+
$now = time();
139+
$collection = $this->context->getCollection();
140+
$message = $collection->findOne(['$or' => [['delayed_until' => ['$exists' => false]], ['delayed_until' => ['$lte' => $now]]]], ['sort' => ['priority' => -1]]);
141+
if (!$message) {
142+
return null;
143+
}
144+
$mongodbMessage = $message->getArrayCopy();
145+
$convertedMessage = $this->convertMessage($mongodbMessage);
146+
$affected = $collection->deleteOne(['_id' => $mongodbMessage['_id']]);
147+
if (1 !== $affected->getDeletedCount()) {
148+
throw new \LogicException(sprintf('Expected record was removed but it is not. id: "%s"', $mongodbMessage['_id']->__toString()));
149+
}
150+
151+
return $convertedMessage;
152+
} catch (\Exception $e) {
153+
throw $e;
154+
}
155+
}
156+
157+
/**
158+
* @param array $dbalMessage
159+
*
160+
* @return MongodbMessage
161+
*/
162+
protected function convertMessage(array $mongodbMessage)
163+
{
164+
$message = $this->context->createMessage();
165+
$message->setId($mongodbMessage['_id']->__toString());
166+
$message->setBody($mongodbMessage['body']);
167+
$message->setPriority((int) $mongodbMessage['priority']);
168+
$message->setRedelivered((bool) $mongodbMessage['redelivered']);
169+
$message->setPublishedAt((int) $mongodbMessage['published_at']);
170+
171+
if ($mongodbMessage['headers']) {
172+
$message->setHeaders(JSON::decode($mongodbMessage['headers']));
173+
}
174+
175+
if ($mongodbMessage['properties']) {
176+
$message->setProperties(JSON::decode($mongodbMessage['properties']));
177+
}
178+
179+
return $message;
180+
}
181+
}

pkg/mongodb/MongodbContext.php

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
<?php
2+
3+
namespace Enqueue\Mongodb;
4+
5+
use Interop\Queue\InvalidDestinationException;
6+
use Interop\Queue\PsrContext;
7+
use Interop\Queue\PsrDestination;
8+
use MongoDB\Client;
9+
10+
class MongodbContext implements PsrContext
11+
{
12+
/**
13+
* @var array
14+
*/
15+
private $config;
16+
/**
17+
* @var Client
18+
*/
19+
private $client;
20+
21+
public function __construct($client, array $config = [])
22+
{
23+
$this->config = array_replace([
24+
'dbname' => 'enqueue',
25+
'collection_name' => 'enqueue',
26+
'polling_interval' => null,
27+
], $config);
28+
$this->client = $client;
29+
}
30+
31+
public function createMessage($body = '', array $properties = [], array $headers = [])
32+
{
33+
$message = new MongodbMessage();
34+
$message->setBody($body);
35+
$message->setProperties($properties);
36+
$message->setHeaders($headers);
37+
38+
return $message;
39+
}
40+
41+
public function createTopic($name)
42+
{
43+
return new MongodbDestination($name);
44+
}
45+
46+
public function createQueue($queueName)
47+
{
48+
return new MongodbDestination($queueName);
49+
}
50+
51+
public function createTemporaryQueue()
52+
{
53+
throw new \BadMethodCallException('Mongodb transport does not support temporary queues');
54+
}
55+
56+
public function createProducer()
57+
{
58+
return new MongodbProducer($this);
59+
}
60+
61+
public function createConsumer(PsrDestination $destination)
62+
{
63+
InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class);
64+
65+
$consumer = new MongodbConsumer($this, $destination);
66+
67+
if (isset($this->config['polling_interval'])) {
68+
$consumer->setPollingInterval($this->config['polling_interval']);
69+
}
70+
71+
return $consumer;
72+
}
73+
74+
public function close()
75+
{
76+
// TODO: Implement close() method.
77+
}
78+
79+
public function getCollection()
80+
{
81+
return $this->client->selectDatabase($this->config['dbname'])->selectCollection($this->config['collection_name']);
82+
}
83+
84+
/**
85+
* @return Client
86+
*/
87+
public function getClient()
88+
{
89+
return $this->client;
90+
}
91+
92+
/**
93+
* @return array
94+
*/
95+
public function getConfig()
96+
{
97+
return $this->config;
98+
}
99+
}

0 commit comments

Comments
 (0)