Iteration comes with a special wrapper enumerator that lets you throttle iterations based on an external signal (e.g. database health).
Consider this example:
```ruby
class InactiveAccountDeleteJob < ActiveJob::Base
  include JobIteration::Iteration

  def build_enumerator(_params, cursor:)
    enumerator_builder.active_record_on_batches(
      Account.inactive,
      cursor: cursor
    )
  end

  def each_iteration(batch, _params)
    Account.where(id: batch.map(&:id)).delete_all
  end
end
```

For an app that keeps track of customer accounts, it's typical to purge old data that's no longer relevant for storage.
At the same time, if the purge involves a lot of DB writes, it can put extra load on the database and slow down other parts of your service.
You can change `build_enumerator` to wrap the enumeration over DB rows in a throttle enumerator. It takes the signal as a proc and, if the proc returns true, re-enqueues the job to run again after the `backoff` period.
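To make that behavior concrete, here is a minimal plain-Ruby model of throttled iteration. `ThrottleSketch`, its `Throttled` result, and the integer backoff are hypothetical and are not job-iteration's API: returning a `Throttled` value stands in for re-enqueueing the job, and its cursor marks where a resumed run would pick up.

```ruby
# Hypothetical model of throttled iteration; not the job-iteration API.
class ThrottleSketch
  # Stands in for "re-enqueue the job": where to resume and how long to wait.
  Throttled = Struct.new(:cursor, :backoff)

  def initialize(items, throttle_on:, backoff:)
    @items = items
    @throttle_on = throttle_on # proc returning true when iteration should pause
    @backoff = backoff         # plain number of seconds in this sketch
  end

  # Yields items from +cursor+ onward. The signal is checked before each item;
  # when it returns true, no further items are yielded and a Throttled value is returned.
  def run(cursor: 0)
    @items.each_with_index do |item, index|
      next if index < cursor # already processed by an earlier run
      return Throttled.new(index, @backoff) if @throttle_on.call

      yield item
    end
    :completed
  end
end

# Usage: the signal reports "unhealthy" on its third check only.
checks = [false, false, true]
processed = []
sketch = ThrottleSketch.new(%w[a b c d], throttle_on: -> { checks.shift || false }, backoff: 30)

result = sketch.run { |item| processed << item }
# processed is now ["a", "b"]; result.cursor == 2 and result.backoff == 30

sketch.run(cursor: result.cursor) { |item| processed << item } # => :completed
# processed is now ["a", "b", "c", "d"]
```

Checking the signal before every item, rather than once per run, is what lets this sketch stop as soon as the database reports trouble instead of finishing the whole batch of work first.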
```ruby
def build_enumerator(_params, cursor:)
  enumerator_builder.build_throttle_enumerator(
    enumerator_builder.active_record_on_batches(
      Account.inactive,
      cursor: cursor
    ),
    throttle_on: -> { DatabaseStatus.unhealthy? },
    backoff: 30.seconds
  )
end
```

If you want to apply throttling to all jobs, you can subclass `JobIteration::EnumeratorBuilder` with your own builder and override the default enumerator builder. The builder always wraps the enumerators returned from `build_enumerator`.
```ruby
class MyOwnBuilder < JobIteration::EnumeratorBuilder
  class Wrapper < Enumerator
    class << self
      def wrap(_builder, enum)
        # Fully qualified, because ThrottleEnumerator is not in lexical scope here.
        # ThrottleEnumerator is not itself an Enumerator, so #to_enum builds one.
        JobIteration::ThrottleEnumerator.new(
          enum,
          nil,
          throttle_on: -> { DatabaseStatus.unhealthy? },
          backoff: 30.seconds
        ).to_enum
      end
    end
  end
end

JobIteration.enumerator_builder = MyOwnBuilder
```

Note that it's up to you to implement a `DatabaseStatus.unhealthy?` that works for your database. At Shopify, a helper like `DatabaseStatus` checks the following MySQL metrics:
- Replication lag across all regions
- DB threads
- Whether the DB is available for writes (if not, this indicates a failover in progress)
- Open Semian circuits
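For illustration, here is a hypothetical `DatabaseStatus` that combines the four signals listed above. The `Metrics` struct, the `metrics_source` hook, and both thresholds are assumptions made so the sketch runs standalone; they are not Shopify's values. A real implementation would query MySQL and Semian instead of a stubbed lambda.

```ruby
# Hypothetical health check modeled on the metrics listed above.
module DatabaseStatus
  # max_replication_lag: worst lag across all regions, in seconds.
  Metrics = Struct.new(:max_replication_lag, :threads_running, :writable, :open_circuits,
                       keyword_init: true)

  MAX_REPLICATION_LAG = 5.0 # seconds; assumed threshold
  MAX_THREADS_RUNNING = 100 # assumed threshold

  # Default source reports a healthy database; replace it with real queries.
  @metrics_source = lambda do
    Metrics.new(max_replication_lag: 0.0, threads_running: 0, writable: true, open_circuits: 0)
  end

  class << self
    attr_accessor :metrics_source

    # True if any single signal indicates trouble.
    def unhealthy?
      m = metrics_source.call
      m.max_replication_lag > MAX_REPLICATION_LAG ||
        m.threads_running > MAX_THREADS_RUNNING ||
        !m.writable ||              # not writable: likely failing over
        m.open_circuits.positive?   # any open Semian circuit
    end
  end
end

# Example:
#   DatabaseStatus.unhealthy? # => false with the default all-clear metrics
#   DatabaseStatus.metrics_source = -> {
#     DatabaseStatus::Metrics.new(max_replication_lag: 12.0, threads_running: 10,
#                                 writable: true, open_circuits: 0)
#   }
#   DatabaseStatus.unhealthy? # => true (lag above the assumed 5s threshold)
```

Treating any one failing signal as "unhealthy" keeps the check conservative, which suits a throttle whose only cost is delaying background work.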