Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 5.4.0
- Refactor: start using scheduler mixin [#134](https://github.com/logstash-plugins/logstash-input-http_poller/pull/134)

## 5.3.1
- Fix: make sure plugin is closing the http client [#130](https://github.com/logstash-plugins/logstash-input-http_poller/pull/130)

Expand Down
26 changes: 11 additions & 15 deletions lib/logstash/inputs/http_poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
require "logstash/plugin_mixins/http_client"
require "socket" # for Socket.gethostname
require "manticore"
require "rufus/scheduler"
require "logstash/plugin_mixins/ecs_compatibility_support"
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
require 'logstash/plugin_mixins/event_support/event_factory_adapter'
require 'logstash/plugin_mixins/scheduler'

class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
include LogStash::PluginMixins::HttpClient
Expand All @@ -18,6 +18,8 @@ class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base

extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

include LogStash::PluginMixins::Scheduler

config_name "http_poller"

default :codec, "json"
Expand Down Expand Up @@ -45,7 +47,6 @@ class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
config :metadata_target, :validate => :string, :default => '@metadata'

public
Schedule_types = %w(cron every at in)
def register
@host = Socket.gethostname.force_encoding(Encoding::UTF_8)

Expand All @@ -55,15 +56,15 @@ def register

# @overload
def stop
shutdown_scheduler_and_close_client(:wait)
close_client
end

# @overload
def close
shutdown_scheduler_and_close_client
close_client
end

def shutdown_scheduler_and_close_client(opt = nil)
def close_client
@logger.debug("closing http client", client: client)
begin
client.close # since Manticore 0.9.0 this shuts-down/closes all resources
Expand All @@ -72,12 +73,8 @@ def shutdown_scheduler_and_close_client(opt = nil)
details[:backtrace] = e.backtrace if @logger.debug?
@logger.warn "failed closing http client", details
end
if @scheduler
@logger.debug("shutting down scheduler", scheduler: @scheduler)
@scheduler.shutdown(opt) # on newer Rufus (3.8) this joins on the scheduler thread
end
end
private :shutdown_scheduler_and_close_client
private :close_client

private
def setup_requests!
Expand Down Expand Up @@ -185,12 +182,11 @@ def setup_schedule(queue)
raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length != 1
schedule_type = @schedule.keys.first
schedule_value = @schedule[schedule_type]
raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)
raise LogStash::ConfigurationError, msg_invalid_schedule unless %w(cron every at in).include?(schedule_type)

@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
@scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
@scheduler.thread.join # due newer rufus (3.8) doing a blocking operation on scheduler.join
opts = schedule_type == "every" ? { first_in: 0.01 } : {}
scheduler.public_send(schedule_type, schedule_value, opts) { run_once(queue) }
scheduler.join
end

def run_once(queue)
Expand Down
4 changes: 2 additions & 2 deletions logstash-input-http_poller.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-http_poller'
s.version = '5.3.1'
s.version = '5.4.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Decodes the output of an HTTP API into events"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -21,7 +21,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency "logstash-mixin-http_client", ">= 7.2.0"
s.add_runtime_dependency 'rufus-scheduler', ">= 3.0.9"
s.add_runtime_dependency 'logstash-mixin-scheduler', '~> 1.0'
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.3'
s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0', '>= 1.0.1'
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'
Expand Down
9 changes: 0 additions & 9 deletions spec/inputs/http_poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,6 @@
require 'rspec/matchers/built_in/raise_error.rb'
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'

begin
# TODO: CI work-around - will most likely be moved to the scheduler mixin
require 'et-orbi.rb' # a dependency of rufus-scheduler since 3.4
::EtOrbi::EoTime.now # might take a long time to initialize - loading time zone
# data (from tz-info) and thus gets un-predictable on CI, since the scheduler worker
# thread might be stuck starting while we attempt to shutdown in a given time frame
rescue LoadError
end

describe LogStash::Inputs::HTTP_Poller do
let(:metadata_target) { "_http_poller_metadata" }
let(:queue) { Queue.new }
Expand Down