Skip to content

Commit 3cf596f

Browse files
authored
Merge pull request #512 from php-enqueue/migrate-to-queue-interop-07
PHP 7.1+. Queue Interop typed interfaces.
2 parents 3c65a54 + c63fb80 commit 3cf596f

File tree

182 files changed

+2013
-4825
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

182 files changed

+2013
-4825
lines changed

bin/pre-commit

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,18 +75,18 @@ function runPhpLint()
7575
{
7676
global $phpBin, $projectRootDir;
7777

78-
$filesWithErrors = array();
78+
$output = [];
7979
foreach (getFilesToFix() as $file) {
80-
$output = '';
80+
$commandOutput = null;
8181
$returnCode = null;
82-
exec(sprintf('%s -l %s 2>/dev/null', $phpBin, $projectRootDir.'/'.$file), $output, $returnCode);
82+
exec(sprintf('%s -l %s', $phpBin, $projectRootDir.'/'.$file), $commandOutput, $returnCode);
8383

8484
if ($returnCode) {
85-
$filesWithErrors[] = $file;
85+
$output[] = $commandOutput;
8686
}
8787
}
8888

89-
return $filesWithErrors;
89+
return $output;
9090
}
9191

9292
function runPhpCsFixer()
@@ -145,9 +145,10 @@ echo sprintf('Found %s staged files', count(getFilesToFix())).PHP_EOL;
145145
$phpSyntaxErrors = runPhpLint();
146146
if ($phpSyntaxErrors) {
147147
echo "Php syntax errors were found in next files:" . PHP_EOL;
148-
149-
foreach ($phpSyntaxErrors as $error) {
150-
echo $error . PHP_EOL;
148+
foreach ($phpSyntaxErrors as $phpSyntaxErrors) {
149+
echo array_walk_recursive($phpSyntaxErrors, function($item) {
150+
echo $item.PHP_EOL;
151+
}) . PHP_EOL;
151152
}
152153

153154
exit(1);

composer.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
"ext-mongodb": "^1.3",
1212
"ext-rdkafka": "^3.0.3",
1313

14-
"queue-interop/amqp-interop": "^0.7.4",
15-
"queue-interop/queue-interop": "^0.6.2",
14+
"queue-interop/amqp-interop": "0.8.x-dev",
15+
"queue-interop/queue-interop": "0.7.x-dev",
1616
"bunny/bunny": "^0.2.4|^0.3|^0.4",
1717
"php-amqplib/php-amqplib": "^2.7",
1818
"doctrine/dbal": "~2.5",
@@ -33,7 +33,7 @@
3333
"require-dev": {
3434
"phpunit/phpunit": "^5.5",
3535
"phpstan/phpstan": "^0.10",
36-
"queue-interop/queue-spec": "^0.5.9@dev",
36+
"queue-interop/queue-spec": "0.6.x-dev",
3737
"symfony/browser-kit": "4.0.*",
3838
"symfony/config": "4.0.*",
3939
"symfony/process": "4.0.*",

pkg/amqp-bunny/AmqpConnectionFactory.php

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Enqueue\AmqpTools\DelayStrategyAware;
77
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
88
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
9+
use Interop\Queue\PsrContext;
910

1011
class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware
1112
{
@@ -24,9 +25,6 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrate
2425
/**
2526
* @see \Enqueue\AmqpTools\ConnectionConfig for possible config formats and values
2627
*
27-
* In addition this factory accepts next options:
28-
* receive_method - Could be either basic_get or basic_consume
29-
*
3028
* @param array|string|null $config
3129
*/
3230
public function __construct($config = 'amqp:')
@@ -37,21 +35,12 @@ public function __construct($config = 'amqp:')
3735
->addDefaultOption('tcp_nodelay', null)
3836
->parse()
3937
;
40-
41-
$supportedMethods = ['basic_get', 'basic_consume'];
42-
if (false == in_array($this->config->getOption('receive_method'), $supportedMethods, true)) {
43-
throw new \LogicException(sprintf(
44-
'Invalid "receive_method" option value "%s". It could be only "%s"',
45-
$this->config->getOption('receive_method'),
46-
implode('", "', $supportedMethods)
47-
));
48-
}
4938
}
5039

5140
/**
5241
* @return AmqpContext
5342
*/
54-
public function createContext()
43+
public function createContext(): PsrContext
5544
{
5645
if ($this->config->isLazy()) {
5746
$context = new AmqpContext(function () {
@@ -72,18 +61,12 @@ public function createContext()
7261
return $context;
7362
}
7463

75-
/**
76-
* @return ConnectionConfig
77-
*/
78-
public function getConfig()
64+
public function getConfig(): ConnectionConfig
7965
{
8066
return $this->config;
8167
}
8268

83-
/**
84-
* @return BunnyClient
85-
*/
86-
private function establishConnection()
69+
private function establishConnection(): BunnyClient
8770
{
8871
if ($this->config->isSslOn()) {
8972
throw new \LogicException('The bunny library does not support SSL connections');

pkg/amqp-bunny/AmqpConsumer.php

Lines changed: 26 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
1010
use Interop\Queue\InvalidMessageException;
1111
use Interop\Queue\PsrMessage;
12+
use Interop\Queue\PsrQueue;
1213

1314
class AmqpConsumer implements InteropAmqpConsumer
1415
{
@@ -27,16 +28,6 @@ class AmqpConsumer implements InteropAmqpConsumer
2728
*/
2829
private $queue;
2930

30-
/**
31-
* @var Buffer
32-
*/
33-
private $buffer;
34-
35-
/**
36-
* @var string
37-
*/
38-
private $receiveMethod;
39-
4031
/**
4132
* @var int
4233
*/
@@ -47,108 +38,86 @@ class AmqpConsumer implements InteropAmqpConsumer
4738
*/
4839
private $consumerTag;
4940

50-
/**
51-
* @param AmqpContext $context
52-
* @param InteropAmqpQueue $queue
53-
* @param Buffer $buffer
54-
* @param string $receiveMethod
55-
*/
56-
public function __construct(AmqpContext $context, InteropAmqpQueue $queue, Buffer $buffer, $receiveMethod)
41+
public function __construct(AmqpContext $context, InteropAmqpQueue $queue)
5742
{
5843
$this->context = $context;
5944
$this->channel = $context->getBunnyChannel();
6045
$this->queue = $queue;
61-
$this->buffer = $buffer;
62-
$this->receiveMethod = $receiveMethod;
6346
$this->flags = self::FLAG_NOPARAM;
6447
}
6548

66-
/**
67-
* {@inheritdoc}
68-
*/
69-
public function setConsumerTag($consumerTag)
49+
public function setConsumerTag(string $consumerTag = null): void
7050
{
7151
$this->consumerTag = $consumerTag;
7252
}
7353

74-
/**
75-
* {@inheritdoc}
76-
*/
77-
public function getConsumerTag()
54+
public function getConsumerTag(): ?string
7855
{
7956
return $this->consumerTag;
8057
}
8158

82-
/**
83-
* {@inheritdoc}
84-
*/
85-
public function clearFlags()
59+
public function clearFlags(): void
8660
{
8761
$this->flags = self::FLAG_NOPARAM;
8862
}
8963

90-
/**
91-
* {@inheritdoc}
92-
*/
93-
public function addFlag($flag)
64+
public function addFlag(int $flag): void
9465
{
9566
$this->flags |= $flag;
9667
}
9768

98-
/**
99-
* {@inheritdoc}
100-
*/
101-
public function getFlags()
69+
public function getFlags(): int
10270
{
10371
return $this->flags;
10472
}
10573

106-
/**
107-
* {@inheritdoc}
108-
*/
109-
public function setFlags($flags)
74+
public function setFlags(int $flags): void
11075
{
11176
$this->flags = $flags;
11277
}
11378

11479
/**
115-
* {@inheritdoc}
80+
* @return InteropAmqpQueue
11681
*/
117-
public function getQueue()
82+
public function getQueue(): PsrQueue
11883
{
11984
return $this->queue;
12085
}
12186

12287
/**
123-
* {@inheritdoc}
88+
* @return InteropAmqpMessage
12489
*/
125-
public function receive($timeout = 0)
90+
public function receive(int $timeout = 0): ?PsrMessage
12691
{
127-
if ('basic_get' == $this->receiveMethod) {
128-
return $this->receiveBasicGet($timeout);
129-
}
92+
$end = microtime(true) + ($timeout / 1000);
93+
94+
while (0 === $timeout || microtime(true) < $end) {
95+
if ($message = $this->receiveNoWait()) {
96+
return $message;
97+
}
13098

131-
if ('basic_consume' == $this->receiveMethod) {
132-
return $this->receiveBasicConsume($timeout);
99+
usleep(100000); //100ms
133100
}
134101

135-
throw new \LogicException('The "receiveMethod" is not supported');
102+
return null;
136103
}
137104

138105
/**
139-
* {@inheritdoc}
106+
* @return InteropAmqpMessage
140107
*/
141-
public function receiveNoWait()
108+
public function receiveNoWait(): ?PsrMessage
142109
{
143110
if ($message = $this->channel->get($this->queue->getQueueName(), (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOACK))) {
144111
return $this->context->convertMessage($message);
145112
}
113+
114+
return null;
146115
}
147116

148117
/**
149118
* @param InteropAmqpMessage $message
150119
*/
151-
public function acknowledge(PsrMessage $message)
120+
public function acknowledge(PsrMessage $message): void
152121
{
153122
InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);
154123

@@ -158,76 +127,12 @@ public function acknowledge(PsrMessage $message)
158127

159128
/**
160129
* @param InteropAmqpMessage $message
161-
* @param bool $requeue
162130
*/
163-
public function reject(PsrMessage $message, $requeue = false)
131+
public function reject(PsrMessage $message, bool $requeue = false): void
164132
{
165133
InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);
166134

167135
$bunnyMessage = new Message('', $message->getDeliveryTag(), '', '', '', [], '');
168136
$this->channel->reject($bunnyMessage, $requeue);
169137
}
170-
171-
/**
172-
* @param int $timeout
173-
*
174-
* @return InteropAmqpMessage|null
175-
*/
176-
private function receiveBasicGet($timeout)
177-
{
178-
$end = microtime(true) + ($timeout / 1000);
179-
180-
while (0 === $timeout || microtime(true) < $end) {
181-
if ($message = $this->receiveNoWait()) {
182-
return $message;
183-
}
184-
185-
usleep(100000); //100ms
186-
}
187-
}
188-
189-
/**
190-
* @param int $timeout
191-
*
192-
* @return InteropAmqpMessage|null
193-
*/
194-
private function receiveBasicConsume($timeout)
195-
{
196-
if (false == $this->consumerTag) {
197-
$this->context->subscribe($this, function (InteropAmqpMessage $message) {
198-
$this->buffer->push($message->getConsumerTag(), $message);
199-
200-
return false;
201-
});
202-
}
203-
204-
if ($message = $this->buffer->pop($this->consumerTag)) {
205-
return $message;
206-
}
207-
208-
while (true) {
209-
$start = microtime(true);
210-
211-
$this->context->consume($timeout);
212-
213-
if ($message = $this->buffer->pop($this->consumerTag)) {
214-
return $message;
215-
}
216-
217-
// is here when consumed message is not for this consumer
218-
219-
// as timeout is infinite have to continue consumption, but it can overflow message buffer
220-
if ($timeout <= 0) {
221-
continue;
222-
}
223-
224-
// compute remaining timeout and continue until time is up
225-
$stop = microtime(true);
226-
$timeout -= ($stop - $start) * 1000;
227-
228-
if ($timeout <= 0) {
229-
break;
230-
}
231-
}
232-
}
233138
}

0 commit comments

Comments
 (0)