Skip to content

[BC break][amqp] Introduce connection config. Make it same across all transports. #228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 175 additions & 74 deletions docs/bundle/config_reference.md

Large diffs are not rendered by default.

175 changes: 52 additions & 123 deletions pkg/amqp-bunny/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Enqueue\AmqpBunny;

use Bunny\Client;
use Enqueue\AmqpTools\ConnectionConfig;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
Expand All @@ -12,75 +12,37 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrate
use DelayStrategyAwareTrait;

/**
* @var array
* @var ConnectionConfig
*/
private $config;

/**
* @var Client
* @var BunnyClient
*/
private $client;

/**
* The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default credentials.
* @see \Enqueue\AmqpTools\ConnectionConfig for possible config formats and values
*
* [
* 'host' => 'amqp.host The host to connect too. Note: Max 1024 characters.',
* 'port' => 'amqp.port Port on the host.',
* 'vhost' => 'amqp.vhost The virtual host on the host. Note: Max 128 characters.',
* 'user' => 'amqp.user The user name to use. Note: Max 128 characters.',
* 'pass' => 'amqp.password Password. Note: Max 128 characters.',
* 'lazy' => 'the connection will be performed as later as possible, if the option set to true',
* 'receive_method' => 'Could be either basic_get or basic_consume',
* 'qos_prefetch_size' => 'The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"',
* 'qos_prefetch_count' => 'Specifies a prefetch window in terms of whole messages.',
* 'qos_global' => 'If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.',
* ]
* In addition this factory accepts next options:
* receive_method - Could be either basic_get or basic_consume
*
* or
*
* amqp://user:pass@host:10000/vhost?lazy=true&socket=true
*
* @param array|string $config
* @param array|string|null $config
*/
public function __construct($config = 'amqp:')
{
if (is_string($config) && 0 === strpos($config, 'amqp+bunny:')) {
$config = str_replace('amqp+bunny:', 'amqp:', $config);
}

if (empty($config) || 'amqp:' === $config) {
$config = [];
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
} elseif (is_array($config)) {
} else {
throw new \LogicException('The config must be either an array of options, a DSN string or null');
}

$config = array_replace($this->defaultConfig(), $config);

$config = array_replace($this->defaultConfig(), $config);
if (array_key_exists('qos_global', $config)) {
$config['qos_global'] = (bool) $config['qos_global'];
}
if (array_key_exists('qos_prefetch_count', $config)) {
$config['qos_prefetch_count'] = (int) $config['qos_prefetch_count'];
}
if (array_key_exists('qos_prefetch_size', $config)) {
$config['qos_prefetch_size'] = (int) $config['qos_prefetch_size'];
}
if (array_key_exists('lazy', $config)) {
$config['lazy'] = (bool) $config['lazy'];
}

$this->config = $config;
$this->config = (new ConnectionConfig($config))
->addSupportedScheme('amqp+bunny')
->addDefaultOption('receive_method', 'basic_get')
->addDefaultOption('tcp_nodelay', null)
->parse()
;

$supportedMethods = ['basic_get', 'basic_consume'];
if (false == in_array($this->config['receive_method'], $supportedMethods, true)) {
if (false == in_array($this->config->getOption('receive_method'), $supportedMethods, true)) {
throw new \LogicException(sprintf(
'Invalid "receive_method" option value "%s". It could be only "%s"',
$this->config['receive_method'],
$this->config->getOption('receive_method'),
implode('", "', $supportedMethods)
));
}
Expand All @@ -91,99 +53,66 @@ public function __construct($config = 'amqp:')
*/
public function createContext()
{
if ($this->config['lazy']) {
if ($this->config->isLazy()) {
$context = new AmqpContext(function () {
$channel = $this->establishConnection()->channel();
$channel->qos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count'], $this->config['qos_global']);
$channel->qos($this->config->getQosPrefetchSize(), $this->config->getQosPrefetchCount(), $this->config->isQosGlobal());

return $channel;
}, $this->config);
}, $this->config->getConfig());
$context->setDelayStrategy($this->delayStrategy);

return $context;
}

$context = new AmqpContext($this->establishConnection()->channel(), $this->config);
$context = new AmqpContext($this->establishConnection()->channel(), $this->config->getConfig());
$context->setDelayStrategy($this->delayStrategy);
$context->setQos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count'], $this->config['qos_global']);
$context->setQos($this->config->getQosPrefetchSize(), $this->config->getQosPrefetchCount(), $this->config->isQosGlobal());

return $context;
}

/**
* @return Client
* @return ConnectionConfig
*/
private function establishConnection()
public function getConfig()
{
if (false == $this->client) {
$this->client = new Client($this->config);
$this->client->connect();
}

return $this->client;
return $this->config;
}

/**
* @param string $dsn
*
* @return array
* @return BunnyClient
*/
private function parseDsn($dsn)
private function establishConnection()
{
$dsnConfig = parse_url($dsn);
if (false === $dsnConfig) {
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
}

$dsnConfig = array_replace([
'scheme' => null,
'host' => null,
'port' => null,
'user' => null,
'pass' => null,
'path' => null,
'query' => null,
], $dsnConfig);

if ('amqp' !== $dsnConfig['scheme']) {
throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "amqp" only.', $dsnConfig['scheme']));
}

if ($dsnConfig['query']) {
$query = [];
parse_str($dsnConfig['query'], $query);

$dsnConfig = array_replace($query, $dsnConfig);
if (false == $this->client) {
$bunnyConfig = [];
$bunnyConfig['host'] = $this->config->getHost();
$bunnyConfig['port'] = $this->config->getPort();
$bunnyConfig['vhost'] = $this->config->getVHost();
$bunnyConfig['user'] = $this->config->getUser();
$bunnyConfig['password'] = $this->config->getPass();
$bunnyConfig['read_write_timeout'] = min($this->config->getReadTimeout(), $this->config->getWriteTimeout());
$bunnyConfig['timeout'] = $this->config->getConnectionTimeout();

// @see https://github.com/php-enqueue/enqueue-dev/issues/229
// $bunnyConfig['persistent'] = $this->config->isPersisted();
// if ($this->config->isPersisted()) {
// $bunnyConfig['path'] = 'enqueue';//$this->config->getOption('path', $this->config->getOption('vhost'));
// }

if ($this->config->getHeartbeat()) {
$bunnyConfig['heartbeat'] = $this->config->getHeartbeat();
}

if (null !== $this->config->getOption('tcp_nodelay')) {
$bunnyConfig['tcp_nodelay'] = $this->config->getOption('tcp_nodelay');
}

$this->client = new BunnyClient($bunnyConfig);
$this->client->connect();
}

$dsnConfig['vhost'] = ltrim($dsnConfig['path'], '/');

unset($dsnConfig['scheme'], $dsnConfig['query'], $dsnConfig['fragment'], $dsnConfig['path']);

$dsnConfig = array_map(function ($value) {
return urldecode($value);
}, $dsnConfig);

return $dsnConfig;
}

/**
* @return array
*/
private function defaultConfig()
{
return [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'pass' => 'guest',
'lazy' => true,
'vhost' => '/',
'heartbeat' => 0,
'receive_method' => 'basic_get',
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
];
return $this->client;
}
}
20 changes: 20 additions & 0 deletions pkg/amqp-bunny/BunnyClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Enqueue\AmqpBunny;

use Bunny\Client;
use Bunny\Exception\ClientException;

class BunnyClient extends Client
{
public function __destruct()
{
try {
parent::__destruct();
} catch (ClientException $e) {
if ('Broken pipe or closed connection.' !== $e->getMessage()) {
throw $e;
}
}
}
}
140 changes: 0 additions & 140 deletions pkg/amqp-bunny/Symfony/AmqpBunnyTransportFactory.php

This file was deleted.

Loading