Skip to content

Commit f28f464

Browse files
authored
Merge pull request #517 from php-enqueue/impr-client-extensions
[client] Improve client extension.
2 parents f36d0b9 + f6e0afd commit f28f464

26 files changed

+2074
-770
lines changed

.php_cs.dist renamed to .php_cs.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
'psr4' => true,
2222
'strict_param' => true,
2323
))
24+
->setCacheFile(getenv('TRAVIS') ? getenv('HOME') . '/.php-cs-fixer' : __DIR__.'/var/.php_cs.cache')
2425
->setFinder(
2526
PhpCsFixer\Finder::create()
2627
->name('/\.php$/')
2728
->in(__DIR__)
2829
)
29-
;
30+
;

.travis.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ matrix:
3838
cache:
3939
directories:
4040
- $HOME/.composer/cache
41+
- $HOME/.php-cs-fixer
4142

4243
before_install:
4344
- echo "extension = mongodb.so" >> $HOME/.phpenv/versions/$(phpenv version-name)/etc/php.ini
@@ -52,9 +53,9 @@ install:
5253
- if [ "$PREPARE_CONTAINER" = true ]; then bin/dev -b; fi
5354

5455
script:
55-
- IFS=$'\n'; COMMIT_SCA_FILES=($(git diff --name-only --diff-filter=ACMRTUXB "${TRAVIS_COMMIT_RANGE}")); unset IFS
56-
- if [ "$PHP_CS_FIXER" = true ]; then ./bin/php-cs-fixer --no-interaction --dry-run --diff -v --path-mode=intersection -- "${COMMIT_SCA_FILES[@]}" fix; fi
57-
- if [ "$PHPSTAN" = true ]; then docker run --workdir="/mqdev" -v "`pwd`:/mqdev" --rm enqueue/dev:latest php -d memory_limit=1024M bin/phpstan analyse -l 1 -c phpstan.neon -- "${COMMIT_SCA_FILES[@]}" ; fi
56+
- PKG_PHP_CHANGED_FILES=`./bin/git-find-changed-php-files.sh "${TRAVIS_COMMIT_RANGE}"`
57+
- if [ "$PHP_CS_FIXER" = true ] && [ ! -z "${PKG_PHP_CHANGED_FILES}" ]; then ./bin/php-cs-fixer fix --config=.php_cs.php --no-interaction --dry-run --diff -v --path-mode=intersection -- ${PKG_PHP_CHANGED_FILES[@]} ; fi
58+
- if [ "$PHPSTAN" = true ] && [ ! -z "${PKG_PHP_CHANGED_FILES}" ]; then docker run --workdir="/mqdev" -v "`pwd`:/mqdev" --rm enqueue/dev:latest php -d memory_limit=1024M bin/phpstan analyse -l 1 -c phpstan.neon -- ${PKG_PHP_CHANGED_FILES[@]} ; fi
5859
- if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi
5960
- if [ "$FUNCTIONAL_TESTS" = true ]; then bin/test.sh --exclude-group=rdkafka; fi
6061
- if [ "RDKAFKA_TESTS" = true ]; then bin/test.sh --group=rdkafka; fi

bin/git-find-changed-php-files.sh

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#!/usr/bin/env bash
2+
3+
if (( "$#" != 1 ))
4+
then
5+
echo "Git range must be provided"
6+
exit 1
7+
fi
8+
9+
10+
IFS='
11+
'
12+
ALL_CHANGED_FILES=$(git diff --name-only --diff-filter=ACMRTUXB "$1");
13+
PKG_PHP_CHANGED_FILES=$(echo "$ALL_CHANGED_FILES" | grep -E "^pkg\/" | grep -E ".*?\.php$");
14+
15+
echo "$PKG_PHP_CHANGED_FILES";

bin/pre-commit

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ function getFilesToFix()
6868
return (bool) preg_match('/\.(php|twig|translations\/*.yml)$/', $file);
6969
});
7070

71+
$stagedFiles = array_filter($stagedFiles, function ($file) {
72+
return (bool) preg_match('/^pkg\//', $file);
73+
});
74+
7175
return $stagedFiles;
7276
}
7377

@@ -104,7 +108,7 @@ function runPhpCsFixer()
104108
$returnCode = null;
105109

