Laravel 4.2. Очереди / Queues

Категория: Laravel

Laravel позволяет из коробки использовать очереди (queues) с поддержкой таких сервисов, как Beanstalkd, IronMQ, Amazon SQS. Начиная с версии Laravel 4.2 появилась поддержка очередей на Redis. В целом, официальная документация по работе с очередями вполне понятная, но есть некоторые подводные камни, которые я и опишу здесь.

Добавление задания в очередь

В качестве задания (Job) может выступать анонимная функция или простой PHP класс с методом fire(), по умолчанию. Но вы также можете явно указать имя метода. Базовая структура Job-класса:

class MyJob
{
  public function fire(Illuminate\Queue\Jobs\Job $job, $data)
  {
      // ...
  }
}

Приведу несколько примеров добавления задания в очередь.

Добавить задание в очередь, для дефолтной структурой класса с методом fire():

Queue::push('MyJob', ['param' => 'value'], 'custom-queue-name');

Добавить в очередь задание на запуск метода exec():

Queue::push('MyJob@exec', ['param' => 'value']);

Описание задания в анонимной ф-ции:

Queue::push(function(Illuminate\Queue\Jobs\Job $job) {
  // (!) Не используйте константы __DIR__ и __FILE__ внутри замыкания

  $job->delete();
});

Добавление в очередь нескольких заданий с одним набором данных/параметров:

Queue::bulk(['SendEmailJob', 'NotifyUserJob'], ['param' => 'value']);

Добавление отложенного задания в очередь (добавить задание через 5 минут):

Queue::later(5*60, 'SendEmailJob@send', ['param' => 'value']);
Примечание

Если во время обработки задания возникнет исключение, задание будет помещено обратно в очередь.

Управление заданием

Объект класса Illuminate\Queue\Jobs\Job предоставляет методы для управления текущим заданием. Рассмотрим какие команды понимает задание.

Получить ID задания:

$id = $job->getJobId();

Переместить задание обратно в очередь через 45 секунд, release - уволить, освободить, отпустить:

$job->release($delay = 45);

Счетчик вызовов задания (сколько раз задание было запущено):

$num = $job->attempts();

Удалить задание, например после успешного выполнения:

$job->delete();
Примечание

Если вы не удалите задание после его успешного выполнения - оно будет выполняться снова и снова, пока вы вручную не удалите его из очереди.

Обработка заданий

Запустить одно следующее задание

Чтобы вручную выполнить очередное задание из очереди выполните:

php artisan queue:work

"Слушатель" очереди заданий

Listener каждый раз инициирует новый инстанс фреймворка (:work).

Для того, чтобы задания выполнялись, необходимо запустить слушателя:

php artisan queue:listen --timeout=360 --queue=default,high,low
Приоритет очередей

В параметре queue вы можете указать "прослушиваемые" очереди в порядке приоритета - сначала будут выполнятся задания из очереди default, потом high, и только после этого low.

Таймаут

Параметр timeout указывает максимально допустимое время выполнения задания. Если какое-то задание не будет выполнено за это время - то работа слушателя будет прервана с исключением (почему падает сам listener, вместо того чтобы прервать задание):

[Symfony\Component\Process\Exception\ProcessTimedOutException]                                                                     The process 'php artisan queue:work --queue="default" --delay=0 --memory=128 --sleep=3 --tries=0 --env=production' exceeded the timeout of 360 seconds.

Воркер в режиме демона

Воркер в режиме демона инициализирует приложение один раз и использует запущенный ранее инстанс (спасибо, @smgladkovskiy). Это несколько снижает нагрузку на CPU, но может повлечь выполнение задания с устаревшим телом/кодом. Поэтому нужно перезапускать демон после внесения изменений.

Для запуска воркера в режиме демона (проверка заданий в цикле) выполните:

php artisan queue:work --daemon

Контроль обработчика очереди

Для поддержки обработчика очередей (слушателя или демона воркера) в рабочем состоянии придется использовать сторонние утилиты, как supervisord или upstart (примеры использования).

Очереди на Redis

Очереди на Redis - это самый простой вариант использования очередей и запуска отложенных задач. Redis прост в установке и не прихотлив в обслуживании.

Примечание

Перед запуском следующего задания или слушателя проверьте ваши настройки Redis в файле app/config/database.php. При включенном параметре redis.cluster вы получите исключение: [Predis\NotSupportedException] Cannot initialize a MULTI/EXEC context when using aggregated connections.

По умолчанию дефолтная очередь называется default. Задания этой очереди хранятся в Redis хранилище как список (list) с ключем queues:default. Отложенные задания хранятся в упорядоченном списке queues:default:delayed.

queues:default            дефолтная очередь заданий
queues:default:reserved   задания которые сейчас выполняются
queues:default:delayed    отложенные задания

Приведу некоторые полезные команды консоли Redis для просмотра и очистки очереди.

Получить список очередей:

KEYS *queue*

Посмотреть задания, ожидающие запуска:

LRANGE queues:default 0 -1

Очистить очередь:

DEL queues:default

Посмотреть все задания отложенной очереди:

ZRANGE queues:default:delayed 0 -1

Очистить очередь отложенных заданий:

ZREMRANGEBYRANK queues:default:delayed 0 -1

Вы можете отправить команду Redis клиенту непосредственно из консоли (netcat выдает лишние данные):

echo -en "KEYS *queue*\r\n" | redis-cli
# или
(echo -en "KEYS *queue*\r\n"; sleep 0.1) | nc localhost 6379

Возможные ошибки

Ошибка: "[Predis\Connection\ConnectionException] Connection refused [tcp://127.0.0.1:6379]" указывает на то, что Redis сервер не запущен или неправильно указана порт (хост).

#queues, #workers, #tasks, #job, #daemon

категория: Laravel