diff --git a/src/mono/wasm/debugger/BrowserDebugProxy/DevToolsProxy.cs b/src/mono/wasm/debugger/BrowserDebugProxy/DevToolsProxy.cs index 8474a2dc21cdc1..ed40b0f1049a4b 100644 --- a/src/mono/wasm/debugger/BrowserDebugProxy/DevToolsProxy.cs +++ b/src/mono/wasm/debugger/BrowserDebugProxy/DevToolsProxy.cs @@ -8,6 +8,7 @@ using System.Net.WebSockets; using System.Text; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Newtonsoft.Json; @@ -24,7 +25,8 @@ internal class DevToolsProxy private ClientWebSocket browser; private WebSocket ide; private int next_cmd_id; - private List pending_ops = new List(); + private readonly ChannelWriter _channelWriter; + private readonly ChannelReader _channelReader; private List queues = new List(); protected readonly ILogger logger; @@ -32,6 +34,10 @@ internal class DevToolsProxy public DevToolsProxy(ILoggerFactory loggerFactory) { logger = loggerFactory.CreateLogger(); + + var channel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + _channelWriter = channel.Writer; + _channelReader = channel.Reader; } protected virtual Task AcceptEvent(SessionId sessionId, string method, JObject args, CancellationToken token) @@ -82,7 +88,7 @@ private DevToolsQueue GetQueueForTask(Task task) return queues.FirstOrDefault(q => q.CurrentSend == task); } - private void Send(WebSocket to, JObject o, CancellationToken token) + private async Task Send(WebSocket to, JObject o, CancellationToken token) { string sender = browser == to ? "Send-browser" : "Send-ide"; @@ -95,7 +101,7 @@ private void Send(WebSocket to, JObject o, CancellationToken token) Task task = queue.Send(bytes, token); if (task != null) - pending_ops.Add(task); + await _channelWriter.WriteAsync(task, token); } private async Task OnEvent(SessionId sessionId, string method, JObject args, CancellationToken token) @@ -105,7 +111,7 @@ private async Task OnEvent(SessionId sessionId, string method, JObject args, Can if (!await AcceptEvent(sessionId, method, args, token)) { //logger.LogDebug ("proxy browser: {0}::{1}",method, args); - SendEventInternal(sessionId, method, args, token); + await SendEventInternal(sessionId, method, args, token); } } catch (Exception e) @@ -121,7 +127,7 @@ private async Task OnCommand(MessageId id, string method, JObject args, Cancella if (!await AcceptCommand(id, method, args, token)) { Result res = await SendCommandInternal(id, method, args, token); - SendResponseInternal(id, res, token); + await SendResponseInternal(id, res, token); } } catch (Exception e) @@ -142,7 +148,7 @@ private void OnResponse(MessageId id, Result result) logger.LogError("Cannot respond to command: {id} with result: {result} - command is not pending", id, result); } - private void ProcessBrowserMessage(string msg, CancellationToken token) + private Task ProcessBrowserMessage(string msg, CancellationToken token) { var res = JObject.Parse(msg); @@ -151,23 +157,30 @@ private void ProcessBrowserMessage(string msg, CancellationToken token) Log("protocol", $"browser: {msg}"); if (res["id"] == null) - pending_ops.Add(OnEvent(res.ToObject(), res["method"].Value(), res["params"] as JObject, token)); + { + return OnEvent(res.ToObject(), res["method"].Value(), res["params"] as JObject, token); + } else + { OnResponse(res.ToObject(), Result.FromJson(res)); + return null; + } } - private void ProcessIdeMessage(string msg, CancellationToken token) + private Task ProcessIdeMessage(string msg, CancellationToken token) { Log("protocol", $"ide: {msg}"); if (!string.IsNullOrEmpty(msg)) { var res = JObject.Parse(msg); var id = res.ToObject(); - pending_ops.Add(OnCommand( + return OnCommand( id, res["method"].Value(), - res["params"] as JObject, token)); + res["params"] as JObject, token); } + + return null; } internal async Task SendCommand(SessionId id, string method, JObject args, CancellationToken token) @@ -176,7 +189,7 @@ internal async Task SendCommand(SessionId id, string method, JObject arg return await SendCommandInternal(id, method, args, token); } - private Task SendCommandInternal(SessionId sessionId, string method, JObject args, CancellationToken token) + private async Task SendCommandInternal(SessionId sessionId, string method, JObject args, CancellationToken token) { int id = Interlocked.Increment(ref next_cmd_id); @@ -194,17 +207,17 @@ private Task SendCommandInternal(SessionId sessionId, string method, JOb //Log ("verbose", $"add cmd id {sessionId}-{id}"); pending_cmds[msgId] = tcs; - Send(this.browser, o, token); - return tcs.Task; + await Send(browser, o, token); + return await tcs.Task; } - public void SendEvent(SessionId sessionId, string method, JObject args, CancellationToken token) + public Task SendEvent(SessionId sessionId, string method, JObject args, CancellationToken token) { //Log ("verbose", $"sending event {method}: {args}"); - SendEventInternal(sessionId, method, args, token); + return SendEventInternal(sessionId, method, args, token); } - private void SendEventInternal(SessionId sessionId, string method, JObject args, CancellationToken token) + private Task SendEventInternal(SessionId sessionId, string method, JObject args, CancellationToken token) { var o = JObject.FromObject(new { @@ -214,7 +227,7 @@ private void SendEventInternal(SessionId sessionId, string method, JObject args, if (sessionId.sessionId != null) o["sessionId"] = sessionId.sessionId; - Send(this.ide, o, token); + return Send(ide, o, token); } internal void SendResponse(MessageId id, Result result, CancellationToken token) @@ -222,13 +235,13 @@ internal void SendResponse(MessageId id, Result result, CancellationToken token) SendResponseInternal(id, result, token); } - private void SendResponseInternal(MessageId id, Result result, CancellationToken token) + private Task SendResponseInternal(MessageId id, Result result, CancellationToken token) { JObject o = result.ToJObject(id); if (result.IsErr) logger.LogError($"sending error response for id: {id} -> {result}"); - Send(this.ide, o, token); + return Send(this.ide, o, token); } // , HttpContext context) @@ -248,10 +261,14 @@ public async Task Run(Uri browserUri, WebSocket ideSocket) Log("verbose", $"DevToolsProxy: Client connected on {browserUri}"); var x = new CancellationTokenSource(); + List pending_ops = new(); + pending_ops.Add(ReadOne(browser, x.Token)); pending_ops.Add(ReadOne(ide, x.Token)); pending_ops.Add(side_exception.Task); pending_ops.Add(client_initiated_close.Task); + Task readerTask = _channelReader.WaitToReadAsync(x.Token).AsTask(); + pending_ops.Add(readerTask); try { @@ -268,6 +285,16 @@ public async Task Run(Uri browserUri, WebSocket ideSocket) break; } + if (readerTask.IsCompleted) + { + while (_channelReader.TryRead(out Task newTask)) + { + pending_ops.Add(newTask); + } + + pending_ops[4] = _channelReader.WaitToReadAsync(x.Token).AsTask(); + } + //logger.LogTrace ("pump {0} {1}", task, pending_ops.IndexOf (task)); if (task == pending_ops[0]) { @@ -275,7 +302,9 @@ public async Task Run(Uri browserUri, WebSocket ideSocket) if (msg != null) { pending_ops[0] = ReadOne(browser, x.Token); //queue next read - ProcessBrowserMessage(msg, x.Token); + Task newTask = ProcessBrowserMessage(msg, x.Token); + if (newTask != null) + pending_ops.Add(newTask); } } else if (task == pending_ops[1]) @@ -284,7 +313,9 @@ public async Task Run(Uri browserUri, WebSocket ideSocket) if (msg != null) { pending_ops[1] = ReadOne(ide, x.Token); //queue next read - ProcessIdeMessage(msg, x.Token); + Task newTask = ProcessIdeMessage(msg, x.Token); + if (newTask != null) + pending_ops.Add(newTask); } } else if (task == pending_ops[2]) @@ -304,10 +335,13 @@ public async Task Run(Uri browserUri, WebSocket ideSocket) } } } + + _channelWriter.Complete(); } catch (Exception e) { Log("error", $"DevToolsProxy::Run: Exception {e}"); + _channelWriter.Complete(e); //throw; } finally diff --git a/src/mono/wasm/debugger/BrowserDebugProxy/MonoProxy.cs b/src/mono/wasm/debugger/BrowserDebugProxy/MonoProxy.cs index 95a86902d1e02e..d7ea025700262d 100644 --- a/src/mono/wasm/debugger/BrowserDebugProxy/MonoProxy.cs +++ b/src/mono/wasm/debugger/BrowserDebugProxy/MonoProxy.cs @@ -111,7 +111,7 @@ protected override async Task AcceptEvent(SessionId sessionId, string meth case "Runtime.executionContextCreated": { - SendEvent(sessionId, method, args, token); + await SendEvent(sessionId, method, args, token); JToken ctx = args?["context"]; var aux_data = ctx?["auxData"] as JObject; int id = ctx["id"].Value(); @@ -831,7 +831,7 @@ private async Task SendCallStack(SessionId sessionId, ExecutionContext con await SendCommand(sessionId, "Debugger.resume", new JObject(), token); return true; } - SendEvent(sessionId, "Debugger.paused", o, token); + await SendEvent(sessionId, "Debugger.paused", o, token); return true; } @@ -940,7 +940,7 @@ internal async Task LoadSymbolsOnDemand(AssemblyInfo asm, int method foreach (SourceFile source in asm.Sources) { var scriptSource = JObject.FromObject(source.ToScriptSource(context.Id, context.AuxData)); - SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token); + await SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token); } return asm.GetMethodByToken(method_token); } @@ -1171,7 +1171,7 @@ private async Task OnSourceFileAdded(SessionId sessionId, SourceFile source, Exe { JObject scriptSource = JObject.FromObject(source.ToScriptSource(context.Id, context.AuxData)); Log("debug", $"sending {source.Url} {context.Id} {sessionId.sessionId}"); - SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token); + await SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token); foreach (var req in context.BreakpointRequests.Values) { @@ -1253,7 +1253,7 @@ private async Task RuntimeReady(SessionId sessionId, CancellationTok DebugStore store = await LoadStore(sessionId, token); context.ready.SetResult(store); - SendEvent(sessionId, "Mono.runtimeReady", new JObject(), token); + await SendEvent(sessionId, "Mono.runtimeReady", new JObject(), token); SdbHelper.ResetStore(store); return store; } @@ -1340,7 +1340,7 @@ private async Task SetBreakpoint(SessionId sessionId, DebugStore store, Breakpoi }; if (sendResolvedEvent) - SendEvent(sessionId, "Debugger.breakpointResolved", JObject.FromObject(resolvedLocation), token); + await SendEvent(sessionId, "Debugger.breakpointResolved", JObject.FromObject(resolvedLocation), token); } req.Locations.AddRange(breakpoints);