How Sidekiq Works
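I first came across Sidekiq the year before last, I think. I was building a customized forum system for a friend and picked Discourse. In the end we gave it up because Discourse does not support IE8, but that is another story. Along the way I picked Ruby back up after years of neglect (I'm not sure I ever studied it properly), and I also ran into Sidekiq, a background job framework for Ruby ("Simple, efficient background processing for Ruby."). Back then I only skimmed the documentation and the configuration options; as long as it worked, that was enough. Later I started thinking about writing a background job framework in Scala someday, so I kept meaning to read Sidekiq's source code. It took more than a year before I actually got around to it.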
First, let's look at `por.rb` ("plain old Ruby"), an example in the `examples` directory of the Sidekiq source code.
```ruby
require 'sidekiq'

# If your client is single-threaded, we just need a single connection in our Redis connection pool
Sidekiq.configure_client do |config|
  config.redis = { :namespace => 'x', :size => 1 }
end

# Sidekiq server is multi-threaded so our Redis connection pool size defaults to concurrency (-c)
Sidekiq.configure_server do |config|
  config.redis = { :namespace => 'x' }
end

# Start up sidekiq via
# ./bin/sidekiq -r ./examples/por.rb
# and then you can open up an IRB session like so:
# irb -r ./examples/por.rb
# where you can then say
# PlainOldRuby.perform_async "like a dog", 3
#
class PlainOldRuby
  include Sidekiq::Worker

  def perform(how_hard="super hard", how_long=1)
    sleep how_long
    puts "Workin' #{how_hard}"
  end
end
```
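Leaving aside the two blocks that configure Sidekiq, look at `include Sidekiq::Worker` inside the class body. Only once this line is added does `PlainOldRuby` get the three class methods `perform_async`, `perform_in` and `perform_at`. Let's take a closer look at `perform_async`.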
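How can an `include` give a class new class-level methods at all? A common Ruby idiom is a module whose `self.included` hook extends the including class with a nested `ClassMethods` module. The sketch below illustrates only that idiom: `MiniWorker`, `ClassMethods` and `pushed_jobs` are hypothetical names, and an in-memory array stands in for Redis.

```ruby
# Hypothetical module illustrating the include + extend idiom; not Sidekiq's code.
module MiniWorker
  # Ruby calls this hook when a class runs `include MiniWorker`.
  def self.included(base)
    base.extend(ClassMethods) # ClassMethods become methods of the class itself
  end

  module ClassMethods
    # In-memory list standing in for Redis in this sketch.
    def pushed_jobs
      @pushed_jobs ||= []
    end

    # Records the job instead of calling perform; Sidekiq would return a job id.
    def perform_async(*args)
      pushed_jobs << { 'class' => name, 'args' => args }
      nil
    end
  end
end

class PlainOldRuby
  include MiniWorker

  def perform(how_hard = "super hard", how_long = 1)
    sleep how_long
    puts "Workin' #{how_hard}"
  end
end

PlainOldRuby.perform_async("like a dog", 3) # class method, added by the include
p PlainOldRuby.pushed_jobs.first['args']    # ["like a dog", 3]; perform has not run
```

The instance method `perform` is untouched; only the class gains `perform_async`, which records the job for someone else to run later.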
```ruby
def perform_async(*args)
  client_push('class' => self, 'args' => args)
end

def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
  hash = if Thread.current[:sidekiq_worker_set]
    x, Thread.current[:sidekiq_worker_set] = Thread.current[:sidekiq_worker_set], nil
    x.stringify_keys.merge(item.stringify_keys)
  else
    item.stringify_keys
  end
  Sidekiq::Client.new(pool).push(hash)
end
```
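When `perform_async` is called, it simply packs the worker class and its arguments into a hash and passes it to `client_push`. `client_push` picks a Redis connection pool, converts the hash keys to strings with `stringify_keys` (first merging in any options stored in `Thread.current[:sidekiq_worker_set]`, with the item's own keys taking precedence), and hands the result to the `push` method of `Sidekiq::Client`. Nothing is serialized yet at this point: the job becomes a JSON string only later, in `atomic_push`, right before it is written to Redis. Now let's look at the `push` method in `Sidekiq::Client`.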
```ruby
##
# The main method used to push a job to Redis. Accepts a number of options:
#
#   queue - the named queue to use, default 'default'
#   class - the worker class to call, required
#   args - an array of simple arguments to the perform method, must be JSON-serializable
#   retry - whether to retry this job if it fails, default true or an integer number of retries
#   backtrace - whether to save any error backtrace, default false
#
# All options must be strings, not symbols. NB: because we are serializing to JSON, all
# symbols in 'args' will be converted to strings. Note that +backtrace: true+ can take quite a bit of
# space in Redis; a large volume of failing jobs can start Redis swapping if you aren't careful.
#
# Returns a unique Job ID. If middleware stops the job, nil will be returned instead.
#
# Example:
#   push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
#
def push(item)
  normed = normalize_item(item)
  payload = process_single(item['class'], normed)

  if payload
    raw_push([payload])
    payload['jid']
  end
end

def raw_push(payloads)
  @redis_pool.with do |conn|
    conn.multi do
      atomic_push(conn, payloads)
    end
  end
  true
end

def atomic_push(conn, payloads)
  if payloads.first['at']
    conn.zadd('schedule'.freeze, payloads.map do |hash|
      at = hash.delete('at'.freeze).to_s
      [at, Sidekiq.dump_json(hash)]
    end)
  else
    q = payloads.first['queue']
    now = Time.now.to_f
    to_push = payloads.map do |entry|
      entry['enqueued_at'.freeze] = now
      Sidekiq.dump_json(entry)
    end
    conn.sadd('queues'.freeze, q)
    conn.lpush("queue:#{q}", to_push)
  end
end

def process_single(worker_class, item)
  queue = item['queue']

  middleware.invoke(worker_class, item, queue, @redis_pool) do
    item
  end
end

##
# Define client-side middleware:
#
#   client = Sidekiq::Client.new
#   client.middleware do |chain|
#     chain.use MyClientMiddleware
#   end
#   client.push('class' => 'SomeWorker', 'args' => [1,2,3])
#
# All client instances default to the globally-defined
# Sidekiq.client_middleware but you can change as necessary.
#
def middleware(&block)
  @chain ||= Sidekiq.client_middleware
  if block_given?
    @chain = @chain.dup
    yield @chain
  end
  @chain
end
```
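Two details from the code so far are easy to check without Redis: `client_push` turns hash keys into strings (merging previously stored options first, so the item's own keys win), and the comment on `push` warns that symbols in `args` come back as strings because the job travels as JSON. A minimal stdlib sketch; `stringify_keys` here is a hypothetical stand-in written in plain Ruby, not the helper Sidekiq actually uses.

```ruby
require 'json'

# Hypothetical stand-in for stringify_keys: converts top-level keys to strings.
def stringify_keys(hash)
  hash.each_with_object({}) { |(key, value), out| out[key.to_s] = value }
end

# Mirrors x.stringify_keys.merge(item.stringify_keys): the item's keys win.
preset = { queue: 'critical', retry: false }
item   = { class: 'PlainOldRuby', args: ['like a dog', 3, :fast, { mode: :hard }], retry: 5 }
hash   = stringify_keys(preset).merge(stringify_keys(item))

payload = JSON.generate(hash) # the job is stored in Redis as a JSON string
back    = JSON.parse(payload) # and parsed back on the server side

p back['queue'] # "critical" (from the preset)
p back['retry'] # 5 (the item overrides the preset)
p back['args']  # :fast and :hard come back as "fast" and "hard"
```

Note that the nested `mode:` key also ends up as a string, not because of `stringify_keys` (which only touches the top level) but because JSON has no symbol type.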
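Start by reading the comment, which lists the accepted options and the return value. Then look at `process_single`: the most important part of it is the call to `middleware.invoke`. What is middleware? If you have worked with Express in Node.js or a similar framework, or just go by the name, you will probably recognize it right away: a chain of objects that each wrap the push and can modify the job or stop it. `push` checks whether `payload` is nil; if it is, the job is never written to Redis and `push` returns nil instead of a job ID. Whether `payload` is nil is decided by that middleware chain: a middleware that returns without yielding stops the job.

Next, `raw_push` checks a connection out of the pool, opens a Redis MULTI block so that the writes are applied atomically, and hands over to `atomic_push`. `atomic_push` splits jobs into two kinds and writes each kind to Redis differently. Plain asynchronous jobs, created by `perform_async`, are stamped with `enqueued_at`, serialized to JSON and LPUSHed onto the list `queue:<name>`, and the queue name is added to the `queues` set. Scheduled jobs, created by `perform_in` or `perform_at`, carry an `at` timestamp; they go into the `schedule` sorted set with that timestamp as the score. That completes the path by which Sidekiq submits a job to Redis.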
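To see the whole client-side path in one place, here is a hypothetical in-memory model. `MiniChain`, `MiniClient`, `TagMiddleware` and `SkipMiddleware` are illustrative names; the middleware signature is simplified to `call(item)` (Sidekiq's takes more arguments); and Ruby hashes, arrays and a `Set` stand in for the `queue:<name>` lists, the `queues` set and the `schedule` sorted set.

```ruby
require 'json'
require 'securerandom'
require 'set'

# Hypothetical middleware chain: each entry wraps the next one via yield.
class MiniChain
  def initialize
    @entries = []
  end

  def use(middleware)
    @entries << middleware
    self
  end

  # Calls entry `index`; its yield runs the rest of the chain and finally the
  # block. An entry that returns without yielding ends the chain early.
  def invoke(item, index = 0, &final)
    return final.call if index == @entries.size

    @entries[index].call(item) { invoke(item, index + 1, &final) }
  end
end

# Hypothetical client: plain Ruby structures stand in for the Redis keys.
class MiniClient
  attr_reader :lists, :queues, :schedule

  def initialize(chain)
    @chain = chain
    @lists = Hash.new { |h, k| h[k] = [] } # "queue:<name>" => JSON strings
    @queues = Set.new                      # like the "queues" set
    @schedule = []                         # [score, json] pairs kept sorted, like a zset
  end

  # Returns a job id, or nil when a middleware stopped the job.
  def push(item)
    item = { 'queue' => 'default', 'jid' => SecureRandom.hex(12) }.merge(item)
    payload = @chain.invoke(item) { item }
    return nil unless payload

    atomic_push(payload)
    payload['jid']
  end

  private

  # Same split as atomic_push: scheduled jobs vs. jobs to run now.
  def atomic_push(payload)
    if payload['at']
      at = payload.delete('at').to_f
      @schedule << [at, JSON.generate(payload)]
      @schedule.sort_by!(&:first)
    else
      payload['enqueued_at'] = Time.now.to_f
      @queues << payload['queue']
      @lists["queue:#{payload['queue']}"].unshift(JSON.generate(payload)) # LPUSH: add at head
    end
  end
end

class TagMiddleware
  def call(item)
    item['tagged'] = true
    yield
  end
end

class SkipMiddleware
  def call(item)
    return nil if item['args'].first == 'skip' # no yield: the job is dropped
    yield
  end
end

chain  = MiniChain.new.use(TagMiddleware.new).use(SkipMiddleware.new)
client = MiniClient.new(chain)
jid     = client.push('class' => 'PlainOldRuby', 'args' => ['like a dog', 3])
skipped = client.push('class' => 'PlainOldRuby', 'args' => ['skip'])
later   = client.push('class' => 'PlainOldRuby', 'args' => ['later'], 'at' => Time.now.to_f + 60)
```

Here `skipped` is nil and nothing was stored for it, `jid` points at a tagged job in the `queue:default` list, and `later` sits in the schedule instead of a queue.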
The jobs are now in Redis. So who executes them, and when?
Let's start with the `Sidekiq::Manager` class. I've excerpted only its `initialize` and `start` methods here.
```ruby
def initialize(options={})
  logger.debug { options.inspect }
  @options = options
  @count = options[:concurrency] || 25
  raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

  @done = false
  @workers = Set.new
  @count.times do
    @workers << Processor.new(self)
  end
  @plock = Mutex.new
end

def start
  @workers.each do |x|
    x.start
  end
end
```
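`initialize` creates one `Sidekiq::Processor` per unit of concurrency (25 by default), and `start` simply iterates over the `Sidekiq::Processor` instances in `@workers` and calls `start` on each. Here is part of `Sidekiq::Processor`; again I've excerpted only `initialize`.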
```ruby
def initialize(mgr)
  @mgr = mgr
  @down = false
  @done = false
  @job = nil
  @thread = nil
  @strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
end
```
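The last line of `initialize` is a small strategy pattern: whatever class is passed as `options[:fetch]` is instantiated with the options, and the processor only relies on the result responding to `retrieve_work`. A hypothetical sketch of that idiom; `DefaultFetch`, `LoggingFetch` and `MiniProcessor` are made-up names, and nothing here touches Redis.

```ruby
# Hypothetical default strategy, playing the role BasicFetch plays in Sidekiq.
class DefaultFetch
  def initialize(options)
    @queues = options[:queues] || ['default']
  end

  # A real fetcher would block on Redis; this one just describes its queues.
  def retrieve_work
    "work from #{@queues.join(', ')}"
  end
end

# Hypothetical replacement strategy a user could supply through options[:fetch].
class LoggingFetch < DefaultFetch
  attr_reader :calls

  def retrieve_work
    @calls = (@calls || 0) + 1
    super
  end
end

class MiniProcessor
  attr_reader :strategy

  def initialize(options)
    # Same shape as (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
    @strategy = (options[:fetch] || DefaultFetch).new(options)
  end

  def fetch_once
    @strategy.retrieve_work
  end
end

default_processor = MiniProcessor.new(queues: ['critical', 'default'])
custom_processor  = MiniProcessor.new(fetch: LoggingFetch)
```

Because the processor depends only on the method it calls, not on a specific class, the fetch logic can be swapped without touching the processor.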
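Once a `Processor` has been started, that is, once its `start` method has been called, it fetches jobs from Redis through the `retrieve_work` method of `@strategy`, which is a `Sidekiq::BasicFetch` unless the `:fetch` option names another class. Here is `retrieve_work` in `Sidekiq::BasicFetch`.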
```ruby
def retrieve_work
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end
```
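As you can see, this calls the Redis `BRPOP` command. To be honest, even though I had been using Redis for a while, this was the first time I had come across BRPOP, so I looked it up in the official documentation: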
BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the tail of the first list that is non-empty, with the given keys being checked in the order that they are given.
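In other words, BRPOP is the blocking version of RPOP: when all the given lists are empty, the command blocks until an element is pushed onto one of them or the timeout expires, in which case it returns nil (which is why `retrieve_work` checks `if work`). Put that together with the client side: the client LPUSHes jobs onto the head of `queue:<name>`, and each processor sits in BRPOP waiting on the tail, so each list behaves as a FIFO queue and idle processors wait inside Redis instead of polling. Once I had read the BRPOP documentation, the basic principle behind Sidekiq was clear to me.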
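The mechanism fits in a few dozen lines of plain Ruby. The sketch below is a hypothetical in-memory model, not Sidekiq or Redis code: `MiniRedis` imitates LPUSH and BRPOP with a `Mutex` and a `ConditionVariable`, and a few threads play the role of the processors the `Manager` starts. Unlike a long-running processor, these threads exit once BRPOP times out so that the demo terminates.

```ruby
# Hypothetical in-memory stand-in for the two Redis list commands used above.
class MiniRedis
  def initialize
    @lists = Hash.new { |h, k| h[k] = [] }
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  # LPUSH: insert at the head of the list, then wake every waiting BRPOP.
  def lpush(key, value)
    @mutex.synchronize do
      @lists[key].unshift(value)
      @cond.broadcast
    end
  end

  # BRPOP: check keys in the order given and pop from the tail of the first
  # non-empty list. If all are empty, wait until a push arrives or the timeout
  # (seconds, must be > 0 here; real Redis treats 0 as "wait forever") expires.
  # Returns [key, value], or nil on timeout.
  def brpop(*keys, timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      loop do
        key = keys.find { |k| !@lists[k].empty? }
        return [key, @lists[key].pop] if key

        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0

        @cond.wait(@mutex, remaining) # may wake early; the loop re-checks
      end
    end
  end
end

redis = MiniRedis.new

# FIFO: LPUSH adds at the head and BRPOP takes from the tail.
%w[job1 job2 job3].each { |j| redis.lpush('queue:default', j) }
first = redis.brpop('queue:default', 1)                    # ["queue:default", "job1"]

# Key order matters: the first non-empty key in the argument list wins.
redis.lpush('queue:critical', 'urgent')
second = redis.brpop('queue:critical', 'queue:default', 1) # ["queue:critical", "urgent"]

# Nothing to pop: returns nil once the timeout expires.
timed_out = redis.brpop('queue:empty', 0.1)

# A tiny "manager": start 3 processor threads that each loop fetch -> run.
%w[job4 job5].each { |j| redis.lpush('queue:default', j) }
processed = Queue.new
workers = 3.times.map do |n|
  Thread.new do
    while (work = redis.brpop('queue:critical', 'queue:default', 0.3))
      queue, job = work
      processed << [n, queue, job] # "running" the job just records it here
    end
  end
end
workers.each(&:join)
results = Array.new(processed.size) { processed.pop }
```

Which thread picks up which job varies from run to run, but every remaining job (`job2` to `job5`) is taken exactly once, because each pop happens under the same lock, much as Redis executes BRPOP atomically.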