Using a Database Table as a Queue

Using a database table as a queue

I'd use an IDENTITY field as the primary key to provide the uniquely incrementing ID for each queued item, and stick a clustered index on it. This would represent the order in which the items were queued.

To keep the items in the queue table while you process them, you'd need a "Status" field to indicate the current status of a particular item (e.g. 0=waiting, 1=being processed, 2=processed). This is needed to prevent an item from being processed twice.

When processing items in the queue, you need to find the next item in the table that is NOT currently being processed, and do so in a way that prevents multiple processes from picking up the same item at the same time, as demonstrated below. Note the table hints UPDLOCK and READPAST, which you should be aware of when implementing queues.

e.g. within a sproc, something like this:

DECLARE @NextID INTEGER

BEGIN TRANSACTION

-- Find the next queued item that is waiting to be processed.
-- UPDLOCK locks the selected row; READPAST skips rows locked by other readers.
SELECT TOP 1 @NextID = ID
FROM MyQueueTable WITH (UPDLOCK, READPAST)
WHERE Status = 0
ORDER BY ID ASC

-- If we've found one, mark it as being processed
IF @NextID IS NOT NULL
    UPDATE MyQueueTable SET Status = 1 WHERE ID = @NextID

COMMIT TRANSACTION

-- If we've got an item from the queue, return it to whatever is going to process it
IF @NextID IS NOT NULL
    SELECT * FROM MyQueueTable WHERE ID = @NextID

If processing an item fails, do you want to be able to try it again later? If so, you'll need to either reset the status back to 0 or add a separate status (for example 3=failed) that a retry step can pick up. That will require more thought.
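
One way to think through the retry question is sketched below in Python. It is only an illustration under stated assumptions: SQLite (from the standard library) stands in for SQL Server, so BEGIN IMMEDIATE replaces the UPDLOCK/READPAST hints, and the Payload and Attempts columns, the FAILED status value and MAX_ATTEMPTS are hypothetical additions that are not part of the answer above.

```python
import sqlite3

# Status values: 0-2 follow the answer above; FAILED (3) is a hypothetical extra.
WAITING, PROCESSING, PROCESSED, FAILED = 0, 1, 2, 3
MAX_ATTEMPTS = 3  # hypothetical retry limit


def make_queue():
    # isolation_level=None puts sqlite3 in autocommit mode so we control transactions
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE MyQueueTable ("
        " ID INTEGER PRIMARY KEY AUTOINCREMENT,"
        " Payload TEXT,"
        " Status INTEGER NOT NULL DEFAULT 0,"
        " Attempts INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def claim_next(conn):
    """Mark the oldest waiting item as being processed; return (ID, Payload) or None."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    row = conn.execute(
        "SELECT ID, Payload FROM MyQueueTable WHERE Status = ? ORDER BY ID LIMIT 1",
        (WAITING,),
    ).fetchone()
    if row is not None:
        conn.execute(
            "UPDATE MyQueueTable SET Status = ?, Attempts = Attempts + 1 WHERE ID = ?",
            (PROCESSING, row[0]),
        )
    conn.execute("COMMIT")
    return row


def finish(conn, item_id, ok):
    """Record success, or put the item back in the queue until MAX_ATTEMPTS is reached."""
    if ok:
        conn.execute(
            "UPDATE MyQueueTable SET Status = ? WHERE ID = ?", (PROCESSED, item_id)
        )
    else:
        conn.execute(
            "UPDATE MyQueueTable"
            " SET Status = CASE WHEN Attempts >= ? THEN ? ELSE ? END"
            " WHERE ID = ?",
            (MAX_ATTEMPTS, FAILED, WAITING, item_id),
        )
```

Counting attempts keeps a permanently broken item from being retried forever; whether a failed item should instead go to the back of the queue is a separate design decision.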

Alternatively, don't use a database table as a queue, but something like MSMQ - just thought I'd throw that in the mix!

The best way to use a DB table as a job queue (a.k.a batch queue or message queue)

Here's what I've used successfully in the past:

MsgQueue table schema

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL
SourceCode varchar(20) -- process inserting the message -- NULLable
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL
CreateTime datetime -- default GETDATE() -- NOT NULL
Msg varchar(255) -- NULLable

Your message types are what you'd expect - messages that conform to a contract between the process(es) inserting and the process(es) reading, structured with XML or your other choice of representation (JSON would be handy in some cases, for instance).

Then 0-to-n processes can be inserting, and 0-to-n processes can be reading and processing the messages. Each reading process typically handles a single message type. Multiple instances of a process type can be running for load-balancing.

The reader pulls one message and changes the state to "A"ctive while it works on it. When it's done it changes the state to "C"ompleted. It can delete the message or not, depending on whether you want to keep the audit trail. Messages of State = 'N' are pulled in MsgTypeCode/CreateTime order, so there's an index on MsgTypeCode + State + CreateTime.

Variations:

State for "E"rror.

Column for Reader process code.

Timestamps for state transitions.

This has provided a nice, scalable, visible, simple mechanism for doing a number of things like you are describing. If you have a basic understanding of databases, it's pretty foolproof and extensible.
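
The state handling and the variations listed above can be sketched roughly as follows. This is a minimal Python illustration, not the schema I actually used: SQLite stands in for the real database, and the ReaderCode and StateTime columns, the index name and the TRANSITIONS table are assumptions made for the example.

```python
import sqlite3


def create_msg_queue(conn):
    conn.execute(
        "CREATE TABLE MsgQueue ("
        " MsgId INTEGER PRIMARY KEY AUTOINCREMENT,"
        " MsgTypeCode TEXT NOT NULL,"
        " SourceCode TEXT,"
        " State TEXT NOT NULL DEFAULT 'N',"
        " CreateTime TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,"
        " ReaderCode TEXT,"  # variation: which reader took the message
        " StateTime TEXT,"   # variation: time of the last state transition
        " Msg TEXT)"
    )
    # Index matching the order in which new messages are pulled
    conn.execute(
        "CREATE INDEX IX_MsgQueue ON MsgQueue (MsgTypeCode, State, CreateTime)"
    )


# Allowed moves: New -> Active, then Active -> Completed or Error
TRANSITIONS = {"N": {"A"}, "A": {"C", "E"}}


def set_state(conn, msg_id, new_state, reader_code=None):
    """Move a message to new_state, rejecting transitions the queue does not allow."""
    row = conn.execute(
        "SELECT State FROM MsgQueue WHERE MsgId = ?", (msg_id,)
    ).fetchone()
    if row is None:
        raise KeyError(msg_id)
    old_state = row[0]
    if new_state not in TRANSITIONS.get(old_state, set()):
        raise ValueError(f"illegal transition {old_state} -> {new_state}")
    conn.execute(
        "UPDATE MsgQueue"
        " SET State = ?, ReaderCode = COALESCE(?, ReaderCode),"
        "     StateTime = CURRENT_TIMESTAMP"
        " WHERE MsgId = ?",
        (new_state, reader_code, msg_id),
    )
```

This sketch assumes a single connection; with several readers, the read and the update would need to run in one transaction with appropriate locking.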


Code from comments:

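CREATE PROCEDURE GetMessage (@MsgType VARCHAR(20))
AS
DECLARE @MsgId INT

BEGIN TRAN

-- Pick the oldest new message of the requested type.
-- UPDLOCK/READPAST keep concurrent readers from taking the same row.
SELECT TOP 1 @MsgId = MsgId
FROM MsgQueue WITH (UPDLOCK, READPAST)
WHERE MsgTypeCode = @MsgType AND State = 'N'
ORDER BY CreateTime

IF @MsgId IS NOT NULL
BEGIN
    -- Mark it active and return it to the caller
    UPDATE MsgQueue
    SET State = 'A'
    WHERE MsgId = @MsgId

    SELECT MsgId, Msg
    FROM MsgQueue
    WHERE MsgId = @MsgId
END
ELSE
BEGIN
    -- Nothing queued: return a single row of NULLs
    SELECT MsgId = NULL, Msg = NULL
END

COMMIT TRAN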

Using SQL Server as a DB queue with multiple clients

I recommend you go over Using tables as Queues.
Properly implemented queues can handle thousands of concurrent users and service as many as half a million enqueue/dequeue operations per minute. Until SQL Server 2005 the solution was cumbersome: it involved mixing a SELECT and an UPDATE in a single transaction and giving just the right mix of lock hints, as in the article linked by gbn. Luckily, since SQL Server 2005 and the advent of the OUTPUT clause, a much more elegant solution is available, and MSDN now recommends using the OUTPUT clause:

"You can use OUTPUT in applications that use tables as queues, or to hold intermediate result sets. That is, the application is constantly adding or removing rows from the table."

Basically there are 3 parts of the puzzle you need to get right in order for this to work in a highly concurrent manner:

  1. You need to dequeue atomically. You have to find the row, skip any locked rows, and mark it as 'dequeued' in a single, atomic operation, and this is where the OUTPUT clause comes into play:
    with CTE as (
      SELECT TOP(1) COMMAND, PROCESSED
      FROM TABLE WITH (READPAST)
      WHERE PROCESSED = 0)
    UPDATE CTE
      SET PROCESSED = 1
      OUTPUT INSERTED.*;

  2. You must structure your table with the leftmost clustered index key on the PROCESSED column. If the ID was used as a primary key, then move it to be the second column in the clustered key. The debate whether to keep a non-clustered key on the ID column is open, but I strongly favor not having any secondary non-clustered indexes over queues:
    CREATE CLUSTERED INDEX cdxTable on TABLE(PROCESSED, ID);

  3. You must not query this table by any other means but by Dequeue. Trying to do Peek operations or trying to use the table both as a Queue and as a store will very likely lead to deadlocks and will slow down throughput dramatically.

The combination of an atomic dequeue, the READPAST hint when searching for elements to dequeue, and a leftmost clustered index key on the processing bit ensures very high throughput under a highly concurrent load.
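
To experiment with the "find and mark in one statement" idea outside SQL Server, a rough Python analogue is sketched below. It uses SQLite from the standard library, which has neither READPAST nor the OUTPUT clause, so this is a stand-in technique rather than the T-SQL above: a single UPDATE claims the row and stamps it with a random token, and the claimed row is then read back by that token. The Queue table name, the CLAIM_TOKEN column and the index name are assumptions made for the example.

```python
import sqlite3
import uuid


def create_queue(conn):
    conn.execute(
        "CREATE TABLE Queue ("
        " ID INTEGER PRIMARY KEY AUTOINCREMENT,"
        " COMMAND TEXT NOT NULL,"
        " PROCESSED INTEGER NOT NULL DEFAULT 0,"
        " CLAIM_TOKEN TEXT)"
    )
    # PROCESSED is the leftmost key, mirroring the index advice above
    conn.execute("CREATE INDEX IX_Queue ON Queue (PROCESSED, ID)")


def enqueue(conn, command):
    with conn:
        conn.execute("INSERT INTO Queue (COMMAND) VALUES (?)", (command,))


def dequeue(conn):
    """Claim the oldest unprocessed row in one UPDATE; return its COMMAND or None."""
    token = uuid.uuid4().hex
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "UPDATE Queue SET PROCESSED = 1, CLAIM_TOKEN = ?"
            " WHERE ID = (SELECT ID FROM Queue"
            "             WHERE PROCESSED = 0 ORDER BY ID LIMIT 1)",
            (token,),
        )
        if cur.rowcount == 0:
            return None  # queue is empty
        row = conn.execute(
            "SELECT COMMAND FROM Queue WHERE CLAIM_TOKEN = ?", (token,)
        ).fetchone()
    return row[0]
```

Because the row is selected and marked by the same statement, two workers cannot both claim it; what this sketch does not reproduce is READPAST's ability to skip rows that another transaction currently holds locked.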

SQL Server, using a table as a queue

"I need mechanism whereby I can attempt to select a single row from the table and, if there isn't one, block until there is (preferably for a specific period of time)."

You can loop and check for new rows every second:

while not exists (select * from QueueTable)
begin
    -- wait one second between checks
    waitfor delay '00:00:01'
end

Disclaimer: this is not code I would use for a production system, but it does what you ask.
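
If the polling is done from the client instead, the "for a specific period of time" part is easy to add. The following Python sketch shows one way, assuming a DB-API connection (SQLite here as a stand-in) and a table named QueueTable; the function name and its timeout and poll_interval parameters are hypothetical.

```python
import sqlite3
import time


def wait_for_row(conn, timeout=5.0, poll_interval=1.0):
    """Return the first row of QueueTable, or None if none appears within timeout seconds."""
    deadline = time.monotonic() + timeout
    while True:
        row = conn.execute(
            "SELECT * FROM QueueTable ORDER BY rowid LIMIT 1"
        ).fetchone()
        if row is not None:
            return row
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None  # timed out without seeing a row
        # Sleep for the poll interval, but never past the deadline
        time.sleep(min(poll_interval, remaining))
```

Like the loop above, this only detects a row; it does not claim it, so with several consumers it would have to be combined with an atomic dequeue.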

Message Queues Vs DB Table Queue via CRON

A message queue (a distributed one at least, e.g. RabbitMQ) gives you the ability to distribute work across physical nodes. You still need to have a process on each node to dequeue work and process it.

It ultimately comes down to your requirements, I guess. You can achieve a more manageable solution at scale by using message queues, because you can decouple your nodes more easily.

Of course, there is a learning curve... so it again comes back to your target goals.


Note that on each node you can still reuse your cron/DB table until (and if) you wish to change the implementation. That's what's great about decoupling when you can.
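
As a rough illustration of that decoupling, the worker code can be written against a small queue interface so the backing store can be swapped later. Everything in this Python sketch is hypothetical: WorkQueue, InMemoryQueue and drain are names invented for the example, and the in-memory class merely stands in for a DB-table or RabbitMQ-backed implementation, neither of which is shown.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Callable, Optional


class WorkQueue(ABC):
    """What a worker needs from a queue, whatever stores the jobs."""

    @abstractmethod
    def put(self, job: str) -> None: ...

    @abstractmethod
    def get(self) -> Optional[str]:
        """Return the next job, or None when the queue is empty."""


class InMemoryQueue(WorkQueue):
    """Stand-in backend; a DB-table or broker-backed class would expose the same methods."""

    def __init__(self) -> None:
        self._jobs: deque = deque()

    def put(self, job: str) -> None:
        self._jobs.append(job)

    def get(self) -> Optional[str]:
        return self._jobs.popleft() if self._jobs else None


def drain(queue: WorkQueue, handler: Callable[[str], None]) -> int:
    """Process jobs until the queue is empty; this is what a cron run could call."""
    count = 0
    while (job := queue.get()) is not None:
        handler(job)
        count += 1
    return count
```

The worker only ever sees put and get, so replacing the cron/DB table with a message broker later means writing one new class rather than touching the processing code.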

Using MySQL table as a queue with threads in Python

Two quick points:

  1. Assuming you are using CPython, the GIL allows only one thread to
    execute Python bytecode at a time, so threads give little or no speedup
    for CPU-bound work (threads blocked on I/O, such as a database call,
    do release the GIL). A couple of workarounds are:

    • The gevent library [source]

      gevent is a coroutine-based Python networking library that uses
      greenlet to provide a high-level synchronous API on top of the libev
      event loop.

    • The multiprocessing module: you can spawn multiple processes, which gives true parallelism in Python.

    • The concurrent.futures module, new in Python 3.2, with a port available for
      Python 2. [source]

      This is a new high-level library that operates only at a “job” level, which means that you no longer have to fuss with
      synchronization, or managing threads or processes. You just specify a
      thread or process pool with a certain number of “workers,” submit
      jobs, and collate the results. It’s new in Python 3.2, but a port for
      Python 2.6+ is available at http://code.google.com/p/pythonfutures.

You can use the SSDictCursor of MySQLdb and call fetchone(). This is a streaming cursor, and you can run it in an infinite while loop to resemble a queue:

import MySQLdb.cursors

# conn is assumed to be an open MySQLdb connection and query your SELECT statement
cur = conn.cursor(MySQLdb.cursors.SSDictCursor)
cur.execute(query)

while True:
    row = cur.fetchone()
    if not row:
        break  # (or sleep()!)
    else:
        pass  # process the row here

  2. Having said all that, I would suggest you look at implementing tools like Celery or MongoDB to emulate queues and workers. Relational databases are just not cut out for that kind of a job and suffer unnecessary fragmentation. Here's a great source if you want to know more about fragmentation in MySQL.
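
For the concurrent.futures option mentioned in the first point, a minimal sketch of the "submit jobs, collate the results" pattern could look like this. The process_row function and the shape of the rows (dicts with id and payload keys) are assumptions for illustration; in practice the rows would come from your queue table.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_row(row):
    """Hypothetical job: stands in for whatever work is done per queued row."""
    return row["id"], row["payload"].upper()


def process_all(rows, workers=4):
    """Submit one job per row to a thread pool and collect results keyed by row id."""
    results = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(process_row, row) for row in rows]
        for future in as_completed(futures):
            row_id, value = future.result()  # re-raises any exception from the job
            results[row_id] = value
    return results
```

For CPU-bound jobs, ProcessPoolExecutor from the same module can be swapped in, provided the job function and its arguments can be pickled.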

