Skip to content

Commit 193a288

Browse files
[HttpClient] Add support for amphp/http-client v5
1 parent 8703b18 commit 193a288

File tree

10 files changed

+148
-210
lines changed

10 files changed

+148
-210
lines changed

composer.json

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,7 @@
121121
"symfony/yaml": "self.version"
122122
},
123123
"require-dev": {
124-
"amphp/amp": "^2.5",
125-
"amphp/http-client": "^4.2.1",
126-
"amphp/http-tunnel": "^1.0",
124+
"amphp/http-client": "^4.2.1|^5",
127125
"async-aws/ses": "^1.0",
128126
"async-aws/sqs": "^1.0",
129127
"async-aws/sns": "^1.0",
@@ -161,6 +159,7 @@
161159
},
162160
"conflict": {
163161
"ext-psr": "<1.1|>=2",
162+
"amphp/amp": "<2.5",
164163
"async-aws/core": "<1.5",
165164
"doctrine/annotations": "<1.13.1",
166165
"doctrine/dbal": "<2.13.1",

src/Symfony/Component/HttpClient/AmpHttpClient.php

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,6 @@
3333
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\AmpHttpClient" as the "amphp/http-client" package is not installed. Try running "composer require amphp/http-client:^4.2.1".');
3434
}
3535

