Laravel 4.2. Очереди / Queues
Laravel позволяет из коробки использовать очереди (queues) с поддержкой таких сервисов, как Beanstalkd, IronMQ, Amazon SQS. Начиная с версии Laravel 4.2 появилась поддержка очередей на Redis. В целом, официальная документация по работе с очередями вполне понятная, но есть некоторые подводные камни, которые я и опишу здесь.
Добавление задания в очередь
В качестве задания (Job) может выступать анонимная функция или простой PHP класс с методом fire(), по умолчанию. Но вы также можете явно указать имя метода. Базовая структура Job-класса:
class MyJob
{
public function fire(Illuminate\Queue\Jobs\Job $job, $data)
{
// ...
}
}
Приведу несколько примеров добавления задания в очередь.
Добавить задание в очередь, для дефолтной структурой класса с методом fire():
Queue::push('MyJob', ['param' => 'value'], 'custom-queue-name');
Добавить в очередь задание на запуск метода exec():
Queue::push('MyJob@exec', ['param' => 'value']);
Описание задания в анонимной ф-ции:
Queue::push(function(Illuminate\Queue\Jobs\Job $job) {
// (!) Не используйте константы __DIR__ и __FILE__ внутри замыкания
$job->delete();
});
Добавление в очередь нескольких заданий с одним набором данных/параметров:
Queue::bulk(['SendEmailJob', 'NotifyUserJob'], ['param' => 'value']);
Добавление отложенного задания в очередь (добавить задание через 5 минут):
Queue::later(5*60, 'SendEmailJob@send', ['param' => 'value']);
Примечание
Если во время обработки задания возникнет исключение, задание будет помещено обратно в очередь.
Управление заданием
Объект класса Illuminate\Queue\Jobs\Job предоставляет методы для управления текущим заданием. Рассмотрим какие команды понимает задание.
Получить ID задания:
$id = $job->getJobId();
Переместить задание обратно в очередь через 45 секунд, release - уволить, освободить, отпустить:
$job->release($delay = 45);
Счетчик вызовов задания (сколько раз задание было запущено):
$num = $job->attempts();
Удалить задание, например после успешного выполнения:
$job->delete();
Примечание
Если вы не удалите задание после его успешного выполнения - оно будет выполняться снова и снова, пока вы вручную не удалите его из очереди.
Обработка заданий
Запустить одно следующее задание
Чтобы вручную выполнить очередное задание из очереди выполните:
php artisan queue:work
"Слушатель" очереди заданий
Listener каждый раз инициирует новый инстанс фреймворка (:work).
Для того, чтобы задания выполнялись, необходимо запустить слушателя:
php artisan queue:listen --timeout=360 --queue=default,high,low
Приоритет очередей
В параметре queue вы можете указать "прослушиваемые" очереди в порядке приоритета - сначала будут выполнятся задания из очереди default, потом high, и только после этого low.
Таймаут
Параметр timeout указывает максимально допустимое время выполнения задания. Если какое-то задание не будет выполнено за это время - то работа слушателя будет прервана с исключением (почему падает сам listener, вместо того чтобы прервать задание):
[Symfony\Component\Process\Exception\ProcessTimedOutException] The process 'php artisan queue:work --queue="default" --delay=0 --memory=128 --sleep=3 --tries=0 --env=production' exceeded the timeout of 360 seconds.
Воркер в режиме демона
Воркер в режиме демона инициализирует приложение один раз и использует запущенный ранее инстанс (спасибо, @smgladkovskiy). Это несколько снижает нагрузку на CPU, но может повлечь выполнение задания с устаревшим телом/кодом. Поэтому нужно перезапускать демон после внесения изменений.
Для запуска воркера в режиме демона (проверка заданий в цикле) выполните:
php artisan queue:work --daemon
Контроль обработчика очереди
Для поддержки обработчика очередей (слушателя или демона воркера) в рабочем состоянии придется использовать сторонние утилиты, как supervisord или upstart (примеры использования).
Очереди на Redis
Очереди на Redis - это самый простой вариант использования очередей и запуска отложенных задач. Redis прост в установке и не прихотлив в обслуживании.
Примечание
Перед запуском следующего задания или слушателя проверьте ваши настройки Redis в файле app/config/database.php. При включенном параметре redis.cluster вы получите исключение: [Predis\NotSupportedException] Cannot initialize a MULTI/EXEC context when using aggregated connections.
По умолчанию дефолтная очередь называется default. Задания этой очереди хранятся в Redis хранилище как список (list) с ключем queues:default. Отложенные задания хранятся в упорядоченном списке queues:default:delayed.
queues:default дефолтная очередь заданий
queues:default:reserved задания которые сейчас выполняются
queues:default:delayed отложенные задания
Приведу некоторые полезные команды консоли Redis для просмотра и очистки очереди.
Получить список очередей:
KEYS *queue*
Посмотреть задания, ожидающие запуска:
LRANGE queues:default 0 -1
Очистить очередь:
DEL queues:default
Посмотреть все задания отложенной очереди:
ZRANGE queues:default:delayed 0 -1
Очистить очередь отложенных заданий:
ZREMRANGEBYRANK queues:default:delayed 0 -1
Вы можете отправить команду Redis клиенту непосредственно из консоли (netcat выдает лишние данные):
echo -en "KEYS *queue*\r\n" | redis-cli
# или
(echo -en "KEYS *queue*\r\n"; sleep 0.1) | nc localhost 6379
Возможные ошибки
Ошибка: "[Predis\Connection\ConnectionException] Connection refused [tcp://127.0.0.1:6379]" указывает на то, что Redis сервер не запущен или неправильно указана порт (хост).
#queues, #workers, #tasks, #job, #daemon