Using SQL Server as a DB queue with multiple clients

I recommend you go over Using tables as Queues.
Properly implemented queues can handle thousands of concurrent users and service as many as half a million enqueue/dequeue operations per minute. Until SQL Server 2005 the solution was cumbersome: it involved mixing a SELECT and an UPDATE in a single transaction with just the right mix of lock hints, as in the article linked by gbn. Luckily, since SQL Server 2005 and the advent of the OUTPUT clause, a much more elegant solution is available, and MSDN now recommends using the OUTPUT clause:

You can use OUTPUT in applications that use tables as queues, or to hold intermediate result sets. That is, the application is constantly adding or removing rows from the table.

Basically there are three parts of the puzzle you need to get right in order for this to work in a highly concurrent manner:

  1. You need to dequeue atomically. You have to find the row, skip any locked rows, and mark it as 'dequeued' in a single, atomic operation, and this is where the OUTPUT clause comes into play:
    WITH cte AS (
        SELECT TOP(1) COMMAND, PROCESSED
        FROM [Table] WITH (READPAST)
        WHERE PROCESSED = 0)
    UPDATE cte
    SET PROCESSED = 1
    OUTPUT INSERTED.*;

  1. You must structure your table with the leftmost clustered index key on the PROCESSED column. If the ID was used as a primary key, move it to the second position in the clustered key. The debate whether to keep a non-clustered index on the ID column is open, but I strongly favor not having any secondary non-clustered indexes over queues:
    CREATE CLUSTERED INDEX cdxTable ON [Table] (PROCESSED, ID);

  1. You must not query this table by any means other than this dequeue operation. Trying to do peek operations, or trying to use the table both as a queue and as a store, will very likely lead to deadlocks and will slow down throughput dramatically.

The combination of atomic dequeue, the READPAST hint when searching for elements to dequeue, and a clustered index with the processing bit as its leftmost key ensures very high throughput under highly concurrent load.
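
Putting the three pieces together, here is a minimal self-contained sketch; the table and column names are illustrative, not from the original answer:

    -- a bare-bones queue table: processing bit first in the clustered key
    CREATE TABLE dbo.CommandQueue (
        ID        bigint IDENTITY NOT NULL,
        PROCESSED bit NOT NULL DEFAULT 0,
        COMMAND   varchar(max) NULL
    );
    CREATE CLUSTERED INDEX cdxCommandQueue ON dbo.CommandQueue (PROCESSED, ID);

    -- atomic dequeue: skip rows locked by other workers, flip the bit, return the row
    WITH cte AS (
        SELECT TOP(1) COMMAND, PROCESSED
        FROM dbo.CommandQueue WITH (READPAST)
        WHERE PROCESSED = 0)
    UPDATE cte
    SET PROCESSED = 1
    OUTPUT INSERTED.COMMAND;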

Processing a database queue across multiple threads - design advice

I recommend you read and internalize Using tables as Queues.

If you use the data as a queue, you must organize it properly for queuing operations. The article I linked goes into the details of how to do this; what you have is a variant of a pending queue.

One thing you must absolutely get rid of is the randomness. If there is one thing that is hard to reproduce in a query, it is randomness. ORDER BY NEWID() will scan every row, generate a GUID for each, sort, and then give you back the top 100. You cannot, under any circumstances, have every worker thread scan the entire table every time; you'll kill the server as the number of unprocessed entries grows.

Instead, use a pending processing date. Have the queue organized (clustered) by the processing date column (when the item is due for retry) and dequeue using the techniques I show in my linked article. If you want to retry, the dequeue should postpone the item instead of deleting it, i.e. WITH (...) UPDATE SET due_date = dateadd(day, 1, getutcdate()) ... (see the sketch below).
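
A minimal sketch of such a postponing dequeue, assuming a hypothetical PendingQueue table with payload and due_date columns (the names are illustrative):

    -- grab the first due item, skip rows other workers hold, and push its retry out a day
    ;WITH cte AS (
        SELECT TOP(1) payload, due_date
        FROM dbo.PendingQueue WITH (ROWLOCK, READPAST)
        WHERE due_date <= GETUTCDATE()
        ORDER BY due_date)
    UPDATE cte
    SET due_date = DATEADD(day, 1, GETUTCDATE())
    OUTPUT INSERTED.payload;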

Job queue as SQL table with multiple consumers (PostgreSQL)

Read my post here:

https://stackoverflow.com/a/6500830/32688

If you use a transaction and LOCK TABLE you will have no problems.
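
As a rough sketch of that advice in PostgreSQL, with a hypothetical job_queue table (the explicit table lock serializes consumers, trading throughput for simplicity):

    BEGIN;
    -- blocks other consumers until COMMIT, so only one worker claims a job at a time
    LOCK TABLE job_queue IN EXCLUSIVE MODE;
    UPDATE job_queue
    SET state = 'active'
    WHERE id = (SELECT id FROM job_queue
                WHERE state = 'pending'
                ORDER BY id LIMIT 1)
    RETURNING id, payload;
    COMMIT;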

Can SQL Server database time be used for synchronizing multiple clients?

As a general rule this approach should work. If there is only one source for the current time then time synchronisation issues are not going to affect you.
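
For example, every client can read "now" from the database rather than from its own OS clock (a trivial sketch):

    -- one authoritative clock for all clients
    SELECT SYSUTCDATETIME() AS utc_now;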

But you have to consider what happens if your database server goes down. Azure will switch you over to another copy, on another server, in another rack, and this new server is not guaranteed to be synchronised with the original one, at least with regard to time.

Also, this approach will undoubtedly get you in trouble if you ever have to scale out your database.

I think I still prefer the queue-based approach.

MSMQ architecture with dedicated processors per database

You actually have several questions here:


  1. First, how do you set processor affinity for SQL Server?

    Select the server in SQL Server Management Studio, right-click, and follow this:

    Sample Image
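
    If you prefer T-SQL over the GUI, the same can be done with ALTER SERVER CONFIGURATION (SQL Server 2008 R2 and later); the CPU range here is only an example:

    -- bind the instance to CPUs 0 through 3 (illustrative range)
    ALTER SERVER CONFIGURATION SET PROCESS AFFINITY CPU = 0 TO 3;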



  1. Clean your Database regularly

    DBCC FREEPROCCACHE;
    DBCC DROPCLEANBUFFERS;



  1. For MSMQ, turn on journaling, but also consider another queuing product (RabbitMQ, etc.), or write a simple in-process queue to enqueue the jobs, as in the sample below:

    using System;
    using System.Collections.Concurrent;
    using System.Threading;

    public class MultiThreadQueue
    {
        BlockingCollection<string> _jobs = new BlockingCollection<string>();

        public MultiThreadQueue(int numThreads)
        {
            for (int i = 0; i < numThreads; i++)
            {
                var thread = new Thread(OnHandlerStart)
                    { IsBackground = true }; // Mark 'false' if you want to prevent program exit until jobs finish
                thread.Start();
            }
        }

        public void Enqueue(string job)
        {
            if (!_jobs.IsAddingCompleted)
            {
                _jobs.Add(job);
            }
        }

        public void Stop()
        {
            // This will cause '_jobs.GetConsumingEnumerable' to stop blocking and exit when it's empty
            _jobs.CompleteAdding();
        }

        private void OnHandlerStart()
        {
            foreach (var job in _jobs.GetConsumingEnumerable(CancellationToken.None))
            {
                Console.WriteLine(job);
                Thread.Sleep(10);
            }
        }
    }

Hope this helps :)


The question has been reworded; he meant something else when he said Processors.


Update: added a consumer pattern with OnPeek:

You really need to post some code!

Consider using an OnPeekCompleted handler. If there is an error, you can leave the message on the queue.

If you have some kind of header which identifies the message, you can hand it off to a dedicated handler/thread.

    private static void OnPeekCompleted(Object sourceQueue, PeekCompletedEventArgs asyncResult)
    {
        // Set up and connect to the queue.
        MessageQueue mq = (MessageQueue)sourceQueue;

        // Get a new transaction going.
        using (var txn = new MessageQueueTransaction())
        {
            try
            {
                txn.Begin();
                // Receive (not just peek) the message within the transaction;
                // it is removed from the queue only when the transaction commits.
                var message = mq.Receive(txn);

    #if DEBUG
                // Display message information on the screen.
                if (message != null)
                {
                    Console.WriteLine("{0}: {1}", message.Label, (string)message.Body);
                }
    #endif
                // Message will be removed on txn.Commit.
                txn.Commit();
            }
            catch (Exception ex)
            {
                // If there is an error, leave the message on the queue by aborting the transaction.
                Console.WriteLine(ex.ToString());
                txn.Abort();
            }
        }

        // Restart the asynchronous peek operation.
        mq.BeginPeek();
    }

You can also use SQL Server Service Broker.
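
For completeness, a minimal Service Broker sketch (queue, service, enqueue, and dequeue; the names are illustrative, and the database must have Service Broker enabled):

    -- one queue and a service bound to the DEFAULT contract
    CREATE QUEUE dbo.JobQueue;
    CREATE SERVICE JobService ON QUEUE dbo.JobQueue ([DEFAULT]);

    -- enqueue
    DECLARE @h uniqueidentifier;
    BEGIN DIALOG CONVERSATION @h
        FROM SERVICE JobService TO SERVICE 'JobService'
        WITH ENCRYPTION = OFF;
    SEND ON CONVERSATION @h ('job payload');

    -- dequeue (blocks up to 5 seconds waiting for a message)
    WAITFOR (
        RECEIVE TOP(1) conversation_handle, message_body
        FROM dbo.JobQueue
    ), TIMEOUT 5000;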

SQL Server Process Queue Race Condition

Edit:

I googled to check my answer: "Processing Data Queues in SQL Server with READPAST and UPDLOCK". It's been years since I read about and played with this solution.

Original:

If you use the READPAST hint, then locked rows are skipped. You've used ROWLOCK so you should avoid lock escalation. You also need UPDLOCK, as I found out.

So process 1 locks 20 rows, process 2 will take the next 20, process 3 takes rows 41 to 60, and so on.

The update can also be written like this:

    UPDATE TOP (20) foo
    SET ProcessorID = @PROCID
    FROM OrderTable foo WITH (ROWLOCK, READPAST, UPDLOCK)
    WHERE ProcessorID = 0

Refresh, Oct 2011

This can be done more elegantly with the OUTPUT clause if you need a SELECT and an UPDATE in one go.
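
For example, the update above could return the grabbed rows in the same statement (a sketch, using the same hypothetical OrderTable and @PROCID variable):

    UPDATE TOP (20) foo
    SET ProcessorID = @PROCID
    OUTPUT INSERTED.*
    FROM OrderTable foo WITH (ROWLOCK, READPAST, UPDLOCK)
    WHERE ProcessorID = 0;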

Using SQL Service Broker vs queues with a web service

TL;DR: I would recommend against using Service Broker in this kind of environment.

Detailed answer

While Service Broker is indeed a very lightweight and reliable communication mechanism, it was designed with a different goal in mind. Namely, it works best in a static topology, where administrators set everything up once and then the entire system runs for years with little or no change.

Judging by what I understood from your explanation, your network of connected hosts is much more dynamic, with hosts coming and going on a daily basis. This will incur high maintenance costs, because in order to establish communication between two Service Broker endpoints belonging to different SQL Server instances, you will need (among many other things) to generate at least one pair of certificates and exchange their public keys between the participating instances, after which they will have to be deployed in both the master and the subject databases on both sides.

This certificate exchange and deployment has to happen before Service Broker messaging becomes possible, so you will need another communication channel between the servers for the exchange itself. Normally this is done manually by DBAs, due to the high security risks associated with a potential loss of transport-level keys. In your environment, however, there is a good chance that people will simply not be able to keep up. Not to mention the potential for human error, which will be quite high due to the large amount of repetitive manual work.

In short, I would recommend looking for something which is easier to deploy and maintain. Change tracking might be a good start; as for transport, you have a full smorgasbord of choices, from WCF to WebAPI (to whatever else has appeared in the last few years).

The best way to use a DB table as a job queue (a.k.a. batch queue or message queue)

Here's what I've used successfully in the past:

MsgQueue table schema

    MsgId       int identity   -- NOT NULL
    MsgTypeCode varchar(20)    -- NOT NULL
    SourceCode  varchar(20)    -- process inserting the message -- NULLable
    State       char(1)        -- 'N'ew if queued, 'A'ctive if processing, 'C'ompleted; default 'N' -- NOT NULL
    CreateTime  datetime       -- default GETDATE() -- NOT NULL
    Msg         varchar(255)   -- NULLable

Your message types are what you'd expect: messages that conform to a contract between the process(es) inserting and the process(es) reading, structured with XML or another representation of your choice (JSON would be handy in some cases, for instance).

Then 0-to-n processes can be inserting, and 0-to-n processes can be reading and processing the messages. Each reading process typically handles a single message type, and multiple instances of a process type can run for load balancing.

The reader pulls one message and changes the state to 'A'ctive while it works on it. When it's done, it changes the state to 'C'omplete. It can then delete the message or not, depending on whether you want to keep an audit trail. Messages with State = 'N' are pulled in MsgTypeCode/timestamp order, so there's an index on MsgTypeCode + State + CreateTime.
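
In T-SQL that index might look like this (the name is illustrative):

    CREATE INDEX ixMsgQueuePull ON MsgQueue (MsgTypeCode, State, CreateTime);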

Variations:

State for "E"rror.

Column for Reader process code.

Timestamps for state transitions.

This has provided a nice, scalable, visible, simple mechanism for doing a number of things like you are describing. If you have a basic understanding of databases, it's pretty foolproof and extensible.


Code from comments:

    CREATE PROCEDURE GetMessage @MsgType varchar(20)
    AS
    BEGIN
        DECLARE @MsgId int;

        BEGIN TRAN;

        -- UPDLOCK + READPAST keep concurrent readers from grabbing the same row
        SELECT TOP 1 @MsgId = MsgId
        FROM MsgQueue WITH (UPDLOCK, READPAST)
        WHERE MsgTypeCode = @MsgType AND State = 'N'
        ORDER BY CreateTime;

        IF @MsgId IS NOT NULL
        BEGIN
            UPDATE MsgQueue
            SET State = 'A'
            WHERE MsgId = @MsgId;

            SELECT MsgId, Msg
            FROM MsgQueue
            WHERE MsgId = @MsgId;
        END
        ELSE
        BEGIN
            SELECT MsgId = NULL, Msg = NULL;
        END

        COMMIT TRAN;
    END

