Job queue as SQL table with multiple consumers (PostgreSQL)
Read my post here:
https://stackoverflow.com/a/6500830/32688
If you use a transaction and LOCK TABLE, you will have no problems.
job queue with multiple consumers did the same job twice
Your solution with FOR UPDATE SKIP LOCKED is fine. It ensures a row is locked by exactly one session before it is updated for processing. No transaction can choose a row already locked by another transaction, and once the lock is released on commit, subsequent SELECT statements will no longer match the row, because its status has changed.
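As an illustration of the behaviour described above, here is a minimal Python sketch. It assumes a DB-API connection with the `%s` parameter style (for example one from psycopg2) and reuses the `invoice_job` columns that appear later in this answer; the function name `claim_next_job` is hypothetical and not part of the original post.

```python
# Sketch only: claim one job with FOR UPDATE SKIP LOCKED.
# Assumes a DB-API connection using the "%s" paramstyle (e.g. psycopg2).
CLAIM_SQL = """
UPDATE invoice_job
   SET status = 'working', date_time_start = now(), node = %s
 WHERE id = (
        SELECT id
          FROM invoice_job
         WHERE status = 'created'
         ORDER BY id
         LIMIT 1
           FOR UPDATE SKIP LOCKED
       )
RETURNING id
"""


def claim_next_job(conn, node):
    """Claim one 'created' job for this node.

    Returns the job id, or None when no unlocked 'created' row exists.
    """
    cur = conn.cursor()
    try:
        cur.execute(CLAIM_SQL, (node,))
        row = cur.fetchone()
        # Committing releases the row lock; the row now has
        # status = 'working', so other workers no longer match it.
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
    return row[0] if row else None
```

A worker would call `claim_next_job(conn, node)` in a loop and sleep or exit when it returns None.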
The original failed because the subquery's SELECT can choose the same row concurrently in multiple sessions, each of which then tries to UPDATE the row. There's no WHERE clause in the UPDATE that'd make that fail; it's perfectly fine for two concurrent sessions to UPDATE invoice_job SET status = 'working' WHERE node = 42 or whatever. The second update waits for the first to commit, then happily runs and commits too.
You could also make it safe by repeating the WHERE clause in the UPDATE:

    UPDATE invoice_job SET status = 'working', date_time_start = now(),
    node = $ip
    WHERE id = (SELECT id FROM invoice_job WHERE status = 'created' ORDER BY id LIMIT 1)
    AND status = 'created'
    RETURNING *
... but this will often return zero rows under high concurrency.
In fact it will return zero rows for all but one of a set of concurrent executions, so it's no better than a serial queue worker. This is true of most of the other "clever" tricks people use to try to do concurrent queues, and it's one of the main reasons SKIP LOCKED was introduced.
The fact that you only noticed this problem now tells me that you would actually be fine with a simple, serial queue dispatch where you LOCK TABLE before picking the first row. But SKIP LOCKED will scale better if your workload grows.
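For comparison, the serial LOCK TABLE approach could look roughly like the Python sketch below. It assumes a DB-API connection that opens a transaction implicitly (as psycopg2 does by default), since LOCK TABLE only makes sense inside a transaction. The choice of EXCLUSIVE mode and the function name `claim_next_job_serial` are assumptions made for illustration, not something the original answer specifies.

```python
# Sketch only: serial dispatch by locking the whole table.
# Assumes a DB-API connection with the "%s" paramstyle and an
# implicitly opened transaction (psycopg2's default behaviour).
def claim_next_job_serial(conn, node):
    """Lock invoice_job, pick the first 'created' row, mark it 'working'.

    Returns the job id, or None when there is nothing to do.
    """
    cur = conn.cursor()
    try:
        # EXCLUSIVE mode (an assumption here) blocks other writers and
        # other lockers but still lets plain SELECTs read the table.
        cur.execute("LOCK TABLE invoice_job IN EXCLUSIVE MODE")
        cur.execute(
            "SELECT id FROM invoice_job "
            "WHERE status = 'created' ORDER BY id LIMIT 1"
        )
        row = cur.fetchone()
        if row is not None:
            cur.execute(
                "UPDATE invoice_job "
                "SET status = 'working', date_time_start = now(), node = %s "
                "WHERE id = %s",
                (node, row[0]),
            )
        # The table lock is held until this commit (or a rollback).
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
    return row[0] if row else None
```

Every worker queues up behind the table lock, so only one claim runs at a time; that is the trade-off against SKIP LOCKED.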
Multiple processes are reading from a table to take jobs
There is the SELECT ... FOR UPDATE feature:
FOR UPDATE causes the rows retrieved by the SELECT statement to be locked as though for update. This prevents them from being modified or deleted by other transactions until the current transaction ends. That is, other transactions that attempt UPDATE, DELETE, or SELECT FOR UPDATE of these rows will be blocked until the current transaction ends.
One possible way of implementing your queue is:
- Worker process runs SELECT ... WHERE status = 'open' FOR UPDATE.
- Worker process runs UPDATE ... WHERE id IN (...) with the IDs it got from the previous step.
- Worker process does its stuff.
- Worker process updates the tasks' statuses to completed.
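The steps above can be sketched in Python as follows. This is only an illustration under stated assumptions: a DB-API connection with the `%s` parameter style, a hypothetical table named `task`, a hypothetical intermediate status `'in_progress'` (the answer does not say what the first UPDATE sets), and a commit right after the rows are marked so the locks are not held while the work runs.

```python
# Sketch only: SELECT ... FOR UPDATE, then UPDATE ... WHERE id IN (...).
# Table name "task" and status 'in_progress' are assumptions.
def _set_status(cur, ids, status):
    """Run UPDATE task SET status = ... WHERE id IN (...) for the given ids."""
    placeholders = ", ".join(["%s"] * len(ids))
    cur.execute(
        f"UPDATE task SET status = %s WHERE id IN ({placeholders})",
        (status, *ids),
    )


def take_open_tasks(conn, batch_size=10):
    """Steps 1 and 2: lock open tasks, mark them taken, return their ids."""
    cur = conn.cursor()
    try:
        # Other workers running the same SELECT block here until we commit.
        cur.execute(
            "SELECT id FROM task WHERE status = 'open' "
            "ORDER BY id LIMIT %s FOR UPDATE",
            (batch_size,),
        )
        ids = [row[0] for row in cur.fetchall()]
        if ids:
            _set_status(cur, ids, "in_progress")
        conn.commit()  # releases the row locks
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
    return ids


def complete_tasks(conn, ids):
    """Step 4: mark finished tasks as completed."""
    if not ids:
        return
    cur = conn.cursor()
    try:
        _set_status(cur, ids, "completed")
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

A worker would call `take_open_tasks`, do its work on the returned ids (step 3), and then pass them to `complete_tasks`.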
Sharing work specified in Postgres table between multiple Python workers
Postgres has the SKIP LOCKED option, and the use case they specify in the docs seems to closely align with your goals here:
...any selected rows that cannot be immediately locked are skipped.
Skipping locked rows provides an inconsistent view of the data, so
this is not suitable for general purpose work, but can be used to
avoid lock contention with multiple consumers accessing a queue-like
table.
On the SQLAlchemy side of the equation, the with_for_update() method that you are already using provides the skip_locked Boolean flag:
...will render FOR UPDATE SKIP LOCKED on Oracle and PostgreSQL dialects or FOR SHARE SKIP LOCKED if read=True is also specified.
So it appears that PostgreSQL and SQLAlchemy have you covered :)
Exclusive lock a table for more than one query
I wouldn't lock the table at all; I don't think there is any need. With 500 tickets, I would create 500 records (one per ticket). When someone buys a number of tickets, you have to update those tickets as SOLD. Use a SELECT FOR UPDATE statement in combination with SKIP LOCKED to get the number of tickets you need, and then UPDATE the selected records. Multiple customers can buy tickets at the same time without issues.
The only thing left is deciding what to do when someone wants to buy 10 tickets and you no longer have 10 tickets available.
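One way to handle that last case is sketched below in Python: select with SKIP LOCKED, and if fewer rows come back than were requested, roll back and sell nothing. The table name `ticket`, the status values `'available'` and `'sold'`, and the all-or-nothing policy are assumptions for illustration; the answer leaves that decision open. The connection is assumed to be DB-API style with the `%s` parameter style.

```python
# Sketch only: buy `quantity` tickets, all or nothing.
# Table "ticket" and statuses 'available' / 'sold' are assumptions.
def buy_tickets(conn, quantity):
    """Return the ids of the tickets sold, or [] if not enough were free."""
    cur = conn.cursor()
    try:
        cur.execute(
            "SELECT id FROM ticket WHERE status = 'available' "
            "ORDER BY id LIMIT %s FOR UPDATE SKIP LOCKED",
            (quantity,),
        )
        ids = [row[0] for row in cur.fetchall()]
        if len(ids) < quantity:
            # Not enough unlocked tickets right now: release the locks
            # we did get and sell nothing.
            conn.rollback()
            return []
        placeholders = ", ".join(["%s"] * len(ids))
        cur.execute(
            f"UPDATE ticket SET status = 'sold' WHERE id IN ({placeholders})",
            tuple(ids),
        )
        conn.commit()
        return ids
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

Because SKIP LOCKED hides tickets that another in-flight purchase has locked, an empty result can be transient: if that other purchase rolls back, a retry may succeed.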
Ensure processing order with multiple consumers
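According to http://activemq.apache.org/message-groups.html, you can set the "JMSXGroupID" header to ensure processing order with multiple consumers: all messages with the same group ID are delivered to the same consumer, so they are processed in order.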