You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
115 lines
2.4 KiB
115 lines
2.4 KiB
# frozen_string_literal: true |
|
|
|
require_relative 'connection_pool/shared_connection_pool' |
|
|
|
class RequestPool |
|
def self.current |
|
@current ||= RequestPool.new |
|
end |
|
|
|
class Reaper |
|
attr_reader :pool, :frequency |
|
|
|
def initialize(pool, frequency) |
|
@pool = pool |
|
@frequency = frequency |
|
end |
|
|
|
def run |
|
return unless frequency&.positive? |
|
|
|
Thread.new(frequency, pool) do |t, p| |
|
loop do |
|
sleep t |
|
p.flush |
|
end |
|
end |
|
end |
|
end |
|
|
|
MAX_IDLE_TIME = 30 |
|
MAX_POOL_SIZE = ENV.fetch('MAX_REQUEST_POOL_SIZE', 512).to_i |
|
REAPER_FREQUENCY = 30 |
|
WAIT_TIMEOUT = 5 |
|
|
|
class Connection |
|
attr_reader :site, :last_used_at, :created_at, :in_use, :dead, :fresh |
|
|
|
def initialize(site) |
|
@site = site |
|
@http_client = http_client |
|
@last_used_at = nil |
|
@created_at = current_time |
|
@dead = false |
|
@fresh = true |
|
end |
|
|
|
def use |
|
@last_used_at = current_time |
|
@in_use = true |
|
|
|
retries = 0 |
|
|
|
begin |
|
yield @http_client |
|
rescue HTTP::ConnectionError |
|
# It's possible the connection was closed, so let's |
|
# try re-opening it once |
|
|
|
close |
|
|
|
if @fresh || retries.positive? |
|
raise |
|
else |
|
@http_client = http_client |
|
retries += 1 |
|
retry |
|
end |
|
rescue |
|
# If this connection raises errors of any kind, it's |
|
# better if it gets reaped as soon as possible |
|
|
|
close |
|
@dead = true |
|
raise |
|
end |
|
ensure |
|
@fresh = false |
|
@in_use = false |
|
end |
|
|
|
def seconds_idle |
|
current_time - (@last_used_at || @created_at) |
|
end |
|
|
|
def close |
|
@http_client.close |
|
end |
|
|
|
private |
|
|
|
def http_client |
|
Request.http_client.persistent(@site, timeout: MAX_IDLE_TIME) |
|
end |
|
|
|
def current_time |
|
Process.clock_gettime(Process::CLOCK_MONOTONIC) |
|
end |
|
end |
|
|
|
def initialize |
|
@pool = ConnectionPool::SharedConnectionPool.new(size: MAX_POOL_SIZE, timeout: WAIT_TIMEOUT) { |site| Connection.new(site) } |
|
@reaper = Reaper.new(self, REAPER_FREQUENCY) |
|
@reaper.run |
|
end |
|
|
|
def with(site, &block) |
|
@pool.with(site) do |connection| |
|
ActiveSupport::Notifications.instrument('with.request_pool', miss: connection.fresh, host: connection.site) do |
|
connection.use(&block) |
|
end |
|
end |
|
end |
|
|
|
delegate :size, :flush, to: :@pool |
|
end
|
|
|