Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dev-server: Properly deal with IPv6 #611

Merged
merged 2 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions temporalcli/commands.server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
}
// Setup UI
if !t.Headless {
opts.UIIP, opts.UIPort = t.Ip, t.UiPort
opts.UIIP, opts.UIPort = t.UiIp, t.UiPort
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
if opts.UIIP == "" {
opts.UIIP = t.Ip
}
Expand All @@ -67,7 +67,7 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
return fmt.Errorf("can't use default UI port %d (%d + 1000): %w", opts.UIPort, t.Port, err)
}
} else {
if err := devserver.CheckPortFree(opts.UIIP, t.Port); err != nil {
if err := devserver.CheckPortFree(opts.UIIP, opts.UIPort); err != nil {
return fmt.Errorf("can't set UI port %d: %w", opts.UIPort, err)
}
}
Expand Down Expand Up @@ -125,9 +125,9 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
}
// Grab a free port for metrics ahead-of-time so we know what port is selected
if opts.MetricsPort == 0 {
opts.MetricsPort = devserver.MustGetFreePort(t.Ip)
opts.MetricsPort = devserver.MustGetFreePort(opts.FrontendIP)
} else {
if err := devserver.CheckPortFree(t.Ip, opts.MetricsPort); err != nil {
if err := devserver.CheckPortFree(opts.FrontendIP, opts.MetricsPort); err != nil {
return fmt.Errorf("can't set metrics port %d: %w", opts.MetricsPort, err)
}
}
Expand All @@ -144,20 +144,23 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
return err
}

friendlyIP := t.Ip
if friendlyIP == "127.0.0.1" {
friendlyIP = "localhost"
}
cctx.Printer.Printlnf("%-16s %v:%v", "Temporal server:", friendlyIP, t.Port)
cctx.Printer.Printlnf("%-16s %v:%v", "Temporal server:", toFriendlyIp(opts.FrontendIP), opts.FrontendPort)
if !t.Headless {
cctx.Printer.Printlnf("%-16s http://%v:%v", "Web UI:", friendlyIP, opts.UIPort)
cctx.Printer.Printlnf("%-16s http://%v:%v", "Web UI:", toFriendlyIp(opts.UIIP), opts.UIPort)
}
cctx.Printer.Printlnf("%-16s http://%v:%v/metrics", "Metrics:", friendlyIP, opts.MetricsPort)
cctx.Printer.Printlnf("%-16s http://%v:%v/metrics", "Metrics:", toFriendlyIp(opts.FrontendIP), opts.MetricsPort)
<-cctx.Done()
cctx.Printer.Println("Stopping server...")
return nil
}

func toFriendlyIp(host string) string {
if host == "127.0.0.1" || host == "::1" {
return "localhost"
}
return devserver.MaybeEscapeIPv6(host)
}

func persistentClusterID() string {
// If there is not a database file in use, we want a cluster ID to be the same
// for every re-run, so we set it as an environment config in a special env
Expand Down
72 changes: 53 additions & 19 deletions temporalcli/commands.server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package temporalcli_test

import (
"context"
"net"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -21,15 +22,53 @@ import (
// * Server reuse existing database file

func TestServer_StartDev_Simple(t *testing.T) {
port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1"))
startDevServerAndRunSimpleTest(
t,
// TODO(cretz): Remove --headless when
// https://github.com/temporalio/ui/issues/1773 fixed
[]string{"server", "start-dev", "-p", port, "--headless"},
"127.0.0.1:"+port,
)
}

func TestServer_StartDev_IPv4Unspecified(t *testing.T) {
port := strconv.Itoa(devserver.MustGetFreePort("0.0.0.0"))
startDevServerAndRunSimpleTest(
t,
[]string{"server", "start-dev", "--ip", "0.0.0.0", "-p", port, "--headless"},
"0.0.0.0:"+port,
)
}

func TestServer_StartDev_IPv6Unspecified(t *testing.T) {
_, err := net.InterfaceByName("::1")
if err != nil {
t.Skip("Machine has no IPv6 support")
return
}

port := strconv.Itoa(devserver.MustGetFreePort("::"))
startDevServerAndRunSimpleTest(
t,
[]string{
"server", "start-dev",
"--ip", "::", "--ui-ip", "::1",
"-p", port,
"--ui-port", strconv.Itoa(devserver.MustGetFreePort("::")),
"--http-port", strconv.Itoa(devserver.MustGetFreePort("::")),
"--metrics-port", strconv.Itoa(devserver.MustGetFreePort("::"))},
"[::]:"+port,
)
}

func startDevServerAndRunSimpleTest(t *testing.T, args []string, dialAddress string) {
h := NewCommandHarness(t)
defer h.Close()

// Start in background, then wait for client to be able to connect
port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1"))
resCh := make(chan *CommandResult, 1)
// TODO(cretz): Remove --headless when
// https://github.com/temporalio/ui/issues/1773 fixed
go func() { resCh <- h.Execute("server", "start-dev", "-p", port, "--headless") }()
go func() { resCh <- h.Execute(args...) }()

// Try to connect for a bit while checking for error
var cl client.Client
Expand All @@ -41,7 +80,7 @@ func TestServer_StartDev_Simple(t *testing.T) {
default:
}
var err error
cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port})
cl, err = client.Dial(client.Options{HostPort: dialAddress})
assert.NoError(t, err)
}, 3*time.Second, 200*time.Millisecond)
defer cl.Close()
Expand Down Expand Up @@ -95,24 +134,19 @@ func TestServer_StartDev_ConcurrentStarts(t *testing.T) {
// Send an interrupt by cancelling context
h.CancelContext()

// FIXME: We should technically wait for server cleanup, but this is
// slowing down the test considerably, presumably due to the issue fixed
// in https://github.com/temporalio/temporal/pull/5459. Uncomment the
// following code when the server dependency is updated to 1.24.0.
//
// select {
// case <-time.After(20 * time.Second):
// h.Fail("didn't cleanup after 20 seconds")
// case res := <-resCh:
// h.NoError(res.Err)
// }
select {
case <-time.After(20 * time.Second):
h.Fail("didn't cleanup after 20 seconds")
case res := <-resCh:
h.NoError(res.Err)
}
}

