(Cake)PHP queuing: Real-life examples

With the new major v6 of the Queue plugin, it is time to update my 8-year-old blog post about the Queue system.
A lot has happened since Cake1/2.

With CakePHP 4 and modern PHP architecture, we want to look into the queuing system again with some real-life examples.

Prerequisite

Make sure you are familiar with the Queue plugin: ideally you have already tried out some basic use cases, or at least checked out its documentation.

Emails

Registration or contact forms, password change links, or plain notifications: almost every app has plenty of reasons to send email asynchronously.
Async or deferred, because you do not want to slow down the web request itself, or escalate a failed delivery to that address as an exception to the user.
In many setups SMTP delivery also fails intermittently, so having the option of an automatic retry after a short delay is quite useful.
Any failed attempt is logged, and the job stays in the queue until it is either resolved (by a retry) or removed (e.g. because the email address is invalid).

Here is a default example using plain text:

$data = [
    'settings' => [
        'to' => [$email, $username],
        'from' => Configure::read('Config.systemEmail'),
        'subject' => $this->request->getData('subject'),
    ],
    'content' => $this->request->getData('message'),
];
$this->loadModel('Queue.QueuedJobs');
$config = [
    'priority' => 2,
];
$this->QueuedJobs->createJob('Queue.Email', $data, $config);

This can be executed directly in the model layer, e.g. in a UsersTable::afterSave() callback, or more manually from a controller.

Basically: all email sending (and with it any SMTP gateway call) should be handled through deferred execution.

Webhook triggers

Often, apps are connected to external systems to either send or receive webhooks or API calls.
An incoming webhook in my case is, for example, a new commit, branch, or PR in a GitHub repository.
The webhook points to an endpoint in my app, which receives these events with a payload that contains all the details.

Now, you do not want to process these directly inside the same controller action, for several reasons.
There could be an exception, and it is then hard to trace what failed, as the info is hidden deep inside the GitHub repo settings.
It can also fail or time out in cases where a simple retry would succeed.
So instead, you want to offload this into the queue system, respond right away with a 200 OK, and let the queue do the rest of the task.

// Inside the controller action
$data = [
    'commit' => $commitEntity->id,
];
$options = [
    'reference' => $reference, // To identify from frontend
    'priority' => 3,
];
$this->QueuedJobs->createJob(
    'Ci',
    $data,
    $options
);

Now the background system will run any such Ci job without the usual web request time limit (typically 30 seconds), and with much more memory available if needed.
At the end of the job, it will usually trigger an API call to GitHub to mark that commit as green or red.

References

As seen in the previous example, it can be useful to identify a job from the frontend. You can display its status or progress, or allow retriggering if it failed for some reason.
In my case, some of those long-running "parsing" or "composer update" runs sometimes fail, but go through fine when restarted later.

$queuedJobs = $this->QueuedJobs->find()
    ->select(['reference', 'created'])
    ->where([
        'reference LIKE' => 'translations-parse',
        'completed IS' => null,
    ])
    ->find('list', ['keyField' => 'reference', 'valueField' => 'created'])
    ->toArray();

Now we can add the status report and restart buttons for it in the frontend.

In other cases you want to avoid adding the same job if such a task has already been added.

if ($this->QueuedJobs->isQueued($reference)) {
    $this->Flash->warning('Already queued. Please wait for it to finish.');

    return $this->redirect($this->referer(['action' => 'index']));
}

Tip: You can also use progress bars for jobs that run for e.g. minutes, to give a bit more visual feedback on the progress.
You can use the progress updating of a task class if that task can provide that info. Otherwise, it will use historical data by default (how long the same task took on average the last times it ran).

You can see this in action in the sandbox.
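
To make the historical fallback a bit more concrete, here is a minimal, framework-free sketch of how such an estimate could be computed. The function name `estimateProgress()` and its inputs are assumptions for illustration only; this is not the plugin's actual implementation.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: estimates the progress (0.0 to 0.99) of a running job
 * from the durations (in seconds) of earlier runs of the same task.
 *
 * @param int $elapsedSeconds Seconds since a worker picked up the job.
 * @param array<int> $previousDurations Durations of past completed runs.
 * @return float|null Null if there is no usable history for an estimate.
 */
function estimateProgress(int $elapsedSeconds, array $previousDurations): ?float
{
    if ($previousDurations === []) {
        return null;
    }

    $average = array_sum($previousDurations) / count($previousDurations);
    if ($average <= 0) {
        return null;
    }

    // Cap below 1.0: only the job itself knows when it is really done.
    return min(0.99, max(0.0, $elapsedSeconds / $average));
}
```

With two earlier runs of 60 seconds each, a job that has been running for 30 seconds would be shown as roughly half done. A task that knows its real progress should report that instead, since an average is only a guess.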

Command line execution

In some cases you want to execute some low-level server command in the CLI, or run some Cake command/shell script.

    $this->QueuedJobs->createJob(
        'Queue.Execute',
        [
            'command' => 'bin/cake translations publish',
            'username' => $this->request->getSession()->read('Auth.User.username'),
        ],
        ['reference' => 'translations-parse']
    );

You could of course refactor any such CLI command into a Queue task. But if it only exists as a script so far, this usually works as a good workaround to begin with.

Tip: For new code, always try to put as much of it as possible into business classes (outside of the communication layer), so it can be reused more easily.
A service class is a good way to do this, as it can be called from different entrypoints such as Queue tasks or CLI commands.
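
As a rough sketch of that layering, the following framework-free example keeps the logic in one service and lets two thin entrypoints delegate to it. All class names are hypothetical, and the task and command here are plain classes that only mimic the shape of a Queue task and a CLI command.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical service: business logic only, no CLI or Queue specifics.
 */
final class TranslationPublisher
{
    /**
     * @param array<string, string> $translations Message ID => translated text.
     * @return int Number of non-empty translations that would be published.
     */
    public function publish(array $translations): int
    {
        $publishable = array_filter($translations, static function (string $text): bool {
            return trim($text) !== '';
        });

        // A real implementation would write the locale files here.
        return count($publishable);
    }
}

/**
 * Thin entrypoint shaped like a Queue task: unpack the payload, delegate.
 */
final class PublishTranslationsTask
{
    private TranslationPublisher $publisher;

    public function __construct(TranslationPublisher $publisher)
    {
        $this->publisher = $publisher;
    }

    public function run(array $data, int $jobId): void
    {
        $this->publisher->publish($data['translations'] ?? []);
    }
}

/**
 * Thin entrypoint shaped like a CLI command: delegate, map result to exit code.
 */
final class PublishTranslationsCommand
{
    private TranslationPublisher $publisher;

    public function __construct(TranslationPublisher $publisher)
    {
        $this->publisher = $publisher;
    }

    public function execute(array $translations): int
    {
        return $this->publisher->publish($translations) > 0 ? 0 : 1;
    }
}
```

Because neither entrypoint contains logic of its own, the service can be unit tested without booting a worker or a shell.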

Chaining

Often, several jobs need to run in a specific order, or should only be triggered after a previous job has a specific outcome.
Here it is best to let the job spawn a follow-up job at the end.
So let’s say we release a repo on GitHub using such a job. Once it is tagged, we would like to import it into our system and document all its public API.

public function run(array $data, int $jobId): void {
    ...

    // Now we can start the follow-up job
    $data = [
        'repository' => $repository,
        'tag' => $tag,
    ];

    $this->QueuedJobs->createJob('Import', $data);
}

This chain can be as long as you need. But be mindful not to increase complexity over time to a point where the code or flow becomes unmaintainable.

For example: If you have an API documentation task, you can automatically trigger that update from within such an Import job at the very end of it.
This way, all parts of your system are always up to date, and updates are triggered whenever something changes.

Use DTOs

Using the CakeDto plugin, you can make your code much more reliable, testable, and developer-friendly.

Set up a DTO per task in your dto.xml, e.g.:

<dto name="OrderUpdateNotificationQueueData" immutable="true">
    <field name="orderId" type="int" required="true"/>
    <field name="type" type="string" required="true"/>
    ...
</dto>

Instead of passing a plain array you can now rely on a clean API for input:

$data = OrderUpdateNotificationQueueDataDto::createFromArray([
    'orderId' => $order->id,
    'type' => 'orderConfirmationToCustomer',
])->toArray();
$this->getTableLocator()->get('Queue.QueuedJobs')
    ->createJob('OrderUpdateNotification', $data);

Any required field that is not provided, and any field that is not defined in the DTO, will throw a clear exception.

The same applies to the counterpart within the task:

public function run(array $data, int $jobId): void {
    $queueData = OrderUpdateNotificationQueueDataDto::createFromArray($data);

    $order = $this->fetchTable('Orders')
        ->get($queueData->getOrderId(), contain: ['OrderItems']);
    $this->getMailer('OrderConfirmation')
        ->send($queueData->getType(), [$order]);
}

PHPStan, together with tests, can now fully check and assert the necessary data.
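
To illustrate the kind of validation this gives you, here is a hand-written stand-in for such a DTO. It is not the code CakeDto generates, and the class name is hypothetical. It only mimics the behavior described above: required fields must be present, unknown fields are rejected, and the values are read-only.

```php
<?php
declare(strict_types=1);

/**
 * Hand-written illustration only; NOT the generated CakeDto class.
 */
final class OrderUpdateNotificationData
{
    private int $orderId;
    private string $type;

    private function __construct(int $orderId, string $type)
    {
        $this->orderId = $orderId;
        $this->type = $type;
    }

    /**
     * @param array<string, mixed> $data
     * @throws \InvalidArgumentException On unknown or missing fields.
     */
    public static function createFromArray(array $data): self
    {
        $allowed = ['orderId', 'type'];

        $unknown = array_diff(array_keys($data), $allowed);
        if ($unknown !== []) {
            throw new InvalidArgumentException('Unknown field(s): ' . implode(', ', $unknown));
        }

        $missing = array_diff($allowed, array_keys($data));
        if ($missing !== []) {
            throw new InvalidArgumentException('Missing required field(s): ' . implode(', ', $missing));
        }

        return new self((int)$data['orderId'], (string)$data['type']);
    }

    public function getOrderId(): int
    {
        return $this->orderId;
    }

    public function getType(): string
    {
        return $this->type;
    }

    /**
     * @return array<string, mixed> Plain array, safe to store as job payload.
     */
    public function toArray(): array
    {
        return ['orderId' => $this->orderId, 'type' => $this->type];
    }
}
```

A typo such as `'orderID'` in the payload now fails loudly when the job is created, instead of surfacing minutes later inside a worker.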

State machine

If you find yourself building too much logic around all of it, then you might actually be in need of a state machine, e.g. the Spryker one.
Here we can actually leverage a sophisticated system of transitions between states to make path decisions and trigger events, and therefore tasks, on the way to a final state.

This is especially useful if you have long-running jobs that can sometimes fail and need a re-run, or if you have multiple paths to reach the next state, either automated or manual for some edge cases.

In my example, there is a task that creates the "composer update" changes for the demo shop.
But in some cases the tooling cannot do this (blocked by constraints) and manual resolution is required. Having the option to bypass the automated run is helpful then.
All of that can be coded and graphically displayed more easily with such a state machine approach.

Here, a task triggers the state machine after a successful run:

public function run(array $data, int $jobId): void {
    ... // Can take a few minutes

    $stateMachineFacade = new StateMachineFacade();
    $itemDto = new ItemDto();
    $itemDto->setIdentifier($releaseGroup->id);
    $itemDto->setStateMachineName(ReleaseStateMachineHandler::NAME);
    $itemDto->setStateName($releaseGroup->release_state->state);
    $itemDto->setProcessName($releaseGroup->release_state->process);

    $stateMachineFacade->triggerEvent('confirm update', $itemDto);
}

This will then trigger other commands and conditions again, which themselves might trigger more background jobs along the line.

In the end, the whole state machine process is just a series of background tasks triggering the next task(s) to be done.
Zero manual work is needed here for the default (happy) case, where nothing goes wrong.
So combining the Queue with a state machine is a very powerful tool. For details, see also my talk about it from last year.

Here is the more concrete state machine I am talking about:

You can see that, based on a timeout (e.g. 1 min), the same task is executed over and over again until it confirms what we need, and then it continues.
There is an optional path configured to manually override the loop if you don’t need or want to wait.
But in the default (happy) case, it will fully automatically transition from beginning to end.
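
The loop and its manual bypass can be sketched as a plain transition table. The state and event names below are hypothetical, and a real state machine (such as the Spryker one) is configured quite differently; this only illustrates the flow described above.

```php
<?php
declare(strict_types=1);

// Hypothetical transition table: state => [event => next state].
const TRANSITIONS = [
    'waiting for update' => [
        'check passed' => 'update confirmed',    // automated happy path
        'check pending' => 'waiting for update', // timeout fired, loop again
        'skip manually' => 'update confirmed',   // manual override of the loop
    ],
    'update confirmed' => [
        'release' => 'released',
    ],
];

/**
 * Returns the next state, or throws if the event is not allowed right now.
 */
function applyEvent(string $state, string $event): string
{
    $next = TRANSITIONS[$state][$event] ?? null;
    if ($next === null) {
        throw new LogicException("Event '{$event}' is not allowed in state '{$state}'.");
    }

    return $next;
}
```

Each timeout would queue the checking task again, and the task's result decides whether `check pending` or `check passed` is fired.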

Is there more?

Yeah, I bet there are tons of other cases: image processing, video rendering, PDF or Zip package creation, …

Also, a pure PHP implementation has its limitations. Once you need a more high-end or sophisticated approach, you might want to look into alternatives.
For me, and for small to medium-sized apps, this Queue solution has worked perfectly and with ease so far.

I hope this gives some good insight into a tool like the Queue and its benefits for pretty much every app out there.
Happy coding!
