Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle abrupt terminations #30

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions go_src/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package main
import (
"os"
"os/exec"
"os/signal"
"syscall"
"time"
)

func execute(workdir string, args []string) error {
done := make(chan struct{})

sigs := make(chan os.Signal, 1)
input := make(chan Packet, 1)
outputDemand := make(chan Packet)
inputDemand := make(chan Packet)
Expand All @@ -20,8 +23,17 @@ func execute(workdir string, args []string) error {
logger.Printf("Command path: %v\n", proc.Path)

output := startCommandPipeline(proc, input, inputDemand, outputDemand)

// Capture common signals.
// Setting notify for SIGPIPE is important to capture and without that
// we won't be able to handle abrupt beam vm terminations
// Also, SIGPIPE behaviour in golang is bit complex,
// see: https://pkg.go.dev/os/signal@go1.22.4#hdr-SIGPIPE
signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)

// go handleSignals(input, outputDemand, done)
go dispatchStdin(input, outputDemand, done)
go collectStdout(proc.Process.Pid, output, inputDemand, done)
go collectStdout(proc.Process.Pid, output, inputDemand, sigs, done)

// wait for pipline to exit
<-done
Expand All @@ -33,8 +45,8 @@ func execute(workdir string, args []string) error {
logger.Printf("Command exited with error: %v\n", e)
os.Exit(3)
}
// TODO: return Stderr and exit stauts to beam process
logger.Printf("Command exited: %#v\n", err)
// TODO: return Stderr and exit status to beam process
logger.Printf("Command exited\n")
return err
}

Expand All @@ -57,15 +69,20 @@ func dispatchStdin(input chan<- Packet, outputDemand chan<- Packet, done chan st
stdinReader(dispatch, done)
}

func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, done chan struct{}) {
func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, sigs <-chan os.Signal, done chan struct{}) {
defer func() {
close(done)
}()

merged := func() (Packet, bool) {
select {
case sig := <-sigs:
logger.Printf("Received OS Signal: ", sig)
return Packet{}, false

case v, ok := <-inputDemand:
return v, ok

case v, ok := <-output:
return v, ok
}
Expand Down
5 changes: 4 additions & 1 deletion go_src/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ func writePacket(tag uint8, data []byte) {
_, writeErr := os.Stdout.Write(buf[:payloadLen+4])
if writeErr != nil {
switch writeErr.(type) {
// ignore broken pipe or closed pipe errors
// ignore broken pipe or closed pipe errors here.
// currently readCommandStdout closes output chan, making the
// flow break.
case *os.PathError:
logger.Printf("os.PathError: ", writeErr)
return
default:
fatal(writeErr)
Expand Down
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
%{
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"credo": {:hex, :credo, "1.7.6", "b8f14011a5443f2839b04def0b252300842ce7388f3af177157c86da18dfbeea", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "146f347fb9f8cbc5f7e39e3f22f70acbef51d441baa6d10169dd604bfbc55296"},
"credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"},
"dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.33.0", "690562b153153c7e4d455dc21dab86e445f66ceba718defe64b0ef6f0bd83ba0", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "3f69adc28274cb51be37d09b03e4565232862a4b10288a3894587b0131412124"},
"ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
Expand Down
80 changes: 80 additions & 0 deletions test/ex_cmd_exit_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
defmodule ExCmdExitTest do
use ExUnit.Case, async: false

test "if it kills external command on abnormal vm exit" do

Check failure on line 4 in test/ex_cmd_exit_test.exs

View workflow job for this annotation

GitHub Actions / Windows OTP 25 / Elixir 1.14

test if it kills external command on abnormal vm exit (ExCmdExitTest)

Check failure on line 4 in test/ex_cmd_exit_test.exs

View workflow job for this annotation

GitHub Actions / Windows OTP 25 / Elixir 1.14

test if it kills external command on abnormal vm exit (ExCmdExitTest)
ex_cmd_expr = ~S{ExCmd.stream!(["cat"]) |> Stream.run()}

port =
Port.open(
{:spawn, "elixir -S mix run -e '#{ex_cmd_expr}'"},
[:stderr_to_stdout, :use_stdio, :exit_status, :binary, :hide]
)

port_info = Port.info(port)
os_pid = port_info[:os_pid]

on_exit(fn ->
os_process_alive?(os_pid) && os_process_kill(os_pid)
end)

assert os_process_alive?(os_pid)

[_, cmd_pid] = capture_output!(port, ~r/os pid: ([0-9]+)/)

cmd_pid = String.to_integer(cmd_pid)
assert os_process_alive?(cmd_pid)

assert {:ok, _msg} = os_process_kill(os_pid)

# wait for the cleanup
:timer.sleep(5000)

refute os_process_alive?(os_pid)
refute os_process_alive?(cmd_pid)
end

defp os_process_alive?(pid) do
if windows?() do
IO.inspect(["tasklist", "/fi", "pid eq #{pid}"])

Check warning on line 38 in test/ex_cmd_exit_test.exs

View workflow job for this annotation

GitHub Actions / Lint OTP 26.x / Elixir 1.16.x

There should be no calls to `IO.inspect/1`.

Check warning on line 38 in test/ex_cmd_exit_test.exs

View workflow job for this annotation

GitHub Actions / Lint OTP 26.x / Elixir 1.16.x

There should be no calls to `IO.inspect/1`.

case cmd(["tasklist", "/fi", "pid eq #{pid}"]) do
{"INFO: No tasks are running which match the specified criteria.\r\n", 0} -> false
{_, 0} -> true
end
else
match?({_, 0}, cmd(["ps", "-p", to_string(pid)]))
end
end

defp os_process_kill(pid) do
if windows?() do
cmd(["taskkill", "/pid", "#{pid}", "/f"])
else
cmd(["kill", "-SIGKILL", "#{pid}"])
end
|> case do
{msg, 0} -> {:ok, msg}
{msg, status} -> {:error, status, msg}
end
end

defp windows?, do: :os.type() == {:win32, :nt}

def cmd([cmd | args]), do: System.cmd(cmd, args, stderr_to_stdout: true)

defp capture_output!(port, regexp, acc \\ "") do
receive do
{^port, {:data, bin}} ->
output = acc <> bin

if match = Regex.run(regexp, output) do
match
else
capture_output!(port, regexp, output)
end
after
5000 ->
raise "timeout while waiting for the iex prompt, acc: #{acc}"
end
end
end
Loading