Refactor Phase 1/Phase 2 Anti-Entropy and Gossip #299
Conversation
Force-pushed from d704fb8 to 15a04af (compare)
Codecov Report

@@            Coverage Diff             @@
##             main     #299      +/-   ##
==========================================
+ Coverage   46.18%   48.39%   +2.21%
==========================================
  Files          58       56       -2
  Lines        7397     7044     -353
==========================================
- Hits         3416     3409       -7
+ Misses       3347     3002     -345
+ Partials      634      633       -1

Continue to review full report at Codecov.
This looks great @bbengfort! I really like the logic factored out into different files, specifically to help distinguish between client/initiator methods and server/remote methods. I also really appreciate the inline comments and helper function names, which make it really convenient to follow along with the bigger processes.
I have left a handful of notes and ideas based partly on our discussion yesterday (and that weird version-match/value-mismatch bug we noticed) and partly on some possible remaining misconceptions I had about how some terms are used in the AE literature. Thank you in advance for any additional clarification you can provide on those, and for making such a complex process so accessible!
@@ -122,7 +122,6 @@ func (s *trtlTestSuite) TestPeers() {
 	require.Equal(network[3].Addr, rep.Peers[0].Addr)
 	require.Equal(network[3].Name, rep.Peers[0].Name)
 	require.Equal(network[3].Region, rep.Peers[0].Region)
-	require.NotEqual(rep.Peers[0].Created, rep.Peers[0].Modified)
Removed because it was too specific?
The problem was that the tests executed so fast that the created and modified timestamps were less than a second apart, which meant the test could fail occasionally since the timestamps only have second granularity.
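For illustration, a minimal sketch of the failure mode, assuming the peer timestamps are serialized without sub-second precision (the RFC 3339 formatting here is my assumption, not necessarily what trtl stores):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// In a fast test, Created and Modified are set microseconds apart.
	created := time.Now()
	modified := time.Now()

	// With second granularity the two values usually collapse to the same
	// instant, so a NotEqual assertion fails intermittently.
	c := created.Format(time.RFC3339) // e.g. 2021-09-01T12:00:00Z
	m := modified.Format(time.RFC3339)
	fmt.Println(c == m) // almost always true
}
```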
pkg/trtl/trtl.go (outdated)

// b64e encodes []byte keys and values as base64 encoded strings suitable for logging.
func b64e(src []byte) string {
	return base64.RawURLEncoding.EncodeToString(src)
}
Looks like we could just alias this if we wanted to (as we discovered during our walkthrough)
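If we did want the alias, it could be as small as a package-level method value (just a sketch; keeping the named function may still be worth it for the doc comment alone):

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// b64e is a method value alias: calling it is identical to calling
// base64.RawURLEncoding.EncodeToString(src) directly.
var b64e = base64.RawURLEncoding.EncodeToString

func main() {
	fmt.Println(b64e([]byte("hello"))) // aGVsbG8
}
```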
pkg/trtl/replica/replica.go (outdated)

// Gossip implements bilateral anti-entropy: during a Gossip session the initiating
// replica pushes updates to the remote peer and pulls requested changes. Using
// bidirectional streaming, the initiating peer sends data-less sync messages with
// the versions of objects it stores locally. The remote replica then responds with
// data if its local version is later or sends a sync message back requesting the
// data from the initiating replica if its local version is earlier. No exchange
// occurs if both replicas have the same version. At the end of a gossip session,
// both replicas should have synchronized and have identical underlying data stores.
I like this description but I think we might want to move it to the godoc as a description of gossip overall rather than a docstring for the latest version of the `Gossip` method. Perhaps here the docstring should clarify that `Gossip` is from the perspective of the remote/receiving replica? Something like:

// Gossip is a server method that responds to anti-entropy requests.
// The initiating replica will engage `Gossip` to enable the remote/receiving
// replica to receive incoming version vectors for all objects in the initiating
// replica's Trtl store in phase one. The final step of phase one triggers phase
// two, when the remote replica responds with data if its local version is later.
// Concurrently with these phases, the remote sends a sync message back
// requesting data from the initiating replica if its local version is earlier.
// (so could be considered both phase 1 and phase 3). The difference is that the phase
// is allowed to send messages in the first part (phase 1) but cannot send messages in
// the second part (phase 3). In phase 1, CHECK messages from the initiator are
// responded to with either a mirror CHECK if the initiator version is later, a REPAIR
is a mirror CHECK like an ack message to the client?
It's more that the CHECK is reflected back to the client if the remote's version is earlier than the initiator's and it's requesting a repair. E.g. a mirror check is the following sequence:
initiator: CHECK v(8,4)
remote: I'm at v(8,3) ⟶ CHECK v(8,4)
initiator: REPAIR v(8,4)
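To make that concrete, here's a toy sketch of the remote's decision for a single CHECK; the `Version` struct, the `IsLater` PID tiebreaking, and the v(scalar, pid) notation are assumptions for illustration, not trtl's actual types:

```go
package main

import "fmt"

// Version is a toy (scalar, PID) version for illustration only.
type Version struct {
	Scalar uint64 // monotonic version number
	PID    uint64 // writing replica's process id, used as a tiebreaker
}

// IsLater reports whether v is later than other, breaking scalar ties by PID.
func (v Version) IsLater(other Version) bool {
	if v.Scalar == other.Scalar {
		return v.PID > other.PID
	}
	return v.Scalar > other.Scalar
}

// handleCheck sketches the remote's response to one CHECK from the initiator.
func handleCheck(initiator, local Version) {
	switch {
	case local.IsLater(initiator):
		fmt.Printf("REPAIR v(%d,%d) with local data\n", local.Scalar, local.PID)
	case initiator.IsLater(local):
		// Mirror the CHECK back so the initiator replies with a REPAIR.
		fmt.Printf("CHECK v(%d,%d)\n", initiator.Scalar, initiator.PID)
	default:
		// Versions are equal: nothing to exchange.
	}
}

func main() {
	// Initiator at v(8,4), remote at v(8,3): the remote mirrors CHECK v(8,4).
	handleCheck(Version{Scalar: 8, PID: 4}, Version{Scalar: 8, PID: 3})
}
```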
	// Send a check request back to the initiating replica to fetch its version
	sender.Send(sync)
default:
	// The versions are equal, do nothing
What if the versions are equal but the values are not?
That would be an unexpected and bad situation that would indicate there was a bug in the versioning system or in our deployment. E.g. this situation could occur if we accidentally assigned the same PID to two different replicas. If the versions are equal, that should imply that the values are equal because a new version is assigned every time the value changes.
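A toy illustration of that failure mode, reusing the assumed (scalar, PID) version shape from the sketch above:

```go
package main

import "fmt"

// version is a toy (scalar, PID) pair for illustration only.
type version struct{ scalar, pid uint64 }

func main() {
	// Replica A (PID 4) writes "foo"; replica B, misconfigured with the
	// same PID 4, concurrently writes "bar" at the same scalar.
	verA, valA := version{8, 4}, "foo"
	verB, valB := version{8, 4}, "bar"

	// Anti-entropy compares only versions, so it sees nothing to exchange
	// and the replicas stay divergent for this key.
	fmt.Println(verA == verB, valA == valB) // true false
}
```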
// Start phase 1: loop over all objects in the local database and send check
// requests to the remote replica. This is also called the "pull" phase, since we're
// asking the remote for its objects that are later than our own, e.g. pulling the
// objects to this replica from the remote. Phase 1 ends when we've completed
// looping over the local database.
wg.Add(1)
go r.initiatorPhase1(ctx, wg, log, sender)

// Start phase 2: this phase is concurrent with phase 1 since it listens for and
// responds to all messages from the remote replica. This is also called the "push"
// phase, since we're pushing objects back to the remote replica. This phase ends
// when the remote replica sends a COMPLETE message, meaning it is done sending
// messages. At that point, we will no longer send any messages so this phase will
// close the sender go routine, which will stop when all messages have been sent.
wg.Add(1)
go r.initiatorPhase2(ctx, wg, log, sender, stream)
Interesting! Possibly I had this backwards before. I was under the impression that phase one was called the push phase because the initiator is pushing (first versions, then data) to the remote. Push is all that happens in the non-bilateral AE case, and the remote is the only replica that makes changes in that case. Bilateral AE introduces phase two, the pull phase, where the initiator has the opportunity to receive repairs from the remote.
If I do have that backwards, please let me know and I'll fix the anti-entropy docs I wrote.
I think you have it right. Just to clarify: I believe that push and pull refer to types of gossip where the synchronization is unidirectional:
- Pull: the initiator is synchronized with the remote
- Push: the remote is synchronized by the initiator
- Bilateral: both the initiator and the remote are synchronized
}

switch sync.Status {
case replica.Sync_CHECK:
Is this the mirror CHECK mentioned in Gossip?
Yes!
pkg/trtl/replica/sync.go (outdated)

// NOTE: we must check if it's later than our local version in case a
// concurrent Put (a stomp) or concurrent synchronization updated it.
// NOTE: honu.Update performs the version checking in a transaction.
if err = r.db.Update(sync.Object, options.WithNamespace(sync.Object.Namespace)); err != nil {
Is this the right place to track a skip or stomp?
It is - and now that we have the update type functionality, I've added a TODO here for that tracking.
pkg/trtl/replica/replica.go (outdated)

// NOTE: we must check if it's later than our local version in case a
// concurrent Put (a stomp) or concurrent synchronization updated it.
// NOTE: honu.Update performs the version checking in a transaction.
if err := r.db.Update(sync.Object, options.WithNamespace(sync.Object.Namespace)); err != nil {
Looks like this is a place where we'd increment skips or stomps? I guess we'll have to change honu to let us know if there was a PID tiebreaker (stomp) or not (skip)?
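Hypothetically, the tracking could hang off the update call like the sketch below once the update type is surfaced; `UpdateType`, `UpdateSkip`, and `UpdateStomp` are invented names for illustration, not honu's API:

```go
package main

import "fmt"

// UpdateType is a hypothetical result for an Update call, sketched to show
// where skip/stomp counters could go; honu does not expose these names.
type UpdateType int

const (
	UpdateWrite UpdateType = iota // remote version later: value written
	UpdateSkip                    // local version later: nothing written
	UpdateStomp                   // concurrent versions: PID tiebreaker applied
)

// trackUpdate increments the appropriate counter for a completed update.
func trackUpdate(u UpdateType, skips, stomps *uint64) {
	switch u {
	case UpdateSkip:
		*skips++
	case UpdateStomp:
		*stomps++
	}
}

func main() {
	var skips, stomps uint64
	trackUpdate(UpdateStomp, &skips, &stomps)
	fmt.Println(skips, stomps) // 0 1
}
```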
GossipInterval: 1 * time.Second,
GossipSigma:    100 * time.Millisecond,
Removed because we just want to use the default values here?
Yes, we're no longer testing the replica in `trtl` (the tests are in `replica`), so this configuration was unnecessary. Removed it for clarity.
I think this is a very good improvement over the previous iteration in terms of understandability. I appreciate the thorough docstrings, and splitting it up by role (initiator, remote) makes sense to me since it's somewhat analogous to client/server interactions. I also like that we are passing along the log contexts, which seems like it will be helpful for tracking down the bugs.
// Start phase 1: receive object version vectors from the initiator. This go routine
// will kick off phase 2: sending unchecked versions back to the initiator.
wg.Add(1)
go r.remotePhase1(stream.Context(), wg, logctx, stream, sender)
I wish we had better names for phase1 and phase2. I feel like we have a mutual understanding of what phase1 and phase2 mean after our discussions but it might be difficult for others to understand without the accompanying diagram. Unfortunately, I don't have any better suggestions right now.
Msg("a replication error occurred") | ||
} else { | ||
log.Warn().Str("error", sync.Error).Msg("a replication error occurred") | ||
} |
Why do we continue if there is an error?
This particular error is related to a single object - so if the object has been corrupted, an admin may be able to fix it, but we want to continue with the rest of the synchronization so that all other objects are replicated. If this was a critical error, then instead of sending a `Sync_ERROR`, the initiator would simply close the stream with a gRPC error.
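In other words, the policy looks roughly like this sketch (simplified message and status names, not the actual trtl stream handling):

```go
package main

import (
	"os"

	"github.com/rs/zerolog"
)

// syncMsg is a simplified stand-in for a replica sync message.
type syncMsg struct {
	Status string // e.g. "ERROR", "REPAIR", "COMPLETE"
	Error  string
}

func main() {
	log := zerolog.New(os.Stderr)
	msgs := []*syncMsg{{Status: "ERROR", Error: "object corrupted"}, {Status: "COMPLETE"}}
	for _, msg := range msgs {
		if msg.Status == "ERROR" {
			// Object-level problem: warn and keep going so every other
			// object is still replicated. A critical failure would not be
			// sent this way; the stream would be closed with a gRPC error.
			log.Warn().Str("error", msg.Error).Msg("a replication error occurred")
			continue
		}
		// ... handle CHECK / REPAIR / COMPLETE messages ...
	}
}
```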
src := iter.Key()
dst := make([]byte, len(src))
copy(dst, src)
keys = append(keys, dst)
Is this the "duplicate" key fix?
It is!
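For anyone else chasing it: LevelDB-style iterators reuse the backing buffer returned by `Key()`, so appending the slice directly makes every element alias the same memory. A self-contained sketch of the bug and the copy fix (the iterator is simulated here):

```go
package main

import "fmt"

// keyBuf simulates an iterator that reuses one buffer for every Key() call,
// as LevelDB-style iterators do.
var keyBuf = make([]byte, 1)

func iterKey(b byte) []byte {
	keyBuf[0] = b
	return keyBuf
}

func main() {
	var aliased, copied [][]byte
	for _, b := range []byte{'a', 'b', 'c'} {
		src := iterKey(b)

		// BUG: each element points at the shared buffer, so every "key"
		// ends up equal to the last one read.
		aliased = append(aliased, src)

		// FIX: copy the key out of the iterator's buffer first.
		dst := make([]byte, len(src))
		copy(dst, src)
		copied = append(copied, dst)
	}
	fmt.Printf("aliased: %q\n", aliased) // ["c" "c" "c"]
	fmt.Printf("copied:  %q\n", copied)  // ["a" "b" "c"]
}
```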
This PR refactors anti-entropy (and hopefully corrects some of the bugs). It is broken into several commits to show the restructuring.

The first commit moves the `trtl.ReplicaService` into its own package, `pkg/trtl/replica`, so that the new struct is `replica.Service`. @pdeziel suggested multiple files to break down and organize some of the code, and decoupling replication into its own package allowed us to do this without cluttering the `trtl` package. It also gives us the opportunity to create a replication-specific go doc (SC-2697). I did my best to follow the conventions we're establishing in `trtl`, but there is a little bit of weirdness, such as what has to be passed to `replica.New` in order to allow the decoupling to happen.

The second commit refactors the `AntiEntropySync` method, breaking out the `initiatorPhase1` and `initiatorPhase2` go routines for better readability. It also creates a data structure for the `streamSender`, since both `AntiEntropySync` and `Gossip` will require this functionality. To make things more understandable, I've removed the unnecessary stream recv go routine, since `initiatorPhase2` is the only go routine that receives messages. I've also added mTLS configuration (though this doesn't appear to be working yet) in a `connect` method. This update eliminates many of the bugs we saw with the first implementation, including not sending all messages (the `AntiEntropySync` method now blocks until all messages are sent), the `context canceled` error (also fixed by blocking until the sender completes), and the "could not select remote peer" error, fixed by copying the key `[]byte` into our keys array so that the pointer doesn't change in the `for` loop. I tested this locally and everything is working smoothly, but there is still more work to be done!

The third commit pulls in honu PR #18. The refactoring required a change to `honu.Update` so that it checks the version before writing it; otherwise we would introduce the possibility of an inconsistency where a concurrent sync or Put changes the local version while anti-entropy is occurring.

Blocked by #298 (SC-952)