Skip to content

Commit b229c25

Browse files
authored
Merge pull request #228 from php-enqueue/amqp-connection-config
[BC break][amqp] Introduce connection config. Make it same across all transports.
2 parents f363374 + 620031c commit b229c25

32 files changed

+1216
-2583
lines changed

docs/bundle/config_reference.md

Lines changed: 175 additions & 74 deletions
Large diffs are not rendered by default.

pkg/amqp-bunny/AmqpConnectionFactory.php

Lines changed: 52 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\AmqpBunny;
44

5-
use Bunny\Client;
5+
use Enqueue\AmqpTools\ConnectionConfig;
66
use Enqueue\AmqpTools\DelayStrategyAware;
77
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
88
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
@@ -12,75 +12,37 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrate
1212
use DelayStrategyAwareTrait;
1313

1414
/**
15-
* @var array
15+
* @var ConnectionConfig
1616
*/
1717
private $config;
1818

1919
/**
20-
* @var Client
20+
* @var BunnyClient
2121
*/
2222
private $client;
2323

2424
/**
25-
* The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default credentials.
25+
* @see \Enqueue\AmqpTools\ConnectionConfig for possible config formats and values
2626
*
27-
* [
28-
* 'host' => 'amqp.host The host to connect too. Note: Max 1024 characters.',
29-
* 'port' => 'amqp.port Port on the host.',
30-
* 'vhost' => 'amqp.vhost The virtual host on the host. Note: Max 128 characters.',
31-
* 'user' => 'amqp.user The user name to use. Note: Max 128 characters.',
32-
* 'pass' => 'amqp.password Password. Note: Max 128 characters.',
33-
* 'lazy' => 'the connection will be performed as later as possible, if the option set to true',
34-
* 'receive_method' => 'Could be either basic_get or basic_consume',
35-
* 'qos_prefetch_size' => 'The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"',
36-
* 'qos_prefetch_count' => 'Specifies a prefetch window in terms of whole messages.',
37-
* 'qos_global' => 'If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.',
38-
* ]
27+
* In addition this factory accepts next options:
28+
* receive_method - Could be either basic_get or basic_consume
3929
*
40-
* or
41-
*
42-
* amqp://user:pass@host:10000/vhost?lazy=true&socket=true
43-
*
44-
* @param array|string $config
30+
* @param array|string|null $config
4531
*/
4632
public function __construct($config = 'amqp:')
4733
{
48-
if (is_string($config) && 0 === strpos($config, 'amqp+bunny:')) {
49-
$config = str_replace('amqp+bunny:', 'amqp:', $config);
50-
}
51-
52-
if (empty($config) || 'amqp:' === $config) {
53-
$config = [];
54-
} elseif (is_string($config)) {
55-
$config = $this->parseDsn($config);
56-
} elseif (is_array($config)) {
57-
} else {
58-
throw new \LogicException('The config must be either an array of options, a DSN string or null');
59-
}
60-
61-
$config = array_replace($this->defaultConfig(), $config);
62-
63-
$config = array_replace($this->defaultConfig(), $config);
64-
if (array_key_exists('qos_global', $config)) {
65-
$config['qos_global'] = (bool) $config['qos_global'];
66-
}
67-
if (array_key_exists('qos_prefetch_count', $config)) {
68-
$config['qos_prefetch_count'] = (int) $config['qos_prefetch_count'];
69-
}
70-
if (array_key_exists('qos_prefetch_size', $config)) {
71-
$config['qos_prefetch_size'] = (int) $config['qos_prefetch_size'];
72-
}
73-
if (array_key_exists('lazy', $config)) {
74-
$config['lazy'] = (bool) $config['lazy'];
75-
}
76-
77-
$this->config = $config;
34+
$this->config = (new ConnectionConfig($config))
35+
->addSupportedScheme('amqp+bunny')
36+
->addDefaultOption('receive_method', 'basic_get')
37+
->addDefaultOption('tcp_nodelay', null)
38+
->parse()
39+
;
7840

7941
$supportedMethods = ['basic_get', 'basic_consume'];
80-
if (false == in_array($this->config['receive_method'], $supportedMethods, true)) {
42+
if (false == in_array($this->config->getOption('receive_method'), $supportedMethods, true)) {
8143
throw new \LogicException(sprintf(
8244
'Invalid "receive_method" option value "%s". It could be only "%s"',
83-
$this->config['receive_method'],
45+
$this->config->getOption('receive_method'),
8446
implode('", "', $supportedMethods)
8547
));
8648
}
@@ -91,99 +53,66 @@ public function __construct($config = 'amqp:')
9153
*/
9254
public function createContext()
9355
{
94-
if ($this->config['lazy']) {
56+
if ($this->config->isLazy()) {
9557
$context = new AmqpContext(function () {
9658
$channel = $this->establishConnection()->channel();
97-
$channel->qos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count'], $this->config['qos_global']);
59+
$channel->qos($this->config->getQosPrefetchSize(), $this->config->getQosPrefetchCount(), $this->config->isQosGlobal());
9860

9961
return $channel;
100-
}, $this->config);
62+
}, $this->config->getConfig());
10163
$context->setDelayStrategy($this->delayStrategy);
10264

10365
return $context;
10466
}
10567

106-
$context = new AmqpContext($this->establishConnection()->channel(), $this->config);
68+
$context = new AmqpContext($this->establishConnection()->channel(), $this->config->getConfig());
10769
$context->setDelayStrategy($this->delayStrategy);
108-
$context->setQos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count'], $this->config['qos_global']);
70+
$context->setQos($this->config->getQosPrefetchSize(), $this->config->getQosPrefetchCount(), $this->config->isQosGlobal());
10971

11072
return $context;
11173
}
11274

11375
/**
114-
* @return Client
76+
* @return ConnectionConfig
11577
*/
116-
private function establishConnection()
78+
public function getConfig()
11779
{
118-
if (false == $this->client) {
119-
$this->client = new Client($this->config);
120-
$this->client->connect();
121-
}
122-
123-
return $this->client;
80+
return $this->config;
12481
}
12582

12683
/**
127-
* @param string $dsn
128-
*
129-
* @return array
84+
* @return BunnyClient
13085
*/
131-
private function parseDsn($dsn)
86+
private function establishConnection()
13287
{
133-
$dsnConfig = parse_url($dsn);
134-
if (false === $dsnConfig) {
135-
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
136-
}
137-
138-
$dsnConfig = array_replace([
139-
'scheme' => null,
140-
'host' => null,
141-
'port' => null,
142-
'user' => null,
143-
'pass' => null,
144-
'path' => null,
145-
'query' => null,
146-
], $dsnConfig);
147-
148-
if ('amqp' !== $dsnConfig['scheme']) {
149-
throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "amqp" only.', $dsnConfig['scheme']));
150-
}
151-
152-
if ($dsnConfig['query']) {
153-
$query = [];
154-
parse_str($dsnConfig['query'], $query);
155-
156-
$dsnConfig = array_replace($query, $dsnConfig);
88+
if (false == $this->client) {
89+
$bunnyConfig = [];
90+
$bunnyConfig['host'] = $this->config->getHost();
91+
$bunnyConfig['port'] = $this->config->getPort();
92+
$bunnyConfig['vhost'] = $this->config->getVHost();
93+
$bunnyConfig['user'] = $this->config->getUser();
94+
$bunnyConfig['password'] = $this->config->getPass();
95+
$bunnyConfig['read_write_timeout'] = min($this->config->getReadTimeout(), $this->config->getWriteTimeout());
96+
$bunnyConfig['timeout'] = $this->config->getConnectionTimeout();
97+
98+
// @see https://github.com/php-enqueue/enqueue-dev/issues/229
99+
// $bunnyConfig['persistent'] = $this->config->isPersisted();
100+
// if ($this->config->isPersisted()) {
101+
// $bunnyConfig['path'] = 'enqueue';//$this->config->getOption('path', $this->config->getOption('vhost'));
102+
// }
103+
104+
if ($this->config->getHeartbeat()) {
105+
$bunnyConfig['heartbeat'] = $this->config->getHeartbeat();
106+
}
107+
108+
if (null !== $this->config->getOption('tcp_nodelay')) {
109+
$bunnyConfig['tcp_nodelay'] = $this->config->getOption('tcp_nodelay');
110+
}
111+
112+
$this->client = new BunnyClient($bunnyConfig);
113+
$this->client->connect();
157114
}
158115

159-
$dsnConfig['vhost'] = ltrim($dsnConfig['path'], '/');
160-
161-
unset($dsnConfig['scheme'], $dsnConfig['query'], $dsnConfig['fragment'], $dsnConfig['path']);
162-
163-
$dsnConfig = array_map(function ($value) {
164-
return urldecode($value);
165-
}, $dsnConfig);
166-
167-
return $dsnConfig;
168-
}
169-
170-
/**
171-
* @return array
172-
*/
173-
private function defaultConfig()
174-
{
175-
return [
176-
'host' => 'localhost',
177-
'port' => 5672,
178-
'user' => 'guest',
179-
'pass' => 'guest',
180-
'lazy' => true,
181-
'vhost' => '/',
182-
'heartbeat' => 0,
183-
'receive_method' => 'basic_get',
184-
'qos_prefetch_size' => 0,
185-
'qos_prefetch_count' => 1,
186-
'qos_global' => false,
187-
];
116+
return $this->client;
188117
}
189118
}

pkg/amqp-bunny/BunnyClient.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny;
4+
5+
use Bunny\Client;
6+
use Bunny\Exception\ClientException;
7+
8+
class BunnyClient extends Client
9+
{
10+
public function __destruct()
11+
{
12+
try {
13+
parent::__destruct();
14+
} catch (ClientException $e) {
15+
if ('Broken pipe or closed connection.' !== $e->getMessage()) {
16+
throw $e;
17+
}
18+
}
19+
}
20+
}

pkg/amqp-bunny/Symfony/AmqpBunnyTransportFactory.php

Lines changed: 0 additions & 140 deletions
This file was deleted.

0 commit comments

Comments
 (0)