106110
exec(sprintf(
107-
'%s %s fix %s --dry-run',
111+
'%s %s fix %s --dry-run --config=.php_cs.php',
108112
$phpBin,
109113
$phpCsFixerBin,
110114
$projectRootDir.'/'.$file

pkg/enqueue/Client/ChainExtension.php

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\Client;
44

5-
class ChainExtension implements ExtensionInterface
5+
final class ChainExtension implements ExtensionInterface
66
{
77
/**
88
* @var ExtensionInterface[]
@@ -14,26 +14,36 @@ class ChainExtension implements ExtensionInterface
1414
*/
1515
public function __construct(array $extensions)
1616
{
17-
$this->extensions = $extensions;
17+
array_walk($extensions, function (ExtensionInterface $extension) {
18+
$this->extensions[] = $extension;
19+
});
1820
}
1921

20-
/**
21-
* {@inheritdoc}
22-
*/
23-
public function onPreSend($topic, Message $message)
22+
public function onPreSendEvent(PreSend $event): void
2423
{
2524
foreach ($this->extensions as $extension) {
26-
$extension->onPreSend($topic, $message);
25+
$extension->onPreSendEvent($event);
2726
}
2827
}
2928

30-
/**
31-
* {@inheritdoc}
32-
*/
33-
public function onPostSend($topic, Message $message)
29+
public function onPreSendCommand(PreSend $event): void
30+
{
31+
foreach ($this->extensions as $extension) {
32+
$extension->onPreSendCommand($event);
33+
}
34+
}
35+
36+
public function onDriverPreSend(DriverPreSend $context): void
37+
{
38+
foreach ($this->extensions as $extension) {
39+
$extension->onDriverPreSend($context);
40+
}
41+
}
42+
43+
public function onPostSend(PostSend $event): void
3444
{
3545
foreach ($this->extensions as $extension) {
36-
$extension->onPostSend($topic, $message);
46+
$extension->onPostSend($event);
3747
}
3848
}
3949
}

pkg/enqueue/Client/ConsumptionExtension/ExclusiveCommandExtension.php

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@
33
namespace Enqueue\Client\ConsumptionExtension;
44

55
use Enqueue\Client\Config;
6+
use Enqueue\Client\EmptyExtensionTrait as ClientEmptyExtensionTrait;
67
use Enqueue\Client\ExtensionInterface as ClientExtensionInterface;
7-
use Enqueue\Client\Message;
8+
use Enqueue\Client\PreSend;
89
use Enqueue\Consumption\Context;
9-
use Enqueue\Consumption\EmptyExtensionTrait;
10+
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
1011
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;
1112

12-
class ExclusiveCommandExtension implements ConsumptionExtensionInterface, ClientExtensionInterface
13+
final class ExclusiveCommandExtension implements ConsumptionExtensionInterface, ClientExtensionInterface
1314
{
14-
use EmptyExtensionTrait;
15+
use ConsumptionEmptyExtensionTrait, ClientEmptyExtensionTrait;
1516

1617
/**
1718
* @var string[]
@@ -60,26 +61,14 @@ public function onPreReceived(Context $context)
6061
}
6162
}
6263

63-
/**
64-
* {@inheritdoc}
65-
*/
66-
public function onPreSend($topic, Message $message)
64+
public function onPreSendCommand(PreSend $context): void
6765
{
68-
if (Config::COMMAND_TOPIC != $topic) {
69-
return;
70-
}
66+
$message = $context->getMessage();
67+
$command = $context->getCommand();
7168

72-
$commandName = $message->getProperty(Config::PARAMETER_COMMAND_NAME);
73-
if (array_key_exists($commandName, $this->processorNameToQueueNameMap)) {
74-
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $commandName);
75-
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->processorNameToQueueNameMap[$commandName]);
69+
if (array_key_exists($command, $this->processorNameToQueueNameMap)) {
70+
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command);
71+
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->processorNameToQueueNameMap[$command]);
7672
}
7773
}
78-
79-
/**
80-
* {@inheritdoc}
81-
*/
82-
public function onPostSend($topic, Message $message)
83-
{
84-
}
8574
}

