@@ -268,6 +268,7 @@ do is to write your own CSV receiver::
268
268
269
269
use App\Message\NewOrder;
270
270
use Symfony\Component\Messenger\Envelope;
271
+ use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
271
272
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
272
273
use Symfony\Component\Serializer\SerializerInterface;
273
274
@@ -284,17 +285,24 @@ do is to write your own CSV receiver::
284
285
285
286
public function get(): void
286
287
{
287
- $ordersFromCsv = $this->serializer->deserialize(file_get_contents($this->filePath), 'csv');
288
-
289
- foreach ($ordersFromCsv as $orderFromCsv) {
290
- $order = new NewOrder($orderFromCsv['id'], $orderFromCsv['account_id'], $orderFromCsv['amount']);
291
-
292
- $envelope = new Envelope($order);
293
-
294
- $handler($envelope);
288
+ // Receive the envelope according to your transport ($yourEnvelope here),
289
+ // in most cases, using a connection is the easiest solution.
290
+
291
+ if (null === $yourEnvelope) {
292
+ return [];
295
293
}
296
-
297
- return [$envelope];
294
+
295
+ try {
296
+ $envelope = $this->serializer->decode([
297
+ 'body' => $yourEnvelope['body'],
298
+ 'headers' => $yourEnvelope['headers'],
299
+ ]);
300
+ } catch (MessageDecodingFailedException $exception) {
301
+ $this->connection->reject($yourEnvelope['id']);
302
+ throw $exception;
303
+ }
304
+
305
+ return [$yourEnvelope->with(new CustomStamp($yourEnvelope['id']);
298
306
}
299
307
300
308
public function ack(Envelope $envelope): void
@@ -304,7 +312,8 @@ do is to write your own CSV receiver::
304
312
305
313
public function reject(Envelope $envelope): void
306
314
{
307
- // Reject the message if needed
315
+ // In the case of a custom connection
316
+ $this->connection->reject($this->findCustomStamp($envelope)->getId());
308
317
}
309
318
}
310
319
0 commit comments