The Best Way to Use a Db Table as a Job Queue (A.K.A Batch Queue or Message Queue)

The best way to use a DB table as a job queue (a.k.a batch queue or message queue)

Here's what I've used successfully in the past:

MsgQueue table schema

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL
SourceCode varchar(20) -- process inserting the message -- NULLable
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL
CreateTime datetime -- default GETDATE() -- NOT NULL
Msg varchar(255) -- NULLable

Your message types are what you'd expect - messages that conform to a contract between the process(es) inserting and the process(es) reading, structured with XML or your other choice of representation (JSON would be handy in some cases, for instance).

Then 0-to-n processes can be inserting, and 0-to-n processes can be reading and processing the messages, Each reading process typically handles a single message type. Multiple instances of a process type can be running for load-balancing.

The reader pulls one message and changes the state to "A"ctive while it works on it. When it's done it changes the state to "C"omplete. It can delete the message or not depending on whether you want to keep the audit trail. Messages of State = 'N' are pulled in MsgType/Timestamp order, so there's an index on MsgType + State + CreateTime.

Variations:

State for "E"rror.

Column for Reader process code.

Timestamps for state transitions.

This has provided a nice, scalable, visible, simple mechanism for doing a number of things like you are describing. If you have a basic understanding of databases, it's pretty foolproof and extensible.


Code from comments:

CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) ) 
AS
DECLARE @MsgId INT

BEGIN TRAN

SELECT TOP 1 @MsgId = MsgId
FROM MsgQueue
WHERE MessageType = @pMessageType AND State = 'N'
ORDER BY CreateTime

IF @MsgId IS NOT NULL
BEGIN

UPDATE MsgQueue
SET State = 'A'
WHERE MsgId = @MsgId

SELECT MsgId, Msg
FROM MsgQueue
WHERE MsgId = @MsgId
END
ELSE
BEGIN
SELECT MsgId = NULL, Msg = NULL
END

COMMIT TRAN

Message Queues Vs DB Table Queue via CRON

A message queue (a distributed one at least, e.g. RabbitMQ) gives you the ability to distribute work across physical nodes. You still need to have a process on each node to dequeue work and process it.

It gets down ultimately to your requirements I guess. You can achieve a more manageable solution at scale with using message queues: you can decouple your nodes more easily.

Of course, there is a learning curve... so it again comes back to your target goals.


Note that on each node you can still reuse your cron/db table until (and if) you wish to change the implementation. That's what great about decoupling when you can.

Is it better for a queued job to be queue-aware, or to use a service?

As author of SlmQueue I developed the QueueAwareInterface to be more flexible in requeueing jobs. However, you create a coupling between jobs and queues you might not want.

It's up to you :) But I don't really mind that jobs can push themselves again in the queue. If you are bothered about separation of concerns, you might want to create a service which does the job pushing. You can use that service in both your controllers (for pushing the job a first time) and in your jobs (to push them again when failed).

However, for me that's too much abstraction for a simple goal and I'd personally use the QueueAwareInterface :)

How to properly delay between executing a pool of workers

Okey, so let's start with this working example:

func Test_t(t *testing.T) {

// just a published, this publishes result on a chan
publish := func(s int, ch chan int, wg *sync.WaitGroup) {
ch <- s // this is blocking!!!
wg.Done()
}

wg := &sync.WaitGroup{}
wg.Add(100)

// we'll use done channel to notify the work is done
res := make(chan int)
done := make(chan struct{})
// create worker that will notify that all results were published
go func() {
wg.Wait()
done <- struct{}{}
}()

// let's create a jobs that publish on our res chan
// please note all goroutines are created immediately
for i := 0; i < 100; i++ {
go publish(i, res, wg)
}

// lets get 30 args and then wait
var resCounter int
forloop:
for {
select {
case ss := <-res:
println(ss)
resCounter += 1
// break the loop
if resCounter%30 == 0 {
// after receiving 30 results we are blocking this thread
// no more results will be taken from the channel for 5 seconds
println("received 30 results, waiting...")
time.Sleep(5 * time.Second)
}
case <-done:
// we are done here, let's break this infinite loop
break forloop
}
}
}

I hope this shows moreover how it can be done.

So, what's the problem with your code?
To be honest, it looks fine (I mean 30 results are published, then the code wait, then another 30 results, etc.), but the question is where would you like to wait?

There are a few possibilities I guess:

  • creating workers (this is how your code works now, as I see, it publishes jobs in 30-packs; please notice that the 2-second delay you have in the digit function is applicable only to the goroutine the code is executed)

  • triggering workers (so the "wait" code should be in worker function, not allowing to run more workers - so it must watch how many results were published)

  • handling results (this is how my code works and proper synchronization is in the forloop)

IPC design using ESB and messaging middleware

Disclaimer: I am the CTO of AdroitLogic, which builds the UltraESB mentioned in the question

You can easily get the ESB itself to poll the MS SQL and Oracle Databases for new actions to be performed. This could be scheduled in the ESB giving a cron schedule etc, or a simple delay (e.g. every hour). The ESB can enrich, transform and route etc, but you will need a way to track which records have been successfully processed - maybe a new column in the polled tables? Once that's available, you really do not need a persistent message queue, since the ESB can poll for un-processed records, do whatever is expected over them, and post them to the DMS - and update status as successful or failed. Unless the DMS rejects or becomes unavailable, there is no real point in-retrying, but you may want to do that, and that's possible too. If DMS accepts the record, the ESB can directly update the table columns. If you really want to use a message queue - that too is certainly possible, and depends on your selection.



Related Topics



Leave a reply



Submit