diff --git a/pkg/networkservice/chains/nsmgr/unix_test.go b/pkg/networkservice/chains/nsmgr/unix_test.go index 008bfd01d..89675557b 100644 --- a/pkg/networkservice/chains/nsmgr/unix_test.go +++ b/pkg/networkservice/chains/nsmgr/unix_test.go @@ -14,13 +14,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -//+build !windows +// +build linux package nsmgr_test import ( "context" - "runtime" "testing" "time" @@ -84,9 +83,6 @@ func Test_Local_NoURLUsecase(t *testing.T) { } func Test_MultiForwarderSendfd(t *testing.T) { - if runtime.GOOS != "linux" { - t.Skip("sendfd works only on linux") - } t.Cleanup(func() { goleak.VerifyNone(t) }) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/pkg/networkservice/common/mechanisms/recvfd/common.go b/pkg/networkservice/common/mechanisms/recvfd/common.go index 09415e792..00c00a81d 100644 --- a/pkg/networkservice/common/mechanisms/recvfd/common.go +++ b/pkg/networkservice/common/mechanisms/recvfd/common.go @@ -86,7 +86,7 @@ func recvFDAndSwapInodeToFile(ctx context.Context, fileMap *perConnectionFileMap return err } -func swapFileToInode(fileMap *perConnectionFileMap, parameters map[string]string, closeAllFiles bool) error { +func swapFileToInode(fileMap *perConnectionFileMap, parameters map[string]string) error { // Get the inodeURL from parameters fileURLStr, ok := parameters[common.InodeURL] if !ok { @@ -112,11 +112,10 @@ func swapFileToInode(fileMap *perConnectionFileMap, parameters map[string]string // Swap the fileURL for the inodeURL in parameters parameters[common.InodeURL] = inodeURL.String() - // If closeAllFiles == true, close any files we may have open for any other inodes // This is used to clean up files sent by MechanismPreferences that were *not* selected to be the // connection mechanism for inodeURLStr, file := range fileMap.filesByInodeURL { - if closeAllFiles || inodeURLStr != inodeURL.String() { + if inodeURLStr != inodeURL.String() { delete(fileMap.filesByInodeURL, inodeURLStr) _ = file.Close() } diff --git a/pkg/networkservice/common/mechanisms/recvfd/server.go b/pkg/networkservice/common/mechanisms/recvfd/server.go index ff578653f..805de5450 100644 --- a/pkg/networkservice/common/mechanisms/recvfd/server.go +++ b/pkg/networkservice/common/mechanisms/recvfd/server.go @@ -69,7 +69,7 @@ func (r *recvFDServer) Request(ctx context.Context, request *networkservice.Netw } // Swap back from File to Inode in the InodeURL in the Parameters - err = swapFileToInode(fileMap, conn.GetMechanism().GetParameters(), false) + err = swapFileToInode(fileMap, conn.GetMechanism().GetParameters()) if err != nil { return nil, err } @@ -78,18 +78,20 @@ func (r *recvFDServer) Request(ctx context.Context, request *networkservice.Netw } func (r *recvFDServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { + // Clean up the fileMap no matter what happens + defer r.closeFiles(conn) + // Get the grpcfd.FDRecver recv, ok := grpcfd.FromContext(ctx) if !ok { return next.Server(ctx).Close(ctx, conn) } + // Get the fileMap fileMap, _ := r.fileMaps.LoadOrStore(conn.GetId(), &perConnectionFileMap{ filesByInodeURL: make(map[string]*os.File), inodeURLbyFilename: make(map[string]*url.URL), }) - // Clean up the fileMap no matter what happens - defer r.fileMaps.Delete(conn.GetId()) // Recv the FD and Swap the Inode for a file in InodeURL in Parameters err := recvFDAndSwapInodeToFile(ctx, fileMap, conn.GetMechanism().GetParameters(), recv) @@ -104,6 +106,20 @@ func (r *recvFDServer) Close(ctx context.Context, conn *networkservice.Connectio } // Swap back from File to Inode in the InodeURL in the Parameters - err = swapFileToInode(fileMap, conn.GetMechanism().GetParameters(), true) + err = swapFileToInode(fileMap, conn.GetMechanism().GetParameters()) return &empty.Empty{}, err } + +func (r *recvFDServer) closeFiles(conn *networkservice.Connection) { + defer r.fileMaps.Delete(conn.GetId()) + + fileMap, _ := r.fileMaps.LoadOrStore(conn.GetId(), &perConnectionFileMap{ + filesByInodeURL: make(map[string]*os.File), + inodeURLbyFilename: make(map[string]*url.URL), + }) + + for inodeURLStr, file := range fileMap.filesByInodeURL { + delete(fileMap.filesByInodeURL, inodeURLStr) + _ = file.Close() + } +} diff --git a/pkg/networkservice/common/mechanisms/recvfd/server_test.go b/pkg/networkservice/common/mechanisms/recvfd/server_test.go new file mode 100644 index 000000000..39469cd4c --- /dev/null +++ b/pkg/networkservice/common/mechanisms/recvfd/server_test.go @@ -0,0 +1,216 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package recvfd_test + +import ( + "context" + "fmt" + "net/url" + "os" + "path" + "runtime" + "testing" + "time" + + "github.com/edwarnicke/grpcfd" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" + "github.com/stretchr/testify/suite" + "go.uber.org/goleak" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/recvfd" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkcontext" + "github.com/networkservicemesh/sdk/pkg/tools/grpcfdutils" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" + "github.com/networkservicemesh/sdk/pkg/tools/sandbox" +) + +type checkRecvfdTestSuite struct { + suite.Suite + + tempDir string + onFileClosedContexts []context.Context + onFileClosedCallbacks map[string]func() + + testClient networkservice.NetworkServiceClient +} + +func (s *checkRecvfdTestSuite) SetupTest() { + t := s.T() + + ctx, cancel := context.WithCancel(context.Background()) + + t.Cleanup(func() { + cancel() + goleak.VerifyNone(t) + }) + + s.tempDir = t.TempDir() + + sock, err := os.Create(path.Join(s.tempDir, "test.sock")) + s.Require().NoError(err) + + serveURL := &url.URL{Scheme: "unix", Path: sock.Name()} + + testChain := chain.NewNetworkServiceServer( + checkcontext.NewServer(t, func(t *testing.T, c context.Context) { + injectErr := grpcfdutils.InjectOnFileReceivedCallback(c, func(fileName string, file *os.File) { + runtime.SetFinalizer(file, func(file *os.File) { + onFileClosedCallback, ok := s.onFileClosedCallbacks[fileName] + if ok { + onFileClosedCallback() + } + }) + }) + + s.Require().NoError(injectErr) + }), + recvfd.NewServer()) + + startServer(ctx, s, &testChain, serveURL) + s.testClient = createClient(ctx, serveURL) +} + +func TestRecvfd(t *testing.T) { + suite.Run(t, new(checkRecvfdTestSuite)) +} + +func startServer(ctx context.Context, s *checkRecvfdTestSuite, testServerChain *networkservice.NetworkServiceServer, serveURL *url.URL) { + grpcServer := grpc.NewServer(grpc.Creds(grpcfd.TransportCredentials(insecure.NewCredentials()))) + networkservice.RegisterNetworkServiceServer(grpcServer, *testServerChain) + + errCh := grpcutils.ListenAndServe(ctx, serveURL, grpcServer) + + s.Require().Len(errCh, 0) +} + +func createClient(ctx context.Context, u *url.URL) networkservice.NetworkServiceClient { + return client.NewClient( + ctx, + client.WithClientURL(sandbox.CloneURL(u)), + client.WithDialOptions(grpc.WithTransportCredentials( + grpcfd.TransportCredentials(insecure.NewCredentials())), + ), + client.WithDialTimeout(time.Second), + client.WithoutRefresh(), + client.WithAdditionalFunctionality(sendfd.NewClient())) +} + +func createFile(s *checkRecvfdTestSuite, fileName string) (inodeURLStr string, fileClosedContext context.Context, cancelFunc func()) { + f, err := os.Create(fileName) + s.Require().NoErrorf(err, "Failed to create and open a file: %v", err) + + err = f.Close() + s.Require().NoErrorf(err, "Failed to close file: %v", err) + + fileClosedContext, cancelFunc = context.WithCancel(context.Background()) + + inodeURL, err := grpcfd.FilenameToURL(fileName) + s.Require().NoError(err) + + return inodeURL.String(), fileClosedContext, cancelFunc +} + +func (s *checkRecvfdTestSuite) TestRecvfdClosesSingleFile() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + testFileName := path.Join(s.tempDir, "TestRecvfdClosesSingleFile.test") + + inodeURLStr, fileClosedContext, cancelFunc := createFile(s, testFileName) + + s.onFileClosedCallbacks = map[string]func(){ + inodeURLStr: cancelFunc, + } + + request := &networkservice.NetworkServiceRequest{ + MechanismPreferences: []*networkservice.Mechanism{ + { + Cls: cls.LOCAL, + Type: kernel.MECHANISM, + Parameters: map[string]string{ + common.InodeURL: "file:" + testFileName, + }, + }, + }, + } + + conn, err := s.testClient.Request(ctx, request) + s.Require().NoError(err) + + _, err = s.testClient.Close(ctx, conn) + s.Require().NoError(err) + + s.Require().Eventually(func() bool { + runtime.GC() + return fileClosedContext.Err() != nil + }, time.Second, time.Millisecond*100) +} + +func (s *checkRecvfdTestSuite) TestRecvfdClosesMultipleFiles() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + const numFiles = 3 + s.onFileClosedContexts = make([]context.Context, numFiles) + s.onFileClosedCallbacks = make(map[string]func(), numFiles) + + request := &networkservice.NetworkServiceRequest{ + MechanismPreferences: make([]*networkservice.Mechanism, numFiles), + } + + var filePath string + for i := 0; i < numFiles; i++ { + filePath = path.Join(s.tempDir, fmt.Sprintf("TestRecvfdClosesMultipleFiles.test%d", i)) + + inodeURLStr, fileClosedContext, cancelFunc := createFile(s, filePath) + s.onFileClosedCallbacks[inodeURLStr] = cancelFunc + s.onFileClosedContexts[i] = fileClosedContext + + request.MechanismPreferences = append(request.MechanismPreferences, + &networkservice.Mechanism{ + Cls: cls.LOCAL, + Type: kernel.MECHANISM, + Parameters: map[string]string{ + common.InodeURL: "file:" + filePath, + }, + }) + } + + conn, err := s.testClient.Request(ctx, request) + s.Require().NoError(err) + + _, err = s.testClient.Close(ctx, conn) + s.Require().NoError(err) + + for i := range s.onFileClosedContexts { + onClosedFileCtx := s.onFileClosedContexts[i] + s.Require().Eventually(func() bool { + runtime.GC() + return onClosedFileCtx.Err() != nil + }, time.Second, time.Millisecond*100) + } +} diff --git a/pkg/registry/common/recvfd/server.go b/pkg/registry/common/recvfd/server.go index 2932a1a76..84be895e1 100644 --- a/pkg/registry/common/recvfd/server.go +++ b/pkg/registry/common/recvfd/server.go @@ -83,7 +83,7 @@ func (r *recvfdNseServer) Register(ctx context.Context, endpoint *registry.Netwo r.fileMaps.Store(endpoint.GetName(), fileMap) } // Swap back from File to Inode in the InodeURL in the Parameters - err = swapFileToInode(fileMap, returnedEndpoint, false) + err = swapFileToInode(fileMap, returnedEndpoint) if err != nil { return nil, err } @@ -98,6 +98,10 @@ func (r *recvfdNseServer) Unregister(ctx context.Context, endpoint *registry.Net if endpoint.GetName() == "" { return nil, errors.New("invalid endpoint specified") } + + // Clean up the fileMap no matter what happens + defer r.closeFiles(endpoint) + // Get the grpcfd.FDRecver recv, ok := grpcfd.FromContext(ctx) if !ok { @@ -108,8 +112,6 @@ func (r *recvfdNseServer) Unregister(ctx context.Context, endpoint *registry.Net filesByInodeURL: make(map[string]*os.File), inodeURLbyFilename: make(map[string]*url.URL), }) - // Clean up the fileMap no matter what happens - defer r.fileMaps.Delete(endpoint.GetName()) // Recv the FD and Swap the Inode for a file in InodeURL in Parameters endpoint = endpoint.Clone() @@ -126,7 +128,7 @@ func (r *recvfdNseServer) Unregister(ctx context.Context, endpoint *registry.Net // Swap back from File to Inode in the InodeURL in the Parameters endpoint = endpoint.Clone() - err = swapFileToInode(fileMap, endpoint, true) + err = swapFileToInode(fileMap, endpoint) if err != nil { return nil, err } @@ -178,7 +180,7 @@ func recvFDAndSwapInodeToUnix(ctx context.Context, fileMap *perEndpointFileMap, return err } -func swapFileToInode(fileMap *perEndpointFileMap, endpoint *registry.NetworkServiceEndpoint, closeAllFiles bool) error { +func swapFileToInode(fileMap *perEndpointFileMap, endpoint *registry.NetworkServiceEndpoint) error { // Transform string to URL for correctness checking and ease of use unixURL, err := url.Parse(endpoint.GetUrl()) if err != nil { @@ -198,11 +200,10 @@ func swapFileToInode(fileMap *perEndpointFileMap, endpoint *registry.NetworkServ // Swap the fileURL for the inodeURL in parameters endpoint.Url = inodeURL.String() - // If closeAllFiles == true, close any files we may have open for any other inodes // This is used to clean up files sent by MechanismPreferences that were *not* selected to be the // connection mechanism for inodeURLStr, file := range fileMap.filesByInodeURL { - if closeAllFiles || inodeURLStr != inodeURL.String() { + if inodeURLStr != inodeURL.String() { delete(fileMap.filesByInodeURL, inodeURLStr) _ = file.Close() } @@ -210,3 +211,17 @@ func swapFileToInode(fileMap *perEndpointFileMap, endpoint *registry.NetworkServ }) return nil } + +func (r *recvfdNseServer) closeFiles(endpoint *registry.NetworkServiceEndpoint) { + defer r.fileMaps.Delete(endpoint.GetName()) + + fileMap, _ := r.fileMaps.LoadOrStore(endpoint.GetName(), &perEndpointFileMap{ + filesByInodeURL: make(map[string]*os.File), + inodeURLbyFilename: make(map[string]*url.URL), + }) + + for inodeURLStr, file := range fileMap.filesByInodeURL { + delete(fileMap.filesByInodeURL, inodeURLStr) + _ = file.Close() + } +} diff --git a/pkg/registry/common/recvfd/server_test.go b/pkg/registry/common/recvfd/server_test.go new file mode 100644 index 000000000..f399024a1 --- /dev/null +++ b/pkg/registry/common/recvfd/server_test.go @@ -0,0 +1,147 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package recvfd_test + +import ( + "context" + "net/url" + "os" + "path" + "runtime" + "testing" + "time" + + "github.com/edwarnicke/grpcfd" + "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + registryserver "github.com/networkservicemesh/sdk/pkg/registry" + "github.com/networkservicemesh/sdk/pkg/registry/common/connect" + "github.com/networkservicemesh/sdk/pkg/registry/common/memory" + registryrecvfd "github.com/networkservicemesh/sdk/pkg/registry/common/recvfd" + "github.com/networkservicemesh/sdk/pkg/registry/common/refresh" + "github.com/networkservicemesh/sdk/pkg/registry/common/sendfd" + registryserialize "github.com/networkservicemesh/sdk/pkg/registry/common/serialize" + registrychain "github.com/networkservicemesh/sdk/pkg/registry/core/chain" + "github.com/networkservicemesh/sdk/pkg/registry/utils/checks/checkcontext" + "github.com/networkservicemesh/sdk/pkg/tools/grpcfdutils" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" + "github.com/networkservicemesh/sdk/pkg/tools/sandbox" + "github.com/networkservicemesh/sdk/pkg/tools/token" +) + +func getFileInfo(fileName string, t *testing.T) (inodeURLStr string, fileClosedContext context.Context, cancelFunc func()) { + fileClosedContext, cancelFunc = context.WithCancel(context.Background()) + + inodeURL, err := grpcfd.FilenameToURL(fileName) + require.NoError(t, err) + + return inodeURL.String(), fileClosedContext, cancelFunc +} + +func startServer(ctx context.Context, t *testing.T, testRegistry registryserver.Registry, serveURL *url.URL) { + var grpcServer = grpc.NewServer(grpc.Creds(grpcfd.TransportCredentials(insecure.NewCredentials()))) + + testRegistry.Register(grpcServer) + + var errCh = grpcutils.ListenAndServe(ctx, serveURL, grpcServer) + require.Len(t, errCh, 0) +} + +func TestNseRecvfdServerClosesFile(t *testing.T) { + var ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + var nsRegistry = registrychain.NewNetworkServiceRegistryServer( + registryserialize.NewNetworkServiceRegistryServer(), + memory.NewNetworkServiceRegistryServer(), + ) + + var onFileClosedCallbacks = make(map[string]func()) + + var nseRegistry = registrychain.NewNetworkServiceEndpointRegistryServer( + registryserialize.NewNetworkServiceEndpointRegistryServer(), + checkcontext.NewNSEServer(t, func(t *testing.T, c context.Context) { + err := grpcfdutils.InjectOnFileReceivedCallback(c, func(inodeURLStr string, file *os.File) { + runtime.SetFinalizer(file, func(file *os.File) { + onFileClosedCallback, ok := onFileClosedCallbacks[inodeURLStr] + if ok { + onFileClosedCallback() + } + }) + }) + + require.NoError(t, err) + }), + registryrecvfd.NewNetworkServiceEndpointRegistryServer(), + memory.NewNetworkServiceEndpointRegistryServer(), + ) + + var dir = t.TempDir() + var testFileName = path.Join(dir, t.Name()+".sock") + var regURL = &url.URL{Scheme: "unix", Path: testFileName} + + var dialOptions = []grpc.DialOption{ + grpc.WithTransportCredentials( + grpcfd.TransportCredentials(insecure.NewCredentials()), + ), + grpc.WithDefaultCallOptions( + grpc.PerRPCCredentials(token.NewPerRPCCredentials(sandbox.GenerateTestToken)), + ), + grpcfd.WithChainStreamInterceptor(), + grpcfd.WithChainUnaryInterceptor(), + sandbox.WithInsecureRPCCredentials(), + sandbox.WithInsecureStreamRPCCredentials(), + } + + var nseClient = registrychain.NewNetworkServiceEndpointRegistryClient( + registryserialize.NewNetworkServiceEndpointRegistryClient(), + refresh.NewNetworkServiceEndpointRegistryClient(ctx), + connect.NewNetworkServiceEndpointRegistryClient(ctx, regURL, + connect.WithNSEAdditionalFunctionality( + sendfd.NewNetworkServiceEndpointRegistryClient()), + connect.WithDialOptions(dialOptions...), + )) + + startServer(ctx, t, registryserver.NewServer(nsRegistry, nseRegistry), regURL) + + // setting onRecvFile after starting the server as we're re-creating a socket in grpcutils.ListenAndServe + // in registry case we're passing a socket + inodeURLStr, fileClosedContext, cancelFunc := getFileInfo(testFileName, t) + onFileClosedCallbacks[inodeURLStr] = cancelFunc + + var testEndpoint = ®istry.NetworkServiceEndpoint{ + Name: "test-endpoint", + NetworkServiceNames: []string{"test"}, + Url: regURL.String(), + } + + _, err := nseClient.Register(ctx, testEndpoint.Clone()) + require.NoError(t, err) + + _, err = nseClient.Unregister(ctx, testEndpoint.Clone()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + runtime.GC() + return fileClosedContext.Err() != nil + }, time.Second, time.Millisecond*100) +} diff --git a/pkg/tools/grpcfdutils/transceiver.go b/pkg/tools/grpcfdutils/transceiver.go new file mode 100644 index 000000000..f0be2f684 --- /dev/null +++ b/pkg/tools/grpcfdutils/transceiver.go @@ -0,0 +1,78 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +// Package grpcfdutils provides utilities for grpcfd library +package grpcfdutils + +import ( + "context" + "net" + "os" + + "github.com/edwarnicke/grpcfd" + "github.com/pkg/errors" + "google.golang.org/grpc/peer" +) + +// notifiableFDTransceiver - grpcfd.Transceiver wrapper checking that received FDs are closed +// onRecvFile - callback receiving inodeURL string and a file received by grpcfd +type notifiableFDTransceiver struct { + grpcfd.FDTransceiver + net.Addr + + onRecvFile func(string, *os.File) +} + +// RecvFileByURL - wrapper of grpcfd.FDRecver method invoking callback when a file is received by grpcfd +func (w *notifiableFDTransceiver) RecvFileByURL(urlStr string) (<-chan *os.File, error) { + recv, err := w.FDTransceiver.RecvFileByURL(urlStr) + if err != nil { + return nil, err + } + + var fileCh = make(chan *os.File) + go func() { + for f := range recv { + w.onRecvFile(urlStr, f) + fileCh <- f + } + }() + + return fileCh, nil +} + +// InjectOnFileReceivedCallback - injects callback into grpcfd.FDTransceiver that will be invoked on file received +func InjectOnFileReceivedCallback(ctx context.Context, callback func(string, *os.File)) error { + p, ok := peer.FromContext(ctx) + if !ok { + return errors.Errorf("No peer in context") + } + + transceiver, ok := p.Addr.(grpcfd.FDTransceiver) + if !ok { + return errors.Errorf("No grpcfd transceiver in context") + } + + p.Addr = ¬ifiableFDTransceiver{ + FDTransceiver: transceiver, + Addr: p.Addr, + onRecvFile: callback, + } + + return nil +}