Skip to content

Commit

Permalink
bugfix: Properly exit a process.
Browse files Browse the repository at this point in the history
This requires that onExit() is not called immediately upon receiving a
SIGCHLD. There could still be data in the pipez. So, instead just set a
flag and invoke the pipe watchers.

Sometimes one will not receive an EOF from pipes because the process was
killed by a SIGTERM, or something. If SIGCHLD has been recved but we are
getting EAGAIN, the pipez need to be closed too.
  • Loading branch information
ry committed Jun 24, 2009
1 parent 0e67b34 commit 7363ccd
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 32 deletions.
106 changes: 77 additions & 29 deletions src/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ using namespace node;
#define ON_EXIT_SYMBOL String::NewSymbol("onExit")
#define PID_SYMBOL String::NewSymbol("pid")

/* defines for the parent side */
#define STDOUT_CLOSED (stdout_pipe_[0] < 0)
#define STDERR_CLOSED (stderr_pipe_[0] < 0)
#define STDIN_CLOSED (stdin_pipe_[1] < 0)


Persistent<FunctionTemplate> Process::constructor_template;

void
Expand Down Expand Up @@ -119,11 +125,7 @@ Process::Write (const Arguments& args)

} else return ThrowException(String::New("Bad argument"));

if (process->Write(buf) != 0) {
return ThrowException(String::New("Pipe already closed"));
}

return Undefined();
return process->Write(buf) == 0 ? True() : False();
}

Handle<Value>
Expand All @@ -149,12 +151,7 @@ Process::Close (const Arguments& args)
HandleScope scope;
Process *process = NODE_UNWRAP(Process, args.Holder());
assert(process);

if (process->Close() != 0) {
return ThrowException(String::New("Pipe already closed."));
}

return Undefined();
return process->Close() == 0 ? True() : False();
}

Process::Process (Handle<Object> handle)
Expand All @@ -169,7 +166,7 @@ Process::Process (Handle<Object> handle)
ev_init(&stdin_watcher_, Process::OnWritable);
stdin_watcher_.data = this;

ev_init(&child_watcher_, Process::OnExit);
ev_init(&child_watcher_, Process::OnCHLD);
child_watcher_.data = this;

stdout_pipe_[0] = -1;
Expand All @@ -180,6 +177,8 @@ Process::Process (Handle<Object> handle)
stdin_pipe_[1] = -1;

got_close_ = false;
got_chld_ = false;
exit_code_ = 0;

pid_ = 0;

Expand Down Expand Up @@ -342,7 +341,18 @@ Process::OnOutput (EV_P_ ev_io *watcher, int revents)
r = read(fd, buf, buf_size);

if (r < 0) {
if (errno != EAGAIN) perror("IPC pipe read error");
if (errno != EAGAIN) {
perror("IPC pipe read error");
} else {
if (process->got_chld_) {
close(fd);
if (is_stdout) {
process->stdout_pipe_[0] = -1;
} else {
process->stderr_pipe_[0] = -1;
}
}
}
break;
}

Expand All @@ -364,9 +374,16 @@ Process::OnOutput (EV_P_ ev_io *watcher, int revents)

if (r == 0) {
ev_io_stop(EV_DEFAULT_UC_ watcher);
close(fd);
if (is_stdout) {
process->stdout_pipe_[0] = -1;
} else {
process->stderr_pipe_[0] = -1;
}
break;
}
}
process->MaybeShutdown();
}

void
Expand All @@ -387,7 +404,13 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents)
, to_write->len - to_write->written
);
if (sent < 0) {
if (errno == EAGAIN) break;
if (errno == EAGAIN) {
if (process->got_chld_) {
close(process->stdin_pipe_[1]);
process->stdin_pipe_[1] = -1;
}
break;
}
perror("IPC pipe write error");
break;
}
Expand All @@ -410,7 +433,7 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents)
}

