Object
MIN_WORK_THREADS = 3
Produces an hour/min/sec/microsecond string representation of a Time instance (for example "03:04:05.000067")
# File lib/rufus/scheduler/util.rb, line 315
#
# Formats +t+ (defaults to the current time) as "HH:MM:SS.uuuuuu", that is
# the clock time followed by the zero-padded, six-digit value of Time#usec.
def self.h_to_s(t=Time.now)
  clock = t.strftime('%H:%M:%S')
  micros = format('%06d', t.usec)
  [ clock, micros ].join('.')
end
# File lib/rufus/scheduler.rb, line 75
#
# Builds the scheduler state from +opts+ and starts the scheduler, unless
# #lock returns a falsy value (in which case #start is never called).
#
# Options read here:
# :frequency        parsed with Rufus::Scheduler.parse, defaults to 0.300
# :max_work_threads defaults to MAX_WORK_THREADS
def initialize(opts={})
  @opts = opts

  @started_at = nil
  @paused = false

  @jobs = JobArray.new

  @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300)
  @mutexes = {}

  @work_queue = Queue.new

  #@min_work_threads = opts[:min_work_threads] || MIN_WORK_THREADS
  @max_work_threads = opts[:max_work_threads] || MAX_WORK_THREADS

  @stderr = $stderr

  # key used to tag the threads belonging to this scheduler instance
  @thread_key = "rufus_scheduler_#{object_id}"

  # only start when the lock could be obtained
  start if lock
end
# File lib/rufus/scheduler/util.rb, line 34
#
# Tries parse_cron, then parse_in, then parse_at on +o+ (each one called
# with :no_error => true) and returns the first truthy result.
# Raises an ArgumentError when none of them yields a result.
def self.parse(o)
  quiet = { :no_error => true }

  parsed = parse_cron(o, quiet)
  parsed ||= parse_in(o, quiet) # covers 'every' schedule strings
  parsed ||= parse_at(o, quiet)
  return parsed if parsed

  raise(ArgumentError.new("couldn't parse \"#{o}\""))
end
# File lib/rufus/scheduler/util.rb, line 51
#
# Turns +o+ into a Time instance.
#
# A Time is returned as is. Otherwise the string form of +o+ is scanned
# with TZ_REGEX: every match that TZInfo::Timezone.get resolves is removed
# from the string and the first resolved timezone is remembered. What is
# left is handed to Time.parse and, when a timezone was found, the result
# goes through that timezone's local_to_utc.
#
# Any StandardError is re-raised, unless opts[:no_error] is set, in which
# case nil is returned.
def self.parse_at(o, opts={})
  return o if o.is_a?(Time)

  zone = nil
  stripped = o.to_s.gsub(TZ_REGEX) do |name|
    found = TZInfo::Timezone.get(name) rescue nil
    zone ||= found
    found ? '' : name
  end

  # extra validation pass, only performed on Ruby versions before 1.9.0
  if RUBY_VERSION < '1.9.0'
    begin
      DateTime.parse(o)
    rescue
      raise ArgumentError, "no time information in #{o.inspect}"
    end
  end

  time = Time.parse(stripped)
  zone ? zone.local_to_utc(time) : time

rescue StandardError => se
  return nil if opts[:no_error]
  raise se
end
# File lib/rufus/scheduler/util.rb, line 81
#
# Wraps +o+ in a CronLine instance. When CronLine.new raises an
# ArgumentError, returns nil if opts[:no_error] is set and re-raises
# the error otherwise.
def self.parse_cron(o, opts)
  begin
    CronLine.new(o)
  rescue ArgumentError => error
    raise error unless opts[:no_error]
    nil
  end