pkg/enqueue/Client/DriverPreSend.php

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
final class DriverPreSend
6+
{
7+
private $message;
8+
9+
private $producer;
10+
11+
private $driver;
12+
13+
public function __construct(Message $message, ProducerInterface $producer, DriverInterface $driver)
14+
{
15+
$this->message = $message;
16+
$this->producer = $producer;
17+
$this->driver = $driver;
18+
}
19+
20+
public function getMessage(): Message
21+
{
22+
return $this->message;
23+
}
24+
25+
public function getProducer(): ProducerInterface
26+
{
27+
return $this->producer;
28+
}
29+
30+
public function getDriver(): DriverInterface
31+
{
32+
return $this->driver;
33+
}
34+
35+
public function isEvent(): bool
36+
{
37+
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
38+
}
39+
40+
public function getCommand(): string
41+
{
42+
return $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
43+
}
44+
45+
public function getTopic(): string
46+
{
47+
return $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
48+
}
49+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
trait EmptyExtensionTrait
6+
{
7+
public function onPreSendEvent(PreSend $context): void
8+
{
9+
}
10+
11+
public function onPreSendCommand(PreSend $context): void
12+
{
13+
}
14+
15+
public function onDriverPreSend(DriverPreSend $context): void
16+
{
17+
}
18+
19+
public function onPostSend(PostSend $context): void
20+
{
21+
}
22+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Enqueue\Client\Extension;
4+
5+
use Enqueue\Client\EmptyExtensionTrait;
6+
use Enqueue\Client\ExtensionInterface;
7+
use Enqueue\Client\Message;
8+
use Enqueue\Client\PreSend;
9+
use Enqueue\Util\JSON;
10+
11+
class PrepareBodyExtension implements ExtensionInterface
12+
{
13+
use EmptyExtensionTrait;
14+
15+
public function onPreSendEvent(PreSend $context): void
16+
{
17+
$this->prepareBody($context->getMessage());
18+
}
19+
20+
public function onPreSendCommand(PreSend $context): void
21+
{
22+
$this->prepareBody($context->getMessage());
23+
}
24+
25+
private function prepareBody(Message $message): void
26+
{
27+
$body = $message->getBody();
28+
$contentType = $message->getContentType();
29+
30+
if (is_scalar($body) || null === $body) {
31+
$contentType = $contentType ?: 'text/plain';
32+
$body = (string) $body;
33+
} elseif (is_array($body)) {
34+
// only array of scalars is allowed.
35+
array_walk_recursive($body, function ($value) {
36+
if (!is_scalar($value) && null !== $value) {
37+
throw new \LogicException(sprintf(
38+
'The message\'s body must be an array of scalars. Found not scalar in the array: %s',
39+
is_object($value) ? get_class($value) : gettype($value)
40+
));
41+
}
42+
});
43+
44+
$contentType = $contentType ?: 'application/json';
45+
$body = JSON::encode($body);
46+
} elseif ($body instanceof \JsonSerializable) {
47+
$contentType = $contentType ?: 'application/json';
48+
$body = JSON::encode($body);
49+
} else {
50+
throw new \InvalidArgumentException(sprintf(
51+
'The message\'s body must be either null, scalar, array or object (implements \JsonSerializable). Got: %s',
52+
is_object($body) ? get_class($body) : gettype($body)
53+
));
54+
}
55+
56+
$message->setContentType($contentType);
57+
$message->setBody($body);
58+
}
59+
}

pkg/enqueue/Client/ExtensionInterface.php

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,21 @@
44

55
interface ExtensionInterface
66
{
7-
/**
8-
* @param string $topic
9-
* @param Message $message
10-
*
11-
* @return
12-
*/
13-
public function onPreSend($topic, Message $message);
7+
public function onPreSendEvent(PreSend $context): void;
148

15-
/**
16-
* @param string $topic
17-
* @param Message $message
18-
*
19-
* @return
20-
*/
21-
public function onPostSend($topic, Message $message);
9+
public function onPreSendCommand(PreSend $context): void;
10+
11+
public function onDriverPreSend(DriverPreSend $context): void;
12+
13+
public function onPostSend(PostSend $context): void;
14+
15+
// /**
16+
// * @deprecated
17+
// */
18+
// public function onPreSend($topic, Message $message);
19+
//
20+
// /**
21+
// * @deprecated
22+
// */
23+
// public function onPostSend($topic, Message $message);
2224
}

pkg/enqueue/Client/PostSend.php

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
final class PostSend
6+
{
7+
private $message;
8+
9+
private $producer;
10+
11+
private $driver;
12+
13+
public function __construct(Message $message, ProducerInterface $producer, DriverInterface $driver)
14+
{
15+
$this->message = $message;
16+
$this->producer = $producer;
17+
$this->driver = $driver;
18+
}
19+
20+
public function getMessage(): Message
21+
{
22+
return $this->message;
23+
}
24+
25+
public function getProducer(): ProducerInterface
26+
{
27+
return $this->producer;
28+
}
29+
30+
public function getDriver(): DriverInterface
31+
{
32+
return $this->driver;
33+
}
34+
35+
public function isEvent(): bool
36+
{
37+
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
38+
}
39+
40+
public function getCommand(): string
41+
{
42+
return $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
43+
}
44+
45+
public function getTopic(): string
46+
{
47+
return $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
48+
}
49+
}

0 commit comments

Comments
 (0)