36-
if (!interface_exists(Promise::class)) {
37-
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\AmpHttpClient" as the installed "amphp/http-client" is not compatible with this version of "symfony/http-client". Try downgrading "amphp/http-client" to "^4.2.1".');
38-
}
39-
4036
/**
4137
* A portable implementation of the HttpClientInterface contracts based on Amp's HTTP client.
4238
*
@@ -158,8 +154,8 @@ public function reset(): void
158154
$this->multi->dnsCache = [];
159155

160156
foreach ($this->multi->pushedResponses as $authority => $pushedResponses) {
161-
foreach ($pushedResponses as [$pushedUrl, $pushDeferred]) {
162-
$pushDeferred->fail(new CancelledException());
157+
foreach ($pushedResponses as [$pushedUrl, $pushedResponse]) {
158+
$pushedResponse->error(new CancelledException());
163159

164160
$this->logger?->debug(sprintf('Unused pushed response: "%s"', $pushedUrl));
165161
}

src/Symfony/Component/HttpClient/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
6.4
5+
---
6+
7+
* Add support for amphp/http-client v5
8+
49
6.3
510
---
611

src/Symfony/Component/HttpClient/HttpClient.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
namespace Symfony\Component\HttpClient;
1313

1414
use Amp\Http\Client\Connection\ConnectionLimitingPool;
15-
use Amp\Promise;
1615
use Symfony\Contracts\HttpClient\HttpClientInterface;
1716

1817
/**
@@ -31,7 +30,7 @@ final class HttpClient
3130
*/
3231
public static function create(array $defaultOptions = [], int $maxHostConnections = 6, int $maxPendingPushes = 50): HttpClientInterface
3332
{
34-
if ($amp = class_exists(ConnectionLimitingPool::class) && interface_exists(Promise::class)) {
33+
if ($amp = class_exists(ConnectionLimitingPool::class)) {
3534
if (!\extension_loaded('curl')) {
3635
return new AmpHttpClient($defaultOptions, null, $maxHostConnections, $maxPendingPushes);
3736
}

src/Symfony/Component/HttpClient/Internal/AmpBody.php

Lines changed: 56 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,22 @@
1111

1212
namespace Symfony\Component\HttpClient\Internal;
1313

14-
use Amp\ByteStream\InputStream;
15-
use Amp\ByteStream\ResourceInputStream;
16-
use Amp\Http\Client\RequestBody;
17-
use Amp\Promise;
18-
use Amp\Success;
14+
use Amp\ByteStream\ReadableBuffer;
15+
use Amp\ByteStream\ReadableIterableStream;
16+
use Amp\ByteStream\ReadableResourceStream;
17+
use Amp\ByteStream\ReadableStream;
18+
use Amp\Cancellation;
19+
use Amp\Http\Client\HttpContent;
1920
use Symfony\Component\HttpClient\Exception\TransportException;
2021

2122
/**
2223
* @author Nicolas Grekas <p@tchwork.com>
2324
*
2425
* @internal
2526
*/
26-
class AmpBody implements RequestBody, InputStream
27+
class AmpBody implements HttpContent, ReadableStream
2728
{
28-
private ResourceInputStream|\Closure|string $body;
29+
private ReadableStream $body;
2930
private array $info;
3031
private \Closure $onProgress;
3132
private ?int $offset = 0;
@@ -43,105 +44,101 @@ public function __construct($body, &$info, \Closure $onProgress)
4344
if (\is_resource($body)) {
4445
$this->offset = ftell($body);
4546
$this->length = fstat($body)['size'];
46-
$this->body = new ResourceInputStream($body);
47+
$this->body = new ReadableResourceStream($body);
4748
} elseif (\is_string($body)) {
4849
$this->length = \strlen($body);
49-
$this->body = $body;
50+
$this->body = new ReadableBuffer($body);
5051
} else {
51-
$this->body = $body;
52+
$this->body = new ReadableIterableStream((static function () use ($body) {
53+
while ('' !== $data = ($body)(16372)) {
54+
if (!\is_string($data)) {
55+
throw new TransportException(sprintf('Return value of the "body" option callback must be string, "%s" returned.', get_debug_type($data)));
56+
}
57+
58+
yield $data;
59+
}
60+
})());
5261
}
5362
}
5463

55-
public function createBodyStream(): InputStream
64+
public function getContent(): ReadableStream
5665
{
5766
if (null !== $this->uploaded) {
5867
$this->uploaded = null;
5968

6069
if (\is_string($this->body)) {
6170
$this->offset = 0;
62-
} elseif ($this->body instanceof ResourceInputStream) {
71+
} elseif ($this->body instanceof ReadableResourceStream) {
6372
fseek($this->body->getResource(), $this->offset);
6473
}
6574
}
6675

6776
return $this;
6877
}
6978

70-
public function getHeaders(): Promise
79+
public function getContentType(): ?string
7180
{
72-
return new Success([]);
81+
return null;
7382
}
7483

75-
public function getBodyLength(): Promise
84+
public function getContentLength(): ?int
7685
{
77-
return new Success($this->length - $this->offset);
86+
return 0 <= $this->length ? $this->length - $this->offset : null;
7887
}
7988

80-
public function read(): Promise
89+
public function read(Cancellation $cancellation = null): ?string
8190
{
8291
$this->info['size_upload'] += $this->uploaded;
8392
$this->uploaded = 0;
8493
($this->onProgress)();
8594

86-
$chunk = $this->doRead();
87-
$chunk->onResolve(function ($e, $data) {
88-
if (null !== $data) {
89-
$this->uploaded = \strlen($data);
90-
} else {
91-
$this->info['upload_content_length'] = $this->info['size_upload'];
92-
}
93-
});
95+
if (null !== $data = $this->body->read($cancellation)) {
96+
$this->uploaded = \strlen($data);
97+
} else {
98+
$this->info['upload_content_length'] = $this->info['size_upload'];
99+
}
94100

95-
return $chunk;
101+
return $data;
96102
}
97103

98-
public static function rewind(RequestBody $body): RequestBody
104+
public function isReadable(): bool
99105
{
100-
if (!$body instanceof self) {
101-
return $body;
102-
}
103-
104-
$body->uploaded = null;
105-
106-
if ($body->body instanceof ResourceInputStream) {
107-
fseek($body->body->getResource(), $body->offset);
108-
109-
return new $body($body->body, $body->info, $body->onProgress);
110-
}
106+
return $this->body->isReadable();
107+
}
111108

112-
if (\is_string($body->body)) {
113-
$body->offset = 0;
114-
}
109+
public function close(): void
110+
{
111+
$this->body->close();
112+
}
115113

116-
return $body;
114+
public function isClosed(): bool
115+
{
116+
return $this->body->isClosed();
117117
}
118118

119-
private function doRead(): Promise
119+
public function onClose(\Closure $onClose): void
120120
{
121-
if ($this->body instanceof ResourceInputStream) {
122-
return $this->body->read();
123-
}
121+
$this->body->onClose($onClose);
122+
}
124123

125-
if (null === $this->offset || !$this->length) {
126-
return new Success();
124+
public static function rewind(HttpContent $body): HttpContent
125+
{
126+
if (!$body instanceof self) {
127+
return $body;
127128
}
128129

129-
if (\is_string($this->body)) {
130-
$this->offset = null;
131-
132-
return new Success($this->body);
133-
}
130+
$body->uploaded = null;
134131

135-
if ('' === $data = ($this->body)(16372)) {
136-
$this->offset = null;
132+
if ($body->body instanceof ReadableResourceStream) {
133+
fseek($body->body->getResource(), $body->offset);
137134

138-
return new Success();
135+
return new $body($body->body->getResource(), $body->info, $body->onProgress);
139136
}
140137

141-
if (!\is_string($data)) {
142-
throw new TransportException(sprintf('Return value of the "body" option callback must be string, "%s" returned.', get_debug_type($data)));
138+
if ($body->body instanceof ReadableBuffer) {
139+
return new $body($body->body->read(), $body->info, $body->onProgress);
143140
}
144141

145-
return new Success($data);
142+
return $body;
146143
}
147144
}

src/Symfony/Component/HttpClient/Internal/AmpClientState.php

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111

1212
namespace Symfony\Component\HttpClient\Internal;
1313

14-
use Amp\CancellationToken;
15-
use Amp\Deferred;
14+
use Amp\Cancellation;
15+
use Amp\Future;
1616
use Amp\Http\Client\Connection\ConnectionLimitingPool;
1717
use Amp\Http\Client\Connection\DefaultConnectionFactory;
1818
use Amp\Http\Client\InterceptedHttpClient;
@@ -22,14 +22,13 @@
2222
use Amp\Http\Client\Response;
2323
use Amp\Http\Tunnel\Http1TunnelConnector;
2424
use Amp\Http\Tunnel\Https1TunnelConnector;
25-
use Amp\Promise;
2625
use Amp\Socket\Certificate;
2726
use Amp\Socket\ClientTlsContext;
2827
use Amp\Socket\ConnectContext;
29-
use Amp\Socket\Connector;
30-
use Amp\Socket\DnsConnector;
28+
use Amp\Socket\DnsSocketConnector;
29+
use Amp\Socket\Socket;
3130
use Amp\Socket\SocketAddress;
32-
use Amp\Success;
31+
use Amp\Socket\SocketConnector;
3332
use Psr\Log\LoggerInterface;
3433

3534
/**
@@ -61,10 +60,7 @@ public function __construct(?callable $clientConfigurator, int $maxHostConnectio
6160
$this->logger = &$logger;
6261
}
6362

64-
/**
65-
* @return Promise<Response>
66-
*/
67-
public function request(array $options, Request $request, CancellationToken $cancellation, array &$info, \Closure $onProgress, &$handle): Promise
63+
public function request(array $options, Request $request, Cancellation $cancellation, array &$info, \Closure $onProgress, &$handle): Response
6864
{
6965
if ($options['proxy']) {
7066
if ($request->hasHeader('proxy-authorization')) {
@@ -94,22 +90,15 @@ public function request(array $options, Request $request, CancellationToken $can
9490
}
9591

9692
$request->addEventListener(new AmpListener($info, $options['peer_fingerprint']['pin-sha256'] ?? [], $onProgress, $handle));
97-
$request->setPushHandler(fn ($request, $response): Promise => $this->handlePush($request, $response, $options));
93+
$request->setPushHandler(fn ($request, $response) => $this->handlePush($request, $response, $options));
9894

99-
($request->hasHeader('content-length') ? new Success((int) $request->getHeader('content-length')) : $request->getBody()->getBodyLength())
100-
->onResolve(static function ($e, $bodySize) use (&$info) {
101-
if (null !== $bodySize && 0 <= $bodySize) {
102-
$info['upload_content_length'] = ((1 + $info['upload_content_length']) ?? 1) - 1 + $bodySize;
103-
}
104-
});
95+
if (0 <= $bodySize = $request->hasHeader('content-length') ? (int) $request->getHeader('content-length') : $request->getBody()->getContentLength() ?? -1) {
96+
$info['upload_content_length'] = ((1 + $info['upload_content_length']) ?? 1) - 1 + $bodySize;
97+
}
10598

10699
[$client, $connector] = $this->getClient($options);
107100
$response = $client->request($request, $cancellation);
108-
$response->onResolve(static function ($e) use ($connector, &$handle) {
109-
if (null === $e) {
110-
$handle = $connector->handle;
111-
}
112-
});
101+
$handle = $connector->handle;
113102

114103
return $response;
115104
}
@@ -142,22 +131,20 @@ private function getClient(array $options): array
142131
$options['ciphers'] && $context = $context->withCiphers($options['ciphers']);
143132
$options['capture_peer_cert_chain'] && $context = $context->withPeerCapturing();
144133

145-
$connector = $handleConnector = new class() implements Connector {
134+
$connector = $handleConnector = new class() implements SocketConnector {
146135
public $connector;
147136
public $uri;
148137
public $handle;
149138

150-
public function connect(string $uri, ConnectContext $context = null, CancellationToken $token = null): Promise
139+
public function connect(SocketAddress|string $uri, ConnectContext $context = null, Cancellation $cancellation = null): Socket
151140
{
152-
$result = $this->connector->connect($this->uri ?? $uri, $context, $token);
153-
$result->onResolve(function ($e, $socket) {
154-
$this->handle = null !== $socket ? $socket->getResource() : false;
155-
});
141+
$socket = $this->connector->connect($this->uri ?? $uri, $context, $cancellation);
142+
$this->handle = null !== $socket ? $socket->getResource() : false;
156143

157-
return $result;
144+
return $socket;
158145
}
159146
};
160-
$connector->connector = new DnsConnector(new AmpResolver($this->dnsCache));
147+
$connector->connector = new DnsSocketConnector(new AmpResolver($this->dnsCache));
161148

162149
$context = (new ConnectContext())
163150
->withTcpNoDelay()
@@ -190,9 +177,8 @@ public function connect(string $uri, ConnectContext $context = null, Cancellatio
190177
return $this->clients[$key] = [($this->clientConfigurator)(new PooledHttpClient($pool)), $handleConnector];
191178
}
192179

193-
private function handlePush(Request $request, Promise $response, array $options): Promise
180+
private function handlePush(Request $request, Future $response, array $options): void
194181
{
195-
$deferred = new Deferred();
196182
$authority = $request->getUri()->getAuthority();
197183

198184
if ($this->maxPendingPushes <= \count($this->pushedResponses[$authority] ?? [])) {
@@ -203,13 +189,11 @@ private function handlePush(Request $request, Promise $response, array $options)
203189

204190
$url = (string) $request->getUri();
205191
$this->logger?->debug(sprintf('Queueing pushed response: "%s"', $url));
206-
$this->pushedResponses[$authority][] = [$url, $deferred, $request, $response, [
192+
$this->pushedResponses[$authority][] = [$url, $response, $request, [
207193
'proxy' => $options['proxy'],
208194
'bindto' => $options['bindto'],
209195
'local_cert' => $options['local_cert'],
210196
'local_pk' => $options['local_pk'],
211197
]];
212-
213-
return $deferred->promise();
214198
}
215199
}

0 commit comments

Comments
 (0)