Class Delayed::Worker
In: lib/delayed/worker.rb
Parent: Object

Methods

Attributes

name_prefix  [RW]  name_prefix is ignored if name is set directly

Public Class methods

[Source]

    # File lib/delayed/worker.rb, line 32
    #
    # Sets the backend used for jobs.
    #
    # backend - a Symbol naming a backend, or any other object, which is used
    #           as given. For a Symbol, "delayed/serialization/<backend>" and
    #           "delayed/backend/<backend>" are required first and the symbol
    #           is resolved to the Delayed::Backend::<Classified>::Job
    #           constant.
    #
    # The resulting object is stored in @@backend and is also published as
    # the Delayed::Job constant (inside silence_warnings).
    def self.backend=(backend)
      job_class =
        case backend
        when Symbol
          require "delayed/serialization/#{backend}"
          require "delayed/backend/#{backend}"
          "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
        else
          backend
        end

      @@backend = job_class
      silence_warnings { ::Delayed.const_set(:Job, job_class) }
    end

[Source]

    # File lib/delayed/worker.rb, line 42
    #
    # Falls back to the :active_record backend when ActiveRecord is defined
    # and no backend has been set yet. Does nothing (and returns nil) when
    # ActiveRecord is not defined.
    def self.guess_backend
      return unless defined?(ActiveRecord)

      self.backend ||= :active_record
    end

[Source]

    # File lib/delayed/worker.rb, line 46
    #
    # Builds a worker from an options hash.
    #
    # options - Hash of settings:
    #           :quiet        - stored in @quiet; defaults to true when the
    #                           key is absent.
    #           :min_priority,
    #           :max_priority,
    #           :sleep_delay  - when the key is present, the value is written
    #                           to the class-level setting of the same name,
    #                           so it is not scoped to this instance.
    def initialize(options={})
      @quiet = options.fetch(:quiet, true)

      [:min_priority, :max_priority, :sleep_delay].each do |setting|
        self.class.send("#{setting}=", options[setting]) if options.has_key?(setting)
      end
    end

Public Instance methods

[Source]

     # File lib/delayed/worker.rb, line 147
     #
     # Finalizes a job that will not be retried.
     #
     # Runs the job's :failure hook, emits a deprecation warning if the job
     # responds to #on_permanent_failure, and then either destroys the job
     # (when self.class.destroy_failed_jobs is truthy) or stamps failed_at
     # with Delayed::Job.db_time_now.
     #
     # Returns whatever job.destroy or job.update_attributes returns.
     def failed(job)
       job.hook(:failure)
       warn "[DEPRECATION] The #on_permanent_failure hook has been renamed to #failure." if job.respond_to?(:on_permanent_failure)

       if self.class.destroy_failed_jobs
         job.destroy
       else
         job.update_attributes(:failed_at => Delayed::Job.db_time_now)
       end
     end

[Source]

     # File lib/delayed/worker.rb, line 161
     #
     # Returns the attempt limit for +job+: the job's own max_attempts when it
     # is set (truthy), otherwise the class-wide self.class.max_attempts.
     def max_attempts(job)
       per_job_limit = job.max_attempts
       per_job_limit || self.class.max_attempts
     end

Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker restarts: workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.

[Source]

    # File lib/delayed/worker.rb, line 57
    #
    # Returns the worker's name.
    #
    # An explicitly assigned @name (anything but nil) is returned as is.
    # Otherwise the name is built from @name_prefix, the host name and the
    # process id. If building that string raises a StandardError (for
    # example the host name lookup fails), the name falls back to
    # @name_prefix plus the process id only.
    def name
      return @name unless @name.nil?

      begin
        "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
      rescue StandardError
        # Explicit, scoped rescue instead of an inline `rescue` modifier:
        # same fallback, but the protected expression is obvious.
        "#{@name_prefix}pid:#{Process.pid}"
      end
    end

Sets the name of the worker. Setting the name to nil resets it to the default worker name.

[Source]

    # File lib/delayed/worker.rb, line 64
    #
    # Stores +val+ in @name. Assigning nil clears the stored value.
    def name=(val)
      @name = val
    end

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.

[Source]

     # File lib/delayed/worker.rb, line 135
     #
     # Records one more failed attempt on +job+ and either re-queues it or
     # gives up on it.
     #
     # While the incremented attempt count is below max_attempts(job), the
     # job gets a new run_at (+time+ when given, otherwise
     # job.reschedule_at), is unlocked and saved. Once the limit is reached
     # the removal is logged and the job is handed to #failed.
     def reschedule(job, time = nil)
       attempts = (job.attempts += 1)

       if attempts >= max_attempts(job)
         say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
         return failed(job)
       end

       job.run_at = time || job.reschedule_at
       job.unlock
       job.save!
     end