// Start 80 dev server instances, with 8 concurrent executions
// Start 40 dev server instances, with 8 concurrent executions
instanceCounter := atomic.Int32{}
instanceCounter.Store(80)
instanceCounter.Store(40)
wg := &sync.WaitGroup{}
for i := 0; i < 8; i++ {
for i := 0; i < 6; i++ {
wg.Add(1)
go func() {
for instanceCounter.Add(-1) >= 0 {
Expand Down
20 changes: 18 additions & 2 deletions temporalcli/devserver/freeport.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// in this regard; on that platform, `SO_REUSEADDR` has a different meaning and
// should not be set (setting it may have unpredictable consequences).
func GetFreePort(host string) (int, error) {
host = MaybeEscapeIPv6(host)
l, err := net.Listen("tcp", host+":0")
if err != nil {
return 0, fmt.Errorf("failed to assign a free port: %v", err)
Expand All @@ -44,7 +45,14 @@ func GetFreePort(host string) (int, error) {
// ports, making an extra connection just to reserve a port might actually
// be harmful (by hastening ephemeral port exhaustion).
if runtime.GOOS != "darwin" && runtime.GOOS != "windows" {
r, err := net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr))
// DialTCP(..., l.Addr()) might fail if machine has IPv6 support, but
// isn't fully configured (e.g. doesn't have a loopback interface bound
// to ::1). For safety, rebuild address form the original host instead.
tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", host, port))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern here is that we are not choosing a specific IP version based on the input string when binding, but we are when dialing. I am concerned about an ipv6-only machine (assuming they exist) if we blindly say --ip 0.0.0.0 works for all IP versions. But it's not a real issue, we can just tell people to pass in --ip :: or something.

if err != nil {
return 0, fmt.Errorf("error resolving address: %v", err)
}
r, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
return 0, fmt.Errorf("failed to assign a free port: %v", err)
}
Expand Down Expand Up @@ -86,10 +94,18 @@ func MustGetFreePort(host string) int {
// Asserts that the given TCP port is available to listen on, for the given
// (local) host; return an error if it is not.
func CheckPortFree(host string, port int) error {
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", MaybeEscapeIPv6(host), port))
if err != nil {
return err
}
l.Close()
return nil
}

// Escapes an IPv6 address with square brackets, if it is an IPv6 address.
func MaybeEscapeIPv6(host string) string {
if ip := net.ParseIP(host); ip != nil && ip.To4() == nil {
return "[" + host + "]"
}
return host
}
37 changes: 34 additions & 3 deletions temporalcli/devserver/freeport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
func TestFreePort_NoDouble(t *testing.T) {
host := "127.0.0.1"
portSet := make(map[int]bool)

for i := 0; i < 2000; i++ {
p, err := devserver.GetFreePort(host)
if err != nil {
Expand All @@ -30,7 +29,6 @@ func TestFreePort_NoDouble(t *testing.T) {

func TestFreePort_CanBindImmediatelySameProcess(t *testing.T) {
host := "127.0.0.1"

for i := 0; i < 500; i++ {
p, err := devserver.GetFreePort(host)
if err != nil {
Expand All @@ -45,15 +43,48 @@ func TestFreePort_CanBindImmediatelySameProcess(t *testing.T) {
}
}

func TestFreePort_IPv4Unspecified(t *testing.T) {
host := "0.0.0.0"
p, err := devserver.GetFreePort(host)
if err != nil {
t.Fatalf("Error: %s", err)
return
}
err = tryListenAndDialOn(host, p)
if err != nil {
t.Fatalf("Error: %s", err)
return
}
}

func TestFreePort_IPv6Unspecified(t *testing.T) {
host := "::"
p, err := devserver.GetFreePort(host)
if err != nil {
t.Fatalf("Error: %s", err)
return
}
err = tryListenAndDialOn(host, p)
if err != nil {
t.Fatalf("Error: %s", err)
return
}
}

// This function is used as part of unit tests, to ensure that the port
func tryListenAndDialOn(host string, port int) error {
host = devserver.MaybeEscapeIPv6(host)
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
return err
}
defer l.Close()

r, err := net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr))
tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
panic(err)
}
r, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
panic(err)
}
Expand Down
18 changes: 9 additions & 9 deletions temporalcli/devserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ func (s *Server) Stop() {

func (s *StartOptions) buildUIServer() *uiserver.Server {
return uiserver.NewServer(uiserveroptions.WithConfigProvider(&uiconfig.Config{
Host: s.UIIP,
Host: MaybeEscapeIPv6(s.UIIP),
Port: s.UIPort,
TemporalGRPCAddress: fmt.Sprintf("%v:%v", s.FrontendIP, s.FrontendPort),
TemporalGRPCAddress: fmt.Sprintf("%v:%v", MaybeEscapeIPv6(s.FrontendIP), s.FrontendPort),
EnableUI: true,
UIAssetPath: s.UIAssetPath,
Codec: uiconfig.Codec{Endpoint: s.UICodecEndpoint},
Expand Down Expand Up @@ -224,11 +224,11 @@ func (s *StartOptions) buildServerConfig() (*config.Config, error) {
var conf config.Config
// Global config
conf.Global.Membership.MaxJoinDuration = 30 * time.Second
conf.Global.Membership.BroadcastAddress = "127.0.0.1"
conf.Global.Membership.BroadcastAddress = s.FrontendIP
if conf.Global.Metrics == nil && s.MetricsPort > 0 {
conf.Global.Metrics = &metrics.Config{
Prometheus: &metrics.PrometheusConfig{
ListenAddress: fmt.Sprintf("%v:%v", s.FrontendIP, s.MetricsPort),
ListenAddress: fmt.Sprintf("%v:%v", MaybeEscapeIPv6(s.FrontendIP), s.MetricsPort),
HandlerPath: "/metrics",
},
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func (s *StartOptions) buildServerConfig() (*config.Config, error) {
s.CurrentClusterName: {
Enabled: true,
InitialFailoverVersion: int64(s.InitialFailoverVersion),
RPCAddress: fmt.Sprintf("127.0.0.1:%v", s.FrontendPort),
RPCAddress: fmt.Sprintf("%v:%v", MaybeEscapeIPv6(s.FrontendIP), s.FrontendPort),
ClusterID: s.ClusterID,
},
},
Expand All @@ -273,7 +273,7 @@ func (s *StartOptions) buildServerConfig() (*config.Config, error) {
conf.Archival.Visibility.State = "disabled"
conf.NamespaceDefaults.Archival.History.State = "disabled"
conf.NamespaceDefaults.Archival.Visibility.State = "disabled"
conf.PublicClient.HostPort = fmt.Sprintf("127.0.0.1:%v", s.FrontendPort)
conf.PublicClient.HostPort = fmt.Sprintf("%v:%v", MaybeEscapeIPv6(s.FrontendIP), s.FrontendPort)
return &conf, nil
}

Expand Down Expand Up @@ -329,9 +329,9 @@ func (s *StartOptions) buildServiceConfig(frontend bool) config.Service {
conf.RPC.HTTPPort = s.FrontendHTTPPort
}
} else {
conf.RPC.GRPCPort = MustGetFreePort("127.0.0.1")
conf.RPC.BindOnLocalHost = true
conf.RPC.GRPCPort = MustGetFreePort(s.FrontendIP)
conf.RPC.BindOnIP = s.FrontendIP
}
conf.RPC.MembershipPort = MustGetFreePort("127.0.0.1")
conf.RPC.MembershipPort = MustGetFreePort(s.FrontendIP)
return conf
}
Loading