Skip to content
This repository has been archived by the owner on Apr 29, 2020. It is now read-only.

Commit

Permalink
require availability zone and cluster name pod labels on RCs
Browse files Browse the repository at this point in the history
RCs have a "pod_labels" field which represents a set of labels that
should be applied to every pod that is scheduled by that RC. Previously
there has been no requirement that that set of labels includes the
availability_zone and cluster_name to identify the pod cluster that the
RC and its pods are members of. pod_id can be inferred from the pod
manifest and is already automatically set.

This commit enforces that the set of labels have availability zone and
cluster name in anticipation of having RCs create audit log records
every time the set of nodes they target changes. This is because audit
log records are linked to pod clusters, and as a result RCs must be as
well.
  • Loading branch information
mpuncel committed Jun 22, 2017
1 parent 4555c22 commit 72109aa
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 31 deletions.
35 changes: 27 additions & 8 deletions bin/p2-rctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/square/p2/pkg/labels"
"github.com/square/p2/pkg/logging"
"github.com/square/p2/pkg/manifest"
pc_fields "github.com/square/p2/pkg/pc/fields"
"github.com/square/p2/pkg/rc"
"github.com/square/p2/pkg/rc/fields"
rc_fields "github.com/square/p2/pkg/rc/fields"
Expand Down Expand Up @@ -54,11 +55,13 @@ var (
logLevel = kingpin.Flag("log", "Logging level to display.").String()
logJSON = kingpin.Flag("log-json", "Log messages will be JSON formatted").Bool()

cmdCreate = kingpin.Command(cmdCreateText, "Create a new replication controller")
createManifest = cmdCreate.Flag("manifest", "manifest file to use for this replication controller").Short('m').Required().String()
createNodeSel = cmdCreate.Flag("node-selector", "node selector that this replication controller should target").Short('n').Required().String()
createPodLabels = cmdCreate.Flag("pod-label", "a pod label, in LABEL=VALUE form, to add to this replication controller. Can be specified multiple times.").Short('p').StringMap()
createRCLabels = cmdCreate.Flag("rc-label", "an RC label, in LABEL=VALUE form, to be applied to this replication controller. Can be specified multiple times.").Short('r').StringMap()
cmdCreate = kingpin.Command(cmdCreateText, "Create a new replication controller")
createManifest = cmdCreate.Flag("manifest", "manifest file to use for this replication controller").Short('m').Required().String()
createNodeSel = cmdCreate.Flag("node-selector", "node selector that this replication controller should target").Short('n').Required().String()
createPodLabels = cmdCreate.Flag("pod-label", "a pod label, in LABEL=VALUE form, to add to this replication controller. Can be specified multiple times.").Short('p').StringMap()
createRCLabels = cmdCreate.Flag("rc-label", "an RC label, in LABEL=VALUE form, to be applied to this replication controller. Can be specified multiple times.").Short('r').StringMap()
createAvailabilityZone = cmdCreate.Flag("availability-zone", "availability zone that RC should belong to").Short('a').Required().String()
createClusterName = cmdCreate.Flag("cluster-name", "availability zone that RC should belong to").Short('c').Required().String()

cmdDelete = kingpin.Command(cmdDeleteText, "Delete a replication controller")
deleteID = cmdDelete.Arg("id", "replication controller uuid to delete").Required().String()
Expand Down Expand Up @@ -153,7 +156,14 @@ func main() {

switch cmd {
case cmdCreateText:
rctl.Create(*createManifest, *createNodeSel, *createPodLabels, *createRCLabels)
rctl.Create(
*createManifest,
*createNodeSel,
pc_fields.AvailabilityZone(*createAvailabilityZone),
pc_fields.ClusterName(*createClusterName),
*createPodLabels,
*createRCLabels,
)
case cmdDeleteText:
rctl.Delete(*deleteID, *deleteForce)
case cmdReplicasText:
Expand Down Expand Up @@ -197,6 +207,8 @@ type ReplicationControllerStore interface {
Create(
manifest manifest.Manifest,
nodeSelector klabels.Selector,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
podLabels klabels.Set,
additionalLabels klabels.Set,
) (fields.RC, error)
Expand Down Expand Up @@ -231,7 +243,14 @@ type rctlParams struct {
logger logging.Logger
}

func (r rctlParams) Create(manifestPath, nodeSelector string, podLabels map[string]string, rcLabels map[string]string) {
func (r rctlParams) Create(
manifestPath string,
nodeSelector string,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
podLabels map[string]string,
rcLabels map[string]string,
) {
manifest, err := manifest.FromPath(manifestPath)
if err != nil {
r.logger.WithErrorAndFields(err, logrus.Fields{
Expand All @@ -246,7 +265,7 @@ func (r rctlParams) Create(manifestPath, nodeSelector string, podLabels map[stri
}).Fatalln("Could not parse node selector")
}

newRC, err := r.rcs.Create(manifest, nodeSel, klabels.Set(podLabels), rcLabels)
newRC, err := r.rcs.Create(manifest, nodeSel, availabilityZone, clusterName, klabels.Set(podLabels), rcLabels)
if err != nil {
r.logger.WithError(err).Fatalln("Could not create replication controller in Consul")
}
Expand Down
4 changes: 2 additions & 2 deletions integration/single-node-slug-deploy/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,13 +428,13 @@ func verifyTransferReplicas(errCh chan<- error, tempdir string, logger logging.L
builder.SetID("some_pod")
man := builder.GetManifest()

fromRC, err := rcStore.Create(man, klabels.Everything(), nil, nil)
fromRC, err := rcStore.Create(man, klabels.Everything(), "some_az", "some_cn", nil, nil)
if err != nil {
errCh <- util.Errorf("could not create RC for replica transfer test: %s", err)
return
}

toRC, err := rcStore.Create(man, klabels.Everything(), nil, nil)
toRC, err := rcStore.Create(man, klabels.Everything(), "some_az", "some_cn", nil, nil)
if err != nil {
errCh <- util.Errorf("could not create second RC for replica transfer test: %s", err)
return
Expand Down
7 changes: 3 additions & 4 deletions pkg/pc/fields/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"reflect"

"github.com/square/p2/pkg/store/consul/rcstore"
"github.com/square/p2/pkg/types"

"k8s.io/kubernetes/pkg/labels"
Expand All @@ -18,9 +17,9 @@ type Annotations map[string]interface{}

// label keys used by pod selector
const (
AvailabilityZoneLabel = "availability_zone"
ClusterNameLabel = "cluster_name"
PodIDLabel = rcstore.PodIDLabel // TODO: put this in a different place now that multiple packages use it
AvailabilityZoneLabel = types.AvailabilityZoneLabel
ClusterNameLabel = types.ClusterNameLabel
PodIDLabel = types.PodIDLabel // TODO: put this in a different place now that multiple packages use it
)

func (id ID) String() string {
Expand Down
4 changes: 2 additions & 2 deletions pkg/rc/farm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestRCsWithCountsWillBeFine(t *testing.T) {
rcStore: fakeStore,
}

rc, err := fakeStore.Create(testManifest(), klabels.Everything(), map[string]string{}, nil)
rc, err := fakeStore.Create(testManifest(), klabels.Everything(), "some_az", "some_cn", map[string]string{}, nil)
if err != nil {
t.Fatalf("could not put an RC in the fake store: %s", err)
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestRCsWithZeroCountsWillTriggerIncident(t *testing.T) {
}

// replica count is implicitly zero
_, err := fakeStore.Create(testManifest(), klabels.Everything(), map[string]string{}, nil)
_, err := fakeStore.Create(testManifest(), klabels.Everything(), "some_az", "some_cn", map[string]string{}, nil)
if err != nil {
t.Fatalf("could not put an RC in the fake store: %s", err)
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/rc/replication_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/square/p2/pkg/labels"
"github.com/square/p2/pkg/logging"
"github.com/square/p2/pkg/manifest"
pc_fields "github.com/square/p2/pkg/pc/fields"
"github.com/square/p2/pkg/pods"
"github.com/square/p2/pkg/rc/fields"
"github.com/square/p2/pkg/scheduler"
Expand All @@ -27,7 +28,14 @@ import (
type testRCStore interface {
ReplicationControllerStore
ReplicationControllerWatcher
Create(manifest manifest.Manifest, nodeSelector klabels.Selector, podLabels klabels.Set, additionalLabels klabels.Set) (fields.RC, error)
Create(
manifest manifest.Manifest,
nodeSelector klabels.Selector,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
podLabels klabels.Set,
additionalLabels klabels.Set,
) (fields.RC, error)
SetDesiredReplicas(id fields.ID, n int) error
}

Expand Down Expand Up @@ -76,7 +84,7 @@ func setup(t *testing.T) (
nodeSelector := klabels.Everything().Add("nodeQuality", klabels.EqualsOperator, []string{"good"})
podLabels := map[string]string{"podTest": "successful"}

rcData, err := rcStore.Create(podManifest, nodeSelector, podLabels, nil)
rcData, err := rcStore.Create(podManifest, nodeSelector, "some_az", "some_cn", podLabels, nil)
Assert(t).IsNil(err, "expected no error creating request")

alerter = alertingtest.NewRecorder()
Expand Down
4 changes: 2 additions & 2 deletions pkg/roll/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestCleanupOldRCHappy(t *testing.T) {
builder := manifest.NewBuilder()
builder.SetID("whatever")

rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), nil, nil)
rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), "some_az", "some_cn", nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestCleanupOldRCTooManyReplicas(t *testing.T) {
builder := manifest.NewBuilder()
builder.SetID("whatever")

rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), nil, nil)
rc, err := rcStore.Create(builder.GetManifest(), klabels.Everything(), "some_az", "some_cn", nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/roll/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/square/p2/pkg/labels"
"github.com/square/p2/pkg/logging"
"github.com/square/p2/pkg/manifest"
pc_fields "github.com/square/p2/pkg/pc/fields"
"github.com/square/p2/pkg/rc"
rc_fields "github.com/square/p2/pkg/rc/fields"
"github.com/square/p2/pkg/roll/fields"
Expand Down Expand Up @@ -376,7 +377,14 @@ func assignManifestsToNodes(
}

type testRCCreator interface {
Create(manifest manifest.Manifest, nodeSelector klabels.Selector, podLabels klabels.Set, additionalLabels klabels.Set) (rc_fields.RC, error)
Create(
manifest manifest.Manifest,
nodeSelector klabels.Selector,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
podLabels klabels.Set,
additionalLabels klabels.Set,
) (rc_fields.RC, error)
SetDesiredReplicas(id rc_fields.ID, n int) error
}

Expand All @@ -387,7 +395,7 @@ func createRC(
desired int,
nodes map[types.NodeName]bool,
) (rc_fields.RC, error) {
created, err := rcs.Create(manifest, nil, nil, nil)
created, err := rcs.Create(manifest, nil, "some_az", "some_cn", nil, nil)
if err != nil {
return rc_fields.RC{}, fmt.Errorf("Error creating RC: %s", err)
}
Expand Down
22 changes: 20 additions & 2 deletions pkg/store/consul/rcstore/consul_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ import (

"github.com/square/p2/pkg/labels"
"github.com/square/p2/pkg/manifest"
pc_fields "github.com/square/p2/pkg/pc/fields"
"github.com/square/p2/pkg/rc/fields"
"github.com/square/p2/pkg/store/consul"
"github.com/square/p2/pkg/store/consul/consulutil"
"github.com/square/p2/pkg/store/consul/transaction"
"github.com/square/p2/pkg/types"
"github.com/square/p2/pkg/util"
)

const (
// This label is applied to an RC, to identify the ID of its pod manifest.
PodIDLabel = "pod_id"
PodIDLabel = types.PodIDLabel
// This is called "update" for backwards compatibility reasons, it
// should probably be named "mutate"
mutationSuffix = "update"
Expand Down Expand Up @@ -106,7 +108,21 @@ func NewConsul(client consulutil.ConsulClient, labeler RCLabeler, retries int) *
// The node selector is used to determine what nodes the replication controller may schedule on.
// The pod label set is applied to every pod the replication controller schedules.
// The additionalLabels label set is applied to the RCs own labels
func (s *ConsulStore) Create(manifest manifest.Manifest, nodeSelector klabels.Selector, podLabels klabels.Set, additionalLabels klabels.Set) (fields.RC, error) {
func (s *ConsulStore) Create(
manifest manifest.Manifest,
nodeSelector klabels.Selector,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
podLabels klabels.Set,
additionalLabels klabels.Set,
) (fields.RC, error) {

if podLabels == nil {
podLabels = make(klabels.Set)
}
podLabels[types.ClusterNameLabel] = clusterName.String()
podLabels[types.AvailabilityZoneLabel] = availabilityZone.String()

rc, err := s.innerCreate(manifest, nodeSelector, podLabels)

// TODO: measure whether retries are is important in practice
Expand Down Expand Up @@ -136,6 +152,8 @@ func (s *ConsulStore) CreateTxn(
ctx context.Context,
manifest manifest.Manifest,
nodeSelector klabels.Selector,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
podLabels klabels.Set,
additionalLabels klabels.Set,
) (fields.RC, error) {
Expand Down
20 changes: 18 additions & 2 deletions pkg/store/consul/rcstore/fake_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"k8s.io/kubernetes/pkg/labels"

"github.com/square/p2/pkg/manifest"
pc_fields "github.com/square/p2/pkg/pc/fields"
"github.com/square/p2/pkg/rc/fields"
"github.com/square/p2/pkg/store/consul"
"github.com/square/p2/pkg/store/consul/consulutil"
Expand Down Expand Up @@ -37,7 +38,14 @@ func NewFake() *fakeStore {
}
}

func (s *fakeStore) Create(manifest manifest.Manifest, nodeSelector labels.Selector, podLabels labels.Set, additionalLabels labels.Set) (fields.RC, error) {
func (s *fakeStore) Create(
manifest manifest.Manifest,
nodeSelector labels.Selector,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
podLabels labels.Set,
additionalLabels labels.Set,
) (fields.RC, error) {
// A real replication controller will use a UUID.
// We'll just use a monotonically increasing counter for expedience.
s.creates += 1
Expand Down Expand Up @@ -66,7 +74,15 @@ func (s *fakeStore) Create(manifest manifest.Manifest, nodeSelector labels.Selec
// If a test needs to use transactions, it should be using a real consul e.g.
// via consulutil.NewFixture(). We won't be implementing transactions ourselves
// in these fake storage structs
func (s *fakeStore) CreateTxn(ctx context.Context, manifest manifest.Manifest, nodeSelector labels.Selector, podLabels labels.Set, additionalLabels labels.Set) (fields.RC, error) {
func (s *fakeStore) CreateTxn(
ctx context.Context,
manifest manifest.Manifest,
nodeSelector labels.Selector,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
podLabels labels.Set,
additionalLabels labels.Set,
) (fields.RC, error) {
panic("transactions not implemented in fake rc store")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/consul/rcstore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestCreateTxn(t *testing.T) {

ctx, cancelFunc := transaction.New(context.Background())
defer cancelFunc()
rc, err := store.CreateTxn(ctx, testManifest(), klabels.Everything(), nil, rcLabelsToSet)
rc, err := store.CreateTxn(ctx, testManifest(), klabels.Everything(), "some_az", "some_cn", nil, rcLabelsToSet)
if err != nil {
t.Fatal(err)
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/store/consul/rollstore/consul_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/square/p2/pkg/labels"
"github.com/square/p2/pkg/logging"
"github.com/square/p2/pkg/manifest"
pc_fields "github.com/square/p2/pkg/pc/fields"
rc_fields "github.com/square/p2/pkg/rc/fields"
roll_fields "github.com/square/p2/pkg/roll/fields"
"github.com/square/p2/pkg/store/consul"
Expand Down Expand Up @@ -65,6 +66,8 @@ type ReplicationControllerStore interface {
ctx context.Context,
manifest manifest.Manifest,
nodeSelector klabels.Selector,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
podLabels klabels.Set,
additionalLabels klabels.Set,
) (rc_fields.RC, error)
Expand All @@ -75,6 +78,8 @@ type ReplicationControllerStore interface {
Create(
manifest manifest.Manifest,
nodeSelector klabels.Selector,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
podLabels klabels.Set,
additionalLabels klabels.Set,
) (rc_fields.RC, error)
Expand Down Expand Up @@ -307,6 +312,8 @@ func (s ConsulStore) CreateRollingUpdateFromOneExistingRCWithID(
minimumReplicas int,
leaveOld bool,
rollDelay time.Duration,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
newRCManifest manifest.Manifest,
newRCNodeSelector klabels.Selector,
newRCPodLabels klabels.Set,
Expand Down Expand Up @@ -334,7 +341,7 @@ func (s ConsulStore) CreateRollingUpdateFromOneExistingRCWithID(
return roll_fields.Update{}, err
}

rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, newRCPodLabels, newRCLabels)
rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, availabilityZone, clusterName, newRCPodLabels, newRCLabels)
if err != nil {
return roll_fields.Update{}, err
}
Expand Down Expand Up @@ -381,6 +388,8 @@ func (s ConsulStore) CreateRollingUpdateFromOneMaybeExistingWithLabelSelector(
minimumReplicas int,
leaveOld bool,
rollDelay time.Duration,
availabilityZone pc_fields.AvailabilityZone,
clusterName pc_fields.ClusterName,
newRCManifest manifest.Manifest,
newRCNodeSelector klabels.Selector,
newRCPodLabels klabels.Set,
Expand Down Expand Up @@ -418,7 +427,7 @@ func (s ConsulStore) CreateRollingUpdateFromOneMaybeExistingWithLabelSelector(

// Create the old RC using the same info as the new RC, it'll be
// removed when the update completes anyway
rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, newRCPodLabels, newRCLabels)
rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, availabilityZone, clusterName, newRCPodLabels, newRCLabels)
if err != nil {
return roll_fields.Update{}, err
}
Expand All @@ -440,7 +449,7 @@ func (s ConsulStore) CreateRollingUpdateFromOneMaybeExistingWithLabelSelector(

// Create the new RC
var newRCID rc_fields.ID
rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, newRCPodLabels, newRCLabels)
rc, err := s.rcstore.CreateTxn(ctx, newRCManifest, newRCNodeSelector, availabilityZone, clusterName, newRCPodLabels, newRCLabels)
if err != nil {
return roll_fields.Update{}, err
}
Expand Down
Loading

0 comments on commit 72109aa

Please sign in to comment.