Message Queues in Ruby on Rails
As an update: GitHub has moved from delayed_job to Resque, which is backed by Redis. However, they still recommend delayed_job for smaller setups:
https://github.com/resque/resque
Sinatra message Queue
Look at Resque. It is framework agnostic and ships with rake tasks that start an arbitrary number of workers to consume your queues. It uses Redis lists as the queue backend, so you will need to install and manage Redis.
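As a rough sketch of what a Resque job looks like: the class name, queue name, and arguments below are hypothetical and chosen only for illustration. Resque itself only expects a `@queue` instance variable on the class and a class-level `perform` method; the enqueue call and rake commands are shown as comments because they need the resque gem and a running Redis.

```ruby
# Hypothetical job class, for illustration only.
class ThumbnailJob
  # Name of the Redis-backed queue this job is pushed onto.
  @queue = :thumbnails

  # Arguments are serialized to JSON when the job is enqueued, so keep
  # them to simple types (strings, numbers, arrays, hashes).
  def self.perform(image_id, size)
    "resized image #{image_id} to #{size}"
  end
end

# With the resque gem loaded and Redis running, enqueueing would be:
#   Resque.enqueue(ThumbnailJob, 42, "200x200")
#
# In a Sinatra app, add `require "resque/tasks"` to the Rakefile, then
# start one worker or several:
#   QUEUE=thumbnails rake resque:work
#   COUNT=3 QUEUE=thumbnails rake resque:workers

# The job is plain Ruby, so it can be exercised without Redis:
puts ThumbnailJob.perform(42, "200x200")
```

Because the job is an ordinary class, its logic can be unit tested directly and only the enqueueing depends on Resque.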
RabbitMq: Message lost from queue when exceptions occurred during message processing
There is no way to get the messages back when they're lost. Maybe you could try and track down some entries in RMQ's database cache - but that's just a wild guess/long shot and I don't think that it will help.
What you do need to do for the future is:
- if you are using a single server: make the queues and messages durable, and explicitly acknowledge messages on the consumer side (so switch off the auto-ACK flag) only once they have been processed.
- if you are using a cluster of RMQ nodes (which is of course recommended, exactly to avoid these situations): set up queue mirroring.
Take a look at the RMQ documentation on persistence and high availability.
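The single-server advice above could look roughly like this with the bunny gem. This is a sketch under assumptions, not a drop-in implementation: the helper names `publish_durably` and `consume_with_manual_ack` are made up for illustration, and `channel` is assumed to be a `Bunny::Channel` obtained from a started connection.

```ruby
# Sketch only. `channel` is expected to be a Bunny::Channel, e.g.:
#   require "bunny"
#   connection = Bunny.new
#   connection.start
#   channel = connection.create_channel

# Publish a message meant to survive a broker restart: the queue is
# declared durable and the message itself is marked persistent.
def publish_durably(channel, queue_name, payload)
  queue = channel.queue(queue_name, durable: true)
  channel.default_exchange.publish(payload, routing_key: queue.name, persistent: true)
end

# Consume with explicit acknowledgements. A message is acked only after
# the handler block returns; if the handler raises, the message is
# negatively acknowledged and requeued instead of being dropped.
def consume_with_manual_ack(channel, queue_name, &handler)
  queue = channel.queue(queue_name, durable: true)
  queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
    begin
      handler.call(body)
      channel.ack(delivery_info.delivery_tag)
    rescue StandardError
      # Arguments: delivery_tag, multiple, requeue
      channel.nack(delivery_info.delivery_tag, false, true)
    end
  end
end
```

Requeueing on every failure means a message that can never be processed will be redelivered indefinitely, so a real consumer would probably want a retry limit or a dead-letter queue.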
Rails: How to listen to / pull from service or queue?
I just set up RabbitMQ messaging within my application and will be implementing it for decoupled (multiple, distributed) applications in the next day or so. I found this article very helpful (and the RabbitMQ tutorials, too). All the code below is for RabbitMQ and assumes you have a RabbitMQ server up and running on your local machine.
Here's what I have so far - that's working for me:
#Gemfile
gem 'bunny'
gem 'sneakers'
I have a Publisher that sends to the queue:
# app/agents/messaging/publisher.rb
module Messaging
  class Publisher
    class << self
      # Expects a single-key hash, e.g. publish(event: {...}); the key picks the queue.
      def publish(args)
        connection = Bunny.new
        connection.start
        channel = connection.create_channel
        queue_name = "#{args.keys.first.to_s.pluralize}_queue"
        queue = channel.queue(queue_name, durable: true)
        channel.default_exchange.publish(args[args.keys.first].to_json, routing_key: queue.name)
        puts "in #{self}.#{__method__}, [x] Sent #{args}!"
        connection.close
      end
    end
  end
end
Which I use like this:
Messaging::Publisher.publish(event: {... event details...})
Then I have my 'listener':
# app/agents/messaging/events_queue_receiver.rb
require_dependency "#{Rails.root.join('app','agents','messaging','events_agent')}"
module Messaging
  class EventsQueueReceiver
    include Sneakers::Worker
    from_queue :events_queue, env: nil

    def work(msg)
      logger.info msg
      response = Messaging::EventsAgent.distribute(JSON.parse(msg).with_indifferent_access)
      # Acknowledge only when the agent reports success.
      ack! if response[:success]
    end
  end
end
The 'listener' sends the message to Messaging::EventsAgent.distribute, which is like this:
# app/agents/messaging/events_agent.rb
require_dependency "#{Rails.root.join('app','agents','fsm','state_assignment_agent')}"
module Messaging
  class EventsAgent
    EVENT_HANDLERS = {
      enroll_in_program: ["FSM::StateAssignmentAgent"]
    }

    class << self
      def publish(event)
        Messaging::Publisher.publish(event: event)
      end

      def distribute(event)
        puts "in #{self}.#{__method__}, message"
        if event[:handler]
          # The event already names its handler: dispatch to it.
          puts "in #{self}.#{__method__}, event[:handler]: #{event[:handler]}"
          event[:handler].constantize.handle_event(event)
        else
          # No handler yet: re-publish the event once per registered handler.
          event_name = event[:event_name].to_sym
          EVENT_HANDLERS[event_name].each do |handler|
            event[:handler] = handler
            publish(event)
          end
        end
        return {success: true}
      end
    end
  end
end
Following the instructions on Codetunes, I have:
# Rakefile
# Add your own tasks in files placed in lib/tasks ending in .rake,
# for example lib/tasks/capistrano.rake, and they will automatically be available to Rake.
require File.expand_path('../config/application', __FILE__)
require 'sneakers/tasks'
Rails.application.load_tasks
And:
# app/config/sneakers.rb
Sneakers.configure({})
Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy
I open two console windows. In the first, I say (to get my listener running):
$ WORKERS=Messaging::EventsQueueReceiver rake sneakers:run
... a bunch of start up info
2016-03-18T14:16:42Z p-5877 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5899 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5922 t-14d03e INFO: Heartbeat interval used (in seconds): 2
2016-03-18T14:16:42Z p-5944 t-14d03e INFO: Heartbeat interval used (in seconds): 2
In the second, I say:
$ rails c --sandbox
2.1.2 :001 > Messaging::Publisher.publish({:event=>{:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1}})
in Messaging::Publisher.publish, [x] Sent {:event=>{:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1}}!
=> :closed
Then, back in my first window, I see:
2016-03-18T14:17:44Z p-5877 t-19nfxy INFO: {"event_name":"enroll_in_program","program_system_name":"aha_chh","person_id":1}
in Messaging::EventsAgent.distribute, message
in Messaging::EventsAgent.distribute, event[:handler]: FSM::StateAssignmentAgent
And in my RabbitMQ server, I see: [screenshot not preserved]
It's a pretty minimal setup and I'm sure I'll be learning a lot more in coming days.
Good luck!
Messages stacking on consumers, when other consumers are available in RabbitMQ - using bunny for rails
RabbitMQ is working as intended.
Since your code does not set QoS / prefetch, RabbitMQ sends all six messages to your first consumer. Since that consumer takes time to acknowledge messages (simulated by a 45 second sleep in the code), those six remain in the "Unacked" state while your other two consumers don't have anything to work on. Restarting those other two consumers has no effect since all six messages are in "Unacked" waiting for an ack from the first consumer.
When you restart your first consumer, RabbitMQ detects the lost connection, returns the six messages to the "Ready" state, and (most likely) delivers all six to another single consumer, so the problem repeats.
Please see this runnable code sample for how to set prefetch. With a prefetch of 1, RabbitMQ will deliver at most one message to a consumer and will wait for an ack before delivering another message to that consumer. In that way, messages will be distributed among your consumers.
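For illustration, setting a prefetch limit with bunny might look something like the sketch below. The helper name `subscribe_with_prefetch` is hypothetical; `channel` is assumed to be a `Bunny::Channel` and `queue` a queue already declared on that channel with whatever options your application uses.

```ruby
# Sketch only. `channel` is expected to be a Bunny::Channel and `queue`
# a Bunny::Queue declared on that same channel.
def subscribe_with_prefetch(channel, queue, prefetch: 1, &handler)
  # Cap the number of unacknowledged messages the broker will push to
  # consumers on this channel. This must be set before subscribing.
  channel.prefetch(prefetch)

  # Prefetch only has an effect with manual acknowledgements: the broker
  # sends the next message once the previous one has been acked.
  queue.subscribe(manual_ack: true) do |delivery_info, _properties, body|
    handler.call(body)
    channel.ack(delivery_info.delivery_tag)
  end
end
```

Each consumer process would call this on its own channel, so a slow consumer holds at most `prefetch` messages while the rest stay in "Ready" for the other consumers. In a standalone script you would likely also pass `block: true` to `subscribe` to keep the process alive.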
NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.
Message Queue with offline clients support
You can do this with RabbitMQ if you are prepared to run a broker on each machine and have each one connect to your central broker using the Shovel plugin. Each machine then queues messages locally while it is offline. It's more hassle to run, though.