Skip to content

Commit 92d9698

Browse files
committed
wamp
1 parent 7830b80 commit 92d9698

13 files changed

+304
-27
lines changed

pkg/test/WampExtension.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace Enqueue\Test;
4+
5+
use Enqueue\Wamp\WampConnectionFactory;
6+
use Enqueue\Wamp\WampContext;
7+
8+
trait WampExtension
9+
{
10+
private function buildWampContext(): WampContext
11+
{
12+
if (false == $dsn = getenv('WAMP_DSN')) {
13+
throw new \PHPUnit_Framework_SkippedTestError('Functional tests are not allowed in this environment');
14+
}
15+
16+
return (new WampConnectionFactory($dsn))->createContext();
17+
}
18+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
namespace Enqueue\Wamp\Tests\Functional;
4+
5+
use Enqueue\Test\WampExtension;
6+
use Enqueue\Wamp\WampMessage;
7+
use PHPUnit\Framework\TestCase;
8+
use Psr\Log\NullLogger;
9+
use Thruway\Logging\Logger;
10+
11+
/**
12+
* @group functional
13+
* @group Wamp
14+
*/
15+
class WampConsumerTest extends TestCase
16+
{
17+
use WampExtension;
18+
19+
public static function setUpBeforeClass()
20+
{
21+
Logger::set(new NullLogger());
22+
}
23+
24+
public function testShouldSendAndReceiveMessage()
25+
{
26+
$context = $this->buildWampContext();
27+
$topic = $context->createTopic('topic');
28+
$consumer = $context->createConsumer($topic);
29+
$producer = $context->createProducer();
30+
$message = $context->createMessage('the body');
31+
32+
// init client
33+
$consumer->receive(1);
34+
35+
$consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) {
36+
$producer->send($topic, $message);
37+
});
38+
39+
$receivedMessage = $consumer->receive(100);
40+
41+
$this->assertInstanceOf(WampMessage::class, $receivedMessage);
42+
$this->assertSame('the body', $receivedMessage->getBody());
43+
}
44+
45+
public function testShouldSendAndReceiveNoWaitMessage()
46+
{
47+
$context = $this->buildWampContext();
48+
$topic = $context->createTopic('topic');
49+
$consumer = $context->createConsumer($topic);
50+
$producer = $context->createProducer();
51+
$message = $context->createMessage('the body');
52+
53+
// init client
54+
$consumer->receive(1);
55+
56+
$consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) {
57+
$producer->send($topic, $message);
58+
});
59+
60+
$receivedMessage = $consumer->receiveNoWait();
61+
62+
$this->assertInstanceOf(WampMessage::class, $receivedMessage);
63+
$this->assertSame('the body', $receivedMessage->getBody());
64+
}
65+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace Enqueue\Wamp\Tests\Functional;
4+
5+
use Enqueue\Test\WampExtension;
6+
use Enqueue\Wamp\WampMessage;
7+
use PHPUnit\Framework\TestCase;
8+
use Psr\Log\NullLogger;
9+
use Thruway\Logging\Logger;
10+
11+
/**
12+
* @group functional
13+
* @group Wamp
14+
*/
15+
class WampSubscriptionConsumerTest extends TestCase
16+
{
17+
use WampExtension;
18+
19+
public static function setUpBeforeClass()
20+
{
21+
Logger::set(new NullLogger());
22+
}
23+
24+
public function testShouldSendAndReceiveMessage()
25+
{
26+
$context = $this->buildWampContext();
27+
$topic = $context->createTopic('topic');
28+
$consumer = $context->createSubscriptionConsumer();
29+
$producer = $context->createProducer();
30+
$message = $context->createMessage('the body');
31+
32+
$receivedMessage = null;
33+
$consumer->subscribe($context->createConsumer($topic), function ($message) use (&$receivedMessage) {
34+
$receivedMessage = $message;
35+
36+
return false;
37+
});
38+
39+
// init client
40+
$consumer->consume(1);
41+
42+
$consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) {
43+
$producer->send($topic, $message);
44+
});
45+
46+
$consumer->consume(100);
47+
48+
$this->assertInstanceOf(WampMessage::class, $receivedMessage);
49+
$this->assertSame('the body', $receivedMessage->getBody());
50+
}
51+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
namespace Enqueue\Wamp\Tests\Spec;
4+
5+
use Enqueue\Wamp\WampConnectionFactory;
6+
use Interop\Queue\Spec\ConnectionFactorySpec;
7+
8+
/**
9+
* @group Wamp
10+
*/
11+
class WampConnectionFactoryTest extends ConnectionFactorySpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createConnectionFactory()
17+
{
18+
return new WampConnectionFactory();
19+
}
20+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
namespace Enqueue\Wamp\Tests\Spec;
4+
5+
use Enqueue\Test\WampExtension;
6+
use Interop\Queue\Spec\ContextSpec;
7+
8+
/**
9+
* @group functional
10+
* @group Wamp
11+
*/
12+
class WampContextTest extends ContextSpec
13+
{
14+
use WampExtension;
15+
16+
/**
17+
* {@inheritdoc}
18+
*/
19+
protected function createContext()
20+
{
21+
return $this->buildWampContext();
22+
}
23+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
namespace Enqueue\Wamp\Tests\Spec;
4+
5+
use Enqueue\Wamp\WampMessage;
6+
use Interop\Queue\Spec\MessageSpec;
7+
8+
/**
9+
* @group Wamp
10+
*/
11+
class WampMessageTest extends MessageSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createMessage()
17+
{
18+
return new WampMessage();
19+
}
20+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
namespace Enqueue\Wamp\Tests\Spec;
4+
5+
use Enqueue\Test\WampExtension;
6+
use Interop\Queue\Spec\ProducerSpec;
7+
8+
/**
9+
* @group functional
10+
* @group Wamp
11+
*/
12+
class WampProducerTest extends ProducerSpec
13+
{
14+
use WampExtension;
15+
16+
/**
17+
* {@inheritdoc}
18+
*/
19+
protected function createProducer()
20+
{
21+
return $this->buildWampContext()->createProducer();
22+
}
23+
}

pkg/wamp/Tests/Spec/WampQueueTest.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
namespace Enqueue\Wamp\Tests\Spec;
4+
5+
use Enqueue\Wamp\WampDestination;
6+
use Interop\Queue\Spec\QueueSpec;
7+
8+
/**
9+
* @group Wamp
10+
*/
11+
class WampQueueTest extends QueueSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createQueue()
17+
{
18+
return new WampDestination(self::EXPECTED_QUEUE_NAME);
19+
}
20+
}

