From c3b4933f4da1312ff467e4146e17b01f78503248 Mon Sep 17 00:00:00 2001 From: Michael Puncel Date: Thu, 22 Jun 2017 10:18:18 -0700 Subject: [PATCH 1/5] Make http applicator implement RemoveAllLabelsTxn This allows expanding the labels.ApplicatorWithoutWatches interface which makes it easier to use in various binaries that switch on using HTTP applicators vs direct consul access. --- bin/p2-rctl-server/main.go | 6 +----- pkg/labels/consul_applicator.go | 4 ++++ pkg/labels/fake_applicator.go | 4 ++++ pkg/labels/http_applicator.go | 4 ++++ pkg/labels/label_http_server.go | 4 ++++ 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/bin/p2-rctl-server/main.go b/bin/p2-rctl-server/main.go index d05d96691..91ad08786 100644 --- a/bin/p2-rctl-server/main.go +++ b/bin/p2-rctl-server/main.go @@ -16,7 +16,6 @@ import ( "github.com/square/p2/pkg/alerting" "github.com/square/p2/pkg/health/checker" - "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/rc" "github.com/square/p2/pkg/roll" @@ -54,7 +53,7 @@ func SessionName() string { func main() { // Parse custom flags + standard Consul routing options kingpin.Version(version.VERSION) - _, opts, _ := flags.ParseWithConsulOptions() + _, opts, labeler := flags.ParseWithConsulOptions() // Set up the logger logger := logging.NewLogger(logrus.Fields{}) @@ -72,9 +71,6 @@ func main() { httpClient := cleanhttp.DefaultClient() client := consul.NewConsulClient(opts) consulStore := consul.NewConsulStore(client) - // flags.ParseWithConsulOptions() because that interface doesn't - // support transactions which is now required by the RC store - labeler := labels.NewConsulApplicator(client, 0) rcStore := rcstore.NewConsul(client, labeler, RetryCount) rollStore := rollstore.NewConsul(client, labeler, nil) diff --git a/pkg/labels/consul_applicator.go b/pkg/labels/consul_applicator.go index dcca85744..067c4d57e 100644 --- a/pkg/labels/consul_applicator.go +++ b/pkg/labels/consul_applicator.go @@ -365,6 +365,10 @@ func (c *consulApplicator) RemoveAllLabels(labelType Type, id string) error { // the passed transaction rather than synchronously making the requisite consul // call func (c *consulApplicator) RemoveAllLabelsTxn(ctx context.Context, labelType Type, id string) error { + return removeAllLabelsTxn(ctx, labelType, id) +} + +func removeAllLabelsTxn(ctx context.Context, labelType Type, id string) error { path, err := objectPath(labelType, id) if err != nil { return err diff --git a/pkg/labels/fake_applicator.go b/pkg/labels/fake_applicator.go index bbde77cad..671bb3501 100644 --- a/pkg/labels/fake_applicator.go +++ b/pkg/labels/fake_applicator.go @@ -83,6 +83,10 @@ func (app *fakeApplicator) RemoveLabel(labelType Type, id, name string) error { return nil } +func (app *fakeApplicator) RemoveLabelsTxn(ctx context.Context, labelType Type, id string, keysToRemove []string) error { + panic("not implemented") +} + func (app *fakeApplicator) ListLabels(labelType Type) ([]Labeled, error) { res := []Labeled{} for id, set := range app.data[labelType] { diff --git a/pkg/labels/http_applicator.go b/pkg/labels/http_applicator.go index e72368392..9c6a46025 100644 --- a/pkg/labels/http_applicator.go +++ b/pkg/labels/http_applicator.go @@ -206,6 +206,10 @@ func (h *httpApplicator) RemoveAllLabels(labelType Type, id string) error { return nil } +func (h *httpApplicator) RemoveAllLabelsTxn(ctx context.Context, labelType Type, id string) error { + return removeAllLabelsTxn(ctx, labelType, id) +} + // Finds all labels assigned to all 
entities under a type // // GET /api/labels/:type diff --git a/pkg/labels/label_http_server.go b/pkg/labels/label_http_server.go index 968048abc..577647bd1 100644 --- a/pkg/labels/label_http_server.go +++ b/pkg/labels/label_http_server.go @@ -1,6 +1,7 @@ package labels import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -21,8 +22,11 @@ import ( type ApplicatorWithoutWatches interface { SetLabel(labelType Type, id, name, value string) error SetLabels(labelType Type, id string, labels map[string]string) error + SetLabelsTxn(ctx context.Context, labelType Type, id string, labels map[string]string) error RemoveLabel(labelType Type, id, name string) error RemoveAllLabels(labelType Type, id string) error + RemoveLabelsTxn(ctx context.Context, labelType Type, id string, keysToRemove []string) error + RemoveAllLabelsTxn(ctx context.Context, labelType Type, id string) error ListLabels(labelType Type) ([]Labeled, error) GetLabels(labelType Type, id string) (Labeled, error) GetMatches(selector klabels.Selector, labelType Type) ([]Labeled, error) From a96cc4ba39debf3bc7abe8e0c4c4ee75562f700c Mon Sep 17 00:00:00 2001 From: Michael Puncel Date: Thu, 8 Jun 2017 14:43:41 -0700 Subject: [PATCH 2/5] Add details schema for "rc retargeting events". An rc retargeting event represents an RC changing the set of nodes it targets, e.g. due to a replica count change, selector change, or a change to the labels of a node. This commit creates the json schema for the details for an event as well as a constructor for easily creating the json.RawMessage for use in an audit log record. --- pkg/audit/rc.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ pkg/audit/rc_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 pkg/audit/rc.go create mode 100644 pkg/audit/rc_test.go diff --git a/pkg/audit/rc.go b/pkg/audit/rc.go new file mode 100644 index 000000000..39591d2ee --- /dev/null +++ b/pkg/audit/rc.go @@ -0,0 +1,44 @@ +package audit + +import ( + "encoding/json" + + pc_fields "github.com/square/p2/pkg/pc/fields" + "github.com/square/p2/pkg/types" + "github.com/square/p2/pkg/util" +) + +const ( + // RcRetargetingEvent represents events in which an RC changes the set of + // nodes that it is targeting. 
This can be used to log the set of nodes that + // an RC or pod cluster manages over time + RCRetargetingEvent EventType = "REPLICATION_CONTROLLER_RETARGET" +) + +type RCRetargetingDetails struct { + PodID types.PodID `json:"pod_id"` + AvailabilityZone pc_fields.AvailabilityZone `json:"availability_zone"` + ClusterName pc_fields.ClusterName `json:"cluster_name"` + Nodes []types.NodeName `json:"nodes"` +} + +func NewRCRetargetingEventDetails( + podID types.PodID, + az pc_fields.AvailabilityZone, + name pc_fields.ClusterName, + nodes []types.NodeName, +) (json.RawMessage, error) { + details := RCRetargetingDetails{ + PodID: podID, + AvailabilityZone: az, + ClusterName: name, + Nodes: nodes, + } + + bytes, err := json.Marshal(details) + if err != nil { + return nil, util.Errorf("could not marshal rc retargeting details as json: %s", err) + } + + return json.RawMessage(bytes), nil +} diff --git a/pkg/audit/rc_test.go b/pkg/audit/rc_test.go new file mode 100644 index 000000000..b5bb0aa79 --- /dev/null +++ b/pkg/audit/rc_test.go @@ -0,0 +1,44 @@ +package audit + +import ( + "encoding/json" + "reflect" + "testing" + + pc_fields "github.com/square/p2/pkg/pc/fields" + "github.com/square/p2/pkg/types" +) + +func TestRCRetargetingEventDetails(t *testing.T) { + podID := types.PodID("some_pod_id") + clusterName := pc_fields.ClusterName("some_cluster_name") + az := pc_fields.AvailabilityZone("some_availability_zone") + nodes := []types.NodeName{"node1", "node2"} + + detailsJSON, err := NewRCRetargetingEventDetails(podID, az, clusterName, nodes) + if err != nil { + t.Fatal(err) + } + + var details RCRetargetingDetails + err = json.Unmarshal(detailsJSON, &details) + if err != nil { + t.Fatal(err) + } + + if details.PodID != podID { + t.Errorf("expected pod id to be %s but was %s", podID, details.PodID) + } + + if details.AvailabilityZone != az { + t.Errorf("expected availability zone to be %s but was %s", az, details.AvailabilityZone) + } + + if details.ClusterName != clusterName { + t.Errorf("expected cluster name to be %s but was %s", clusterName, details.ClusterName) + } + + if !reflect.DeepEqual(details.Nodes, nodes) { + t.Errorf("expected node list to be %s but was %s", nodes, details.Nodes) + } +} From 8882678c010099500ab80c3fd70e03e05b107f82 Mon Sep 17 00:00:00 2001 From: Michael Puncel Date: Mon, 12 Jun 2017 12:11:48 -0700 Subject: [PATCH 3/5] require availability zone and cluster name pod labels on RCs RCs have a "pod_labels" field which represents a set of labels that should be applied to every pod that is scheduled by that RC. Previously there has been no requirement that that set of labels includes the availability_zone and cluster_name to identify the pod cluster that the RC and its pods are members of. pod_id can be inferred from the pod manifest and is already automatically set. This commit enforces that the set of labels have availability zone and cluster name in anticipation of having RCs create audit log records every time the set of nodes they target changes. This is because audit log records are linked to pod clusters, and as a result RCs must be as well. 
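For illustration, after this change a caller supplies the pod cluster identity directly to the RC store's Create method, which merges the availability zone and cluster name into the RC's pod_labels. A minimal sketch of such a caller follows; the example package, the createExampleRC helper, and the "some_az"/"some_cn" placeholder values are assumptions for illustration only, not part of this patch:

package example

import (
	"github.com/square/p2/pkg/manifest"
	pc_fields "github.com/square/p2/pkg/pc/fields"
	"github.com/square/p2/pkg/store/consul/rcstore"

	klabels "k8s.io/kubernetes/pkg/labels"
)

// createExampleRC shows the expanded Create signature: availability zone and
// cluster name are now required arguments and end up in the RC's pod_labels.
func createExampleRC(rcStore *rcstore.ConsulStore, man manifest.Manifest) error {
	_, err := rcStore.Create(
		man,
		klabels.Everything(),                  // node selector: any node is eligible
		pc_fields.AvailabilityZone("some_az"), // identifies the pod cluster's availability zone
		pc_fields.ClusterName("some_cn"),      // identifies the pod cluster's name
		nil,                                   // pod labels; az/cn labels are filled in by Create
		nil,                                   // additional labels applied to the RC itself
	)
	return err
}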
--- bin/p2-rctl/main.go | 35 ++++++++++++++----- integration/single-node-slug-deploy/check.go | 18 ++++++++-- pkg/pc/fields/fields.go | 7 ++-- pkg/rc/farm_test.go | 4 +-- pkg/rc/replication_controller_test.go | 12 +++++-- pkg/roll/integration_test.go | 4 +-- pkg/roll/update_test.go | 12 +++++-- pkg/store/consul/rcstore/consul_store.go | 22 ++++++++++-- pkg/store/consul/rcstore/fake_store.go | 20 +++++++++-- pkg/store/consul/rcstore/integration_test.go | 2 +- pkg/store/consul/rollstore/consul_store.go | 15 ++++++-- .../consul/rollstore/consul_store_test.go | 14 ++++++++ .../consul/rollstore/integration_test.go | 28 ++++++++++++++- pkg/types/constants.go | 7 ++++ 14 files changed, 168 insertions(+), 32 deletions(-) create mode 100644 pkg/types/constants.go diff --git a/bin/p2-rctl/main.go b/bin/p2-rctl/main.go index cc62c7479..d478baa8d 100644 --- a/bin/p2-rctl/main.go +++ b/bin/p2-rctl/main.go @@ -22,6 +22,7 @@ import ( "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" "github.com/square/p2/pkg/rc" "github.com/square/p2/pkg/rc/fields" rc_fields "github.com/square/p2/pkg/rc/fields" @@ -54,11 +55,13 @@ var ( logLevel = kingpin.Flag("log", "Logging level to display.").String() logJSON = kingpin.Flag("log-json", "Log messages will be JSON formatted").Bool() - cmdCreate = kingpin.Command(cmdCreateText, "Create a new replication controller") - createManifest = cmdCreate.Flag("manifest", "manifest file to use for this replication controller").Short('m').Required().String() - createNodeSel = cmdCreate.Flag("node-selector", "node selector that this replication controller should target").Short('n').Required().String() - createPodLabels = cmdCreate.Flag("pod-label", "a pod label, in LABEL=VALUE form, to add to this replication controller. Can be specified multiple times.").Short('p').StringMap() - createRCLabels = cmdCreate.Flag("rc-label", "an RC label, in LABEL=VALUE form, to be applied to this replication controller. Can be specified multiple times.").Short('r').StringMap() + cmdCreate = kingpin.Command(cmdCreateText, "Create a new replication controller") + createManifest = cmdCreate.Flag("manifest", "manifest file to use for this replication controller").Short('m').Required().String() + createNodeSel = cmdCreate.Flag("node-selector", "node selector that this replication controller should target").Short('n').Required().String() + createPodLabels = cmdCreate.Flag("pod-label", "a pod label, in LABEL=VALUE form, to add to this replication controller. Can be specified multiple times.").Short('p').StringMap() + createRCLabels = cmdCreate.Flag("rc-label", "an RC label, in LABEL=VALUE form, to be applied to this replication controller. 
Can be specified multiple times.").Short('r').StringMap() + createAvailabilityZone = cmdCreate.Flag("availability-zone", "availability zone that RC should belong to").Short('a').Required().String() + createClusterName = cmdCreate.Flag("cluster-name", "cluster name that RC should belong to").Short('c').Required().String() cmdDelete = kingpin.Command(cmdDeleteText, "Delete a replication controller") deleteID = cmdDelete.Arg("id", "replication controller uuid to delete").Required().String() @@ -153,7 +156,14 @@ func main() { switch cmd { case cmdCreateText: - rctl.Create(*createManifest, *createNodeSel, *createPodLabels, *createRCLabels) + rctl.Create( + *createManifest, + *createNodeSel, + pc_fields.AvailabilityZone(*createAvailabilityZone), + pc_fields.ClusterName(*createClusterName), + *createPodLabels, + *createRCLabels, + ) case cmdDeleteText: rctl.Delete(*deleteID, *deleteForce) case cmdReplicasText: @@ -197,6 +207,8 @@ type ReplicationControllerStore interface { Create( manifest manifest.Manifest, nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, podLabels klabels.Set, additionalLabels klabels.Set, ) (fields.RC, error) @@ -231,7 +243,14 @@ type rctlParams struct { logger logging.Logger } -func (r rctlParams) Create(manifestPath, nodeSelector string, podLabels map[string]string, rcLabels map[string]string) { +func (r rctlParams) Create( + manifestPath string, + nodeSelector string, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels map[string]string, + rcLabels map[string]string, +) { manifest, err := manifest.FromPath(manifestPath) if err != nil { r.logger.WithErrorAndFields(err, logrus.Fields{ @@ -246,7 +265,7 @@ func (r rctlParams) Create(manifestPath, nodeSelector string, podLabels map[stri }).Fatalln("Could not parse node selector") } - newRC, err := r.rcs.Create(manifest, nodeSel, klabels.Set(podLabels), rcLabels) + newRC, err := r.rcs.Create(manifest, nodeSel, availabilityZone, clusterName, klabels.Set(podLabels), rcLabels) if err != nil { r.logger.WithError(err).Fatalln("Could not create replication controller in Consul") } diff --git a/integration/single-node-slug-deploy/check.go b/integration/single-node-slug-deploy/check.go index 5db2794f9..b2ab6af31 100644 --- a/integration/single-node-slug-deploy/check.go +++ b/integration/single-node-slug-deploy/check.go @@ -428,13 +428,13 @@ func verifyTransferReplicas(errCh chan<- error, tempdir string, logger logging.L builder.SetID("some_pod") man := builder.GetManifest() - fromRC, err := rcStore.Create(man, klabels.Everything(), nil, nil) + fromRC, err := rcStore.Create(man, klabels.Everything(), "some_az", "some_cn", nil, nil) if err != nil { errCh <- util.Errorf("could not create RC for replica transfer test: %s", err) return } - toRC, err := rcStore.Create(man, klabels.Everything(), nil, nil) + toRC, err := rcStore.Create(man, klabels.Everything(), "some_az", "some_cn", nil, nil) if err != nil { errCh <- util.Errorf("could not create second RC for replica transfer test: %s", err) return @@ -854,7 +854,19 @@ func createHelloReplicationController(dir string) (fields.ID, error) { return "", err } - cmd := exec.Command("p2-rctl", "--log-json", "create", "--manifest", signedManifestPath, "--node-selector", "test=yes") + cmd := exec.Command( + "p2-rctl", + "--log-json", + "create", + "--manifest", + signedManifestPath, + "--node-selector", + "test=yes", + "--availability-zone", + "some_az", + "--cluster-name", + "some_cn", + 
) out := bytes.Buffer{} cmd.Stdout = &out cmd.Stderr = &out diff --git a/pkg/pc/fields/fields.go b/pkg/pc/fields/fields.go index a77972a2a..d537cd79b 100644 --- a/pkg/pc/fields/fields.go +++ b/pkg/pc/fields/fields.go @@ -4,7 +4,6 @@ import ( "encoding/json" "reflect" - "github.com/square/p2/pkg/store/consul/rcstore" "github.com/square/p2/pkg/types" "k8s.io/kubernetes/pkg/labels" @@ -18,9 +17,9 @@ type Annotations map[string]interface{} // label keys used by pod selector const ( - AvailabilityZoneLabel = "availability_zone" - ClusterNameLabel = "cluster_name" - PodIDLabel = rcstore.PodIDLabel // TODO: put this in a different place now that multiple packages use it + AvailabilityZoneLabel = types.AvailabilityZoneLabel + ClusterNameLabel = types.ClusterNameLabel + PodIDLabel = types.PodIDLabel ) func (id ID) String() string { diff --git a/pkg/rc/farm_test.go b/pkg/rc/farm_test.go index bf2f23fdf..19b0330c9 100644 --- a/pkg/rc/farm_test.go +++ b/pkg/rc/farm_test.go @@ -46,7 +46,7 @@ func TestRCsWithCountsWillBeFine(t *testing.T) { rcStore: fakeStore, } - rc, err := fakeStore.Create(testManifest(), klabels.Everything(), map[string]string{}, nil) + rc, err := fakeStore.Create(testManifest(), klabels.Everything(), "some_az", "some_cn", map[string]string{}, nil) if err != nil { t.Fatalf("could not put an RC in the fake store: %s", err) } @@ -74,7 +74,7 @@ func TestRCsWithZeroCountsWillTriggerIncident(t *testing.T) { } // replica count is implicitly zero - _, err := fakeStore.Create(testManifest(), klabels.Everything(), map[string]string{}, nil) + _, err := fakeStore.Create(testManifest(), klabels.Everything(), "some_az", "some_cn", map[string]string{}, nil) if err != nil { t.Fatalf("could not put an RC in the fake store: %s", err) } diff --git a/pkg/rc/replication_controller_test.go b/pkg/rc/replication_controller_test.go index 9a455ca1b..8d6d195d6 100644 --- a/pkg/rc/replication_controller_test.go +++ b/pkg/rc/replication_controller_test.go @@ -13,6 +13,7 @@ import ( "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" "github.com/square/p2/pkg/pods" "github.com/square/p2/pkg/rc/fields" "github.com/square/p2/pkg/scheduler" @@ -28,7 +29,14 @@ import ( type testRCStore interface { ReplicationControllerStore ReplicationControllerWatcher - Create(manifest manifest.Manifest, nodeSelector klabels.Selector, podLabels klabels.Set, additionalLabels klabels.Set) (fields.RC, error) + Create( + manifest manifest.Manifest, + nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels klabels.Set, + additionalLabels klabels.Set, + ) (fields.RC, error) SetDesiredReplicas(id fields.ID, n int) error } @@ -77,7 +85,7 @@ func setup(t *testing.T) ( nodeSelector := klabels.Everything().Add("nodeQuality", klabels.EqualsOperator, []string{"good"}) podLabels := map[string]string{"podTest": "successful"} - rcData, err := rcStore.Create(podManifest, nodeSelector, podLabels, nil) + rcData, err := rcStore.Create(podManifest, nodeSelector, "some_az", "some_cn", podLabels, nil) Assert(t).IsNil(err, "expected no error creating request") alerter = alertingtest.NewRecorder() diff --git a/pkg/roll/integration_test.go b/pkg/roll/integration_test.go index 6e5c273e5..c8217e5c9 100644 --- a/pkg/roll/integration_test.go +++ b/pkg/roll/integration_test.go @@ -78,7 +78,7 @@ func TestCleanupOldRCHappy(t *testing.T) { builder := manifest.NewBuilder() 
builder.SetID("whatever") - rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), nil, nil) + rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), "some_az", "some_cn", nil, nil) if err != nil { t.Fatal(err) } @@ -131,7 +131,7 @@ func TestCleanupOldRCTooManyReplicas(t *testing.T) { builder := manifest.NewBuilder() builder.SetID("whatever") - rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), nil, nil) + rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), "some_az", "some_cn", nil, nil) if err != nil { t.Fatal(err) } diff --git a/pkg/roll/update_test.go b/pkg/roll/update_test.go index ab31dd91f..4f01c5d3a 100644 --- a/pkg/roll/update_test.go +++ b/pkg/roll/update_test.go @@ -12,6 +12,7 @@ import ( "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" "github.com/square/p2/pkg/rc" rc_fields "github.com/square/p2/pkg/rc/fields" "github.com/square/p2/pkg/roll/fields" @@ -376,7 +377,14 @@ func assignManifestsToNodes( } type testRCCreator interface { - Create(manifest manifest.Manifest, nodeSelector klabels.Selector, podLabels klabels.Set, additionalLabels klabels.Set) (rc_fields.RC, error) + Create( + manifest manifest.Manifest, + nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels klabels.Set, + additionalLabels klabels.Set, + ) (rc_fields.RC, error) SetDesiredReplicas(id rc_fields.ID, n int) error } @@ -387,7 +395,7 @@ func createRC( desired int, nodes map[types.NodeName]bool, ) (rc_fields.RC, error) { - created, err := rcs.Create(manifest, nil, nil, nil) + created, err := rcs.Create(manifest, nil, "some_az", "some_cn", nil, nil) if err != nil { return rc_fields.RC{}, fmt.Errorf("Error creating RC: %s", err) } diff --git a/pkg/store/consul/rcstore/consul_store.go b/pkg/store/consul/rcstore/consul_store.go index cead689cc..ff98acfd2 100644 --- a/pkg/store/consul/rcstore/consul_store.go +++ b/pkg/store/consul/rcstore/consul_store.go @@ -16,16 +16,18 @@ import ( "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" "github.com/square/p2/pkg/rc/fields" "github.com/square/p2/pkg/store/consul" "github.com/square/p2/pkg/store/consul/consulutil" "github.com/square/p2/pkg/store/consul/transaction" + "github.com/square/p2/pkg/types" "github.com/square/p2/pkg/util" ) const ( // This label is applied to an RC, to identify the ID of its pod manifest. - PodIDLabel = "pod_id" + PodIDLabel = types.PodIDLabel // This is called "update" for backwards compatibility reasons, it // should probably be named "mutate" mutationSuffix = "update" @@ -106,7 +108,21 @@ func NewConsul(client consulutil.ConsulClient, labeler RCLabeler, retries int) * // The node selector is used to determine what nodes the replication controller may schedule on. // The pod label set is applied to every pod the replication controller schedules. 
// The additionalLabels label set is applied to the RC's own labels -func (s *ConsulStore) Create(manifest manifest.Manifest, nodeSelector klabels.Selector, podLabels klabels.Set, additionalLabels klabels.Set) (fields.RC, error) { +func (s *ConsulStore) Create( + manifest manifest.Manifest, + nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels klabels.Set, + additionalLabels klabels.Set, +) (fields.RC, error) { + + if podLabels == nil { + podLabels = make(klabels.Set) + } + podLabels[types.ClusterNameLabel] = clusterName.String() + podLabels[types.AvailabilityZoneLabel] = availabilityZone.String() + rc, err := s.innerCreate(manifest, nodeSelector, podLabels) // TODO: measure whether retries are important in practice @@ -136,6 +152,8 @@ func (s *ConsulStore) CreateTxn( ctx context.Context, manifest manifest.Manifest, nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, podLabels klabels.Set, additionalLabels klabels.Set, ) (fields.RC, error) { diff --git a/pkg/store/consul/rcstore/fake_store.go b/pkg/store/consul/rcstore/fake_store.go index f86ef017c..cb43987d7 100644 --- a/pkg/store/consul/rcstore/fake_store.go +++ b/pkg/store/consul/rcstore/fake_store.go @@ -10,6 +10,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" "github.com/square/p2/pkg/rc/fields" "github.com/square/p2/pkg/store/consul" "github.com/square/p2/pkg/store/consul/consulutil" @@ -37,7 +38,14 @@ func NewFake() *fakeStore { } } -func (s *fakeStore) Create(manifest manifest.Manifest, nodeSelector labels.Selector, podLabels labels.Set, additionalLabels labels.Set) (fields.RC, error) { +func (s *fakeStore) Create( + manifest manifest.Manifest, + nodeSelector labels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels labels.Set, + additionalLabels labels.Set, +) (fields.RC, error) { // A real replication controller will use a UUID. // We'll just use a monotonically increasing counter for expedience. s.creates += 1 @@ -66,7 +74,15 @@ func (s *fakeStore) Create(manifest manifest.Manifest, nodeSelector labels.Selec // If a test needs to use transactions, it should be using a real consul e.g. // via consulutil.NewFixture(). 
We won't be implementing transactions ourselves // in these fake storage structs -func (s *fakeStore) CreateTxn(ctx context.Context, manifest manifest.Manifest, nodeSelector labels.Selector, podLabels labels.Set, additionalLabels labels.Set) (fields.RC, error) { +func (s *fakeStore) CreateTxn( + ctx context.Context, + manifest manifest.Manifest, + nodeSelector labels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, + podLabels labels.Set, + additionalLabels labels.Set, +) (fields.RC, error) { panic("transactions not implemented in fake rc store") } diff --git a/pkg/store/consul/rcstore/integration_test.go b/pkg/store/consul/rcstore/integration_test.go index da4270275..6c6f7d266 100644 --- a/pkg/store/consul/rcstore/integration_test.go +++ b/pkg/store/consul/rcstore/integration_test.go @@ -27,7 +27,7 @@ func TestCreateTxn(t *testing.T) { ctx, cancelFunc := transaction.New(context.Background()) defer cancelFunc() - rc, err := store.CreateTxn(ctx, testManifest(), klabels.Everything(), nil, rcLabelsToSet) + rc, err := store.CreateTxn(ctx, testManifest(), klabels.Everything(), "some_az", "some_cn", nil, rcLabelsToSet) if err != nil { t.Fatal(err) } diff --git a/pkg/store/consul/rollstore/consul_store.go b/pkg/store/consul/rollstore/consul_store.go index 8148ea9f8..14094ab4a 100644 --- a/pkg/store/consul/rollstore/consul_store.go +++ b/pkg/store/consul/rollstore/consul_store.go @@ -16,6 +16,7 @@ import ( "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/manifest" + pc_fields "github.com/square/p2/pkg/pc/fields" rc_fields "github.com/square/p2/pkg/rc/fields" roll_fields "github.com/square/p2/pkg/roll/fields" "github.com/square/p2/pkg/store/consul" @@ -65,6 +66,8 @@ type ReplicationControllerStore interface { ctx context.Context, manifest manifest.Manifest, nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, podLabels klabels.Set, additionalLabels klabels.Set, ) (rc_fields.RC, error) @@ -75,6 +78,8 @@ type ReplicationControllerStore interface { Create( manifest manifest.Manifest, nodeSelector klabels.Selector, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, podLabels klabels.Set, additionalLabels klabels.Set, ) (rc_fields.RC, error) @@ -307,6 +312,8 @@ func (s ConsulStore) CreateRollingUpdateFromOneExistingRCWithID( minimumReplicas int, leaveOld bool, rollDelay time.Duration, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, newRCManifest manifest.Manifest, newRCNodeSelector klabels.Selector, newRCPodLabels klabels.Set, @@ -334,7 +341,7 @@ func (s ConsulStore) CreateRollingUpdateFromOneExistingRCWithID( return roll_fields.Update{}, err } - rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, newRCPodLabels, newRCLabels) + rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, availabilityZone, clusterName, newRCPodLabels, newRCLabels) if err != nil { return roll_fields.Update{}, err } @@ -381,6 +388,8 @@ func (s ConsulStore) CreateRollingUpdateFromOneMaybeExistingWithLabelSelector( minimumReplicas int, leaveOld bool, rollDelay time.Duration, + availabilityZone pc_fields.AvailabilityZone, + clusterName pc_fields.ClusterName, newRCManifest manifest.Manifest, newRCNodeSelector klabels.Selector, newRCPodLabels klabels.Set, @@ -418,7 +427,7 @@ func (s ConsulStore) CreateRollingUpdateFromOneMaybeExistingWithLabelSelector( // Create the old RC using the 
same info as the new RC, it'll be // removed when the update completes anyway - rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, newRCPodLabels, newRCLabels) + rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, availabilityZone, clusterName, newRCPodLabels, newRCLabels) if err != nil { return roll_fields.Update{}, err } @@ -440,7 +449,7 @@ func (s ConsulStore) CreateRollingUpdateFromOneMaybeExistingWithLabelSelector( // Create the new RC var newRCID rc_fields.ID - rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, newRCPodLabels, newRCLabels) + rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, availabilityZone, clusterName, newRCPodLabels, newRCLabels) if err != nil { return roll_fields.Update{}, err } diff --git a/pkg/store/consul/rollstore/consul_store_test.go b/pkg/store/consul/rollstore/consul_store_test.go index 294b9b45d..fd0c487d5 100644 --- a/pkg/store/consul/rollstore/consul_store_test.go +++ b/pkg/store/consul/rollstore/consul_store_test.go @@ -273,6 +273,8 @@ func TestCreateRollingUpdateFromOneExistingRCWithIDFailsIfCantAcquireLock(t *tes 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -310,6 +312,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenTwoMat firstRC, err := rollstore.rcstore.Create( testManifest(), nil, + "some_az", + "some_cn", nil, nil, ) @@ -325,6 +329,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenTwoMat secondRC, err := rollstore.rcstore.Create( testManifest(), nil, + "some_az", + "some_cn", nil, nil, ) @@ -348,6 +354,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenTwoMat 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -371,6 +379,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenExisti oldRC, err := rollstore.rcstore.Create( testManifest(), nil, + "some_az", + "some_cn", nil, nil, ) @@ -407,6 +417,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenExisti 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -438,6 +450,8 @@ func TestLeaveOldInvalidIfNoOldRC(t *testing.T) { 0, true, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, diff --git a/pkg/store/consul/rollstore/integration_test.go b/pkg/store/consul/rollstore/integration_test.go index 8fe40d944..e75a3dfff 100644 --- a/pkg/store/consul/rollstore/integration_test.go +++ b/pkg/store/consul/rollstore/integration_test.go @@ -15,6 +15,7 @@ import ( "github.com/square/p2/pkg/store/consul/consulutil" "github.com/square/p2/pkg/store/consul/rcstore" "github.com/square/p2/pkg/store/consul/transaction" + "github.com/square/p2/pkg/types" "github.com/hashicorp/consul/api" klabels "k8s.io/kubernetes/pkg/labels" @@ -101,6 +102,8 @@ func TestCreateRollingUpdateFromOneExistingRCWithID(t *testing.T) { 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -159,7 +162,7 @@ func TestCreateRollingUpdateFromOneExistingRCWithIDMutualExclusion(t *testing.T) rollstore, rcStore := newRollStoreWithRealConsul(t, fixture, nil) // create the old RC - oldRC, err := rollstore.rcstore.Create(testManifest(), nil, nil, nil) + oldRC, err := rollstore.rcstore.Create(testManifest(), nil, "some_az", "some_cn", podLabels(), nil) if err != nil { t.Fatalf("Failed to create old rc: %s", err) } @@ -172,6 +175,8 @@ func 
TestCreateRollingUpdateFromOneExistingRCWithIDMutualExclusion(t *testing.T) 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -199,6 +204,8 @@ func TestCreateRollingUpdateFromOneExistingRCWithIDMutualExclusion(t *testing.T) 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -261,6 +268,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorWhenDoesntExist 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -326,6 +335,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorWhenExists(t *t oldRC, err := rollstore.rcstore.Create( testManifest(), nil, + "some_az", + "some_cn", nil, nil, ) @@ -349,6 +360,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorWhenExists(t *t 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -389,6 +402,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenConfli oldRC, err := rollstore.rcstore.Create( testManifest(), nil, + "some_az", + "some_cn", nil, nil, ) @@ -413,6 +428,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenConfli 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -445,6 +462,8 @@ func TestCreateRollingUpdateFromOneMaybeExistingWithLabelSelectorFailsWhenConfli 0, false, 0, + "some_az", + "some_cn", testManifest(), testNodeSelector(), nil, @@ -498,3 +517,10 @@ func newRollStoreWithRealConsul(t *testing.T, fixture consulutil.Fixture, entrie labeler: applicator, }, rcStore } + +func podLabels() klabels.Set { + return klabels.Set{ + types.AvailabilityZoneLabel: "some_az", + types.ClusterNameLabel: "some_cn", + } +} diff --git a/pkg/types/constants.go b/pkg/types/constants.go new file mode 100644 index 000000000..b2a0c934c --- /dev/null +++ b/pkg/types/constants.go @@ -0,0 +1,7 @@ +package types + +const ( + AvailabilityZoneLabel = "availability_zone" + ClusterNameLabel = "cluster_name" + PodIDLabel = "pod_id" +) From a9a18f13507f9e93eb980300f871ce92047b42fd Mon Sep 17 00:00:00 2001 From: Michael Puncel Date: Mon, 12 Jun 2017 14:28:58 -0700 Subject: [PATCH 4/5] Add audit logging to RC farm. An audit log record will be created whenever nodes are scheduled or unscheduled by an RC. This will allow constructing a history of which nodes were managed by an RC or pod cluster over time. --- bin/p2-rctl-server/main.go | 4 + pkg/rc/auditing_transaction.go | 95 ++++++++++++++++++++++++ pkg/rc/auditing_transaction_test.go | 98 ++++++++++++++++++++++++ pkg/rc/farm.go | 28 +++++-- pkg/rc/fields/fields.go | 2 +- pkg/rc/replication_controller.go | 57 +++++++++----- pkg/rc/replication_controller_test.go | 103 ++++++++++++++++++++++---- 7 files changed, 348 insertions(+), 39 deletions(-) create mode 100644 pkg/rc/auditing_transaction.go create mode 100644 pkg/rc/auditing_transaction_test.go diff --git a/bin/p2-rctl-server/main.go b/bin/p2-rctl-server/main.go index 91ad08786..74938505d 100644 --- a/bin/p2-rctl-server/main.go +++ b/bin/p2-rctl-server/main.go @@ -21,6 +21,7 @@ import ( "github.com/square/p2/pkg/roll" "github.com/square/p2/pkg/scheduler" "github.com/square/p2/pkg/store/consul" + "github.com/square/p2/pkg/store/consul/auditlogstore" "github.com/square/p2/pkg/store/consul/consulutil" "github.com/square/p2/pkg/store/consul/flags" "github.com/square/p2/pkg/store/consul/rcstore" @@ -98,9 +99,12 @@ func main() { } } + auditLogStore := auditlogstore.NewConsulStore(client.KV()) + // Run the farms! 
go rc.NewFarm( consulStore, + auditLogStore, rcStore, rcStore, rcStore, diff --git a/pkg/rc/auditing_transaction.go b/pkg/rc/auditing_transaction.go new file mode 100644 index 000000000..9197f9bfd --- /dev/null +++ b/pkg/rc/auditing_transaction.go @@ -0,0 +1,95 @@ +package rc + +import ( + "context" + + "github.com/square/p2/pkg/audit" + pc_fields "github.com/square/p2/pkg/pc/fields" + "github.com/square/p2/pkg/store/consul/transaction" + "github.com/square/p2/pkg/types" + "github.com/square/p2/pkg/util" +) + +type auditingTransaction struct { + ctx context.Context + nodes map[types.NodeName]struct{} + auditLogStore AuditLogStore + podID types.PodID + az pc_fields.AvailabilityZone + cn pc_fields.ClusterName +} + +type scheduledNodesKey struct{} + +func (rc *replicationController) newAuditingTransaction( + ctx context.Context, + startingNodes []types.NodeName, +) (*auditingTransaction, func()) { + annotatedContext := context.WithValue(ctx, scheduledNodesKey{}, startingNodes) + ctx, cancelFunc := transaction.New(annotatedContext) + + startingNodeMap := make(map[types.NodeName]struct{}) + for _, node := range startingNodes { + startingNodeMap[node] = struct{}{} + } + + rc.mu.Lock() + manifest := rc.Manifest + podLabels := rc.PodLabels + rc.mu.Unlock() + return &auditingTransaction{ + ctx: ctx, + nodes: startingNodeMap, + auditLogStore: rc.auditLogStore, + podID: manifest.ID(), + az: pc_fields.AvailabilityZone(podLabels[types.AvailabilityZoneLabel]), + cn: pc_fields.ClusterName(podLabels[types.ClusterNameLabel]), + }, cancelFunc +} + +func (a *auditingTransaction) Context() context.Context { + return a.ctx +} + +func (a *auditingTransaction) Nodes() []types.NodeName { + var nodes []types.NodeName + for node, _ := range a.nodes { + nodes = append(nodes, node) + } + return nodes +} + +func (a *auditingTransaction) AddNode(node types.NodeName) { + a.nodes[node] = struct{}{} +} + +func (a *auditingTransaction) RemoveNode(node types.NodeName) { + delete(a.nodes, node) +} + +// Commit adds one final operation to the underlying consul transaction to +// create an audit log record with the set of nodes that already have been +// scheduled and nodes that will be scheduled as a part of this transaction by +// the RC. 
Then it commits the transaction +func (a *auditingTransaction) Commit(cancelFunc func(), txner transaction.Txner) error { + details, err := audit.NewRCRetargetingEventDetails( + a.podID, + a.az, + a.cn, + a.Nodes(), + ) + if err != nil { + return err + } + + err = a.auditLogStore.Create( + a.ctx, + audit.RCRetargetingEvent, + details, + ) + if err != nil { + return util.Errorf("could not add rc retargeting audit log to context: %s", err) + } + + return transaction.Commit(a.ctx, cancelFunc, txner) +} diff --git a/pkg/rc/auditing_transaction_test.go b/pkg/rc/auditing_transaction_test.go new file mode 100644 index 000000000..203cc6bfe --- /dev/null +++ b/pkg/rc/auditing_transaction_test.go @@ -0,0 +1,98 @@ +package rc + +import ( + "context" + "encoding/json" + "testing" + + "github.com/square/p2/pkg/audit" + pc_fields "github.com/square/p2/pkg/pc/fields" + rc_fields "github.com/square/p2/pkg/rc/fields" + "github.com/square/p2/pkg/store/consul/auditlogstore" + "github.com/square/p2/pkg/store/consul/consulutil" + "github.com/square/p2/pkg/types" + + klabels "k8s.io/kubernetes/pkg/labels" +) + +func TestAuditingTransaction(t *testing.T) { + fixture := consulutil.NewFixture(t) + defer fixture.Stop() + + auditLogStore := auditlogstore.NewConsulStore(fixture.Client.KV()) + rc := &replicationController{ + RC: rc_fields.RC{ + Manifest: testManifest(), + PodLabels: klabels.Set{ + pc_fields.AvailabilityZoneLabel: "some_az", + pc_fields.ClusterNameLabel: "some_cn", + }, + }, + auditLogStore: auditLogStore, + } + + ctx, cancel := rc.newAuditingTransaction(context.Background(), []types.NodeName{"node1", "node2"}) + defer cancel() + + ctx.AddNode("node3") + ctx.RemoveNode("node2") + + err := ctx.Commit(cancel, fixture.Client.KV()) + if err != nil { + t.Fatalf("could not commit audit log record: %s", err) + } + + records, err := auditLogStore.List() + if err != nil { + t.Fatal(err) + } + if len(records) != 1 { + t.Fatalf("expected a single audit log record but there were %d", len(records)) + } + + for _, record := range records { + if record.EventType != audit.RCRetargetingEvent { + t.Errorf("expected audit log type to be %q but was %q", audit.RCRetargetingEvent, record.EventType) + } + + var details audit.RCRetargetingDetails + err = json.Unmarshal([]byte(*record.EventDetails), &details) + if err != nil { + t.Fatal(err) + } + + if details.PodID != testManifest().ID() { + t.Errorf("expected audit log details to have pod ID %q but was %q", testManifest().ID(), details.PodID) + } + if details.AvailabilityZone != "some_az" { + t.Errorf("expected audit log details to have availability zone %q but was %q", "some_az", details.AvailabilityZone) + } + if details.ClusterName != "some_cn" { + t.Errorf("expected audit log details to have cluster name %q but was %q", "some_cn", details.ClusterName) + } + + if len(details.Nodes) != 2 { + t.Fatalf("expected 2 nodes but there were %d", len(details.Nodes)) + } + found1 := false + found3 := false + for _, node := range details.Nodes { + switch node { + case "node1": + found1 = true + case "node3": + found3 = true + default: + t.Errorf("found node %s but that wasn't expected", node) + } + } + + if !found1 { + t.Error("expected to find node1 in the list") + } + + if !found3 { + t.Error("expected to find node3 in the list") + } + } +} diff --git a/pkg/rc/farm.go b/pkg/rc/farm.go index 4af760691..b876f52e4 100644 --- a/pkg/rc/farm.go +++ b/pkg/rc/farm.go @@ -2,6 +2,7 @@ package rc import ( "context" + "encoding/json" "fmt" "sync" "time" @@ -10,6 +11,7 @@ import ( klabels 
"k8s.io/kubernetes/pkg/labels" "github.com/square/p2/pkg/alerting" + "github.com/square/p2/pkg/audit" "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" p2metrics "github.com/square/p2/pkg/metrics" @@ -48,6 +50,14 @@ type ReplicationControllerLocker interface { LockForOwnership(rcID fields.ID, session consul.Session) (consulutil.Unlocker, error) } +type AuditLogStore interface { + Create( + ctx context.Context, + eventType audit.EventType, + eventDetails json.RawMessage, + ) error +} + // The Farm is responsible for spawning and reaping replication controllers // as they are added to and deleted from Consul. Multiple farms can exist // simultaneously, but each one must hold a different Consul session. This @@ -60,13 +70,14 @@ type ReplicationControllerLocker interface { // farms to cooperatively schedule work. type Farm struct { // constructor arguments for rcs created by this farm - store consulStore - rcStore ReplicationControllerStore - rcLocker ReplicationControllerLocker - rcWatcher ReplicationControllerWatcher - scheduler scheduler.Scheduler - labeler Labeler - txner transaction.Txner + store consulStore + auditLogStore AuditLogStore + rcStore ReplicationControllerStore + rcLocker ReplicationControllerLocker + rcWatcher ReplicationControllerWatcher + scheduler scheduler.Scheduler + labeler Labeler + txner transaction.Txner // session stream for the rcs locked by this farm sessions <-chan string @@ -94,6 +105,7 @@ type childRC struct { func NewFarm( store consulStore, + auditLogStore AuditLogStore, rcs ReplicationControllerStore, rcLocker ReplicationControllerLocker, rcWatcher ReplicationControllerWatcher, @@ -112,6 +124,7 @@ func NewFarm( return &Farm{ store: store, + auditLogStore: auditLogStore, rcStore: rcs, rcLocker: rcLocker, rcWatcher: rcWatcher, @@ -259,6 +272,7 @@ START_LOOP: newChild := New( rc, rcf.store, + rcf.auditLogStore, rcf.txner, rcf.rcWatcher, rcf.scheduler, diff --git a/pkg/rc/fields/fields.go b/pkg/rc/fields/fields.go index e56611257..ea5a4aaee 100644 --- a/pkg/rc/fields/fields.go +++ b/pkg/rc/fields/fields.go @@ -40,7 +40,7 @@ type RC struct { // Defines the set of nodes on which the manifest can be scheduled NodeSelector labels.Selector - // A set of labels that will be added to every pod scheduled by this controller + // A set of labels that will be added to every pod scheduled by this controller. 
PodLabels labels.Set // The desired number of instances of the manifest that should be diff --git a/pkg/rc/replication_controller.go b/pkg/rc/replication_controller.go index 8297bbd7e..fd7799c61 100644 --- a/pkg/rc/replication_controller.go +++ b/pkg/rc/replication_controller.go @@ -80,6 +80,7 @@ type replicationController struct { logger logging.Logger consulStore consulStore + auditLogStore AuditLogStore txner transaction.Txner rcWatcher ReplicationControllerWatcher scheduler scheduler.Scheduler @@ -94,6 +95,7 @@ type ReplicationControllerWatcher interface { func New( fields fields.RC, consulStore consulStore, + auditLogStore AuditLogStore, txner transaction.Txner, rcWatcher ReplicationControllerWatcher, scheduler scheduler.Scheduler, @@ -110,6 +112,7 @@ func New( logger: logger, consulStore: consulStore, + auditLogStore: auditLogStore, txner: txner, rcWatcher: rcWatcher, scheduler: scheduler, @@ -226,19 +229,19 @@ func (rc *replicationController) addPods(current types.PodLocations) error { rc.logger.NoFields().Infof("Need to schedule %d nodes out of %s", toSchedule, possible) - ctx, cancelFunc := transaction.New(context.Background()) + txn, cancelFunc := rc.newAuditingTransaction(context.Background(), currentNodes) for i := 0; i < toSchedule; i++ { // create a new context for every 5 nodes. This is done to make sure // we're safely under the 64 operation limit imposed by consul on // transactions if i%5 == 0 && i > 0 { - err = transaction.Commit(ctx, cancelFunc, rc.txner) + err = txn.Commit(cancelFunc, rc.txner) if err != nil { cancelFunc() return err } - ctx, cancelFunc = transaction.New(context.Background()) + txn, cancelFunc = rc.newAuditingTransaction(context.Background(), txn.Nodes()) } if len(possibleSorted) < i+1 { errMsg := fmt.Sprintf( @@ -251,7 +254,7 @@ func (rc *replicationController) addPods(current types.PodLocations) error { } // commit any queued operations - txnErr := transaction.Commit(ctx, cancelFunc, rc.txner) + txnErr := txn.Commit(cancelFunc, rc.txner) if txnErr != nil { return txnErr } @@ -260,14 +263,14 @@ func (rc *replicationController) addPods(current types.PodLocations) error { } scheduleOn := possibleSorted[i] - err := rc.schedule(ctx, scheduleOn) + err := rc.schedule(txn, scheduleOn) if err != nil { cancelFunc() return err } } - return transaction.Commit(ctx, cancelFunc, rc.txner) + return txn.Commit(cancelFunc, rc.txner) } // Generates an alerting.AlertInfo struct. Includes information relevant to @@ -312,19 +315,19 @@ func (rc *replicationController) removePods(current types.PodLocations) error { toUnschedule := len(current) - rc.ReplicasDesired rc.logger.NoFields().Infof("Need to unschedule %d nodes out of %s", toUnschedule, current) - ctx, cancelFunc := transaction.New(context.Background()) + txn, cancelFunc := rc.newAuditingTransaction(context.Background(), currentNodes) for i := 0; i < toUnschedule; i++ { // create a new context for every 5 nodes. 
This is done to make sure // we're safely under the 64 operation limit imposed by consul on // transactions if i%5 == 0 && i > 0 { - err = transaction.Commit(ctx, cancelFunc, rc.txner) + err = txn.Commit(cancelFunc, rc.txner) if err != nil { cancelFunc() return err } - ctx, cancelFunc = transaction.New(context.Background()) + txn, cancelFunc = rc.newAuditingTransaction(context.Background(), txn.Nodes()) } unscheduleFrom, ok := preferred.PopAny() @@ -335,7 +338,7 @@ func (rc *replicationController) removePods(current types.PodLocations) error { // This should be mathematically impossible unless replicasDesired was negative // commit any queued operations - txnErr := transaction.Commit(ctx, cancelFunc, rc.txner) + txnErr := txn.Commit(cancelFunc, rc.txner) if txnErr != nil { return txnErr } @@ -347,13 +350,15 @@ func (rc *replicationController) removePods(current types.PodLocations) error { ) } } - err := rc.unschedule(ctx, unscheduleFrom) + + err := rc.unschedule(txn, unscheduleFrom) if err != nil { cancelFunc() return err } } - return transaction.Commit(ctx, cancelFunc, rc.txner) + + return txn.Commit(cancelFunc, rc.txner) } func (rc *replicationController) ensureConsistency(current types.PodLocations) error { @@ -396,7 +401,8 @@ func (rc *replicationController) ensureConsistency(current types.PodLocations) e } rc.logger.WithField("node", pod.Node).WithField("intentManifestSHA", intentSHA).Info("Found inconsistency in scheduled manifest") - if err := rc.schedule(ctx, pod.Node); err != nil { + + if err := rc.scheduleNoAudit(ctx, pod.Node); err != nil { cancelFunc() return err } @@ -462,7 +468,17 @@ func (rc *replicationController) computePodLabels() map[string]string { return ret } -func (rc *replicationController) schedule(ctx context.Context, node types.NodeName) error { +func (rc *replicationController) schedule(txn *auditingTransaction, node types.NodeName) error { + err := rc.scheduleNoAudit(txn.Context(), node) + if err != nil { + return err + } + + txn.AddNode(node) + return nil +} + +func (rc *replicationController) scheduleNoAudit(ctx context.Context, node types.NodeName) error { rc.logger.NoFields().Infof("Scheduling on %s", node) rc.mu.Lock() manifest := rc.Manifest @@ -477,12 +493,12 @@ func (rc *replicationController) schedule(ctx context.Context, node types.NodeNa return rc.consulStore.SetPodTxn(ctx, consul.INTENT_TREE, node, manifest) } -func (rc *replicationController) unschedule(ctx context.Context, node types.NodeName) error { +func (rc *replicationController) unschedule(txn *auditingTransaction, node types.NodeName) error { rc.logger.NoFields().Infof("Unscheduling from %s", node) rc.mu.Lock() manifest := rc.Manifest rc.mu.Unlock() - err := rc.consulStore.DeletePodTxn(ctx, consul.INTENT_TREE, node, manifest.ID()) + err := rc.consulStore.DeletePodTxn(txn.Context(), consul.INTENT_TREE, node, manifest.ID()) if err != nil { return err } @@ -494,5 +510,12 @@ func (rc *replicationController) unschedule(ctx context.Context, node types.Node } labelKey := labels.MakePodLabelKey(node, manifest.ID()) - return rc.podApplicator.RemoveLabelsTxn(ctx, labels.POD, labelKey, keysToRemove) + + err = rc.podApplicator.RemoveLabelsTxn(txn.Context(), labels.POD, labelKey, keysToRemove) + if err != nil { + return err + } + + txn.RemoveNode(node) + return nil } diff --git a/pkg/rc/replication_controller_test.go b/pkg/rc/replication_controller_test.go index 8d6d195d6..041d4dd1d 100644 --- a/pkg/rc/replication_controller_test.go +++ b/pkg/rc/replication_controller_test.go @@ -4,12 +4,14 @@ package 
rc import ( "context" + "encoding/json" "fmt" "sync" "testing" "time" "github.com/square/p2/pkg/alerting/alertingtest" + "github.com/square/p2/pkg/audit" "github.com/square/p2/pkg/labels" "github.com/square/p2/pkg/logging" "github.com/square/p2/pkg/manifest" @@ -18,6 +20,7 @@ import ( "github.com/square/p2/pkg/rc/fields" "github.com/square/p2/pkg/scheduler" "github.com/square/p2/pkg/store/consul" + "github.com/square/p2/pkg/store/consul/auditlogstore" "github.com/square/p2/pkg/store/consul/consulutil" "github.com/square/p2/pkg/store/consul/rcstore" "github.com/square/p2/pkg/types" @@ -54,12 +57,18 @@ type testApplicator interface { SetLabelsTxn(ctx context.Context, labelType labels.Type, id string, values map[string]string) error } +type testAuditLogStore interface { + AuditLogStore + List() (map[audit.ID]audit.AuditLog, error) +} + func setup(t *testing.T) ( rcStore testRCStore, consulStore testConsulStore, applicator testApplicator, rc *replicationController, alerter *alertingtest.AlertRecorder, + auditLogStore testAuditLogStore, closeFn func(), ) { fixture := consulutil.NewFixture(t) @@ -89,10 +98,12 @@ func setup(t *testing.T) ( Assert(t).IsNil(err, "expected no error creating request") alerter = alertingtest.NewRecorder() + auditLogStore = auditlogstore.NewConsulStore(fixture.Client.KV()) rc = New( rcData, consulStore, + auditLogStore, fixture.Client.KV(), rcStore, scheduler.NewApplicatorScheduler(applicator), @@ -133,7 +144,7 @@ func waitForNodes(t *testing.T, rc ReplicationController, desired int) int { } func TestDoNothing(t *testing.T) { - _, consulStore, applicator, rc, alerter, closeFn := setup(t) + _, consulStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() err := rc.meetDesires() @@ -152,7 +163,7 @@ func TestDoNothing(t *testing.T) { } func TestCantSchedule(t *testing.T) { - rcStore, consulStore, applicator, rc, alerter, closeFn := setup(t) + rcStore, consulStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() quit := make(chan struct{}) @@ -181,7 +192,7 @@ func TestCantSchedule(t *testing.T) { } func TestSchedule(t *testing.T) { - rcStore, consulStore, applicator, rc, alerter, closeFn := setup(t) + rcStore, consulStore, applicator, rc, alerter, auditLogStore, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "bad") @@ -217,10 +228,35 @@ func TestSchedule(t *testing.T) { } Assert(t).AreEqual(len(alerter.Alerts), 0, "Expected no alerts to fire") + + records, err := auditLogStore.List() + if err != nil { + t.Fatal(err) + } + if len(records) != 1 { + t.Fatalf("expected a single audit log record but there were %d", len(records)) + } + for _, record := range records { + if record.EventType != audit.RCRetargetingEvent { + t.Errorf("expected audit log type to be %q but was %q", audit.RCRetargetingEvent, record.EventType) + } + var details audit.RCRetargetingDetails + err = json.Unmarshal([]byte(*record.EventDetails), &details) + if err != nil { + t.Fatal(err) + } + if len(details.Nodes) != 1 { + t.Error("expected one node") + } else { + if details.Nodes[0] != "node2" { + t.Errorf("expected node list to only have %v but had %v", "node2", details.Nodes[0]) + } + } + } } func TestSchedulePartial(t *testing.T) { - _, consulStore, applicator, rc, alerter, closeFn := setup(t) + _, consulStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "bad") @@ -266,7 +302,7 @@ func TestSchedulePartial(t *testing.T) { } func 
TestUnschedulePartial(t *testing.T) { - _, consulStore, applicator, rc, _, closeFn := setup(t) + _, consulStore, applicator, rc, _, _, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node2", "nodeQuality", "good") @@ -304,7 +340,7 @@ func TestUnschedulePartial(t *testing.T) { } func TestScheduleTwice(t *testing.T) { - rcStore, consulStore, applicator, rc, alerter, closeFn := setup(t) + rcStore, consulStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "good") @@ -347,7 +383,7 @@ func TestScheduleTwice(t *testing.T) { } func TestUnschedule(t *testing.T) { - rcStore, consulStore, applicator, rc, alerter, closeFn := setup(t) + rcStore, consulStore, applicator, rc, alerter, auditLogStore, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "bad") @@ -383,10 +419,49 @@ func TestUnschedule(t *testing.T) { } Assert(t).AreEqual(len(manifests), 0, "expected manifest to have been unscheduled") Assert(t).AreEqual(len(alerter.Alerts), 0, "expected no alerts to fire") + + records, err := auditLogStore.List() + if err != nil { + t.Fatal(err) + } + if len(records) != 2 { + t.Fatalf("expected 2 audit log records but there were %d", len(records)) + } + + foundNone := false + for _, record := range records { + if record.EventType != audit.RCRetargetingEvent { + t.Errorf("expected audit log type to be %q but was %q", audit.RCRetargetingEvent, record.EventType) + } + var details audit.RCRetargetingDetails + err = json.Unmarshal([]byte(*record.EventDetails), &details) + if err != nil { + t.Fatal(err) + } + + if len(details.Nodes) == 0 { + if foundNone { + t.Fatal("both audit records had no nodes in them") + } + foundNone = true + } else { + if len(details.Nodes) != 1 { + t.Error("expected one node") + } else { + if details.Nodes[0] != "node2" { + t.Errorf("expected node list to only have %v but had %v", "node2", details.Nodes[0]) + } + } + } + } + + if !foundNone { + t.Fatal("should have found an audit record with no nodes but didn't") + } } func TestPreferUnscheduleIneligible(t *testing.T) { - rcStore, consulStore, applicator, rc, alerter, closeFn := setup(t) + rcStore, consulStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() for i := 0; i < 1000; i++ { nodeName := fmt.Sprintf("node%d", i) @@ -428,7 +503,7 @@ func TestPreferUnscheduleIneligible(t *testing.T) { } func TestConsistencyNoChange(t *testing.T) { - _, kvStore, applicator, rc, alerter, closeFn := setup(t) + _, kvStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() rcSHA, _ := rc.Manifest.SHA() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "good") @@ -460,7 +535,7 @@ func TestConsistencyNoChange(t *testing.T) { } func TestConsistencyModify(t *testing.T) { - _, kvStore, applicator, rc, alerter, closeFn := setup(t) + _, kvStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() rcSHA, _ := rc.Manifest.SHA() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "good") @@ -490,7 +565,7 @@ func TestConsistencyModify(t *testing.T) { } func TestConsistencyDelete(t *testing.T) { - _, kvStore, applicator, rc, alerter, closeFn := setup(t) + _, kvStore, applicator, rc, alerter, _, closeFn := setup(t) defer closeFn() rcSHA, _ := rc.Manifest.SHA() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "good") @@ -518,7 +593,7 @@ func TestConsistencyDelete(t *testing.T) { } func TestReservedLabels(t *testing.T) { - _, 
_, applicator, rc, _, closeFn := setup(t) + _, _, applicator, rc, _, _, closeFn := setup(t) defer closeFn() err := applicator.SetLabel(labels.NODE, "node1", "nodeQuality", "good") @@ -539,7 +614,7 @@ func TestReservedLabels(t *testing.T) { } func TestScheduleMoreThan5(t *testing.T) { - rcStore, _, applicator, rc, _, closeFn := setup(t) + rcStore, _, applicator, rc, _, _, closeFn := setup(t) defer closeFn() for i := 0; i < 7; i++ { @@ -570,7 +645,7 @@ func TestScheduleMoreThan5(t *testing.T) { } func TestUnscheduleMoreThan5(t *testing.T) { - rcStore, _, applicator, rc, _, closeFn := setup(t) + rcStore, _, applicator, rc, _, _, closeFn := setup(t) defer closeFn() for i := 0; i < 7; i++ { From 087ea9ebb05ac394750ca0e528947ed1491a2d73 Mon Sep 17 00:00:00 2001 From: Michael Puncel Date: Mon, 10 Jul 2017 11:14:29 -0700 Subject: [PATCH 5/5] add links to consul issue about raising transaction size limit --- pkg/rc/replication_controller.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/rc/replication_controller.go b/pkg/rc/replication_controller.go index fd7799c61..5952a7fae 100644 --- a/pkg/rc/replication_controller.go +++ b/pkg/rc/replication_controller.go @@ -231,9 +231,10 @@ func (rc *replicationController) addPods(current types.PodLocations) error { txn, cancelFunc := rc.newAuditingTransaction(context.Background(), currentNodes) for i := 0; i < toSchedule; i++ { - // create a new context for every 5 nodes. This is done to make sure - // we're safely under the 64 operation limit imposed by consul on - // transactions + // create a new context for every 5 nodes. This is done to make + // sure we're safely under the 64 operation limit imposed by + // consul on transactions. This shouldn't be necessary after + // https://github.com/hashicorp/consul/issues/2921 is resolved if i%5 == 0 && i > 0 { err = txn.Commit(cancelFunc, rc.txner) if err != nil { @@ -317,9 +318,10 @@ func (rc *replicationController) removePods(current types.PodLocations) error { txn, cancelFunc := rc.newAuditingTransaction(context.Background(), currentNodes) for i := 0; i < toUnschedule; i++ { - // create a new context for every 5 nodes. This is done to make sure - // we're safely under the 64 operation limit imposed by consul on - // transactions + // create a new context for every 5 nodes. This is done to make + // sure we're safely under the 64 operation limit imposed by + // consul on transactions. This shouldn't be necessary after + // https://github.com/hashicorp/consul/issues/2921 is resolved if i%5 == 0 && i > 0 { err = txn.Commit(cancelFunc, rc.txner) if err != nil { @@ -372,9 +374,10 @@ func (rc *replicationController) ensureConsistency(current types.PodLocations) e ctx, cancelFunc := transaction.New(context.Background()) for i, pod := range current { - // create a new context for every 5 nodes. This is done to make sure - // we're safely under the 64 operation limit imposed by consul on - // transactions + // create a new context for every 5 nodes. This is done to make + // sure we're safely under the 64 operation limit imposed by + // consul on transactions. This shouldn't be necessary after + // https://github.com/hashicorp/consul/issues/2921 is resolved if i%5 == 0 && i > 0 { err = transaction.Commit(ctx, cancelFunc, rc.txner) if err != nil {