11
11
12
12
namespace Symfony \Component \HttpClient ;
13
13
14
+ use GuzzleHttp \Promise \Promise as GuzzlePromise ;
14
15
use Http \Client \Exception \NetworkException ;
15
16
use Http \Client \Exception \RequestException ;
16
- use Http \Client \HttpClient ;
17
+ use Http \Client \HttpAsyncClient ;
18
+ use Http \Client \HttpClient as HttplugInterface ;
17
19
use Http \Message \RequestFactory ;
18
20
use Http \Message \StreamFactory ;
19
21
use Http \Message \UriFactory ;
20
- use Psr \Http \Client \ClientInterface ;
21
- use Psr \Http \Client \NetworkExceptionInterface ;
22
- use Psr \Http \Client \RequestExceptionInterface ;
22
+ use Http \Promise \Promise ;
23
+ use Http \Promise \RejectedPromise ;
24
+ use Nyholm \Psr7 \Factory \Psr17Factory ;
25
+ use Nyholm \Psr7 \Request ;
26
+ use Nyholm \Psr7 \Uri ;
27
+ use Psr \Http \Message \RequestFactoryInterface ;
23
28
use Psr \Http \Message \RequestInterface ;
24
29
use Psr \Http \Message \ResponseFactoryInterface ;
25
- use Psr \Http \Message \ResponseInterface ;
30
+ use Psr \Http \Message \ResponseInterface as Psr7ResponseInterface ;
26
31
use Psr \Http \Message \StreamFactoryInterface ;
27
32
use Psr \Http \Message \StreamInterface ;
33
+ use Psr \Http \Message \UriFactoryInterface ;
28
34
use Psr \Http \Message \UriInterface ;
35
+ use Symfony \Component \HttpClient \Response \HttplugPromise ;
36
+ use Symfony \Component \HttpClient \Response \ResponseTrait ;
37
+ use Symfony \Component \HttpClient \Response \StreamWrapper ;
38
+ use Symfony \Contracts \HttpClient \Exception \TransportExceptionInterface ;
29
39
use Symfony \Contracts \HttpClient \HttpClientInterface ;
40
+ use Symfony \Contracts \HttpClient \ResponseInterface ;
30
41
31
- if (!interface_exists (HttpClient ::class)) {
42
+ if (!interface_exists (HttplugInterface ::class)) {
32
43
throw new \LogicException ('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/httplug" package is not installed. Try running "composer require php-http/httplug". ' );
33
44
}
34
45
35
- if (!interface_exists (ClientInterface::class)) {
36
- throw new \LogicException ('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "psr/http-client" package is not installed. Try running "composer require psr/http-client". ' );
37
- }
38
-
39
46
if (!interface_exists (RequestFactory::class)) {
40
47
throw new \LogicException ('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/message-factory" package is not installed. Try running "composer require nyholm/psr7". ' );
41
48
}
42
49
43
50
/**
44
51
* An adapter to turn a Symfony HttpClientInterface into an Httplug client.
45
52
*
46
- * Run "composer require psr/http-client" to install the base ClientInterface. Run
47
- * "composer require nyholm/psr7" to install an efficient implementation of response
53
+ * Run "composer require nyholm/psr7" to install an efficient implementation of response
48
54
* and stream factories with flex-provided autowiring aliases.
49
55
*
50
56
* @author Nicolas Grekas <p@tchwork.com>
51
57
*/
52
- final class HttplugClient implements HttpClient , RequestFactory, StreamFactory, UriFactory
58
+ final class HttplugClient implements HttplugInterface, HttpAsyncClient , RequestFactory, StreamFactory, UriFactory
53
59
{
54
60
private $ client ;
61
+ private $ responseFactory ;
62
+ private $ streamFactory ;
63
+ private $ promisePool = [];
64
+ private $ pendingResponse ;
55
65
56
66
public function __construct (HttpClientInterface $ client = null , ResponseFactoryInterface $ responseFactory = null , StreamFactoryInterface $ streamFactory = null )
57
67
{
58
- $ this ->client = new Psr18Client ($ client , $ responseFactory , $ streamFactory );
68
+ $ this ->client = $ client ?? HttpClient::create ();
69
+ $ this ->responseFactory = $ responseFactory ;
70
+ $ this ->streamFactory = $ streamFactory ?? ($ responseFactory instanceof StreamFactoryInterface ? $ responseFactory : null );
71
+ $ this ->promisePool = new \SplObjectStorage ();
72
+
73
+ if (null !== $ this ->responseFactory && null !== $ this ->streamFactory ) {
74
+ return ;
75
+ }
76
+
77
+ if (!class_exists (Psr17Factory::class)) {
78
+ throw new \LogicException ('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7". ' );
79
+ }
80
+
81
+ $ psr17Factory = new Psr17Factory ();
82
+ $ this ->responseFactory = $ this ->responseFactory ?? $ psr17Factory ;
83
+ $ this ->streamFactory = $ this ->streamFactory ?? $ psr17Factory ;
59
84
}
60
85
61
86
/**
62
87
* {@inheritdoc}
63
88
*/
64
- public function sendRequest (RequestInterface $ request ): ResponseInterface
89
+ public function sendRequest (RequestInterface $ request ): Psr7ResponseInterface
65
90
{
66
91
try {
67
- return $ this ->client ->sendRequest ($ request );
68
- } catch (RequestExceptionInterface $ e ) {
69
- throw new RequestException ($ e ->getMessage (), $ request , $ e );
70
- } catch (NetworkExceptionInterface $ e ) {
92
+ return $ this ->createPsr7Response ($ this ->sendPsr7Request ($ request ));
93
+ } catch (TransportExceptionInterface $ e ) {
71
94
throw new NetworkException ($ e ->getMessage (), $ request , $ e );
72
95
}
73
96
}
74
97
98
+ /**
99
+ * {@inheritdoc}
100
+ *
101
+ * @return HttplugPromise
102
+ */
103
+ public function sendAsyncRequest (RequestInterface $ request ): Promise
104
+ {
105
+ if (!class_exists (GuzzlePromise::class)) {
106
+ throw new \LogicException (sprintf ('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises". ' , __METHOD__ ));
107
+ }
108
+
109
+ try {
110
+ $ response = $ this ->sendPsr7Request ($ request , true );
111
+ } catch (NetworkException $ e ) {
112
+ return new RejectedPromise ($ e );
113
+ }
114
+
115
+ $ cancel = function () use ($ response ) {
116
+ $ response ->cancel ();
117
+ unset($ this ->promisePool [$ response ]);
118
+ };
119
+
120
+ $ promise = new GuzzlePromise (function () use ($ response ) {
121
+ $ this ->pendingResponse = $ response ;
122
+ $ this ->wait ();
123
+ }, $ cancel );
124
+
125
+ $ this ->promisePool [$ response ] = [$ request , $ promise ];
126
+
127
+ return new HttplugPromise ($ promise , $ cancel );
128
+ }
129
+
130
+ /**
131
+ * Resolve pending promises that complete before the timeouts are reached.
132
+ *
133
+ * When $maxDuration is null and $idleTimeout is reached, promises are rejected.
134
+ *
135
+ * @return int The number of remaining pending promises
136
+ */
137
+ public function wait (float $ maxDuration = null , float $ idleTimeout = null ): int
138
+ {
139
+ $ pendingResponse = $ this ->pendingResponse ;
140
+ $ this ->pendingResponse = null ;
141
+
142
+ if (null !== $ maxDuration ) {
143
+ $ startTime = microtime (true );
144
+ $ idleTimeout = max (0.0 , min ($ maxDuration / 5 , $ idleTimeout ?? $ maxDuration ));
145
+ $ remainingDuration = $ maxDuration ;
146
+ }
147
+
148
+ do {
149
+ foreach ($ this ->client ->stream ($ this ->promisePool , $ idleTimeout ) as $ response => $ chunk ) {
150
+ try {
151
+ if (null !== $ maxDuration && $ chunk ->isTimeout ()) {
152
+ goto check_duration;
153
+ }
154
+
155
+ if ($ chunk ->isFirst ()) {
156
+ // Deactivate throwing on 3/4/5xx
157
+ $ response ->getStatusCode ();
158
+ }
159
+
160
+ if (!$ chunk ->isLast ()) {
161
+ goto check_duration;
162
+ }
163
+
164
+ if ([$ request , $ promise ] = $ this ->promisePool [$ response ] ?? null ) {
165
+ unset($ this ->promisePool [$ response ]);
166
+ $ promise ->resolve ($ this ->createPsr7Response ($ response , true ));
167
+ }
168
+ } catch (\Exception $ e ) {
169
+ if ([$ request , $ promise ] = $ this ->promisePool [$ response ] ?? null ) {
170
+ unset($ this ->promisePool [$ response ]);
171
+
172
+ if ($ e instanceof TransportExceptionInterface) {
173
+ $ e = new NetworkException ($ e ->getMessage (), $ request , $ e );
174
+ }
175
+
176
+ $ promise ->reject ($ e );
177
+ }
178
+ }
179
+
180
+ if ($ pendingResponse === $ response ) {
181
+ return \count ($ this ->promisePool );
182
+ }
183
+
184
+ check_duration:
185
+ if (null !== $ maxDuration && $ idleTimeout && $ idleTimeout > $ remainingDuration = max (0.0 , $ maxDuration - microtime (true ) + $ startTime )) {
186
+ $ idleTimeout = $ remainingDuration / 5 ;
187
+ break ;
188
+ }
189
+ }
190
+
191
+ if (!$ count = \count ($ this ->promisePool )) {
192
+ return 0 ;
193
+ }
194
+ } while (null !== $ maxDuration && 0 < $ remainingDuration );
195
+
196
+ return $ count ;
197
+ }
198
+
75
199
/**
76
200
* {@inheritdoc}
77
201
*/
78
202
public function createRequest ($ method , $ uri , array $ headers = [], $ body = null , $ protocolVersion = '1.1 ' ): RequestInterface
79
203
{
80
- $ request = $ this ->client
81
- ->createRequest ($ method , $ uri )
204
+ if ($ this ->responseFactory instanceof RequestFactoryInterface) {
205
+ $ request = $ this ->responseFactory ->createRequest ($ method , $ uri );
206
+ } elseif (!class_exists (Request::class)) {
207
+ throw new \LogicException (sprintf ('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7". ' , __METHOD__ ));
208
+ } else {
209
+ $ request = new Request ($ method , $ uri );
210
+ }
211
+
212
+ $ request = $ request
82
213
->withProtocolVersion ($ protocolVersion )
83
214
->withBody ($ this ->createStream ($ body ))
84
215
;
@@ -100,27 +231,84 @@ public function createStream($body = null): StreamInterface
100
231
}
101
232
102
233
if (\is_string ($ body ?? '' )) {
103
- $ body = $ this ->client ->createStream ($ body ?? '' );
104
-
105
- if ($ body ->isSeekable ()) {
106
- $ body ->seek (0 );
107
- }
108
-
109
- return $ body ;
234
+ $ stream = $ this ->streamFactory ->createStream ($ body ?? '' );
235
+ } elseif (\is_resource ($ body )) {
236
+ $ stream = $ this ->streamFactory ->createStreamFromResource ($ body );
237
+ } else {
238
+ throw new \InvalidArgumentException (sprintf ('%s() expects string, resource or StreamInterface, %s given. ' , __METHOD__ , \gettype ($ body )));
110
239
}
111
240
112
- if (\is_resource ( $ body )) {
113
- return $ this -> client -> createStreamFromResource ( $ body );
241
+ if ($ stream -> isSeekable ( )) {
242
+ $ stream -> seek ( 0 );
114
243
}
115
244
116
- throw new \ InvalidArgumentException ( sprintf ( ' %s() expects string, resource or StreamInterface, %s given. ' , __METHOD__ , \gettype ( $ body ))) ;
245
+ return $ stream ;
117
246
}
118
247
119
248
/**
120
249
* {@inheritdoc}
121
250
*/
122
- public function createUri ($ uri = '' ): UriInterface
251
+ public function createUri ($ uri ): UriInterface
123
252
{
124
- return $ uri instanceof UriInterface ? $ uri : $ this ->client ->createUri ($ uri );
253
+ if ($ uri instanceof UriInterface) {
254
+ return $ uri ;
255
+ }
256
+
257
+ if ($ this ->responseFactory instanceof UriFactoryInterface) {
258
+ return $ this ->responseFactory ->createUri ($ uri );
259
+ }
260
+
261
+ if (!class_exists (Uri::class)) {
262
+ throw new \LogicException (sprintf ('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7". ' , __METHOD__ ));
263
+ }
264
+
265
+ return new Uri ($ uri );
266
+ }
267
+
268
+ private function sendPsr7Request (RequestInterface $ request , bool $ buffer = null ): ResponseInterface
269
+ {
270
+ try {
271
+ $ body = $ request ->getBody ();
272
+
273
+ if ($ body ->isSeekable ()) {
274
+ $ body ->seek (0 );
275
+ }
276
+
277
+ return $ this ->client ->request ($ request ->getMethod (), (string ) $ request ->getUri (), [
278
+ 'headers ' => $ request ->getHeaders (),
279
+ 'body ' => $ body ->getContents (),
280
+ 'http_version ' => '1.0 ' === $ request ->getProtocolVersion () ? '1.0 ' : null ,
281
+ 'buffer ' => $ buffer ,
282
+ ]);
283
+ } catch (\InvalidArgumentException $ e ) {
284
+ throw new RequestException ($ e ->getMessage (), $ request , $ e );
285
+ } catch (TransportExceptionInterface $ e ) {
286
+ throw new NetworkException ($ e ->getMessage (), $ request , $ e );
287
+ }
288
+ }
289
+
290
+ private function createPsr7Response (ResponseInterface $ response , bool $ buffer = false ): Psr7ResponseInterface
291
+ {
292
+ $ psrResponse = $ this ->responseFactory ->createResponse ($ response ->getStatusCode ());
293
+
294
+ foreach ($ response ->getHeaders (false ) as $ name => $ values ) {
295
+ foreach ($ values as $ value ) {
296
+ $ psrResponse = $ psrResponse ->withAddedHeader ($ name , $ value );
297
+ }
298
+ }
299
+
300
+ if (isset (class_uses ($ response )[ResponseTrait::class])) {
301
+ $ body = $ this ->streamFactory ->createStreamFromResource ($ response ->toStream (false ));
302
+ } elseif (!$ buffer ) {
303
+ $ body = $ this ->streamFactory ->createStreamFromResource (StreamWrapper::createResource ($ response , $ this ->client ));
304
+ } else {
305
+ $ body = $ this ->streamFactory ->createStream ($ response ->getContent (false ));
306
+ }
307
+
308
+ if ($ body ->isSeekable ()) {
309
+ $ body ->seek (0 );
310
+ }
311
+
312
+ return $ psrResponse ->withBody ($ body );
125
313
}
126
314
}
0 commit comments