diff --git a/src/libraries/Common/tests/System/Net/Http/ResponseStreamTest.cs b/src/libraries/Common/tests/System/Net/Http/ResponseStreamTest.cs index 68f81984d96241..e38204ffc031e3 100644 --- a/src/libraries/Common/tests/System/Net/Http/ResponseStreamTest.cs +++ b/src/libraries/Common/tests/System/Net/Http/ResponseStreamTest.cs @@ -246,6 +246,8 @@ public async Task BrowserHttpHandler_Streaming() int readOffset = 0; req.Content = new StreamContent(new DelegateStream( + canReadFunc: () => true, + readFunc: (buffer, offset, count) => throw new FormatException(), readAsyncFunc: async (buffer, offset, count, cancellationToken) => { await Task.Delay(1); @@ -295,8 +297,12 @@ public async Task BrowserHttpHandler_StreamingRequest() req.Options.Set(WebAssemblyEnableStreamingRequestKey, true); int size = 1500 * 1024 * 1024; + int multipartOverhead = 125 + 4 /* "test" */; int remaining = size; - req.Content = new StreamContent(new DelegateStream( + var content = new MultipartFormDataContent(); + content.Add(new StreamContent(new DelegateStream( + canReadFunc: () => true, + readFunc: (buffer, offset, count) => throw new FormatException(), readAsyncFunc: (buffer, offset, count, cancellationToken) => { if (remaining > 0) @@ -307,7 +313,8 @@ public async Task BrowserHttpHandler_StreamingRequest() return Task.FromResult(send); } return Task.FromResult(0); - })); + })), "test"); + req.Content = content; req.Content.Headers.Add("Content-MD5-Skip", "browser"); @@ -315,7 +322,7 @@ public async Task BrowserHttpHandler_StreamingRequest() using (HttpResponseMessage response = await client.SendAsync(req)) { Assert.Equal(HttpStatusCode.OK, response.StatusCode); - Assert.Equal(size.ToString(), Assert.Single(response.Headers.GetValues("X-HttpRequest-Body-Length"))); + Assert.Equal((size + multipartOverhead).ToString(), Assert.Single(response.Headers.GetValues("X-HttpRequest-Body-Length"))); // Streaming requests can't set Content-Length Assert.False(response.Headers.Contains("X-HttpRequest-Headers-ContentLength")); } @@ -335,22 +342,101 @@ public async Task BrowserHttpHandler_StreamingRequest_ThrowFromContentCopy_Reque req.Options.Set(WebAssemblyEnableStreamingRequestKey, true); Exception error = new FormatException(); - var content = new StreamContent(new DelegateStream( + req.Content = new StreamContent(new DelegateStream( canSeekFunc: () => true, lengthFunc: () => 12345678, positionGetFunc: () => 0, canReadFunc: () => true, - readFunc: (buffer, offset, count) => throw error, + readFunc: (buffer, offset, count) => throw new FormatException(), readAsyncFunc: (buffer, offset, count, cancellationToken) => syncFailure ? throw error : Task.Delay(1).ContinueWith(_ => throw error))); - req.Content = content; - using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server)) { Assert.Same(error, await Assert.ThrowsAsync(() => client.SendAsync(req))); } } + public static TheoryData CancelRequestReadFunctions + => new TheoryData>> + { + { false, () => Task.FromResult(0) }, + { true, () => Task.FromResult(0) }, + { false, () => Task.FromResult(1) }, + { true, () => Task.FromResult(1) }, + { false, () => throw new FormatException() }, + { true, () => throw new FormatException() }, + }; + + [OuterLoop] + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))] + [MemberData(nameof(CancelRequestReadFunctions))] + public async Task BrowserHttpHandler_StreamingRequest_CancelRequest(bool cancelAsync, Func> readFunc) + { + var WebAssemblyEnableStreamingRequestKey = new HttpRequestOptionsKey("WebAssemblyEnableStreamingRequest"); + + var req = new HttpRequestMessage(HttpMethod.Post, Configuration.Http.Http2RemoteEchoServer); + + req.Options.Set(WebAssemblyEnableStreamingRequestKey, true); + + using var cts = new CancellationTokenSource(); + var token = cts.Token; + int readNotCancelledCount = 0, readCancelledCount = 0; + req.Content = new StreamContent(new DelegateStream( + canReadFunc: () => true, + readFunc: (buffer, offset, count) => throw new FormatException(), + readAsyncFunc: async (buffer, offset, count, cancellationToken) => + { + if (cancelAsync) await Task.Delay(1); + Assert.Equal(token.IsCancellationRequested, cancellationToken.IsCancellationRequested); + if (!token.IsCancellationRequested) + { + readNotCancelledCount++; + cts.Cancel(); + } + else + { + readCancelledCount++; + } + return await readFunc(); + })); + + using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server)) + { + TaskCanceledException ex = await Assert.ThrowsAsync(() => client.SendAsync(req, token)); + Assert.Equal(token, ex.CancellationToken); + Assert.Equal(1, readNotCancelledCount); + Assert.Equal(0, readCancelledCount); + } + } + + [OuterLoop] + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))] + public async Task BrowserHttpHandler_StreamingRequest_Http1Fails() + { + var WebAssemblyEnableStreamingRequestKey = new HttpRequestOptionsKey("WebAssemblyEnableStreamingRequest"); + + var req = new HttpRequestMessage(HttpMethod.Post, Configuration.Http.RemoteHttp11Server.BaseUri); + + req.Options.Set(WebAssemblyEnableStreamingRequestKey, true); + + int readCount = 0; + req.Content = new StreamContent(new DelegateStream( + canReadFunc: () => true, + readFunc: (buffer, offset, count) => throw new FormatException(), + readAsyncFunc: (buffer, offset, count, cancellationToken) => + { + readCount++; + return Task.FromResult(1); + })); + + using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp11Server)) + { + HttpRequestException ex = await Assert.ThrowsAsync(() => client.SendAsync(req)); + Assert.Equal("TypeError: Failed to fetch", ex.Message); + Assert.Equal(1, readCount); + } + } + [OuterLoop] [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))] public async Task BrowserHttpHandler_StreamingResponse() diff --git a/src/libraries/System.Net.Http/src/Resources/Strings.resx b/src/libraries/System.Net.Http/src/Resources/Strings.resx index ea9007d9ca6cca..ca69a570c984fc 100644 --- a/src/libraries/System.Net.Http/src/Resources/Strings.resx +++ b/src/libraries/System.Net.Http/src/Resources/Strings.resx @@ -534,6 +534,9 @@ Synchronous reads are not supported, use ReadAsync instead. + + Synchronous writes are not supported, use WriteAsync instead. + Failed to authenticate with the SOCKS server. diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs index 96f482fb58be74..d884f1a460e2c3 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs @@ -133,25 +133,26 @@ private static async Task CallFetch(HttpRequestMessage reques List headerNames = new List(headerCount); List headerValues = new List(headerCount); JSObject abortController = BrowserHttpInterop.CreateAbortController(); - CancellationTokenRegistration? abortRegistration = cancellationToken.Register(() => + CancellationTokenRegistration abortRegistration = cancellationToken.Register(static s => { + JSObject _abortController = (JSObject)s!; #if FEATURE_WASM_THREADS - if (!abortController.IsDisposed) + if (!_abortController.IsDisposed) { - abortController.SynchronizationContext.Send(static (JSObject _abortController) => + _abortController.SynchronizationContext.Send(static (JSObject __abortController) => { - BrowserHttpInterop.AbortRequest(_abortController); - _abortController.Dispose(); - }, abortController); + BrowserHttpInterop.AbortRequest(__abortController); + __abortController.Dispose(); + }, _abortController); } #else - if (!abortController.IsDisposed) + if (!_abortController.IsDisposed) { - BrowserHttpInterop.AbortRequest(abortController); - abortController.Dispose(); + BrowserHttpInterop.AbortRequest(_abortController); + _abortController.Dispose(); } #endif - }); + }, abortController); try { if (request.RequestUri == null) @@ -208,7 +209,7 @@ private static async Task CallFetch(HttpRequestMessage reques } } - Task? promise; + JSObject? fetchResponse; cancellationToken.ThrowIfCancellationRequested(); if (request.Content != null) { @@ -220,51 +221,71 @@ private static async Task CallFetch(HttpRequestMessage reques if (streamingEnabled) { - Stream stream = await request.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(true); - cancellationToken.ThrowIfCancellationRequested(); - - ReadableStreamPullState pullState = new ReadableStreamPullState(stream, cancellationToken); - - promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, ReadableStreamPull, pullState); + using (JSObject transformStream = BrowserHttpInterop.CreateTransformStream()) + { + Task fetchPromise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, transformStream); + Task fetchTask = BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken).AsTask(); // initialize fetch cancellation + + using (WasmHttpWriteStream stream = new WasmHttpWriteStream(transformStream)) + { + try + { + await request.Content.CopyToAsync(stream, cancellationToken).ConfigureAwait(true); + Task closePromise = BrowserHttpInterop.TransformStreamClose(transformStream); + await BrowserHttpInterop.CancelationHelper(closePromise, cancellationToken).ConfigureAwait(true); + } + catch (Exception) + { + BrowserHttpInterop.TransformStreamAbort(transformStream); + if (!abortController.IsDisposed) + { + BrowserHttpInterop.AbortRequest(abortController); + } + try + { + using (fetchResponse = await fetchTask.ConfigureAwait(true)) // observe exception + { + BrowserHttpInterop.AbortResponse(fetchResponse); + } + } + catch { /* ignore */ } + cancellationToken.ThrowIfCancellationRequested(); + throw; + } + } + + fetchResponse = await fetchTask.ConfigureAwait(true); + } } else { byte[] buffer = await request.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(true); cancellationToken.ThrowIfCancellationRequested(); - promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, buffer); + Task fetchPromise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, buffer); + fetchResponse = await BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken).ConfigureAwait(true); } } else { - promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController); + Task fetchPromise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController); + fetchResponse = await BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken).ConfigureAwait(true); } - cancellationToken.ThrowIfCancellationRequested(); - JSObject fetchResponse = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, abortController, null).ConfigureAwait(true); - return new WasmFetchResponse(fetchResponse, abortController, abortRegistration.Value); - } - catch (JSException jse) - { - throw new HttpRequestException(jse.Message, jse); + return new WasmFetchResponse(fetchResponse, abortController, abortRegistration); } - catch (Exception) + catch (Exception ex) { - // this would also trigger abort - abortRegistration?.Dispose(); - abortController?.Dispose(); + abortRegistration.Dispose(); + abortController.Dispose(); + if (ex is JSException jse) + { + throw new HttpRequestException(jse.Message, jse); + } throw; } } - private static void ReadableStreamPull(object state) - { - ReadableStreamPullState pullState = (ReadableStreamPullState)state; -#pragma warning disable CS4014 // intentionally not awaited - pullState.PullAsync(); -#pragma warning restore CS4014 - } - private static HttpResponseMessage ConvertResponse(HttpRequestMessage request, WasmFetchResponse fetchResponse) { #if FEATURE_WASM_THREADS @@ -329,41 +350,90 @@ static async Task Impl(HttpRequestMessage request, Cancella } } - internal sealed class ReadableStreamPullState + internal sealed class WasmHttpWriteStream : Stream { - private readonly Stream _stream; - private readonly CancellationToken _cancellationToken; - private readonly byte[] _buffer; + private readonly JSObject _transformStream; - public ReadableStreamPullState(Stream stream, CancellationToken cancellationToken) + public WasmHttpWriteStream(JSObject transformStream) { - ArgumentNullException.ThrowIfNull(stream); + ArgumentNullException.ThrowIfNull(transformStream); - _stream = stream; - _cancellationToken = cancellationToken; - _buffer = new byte[65536]; + _transformStream = transformStream; } - public async Task PullAsync() + private Task WriteAsyncCore(ReadOnlyMemory buffer, CancellationToken cancellationToken) { - try - { - int length = await _stream.ReadAsync(_buffer, _cancellationToken).ConfigureAwait(true); - ReadableStreamControllerEnqueueUnsafe(this, _buffer, length); - } - catch (Exception ex) + cancellationToken.ThrowIfCancellationRequested(); +#if FEATURE_WASM_THREADS + return _transformStream.SynchronizationContext.Send(() => Impl(this, buffer, cancellationToken)); +#else + return Impl(this, buffer, cancellationToken); +#endif + static async Task Impl(WasmHttpWriteStream self, ReadOnlyMemory buffer, CancellationToken cancellationToken) { - BrowserHttpInterop.ReadableStreamControllerError(this, ex); + using (Buffers.MemoryHandle handle = buffer.Pin()) + { + Task writePromise = TransformStreamWriteUnsafe(self._transformStream, buffer, handle); + await BrowserHttpInterop.CancelationHelper(writePromise, cancellationToken).ConfigureAwait(true); + } } + + static unsafe Task TransformStreamWriteUnsafe(JSObject transformStream, ReadOnlyMemory buffer, Buffers.MemoryHandle handle) + => BrowserHttpInterop.TransformStreamWrite(transformStream, (nint)handle.Pointer, buffer.Length); } - private static unsafe void ReadableStreamControllerEnqueueUnsafe(object pullState, byte[] buffer, int length) + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) { - fixed (byte* ptr = buffer) - { - BrowserHttpInterop.ReadableStreamControllerEnqueue(pullState, (nint)ptr, length); - } + return new ValueTask(WriteAsyncCore(buffer, cancellationToken)); } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + ValidateBufferArguments(buffer, offset, count); + return WriteAsyncCore(new ReadOnlyMemory(buffer, offset, count), cancellationToken); + } + + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + + protected override void Dispose(bool disposing) + { + _transformStream.Dispose(); + } + + public override void Flush() + { + } + + #region PlatformNotSupported + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + public override long Length => throw new NotSupportedException(); + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(SR.net_http_synchronous_writes_not_supported); + } + #endregion } internal sealed class WasmFetchResponse : IDisposable @@ -468,7 +538,7 @@ private async ValueTask GetResponseData(CancellationToken cancellationTo #if FEATURE_WASM_THREADS } //lock #endif - _length = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, null, _fetchResponse.FetchResponse).ConfigureAwait(true); + _length = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, _fetchResponse.FetchResponse).ConfigureAwait(true); #if FEATURE_WASM_THREADS lock (_fetchResponse.ThisLock) { @@ -572,7 +642,7 @@ static async Task Impl(WasmHttpReadStream self, Memory buffer, Cancel #if FEATURE_WASM_THREADS } //lock #endif - int response = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, null, self._fetchResponse.FetchResponse).ConfigureAwait(true); + int response = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, self._fetchResponse.FetchResponse).ConfigureAwait(true); return response; } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpInterop.cs b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpInterop.cs index 0e2eb3226dada1..b942bccd4c760a 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpInterop.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpInterop.cs @@ -28,16 +28,22 @@ public static partial void AbortRequest( public static partial void AbortResponse( JSObject fetchResponse); - [JSImport("INTERNAL.http_wasm_readable_stream_controller_enqueue")] - public static partial void ReadableStreamControllerEnqueue( - [JSMarshalAs] object pullState, + [JSImport("INTERNAL.http_wasm_create_transform_stream")] + public static partial JSObject CreateTransformStream(); + + [JSImport("INTERNAL.http_wasm_transform_stream_write")] + public static partial Task TransformStreamWrite( + JSObject transformStream, IntPtr bufferPtr, int bufferLength); - [JSImport("INTERNAL.http_wasm_readable_stream_controller_error")] - public static partial void ReadableStreamControllerError( - [JSMarshalAs] object pullState, - Exception error); + [JSImport("INTERNAL.http_wasm_transform_stream_close")] + public static partial Task TransformStreamClose( + JSObject transformStream); + + [JSImport("INTERNAL.http_wasm_transform_stream_abort")] + public static partial void TransformStreamAbort( + JSObject transformStream); [JSImport("INTERNAL.http_wasm_get_response_header_names")] private static partial string[] _GetResponseHeaderNames( @@ -79,8 +85,7 @@ public static partial Task Fetch( string[] optionNames, [JSMarshalAs>] object?[] optionValues, JSObject abortControler, - [JSMarshalAs>] Action pull, - [JSMarshalAs] object pullState); + JSObject transformStream); [JSImport("INTERNAL.http_wasm_fetch_bytes")] private static partial Task FetchBytes( @@ -117,30 +122,27 @@ public static partial int GetResponseBytes( [JSMarshalAs] Span buffer); - public static async ValueTask CancelationHelper(Task promise, CancellationToken cancellationToken, JSObject? abortController, JSObject? fetchResponse) + public static async ValueTask CancelationHelper(Task promise, CancellationToken cancellationToken, JSObject? fetchResponse = null) { if (promise.IsCompletedSuccessfully) { - return promise.Result; + return; } try { - using (var operationRegistration = cancellationToken.Register(() => + using (var operationRegistration = cancellationToken.Register(static s => { - CancelablePromise.CancelPromise(promise, static (JSObject? _fetchResponse, JSObject? _abortController) => + (Task _promise, JSObject? _fetchResponse) = ((Task, JSObject?))s!; + CancelablePromise.CancelPromise(_promise, static (JSObject? __fetchResponse) => { - if (_abortController != null) + if (__fetchResponse != null) { - AbortRequest(_abortController); + AbortResponse(__fetchResponse); } - if (_fetchResponse != null) - { - AbortResponse(_fetchResponse); - } - }, fetchResponse, abortController); - })) + }, _fetchResponse); + }, (promise, fetchResponse))) { - return await promise.ConfigureAwait(true); + await promise.ConfigureAwait(true); } } catch (OperationCanceledException oce) when (cancellationToken.IsCancellationRequested) @@ -160,6 +162,16 @@ public static async ValueTask CancelationHelper(Task promise, Cancellat throw new HttpRequestException(jse.Message, jse); } } + + public static async ValueTask CancelationHelper(Task promise, CancellationToken cancellationToken, JSObject? fetchResponse = null) + { + if (promise.IsCompletedSuccessfully) + { + return promise.Result; + } + await CancelationHelper((Task)promise, cancellationToken, fetchResponse).ConfigureAwait(true); + return await promise.ConfigureAwait(true); + } } } diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/HttpRequestMessageTest.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/HttpRequestMessageTest.cs index 6946c202beffaa..0dd1a485894d6b 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/HttpRequestMessageTest.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/HttpRequestMessageTest.cs @@ -274,7 +274,7 @@ public async Task HttpRequest_StringContent_WithoutMediaType() await LoopbackServer.CreateServerAsync(async (server, uri) => { var request = new HttpRequestMessage(HttpMethod.Post, uri); - request.Content = new StringContent("Hello World", null, ((MediaTypeHeaderValue)null)!); + request.Content = new StringContent("", null, ((MediaTypeHeaderValue)null)!); Task requestTask = client.SendAsync(request); await server.AcceptConnectionAsync(async connection => diff --git a/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs b/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs index 121fa8595c65d5..ed139d83fd74fd 100644 --- a/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs +++ b/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs @@ -580,10 +580,10 @@ private async Task CancelationHelper(Task jsTask, CancellationToken cancellation } try { - using (var receiveRegistration = cancellationToken.Register(() => + using (var receiveRegistration = cancellationToken.Register(static s => { - CancelablePromise.CancelPromise(jsTask); - })) + CancelablePromise.CancelPromise((Task)s!); + }, jsTask)) { await jsTask.ConfigureAwait(true); return; diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/CancelablePromise.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/CancelablePromise.cs index d1bbf48f1c4de7..7a8a0aa1f2d9f6 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/CancelablePromise.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/CancelablePromise.cs @@ -32,7 +32,7 @@ public static void CancelPromise(Task promise) #endif } - public static void CancelPromise(Task promise, Action callback, T1 state1, T2 state2) + public static void CancelPromise(Task promise, Action callback, T state) { // this check makes sure that promiseGCHandle is still valid handle if (promise.IsCompleted) @@ -48,7 +48,7 @@ public static void CancelPromise(Task promise, Action callback, { #endif _CancelPromise(holder.GCHandle); - callback.Invoke(state1, state2); + callback.Invoke(state); #if FEATURE_WASM_THREADS }, holder); #endif diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs index 248740b9ce89a2..4e5b68c30ff717 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs @@ -159,10 +159,10 @@ public static async Task CancelationHelper(Task jsTask, Canc { return jsTask.Result; } - using (var receiveRegistration = cancellationToken.Register(() => + using (var receiveRegistration = cancellationToken.Register(static s => { - CancelablePromise.CancelPromise(jsTask); - })) + CancelablePromise.CancelPromise((Task)s!); + }, jsTask)) { return await jsTask.ConfigureAwait(true); } diff --git a/src/mono/wasm/runtime/exports-internal.ts b/src/mono/wasm/runtime/exports-internal.ts index 43011bfaa41275..2ff2dea5f9896a 100644 --- a/src/mono/wasm/runtime/exports-internal.ts +++ b/src/mono/wasm/runtime/exports-internal.ts @@ -4,7 +4,7 @@ import { mono_wasm_cancel_promise } from "./cancelable-promise"; import cwraps, { profiler_c_functions } from "./cwraps"; import { mono_wasm_send_dbg_command_with_parms, mono_wasm_send_dbg_command, mono_wasm_get_dbg_command_info, mono_wasm_get_details, mono_wasm_release_object, mono_wasm_call_function_on, mono_wasm_debugger_resume, mono_wasm_detach_debugger, mono_wasm_raise_debug_event, mono_wasm_change_debugger_log_level, mono_wasm_debugger_attached } from "./debug"; -import { http_wasm_supports_streaming_request, http_wasm_supports_streaming_response, http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, http_wasm_readable_stream_controller_enqueue, http_wasm_readable_stream_controller_error, http_wasm_fetch, http_wasm_fetch_stream, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, http_wasm_get_response_bytes, http_wasm_get_response_length, http_wasm_get_streamed_response_bytes } from "./http"; +import { http_wasm_supports_streaming_request, http_wasm_supports_streaming_response, http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, http_wasm_create_transform_stream, http_wasm_transform_stream_write, http_wasm_transform_stream_close, http_wasm_transform_stream_abort, http_wasm_fetch, http_wasm_fetch_stream, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, http_wasm_get_response_bytes, http_wasm_get_response_length, http_wasm_get_streamed_response_bytes } from "./http"; import { exportedRuntimeAPI, Module, runtimeHelpers } from "./globals"; import { get_property, set_property, has_property, get_typeof_property, get_global_this, dynamic_import } from "./invoke-js"; import { mono_wasm_stringify_as_error_with_stack } from "./logging"; @@ -69,8 +69,10 @@ export function export_internal(): any { http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, - http_wasm_readable_stream_controller_enqueue, - http_wasm_readable_stream_controller_error, + http_wasm_create_transform_stream, + http_wasm_transform_stream_write, + http_wasm_transform_stream_close, + http_wasm_transform_stream_abort, http_wasm_fetch, http_wasm_fetch_stream, http_wasm_fetch_bytes, diff --git a/src/mono/wasm/runtime/http.ts b/src/mono/wasm/runtime/http.ts index fc76c02d8823bd..cdb60bd862ab08 100644 --- a/src/mono/wasm/runtime/http.ts +++ b/src/mono/wasm/runtime/http.ts @@ -2,10 +2,10 @@ // The .NET Foundation licenses this file to you under the MIT license. import { wrap_as_cancelable_promise } from "./cancelable-promise"; -import { ENVIRONMENT_IS_NODE, Module, createPromiseController, loaderHelpers, mono_assert } from "./globals"; -import { ManagedObject, MemoryViewType, Span } from "./marshal"; +import { ENVIRONMENT_IS_NODE, Module, loaderHelpers, mono_assert } from "./globals"; +import { MemoryViewType, Span } from "./marshal"; import type { VoidPtr } from "./types/emscripten"; -import { ControllablePromise, PromiseController } from "./types/internal"; +import { ControllablePromise } from "./types/internal"; function verifyEnvironment() { @@ -25,7 +25,7 @@ export function http_wasm_supports_streaming_request(): boolean { // So, if that header is set, then we know the browser doesn't support streams in request objects, and we can exit early. // Safari does support streams in request objects, but doesn't allow them to be used with fetch, so the duplex option is tested, which Safari doesn't currently support. // See https://developer.chrome.com/articles/fetch-streaming-requests/ - if (typeof Request !== "undefined" && "body" in Request.prototype && typeof ReadableStream === "function") { + if (typeof Request !== "undefined" && "body" in Request.prototype && typeof ReadableStream === "function" && typeof TransformStream === "function") { let duplexAccessed = false; const hasContentType = new Request("", { body: new ReadableStream(), @@ -65,83 +65,45 @@ export function http_wasm_abort_response(res: ResponseExtension): void { } } -export function http_wasm_readable_stream_controller_enqueue(pull_state: PullStateExtension, bufferPtr: VoidPtr, bufferLength: number): void { - const controller = pull_state.__controller; - const pull_promise_control = pull_state.__pull_promise_control; - mono_assert(controller, "expected controller"); - mono_assert(pull_promise_control, "expected pull_promise_control"); - try { - if (bufferLength === 0) { - controller.close(); - pull_state.dispose(); - pull_state.__pull_promise_control = null; - pull_state.__controller = null; - } else { - // the bufferPtr is pinned by the caller - const view = new Span(bufferPtr, bufferLength, MemoryViewType.Byte); - // because https://github.com/WebAssembly/design/issues/1162 we need to copy the buffer - // also it doesn't make much sense to use byob - const copy = view.slice() as Uint8Array; - controller.enqueue(copy); - } - pull_promise_control.resolve(); - } - catch (err) { - pull_state.dispose(); - pull_promise_control.reject(err); - } - finally { - pull_state.__pull_promise_control = null; - } +export function http_wasm_create_transform_stream(): TransformStreamExtension { + const transform_stream = new TransformStream() as TransformStreamExtension; + transform_stream.__writer = transform_stream.writable.getWriter(); + return transform_stream; } -export function http_wasm_readable_stream_controller_error(pull_state: PullStateExtension, error: Error): void { - const controller = pull_state.__controller; - mono_assert(controller, "expected controller"); - pull_state.__pull_promise_control?.reject(error); - pull_state.__fetch_promise_control?.reject(error); - pull_state.dispose(); - controller.error(error); - pull_state.__pull_promise_control = null; +export function http_wasm_transform_stream_write(ts: TransformStreamExtension, bufferPtr: VoidPtr, bufferLength: number): ControllablePromise { + mono_assert(bufferLength > 0, "expected bufferLength > 0"); + // the bufferPtr is pinned by the caller + const view = new Span(bufferPtr, bufferLength, MemoryViewType.Byte); + const copy = view.slice() as Uint8Array; + return wrap_as_cancelable_promise(async () => { + mono_assert(ts.__fetch_promise, "expected fetch promise"); + // race with fetch because fetch does not cancel the ReadableStream see https://bugs.chromium.org/p/chromium/issues/detail?id=1480250 + await Promise.race([ts.__writer.ready, ts.__fetch_promise]); + await Promise.race([ts.__writer.write(copy), ts.__fetch_promise]); + }); } -export function http_wasm_fetch_stream(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, - pull_delegate: (pull_state: PullStateExtension) => void, - pull_state: PullStateExtension): Promise { - function pull(controller: ReadableByteStreamController): Promise { - const { promise, promise_control } = createPromiseController(); - try { - mono_assert(!pull_state.__pull_promise_control, "expected pull_promise_control to be null"); - pull_state.__controller = controller; - pull_state.__pull_promise_control = promise_control; - pull_delegate(pull_state); - return promise; - } - catch (error) { - pull_state.dispose(); - pull_state.__controller = null; - pull_state.__pull_promise_control = null; - pull_state.__fetch_promise_control?.reject(error); - return Promise.reject(error); - } - } - - function cancel(error: any) { - pull_state.__fetch_promise_control?.reject(error); - } - - const body = new ReadableStream({ - type: "bytes", - pull, - cancel +export function http_wasm_transform_stream_close(ts: TransformStreamExtension): ControllablePromise { + return wrap_as_cancelable_promise(async () => { + mono_assert(ts.__fetch_promise, "expected fetch promise"); + // race with fetch because fetch does not cancel the ReadableStream see https://bugs.chromium.org/p/chromium/issues/detail?id=1480250 + await Promise.race([ts.__writer.ready, ts.__fetch_promise]); + await Promise.race([ts.__writer.close(), ts.__fetch_promise]); }); +} + +export function http_wasm_transform_stream_abort(ts: TransformStreamExtension): void { + ts.__writer.abort(); +} - const cancelable_promise = http_wasm_fetch(url, header_names, header_values, option_names, option_values, abort_controller, body); - pull_state.__fetch_promise_control = loaderHelpers.getPromiseController(cancelable_promise); - return cancelable_promise; +export function http_wasm_fetch_stream(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, body: TransformStreamExtension): ControllablePromise { + const fetch_promise = http_wasm_fetch(url, header_names, header_values, option_names, option_values, abort_controller, body.readable); + body.__fetch_promise = fetch_promise; + return fetch_promise; } -export function http_wasm_fetch_bytes(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, bodyPtr: VoidPtr, bodyLength: number): Promise { +export function http_wasm_fetch_bytes(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, bodyPtr: VoidPtr, bodyLength: number): ControllablePromise { // the bodyPtr is pinned by the caller const view = new Span(bodyPtr, bodyLength, MemoryViewType.Byte); const copy = view.slice() as Uint8Array; @@ -201,7 +163,7 @@ export function http_wasm_get_response_header_values(res: ResponseExtension): st return res.__headerValues; } -export function http_wasm_get_response_length(res: ResponseExtension): Promise { +export function http_wasm_get_response_length(res: ResponseExtension): ControllablePromise { return wrap_as_cancelable_promise(async () => { const buffer = await res.arrayBuffer(); res.__buffer = buffer; @@ -222,7 +184,7 @@ export function http_wasm_get_response_bytes(res: ResponseExtension, view: Span) return bytes_read; } -export function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bufferPtr: VoidPtr, bufferLength: number): Promise { +export function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bufferPtr: VoidPtr, bufferLength: number): ControllablePromise { // the bufferPtr is pinned by the caller const view = new Span(bufferPtr, bufferLength, MemoryViewType.Byte); return wrap_as_cancelable_promise(async () => { @@ -252,10 +214,9 @@ export function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bu }); } -interface PullStateExtension extends ManagedObject { - __pull_promise_control: PromiseController | null - __fetch_promise_control: PromiseController | null - __controller: ReadableByteStreamController | null +interface TransformStreamExtension extends TransformStream { + __writer: WritableStreamDefaultWriter + __fetch_promise?: Promise } interface ResponseExtension extends Response {