diff --git a/bin/p2-rctl-server/main.go b/bin/p2-rctl-server/main.go index d05d96691..74938505d 100644 --- a/bin/p2-rctl-server/main.go +++ b/bin/p2-rctl-server/main.go @@ -16,12 +16,12 @@ import ( "github.com/square/p2/pkg/alerting" "github.com/square/p2/pkg/health/checker" - "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/rc" "github.com/square/p2/pkg/roll" "github.com/square/p2/pkg/scheduler" "github.com/square/p2/pkg/store/consul" + "github.com/square/p2/pkg/store/consul/auditlogstore" "github.com/square/p2/pkg/store/consul/consulutil" "github.com/square/p2/pkg/store/consul/flags" "github.com/square/p2/pkg/store/consul/rcstore" @@ -54,7 +54,7 @@ func SessionName() string { func main() { // Parse custom flags + standard Consul routing options kingpin.Version(version.VERSION) - _, opts, _ := flags.ParseWithConsulOptions() + _, opts, labeler := flags.ParseWithConsulOptions() // Set up the logger logger := logging.NewLogger(logrus.Fields{}) @@ -72,9 +72,6 @@ func main() { httpClient := cleanhttp.DefaultClient() client := consul.NewConsulClient(opts) consulStore := consul.NewConsulStore(client) - // flags.ParseWithConsulOptions() because that interface doesn't - // support transactions which is now required by the RC store - labeler := labels.NewConsulApplicator(client, 0) rcStore := rcstore.NewConsul(client, labeler, RetryCount) rollStore := rollstore.NewConsul(client, labeler, nil) @@ -102,9 +99,12 @@ func main() { } } + auditLogStore := auditlogstore.NewConsulStore(client.KV()) + // Run the farms! go rc.NewFarm( consulStore, + auditLogStore, rcStore, rcStore, rcStore, diff --git a/bin/p2-rctl/main.go b/bin/p2-rctl/main.go index cc62c7479..d478baa8d 100644 --- a/bin/p2-rctl/main.go +++ b/bin/p2-rctl/main.go @@ -22,6 +22,7 @@ import ( "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" "github.com/square/p2/pkg/rc" "github.com/square/p2/pkg/rc/fields" rc_fields "github.com/square/p2/pkg/rc/fields" @@ -54,11 +55,13 @@ var ( logLevel = kingpin.Flag("log", "Logging level to display.").String() logJSON = kingpin.Flag("log-json", "Log messages will be JSON formatted").Bool() - cmdCreate = kingpin.Command(cmdCreateText, "Create a new replication controller") - createManifest = cmdCreate.Flag("manifest", "manifest file to use for this replication controller").Short('m').Required().String() - createNodeSel = cmdCreate.Flag("node-selector", "node selector that this replication controller should target").Short('n').Required().String() - createPodLabels = cmdCreate.Flag("pod-label", "a pod label, in LABEL=VALUE form, to add to this replication controller. Can be specified multiple times.").Short('p').StringMap() - createRCLabels = cmdCreate.Flag("rc-label", "an RC label, in LABEL=VALUE form, to be applied to this replication controller. Can be specified multiple times.").Short('r').StringMap() + cmdCreate = kingpin.Command(cmdCreateText, "Create a new replication controller") + createManifest = cmdCreate.Flag("manifest", "manifest file to use for this replication controller").Short('m').Required().String() + createNodeSel = cmdCreate.Flag("node-selector", "node selector that this replication controller should target").Short('n').Required().String() + createPodLabels = cmdCreate.Flag("pod-label", "a pod label, in LABEL=VALUE form, to add to this replication controller. 
Can be specified multiple times.").Short('p').StringMap() + createRCLabels = cmdCreate.Flag("rc-label", "an RC label, in LABEL=VALUE form, to be applied to this replication controller. Can be specified multiple times.").Short('r').StringMap() + createAvailabilityZone = cmdCreate.Flag("availability-zone", "availability zone that RC should belong to").Short('a').Required().String() + createClusterName = cmdCreate.Flag("cluster-name", "availability zone that RC should belong to").Short('c').Required().String() cmdDelete = kingpin.Command(cmdDeleteText, "Delete a replication controller") deleteID = cmdDelete.Arg("id", "replication controller uuid to delete").Required().String() @@ -153,7 +156,14 @@ func main() { switch cmd { case cmdCreateText: - rctl.Create(*createManifest, *createNodeSel, *createPodLabels, *createRCLabels) + rctl.Create( + *createManifest, + *createNodeSel, + pc_fields.AvailabilityZone(*createAvailabilityZone), + pc_fields.ClusterName(*createClusterName), + *createPodLabels, + *createRCLabels, + ) case cmdDeleteText: rctl.Delete(*deleteID, *deleteForce) case cmdReplicasText: @@ -197,6 +207,8 @@ type ReplicationControllerStore interface { Create( manifest manifest.Manifest, nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, podLabels klabels.Set, additionalLabels klabels.Set, ) (fields.RC, error) @@ -231,7 +243,14 @@ type rctlParams struct { logger logging.Logger } -func (r rctlParams) Create(manifestPath, nodeSelector string, podLabels map[string]string, rcLabels map[string]string) { +func (r rctlParams) Create( + manifestPath string, + nodeSelector string, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels map[string]string, + rcLabels map[string]string, +) { manifest, err := manifest.FromPath(manifestPath) if err != nil { r.logger.WithErrorAndFields(err, logrus.Fields{ @@ -246,7 +265,7 @@ func (r rctlParams) Create(manifestPath, nodeSelector string, podLabels map[stri }).Fatalln("Could not parse node selector") } - newRC, err := r.rcs.Create(manifest, nodeSel, klabels.Set(podLabels), rcLabels) + newRC, err := r.rcs.Create(manifest, nodeSel, availabilityZone, clusterName, klabels.Set(podLabels), rcLabels) if err != nil { r.logger.WithError(err).Fatalln("Could not create replication controller in Consul") } diff --git a/integration/single-node-slug-deploy/check.go b/integration/single-node-slug-deploy/check.go index 5db2794f9..b2ab6af31 100644 --- a/integration/single-node-slug-deploy/check.go +++ b/integration/single-node-slug-deploy/check.go @@ -428,13 +428,13 @@ func verifyTransferReplicas(errCh chan<- error, tempdir string, logger logging.L builder.SetID("some_pod") man := builder.GetManifest() - fromRC, err := rcStore.Create(man, klabels.Everything(), nil, nil) + fromRC, err := rcStore.Create(man, klabels.Everything(), "some_az", "some_cn", nil, nil) if err != nil { errCh <- util.Errorf("could not create RC for replica transfer test: %s", err) return } - toRC, err := rcStore.Create(man, klabels.Everything(), nil, nil) + toRC, err := rcStore.Create(man, klabels.Everything(), "some_az", "some_cn", nil, nil) if err != nil { errCh <- util.Errorf("could not create second RC for replica transfer test: %s", err) return @@ -854,7 +854,19 @@ func createHelloReplicationController(dir string) (fields.ID, error) { return "", err } - cmd := exec.Command("p2-rctl", "--log-json", "create", "--manifest", signedManifestPath, "--node-selector", "test=yes") + cmd := exec.Command( + 
"p2-rctl", + "--log-json", + "create", + "--manifest", + signedManifestPath, + "--node-selector", + "test=yes", + "--availability-zone", + "some_az", + "--cluster-name", + "some_cn", + ) out := bytes.Buffer{} cmd.Stdout = &out cmd.Stderr = &out diff --git a/pkg/audit/rc.go b/pkg/audit/rc.go new file mode 100644 index 000000000..39591d2ee --- /dev/null +++ b/pkg/audit/rc.go @@ -0,0 +1,44 @@ +package audit + +import ( + "encoding/json" + + pc_fields "github.com/square/p2/pkg/pc/fields" + "github.com/square/p2/pkg/types" + "github.com/square/p2/pkg/util" +) + +const ( + // RcRetargetingEvent represents events in which an RC changes the set of + // nodes that it is targeting. This can be used to log the set of nodes that + // an RC or pod cluster manages over time + RCRetargetingEvent EventType = "REPLICATION_CONTROLLER_RETARGET" +) + +type RCRetargetingDetails struct { + PodID types.PodID `json:"pod_id"` + AvailabilityZone pc_fields.AvailabilityZone `json:"availability_zone"` + ClusterName pc_fields.ClusterName `json:"cluster_name"` + Nodes []types.NodeName `json:"nodes"` +} + +func NewRCRetargetingEventDetails( + podID types.PodID, + az pc_fields.AvailabilityZone, + name pc_fields.ClusterName, + nodes []types.NodeName, +) (json.RawMessage, error) { + details := RCRetargetingDetails{ + PodID: podID, + AvailabilityZone: az, + ClusterName: name, + Nodes: nodes, + } + + bytes, err := json.Marshal(details) + if err != nil { + return nil, util.Errorf("could not marshal rc retargeting details as json: %s", err) + } + + return json.RawMessage(bytes), nil +} diff --git a/pkg/audit/rc_test.go b/pkg/audit/rc_test.go new file mode 100644 index 000000000..b5bb0aa79 --- /dev/null +++ b/pkg/audit/rc_test.go @@ -0,0 +1,44 @@ +package audit + +import ( + "encoding/json" + "reflect" + "testing" + + pc_fields "github.com/square/p2/pkg/pc/fields" + "github.com/square/p2/pkg/types" +) + +func TestRCRetargetingEventDetails(t *testing.T) { + podID := types.PodID("some_pod_id") + clusterName := pc_fields.ClusterName("some_cluster_name") + az := pc_fields.AvailabilityZone("some_availability_zone") + nodes := []types.NodeName{"node1", "node2"} + + detailsJSON, err := NewRCRetargetingEventDetails(podID, az, clusterName, nodes) + if err != nil { + t.Fatal(err) + } + + var details RCRetargetingDetails + err = json.Unmarshal(detailsJSON, &details) + if err != nil { + t.Fatal(err) + } + + if details.PodID != podID { + t.Errorf("expected pod id to be %s but was %s", podID, details.PodID) + } + + if details.AvailabilityZone != az { + t.Errorf("expected availability zone to be %s but was %s", az, details.AvailabilityZone) + } + + if details.ClusterName != clusterName { + t.Errorf("expected cluster name to be %s but was %s", clusterName, details.ClusterName) + } + + if !reflect.DeepEqual(details.Nodes, nodes) { + t.Errorf("expected node list to be %s but was %s", nodes, details.Nodes) + } +} diff --git a/pkg/labels/consul_applicator.go b/pkg/labels/consul_applicator.go index dcca85744..067c4d57e 100644 --- a/pkg/labels/consul_applicator.go +++ b/pkg/labels/consul_applicator.go @@ -365,6 +365,10 @@ func (c *consulApplicator) RemoveAllLabels(labelType Type, id string) error { // the passed transaction rather than synchronously making the requisite consul // call func (c *consulApplicator) RemoveAllLabelsTxn(ctx context.Context, labelType Type, id string) error { + return removeAllLabelsTxn(ctx, labelType, id) +} + +func removeAllLabelsTxn(ctx context.Context, labelType Type, id string) error { path, err := objectPath(labelType, 
id) if err != nil { return err diff --git a/pkg/labels/fake_applicator.go b/pkg/labels/fake_applicator.go index bbde77cad..671bb3501 100644 --- a/pkg/labels/fake_applicator.go +++ b/pkg/labels/fake_applicator.go @@ -83,6 +83,10 @@ func (app *fakeApplicator) RemoveLabel(labelType Type, id, name string) error { return nil } +func (app *fakeApplicator) RemoveLabelsTxn(ctx context.Context, labelType Type, id string, keysToRemove []string) error { + panic("not implemented") +} + func (app *fakeApplicator) ListLabels(labelType Type) ([]Labeled, error) { res := []Labeled{} for id, set := range app.data[labelType] { diff --git a/pkg/labels/http_applicator.go b/pkg/labels/http_applicator.go index e72368392..9c6a46025 100644 --- a/pkg/labels/http_applicator.go +++ b/pkg/labels/http_applicator.go @@ -206,6 +206,10 @@ func (h *httpApplicator) RemoveAllLabels(labelType Type, id string) error { return nil } +func (h *httpApplicator) RemoveAllLabelsTxn(ctx context.Context, labelType Type, id string) error { + return removeAllLabelsTxn(ctx, labelType, id) +} + // Finds all labels assigned to all entities under a type // // GET /api/labels/:type diff --git a/pkg/labels/label_http_server.go b/pkg/labels/label_http_server.go index 968048abc..577647bd1 100644 --- a/pkg/labels/label_http_server.go +++ b/pkg/labels/label_http_server.go @@ -1,6 +1,7 @@ package labels import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -21,8 +22,11 @@ import ( type ApplicatorWithoutWatches interface { SetLabel(labelType Type, id, name, value string) error SetLabels(labelType Type, id string, labels map[string]string) error + SetLabelsTxn(ctx context.Context, labelType Type, id string, labels map[string]string) error RemoveLabel(labelType Type, id, name string) error RemoveAllLabels(labelType Type, id string) error + RemoveLabelsTxn(ctx context.Context, labelType Type, id string, keysToRemove []string) error + RemoveAllLabelsTxn(ctx context.Context, labelType Type, id string) error ListLabels(labelType Type) ([]Labeled, error) GetLabels(labelType Type, id string) (Labeled, error) GetMatches(selector klabels.Selector, labelType Type) ([]Labeled, error) diff --git a/pkg/pc/fields/fields.go b/pkg/pc/fields/fields.go index a77972a2a..d537cd79b 100644 --- a/pkg/pc/fields/fields.go +++ b/pkg/pc/fields/fields.go @@ -4,7 +4,6 @@ import ( "encoding/json" "reflect" - "github.com/square/p2/pkg/store/consul/rcstore" "github.com/square/p2/pkg/types" "k8s.io/kubernetes/pkg/labels" @@ -18,9 +17,9 @@ type Annotations map[string]interface{} // label keys used by pod selector const ( - AvailabilityZoneLabel = "availability_zone" - ClusterNameLabel = "cluster_name" - PodIDLabel = rcstore.PodIDLabel // TODO: put this in a different place now that multiple packages use it + AvailabilityZoneLabel = types.AvailabilityZoneLabel + ClusterNameLabel = types.ClusterNameLabel + PodIDLabel = types.PodIDLabel ) func (id ID) String() string { diff --git a/pkg/rc/auditing_transaction.go b/pkg/rc/auditing_transaction.go new file mode 100644 index 000000000..9197f9bfd --- /dev/null +++ b/pkg/rc/auditing_transaction.go @@ -0,0 +1,95 @@ +package rc + +import ( + "context" + + "github.com/square/p2/pkg/audit" + pc_fields "github.com/square/p2/pkg/pc/fields" + "github.com/square/p2/pkg/store/consul/transaction" + "github.com/square/p2/pkg/types" + "github.com/square/p2/pkg/util" +) + +type auditingTransaction struct { + ctx context.Context + nodes map[types.NodeName]struct{} + auditLogStore AuditLogStore + podID types.PodID + az pc_fields.AvailabilityZone + cn 
pc_fields.ClusterName +} + +type scheduledNodesKey struct{} + +func (rc *replicationController) newAuditingTransaction( + ctx context.Context, + startingNodes []types.NodeName, +) (*auditingTransaction, func()) { + annotatedContext := context.WithValue(ctx, scheduledNodesKey{}, startingNodes) + ctx, cancelFunc := transaction.New(annotatedContext) + + startingNodeMap := make(map[types.NodeName]struct{}) + for _, node := range startingNodes { + startingNodeMap[node] = struct{}{} + } + + rc.mu.Lock() + manifest := rc.Manifest + podLabels := rc.PodLabels + rc.mu.Unlock() + return &auditingTransaction{ + ctx: ctx, + nodes: startingNodeMap, + auditLogStore: rc.auditLogStore, + podID: manifest.ID(), + az: pc_fields.AvailabilityZone(podLabels[types.AvailabilityZoneLabel]), + cn: pc_fields.ClusterName(podLabels[types.ClusterNameLabel]), + }, cancelFunc +} + +func (a *auditingTransaction) Context() context.Context { + return a.ctx +} + +func (a *auditingTransaction) Nodes() []types.NodeName { + var nodes []types.NodeName + for node, _ := range a.nodes { + nodes = append(nodes, node) + } + return nodes +} + +func (a *auditingTransaction) AddNode(node types.NodeName) { + a.nodes[node] = struct{}{} +} + +func (a *auditingTransaction) RemoveNode(node types.NodeName) { + delete(a.nodes, node) +} + +// Commit adds one final operation to the underlying consul transaction to +// create an audit log record with the set of nodes that already have been +// scheduled and nodes that will be scheduled as a part of this transaction by +// the RC. Then it commits the transaction +func (a *auditingTransaction) Commit(cancelFunc func(), txner transaction.Txner) error { + details, err := audit.NewRCRetargetingEventDetails( + a.podID, + a.az, + a.cn, + a.Nodes(), + ) + if err != nil { + return err + } + + err = a.auditLogStore.Create( + a.ctx, + audit.RCRetargetingEvent, + details, + ) + if err != nil { + return util.Errorf("could not add rc retargeting audit log to context: %s", err) + } + + return transaction.Commit(a.ctx, cancelFunc, txner) +} diff --git a/pkg/rc/auditing_transaction_test.go b/pkg/rc/auditing_transaction_test.go new file mode 100644 index 000000000..203cc6bfe --- /dev/null +++ b/pkg/rc/auditing_transaction_test.go @@ -0,0 +1,98 @@ +package rc + +import ( + "context" + "encoding/json" + "testing" + + "github.com/square/p2/pkg/audit" + pc_fields "github.com/square/p2/pkg/pc/fields" + rc_fields "github.com/square/p2/pkg/rc/fields" + "github.com/square/p2/pkg/store/consul/auditlogstore" + "github.com/square/p2/pkg/store/consul/consulutil" + "github.com/square/p2/pkg/types" + + klabels "k8s.io/kubernetes/pkg/labels" +) + +func TestAuditingTransaction(t *testing.T) { + fixture := consulutil.NewFixture(t) + defer fixture.Stop() + + auditLogStore := auditlogstore.NewConsulStore(fixture.Client.KV()) + rc := &replicationController{ + RC: rc_fields.RC{ + Manifest: testManifest(), + PodLabels: klabels.Set{ + pc_fields.AvailabilityZoneLabel: "some_az", + pc_fields.ClusterNameLabel: "some_cn", + }, + }, + auditLogStore: auditLogStore, + } + + ctx, cancel := rc.newAuditingTransaction(context.Background(), []types.NodeName{"node1", "node2"}) + defer cancel() + + ctx.AddNode("node3") + ctx.RemoveNode("node2") + + err := ctx.Commit(cancel, fixture.Client.KV()) + if err != nil { + t.Fatalf("could not commit audit log record: %s", err) + } + + records, err := auditLogStore.List() + if err != nil { + t.Fatal(err) + } + if len(records) != 1 { + t.Fatalf("expected a single audit log record but there were %d", 
len(records)) + } + + for _, record := range records { + if record.EventType != audit.RCRetargetingEvent { + t.Errorf("expected audit log type to be %q but was %q", audit.RCRetargetingEvent, record.EventType) + } + + var details audit.RCRetargetingDetails + err = json.Unmarshal([]byte(*record.EventDetails), &details) + if err != nil { + t.Fatal(err) + } + + if details.PodID != testManifest().ID() { + t.Errorf("expected audit log details to have pod ID %q but was %q", testManifest().ID(), details.PodID) + } + if details.AvailabilityZone != "some_az" { + t.Errorf("expected audit log details to have availability zone %q but was %q", "some_az", details.AvailabilityZone) + } + if details.ClusterName != "some_cn" { + t.Errorf("expected audit log details to have cluster name %q but was %q", "some_cn", details.ClusterName) + } + + if len(details.Nodes) != 2 { + t.Fatalf("expected 2 nodes but there were %d", len(details.Nodes)) + } + found1 := false + found3 := false + for _, node := range details.Nodes { + switch node { + case "node1": + found1 = true + case "node3": + found3 = true + default: + t.Errorf("found node %s but that wasn't expected", node) + } + } + + if !found1 { + t.Error("expected to find node1 in the list") + } + + if !found3 { + t.Error("expected to find node3 in the list") + } + } +} diff --git a/pkg/rc/farm.go b/pkg/rc/farm.go index 4af760691..b876f52e4 100644 --- a/pkg/rc/farm.go +++ b/pkg/rc/farm.go @@ -2,6 +2,7 @@ package rc import ( "context" + "encoding/json" "fmt" "sync" "time" @@ -10,6 +11,7 @@ import ( klabels "k8s.io/kubernetes/pkg/labels" "github.com/square/p2/pkg/alerting" + "github.com/square/p2/pkg/audit" "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" p2metrics "github.com/square/p2/pkg/metrics" @@ -48,6 +50,14 @@ type ReplicationControllerLocker interface { LockForOwnership(rcID fields.ID, session consul.Session) (consulutil.Unlocker, error) } +type AuditLogStore interface { + Create( + ctx context.Context, + eventType audit.EventType, + eventDetails json.RawMessage, + ) error +} + // The Farm is responsible for spawning and reaping replication controllers // as they are added to and deleted from Consul. Multiple farms can exist // simultaneously, but each one must hold a different Consul session. This @@ -60,13 +70,14 @@ type ReplicationControllerLocker interface { // farms to cooperatively schedule work. 
type Farm struct { // constructor arguments for rcs created by this farm - store consulStore - rcStore ReplicationControllerStore - rcLocker ReplicationControllerLocker - rcWatcher ReplicationControllerWatcher - scheduler scheduler.Scheduler - labeler Labeler - txner transaction.Txner + store consulStore + auditLogStore AuditLogStore + rcStore ReplicationControllerStore + rcLocker ReplicationControllerLocker + rcWatcher ReplicationControllerWatcher + scheduler scheduler.Scheduler + labeler Labeler + txner transaction.Txner // session stream for the rcs locked by this farm sessions <-chan string @@ -94,6 +105,7 @@ type childRC struct { func NewFarm( store consulStore, + auditLogStore AuditLogStore, rcs ReplicationControllerStore, rcLocker ReplicationControllerLocker, rcWatcher ReplicationControllerWatcher, @@ -112,6 +124,7 @@ func NewFarm( return &Farm{ store: store, + auditLogStore: auditLogStore, rcStore: rcs, rcLocker: rcLocker, rcWatcher: rcWatcher, @@ -259,6 +272,7 @@ START_LOOP: newChild := New( rc, rcf.store, + rcf.auditLogStore, rcf.txner, rcf.rcWatcher, rcf.scheduler, diff --git a/pkg/rc/farm_test.go b/pkg/rc/farm_test.go index bf2f23fdf..19b0330c9 100644 --- a/pkg/rc/farm_test.go +++ b/pkg/rc/farm_test.go @@ -46,7 +46,7 @@ func TestRCsWithCountsWillBeFine(t *testing.T) { rcStore: fakeStore, } - rc, err := fakeStore.Create(testManifest(), klabels.Everything(), map[string]string{}, nil) + rc, err := fakeStore.Create(testManifest(), klabels.Everything(), "some_az", "some_cn", map[string]string{}, nil) if err != nil { t.Fatalf("could not put an RC in the fake store: %s", err) } @@ -74,7 +74,7 @@ func TestRCsWithZeroCountsWillTriggerIncident(t *testing.T) { } // replica count is implicitly zero - _, err := fakeStore.Create(testManifest(), klabels.Everything(), map[string]string{}, nil) + _, err := fakeStore.Create(testManifest(), klabels.Everything(), "some_az", "some_cn", map[string]string{}, nil) if err != nil { t.Fatalf("could not put an RC in the fake store: %s", err) } diff --git a/pkg/rc/fields/fields.go b/pkg/rc/fields/fields.go index e56611257..ea5a4aaee 100644 --- a/pkg/rc/fields/fields.go +++ b/pkg/rc/fields/fields.go @@ -40,7 +40,7 @@ type RC struct { // Defines the set of nodes on which the manifest can be scheduled NodeSelector labels.Selector - // A set of labels that will be added to every pod scheduled by this controller + // A set of labels that will be added to every pod scheduled by this controller. 
PodLabels labels.Set // The desired number of instances of the manifest that should be diff --git a/pkg/rc/replication_controller.go b/pkg/rc/replication_controller.go index 8297bbd7e..5952a7fae 100644 --- a/pkg/rc/replication_controller.go +++ b/pkg/rc/replication_controller.go @@ -80,6 +80,7 @@ type replicationController struct { logger logging.Logger consulStore consulStore + auditLogStore AuditLogStore txner transaction.Txner rcWatcher ReplicationControllerWatcher scheduler scheduler.Scheduler @@ -94,6 +95,7 @@ type ReplicationControllerWatcher interface { func New( fields fields.RC, consulStore consulStore, + auditLogStore AuditLogStore, txner transaction.Txner, rcWatcher ReplicationControllerWatcher, scheduler scheduler.Scheduler, @@ -110,6 +112,7 @@ func New( logger: logger, consulStore: consulStore, + auditLogStore: auditLogStore, txner: txner, rcWatcher: rcWatcher, scheduler: scheduler, @@ -226,19 +229,20 @@ func (rc *replicationController) addPods(current types.PodLocations) error { rc.logger.NoFields().Infof("Need to schedule %d nodes out of %s", toSchedule, possible) - ctx, cancelFunc := transaction.New(context.Background()) + txn, cancelFunc := rc.newAuditingTransaction(context.Background(), currentNodes) for i := 0; i < toSchedule; i++ { - // create a new context for every 5 nodes. This is done to make sure - // we're safely under the 64 operation limit imposed by consul on - // transactions + // create a new context for every 5 nodes. This is done to make + // sure we're safely under the 64 operation limit imposed by + // consul on transactions. This shouldn't be necessary after + // https://github.com/hashicorp/consul/issues/2921 is resolved if i%5 == 0 && i > 0 { - err = transaction.Commit(ctx, cancelFunc, rc.txner) + err = txn.Commit(cancelFunc, rc.txner) if err != nil { cancelFunc() return err } - ctx, cancelFunc = transaction.New(context.Background()) + txn, cancelFunc = rc.newAuditingTransaction(context.Background(), txn.Nodes()) } if len(possibleSorted) < i+1 { errMsg := fmt.Sprintf( @@ -251,7 +255,7 @@ func (rc *replicationController) addPods(current types.PodLocations) error { } // commit any queued operations - txnErr := transaction.Commit(ctx, cancelFunc, rc.txner) + txnErr := txn.Commit(cancelFunc, rc.txner) if txnErr != nil { return txnErr } @@ -260,14 +264,14 @@ func (rc *replicationController) addPods(current types.PodLocations) error { } scheduleOn := possibleSorted[i] - err := rc.schedule(ctx, scheduleOn) + err := rc.schedule(txn, scheduleOn) if err != nil { cancelFunc() return err } } - return transaction.Commit(ctx, cancelFunc, rc.txner) + return txn.Commit(cancelFunc, rc.txner) } // Generates an alerting.AlertInfo struct. Includes information relevant to @@ -312,19 +316,20 @@ func (rc *replicationController) removePods(current types.PodLocations) error { toUnschedule := len(current) - rc.ReplicasDesired rc.logger.NoFields().Infof("Need to unschedule %d nodes out of %s", toUnschedule, current) - ctx, cancelFunc := transaction.New(context.Background()) + txn, cancelFunc := rc.newAuditingTransaction(context.Background(), currentNodes) for i := 0; i < toUnschedule; i++ { - // create a new context for every 5 nodes. This is done to make sure - // we're safely under the 64 operation limit imposed by consul on - // transactions + // create a new context for every 5 nodes. This is done to make + // sure we're safely under the 64 operation limit imposed by + // consul on transactions. 
This shouldn't be necessary after + // https://github.com/hashicorp/consul/issues/2921 is resolved if i%5 == 0 && i > 0 { - err = transaction.Commit(ctx, cancelFunc, rc.txner) + err = txn.Commit(cancelFunc, rc.txner) if err != nil { cancelFunc() return err } - ctx, cancelFunc = transaction.New(context.Background()) + txn, cancelFunc = rc.newAuditingTransaction(context.Background(), txn.Nodes()) } unscheduleFrom, ok := preferred.PopAny() @@ -335,7 +340,7 @@ func (rc *replicationController) removePods(current types.PodLocations) error { // This should be mathematically impossible unless replicasDesired was negative // commit any queued operations - txnErr := transaction.Commit(ctx, cancelFunc, rc.txner) + txnErr := txn.Commit(cancelFunc, rc.txner) if txnErr != nil { return txnErr } @@ -347,13 +352,15 @@ func (rc *replicationController) removePods(current types.PodLocations) error { ) } } - err := rc.unschedule(ctx, unscheduleFrom) + + err := rc.unschedule(txn, unscheduleFrom) if err != nil { cancelFunc() return err } } - return transaction.Commit(ctx, cancelFunc, rc.txner) + + return txn.Commit(cancelFunc, rc.txner) } func (rc *replicationController) ensureConsistency(current types.PodLocations) error { @@ -367,9 +374,10 @@ func (rc *replicationController) ensureConsistency(current types.PodLocations) e ctx, cancelFunc := transaction.New(context.Background()) for i, pod := range current { - // create a new context for every 5 nodes. This is done to make sure - // we're safely under the 64 operation limit imposed by consul on - // transactions + // create a new context for every 5 nodes. This is done to make + // sure we're safely under the 64 operation limit imposed by + // consul on transactions. This shouldn't be necessary after + // https://github.com/hashicorp/consul/issues/2921 is resolved if i%5 == 0 && i > 0 { err = transaction.Commit(ctx, cancelFunc, rc.txner) if err != nil { @@ -396,7 +404,8 @@ func (rc *replicationController) ensureConsistency(current types.PodLocations) e } rc.logger.WithField("node", pod.Node).WithField("intentManifestSHA", intentSHA).Info("Found inconsistency in scheduled manifest") - if err := rc.schedule(ctx, pod.Node); err != nil { + + if err := rc.scheduleNoAudit(ctx, pod.Node); err != nil { cancelFunc() return err } @@ -462,7 +471,17 @@ func (rc *replicationController) computePodLabels() map[string]string { return ret } -func (rc *replicationController) schedule(ctx context.Context, node types.NodeName) error { +func (rc *replicationController) schedule(txn *auditingTransaction, node types.NodeName) error { + err := rc.scheduleNoAudit(txn.Context(), node) + if err != nil { + return err + } + + txn.AddNode(node) + return nil +} + +func (rc *replicationController) scheduleNoAudit(ctx context.Context, node types.NodeName) error { rc.logger.NoFields().Infof("Scheduling on %s", node) rc.mu.Lock() manifest := rc.Manifest @@ -477,12 +496,12 @@ func (rc *replicationController) schedule(ctx context.Context, node types.NodeNa return rc.consulStore.SetPodTxn(ctx, consul.INTENT_TREE, node, manifest) } -func (rc *replicationController) unschedule(ctx context.Context, node types.NodeName) error { +func (rc *replicationController) unschedule(txn *auditingTransaction, node types.NodeName) error { rc.logger.NoFields().Infof("Unscheduling from %s", node) rc.mu.Lock() manifest := rc.Manifest rc.mu.Unlock() - err := rc.consulStore.DeletePodTxn(ctx, consul.INTENT_TREE, node, manifest.ID()) + err := rc.consulStore.DeletePodTxn(txn.Context(), consul.INTENT_TREE, node, 
manifest.ID()) if err != nil { return err } @@ -494,5 +513,12 @@ func (rc *replicationController) unschedule(ctx context.Context, node types.Node } labelKey := labels.MakePodLabelKey(node, manifest.ID()) - return rc.podApplicator.RemoveLabelsTxn(ctx, labels.POD, labelKey, keysToRemove) + + err = rc.podApplicator.RemoveLabelsTxn(txn.Context(), labels.POD, labelKey, keysToRemove) + if err != nil { + return err + } + + txn.RemoveNode(node) + return nil } diff --git a/pkg/rc/replication_controller_test.go b/pkg/rc/replication_controller_test.go index 9a455ca1b..041d4dd1d 100644 --- a/pkg/rc/replication_controller_test.go +++ b/pkg/rc/replication_controller_test.go @@ -4,19 +4,23 @@ package rc import ( "context" + "encoding/json" "fmt" "sync" "testing" "time" "github.com/square/p2/pkg/alerting/alertingtest" + "github.com/square/p2/pkg/audit" "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" "github.com/square/p2/pkg/pods" "github.com/square/p2/pkg/rc/fields" "github.com/square/p2/pkg/scheduler" "github.com/square/p2/pkg/store/consul" + "github.com/square/p2/pkg/store/consul/auditlogstore" "github.com/square/p2/pkg/store/consul/consulutil" "github.com/square/p2/pkg/store/consul/rcstore" "github.com/square/p2/pkg/types" @@ -28,7 +32,14 @@ import ( type testRCStore interface { ReplicationControllerStore ReplicationControllerWatcher - Create(manifest manifest.Manifest, nodeSelector klabels.Selector, podLabels klabels.Set, additionalLabels klabels.Set) (fields.RC, error) + Create( + manifest manifest.Manifest, + nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels klabels.Set, + additionalLabels klabels.Set, + ) (fields.RC, error) SetDesiredReplicas(id fields.ID, n int) error } @@ -46,12 +57,18 @@ type testApplicator interface { SetLabelsTxn(ctx context.Context, labelType labels.Type, id string, values map[string]string) error } +type testAuditLogStore interface { + AuditLogStore + List() (map[audit.ID]audit.AuditLog, error) +} + func setup(t *testing.T) ( rcStore testRCStore, consulStore testConsulStore, applicator testApplicator, rc *replicationController, alerter *alertingtest.AlertRecorder, + auditLogStore testAuditLogStore, closeFn func(), ) { fixture := consulutil.NewFixture(t) @@ -77,14 +94,16 @@ func setup(t *testing.T) ( nodeSelector := klabels.Everything().Add("nodeQuality", klabels.EqualsOperator, []string{"good"}) podLabels := map[string]string{"podTest": "successful"} - rcData, err := rcStore.Create(podManifest, nodeSelector, podLabels, nil) + rcData, err := rcStore.Create(podManifest, nodeSelector, "some_az", "some_cn", podLabels, nil) Assert(t).IsNil(err, "expected no error creating request") alerter = alertingtest.NewRecorder() + auditLogStore = auditlogstore.NewConsulStore(fixture.Client.KV()) rc = New( rcData, consulStore, + auditLogStore, fixture.Client.KV(), rcStore, scheduler.NewApplicatorScheduler(applicator), @@ -125,7 +144,7 @@ func waitForNodes(t *testing.T, rc ReplicationController, desired int) int { } func TestDoNothing(t *testing.T) { - _, consulStore, applicator, rc, alerter, closeFn := setup(t) + _, consulStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() err := rc.meetDesires() @@ -144,7 +163,7 @@ func TestDoNothing(t *testing.T) { } func TestCantSchedule(t *testing.T) { - rcStore, consulStore, applicator, rc, alerter, closeFn := setup(t) + rcStore, consulStore, applicator, 
rc, alerter, _, closeFn := setup(t) defer closeFn() quit := make(chan struct{}) @@ -173,7 +192,7 @@ func TestCantSchedule(t *testing.T) { } func TestSchedule(t *testing.T) { - rcStore, consulStore, applicator, rc, alerter, closeFn := setup(t) + rcStore, consulStore, applicator, rc, alerter, auditLogStore, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "bad") @@ -209,10 +228,35 @@ func TestSchedule(t *testing.T) { } Assert(t).AreEqual(len(alerter.Alerts), 0, "Expected no alerts to fire") + + records, err := auditLogStore.List() + if err != nil { + t.Fatal(err) + } + if len(records) != 1 { + t.Fatalf("expected a single audit log record but there were %d", len(records)) + } + for _, record := range records { + if record.EventType != audit.RCRetargetingEvent { + t.Errorf("expected audit log type to be %q but was %q", audit.RCRetargetingEvent, record.EventType) + } + var details audit.RCRetargetingDetails + err = json.Unmarshal([]byte(*record.EventDetails), &details) + if err != nil { + t.Fatal(err) + } + if len(details.Nodes) != 1 { + t.Error("expected one node") + } else { + if details.Nodes[0] != "node2" { + t.Errorf("expected node list to only have %v but had %v", "node2", details.Nodes[0]) + } + } + } } func TestSchedulePartial(t *testing.T) { - _, consulStore, applicator, rc, alerter, closeFn := setup(t) + _, consulStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "bad") @@ -258,7 +302,7 @@ func TestSchedulePartial(t *testing.T) { } func TestUnschedulePartial(t *testing.T) { - _, consulStore, applicator, rc, _, closeFn := setup(t) + _, consulStore, applicator, rc, _, _, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node2", "nodeQuality", "good") @@ -296,7 +340,7 @@ func TestUnschedulePartial(t *testing.T) { } func TestScheduleTwice(t *testing.T) { - rcStore, consulStore, applicator, rc, alerter, closeFn := setup(t) + rcStore, consulStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "good") @@ -339,7 +383,7 @@ func TestScheduleTwice(t *testing.T) { } func TestUnschedule(t *testing.T) { - rcStore, consulStore, applicator, rc, alerter, closeFn := setup(t) + rcStore, consulStore, applicator, rc, alerter, auditLogStore, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "bad") @@ -375,10 +419,49 @@ func TestUnschedule(t *testing.T) { } Assert(t).AreEqual(len(manifests), 0, "expected manifest to have been unscheduled") Assert(t).AreEqual(len(alerter.Alerts), 0, "expected no alerts to fire") + + records, err := auditLogStore.List() + if err != nil { + t.Fatal(err) + } + if len(records) != 2 { + t.Fatalf("expected 2 audit log records but there were %d", len(records)) + } + + foundNone := false + for _, record := range records { + if record.EventType != audit.RCRetargetingEvent { + t.Errorf("expected audit log type to be %q but was %q", audit.RCRetargetingEvent, record.EventType) + } + var details audit.RCRetargetingDetails + err = json.Unmarshal([]byte(*record.EventDetails), &details) + if err != nil { + t.Fatal(err) + } + + if len(details.Nodes) == 0 { + if foundNone { + t.Fatal("both audit records had no nodes in them") + } + foundNone = true + } else { + if len(details.Nodes) != 1 { + t.Error("expected one node") + } else { + if details.Nodes[0] != "node2" { + t.Errorf("expected node list to only 
have %v but had %v", "node2", details.Nodes[0]) + } + } + } + } + + if !foundNone { + t.Fatal("should have found an audit record with no nodes but didn't") + } } func TestPreferUnscheduleIneligible(t *testing.T) { - rcStore, consulStore, applicator, rc, alerter, closeFn := setup(t) + rcStore, consulStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() for i := 0; i < 1000; i++ { nodeName := fmt.Sprintf("node%d", i) @@ -420,7 +503,7 @@ func TestPreferUnscheduleIneligible(t *testing.T) { } func TestConsistencyNoChange(t *testing.T) { - _, kvStore, applicator, rc, alerter, closeFn := setup(t) + _, kvStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() rcSHA, _ := rc.Manifest.SHA() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "good") @@ -452,7 +535,7 @@ func TestConsistencyNoChange(t *testing.T) { } func TestConsistencyModify(t *testing.T) { - _, kvStore, applicator, rc, alerter, closeFn := setup(t) + _, kvStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() rcSHA, _ := rc.Manifest.SHA() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "good") @@ -482,7 +565,7 @@ func TestConsistencyModify(t *testing.T) { } func TestConsistencyDelete(t *testing.T) { - _, kvStore, applicator, rc, alerter, closeFn := setup(t) + _, kvStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() rcSHA, _ := rc.Manifest.SHA() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "good") @@ -510,7 +593,7 @@ func TestConsistencyDelete(t *testing.T) { } func TestReservedLabels(t *testing.T) { - _, _, applicator, rc, _, closeFn := setup(t) + _, _, applicator, rc, _, _, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "good") @@ -531,7 +614,7 @@ func TestReservedLabels(t *testing.T) { } func TestScheduleMoreThan5(t *testing.T) { - rcStore, _, applicator, rc, _, closeFn := setup(t) + rcStore, _, applicator, rc, _, _, closeFn := setup(t) defer closeFn() for i := 0; i < 7; i++ { @@ -562,7 +645,7 @@ func TestScheduleMoreThan5(t *testing.T) { } func TestUnscheduleMoreThan5(t *testing.T) { - rcStore, _, applicator, rc, _, closeFn := setup(t) + rcStore, _, applicator, rc, _, _, closeFn := setup(t) defer closeFn() for i := 0; i < 7; i++ { diff --git a/pkg/roll/integration_test.go b/pkg/roll/integration_test.go index 6e5c273e5..c8217e5c9 100644 --- a/pkg/roll/integration_test.go +++ b/pkg/roll/integration_test.go @@ -78,7 +78,7 @@ func TestCleanupOldRCHappy(t *testing.T) { builder := manifest.NewBuilder() builder.SetID("whatever") - rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), nil, nil) + rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), "some_az", "some_cn", nil, nil) if err != nil { t.Fatal(err) } @@ -131,7 +131,7 @@ func TestCleanupOldRCTooManyReplicas(t *testing.T) { builder := manifest.NewBuilder() builder.SetID("whatever") - rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), nil, nil) + rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), "some_az", "some_cn", nil, nil) if err != nil { t.Fatal(err) } diff --git a/pkg/roll/update_test.go b/pkg/roll/update_test.go index ab31dd91f..4f01c5d3a 100644 --- a/pkg/roll/update_test.go +++ b/pkg/roll/update_test.go @@ -12,6 +12,7 @@ import ( "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" "github.com/square/p2/pkg/rc" rc_fields 
"github.com/square/p2/pkg/rc/fields" "github.com/square/p2/pkg/roll/fields" @@ -376,7 +377,14 @@ func assignManifestsToNodes( } type testRCCreator interface { - Create(manifest manifest.Manifest, nodeSelector klabels.Selector, podLabels klabels.Set, additionalLabels klabels.Set) (rc_fields.RC, error) + Create( + manifest manifest.Manifest, + nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels klabels.Set, + additionalLabels klabels.Set, + ) (rc_fields.RC, error) SetDesiredReplicas(id rc_fields.ID, n int) error } @@ -387,7 +395,7 @@ func createRC( desired int, nodes map[types.NodeName]bool, ) (rc_fields.RC, error) { - created, err := rcs.Create(manifest, nil, nil, nil) + created, err := rcs.Create(manifest, nil, "some_az", "some_cn", nil, nil) if err != nil { return rc_fields.RC{}, fmt.Errorf("Error creating RC: %s", err) } diff --git a/pkg/store/consul/rcstore/consul_store.go b/pkg/store/consul/rcstore/consul_store.go index cead689cc..ff98acfd2 100644 --- a/pkg/store/consul/rcstore/consul_store.go +++ b/pkg/store/consul/rcstore/consul_store.go @@ -16,16 +16,18 @@ import ( "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" "github.com/square/p2/pkg/rc/fields" "github.com/square/p2/pkg/store/consul" "github.com/square/p2/pkg/store/consul/consulutil" "github.com/square/p2/pkg/store/consul/transaction" + "github.com/square/p2/pkg/types" "github.com/square/p2/pkg/util" ) const ( // This label is applied to an RC, to identify the ID of its pod manifest. - PodIDLabel = "pod_id" + PodIDLabel = types.PodIDLabel // This is called "update" for backwards compatibility reasons, it // should probably be named "mutate" mutationSuffix = "update" @@ -106,7 +108,21 @@ func NewConsul(client consulutil.ConsulClient, labeler RCLabeler, retries int) * // The node selector is used to determine what nodes the replication controller may schedule on. // The pod label set is applied to every pod the replication controller schedules. 
// The additionalLabels label set is applied to the RCs own labels -func (s *ConsulStore) Create(manifest manifest.Manifest, nodeSelector klabels.Selector, podLabels klabels.Set, additionalLabels klabels.Set) (fields.RC, error) { +func (s *ConsulStore) Create( + manifest manifest.Manifest, + nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels klabels.Set, + additionalLabels klabels.Set, +) (fields.RC, error) { + + if podLabels == nil { + podLabels = make(klabels.Set) + } + podLabels[types.ClusterNameLabel] = clusterName.String() + podLabels[types.AvailabilityZoneLabel] = availabilityZone.String() + rc, err := s.innerCreate(manifest, nodeSelector, podLabels) // TODO: measure whether retries are is important in practice @@ -136,6 +152,8 @@ func (s *ConsulStore) CreateTxn( ctx context.Context, manifest manifest.Manifest, nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, podLabels klabels.Set, additionalLabels klabels.Set, ) (fields.RC, error) { diff --git a/pkg/store/consul/rcstore/fake_store.go b/pkg/store/consul/rcstore/fake_store.go index f86ef017c..cb43987d7 100644 --- a/pkg/store/consul/rcstore/fake_store.go +++ b/pkg/store/consul/rcstore/fake_store.go @@ -10,6 +10,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" "github.com/square/p2/pkg/rc/fields" "github.com/square/p2/pkg/store/consul" "github.com/square/p2/pkg/store/consul/consulutil" @@ -37,7 +38,14 @@ func NewFake() *fakeStore { } } -func (s *fakeStore) Create(manifest manifest.Manifest, nodeSelector labels.Selector, podLabels labels.Set, additionalLabels labels.Set) (fields.RC, error) { +func (s *fakeStore) Create( + manifest manifest.Manifest, + nodeSelector labels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels labels.Set, + additionalLabels labels.Set, +) (fields.RC, error) { // A real replication controller will use a UUID. // We'll just use a monotonically increasing counter for expedience. s.creates += 1 @@ -66,7 +74,15 @@ func (s *fakeStore) Create(manifest manifest.Manifest, nodeSelector labels.Selec // If a test needs to use transactions, it should be using a real consul e.g. // via consulutil.NewFixture(). 
We won't be implementing transactions ourselves // in these fake storage structs -func (s *fakeStore) CreateTxn(ctx context.Context, manifest manifest.Manifest, nodeSelector labels.Selector, podLabels labels.Set, additionalLabels labels.Set) (fields.RC, error) { +func (s *fakeStore) CreateTxn( + ctx context.Context, + manifest manifest.Manifest, + nodeSelector labels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels labels.Set, + additionalLabels labels.Set, +) (fields.RC, error) { panic("transactions not implemented in fake rc store") } diff --git a/pkg/store/consul/rcstore/integration_test.go b/pkg/store/consul/rcstore/integration_test.go index da4270275..6c6f7d266 100644 --- a/pkg/store/consul/rcstore/integration_test.go +++ b/pkg/store/consul/rcstore/integration_test.go @@ -27,7 +27,7 @@ func TestCreateTxn(t *testing.T) { ctx, cancelFunc := transaction.New(context.Background()) defer cancelFunc() - rc, err := store.CreateTxn(ctx, testManifest(), klabels.Everything(), nil, rcLabelsToSet) + rc, err := store.CreateTxn(ctx, testManifest(), klabels.Everything(), "some_az", "some_cn", nil, rcLabelsToSet) if err != nil { t.Fatal(err) } diff --git a/pkg/store/consul/rollstore/consul_store.go b/pkg/store/consul/rollstore/consul_store.go index 8148ea9f8..14094ab4a 100644 --- a/pkg/store/consul/rollstore/consul_store.go +++ b/pkg/store/consul/rollstore/consul_store.go @@ -16,6 +16,7 @@ import ( "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" rc_fields "github.com/square/p2/pkg/rc/fields" roll_fields "github.com/square/p2/pkg/roll/fields" "github.com/square/p2/pkg/store/consul" @@ -65,6 +66,8 @@ type ReplicationControllerStore interface { ctx context.Context, manifest manifest.Manifest, nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, podLabels klabels.Set, additionalLabels klabels.Set, ) (rc_fields.RC, error) @@ -75,6 +78,8 @@ type ReplicationControllerStore interface { Create( manifest manifest.Manifest, nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, podLabels klabels.Set, additionalLabels klabels.Set, ) (rc_fields.RC, error) @@ -307,6 +312,8 @@ func (s ConsulStore) CreateRollingUpdateFromOneExistingRCWithID( minimumReplicas int, leaveOld bool, rollDelay time.Duration, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, newRCManifest manifest.Manifest, newRCNodeSelector klabels.Selector, newRCPodLabels klabels.Set, @@ -334,7 +341,7 @@ func (s ConsulStore) CreateRollingUpdateFromOneExistingRCWithID( return roll_fields.Update{}, err } - rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, newRCPodLabels, newRCLabels) + rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, availabilityZone, clusterName, newRCPodLabels, newRCLabels) if err != nil { return roll_fields.Update{}, err } @@ -381,6 +388,8 @@ func (s ConsulStore) CreateRollingUpdateFromOneMaybeExistingWithLabelSelector( minimumReplicas int, leaveOld bool, rollDelay time.Duration, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, newRCManifest manifest.Manifest, newRCNodeSelector klabels.Selector, newRCPodLabels klabels.Set, @@ -418,7 +427,7 @@ func (s ConsulStore) CreateRollingUpdateFromOneMaybeExistingWithLabelSelector( // Create the old RC using the 
same info as the new RC, it'll be // removed when the update completes anyway - rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, newRCPodLabels, newRCLabels) + rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, availabilityZone, clusterName, newRCPodLabels, newRCLabels) if err != nil { return roll_fields.Update{}, err } @@ -440,7 +449,7 @@ func (s ConsulStore) CreateRollingUpdateFromOneMaybeExistingWithLabelSelector( // Create the new RC var newRCID rc_fields.ID - rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, newRCPodLabels, newRCLabels) + rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, availabilityZone, clusterName, newRCPodLabels, newRCLabels) if err != nil { return roll_fields.Update{}, err } diff --git a/pkg/store/consul/rollstore/consul_store_test.go b/pkg/store/consul/rollstore/consul_store_test.go index 294b9b45d..fd0c487d5 100644 --- a/pkg/store/consul/rollstore/consul_store_test.go +++ b/pkg/store/consul/rollstore/consul_store_test.go @@ -273,6 +273,8 @@ func TestCreateRollingUpdateFromOneExistingRCWithIDFailsIfCantAcquireLock(t *tes 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -310,6 +312,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenTwoMat firstRC, err := rollstore.rcstore.Create( testManifest(), nil, + "some_az", + "some_cn", nil, nil, ) @@ -325,6 +329,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenTwoMat secondRC, err := rollstore.rcstore.Create( testManifest(), nil, + "some_az", + "some_cn", nil, nil, ) @@ -348,6 +354,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenTwoMat 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -371,6 +379,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenExisti oldRC, err := rollstore.rcstore.Create( testManifest(), nil, + "some_az", + "some_cn", nil, nil, ) @@ -407,6 +417,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenExisti 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -438,6 +450,8 @@ func TestLeaveOldInvalidIfNoOldRC(t *testing.T) { 0, true, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, diff --git a/pkg/store/consul/rollstore/integration_test.go b/pkg/store/consul/rollstore/integration_test.go index 8fe40d944..e75a3dfff 100644 --- a/pkg/store/consul/rollstore/integration_test.go +++ b/pkg/store/consul/rollstore/integration_test.go @@ -15,6 +15,7 @@ import ( "github.com/square/p2/pkg/store/consul/consulutil" "github.com/square/p2/pkg/store/consul/rcstore" "github.com/square/p2/pkg/store/consul/transaction" + "github.com/square/p2/pkg/types" "github.com/hashicorp/consul/api" klabels "k8s.io/kubernetes/pkg/labels" @@ -101,6 +102,8 @@ func TestCreateRollingUpdateFromOneExistingRCWithID(t *testing.T) { 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -159,7 +162,7 @@ func TestCreateRollingUpdateFromOneExistingRCWithIDMutualExclusion(t *testing.T) rollstore, rcStore := newRollStoreWithRealConsul(t, fixture, nil) // create the old RC - oldRC, err := rollstore.rcstore.Create(testManifest(), nil, nil, nil) + oldRC, err := rollstore.rcstore.Create(testManifest(), nil, "some_az", "some_cn", podLabels(), nil) if err != nil { t.Fatalf("Failed to create old rc: %s", err) } @@ -172,6 +175,8 @@ func 
TestCreateRollingUpdateFromOneExistingRCWithIDMutualExclusion(t *testing.T) 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -199,6 +204,8 @@ func TestCreateRollingUpdateFromOneExistingRCWithIDMutualExclusion(t *testing.T) 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -261,6 +268,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorWhenDoesntExist 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -326,6 +335,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorWhenExists(t *t oldRC, err := rollstore.rcstore.Create( testManifest(), nil, + "some_az", + "some_cn", nil, nil, ) @@ -349,6 +360,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorWhenExists(t *t 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -389,6 +402,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenConfli oldRC, err := rollstore.rcstore.Create( testManifest(), nil, + "some_az", + "some_cn", nil, nil, ) @@ -413,6 +428,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenConfli 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -445,6 +462,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenConfli 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -498,3 +517,10 @@ func newRollStoreWithRealConsul(t *testing.T, fixture consulutil.Fixture, entrie labeler: applicator, }, rcStore } + +func podLabels() klabels.Set { + return klabels.Set{ + types.AvailabilityZoneLabel: "some_az", + types.ClusterNameLabel: "some_cn", + } +} diff --git a/pkg/types/constants.go b/pkg/types/constants.go new file mode 100644 index 000000000..b2a0c934c --- /dev/null +++ b/pkg/types/constants.go @@ -0,0 +1,7 @@ +package types + +const ( + AvailabilityZoneLabel = "availability_zone" + ClusterNameLabel = "cluster_name" + PodIDLabel = "pod_id" +)
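
For illustration only (not part of the patch above): a minimal sketch of how a caller might build and read the REPLICATION_CONTROLLER_RETARGET audit-log details introduced in pkg/audit/rc.go. The pod ID, availability zone, cluster name, and node names below are placeholder values, and the snippet assumes the p2 packages from this change are importable.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/square/p2/pkg/audit"
	pc_fields "github.com/square/p2/pkg/pc/fields"
	"github.com/square/p2/pkg/types"
)

func main() {
	// Build the JSON payload that the RC farm now records whenever the set of
	// nodes targeted by a replication controller changes.
	details, err := audit.NewRCRetargetingEventDetails(
		types.PodID("some_pod_id"),
		pc_fields.AvailabilityZone("some_az"),
		pc_fields.ClusterName("some_cn"),
		[]types.NodeName{"node1", "node3"},
	)
	if err != nil {
		log.Fatal(err)
	}

	// Round-trip the payload the same way the tests above do, to show the
	// fields a consumer of the audit log can expect to find.
	var parsed audit.RCRetargetingDetails
	if err := json.Unmarshal(details, &parsed); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s retargeted to %v in %s/%s\n",
		parsed.PodID, parsed.Nodes, parsed.AvailabilityZone, parsed.ClusterName)
}
```

Note that after this change, rcstore.Create and CreateTxn take the availability zone and cluster name up front (they are stamped onto the pod labels as availability_zone and cluster_name), so callers such as p2-rctl must now pass --availability-zone and --cluster-name when creating a replication controller.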