Message Queues in Ruby on Rails

Message Queues in Ruby on Rails

As an update -- GitHub have moved to Resque on Redis instead of Delayed job. However they still recommend delayed_job for smaller setups:

https://github.com/resque/resque

Sinatra message Queue

Look at resque. It is framework agnostic and contains rake tasks to start an arbitrary number of workers to consume your queues. It uses redis lists for the queue backends, so you will need to install and manage that.

RabbitMq: Message lost from queue when exceptions occurred during message processing

There is no way to get the messages back when they're lost. Maybe you could try and track down some entries in RMQ's database cache - but that's just a wild guess/long shot and I don't think that it will help.

What you do need to do for the future is:

  • in case you are using a single server: make the queues and messages durable, and explicitly acknowledge (so switch off the auto-ACK flag) messages on consumer side only once they're processed.
  • in case you are using cluster of RMQ nodes (which is of course recommended exactly to avoid these situations): set up queue mirroring

Take a look at RMQ persistance and high availability.

Rails: How to listen to / pull from service or queue?

I just set up RabbitMQ messaging within my application and will be implementing for decoupled (multiple, distributed) applications in the next day or so. I found this article very helpful (and the RabbitMQ tutorials, too). All the code below is for RabbitMQ and assumes you have a RabbitMQ server up and running on your local machine.

Here's what I have so far - that's working for me:

  #Gemfile
gem 'bunny'
gem 'sneakers'

I have a Publisher that sends to the queue:

  # app/agents/messaging/publisher.rb
module Messaging
class Publisher
class << self

def publish(args)
connection = Bunny.new
connection.start
channel = connection.create_channel
queue_name = "#{args.keys.first.to_s.pluralize}_queue"
queue = channel.queue(queue_name, durable: true)
channel.default_exchange.publish(args[args.keys.first].to_json, :routing_key => queue.name)
puts "in #{self}.#{__method__}, [x] Sent #{args}!"
connection.close
end

end
end
end

Which I use like this:

  Messaging::Publisher.publish(event: {... event details...})

Then I have my 'listener':

  # app/agents/messaging/events_queue_receiver.rb
require_dependency "#{Rails.root.join('app','agents','messaging','events_agent')}"

module Messaging
class EventsQueueReceiver
include Sneakers::Worker
from_queue :events_queue, env: nil

def work(msg)
logger.info msg
response = Messaging::EventsAgent.distribute(JSON.parse(msg).with_indifferent_access)
ack! if response[:success]
end

end
end

The 'listener' sends the message to Messaging::EventsAgent.distribute, which is like this:

  # app/agents/messaging/events_agent.rb
require_dependency #{Rails.root.join('app','agents','fsm','state_assignment_agent')}"

module Messaging
class EventsAgent
EVENT_HANDLERS = {
enroll_in_program: ["FSM::StateAssignmentAgent"]
}
class << self

def publish(event)
Messaging::Publisher.publish(event: event)
end

def distribute(event)
puts "in #{self}.#{__method__}, message"
if event[:handler]
puts "in #{self}.#{__method__}, event[:handler: #{event[:handler}"
event[:handler].constantize.handle_event(event)
else
event_name = event[:event_name].to_sym
EVENT_HANDLERS[event_name].each do |handler|
event[:handler] = handler
publish(event)
end
end
return {success: true}
end

end
end
end

Following the instructions on Codetunes, I have:

  # Rakefile
# Add your own tasks in files placed in lib/tasks ending in .rake,
# for example lib/tasks/capistrano.rake, and they will automatically be available to Rake.

require File.expand_path('../config/application', __FILE__)

require 'sneakers/tasks'
Rails.application.load_tasks

And:

  # app/config/sneakers.rb
Sneakers.configure({})
Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy

I open two console windows. In the first, I say (to get my listener running):

  $ WORKERS=Messaging::EventsQueueReceiver rake sneakers:run
... a bunch of start up info
2016-03-18T14:16:42Z p-5877 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5899 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5922 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5944 t-14d03e INFO: Heartbeat interval used (in seconds): 2

In the second, I say:

  $ rails s --sandbox
2.1.2 :001 > Messaging::Publisher.publish({:event=>{:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1}})
in Messaging::Publisher.publish, [x] Sent {:event=>{:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1}}!
=> :closed

Then, back in my first window, I see:

  2016-03-18T14:17:44Z p-5877 t-19nfxy INFO: {"event_name":"enroll_in_program","program_system_name":"aha_chh","person_id":1}
in Messaging::EventsAgent.distribute, message
in Messaging::EventsAgent.distribute, event[:handler]: FSM::StateAssignmentAgent

And in my RabbitMQ server, I see:

Sample Image

It's a pretty minimal setup and I'm sure I'll be learning a lot more in coming days.

Good luck!

Messages stacking on consumers, when other consumers are available in RabbitMQ - using bunny for rails

RabbitMQ is working as intended.

Since your code does not set QoS / prefetch, RabbitMQ sends all six messages to your first consumer. Since that consumer takes time to acknowledge messages (simulated by a 45 second sleep in the code), those six remain in the "Unacked" state while your other two consumers don't have anything to work on. Restarting those other two consumers has no effect since all six messages are in "Unacked" waiting for an ack from the first consumer.

When you restart your first consumer, RabbitMQ detects the lost connection and enqueues the six messages to the "Ready" state, and delivers all six (most likely) to another consumer, and the problem repeats.

Please see this runnable code sample for how to set prefetch. With a prefetch of 1, RabbitMQ will at most deliver one message to a consumer and will wait for an ack before delivering another message to that consumer. In that way, messages will be distributed among your consumers.


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

Message Queue with offline clients support

You can do this with RabbitMQ if you are prepared to stick a broker on each machine, and then have them connect to your central broker with the shovel. Each machine then queues locally when offline. It's more hassle to run though.



Related Topics



Leave a reply



Submit