end
Turns a string like ‘1m10s’ into a float like ‘70.0’. More formally, turns a time duration expressed as a string into a Float instance (a count of seconds).
w -> week d -> day h -> hour m -> minute s -> second M -> month y -> year ‘nada’ (no unit letter) -> second
Some examples:
Rufus::Scheduler.parse_duration "0.5" # => 0.5 Rufus::Scheduler.parse_duration "500" # => 500.0 Rufus::Scheduler.parse_duration "1000" # => 1000.0 Rufus::Scheduler.parse_duration "1h" # => 3600.0 Rufus::Scheduler.parse_duration "1h10s" # => 3610.0 Rufus::Scheduler.parse_duration "1w2d" # => 777600.0
Negative time strings are OK (Thanks Danny Fullerton):
Rufus::Scheduler.parse_duration "-0.5" # => -0.5 Rufus::Scheduler.parse_duration "-1h" # => -3600.0
# File lib/rufus/scheduler/util.rb, line 148
#
# Parses a duration string (an optional leading '-', then digits, dots and
# the letters of DURATION_LETTERS) and returns the total as a Float.
#
# The string is consumed from left to right: each "number + letter" chunk
# adds number * DURATIONS[letter]; a trailing number without a letter is
# added as is.
#
# Returns 0.0 for an empty string. When the string cannot be parsed,
# returns nil if opts[:no_error] is set and raises an ArgumentError
# otherwise.
def self.parse_duration(string, opts={})
  string = string.to_s

  return 0.0 if string == ''

  whole = string.match(/^(-?)([\d\.#{DURATION_LETTERS}]+)$/)

  if whole.nil?
    return nil if opts[:no_error]
    raise ArgumentError.new("cannot parse '#{string}'")
  end

  sign = whole[1] == '-' ? -1.0 : 1.0
  total = 0.0
  rest = whole[2]

  until rest.empty?

    chunk = rest.match(/^(\d+|\d+\.\d*|\d*\.\d+)([#{DURATION_LETTERS}])(.*)$/)

    if chunk
      total += chunk[1].to_f * DURATIONS[chunk[2]]
    elsif rest.match(/^\d+$/)
      total += rest.to_i
    elsif rest.match(/^\d*\.\d*$/)
      total += rest.to_f
    elsif opts[:no_error]
      return nil
    else
      raise ArgumentError.new(
        "cannot parse '#{string}' (especially '#{rest}')"
      )
    end

    # a bare trailing number ends the string, nothing is left to consume
    break unless chunk && chunk[3]

    rest = chunk[3]
  end

  sign * total
end
-
for compatibility with rufus-scheduler 2.x
+
# File lib/rufus/scheduler/util.rb, line 44
#
# Strings are handed to parse_duration (along with +opts+), any other
# object is returned untouched.
def self.parse_in(o, opts={})
  return o unless o.is_a?(String)
  parse_duration(o, opts)
end
# File lib/rufus/scheduler/util.rb, line 91
#
# Turns +o+ into a Time instance.
#
# Strings go through parse first, a Numeric (given or parsed) is taken as
# an offset added to Time.now, a Time is returned as is.
# Raises an ArgumentError when the outcome is not a Time.
def self.parse_to_time(o)
  point = o.is_a?(String) ? parse(o) : o
  point = Time.now + point if point.is_a?(Numeric)

  return point if point.is_a?(Time)

  raise ArgumentError.new(
    "cannot turn #{o.inspect} to a point in time, doesn't make sense"
  )
end
Alias for Rufus::Scheduler.singleton
# File lib/rufus/scheduler.rb, line 110
#
# Shortcut, forwards +opts+ to singleton and returns its result.
def self.s(opts={})
  singleton(opts)
end
Returns a singleton Rufus::Scheduler instance
# File lib/rufus/scheduler.rb, line 103
#
# Returns the memoized scheduler instance, building it with
# Rufus::Scheduler.new(opts) on first use. Once an instance is memoized,
# +opts+ is not looked at anymore.
def self.singleton(opts={})
  @singleton = Rufus::Scheduler.new(opts) unless @singleton
  @singleton
end
Releasing the gem would probably require redirecting .start_new to .new and emitting a simple deprecation message.
For now, let’s assume the people pointing at rufus-scheduler/master on GitHub know what they do…
# File lib/rufus/scheduler.rb, line 118
#
# Always raises a RuntimeError telling the caller to use .new instead.
def self.start_new
  raise RuntimeError, "this is rufus-scheduler 3.0, use .new instead of .start_new"
end
Turns a number of seconds into a time string
Rufus.to_duration 0 # => '0s' Rufus.to_duration 60 # => '1m' Rufus.to_duration 3661 # => '1h1m1s' Rufus.to_duration 7 * 24 * 3600 # => '1w' Rufus.to_duration 30 * 24 * 3600 + 1 # => "4w2d1s"
It goes from seconds to the year. Months are not counted (as they are of variable length). Weeks are counted.
For 30-day months to be counted, the :months option of this method can be set to true.
Rufus.to_duration 30 * 24 * 3600 + 1, :months => true # => "1M1s"
If a Float value is passed, milliseconds will be displayed without ‘marker’
Rufus.to_duration 0.051 # => "51" Rufus.to_duration 7.051 # => "7s51" Rufus.to_duration 0.120 + 30 * 24 * 3600 + 1 # => "4w2d1s120"
(this behaviour mirrors the one found for parse_time_string()).
Options are :
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 227
#
# Renders +seconds+ as a duration string.
#
# The breakdown comes from to_duration_hash(seconds, options); the units
# are emitted in DU_KEYS order as "<count><key>", skipping missing and
# zero counts. The :ms entry, when present, is appended without any unit
# letter.
#
# When the breakdown is empty, returns '0m' if options[:drop_seconds] is
# set and '0s' otherwise.
def self.to_duration(seconds, options={})
  parts = to_duration_hash(seconds, options)

  return (options[:drop_seconds] ? '0m' : '0s') if parts.empty?

  text = ''
  DU_KEYS.each do |key|
    count = parts[key]
    text << "#{count}#{key}" if count && count != 0
  end

  millis = parts[:ms]
  text << millis.to_s if millis

  text
end
Turns a number of seconds (integer or Float) into a hash like in :
Rufus.to_duration_hash 0.051 # => { :ms => 51 } Rufus.to_duration_hash 7.051 # => { :s => 7, :ms => 51 } Rufus.to_duration_hash 0.120 + 30 * 24 * 3600 + 1 # => { :w => 4, :d => 2, :s => 1, :ms => 120 }
This method is used by to_duration behind the scenes.
Options are :
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 273
#
# Breaks +seconds+ down into a hash of unit (Symbol) => count.
#
# For a Float, the fractional part is stored under :ms (as an Integer
# count of milliseconds) and the rest is truncated to an integer.
# With options[:drop_seconds], the :ms entry is removed and the value is
# rounded down to a multiple of 60.
# The units come from DURATIONS2M when options[:months] is set, from
# DURATIONS2 otherwise; only counts greater than zero are kept.
def self.to_duration_hash(seconds, options={})
  result = {}
  remaining = seconds

  if remaining.is_a?(Float)
    result[:ms] = (remaining % 1 * 1000).to_i
    remaining = remaining.to_i
  end

  if options[:drop_seconds]
    result.delete(:ms)
    remaining = (remaining - remaining % 60)
  end

  table = options[:months] ? DURATIONS2M : DURATIONS2

  table.each do |unit, span|
    count = remaining / span
    remaining = remaining % span
    result[unit.to_sym] = count if count > 0
  end

  result
end
-
for compatibility with rufus-scheduler 2.x
+
# File lib/rufus/scheduler.rb, line 187
#
# Schedules a :once job for +time+ through do_schedule.
# opts[:job] is passed along as the "return the job instance" flag.
def at(time, callable=nil, opts={}, &block)
  wants_job = opts[:job]
  do_schedule(:once, time, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 292
#
# Returns the jobs selected by jobs(opts) that are AtJob instances.
def at_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::AtJob)
end
# File lib/rufus/scheduler.rb, line 227
#
# Schedules a :cron job for +cronline+ through do_schedule.
# opts[:job] is passed along as the "return the job instance" flag.
def cron(cronline, callable=nil, opts={}, &block)
  wants_job = opts[:job]
  do_schedule(:cron, cronline, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 312
#
# Returns the jobs selected by jobs(opts) that are CronJob instances.
def cron_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::CronJob)
end
# File lib/rufus/scheduler.rb, line 207
#
# Schedules an :every job for +duration+ through do_schedule.
# opts[:job] is passed along as the "return the job instance" flag.
def every(duration, callable=nil, opts={}, &block)
  wants_job = opts[:job]
  do_schedule(:every, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 302
#
# Returns the jobs selected by jobs(opts) that are EveryJob instances.
def every_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::EveryJob)
end
# File lib/rufus/scheduler.rb, line 197
#
# Schedules a :once job for +duration+ through do_schedule.
# opts[:job] is passed along as the "return the job instance" flag.
def in(duration, callable=nil, opts={}, &block)
  wants_job = opts[:job]
  do_schedule(:once, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 297
#
# Returns the jobs selected by jobs(opts) that are InJob instances.
def in_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::InJob)
end
# File lib/rufus/scheduler.rb, line 217
#
# Schedules an :interval job for +duration+ through do_schedule.
# opts[:job] is passed along as the "return the job instance" flag.
def interval(duration, callable=nil, opts={}, &block)
  wants_job = opts[:job]
  do_schedule(:interval, duration, callable, opts, wants_job, block)
end
# File lib/rufus/scheduler.rb, line 307
#
# Returns the jobs selected by jobs(opts) that are IntervalJob instances.
def interval_jobs(opts={})
  jobs(opts).grep(Rufus::Scheduler::IntervalJob)
end
# File lib/rufus/scheduler.rb, line 317
#
# Looks +job_id+ up in the scheduler's job collection (@jobs[job_id]) and
# returns whatever that lookup yields.
def job(job_id)
  found = @jobs[job_id]
  found
end
Returns all the scheduled jobs (even those right before re-schedule).
# File lib/rufus/scheduler.rb, line 274
#
# Returns an array of jobs, filtered according to +opts+.
#
# A Symbol argument is a shortcut for { symbol => true }.
#
# :running       keeps only the jobs answering true to #running?
# :all           keeps every job
# (neither)      drops the jobs without a next_time and the jobs that
#                have an unscheduled_at
# :tag / :tags   one tag or an array of tags (compared as strings), only
#                the jobs carrying all of them are kept
def jobs(opts={})
  opts = { opts => true } if opts.is_a?(Symbol)

  selected = @jobs.to_a

  if opts[:running]
    selected = selected.select(&:running?)
  elsif ! opts[:all]
    selected = selected.select { |j| ! j.next_time.nil? && ! j.unscheduled_at }
  end

  wanted = Array(opts[:tag] || opts[:tags]).map(&:to_s)

  # a job is rejected as soon as one of the wanted tags is missing from it
  selected.reject { |j| wanted.any? { |tag| ! j.tags.include?(tag) } }
end
# File lib/rufus/scheduler.rb, line 154
#
# Joins the scheduler thread (@thread) and returns the outcome of
# Thread#join. Fails with a NotRunningError when there is no such thread.
def join
  unless @thread
    fail NotRunningError.new(
      'cannot join scheduler that is not running'
    )
  end

  @thread.join
end
# File lib/rufus/scheduler.rb, line 373
#
# Asks each job (as returned by #jobs) for its occurrences between +time0+
# and +time1+.
#
# By default, returns a hash job => occurrences, holding only the jobs
# that have at least one occurrence. With +format+ set to :timeline,
# returns an array of [ time, job ] pairs sorted by time instead.
def occurrences(time0, time1, format=:per_job)
  per_job = {}

  jobs.each do |j|
    times = j.occurrences(time0, time1)
    per_job[j] = times if times.any?
  end

  return per_job unless format == :timeline

  pairs = []
  per_job.each do |j, times|
    times.each { |t| pairs << [ t, j ] }
  end

  pairs.sort_by { |pair| pair.first }
end
# File lib/rufus/scheduler.rb, line 396
#
# Writes a report about +err+ (raised while running +job+) to stderr:
# the job, the error and its backtrace, timezone information and a dump
# of the scheduler state. Every line carries the object_id of the error.
#
# A failure while building the report is itself reported to stderr, and
# stderr is flushed in all cases.
#
# NOTE(review): the padding inside the string literals below shows up as
# single spaces in this listing; confirm against the repository source
# before relying on the exact layout of the report.
def on_error(job, err)
  pre = err.object_id.to_s

  lock_states = {}
  mutexes.each { |name, mutex| lock_states[name] = mutex.locked? }

  stderr.puts("{ #{pre} rufus-scheduler intercepted an error:")
  stderr.puts(" #{pre} job:")
  stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}")
  # TODO: eventually use a Job#detail or something like that
  stderr.puts(" #{pre} error:")
  stderr.puts(" #{pre} #{err.object_id}")
  stderr.puts(" #{pre} #{err.class}")
  stderr.puts(" #{pre} #{err}")
  err.backtrace.each { |line| stderr.puts(" #{pre} #{line}") }
  stderr.puts(" #{pre} tz:")
  stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}")
  stderr.puts(" #{pre} Time.now: #{Time.now}")
  stderr.puts(" #{pre} scheduler:")
  stderr.puts(" #{pre} object_id: #{object_id}")
  stderr.puts(" #{pre} opts:")
  stderr.puts(" #{pre} #{@opts.inspect}")
  stderr.puts(" #{pre} frequency: #{self.frequency}")
  stderr.puts(" #{pre} lockfile: #{@lockfile.inspect}")
  stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})")
  stderr.puts(" #{pre} down?: #{down?}")
  stderr.puts(" #{pre} threads: #{self.threads.size}")
  stderr.puts(" #{pre} thread: #{self.thread}")
  stderr.puts(" #{pre} thread_key: #{self.thread_key}")
  stderr.puts(" #{pre} work_threads: #{work_threads.size}")
  stderr.puts(" #{pre} active: #{work_threads(:active).size}")
  stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}")
  stderr.puts(" #{pre} max_work_threads: #{max_work_threads}")
  stderr.puts(" #{pre} mutexes: #{lock_states.inspect}")
  stderr.puts(" #{pre} jobs: #{jobs.size}")
  stderr.puts(" #{pre} at_jobs: #{at_jobs.size}")
  stderr.puts(" #{pre} in_jobs: #{in_jobs.size}")
  stderr.puts(" #{pre} every_jobs: #{every_jobs.size}")
  stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}")
  stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}")
  stderr.puts(" #{pre} running_jobs: #{running_jobs.size}")
  stderr.puts(" #{pre} work_queue: #{work_queue.size}")
  stderr.puts("} #{pre} .")

