File tree Expand file tree Collapse file tree 1 file changed +21
-0
lines changed Expand file tree Collapse file tree 1 file changed +21
-0
lines changed Original file line number Diff line number Diff line change @@ -8,6 +8,7 @@ The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ
8
8
* [ Send message to queue] ( #send-message-to-queue )
9
9
* [ Consume message] ( #consume-message )
10
10
* [ Serialize message] ( #serialize-message )
11
+ * [ Chnage offset] ( #change-offset )
11
12
12
13
## Installation
13
14
@@ -84,6 +85,9 @@ $fooQueue = $psrContext->createQueue('foo');
84
85
85
86
$consumer = $psrContext->createConsumer($fooQueue);
86
87
88
+ // Enable async commit to gain better performance.
89
+ //$consumer->setCommitAsync(true);
90
+
87
91
$message = $consumer->receive();
88
92
89
93
// process a message
@@ -115,4 +119,21 @@ class FooSerializer implements Serializer
115
119
$psrContext->setSerializer(new FooSerializer());
116
120
```
117
121
122
+ ## Change offset
123
+
124
+ By default consumers starts from the beginning of the topic and updates the offset while you are processing messages.
125
+ There is an ability to change the current offset.
126
+
127
+ ``` php
128
+ <?php
129
+ /** @var \Enqueue\RdKafka\RdKafkaContext $psrContext */
130
+
131
+ $fooQueue = $psrContext->createQueue('foo');
132
+
133
+ $consumer = $psrContext->createConsumer($fooQueue);
134
+ $consumer->setOffset(123);
135
+
136
+ $message = $consumer->receive(2000);
137
+ ```
138
+
118
139
[ back to index] ( index.md )
You can’t perform that action at this time.
0 commit comments