Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 5.4.1
- Fix retry indefinitely in termination process. This feature requires Logstash 8.1 [#129](https://github.com/logstash-plugins/logstash-output-http/pull/129)

## 5.4.0
- Introduce retryable unknown exceptions for "connection reset by peer" and "timeout" [#127](https://github.com/logstash-plugins/logstash-output-http/pull/127)

Expand Down
9 changes: 9 additions & 0 deletions lib/logstash/outputs/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base
/Read Timed out/i
]

class PluginInternalQueueLeftoverError < StandardError; end

# This output lets you send events to a
# generic HTTP(S) endpoint
Expand Down Expand Up @@ -179,6 +180,9 @@ def send_events(events)

event, attempt = popped

raise PluginInternalQueueLeftoverError.new("Received pipeline shutdown request but http output has unfinished events. " \
"If persistent queue is enabled, events will be retried.") if pipeline_shutdown_requested? && attempt > 2

action, event, attempt = send_event(event, attempt)
begin
action = :failure if action == :retry && !@retry_failed
Expand Down Expand Up @@ -223,6 +227,11 @@ def send_events(events)
raise e
end

def pipeline_shutdown_requested?
return super if defined?(super) # since LS 8.1.0
nil
end

def sleep_for_attempt(attempt)
sleep_for = attempt**2
sleep_for = sleep_for <= 60 ? sleep_for : 60
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-http.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-http'
s.version = '5.4.0'
s.version = '5.4.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Sends events to a generic HTTP or HTTPS endpoint"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
17 changes: 17 additions & 0 deletions spec/outputs/http_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,23 @@ def start_app_and_wait(app, opts = {})
let(:base_config) { { "http_compression" => true } }
end
end

describe "retryable error in termination" do
let(:url) { "http://localhost:#{port-1}/invalid" }
let(:events) { [event] }
let(:config) { {"url" => url, "http_method" => "get", "pool_max" => 1} }

subject { LogStash::Outputs::Http.new(config) }

before do
subject.register
allow(subject).to receive(:pipeline_shutdown_requested?).and_return(true)
end

it "raise exception to exit indefinitely retry" do
expect { subject.multi_receive(events) }.to raise_error(LogStash::Outputs::Http::PluginInternalQueueLeftoverError)
end
end
end

describe LogStash::Outputs::Http do # different block as we're starting web server with TLS
Expand Down