Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for FUSE connections #1373

Merged
merged 1 commit into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions internal/proxy/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,26 @@ import (
"github.com/hanwen/go-fuse/v2/fuse/nodefs"
)

// symlink implements a symbolic link, returning the underlying path when
// Readlink is called.
type symlink struct {
fs.Inode
path string
}

// Readlink implements fs.NodeReadlinker and returns the symlink's path.
func (s *symlink) Readlink(ctx context.Context) ([]byte, syscall.Errno) {
return []byte(s.path), fs.OK
}

// readme represents a static read-only text file.
type readme struct {
fs.Inode
}

const readmeText = `
When programs attempt to open files in this directory, a remote connection to
the Cloud SQL instance of the same name will be established.
When applications attempt to open files in this directory, a remote connection
to the Cloud SQL instance of the same name will be established.

For example, when you run one of the followg commands, the proxy will initiate a
connection to the corresponding Cloud SQL instance, given you have the correct
Expand Down
266 changes: 228 additions & 38 deletions internal/proxy/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ package proxy_test
import (
"context"
"io/ioutil"
"net"
"os"
"path/filepath"
"testing"
"time"

"github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/internal/log"
"github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/cloudsql"
"github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/internal/proxy"
"github.com/hanwen/go-fuse/v2/fs"
)

func randTmpDir(t interface {
Expand All @@ -39,48 +41,37 @@ func randTmpDir(t interface {
return name
}

// tryFunc executes the provided function up to maxCount times, sleeping 100ms
// between attempts.
func tryFunc(f func() error, maxCount int) error {
var errCount int
for {
err := f()
if err == nil {
return nil
}
errCount++
if errCount == maxCount {
return err
// newTestClient is a convenience function for testing that creates a
// proxy.Client and starts it. The returned cleanup function is also a
// convenience. Callers may choose to ignore it and manually close the client.
func newTestClient(t *testing.T, d cloudsql.Dialer, fuseDir, fuseTempDir string) (*proxy.Client, func()) {
conf := &proxy.Config{FUSEDir: fuseDir, FUSETempDir: fuseTempDir}
c, err := proxy.NewClient(context.Background(), d, testLogger, conf)
if err != nil {
t.Fatalf("want error = nil, got = %v", err)
}

ready := make(chan struct{})
go c.Serve(context.Background(), func() { close(ready) })
select {
case <-ready:
case <-time.Tick(5 * time.Second):
t.Fatal("failed to Serve")
}
return c, func() {
if cErr := c.Close(); cErr != nil {
t.Logf("failed to close client: %v", cErr)
}
time.Sleep(100 * time.Millisecond)
}
}

func TestREADME(t *testing.T) {
func TestFUSEREADME(t *testing.T) {
if testing.Short() {
t.Skip("skipping fuse tests in short mode.")
}
ctx := context.Background()

dir := randTmpDir(t)
conf := &proxy.Config{
FUSEDir: dir,
FUSETempDir: randTmpDir(t),
}
logger := log.NewStdLogger(os.Stdout, os.Stdout)
d := &fakeDialer{}
c, err := proxy.NewClient(ctx, d, logger, conf)
if err != nil {
t.Fatalf("want error = nil, got = %v", err)
}

ready := make(chan struct{})
go c.Serve(ctx, func() { close(ready) })
select {
case <-ready:
case <-time.After(time.Minute):
t.Fatal("proxy.Client failed to start serving")
}
_, cleanup := newTestClient(t, d, dir, randTmpDir(t))

fi, err := os.Stat(dir)
if err != nil {
Expand Down Expand Up @@ -110,13 +101,212 @@ func TestREADME(t *testing.T) {
t.Fatalf("expected README data, got no data (dir = %v)", dir)
}

if cErr := c.Close(); cErr != nil {
t.Fatalf("c.Close(): %v", cErr)
}
cleanup() // close the client

// verify that c.Close unmounts the FUSE server
// verify that the FUSE server is no longer mounted
_, err = ioutil.ReadFile(filepath.Join(dir, "README"))
if err == nil {
t.Fatal("expected ioutil.Readfile to fail, but it succeeded")
}
}

func tryDialUnix(t *testing.T, addr string) net.Conn {
var (
conn net.Conn
dialErr error
)
for i := 0; i < 10; i++ {
conn, dialErr = net.Dial("unix", addr)
if conn != nil {
break
}
time.Sleep(100 * time.Millisecond)
}
if dialErr != nil {
t.Fatalf("net.Dial(): %v", dialErr)
}
return conn
}

func TestFUSEDialInstance(t *testing.T) {
fuseDir := randTmpDir(t)
fuseTempDir := randTmpDir(t)
tcs := []struct {
desc string
wantInstance string
socketPath string
fuseTempDir string
}{
{
desc: "mysql connections create a Unix socket",
wantInstance: "proj:region:mysql",
socketPath: filepath.Join(fuseDir, "proj:region:mysql"),
fuseTempDir: fuseTempDir,
},
{
desc: "postgres connections create a directory with a special file",
wantInstance: "proj:region:pg",
socketPath: filepath.Join(fuseDir, "proj:region:pg", ".s.PGSQL.5432"),
fuseTempDir: fuseTempDir,
},
{
desc: "connecting creates intermediate temp directories",
kurtisvg marked this conversation as resolved.
Show resolved Hide resolved
wantInstance: "proj:region:mysql",
socketPath: filepath.Join(fuseDir, "proj:region:mysql"),
fuseTempDir: filepath.Join(fuseTempDir, "doesntexist"),
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
d := &fakeDialer{}
_, cleanup := newTestClient(t, d, fuseDir, tc.fuseTempDir)
defer cleanup()

conn := tryDialUnix(t, tc.socketPath)
defer conn.Close()

var got []string
for i := 0; i < 10; i++ {
got = d.dialedInstances()
if len(got) == 1 {
break
}
time.Sleep(100 * time.Millisecond)
}
if len(got) != 1 {
t.Fatalf("dialed instances len: want = 1, got = %v", got)
}
if want, inst := tc.wantInstance, got[0]; want != inst {
t.Fatalf("instance: want = %v, got = %v", want, inst)
}

})
}
}

func TestFUSEReadDir(t *testing.T) {
fuseDir := randTmpDir(t)
_, cleanup := newTestClient(t, &fakeDialer{}, fuseDir, randTmpDir(t))
defer cleanup()

// Initiate a connection so the FUSE server will list it in the dir entries.
conn := tryDialUnix(t, filepath.Join(fuseDir, "proj:reg:mysql"))
defer conn.Close()

entries, err := os.ReadDir(fuseDir)
if err != nil {
t.Fatalf("os.ReadDir(): %v", err)
}
// len should be README plus the proj:reg:mysql socket
if got, want := len(entries), 2; got != want {
t.Fatalf("want = %v, got = %v", want, got)
}
var names []string
for _, e := range entries {
names = append(names, e.Name())
}
if names[0] != "README" || names[1] != "proj:reg:mysql" {
t.Fatalf("want = %v, got = %v", []string{"README", "proj:reg:mysql"}, names)
}
}

func TestFUSEErrors(t *testing.T) {
ctx := context.Background()
d := &fakeDialer{}
c, _ := newTestClient(t, d, randTmpDir(t), randTmpDir(t))

// Simulate FUSE file access by invoking Lookup directly to control
// how the socket cache is populated.
_, err := c.Lookup(ctx, "proj:reg:mysql", nil)
if err != fs.OK {
t.Fatalf("proxy.Client.Lookup(): %v", err)
}

// Close the client to close all open sockets.
if err := c.Close(); err != nil {
t.Fatalf("c.Close(): %v", err)
}

// Simulate another FUSE file access to directly populated the socket cache.
_, err = c.Lookup(ctx, "proj:reg:mysql", nil)
if err != fs.OK {
t.Fatalf("proxy.Client.Lookup(): %v", err)
}

// Verify the dialer was called twice, to prove the previous cache entry was
// removed when the socket was closed.
var attempts int
wantAttempts := 2
for i := 0; i < 10; i++ {
attempts = d.engineVersionAttempts()
if attempts == wantAttempts {
return
}
time.Sleep(100 * time.Millisecond)
}
t.Fatalf("engine version attempts: want = %v, got = %v", wantAttempts, attempts)
}

func TestFUSEWithBadInstanceName(t *testing.T) {
fuseDir := randTmpDir(t)
d := &fakeDialer{}
_, cleanup := newTestClient(t, d, fuseDir, randTmpDir(t))
defer cleanup()

_, dialErr := net.Dial("unix", filepath.Join(fuseDir, "notvalid"))
if dialErr == nil {
t.Fatalf("net.Dial() should fail")
}

if got := d.engineVersionAttempts(); got > 0 {
t.Fatalf("engine version calls: want = 0, got = %v", got)
}
}

func TestFUSECheckConnections(t *testing.T) {
fuseDir := randTmpDir(t)
d := &fakeDialer{}
c, cleanup := newTestClient(t, d, fuseDir, randTmpDir(t))
defer cleanup()

// first establish a connection to "register" it with the proxy
conn := tryDialUnix(t, filepath.Join(fuseDir, "proj:reg:mysql"))
defer conn.Close()

if err := c.CheckConnections(context.Background()); err != nil {
t.Fatalf("c.CheckConnections(): %v", err)
}

// verify the dialer was invoked twice, once for connect, once for check
// connection
var attempts int
wantAttempts := 2
for i := 0; i < 10; i++ {
attempts = d.dialAttempts()
if attempts == wantAttempts {
return
}
time.Sleep(100 * time.Millisecond)
}
t.Fatalf("dial attempts: want = %v, got = %v", wantAttempts, attempts)
}

func TestFUSEClose(t *testing.T) {
fuseDir := randTmpDir(t)
d := &fakeDialer{}
c, _ := newTestClient(t, d, fuseDir, randTmpDir(t))

// first establish a connection to "register" it with the proxy
conn := tryDialUnix(t, filepath.Join(fuseDir, "proj:reg:mysql"))
defer conn.Close()

// Close the proxy which should close all listeners
if err := c.Close(); err != nil {
t.Fatalf("c.Close(): %v", err)
}

_, err := net.Dial("unix", filepath.Join(fuseDir, "proj:reg:mysql"))
if err == nil {
t.Fatal("net.Dial() should fail")
}
}
Loading