Description
Description
I am trying to configure Symfony Messenger with the SQS Transport for handling messages that will be published by a different system.
Since these messages are going to be published by a different system I need to create my own serializer (implementing Symfony\Component\Messenger\Transport\Serialization\SerializerInterface
).
There is literally no documentation on how to do that and serializing the stamps is being really dificult (since some default symfony stamps have Closure
as members and that can't be serialized). That is preventing me of using the retry mechanisms as it usually works when using the default serializer.
If the messenger could simply not delete the message in SQS queue, after a couple of retries the message could be sent (by amazon) to the dead letter queue and we wouldn't need to deal with all the stamps problem.
So the suggestion is:
- Document how a working implementation of
Symfony\Component\Messenger\Transport\Serialization\SerializerInterface
could be written (the one from SymfonyCasts doesn't work) - Allow some configuration to not remove the message from SQS queue unless the message is correctly handled. This would allow DLQ configurations in the Amazon side.
Example
This is my latest attempt of having the correct implementation of SerializerInterface, which works, but it's a bit hacky, having to ignore AckStamp
because it has a Closure
member:
<?php
namespace App\Infrastructure\Message\SymfonySetup;
use App\Domain\Message\ErrorMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\AckStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class JsonMessageSerializer implements SerializerInterface
{
public function __construct(private SerializerInterface $defaultSerializer)
{
}
/**
* @param array{"body": string, "headers"?: array{"stamps"?: string}} $encodedEnvelope
* @return Envelope
*/
public function decode(array $encodedEnvelope): Envelope
{
$data = json_decode($encodedEnvelope['body'], true);
if (!is_array($data) || !array_key_exists('type', $data)) {
try {
return $this->defaultSerializer->decode($encodedEnvelope);
} catch (MessageDecodingFailedException) {
return new Envelope(new ErrorMessage($encodedEnvelope['body']));
}
}
$messageClassName = $this->getMessageClassName($data['type']);
if (!class_exists($messageClassName)) {
/** If we are not able to figure out the message type, emmit an error message */
return new Envelope(new ErrorMessage($encodedEnvelope['body']));
}
unset($data['type']);
$message = new $messageClassName(...$data);
$stamps = [];
if (isset($encodedEnvelope['headers']) && isset($encodedEnvelope['headers']['stamps'])) {
/** @var array<string, StampInterface[]> $stamps */
$stamps = unserialize(base64_decode($encodedEnvelope['headers']['stamps']));
// Flattening stamps array
$stamps = array_merge(...array_values($stamps));
}
return new Envelope($message, $stamps);
}
/**
* @param Envelope $envelope
* @return array{body: string}
*/
public function encode(Envelope $envelope): array
{
$messageObject = $envelope->getMessage();
$messageClass = get_class($messageObject);
if (!str_starts_with($messageClass, 'App\Domain\Message')) {
/** @var array{body: string} $encodedEnvelope */
$encodedEnvelope = $this->defaultSerializer->encode($envelope);
return $encodedEnvelope;
}
$messageData = (array) $messageObject;
$messageData['type'] = $this->getTypeFromMessageClassName($messageClass);
// Removing ActStamp because it has a Closure as a member, so it can't be serialized
$envelopeStamps = $envelope->withoutAll(AckStamp::class)->all();
return [
'body' => json_encode($messageData, flags: JSON_THROW_ON_ERROR),
'headers' => ['stamps' => base64_encode(serialize($envelopeStamps))]
];
}
private function getMessageClassName(string $notificationType): string
{
$spaced = str_replace('_', ' ', $notificationType);
$pascalCased = ucwords($spaced);
$className = str_replace(' ', '', $pascalCased);
return "\\App\\Domain\\Message\\$className";
}
private function getTypeFromMessageClassName(string $messageClassName): string
{
$classShortName = substr($messageClassName, strrpos($messageClassName, '\\') + 1);
/** @var string $underscored */
$underscored = preg_replace('/([A-Z])/', '_$1', $classShortName);
$lowereCased = strtolower($underscored);
return trim($lowereCased, '_');
}
}