How to Get the Best Performance for Parallel Rails Requests in a Sidekiq Worker

Concurrent HTTP requests from within a single Sidekiq worker?

It's reasonable to use threads within Sidekiq job threads; it's not reasonable to build your own threading infrastructure. You can use a reusable thread pool via the concurrent-ruby or parallel gems, or use an HTTP client that is thread-safe and allows concurrent requests. HTTP.rb from Tony Arcieri is a good one, but plain old net/http will work too:

https://github.com/httprb/http/wiki/Thread-Safety
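
For instance, here is a minimal sketch of fanning requests out inside one job with concurrent-ruby and net/http (the worker name and URLs are made up):

require "concurrent"
require "net/http"

class FetchStatusesWorker
  include Sidekiq::Worker

  def perform(urls)
    pool = Concurrent::FixedThreadPool.new(5) # bounded, reusable pool

    futures = urls.map do |url|
      Concurrent::Promises.future_on(pool) do
        # each thread opens its own connection, so net/http is safe here
        Net::HTTP.get_response(URI(url)).code
      end
    end

    # wait for every request; value! re-raises any error a request hit,
    # so Sidekiq's retry machinery still sees the failure
    Concurrent::Promises.zip(*futures).value!
  ensure
    pool&.shutdown
  end
end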

Just remember that there are a few complexities: the job might be retried, and how do you handle errors that the HTTP client raises? If you don't split these requests 1-to-1 with jobs, you might need to track each request individually, or idempotency becomes an issue.
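
If one job covers many requests, one way to keep a retry from repeating work it already finished is to record each completed request, for example in a small table (a sketch assuming a hypothetical FetchedUrl model with job_key and url columns):

require "net/http"

class FetchUrlsWorker
  include Sidekiq::Worker

  # job_key identifies this batch so a retried job can skip the URLs
  # that were already fetched successfully
  def perform(job_key, urls)
    urls.each do |url|
      next if FetchedUrl.exists?(job_key: job_key, url: url)

      response = Net::HTTP.get_response(URI(url))
      raise "Request to #{url} failed: #{response.code}" unless response.is_a?(Net::HTTPSuccess)

      FetchedUrl.create!(job_key: job_key, url: url) # remember the success
    end
  end
end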

And you are always welcome to increase your Sidekiq Enterprise thread count. :-D

Big task or multiple small tasks with Sidekiq

Here are my thoughts.

1. Do a single SQL query instead of N queries

This line: group.users += users_to_process is likely to produce N SQL queries (where N is users_to_process.count). I assume that you have a many-to-many association between users and groups (with a groups_users join table/model), so you should use a mass-insert technique:

users_to_process_ids = store.users.where(id: user_ids_to_add)
                            .where.not(id: group.user_ids)
                            .pluck(:id)

sql_values = users_to_process_ids.map { |i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())" }

Group.connection.execute("
  INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
  VALUES #{sql_values.join(',')}
")

Yes, it's raw SQL. And it's fast.
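
If you are on Rails 6+, insert_all gives you the same single multi-row INSERT without hand-building SQL. Here is a sketch assuming a GroupsUser model backed by that join table; insert_all skips callbacks and doesn't fill timestamps, so they are set explicitly:

now = Time.current
GroupsUser.insert_all(
  users_to_process_ids.map do |user_id|
    { user_id: user_id, group_id: group.id, created_at: now, updated_at: now }
  end
)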

2. Use pluck(:id) instead of map(&:id)

pluck is much quicker, because:

  • It will select only the 'id' column, so less data is transferred from the DB
  • More importantly, it won't create an ActiveRecord object for each row

Doing SQL is cheap. Creating Ruby objects is really expensive.
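
For comparison, roughly what Rails runs for each call:

user_ids = group.users.map(&:id)    # SELECT "users".* FROM "users" ...  (builds a User object per row)
user_ids = group.users.pluck(:id)   # SELECT "users"."id" FROM "users" ... (returns plain integers)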

3. Use horizontal parallelization instead of vertical parallelization

What I mean here is: if you need to do sequential tasks A -> B -> C for a dozen records, there are two major ways to split the work:

  • Vertical segmentation. AWorker does A(1), A(2), A(3); BWorker does B(1), etc.; CWorker does all C(i) jobs;
  • Horizontal segmentation. UniversalWorker does A(1)+B(1)+C(1).

Use the latter (horizontal) way.

It's a statement from experience, not from some theoretical point of view (where both ways are feasible).

Why should you do that?

  • When you use vertical segmentation, you will likely get errors when you pass a job from one worker down to another (typically a race where the next worker runs before the data it needs has been committed). You will pull your hair out if you bump into such errors, because they aren't consistent or easily reproducible: sometimes they happen and sometimes they don't. Is it possible to write code which passes the work down the chain without errors? Sure, it is. But it's better to keep it simple.
  • Imagine that your server is at rest, and then suddenly new jobs arrive. Your B and C workers will just waste RAM while your A workers do the job. Then your A and C workers will waste RAM while the B's are at work, and so on. With horizontal segmentation, the resource drain evens itself out.

Applying that advice to your specific case: for starters, don't call perform_async in another async task.
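
Side by side, the two styles look roughly like this (a sketch with made-up workers and step methods):

# Vertical: each step gets its own worker, chained with perform_async
class AWorker
  include Sidekiq::Worker

  def perform(record_id)
    do_step_a(record_id)
    BWorker.perform_async(record_id) # hands the record off to the next queue
  end
end

# Horizontal: one worker walks a single record through every step
class UniversalWorker
  include Sidekiq::Worker

  def perform(record_id)
    do_step_a(record_id)
    do_step_b(record_id)
    do_step_c(record_id)
  end
end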

4. Process in batches

Answering your original question – yes, do process in batches. Creating and managing an async task takes some resources by itself, so there's no need to create too many of them.


TL;DR So in the end, your code could look something like this:

# model code

BATCH_SIZE = 100

def add_users
  users_to_process_ids = store.users.where(id: user_ids_to_add)
                              .where.not(id: group.user_ids)
                              .pluck(:id)

  # With 100,000 users the performance of this query should be acceptable
  # enough to run it synchronously
  sql_values = users_to_process_ids.map { |i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())" }
  Group.connection.execute("
    INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
    VALUES #{sql_values.join(',')}
  ")

  users_to_process_ids.each_slice(BATCH_SIZE) do |batch|
    # pass store.id along so the worker does not have to look the store up
    AddUserToGroupWorker.perform_async group.id, store.id, batch
  end
end

# add_user_to_group_worker.rb

def perform(group_id, store_id, user_ids_to_add)
  group = Group.find group_id

  # Do some heavy lifting with the batch as a whole
  # ...
  # ...
  # If nothing is left here, call UpdateLastUpdatesForUserWorker from the model instead

  user_ids_to_add.each do |id|
    # do it synchronously – we already parallelized the job
    # by splitting it into slices in the model above
    UpdateLastUpdatesForUserWorker.new.perform store_id, id
  end
end

Sidekiq: perform_async and order-dependent operations

If the email can only be sent after the text message has been sent, then send the email after successful completion of sending the text.

class ContactUserWorker
  include Sidekiq::Worker

  def perform(user_id)
    SendUserTextWorker.perform_async(user_id)
  end
end

class SendUserTextWorker
  include Sidekiq::Worker

  def perform(user_id)
    user = User.find(user_id)
    text_sent = user.send_text
    SendUserEmailWorker.perform_async(user_id) if text_sent
  end
end

class SendUserEmailWorker
  include Sidekiq::Worker

  def perform(user_id)
    user = User.find(user_id)
    user.send_email
  end
end

In user.send_text you need to handle the failure case, where neither the text nor the email has been sent.
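
One option is for send_text to report failure by returning false, so the worker skips enqueueing the email (a sketch; SmsClient is a made-up delivery client, and you may still want to raise if you want Sidekiq to retry the text):

class User < ApplicationRecord
  def send_text
    SmsClient.deliver(phone_number, "Your message here") # hypothetical client
    true
  rescue SmsClient::Error => e
    Rails.logger.warn("Text to user ##{id} failed: #{e.message}")
    false
  end
end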

How do I create a worker daemon which waits for jobs and executes them?

I'd build a queue in a table in the database, and a bit of code that is periodically started by cron, which walks that table, passing requests to Typhoeus and Hydra.

Here's how the author summarizes the gem:

Like a modern code version of the mythical beast with 100 serpent heads, Typhoeus runs HTTP requests in parallel while cleanly encapsulating handling logic.

As users add requests, append them to the table. You'll want fields like:

  • A "processed" field so you can tell which were handled in case the system goes down.
  • A "success" field so you can tell which requests were processed successfully, so you can retry if they failed.
  • A "retry_count" field so you can retry up to "n" times, then flag that URL as unreachable.
  • A "next_scan_time" field that says when the URL should be scanned again so you don't DOS a site by hitting it continuously.

Typhoeus and Hydra are easy to use, and make handling multiple parallel requests straightforward.
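
As a rough sketch of how the cron-started pass could look (assuming a hypothetical ScanRequest model backed by the queue table described above):

MAX_RETRIES = 5

hydra = Typhoeus::Hydra.new(max_concurrency: 10)

ScanRequest.where(processed: false)
           .where("next_scan_time <= ?", Time.current)
           .find_each do |scan|
  request = Typhoeus::Request.new(scan.url, followlocation: true)

  request.on_complete do |response|
    if response.success?
      scan.update(processed: true, success: true)
    elsif scan.retry_count + 1 >= MAX_RETRIES
      scan.update(processed: true, success: false) # give up: flag as unreachable
    else
      scan.update(retry_count: scan.retry_count + 1,
                  next_scan_time: 10.minutes.from_now)
    end
  end

  hydra.queue(request)
end

hydra.run # runs all queued requests in parallel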


