diff --git a/config/config_test.go b/config/config_test.go index cd462d1f2b6..bed91e2bc8e 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -120,32 +120,6 @@ func TestConfig3(t *testing.T) { test.That(t, cfg.Remotes[0].ReconnectInterval, test.ShouldEqual, 3*time.Second) } -func TestCreateCloudRequest(t *testing.T) { - cfg := config.Cloud{ - ID: "a", - Secret: "b", - Path: "c", - } - - version := "test-version" - gitRevision := "test-git-revision" - config.Version = version - config.GitRevision = gitRevision - - r, err := config.CreateCloudRequest(context.Background(), &cfg) - test.That(t, err, test.ShouldBeNil) - - test.That(t, r.Header.Get("Secret"), test.ShouldEqual, cfg.Secret) - test.That(t, r.URL.String(), test.ShouldEqual, "c?id=a") - - userInfo := map[string]interface{}{} - userInfoJSON := r.Header.Get("User-Info") - json.Unmarshal([]byte(userInfoJSON), &userInfo) - - test.That(t, userInfo["version"], test.ShouldEqual, version) - test.That(t, userInfo["gitRevision"], test.ShouldEqual, gitRevision) -} - func TestConfigEnsure(t *testing.T) { var emptyConfig config.Config test.That(t, emptyConfig.Ensure(false), test.ShouldBeNil) diff --git a/config/reader.go b/config/reader.go index 9c1eada6452..ca94bc1e551 100644 --- a/config/reader.go +++ b/config/reader.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "net/http" "net/url" "os" "path/filepath" @@ -206,48 +205,6 @@ func findServiceMapConverter(svcType resource.Subtype, model resource.Model) Att return nil } -const ( - cloudConfigSecretField = "Secret" - cloudConfigUserInfoField = "User-Info" - cloudConfigUserInfoHostField = "host" - cloudConfigUserInfoOSField = "os" - cloudConfigUserInfoLocalIPsField = "ips" - cloudConfigVersionField = "version" - cloudConfigGitRevisionField = "gitRevision" -) - -// CreateCloudRequest makes a request to fetch the robot config -// from a cloud endpoint. -func CreateCloudRequest(ctx context.Context, cloudCfg *Cloud) (*http.Request, error) { - url := fmt.Sprintf("%s?id=%s", cloudCfg.Path, cloudCfg.ID) - - r, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return nil, errors.Wrapf(err, "error creating request for %s", url) - } - r.Header.Set(cloudConfigSecretField, cloudCfg.Secret) - - agentInfo, err := getAgentInfo() - if err != nil { - return nil, err - } - - userInfo := map[string]interface{}{} - userInfo[cloudConfigUserInfoHostField] = agentInfo.Host - userInfo[cloudConfigUserInfoOSField] = agentInfo.Os - userInfo[cloudConfigUserInfoLocalIPsField] = agentInfo.Ips - userInfo[cloudConfigVersionField] = agentInfo.Version - userInfo[cloudConfigGitRevisionField] = agentInfo.GitRevision - - userInfoBytes, err := json.Marshal(userInfo) - if err != nil { - return nil, err - } - r.Header.Set(cloudConfigUserInfoField, string(userInfoBytes)) - - return r, nil -} - func getAgentInfo() (*apppb.AgentInfo, error) { hostname, err := os.Hostname() if err != nil { @@ -268,20 +225,6 @@ func getAgentInfo() (*apppb.AgentInfo, error) { }, nil } -// createCloudCertificateRequest makes a request to fetch the robot's TLS -// certificate from a cloud endpoint. -func createCloudCertificateRequest(ctx context.Context, cloudCfg *Cloud) (*http.Request, error) { - url := fmt.Sprintf("%s?id=%s&cert=true", cloudCfg.Path, cloudCfg.ID) - - r, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return nil, errors.Wrapf(err, "error creating request for %s", url) - } - r.Header.Set(cloudConfigSecretField, cloudCfg.Secret) - - return r, nil -} - var viamDotDir = filepath.Join(os.Getenv("HOME"), ".viam") func getCloudCacheFilePath(id string) string { @@ -329,49 +272,6 @@ func clearCache(id string) { }) } -// readCertificateDataFromCloud returns the certificate from the app. It returns it as properties of a new Cloud config. -// The argument `cloudConfigFromDisk` represents the Cloud config from disk and only the Path parameters are used to -// generate the url. This is different from the Cloud config returned from the HTTP or gRPC API which do not have it. -// -// TODO(RSDK-539): The TLS certificate data should not be part of the Cloud portion of the config. -func readCertificateDataFromCloud(ctx context.Context, signalingInsecure bool, cloudConfigFromDisk *Cloud) (*Cloud, error) { - certReq, err := createCloudCertificateRequest(ctx, cloudConfigFromDisk) - if err != nil { - return nil, err - } - - var client http.Client - defer client.CloseIdleConnections() - resp, err := client.Do(certReq) - if err != nil { - return nil, err - } - defer func() { - utils.UncheckedError(resp.Body.Close()) - }() - - dec := json.NewDecoder(resp.Body) - var certData Cloud - if err := dec.Decode(&certData); err != nil { - return nil, errors.Wrap(err, "error decoding certificate data from cloud; try again later") - } - - if !signalingInsecure { - if certData.TLSCertificate == "" { - return nil, errors.New("no TLS certificate yet from cloud; try again later") - } - if certData.TLSPrivateKey == "" { - return nil, errors.New("no TLS private key yet from cloud; try again later") - } - } - - // TODO(RSDK-539): we might want to use an internal type here. The gRPC api will not return a Cloud json struct. - return &Cloud{ - TLSCertificate: certData.TLSCertificate, - TLSPrivateKey: certData.TLSPrivateKey, - }, nil -} - func readCertificateDataFromCloudGRPC(ctx context.Context, signalingInsecure bool, cloudConfigFromDisk *Cloud, @@ -506,16 +406,9 @@ func readFromCloud( logger.Debug("reading tlsCertificate from the cloud") // Use the SignalingInsecure from the Cloud config returned from the app not the initial config. - var certData *Cloud - if originalCfg.Cloud.AppAddress == "" { - certData, err = readCertificateDataFromCloud(ctx, cfg.Cloud.SignalingInsecure, cloudCfg) - } else { - certData, err = readCertificateDataFromCloudGRPC(ctx, cfg.Cloud.SignalingInsecure, cloudCfg, logger) - } - + certData, err := readCertificateDataFromCloudGRPC(ctx, cfg.Cloud.SignalingInsecure, cloudCfg, logger) if err != nil { - var urlErr *url.Error - if !errors.Is(err, context.DeadlineExceeded) && (!errors.As(err, &urlErr) || urlErr.Temporary()) { + if !errors.Is(err, context.DeadlineExceeded) { return nil, err } if tlsCertificate == "" || tlsPrivateKey == "" { @@ -706,19 +599,11 @@ func processConfig(unprocessedConfig *Config, fromCloud bool) (*Config, error) { return cfg, nil } -// getFromCloudOrCache returns the config from either the legacy HTTP endpoint or gRPC endpoint depending if the original config -// has the AppAddress set. If failures during cloud lookup fallback to the local cache if the error indicates it should. +// getFromCloudOrCache returns the config from the gRPC endpoint. If failures during cloud lookup fallback to the +// local cache if the error indicates it should. func getFromCloudOrCache(ctx context.Context, cloudCfg *Cloud, shouldReadFromCache bool, logger golog.Logger) (*Config, bool, error) { var cached bool - var cfg *Config - var errorShouldCheckCache bool - var err error - if cloudCfg.AppAddress == "" { - cfg, errorShouldCheckCache, err = getFromCloudHTTP(ctx, cloudCfg) - } else { - cfg, errorShouldCheckCache, err = getFromCloudGRPC(ctx, cloudCfg, logger) - } - + cfg, errorShouldCheckCache, err := getFromCloudGRPC(ctx, cloudCfg, logger) if err != nil { if shouldReadFromCache && errorShouldCheckCache { logger.Warnw("failed to read config from cloud, checking cache", "error", err) @@ -742,53 +627,6 @@ func getFromCloudOrCache(ctx context.Context, cloudCfg *Cloud, shouldReadFromCac return cfg, cached, nil } -// getFromCloud actually does the fetching of the robot config and parses to an unprocessed Config struct. -func getFromCloudHTTP(ctx context.Context, cloudCfg *Cloud) (*Config, bool, error) { - shouldCheckCacheOnFailure := false - cloudReq, err := CreateCloudRequest(ctx, cloudCfg) - if err != nil { - return nil, false, err - } - - unprocessedConfig := &Config{ - ConfigFilePath: "", - } - - var client http.Client - defer client.CloseIdleConnections() - resp, err := client.Do(cloudReq) - // Try to load from the cache - if err != nil { - var urlErr *url.Error - if !errors.Is(err, context.DeadlineExceeded) && (!errors.As(err, &urlErr) || urlErr.Temporary()) { - return nil, shouldCheckCacheOnFailure, err - } - shouldCheckCacheOnFailure = true - return nil, shouldCheckCacheOnFailure, err - } - - defer func() { - utils.UncheckedError(resp.Body.Close()) - }() - - rd, err := io.ReadAll(resp.Body) - if err != nil { - return nil, shouldCheckCacheOnFailure, err - } - - if resp.StatusCode != http.StatusOK { - if len(rd) != 0 { - return nil, shouldCheckCacheOnFailure, errors.Errorf("unexpected status %d: %s", resp.StatusCode, string(rd)) - } - return nil, shouldCheckCacheOnFailure, errors.Errorf("unexpected status %d", resp.StatusCode) - } - - if err := json.Unmarshal(rd, unprocessedConfig); err != nil { - return nil, shouldCheckCacheOnFailure, errors.Wrap(err, "cannot parse cloud config") - } - return unprocessedConfig, shouldCheckCacheOnFailure, nil -} - // getFromCloudGRPC actually does the fetching of the robot config from the gRPC endpoint. func getFromCloudGRPC(ctx context.Context, cloudCfg *Cloud, logger golog.Logger) (*Config, bool, error) { shouldCheckCacheOnFailure := true diff --git a/config/reader_test.go b/config/reader_test.go index d5512f4585b..24e1f9acc2c 100644 --- a/config/reader_test.go +++ b/config/reader_test.go @@ -27,6 +27,7 @@ func TestStoreToCache(t *testing.T) { LocalFQDN: "localFqdn", TLSCertificate: "cert", TLSPrivateKey: "key", + AppAddress: "https://app.viam.dev:443", } cfg.Cloud = cloud @@ -35,7 +36,7 @@ func TestStoreToCache(t *testing.T) { test.That(t, err, test.ShouldBeNil) // read config from cloud, confirm consistency - cloudCfg, err := readFromCloud(ctx, cfg, nil, true, true, logger) + cloudCfg, err := readFromCloud(ctx, cfg, nil, true, false, logger) test.That(t, err, test.ShouldBeNil) test.That(t, cloudCfg, test.ShouldResemble, cfg) @@ -44,7 +45,7 @@ func TestStoreToCache(t *testing.T) { cfg.Remotes = append(cfg.Remotes, newRemote) // read config from cloud again, confirm that the cached config differs from cfg - cloudCfg2, err := readFromCloud(ctx, cfg, nil, true, true, logger) + cloudCfg2, err := readFromCloud(ctx, cfg, nil, true, false, logger) test.That(t, err, test.ShouldBeNil) test.That(t, cloudCfg2, test.ShouldNotResemble, cfg) @@ -53,7 +54,7 @@ func TestStoreToCache(t *testing.T) { test.That(t, err, test.ShouldBeNil) // read updated cloud config, confirm that it now matches our updated cfg - cloudCfg3, err := readFromCloud(ctx, cfg, nil, true, true, logger) + cloudCfg3, err := readFromCloud(ctx, cfg, nil, true, false, logger) test.That(t, err, test.ShouldBeNil) test.That(t, cloudCfg3, test.ShouldResemble, cfg) } diff --git a/config/testutils/fake_cloud.go b/config/testutils/fake_cloud.go new file mode 100644 index 00000000000..1b78ce2d1c7 --- /dev/null +++ b/config/testutils/fake_cloud.go @@ -0,0 +1,204 @@ +// Package testutils helpers for testing the config retrievial. +package testutils + +import ( + "context" + "errors" + "net" + "net/http" + "sync" + + "github.com/edaniels/golog" + pb "go.viam.com/api/app/v1" + "go.viam.com/utils" + "go.viam.com/utils/rpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + rutils "go.viam.com/rdk/utils" +) + +// FakeCredentialPayLoad the hardcoded payload for all devices. +const FakeCredentialPayLoad = "some-secret" + +// FakeCloudServer fake implementation of the Viam Cloud RobotService. +type FakeCloudServer struct { + pb.UnimplementedRobotServiceServer + + rpcServer rpc.Server + listener net.Listener + exitWg sync.WaitGroup + + deviceConfigs map[string]*configAndCerts + + failOnConfigAndCerts bool + + mu sync.Mutex +} + +type configAndCerts struct { + cfg *pb.RobotConfig + certs *pb.CertificateResponse +} + +// NewFakeCloudServer creates and starts a new grpc server for the Viam Cloud. +func NewFakeCloudServer(ctx context.Context, logger golog.Logger) (*FakeCloudServer, error) { + listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: 0}) + if err != nil { + return nil, err + } + + server := &FakeCloudServer{ + listener: listener, + deviceConfigs: map[string]*configAndCerts{}, + } + + server.rpcServer, err = rpc.NewServer(logger, + rpc.WithDisableMulticastDNS(), + rpc.WithAuthHandler(rutils.CredentialsTypeRobotSecret, rpc.MakeFuncAuthHandler( + server.robotSecretAuthenticate, + server.robotSecretVerifyEntity, + )), + rpc.WithWebRTCServerOptions(rpc.WebRTCServerOptions{Enable: false})) + if err != nil { + return nil, err + } + + err = server.rpcServer.RegisterServiceServer( + ctx, + &pb.RobotService_ServiceDesc, + server, + pb.RegisterRobotServiceHandlerFromEndpoint, + ) + if err != nil { + return nil, err + } + + server.exitWg.Add(1) + utils.PanicCapturingGo(func() { + defer server.exitWg.Done() + + err := server.rpcServer.Serve(server.listener) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Warnf("Error shutting down grpc server", "error", err) + } + }) + + return server, nil +} + +// FailOnConfigAndCerts if `failure` is true the server will return an Internal error on +// all certficate and config requests. +func (s *FakeCloudServer) FailOnConfigAndCerts(failure bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.failOnConfigAndCerts = failure +} + +// Addr returns the listeners address. +func (s *FakeCloudServer) Addr() net.Addr { + return s.listener.Addr() +} + +// Shutdown will stop the server. +func (s *FakeCloudServer) Shutdown() error { + s.mu.Lock() + defer s.mu.Unlock() + + err := s.rpcServer.Stop() + if err != nil { + return err + } + + s.exitWg.Wait() + + return nil +} + +// Clear resets the fake servers state, does not restart the server. +func (s *FakeCloudServer) Clear() { + s.mu.Lock() + defer s.mu.Unlock() + + s.deviceConfigs = map[string]*configAndCerts{} +} + +// StoreDeviceConfig store config and cert data for the device id. +func (s *FakeCloudServer) StoreDeviceConfig(id string, cfg *pb.RobotConfig, cert *pb.CertificateResponse) { + s.mu.Lock() + defer s.mu.Unlock() + + s.deviceConfigs[id] = &configAndCerts{cfg: cfg, certs: cert} +} + +// Config impl. +func (s *FakeCloudServer) Config(ctx context.Context, req *pb.ConfigRequest) (*pb.ConfigResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.failOnConfigAndCerts { + return nil, status.Error(codes.Internal, "oops failure") + } + + d, ok := s.deviceConfigs[req.Id] + if !ok { + return nil, status.Error(codes.NotFound, "config for device not found") + } + + return &pb.ConfigResponse{Config: d.cfg}, nil +} + +// Certificate impl. +func (s *FakeCloudServer) Certificate(ctx context.Context, req *pb.CertificateRequest) (*pb.CertificateResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.failOnConfigAndCerts { + return nil, status.Error(codes.Internal, "oops failure") + } + + d, ok := s.deviceConfigs[req.Id] + if !ok { + return nil, status.Error(codes.NotFound, "cert for device not found") + } + + return d.certs, nil +} + +// Log impl. +func (s *FakeCloudServer) Log(ctx context.Context, req *pb.LogRequest) (*pb.LogResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Log not implemented") +} + +// NeedsRestart impl. +func (s *FakeCloudServer) NeedsRestart(ctx context.Context, req *pb.NeedsRestartRequest) (*pb.NeedsRestartResponse, error) { + return nil, status.Error(codes.Unimplemented, "method NeedsRestart not implemented") +} + +func (s *FakeCloudServer) robotSecretAuthenticate(ctx context.Context, entity, payload string) (map[string]string, error) { + s.mu.Lock() + defer s.mu.Unlock() + + _, ok := s.deviceConfigs[entity] + if !ok { + return nil, errors.New("failed to auth device not found in fake server") + } + + if payload != FakeCredentialPayLoad { + return nil, errors.New("failed to auth device payload does not match") + } + + return map[string]string{}, nil +} + +func (s *FakeCloudServer) robotSecretVerifyEntity(ctx context.Context, entity string) (interface{}, error) { + s.mu.Lock() + defer s.mu.Unlock() + + _, ok := s.deviceConfigs[entity] + if !ok { + return nil, errors.New("failed to verify entity in fake server") + } + + return map[string]string{}, nil +} diff --git a/config/watcher_test.go b/config/watcher_test.go index fff822794d4..8e3b59064d0 100644 --- a/config/watcher_test.go +++ b/config/watcher_test.go @@ -5,21 +5,20 @@ import ( "context" "encoding/json" "fmt" - "net/http" "os" - "sync" "testing" "time" "github.com/edaniels/golog" "go.mongodb.org/mongo-driver/bson/primitive" + pb "go.viam.com/api/app/v1" "go.viam.com/test" "go.viam.com/utils" "go.viam.com/utils/pexec" - "go.viam.com/utils/testutils" "go.viam.com/rdk/components/arm" "go.viam.com/rdk/config" + "go.viam.com/rdk/config/testutils" "go.viam.com/rdk/resource" ) @@ -184,83 +183,61 @@ func TestNewWatcherFile(t *testing.T) { func TestNewWatcherCloud(t *testing.T) { logger := golog.NewTestLogger(t) - listener := testutils.ReserveRandomListener(t) - httpServer := &http.Server{ - ReadTimeout: 10 * time.Second, - MaxHeaderBytes: 1 << 20, - } - certsToReturn := config.Cloud{ TLSCertificate: "hello", TLSPrivateKey: "world", } - cloudID := primitive.NewObjectID().Hex() + deviceID := primitive.NewObjectID().Hex() - var confToReturn config.Config - var confErr bool - var confMu sync.Mutex - var certsOnce bool - httpServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if err := r.ParseForm(); err != nil { - panic(err) - } - if len(r.Form["id"]) == 0 || r.Form["id"][0] != cloudID { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("bad id")) - return - } - if r.Header.Get("secret") != "my_secret" { - w.WriteHeader(http.StatusForbidden) - w.Write([]byte("bad secret")) - return - } + fakeServer, err := testutils.NewFakeCloudServer(context.Background(), logger) + test.That(t, err, test.ShouldBeNil) + defer func() { + test.That(t, fakeServer.Shutdown(), test.ShouldBeNil) + }() - if len(r.Form["cert"]) != 0 && !certsOnce { - certsOnce = true - md, err := json.Marshal(&certsToReturn) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(fmt.Sprintf("error marshaling certs: %s", err))) - return - } - w.Write(md) - return - } + storeConfigInServer := func(cfg config.Config) { + cloudConfProto, err := config.CloudConfigToProto(cfg.Cloud) + test.That(t, err, test.ShouldBeNil) - confMu.Lock() - if confErr { - confMu.Unlock() - w.WriteHeader(http.StatusInternalServerError) - return - } - confErr = true - - md, err := json.Marshal(&confToReturn) - confMu.Unlock() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(fmt.Sprintf("error marshaling conf: %s", err))) - return + componentConfProto, err := config.ComponentConfigToProto(&cfg.Components[0]) + test.That(t, err, test.ShouldBeNil) + + proccessConfProto, err := config.ProcessConfigToProto(&cfg.Processes[0]) + test.That(t, err, test.ShouldBeNil) + + networkConfProto, err := config.NetworkConfigToProto(&cfg.Network) + test.That(t, err, test.ShouldBeNil) + + protoConfig := &pb.RobotConfig{ + Cloud: cloudConfProto, + Components: []*pb.ComponentConfig{componentConfProto}, + Processes: []*pb.ProcessConfig{proccessConfProto}, + Network: networkConfProto, } - w.Write(md) - }) - serveDone := make(chan struct{}) - go func() { - defer close(serveDone) - httpServer.Serve(listener) - }() - cloudConf := &config.Cloud{ - Path: fmt.Sprintf("http://%s", listener.Addr().String()), - ID: cloudID, - Secret: "my_secret", - FQDN: "woo", - LocalFQDN: "yee", - RefreshInterval: time.Second, + fakeServer.Clear() + fakeServer.StoreDeviceConfig(deviceID, protoConfig, &pb.CertificateResponse{ + TlsCertificate: certsToReturn.TLSCertificate, + TlsPrivateKey: certsToReturn.TLSPrivateKey, + }) } + + var confToReturn config.Config + newCloudConf := func() *config.Cloud { + return &config.Cloud{ + AppAddress: fmt.Sprintf("http://%s", fakeServer.Addr().String()), + ID: deviceID, + Secret: testutils.FakeCredentialPayLoad, + FQDN: "woo", + LocalFQDN: "yee", + RefreshInterval: time.Second, + LocationSecrets: []config.LocationSecret{{ID: "1", Secret: "secret"}}, + } + } + confToReturn = config.Config{ - Cloud: cloudConf, + Cloud: newCloudConf(), Components: []config.Component{ { Namespace: resource.ResourceNamespaceRDK, @@ -287,18 +264,20 @@ func TestNewWatcherCloud(t *testing.T) { }}, } + storeConfigInServer(confToReturn) + + watcher, err := config.NewWatcher(context.Background(), &config.Config{Cloud: newCloudConf()}, logger) + test.That(t, err, test.ShouldBeNil) + confToExpect := confToReturn confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey - watcher, err := config.NewWatcher(context.Background(), &config.Config{Cloud: cloudConf}, logger) - test.That(t, err, test.ShouldBeNil) - newConf := <-watcher.Config() test.That(t, newConf, test.ShouldResemble, &confToExpect) confToReturn = config.Config{ - Cloud: cloudConf, + Cloud: newCloudConf(), Components: []config.Component{ { Namespace: resource.ResourceNamespaceRDK, @@ -324,17 +303,20 @@ func TestNewWatcherCloud(t *testing.T) { }, }}, } - confMu.Lock() - confErr = false + + // update the config with the newer config + storeConfigInServer(confToReturn) confToExpect = confToReturn confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey - confMu.Unlock() newConf = <-watcher.Config() test.That(t, newConf, test.ShouldResemble, &confToExpect) + // fake server will start returning 5xx on requests. + // no new configs should be emitted to channel until the fake server starts returning again + fakeServer.FailOnConfigAndCerts(true) timer := time.NewTimer(5 * time.Second) defer timer.Stop() select { @@ -342,9 +324,13 @@ func TestNewWatcherCloud(t *testing.T) { test.That(t, c, test.ShouldBeNil) case <-timer.C: } + fakeServer.FailOnConfigAndCerts(false) + + newConf = <-watcher.Config() + test.That(t, newConf, test.ShouldResemble, &confToExpect) confToReturn = config.Config{ - Cloud: cloudConf, + Cloud: newCloudConf(), Components: []config.Component{ { Namespace: resource.ResourceNamespaceRDK, @@ -370,18 +356,15 @@ func TestNewWatcherCloud(t *testing.T) { }, }}, } - confMu.Lock() - confErr = false + + storeConfigInServer(confToReturn) confToExpect = confToReturn confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey - confMu.Unlock() newConf = <-watcher.Config() test.That(t, newConf, test.ShouldResemble, &confToExpect) test.That(t, utils.TryClose(context.Background(), watcher), test.ShouldBeNil) - test.That(t, httpServer.Shutdown(context.Background()), test.ShouldBeNil) - <-serveDone } diff --git a/web/server/net_logger.go b/web/server/net_logger.go index e19c94eeba6..515ce7d7f73 100644 --- a/web/server/net_logger.go +++ b/web/server/net_logger.go @@ -2,11 +2,8 @@ package server import ( - "bytes" "context" - "encoding/json" "fmt" - "net/http" "os" "sync" "time" @@ -64,16 +61,9 @@ func newNetLogger(config *config.Cloud, loggerWithoutNet golog.Logger, logLevel return nil, err } - var logWriter remoteLogWriter - if config.AppAddress == "" { - logWriter = &remoteLogWriterHTTP{ - cfg: config, - } - } else { - logWriter = &remoteLogWriterGRPC{ - loggerWithoutNet: loggerWithoutNet, - cfg: config, - } + logWriter := &remoteLogWriterGRPC{ + loggerWithoutNet: loggerWithoutNet, + cfg: config, } cancelCtx, cancel := context.WithCancel(context.Background()) @@ -307,71 +297,6 @@ type remoteLogWriter interface { close() } -type remoteLogWriterHTTP struct { - cfg *config.Cloud - client http.Client -} - -func (w *remoteLogWriterHTTP) write(logs []*apppb.LogEntry) error { - for _, log := range logs { - err := w.writeToServer(log) - if err != nil { - return err - } - } - return nil -} - -func (w *remoteLogWriterHTTP) writeToServer(log *apppb.LogEntry) error { - level, err := zapcore.ParseLevel(log.Level) - if err != nil { - return errors.Wrap(err, "error creating log request") - } - - e := zapcore.Entry{ - Level: level, - LoggerName: log.LoggerName, - Message: log.Message, - Stack: log.Stack, - Time: log.Time.AsTime(), - } - - x := map[string]interface{}{ - "id": w.cfg.ID, - "host": log.Host, - "log": e, - "fields": log.Fields, - } - - js, err := json.Marshal(x) - if err != nil { - return err - } - - // we specifically don't use a parented cancellable context here so we can make sure we finish writing but - // we will only give it up to 5 seconds to do so in case we are trying to shutdown. - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - r, err := http.NewRequestWithContext(ctx, http.MethodPost, w.cfg.LogPath, bytes.NewReader(js)) - if err != nil { - return errors.Wrap(err, "error creating log request") - } - r.Header.Set("Secret", w.cfg.Secret) - - resp, err := w.client.Do(r) - if err != nil { - return err - } - defer func() { - utils.UncheckedError(resp.Body.Close()) - }() - return nil -} - -func (w *remoteLogWriterHTTP) close() { - w.client.CloseIdleConnections() -} - type remoteLogWriterGRPC struct { cfg *config.Cloud service apppb.RobotServiceClient diff --git a/web/server/net_logger_test.go b/web/server/net_logger_test.go index 682a40bd806..4289c0a5f54 100644 --- a/web/server/net_logger_test.go +++ b/web/server/net_logger_test.go @@ -21,29 +21,15 @@ func TestNewNetLogger(t *testing.T) { logger := golog.NewTestLogger(t) level := zap.NewAtomicLevelAt(zap.InfoLevel) - t.Run("with AppConfig should use GRPC", func(t *testing.T) { - nl, err := newNetLogger(&config.Cloud{ - AppAddress: "http://localhost:8080", - ID: "abc-123", - }, logger, level) - test.That(t, err, test.ShouldBeNil) - - _, ok := nl.remoteWriter.(*remoteLogWriterGRPC) - test.That(t, ok, test.ShouldBeTrue) - nl.cancel() - }) - - t.Run("with AppConfig should use HTTP", func(t *testing.T) { - nl, err := newNetLogger(&config.Cloud{ - LogPath: "http://localhost:8080/logs", - ID: "abc-123", - }, logger, level) - test.That(t, err, test.ShouldBeNil) + nl, err := newNetLogger(&config.Cloud{ + AppAddress: "http://localhost:8080", + ID: "abc-123", + }, logger, level) + test.That(t, err, test.ShouldBeNil) - _, ok := nl.remoteWriter.(*remoteLogWriterHTTP) - test.That(t, ok, test.ShouldBeTrue) - nl.cancel() - }) + _, ok := nl.remoteWriter.(*remoteLogWriterGRPC) + test.That(t, ok, test.ShouldBeTrue) + nl.cancel() } func TestNewNetBatchWrites(t *testing.T) { diff --git a/web/server/restart_checker.go b/web/server/restart_checker.go index 6b36c10fc48..d855e3bb4c0 100644 --- a/web/server/restart_checker.go +++ b/web/server/restart_checker.go @@ -2,14 +2,10 @@ package server import ( - "bytes" "context" - "io" - "net/http" "time" "github.com/edaniels/golog" - "github.com/pkg/errors" apppb "go.viam.com/api/app/v1" "go.viam.com/utils" "go.viam.com/utils/rpc" @@ -27,45 +23,6 @@ type needsRestartChecker interface { close() } -type needsRestartCheckerHTTP struct { - cfg *config.Cloud - restartInterval time.Duration - logger golog.Logger - client http.Client -} - -func (c *needsRestartCheckerHTTP) close() { - c.client.CloseIdleConnections() -} - -func (c *needsRestartCheckerHTTP) needsRestart(ctx context.Context) (bool, time.Duration, error) { - req, err := config.CreateCloudRequest(ctx, c.cfg) - if err != nil { - return false, c.restartInterval, errors.Wrapf(err, "error creating cloud request") - } - req.URL.Path = "/api/json1/needs_restart" - resp, err := c.client.Do(req) - if err != nil { - return false, c.restartInterval, errors.Wrapf(err, "error querying cloud request") - } - - defer func() { - utils.UncheckedError(resp.Body.Close()) - }() - - if resp.StatusCode != http.StatusOK { - return false, c.restartInterval, errors.Wrapf(err, "bad status code") - } - - read, err := io.ReadAll(resp.Body) - if err != nil { - return false, c.restartInterval, errors.Wrapf(err, "failed to read body") - } - - mustRestart := bytes.Equal(read, []byte("true")) - return mustRestart, c.restartInterval, nil -} - type needsRestartCheckerGRPC struct { cfg *config.Cloud logger golog.Logger @@ -97,15 +54,6 @@ func (c *needsRestartCheckerGRPC) needsRestart(ctx context.Context) (bool, time. } func newRestartChecker(ctx context.Context, cfg *config.Cloud, logger golog.Logger) (needsRestartChecker, error) { - if cfg.AppAddress == "" { - return &needsRestartCheckerHTTP{ - cfg: cfg, - logger: logger, - restartInterval: defaultNeedsRestartCheckInterval, - client: http.Client{}, - }, nil - } - client, err := config.CreateNewGRPCClient(ctx, cfg, logger) if err != nil { return nil, err diff --git a/web/server/restart_checker_test.go b/web/server/restart_checker_test.go deleted file mode 100644 index fd7229ef816..00000000000 --- a/web/server/restart_checker_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package server - -import ( - "context" - "testing" - - "github.com/edaniels/golog" - "go.viam.com/test" - - "go.viam.com/rdk/config" -) - -func TestRestartChecker(t *testing.T) { - logger := golog.NewTestLogger(t) - ctx := context.Background() - - t.Run("without AppAddress should create HTTP", func(t *testing.T) { - c, err := newRestartChecker(ctx, &config.Cloud{}, logger) - test.That(t, err, test.ShouldBeNil) - defer c.close() - - _, ok := c.(*needsRestartCheckerHTTP) - test.That(t, ok, test.ShouldBeTrue) - }) -}