Skip to content

Commit

Permalink
Merge pull request #31 from akash-akya/dev
Browse files Browse the repository at this point in the history
 Handle abrupt VM termination
  • Loading branch information
akash-akya authored Jun 21, 2024
2 parents 7d83642 + 64cf74d commit f2e1665
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,4 @@ jobs:
- run: mix deps.get
- run: mix compile --warnings-as-errors
- run: mix test --trace
- run: mix test --trace --exclude os:unix
25 changes: 21 additions & 4 deletions go_src/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package main
import (
"os"
"os/exec"
"os/signal"
"syscall"
"time"
)

func execute(workdir string, args []string) error {
done := make(chan struct{})

sigs := make(chan os.Signal, 1)
input := make(chan Packet, 1)
outputDemand := make(chan Packet)
inputDemand := make(chan Packet)
Expand All @@ -20,8 +23,17 @@ func execute(workdir string, args []string) error {
logger.Printf("Command path: %v\n", proc.Path)

output := startCommandPipeline(proc, input, inputDemand, outputDemand)

// Capture common signals.
// Setting notify for SIGPIPE is important to capture and without that
// we won't be able to handle abrupt beam vm terminations
// Also, SIGPIPE behaviour in golang is bit complex,
// see: https://pkg.go.dev/os/signal@go1.22.4#hdr-SIGPIPE
signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)

// go handleSignals(input, outputDemand, done)
go dispatchStdin(input, outputDemand, done)
go collectStdout(proc.Process.Pid, output, inputDemand, done)
go collectStdout(proc.Process.Pid, output, inputDemand, sigs, done)

// wait for pipline to exit
<-done
Expand All @@ -33,8 +45,8 @@ func execute(workdir string, args []string) error {
logger.Printf("Command exited with error: %v\n", e)
os.Exit(3)
}
// TODO: return Stderr and exit stauts to beam process
logger.Printf("Command exited: %#v\n", err)
// TODO: return Stderr and exit status to beam process
logger.Printf("Command exited\n")
return err
}

Expand All @@ -57,15 +69,20 @@ func dispatchStdin(input chan<- Packet, outputDemand chan<- Packet, done chan st
stdinReader(dispatch, done)
}

func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, done chan struct{}) {
func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, sigs <-chan os.Signal, done chan struct{}) {
defer func() {
close(done)
}()

merged := func() (Packet, bool) {
select {
case sig := <-sigs:
logger.Printf("Received OS Signal: ", sig)
return Packet{}, false

case v, ok := <-inputDemand:
return v, ok

case v, ok := <-output:
return v, ok
}
Expand Down
5 changes: 4 additions & 1 deletion go_src/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ func writePacket(tag uint8, data []byte) {
_, writeErr := os.Stdout.Write(buf[:payloadLen+4])
if writeErr != nil {
switch writeErr.(type) {
// ignore broken pipe or closed pipe errors
// ignore broken pipe or closed pipe errors here.
// currently readCommandStdout closes output chan, making the
// flow break.
case *os.PathError:
logger.Printf("os.PathError: ", writeErr)
return
default:
fatal(writeErr)
Expand Down
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
%{
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"credo": {:hex, :credo, "1.7.6", "b8f14011a5443f2839b04def0b252300842ce7388f3af177157c86da18dfbeea", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "146f347fb9f8cbc5f7e39e3f22f70acbef51d441baa6d10169dd604bfbc55296"},
"credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"},
"dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.33.0", "690562b153153c7e4d455dc21dab86e445f66ceba718defe64b0ef6f0bd83ba0", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "3f69adc28274cb51be37d09b03e4565232862a4b10288a3894587b0131412124"},
"ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
Expand Down
80 changes: 80 additions & 0 deletions test/ex_cmd_exit_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
defmodule ExCmdExitTest do
use ExUnit.Case, async: false

# currently running `elixir` command is not working in Windows
@tag os: :unix
test "if it kills external command on abnormal vm exit" do
ex_cmd_expr = ~S{ExCmd.stream!(["cat"]) |> Stream.run()}

port =
Port.open(
{:spawn, "elixir -S mix run -e '#{ex_cmd_expr}'"},
[:stderr_to_stdout, :use_stdio, :exit_status, :binary, :hide]
)

port_info = Port.info(port)
os_pid = port_info[:os_pid]

on_exit(fn ->
os_process_alive?(os_pid) && os_process_kill(os_pid)
end)

assert os_process_alive?(os_pid)

[_, cmd_pid] = capture_output!(port, ~r/os pid: ([0-9]+)/)

cmd_pid = String.to_integer(cmd_pid)
assert os_process_alive?(cmd_pid)

assert {:ok, _msg} = os_process_kill(os_pid)

# wait for the cleanup
:timer.sleep(5000)

refute os_process_alive?(os_pid)
refute os_process_alive?(cmd_pid)
end

defp os_process_alive?(pid) do
if windows?() do
case cmd(["tasklist", "/fi", "pid eq #{pid}"]) do
{"INFO: No tasks are running which match the specified criteria.\r\n", 0} -> false
{_, 0} -> true
end
else
match?({_, 0}, cmd(["ps", "-p", to_string(pid)]))
end
end

defp os_process_kill(pid) do
if windows?() do
cmd(["taskkill", "/pid", "#{pid}", "/f"])
else
cmd(["kill", "-SIGKILL", "#{pid}"])
end
|> case do
{msg, 0} -> {:ok, msg}
{msg, status} -> {:error, status, msg}
end
end

defp windows?, do: :os.type() == {:win32, :nt}

def cmd([cmd | args]), do: System.cmd(cmd, args, stderr_to_stdout: true)

defp capture_output!(port, regexp, acc \\ "") do
receive do
{^port, {:data, bin}} ->
output = acc <> bin

if match = Regex.run(regexp, output) do
match
else
capture_output!(port, regexp, output)
end
after
5000 ->
raise "timeout while waiting for the iex prompt, acc: #{acc}"
end
end
end

0 comments on commit f2e1665

Please sign in to comment.