Компонент Messenger
Дата обновления перевода: 2024-07-05
Компонент Messenger
Компонент Messenger помогает приложениям отправлять и принимать сообщения из других приложений или через очереди сообщений.
Компонент во многом был основан на серии постов Маттиаса Нобака о командах bus и проект SimpleBus.
See also
Эта статья объясняет как использовать функции Messenger в качестве независимого компонента в любом PHP-приложении. Прочтите статью Messenger: работа с синхронизированными сообщениями и сообщениями в очереди, чтобы узнать, как его использовать в приложениях Symfony.
Установка
1
$ composer require symfony/messenger
Note
Если вы устанавливаете этот компонент вне приложения Symfony, вам нужно
подключить файл vendor/autoload.php
в вашем коде для включения механизма
автозагрузки классов, предоставляемых Composer. Детальнее читайте в
этой статье.
Концепты
- Отправитель:
- Отвечает за сериализацию и отправку сообщений чему-то. Это что-то может быть брокером сообщений или сторонней API, к примеру.
- Получатель:
- Отвечает за десериализацию и перенаправление сообщений обработчку(ам). Это может быть пулер очереди сообщений или конечная точка API, к примеру.
- Обработчик:
-
Отвечает за обработку сообщений, используя бизнес-логику, применимую к сообщениям.
Обработчики вызываются промежуточным ПО
HandleMessageMiddleware
. - Промежуточное ПО:
- Промежуточное ПО может получить доступ к сообщению и его обертке (конверту), во время запуска через bus. В буквальном смысле "ПО посредине", оно не касается основной функциональности (бизнес логики) приложения. Вместо этого, оно срезает углы и может применяться по всему приложению, влияя на весь bus сообщений. К примеру: ведение логов, валидацию сообщения, начало транзакции, ... Оно также ответственно за вызов следующего промежуточного ПО в цепочке, что означает, что оно также может настраивать конверт, добавляя к нему штапмы или даже перемещая его, а также прервать цепочку промежуточного ПО. Промежуточное ПО вызывается и при первоначальном запуске сообщения, и позже, когда сообщение получено от транспорта.
- Конверт:
- Специальная концепция мессенджера, предоставляющая полную гибкость внутри bus с сообщениями, оборачивая в себя сообщения, и позволяя добавлять полезную информацию внутри посредоством марок на конвертах.
- Штампы конвертов:
- Информация, которую вам нужно добавить к сообщению: контекст сериализатора, который использовать для транспорта, маркеры, указывающие на получение сообщения, или любые другие метаданные, которые может использовать ваше промежуточное ПО или слой транспорта.
Автобус
Автобус используется для развёртывания сообщений. Поведение автобуса упорядочено в связующем стеке. Компонент поставляется с набором связующего ПО, которое вы можете использовать.
При использовании автобуса сообщений с пакетом Symfony FrameworkBundle, для вас конфигурируется следующее промежуточное ПО:
- SendMessageMiddleware (enables asynchronous processing, logs the processing of your messages if you pass a logger)
- HandleMessageMiddleware (calls the registered handler(s))
Пример:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use App\Message\MyMessage;
use App\MessageHandler\MyMessageHandler;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
$handler = new MyMessageHandler();
$bus = new MessageBus([
new HandleMessageMiddleware(new HandlersLocator([
MyMessage::class => [$handler],
])),
]);
$bus->dispatch(new MyMessage(/* ... */));
Note
Каждое промежуточное ПО должно реализовывать MiddlewareInterface.
Обработчики
После диспетчеризации в автобус, сообщения будут обработаны "обработчиками сообщений". Обработчик сообщений - это PHP вызываемое (т.е. функция или экземпляр класса), которое будет проводить необходимую обработку для вашего сообщения:
1 2 3 4 5 6 7 8 9 10 11
namespace App\MessageHandler;
use App\Message\MyMessage;
class MyMessageHandler
{
public function __invoke(MyMessage $message): void
{
// Обработка сообщения...
}
}
Добавление метаданных к сообщениям (конверты)
Если вам нужно добавить метаданные или некоторую конфигурацию в сообщение, оберните
его в класс Envelope и добавьте штампы.
Например, чтобы установить группы сериализации, используемые при прохождении сообщения
через слой транспорта, используйте штамп SerializerStamp
:
1 2 3 4 5 6 7 8 9 10
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SerializerStamp;
$bus->dispatch(
(new Envelope($message))->with(new SerializerStamp([
// группы применяются ко всему сообщению, так что не забудьте
// определить группу для каждого встроенного объекта
'groups' => ['my_serialization_groups'],
]))
);
Вот некоторые вашные штампы на конверты, которые отправляются в Symfony Messenger:
- DelayStamp, чтобы отложить обработку асинхронного сообщения.
- DispatchAfterCurrentBusStamp, чтобы сообщение было обработано после того, как будет выполнен текущий bus. Прочтите больше в Транзакционные сообщение: обрабатывайте сообщения после того, как обработка закончена.
- HandledStamp, штамп, который отмечает сообщение обработанным конкретным обработчиком. Позволяет получить доступ к значению, возвращенному обработчиком, и названию обработчка.
- ReceivedStamp, внутренний штамп, который отмечает сообщение полученным от транспорта.
- SentStamp, штамп, который отмечает сообщение отправленным от конкретного отправителя. Позволяет получить доступ к FQCN отправителя и псевдоним, если он доступен, из SendersLocator.
- SerializerStamp, чтобы сконфигурировать группы сериализации, используемые транспортом.
- ValidationStamp, чтобы сконфигурировать группы валидации, используемые при включении промежуточного ПО валидации.
- ErrorDetailsStamp, внутренний штамп, когда сообщение неуспешно в связи с исключением в обработчике.
- ScheduledStamp, штамп, отмечающий сообщение как созданное планировщиком. Это помогает отличить его от сообщений, созданных "вручную". Вы можете узнать больше об этом в Документации по планировщику.
Note
Штамп ErrorDetailsStamp
содержит FlattenException,
который является представлением исключения, вызвавшего ошибку сообщения. Вы можете
получить это исключение с помощью метода
getFlattenException().
Это исключение нормализуется благодаря FlattenExceptionNormalizer,
который помогает сообщать об ошибках в контексте Messenger.
Вместо того, чтобы разбираться с сообщениями напрямую в промежуточном ПО, вы получаете конверт. Таким образом, вы можете исследовать содержание конверта и его штампа или добавить их:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use App\Message\Stamp\AnotherStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
class MyOwnMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
if (null !== $envelope->last(ReceivedStamp::class)) {
// Сообщение было получено только что...
// Вы можете, к примеру, добавить другой штамп.
$envelope = $envelope->with(new AnotherStamp(/* ... */));
} else {
// Сообщение только что было запущено
}
return $stack->next()->handle($envelope, $stack);
}
}
Пример выше перенаправит сообщение к следующему промежуточному ПО с дополнительным
штампом если сообщение было только что получено (т.е. имеет хотя бы один
штамп ReceivedStamp
). Вы можете создавать собственные штампы, реализуя
StampInterface.
Если вы хотите изучить все штампы на конверте, используйте метод $envelope->all()
,
который возвращает все штампы, сгрупированные по типам (FQCN). Как вариант, вы можете
итерировать все штампы конкретного типа, используя FQCN в качестве первого параметра
этого метода (например, $envelope->all(ReceivedStamp::class)
).
Note
Любой штамп должен иметь возможность быть сериализованым с использованием компонента Symfony Сериализатор, если он проходит через транспорт, используя базовый сериализатор Serializer.
Транспорт
Для отправки и получения сообщений вам понадобится сконфигурировать транспорт. Транспорт будет отвечать за коммуникацию с вашим брокером сообщений или третьими сторонами.
Ваш собственный отправитель
Представьте, что у вас уже есть сообщение ImportantAction
, проходящее через
автобус сообщений и обрабатываемое обработчиком. Теперь, вы также хотите отправить
это сообщение в виде электронного письма (используя компоненты Mime
и Mailer).
Используя SenderInterface, вы можете создать собственного отправителя:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
namespace App\MessageSender;
use App\Message\ImportantAction;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Mime\Email;
class ImportantActionToEmailSender implements SenderInterface
{
public function __construct(
private MailerInterface $mailer,
private string $toEmail,
) {
}
public function send(Envelope $envelope): Envelope
{
$message = $envelope->getMessage();
if (!$message instanceof ImportantAction) {
throw new \InvalidArgumentException(sprintf('This transport only supports "%s" messages.', ImportantAction::class));
}
$this->mailer->send(
(new Email())
->to($this->toEmail)
->subject('Important action made')
->html('<h1>Important action</h1><p>Made by '.$message->getUsername().'</p>')
);
return $envelope;
}
}
Ваш собственный получатель
Получатель отвечает за получение сообщений из источника и их диспетчеризацию приложению.
Представьте, что вы уже обработали какие-то "команды" в вашем приложении,
используя сообщение NewOrder
. Теперь вы хотите интегрироваться с третьей
стороной или унаследованным приложением, но вы не можете использовать API
и вам нужно использовать общедоступный CSV с новыми командами.
Вы прочтёте этот CSV-файл и запустите сообщение NewOrder
. Всё, что вам нужно
сделать - это написать ваш пользовательский CSV получатель:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
namespace App\MessageReceiver;
use App\Message\NewOrder;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Serializer\SerializerInterface;
class NewOrdersFromCsvFileReceiver implements ReceiverInterface
{
private $connection;
public function __construct(
private SerializerInterface $serializer,
private string $filePath,
) {
// Доступное соединение, объединенное с компонентом Messenger
// может быть найдено в "Symfony\Component\Messenger\Bridge\*\Transport\Connection".
$this->connection = /* create your connection */;
}
public function get(): iterable
{
// Получите конверт в соответствии с вашим транспортом, в большинстве,
// случаев ($yourEnvelope здесь), использование связи является самым простым решением
if (null === $yourEnvelope) {
return [];
}
try {
$envelope = $this->serializer->decode([
'body' => $yourEnvelope['body'],
'headers' => $yourEnvelope['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($yourEnvelope['id']);
throw $exception;
}
return [$envelope->with(new CustomStamp($yourEnvelope['id']))];
}
public function ack(Envelope $envelope): void
{
// Добавьте информацию об обработанном сообщении
}
public function reject(Envelope $envelope): void
{
// В случае пользовательского соединения
$id = /* get the message id thanks to information or stamps present in the envelope */;
$this->connection->reject($id);
}
}
Получатель и отправитель в одном автобусе
Чтобы разрешить отправку и получение сообщений в одном и том же автобусе и предотвратить бесконечный цикл, автобус сообщений добавит к конвертам сообщений штамп ReceivedStamp и промежуточное ПО SendMessageMiddleware будет знать, что оно не должно снова направлять эти сообщения на транспорт