void
Process::OnExit (EV_P_ ev_child *watcher, int revents)
Process::OnCHLD (EV_P_ ev_child *watcher, int revents)
{
ev_child_stop(EV_A_ watcher);
Process *process = static_cast<Process*>(watcher->data);
Expand All @@ -419,34 +442,39 @@ Process::OnExit (EV_P_ ev_child *watcher, int revents)
assert(process->pid_ == watcher->rpid);
assert(&process->child_watcher_ == watcher);

// Call onExit ( watcher->rstatus )
HandleScope scope;
Handle<Value> callback_v = process->handle_->Get(ON_EXIT_SYMBOL);
process->got_chld_ = true;
process->exit_code_ = watcher->rstatus;

if (callback_v->IsFunction()) {
Handle<Function> callback = Handle<Function>::Cast(callback_v);
TryCatch try_catch;
Handle<Value> argv[1] = { Integer::New(watcher->rstatus) };
callback->Call(process->handle_, 1, argv);
if (try_catch.HasCaught()) FatalException(try_catch);
if (process->stdout_pipe_[0] >= 0) {
ev_feed_event(&process->stdout_watcher_, EV_READ);
}

if (process->stderr_pipe_[0] >= 0) {
ev_feed_event(&process->stderr_watcher_, EV_READ);
}
process->Shutdown();

if (process->stdin_pipe_[1] >= 0) {
ev_io_start(EV_DEFAULT_UC_ &process->stdin_watcher_);
ev_feed_event(&process->stdin_watcher_, EV_WRITE);
}

process->MaybeShutdown();
}

int
Process::Write (oi_buf *buf)
{
if (stdin_pipe_[1] < 0 || got_close_) return -1;
if (STDIN_CLOSED || got_close_ || got_chld_) return -1;
oi_queue_insert_head(&out_stream_, &buf->queue);
buf->written = 0;
ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
return 0;
}

int
Process::Close ()
Process::Close (void)
{
if (stdin_pipe_[1] < 0 || got_close_) return -1;
if (STDIN_CLOSED || got_close_ || got_chld_) return -1;
got_close_ = true;
ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
return 0;
Expand All @@ -455,6 +483,26 @@ Process::Close ()
int
Process::Kill (int sig)
{
if (pid_ == 0) return -1;
if (got_chld_ || pid_ == 0) return -1;
return kill(pid_, sig);
}

int
Process::MaybeShutdown (void)
{
if (STDOUT_CLOSED && STDERR_CLOSED && got_chld_) {
// Call onExit
HandleScope scope;
Handle<Value> callback_v = handle_->Get(ON_EXIT_SYMBOL);

if (callback_v->IsFunction()) {
Handle<Function> callback = Handle<Function>::Cast(callback_v);
TryCatch try_catch;
Handle<Value> argv[1] = { Integer::New(exit_code_) };
callback->Call(handle_, 1, argv);
if (try_catch.HasCaught()) FatalException(try_catch);
}

Shutdown();
}
}
10 changes: 7 additions & 3 deletions src/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@ class Process : ObjectWrap {
Process(v8::Handle<v8::Object> handle);
~Process();

void Shutdown ();
int Spawn (const char *command);
int Write (oi_buf *buf);
int Close ();
int Close (void);
int Kill (int sig);

private:
static void OnOutput (EV_P_ ev_io *watcher, int revents);
static void OnError (EV_P_ ev_io *watcher, int revents);
static void OnWritable (EV_P_ ev_io *watcher, int revents);
static void OnExit (EV_P_ ev_child *watcher, int revents);
static void OnCHLD (EV_P_ ev_child *watcher, int revents);

int MaybeShutdown (void);
void Shutdown (void);

ev_io stdout_watcher_;
ev_io stderr_watcher_;
Expand All @@ -48,6 +50,8 @@ class Process : ObjectWrap {
pid_t pid_;

bool got_close_;
bool got_chld_;
int exit_code_;

oi_queue out_stream_;
};
Expand Down
29 changes: 29 additions & 0 deletions test/mjsunit/test-process-buffering.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
include("mjsunit.js");

var pwd_called = false;

function pwd (callback) {
var output = "";
var process = new node.Process("pwd");
process.onOutput = function (s) {
if (s) output += s;
};
process.onExit = function(c) {
assertEquals(0, c);
callback(output);
pwd_called = true;
};
}


function onLoad () {
pwd(function (result) {
print(result);
assertTrue(result.length > 1);
assertEquals("\n", result[result.length-1]);
});
}

function onExit () {
assertTrue(pwd_called);
}

0 comments on commit 7363ccd

Please sign in to comment.