Skip to content

Commit 4a7a8d1

Browse files
committed
develop
1 parent 7f06a71 commit 4a7a8d1

6 files changed

+22
-53
lines changed

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ services:
4545
- RDKAFKA_PORT=9092
4646
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
4747
- GCLOUD_PROJECT=mqdev
48-
- MONGO_CONNECTION_STRING=mongodb://127.0.0.1/
48+
- MONGO_DSN=mongodb://127.0.0.1/
4949

5050
rabbitmq:
5151
image: 'enqueue/rabbitmq:latest'

pkg/mongodb/MongodbConnectionFactory.php

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,37 +7,23 @@
77

88
class MongodbConnectionFactory implements PsrConnectionFactory
99
{
10-
/**
11-
* @var string
12-
*/
13-
private $uri;
14-
1510
/**
1611
* @var array
1712
*/
1813
private $config;
1914

20-
/**
21-
* @var array
22-
*/
23-
private $uriOptions;
24-
25-
/**
26-
* @var array
27-
*/
28-
private $driverOptions;
29-
30-
public function __construct($uri = 'mongodb://127.0.0.1/', array $config = [], array $uriOptions = [], array $driverOptions = [])
15+
public function __construct(array $params)
3116
{
32-
$this->uri = $uri;
33-
$this->config = $config;
34-
$this->uriOptions = $uriOptions;
35-
$this->driverOptions = $driverOptions;
17+
$this->config = array_replace([
18+
'uri' => 'mongodb://127.0.0.1/',
19+
'uriOptions' => [],
20+
'driverOptions' => [],
21+
], $params);
3622
}
3723

3824
public function createContext()
3925
{
40-
$client = new Client($this->uri, $this->uriOptions, $this->driverOptions);
26+
$client = new Client($this->config['uri'], $this->config['uriOptions'], $this->config['driverOptions']);
4127

4228
return new MongodbContext($client, $this->config);
4329
}

pkg/mongodb/MongodbConsumer.php

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
namespace Enqueue\Mongodb;
44

5-
use Enqueue\Util\JSON;
65
use Interop\Queue\InvalidMessageException;
76
use Interop\Queue\PsrConsumer;
87
use Interop\Queue\PsrMessage;
@@ -134,24 +133,18 @@ public function reject(PsrMessage $message, $requeue = false)
134133
*/
135134
protected function receiveMessage()
136135
{
137-
try {
138-
$now = time();
139-
$collection = $this->context->getCollection();
140-
$message = $collection->findOne(['$or' => [['delayed_until' => ['$exists' => false]], ['delayed_until' => ['$lte' => $now]]]], ['sort' => ['priority' => -1]]);
141-
if (!$message) {
142-
return null;
143-
}
144-
$mongodbMessage = $message->getArrayCopy();
145-
$convertedMessage = $this->convertMessage($mongodbMessage);
146-
$affected = $collection->deleteOne(['_id' => $mongodbMessage['_id']]);
147-
if (1 !== $affected->getDeletedCount()) {
148-
throw new \LogicException(sprintf('Expected record was removed but it is not. id: "%s"', $mongodbMessage['_id']->__toString()));
149-
}
136+
$now = time();
137+
$collection = $this->context->getCollection();
138+
$message = $collection->findOneAndDelete(['$or' => [['delayed_until' => ['$exists' => false]], ['delayed_until' => ['$lte' => $now]]]],
139+
['sort' => ['priority' => -1], 'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]);
150140

151-
return $convertedMessage;
152-
} catch (\Exception $e) {
153-
throw $e;
141+
if (!$message) {
142+
return null;
154143
}
144+
145+
$convertedMessage = $this->convertMessage($message);
146+
147+
return $convertedMessage;
155148
}
156149

157150
/**
@@ -161,21 +154,12 @@ protected function receiveMessage()
161154
*/
162155
protected function convertMessage(array $mongodbMessage)
163156
{
164-
$message = $this->context->createMessage();
157+
$message = $this->context->createMessage($mongodbMessage['body'], $mongodbMessage['properties'], $mongodbMessage['headers']);
165158
$message->setId($mongodbMessage['_id']->__toString());
166-
$message->setBody($mongodbMessage['body']);
167159
$message->setPriority((int) $mongodbMessage['priority']);
168160
$message->setRedelivered((bool) $mongodbMessage['redelivered']);
169161
$message->setPublishedAt((int) $mongodbMessage['published_at']);
170162

171-
if ($mongodbMessage['headers']) {
172-
$message->setHeaders(JSON::decode($mongodbMessage['headers']));
173-
}
174-
175-
if ($mongodbMessage['properties']) {
176-
$message->setProperties(JSON::decode($mongodbMessage['properties']));
177-
}
178-
179163
return $message;
180164
}
181165
}

pkg/mongodb/MongodbProducer.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public function send(PsrDestination $destination, PsrMessage $message)
123123
$collection = $this->context->getCollection();
124124
$collection->insertOne($mongoMessage);
125125
} catch (\Exception $e) {
126-
throw new Exception('The transport fails to send the message due to some internal error.', null, $e);
126+
throw new Exception('The transport has failed to send the message due to some internal error.', null, $e);
127127
}
128128
}
129129

pkg/mongodb/Tests/Spec/CreateMongodbContextTrait.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ trait CreateMongodbContextTrait
88
{
99
protected function createMongodbContext()
1010
{
11-
if (false == $env = getenv('MONGO_CONNECTION_STRING')) {
12-
$this->markTestSkipped('The MONGO_CONNECTION_STRING env is not available. Skip tests');
11+
if (false == $env = getenv('MONGO_DSN')) {
12+
$this->markTestSkipped('The MONGO_DSN env is not available. Skip tests');
1313
}
1414

1515
$factory = new MongodbConnectionFactory($env);

pkg/mongodb/composer.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
"require": {
1313
"php": ">=5.6",
1414
"queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1",
15-
"php-http/client-common": "^1.7@dev",
1615
"mongodb/mongodb": "^1.3"
1716
},
1817
"require-dev": {

0 commit comments

Comments
 (0)