Add audit logging to RC farm.
An audit log record will be created whenever nodes are scheduled or
unscheduled by an RC. This will allow constructing a history of which
nodes were managed by an RC or pod cluster over time.
mpuncel committed Jun 22, 2017
1 parent 72109aa commit 3705d4d
Showing 7 changed files with 346 additions and 37 deletions.
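As context for the diffs below, here is a minimal, hypothetical sketch (not part of this commit) of reading the new audit records back to reconstruct which nodes an RC managed over time. It relies only on APIs exercised in pkg/rc/auditing_transaction_test.go further down; record ordering is an assumption.

package rc

import (
"encoding/json"
"testing"

"github.com/square/p2/pkg/audit"
"github.com/square/p2/pkg/store/consul/auditlogstore"
"github.com/square/p2/pkg/store/consul/consulutil"
)

// readBackHistory is a hypothetical helper, not part of this commit.
func readBackHistory(t *testing.T) {
fixture := consulutil.NewFixture(t)
defer fixture.Stop()

auditLogStore := auditlogstore.NewConsulStore(fixture.Client.KV())
records, err := auditLogStore.List()
if err != nil {
t.Fatal(err)
}

for _, record := range records {
// Only RC retargeting events describe node membership changes.
if record.EventType != audit.RCRetargetingEvent {
continue
}

var details audit.RCRetargetingDetails
err = json.Unmarshal([]byte(*record.EventDetails), &details)
if err != nil {
t.Fatal(err)
}

// Each record carries the complete node set after one change, so the
// sequence of records is the membership history of the RC.
t.Logf("%s %s/%s -> %v", details.PodID, details.AvailabilityZone, details.ClusterName, details.Nodes)
}
}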
4 changes: 4 additions & 0 deletions bin/p2-rctl-server/main.go
@@ -21,6 +21,7 @@ import (
"github.com/square/p2/pkg/roll"
"github.com/square/p2/pkg/scheduler"
"github.com/square/p2/pkg/store/consul"
"github.com/square/p2/pkg/store/consul/auditlogstore"
"github.com/square/p2/pkg/store/consul/consulutil"
"github.com/square/p2/pkg/store/consul/flags"
"github.com/square/p2/pkg/store/consul/rcstore"
@@ -98,9 +99,12 @@ func main() {
}
}

+ auditLogStore := auditlogstore.NewConsulStore(client.KV())
+
// Run the farms!
go rc.NewFarm(
consulStore,
+ auditLogStore,
rcStore,
rcStore,
rcStore,
95 changes: 95 additions & 0 deletions pkg/rc/auditing_transaction.go
@@ -0,0 +1,95 @@
package rc

import (
"context"

"github.com/square/p2/pkg/audit"
pc_fields "github.com/square/p2/pkg/pc/fields"
"github.com/square/p2/pkg/store/consul/transaction"
"github.com/square/p2/pkg/types"
"github.com/square/p2/pkg/util"
)

type auditingTransaction struct {
ctx context.Context
nodes map[types.NodeName]struct{}
auditLogStore AuditLogStore
podID types.PodID
az pc_fields.AvailabilityZone
cn pc_fields.ClusterName
}

// scheduledNodesKey is the context key under which newAuditingTransaction
// stores the transaction's starting node set.
type scheduledNodesKey struct{}

func (rc *replicationController) newAuditingTransaction(
ctx context.Context,
startingNodes []types.NodeName,
) (*auditingTransaction, func()) {
annotatedContext := context.WithValue(ctx, scheduledNodesKey{}, startingNodes)
ctx, cancelFunc := transaction.New(annotatedContext)

startingNodeMap := make(map[types.NodeName]struct{})
for _, node := range startingNodes {
startingNodeMap[node] = struct{}{}
}

rc.mu.Lock()
manifest := rc.Manifest
podLabels := rc.PodLabels
rc.mu.Unlock()
return &auditingTransaction{
ctx: ctx,
nodes: startingNodeMap,
auditLogStore: rc.auditLogStore,
podID: manifest.ID(),
az: pc_fields.AvailabilityZone(podLabels[types.AvailabilityZoneLabel]),
cn: pc_fields.ClusterName(podLabels[types.ClusterNameLabel]),
}, cancelFunc
}

func (a *auditingTransaction) Context() context.Context {
return a.ctx
}

func (a *auditingTransaction) Nodes() []types.NodeName {
var nodes []types.NodeName
for node := range a.nodes {
nodes = append(nodes, node)
}
return nodes
}

func (a *auditingTransaction) AddNode(node types.NodeName) {
a.nodes[node] = struct{}{}
}

func (a *auditingTransaction) RemoveNode(node types.NodeName) {
delete(a.nodes, node)
}

// Commit adds one final operation to the underlying consul transaction to
// create an audit log record with the set of nodes that have already been
// scheduled by the RC and those that will be scheduled or unscheduled as
// part of this transaction. It then commits the transaction.
func (a *auditingTransaction) Commit(cancelFunc func(), txner transaction.Txner) error {
details, err := audit.NewRCRetargetingEventDetails(
a.podID,
a.az,
a.cn,
a.Nodes(),
)
if err != nil {
return err
}

err = a.auditLogStore.Create(
a.ctx,
audit.RCRetargetingEvent,
details,
)
if err != nil {
return util.Errorf("could not add rc retargeting audit log to context: %s", err)
}

return transaction.Commit(a.ctx, cancelFunc, txner)
}
98 changes: 98 additions & 0 deletions pkg/rc/auditing_transaction_test.go
@@ -0,0 +1,98 @@
package rc

import (
"context"
"encoding/json"
"testing"

"github.com/square/p2/pkg/audit"
pc_fields "github.com/square/p2/pkg/pc/fields"
rc_fields "github.com/square/p2/pkg/rc/fields"
"github.com/square/p2/pkg/store/consul/auditlogstore"
"github.com/square/p2/pkg/store/consul/consulutil"
"github.com/square/p2/pkg/types"

klabels "k8s.io/kubernetes/pkg/labels"
)

func TestAuditingTransaction(t *testing.T) {
fixture := consulutil.NewFixture(t)
defer fixture.Stop()

auditLogStore := auditlogstore.NewConsulStore(fixture.Client.KV())
rc := &replicationController{
RC: rc_fields.RC{
Manifest: testManifest(),
PodLabels: klabels.Set{
pc_fields.AvailabilityZoneLabel: "some_az",
pc_fields.ClusterNameLabel: "some_cn",
},
},
auditLogStore: auditLogStore,
}

ctx, cancel := rc.newAuditingTransaction(context.Background(), []types.NodeName{"node1", "node2"})
defer cancel()

ctx.AddNode("node3")
ctx.RemoveNode("node2")

err := ctx.Commit(cancel, fixture.Client.KV())
if err != nil {
t.Fatalf("could not commit audit log record: %s", err)
}

records, err := auditLogStore.List()
if err != nil {
t.Fatal(err)
}
if len(records) != 1 {
t.Fatalf("expected a single audit log record but there were %d", len(records))
}

for _, record := range records {
if record.EventType != audit.RCRetargetingEvent {
t.Errorf("expected audit log type to be %q but was %q", audit.RCRetargetingEvent, record.EventType)
}

var details audit.RCRetargetingDetails
err = json.Unmarshal([]byte(*record.EventDetails), &details)
if err != nil {
t.Fatal(err)
}

if details.PodID != testManifest().ID() {
t.Errorf("expected audit log details to have pod ID %q but was %q", testManifest().ID(), details.PodID)
}
if details.AvailabilityZone != "some_az" {
t.Errorf("expected audit log details to have availability zone %q but was %q", "some_az", details.AvailabilityZone)
}
if details.ClusterName != "some_cn" {
t.Errorf("expected audit log details to have cluster name %q but was %q", "some_cn", details.ClusterName)
}

if len(details.Nodes) != 2 {
t.Fatalf("expected 2 nodes but there were %d", len(details.Nodes))
}
found1 := false
found3 := false
for _, node := range details.Nodes {
switch node {
case "node1":
found1 = true
case "node3":
found3 = true
default:
t.Errorf("found node %s but that wasn't expected", node)
}
}

if !found1 {
t.Error("expected to find node1 in the list")
}

if !found3 {
t.Error("expected to find node3 in the list")
}
}
}
28 changes: 21 additions & 7 deletions pkg/rc/farm.go
@@ -2,6 +2,7 @@ package rc

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
@@ -10,6 +11,7 @@ import (
klabels "k8s.io/kubernetes/pkg/labels"

"github.com/square/p2/pkg/alerting"
"github.com/square/p2/pkg/audit"
"github.com/square/p2/pkg/labels"
"github.com/square/p2/pkg/logging"
p2metrics "github.com/square/p2/pkg/metrics"
@@ -48,6 +50,14 @@ type ReplicationControllerLocker interface {
LockForOwnership(rcID fields.ID, session consul.Session) (consulutil.Unlocker, error)
}

+ type AuditLogStore interface {
+ Create(
+ ctx context.Context,
+ eventType audit.EventType,
+ eventDetails json.RawMessage,
+ ) error
+ }
+
// The Farm is responsible for spawning and reaping replication controllers
// as they are added to and deleted from Consul. Multiple farms can exist
// simultaneously, but each one must hold a different Consul session. This
@@ -60,13 +70,14 @@ type ReplicationControllerLocker interface {
// farms to cooperatively schedule work.
type Farm struct {
// constructor arguments for rcs created by this farm
- store consulStore
- rcStore ReplicationControllerStore
- rcLocker ReplicationControllerLocker
- rcWatcher ReplicationControllerWatcher
- scheduler scheduler.Scheduler
- labeler Labeler
- txner transaction.Txner
+ store consulStore
+ auditLogStore AuditLogStore
+ rcStore ReplicationControllerStore
+ rcLocker ReplicationControllerLocker
+ rcWatcher ReplicationControllerWatcher
+ scheduler scheduler.Scheduler
+ labeler Labeler
+ txner transaction.Txner

// session stream for the rcs locked by this farm
sessions <-chan string
@@ -94,6 +105,7 @@ type childRC struct {

func NewFarm(
store consulStore,
+ auditLogStore AuditLogStore,
rcs ReplicationControllerStore,
rcLocker ReplicationControllerLocker,
rcWatcher ReplicationControllerWatcher,
@@ -112,6 +124,7 @@

return &Farm{
store: store,
+ auditLogStore: auditLogStore,
rcStore: rcs,
rcLocker: rcLocker,
rcWatcher: rcWatcher,
@@ -259,6 +272,7 @@ START_LOOP:
newChild := New(
rc,
rcf.store,
+ rcf.auditLogStore,
rcf.txner,
rcf.rcWatcher,
rcf.scheduler,
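Because the farm depends only on the narrow AuditLogStore interface above rather than on the concrete consul-backed store, a test could substitute a fake. A hypothetical sketch follows; note that the real store stages the record onto the consul transaction carried in ctx, which this fake ignores.

// fakeAuditLogStore is a hypothetical in-memory AuditLogStore for tests; it
// is not part of this commit.
type fakeAuditLogStore struct {
mu sync.Mutex
created []json.RawMessage
}

func (f *fakeAuditLogStore) Create(
ctx context.Context,
eventType audit.EventType,
eventDetails json.RawMessage,
) error {
f.mu.Lock()
defer f.mu.Unlock()
// Capture the record immediately; the real consul-backed store instead
// stages an operation onto the transaction carried in ctx.
f.created = append(f.created, eventDetails)
return nil
}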
2 changes: 1 addition & 1 deletion pkg/rc/fields/fields.go
@@ -40,7 +40,7 @@ type RC struct {
// Defines the set of nodes on which the manifest can be scheduled
NodeSelector labels.Selector

- // A set of labels that will be added to every pod scheduled by this controller
+ // A set of labels that will be added to every pod scheduled by this controller.
PodLabels labels.Set

// The desired number of instances of the manifest that should be
[The remaining two changed files did not load and are not shown.]
