From e04a6bac1797ed638f6e398b1cdc3af423c29df5 Mon Sep 17 00:00:00 2001 From: Nathan Daly Date: Mon, 2 Oct 2023 14:23:36 -0600 Subject: [PATCH 1/6] Backport #1092: Change logs to use current_exceptions_to_string() instead of `exception=` pattern (#1115) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Change logs to use `current_exceptions_to_string()` instead of `exception=` pattern (#1092) * Change logs to use `current_exceptions_to_string()` instead of `exception=` * cleanup unused variable `catch e` * Update Exceptions.jl for older julia * Rename err => msg; fix missing import * Make "Don't retry on internal exceptions" less flakey --------- Co-authored-by: Nick Robinson Co-authored-by: Tomáš Drvoštěp * Bump release to v1.9.16 for #1092 --------- Co-authored-by: Nick Robinson Co-authored-by: Tomáš Drvoštěp --- Project.toml | 2 +- src/Connections.jl | 4 ++-- src/Exceptions.jl | 9 +++++++-- src/Servers.jl | 19 ++++++++++++------- src/Streams.jl | 2 +- src/WebSockets.jl | 4 +++- src/clientlayers/ConnectionRequest.jl | 12 ++++++------ src/clientlayers/StreamRequest.jl | 6 +++--- src/clientlayers/TimeoutRequest.jl | 5 +++-- test/server.jl | 10 +++++----- 10 files changed, 43 insertions(+), 30 deletions(-) diff --git a/Project.toml b/Project.toml index dd27a6863..6116138bc 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "HTTP" uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3" authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"] -version = "1.9.15" +version = "1.9.16" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" diff --git a/src/Connections.jl b/src/Connections.jl index 182bace24..60d6eb9b7 100644 --- a/src/Connections.jl +++ b/src/Connections.jl @@ -478,9 +478,9 @@ function keepalive!(tcp) Base.iolock_begin() try Base.check_open(tcp) - err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), + msg = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), tcp.handle, 1, 1) - Base.uv_error("failed to set keepalive on tcp socket", err) + Base.uv_error("failed to set keepalive on tcp socket", msg) finally Base.iolock_end() end diff --git a/src/Exceptions.jl b/src/Exceptions.jl index 914aff317..ba7a30df0 100644 --- a/src/Exceptions.jl +++ b/src/Exceptions.jl @@ -100,8 +100,13 @@ function current_exceptions_to_string() buf = IOBuffer() println(buf) println(buf, "\n===========================\nHTTP Error message:\n") - Base.display_error(buf, Base.catch_stack()) + exc = @static if VERSION >= v"1.8.0-" + Base.current_exceptions() + else + Base.catch_stack() + end + Base.display_error(buf, exc) return String(take!(buf)) end -end # module Exceptions \ No newline at end of file +end # module Exceptions diff --git a/src/Servers.jl b/src/Servers.jl index 6afb16590..4a8f37194 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -192,8 +192,9 @@ shutdown(::Nothing) = nothing function shutdown(fn::Function) try fn() - catch e - @error "shutdown function $fn failed" exception=(e, catch_backtrace()) + catch + msg = current_exceptions_to_string() + @error "shutdown function $fn failed. $msg" end end @@ -392,7 +393,8 @@ function listenloop(f, listener, conns, tcpisvalid, if e isa Base.IOError && e.code == Base.UV_ECONNABORTED verbose >= 0 && @infov 1 "Server on $(listener.hostname):$(listener.hostport) closing" else - @errorv 2 "Server on $(listener.hostname):$(listener.hostport) errored" exception=(e, catch_backtrace()) + msg = current_exceptions_to_string() + @errorv 2 "Server on $(listener.hostname):$(listener.hostport) errored. $msg" # quick little sleep in case there's a temporary # local error accepting and this might help avoid quickly re-erroring sleep(0.05 + rand() * 0.05) @@ -431,7 +433,8 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log) if e isa ParseError write(c, Response(e.code == :HEADER_SIZE_EXCEEDS_LIMIT ? 431 : 400, string(e.code))) end - @debugv 1 "handle_connection startread error" exception=(e, catch_backtrace()) + msg = current_exceptions_to_string() + @debugv 1 "handle_connection startread error. $msg" break end @@ -458,7 +461,8 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log) # The remote can close the stream whenever it wants to, but there's nothing # anyone can do about it on this side. No reason to log an error in that case. level = e isa Base.IOError && !isopen(c) ? Logging.Debug : Logging.Error - @logmsgv 1 level "handle_connection handler error" exception=(e, stacktrace(catch_backtrace())) + msg = current_exceptions_to_string() + @logmsgv 1 level "handle_connection handler error. $msg" if isopen(http) && !iswritable(http) request.response.status = 500 @@ -473,9 +477,10 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log) end end end - catch e + catch # we should be catching everything inside the while loop, but just in case - @errorv 1 "error while handling connection" exception=(e, catch_backtrace()) + msg = current_exceptions_to_string() + @errorv 1 "error while handling connection. $msg" finally if readtimeout > 0 wait_for_timeout[] = false diff --git a/src/Streams.jl b/src/Streams.jl index 116bf2000..220d11c4e 100644 --- a/src/Streams.jl +++ b/src/Streams.jl @@ -326,7 +326,7 @@ function IOExtras.readuntil(http::Stream, f::Function)::ByteView bytes = IOExtras.readuntil(http.stream, f) update_ntoread(http, length(bytes)) return bytes - catch e + catch # if we error, it means we didn't find what we were looking for return UInt8[] end diff --git a/src/WebSockets.jl b/src/WebSockets.jl index 56b6933a9..ab9e532da 100644 --- a/src/WebSockets.jl +++ b/src/WebSockets.jl @@ -3,6 +3,7 @@ module WebSockets using Base64, LoggingExtras, UUIDs, Sockets, Random using MbedTLS: digest, MD_SHA1, SSLContext using ..IOExtras, ..Streams, ..Connections, ..Messages, ..Conditions, ..Servers +using ..Exceptions: current_exceptions_to_string import ..open import ..HTTP # for doc references @@ -439,7 +440,8 @@ function upgrade(f::Function, http::Streams.Stream; suppress_close_error::Bool=f f(ws) catch e if !isok(e) - suppress_close_error || @error "$(ws.id): Unexpected websocket server error" exception=(e, catch_backtrace()) + msg = current_exceptions_to_string() + suppress_close_error || @error "$(ws.id): Unexpected websocket server error. $msg" end if !isclosed(ws) if e isa WebSocketError && e.message isa CloseFrameBody diff --git a/src/clientlayers/ConnectionRequest.jl b/src/clientlayers/ConnectionRequest.jl index f26d7cb02..564a8f088 100644 --- a/src/clientlayers/ConnectionRequest.jl +++ b/src/clientlayers/ConnectionRequest.jl @@ -80,8 +80,8 @@ function connectionlayer(handler) io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, connect_timeout=connect_timeout, kw...) catch e if logerrors - err = current_exceptions_to_string() - @error err type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag + msg = current_exceptions_to_string() + @error msg type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag end req.context[:connect_errors] = get(req.context, :connect_errors, 0) + 1 throw(ConnectError(string(url), e)) @@ -127,12 +127,12 @@ function connectionlayer(handler) root_err = ExceptionUnwrapping.unwrap_exception_to_root(e) # don't log if it's an HTTPError since we should have already logged it if logerrors && root_err isa StatusError - err = current_exceptions_to_string() - @error err type=Symbol("HTTP.StatusError") method=req.method url=req.url context=req.context logtag=logtag + msg = current_exceptions_to_string() + @error msg type=Symbol("HTTP.StatusError") method=req.method url=req.url context=req.context logtag=logtag end if logerrors && !ExceptionUnwrapping.has_wrapped_exception(e, HTTPError) - err = current_exceptions_to_string(e) - @error err type=Symbol("HTTP.ConnectionRequest") method=req.method url=req.url context=req.context logtag=logtag + msg = current_exceptions_to_string() + @error msg type=Symbol("HTTP.ConnectionRequest") method=req.method url=req.url context=req.context logtag=logtag end @debugv 1 "❗️ ConnectionLayer $root_err. Closing: $io" if @isdefined(stream) && stream.nwritten == -1 diff --git a/src/clientlayers/StreamRequest.jl b/src/clientlayers/StreamRequest.jl index 3ba642702..4442574b6 100644 --- a/src/clientlayers/StreamRequest.jl +++ b/src/clientlayers/StreamRequest.jl @@ -68,12 +68,12 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi end end end - catch e + catch if timedout === nothing || !timedout[] req.context[:io_errors] = get(req.context, :io_errors, 0) + 1 if logerrors - err = current_exceptions_to_string() - @error err type=Symbol("HTTP.IOError") method=req.method url=req.url context=req.context logtag=logtag + msg = current_exceptions_to_string() + @error msg type=Symbol("HTTP.IOError") method=req.method url=req.url context=req.context logtag=logtag end end rethrow() diff --git a/src/clientlayers/TimeoutRequest.jl b/src/clientlayers/TimeoutRequest.jl index 94e58b587..13486c081 100644 --- a/src/clientlayers/TimeoutRequest.jl +++ b/src/clientlayers/TimeoutRequest.jl @@ -2,6 +2,7 @@ module TimeoutRequest using ..Connections, ..Streams, ..Exceptions, ..Messages using LoggingExtras, ConcurrentUtilities +using ..Exceptions: current_exceptions_to_string export timeoutlayer @@ -25,8 +26,8 @@ function timeoutlayer(handler) req = stream.message.request req.context[:timeout_errors] = get(req.context, :timeout_errors, 0) + 1 if logerrors - err = current_exceptions_to_string() - @error err type=Symbol("HTTP.TimeoutError") method=req.method url=req.url context=req.context timeout=readtimeout logtag=logtag + msg = current_exceptions_to_string() + @error msg type=Symbol("HTTP.TimeoutError") method=req.method url=req.url context=req.context timeout=readtimeout logtag=logtag end e = Exceptions.TimeoutError(readtimeout) end diff --git a/test/server.jl b/test/server.jl index 7b35396f9..af1a98741 100644 --- a/test/server.jl +++ b/test/server.jl @@ -191,17 +191,17 @@ const echostreamhandler = HTTP.streamhandler(echohandler) HTTP.startwrite(http) write(http, "response body\n") end - + port = HTTP.port(server) - + sock = connect(host, port) close(sock) - + r = HTTP.get("https://$(host):$(port)/"; readtimeout=30, require_ssl_verification = false) @test r.status == 200 close(server) - end + end end # @testset @testset "on_shutdown" begin @@ -222,7 +222,7 @@ end # @testset # First shutdown function errors, second adds 1 shutdown_throw() = throw(ErrorException("Broken")) server = HTTP.listen!(x -> nothing; listenany=true, on_shutdown=[shutdown_throw, shutdown_add]) - @test_logs (:error, r"shutdown function .* failed") close(server) + @test_logs (:error, r"shutdown function .* failed.*ERROR: Broken.*"s) close(server) @test TEST_COUNT[] == 4 end # @testset From e68131ed65096a0c427223cf4d746746e64cf9f6 Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Fri, 16 Feb 2024 10:19:02 +0000 Subject: [PATCH 2/6] Backport logging improvement from pull/1150 to v1.9 release (#1151) * Don't collect exception stack if it won't be logged * Bump version --- Project.toml | 2 +- src/Servers.jl | 30 ++++++++++++++++++--------- src/WebSockets.jl | 6 ++++-- src/clientlayers/ConnectionRequest.jl | 9 +++----- src/clientlayers/StreamRequest.jl | 3 +-- src/clientlayers/TimeoutRequest.jl | 3 +-- 6 files changed, 30 insertions(+), 23 deletions(-) diff --git a/Project.toml b/Project.toml index 6116138bc..f3afb3d81 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "HTTP" uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3" authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"] -version = "1.9.16" +version = "1.9.17" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" diff --git a/src/Servers.jl b/src/Servers.jl index 4a8f37194..fbc9daa36 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -193,8 +193,10 @@ function shutdown(fn::Function) try fn() catch - msg = current_exceptions_to_string() - @error "shutdown function $fn failed. $msg" + @error begin + msg = current_exceptions_to_string() + "shutdown function $fn failed. $msg" + end end end @@ -393,8 +395,10 @@ function listenloop(f, listener, conns, tcpisvalid, if e isa Base.IOError && e.code == Base.UV_ECONNABORTED verbose >= 0 && @infov 1 "Server on $(listener.hostname):$(listener.hostport) closing" else - msg = current_exceptions_to_string() - @errorv 2 "Server on $(listener.hostname):$(listener.hostport) errored. $msg" + @errorv 2 begin + msg = current_exceptions_to_string() + "Server on $(listener.hostname):$(listener.hostport) errored. $msg" + end # quick little sleep in case there's a temporary # local error accepting and this might help avoid quickly re-erroring sleep(0.05 + rand() * 0.05) @@ -433,8 +437,10 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log) if e isa ParseError write(c, Response(e.code == :HEADER_SIZE_EXCEEDS_LIMIT ? 431 : 400, string(e.code))) end - msg = current_exceptions_to_string() - @debugv 1 "handle_connection startread error. $msg" + @debugv 1 begin + msg = current_exceptions_to_string() + "handle_connection startread error. $msg" + end break end @@ -461,8 +467,10 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log) # The remote can close the stream whenever it wants to, but there's nothing # anyone can do about it on this side. No reason to log an error in that case. level = e isa Base.IOError && !isopen(c) ? Logging.Debug : Logging.Error - msg = current_exceptions_to_string() - @logmsgv 1 level "handle_connection handler error. $msg" + @logmsgv 1 level begin + msg = current_exceptions_to_string() + "handle_connection handler error. $msg" + end if isopen(http) && !iswritable(http) request.response.status = 500 @@ -479,8 +487,10 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log) end catch # we should be catching everything inside the while loop, but just in case - msg = current_exceptions_to_string() - @errorv 1 "error while handling connection. $msg" + @errorv 1 begin + msg = current_exceptions_to_string() + "error while handling connection. $msg" + end finally if readtimeout > 0 wait_for_timeout[] = false diff --git a/src/WebSockets.jl b/src/WebSockets.jl index ab9e532da..a937c296a 100644 --- a/src/WebSockets.jl +++ b/src/WebSockets.jl @@ -440,8 +440,10 @@ function upgrade(f::Function, http::Streams.Stream; suppress_close_error::Bool=f f(ws) catch e if !isok(e) - msg = current_exceptions_to_string() - suppress_close_error || @error "$(ws.id): Unexpected websocket server error. $msg" + suppress_close_error || @error begin + msg = current_exceptions_to_string() + "$(ws.id): Unexpected websocket server error. $msg" + end end if !isclosed(ws) if e isa WebSocketError && e.message isa CloseFrameBody diff --git a/src/clientlayers/ConnectionRequest.jl b/src/clientlayers/ConnectionRequest.jl index 564a8f088..fe5e90550 100644 --- a/src/clientlayers/ConnectionRequest.jl +++ b/src/clientlayers/ConnectionRequest.jl @@ -80,8 +80,7 @@ function connectionlayer(handler) io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, connect_timeout=connect_timeout, kw...) catch e if logerrors - msg = current_exceptions_to_string() - @error msg type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag + @error current_exceptions_to_string() type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag end req.context[:connect_errors] = get(req.context, :connect_errors, 0) + 1 throw(ConnectError(string(url), e)) @@ -127,12 +126,10 @@ function connectionlayer(handler) root_err = ExceptionUnwrapping.unwrap_exception_to_root(e) # don't log if it's an HTTPError since we should have already logged it if logerrors && root_err isa StatusError - msg = current_exceptions_to_string() - @error msg type=Symbol("HTTP.StatusError") method=req.method url=req.url context=req.context logtag=logtag + @error current_exceptions_to_string() type=Symbol("HTTP.StatusError") method=req.method url=req.url context=req.context logtag=logtag end if logerrors && !ExceptionUnwrapping.has_wrapped_exception(e, HTTPError) - msg = current_exceptions_to_string() - @error msg type=Symbol("HTTP.ConnectionRequest") method=req.method url=req.url context=req.context logtag=logtag + @error current_exceptions_to_string() type=Symbol("HTTP.ConnectionRequest") method=req.method url=req.url context=req.context logtag=logtag end @debugv 1 "❗️ ConnectionLayer $root_err. Closing: $io" if @isdefined(stream) && stream.nwritten == -1 diff --git a/src/clientlayers/StreamRequest.jl b/src/clientlayers/StreamRequest.jl index 4442574b6..27b33d7e9 100644 --- a/src/clientlayers/StreamRequest.jl +++ b/src/clientlayers/StreamRequest.jl @@ -72,8 +72,7 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi if timedout === nothing || !timedout[] req.context[:io_errors] = get(req.context, :io_errors, 0) + 1 if logerrors - msg = current_exceptions_to_string() - @error msg type=Symbol("HTTP.IOError") method=req.method url=req.url context=req.context logtag=logtag + @error current_exceptions_to_string() type=Symbol("HTTP.IOError") method=req.method url=req.url context=req.context logtag=logtag end end rethrow() diff --git a/src/clientlayers/TimeoutRequest.jl b/src/clientlayers/TimeoutRequest.jl index 13486c081..5bac3249e 100644 --- a/src/clientlayers/TimeoutRequest.jl +++ b/src/clientlayers/TimeoutRequest.jl @@ -26,8 +26,7 @@ function timeoutlayer(handler) req = stream.message.request req.context[:timeout_errors] = get(req.context, :timeout_errors, 0) + 1 if logerrors - msg = current_exceptions_to_string() - @error msg type=Symbol("HTTP.TimeoutError") method=req.method url=req.url context=req.context timeout=readtimeout logtag=logtag + @error current_exceptions_to_string() type=Symbol("HTTP.TimeoutError") method=req.method url=req.url context=req.context timeout=readtimeout logtag=logtag end e = Exceptions.TimeoutError(readtimeout) end From 6a3b0a92696f15eb6b5eaadd73c2b4821bf09361 Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Sun, 18 Feb 2024 11:59:20 +0000 Subject: [PATCH 3/6] Run CI all PRs to any branch --- .github/workflows/ci.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9f0d0198c..1524f1303 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,8 +1,6 @@ name: CI on: pull_request: - branches: - - master push: branches: - master From f20777d5233971cbc5c4dd399851544c6b06e99a Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Sun, 18 Feb 2024 21:31:43 +0000 Subject: [PATCH 4/6] Make CI green again (#1128) (#1152) * Fix tests for new version of `ConcurrentUtilities` `ConcurrentUtilities` changed an (internal) fieldname of the `Pool` type from `max` to `limit`. This field is used in HTTP.jl tests so this patch works around the issue. * Remove invalidation CI check This has been failing for quite some time with some internal error. Also in the working case it is a bit unclear what the actionable thing is when it errors (xref https://github.com/JuliaWeb/HTTP.jl/pull/920#issuecomment-1254284159). Co-authored-by: Fredrik Ekre --- .github/workflows/Invalidations.yml | 40 ----------------------------- test/client.jl | 13 ++++++---- 2 files changed, 8 insertions(+), 45 deletions(-) delete mode 100644 .github/workflows/Invalidations.yml diff --git a/.github/workflows/Invalidations.yml b/.github/workflows/Invalidations.yml deleted file mode 100644 index 4d0004e83..000000000 --- a/.github/workflows/Invalidations.yml +++ /dev/null @@ -1,40 +0,0 @@ -name: Invalidations - -on: - pull_request: - -concurrency: - # Skip intermediate builds: always. - # Cancel intermediate builds: always. - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -jobs: - evaluate: - # Only run on PRs to the default branch. - # In the PR trigger above branches can be specified only explicitly whereas this check should work for master, main, or any other default branch - if: github.base_ref == github.event.repository.default_branch - runs-on: ubuntu-latest - steps: - - uses: julia-actions/setup-julia@v1 - with: - version: '1' - - uses: actions/checkout@v3 - - uses: julia-actions/julia-buildpkg@v1 - - uses: julia-actions/julia-invalidations@v1 - id: invs_pr - - - uses: actions/checkout@v3 - with: - ref: ${{ github.event.repository.default_branch }} - - uses: julia-actions/julia-buildpkg@v1 - - uses: julia-actions/julia-invalidations@v1 - id: invs_default - - - name: Report invalidation counts - run: | - echo "Invalidations on default branch: ${{ steps.invs_default.outputs.total }} (${{ steps.invs_default.outputs.deps }} via deps)" >> $GITHUB_STEP_SUMMARY - echo "This branch: ${{ steps.invs_pr.outputs.total }} (${{ steps.invs_pr.outputs.deps }} via deps)" >> $GITHUB_STEP_SUMMARY - - name: Check if the PR does increase number of invalidations - if: steps.invs_pr.outputs.total > steps.invs_default.outputs.total - run: exit 1 diff --git a/test/client.jl b/test/client.jl index f886db705..f9891acfa 100644 --- a/test/client.jl +++ b/test/client.jl @@ -13,12 +13,15 @@ using URIs using InteractiveUtils: @which using ConcurrentUtilities +# ConcurrentUtilities changed a fieldname from max to limit in 2.3.0 +const max_or_limit = :max in fieldnames(ConcurrentUtilities.Pool) ? (:max) : (:limit) + # test we can adjust default_connection_limit for x in (10, 12) HTTP.set_default_connection_limit!(x) - @test HTTP.Connections.TCP_POOL[].max == x - @test HTTP.Connections.MBEDTLS_POOL[].max == x - @test HTTP.Connections.OPENSSL_POOL[].max == x + @test getfield(HTTP.Connections.TCP_POOL[], max_or_limit) == x + @test getfield(HTTP.Connections.MBEDTLS_POOL[], max_or_limit) == x + @test getfield(HTTP.Connections.OPENSSL_POOL[], max_or_limit) == x end @testset "@client macro" begin @@ -325,11 +328,11 @@ end end @testset "connect_timeout does not include the time needed to acquire a connection from the pool" begin - connection_limit = HTTP.Connections.TCP_POOL[].max + connection_limit = getfield(HTTP.Connections.TCP_POOL[], max_or_limit) try dummy_conn = HTTP.Connection(Sockets.TCPSocket()) HTTP.set_default_connection_limit!(1) - @assert HTTP.Connections.TCP_POOL[].max == 1 + @assert getfield(HTTP.Connections.TCP_POOL[], max_or_limit) == 1 # drain the pool acquire(()->dummy_conn, HTTP.Connections.TCP_POOL[], HTTP.Connections.connectionkey(dummy_conn)) # Put it back in 10 seconds From 3a79e9af36d00e4ac537a202da563b3eeb5b293e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Drvo=C5=A1t=C4=9Bp?= Date: Tue, 26 Mar 2024 14:17:51 +0100 Subject: [PATCH 5/6] Introduce connections_lock to protect Server.connections (backport 1.9) (#1162) * Introduce `connections_lock` for mutating `Server.connections` * Add comment * Version bump --- Project.toml | 2 +- src/Servers.jl | 37 +++++++++++++++++++++++++------------ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/Project.toml b/Project.toml index f3afb3d81..8de064795 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "HTTP" uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3" authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"] -version = "1.9.17" +version = "1.9.18" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" diff --git a/src/Servers.jl b/src/Servers.jl index fbc9daa36..b3f004797 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -118,6 +118,9 @@ struct Server{L <: Listener} connections::Set{Connection} # server listenandserve loop task task::Task + # Protects the connections Set which is mutated in the listenloop + # while potentially being accessed by the close method at the same time + connections_lock::ReentrantLock end port(s::Server) = Int(s.listener.addr.port) @@ -127,8 +130,10 @@ Base.wait(s::Server) = wait(s.task) function forceclose(s::Server) shutdown(s.on_shutdown) close(s.listener) - for c in s.connections - close(c) + Base.@lock s.connections_lock begin + for c in s.connections + close(c) + end end return wait(s.task) end @@ -166,14 +171,19 @@ function Base.close(s::Server) shutdown(s.on_shutdown) close(s.listener) # first pass to mark or request connections to close - for c in s.connections - requestclose!(c) + Base.@lock s.connections_lock begin + for c in s.connections + requestclose!(c) + end end # second pass to wait for connections to close # we wait for connections to empty because as # connections close themselves, they are removed # from our connections Set - while !isempty(s.connections) + while true + Base.@lock s.connections_lock begin + isempty(s.connections) && break + end sleep(0.5 + rand() * 0.1) end return wait(s.task) @@ -346,25 +356,28 @@ function listen!(f, listener::Listener; access_log::Union{Function,Nothing}=nothing, verbose=false, kw...) conns = Set{Connection}() + conns_lock = ReentrantLock() ready_to_accept = Threads.Event() if verbose > 0 tsk = @_spawn_interactive LoggingExtras.withlevel(Logging.Debug; verbosity=verbose) do - listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose) + listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, conns_lock, verbose) end else - tsk = @_spawn_interactive listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose) + tsk = @_spawn_interactive listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, conns_lock, verbose) end # wait until the listenloop enters the loop wait(ready_to_accept) - return Server(listener, on_shutdown, conns, tsk) + return Server(listener, on_shutdown, conns, tsk, conns_lock) end """" Main server loop. Accepts new tcp connections and spawns async tasks to handle them." """ -function listenloop(f, listener, conns, tcpisvalid, - max_connections, readtimeout, access_log, ready_to_accept, verbose) +function listenloop( + f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, + conns_lock, verbose +) sem = Base.Semaphore(max_connections) verbose >= 0 && @infov 1 "Listening on: $(listener.hostname):$(listener.hostport), thread id: $(Threads.threadid())" notify(ready_to_accept) @@ -382,13 +395,13 @@ function listenloop(f, listener, conns, tcpisvalid, end conn = Connection(io) conn.state = IDLE - push!(conns, conn) + Base.@lock conns_lock push!(conns, conn) conn.host, conn.port = listener.hostname, listener.hostport @async try handle_connection(f, conn, listener, readtimeout, access_log) finally # handle_connection is in charge of closing the underlying io - delete!(conns, conn) + Base.@lock conns_lock delete!(conns, conn) Base.release(sem) end catch e From 0cfe5ceba76fb18a73e44fafc64ab30997cc0001 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Drvo=C5=A1t=C4=9Bp?= Date: Fri, 13 Dec 2024 12:49:43 +0100 Subject: [PATCH 6/6] Do not migrate tasks to a different thread pool (backport of #1159) (#1206) --- Project.toml | 4 ++-- src/clientlayers/StreamRequest.jl | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Project.toml b/Project.toml index 8de064795..7079cbe83 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "HTTP" uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3" authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"] -version = "1.9.18" +version = "1.9.19" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" @@ -22,7 +22,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" [compat] CodecZlib = "0.7" -ConcurrentUtilities = "2.2" +ConcurrentUtilities = "2.4" ExceptionUnwrapping = "0.1" LoggingExtras = "0.4.9,1" MbedTLS = "0.6.8, 0.7, 1" diff --git a/src/clientlayers/StreamRequest.jl b/src/clientlayers/StreamRequest.jl index 27b33d7e9..64168a59d 100644 --- a/src/clientlayers/StreamRequest.jl +++ b/src/clientlayers/StreamRequest.jl @@ -3,6 +3,7 @@ module StreamRequest using ..IOExtras, ..Messages, ..Streams, ..Connections, ..Strings, ..RedirectRequest, ..Exceptions using LoggingExtras, CodecZlib, URIs using SimpleBufferStream: BufferStream +using ConcurrentUtilities: @samethreadpool_spawn export streamlayer @@ -36,7 +37,7 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi # use a lock here for request.context changes (this is currently the only places # where multiple threads may modify/change context at the same time) lock = ReentrantLock() - Threads.@spawn try + @samethreadpool_spawn try writebody(stream, req, lock) finally Base.@lock lock begin @@ -46,7 +47,7 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi closewrite(stream) end read_start = time() - Threads.@spawn try + @samethreadpool_spawn try @debugv 2 "client startread" startread(stream) if !isaborted(stream)