Skip to content

Commit

Permalink
feat: add support for FUSE connections (#1373)
Browse files Browse the repository at this point in the history
This commit also ensures that closing the proxy.Client blocks until all
listeners are closed.
  • Loading branch information
enocom committed Sep 7, 2022
1 parent e8af4da commit f844488
Show file tree
Hide file tree
Showing 4 changed files with 432 additions and 71 deletions.
16 changes: 14 additions & 2 deletions internal/proxy/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,26 @@ import (
"github.com/hanwen/go-fuse/v2/fuse/nodefs"
)

// symlink implements a symbolic link, returning the underlying path when
// Readlink is called.
type symlink struct {
fs.Inode
path string
}

// Readlink implements fs.NodeReadlinker and returns the symlink's path.
func (s *symlink) Readlink(ctx context.Context) ([]byte, syscall.Errno) {
return []byte(s.path), fs.OK
}

// readme represents a static read-only text file.
type readme struct {
fs.Inode
}

const readmeText = `
When programs attempt to open files in this directory, a remote connection to
the Cloud SQL instance of the same name will be established.
When applications attempt to open files in this directory, a remote connection
to the Cloud SQL instance of the same name will be established.
For example, when you run one of the followg commands, the proxy will initiate a
connection to the corresponding Cloud SQL instance, given you have the correct
Expand Down
266 changes: 228 additions & 38 deletions internal/proxy/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ package proxy_test
import (
"context"
"io/ioutil"
"net"
"os"
"path/filepath"
"testing"
"time"

"github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/internal/log"
"github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/cloudsql"
"github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/internal/proxy"
"github.com/hanwen/go-fuse/v2/fs"
)

func randTmpDir(t interface {
Expand All @@ -39,48 +41,37 @@ func randTmpDir(t interface {
return name
}

// tryFunc executes the provided function up to maxCount times, sleeping 100ms
// between attempts.
func tryFunc(f func() error, maxCount int) error {
var errCount int
for {
err := f()
if err == nil {
return nil
}
errCount++
if errCount == maxCount {
return err
// newTestClient is a convenience function for testing that creates a
// proxy.Client and starts it. The returned cleanup function is also a
// convenience. Callers may choose to ignore it and manually close the client.
func newTestClient(t *testing.T, d cloudsql.Dialer, fuseDir, fuseTempDir string) (*proxy.Client, func()) {
conf := &proxy.Config{FUSEDir: fuseDir, FUSETempDir: fuseTempDir}
c, err := proxy.NewClient(context.Background(), d, testLogger, conf)
if err != nil {
t.Fatalf("want error = nil, got = %v", err)
}

ready := make(chan struct{})
go c.Serve(context.Background(), func() { close(ready) })
select {
case <-ready:
case <-time.Tick(5 * time.Second):
t.Fatal("failed to Serve")
}
return c, func() {
if cErr := c.Close(); cErr != nil {
t.Logf("failed to close client: %v", cErr)
}
time.Sleep(100 * time.Millisecond)
}
}

func TestREADME(t *testing.T) {
func TestFUSEREADME(t *testing.T) {
if testing.Short() {
t.Skip("skipping fuse tests in short mode.")
}
ctx := context.Background()

dir := randTmpDir(t)
conf := &proxy.Config{
FUSEDir: dir,
FUSETempDir: randTmpDir(t),
}
logger := log.NewStdLogger(os.Stdout, os.Stdout)
d := &fakeDialer{}
c, err := proxy.NewClient(ctx, d, logger, conf)
if err != nil {
t.Fatalf("want error = nil, got = %v", err)
}

ready := make(chan struct{})
go c.Serve(ctx, func() { close(ready) })
select {
case <-ready:
case <-time.After(time.Minute):
t.Fatal("proxy.Client failed to start serving")
}
_, cleanup := newTestClient(t, d, dir, randTmpDir(t))

fi, err := os.Stat(dir)
if err != nil {
Expand Down Expand Up @@ -110,13 +101,212 @@ func TestREADME(t *testing.T) {
t.Fatalf("expected README data, got no data (dir = %v)", dir)
}

if cErr := c.Close(); cErr != nil {
t.Fatalf("c.Close(): %v", cErr)
}
cleanup() // close the client

// verify that c.Close unmounts the FUSE server
// verify that the FUSE server is no longer mounted
_, err = ioutil.ReadFile(filepath.Join(dir, "README"))
if err == nil {
t.Fatal("expected ioutil.Readfile to fail, but it succeeded")
}
}

func tryDialUnix(t *testing.T, addr string) net.Conn {
var (
conn net.Conn
dialErr error
)
for i := 0; i < 10; i++ {
conn, dialErr = net.Dial("unix", addr)
if conn != nil {
break
}
time.Sleep(100 * time.Millisecond)
}
if dialErr != nil {
t.Fatalf("net.Dial(): %v", dialErr)
}
return conn
}

func TestFUSEDialInstance(t *testing.T) {
fuseDir := randTmpDir(t)
fuseTempDir := randTmpDir(t)
tcs := []struct {
desc string
wantInstance string
socketPath string
fuseTempDir string
}{
{
desc: "mysql connections create a Unix socket",
wantInstance: "proj:region:mysql",
socketPath: filepath.Join(fuseDir, "proj:region:mysql"),
fuseTempDir: fuseTempDir,
},
{
desc: "postgres connections create a directory with a special file",
wantInstance: "proj:region:pg",
socketPath: filepath.Join(fuseDir, "proj:region:pg", ".s.PGSQL.5432"),
fuseTempDir: fuseTempDir,
},
{
desc: "connecting creates intermediate temp directories",
wantInstance: "proj:region:mysql",
socketPath: filepath.Join(fuseDir, "proj:region:mysql"),
fuseTempDir: filepath.Join(fuseTempDir, "doesntexist"),
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
d := &fakeDialer{}
_, cleanup := newTestClient(t, d, fuseDir, tc.fuseTempDir)
defer cleanup()

conn := tryDialUnix(t, tc.socketPath)
defer conn.Close()

var got []string
for i := 0; i < 10; i++ {
got = d.dialedInstances()
if len(got) == 1 {
break
}
time.Sleep(100 * time.Millisecond)
}
if len(got) != 1 {
t.Fatalf("dialed instances len: want = 1, got = %v", got)
}
if want, inst := tc.wantInstance, got[0]; want != inst {
t.Fatalf("instance: want = %v, got = %v", want, inst)
}

})
}
}

func TestFUSEReadDir(t *testing.T) {
fuseDir := randTmpDir(t)
_, cleanup := newTestClient(t, &fakeDialer{}, fuseDir, randTmpDir(t))
defer cleanup()

// Initiate a connection so the FUSE server will list it in the dir entries.
conn := tryDialUnix(t, filepath.Join(fuseDir, "proj:reg:mysql"))
defer conn.Close()

entries, err := os.ReadDir(fuseDir)
if err != nil {
t.Fatalf("os.ReadDir(): %v", err)
}
// len should be README plus the proj:reg:mysql socket
if got, want := len(entries), 2; got != want {
t.Fatalf("want = %v, got = %v", want, got)
}
var names []string
for _, e := range entries {
names = append(names, e.Name())
}
if names[0] != "README" || names[1] != "proj:reg:mysql" {
t.Fatalf("want = %v, got = %v", []string{"README", "proj:reg:mysql"}, names)
}
}

func TestFUSEErrors(t *testing.T) {
ctx := context.Background()
d := &fakeDialer{}
c, _ := newTestClient(t, d, randTmpDir(t), randTmpDir(t))

// Simulate FUSE file access by invoking Lookup directly to control
// how the socket cache is populated.
_, err := c.Lookup(ctx, "proj:reg:mysql", nil)
if err != fs.OK {
t.Fatalf("proxy.Client.Lookup(): %v", err)
}

// Close the client to close all open sockets.
if err := c.Close(); err != nil {
t.Fatalf("c.Close(): %v", err)
}

// Simulate another FUSE file access to directly populated the socket cache.
_, err = c.Lookup(ctx, "proj:reg:mysql", nil)
if err != fs.OK {
t.Fatalf("proxy.Client.Lookup(): %v", err)
}

// Verify the dialer was called twice, to prove the previous cache entry was
// removed when the socket was closed.
var attempts int
wantAttempts := 2
for i := 0; i < 10; i++ {
attempts = d.engineVersionAttempts()
if attempts == wantAttempts {
return
}
time.Sleep(100 * time.Millisecond)
}
t.Fatalf("engine version attempts: want = %v, got = %v", wantAttempts, attempts)
}

func TestFUSEWithBadInstanceName(t *testing.T) {
fuseDir := randTmpDir(t)
d := &fakeDialer{}
_, cleanup := newTestClient(t, d, fuseDir, randTmpDir(t))
defer cleanup()

_, dialErr := net.Dial("unix", filepath.Join(fuseDir, "notvalid"))
if dialErr == nil {
t.Fatalf("net.Dial() should fail")
}

if got := d.engineVersionAttempts(); got > 0 {
t.Fatalf("engine version calls: want = 0, got = %v", got)
}
}

func TestFUSECheckConnections(t *testing.T) {
fuseDir := randTmpDir(t)
d := &fakeDialer{}
c, cleanup := newTestClient(t, d, fuseDir, randTmpDir(t))
defer cleanup()

// first establish a connection to "register" it with the proxy
conn := tryDialUnix(t, filepath.Join(fuseDir, "proj:reg:mysql"))
defer conn.Close()

if err := c.CheckConnections(context.Background()); err != nil {
t.Fatalf("c.CheckConnections(): %v", err)
}

// verify the dialer was invoked twice, once for connect, once for check
// connection
var attempts int
wantAttempts := 2
for i := 0; i < 10; i++ {
attempts = d.dialAttempts()
if attempts == wantAttempts {
return
}
time.Sleep(100 * time.Millisecond)
}
t.Fatalf("dial attempts: want = %v, got = %v", wantAttempts, attempts)
}

func TestFUSEClose(t *testing.T) {
fuseDir := randTmpDir(t)
d := &fakeDialer{}
c, _ := newTestClient(t, d, fuseDir, randTmpDir(t))

// first establish a connection to "register" it with the proxy
conn := tryDialUnix(t, filepath.Join(fuseDir, "proj:reg:mysql"))
defer conn.Close()

// Close the proxy which should close all listeners
if err := c.Close(); err != nil {
t.Fatalf("c.Close(): %v", err)
}

_, err := net.Dial("unix", filepath.Join(fuseDir, "proj:reg:mysql"))
if err == nil {
t.Fatal("net.Dial() should fail")
}
}
Loading

0 comments on commit f844488

Please sign in to comment.