Skip to content

Commit

Permalink
fix: don't build FUSE paths for Windows
Browse files Browse the repository at this point in the history
Fixes #1390.
  • Loading branch information
enocom committed Sep 9, 2022
1 parent 30edbe1 commit e92298d
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 179 deletions.
3 changes: 3 additions & 0 deletions internal/proxy/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package proxy

import (
Expand Down
8 changes: 8 additions & 0 deletions internal/proxy/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,11 @@ func TestFUSEClose(t *testing.T) {
t.Fatal("net.Dial() should fail")
}
}

func TestFUSEWithBadDir(t *testing.T) {
conf := &proxy.Config{FUSEDir: "/not/a/dir", FUSETempDir: randTmpDir(t)}
_, err := proxy.NewClient(context.Background(), &fakeDialer{}, testLogger, conf)
if err == nil {
t.Fatal("proxy client should fail with bad dir")
}
}
187 changes: 9 additions & 178 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,15 @@ import (
"io"
"net"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"cloud.google.com/go/cloudsqlconn"
"github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/cloudsql"
"github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/internal/gcloud"
"github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
"golang.org/x/oauth2"
)

Expand Down Expand Up @@ -274,51 +270,6 @@ func (c *portConfig) nextDBPort(version string) int {
}
}

type socketSymlink struct {
socket *socketMount
symlink *symlink
}

// Client proxies connections from a local client to the remote server side
// proxy for multiple Cloud SQL instances.
type Client struct {
// connCount tracks the number of all open connections from the Client to
// all Cloud SQL instances.
connCount uint64

// maxConns is the maximum number of allowed connections tracked by
// connCount. If not set, there is no limit.
maxConns uint64

dialer cloudsql.Dialer

// mnts is a list of all mounted sockets for this client
mnts []*socketMount

// waitOnClose is the maximum duration to wait for open connections to close
// when shutting down.
waitOnClose time.Duration

logger cloudsql.Logger

// fuseDir specifies the directory where a FUSE server is mounted. The value
// is empty if FUSE is not enabled. The directory holds symlinks to Unix
// domain sockets in the fuseTmpDir.
fuseDir string
fuseTempDir string
// fuseMu protects access to fuseSockets.
fuseMu sync.Mutex
// fuseSockets is a map of instance connection name to socketMount and
// symlink.
fuseSockets map[string]socketSymlink
fuseServerMu sync.Mutex
fuseServer *fuse.Server
fuseWg sync.WaitGroup

// Inode adds support for FUSE operations.
fs.Inode
}

