diff --git a/pkg/bootstrap/bootstrap.go b/pkg/bootstrap/bootstrap.go index d3c9f7ee248d..db4ac936df2a 100644 --- a/pkg/bootstrap/bootstrap.go +++ b/pkg/bootstrap/bootstrap.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/rancher/k3s/pkg/daemons/config" + "github.com/sirupsen/logrus" ) func Handler(bootstrap *config.ControlRuntimeBootstrap) http.Handler { @@ -35,7 +36,8 @@ func ReadFromDisk(w io.Writer, bootstrap *config.ControlRuntimeBootstrap) error } data, err := ioutil.ReadFile(path) if err != nil { - return errors.Wrapf(err, "failed to read %s", path) + logrus.Warnf("failed to read %s", path) + continue } info, err := os.Stat(path) diff --git a/pkg/cli/etcdsnapshot/etcd_snapshot.go b/pkg/cli/etcdsnapshot/etcd_snapshot.go index dc706c7261ac..8586ab0a5b6b 100644 --- a/pkg/cli/etcdsnapshot/etcd_snapshot.go +++ b/pkg/cli/etcdsnapshot/etcd_snapshot.go @@ -95,7 +95,7 @@ func run(app *cli.Context, cfg *cmds.Server) error { cluster := cluster.New(&serverConfig.ControlConfig) - if err := cluster.Bootstrap(ctx); err != nil { + if err := cluster.Bootstrap(ctx, true); err != nil { return err } diff --git a/pkg/cluster/bootstrap.go b/pkg/cluster/bootstrap.go index 76cc290cab70..f17a4df15b48 100644 --- a/pkg/cluster/bootstrap.go +++ b/pkg/cluster/bootstrap.go @@ -9,32 +9,103 @@ import ( "io" "io/ioutil" "os" + "path" "path/filepath" "strings" "time" "github.com/k3s-io/kine/pkg/client" + "github.com/k3s-io/kine/pkg/endpoint" "github.com/rancher/k3s/pkg/bootstrap" "github.com/rancher/k3s/pkg/clientaccess" "github.com/rancher/k3s/pkg/daemons/config" + "github.com/rancher/k3s/pkg/daemons/executor" + "github.com/rancher/k3s/pkg/etcd" "github.com/rancher/k3s/pkg/version" "github.com/sirupsen/logrus" + "go.etcd.io/etcd/server/v3/embed" ) // Bootstrap attempts to load a managed database driver, if one has been initialized or should be created/joined. // It then checks to see if the cluster needs to load bootstrap data, and if so, loads data into the // ControlRuntimeBoostrap struct, either via HTTP or from the datastore. -func (c *Cluster) Bootstrap(ctx context.Context) error { +func (c *Cluster) Bootstrap(ctx context.Context, snapshot bool) error { if err := c.assignManagedDriver(ctx); err != nil { return err } - shouldBootstrap, err := c.shouldBootstrapLoad(ctx) + shouldBootstrap, isInitialized, err := c.shouldBootstrapLoad(ctx) if err != nil { return err } c.shouldBootstrap = shouldBootstrap + if c.managedDB != nil { + if !snapshot { + // In the case of etcd, if the database has been initialized, it doesn't + // need to be bootstrapped however we still need to check the database + // and reconcile the bootstrap data. Below we're starting a temporary + // instance of etcd in the event that etcd certificates are unavailable, + // reading the data, and comparing that to the data on disk, all the while + // starting normal etcd. + isHTTP := c.config.JoinURL != "" && c.config.Token != "" + if isInitialized && !isHTTP { + tmpDataDir := filepath.Join(c.config.DataDir, "db", "tmp-etcd") + os.RemoveAll(tmpDataDir) + if err := os.Mkdir(tmpDataDir, 0700); err != nil { + return err + } + etcdDataDir := etcd.DBDir(c.config) + if err := createTmpDataDir(etcdDataDir, tmpDataDir); err != nil { + return err + } + defer func() { + if err := os.RemoveAll(tmpDataDir); err != nil { + logrus.Warn("failed to remove etcd temp dir", err) + } + }() + + args := executor.ETCDConfig{ + DataDir: tmpDataDir, + ForceNewCluster: true, + ListenClientURLs: "http://127.0.0.1:2399", + Logger: "zap", + HeartbeatInterval: 500, + ElectionTimeout: 5000, + LogOutputs: []string{"stderr"}, + } + configFile, err := args.ToConfigFile() + if err != nil { + return err + } + cfg, err := embed.ConfigFromFile(configFile) + if err != nil { + return err + } + + etcd, err := embed.StartEtcd(cfg) + if err != nil { + return err + } + defer etcd.Close() + + data, err := c.retrieveInitializedDBdata(ctx) + if err != nil { + return err + } + + ec := endpoint.ETCDConfig{ + Endpoints: []string{"http://127.0.0.1:2399"}, + LeaderElect: false, + } + + if err := c.ReconcileBootstrapData(ctx, bytes.NewReader(data.Bytes()), &c.config.Runtime.ControlRuntimeBootstrap, false, &ec); err != nil { + logrus.Fatal(err) + } + } + } + } + if c.shouldBootstrap { return c.bootstrap(ctx) } @@ -42,16 +113,80 @@ func (c *Cluster) Bootstrap(ctx context.Context) error { return nil } -// shouldBootstrapLoad returns true if we need to load ControlRuntimeBootstrap data again. -// This is controlled by a stamp file on disk that records successful bootstrap using a hash of the join token. -func (c *Cluster) shouldBootstrapLoad(ctx context.Context) (bool, error) { +// copyFile copies the contents of the src file +// to the given destination file. +func copyFile(src, dst string) error { + srcfd, err := os.Open(src) + if err != nil { + return err + } + defer srcfd.Close() + + dstfd, err := os.Create(dst) + if err != nil { + return err + } + defer dstfd.Close() + + if _, err = io.Copy(dstfd, srcfd); err != nil { + return err + } + + srcinfo, err := os.Stat(src) + if err != nil { + return err + } + + return os.Chmod(dst, srcinfo.Mode()) +} + +// createTmpDataDir creates a temporary directory and copies the +// contents of the original etcd data dir to be used +// by etcd when reading data. +func createTmpDataDir(src, dst string) error { + srcinfo, err := os.Stat(src) + if err != nil { + return err + } + + if err := os.MkdirAll(dst, srcinfo.Mode()); err != nil { + return err + } + + fds, err := ioutil.ReadDir(src) + if err != nil { + return err + } + + for _, fd := range fds { + srcfp := path.Join(src, fd.Name()) + dstfp := path.Join(dst, fd.Name()) + + if fd.IsDir() { + if err = createTmpDataDir(srcfp, dstfp); err != nil { + fmt.Println(err) + } + } else { + if err = copyFile(srcfp, dstfp); err != nil { + fmt.Println(err) + } + } + } + + return nil +} + +// shouldBootstrapLoad returns true if we need to load ControlRuntimeBootstrap data again and a second boolean +// indicating that the server has or has not been initialized, if etcd. This is controlled by a stamp file on +// disk that records successful bootstrap using a hash of the join token. +func (c *Cluster) shouldBootstrapLoad(ctx context.Context) (bool, bool, error) { // Non-nil managedDB indicates that the database is either initialized, initializing, or joining if c.managedDB != nil { c.runtime.HTTPBootstrap = true isInitialized, err := c.managedDB.IsInitialized(ctx, c.config) if err != nil { - return false, err + return false, false, err } if isInitialized { @@ -64,22 +199,22 @@ func (c *Cluster) shouldBootstrapLoad(ctx context.Context) (bool, error) { if c.config.JoinURL != "" && c.config.Token != "" { c.clientAccessInfo, _ = clientaccess.ParseAndValidateTokenForUser(c.config.JoinURL, c.config.Token, "server") } - return false, nil + return false, true, nil } else if c.config.JoinURL == "" { // Not initialized, not joining - must be initializing (cluster-init) logrus.Infof("Managed %s cluster initializing", c.managedDB.EndpointName()) - return false, nil + return false, false, nil } else { // Not initialized, but have a Join URL - fail if there's no token; if there is then validate it. if c.config.Token == "" { - return false, errors.New(version.ProgramUpper + "_TOKEN is required to join a cluster") + return false, false, errors.New(version.ProgramUpper + "_TOKEN is required to join a cluster") } // Fail if the token isn't syntactically valid, or if the CA hash on the remote server doesn't match // the hash in the token. The password isn't actually checked until later when actually bootstrapping. info, err := clientaccess.ParseAndValidateTokenForUser(c.config.JoinURL, c.config.Token, "server") if err != nil { - return false, err + return false, false, err } logrus.Infof("Managed %s cluster not yet initialized", c.managedDB.EndpointName()) @@ -97,7 +232,7 @@ func (c *Cluster) shouldBootstrapLoad(ctx context.Context) (bool, error) { // } // No errors and no bootstrap stamp, need to bootstrap. - return true, nil + return true, false, nil } // isDirEmpty checks to see if the given directory @@ -121,6 +256,7 @@ func isDirEmpty(name string) (bool, error) { // that contain the needed certificates exist. func (c *Cluster) certDirsExist() error { bootstrapDirs := []string{ + "cred", "tls", "tls/etcd", } @@ -180,61 +316,79 @@ const systemTimeSkew = int64(3) // ReconcileBootstrapData is called before any data is saved to the // datastore or locally. It checks to see if the contents of the // bootstrap data in the datastore is newer than on disk or different -// and dependingon where the difference is, the newer data is written -// to the older. -func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker, crb *config.ControlRuntimeBootstrap) error { +// and depending on where the difference is, the newer data is written +// to disk or if the disk is newer, the process is stopped and a error +// is issued. +func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker, crb *config.ControlRuntimeBootstrap, isHTTP bool, ec *endpoint.ETCDConfig) error { logrus.Info("Reconciling bootstrap data between datastore and disk") if err := c.certDirsExist(); err != nil { - logrus.Warn(err.Error()) return bootstrap.WriteToDiskFromStorage(buf, crb) } - token := c.config.Token - if token == "" { - tokenFromFile, err := readTokenFromFile(c.runtime.ServerToken, c.runtime.ServerCA, c.config.DataDir) + var dbRawData []byte + if c.managedDB != nil && !isHTTP { + token := c.config.Token + if token == "" { + tokenFromFile, err := readTokenFromFile(c.runtime.ServerToken, c.runtime.ServerCA, c.config.DataDir) + if err != nil { + return err + } + if tokenFromFile == "" { + // at this point this is a fresh start in a non-managed environment + c.saveBootstrap = true + return nil + } + token = tokenFromFile + } + + normalizedToken, err := normalizeToken(token) if err != nil { return err } - if tokenFromFile == "" { - // at this point this is a fresh start in a non-managed environment - c.saveBootstrap = true - return nil + + var value *client.Value + + var etcdConfig endpoint.ETCDConfig + if ec != nil { + etcdConfig = *ec + } else { + etcdConfig = c.etcdConfig } - token = tokenFromFile - } - normalizedToken, err := normalizeToken(token) - if err != nil { - return err - } - var value *client.Value + storageClient, err := client.New(etcdConfig) + if err != nil { + return err + } - storageClient, err := client.New(c.etcdConfig) - if err != nil { - return err - } + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() + RETRY: + for { + value, err = c.getBootstrapKeyFromStorage(ctx, storageClient, normalizedToken, token) + if err != nil { + if strings.Contains(err.Error(), "not supported for learner") { + for range ticker.C { + continue RETRY + } -RETRY: - for { - value, err = c.getBootstrapKeyFromStorage(ctx, storageClient, normalizedToken, token) - if err != nil { - if strings.Contains(err.Error(), "not supported for learner") { - for range ticker.C { - continue RETRY } + return err + } + if value == nil { + return nil + } + dbRawData, err = decrypt(normalizedToken, value.Data) + if err != nil { + return err } - return err - } - if value == nil { - return nil + + break } - break + buf = bytes.NewReader(dbRawData) } paths, err := bootstrap.ObjToMap(crb) @@ -249,6 +403,7 @@ RETRY: // Therefore, we need to perform a migration to the newer bootstrap // format; bootstrap.BootstrapFile. buf.Seek(0, 0) + if err := migrateBootstrapData(ctx, buf, files); err != nil { return err } @@ -259,10 +414,9 @@ RETRY: db, disk, conflict bool } - var updateDatastore, updateDisk bool + var updateDisk bool results := make(map[string]update) - for pathKey, fileData := range files { path, ok := paths[pathKey] if !ok { @@ -286,8 +440,6 @@ RETRY: } if !bytes.Equal(fileData.Content, fData) { - logrus.Warnf("%s is out of sync with datastore", path) - info, err := os.Stat(path) if err != nil { return err @@ -363,26 +515,20 @@ RETRY: } for path, res := range results { - if res.db { - updateDatastore = true - logrus.Warn(path + " newer than datastore") - } else if res.disk { + switch { + case res.disk: updateDisk = true logrus.Warn("datastore newer than " + path) - } else if res.conflict { + case res.db: + logrus.Fatal(path + " newer than datastore and could cause cluster outage. Remove the file from disk and restart to be recreated from datastore.") + case res.conflict: logrus.Warnf("datastore / disk conflict: %s newer than in the datastore", path) } } - switch { - case updateDatastore: - logrus.Warn("updating bootstrap data in datastore from disk") - return c.save(ctx, true) - case updateDisk: + if updateDisk { logrus.Warn("updating bootstrap data on disk from datastore") return bootstrap.WriteToDiskFromStorage(buf, crb) - default: - // on disk certificates match timestamps in storage. noop. } return nil @@ -397,7 +543,16 @@ func (c *Cluster) httpBootstrap(ctx context.Context) error { return err } - return c.ReconcileBootstrapData(ctx, bytes.NewReader(content), &c.config.Runtime.ControlRuntimeBootstrap) + return c.ReconcileBootstrapData(ctx, bytes.NewReader(content), &c.config.Runtime.ControlRuntimeBootstrap, true, nil) +} + +func (c *Cluster) retrieveInitializedDBdata(ctx context.Context) (*bytes.Buffer, error) { + var buf bytes.Buffer + if err := bootstrap.ReadFromDisk(&buf, &c.runtime.ControlRuntimeBootstrap); err != nil { + return nil, err + } + + return &buf, nil } // bootstrap performs cluster bootstrapping, either via HTTP (for managed databases) or direct load from datastore. diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index c4ac80150048..ad1212146856 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -79,6 +79,10 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { return nil, err } + if err := c.startStorage(ctx); err != nil { + return nil, err + } + // if necessary, store bootstrap data to datastore if c.saveBootstrap { if err := c.save(ctx, false); err != nil { @@ -86,10 +90,6 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { } } - if err := c.startStorage(ctx); err != nil { - return nil, err - } - // at this point, if etcd is in use, it's bootstrapping is complete // so save the bootstrap data. We will need for etcd to be up. If // the save call returns an error, we panic since subsequent etcd diff --git a/pkg/cluster/storage.go b/pkg/cluster/storage.go index bc86e5d3fda4..6cb617591fbf 100644 --- a/pkg/cluster/storage.go +++ b/pkg/cluster/storage.go @@ -132,7 +132,7 @@ func (c *Cluster) storageBootstrap(ctx context.Context) error { return err } - return c.ReconcileBootstrapData(ctx, bytes.NewReader(data), &c.config.Runtime.ControlRuntimeBootstrap) + return c.ReconcileBootstrapData(ctx, bytes.NewReader(data), &c.config.Runtime.ControlRuntimeBootstrap, false, nil) } // getBootstrapKeyFromStorage will list all keys that has prefix /bootstrap and will check for key that is diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 2724d7f3d60a..1e4798c26d96 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -246,7 +246,7 @@ func prepare(ctx context.Context, config *config.Control, runtime *config.Contro cluster := cluster.New(config) - if err := cluster.Bootstrap(ctx); err != nil { + if err := cluster.Bootstrap(ctx, false); err != nil { return err } diff --git a/pkg/daemons/executor/etcd.go b/pkg/daemons/executor/etcd.go index 8bcdb735da55..9163ae5682fa 100644 --- a/pkg/daemons/executor/etcd.go +++ b/pkg/daemons/executor/etcd.go @@ -1,3 +1,4 @@ +//go:build !no_embedded_executor // +build !no_embedded_executor package executor @@ -27,6 +28,7 @@ func (e Embedded) ETCD(ctx context.Context, args ETCDConfig) error { if err != nil { return err } + etcd, err := embed.StartEtcd(cfg) if err != nil { return err diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 09aeb8bdc9ce..3100f5a90659 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -151,14 +151,14 @@ func (e *ETCD) Test(ctx context.Context) error { return errors.Errorf("this server is a not a member of the etcd cluster. Found %v, expect: %s=%s", memberNameUrls, e.name, e.address) } -// etcdDBDir returns the path to dataDir/db/etcd -func etcdDBDir(config *config.Control) string { +// DBDir returns the path to dataDir/db/etcd +func DBDir(config *config.Control) string { return filepath.Join(config.DataDir, "db", "etcd") } // walDir returns the path to etcdDBDir/member/wal func walDir(config *config.Control) string { - return filepath.Join(etcdDBDir(config), "member", "wal") + return filepath.Join(DBDir(config), "member", "wal") } func sqliteFile(config *config.Control) string { @@ -167,7 +167,7 @@ func sqliteFile(config *config.Control) string { // nameFile returns the path to etcdDBDir/name. func nameFile(config *config.Control) string { - return filepath.Join(etcdDBDir(config), "name") + return filepath.Join(DBDir(config), "name") } // ResetFile returns the path to etcdDBDir/reset-flag. @@ -188,7 +188,7 @@ func (e *ETCD) IsInitialized(ctx context.Context, config *config.Control) (bool, } } -// Reset resets an etcd node +// Reset resets an etcd node to a single node cluster. func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error { // Wait for etcd to come up as a new single-node cluster, then exit go func() { @@ -287,7 +287,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e if existingCluster { //check etcd dir permission - etcdDir := etcdDBDir(e.config) + etcdDir := DBDir(e.config) info, err := os.Stat(etcdDir) if err != nil { return err @@ -416,10 +416,10 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt e.config.Datastore.BackendTLSConfig.CertFile = e.runtime.ClientETCDCert e.config.Datastore.BackendTLSConfig.KeyFile = e.runtime.ClientETCDKey - tombstoneFile := filepath.Join(etcdDBDir(e.config), "tombstone") + tombstoneFile := filepath.Join(DBDir(e.config), "tombstone") if _, err := os.Stat(tombstoneFile); err == nil { logrus.Infof("tombstone file has been detected, removing data dir to rejoin the cluster") - if _, err := backupDirWithRetention(etcdDBDir(e.config), maxBackupRetention); err != nil { + if _, err := backupDirWithRetention(DBDir(e.config), maxBackupRetention); err != nil { return nil, err } } @@ -500,6 +500,7 @@ func GetClient(ctx context.Context, runtime *config.ControlRuntime, endpoints .. if err != nil { return nil, err } + return clientv3.New(*cfg) } @@ -509,15 +510,14 @@ func getClientConfig(ctx context.Context, runtime *config.ControlRuntime, endpoi if err != nil { return nil, err } - cfg := &clientv3.Config{ + return &clientv3.Config{ Endpoints: endpoints, TLS: tlsConfig, Context: ctx, DialTimeout: defaultDialTimeout, DialKeepAliveTime: defaultKeepAliveTime, DialKeepAliveTimeout: defaultKeepAliveTimeout, - } - return cfg, nil + }, nil } // toTLSConfig converts the ControlRuntime configuration to TLS configuration suitable @@ -652,7 +652,7 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init ListenMetricsURLs: e.metricsURL(e.config.EtcdExposeMetrics), ListenPeerURLs: e.peerURL(), AdvertiseClientURLs: e.clientURL(), - DataDir: etcdDBDir(e.config), + DataDir: DBDir(e.config), ServerTrust: executor.ServerTrust{ CertFile: e.config.Runtime.ServerETCDCert, KeyFile: e.config.Runtime.ServerETCDKey, @@ -1313,7 +1313,7 @@ func (e *ETCD) setSnapshotFunction(ctx context.Context) { // completion. func (e *ETCD) Restore(ctx context.Context) error { // check the old etcd data dir - oldDataDir := etcdDBDir(e.config) + "-old-" + strconv.Itoa(int(time.Now().Unix())) + oldDataDir := DBDir(e.config) + "-old-" + strconv.Itoa(int(time.Now().Unix())) if e.config.ClusterResetRestorePath == "" { return errors.New("no etcd restore path was specified") } @@ -1322,14 +1322,14 @@ func (e *ETCD) Restore(ctx context.Context) error { return err } // move the data directory to a temp path - if err := os.Rename(etcdDBDir(e.config), oldDataDir); err != nil { + if err := os.Rename(DBDir(e.config), oldDataDir); err != nil { return err } logrus.Infof("Pre-restore etcd database moved to %s", oldDataDir) return snapshot.NewV3(nil).Restore(snapshot.RestoreConfig{ SnapshotPath: e.config.ClusterResetRestorePath, Name: e.name, - OutputDataDir: etcdDBDir(e.config), + OutputDataDir: DBDir(e.config), OutputWALDir: walDir(e.config), PeerURLs: []string{e.peerURL()}, InitialCluster: e.name + "=" + e.peerURL(), @@ -1470,8 +1470,8 @@ func (e *ETCD) RemoveSelf(ctx context.Context) error { } // backup the data dir to avoid issues when re-enabling etcd - oldDataDir := etcdDBDir(e.config) + "-old-" + strconv.Itoa(int(time.Now().Unix())) + oldDataDir := DBDir(e.config) + "-old-" + strconv.Itoa(int(time.Now().Unix())) // move the data directory to a temp path - return os.Rename(etcdDBDir(e.config), oldDataDir) + return os.Rename(DBDir(e.config), oldDataDir) } diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 4d15361ab2ff..e49feabd2e7f 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -170,17 +170,17 @@ func Test_UnitETCD_Register(t *testing.T) { if err := testutil.GenerateRuntime(cnf); err != nil { return err } - if err := os.MkdirAll(etcdDBDir(cnf), 0700); err != nil { + if err := os.MkdirAll(DBDir(cnf), 0700); err != nil { return err } - tombstoneFile := filepath.Join(etcdDBDir(cnf), "tombstone") + tombstoneFile := filepath.Join(DBDir(cnf), "tombstone") if _, err := os.Create(tombstoneFile); err != nil { return err } return nil }, teardown: func(cnf *config.Control) error { - tombstoneFile := filepath.Join(etcdDBDir(cnf), "tombstone") + tombstoneFile := filepath.Join(DBDir(cnf), "tombstone") os.Remove(tombstoneFile) testutil.CleanupDataDir(cnf) return nil