Skip to content
This repository has been archived by the owner on Dec 9, 2023. It is now read-only.

Commit

Permalink
Fix bug in ProcessWrapper (#36)
Browse files Browse the repository at this point in the history
The `done` getter listened to the stdio streams, which opened up
the possibility of multiple listeners on a single-subscription
stream.
  • Loading branch information
tvolkert authored Nov 8, 2018
1 parent c6f3d9a commit 3951d05
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 19 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@

#### 3.0.8

* Fixed bug in `ProcessWrapper`

#### 3.0.7

* Renamed `Process` to `ProcessWrapper`
Expand Down
51 changes: 40 additions & 11 deletions lib/src/interface/process_wrapper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,47 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io' as io;

/// A wrapper around an [io.Process] class that adds some convenience methods.
class ProcessWrapper implements io.Process {
/// Constructs a [ProcessWrapper] object that delegates to the specified
/// underlying object.
const ProcessWrapper(this.delegate);
ProcessWrapper(this._delegate)
: _stdout = new StreamController<List<int>>(),
_stderr = new StreamController<List<int>>(),
_stdoutDone = new Completer<void>(),
_stderrDone = new Completer<void>() {
_monitorStdioStream(_delegate.stdout, _stdout, _stdoutDone);
_monitorStdioStream(_delegate.stderr, _stderr, _stderrDone);
}

final io.Process _delegate;
final StreamController<List<int>> _stdout;
final StreamController<List<int>> _stderr;
final Completer<void> _stdoutDone;
final Completer<void> _stderrDone;

final io.Process delegate;
/// Listens to the specified [stream], repeating events on it via
/// [controller], and completing [completer] once the stream is done.
void _monitorStdioStream(
Stream<List<int>> stream,
StreamController<List<int>> controller,
Completer<void> completer,
) {
stream.listen(
controller.add,
onError: controller.addError,
onDone: () {
controller.close;
completer.complete();
},
);
}

@override
Future<int> get exitCode => delegate.exitCode;
Future<int> get exitCode => _delegate.exitCode;

/// A [Future] that completes when the process has exited and its standard
/// output and error streams have closed.
Expand All @@ -26,9 +55,9 @@ class ProcessWrapper implements io.Process {
Future<int> get done async {
int result;
await Future.wait<void>(<Future<void>>[
delegate.stdout.length,
delegate.stderr.length,
delegate.exitCode.then((int value) {
_stdoutDone.future,
_stderrDone.future,
_delegate.exitCode.then((int value) {
result = value;
}),
]);
Expand All @@ -38,18 +67,18 @@ class ProcessWrapper implements io.Process {

@override
bool kill([io.ProcessSignal signal = io.ProcessSignal.sigterm]) {
return delegate.kill(signal);
return _delegate.kill(signal);
}

@override
int get pid => delegate.pid;
int get pid => _delegate.pid;

@override
Stream<List<int>> get stderr => delegate.stderr;
io.IOSink get stdin => _delegate.stdin;

@override
io.IOSink get stdin => delegate.stdin;
Stream<List<int>> get stdout => _stdout.stream;

@override
Stream<List<int>> get stdout => delegate.stdout;
Stream<List<int>> get stderr => _stderr.stream;
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: process
version: 3.0.7
version: 3.0.8
authors:
- Todd Volkert <tvolkert@google.com>
- Michael Goderbauer <goderbauer@google.com>
Expand Down
34 changes: 27 additions & 7 deletions test/src/interface/process_wrapper_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io' as io;

import 'package:process/process.dart';
import 'package:test/test.dart';

void main() {
group('done', () {
test('completes only when all done', () async {
TestProcess delegate = new TestProcess();
ProcessWrapper process = new ProcessWrapper(delegate);
bool done = false;
TestProcess delegate;
ProcessWrapper process;
bool done;

setUp(() {
delegate = TestProcess();
process = ProcessWrapper(delegate);
done = false;
// ignore: unawaited_futures
process.done.then((int result) {
done = true;
});
});

test('completes only when all done', () async {
expect(done, isFalse);
delegate.exitCodeCompleter.complete(0);
await Future<void>.value();
Expand All @@ -30,14 +38,26 @@ void main() {
expect(done, isTrue);
expect(await process.exitCode, 0);
});

test('works in conjunction with subscribers to stdio streams', () async {
process.stdout
.transform<String>(utf8.decoder)
.transform<String>(const LineSplitter())
.listen(print);
delegate.exitCodeCompleter.complete(0);
await delegate.stdoutController.close();
await delegate.stderrController.close();
await Future<void>.value();
expect(done, isTrue);
});
});
}

class TestProcess implements io.Process {
TestProcess([this.pid = 123])
: exitCodeCompleter = new Completer<int>(),
stdoutController = new StreamController<List<int>>(),
stderrController = new StreamController<List<int>>();
: exitCodeCompleter = Completer<int>(),
stdoutController = StreamController<List<int>>(),
stderrController = StreamController<List<int>>();

@override
final int pid;
Expand Down

0 comments on commit 3951d05

Please sign in to comment.