diff --git a/CHANGELOG.md b/CHANGELOG.md index 0af09f4..917a936 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ +#### 3.0.8 + +* Fixed bug in `ProcessWrapper` + #### 3.0.7 * Renamed `Process` to `ProcessWrapper` diff --git a/lib/src/interface/process_wrapper.dart b/lib/src/interface/process_wrapper.dart index ff109ba..2162637 100644 --- a/lib/src/interface/process_wrapper.dart +++ b/lib/src/interface/process_wrapper.dart @@ -2,18 +2,47 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:io' as io; /// A wrapper around an [io.Process] class that adds some convenience methods. class ProcessWrapper implements io.Process { /// Constructs a [ProcessWrapper] object that delegates to the specified /// underlying object. - const ProcessWrapper(this.delegate); + ProcessWrapper(this._delegate) + : _stdout = new StreamController>(), + _stderr = new StreamController>(), + _stdoutDone = new Completer(), + _stderrDone = new Completer() { + _monitorStdioStream(_delegate.stdout, _stdout, _stdoutDone); + _monitorStdioStream(_delegate.stderr, _stderr, _stderrDone); + } + + final io.Process _delegate; + final StreamController> _stdout; + final StreamController> _stderr; + final Completer _stdoutDone; + final Completer _stderrDone; - final io.Process delegate; + /// Listens to the specified [stream], repeating events on it via + /// [controller], and completing [completer] once the stream is done. + void _monitorStdioStream( + Stream> stream, + StreamController> controller, + Completer completer, + ) { + stream.listen( + controller.add, + onError: controller.addError, + onDone: () { + controller.close; + completer.complete(); + }, + ); + } @override - Future get exitCode => delegate.exitCode; + Future get exitCode => _delegate.exitCode; /// A [Future] that completes when the process has exited and its standard /// output and error streams have closed. @@ -26,9 +55,9 @@ class ProcessWrapper implements io.Process { Future get done async { int result; await Future.wait(>[ - delegate.stdout.length, - delegate.stderr.length, - delegate.exitCode.then((int value) { + _stdoutDone.future, + _stderrDone.future, + _delegate.exitCode.then((int value) { result = value; }), ]); @@ -38,18 +67,18 @@ class ProcessWrapper implements io.Process { @override bool kill([io.ProcessSignal signal = io.ProcessSignal.sigterm]) { - return delegate.kill(signal); + return _delegate.kill(signal); } @override - int get pid => delegate.pid; + int get pid => _delegate.pid; @override - Stream> get stderr => delegate.stderr; + io.IOSink get stdin => _delegate.stdin; @override - io.IOSink get stdin => delegate.stdin; + Stream> get stdout => _stdout.stream; @override - Stream> get stdout => delegate.stdout; + Stream> get stderr => _stderr.stream; } diff --git a/pubspec.yaml b/pubspec.yaml index ba8a66f..45b360d 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: process -version: 3.0.7 +version: 3.0.8 authors: - Todd Volkert - Michael Goderbauer diff --git a/test/src/interface/process_wrapper_test.dart b/test/src/interface/process_wrapper_test.dart index b3f24ad..8799e66 100644 --- a/test/src/interface/process_wrapper_test.dart +++ b/test/src/interface/process_wrapper_test.dart @@ -3,6 +3,7 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; +import 'dart:convert'; import 'dart:io' as io; import 'package:process/process.dart'; @@ -10,14 +11,21 @@ import 'package:test/test.dart'; void main() { group('done', () { - test('completes only when all done', () async { - TestProcess delegate = new TestProcess(); - ProcessWrapper process = new ProcessWrapper(delegate); - bool done = false; + TestProcess delegate; + ProcessWrapper process; + bool done; + + setUp(() { + delegate = TestProcess(); + process = ProcessWrapper(delegate); + done = false; // ignore: unawaited_futures process.done.then((int result) { done = true; }); + }); + + test('completes only when all done', () async { expect(done, isFalse); delegate.exitCodeCompleter.complete(0); await Future.value(); @@ -30,14 +38,26 @@ void main() { expect(done, isTrue); expect(await process.exitCode, 0); }); + + test('works in conjunction with subscribers to stdio streams', () async { + process.stdout + .transform(utf8.decoder) + .transform(const LineSplitter()) + .listen(print); + delegate.exitCodeCompleter.complete(0); + await delegate.stdoutController.close(); + await delegate.stderrController.close(); + await Future.value(); + expect(done, isTrue); + }); }); } class TestProcess implements io.Process { TestProcess([this.pid = 123]) - : exitCodeCompleter = new Completer(), - stdoutController = new StreamController>(), - stderrController = new StreamController>(); + : exitCodeCompleter = Completer(), + stdoutController = StreamController>(), + stderrController = StreamController>(); @override final int pid;