@@ -264,6 +264,7 @@ do is to write your own CSV receiver::
264
264
265
265
use App\Message\NewOrder;
266
266
use Symfony\Component\Messenger\Envelope;
267
+ use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
267
268
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
268
269
use Symfony\Component\Serializer\SerializerInterface;
269
270
@@ -280,17 +281,23 @@ do is to write your own CSV receiver::
280
281
281
282
public function get(): iterable
282
283
{
283
- $ordersFromCsv = $this->serializer->deserialize(file_get_contents($this->filePath), 'csv');
284
-
285
- foreach ($ordersFromCsv as $orderFromCsv) {
286
- $order = new NewOrder($orderFromCsv['id'], $orderFromCsv['account_id'], $orderFromCsv['amount']);
287
-
288
- $envelope = new Envelope($order);
284
+ // Receive the envelope according to your transport ($yourEnvelope here),
285
+ // in most cases, using a connection is the easiest solution.
286
+ if (null === $yourEnvelope) {
287
+ return [];
288
+ }
289
289
290
- $handler($envelope);
290
+ try {
291
+ $envelope = $this->serializer->decode([
292
+ 'body' => $yourEnvelope['body'],
293
+ 'headers' => $yourEnvelope['headers'],
294
+ ]);
295
+ } catch (MessageDecodingFailedException $exception) {
296
+ $this->connection->reject($yourEnvelope['id']);
297
+ throw $exception;
291
298
}
292
299
293
- return [$envelope] ;
300
+ return [$envelope->with(new CustomStamp($yourEnvelope['id']) ;
294
301
}
295
302
296
303
public function ack(Envelope $envelope): void
@@ -300,7 +307,8 @@ do is to write your own CSV receiver::
300
307
301
308
public function reject(Envelope $envelope): void
302
309
{
303
- // Reject the message if needed
310
+ // In the case of a custom connection
311
+ $this->connection->reject($this->findCustomStamp($envelope)->getId());
304
312
}
305
313
}
306
314
0 commit comments