Skip to content
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
145980b
sum
kares Dec 28, 2021
5cc8bff
Test: make spec pass easy on CI
kares Dec 28, 2021
4f9e022
unpin rufus-scheduler ~> 3.0.9
kares Dec 28, 2021
a043181
bump + changelog
kares Dec 28, 2021
b6505a0
Temp: let's see if failure is due new rufus
kares Dec 29, 2021
bbfe024
Revert "Temp: let's see if failure is due new rufus"
kares Dec 29, 2021
b3d862e
Refactor: no @interval_thread or stud usage around
kares Dec 29, 2021
3efb809
Test: apply a lag due CI :red: against 8.1-SNAPSHOT
kares Dec 29, 2021
6c104fb
Test: move Timecop setup/return to before/after hooks
kares Dec 29, 2021
0607035
proper scope for allowed_lag override
kares Dec 29, 2021
b3ae84c
more lag
kares Dec 29, 2021
f88e40c
Refactor: to use graceful scheduler shutdown
kares Dec 29, 2021
dda1a80
10
kares Dec 29, 2021
7cfc654
Refactor: join on thread instead of queue (in 3.8)
kares Dec 30, 2021
6b70eb6
Temp: long lag
kares Jan 6, 2022
6c6cd17
try a few times
kares Jan 6, 2022
cc08287
Revert testing timeout increase (due newer base img)
kares Jan 17, 2022
9287c55
Merge branch 'main' into rufus
kares Feb 14, 2022
53388ac
Test: make sure test does not leak scheduler
kares Feb 14, 2022
e52b01c
Test: dry-out + do not leak scheduler
kares Feb 14, 2022
ae0d99c
allow more lag on scheduler to shut-down
kares Feb 14, 2022
d6074f7
set -Xms128m -Xmx1g -XX:MaxMetaspaceSize=256m
kares Feb 14, 2022
2dd8a3f
account for CI lag within sleep
kares Feb 14, 2022
901044e
let's bound test to ~1G heap+meta space
kares Feb 14, 2022
10ef111
Temp: lag up to 20
kares Feb 14, 2022
1ac1f5e
CI: test against 3.0.9 scheduler version, for now
kares Feb 14, 2022
8ba255e
LOG_LEVEL=info
kares Feb 14, 2022
4ded64e
-Djava.security.egd=/dev/./urandon
kares Feb 14, 2022
0141c35
Test: re-invent the plugin interrupt spec
kares Feb 14, 2022
9ed546d
dump ruby threads
kares Feb 14, 2022
f397826
puts plugin_thread.alive?
kares Feb 14, 2022
793801a
DRAFT: close http client completely!
kares Feb 14, 2022
f48bc6e
print threads in either case
kares Feb 14, 2022
4eebf47
Test: release every plugin we start!
kares Feb 14, 2022
a8dafb4
TEST: back to @scheduler.join for now
kares Feb 14, 2022
85d258b
Test: use less (pool thread) resources in tests
kares Feb 14, 2022
8ae1419
more tries
kares Feb 14, 2022
3c79dcc
sleep for a bit
kares Feb 14, 2022
58d6198
adjust scheduler specs for predictability
kares Feb 15, 2022
8b494fc
puts raw thread dump
kares Feb 15, 2022
d6f0e72
dangling around trying to JIT a block turn it OFF
kares Feb 15, 2022
ceae70b
print more thread dump details
kares Feb 15, 2022
8726a33
LOG_LEVEL=debug
kares Feb 15, 2022
d370769
back to join-ing on thread - makes sense!
kares Feb 15, 2022
f86e45c
avoid shutdown(:wait) as it's over engineered
kares Feb 15, 2022
1f1b77e
Revert "LOG_LEVEL=debug"
kares Feb 15, 2022
5f13700
Revert "print more thread dump details"
kares Feb 15, 2022
438d017
Revert "puts raw thread dump"
kares Feb 15, 2022
f630b2b
Revert "sleep for a bit"
kares Feb 15, 2022
18f09c9
Revert "print threads in either case"
kares Feb 15, 2022
e1af18b
Revert "puts plugin_thread.alive?"
kares Feb 15, 2022
9b1af8b
Revert "dump ruby threads"
kares Feb 15, 2022
d108f71
Revert "Test: re-invent the plugin interrupt spec"
kares Feb 15, 2022
d1e9320
undo docker.env overrides
kares Feb 15, 2022
6450158
Revert "DRAFT: close http client completely!"
kares Feb 15, 2022
e51ec6c
Test: spec close-ing as well
kares Feb 15, 2022
6d9df13
restore previous close/stop impl
kares Feb 15, 2022
6cc115f
do not override close - stop is called before
kares Feb 15, 2022
4252bcb
Update CHANGELOG.md
kares Feb 16, 2022
a38b589
Refactor: restore @scheduler.stop if @scheduler
kares Feb 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
import:
- logstash-plugins/.ci:travis/[email protected]
- logstash-plugins/.ci:travis/[email protected]

