
How Sidekiq Works

I first ran into Sidekiq the year before last. At the time I was helping a friend build a customized forum and picked Discourse, although we eventually abandoned it because it does not support IE8, but that is another story. In the process I picked Ruby back up after years of neglect (I don't think I ever learned Ruby properly in the first place), and I also came across a Ruby background-job framework, Sidekiq ("Simple, efficient background processing for Ruby"). Back then I only skimmed the documentation and the configuration, figuring it was enough that it worked. Later, since I kept toying with the idea of building a background-job framework in Scala someday, I told myself I should read Sidekiq's source code at some point. It took more than a year for that thought to turn into action.

First, let's start from por.rb ("plain old ruby"), an example under the examples directory of the Sidekiq source tree.

require 'sidekiq'

# If your client is single-threaded, we just need a single connection in our Redis connection pool
Sidekiq.configure_client do |config|
  config.redis = { :namespace => 'x', :size => 1 }
end

# Sidekiq server is multi-threaded so our Redis connection pool size defaults to concurrency (-c)
Sidekiq.configure_server do |config|
  config.redis = { :namespace => 'x' }
end

# Start up sidekiq via
# ./bin/sidekiq -r ./examples/por.rb
# and then you can open up an IRB session like so:
# irb -r ./examples/por.rb
# where you can then say
# PlainOldRuby.perform_async "like a dog", 3
#
class PlainOldRuby
  include Sidekiq::Worker

  def perform(how_hard="super hard", how_long=1)
    sleep how_long
    puts "Workin' #{how_hard}"
  end
end

Setting aside the two configuration blocks at the top, look at the include Sidekiq::Worker line inside the class. Only once that module is included does PlainOldRuby gain the perform_async, perform_in and perform_at methods. Before digging into perform_async, here is how the three are typically called.
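A minimal usage sketch, assuming an IRB session started as the comments in por.rb suggest (irb -r ./examples/por.rb) and a reachable Redis:

# Enqueue for immediate execution: the job is pushed straight onto a queue.
PlainOldRuby.perform_async('like a dog', 3)

# Enqueue for later: perform_in takes a delay in seconds, perform_at a Time
# (or epoch timestamp). Both end up in the 'schedule' sorted set rather than
# a queue, as atomic_push will show further down.
PlainOldRuby.perform_in(5 * 60, 'in five minutes', 1)
PlainOldRuby.perform_at(Time.now + 3600, 'in an hour', 1)

With that in mind, here is perform_async itself, together with the client_push helper it delegates to.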

def perform_async(*args)
  client_push('class' => self, 'args' => args)
end

def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
  hash = if Thread.current[:sidekiq_worker_set]
    x, Thread.current[:sidekiq_worker_set] = Thread.current[:sidekiq_worker_set], nil
    x.stringify_keys.merge(item.stringify_keys)
  else
    item.stringify_keys
  end
  Sidekiq::Client.new(pool).push(hash)
end

When perform_async runs, it merely packs the worker class and the arguments into a hash; in client_push the hash keys are normalized to strings and the result is handed to the push method of a Sidekiq::Client built on the right Redis connection pool (the actual JSON serialization happens later, inside the client). Sidekiq::Client's push method is shown after the short trace below.
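A hypothetical trace of the example above, nothing new, just the calls we have already seen written out with concrete values:

PlainOldRuby.perform_async('like a dog', 3)
# perform_async builds the item hash:
#   { 'class' => PlainOldRuby, 'args' => ['like a dog', 3] }
# client_push stringifies the keys and forwards it:
#   Sidekiq::Client.new(pool).push('class' => PlainOldRuby,
#                                  'args'  => ['like a dog', 3])
# push (below) normalizes the item, runs it through the client middleware
# chain and, if nothing aborts it, writes the JSON payload to Redis.

Now the push method, together with its helpers, from Sidekiq::Client.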

##
# The main method used to push a job to Redis.  Accepts a number of options:
#
#   queue - the named queue to use, default 'default'
#   class - the worker class to call, required
#   args - an array of simple arguments to the perform method, must be JSON-serializable
#   retry - whether to retry this job if it fails, default true or an integer number of retries
#   backtrace - whether to save any error backtrace, default false
#
# All options must be strings, not symbols.  NB: because we are serializing to JSON, all
# symbols in 'args' will be converted to strings.  Note that +backtrace: true+ can take quite a bit of
# space in Redis; a large volume of failing jobs can start Redis swapping if you aren't careful.
#
# Returns a unique Job ID.  If middleware stops the job, nil will be returned instead.
#
# Example:
#   push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
#
def push(item)
  normed = normalize_item(item)
  payload = process_single(item['class'], normed)

  if payload
    raw_push([payload])
    payload['jid']
  end
end

def raw_push(payloads)
  @redis_pool.with do |conn|
    conn.multi do
      atomic_push(conn, payloads)
    end
  end
  true
end

def atomic_push(conn, payloads)
  if payloads.first['at']
    conn.zadd('schedule'.freeze, payloads.map do |hash|
      at = hash.delete('at'.freeze).to_s
      [at, Sidekiq.dump_json(hash)]
    end)
  else
    q = payloads.first['queue']
    now = Time.now.to_f
    to_push = payloads.map do |entry|
      entry['enqueued_at'.freeze] = now
      Sidekiq.dump_json(entry)
    end
    conn.sadd('queues'.freeze, q)
    conn.lpush("queue:#{q}", to_push)
  end
end

def process_single(worker_class, item)
  queue = item['queue']

  middleware.invoke(worker_class, item, queue, @redis_pool) do
    item
  end
end

##
# Define client-side middleware:
#
#   client = Sidekiq::Client.new
#   client.middleware do |chain|
#     chain.use MyClientMiddleware
#   end
#   client.push('class' => 'SomeWorker', 'args' => [1,2,3])
#
# All client instances default to the globally-defined
# Sidekiq.client_middleware but you can change as necessary.
#
def middleware(&block)
  @chain ||= Sidekiq.client_middleware
  if block_given?
    @chain = @chain.dup
    yield @chain
  end
  @chain
end

Read the comment block first. Then look at process_single: the most important thing in this method is the call to middleware.invoke. What is middleware? If you have worked with Express in Node.js or a similar framework, or even just going by the name, you will get the idea right away: a chain of hooks that each get a chance to inspect, modify, or reject a job before it is enqueued. Back in push there is a check on whether payload is nil; if it is, the job is simply never enqueued, and whether it ends up nil is determined entirely by what the chain of client middleware does.

Now look at raw_push. It checks a connection out of the pool, opens a Redis MULTI transaction on it, and hands off to atomic_push. atomic_push then splits submitted jobs into two kinds. Plain asynchronous jobs, produced by perform_async, are JSON-encoded and LPUSHed onto the list queue:<name>, with the queue name also recorded in the 'queues' set. Scheduled jobs, produced by perform_in or perform_at, are ZADDed into the 'schedule' sorted set with their run-at timestamp as the score. I won't analyse this part any further here. That wraps up how Sidekiq submits a job to Redis.
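To make the two paths concrete, here is a small sketch of what you could run from an IRB session loaded with the example (irb -r ./examples/por.rb), assuming a reachable local Redis and no Sidekiq server running, so the jobs just sit there. Inside Sidekiq.redis the 'x' namespace from por.rb is applied transparently, so the keys match the ones atomic_push uses:

PlainOldRuby.perform_async('gently')    # plain async job -> LPUSH queue:default
PlainOldRuby.perform_in(60, 'later')    # scheduled job   -> ZADD schedule

Sidekiq.redis do |conn|
  p conn.smembers('queues')               # => ["default"]
  p conn.lrange('queue:default', 0, -1)   # raw JSON payloads, with 'enqueued_at'
  p conn.zrange('schedule', 0, -1)        # raw JSON payloads, scored by run-at time
end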

Now that the jobs have been added to Redis, who executes them, and when?

Let's start with the Sidekiq::Manager class; I'll only excerpt its initialize and start methods here.

def initialize(options={})
  logger.debug { options.inspect }
  @options = options
  @count = options[:concurrency] || 25
  raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

  @done = false
  @workers = Set.new
  @count.times do
    @workers << Processor.new(self)
  end
  @plock = Mutex.new
end

def start
  @workers.each do |x|
    x.start
  end
end

The start method simply walks the Sidekiq::Processor instances in @workers and calls start on each of them.
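As a tiny illustration of how these two pieces fit together (in a real deployment Sidekiq's launcher creates and starts the Manager for you; you would never wire it up by hand, and the option values below are only examples):

require 'sidekiq'
require 'sidekiq/manager'

# One Processor is pre-built per unit of concurrency; start launches them all.
manager = Sidekiq::Manager.new(concurrency: 5, queues: ['default'])
manager.start

Now let's look at part of Sidekiq::Processor; I'll only excerpt its initialize method.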

def initialize(mgr)
  @mgr = mgr
  @down = false
  @done = false
  @job = nil
  @thread = nil
  @strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
end
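Only initialize is shown above, so for orientation, here is a rough sketch of what start sets in motion. This is my own simplification, not the verbatim Sidekiq source: start spawns a dedicated thread, and that thread loops until shutdown, pulling one job at a time through the fetch strategy and executing it.

# Conceptual sketch only, not the real Processor code (which also runs the
# server middleware chain, handles retries, reports stats, and so on).
def start
  @thread ||= Thread.new { run }
end

def run
  until @done
    work = @strategy.retrieve_work   # blocks on Redis BRPOP (shown next)
    process(work) if work            # decode the JSON payload, then call
  end                                # worker.perform(*args)
end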

Once a Processor has been started, that is, once its start method has been called, it keeps fetching jobs from Redis through the retrieve_work method of its @strategy. Here is retrieve_work from the Sidekiq::BasicFetch class.

def retrieve_work
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end

You can see that it uses Redis's BRPOP command. This was my first encounter with it; to be honest, even though I had been using Redis for quite a while, I had never seen BRPOP before, so I went to look it up in the official documentation.

BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the tail of the first list that is non-empty, with the given keys being checked in the order that they are given.

BRPOP is simply the blocking version of RPOP: when all of the given lists are empty it blocks until an element arrives or the timeout expires (a timeout of 0 means block indefinitely). Once I had read the BRPOP documentation, how Sidekiq works was basically clear to me: every Processor thread just blocks on the queue lists and wakes up the moment a client pushes a job.
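A minimal sketch of that behaviour using the redis-rb gem directly (assuming a reachable local Redis; the queue names and the 2-second timeout are just examples, in the same spirit as what queues_cmd passes to BRPOP above):

require 'redis'

redis = Redis.new

# Block on several lists at once for up to 2 seconds: returns nil on timeout,
# or [list_name, element] as soon as any of the lists has something to pop.
result = redis.brpop('queue:critical', 'queue:default', timeout: 2)

if result
  queue, payload = result
  puts "popped #{payload} from #{queue}"
else
  puts "no work within the timeout"
end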
