You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
131 lines
4.5 KiB
131 lines
4.5 KiB
# frozen_string_literal: true
|
|
|
# Sidekiq worker that walks the enabled `AccountStatusesCleanupPolicy` records
# and hands each of them to `AccountStatusesCleanupService` with a bounded
# deletion budget. The run is skipped entirely when one of the monitored
# queues exceeds its latency threshold.
class Scheduler::AccountsStatusesCleanupScheduler
  include Sidekiq::Worker
  include Redisable

  # This limit is mostly to be nice to the fediverse at large and not
  # generate too much traffic.
  # This also helps limiting the running time of the scheduler itself.
  MAX_BUDGET = 300

  # This is an attempt to spread the load across remote servers, as
  # spreading deletions across diverse accounts is likely to spread
  # the deletion across diverse followers. It also helps each individual
  # user see some effect sooner.
  PER_ACCOUNT_BUDGET = 5

  # This is an attempt to limit the workload generated by status removal
  # jobs to something the particular server can handle.
  PER_THREAD_BUDGET = 5

  # These are latency limits on various queues above which a server is
  # considered to be under load, causing the auto-deletion to be entirely
  # skipped for that run.
  LOAD_LATENCY_THRESHOLDS = {
    default: 5,
    push: 10,
    # The `pull` queue has lower priority jobs, and it's unlikely that
    # pushing deletes would cause much issues with this queue if it didn't
    # cause issues with `default` and `push`. Yet, do not enqueue deletes
    # if the instance is lagging behind too much.
    pull: 5.minutes.to_i,
  }.freeze

  # Redis key holding the id of the policy at which the budget ran out, so
  # that the following run can resume from there.
  LAST_POLICY_ID_KEY = 'account_statuses_cleanup_scheduler:last_policy_id'

  # Lifetime of the resume marker stored under `LAST_POLICY_ID_KEY`, in seconds.
  LAST_POLICY_ID_TTL = 1.hour.to_i

  sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i

  # Entry point of the job. Does nothing when `under_load?` is true;
  # otherwise spends the budget returned by `compute_budget` across policies,
  # at most `PER_ACCOUNT_BUDGET` per policy and per pass.
  def perform
    return if under_load?

    budget = compute_budget

    # If the budget allows it, we want to consider all accounts with enabled
    # auto cleanup at least once.
    #
    # We start from `first_policy_id` (the last processed id in the previous
    # run) and process each policy until we loop to `first_policy_id`,
    # recording into `affected_policies` any policy that caused posts to be
    # deleted.
    #
    # After that, we set `full_iteration` to `false` and continue looping on
    # policies from `affected_policies`.
    first_policy_id   = last_processed_id || 0
    first_iteration   = true
    full_iteration    = true
    affected_policies = []

    loop do
      num_processed_accounts = 0

      scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
      scope.find_each(order: :asc) do |policy|
        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
        budget -= num_deleted

        unless num_deleted.zero?
          num_processed_accounts += 1
          affected_policies << policy.id if full_iteration
        end

        # On the wrap-around pass, reaching `first_policy_id` means every
        # policy has been looked at once.
        full_iteration = false if !first_iteration && policy.id >= first_policy_id

        # A non-positive check (rather than `zero?`) guarantees termination
        # even if the service ever reported more deletions than requested.
        unless budget.positive?
          save_last_processed_id(policy.id)
          break
        end
      end

      # The idea here is to loop through all policies at least once until the budget is exhausted
      # and start back after the last processed account otherwise
      break if !budget.positive? || (num_processed_accounts.zero? && !full_iteration)

      full_iteration  = false unless first_iteration
      first_iteration = false
    end
  end

  # Computes how many statuses this run may delete.
  #
  # Sums the `concurrency` of every Sidekiq process whose `queues` include
  # `push`, multiplies it by `PER_THREAD_BUDGET` and caps the result at
  # `MAX_BUDGET`.
  #
  # @return [Integer] the deletion budget (0 when no process serves `push`)
  def compute_budget
    # Each post deletion is a `RemovalWorker` job (on `default` queue), each
    # potentially spawning many `ActivityPub::DeliveryWorker` jobs (on the `push` queue).
    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.pluck('concurrency').sum
    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
  end

  # @return [Boolean] true when at least one queue listed in
  #   `LOAD_LATENCY_THRESHOLDS` has a latency above its threshold
  def under_load?
    LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) }
  end

  private

  # Builds the relation of policies to examine during one pass of `perform`.
  #
  # @param first_policy_id [Integer] id the current run started from
  # @param affected_policies [Array<Integer>] ids of policies that already
  #   yielded deletions during this run
  # @param first_iteration [Boolean] whether this is the first pass
  # @param full_iteration [Boolean] whether not-yet-examined policies are
  #   still to be visited
  # @return the enabled policies restricted to the ids relevant for this pass
  def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
    scope = AccountStatusesCleanupPolicy.where(enabled: true)

    if full_iteration
      # If we are doing a full iteration, examine all policies we have not examined yet
      if first_iteration
        scope.where(id: first_policy_id...)
      else
        scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies))
      end
    else
      # Otherwise, examine only policies that previously yielded posts to delete
      scope.where(id: affected_policies)
    end
  end

  # @param name [Symbol, String] Sidekiq queue name
  # @param max_latency [Numeric] threshold the queue latency is compared to
  # @return [Boolean] true when the queue latency is strictly above `max_latency`
  def queue_under_load?(name, max_latency)
    Sidekiq::Queue.new(name).latency > max_latency
  end

  # @return [Integer, nil] the id stored under `LAST_POLICY_ID_KEY`, or nil
  #   when the key is absent
  def last_processed_id
    redis.get(LAST_POLICY_ID_KEY)&.to_i
  end

  # Stores the resume marker for `LAST_POLICY_ID_TTL` seconds, or deletes it
  # when `id` is nil.
  #
  # @param id [Integer, nil] id of the last processed policy
  def save_last_processed_id(id)
    if id.nil?
      redis.del(LAST_POLICY_ID_KEY)
    else
      redis.set(LAST_POLICY_ID_KEY, id, ex: LAST_POLICY_ID_TTL)
    end
  end
end
|
|
|