pkg/wamp/Tests/Spec/WampTopicTest.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
namespace Enqueue\Wamp\Tests\Spec;
4+
5+
use Enqueue\Wamp\WampDestination;
6+
use Interop\Queue\Spec\TopicSpec;
7+
8+
/**
9+
* @group Wamp
10+
*/
11+
class WampTopicTest extends TopicSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createTopic()
17+
{
18+
return new WampDestination(self::EXPECTED_TOPIC_NAME);
19+
}
20+
}

pkg/wamp/WampConsumer.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ public function getQueue(): Queue
5050
return $this->queue;
5151
}
5252

53+
public function getClient(): ?Client
54+
{
55+
return $this->client;
56+
}
57+
5358
public function receive(int $timeout = 0): ?Message
5459
{
5560
$init = false;
@@ -59,7 +64,7 @@ public function receive(int $timeout = 0): ?Message
5964
if (null === $this->client) {
6065
$init = true;
6166

62-
$this->client = $this->context->getClient();
67+
$this->client = $this->context->getNewClient();
6368
$this->client->setAttemptRetry(true);
6469
$this->client->on('open', function (ClientSession $session) {
6570

@@ -80,7 +85,10 @@ public function receive(int $timeout = 0): ?Message
8085
}
8186

8287
if ($timeout > 0) {
83-
$this->timer = $this->client->getLoop()->addTimer($timeout / 1000, function () {
88+
$timeout = $timeout / 1000;
89+
$timeout = $timeout >= 0.1 ? $timeout : 0.1;
90+
91+
$this->timer = $this->client->getLoop()->addTimer($timeout, function () {
8492
$this->client->emit('do-stop');
8593
});
8694
}

pkg/wamp/WampContext.php

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
class WampContext implements Context
2020
{
2121
/**
22-
* @var Client
22+
* @var Client[]
2323
*/
24-
private $client;
24+
private $clients;
2525

2626
/**
2727
* @var callable
@@ -86,33 +86,34 @@ public function purgeQueue(Queue $queue): void
8686

8787
public function close(): void
8888
{
89-
if (null === $this->client) {
90-
return;
91-
}
89+
foreach ($this->clients as $client) {
90+
if (null === $client) {
91+
return;
92+
}
9293

93-
if (null === $this->client->getSession()) {
94-
return;
95-
}
94+
if (null === $client->getSession()) {
95+
return;
96+
}
9697

97-
$this->client->setAttemptRetry(false);
98-
$this->client->getSession()->close();
98+
$client->setAttemptRetry(false);
99+
$client->getSession()->close();
100+
}
99101
}
100102

101-
public function getClient(): Client
103+
public function getNewClient(): Client
102104
{
103-
if (false == $this->client) {
104-
$client = call_user_func($this->clientFactory);
105-
if (false == $client instanceof Client) {
106-
throw new \LogicException(sprintf(
107-
'The factory must return instance of "%s". But it returns %s',
108-
Client::class,
109-
is_object($client) ? get_class($client) : gettype($client)
110-
));
111-
}
105+
$client = call_user_func($this->clientFactory);
112106

113-
$this->client = $client;
107+
if (false == $client instanceof Client) {
108+
throw new \LogicException(sprintf(
109+
'The factory must return instance of "%s". But it returns %s',
110+
Client::class,
111+
is_object($client) ? get_class($client) : gettype($client)
112+
));
114113
}
115114

116-
return $this->client;
115+
$this->clients[] = $client;
116+
117+
return $client;
117118
}
118119
}

pkg/wamp/WampProducer.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public function send(Destination $destination, Message $message): void
6565
if (null === $this->client) {
6666
$init = true;
6767

68-
$this->client = $this->context->getClient();
68+
$this->client = $this->context->getNewClient();
6969
$this->client->setAttemptRetry(true);
7070
$this->client->on('open', function (ClientSession $session) {
7171
$this->session = $session;

0 commit comments

Comments
 (0)