Skip to content

Commit

Permalink
Changes for Successful for Anti-Entropy Replication (#15)
Browse files Browse the repository at this point in the history
Co-authored-by: Patrick Deziel <42919891+pdeziel@users.noreply.github.com>
  • Loading branch information
bbengfort and pdeziel authored Jan 10, 2022
1 parent 7ab6fd1 commit ce31ee3
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 207 deletions.
35 changes: 35 additions & 0 deletions honu.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,41 @@ func (db *DB) Get(key []byte, options ...opts.SetOptions) (value []byte, err err

}

// Update an object directly in the database without modifying its version information.
// Update is to Put as Object is to Get - use Update when manually modifying the data
// store, for example during replication, but not for normal DB operations.
func (db *DB) Update(obj *pb.Object, options ...opts.SetOptions) (err error) {
var tx engine.Transaction
if tx, err = db.engine.Begin(false); err != nil {
return err
}
defer tx.Finish()

// Collect the options
var cfg *opts.Options
if cfg, err = opts.New(options...); err != nil {
return err
}

// Check the namespace and that it matches the object
if cfg.Namespace == opts.NamespaceDefault {
cfg.Namespace = obj.Namespace
} else if cfg.Namespace != obj.Namespace {
return errors.New("options namespace does not match object namespace")
}

// Put the version directly to disk
var data []byte
if data, err = proto.Marshal(obj); err != nil {
return err
}

if err = tx.Put(obj.Key, data, cfg); err != nil {
return err
}
return nil
}

// Put a new value to the specified key and update the version.
func (db *DB) Put(key, value []byte, options ...opts.SetOptions) (_ *pb.Object, err error) {
var tx engine.Transaction
Expand Down
23 changes: 23 additions & 0 deletions honu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,29 @@ func TestLevelDBInteractions(t *testing.T) {
require.Equal(t, uint64(3), obj.Version.Version)
require.False(t, obj.Tombstone())

// Attempt to directly update the object in the database
obj.Data = []byte("directly updated")
obj.Owner = "me"
obj.Version.Parent = nil
obj.Version.Version = 42
obj.Version.Pid = 93
obj.Version.Region = "here"
obj.Version.Tombstone = false
require.NoError(t, db.Update(obj))

obj, err = db.Object(key, options.WithNamespace(namespace))
require.NoError(t, err)
require.Equal(t, uint64(42), obj.Version.Version)
require.Equal(t, uint64(93), obj.Version.Pid)
require.Equal(t, "me", obj.Owner)
require.Equal(t, "here", obj.Version.Region)

// Update with same namespace option should not error.
require.NoError(t, db.Update(obj, options.WithNamespace(namespace)))

// Update with wrong namespace should error
require.Error(t, db.Update(obj, options.WithNamespace("this is not the right thing")))

// TODO: figure out what to do with this testcase.
// Iter currently grabs the namespace by splitting
// on :: and grabbing the first string, so it only
Expand Down
7 changes: 7 additions & 0 deletions object/object.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package object

var VersionZero = Version{}

// Tombstone returns true if the version of the object is a Tombstone (for a deleted object)
func (o *Object) Tombstone() bool {
if o.Version == nil {
Expand All @@ -17,6 +19,11 @@ func (v *Version) IsZero() bool {
// IsLater returns true if the specified version is later than the other version. It
// returns false if the other version is later or equal to the specified version.
func (v *Version) IsLater(other *Version) bool {
// If other is nil, then we assume it represents the zero-valued version.
if other == nil {
other = &VersionZero
}

// Version is monotonically increasing, if it's greater than the other, then this
// version is later than the other.
if v.Version > other.Version {
Expand Down
5 changes: 5 additions & 0 deletions object/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func TestTombstone(t *testing.T) {

func TestVersionIsLater(t *testing.T) {
v1 := &Version{Pid: 8, Version: 42}
require.True(t, v1.IsLater(nil))
require.True(t, v1.IsLater(&VersionZero))
require.False(t, VersionZero.IsLater(v1))
require.False(t, VersionZero.IsLater(nil))
require.False(t, VersionZero.IsLater(&VersionZero))
require.True(t, v1.IsLater(&Version{Pid: 8, Version: 40}))
require.True(t, v1.IsLater(&Version{Pid: 9, Version: 42}))
require.False(t, v1.IsLater(&Version{Pid: 7, Version: 42}))
Expand Down
65 changes: 30 additions & 35 deletions proto/replica/v1/replica.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,37 @@ import "object/v1/object.proto";
//
// The Replication Service requires mTLS authentication in order to conduct exchanges.
service Replication {
// During gossip, the initiating replica sends a randomly selected remote peer the
// version vectors of all objects it currently stores. The remote peer should
// respond with updates that correspond to more recent versions of the objects. The
// remote peer can than also make a reciprocal request for updates by sending the
// set of versions requested that were more recent on the initiating replica, and
// use a partial flag to indicate that it is requesting specific versions. This
// mechanism implements bilateral anti-entropy: a push and pull gossip.
rpc Gossip(VersionVectors) returns (Updates) {}
// Gossip implements biltateral anti-entropy: during a Gossip session the initiating
// replica pushes updates to the remote peer and pulls requested changes. Using
// bidirectional streaming, the initiating peer sends data-less sync messages with
// the versions of objects it stores locally. The remote replica then responds with
// data if its local version is later or sends a sync message back requesting the
// data from the initating replica if its local version is earlier (no exchange)
// occurs if both replicas have the same version. At the end of a gossip session,
// both replicas should have synchronized and have identical underlying data stores.
rpc Gossip(stream Sync) returns (stream Sync) {}
}

// VersionVectors initiate a Gossip RPC by passing along the current state of the
// namespace of objects on the initiating replica. Objects should not have data
// populated in this message, only metadata. If the partial flag is set, that indicates
// that the replica only wants to consider the objects described rather than the entire
// namespace (e.g. objects that may have been created outside the set of objects
// described). The partial flag is typically used as a mechanism to fetch specific
// objects that are known to be later from the remote replica. The namespaces array
// specifies which namespaces should be considered in gossip allowing for partial
// replication by namespace if necessary. If the namespaces array is empty, then all
// namespaces are considered during Gossip.
message VersionVectors {
// Version vectors of objects without data.
repeated honu.object.v1.Object objects = 1;
// Sync messages allow replicas to exchange information in a bidirectional stream.
// Because Sync messages are sent in the stream, there are different synchronization
// types. A "check" synchronization sends only the version vector without data,
// requesting the remote peer to check if they have a later version, and if so, return
// it. A "repair" synchronization sends object data back if the sender's version is
// later. A "complete" synchronization indicates that the sender is fully synchronized,
// and an "error" synchronization message says that there was some failure repairing
// the specified version vector.
message Sync {
// Status indicates what state the synchronization is in between peers, allowing
// the replicas to coordinate between multiple sync messages in a grpc stream.
enum Status {
UNKNOWN = 0; // Should not be used as a status
CHECK = 1; // Sync contains version information only
REPAIR = 2; // Sync contains object data information
COMPLETE = 3; // Sync contains no object information, ready to end Gossip
ERROR = 4; // Sync contains object-specific error information
}

// A request to consider only the objects specified in the version vector and not
// the entire namespace (e.g. ignore objects that are not specified).
bool partial = 2;

// Limit the comparison only to the specified namespace(s). If not set or empty,
// then all namespaces are considered.
repeated string namespaces = 3;
}

// Updates returns data to repair entropy (e.g. to repair the divergence caused by
// updates). Updates should only contain the objects that have later versions and the
// objects should have their data fields populated.
message Updates {
repeated honu.object.v1.Object objects = 1;
Status status = 1; // The status/type of sync messages
honu.object.v1.Object object = 2; // The object being synchronized
string error = 3; // Error information if the object failed to sync
}
Loading

0 comments on commit ce31ee3

Please sign in to comment.