Skip to content

Rewrite promise #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamInterface;
use React\EventLoop\LoopInterface;
use React\Promise\Deferred;
use React\HttpClient\Client as ReactClient;
use React\HttpClient\Request as ReactRequest;
use React\HttpClient\Response as ReactResponse;
Expand Down Expand Up @@ -93,45 +92,42 @@ public function sendRequest(RequestInterface $request)
public function sendAsyncRequest(RequestInterface $request)
{
$reactRequest = $this->buildReactRequest($request);
$deferred = new Deferred();
$promise = new Promise($this->loop);

$reactRequest->on('error', function (\Exception $error) use ($deferred, $request) {
$deferred->reject(new RequestException(
$reactRequest->on('error', function (\Exception $error) use ($promise, $request) {
$promise->reject(new RequestException(
$error->getMessage(),
$request,
$error
));
});

$reactRequest->on('response', function (ReactResponse $reactResponse = null) use ($deferred, $reactRequest, $request) {
$reactRequest->on('response', function (ReactResponse $reactResponse = null) use ($promise, $request) {
$bodyStream = $this->streamFactory->createStream();
$reactResponse->on('data', function ($data) use (&$bodyStream) {
$bodyStream->write((string) $data);
});

$reactResponse->on('end', function (\Exception $error = null) use ($deferred, $request, $reactResponse, &$bodyStream) {
$reactResponse->on('end', function (\Exception $error = null) use ($promise, $request, $reactResponse, &$bodyStream) {
$response = $this->buildResponse(
$reactResponse,
$bodyStream
);
if (null !== $error) {
$deferred->reject(new HttpException(
$promise->reject(new HttpException(
$error->getMessage(),
$request,
$response,
$error
));
} else {
$deferred->resolve($response);
$promise->resolve($response);
}
});
});

$reactRequest->end((string) $request->getBody());

$promise = new Promise($deferred->promise());
$promise->setLoop($this->loop);

return $promise;
}

Expand Down
142 changes: 96 additions & 46 deletions src/Promise.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace Http\Adapter\React;

use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface as ReactPromise;
use Http\Client\Exception;
use Http\Promise\Promise as HttpPromise;
use Psr\Http\Message\ResponseInterface;
Expand All @@ -12,8 +11,10 @@
* React promise adapter implementation.
*
* @author Stéphane Hulard <stephane@hlrd.me>
*
* @internal
*/
class Promise implements HttpPromise
final class Promise implements HttpPromise
{
/**
* Promise status.
Expand All @@ -22,13 +23,6 @@ class Promise implements HttpPromise
*/
private $state = HttpPromise::PENDING;

/**
* Adapted React promise.
*
* @var ReactPromise
*/
private $promise;

/**
* PSR7 received response.
*
Expand All @@ -43,31 +37,26 @@ class Promise implements HttpPromise
*/
private $exception;

/**
* @var callable|null
*/
private $onFulfilled;

/**
* @var callable|null
*/
private $onRejected;

/**
* React Event Loop used for synchronous processing.
*
* @var LoopInterface
*/
private $loop;

/**
* Initialize the promise.
*
* @param ReactPromise $promise
*/
public function __construct(ReactPromise $promise)
public function __construct(LoopInterface $loop)
{
$promise->then(
function (ResponseInterface $response) {
$this->state = HttpPromise::FULFILLED;
$this->response = $response;
},
function (Exception $error) {
$this->state = HttpPromise::REJECTED;
$this->exception = $error;
}
);
$this->promise = $promise;
$this->loop = $loop;
}

/**
Expand All @@ -80,49 +69,110 @@ function (Exception $error) {
*/
public function then(callable $onFulfilled = null, callable $onRejected = null)
{
$this->promise->then(function () use ($onFulfilled) {
if (null !== $onFulfilled) {
call_user_func($onFulfilled, $this->response);
$newPromise = new self($this->loop);

$onFulfilled = $onFulfilled !== null ? $onFulfilled : function (ResponseInterface $response) {
return $response;
};

$onRejected = $onRejected !== null ? $onRejected : function (Exception $exception) {
throw $exception;
};

$this->onFulfilled = function (ResponseInterface $response) use ($onFulfilled, $newPromise) {
try {
$newPromise->resolve($onFulfilled($response));
} catch (Exception $exception) {
$newPromise->reject($exception);
}
}, function () use ($onRejected) {
if (null !== $onRejected) {
call_user_func($onRejected, $this->exception);
};

$this->onRejected = function (Exception $exception) use ($onRejected, $newPromise) {
try {
$newPromise->resolve($onRejected($exception));
} catch (Exception $exception) {
$newPromise->reject($exception);
}
});
};

return $this;
if ($this->state === HttpPromise::FULFILLED) {
$this->doResolve($this->response);
}

if ($this->state === HttpPromise::REJECTED) {
$this->doReject($this->exception);
}

return $newPromise;
}

/**
* {@inheritdoc}
* Resolve this promise.
*
* @param ResponseInterface $response
*
* @internal
*/
public function getState()
public function resolve(ResponseInterface $response)
{
return $this->state;
if ($this->state !== HttpPromise::PENDING) {
throw new \RuntimeException('Promise is already resolved');
}

$this->state = HttpPromise::FULFILLED;
$this->response = $response;
$this->doResolve($response);
}

private function doResolve(ResponseInterface $response)
{
$onFulfilled = $this->onFulfilled;

if (null !== $onFulfilled) {
$onFulfilled($response);
}
}

/**
* Set EventLoop used for synchronous processing.
* Reject this promise.
*
* @param LoopInterface $loop
* @param Exception $exception
*
* @return Promise
* @internal
*/
public function setLoop(LoopInterface $loop)
public function reject(Exception $exception)
{
$this->loop = $loop;
if ($this->state !== HttpPromise::PENDING) {
throw new \RuntimeException('Promise is already resolved');
}

$this->state = HttpPromise::REJECTED;
$this->exception = $exception;
$this->doReject($exception);
}

private function doReject(Exception $exception)
{
$onRejected = $this->onRejected;

return $this;
if (null !== $onRejected) {
$onRejected($exception);
}
}

/**
* {@inheritdoc}
*/
public function getState()
{
return $this->state;
}

/**
* {@inheritdoc}
*/
public function wait($unwrap = true)
{
if (null === $this->loop) {
throw new \LogicException('You must set the loop before wait!');
}
while (HttpPromise::PENDING === $this->getState()) {
$this->loop->tick();
}
Expand Down
34 changes: 34 additions & 0 deletions tests/PromiseTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

namespace Http\Adapter\React\Tests;

use GuzzleHttp\Psr7\Response;
use Http\Adapter\React\Promise;
use Http\Adapter\React\ReactFactory;
use PHPUnit\Framework\TestCase;

class PromiseTest extends TestCase
{
private $loop;

public function setUp()
{
$this->loop = ReactFactory::buildEventLoop();
}

public function testChain()
{
$promise = new Promise($this->loop);
$response = new Response(200);

$lastPromise = $promise->then(function (Response $response) {
return $response->withStatus(300);
});

$promise->resolve($response);
$updatedResponse = $lastPromise->wait();

self::assertEquals(200, $response->getStatusCode());
self::assertEquals(300, $updatedResponse->getStatusCode());
}
}