19
19
use Amp \Http \Client \Request ;
20
20
use Amp \Http \Client \Response ;
21
21
use Psr \Log \LoggerInterface ;
22
+ use Revolt \EventLoop ;
22
23
use Symfony \Component \HttpClient \Chunk \FirstChunk ;
23
24
use Symfony \Component \HttpClient \Chunk \InformationalChunk ;
24
25
use Symfony \Component \HttpClient \Exception \InvalidArgumentException ;
32
33
33
34
use function Amp \async ;
34
35
use function Amp \delay ;
36
+ use function Amp \Future \awaitFirst ;
35
37
36
38
/**
37
39
* @author Nicolas Grekas <p@tchwork.com>
@@ -49,8 +51,6 @@ final class AmpResponse implements ResponseInterface, StreamableInterface
49
51
private ?array $ options ;
50
52
private \Closure $ onProgress ;
51
53
52
- private static ?DeferredCancellation $ delay = null ;
53
-
54
54
/**
55
55
* @internal
56
56
*/
@@ -105,7 +105,7 @@ public function __construct(AmpClientState $multi, Request $request, array $opti
105
105
};
106
106
107
107
$ multi ->lastTimeout = null ;
108
- $ multi ->openHandles [$ id ] = $ id ;
108
+ $ multi ->openHandles [$ id ] = new DeferredFuture () ;
109
109
++$ multi ->responseCount ;
110
110
111
111
$ this ->canary = new Canary (static function () use ($ canceller , $ multi , $ id ) {
@@ -181,18 +181,31 @@ private static function perform(ClientState $multi, array &$responses = null): v
181
181
*/
182
182
private static function select (ClientState $ multi , float $ timeout ): int
183
183
{
184
- self ::$ delay ??= new DeferredCancellation ();
185
- delay ($ timeout , true , self ::$ delay ->getCancellation ());
184
+ $ delay = new DeferredFuture ();
185
+ $ id = EventLoop::delay ($ timeout , $ delay ->complete (...));
186
+
187
+ awaitFirst ((function () use ($ delay , $ multi ) {
188
+ yield $ delay ->getFuture ();
189
+
190
+ foreach ($ multi ->openHandles as $ deferred ) {
191
+ yield $ deferred ->getFuture ();
192
+ }
193
+ })());
186
194
187
- return null === self ::$ delay ? 1 : 0 ;
195
+ try {
196
+ return $ delay ->isComplete () ? 0 : 1 ;
197
+ } finally {
198
+ EventLoop::cancel ($ id );
199
+ }
188
200
}
189
201
190
202
private static function generateResponse (Request $ request , AmpClientState $ multi , string $ id , array &$ info , array &$ headers , DeferredCancellation $ canceller , array &$ options , \Closure $ onProgress , &$ handle , ?LoggerInterface $ logger , float &$ pause ): void
191
203
{
192
204
$ request ->setInformationalResponseHandler (static function (Response $ response ) use ($ multi , $ id , &$ info , &$ headers ) {
193
205
self ::addResponseHeaders ($ response , $ info , $ headers );
194
206
$ multi ->handlesActivity [$ id ][] = new InformationalChunk ($ response ->getStatus (), $ response ->getHeaders ());
195
- self ::stopLoop ();
207
+ $ multi ->openHandles [$ id ]->complete ();
208
+ $ multi ->openHandles [$ id ] = new DeferredFuture ();
196
209
});
197
210
198
211
try {
@@ -209,7 +222,7 @@ private static function generateResponse(Request $request, AmpClientState $multi
209
222
if ('HEAD ' === $ response ->getRequest ()->getMethod () || \in_array ($ info ['http_code ' ], [204 , 304 ], true )) {
210
223
$ multi ->handlesActivity [$ id ][] = null ;
211
224
$ multi ->handlesActivity [$ id ][] = null ;
212
- self :: stopLoop ();
225
+ $ multi -> openHandles [ $ id ]-> complete ();
213
226
214
227
return ;
215
228
}
@@ -221,7 +234,8 @@ private static function generateResponse(Request $request, AmpClientState $multi
221
234
$ body = $ response ->getBody ();
222
235
223
236
while (true ) {
224
- self ::stopLoop ();
237
+ $ multi ->openHandles [$ id ]->complete ();
238
+ $ multi ->openHandles [$ id ] = new DeferredFuture ();
225
239
226
240
if (0 < $ pause ) {
227
241
delay ($ pause , true , $ canceller ->getCancellation ());
@@ -244,7 +258,7 @@ private static function generateResponse(Request $request, AmpClientState $multi
244
258
$ info ['download_content_length ' ] = $ info ['size_download ' ];
245
259
}
246
260
247
- self :: stopLoop ();
261
+ $ multi -> openHandles [ $ id ]-> complete ();
248
262
}
249
263
250
264
private static function followRedirects (Request $ originRequest , AmpClientState $ multi , array &$ info , array &$ headers , DeferredCancellation $ canceller , array $ options , \Closure $ onProgress , &$ handle , ?LoggerInterface $ logger , float &$ pause ): ?Response
@@ -421,12 +435,4 @@ private static function getPushedResponse(Request $request, AmpClientState $mult
421
435
return $ response ;
422
436
}
423
437
}
424
-
425
- private static function stopLoop (): void
426
- {
427
- if (null !== self ::$ delay ) {
428
- self ::$ delay ->cancel ();
429
- self ::$ delay = null ;
430
- }
431
- }
432
438
}
0 commit comments