Skip to content

Commit 2c84e63

Browse files
authored
[ Cocoon ] Wait for task results to be received by the task runner before shutting down the task process (#156002)
Prior to this fix, `_TaskRunner.run` would immediately cleanup the keep-alive port once the task completed, which would result in the isolate shutting down as soon as the task result was returned from `ext.cocoonRunTask` callback in the form of a `ServiceExtensionResponse`. Since the service extension response is processed by the service isolate, it was possible for the VM to start shutting down before the service isolate could send the task result data back to the task runner. This change introduces a new service extension, `ext.cocoonTaskResultReceived`, that the task runner invokes after it receives the task result from `ext.cocoonRunTask`, notifying the task process that it can close the keep-alive port and shutdown. Fixes flutter/flutter#155475
1 parent d877d28 commit 2c84e63

File tree

3 files changed

+79
-36
lines changed

3 files changed

+79
-36
lines changed

dev/devicelab/lib/framework/framework.dart

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ Future<TaskResult> task(TaskFunction task, { ProcessManager? processManager }) a
6666

6767
class _TaskRunner {
6868
_TaskRunner(this.task, this.processManager) {
69+
final String successResponse = json.encode(
70+
const <String, String>{
71+
'result': 'success',
72+
},
73+
);
74+
6975
registerExtension('ext.cocoonRunTask',
7076
(String method, Map<String, String> parameters) async {
7177
final Duration? taskTimeout = parameters.containsKey('timeoutInMinutes')
@@ -82,11 +88,25 @@ class _TaskRunner {
8288
localEngine: localEngine,
8389
localEngineHost: localEngineHost,
8490
);
91+
const Duration taskResultReceivedTimeout = Duration(seconds: 30);
92+
_taskResultReceivedTimeout = Timer(
93+
taskResultReceivedTimeout,
94+
() {
95+
logger.severe('Task runner did not acknowledge task results in $taskResultReceivedTimeout.');
96+
_closeKeepAlivePort();
97+
exitCode = 1;
98+
}
99+
);
85100
return ServiceExtensionResponse.result(json.encode(result.toJson()));
86101
});
87102
registerExtension('ext.cocoonRunnerReady',
88103
(String method, Map<String, String> parameters) async {
89-
return ServiceExtensionResponse.result('"ready"');
104+
return ServiceExtensionResponse.result(successResponse);
105+
});
106+
registerExtension('ext.cocoonTaskResultReceived',
107+
(String method, Map<String, String> parameters) async {
108+
_closeKeepAlivePort();
109+
return ServiceExtensionResponse.result(successResponse);
90110
});
91111
}
92112

@@ -104,6 +124,7 @@ class _TaskRunner {
104124
// TODO(ianh): workaround for https://github.com/dart-lang/sdk/issues/23797
105125
RawReceivePort? _keepAlivePort;
106126
Timer? _startTaskTimeout;
127+
Timer? _taskResultReceivedTimeout;
107128
bool _taskStarted = false;
108129

109130
final Completer<TaskResult> _completer = Completer<TaskResult>();
@@ -210,7 +231,6 @@ class _TaskRunner {
210231
} finally {
211232
await checkForRebootRequired();
212233
await forceQuitRunningProcesses();
213-
_closeKeepAlivePort();
214234
}
215235
}
216236

@@ -269,6 +289,7 @@ class _TaskRunner {
269289
/// Disables the keepalive port, allowing the VM to exit.
270290
void _closeKeepAlivePort() {
271291
_startTaskTimeout?.cancel();
292+
_taskResultReceivedTimeout?.cancel();
272293
_keepAlivePort?.close();
273294
}
274295

dev/devicelab/lib/framework/runner.dart

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import 'dart:io';
88

99
import 'package:meta/meta.dart';
1010
import 'package:vm_service/vm_service.dart';
11+
import 'package:vm_service/vm_service_io.dart';
1112

1213
import 'cocoon.dart';
1314
import 'devices.dart';
@@ -237,11 +238,19 @@ Future<TaskResult> runTask(
237238
print('[$taskName] Connected to VM server.');
238239
isolateParams = isolateParams == null ? <String, String>{} : Map<String, String>.of(isolateParams);
239240
isolateParams['runProcessCleanup'] = terminateStrayDartProcesses.toString();
240-
final Map<String, dynamic> taskResultJson = (await result.vmService.callServiceExtension(
241+
final VmService service = result.vmService;
242+
final String isolateId = result.isolate.id!;
243+
final Map<String, dynamic> taskResultJson = (await service.callServiceExtension(
241244
'ext.cocoonRunTask',
242245
args: isolateParams,
243-
isolateId: result.isolate.id,
246+
isolateId: isolateId,
244247
)).json!;
248+
// Notify the task process that the task result has been received and it
249+
// can proceed to shutdown.
250+
await _acknowledgeTaskResultReceived(
251+
service: service,
252+
isolateId: isolateId,
253+
);
245254
final TaskResult taskResult = TaskResult.fromJson(taskResultJson);
246255
final int exitCode = await runner.exitCode;
247256
print('[$taskName] Process terminated with exit code $exitCode.');
@@ -270,9 +279,6 @@ Future<ConnectionResult> _connectToRunnerIsolate(Uri vmServiceUri) async {
270279

271280
while (true) {
272281
try {
273-
// Make sure VM server is up by successfully opening and closing a socket.
274-
await (await WebSocket.connect(url)).close();
275-
276282
// Look up the isolate.
277283
final VmService client = await vmServiceConnectUri(url);
278284
VM vm = await client.getVM();
@@ -281,8 +287,9 @@ Future<ConnectionResult> _connectToRunnerIsolate(Uri vmServiceUri) async {
281287
vm = await client.getVM();
282288
}
283289
final IsolateRef isolate = vm.isolates!.first;
290+
// Sanity check to ensure we're talking with the main isolate.
284291
final Response response = await client.callServiceExtension('ext.cocoonRunnerReady', isolateId: isolate.id);
285-
if (response.json!['response'] != 'ready') {
292+
if (response.json!['result'] != 'success') {
286293
throw 'not ready yet';
287294
}
288295
return ConnectionResult(client, isolate);
@@ -301,37 +308,23 @@ Future<ConnectionResult> _connectToRunnerIsolate(Uri vmServiceUri) async {
301308
}
302309
}
303310

311+
Future<void> _acknowledgeTaskResultReceived({
312+
required VmService service,
313+
required String isolateId,
314+
}) async {
315+
try {
316+
await service.callServiceExtension(
317+
'ext.cocoonTaskResultReceived',
318+
isolateId: isolateId,
319+
);
320+
} on RPCError {
321+
// The target VM may shutdown before the response is received.
322+
}
323+
}
324+
304325
class ConnectionResult {
305326
ConnectionResult(this.vmService, this.isolate);
306327

307328
final VmService vmService;
308329
final IsolateRef isolate;
309330
}
310-
311-
/// The cocoon client sends an invalid VM service response, we need to intercept it.
312-
Future<VmService> vmServiceConnectUri(String wsUri, {Log? log}) async {
313-
final WebSocket socket = await WebSocket.connect(wsUri);
314-
final StreamController<dynamic> controller = StreamController<dynamic>();
315-
final Completer<dynamic> streamClosedCompleter = Completer<dynamic>();
316-
socket.listen(
317-
(dynamic data) {
318-
final Map<String, dynamic> rawData = json.decode(data as String) as Map<String, dynamic> ;
319-
if (rawData['result'] == 'ready') {
320-
rawData['result'] = <String, dynamic>{'response': 'ready'};
321-
controller.add(json.encode(rawData));
322-
} else {
323-
controller.add(data);
324-
}
325-
},
326-
onError: (Object err, StackTrace stackTrace) => controller.addError(err, stackTrace),
327-
onDone: () => streamClosedCompleter.complete(),
328-
);
329-
330-
return VmService(
331-
controller.stream,
332-
(String message) => socket.add(message),
333-
log: log,
334-
disposeHandler: () => socket.close(),
335-
streamClosed: streamClosedCompleter.future,
336-
);
337-
}

dev/devicelab/test/runner_test.dart

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// found in the LICENSE file.
44

55
import 'package:flutter_devicelab/framework/runner.dart';
6+
import 'package:vm_service/vm_service.dart';
67

78
import 'common.dart';
89

@@ -40,5 +41,33 @@ void main() {
4041
expect(printLog[0], 'Consistently failed across all 3 executions.');
4142
expect(printLog[1], 'flaky: false');
4243
});
44+
45+
test('Ensures task results are received before task process shuts down.', () async {
46+
// Regression test for https://github.com/flutter/flutter/issues/155475
47+
//
48+
// Runs multiple concurrent instances of a short-lived task in an effort to
49+
// trigger the race between the VM service processing the response from
50+
// ext.cocoonRunTask and the VM shutting down, which will throw a RPCError
51+
// with a "Service connection disposed" message.
52+
//
53+
// Obviously this isn't foolproof, but this test becoming flaky or failing
54+
// consistently should signal that we're encountering a shutdown race
55+
// somewhere.
56+
const int runs = 30;
57+
try {
58+
await Future.wait(
59+
<Future<void>>[
60+
for (int i = 0; i < runs; ++i)
61+
runTasks(
62+
<String>['smoke_test_success'],
63+
isolateParams: isolateParams,
64+
),
65+
],
66+
eagerError: true,
67+
);
68+
} on RPCError catch (e) {
69+
fail('Unexpected RPCError: $e');
70+
}
71+
}, timeout: const Timeout.factor(2));
4372
});
4473
}

0 commit comments

Comments
 (0)