[Source]

     # File lib/delayed/worker.rb, line 118
     #
     # Invokes +job+ under a timeout of self.class.max_run_time seconds and
     # destroys it once it has finished.
     #
     # Returns true when the job ran and was destroyed, false when it failed.
     # A DeserializationError is recorded on the job and passed straight to
     # #failed (no retry); any other exception goes through
     # #handle_failed_job.
     def run(job)
       runtime = Benchmark.realtime do
         Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
         job.destroy
       end
       say "#{job.name} completed after %.4f" % runtime
       true # did work
     rescue DeserializationError => error
       # Message first, then one backtrace frame per line. Array() guards
       # against an exception that carries no backtrace.
       job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
       failed(job)
       # Report an explicit failure: #work_off only counts true/false and
       # treats any other value as "nothing left to do".
       false
     rescue Exception => error
       # NOTE(review): rescuing Exception also catches signals and exits
       # raised while the job runs; presumably intentional so that every
       # job failure is recorded. Confirm before narrowing.
       handle_failed_job(job, error)
       false # work failed
     end

[Source]

     # File lib/delayed/worker.rb, line 155
     #
     # Emits a worker message.
     #
     # The text is prefixed with "[Worker(<name>)]", printed to standard
     # output unless @quiet is set, and, when a logger is available, added to
     # it at +level+ (Logger::INFO by default) with a timestamp in front.
     def say(text, level = Logger::INFO)
       line = "[Worker(#{name})] #{text}"
       puts line unless @quiet
       return unless logger

       logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{line}"
     end

[Source]

    # File lib/delayed/worker.rb, line 68
    #
    # Runs the worker loop until an exit is requested.
    #
    # Installs TERM and INT handlers that announce the shutdown and set the
    # global $exit flag. Each pass calls #work_off; a pass that processed
    # nothing sleeps for self.class.sleep_delay, any other pass reports the
    # throughput and the number of failures. $exit is checked after the
    # batch and again after the sleep/report step.
    #
    # On the way out (normal exit or exception) the locks held under this
    # worker's name are cleared via Delayed::Job.clear_locks!.
    def start
      say "Starting job worker"

      %w[TERM INT].each do |signal|
        trap(signal) { say 'Exiting...'; $exit = true }
      end

      loop do
        result = nil
        realtime = Benchmark.realtime { result = work_off }

        # successes + failures handled in this pass
        count = result.sum

        break if $exit

        if count.zero?
          sleep(self.class.sleep_delay)
        else
          say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
        end

        break if $exit
      end
    ensure
      Delayed::Job.clear_locks!(name)
    end

Do num jobs and return stats on success/failure. Exit early if interrupted.

[Source]

     # File lib/delayed/worker.rb, line 100
     #
     # Runs up to +num+ jobs (100 by default) and returns the tally as
     # [successes, failures].
     #
     # Each iteration calls #reserve_and_run_one_job: true counts as a
     # success, false as a failure, and any other result ends the batch
     # because no work could be done. The batch also ends as soon as the
     # global $exit flag is set.
     def work_off(num = 100)
       stats = [0, 0] # [successes, failures]

       num.times do
         case reserve_and_run_one_job
         when true  then stats[0] += 1
         when false then stats[1] += 1
         else break # leave if no work could be done
         end

         break if $exit # leave if we're exiting
       end

       stats
     end

Protected Instance methods

[Source]

     # File lib/delayed/worker.rb, line 167
     #
     # Records +error+ on +job+, logs the failure at Logger::ERROR and hands
     # the job to #reschedule.
     #
     # job.last_error is set to the error message followed by the backtrace,
     # one frame per line.
     #
     # Returns whatever #reschedule returns.
     def handle_failed_job(job, error)
       # Array() guards against an exception that carries no backtrace.
       job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
       say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
       reschedule(job)
     end

Run the next job we can get an exclusive lock on. If no jobs are left, we return nil.

[Source]

     # File lib/delayed/worker.rb, line 175
     #
     # Asks Delayed::Job to reserve a job for this worker and runs it.
     #
     # Returns the result of #run, or nil when no job could be reserved.
     def reserve_and_run_one_job
       job = Delayed::Job.reserve(self)
       return unless job

       run(job)
     end

[Validate]