Class:  Delayed::Worker
In:     lib/delayed/worker.rb
Parent: Object
Constants:

DEFAULT_SLEEP_DELAY      = 5
DEFAULT_MAX_ATTEMPTS     = 25
DEFAULT_MAX_RUN_TIME     = 4.hours
DEFAULT_DEFAULT_PRIORITY = 0
DEFAULT_DELAY_JOBS       = true
DEFAULT_QUEUES           = []
DEFAULT_READ_AHEAD       = 5
Attributes:

name_prefix [RW]  name_prefix is ignored if name is set directly
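The DEFAULT_* constants above seed class-level settings (see reset below), and each setting has a matching writer on Delayed::Worker. A minimal configuration sketch; the file path and values are illustrative, 30.minutes assumes ActiveSupport is loaded (the 4.hours default above already relies on it), and destroy_failed_jobs is included on the assumption that the attribute used by failed below has a writer:

  # config/initializers/delayed_job_config.rb (illustrative location)
  Delayed::Worker.sleep_delay         = 10            # seconds to sleep when the queue is empty
  Delayed::Worker.max_attempts        = 5             # retries before a job is marked as failed
  Delayed::Worker.max_run_time        = 30.minutes
  Delayed::Worker.default_priority    = 0
  Delayed::Worker.read_ahead          = 5
  Delayed::Worker.queues              = []            # empty list means work all queues
  Delayed::Worker.delay_jobs          = !Rails.env.test?  # run jobs inline in tests
  Delayed::Worker.destroy_failed_jobs = false         # keep failed jobs around for inspection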
# File lib/delayed/worker.rb, line 81
def self.after_fork
  # Re-open file handles
  @files_to_reopen.each do |file|
    begin
      file.reopen file.path, "a+"
      file.sync = true
    rescue ::Exception
    end
  end

  backend.after_fork
end
# File lib/delayed/worker.rb, line 56
def self.backend=(backend)
  if backend.is_a? Symbol
    require "delayed/serialization/#{backend}"
    require "delayed/backend/#{backend}"
    backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
  end
  @@backend = backend
  silence_warnings { ::Delayed.const_set(:Job, backend) }
end
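backend= accepts either a job class or a symbol. With a symbol it requires the matching serialization and backend files, resolves the job class by name, and aliases it to Delayed::Job. A usage sketch, assuming the active_record backend is installed:

  # Symbol form: loads delayed/serialization/active_record and
  # delayed/backend/active_record, then points Delayed::Job at
  # Delayed::Backend::ActiveRecord::Job.
  Delayed::Worker.backend = :active_record

  # Equivalent explicit form, passing the class directly:
  # Delayed::Worker.backend = Delayed::Backend::ActiveRecord::Job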
# File lib/delayed/worker.rb, line 70
def self.before_fork
  unless @files_to_reopen
    @files_to_reopen = []
    ObjectSpace.each_object(File) do |file|
      @files_to_reopen << file unless file.closed?
    end
  end

  backend.before_fork
end
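before_fork and after_fork (above) bracket a process fork: before_fork snapshots the open File handles and lets the backend release shared resources such as database connections, and after_fork reopens those handles in append mode and reconnects the backend. A sketch of how a forking launcher might use the pair; the surrounding daemonization code is not part of this class:

  Delayed::Worker.before_fork

  pid = fork do
    Delayed::Worker.after_fork
    Delayed::Worker.new(:quiet => false).start
  end

  Process.detach(pid)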
# File lib/delayed/worker.rb, line 66
def self.guess_backend
  warn "[DEPRECATION] guess_backend is deprecated. Please remove it from your code."
end
# File lib/delayed/worker.rb, line 94
def self.lifecycle
  @lifecycle ||= Delayed::Lifecycle.new
end
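lifecycle memoizes a single Delayed::Lifecycle object, which the worker drives at the callback points visible in the code below (:execute, :loop, :error, :failure). A hedged sketch of hooking into it, assuming Delayed::Lifecycle exposes before/after registration as in the delayed_job plugins API; MyNotifier is hypothetical:

  # Log every pass through the worker loop (illustrative).
  Delayed::Worker.lifecycle.before(:loop) do |worker|
    worker.say "entering another work loop"
  end

  # React to permanent failures; callback receives the worker and the job.
  Delayed::Worker.lifecycle.after(:failure) do |worker, job|
    MyNotifier.job_failed(job) if defined?(MyNotifier)
  end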
# File lib/delayed/worker.rb, line 98
def initialize(options={})
  @quiet = options.has_key?(:quiet) ? options[:quiet] : true
  self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
  self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
  self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay)
  self.class.read_ahead = options[:read_ahead] if options.has_key?(:read_ahead)
  self.class.queues = options[:queues] if options.has_key?(:queues)

  self.plugins.each { |klass| klass.new }
end
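initialize copies the recognized options onto class-level settings and instantiates each registered plugin; any option not passed keeps its current class-level value. A minimal construction sketch, assuming a backend has already been configured; the option values are illustrative:

  worker = Delayed::Worker.new(
    :quiet        => false,        # also print output to STDOUT
    :queues       => ['mailers'],  # only work jobs from this queue
    :sleep_delay  => 2,            # poll every 2 seconds when idle
    :min_priority => 0,
    :max_priority => 10
  )
  worker.start   # blocks; see start below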
# File lib/delayed/worker.rb, line 31
def self.reset
  self.sleep_delay = DEFAULT_SLEEP_DELAY
  self.max_attempts = DEFAULT_MAX_ATTEMPTS
  self.max_run_time = DEFAULT_MAX_RUN_TIME
  self.default_priority = DEFAULT_DEFAULT_PRIORITY
  self.delay_jobs = DEFAULT_DELAY_JOBS
  self.queues = DEFAULT_QUEUES
  self.read_ahead = DEFAULT_READ_AHEAD
end
# File lib/delayed/worker.rb, line 212
def failed(job)
  self.class.lifecycle.run_callbacks(:failure, self, job) do
    job.hook(:failure)
    self.class.destroy_failed_jobs ? job.destroy : job.fail!
  end
end
# File lib/delayed/worker.rb, line 225
def max_attempts(job)
  job.max_attempts || self.class.max_attempts
end
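max_attempts prefers a per-job limit over the class-wide setting, so an individual job can opt into more or fewer retries. A sketch of a payload object that caps itself at three attempts; the job class and model are illustrative, and it assumes the backend's job record delegates max_attempts to the payload object, as the standard backends do:

  class SyncFeedJob < Struct.new(:feed_id)
    def perform
      Feed.find(feed_id).sync!   # Feed is an illustrative model
    end

    def max_attempts
      3   # overrides Delayed::Worker.max_attempts for this job only
    end
  end

  Delayed::Job.enqueue SyncFeedJob.new(42)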
Every worker has a unique name, which by default is the pid of the process. There are some advantages to overriding this with something which survives worker restarts: workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
# File lib/delayed/worker.rb, line 113
def name
  return @name unless @name.nil?
  "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
end
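Since the generated name is host/pid-based, it changes on every restart. To get the restart-surviving behaviour described above, assign a stable identity before starting the worker; a hedged sketch, assuming name_prefix is the instance attribute listed above and that a name= writer exists to set @name directly (not shown in this excerpt):

  worker = Delayed::Worker.new
  worker.name_prefix = "billing "       # prepended to the generated name
  # worker.name = "billing-worker-1"    # fully stable identity across restarts
  worker.start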
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
# File lib/delayed/worker.rb, line 200
def reschedule(job, time = nil)
  if (job.attempts += 1) < max_attempts(job)
    time ||= job.reschedule_at
    job.run_at = time
    job.unlock
    job.save!
  else
    say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
    failed(job)
  end
end
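reschedule asks the job for its next run time via job.reschedule_at and only gives up (calling failed) once the attempt count reaches max_attempts. A sketch of a payload that replaces the exponential curve with a fixed five-minute delay; it assumes the backend delegates reschedule_at to the payload object with (current_time, attempts) arguments, as the standard backends do, and 5.minutes assumes ActiveSupport:

  class PollExternalApiJob < Struct.new(:resource_id)
    def perform
      # talk to the external service here (illustrative)
    end

    # Called by the backend as reschedule_at(current_time, attempts).
    def reschedule_at(current_time, attempts)
      current_time + 5.minutes
    end
  end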
# File lib/delayed/worker.rb, line 183
def run(job)
  runtime = Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  say "#{job.name} completed after %.4f" % runtime
  return true # did work
rescue DeserializationError => error
  job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
  failed(job)
rescue Exception => error
  self.class.lifecycle.run_callbacks(:error, self, job){ handle_failed_job(job, error) }
  return false # work failed
end
# File lib/delayed/worker.rb, line 219
def say(text, level = Logger::INFO)
  text = "[Worker(#{name})] #{text}"
  puts text unless @quiet
  logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger
end
# File lib/delayed/worker.rb, line 124
def start
  trap('TERM') { say 'Exiting...'; stop }
  trap('INT') { say 'Exiting...'; stop }

  say "Starting job worker"

  self.class.lifecycle.run_callbacks(:execute, self) do
    loop do
      self.class.lifecycle.run_callbacks(:loop, self) do
        result = nil

        realtime = Benchmark.realtime do
          result = work_off
        end

        count = result.sum

        break if stop?

        if count.zero?
          sleep(self.class.sleep_delay)
        else
          say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
        end
      end

      break if stop?
    end
  end
end
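start traps TERM and INT and only checks stop? between loop iterations, so a worker exits cleanly at a job boundary rather than mid-job. A hedged shutdown sketch from outside the worker process; the pid file path is illustrative:

  # Ask a running worker to finish its current job and exit.
  pid = File.read("tmp/pids/delayed_job.pid").to_i
  Process.kill("TERM", pid)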
Do num jobs and return stats on success/failure. Exit early if interrupted.
# File lib/delayed/worker.rb, line 165
def work_off(num = 100)
  success, failure = 0, 0

  num.times do
    case reserve_and_run_one_job
    when true
      success += 1
    when false
      failure += 1
    else
      break # leave if no work could be done
    end
    break if stop? # leave if we're exiting
  end

  return [success, failure]
end
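work_off is also useful on its own when you want to drain the queue synchronously, for example in tests or from a scheduled task, instead of running the blocking start loop. A minimal sketch:

  worker = Delayed::Worker.new(:quiet => true)
  successes, failures = worker.work_off(50)   # run at most 50 jobs, then return
  puts "ran #{successes + failures} jobs (#{failures} failed)"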
# File lib/delayed/worker.rb, line 231
def handle_failed_job(job, error)
  job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
  say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
  reschedule(job)
end