Skip to content

Commit

Permalink
*: combine excessive Hash -> Unit requests into a single recursive call.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Witkowski committed Oct 1, 2015
1 parent 3178aed commit 091846e
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 7 deletions.
28 changes: 22 additions & 6 deletions registry/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,25 @@ func (r *EtcdRegistry) Units() ([]job.Unit, error) {
return nil, err
}

// Fetch all units by hash recursively to avoid sending N requests to Etcd.
hashToUnit, err := r.getAllUnitsHashMap()
if err != nil {
log.Errorf("failed fetching all Units from etcd: %v", err)
return nil, err
}
unitHashLookupFunc := func(hash unit.Hash) *unit.UnitFile {
stringHash := hash.String()
unit, ok := hashToUnit[stringHash]
if !ok {
log.Errorf("did not find Unit %v in list of all units", stringHash)
return nil
}
return unit
}

uMap := make(map[string]*job.Unit)
for _, dir := range res.Node.Nodes {
u, err := r.dirToUnit(dir)
u, err := r.dirToUnit(dir, unitHashLookupFunc)
if err != nil {
log.Errorf("Failed to parse Unit from etcd: %v", err)
continue
Expand Down Expand Up @@ -143,12 +159,12 @@ func (r *EtcdRegistry) Unit(name string) (*job.Unit, error) {
return nil, err
}

return r.dirToUnit(res.Node)
return r.dirToUnit(res.Node, r.getUnitByHash)
}

// dirToUnit takes a Node containing a Job's constituent objects (in child
// nodes) and returns a *job.Unit, or any error encountered
func (r *EtcdRegistry) dirToUnit(dir *etcd.Node) (*job.Unit, error) {
func (r *EtcdRegistry) dirToUnit(dir *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) {
objKey := path.Join(dir.Key, "object")
var objNode *etcd.Node
for _, node := range dir.Nodes {
Expand All @@ -160,7 +176,7 @@ func (r *EtcdRegistry) dirToUnit(dir *etcd.Node) (*job.Unit, error) {
if objNode == nil {
return nil, nil
}
u, err := r.getUnitFromObjectNode(objNode)
u, err := r.getUnitFromObjectNode(objNode, unitHashLookupFunc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -254,7 +270,7 @@ func dirToHeartbeat(dir *etcd.Node) (heartbeat string) {
// getUnitFromObject takes a *etcd.Node containing a Unit's jobModel, and
// instantiates and returns a representative *job.Unit, transitively fetching the
// associated UnitFile as necessary
func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node) (*job.Unit, error) {
func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) {
var err error
var jm jobModel
if err = unmarshal(node.Value, &jm); err != nil {
Expand All @@ -263,7 +279,7 @@ func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node) (*job.Unit, error)

var unit *unit.UnitFile

unit = r.getUnitByHash(jm.UnitHash)
unit = unitHashLookupFunc(jm.UnitHash)
if unit == nil {
log.Warningf("No Unit found in Registry for Job(%s)", jm.Name)
return nil, nil
Expand Down
43 changes: 42 additions & 1 deletion registry/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package registry

import (
"strings"

etcd "github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/etcd/client"

"github.com/coreos/fleet/log"
Expand Down Expand Up @@ -61,8 +63,47 @@ func (r *EtcdRegistry) getUnitByHash(hash unit.Hash) *unit.UnitFile {
}
return nil
}
return r.unitFromEtcdNode(hash, resp.Node)
}

// getAllUnitsHashMap retrieves from the Registry all Units and returns a map of hash to UnitFile
func (r *EtcdRegistry) getAllUnitsHashMap() (map[string]*unit.UnitFile, error) {
key := r.prefixed(unitPrefix)
opts := &etcd.GetOptions{
Recursive: true,
Quorum: true,
}
hashToUnit := map[string]*unit.UnitFile{}
resp, err := r.kAPI.Get(r.ctx(), key, opts)
if err != nil {
return nil, err
}

for _, node := range resp.Node.Nodes {
parts := strings.Split(node.Key, "/")
if len(parts) == 0 {
log.Errorf("key '%v' doesn't have enough parts", node.Key)
continue
}
stringHash := parts[len(parts)-1]
hash, err := unit.HashFromHexString(stringHash)
if err != nil {
log.Errorf("failed to get Hash for key '%v' with stringHash '%v': %v", node.Key, stringHash, err)
continue
}
unit := r.unitFromEtcdNode(hash, node)
if unit == nil {
continue
}
hashToUnit[stringHash] = unit
}

return hashToUnit, nil
}

func (r *EtcdRegistry) unitFromEtcdNode(hash unit.Hash, etcdNode *etcd.Node) *unit.UnitFile {
var um unitModel
if err := unmarshal(resp.Node.Value, &um); err != nil {
if err := unmarshal(etcdNode.Value, &um); err != nil {
log.Errorf("error unmarshaling Unit(%s): %v", hash, err)
return nil
}
Expand Down
14 changes: 14 additions & 0 deletions unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package unit
import (
"bytes"
"crypto/sha1"
"encoding/hex"
"fmt"
"io/ioutil"
"strings"
Expand Down Expand Up @@ -169,6 +170,19 @@ func (h *Hash) Empty() bool {
return *h == Hash{}
}

func HashFromHexString(key string) (Hash, error) {
h := Hash{}
out, err := hex.DecodeString(key)
if err != nil {
return h, err
}
if len(out) != sha1.Size {
return h, fmt.Errorf("size of key %q (%d) differs from SHA1 size (%d)", out, len(out), sha1.Size)
}
copy(h[:], out[:sha1.Size])
return h, nil
}

// UnitState encodes the current state of a unit loaded into a fleet agent
type UnitState struct {
LoadState string
Expand Down
8 changes: 8 additions & 0 deletions unit/unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ func TestUnitHash(t *testing.T) {
if !eh.Empty() {
t.Fatalf("Empty hash check failed: %v", eh.Empty())
}

rehashed, err := HashFromHexString(expectHashString)
if err != nil {
t.Fatalf("HashFromHexString failed with: %v", err)
}
if rehashed != gotHash {
t.Fatalf("HashFromHexString not equal to original hash")
}
}

func TestRecognizedUnitTypes(t *testing.T) {
Expand Down

0 comments on commit 091846e

Please sign in to comment.