env:
jobs: # test with old scheduler version (3.0 was locked in LS 7.x)
- ELASTIC_STACK_VERSION=7.x RUFUS_SCHEDULER_VERSION=3.0.9 LOG_LEVEL=info
- ELASTIC_STACK_VERSION=6.x RUFUS_SCHEDULER_VERSION=3.0.9
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 5.2.1
- Deps: unpin rufus-scheduler dependency [#130](https://github.com/logstash-plugins/logstash-input-http_poller/pull/130)

## 5.2.0
- Feat: support ssl_verification_mode option [#131](https://github.com/logstash-plugins/logstash-input-http_poller/pull/131)

Expand Down
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ if Dir.exist?(logstash_path) && use_logstash_source
gem 'logstash-core', :path => "#{logstash_path}/logstash-core"
gem 'logstash-core-plugin-api', :path => "#{logstash_path}/logstash-core-plugin-api"
end

gem 'rufus-scheduler', ENV['RUFUS_SCHEDULER_VERSION'] if ENV['RUFUS_SCHEDULER_VERSION']
12 changes: 7 additions & 5 deletions lib/logstash/inputs/http_poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ def register
setup_requests!
end

# @overload
def stop
Stud.stop!(@interval_thread) if @interval_thread
@scheduler.stop if @scheduler
if @scheduler && @scheduler.thread && @scheduler.thread.status
@scheduler.shutdown # on newer Rufus (3.8) this joins on the scheduler thread
end
# TODO implement client.close as we as releasing it's pooled resources!
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've a doubt on this, if in the .travis.yml and Gemfile we pin to Rufus 3.0.9 because LS 7.x pins that version, is this code 7.x proof?
Do this change behaves the same way also with Rufus 3.0.9 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the missing client.close is yet another completely separate issue (basically a plugin leak on reload) - stumbled upon it while trying to investigate the CI failure - please ignore the TODO for now if it's confusing.

I've a doubt on this, if in the .travis.yml and Gemfile we pin to Rufus 3.0.9 because LS 7.x pins that version, is this code 7.x proof?

it's proof that tests run (on CI) against latest rufus-scheduler as well as 3.0.9. LS 7.x for now used 3.0.9 but in theory (when the plugin unpinning from elastic/logstash#12931 is complete) with the right scenario of multpile bin/logstash-plugin update ... the lock could be released and rufus could be updated.

previous version had @scheduler.shutdown if @scheduler, in theory this could still fail if the thread did not start or the thread died - in those cases we can not trigger a "safe" scheduler.shutdown. but this isn't smt we're seeing and I can revert the code to be the same as before if you have doubts (that change isn't that important atm).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this, thanks for clarification.

end

private
Expand Down Expand Up @@ -163,16 +166,15 @@ def setup_schedule(queue)
#schedule hash must contain exactly one of the allowed keys
msg_invalid_schedule = "Invalid config. schedule hash must contain " +
"exactly one of the following keys - cron, at, every or in"
raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length !=1
raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length != 1
schedule_type = @schedule.keys.first
schedule_value = @schedule[schedule_type]
raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)

@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
#as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
@scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
@scheduler.join
@scheduler.thread.join # due newer rufus (3.8) doing a blocking operation on scheduler.join
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in rufus 3.0.9 @scheduler.join is ~ @scheduler.thread.join.
however in latest rufus 3.8 the @scheduler.join does a blocking Queue#pop so this makes it behave the same across versions ...

end

def run_once(queue)
Expand Down
5 changes: 2 additions & 3 deletions logstash-input-http_poller.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-http_poller'
s.version = '5.2.0'
s.version = '5.2.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Decodes the output of an HTTP API into events"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -21,8 +21,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency "logstash-mixin-http_client", ">= 7.1.0"
s.add_runtime_dependency 'stud', "~> 0.0.22"
Copy link
Contributor Author

@kares kares Feb 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stud dependency was not used - there was one line which seemed as a reminder of some relic.

s.add_runtime_dependency 'rufus-scheduler', "~>3.0.9"
s.add_runtime_dependency 'rufus-scheduler', ">= 3.0.9"
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.3'
s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0', '>= 1.0.1'
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'
Expand Down
129 changes: 64 additions & 65 deletions spec/inputs/http_poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@
"schedule" => default_schedule,
"urls" => default_urls,
"codec" => "json",
"metadata_target" => metadata_target
"metadata_target" => metadata_target,
"pool_max" => 3, "pool_max_per_route" => 1, 'keepalive' => false
}
}
let(:klass) { LogStash::Inputs::HTTP_Poller }
let(:opts) { default_opts }

subject(:plugin) { described_class.new(opts) }

describe "instances" do
subject { klass.new(default_opts) }
subject { described_class.new(default_opts) }

before do
subject.register
end
before { subject.register }
after { subject.stop }

describe "#run" do
it "should setup a scheduler" do
Expand Down Expand Up @@ -189,22 +190,21 @@
"metadata_target" => metadata_target
}
}
it "should run at the schedule" do
instance = klass.new(opts)
instance.register

