diff --git a/bin/subtree-split b/bin/subtree-split index 54af4ad41..222574f13 100755 --- a/bin/subtree-split +++ b/bin/subtree-split @@ -65,6 +65,7 @@ remote test git@github.com:php-enqueue/test.git remote async-event-dispatcher git@github.com:php-enqueue/async-event-dispatcher.git remote async-command git@github.com:php-enqueue/async-command.git remote mongodb git@github.com:php-enqueue/mongodb.git +remote dsn git@github.com:php-enqueue/dsn.git split 'pkg/enqueue' enqueue split 'pkg/simple-client' simple-client @@ -88,3 +89,4 @@ split 'pkg/test' test split 'pkg/async-event-dispatcher' async-event-dispatcher split 'pkg/async-command' async-command split 'pkg/mongodb' mongodb +split 'pkg/dsn' dsn diff --git a/pkg/amqp-bunny/AmqpConnectionFactory.php b/pkg/amqp-bunny/AmqpConnectionFactory.php index a50c4224a..14942c860 100644 --- a/pkg/amqp-bunny/AmqpConnectionFactory.php +++ b/pkg/amqp-bunny/AmqpConnectionFactory.php @@ -1,5 +1,7 @@ content, $properties, $headers); - $message->setDeliveryTag($bunnyMessage->deliveryTag); + $message->setDeliveryTag((int) $bunnyMessage->deliveryTag); $message->setRedelivered($bunnyMessage->redelivered); $message->setRoutingKey($bunnyMessage->routingKey); diff --git a/pkg/amqp-bunny/AmqpProducer.php b/pkg/amqp-bunny/AmqpProducer.php index 86facecc6..e98b2839d 100644 --- a/pkg/amqp-bunny/AmqpProducer.php +++ b/pkg/amqp-bunny/AmqpProducer.php @@ -1,5 +1,7 @@ getHeaders(); if (array_key_exists('timestamp', $amqpProperties) && null !== $amqpProperties['timestamp']) { - $amqpProperties['timestamp'] = \DateTime::createFromFormat('U', $amqpProperties['timestamp']); + $amqpProperties['timestamp'] = \DateTime::createFromFormat('U', (string) $amqpProperties['timestamp']); } if ($appProperties = $message->getProperties()) { diff --git a/pkg/amqp-bunny/AmqpSubscriptionConsumer.php b/pkg/amqp-bunny/AmqpSubscriptionConsumer.php index 4da820e6f..6aea17c42 100644 --- a/pkg/amqp-bunny/AmqpSubscriptionConsumer.php +++ b/pkg/amqp-bunny/AmqpSubscriptionConsumer.php @@ -1,5 +1,7 @@ method('ack') ->with($this->isInstanceOf(Message::class)) ->willReturnCallback(function (Message $message) { - $this->assertSame('theDeliveryTag', $message->deliveryTag); + $this->assertSame(145, $message->deliveryTag); }); $context = $this->createContextMock(); @@ -81,7 +81,7 @@ public function testOnAcknowledgeShouldAcknowledgeMessage() $consumer = new AmqpConsumer($context, new AmqpQueue('aName')); $message = new AmqpMessage(); - $message->setDeliveryTag('theDeliveryTag'); + $message->setDeliveryTag(145); $consumer->acknowledge($message); } @@ -94,7 +94,7 @@ public function testOnRejectShouldRejectMessage() ->method('reject') ->with($this->isInstanceOf(Message::class), false) ->willReturnCallback(function (Message $message) { - $this->assertSame('theDeliveryTag', $message->deliveryTag); + $this->assertSame(167, $message->deliveryTag); }); $context = $this->createContextMock(); @@ -107,7 +107,7 @@ public function testOnRejectShouldRejectMessage() $consumer = new AmqpConsumer($context, new AmqpQueue('aName')); $message = new AmqpMessage(); - $message->setDeliveryTag('theDeliveryTag'); + $message->setDeliveryTag(167); $consumer->reject($message, false); } @@ -120,7 +120,7 @@ public function testOnRejectShouldRequeueMessage() ->method('reject') ->with($this->isInstanceOf(Message::class), true) ->willReturnCallback(function (Message $message) { - $this->assertSame('theDeliveryTag', $message->deliveryTag); + $this->assertSame(178, $message->deliveryTag); }); $context = $this->createContextMock(); @@ -133,7 +133,7 @@ public function testOnRejectShouldRequeueMessage() $consumer = new AmqpConsumer($context, new AmqpQueue('aName')); $message = new AmqpMessage(); - $message->setDeliveryTag('theDeliveryTag'); + $message->setDeliveryTag(178); $consumer->reject($message, true); } diff --git a/pkg/amqp-lib/AmqpConnectionFactory.php b/pkg/amqp-lib/AmqpConnectionFactory.php index 4a780c156..505f753d5 100644 --- a/pkg/amqp-lib/AmqpConnectionFactory.php +++ b/pkg/amqp-lib/AmqpConnectionFactory.php @@ -1,5 +1,7 @@ getBody(), $properties, $headers); - $message->setDeliveryTag($amqpMessage->delivery_info['delivery_tag']); + $message->setDeliveryTag((int) $amqpMessage->delivery_info['delivery_tag']); $message->setRedelivered($amqpMessage->delivery_info['redelivered']); $message->setRoutingKey($amqpMessage->delivery_info['routing_key']); diff --git a/pkg/amqp-lib/AmqpProducer.php b/pkg/amqp-lib/AmqpProducer.php index 346c07af2..6d7f2ded0 100644 --- a/pkg/amqp-lib/AmqpProducer.php +++ b/pkg/amqp-lib/AmqpProducer.php @@ -1,5 +1,7 @@ [AmqpMessage, AmqpMessage ...]] - */ - private $messages; - - public function __construct() - { - $this->messages = []; - } - - /** - * @param string $consumerTag - * @param AmqpMessage $message - */ - public function push($consumerTag, AmqpMessage $message) - { - if (false == array_key_exists($consumerTag, $this->messages)) { - $this->messages[$consumerTag] = []; - } - - $this->messages[$consumerTag][] = $message; - } - - /** - * @param string $consumerTag - * - * @return AmqpMessage|null - */ - public function pop($consumerTag) - { - if (false == empty($this->messages[$consumerTag])) { - return array_shift($this->messages[$consumerTag]); - } - } -} diff --git a/pkg/amqp-lib/StopBasicConsumptionException.php b/pkg/amqp-lib/StopBasicConsumptionException.php index 14d6848e0..99c9ea162 100644 --- a/pkg/amqp-lib/StopBasicConsumptionException.php +++ b/pkg/amqp-lib/StopBasicConsumptionException.php @@ -1,5 +1,7 @@ expects($this->once()) ->method('basic_ack') - ->with('delivery-tag') + ->with(167) ; $context = $this->createContextMock(); @@ -80,7 +80,7 @@ public function testOnAcknowledgeShouldAcknowledgeMessage() $consumer = new AmqpConsumer($context, new AmqpQueue('aName')); $message = new AmqpMessage(); - $message->setDeliveryTag('delivery-tag'); + $message->setDeliveryTag(167); $consumer->acknowledge($message); } @@ -91,7 +91,7 @@ public function testOnRejectShouldRejectMessage() $channel ->expects($this->once()) ->method('basic_reject') - ->with('delivery-tag', $this->isTrue()) + ->with(125, $this->isTrue()) ; $context = $this->createContextMock(); @@ -104,7 +104,7 @@ public function testOnRejectShouldRejectMessage() $consumer = new AmqpConsumer($context, new AmqpQueue('aName')); $message = new AmqpMessage(); - $message->setDeliveryTag('delivery-tag'); + $message->setDeliveryTag(125); $consumer->reject($message, true); } diff --git a/pkg/amqp-lib/Tests/BufferTest.php b/pkg/amqp-lib/Tests/BufferTest.php deleted file mode 100644 index bebe435cb..000000000 --- a/pkg/amqp-lib/Tests/BufferTest.php +++ /dev/null @@ -1,64 +0,0 @@ -assertAttributeSame([], 'messages', $buffer); - } - - public function testShouldReturnNullIfNoMessagesInBuffer() - { - $buffer = new Buffer(); - - $this->assertNull($buffer->pop('aConsumerTag')); - $this->assertNull($buffer->pop('anotherConsumerTag')); - } - - public function testShouldPushMessageToBuffer() - { - $fooMessage = new AmqpMessage(); - $barMessage = new AmqpMessage(); - $bazMessage = new AmqpMessage(); - - $buffer = new Buffer(); - - $buffer->push('aConsumerTag', $fooMessage); - $buffer->push('aConsumerTag', $barMessage); - - $buffer->push('anotherConsumerTag', $bazMessage); - - $this->assertAttributeSame([ - 'aConsumerTag' => [$fooMessage, $barMessage], - 'anotherConsumerTag' => [$bazMessage], - ], 'messages', $buffer); - } - - public function testShouldPopMessageFromBuffer() - { - $fooMessage = new AmqpMessage(); - $barMessage = new AmqpMessage(); - - $buffer = new Buffer(); - - $buffer->push('aConsumerTag', $fooMessage); - $buffer->push('aConsumerTag', $barMessage); - - $this->assertSame($fooMessage, $buffer->pop('aConsumerTag')); - $this->assertSame($barMessage, $buffer->pop('aConsumerTag')); - $this->assertNull($buffer->pop('aConsumerTag')); - } -} diff --git a/pkg/amqp-tools/ConnectionConfig.php b/pkg/amqp-tools/ConnectionConfig.php index d4deb1620..27582e051 100644 --- a/pkg/amqp-tools/ConnectionConfig.php +++ b/pkg/amqp-tools/ConnectionConfig.php @@ -1,5 +1,7 @@ setupBroker(); try { - if (method_exists($context, 'purgeQueue')) { - $queue = $this->getTestQueue(); - $context->purgeQueue($queue); - } + $context->purgeQueue($this->getTestQueue()); } catch (\Exception $e) { } } diff --git a/pkg/fs/CannotObtainLockException.php b/pkg/fs/CannotObtainLockException.php index 854d726f8..d1f842c60 100644 --- a/pkg/fs/CannotObtainLockException.php +++ b/pkg/fs/CannotObtainLockException.php @@ -1,5 +1,7 @@ getFileInfo())) { - touch($destination->getFileInfo()); - chmod($destination->getFileInfo(), $this->chmod); + if (false == file_exists((string) $destination->getFileInfo())) { + touch((string) $destination->getFileInfo()); + chmod((string) $destination->getFileInfo(), $this->chmod); } } finally { restore_error_handler(); @@ -106,7 +108,7 @@ public function workWithFile(FsDestination $destination, string $mode, callable }); try { - $file = fopen($destination->getFileInfo(), $mode); + $file = fopen((string) $destination->getFileInfo(), $mode); $this->lock->lock($destination); return call_user_func($callback, $destination, $file); diff --git a/pkg/fs/FsDestination.php b/pkg/fs/FsDestination.php index d3ca86035..065442543 100644 --- a/pkg/fs/FsDestination.php +++ b/pkg/fs/FsDestination.php @@ -1,5 +1,7 @@ context->workWithFile($destination, 'a+', function (FsDestination $destination, $file) use ($message) { $fileInfo = $destination->getFileInfo(); - if ($fileInfo instanceof TempFile && false == file_exists($fileInfo)) { + if ($fileInfo instanceof TempFile && false == file_exists((string) $fileInfo)) { return; } diff --git a/pkg/fs/Lock.php b/pkg/fs/Lock.php index 02c2fbb76..91125faa8 100644 --- a/pkg/fs/Lock.php +++ b/pkg/fs/Lock.php @@ -1,5 +1,7 @@ setPartition(1); $kafkaMessage = new Message(); $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; diff --git a/pkg/redis/PRedis.php b/pkg/redis/PRedis.php index a705fd559..799cf53f0 100644 --- a/pkg/redis/PRedis.php +++ b/pkg/redis/PRedis.php @@ -1,5 +1,7 @@ context->getRedis()->brpop($currentQueueNames, $timeout || 5000); + $result = $this->context->getRedis()->brpop($currentQueueNames, $timeout ?: 5000); if ($result) { $message = RedisMessage::jsonUnserialize($result->getMessage()); list($consumer, $callback) = $this->subscribers[$result->getKey()]; diff --git a/pkg/redis/ServerException.php b/pkg/redis/ServerException.php index 98273adf5..efadf1d1a 100644 --- a/pkg/redis/ServerException.php +++ b/pkg/redis/ServerException.php @@ -1,5 +1,7 @@