Messenger: работа с синхронизированными сообщениями и сообщениями в очереди
Дата обновления перевода 2023-01-16
Messenger: работа с синхронизированными сообщениями и сообщениями в очереди
Messenger предоставляет автобус сообщений с возможностью отправки сообщений, а затем работы с ними сразу же в вашем приложении, или их отправки через транспорт (например, очередь) для их обработки позже. Чтобы узнать об этом больше, прочтите документацию компонента Messenger.
Установка
В приложениях, использующих Symfony Flex , выполните эту команду, чтобы установить Messenger:
1
$ composer require symfony/messenger
Создание сообщения и обработчика
Messenger строится на двух разных классах, которые вы создадите: (1) классе сообщения, который содержит данные, и (2) классе обработчика(ов), который будет вызван после запуска сообщения. Класс обработчика будет читать класс сообщения и выполнять некоторое действие.
Для класса сообщения нет особых требований, кроме того, что его можно сериализовать:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// src/Message/SmsNotification.php
namespace App\Message;
class SmsNotification
{
private $content;
public function __construct(string $content)
{
$this->content = $content;
}
public function getContent(): string
{
return $this->content;
}
}
Обработчик сообщений - это PHP-вызываемое, рекомендованный способ его создания -
создать класс, реализующий MessageHandlerInterface
и имеющий метод __invoke()
, который имеет подсказки класса сообщения (или интерфейс сообщения):
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class SmsNotificationHandler
{
public function __invoke(SmsNotification $message)
{
// ... сделайте что-то - вроде отправки SMS!
}
}
Tip
Вы можете также использовать атрибут #[AsMessageHandler]
в отдельных методах
класса. Вы можете использовать атрибут в стольки методах одного класа, скольки
хотите, что позволяет вам группировать обработку нескольких связанных типов сообщений.
6.1
Поддержка для #[AsMessageHandler]
в методах была представлена в Symfony 6.1.
Благодаря автоконфигурации и подсказке SmsNotification
,
Symfony значет, что этот отбработчик должен быть вызван, когда запускается сообщение SmsNotification
.
В большинстве случаев, это все, что вам нужно будет сделать. Но вы можете также
сконфигурировать обработчики сообщений вручную . Чтобы
увидеть всех сконфигурированных обработчиков, выполните:
1
$ php bin/console debug:messenger
Запуск сообщения
Вы готовы! Для запуска сообщения (и вызова обработчика), внедрите сервис
messenger.default_bus
(через MessageBusInterface
), например, в контроллер:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// src/Controller/DefaultController.php
namespace App\Controller;
use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;
class DefaultController extends AbstractController
{
public function index(MessageBusInterface $bus)
{
// приведет к вызову SmsNotificationHandler
$bus->dispatch(new SmsNotification('Look! I created a message!'));
// ...
}
}
Транспорт: асинхронные сообщения/сообщения в очереди
По умолчанию, сообщения обрабатываются сразу же после запуска. Если вы хотите обработать сообщение асинхронно, вы можете сконфигурировать транспорт. Транспорт может отправлять сообщения (например, в систему очереди) и затем получать из через работника . Messenger поддерживает несколько транспортов .
Note
Если вы хотите использовать транспорт, который не поддерживается, посмотрите на транспорт Enqueue, который поддерживает штуки вроде Kafka и Google Pub/Sub.
Транспорт регистрируется с использованием "DSN". Благодаря рецепту Flex для Messenger,
ваш файл .env
уже имеет несколько примеров.
1 2 3
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
Раскомментируйте тот транспорт, который вы хотите (или установите его в .env.local
). См.
, чтобы узнать больше деталей.
Далее, в config/packages/messenger.yaml
, давайте определим транспорт под название async
,
использующий эту конфигурацию:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: "%env(MESSENGER_TRANSPORT_DSN)%"
# или расширено для конфигурации большего количества опций
#async:
# dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
# options: []
Маршрутизация сообщений к транспорту
Теперь, когда у вас есть сконфигурированный транспорт, вместо немедленно обработки сообщений, вы можете сконфигурировать их так, чтобы они были отправлены транспорту:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: "%env(MESSENGER_TRANSPORT_DSN)%"
routing:
# async - это то имя, которое вы дали вашему транспорту выше
'App\Message\SmsNotification': async
Благодаря этому, App\Message\SmsNotification
будет отправлен транспорту async
,
и его обработчик(и) не будут вызваны сразу же. Все сообщения, не совпавшие с
routing
будут обработаны немедленно.
Note
Вы можете использовать '*'
как класс сообщения. Он будет вести себя как
правило маршрутизации по умолчанию для любого сообщения, не совпадающего с
routing
. Это полезно, чтобы гарантировать, что ни одно сообщение не
обрабатывается синхронно по умолчанию.
Единственный недостаток в том, что '*'
также будет применяться к электронным
письмам, отправленным с Symfony Mailer (который использует SendEmailMessage
,
если доступен Messenger). Это может привести к проблемам, если ваши письма
не поддаются сериализации (например, если они содержат вложения файлов как
PHP источники/потоки).
Вы также можете маршрутизировать классы по их родительскому классу или интерфейсу. Или отправлять сообщения нескольким транспортам:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9
# config/packages/messenger.yaml
framework:
messenger:
routing:
# маршрутизируйте все сообщения, расширяющие этот пример базового класса или интерфейс
'App\Message\AbstractAsyncMessage': async
'App\Message\AsyncMessageInterface': async
'My\Message\ToBeSentToTwoSenders': [async, audit]
Note
Если вы сконфигурируете машрутизацию и для дочернего, и для родительского
класса, используются оба правила. Например, если у вас есть объект SmsNotification
,
расширяющийся из Notification
, будут использованы маршрутизации и для Notification
,
и для SmsNotification
.
Сущности Doctrine в сообщениях
Если вам нужно передать сущность в Doctrine в сообщении, лучше всего передать основной
ключ сущности (или любую релевантную информацию, необходимую обработчику, вроде email
,
и др.) вместо объекта:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;
class NewUserWelcomeEmail
{
private $userId;
public function __construct(int $userId)
{
$this->userId = $userId;
}
public function getUserId(): int
{
return $this->userId;
}
}
Затем, в вашем обработчике, вы можете сделать запрос свежего объекта:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;
use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class NewUserWelcomeEmailHandler
{
private $userRepository;
public function __construct(UserRepository $userRepository)
{
$this->userRepository = $userRepository;
}
public function __invoke(NewUserWelcomeEmail $welcomeEmail)
{
$user = $this->userRepository->find($welcomeEmail->getUserId());
// ... отправить электронное письмо!
}
}
Это гарантирует, что сущность содержит свежие данные.
Синхронная обработка сообщений
Если сообщение не совпадает ни с одним правилом маршрутизации , оно
не будет отправлено ни одному транспорту, и будет обработано немедленно. В некоторых случаях
(например, при связывании обработчиков с разными транспортами), легче и более гибко обработать
их ясно: создав транспорт sync
и "отправив" сообщения в него для немедленной обработки:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
# ... другие транспорты
sync: 'sync://'
routing:
App\Message\SmsNotification: sync
Создание вашего собственного транспорта
Вы также можете создать собственный транспорт, если вам нужно отправлять или получать сообщения из чего-то, что не поддерживается. См. Как создать ваш собственный транспорт сообщений.
Потребление сообщений (запуск работника)
Когда ваши сообщения были маршрутизированы, в большинстве случаев, вам нужно будет
"потребить" их. Вы можете сделать это с помощью команды messenger:consume
:
1 2 3 4
$ php bin/console messenger:consume async
# используйте -vv, чтобы увидеть детали того, что происходит
$ php bin/console messenger:consume async -vv
Первый аргумент - это имя получателя (или id сервиса, если вы маршрутизировали к пользовательскому сервису). По умолчанию, команда будет выполняться бесконечно: искать новые сообщения в вашем транспорте и обрабатывать их. Эта команда называется вашим "работником".
Tip
Чтобы правильно остановить работника, вызовите экземпляр StopWorkerException.
Развёртывание в производство
В производстве, есть несколько важных вещей, о которых стоит подумать:
- Используйте Супервизора, чтобы ваш(и) работник(и) работали
- Вы хотите, чтобы один или более "работников" работали все время. Чтобы сделать это, используйте систему контроля процесса вроде Супервизора .
- Не позволяйте работникам работать бесконечно
-
Некоторые сервисы (вроде
EntityManager
Doctrine) будут потреблять все больше памяти со временем. Поэтому, вместо того, чтобы позволять вашему работнику работать всегда, используйте флажок вродеmessenger:consume --limit=10
, чтобы указать работнику, что он должен обработать только 10 сообщений до прекращения работы (затем Супервизор создаст новый процесс). Также есть другие опции вроде--memory-limit=128M
и--time-limit=3600
. - Остановка работников, которые столкнулись с ошибками
-
Если зависимость работника, вроде вашего сервера БД не отвечает, или достигнут
тайм-аут, вы можете попробовать добавить логику повторного соединения ,
или просто остановить работника, если он получает слишком много ошибок, с помощью
опции
--failure-limit
командыmessenger:consume
. - Перезагружайте работников при развёртывании
-
Каждый раз при развёртывании вам нужно будет перезагрузить все процессы ваших работников,
чтобы они видели новозапущенный код. Чтобы сделать это, выполните
messenger:stop-workers
при запуске. Это сигнализирует каждому работнику, что он должен закончить обработку текущего сообщения и грациозно завершить работу. Затем, Супервизор создаст новые процессы работников. Команда использует кеш app внутренне - поэтому убедитесь в том, что он сконфигурирован для использования адаптера по вашему вкусу. - Используйте один кеш между запусками
-
Если ваша стратегия развёртывания заключается в создании новых целевых каталогов, вам
нужно установить значение опции конфигурации cache.prefix.seed ,
чтобы использовать одно и то же пространство имен кеша между запусками. Иначе, пул
cache.app
будет использовать значение параметраkernel.project_dir
в качестве базы для пространства имен, что приведет к разным пространствам имен каждый раз при запуске.
Приоритезированный транспорт
Иногда определенные типы сообщений должны иметь более высокий приоритет и быть обработаны до других. Чтобы сделать это возможным, вы можете создать несколько транспортов и маршрутизировать разные сообщения к ним. Например:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
# queue_name соответствует транспорту doctrine
queue_name: high
# для AMQP отправьте отдельный обмен, а затем поставьте в очередь
#exchange:
# name: high
#queues:
# messages_high: ~
# or redis try "group"
async_priority_low:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queue_name: low
routing:
'App\Message\SmsNotification': async_priority_low
'App\Message\NewUserWelcomeEmail': async_priority_high
Затем вы можете запускать отдельных работников для каждого транспорта, или инструктировать одного работника, чтобы он обрабатывал сообщения в порядке приоритетности:
1
$ php bin/console messenger:consume async_priority_high async_priority_low
Работник будет всегда вначале искать сообщения, ожидающие async_priority_high
.
Если таких нет, затем он будет потреблять сообщения из async_priority_low
.
Ограничьте потребление для конкретных очередей
Некоторый транспорт (особенно AMQP) имеет концепцию обмена и очередей. Транспорт Symfony всегда связан с обменом. По умолчанию, работник потребляет из всех очередей, соединенных с обменом указанного транспорта. Однако, есть примеры использования, когда вам нужно, чтобы работник потреблял только из конкретной очереди.
You can limit the worker to only process messages from specific queues:
1
$ php bin/console messenger:consume my_transport --queues=fasttrack
Чтобы позволить использование опции queues
, получатель должен реализовывать
QueueReceiverInterface.
Проверка количества сообщений в очереди для транспорта
Выполните команду messenger:stats
, чтобы узнать, сколько сообщений находятся в
"очередяз" некоторого или всех транспортов:
1 2 3 4 5
# отображает количество сообщений в очереди во всех транспортах
$ php bin/console messenger:stats
# отображает статистику только для некоторых транспортов
$ php bin/console messenger:stats my_transport_name other_transport_name
Note
Для того, чтобы эта команда работала, сконфигурированный получатель транспорта
должен реализовывать
MessageCountAwareInterface.
6.2
Команда messenger:stats
была представлена в Symfony 6.2.
Конфигурация супервизора
Супервизор - это отличный инструмент для гарантии того, что процесс(ы) ваших
работников всегда производятся (даже если он закрывается в связи с ошибкой,
достижением лимита сообщений или благодаря messenger:stop-workers
). Вы можете
установить его на Ubuntu, к примеру, через:
1
$ sudo apt-get install supervisor
Файлы конфигурации Супервизора обычно живут в каталоге /etc/supervisor/conf.d
.
Например, вы можете создать там новый файл messenger-worker.conf
, чтобы убедиться,
что 2 экземпляра messenger:consume
работают всегда:
1 2 3 4 5 6 7 8 9
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
Измените аргумент async
, чтобы он использовал название вашего транспорта
(или транспортов) и user
на Unix-пользователя на вашем сервере.
Caution
Во время развёртывания, что-то может быть не доступно (например, БД),
что приведет к ошибке запуска потребителя. В такой ситуации, Supervisor
попробует startretries
несколько раз, чтобы перезапустить команду.
Убедитесь в том, что изменили эту настройку, чтобы избежать получения
команды в ФАТАЛЬНОМ состоянии, после которого её невозможно будет
перезапустить.
Каждый перезапуск, Supervisor увеличивает задержку на 1 секунду. Например,
если значение - 10
, он подождёт 1 сек, 2 сек, 3 сек и т.д. Это даёт
сервису в общей сложности 55 секунд, чтобы снова стать доступным. Увеличьте
настройку startretries
, чтобы покрыть максимум ожидаемого времени простоя.
Если вы используете транспорт Redis, заметьте, что каждому работнику нужно
уникальное имя потребителя, чтобы избежать обработки одного сообщения несколькими
работникам. Один из способов достичь этого - установить переменную окружения в файле
конфигурации Супервизора, на которую вы потом можете сослаться в messenger.yaml
(см. раздел Redis выше):
1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
Далее, укажите Supervisor прочитать вашу конфигурацию и запустить ваших работников:
1 2 3 4 5
$ sudo supervisorctl reread
$ sudo supervisorctl update
$ sudo supervisorctl start messenger-consume:*
См. документацию Supervisor, чтобы узнать больше деталей.
Грациозное завершение работы
Если вы установили в своем проекте PHP-расширение PCNTL, работники будут
обрабатывать POSIX сигнал SIGTERM
для окончания обработки текущего сообщения
перед выходом.
В некоторых случаях, сигнал SIGTERM
отправляется самим Супервизором (например,
остановка контейнера Docker в котором Супервизор - точка входа). В таких случаях, вам
нужно добавить ключ stopwaitsecs
к конфигурации программы (со значением желаемого
периода острочки в секундах) для того, чтобы выполнить грациозное выключение:
1 2
[program:x]
stopwaitsecs=20
Конфигурация Systemd
Хотя Supervisor - это отличный инструмент, он имеет недостаток в том, что вам нужен доступ к системе, чтобы его запустить. Systemd стала стандартом в большинстве дистрибуций Linux, и имеет хорошую альтернативу под названием сервисы пользователя.
Файлы конфигурации сервисов пользователя Systemd обычно живут в каталоге
~/.config/systemd/user
. Например, вы можете создать новый файл messenger-worker.service
.
Или файл messenger-worker@.service
, если вы хотите, чтобы больше экземпляров
работали одновременно:
1 2 3 4 5 6 7 8 9 10
[Unit]
Description=Symfony messenger-consume %i
[Service]
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
Restart=always
RestartSec=30
[Install]
WantedBy=default.target
Теперь, укажите systemd включить и запустить одного работника:
1 2 3 4 5 6
$ systemctl --user enable messenger-worker@1.service
$ systemctl --user start messenger-worker@1.service
# to enable and start 20 workers
$ systemctl --user enable messenger-worker@{1..20}.service
$ systemctl --user start messenger-worker@{1..20}.service
Если вы измените ваш файл конфигурации сервиса, вам нужно перезагрузить daemon:
1
$ systemctl --user daemon-reload
Чтобы перезапустить всех ваших потребителей:
1
$ systemctl --user restart messenger-consume@*.service
Экземпляр пользователя systemd запускается только после первого входа в систему конкретным пользователем. Потребителю часто нужно запускаться при начальной загрузке системы. Включите lingering в пользователе, чтобы активировать это поведение:
1
$ loginctl enable-linger <your-username>
Логами управляет journald и с ними можно работать, используя команду journalctl:
1 2 3 4 5 6 7 8
# следить за логами потребителя номер 11
$ journalctl -f --user-unit messenger-consume@11.service
# следить за логами всех потребителей
$ journalctl -f --user-unit messenger-consume@*
# следить за всеми логами из ваших сервисов пользователя
$ journalctl -f _UID=$UID
См. документацию systemd, чтобы узнать больше.
Note
Вам нужно либо иметь повышенные привелегии для команды journalctl
, либо
добавить вашего пользователя в группу systemd-journal:
1
$ sudo usermod -a -G systemd-journal <your-username>
Работник без состояния
PHP создан быть без состояния, разные запросы не имеют общих источников. В HTTP контексте PHP очищает все перед отправкой ответа, поэтому вы можете решить не заботиться о сервисах, которые могут допускать утечку памяти.
С другой стороны, работники обычно работают в долгосрочных CLI-процессах, которые не заканчиваются после обработки сообщения. Поэтому вам нужно быть осторожными с состояниями сервисов, чтобы они не давали утечку информации и/или памяти из одного сообщения в другое.
Однако, некоторые сервисы Symfony, вроде обработчика fingers crossed Monolog,
допускают утечку по своей задумке. Symfony предоставляет функцию перезапуска сервиса,
чтобы решить эту проблему. При перезапуске контейнера автоматически между двумя сообщениями,
Symfony ищет любые сервисы, реализующие ResetInterface
(включая ваши собственные сервисы) и вызывает их метод reset()
, чтобы они могли очистить
своё внутренее состояние.
Если сервис не без состояния, и вы хотите перезапускать его свойства после каждого сообщения,
то сервис должен реализовывать ResetInterface, где вы
можете перезапустить свойства в методе reset()
.
Если вы не хотите перезапускать контейнер, добавьте опцию --no-reset
при
выполнении команды messenger:consume
.
6.1
В версиях Symfony до 6.1, сервис-контейнер не перезапускался автоматически
между сообщениями, и вам нужно было устанавливать опцию
framework.messenger.reset_on_message
как true
.
Повторные попытки и ошибки
Если во время потребления сообщения из транспорта будет вызвано исключение, оно будет автоматически повторно отправлено транспорту, чтобы попробовать снова. По умолчанию, сообщение имеет 3 попытки перед сбросом или отправкой транспорту ошибок . Каждая повторная попытка будет тажке отложена во времени, на случай, если она была вызвана временной проблемой. Все это можно сконфигурировать для каждого транспорта:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
# конфигурация по умолчанию
retry_strategy:
max_retries: 3
# задержка в милисекундах
delay: 1000
# делает так, чтобы задержка была дольше перед каждой повторной попыткой
# например, задержка в 1 секунду, 2 секунду, 4 секунду
multiplier: 2
max_delay: 0
# переопределить это все сервисом, который
# реализует Symfony\Component\Messenger\Retry\RetryStrategyInterface
# service: null
Tip
Symfony запускает WorkerMessageRetriedEvent, когда сообщение имеет повторные попытки, поэтому вы можете выполнить собственную логику.
Note
Благодаря SerializedMessageStamp, сериализованная форма сообщения сохраняется, что предупреждает его повторную сериализацию, если позже будет ещё одна попытка сообщения.
6.1
Класс SerializedMessageStamp
был представлен в Symfony 6.1.
Избегание повторных попыток
Иногда обработка сообщения может быть неудачной, и вы будете знать, что это перманентно и повторных попыток не нужно. Если вы вызовете UnrecoverableMessageHandlingException, сообщение не будет иметь повторных попыток.
Форсирование повторных попыток
Иногда обработка сообщения может быть неудачной, и вы будете знать, что это временно и необходима повторная попытка. Если вы вызовете RecoverableMessageHandlingException, сообщение всегда будет иметь повторную попытку.
Сохранения и повторные попытки неудачных сообщений
Если сообщение терпит неудачу и имеет несколько повторных попыток (max_retries
), оно
затем будет отбраковано. Чтобы избежать этого, вы можете сконфигурировать failure_transport
:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
# после повторных попыток, сообщения будут отправлены транспорту "failed"
failure_transport: failed
transports:
# ... другой транспорт
failed: 'doctrine://default?queue_name=failed'
В этом примере, если обработка сообщения терпит неудачу 3 раза (значение
по умолчанию max_retries
), оно затем будет отправлено транспорту failed
.
Хотя вы можете использовать messenger:consume failed
для потребления его, как
обычного транспорта, вам скорее захочется вручную просмотреть сообщения в транспорте
ошибок и решить об их повторных попытках:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
# увидеть все сообщения в транспорте ошибок
$ php bin/console messenger:failed:show
# увидеть первых 10 сообщений
$ php bin/console messenger:failed:show --max=10
# увидеть только сообщения MyClass
$ php bin/console messenger:failed:show --class-filter='MyClass'
# увидеть количество сообщений по классу сообщения
$ php bin/console messenger:failed:show --stats
# увидеть детали конкретной ошибки
$ php bin/console messenger:failed:show 20 -vv
# увидеть и повторно попробовать каждое сообщение по-отдельности
$ php bin/console messenger:failed:retry -vv
# повторная попытка конкретных сообщений
$ php bin/console messenger:failed:retry 20 30 --force
# удалить сообщение без повторной попытки
$ php bin/console messenger:failed:remove 20
# удалить сообщения без повторных попыток и показать каждое сообщение перед удалением
$ php bin/console messenger:failed:remove 20 30 --show-messages
6.2
Опции --class-filter
и --stats
были представлены в Symfony 6.2.
Если сообщение опять потерпит неудачу, оно будет отправлено обратно в транспорт ошибок в соответствии с обычными правилами повторных попыток . Как только будет достигнут максимум повторных попыток, сообщение будет сброшено перманентно.
Несколько ошибочных транспортов
Иногда недостаточно иметь один глобальный сконфигурированный failed transport
,
котому что некоторые сообщения важнее, чем другие. В таких случаях, вы можете переопределить
транспорт ошибок только для определенных транспортов:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# config/packages/messenger.yaml
framework:
messenger:
# после повторных попыток, сообщения будут отправлены транспорту "failed"
# по умолчанию, если внутри транспорта не сконфигурирован "failed_transport"
failure_transport: failed_default
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
failure_transport: failed_high_priority
# так как транспорт ошибок не сконфигурирован, будет использоваться установленный
# глобальный набор "failure_transport"
async_priority_low:
dsn: 'doctrine://default?queue_name=async_priority_low'
failed_default: 'doctrine://default?queue_name=failed_default'
failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'
Если нету определенного failure_transport
глобально или на уровне транспорта, сообщение
будет сброшено после определенного количества попыток.
Неудачные команды имеют необязательную опцию --transport
, чтобы указать
failure_transport
, сконфигурированный на уровне транспорта.
1 2 3 4 5 6 7 8
# увидеть все сообщения в транспорте "failure_transport"
$ php bin/console messenger:failed:show --transport=failure_transport
# повторная попытка конкретных сообщений из "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force
# удалить сообщение без повторной попытки из "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport
Конфигурация транспорта
Messenger поддерживает несколько разных типов транспорта, каждый со своими опциями. Опции могут быть переданы транспорту через строку DSN или конфигурацию.
1 2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8
# config/packages/messenger.yaml
framework:
messenger:
transports:
my_transport:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
auto_setup: false
Опции, определенные под options
, главенствуют над определенными в DSN.
Транспорт AMQP
Транспорт AMQP использует PHP-расширение AMQP для отправки сообщений в очередь вроде RabbitMQ. Установите его, выполнив:
1
$ composer require symfony/amqp-messenger
DSN транспорта AMQP может выглядеть так:
1 2 3 4 5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# или используйте протокол AMQPS
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages
Если вы хотите использовать AMQP, зашифрованный с помощью TLS/SSL, вы должны также
предоставить CA-сертификат. Определите путь сертификата в настройке PHP.ini amqp.cacert
(например, amqp.cacert = /etc/ssl/certs
) или в параметре DSN cacert
(например, amqps://localhost?cacert=/etc/ssl/certs/
).
Порт, по умолчанию используемый AMQP, зашифрованным с помощью TLS/SSL, - 5671, но
вы можете переопределить его в параметре DSN port
(например,
amqps://localhost?cacert=/etc/ssl/certs/&port=12345
).
Note
По умолчанию, транспорт будет автоматически создавать любые необходимые обмены, очереди и связующие ключи. Это можно отключить, но некоторые функции могут работать некорректно (вроде отложенных очередей).
Note
Вы можете ограничить потребителей транспорта AMQP, чтобы они обрабатывали только сообщения из некоторых очередей какого-то обмена. См. .
Транспорт имеет другие опции, включая способы конфигурации обмена, связующие ключи очереди и т.д. Смотрите документацию Connection.
Транспорт имеет ряд опций:
6.1
Опция connection_name
была представлена в Symfony 6.1.
Вы можете также сконфигурирвать настройки специально для AMQP в вашем оообщении, добавив AmqpStamp к вашему Конверту:
1 2 3 4 5 6 7
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...
$attributes = [];
$bus->dispatch(new SmsNotification(), [
new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);
Caution
Потребители не отображаются в панели админа, так как этот транспорт не полагается на
\AmqpQueue::consume()
, который блокирует. Наличие блокирующего получателя делает опции
--time-limit/--memory-limit
команды messenger:consume
. а также команды
messenger:stop-workers
бесполезными, так как они полагаются на тот факт, что получатель
возвращается незамедлительно, независимо от того, находит он сообщение или нет. Работник
потребления отвечает за итерацию до получения сообщения для обработки и/или до того, как будет
достигнуто одно из условий остановки. Таким образом, логика остановки работника может быть
достигнута, если он застрял на блокирующем вызове.
Транспорт Doctrine
Транспорт Doctrine может быть использован для хранения сообщений в таблице базы данных. Установите его, запустив:
1
$ composer require symfony/doctrine-messenger
DSN транспорта Doctrine может выглядеть так:
1 2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default
Формат doctrine://<connection_name>
, в случае если у вас есть несколько соединений и вы
хотите использовать какое-либо другое, чем "default". Транспорт будет автоматически создавать
таблицу под названием messenger_messages
.
Или, для создания таблицы самостоятельно, установите опцию auto_setup
как false
,
и сгенерируйте миграцию .
Tip
Чтобы избежать попыток инструментов вроде Doctrine Migrations удалить эту таблицу,
так как она не есть частью вашей обычной схемы, вы можете установить опцию schema_filter
:
- YAML
- XML
- PHP
1 2 3 4
# config/packages/doctrine.yaml
doctrine:
dbal:
schema_filter: '~^(?!messenger_messages)~'
Caution
Свойство datetime сообщений, хранящееся в базе данных, использует временную зону текущей системы. Это может вызвать проблемы, если несколько машин с разными конфигурациями временных зон используют одно хранилище.
Транспорт имеет такие опции:
????? | ???????? | ?? ????????? |
---|---|---|
table_name | ???????? ??????? | messenger_messages |
queue_name | ???????? ??????? (??????? ? ???????, ????? ???????????? ???? ??????? ??? ?????????? ???????????) | default |
redeliver_timeout | ????-??? ????? ????????? ???????? ????????? ? ???????, ?? ?? ? ????????? "?????????" (???? ???????? ?? ?????-?? ??????? ???????????, ?????????? ???, ? ????? ?? ?????? ????? ??????????? ?????????) - ? ????????. | 3600 |
auto_setup | ?????? ?? ??????? ???? ??????? ????????????? ?? ????? ????????/?????????. | true |
Note
Установите redeliver_timeout
в большее значение, чем длительность вашего
самого медленного сообщения. Иначе, некоторые сообщения будут запускаться
вторрой раз, пока первый всё ещё обрабатывается.
При использовании PostgreSQL, у вас есть доступ к следующим опциям для получения преимуществ функции LISTEN/NOTIFY. Это позволяет более производительный подход, чем поведение голосования транспорта Doctrine по умолчанию, так как PostgreSQL будет напрямую уведомлять работников, когда новое сообщение будет появляться в таблице.
????? | ???????? | ?? ????????? |
---|---|---|
use_notify | ???????????? ?? LISTEN/NOTIFY. | true |
check_delayed_interval | ???????? ???????? ?????????? ?????????, ? ????????????. ?????????? ??? 0, ????? ????????? ????????. | 1000 |
get_notify_timeout | ????????????????? ??????? ???????? ??????
??? ?????? PDO::pgsqlGetNotify` , ?
????????????. |
0 |
Транспорт Beanstalkd
Транспорт Beanstalkd отправляет сообщения прямо с рабочую очередь Beanstalkd. Установите его, выполнив:
1
$ composer require symfony/beanstalkd-messenger
DSN транспорта Beanstalkd может выглядеть так:
1 2 3 4 5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120
# Если порта нет, он по умолчанию будет 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost
Транспорт имеет ряд опций:
????? | ???????? | ?? ????????? |
---|---|---|
tube_name | ???????? ??????? | default |
timeout | ????-??? ??????? ????????? - ? ????????. | 0 (???????? ?????? ????? ?? ???? ??????? ?????, ???? ??????? TransportException) |
ttr | ????? ?????????? ????????? ????? ??? ??? ????????? ??? ??????? ? ??????? ?????????? - ? ????????. | 90 |
Транспорт Redis
Транспорт Redis использует потоки для создания очереди сообщений. Этот транспорт требует PHP-расширения Redis (>=4.3) и работающего сервера Redis (^5.0). Установите его, выполнив:
$ composer require symfony/redis-messenger
DSN транспорта Redis может выглядеть так:
1 2 3 4 5 6 7 8
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Полный пример DSN
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0&delete_after_ack=true
# Пример кластера Redis
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Пример Unix-сокета
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock
Некоторые опции могут быть сконфигурированы через DSN или ключ options
под транспортом в messenger.yaml
:
6.1
Опции persistent_id
, retry_interval
, read_timeout
, timeout
и
sentinel_master
были представлены в Symfony 6.1.
Caution
Никогда не должно быть более одной команды messenger:consume
выполняемой с одинаковой
комбинацией stream
, group
и consumer
, иначе сообщения могут быть обработаны
более, чем один раз. Если вы запускаете несколько работников очерели, consumer
может
быть установлен как переменная окружения (вроде %env(MESSENGER_CONSUMER_NAME)%
),
установленная Супервизором (пример ниже) или любым другим сервисом, используемым для
управления процессами работников.
В окружении контейнера, HOSTNAME
может быть использовано как имя потребителя,
так как там только один работник на контейнер/хост. Если вы используете Kubernetes для
управления контейнерами, рассмотрите использование StatefulSet
для стабилизации имен.
Tip
Установите delete_after_ack
как true
(если у вас одна группа) или
определите stream_max_entries
(если вы можете предположить, какое максимальное
количество записей допустимо в вашем случае), чтобы избежать утечек памяти.
В другом случае. все сообщения навсегда останутся в Redis.
Транспорт в памяти
Транспорт in-memory
на самом деле не доставляет сообщения. Вместо этого,
он дедржит их в памяти во время запроса, что может быть полезным для тестирования.
Например, если у вас есть транспорт async_priority_normal
, вы можете переопределить
его в окружении test
, чтобы использовать этот транспорт:
- YAML
- XML
- PHP
1 2 3 4 5
# config/packages/test/messenger.yaml
framework:
messenger:
transports:
async_priority_normal: 'in-memory://'
Тогда, во время тестирования, сообщения не будут отправлены реальному транспорту. Даже лучше, в тесте, вы можете проверить, чтобы только одно сообщение было отправлено во время запроса:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemoryTransport;
class DefaultControllerTest extends WebTestCase
{
public function testSomething()
{
$client = static::createClient();
// ...
$this->assertSame(200, $client->getResponse()->getStatusCode());
/* @var InMemoryTransport $transport */
$transport = self::$container->get('messenger.transport.async_priority_normal');
$this->assertCount(1, $transport->getSent());
}
}
Транспорт имеет ряд опций:
serialize
(булево, по умолчанию:false
)- Сериализовать сообщения или нет. Это полезно для тестирования дополнительного слоя, особенно когда вы используете собственный сериализатор сообщений.
Note
Все транспорты in-memory
будут автоматически сброшены после каждого теста
в классах тестов, расширяющих
KernelTestCase
или WebTestCase.
Amazon SQS
Транспорт Amazon SQS прекрасно подходит для приложения на AWS. Установите его, выполнив:
1
$ composer require symfony/amazon-sqs-messenger
DSN SQS транспорта выглядит так:
1 2 3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable
Note
Транспорт автоматически создаст необходимые очерди. Это можно отключить, установив
опцию auto_setup
как false
.
Tip
До отправки или получения сообщения, Symfony необходимо конвертировать название
очереди в URL очереди AWS вызвав API GetQueueUrl
в AWS. Этого дополнительного
API-вызова можно избежать, предоставив DSN, которая является URL очереди.
Транспорт имеет ряд опций:
????? | ???????? | ?? ????????? |
---|---|---|
access_key |
???? ??????? AWS | |
account |
????????????? AWS-???????? | ???????? ??????? ?????? |
auto_setup |
????? ?? ????????????? ???????? ??????? ?? ????? ????????/?????????. | true |
buffer_size |
?????????? ????????? ??? ???????????????? ?????????? | 9 |
debug |
???? true . ????? ??? ???? HTTP
???????? ? ??????? (??? ?????? ??
??????????????????) |
false |
endpoint |
?????????? URL ? SQS-??????? | https://sqs.eu-west-1.amazonaws.com |
poll_timeout |
????? ???????? ?????? ????????? ? ???????? | 0.1 |
queue_name |
???????? ??????? | messages |
region |
???????? AWS-??????? | eu-west-1 |
secret_key |
????????? ???? AWS | |
session_token |
????? ?????? AWS | |
visibility_timeout |
?????????? ??????, ?? ?????????? ??????? ????????? ?? ????? ??????? (Visibility Timeout) | ???????????? ??????? |
wait_time |
???????????? Long polling ? ???????? | 20 |
6.1
Опция session_token
была представлена в Symfony 6.1.
Note
Параметр wait_time
определяет максимальное время ожидания для Amazon SQS до
того, как сообщение будет доступно в очереди, перед отправкой ответа. Это помогает
снизить стоимость использования Amazon SQS, устранив некоторое количество пустых ответов.
Параметр poll_timeout
определяет время ожидания получателя до возвращения null.
Он избегает блокировки других получателей от вызова.
Note
Если название очереди имеет суффикс .fifo
, AWS создаст очередь FIFO.
Используйте марку AmazonSqsFifoStamp,
чтобы определить Message group ID
и Message deduplication ID
.
Очереди FIFO не поддерживают установки задержки отдельных сообдений, значение delay: 0
требуется в настройках стратегии повторных попыток.
Сериализация сообщений
Когда сообщения отправляются (и получаются) в транспорт, они сериализуются с
использование нативных функций PHP serialize()
и unserialize()
. Вы можете
изменить это глобально (или для каждого транспорта) на сервис, реализующий
SerializerInterface:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13
# config/packages/messenger.yaml
framework:
messenger:
serializer:
default_serializer: messenger.transport.symfony_serializer
symfony_serializer:
format: json
context: { }
transports:
async_priority_normal:
dsn: # ...
serializer: messenger.transport.symfony_serializer
messenger.transport.symfony_serializer
- это встроенный сервис, который использует
компонент Serializer и может быть сконфигурирован несколькими способами.
Если вы выберете использовать сериализатор Symfony, вы сможете контролировать контекст для
каждого случая отдельно через SerializerStamp
(см. Конверты и марки).
Tip
При отправке/получении сообщений в/из другого транспорта, вам может понадобиться больше контроля над процессом сериализации. Использование пользовательского сериализатора предоставляет такой контроль. См. Туториал по сериализации сообщений SymfonyCasts, чтобы узнать больше.
Настройка обработчиков
Конфигурация обработчиков с использованием атрибутов
Вы можете сконфигурировать вашего обработчика, передав опции атрибуту:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler(fromTransport: 'async', priority: 10)]
class SmsNotificationHandler
{
public function __invoke(SmsNotification $message)
{
// ...
}
}
Возможные опции для конфигурации атрибута:
bus
fromTransport
handles
method
priority
Конфигурация обработчиков вручную
Symfony обычно будет находить и регистрировать вашего обработчика автоматически .
Но вы также можете сконфигурировать его вручную - и передать ему дополнительную конфигурацию -
тегировав сервис обработчика messenger.message_handler
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10 11
# config/services.yaml
services:
App\MessageHandler\SmsNotificationHandler:
tags: [messenger.message_handler]
# или сконфигурировать с опциями
tags:
-
name: messenger.message_handler
# необходимо только если невозможно угадать по подсказке
handles: App\Message\SmsNotification
Возможные опции конфигурации с тегами:
bus
from_transport
handles
method
priority
Обработка нескольких сообщений
Один класс обработчика может обрабатывать несколько сообщений. Для этого,
добавьте атрибут #AsMessageHandler
ко всем методам обработки:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
class SmsNotificationHandler
{
#[AsMessageHandler]
public function handleSmsNotification(SmsNotification $message)
{
// ...
}
#[AsMessageHandler]
public function handleOtherSmsNotification(OtherSmsNotification $message)
{
// ...
}
}
6.2
Реализация MessageSubscriberInterface - это ещё один способ обработки нескольких сообщений с одним классом обработчика. Этот интерфейс устарел в Symfony 6.2.
Связывание обработчиков с разными транспортами
Каждое сообщение может иметь несколько обработчиков, и когда сообщение потребляется, вызываются все его обработчики. Но вы можете также сконфигурироать обработчика так, чтобы он вызывался только когда сообщение получено из конкретного транспорта. Это позволяет вам имет одно сообщение, где каждый обработчик вызывается разными "работниками", потребляющими разный транспорт.
Представьте, что у вас есть сообщение UploadedImage
с двумя обработчиками:
ThumbnailUploadedImageHandler
: вы хотите, чтобы это обрабатывалось транспортом под названиемimage_transport
NotifyAboutNewUploadedImageHandler
: вы хотите, чтобы это обрабатывалось транспортом под названиемasync_priority_normal
Чтобы сделать это, добавьте опцию from_transport
к каждому обработчику. Например:
1 2 3 4 5 6 7 8 9 10 11 12 13
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;
use App\Message\UploadedImage;
#[AsMessageHandler(from_transport: 'image_transport')]
class ThumbnailUploadedImageHandler
{
public function __invoke(UploadedImage $uploadedImage)
{
// создать миниатюры
}
}
И, похожим образом:
1 2 3 4 5 6 7 8
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...
#[AsMessageHandler(from_transport: 'async_priority_normal')]
class NotifyAboutNewUploadedImageHandler
{
// ...
}
Затем, убедитесь, что "маршрутизируете" ваше сообщение к обоим транспортам:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_normal: # ...
image_transport: # ...
routing:
# ...
'App\Message\UploadedImage': [image_transport, async_priority_normal]
Вот и все! Теперь вы можете потреблять каждый транспорт:
1 2 3 4
# вызовет ThumbnailUploadedImageHandler только при обработке сообщения
$ php bin/console messenger:consume image_transport -vv
$ php bin/console messenger:consume async_priority_normal -vv
Caution
Если обработчик не имеет конфигурации from_transport
, он будет выполнен
в каждом транспорте, из которого будет получено это сообщение.
Расширение Messenger
Конверты и марки
Сообщение может быть любым PHP-объектом. Иногда вам может понадобиться сконфигурировать что-то дополнительное в сообщении - вроде того, как оно должно быть обработано внутри AMQP или добавления задержки перед обработкой сообщения. Вы можете сделать это, добавив марку ("stamp") к вашему сообщению:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
public function index(MessageBusInterface $bus)
{
$bus->dispatch(new SmsNotification('...'), [
// подождать 5 секунд перед обработкой
new DelayStamp(5000),
]);
// или ясно создайте Конверт
$bus->dispatch(new Envelope(new SmsNotification('...'), [
new DelayStamp(5000),
]));
// ...
}
Внутренне, каждое сообщение оборачивается в конверт (Envelope
), содержащий
сообщение и марки. Вы можете создать его вручную или позволить автобусу сообщений
сделать это. Существует множество различных марок для разных целей и они используются
внутренне для отслеживания информации о сообщении - вроде того, какой автобус обрабатывает
его или имеет ли оно повторные попытки после неудачи.
Промежуточное ПО
То, что происходит после запуска сообщения в автобус сообщений, зависит от его набора промежуточного ПО и его порядка. По умолчанию, промежуточное ПО, сконфигурированное для каждого автобуса, выглядит так:
add_bus_name_stamp_middleware
- добавляет марку для записи того, в каком атобусе было запущено это собщение;dispatch_after_current_bus
- см. Транзакционные сообщение: обрабатывайте сообщения после того, как обработка закончена;failed_message_processing_middleware
- обрабатывает сообщения, которые имеют повторные попытки через транспорт ошибок , чтобы они правильно функционировали, как будто бы они были получены из изначального транспорта;- Ваша собственная коллекция middleware;
send_message
- если машрутизация сконфигурирована для транспорта, отправляет сообщения этому транспорту и останавливает цепь промежуточного ПО;handle_message
- вызывает обработчика(ов) сообщений для заданног сообщения.
Note
Эти названия промежуточного ПО - на самом деле сокращения. Настоящие id сервисов имеют
префикс messenger.middleware.
(например,messenger.middleware.handle_message
).
Промежуточное ПО выполняется после запуска сообщения, и также еще раз, когда сообщение получено через работника (для сообщений, которые были отправлены транспорту для асинхронной обработки). Помните это, если вы создаете собственное промежуточное ПО.
Вы можете добавить собственное промежуточное ПО в список, или полностью отключить промежуточное ПО по умолчанию и добавить только ваше собственное:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13
# config/packages/messenger.yaml
framework:
messenger:
buses:
messenger.bus.default:
# отключить промежуточное ПО по умолчанию
default_middleware: false
# и/или добавить ваше собственное
middleware:
# id серисов, релизующих Symfony\Component\Messenger\Middleware\MiddlewareInterface
- 'App\Middleware\MyMiddleware'
- 'App\Middleware\AnotherMiddleware'
Note
Если сервис промежуточного ПО абстрактный, будет создан другой экземпляр сервиса для каждого автобуса.
Промежуточное ПО для Doctrine
1.11
Следующее промежуточное по для Doctrine было представлено в DoctrineBundle 1.11.
Если вы в своем приложении используете Doctrine, существует ряд необязательного промежуточного ПО, которое вы можете захотеть использовать:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# config/packages/messenger.yaml
framework:
messenger:
buses:
command_bus:
middleware:
# каждый раз при обработке сообщения, соединение Doctrine
# "пингуется" и повторно подключается, если оно закрыто. Полезно,
# если ваши работники работают долгое время и соединение базы
# данных иногда теряется
- doctrine_ping_connection
# После обработки, соединение Doctrine закрывается, что может
# освободить соединения базы данных в работнике, вместо того,
# чтобы держать их открытыми всегда
- doctrine_close_connection
# оборачивает всех обработчиков в одну транзакцию Doctrine
# обработчикам не надо вызывать flush(), а ошибка в любом
# обработчике вызовет откат
- doctrine_transaction
# or pass a different entity manager to any
#- doctrine_transaction: ['custom']
Другое промежуточное ПО
Добавьте промежуточное ПО router_context
, если вам нужно генерировать абсолютные
URL в потребителе (например, отображать шаблон со ссылками). Это промежуточное ПО
хранит контекст изначального запроса (т.е. хост, HTTP-порт и т.д.), что необходимо
при создании абсолютных URL.
Добавьте промежуточное ПО validation
, если вам нужно валидировать объект сообщения,
используя компонент Validator, перед его обработкой. Если
валидация будет неуспешной, будет вызвано ValidationFailedException
.
ValidationStamp может быть использован
для конфигурации групп валидации.
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8
# config/packages/messenger.yaml
framework:
messenger:
buses:
command_bus:
middleware:
- router_context
- validation
События Messenger
В дополнение к промежуточному ПО, Messenger также запускает несколько событий. Вы можете создать слушателя событий, чтобы подключаться к разным частям процесса. Для каждого, класс события будет названием события:
Несколько автобусов, автобусов команд и событий
Messenger предоставляет вам один сервис автобуса сообщений по умолчанию. Но вы можете сконфигурировать столько, сколько вы хотите, создав автобусы "команд", "запросов" или "событий" и контролируя их промежуточное ПО. См. Несколько автобусов.