// NewClient completes the initial setup required to get the proxy to a "steady"
// state.
func NewClient(ctx context.Context, d cloudsql.Dialer, l cloudsql.Logger, conf *Config) (*Client, error) {
Expand All @@ -343,13 +294,7 @@ func NewClient(ctx context.Context, d cloudsql.Dialer, l cloudsql.Logger, conf *
}

if conf.FUSEDir != "" {
if err := os.MkdirAll(conf.FUSETempDir, 0777); err != nil {
return nil, err
}
c.fuseDir = conf.FUSEDir
c.fuseTempDir = conf.FUSETempDir
c.fuseSockets = map[string]socketSymlink{}
return c, nil
return configureFUSE(c, conf)
}

for _, inst := range conf.Instances {
Expand Down Expand Up @@ -383,86 +328,6 @@ func NewClient(ctx context.Context, d cloudsql.Dialer, l cloudsql.Logger, conf *
return c, nil
}

// Readdir returns a list of all active Unix sockets in addition to the README.
func (c *Client) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno) {
entries := []fuse.DirEntry{
{Name: "README", Mode: 0555 | fuse.S_IFREG},
}
var active []string
c.fuseMu.Lock()
for k := range c.fuseSockets {
active = append(active, k)
}
c.fuseMu.Unlock()

for _, a := range active {
entries = append(entries, fuse.DirEntry{
Name: a,
Mode: 0777 | syscall.S_IFSOCK,
})
}
return fs.NewListDirStream(entries), fs.OK
}

// Lookup implements the fs.NodeLookuper interface and returns an index node
// (inode) for a symlink that points to a Unix domain socket. The Unix domain
// socket is connected to the requested Cloud SQL instance. Lookup returns a
// symlink (instead of the socket itself) so that multiple callers all use the
// same Unix socket.
func (c *Client) Lookup(ctx context.Context, instance string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno) {
if instance == "README" {
return c.NewInode(ctx, &readme{}, fs.StableAttr{}), fs.OK
}

if _, err := parseConnName(instance); err != nil {
return nil, syscall.ENOENT
}

c.fuseMu.Lock()
defer c.fuseMu.Unlock()
if l, ok := c.fuseSockets[instance]; ok {
return l.symlink.EmbeddedInode(), fs.OK
}

version, err := c.dialer.EngineVersion(ctx, instance)
if err != nil {
c.logger.Errorf("could not resolve version for %q: %v", instance, err)
return nil, syscall.ENOENT
}

s, err := newSocketMount(
ctx, &Config{UnixSocket: c.fuseTempDir},
nil, InstanceConnConfig{Name: instance}, version,
)
if err != nil {
c.logger.Errorf("could not create socket for %q: %v", instance, err)
return nil, syscall.ENOENT
}

c.fuseWg.Add(1)
go func() {
defer c.fuseWg.Done()
sErr := c.serveSocketMount(ctx, s)
if sErr != nil {
c.fuseMu.Lock()
delete(c.fuseSockets, instance)
c.fuseMu.Unlock()
}
}()

// Return a symlink that points to the actual Unix socket within the
// temporary directory. For Postgres, return a symlink that points to the
// directory which holds the ".s.PGSQL.5432" Unix socket.
sl := &symlink{path: filepath.Join(c.fuseTempDir, instance)}
c.fuseSockets[instance] = socketSymlink{
socket: s,
symlink: sl,
}
return c.NewInode(ctx, sl, fs.StableAttr{
Mode: 0777 | fuse.S_IFLNK},
), fs.OK
}

// CheckConnections dials each registered instance and reports any errors that
// may have occurred.
func (c *Client) CheckConnections(ctx context.Context) error {
Expand All @@ -472,12 +337,7 @@ func (c *Client) CheckConnections(ctx context.Context) error {
mnts = c.mnts
)
if c.fuseDir != "" {
mnts = []*socketMount{}
c.fuseMu.Lock()
for _, m := range c.fuseSockets {
mnts = append(mnts, m.socket)
}
c.fuseMu.Unlock()
mnts = c.fuseMounts()
}
for _, mnt := range mnts {
wg.Add(1)
Expand Down Expand Up @@ -524,18 +384,7 @@ func (c *Client) Serve(ctx context.Context, notify func()) error {
defer cancel()

if c.fuseDir != "" {
srv, err := fs.Mount(c.fuseDir, c, &fs.Options{
MountOptions: fuse.MountOptions{AllowOther: true},
})
if err != nil {
return fmt.Errorf("FUSE mount failed: %q: %v", c.fuseDir, err)
}
c.fuseServerMu.Lock()
c.fuseServer = srv
c.fuseServerMu.Unlock()
notify()
<-ctx.Done()
return ctx.Err()
return c.serveFuse(ctx, notify)
}

exitCh := make(chan error)
Expand Down Expand Up @@ -580,21 +429,11 @@ func (m MultiErr) Error() string {
func (c *Client) Close() error {
mnts := c.mnts

c.fuseServerMu.Lock()
hasFuseServer := c.fuseServer != nil
c.fuseServerMu.Unlock()

var mErr MultiErr
if hasFuseServer {
if err := c.fuseServer.Unmount(); err != nil {
mErr = append(mErr, err)
}
mnts = []*socketMount{}
c.fuseMu.Lock()
for _, m := range c.fuseSockets {
mnts = append(mnts, m.socket)
}
c.fuseMu.Unlock()

if c.fuseDir != "" {
mErr = c.unmountFUSEMounts(mErr)
mnts = c.fuseMounts()
}

// First, close all open socket listeners to prevent additional connections.
Expand All @@ -604,8 +443,8 @@ func (c *Client) Close() error {
mErr = append(mErr, err)
}
}
if hasFuseServer {
c.fuseWg.Wait()
if c.fuseDir != "" {
c.waitForFUSEMounts()
}
// Next, close the dialer to prevent any additional refreshes.
cErr := c.dialer.Close()
Expand Down Expand Up @@ -686,14 +525,6 @@ func (c *Client) serveSocketMount(ctx context.Context, s *socketMount) error {
}
}

// socketMount is a tcp/unix socket that listens for a Cloud SQL instance.
type socketMount struct {
fs.Inode
inst string
listener net.Listener
dialOpts []cloudsqlconn.DialOption
}

func newSocketMount(ctx context.Context, conf *Config, pc *portConfig, inst InstanceConnConfig, version string) (*socketMount, error) {
var (
// network is one of "tcp" or "unix"
Expand Down
Loading

0 comments on commit e92298d

Please sign in to comment.