The best way to use a DB table as a job queue (a.k.a. batch queue or message queue)
Here's what I've used successfully in the past:
MsgQueue table schema:

MsgId        identity      -- NOT NULL
MsgTypeCode  varchar(20)   -- NOT NULL
SourceCode   varchar(20)   -- process inserting the message -- NULLable
State        char(1)       -- 'N'ew if queued, 'A'ctive if processing, 'C'omplete; default 'N' -- NOT NULL
CreateTime   datetime      -- default GETDATE() -- NOT NULL
Msg          varchar(255)  -- NULLable
Your message types are what you'd expect - messages that conform to a contract between the process(es) inserting and the process(es) reading, structured with XML or your other choice of representation (JSON would be handy in some cases, for instance).
Then 0-to-n processes can be inserting, and 0-to-n processes can be reading and processing the messages. Each reading process typically handles a single message type, and multiple instances of a process type can be running for load-balancing.
The reader pulls one message and changes the state to 'A'ctive while it works on it. When it's done, it changes the state to 'C'omplete. It can delete the message or not, depending on whether you want to keep an audit trail. Messages with State = 'N' are pulled in MsgType/CreateTime order, so there's an index on MsgTypeCode + State + CreateTime.
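The N -> A -> C lifecycle above can be illustrated with a small in-memory stand-in. This is only a sketch of the state machine, not the real DB-backed implementation: the type and method names are illustrative, and the mutex plays the role the transaction plays in the database.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// In-memory stand-in for the MsgQueue table; field names mirror the schema above.
type MsgRow struct {
	MsgId       int
	MsgTypeCode string
	State       byte // 'N'ew, 'A'ctive, 'C'omplete
	CreateTime  time.Time
	Msg         string
}

type MemQueue struct {
	mu     sync.Mutex
	nextId int
	rows   []*MsgRow
}

func (q *MemQueue) Enqueue(msgType, body string) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.nextId++
	q.rows = append(q.rows, &MsgRow{
		MsgId: q.nextId, MsgTypeCode: msgType,
		State: 'N', CreateTime: time.Now(), Msg: body,
	})
	return q.nextId
}

// Claim takes the oldest 'N' row of the given type and flips it to 'A'.
// The mutex stands in for the transaction in the stored procedure:
// two concurrent readers can never claim the same row.
func (q *MemQueue) Claim(msgType string) (id int, body string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, m := range q.rows { // rows are kept in CreateTime order
		if m.MsgTypeCode == msgType && m.State == 'N' {
			m.State = 'A'
			return m.MsgId, m.Msg, true
		}
	}
	return 0, "", false
}

// Complete marks a claimed row 'C'; deleting it instead is the no-audit-trail variant.
func (q *MemQueue) Complete(id int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, m := range q.rows {
		if m.MsgId == id {
			m.State = 'C'
		}
	}
}

func main() {
	q := &MemQueue{}
	q.Enqueue("ORDER", "order #1")
	if id, body, ok := q.Claim("ORDER"); ok {
		fmt.Println("processing", body)
		q.Complete(id)
	}
}
```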
Variations:
State for "E"rror.
Column for Reader process code.
Timestamps for state transitions.
This has provided a nice, scalable, visible, simple mechanism for doing a number of things like you are describing. If you have a basic understanding of databases, it's pretty foolproof and extensible.
Code from comments:
CREATE PROCEDURE GetMessage (@MsgType VARCHAR(20))
AS
DECLARE @MsgId INT
BEGIN TRAN
    -- UPDLOCK/READPAST keeps two concurrent readers from claiming the same row
    SELECT TOP 1 @MsgId = MsgId
    FROM MsgQueue WITH (UPDLOCK, READPAST)
    WHERE MsgTypeCode = @MsgType AND State = 'N'
    ORDER BY CreateTime

    IF @MsgId IS NOT NULL
    BEGIN
        UPDATE MsgQueue
        SET State = 'A'
        WHERE MsgId = @MsgId

        SELECT MsgId, Msg
        FROM MsgQueue
        WHERE MsgId = @MsgId
    END
    ELSE
    BEGIN
        SELECT MsgId = NULL, Msg = NULL
    END
COMMIT TRAN
Message Queues Vs DB Table Queue via CRON
A message queue (a distributed one at least, e.g. RabbitMQ) gives you the ability to distribute work across physical nodes. You still need to have a process on each node to dequeue work and process it.
Ultimately it comes down to your requirements, I guess. You can achieve a more manageable solution at scale with message queues: you can decouple your nodes more easily.
Of course, there is a learning curve... so it again comes back to your target goals.
Note that on each node you can still reuse your cron/db table until (and if) you wish to change the implementation. That's what's great about decoupling when you can.
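That decoupling is easier to see in code: if workers talk to a small queue interface, the cron/db-table backend can later be swapped for a broker without touching the worker logic. A minimal Go sketch, where all names and the in-memory backend are illustrative:

```go
package main

import "fmt"

// Worker code depends only on this interface, so the backing
// implementation (cron + DB table, RabbitMQ, ...) can be swapped later.
type Queue interface {
	Enqueue(msg string) error
	Dequeue() (msg string, ok bool, err error)
}

// Stand-in for the cron/db-table implementation.
type dbTableQueue struct{ rows []string }

func (q *dbTableQueue) Enqueue(msg string) error {
	q.rows = append(q.rows, msg)
	return nil
}

func (q *dbTableQueue) Dequeue() (string, bool, error) {
	if len(q.rows) == 0 {
		return "", false, nil
	}
	m := q.rows[0]
	q.rows = q.rows[1:]
	return m, true, nil
}

// Drain processes everything currently queued; it never sees the concrete type.
func Drain(q Queue, handle func(string)) error {
	for {
		m, ok, err := q.Dequeue()
		if err != nil {
			return err
		}
		if !ok {
			return nil
		}
		handle(m)
	}
}

func main() {
	var q Queue = &dbTableQueue{}
	q.Enqueue("resize-image:42")
	q.Enqueue("send-email:43")
	Drain(q, func(m string) { fmt.Println("processed", m) })
}
```

Swapping to a distributed broker later means writing a second `Queue` implementation; `Drain` and the rest of the worker code stay untouched.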
Is it better for a queued job to be queue-aware, or to use a service?
As the author of SlmQueue, I developed the QueueAwareInterface to be more flexible in requeueing jobs. However, it creates a coupling between jobs and queues that you might not want.
It's up to you :) But I don't really mind that jobs can push themselves onto the queue again. If you are bothered about separation of concerns, you might want to create a service which does the job pushing. You can use that service both in your controllers (to push the job the first time) and in your jobs (to push them again when they fail).
However, for me that's too much abstraction for a simple goal and I'd personally use the QueueAwareInterface :)
How to properly delay between executing a pool of workers
Okay, so let's start with this working example:
import (
	"sync"
	"testing"
	"time"
)

func Test_t(t *testing.T) {
	// a publisher: it publishes its result on a chan
	publish := func(s int, ch chan int, wg *sync.WaitGroup) {
		ch <- s // this send blocks until someone receives!
		wg.Done()
	}
	wg := &sync.WaitGroup{}
	wg.Add(100)
	// we'll use the done channel to signal that the work is done
	res := make(chan int)
	done := make(chan struct{})
	// a goroutine that signals once all results have been published
	go func() {
		wg.Wait()
		done <- struct{}{}
	}()
	// let's create jobs that publish on our res chan
	// please note all goroutines are created immediately
	for i := 0; i < 100; i++ {
		go publish(i, res, wg)
	}
	// let's take 30 results, then wait
	var resCounter int
forloop:
	for {
		select {
		case ss := <-res:
			println(ss)
			resCounter++
			if resCounter%30 == 0 {
				// after receiving 30 results we block this goroutine:
				// no more results are taken from the channel for 5 seconds
				println("received 30 results, waiting...")
				time.Sleep(5 * time.Second)
			}
		case <-done:
			// we are done here, let's break this infinite loop
			break forloop
		}
	}
}
I hope this shows more or less how it can be done.
So, what's the problem with your code?
To be honest, it looks fine (I mean 30 results are published, then the code waits, then another 30 results, etc.), but the question is where you would like to wait.
There are a few possibilities I guess:
creating workers (this is how your code works now; as I see it, it publishes jobs in packs of 30 - please notice that the 2-second delay you have in the "digit" function applies only to the goroutine in which that code is executed)
triggering workers (the "wait" code would live in the worker function, not allowing more workers to run - so it must watch how many results were published)
handling results (this is how my code works; the proper synchronization is in the "forloop")
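The second option - waiting at the point where workers are triggered - can be sketched with a buffered channel used as a semaphore. The function name and limits here are illustrative, not part of the original code:

```go
package main

import (
	"fmt"
	"sync"
)

// runWithLimit starts `jobs` goroutines but lets at most `maxInFlight`
// of them run at once; it returns the peak concurrency observed.
func runWithLimit(jobs, maxInFlight int, work func(n int)) (peak int) {
	sem := make(chan struct{}, maxInFlight) // buffered channel as a semaphore
	var mu sync.Mutex
	var running int
	var wg sync.WaitGroup
	wg.Add(jobs)
	for i := 0; i < jobs; i++ {
		go func(n int) {
			defer wg.Done()
			sem <- struct{}{}        // blocks while maxInFlight workers are busy
			defer func() { <-sem }() // free a slot when this worker finishes
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			work(n)
			mu.Lock()
			running--
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runWithLimit(100, 30, func(n int) { /* do the actual job here */ })
	fmt.Println("peak concurrent workers:", peak)
}
```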
IPC design using ESB and messaging middleware
Disclaimer: I am the CTO of AdroitLogic, which builds the UltraESB mentioned in the question
You can easily get the ESB itself to poll the MS SQL and Oracle databases for new actions to be performed. This could be scheduled in the ESB using a cron expression, or a simple delay (e.g. every hour). The ESB can enrich, transform, route etc., but you will need a way to track which records have been successfully processed - maybe a new column in the polled tables?

Once that's available, you really do not need a persistent message queue: the ESB can poll for unprocessed records, do whatever is expected with them, post them to the DMS, and update the status as successful or failed. Unless the DMS rejects the record or becomes unavailable, there is no real point in retrying, but you may want to do that, and that's possible too. If the DMS accepts the record, the ESB can directly update the table columns. If you really want to use a message queue, that too is certainly possible; it depends on your selection.