rescue => e

  # the report must never take the caller down, say what went wrong instead
  stderr.puts("failure in #on_error itself:")
  stderr.puts(e.inspect)
  stderr.puts(e.backtrace)

ensure

  stderr.flush
end
# File lib/rufus/scheduler.rb, line 248
#
# Schedules a repeating job: when +arg+ parses to a CronLine the call goes
# to schedule_cron, else it goes to schedule_every.
def repeat(arg, callable=nil, opts={}, &block)
  # TODO: eventually, spare one parse call

  parsed = Scheduler.parse(arg)

  if CronLine === parsed
    schedule_cron(arg, callable, opts, &block)
  else
    schedule_every(arg, callable, opts, &block)
  end
end
# File lib/rufus/scheduler.rb, line 178
#
# Clears the paused flag (the scheduler loop only triggers jobs while the
# flag is not set). Returns false.
def resume
  @paused = false
end
# File lib/rufus/scheduler.rb, line 368
#
# Same as #jobs, with the :running filter forced to true.
def running_jobs(opts={})
  filter = opts.merge(:running => true)
  jobs(filter)
end
# File lib/rufus/scheduler.rb, line 237
#
# Schedules a job, picking the flavour from what +arg+ parses to:
# a CronLine goes to schedule_cron, a Time goes to schedule_at and
# anything else goes to schedule_in.
def schedule(arg, callable=nil, opts={}, &block)
  # TODO: eventually, spare one parse call

  parsed = Scheduler.parse(arg)

  if CronLine === parsed
    schedule_cron(arg, callable, opts, &block)
  elsif Time === parsed
    schedule_at(arg, callable, opts, &block)
  else
    schedule_in(arg, callable, opts, &block)
  end
