
Thursday, December 2 2021

PostQueue: a queue system with Postgres

Essau Ramirez

@saumotions


Job queues are usually left to a system like Redis, SQS, or RabbitMQ: they deliver jobs at high scale to a pool of background workers listening for new work. But I recently came across an article about how you can use Postgres's atomic operations to acquire jobs and lock them so they're only sent once to the workers, so I decided to give this a try.

Queue API

First we need to understand the API we're building for distributed job queues. We will use disque as a reference. Disque is a project by the creator of Redis that tried to standardize the distributed queue system people use Redis for.
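As a rough sketch of that API surface, here is a minimal Python interface modeled on disque's core commands (ADDJOB, GETJOB, ACKJOB, NACK, WORKING). The method names and signatures are my own assumptions for this sketch, not disque's wire protocol; each one maps to one of the SQL statements developed below.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Queue(Protocol):
    """The queue API surface we want to build on Postgres."""

    def add_job(self, queue_name: str, data: Any) -> str:
        """Enqueue a job and return its id (disque's ADDJOB)."""
        ...

    def get_jobs(self, limit: int = 10) -> list:
        """Fetch a batch of jobs for a worker (GETJOB)."""
        ...

    def ack(self, job_id: str) -> None:
        """Mark a job as successfully processed (ACKJOB)."""
        ...

    def nack(self, job_id: str) -> None:
        """Send a job back to the queue for reprocessing (NACK)."""
        ...

    def working(self, job_id: str) -> None:
        """Extend a job's visibility while it is being processed (WORKING)."""
        ...
```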

Jobs Table

CREATE TABLE jobs (
    id text PRIMARY KEY,
    created timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
    name text,                                   -- job name, e.g. which handler to invoke
    visibility_at timestamp with time zone,      -- when the job becomes visible to workers
    status text NOT NULL,                        -- 'pending', 'in_transit' or 'completed'
    attempts integer NOT NULL DEFAULT 0,         -- delivery attempts so far
    expires_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP,  -- discard after this time
    data jsonb,                                  -- payload passed to the worker
    queue_name text DEFAULT 'default'::text,
    reference text
);

-- Indices -------------------------------------------------------

-- The PRIMARY KEY on id already creates a unique index named jobs_pkey,
-- so only the queue lookup needs its own index.
CREATE INDEX queue_name_idx ON jobs (queue_name);

The jobs table holds the name of the job, the data to be passed to the worker, a unique id, a visibility time (so jobs can be scheduled in the future), an expiration time, and an attempt counter capped at a maximum.

Retrieve

This is the way workers will retrieve jobs from a queue, and it’s very important that the same job doesn’t get sent to multiple workers.

Retrieval works by fetching and updating rows in a single statement, so that each batch of jobs is handed to exactly one worker.

update jobs
set visibility_at = now() + interval '10' minute,
    status = 'in_transit',
    attempts = attempts + 1
where id in (
    select id from jobs
    where (visibility_at <= now() or visibility_at is null)
      and (expires_at > now() or expires_at is null)
      and attempts <= 3
      and status != 'completed'
    order by created asc
    limit 10
    for update skip locked  -- skip rows another worker has already claimed
)
returning id, data as "jobData", queue_name as "queueName";

We make use of Postgres's RETURNING functionality, which hands back the listed columns for each row the update touched.

The update query sets the visibility_at column 10 minutes into the future, so that if the worker fails to process the job it becomes visible again and can be retried.

The where clause selects job ids whose visibility time is in the past (or unset), that are not marked completed, that haven't expired yet, and whose attempt count is still below the maximum.
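To make the retrieval flow concrete, here is a hedged Python sketch of a worker loop that runs this query, processes each returned job, and then acks or nacks it with the statements shown later in this post. `run_query` and `handle` are hypothetical stand-ins for a database wrapper and the actual job handler; they are not part of any real library.

```python
RETRIEVE_SQL = """
update jobs set visibility_at = now() + interval '10' minute,
    status = 'in_transit', attempts = attempts + 1
where id in (
    select id from jobs
    where (visibility_at <= now() or visibility_at is null)
      and (expires_at > now() or expires_at is null)
      and attempts <= 3 and status != 'completed'
    order by created asc limit 10
    for update skip locked)
returning id, data as "jobData", queue_name as "queueName"
"""


def work_once(run_query, handle):
    """Fetch one batch of jobs and process each, acking on success
    and nacking on failure so the job goes back to the queue."""
    jobs = run_query(RETRIEVE_SQL)
    for job_id, data, queue_name in jobs:
        try:
            handle(queue_name, data)
            # ack: the job was processed successfully
            run_query("update jobs set status = 'completed' where id = %s",
                      (job_id,))
        except Exception:
            # nack: make the job immediately visible again
            run_query("update jobs set status = 'pending', "
                      "visibility_at = now() where id = %s", (job_id,))
    return len(jobs)
```

A worker process would call `work_once` in a loop, sleeping briefly whenever it returns 0.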

Add Job

insert into jobs (id, queue_name, visibility_at, data, status, expires_at)
values ('uuid', 'default', now() + interval '3600' second,
        '{"message_id": 123}', 'pending', now() + interval '3600' second);
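In practice the literal values above would come from the application. Here is a small hedged Python helper that generates the id and binds the placeholders; the `%s` parameter style follows psycopg and `run_query` is again a hypothetical wrapper around a database cursor, not a real API.

```python
import json
import uuid

# Delay and TTL are passed in seconds and multiplied into an interval.
ADD_JOB_SQL = (
    "insert into jobs (id, queue_name, visibility_at, data, status, expires_at) "
    "values (%s, %s, now() + interval '1 second' * %s, %s, 'pending', "
    "now() + interval '1 second' * %s)"
)


def add_job(run_query, queue_name, data, delay_seconds=0, ttl_seconds=3600):
    """Insert a job, generating its id and serializing the payload."""
    job_id = str(uuid.uuid4())
    run_query(ADD_JOB_SQL,
              (job_id, queue_name, delay_seconds, json.dumps(data), ttl_seconds))
    return job_id
```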

ACK

The ack command is used to acknowledge that a job was run by a worker. Given an id fetched during retrieval, we mark the job as completed:

update jobs set status = 'completed'
where id = 'uuid';

NACK

The nack command is used to negatively acknowledge a job: it sends the job back to the queue for processing when a worker finds out it could not process it.

update jobs set status = 'pending', visibility_at = now()
where id = 'uuid';

Working

If a job takes longer than expected to process, you can increase the visibility time so that it doesn’t get sent back to the queue.

update jobs set status = 'pending', visibility_at = now() + interval '600' second
where id = 'uuid';
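A worker handling a long job would run that update on a timer while the handler is busy. Here is a hedged Python sketch of such a heartbeat, using the article's working query; `run_query` is once more a hypothetical database wrapper.

```python
import threading

# The working query from above, with the id bound as a parameter.
WORKING_SQL = ("update jobs set status = 'pending', "
               "visibility_at = now() + interval '600' second where id = %s")


def with_heartbeat(run_query, job_id, handle, interval_seconds=300):
    """Run `handle`, extending the job's visibility every
    `interval_seconds` until it finishes."""
    stop = threading.Event()

    def beat():
        # wait() returns False on timeout, True once stop is set
        while not stop.wait(interval_seconds):
            run_query(WORKING_SQL, (job_id,))

    t = threading.Thread(target=beat, daemon=True)
    t.start()
    try:
        return handle()
    finally:
        stop.set()
        t.join()
```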