Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes for Successful for Anti-Entropy Replication #15

Merged
merged 5 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions honu.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,41 @@ func (db *DB) Get(key []byte, options ...opts.SetOptions) (value []byte, err err

}

// Update an object directly in the database without modifying its version information.
// Update is to Put as Object is to Get - use Update when manually modifying the data
// store, for example during replication, but not for normal DB operations.
func (db *DB) Update(obj *pb.Object, options ...opts.SetOptions) (err error) {
var tx engine.Transaction
if tx, err = db.engine.Begin(false); err != nil {
return err
}
defer tx.Finish()

// Collect the options
var cfg *opts.Options
if cfg, err = opts.New(options...); err != nil {
return err
}

// Check the namespace and that it matches the object
if cfg.Namespace == opts.NamespaceDefault {
cfg.Namespace = obj.Namespace
} else if cfg.Namespace != obj.Namespace {
return errors.New("options namespace does not match object namespace")
}

// Put the version directly to disk
var data []byte
if data, err = proto.Marshal(obj); err != nil {
return err
}

if err = tx.Put(obj.Key, data, cfg); err != nil {
return err
}
return nil
}

// Put a new value to the specified key and update the version.
func (db *DB) Put(key, value []byte, options ...opts.SetOptions) (_ *pb.Object, err error) {
var tx engine.Transaction
Expand Down
23 changes: 23 additions & 0 deletions honu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,29 @@ func TestLevelDBInteractions(t *testing.T) {
require.Equal(t, uint64(3), obj.Version.Version)
require.False(t, obj.Tombstone())

// Attempt to directly update the object in the database
obj.Data = []byte("directly updated")
obj.Owner = "me"
obj.Version.Parent = nil
obj.Version.Version = 42
obj.Version.Pid = 93
obj.Version.Region = "here"
obj.Version.Tombstone = false
require.NoError(t, db.Update(obj))

obj, err = db.Object(key, options.WithNamespace(namespace))
require.NoError(t, err)
require.Equal(t, uint64(42), obj.Version.Version)
require.Equal(t, uint64(93), obj.Version.Pid)
require.Equal(t, "me", obj.Owner)
require.Equal(t, "here", obj.Version.Region)

// Update with same namespace option should not error.
require.NoError(t, db.Update(obj, options.WithNamespace(namespace)))

// Update with wrong namespace should error
require.Error(t, db.Update(obj, options.WithNamespace("this is not the right thing")))

// TODO: figure out what to do with this testcase.
// Iter currently grabs the namespace by splitting
// on :: and grabbing the first string, so it only
Expand Down
7 changes: 7 additions & 0 deletions object/object.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package object

var VersionZero = Version{}

// Tombstone returns true if the version of the object is a Tombstone (for a deleted object)
func (o *Object) Tombstone() bool {
if o.Version == nil {
Expand All @@ -17,6 +19,11 @@ func (v *Version) IsZero() bool {
// IsLater returns true if the specified version is later than the other version. It
// returns false if the other version is later or equal to the specified version.
func (v *Version) IsLater(other *Version) bool {
// If other is nil, then we assume it represents the zero-valued version.
if other == nil {
other = &VersionZero
}

// Version is monotonically increasing, if it's greater than the other, then this
// version is later than the other.
if v.Version > other.Version {
Expand Down
5 changes: 5 additions & 0 deletions object/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func TestTombstone(t *testing.T) {

func TestVersionIsLater(t *testing.T) {
v1 := &Version{Pid: 8, Version: 42}
require.True(t, v1.IsLater(nil))
require.True(t, v1.IsLater(&VersionZero))
require.False(t, VersionZero.IsLater(v1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good checks. Is there any situation where we would end up comparing two VersionZeros? Do we want to have that test case here anyway to validate our logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose it's possible; I'll add the following:

require.False(t, VersionZero.IsLater(nil))
require.False(t, VersionZero.IsLater(VersionZero))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out that this was a good catch VersionZero.IsLater(nil) was True - so I had to implement a minor change to fix that! Thanks for suggesting the test!

require.False(t, VersionZero.IsLater(nil))
require.False(t, VersionZero.IsLater(&VersionZero))
require.True(t, v1.IsLater(&Version{Pid: 8, Version: 40}))
require.True(t, v1.IsLater(&Version{Pid: 9, Version: 42}))
require.False(t, v1.IsLater(&Version{Pid: 7, Version: 42}))
Expand Down
65 changes: 30 additions & 35 deletions proto/replica/v1/replica.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,37 @@ import "object/v1/object.proto";
//
// The Replication Service requires mTLS authentication in order to conduct exchanges.
service Replication {
// During gossip, the initiating replica sends a randomly selected remote peer the
// version vectors of all objects it currently stores. The remote peer should
// respond with updates that correspond to more recent versions of the objects. The
// remote peer can than also make a reciprocal request for updates by sending the
// set of versions requested that were more recent on the initiating replica, and
// use a partial flag to indicate that it is requesting specific versions. This
// mechanism implements bilateral anti-entropy: a push and pull gossip.
rpc Gossip(VersionVectors) returns (Updates) {}
// Gossip implements biltateral anti-entropy: during a Gossip session the initiating
// replica pushes updates to the remote peer and pulls requested changes. Using
// bidirectional streaming, the initiating peer sends data-less sync messages with
// the versions of objects it stores locally. The remote replica then responds with
// data if its local version is later or sends a sync message back requesting the
// data from the initating replica if its local version is earlier (no exchange)
// occurs if both replicas have the same version. At the end of a gossip session,
// both replicas should have synchronized and have identical underlying data stores.
rpc Gossip(stream Sync) returns (stream Sync) {}
}

// VersionVectors initiate a Gossip RPC by passing along the current state of the
// namespace of objects on the initiating replica. Objects should not have data
// populated in this message, only metadata. If the partial flag is set, that indicates
// that the replica only wants to consider the objects described rather than the entire
// namespace (e.g. objects that may have been created outside the set of objects
// described). The partial flag is typically used as a mechanism to fetch specific
// objects that are known to be later from the remote replica. The namespaces array
// specifies which namespaces should be considered in gossip allowing for partial
// replication by namespace if necessary. If the namespaces array is empty, then all
// namespaces are considered during Gossip.
message VersionVectors {
// Version vectors of objects without data.
repeated honu.object.v1.Object objects = 1;
// Sync messages allow replicas to exchange information in a bidirectional stream.
// Because Sync messages are sent in the stream, there are different synchronization
// types. A "check" synchronization sends only the version vector without data,
// requesting the remote peer to check if they have a later version, and if so, return
// it. A "repair" synchronization sends object data back if the sender's version is
// later. A "complete" synchronization indicates that the sender is fully synchronized,
// and an "error" synchronization message says that there was some failure repairing
// the specified version vector.
message Sync {
// Status indicates what state the synchronization is in between peers, allowing
// the replicas to coordinate between multiple sync messages in a grpc stream.
enum Status {
UNKNOWN = 0; // Should not be used as a status
CHECK = 1; // Sync contains version information only
REPAIR = 2; // Sync contains object data information
COMPLETE = 3; // Sync contains no object information, ready to end Gossip
ERROR = 4; // Sync contains object-specific error information
}

// A request to consider only the objects specified in the version vector and not
// the entire namespace (e.g. ignore objects that are not specified).
bool partial = 2;

// Limit the comparison only to the specified namespace(s). If not set or empty,
// then all namespaces are considered.
repeated string namespaces = 3;
}

// Updates returns data to repair entropy (e.g. to repair the divergence caused by
// updates). Updates should only contain the objects that have later versions and the
// objects should have their data fields populated.
message Updates {
repeated honu.object.v1.Object objects = 1;
Status status = 1; // The status/type of sync messages
honu.object.v1.Object object = 2; // The object being synchronized
string error = 3; // Error information if the object failed to sync
}
Loading