before do
Timecop.travel(Time.new(2000,1,1,0,0,0,'+00:00'))
Timecop.scale(60)
queue = Queue.new
runner = Thread.new do
instance.run(queue)
end
sleep 3
instance.stop
runner.kill
runner.join
expect(queue.size).to eq(2)
end

after do
Timecop.return
end

it "should run at the schedule" do
run_plugin_and_yield_queue(plugin, sleep: 3) do |queue|
try(5) { expect(queue.size).to be >= 2 }
end
end
end

context "given 'at' expression" do
Expand All @@ -216,22 +216,21 @@
"metadata_target" => metadata_target
}
}
it "should run at the schedule" do
instance = klass.new(opts)
instance.register

before do
Timecop.travel(Time.new(2000,1,1,0,0,0,'+00:00'))
Timecop.scale(60 * 5)
queue = Queue.new
runner = Thread.new do
instance.run(queue)
end
sleep 2
instance.stop
runner.kill
runner.join
expect(queue.size).to eq(1)
Timecop.scale (60 * 5) / 2
end

after do
Timecop.return
end

it "should run at the schedule" do
run_plugin_and_yield_queue(plugin, sleep: 2) do |queue|
try(5) { expect(queue.size).to eq(1) }
end
end
end

context "given 'every' expression" do
Expand All @@ -244,20 +243,12 @@
}
}
it "should run at the schedule" do
instance = klass.new(opts)
instance.register
queue = Queue.new
runner = Thread.new do
instance.run(queue)
run_plugin_and_yield_queue(plugin, sleep: 5) do |queue|
#T 0123456
#events x x x x
#expects 3 events at T=5
try(5) { expect(queue.size).to be_between(2, 3) }
end
#T 0123456
#events x x x x
#expects 3 events at T=5
sleep 5
instance.stop
runner.kill
runner.join
expect(queue.size).to be_between(2, 3)
end
end

Expand All @@ -271,21 +262,28 @@
}
}
it "should run at the schedule" do
instance = klass.new(opts)
instance.register
queue = Queue.new
runner = Thread.new do
instance.run(queue)
run_plugin_and_yield_queue(plugin, sleep: 2.5) do |queue|
try(5) { expect(queue.size).to eq(1) }
end
sleep 3
instance.stop
runner.kill
runner.join
expect(queue.size).to eq(1)
end
end
end

def run_plugin_and_yield_queue(plugin, sleep: nil)
plugin.register
queue = Queue.new
begin
runner = Thread.new do
plugin.run(queue)
end
sleep(sleep) if sleep
yield(queue)
ensure
plugin.stop
runner.join if runner
end
end

describe "events", :ecs_compatibility_support, :aggregate_failures do
ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select|
before do
Expand Down Expand Up @@ -339,6 +337,8 @@
event # materialize the subject
end

after { poller.stop }

it "should enqueue a message" do
expect(event).to be_a(LogStash::Event)
end
Expand Down Expand Up @@ -399,9 +399,6 @@
let(:payload) { {"a" => 2, "hello" => ["a", "b", "c"]} }
let(:response_body) { LogStash::Json.dump(payload) }
let(:opts) { default_opts }
let(:instance) {
klass.new(opts)
}
let(:name) { default_name }
let(:url) { default_url }
let(:code) { 202 }
Expand All @@ -411,30 +408,30 @@
}

before do
instance.register
plugin.register
u = url.is_a?(Hash) ? url["url"] : url # handle both complex specs and simple string URLs
instance.client.stub(u,
plugin.client.stub(u,
:body => response_body,
:code => code
)
allow(instance).to receive(:decorate)
instance.send(:run_once, queue)
allow(plugin).to receive(:decorate)
plugin.send(:run_once, queue)
end

it "should have a matching message" do
expect(event.to_hash).to include(payload)
end

it "should decorate the event" do
expect(instance).to have_received(:decorate).once
expect(plugin).to have_received(:decorate).once
end

include_examples("matching metadata")

context "with an empty body" do
let(:response_body) { "" }
it "should return an empty event" do
instance.send(:run_once, queue)
plugin.send(:run_once, queue)
headers_field = ecs_select[disabled: "[#{metadata_target}][response_headers]",
v1: "[#{metadata_target}][input][http_poller][response][headers]"]
expect(event.get("#{headers_field}[content-length]")).to eql("0")
Expand All @@ -449,7 +446,7 @@
}

it "should not have any metadata on the event" do
instance.send(:run_once, queue)
plugin.send(:run_once, queue)
expect(event.get(metadata_target)).to be_nil
end
end
Expand Down Expand Up @@ -555,6 +552,8 @@

describe "stopping" do
let(:config) { default_opts }
it_behaves_like "an interruptible input plugin"
it_behaves_like "an interruptible input plugin" do
let(:allowed_lag) { 10 } # CI: wait till scheduler shuts down
end
end
end