Skip to content

Commit 98b3b42

Browse files
committed
[doc] add docs
1 parent c15cc2c commit 98b3b42

File tree

7 files changed

+226
-103
lines changed

7 files changed

+226
-103
lines changed

docs/client/message_bus.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Client. Message bus
2+
3+
Here's a description of message bus from [Enterprise Integration Patterns](http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html)
4+
5+
> A Message Bus is a combination of a common data model, a common command set, and a messaging infrastructure to allow different systems to communicate through a shared set of interfaces.
6+
7+
If all your applications built on top of Enqueue Client you have to only make sure they send message to a shared topic.
8+
The rest is done under the hood.
9+
10+
If you'd like to connect another application (written on Python for example ) you have to follow these rules:
11+
12+
* An application defines its own queue that is connected to the topic as fanout.
13+
* A message sent to message bus topic must have a header `enqueue.topic_name`.
14+
* Once a message is received it could be routed internally. `enqueue.topic_name` header could be used for that.
15+
16+
[back to index](../index.md)

docs/client/message_examples.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Client. Message examples
2+
3+
## Delay
4+
5+
Message sent with a delay set is processed after the delay time exceed.
6+
Some brokers may not support it from scratch.
7+
In order to use delay feature with [RabbitMQ](https://www.rabbitmq.com/) you have to install a [delay plugin](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange).
8+
9+
```php
10+
<?php
11+
12+
use Enqueue\Client\Message;
13+
14+
$message = new Message();
15+
$message->setDelay(60); // seconds
16+
17+
/** @var \Enqueue\Client\MessageProducerInterface $producer */
18+
$producer->send('aTopic', $message);
19+
```
20+
21+
## Expiration (TTL)
22+
23+
The message may have an expiration or TTL (time to live).
24+
The message is removed from the queue if the expiration exceeded but the message has not been consumed.
25+
For example it make sense to send a forgot password email within first few minutes, nobody needs it in an hour.
26+
27+
```php
28+
<?php
29+
30+
use Enqueue\Client\Message;
31+
32+
$message = new Message();
33+
$message->setExpire(60); // seconds
34+
35+
/** @var \Enqueue\Client\MessageProducerInterface $producer */
36+
$producer->send('aTopic', $message);
37+
```
38+
39+
## Priority
40+
41+
You can set a priority If you want a message to be processed quicker than other messages in the queue.
42+
Client defines five priority constants: `MessagePriority::VERY_LOW`, `MessagePriority::LOW`, `MessagePriority::NORMAL`, `MessagePriority::HIGH`, `MessagePriority::VERY_HIGH`.
43+
The `MessagePriority::NORMAL` is default priority.
44+
45+
```php
46+
<?php
47+
48+
use Enqueue\Client\Message;
49+
use Enqueue\Client\MessagePriority;
50+
51+
$message = new Message();
52+
$message->setPriority(MessagePriority::HIGH);
53+
54+
/** @var \Enqueue\Client\MessageProducerInterface $producer */
55+
$producer->send('aTopic', $message);
56+
```
57+
58+
## Timestamp, Content type, Message id
59+
60+
Those are self describing things.
61+
Usually they are set by Client so you dont have to worry about them.
62+
If you do not like what Client set you can always set custom values:
63+
64+
```php
65+
<?php
66+
67+
use Enqueue\Client\Message;
68+
69+
$message = new Message();
70+
$message->setMessageId('aCustomMessageId');
71+
$message->setTimestamp(time());
72+
$message->setContentType('text/plain');
73+
74+
/** @var \Enqueue\Client\MessageProducerInterface $producer */
75+
$producer->send('aTopic', $message);
76+
```
77+
78+
[back to index](../index.md)

docs/client/supported_brokers.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Client. Supported brokers
2+
3+
Here's the list of protocols and Client features supported by them
4+
5+
| Protocol | Priority | Delay | Expiration | Setup broker | Message bus |
6+
|:--------------:|:--------:|:--------:|:----------:|:------------:|:-----------:|
7+
| AMQP | No | No | Yes | Yes | Yes |
8+
| RabbitMQ AMQP | Yes | Yes* | Yes | Yes | Yes |
9+
| STOMP | No | No | Yes | No | Yes** |
10+
| RabbitMQ STOMP | Yes | Yes* | Yes | Yes*** | Yes** |
11+
12+
13+
* \* Possible if a RabbitMQ delay plugin is installed.
14+
* \*\* Possible if topics (exchanges) are configured on broker side manually.
15+
* \*\*\* Possible if RabbitMQ Managment Plugin is installed.
16+
17+
[back to index](../index.md)

docs/index.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@
77
- [NULL](null_transport.md)
88
* Consumption
99
- [Extensions](consumption/extensions.md)
10+
* Client
11+
- [Message examples](client/message_examples.md)
12+
- [Supported brokers](client/supported_brokers.md)
13+
- [Message bus](client/message_bus.md)
14+
* Job queue
15+
- [Run unique job](job_queue/run_unique_job.md)
16+
- [Run sub job(s)](job_queue/run_sub_job.md)
1017
* EnqueueBundle (Symfony).
1118
- [Quick tour](bundle/quick_tour.md)
1219
- [Config reference](bundle/config_reference.md)

docs/job_queue/run_sub_job.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
## Job queue. Run sub job
2+
3+
It shows how you can create and run a sub job, which it is executed separately.
4+
You can create as many sub jobs as you like.
5+
They will be executed in parallel.
6+
7+
```php
8+
<?php
9+
use Enqueue\Consumption\MessageProcessorInterface;
10+
use Enqueue\Client\MessageProducerInterface;
11+
use Enqueue\Consumption\Result;
12+
use Enqueue\Psr\Message;
13+
use Enqueue\Psr\Context;
14+
use Enqueue\JobQueue\JobRunner;
15+
use Enqueue\JobQueue\Job;
16+
use Enqueue\Util\JSON;
17+
18+
class RootJobMessageProcessor implements MessageProcessorInterface
19+
{
20+
/** @var JobRunner */
21+
private $jobRunner;
22+
23+
/** @var MessageProducerInterface */
24+
private $producer;
25+
26+
public function process(Message $message, Context $context)
27+
{
28+
$result = $this->jobRunner->runUnique($message->getMessageId(), 'aJobName', function (JobRunner $runner) {
29+
$runner->createDelayed('aSubJobName1', function (JobRunner $runner, Job $childJob) {
30+
$this->producer->send('aJobTopic', [
31+
'jobId' => $childJob->getId(),
32+
// other data required by sub job
33+
]);
34+
});
35+
36+
return true;
37+
});
38+
39+
return $result ? Result::ACK : Result::REJECT;
40+
}
41+
}
42+
43+
class SubJobMessageProcessor implements MessageProcessorInterface
44+
{
45+
/** @var JobRunner */
46+
private $jobRunner;
47+
48+
public function process(Message $message, Context $context)
49+
{
50+
$data = JSON::decode($message->getBody());
51+
52+
$result = $this->jobRunner->runDelayed($data['jobId'], function () use ($data) {
53+
// do your job
54+
55+
return true;
56+
});
57+
58+
return $result ? Result::ACK : Result::REJECT;
59+
}
60+
}
61+
```
62+
63+
[back to index](../index.md)

docs/job_queue/run_unique_job.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
## Job queue. Run unique job
2+
3+
There is job queue component build on top of a transport. It provides some additional features:
4+
5+
* Stores jobs to a database. So you can query that information and build a UI for it.
6+
* Run unique job feature. If used guarantee that there is not any job with the same name running same time.
7+
* Sub jobs. If used allow split a big job into smaller pieces and process them asynchronously and in parallel.
8+
* Depended job. If used allow send a message when the whole job is finished (including sub jobs).
9+
10+
Here's some examples.
11+
It shows how you can run unique job using job queue (The configuration is described in a dedicated chapter).
12+
13+
```php
14+
<?php
15+
use Enqueue\Consumption\MessageProcessorInterface;
16+
use Enqueue\Consumption\Result;
17+
use Enqueue\Psr\Message;
18+
use Enqueue\Psr\Context;
19+
use Enqueue\JobQueue\JobRunner;
20+
21+
class MessageProcessor implements MessageProcessorInterface
22+
{
23+
/** @var JobRunner */
24+
private $jobRunner;
25+
26+
public function process(Message $message, Context $context)
27+
{
28+
$result = $this->jobRunner->runUnique($message->getMessageId(), 'aJobName', function () {
29+
// do your job, there is no any other processes executing same job,
30+
31+
return true; // if you want to ACK message or false to REJECT
32+
});
33+
34+
return $result ? Result::ACK : Result::REJECT;
35+
}
36+
}
37+
```
38+
39+
[back to index](../index.md)

docs/quick_tour.md

Lines changed: 6 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,12 @@ $queueConsumer->consume();
156156

157157
## Client
158158

159-
It provides a high level abstraction.
160-
The goal of the component is hide as much as possible details from you so you can concentrate on things that really matters.
161-
For example, It configure a broker for you, if needed.
162-
It provides easy to use services for producing and processing messages.
163-
It supports message bus so different applications can send message to each other.
159+
It provides an easy to use high level abstraction.
160+
The goal of the component is hide as much as possible low level details so you can concentrate on things that really matters.
161+
For example, It configure a broker for you by creating queuest, exchanges and bind them.
162+
It provides easy to use services for producing and processing messages.
163+
It supports unified format for setting message expiration, delay, timestamp, correlation id.
164+
It supports message bus so different applications can talk to each other.
164165

165166
Here's an example of how you can send and consume messages.
166167

@@ -185,104 +186,6 @@ $client->send('foo_topic', 'Hello there!');
185186
$client->consume();
186187
```
187188

188-
## Job queue
189-
190-
There is job queue component build on top of a transport. It provides some additional features:
191-
192-
* Stores jobs to a database. So you can query that information and build a UI for it.
193-
* Run unique job feature. If used guarantee that there is not any job with the same name running same time.
194-
* Sub jobs. If used allow split a big job into smaller pieces and process them asynchronously and in parallel.
195-
* Depended job. If used allow send a message when the whole job is finished (including sub jobs).
196-
197-
Here's some examples.
198-
First shows how you can run unique job using job queue (The configuration is described in a dedicated chapter).
199-
200-
```php
201-
<?php
202-
use Enqueue\Consumption\MessageProcessorInterface;
203-
use Enqueue\Consumption\Result;
204-
use Enqueue\Psr\Message;
205-
use Enqueue\Psr\Context;
206-
use Enqueue\JobQueue\JobRunner;
207-
208-
class MessageProcessor implements MessageProcessorInterface
209-
{
210-
/** @var JobRunner */
211-
private $jobRunner;
212-
213-
public function process(Message $message, Context $context)
214-
{
215-
$result = $this->jobRunner->runUnique($message->getMessageId(), 'aJobName', function () {
216-
// do your job, there is no any other processes executing same job,
217-
218-
return true; // if you want to ACK message or false to REJECT
219-
});
220-
221-
return $result ? Result::ACK : Result::REJECT;
222-
}
223-
}
224-
```
225-
226-
Second shows how you can create and run a sub job, which it is executed separately.
227-
You can create as many sub jobs as you like.
228-
They will be executed in parallel.
229-
230-
```php
231-
<?php
232-
use Enqueue\Consumption\MessageProcessorInterface;
233-
use Enqueue\Client\MessageProducerInterface;
234-
use Enqueue\Consumption\Result;
235-
use Enqueue\Psr\Message;
236-
use Enqueue\Psr\Context;
237-
use Enqueue\JobQueue\JobRunner;
238-
use Enqueue\JobQueue\Job;
239-
use Enqueue\Util\JSON;
240-
241-
class RootJobMessageProcessor implements MessageProcessorInterface
242-
{
243-
/** @var JobRunner */
244-
private $jobRunner;
245-
246-
/** @var MessageProducerInterface */
247-
private $producer;
248-
249-
public function process(Message $message, Context $context)
250-
{
251-
$result = $this->jobRunner->runUnique($message->getMessageId(), 'aJobName', function (JobRunner $runner) {
252-
$runner->createDelayed('aSubJobName1', function (JobRunner $runner, Job $childJob) {
253-
$this->producer->send('aJobTopic', [
254-
'jobId' => $childJob->getId(),
255-
// other data required by sub job
256-
]);
257-
});
258-
259-
return true;
260-
});
261-
262-
return $result ? Result::ACK : Result::REJECT;
263-
}
264-
}
265-
266-
class SubJobMessageProcessor implements MessageProcessorInterface
267-
{
268-
/** @var JobRunner */
269-
private $jobRunner;
270-
271-
public function process(Message $message, Context $context)
272-
{
273-
$data = JSON::decode($message->getBody());
274-
275-
$result = $this->jobRunner->runDelayed($data['jobId'], function () use ($data) {
276-
// do your job
277-
278-
return true;
279-
});
280-
281-
return $result ? Result::ACK : Result::REJECT;
282-
}
283-
}
284-
```
285-
286189
## Cli commands
287190

288191
The library provides handy commands out of the box.

0 commit comments

Comments
 (0)