[Messenger] Allow the use of transports default dead letter queue mechanisms and document SerializerInterface #17636

Closed
@CViniciusSDias

Description

I am trying to configure Symfony Messenger with the SQS transport to handle messages that are published by a different system.
Because another system publishes these messages, I need to write my own serializer (implementing Symfony\Component\Messenger\Transport\Serialization\SerializerInterface).
There is no documentation on how to do that, and serializing the stamps is proving really difficult, since some default Symfony stamps have a Closure as a member, which can't be serialized. That prevents me from using the retry mechanism the way it normally works with the default serializer.
If Messenger could simply not delete the message from the SQS queue, then after a couple of retries Amazon could send the message to the dead letter queue, and we wouldn't need to deal with the stamp problem at all.

So the suggestion is:

  1. Document how a working implementation of Symfony\Component\Messenger\Transport\Serialization\SerializerInterface could be written (the one from SymfonyCasts doesn't work)
  2. Allow some configuration to not remove the message from the SQS queue unless the message is handled successfully. This would allow DLQ configuration on the Amazon side.
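
For context on the first point, a custom serializer is attached to a transport through the transport's serializer option. The following is a minimal sketch using Symfony's PHP config format; the transport name "external" and the MESSENGER_TRANSPORT_DSN variable are assumptions chosen for illustration, and the class is the JsonMessageSerializer from this issue.

```php
<?php
// config/packages/messenger.php

use App\Infrastructure\Message\SymfonySetup\JsonMessageSerializer;
use Symfony\Config\FrameworkConfig;

return static function (FrameworkConfig $framework): void {
    $messenger = $framework->messenger();

    // "external" is a hypothetical transport name; the DSN env var is also an assumption.
    $messenger->transport('external')
        ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
        // Service id of the custom serializer used to encode/decode this transport's messages.
        ->serializer(JsonMessageSerializer::class);
};
```

This only wires the serializer to one transport; other transports keep the default serializer.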

Example

This is my latest attempt at a correct implementation of SerializerInterface. It works, but it's a bit hacky: it has to ignore AckStamp because that stamp has a Closure member:

<?php

namespace App\Infrastructure\Message\SymfonySetup;

use App\Domain\Message\ErrorMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\AckStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class JsonMessageSerializer implements SerializerInterface
{
    public function __construct(private SerializerInterface $defaultSerializer)
    {
    }

    /**
     * @param array{"body": string, "headers"?: array{"stamps"?: string}} $encodedEnvelope
     * @return Envelope
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        $data = json_decode($encodedEnvelope['body'], true);
        if (!is_array($data) || !array_key_exists('type', $data)) {
            try {
                return $this->defaultSerializer->decode($encodedEnvelope);
            } catch (MessageDecodingFailedException) {
                return new Envelope(new ErrorMessage($encodedEnvelope['body']));
            }
        }

        $messageClassName = $this->getMessageClassName($data['type']);
        if (!class_exists($messageClassName)) {
            // If we are not able to figure out the message type, emit an error message
            return new Envelope(new ErrorMessage($encodedEnvelope['body']));
        }

        unset($data['type']);
        $message = new $messageClassName(...$data);

        $stamps = [];
        if (isset($encodedEnvelope['headers']) && isset($encodedEnvelope['headers']['stamps'])) {
            /** @var array<string, StampInterface[]> $stamps */
            $stamps = unserialize(base64_decode($encodedEnvelope['headers']['stamps']));

            // Flattening stamps array
            $stamps = array_merge(...array_values($stamps));
        }

        return new Envelope($message, $stamps);
    }

    /**
     * @param Envelope $envelope
     * @return array{body: string, headers?: array<string, string>}
     */
    public function encode(Envelope $envelope): array
    {
        $messageObject = $envelope->getMessage();
        $messageClass = get_class($messageObject);
        if (!str_starts_with($messageClass, 'App\Domain\Message')) {
            /** @var array{body: string} $encodedEnvelope */
            $encodedEnvelope = $this->defaultSerializer->encode($envelope);
            return $encodedEnvelope;
        }

        $messageData = (array) $messageObject;
        $messageData['type'] = $this->getTypeFromMessageClassName($messageClass);

        // Removing AckStamp because it has a Closure as a member, so it can't be serialized
        $envelopeStamps = $envelope->withoutAll(AckStamp::class)->all();
        return [
            'body' => json_encode($messageData, flags: JSON_THROW_ON_ERROR),
            'headers' => ['stamps' => base64_encode(serialize($envelopeStamps))]
        ];
    }

    private function getMessageClassName(string $notificationType): string
    {
        $spaced = str_replace('_', ' ', $notificationType);
        $pascalCased = ucwords($spaced);
        $className = str_replace(' ', '', $pascalCased);

        return "\\App\\Domain\\Message\\$className";
    }

    private function getTypeFromMessageClassName(string $messageClassName): string
    {
        $classShortName = substr($messageClassName, strrpos($messageClassName, '\\') + 1);
        /** @var string $underscored */
        $underscored = preg_replace('/([A-Z])/', '_$1', $classShortName);
        $lowerCased = strtolower($underscored);

        return trim($lowerCased, '_');
    }
}
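
A possibly less hacky variant of the stamp handling in encode() is to drop every stamp marked as non-sendable rather than naming AckStamp explicitly, which is what the built-in serializers do before encoding. The sketch below assumes AckStamp implements NonSendableStampInterface in the installed Symfony version (worth verifying); encodeSendableStamps is a hypothetical helper name, not a Symfony API.

```php
<?php

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;

/**
 * Hypothetical helper (not part of Symfony): builds the "stamps" header value
 * in the same base64(serialize(...)) format that encode() above produces.
 */
function encodeSendableStamps(Envelope $envelope): string
{
    // Remove all stamps that are flagged as not meant to be sent to a transport.
    // Assumption: AckStamp carries this marker interface.
    $sendable = $envelope->withoutStampsOfType(NonSendableStampInterface::class);

    // all() returns array<class-string, StampInterface[]>, the shape decode() flattens.
    return base64_encode(serialize($sendable->all()));
}
```

Inside encode(), the 'stamps' header could then be set to encodeSendableStamps($envelope). Any remaining stamp that holds a non-serializable member would still fail, so this is not a complete fix.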

Metadata

Assignees: none
Labels: Messenger, hasPR (a pull request has already been submitted for this issue)
