Job Queue as SQL Table with Multiple Consumers (PostgreSQL)

Read my post here:

https://stackoverflow.com/a/6500830/32688

If you use a transaction and LOCK TABLE, you will have no problems.

Job queue with multiple consumers did the same job twice

Your solution with FOR UPDATE SKIP LOCKED is fine. It ensures a row is locked by exactly one session before being updated for processing. No transaction can choose a row already locked by another transaction, and once the lock is released on commit, the row's status has changed, so subsequent SELECTs no longer match it.
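As a rough illustration of that pattern, here is a minimal Python sketch. It assumes `conn` is a DB-API connection to PostgreSQL 9.5 or later (for example one opened with psycopg2, with autocommit off); the helper name `claim_next_job` is made up for this example, and the column names are taken from the query in the question.

```python
# Hypothetical helper, not from the original post. `conn` is assumed to be a
# DB-API connection to PostgreSQL (e.g. psycopg2) using %s placeholders.
CLAIM_SQL = """
UPDATE invoice_job
   SET status = 'working', date_time_start = now(), node = %s
 WHERE id = (
        SELECT id
          FROM invoice_job
         WHERE status = 'created'
         ORDER BY id
         LIMIT 1
           FOR UPDATE SKIP LOCKED
       )
RETURNING id
"""


def claim_next_job(conn, node):
    """Claim one 'created' job for `node`; return its id, or None if none is free."""
    cur = conn.cursor()
    try:
        cur.execute(CLAIM_SQL, (node,))
        row = cur.fetchone()   # None when every candidate row is locked or taken
        conn.commit()          # releases the row lock; the row is now 'working'
        return row[0] if row else None
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

Each worker would call `claim_next_job(conn, its_ip)` in a loop and sleep briefly whenever it gets `None` back.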

The original failed because the subquery's SELECT can choose the same row concurrently in multiple sessions, each of which then tries to UPDATE the row. There's no WHERE clause in the UPDATE that'd make that fail; it's perfectly fine for two concurrent sessions to UPDATE invoice_job SET status = 'working' WHERE node = 42 or whatever. The second update will happily run and commit once the first update succeeds.

You could also make it safe by repeating the WHERE clause in the UPDATE:

UPDATE invoice_job
SET status = 'working', date_time_start = now(), node = $ip
WHERE id = (SELECT id FROM invoice_job WHERE status = 'created' ORDER BY id LIMIT 1)
AND status = 'created'
RETURNING *;

... but this will often return zero rows under high concurrency.

In fact it will return zero rows for all but one of a set of concurrent executions, so it's no better than a serial queue worker. This is true of most of the other "clever" tricks people use to try to do concurrent queues, and one of the main reasons SKIP LOCKED was introduced.
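To make the "all but one" point concrete, here is a toy, in-memory model in Python. It is only a simulation under simplifying assumptions (every worker's subquery sees the same snapshot, then the guarded UPDATEs are applied one after another); it does not talk to PostgreSQL, and the function name is invented for this sketch.

```python
def simulate_guarded_update(jobs, workers):
    """Toy model. `jobs` maps id -> status; returns {worker: claimed id or None}."""
    # Phase 1: every worker's subquery runs against the same snapshot,
    # so they all pick the lowest 'created' id.
    created = sorted(job_id for job_id, status in jobs.items() if status == "created")
    picks = {w: (created[0] if created else None) for w in workers}

    # Phase 2: the UPDATEs are serialized by the row lock; the repeated
    # "AND status = 'created'" guard is re-checked against the current row.
    results = {}
    for w in workers:
        job_id = picks[w]
        if job_id is not None and jobs[job_id] == "created":
            jobs[job_id] = "working"
            results[w] = job_id
        else:
            results[w] = None  # the UPDATE matched zero rows
    return results


jobs = {1: "created", 2: "created", 3: "created"}
print(simulate_guarded_update(jobs, ["a", "b", "c"]))
# → {'a': 1, 'b': None, 'c': None}
```

Jobs 2 and 3 are still 'created' afterwards, even though two workers came back empty-handed.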

The fact that you only noticed this problem now tells me that you would actually be fine with a simple, serial queue dispatch where you LOCK TABLE before picking the first row. But SKIP LOCKED will scale better if your workload grows.
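For comparison, a serial dispatcher along those lines might look like the sketch below. The assumptions are the same as before: `conn` is a DB-API connection to PostgreSQL with autocommit off (so a transaction is already open when LOCK TABLE runs), and `claim_next_job_serial` is a made-up name. The choice of EXCLUSIVE mode is mine; a bare LOCK TABLE takes the stronger ACCESS EXCLUSIVE lock.

```python
def claim_next_job_serial(conn, node):
    """Serial dispatch: lock the whole table, take the first 'created' row."""
    cur = conn.cursor()
    try:
        # Other workers block here until this transaction ends.
        # EXCLUSIVE mode still allows plain SELECTs by readers.
        cur.execute("LOCK TABLE invoice_job IN EXCLUSIVE MODE")
        cur.execute(
            "SELECT id FROM invoice_job "
            "WHERE status = 'created' ORDER BY id LIMIT 1"
        )
        row = cur.fetchone()
        if row is not None:
            cur.execute(
                "UPDATE invoice_job "
                "SET status = 'working', date_time_start = now(), node = %s "
                "WHERE id = %s",
                (node, row[0]),
            )
        conn.commit()  # releases the table lock
        return row[0] if row else None
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

Only one worker at a time can be inside the locked section, which is exactly why this is simple and why it stops scaling once dispatch itself becomes the bottleneck.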

Multiple processes are reading from a table to take jobs

PostgreSQL has the SELECT ... FOR UPDATE feature:

FOR UPDATE causes the rows retrieved by the SELECT statement to be locked as though for update. This prevents them from being modified or deleted by other transactions until the current transaction ends. That is, other transactions that attempt UPDATE, DELETE, or SELECT FOR UPDATE of these rows will be blocked until the current transaction ends.

One possible way of implementing your queue is:

  1. Worker process runs SELECT ... WHERE status = 'open' FOR UPDATE.
  2. Worker process runs UPDATE ... WHERE id IN (...) with IDs it got from previous step.
  3. Worker process does its stuff.
  4. Worker process updates the task statuses to completed.
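The four steps above could be sketched in Python roughly as follows. This is only an outline under assumptions: `conn` is a DB-API connection to PostgreSQL (psycopg2-style, which adapts a Python list to an array for `= ANY(%s)`), the table name `tasks` and the intermediate status 'processing' are hypothetical, and `handle_task` is whatever callable does the real work.

```python
def run_worker_once(conn, handle_task):
    """Claim all open tasks, process them, mark them completed. Returns the ids."""
    cur = conn.cursor()
    try:
        # Step 1: lock the open rows; a concurrent worker blocks here
        # until this transaction ends.
        cur.execute("SELECT id FROM tasks WHERE status = 'open' FOR UPDATE")
        ids = [row[0] for row in cur.fetchall()]
        if not ids:
            conn.commit()
            return []

        # Step 2: mark the rows as taken, then commit to release the locks.
        cur.execute(
            "UPDATE tasks SET status = 'processing' WHERE id = ANY(%s)", (ids,)
        )
        conn.commit()

        # Step 3: do the actual work outside the locking transaction.
        for task_id in ids:
            handle_task(task_id)

        # Step 4: record completion.
        cur.execute(
            "UPDATE tasks SET status = 'completed' WHERE id = ANY(%s)", (ids,)
        )
        conn.commit()
        return ids
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

Note that in this sketch a task whose handler raises is left in 'processing'; a real worker would need some policy for retrying or resetting such rows.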

Sharing work specified in Postgres table between multiple Python workers

Postgres has the SKIP LOCKED option, and the use case they specify in the docs seems to closely align with your goals here:

...any selected rows that cannot be immediately locked are skipped.
Skipping locked rows provides an inconsistent view of the data, so
this is not suitable for general purpose work, but can be used to
avoid lock contention with multiple consumers accessing a queue-like
table.

On the SQLAlchemy side of the equation, the with_for_update() method that you are already using provides the skip_locked Boolean flag:

...will render FOR UPDATE SKIP LOCKED on Oracle and PostgreSQL dialects or FOR SHARE
SKIP LOCKED if read=True is also specified.

So it appears that PostgreSQL and SQLAlchemy have you covered :)
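A quick way to see what SQLAlchemy generates is to compile a statement against the PostgreSQL dialect without connecting to anything. The sketch below assumes SQLAlchemy 1.4 or newer (for the `select(column)` calling style); the `jobs` table and its columns are invented for illustration.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, select
from sqlalchemy.dialects import postgresql

# Hypothetical queue table, defined only so the statement can be compiled.
metadata = MetaData()
jobs = Table(
    "jobs",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("status", String),
)

# Pick the oldest unclaimed job, skipping rows other workers have locked.
stmt = (
    select(jobs.c.id)
    .where(jobs.c.status == "created")
    .order_by(jobs.c.id)
    .limit(1)
    .with_for_update(skip_locked=True)
)

# Render the SQL text for PostgreSQL; no database connection is needed.
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

The printed statement should end in FOR UPDATE SKIP LOCKED; the same `with_for_update(skip_locked=True)` call is available on ORM queries.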

Exclusive lock a table for more than one query

I wouldn't lock the table at all; I don't think there's any need. When I have 500 tickets, I would create 500 records (tickets). When someone buys a number of tickets, you have to update these tickets as SOLD. Use a SELECT FOR UPDATE statement in combination with SKIP LOCKED to get the number of tickets you need and then UPDATE these selected records. Multiple customers can buy tickets at the same time, without issues.

The only thing left is what to do when someone wants to buy 10 tickets when you don't have 10 tickets anymore.
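Here is one way this could look in Python, including one possible answer to the "not enough tickets" case: roll back and sell nothing. Everything specific here is an assumption for the sake of the sketch: the `tickets` table with `status` and `buyer_id` columns, the 'AVAILABLE' status value, the `buy_tickets` helper, and a psycopg2-style DB-API connection `conn`.

```python
def buy_tickets(conn, buyer_id, quantity):
    """Try to sell `quantity` tickets; return their ids, or [] if too few are free."""
    cur = conn.cursor()
    try:
        # Lock up to `quantity` free tickets, skipping rows that another
        # customer's transaction is currently holding.
        cur.execute(
            "SELECT id FROM tickets WHERE status = 'AVAILABLE' "
            "ORDER BY id LIMIT %s FOR UPDATE SKIP LOCKED",
            (quantity,),
        )
        ids = [row[0] for row in cur.fetchall()]

        if len(ids) < quantity:
            # Assumed policy: all or nothing. Rolling back releases the locks.
            conn.rollback()
            return []

        cur.execute(
            "UPDATE tickets SET status = 'SOLD', buyer_id = %s "
            "WHERE id = ANY(%s)",
            (buyer_id, ids),
        )
        conn.commit()
        return ids
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

Other policies are just as valid, such as offering the customer the smaller number that is available. Keep in mind that tickets locked by another buyer's in-flight purchase look unavailable here, so an empty result does not always mean the event is sold out.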

Ensure processing order with multiple consumers

According to http://activemq.apache.org/message-groups.html, you can set the "JMSXGroupID" header on your messages to ensure processing order with multiple consumers: all messages with the same group ID go to the same consumer.