end
# File lib/rufus/scheduler.rb, line 192
#
# Like #at, but always asks do_schedule for the job instance.
def schedule_at(time, callable=nil, opts={}, &block)
  return_job_instance = true
  do_schedule(:once, time, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 232
#
# Like #cron, but always asks do_schedule for the job instance.
def schedule_cron(cronline, callable=nil, opts={}, &block)
  return_job_instance = true
  do_schedule(:cron, cronline, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 212
#
# Like #every, but always asks do_schedule for the job instance.
def schedule_every(duration, callable=nil, opts={}, &block)
  return_job_instance = true
  do_schedule(:every, duration, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 202
#
# Like #in, but always asks do_schedule for the job instance.
def schedule_in(duration, callable=nil, opts={}, &block)
  return_job_instance = true
  do_schedule(:once, duration, callable, opts, return_job_instance, block)
end
# File lib/rufus/scheduler.rb, line 222
#
# Like #interval, but always asks do_schedule for the job instance.
def schedule_interval(duration, callable=nil, opts={}, &block)
  return_job_instance = true
  do_schedule(:interval, duration, callable, opts, return_job_instance, block)
end
Returns true if this job is currently scheduled.
Takes extra care to answer true if the job is a repeat job currently firing.
# File lib/rufus/scheduler.rb, line 327
#
# Accepts a job or a job id (resolved through #fetch). Answers true when
# the job is found and its next_time is not nil, false otherwise.
def scheduled?(job_or_job_id)
  job = fetch(job_or_job_id).first

  return false unless job

  !! (job.next_time != nil)
end
# File lib/rufus/scheduler.rb, line 123
#
# Shuts the scheduler down: resets @started_at (the scheduler loop runs
# only while it is set), unschedules every job, empties the work queue
# and finally calls #unlock.
#
# +opt+ may be
# :wait  the work threads are joined (join_all_work_threads)
# :kill  the work threads are killed (kill_all_work_threads)
def shutdown(opt=nil)
  @started_at = nil

  #jobs.each { |j| j.unschedule }
    # provokes https://github.com/jmettraux/rufus-scheduler/issue/98
  @jobs.array.each(&:unschedule)

  @work_queue.clear

  if opt == :wait
    join_all_work_threads
  elsif opt == :kill
    kill_all_work_threads
  end

  unlock
end
Lists all the threads associated with this scheduler.
# File lib/rufus/scheduler.rb, line 336
#
# Returns the threads of Thread.list whose thread-local entry under this
# scheduler's thread_key is set.
def threads
  Thread.list.find_all { |thread| thread[thread_key] }
end
# File lib/rufus/scheduler.rb, line 391
#
# Shortcut for occurrences(time0, time1, :timeline).
def timeline(time0, time1)
  layout = :timeline
  occurrences(time0, time1, layout)
end
# File lib/rufus/scheduler.rb, line 258
#
# Unschedules a job, given the job itself or its id (resolved through
# #fetch). Fails with an ArgumentError when no job is found.
def unschedule(job_or_job_id)
  job, job_id = fetch(job_or_job_id)

  unless job
    fail ArgumentError.new("no job found with id '#{job_id}'")
  end

  job.unschedule
end
# File lib/rufus/scheduler.rb, line 144
#
# Returns the time elapsed since @started_at (Time.now - @started_at),
# or nil when @started_at is not set.
def uptime
  return nil unless @started_at
  Time.now - @started_at
end
# File lib/rufus/scheduler.rb, line 149
#
# Returns the uptime as a duration string (built by the class-level
# to_duration).
#
# #uptime answers nil when the scheduler is not started; handing that nil
# to to_duration used to end in a NoMethodError (which, among others, made
# #on_error fail while reporting on a stopped scheduler). An empty string
# is returned in that case.
def uptime_s
  elapsed = uptime
  elapsed ? self.class.to_duration(elapsed) : ''
end
Lists all the work threads (the ones actually running the scheduled block code)
Accepts a query option, which can be set to:
:all (default), returns all the threads that are work threads or are currently running a job
:active, returns all threads that are currently running a job
:vacant, returns the threads that are not running a job
If, thanks to :blocking => true, a job is scheduled to monopolize the main scheduler thread, that thread will get returned when :active or :all.
# File lib/rufus/scheduler.rb, line 354
#
# Starts from the threads (see #threads) flagged with either
# :rufus_scheduler_job or :rufus_scheduler_work_thread, then narrows the
# list down according to +query+:
#
# :active  only the threads with :rufus_scheduler_job set
# :vacant  only the threads without :rufus_scheduler_job
# else     the whole list
def work_threads(query=:all)
  candidates = threads.select do |t|
    t[:rufus_scheduler_job] || t[:rufus_scheduler_work_thread]
  end

  case query
  when :active
    candidates.select { |t| t[:rufus_scheduler_job] }
  when :vacant
    candidates.reject { |t| t[:rufus_scheduler_job] }
  else
    candidates
  end
end
# File lib/rufus/scheduler.rb, line 603
#
# Common implementation behind the scheduling methods.
#
# job_type             :once, :every, :interval or :cron
# t                    the schedule (for :once, a schedule parsing to a
#                      Time yields an AtJob, anything else an InJob)
# callable             the callable to run, or the opts hash when no
#                      callable was given
# opts                 the job options
# return_job_instance  when truthy (or when opts[:job] is), the job is
#                      returned, else its job_id is
# block                the block to run, takes precedence over callable
#
# Fails with a NotRunningError when @started_at is not set. Raises an
# ArgumentError when the job responds to #frequency and that frequency is
# lower than the scheduler's @frequency.
def do_schedule(job_type, t, callable, opts, return_job_instance, block)
  if @started_at == nil
    fail NotRunningError.new(
      'cannot schedule, scheduler is down or shutting down'
    )
  end

  # schedule(t, opts) form: the hash landed in the callable slot
  if callable.is_a?(Hash)
    opts = callable
    callable = nil
  end

  return_job_instance ||= opts[:job]

  job_class =
    case job_type
    when :once
      Rufus::Scheduler.parse(t).is_a?(Time) ? AtJob : InJob
    when :every then EveryJob
    when :interval then IntervalJob
    when :cron then CronJob
    end

  job = job_class.new(self, t, opts, block || callable)

  if job.respond_to?(:frequency) && job.frequency < @frequency
    raise ArgumentError.new(
      "job frequency (#{job.frequency}) is higher than " +
      "scheduler frequency (#{@frequency})"
    )
  end

  @jobs.push(job)

  return_job_instance ? job : job.job_id
end
Returns [ job, job_id ]
# File lib/rufus/scheduler.rb, line 457
#
# Normalizes its argument into a [ job, job_id ] pair.
#
# An object responding to #job_id is taken as the job itself, anything
# else is taken as a job id and looked up through #job (so the first
# element may be nil when the lookup finds nothing).
def fetch(job_or_job_id)
  if job_or_job_id.respond_to?(:job_id)
    return [ job_or_job_id, job_or_job_id.job_id ]
  end

  [ job(job_or_job_id), job_or_job_id ]
end
# File lib/rufus/scheduler.rb, line 528
#
# Pushes one :sayonara message per work thread onto the work queue, joins
# each work thread, then empties the queue.
def join_all_work_threads
  work_threads.size.times do
    @work_queue << :sayonara
  end

  work_threads.each(&:join)

  @work_queue.clear
end
# File lib/rufus/scheduler.rb, line 537
#
# Calls Thread#kill on each work thread.
def kill_all_work_threads
  work_threads.each(&:kill)
end
Returns true if the scheduler has acquired the [exclusive] lock and thus may run.
Most of the time, a scheduler is run alone and this method should return true. It is useful in cases where among a group of applications only one of them should run the scheduler. For schedulers that should not run, the method should return false.
Out of the box, rufus-scheduler proposes the :lockfile => ‘path/to/lock/file’ scheduler start option. It makes it easy for schedulers on the same machine to determine which should run (the first to write the lockfile and lock it). It uses “man 2 flock” so it probably won’t work reliably on distributed file systems.
If one needs to use a special/different locking mechanism, providing overriding implementation for this lock and the unlock complement is easy.
# File lib/rufus/scheduler.rb, line 484
#
# Tries to obtain the exclusive lock on the file designated by the
# :lockfile scheduler option.
#
# Returns true when no :lockfile option was given or when the lock was
# obtained. In the latter case the pid, the scheduler object_id and the
# current time are written to the file, and the open file is kept in
# @lockfile (it has to stay open for the lock to be held).
#
# Returns false when the file is already locked. The handle opened for
# the attempt is closed before returning (it used to be left open,
# leaking a file descriptor for each failed attempt).
#
# Raises an ArgumentError when the :lockfile option is not a String.
def lock
  @lockfile = nil

  path = @opts[:lockfile]
  return true unless path

  raise ArgumentError.new(
    ":lockfile argument must be a string, not a #{path.class}"
  ) unless path.is_a?(String)

  FileUtils.mkdir_p(File.dirname(path))

  f = File.new(path, File::RDWR | File::CREAT)

  # non-blocking attempt, flock answers false when the lock is held elsewhere
  unless f.flock(File::LOCK_NB | File::LOCK_EX)
    f.close
    return false
  end

  now = Time.now

  f.print("pid: #{$$}, ")
  f.print("scheduler.object_id: #{self.object_id}, ")
  f.print("time: #{now}, ")
  f.print("timestamp: #{now.to_f}")
  f.flush

  @lockfile = f

  true
end
# Raises KillSignal in each of the work threads.
def free_all_work_threads
  work_threads.each do |thread|
    thread.raise(KillSignal)
  end
end
# File lib/rufus/scheduler.rb, line 547
#
# Records the start time and spawns the scheduler thread.
#
# As long as @started_at is set, the thread loops over: unschedule_jobs,
# trigger_jobs (skipped while paused), timeout_jobs, then a sleep of
# @frequency.
#
# The thread is tagged with the scheduler's thread key, with a reference
# to the scheduler and with a name (the :thread_name option or
# "<thread_key>_scheduler").
def start
  @started_at = Time.now

  @thread =
    Thread.new do
      while @started_at
        unschedule_jobs
        trigger_jobs unless @paused
        timeout_jobs
        sleep(@frequency)
      end
    end

  @thread[@thread_key] = true
  @thread[:rufus_scheduler] = self
  @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler"
end
# File lib/rufus/scheduler.rb, line 521
#
# Unschedules every job, then polls (every 0.01 second) until no job is
# running anymore.
def terminate_all_jobs
  jobs.each(&:unschedule)

  sleep(0.01) while running_jobs.size > 0
end
# File lib/rufus/scheduler.rb, line 584
#
# Goes over the active work threads and raises
# Rufus::Scheduler::TimeoutError in those whose deadline has been reached.
#
# The deadline is the thread's :rufus_scheduler_timeout entry when it is a
# Time, else it is that entry added to the :rufus_scheduler_time entry.
def timeout_jobs
  work_threads(:active).each do |thread|

    job = thread[:rufus_scheduler_job]
    timeout = thread[:rufus_scheduler_timeout]

    # thread might just have become inactive (job -> nil)
    next unless job && timeout

    started = thread[:rufus_scheduler_time]
    deadline = timeout.is_a?(Time) ? timeout : started + timeout

    thread.raise(Rufus::Scheduler::TimeoutError) unless deadline > Time.now
  end
end
# File lib/rufus/scheduler.rb, line 574
#
# Takes the current time once, then triggers (with that same time) each
# job that @jobs.each(now) yields.
def trigger_jobs
  now = Time.now
  @jobs.each(now) { |job| job.trigger(now) }
end
Sister method to lock, is called when the scheduler shuts down.
# File lib/rufus/scheduler.rb, line 516
#
# Releases the flock held on @lockfile, if there is a lockfile. Returns
# the outcome of File#flock, or nil when there is no lockfile.
def unlock
  return nil unless @lockfile
  @lockfile.flock(File::LOCK_UN)
end
Generated with the Darkfish Rdoc Generator 2.