How/Why to Sweep Async Tasks Under a Postgres Table

I love thin, dumb servers, where each endpoint wraps a very dumb DB query.

Dumb queries are fast. Fast queries make websites smooth and snappy. Keep those click/render loops sacred.

Sweep complexity under a task table:

router.post("/signup", async ctx => {
  const { email, password } = await ctx.request.body().value;
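  // Both inserts run in one statement, so the user row and its task commit together.
  // The task params are built in SQL from the new row (crypt/gen_salt need pgcrypto).
  const [{ usr_id } = { usr_id: null }] = await sql`
    with usr_ as (
      insert into usr (email, password)
      values (${email}, crypt(${password}, gen_salt('bf')))
      returning usr_id, email
    ), task_ as (
      insert into task (task_type, params)
      select 'SEND_EMAIL_WELCOME', jsonb_build_object('usr_id', usr_id, 'email', email)
      from usr_
    )
    select usr_id from usr_
  `;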
  await ctx.cookies.set("usr_id", usr_id);
  ctx.response.status = 204;
});

Of course, calling mailgun.send is easier than queuing a row in a task table. Adding indirection rarely makes a system less complex. But somehow I’m here to advocate exactly that. You can ignore my manifesto and skip to my implementation at the end.

Latent Error Surface Area

Customers don’t care about cosmic rays. They want a thing. More importantly, they want immediate confirmation of their thing. They want to offload the mental burden of their goal.

For them to hand over that responsibility, your DB is probably the only thing that matters. Once the information is committed to your database, you can confidently say “we’ll take it from here”.

You can send the email later. You can process the payment later. You can do almost anything afterward. Simply let your customer know they can continue with their day.

Delight your customers with clear feedback.

Make your computer happy by writing to one place at a time.

Never Handroll Your Own Two-Phase Commit

It is a sin to write to two places at the “same time”.

When the gods gave us computer storage, people became sad. “What is stability? Where are our guarantees? Why must I fsync?” they cried. And so they remained in their coding caves, wearing sackcloth and ashes, for many years.

People were overjoyed when the gods wrote Postgres (and other inferior databases) on tablets of stone. The sacred “database transactions” allowed mankind to pretend they could read/write to multiple locations at the same time.

To this day, databases sometimes work.

But some developers deny the works of the gods. They mix multiple tools, and thus commit the sin of writing to multiple places.

“Oh, we’ll just send a pubsub message after inserting the row.” But data is lost. Message before inserting the row? Data lost. All blasphemers are doomed to reinvent two-phase commit.
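
To make the failure concrete, here is a toy sketch in TypeScript. Nothing in it is real infrastructure: TinyDb, the broker array, and the crashBetween flag are all made up for illustration, and a thrown error stands in for a process dying at the worst possible moment.

```typescript
// In-memory stand-in for a database with all-or-nothing transactions (hypothetical).
type State = { usr: string[]; task: string[] };

class TinyDb {
  state: State = { usr: [], task: [] };

  // Run `fn` against a private copy; publish the copy only if `fn` returns normally.
  transaction(fn: (draft: State) => void): void {
    const draft: State = { usr: [...this.state.usr], task: [...this.state.task] };
    fn(draft); // a throw here skips the commit below
    this.state = draft;
  }
}

// Dual write: the row goes to the database, the message goes to a separate broker.
function signupDualWrite(
  db: TinyDb,
  broker: string[],
  email: string,
  crashBetween: boolean,
): void {
  db.transaction((draft) => {
    draft.usr.push(email);
  });
  if (crashBetween) throw new Error("process died between the two writes");
  broker.push(`SEND_EMAIL_WELCOME:${email}`);
}

// Single write: the row and its task commit (or vanish) together.
function signupOneTransaction(db: TinyDb, email: string, crashBetween: boolean): void {
  db.transaction((draft) => {
    draft.usr.push(email);
    if (crashBetween) throw new Error("process died mid-transaction");
    draft.task.push(`SEND_EMAIL_WELCOME:${email}`);
  });
}
```

With the dual write, a crash leaves a user row and no welcome message. With one transaction, the same crash leaves nothing behind, so the client can simply try again.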

One Way of Doing Things

I like Lego. I love Play-Doh. I love Lincoln Logs. However, I don’t like to mix them together.

It is painful to audit a system when state is spread across SQS, Redis, PubSub, Celery, Airflow, etc. I shouldn’t have to open a local detective agency to find out why a process isn’t running as expected.

Most modern projects use SQL. Because I don’t like mixing systems, I try to take SQL as far as possible.

Of all the SQL databases, Postgres currently offers the best mix of modern first-class features and third-party extensions. Postgres can be your knock-off Kafka, artificial Airflow, cut-rate ClickHouse, dirty ElasticSearch, poor man’s PubSub, on-sale Celery, etc.

Of course, Postgres doesn’t have all the fancy features of every specialized system. But colocating queue/pipeline/async data in your main database eliminates many errors. In my experience, transaction guarantees outweigh everything else.

TODO-Driven Development

while (true) {
  // const rows = await ... (fetch pending tasks inside a transaction `tx`; full query at the end)
  for (const { task_type, params } of rows)
    if (task_type in tasks) {
      await tasks[task_type](tx, params);
    } else {
      console.error(`Task type not implemented: ${task_type}`);
    }
}

With a simple retry system, asynchronous decoupling magically tracks all your incomplete flows.

There is no need to rely on Jira: bugs and unimplemented tasks are logged and retried. Working iteratively from error queues is truly a wonderful experience. All your live/urgent TODOs are printed in one place (in development and in production).
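
As a rough sketch of that TODO list, the helper below groups whatever is still sitting in the task table by type and flags the types that have no handler yet. backlogReport and its labels are hypothetical names of mine, and the rows are assumed to come from a plain select on task.

```typescript
type TaskRow = { task_type: string; params: unknown };

// Summarize pending rows by task type; types without a handler are the TODOs.
function backlogReport(rows: TaskRow[], handlers: Record<string, unknown>): string[] {
  // Prototype-free object so odd task types cannot collide with built-in keys.
  const counts = Object.create(null) as Record<string, number>;
  for (const { task_type } of rows) {
    counts[task_type] = (counts[task_type] ?? 0) + 1;
  }
  return Object.keys(counts)
    .sort()
    .map((type) => {
      const implemented = Object.prototype.hasOwnProperty.call(handlers, type);
      const label = implemented ? "PENDING" : "TODO (no handler)";
      return `${label}: ${type} x${counts[type]}`;
    });
}
```

Because successful tasks are deleted, anything this report prints is either waiting its turn, failing, or not written yet.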

With this paradigm, you will gravitate towards scalable pipelines. Wishful thinking creates natural architecture.

Human Fault Tolerance

Many systems impose useless retry-loops on humans.

Humans should receive feedback for human errors. But humans should not have to respond to problems that can be handled by computers (and their software developers).

Remember, all your retry-loops have to happen somewhere. Be careful what you hand over to clients and developers. Your business’s bottom line is tied to human patience; computers have infinitely more patience than humans.

Show Me the Code

Here is the task table:

create table task
( task_id bigint primary key not null generated always as identity
, task_type text not null -- consider using enum
, params jsonb not null -- hstore also viable
, created_at timestamptz not null default now()
, unique (task_type, params) -- optional, for pseudo-idempotency
)

Here is the code for the task worker:

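// Assumed to exist elsewhere: `sql` (a postgres.js client) and `sendEmail` (your mail provider wrapper).
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Each handler receives the open transaction `tx` and the task's params.
const tasks: Record<string, (tx: any, params: any) => Promise<void>> = {
  SEND_EMAIL_WELCOME: async (tx, params) => {
    const { email } = params;
    if (!email) throw new Error(`Bad params ${JSON.stringify(params)}.`);
    await sendEmail({ email, body: "WELCOME" });
  },
};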

(async () => {
  while (true) {
    try {
      while (true) {
        await sql.begin(async (tx: any) => {
          const rows = await tx`
            delete from task
            where task_id in
            ( select task_id
              from task
              order by random() -- use tablesample for better performance
              for update
              skip locked
              limit 1
            )
            returning task_id, task_type, params::jsonb as params
          `;
          for (const { task_type, params } of rows)
            if (task_type in tasks) {
              await tasks[task_type](tx, params);
            } else {
              throw new Error(`Task type not implemented: ${task_type}`);
            }
          if (rows.length <= 0) {
            await delay(10 * 1000);
          }
        });
      }
    } catch (err) {
      console.error(err);
      await delay(1 * 1000);
    }
  }
})();

Some notable features of this snippet:

  • The task row will not be deleted if sendEmail fails. The PG transaction will be rolled back, and the row and sendEmail will be retried.
  • The PG transaction tx is passed along to the tasks. This is convenient for marking rows as “processed”, etc.
  • Transactions make error handling much nicer. Always put reversible queries before irreversible side effects (e.g. mark DB state before sending the email). Remember that the DB commits last.
  • Because of skip locked, you can run any number of these workers in parallel. They won’t step on each other’s toes.
  • Random ordering is technically optional, but it makes the system more resilient to errors. With sufficient randomness, a single task type cannot block the queue for everyone.
  • Use order by (case task_type ... end), random() to create an easy priority queue.
  • Limiting the number of retries makes the code more complex, but it’s definitely worth it for user-facing side effects like emails.
  • if (rows.length <= 0) prevents overzealous polling. Your DBA will be grateful.
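
Here is one way the retry limit from the list above might look, sketched in memory so it runs without a database. The attempts counter, the dead list, and the names afterFailure and step are assumptions of mine, not part of the worker above. In Postgres this would probably mean an extra attempts column on task plus somewhere to park dead tasks, and the failure has to be recorded somewhere the rollback can’t erase, for example by locking the row with select ... for update, then deleting it on success or incrementing the counter on failure.

```typescript
// Hypothetical shapes: the task row plus an assumed `attempts` counter.
type Task = { task_id: number; task_type: string; params: unknown; attempts: number };
type DeadTask = Task & { reason: string };
type Handlers = Record<string, (params: unknown) => void>;

type Verdict =
  | { action: "retry"; attempts: number }
  | { action: "give_up"; attempts: number; reason: string };

// Decide what happens to a task whose handler just threw.
function afterFailure(task: Task, err: unknown, maxAttempts: number): Verdict {
  const attempts = task.attempts + 1;
  if (attempts >= maxAttempts) {
    const reason = err instanceof Error ? err.message : String(err);
    return { action: "give_up", attempts, reason };
  }
  return { action: "retry", attempts };
}

// Process at most one task: drop it on success, requeue or park it on failure.
function step(queue: Task[], dead: DeadTask[], handlers: Handlers, maxAttempts: number): void {
  const task = queue.shift();
  if (!task) return;
  try {
    const handler = handlers[task.task_type];
    if (!handler) throw new Error(`Task type not implemented: ${task.task_type}`);
    handler(task.params);
  } catch (err) {
    const verdict = afterFailure(task, err, maxAttempts);
    if (verdict.action === "retry") {
      queue.push({ ...task, attempts: verdict.attempts });
    } else {
      dead.push({ ...task, attempts: verdict.attempts, reason: verdict.reason });
    }
  }
}
```

With maxAttempts set to 3, a task whose handler always throws is tried three times and then moved to the dead list, where a human can look at it without the customer getting three hundred welcome emails.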
