
Refactor Phase 1/Phase 2 Anti-Entropy and Gossip #299

Merged
merged 9 commits into from
Jan 26, 2022

Conversation

Collaborator

@bbengfort bbengfort commented Jan 15, 2022

This PR refactors anti-entropy (and hopefully corrects some of the bugs). It is broken into several commits to show the restructuring.

The first commit moves the trtl.ReplicaService into its own package, pkg/trtl/replica, so that the new struct is replica.Service. @pdeziel suggested using multiple files to break down and organize some of the code, and decoupling replication into its own package allowed us to do this without cluttering the trtl package. It also gives us the opportunity to create a replication-specific go doc (SC-2697). I did my best to follow the conventions we're establishing in trtl, but there is a little bit of weirdness, such as what has to be passed to replica.New in order to allow the decoupling to happen.

The second commit refactors the AntiEntropySync method, breaking out the initiatorPhase1 and initiatorPhase2 go routines for better readability. It also creates a data structure for the streamSender, since both AntiEntropySync and Gossip will require this functionality. To make things more understandable, I've removed the unnecessary stream recv go routine, since initiatorPhase2 is the only go routine that receives messages. I've also added mTLS configuration in a connect method (though this doesn't appear to be working yet). This update eliminates many of the bugs we saw with the first implementation, including:

  1. Not sending all messages: the AntiEntropySync method now blocks until all messages are sent.
  2. The "context canceled" error: also fixed by blocking until the sender completes.
  3. The "could not select remote peer" error: fixed by copying the key []byte into our keys array so that the pointer doesn't change in the for loop.

I tested this locally and everything is working smoothly, but there is still more work to be done!
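For reference, the "could not select remote peer" fix works around iterator key aliasing: LevelDB-style iterators reuse the slice backing Key(), so appending it directly stores a pointer that changes as the loop advances. A minimal, self-contained sketch (the names here are illustrative, not the actual trtl code):

```go
package main

import "fmt"

// collectKeys simulates iterating a LevelDB-style iterator whose Key()
// returns a slice backed by an internal buffer that is overwritten on
// every step. Illustrative only, not the actual trtl iterator.
func collectKeys() (buggy, fixed [][]byte) {
	buf := make([]byte, 1) // the iterator's reused key buffer

	for _, k := range []byte{'a', 'b', 'c'} {
		buf[0] = k // the iterator overwrites its buffer on each step

		// Buggy: appending the returned slice aliases the buffer, so
		// every element ends up pointing at the final key.
		buggy = append(buggy, buf)

		// Fix (as in this PR): copy the key bytes into a fresh slice
		// so the pointer doesn't change as the loop advances.
		dst := make([]byte, len(buf))
		copy(dst, buf)
		fixed = append(fixed, dst)
	}
	return buggy, fixed
}

func main() {
	buggy, fixed := collectKeys()
	// The aliased slice shows the last key everywhere; the copy is stable.
	fmt.Println(string(buggy[0]), string(fixed[0])) // c a
}
```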

The third commit pulls in honu PR #18. The refactoring required a change to honu.Update so that it checks the version before writing; otherwise we would introduce the possibility of an inconsistency where a concurrent sync or Put changes the local version while anti-entropy is occurring.
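The check-then-write behavior described above can be sketched as follows; the types and names are illustrative stand-ins rather than honu's actual API, and a mutex stands in for honu's transaction:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Object is a simplified stand-in for a versioned record; honu's actual
// types and options differ.
type Object struct {
	Key     string
	Version uint64
}

// ErrNotLater is returned when an update would roll a key back.
var ErrNotLater = errors.New("update is not later than the local version")

// Store sketches check-then-write: Update only writes when the incoming
// version is later than the stored one, and the check and the write happen
// atomically (a mutex here, a transaction in honu).
type Store struct {
	mu   sync.Mutex
	objs map[string]Object
}

func NewStore() *Store { return &Store{objs: make(map[string]Object)} }

func (s *Store) Update(obj Object) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if local, ok := s.objs[obj.Key]; ok && obj.Version <= local.Version {
		// A concurrent Put or sync already moved the version forward;
		// writing now would reintroduce the inconsistency.
		return ErrNotLater
	}
	s.objs[obj.Key] = obj
	return nil
}

func main() {
	s := NewStore()
	fmt.Println(s.Update(Object{Key: "k", Version: 2})) // accepted
	fmt.Println(s.Update(Object{Key: "k", Version: 1})) // rejected: not later
}
```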

Blocked by #298 (SC-952)

@bbengfort bbengfort force-pushed the sc-2694 branch 3 times, most recently from d704fb8 to 15a04af on January 18, 2022 19:52
pkg/trtl/config/config.go Outdated
@bbengfort bbengfort changed the title [WIP] Refactor Phase 1/Phase 2 Anti-Entropy and Gossip Refactor Phase 1/Phase 2 Anti-Entropy and Gossip Jan 22, 2022

codecov-commenter commented Jan 22, 2022

Codecov Report

Merging #299 (9570054) into main (4528a22) will increase coverage by 2.21%.
The diff coverage is 15.78%.


@@            Coverage Diff             @@
##             main     #299      +/-   ##
==========================================
+ Coverage   46.18%   48.39%   +2.21%     
==========================================
  Files          58       56       -2     
  Lines        7397     7044     -353     
==========================================
- Hits         3416     3409       -7     
+ Misses       3347     3002     -345     
+ Partials      634      633       -1     
Impacted Files Coverage Δ
pkg/trtl/trtl.go 75.38% <0.00%> (-0.48%) ⬇️
pkg/trtl/config/config.go 32.29% <6.45%> (-6.46%) ⬇️
pkg/trtl/server.go 36.95% <50.00%> (ø)
pkg/trtl/peers.go 58.13% <100.00%> (+0.99%) ⬆️
pkg/trtl/replica/util.go
pkg/gds/admin.go 55.65% <0.00%> (+0.10%) ⬆️
pkg/sectigo/sectigo.go 42.89% <0.00%> (+0.54%) ⬆️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@bbengfort bbengfort marked this pull request as ready for review January 22, 2022 22:38
Collaborator

@rebeccabilbro rebeccabilbro left a comment

This looks great @bbengfort! I really like that the logic is factored out into different files, specifically to help distinguish between client/initiator methods and server/remote methods. I also really appreciate the inline comments and helper function names, which make it really convenient to follow along with the bigger processes.

I have left a handful of notes and ideas based partly on our discussion yesterday (and that weird version-match/value-mismatch bug we noticed) and partly on some possible remaining misconceptions I had about how some terms are used in the AE literature. Thank you in advance for any additional clarification you can provide on those, and for making such a complex process so accessible!

@@ -122,7 +122,6 @@ func (s *trtlTestSuite) TestPeers() {
require.Equal(network[3].Addr, rep.Peers[0].Addr)
require.Equal(network[3].Name, rep.Peers[0].Name)
require.Equal(network[3].Region, rep.Peers[0].Region)
require.NotEqual(rep.Peers[0].Created, rep.Peers[0].Modified)
Collaborator

Removed because it was too specific?

Collaborator Author

The problem was that the tests executed so fast that the created and modified timestamps were less than a second apart, which meant the test could fail occasionally since the timestamps only have second granularity.

pkg/trtl/trtl.go Outdated
Comment on lines 652 to 655
// b64e encodes []byte keys and values as base64 encoded strings suitable for logging.
func b64e(src []byte) string {
return base64.RawURLEncoding.EncodeToString(src)
}
Collaborator

Looks like we could just alias this if we wanted to (as we discovered during our walkthrough)
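For illustration, the alias would be a method value: base64.RawURLEncoding.EncodeToString already has the signature func([]byte) string, so no wrapper function is needed. A minimal sketch:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// b64e as a method-value alias rather than a wrapper function; it still
// encodes []byte keys and values as base64 strings suitable for logging.
var b64e = base64.RawURLEncoding.EncodeToString

func main() {
	fmt.Println(b64e([]byte("hi"))) // aGk
}
```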

Comment on lines 69 to 76
// Gossip implements bilateral anti-entropy: during a Gossip session the initiating
// replica pushes updates to the remote peer and pulls requested changes. Using
// bidirectional streaming, the initiating peer sends data-less sync messages with
// the versions of objects it stores locally. The remote replica then responds with
// data if its local version is later or sends a sync message back requesting the
// data from the initiating replica if its local version is earlier. No exchange
// occurs if both replicas have the same version. At the end of a gossip session,
// both replicas should have synchronized and have identical underlying data stores.
Collaborator

I like this description but I think we might want to move it to the godoc as a description of gossip overall rather than a docstring for the latest version of the Gossip method. Perhaps here the docstring should clarify that Gossip is from the perspective of the remote/receiving replica? Something like:

// Gossip is a server method that responds to anti-entropy requests.
// The initiating replica will engage `Gossip` to enable the remote/receiving
// replica to receive incoming version vectors for all objects in the initiating
// replica's Trtl store in phase one. The final step of phase one triggers phase
// two, when the remote replica responds with data if its local version is later.
// Concurrently with these phases, the remote sends a sync message back 
// requesting data from the initiating replica if its local version is earlier. 

// (so could be considered both phase 1 and phase 3). The difference is that the phase
// is allowed to send messages in the first part (phase 1) but cannot send messages in
// the second part (phase 3). In phase 1, CHECK messages from the initiator are
// responded to with either a mirror CHECK if the initiator version is later, a REPAIR
Collaborator

is a mirror CHECK like an ack message to the client?

Collaborator Author

It's more that the CHECK is reflected back to the client if the remote's version is earlier than the initiator's and it's requesting a repair. E.g. a mirror check is the following sequence:

initiator: CHECK v(8,4)
remote: I'm at v(8,3) ⟶ CHECK v(8,4)
initiator: REPAIR v(8,4)
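That mirror-CHECK behavior can be sketched as a small decision function; Version and its ordering here are hypothetical stand-ins (a Lamport scalar with a PID tiebreaker), not the actual trtl version vectors:

```go
package main

import "fmt"

// Version is a hypothetical stand-in for trtl's versions, written as a
// Lamport scalar with a PID tiebreaker.
type Version struct {
	Scalar uint64
	PID    uint64
}

// IsLater reports whether v is later than other.
func (v Version) IsLater(other Version) bool {
	if v.Scalar != other.Scalar {
		return v.Scalar > other.Scalar
	}
	return v.PID > other.PID
}

// Reply sketches the remote's phase 1 response to an incoming CHECK:
// mirror the CHECK back (requesting a repair) when the initiator is later,
// send a REPAIR when the remote is later, and stay silent when the
// versions are equal.
func Reply(initiator, remote Version) string {
	switch {
	case initiator.IsLater(remote):
		return "CHECK" // mirror the check back to fetch the newer data
	case remote.IsLater(initiator):
		return "REPAIR" // send our later version to the initiator
	default:
		return "" // versions equal: no exchange
	}
}

func main() {
	// The sequence from the discussion: initiator at v(8,4), remote at
	// v(8,3), so the remote mirrors the CHECK back.
	fmt.Println(Reply(Version{8, 4}, Version{8, 3})) // CHECK
}
```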

// Send a check request back to the initiating replica to fetch its version
sender.Send(sync)
default:
// The versions are equal, do nothing
Collaborator

What if the versions are equal but the values are not?

Collaborator Author

That would be an unexpected and bad situation that would indicate there was a bug in the versioning system or in our deployment. E.g. this situation could occur if we accidentally assigned the same PID to two different replicas. If the versions are equal, that should imply that the values are equal because a new version is assigned every time the value changes.

Comment on lines +201 to +216
// Start phase 1: loop over all objects in the local database and send check
// requests to the remote replica. This is also called the "pull" phase, since we're
// asking the remote for its objects that are later than our own, e.g. pulling the
// objects to this replica from the remote. Phase 1 ends when we've completed
// looping over the local database.
wg.Add(1)
go r.initiatorPhase1(ctx, wg, log, sender)

// Start phase 2: this phase is concurrent with phase 1 since it listens for and
// responds to all messages from the remote replica. This is also called the "push"
// phase, since we're pushing objects back to the remote replica. This phase ends
// when the remote replica sends a COMPLETE message, meaning it is done sending
// messages. At that point, we will no longer send any messages so this phase will
// close the sender go routine, which will stop when all messages have been sent.
wg.Add(1)
go r.initiatorPhase2(ctx, wg, log, sender, stream)
Collaborator

Interesting! Possibly I had this backwards before. I was under the impression that phase one was called the push phase because the initiator is pushing (first versions, then data) to the remote. Push is all that happens in the non-bilateral AE case, and the remote is the only replica that makes changes in that case. Bilateral AE introduces phase two, the pull phase, where the initiator has the opportunity to receive repairs from the remote.

If I do have that backwards, please let me know and I'll fix the anti-entropy docs I wrote.

Collaborator Author

I think you have it right. Just to clarify; I believe that push and pull refer to types of gossip where the synchronization is unidirectional:

  1. Pull: initiator is synchronized with remote
  2. Push: remote is synchronized by the initiator
  3. Bilateral: both initiator and remote are synchronized

}

switch sync.Status {
case replica.Sync_CHECK:
Collaborator

Is this the mirror CHECK mentioned in Gossip?

Collaborator Author

Yes!

// NOTE: we must check if it's later than our local version in case a
// concurrent Put (a stomp) or concurrent synchronization updated it.
// NOTE: honu.Update performs the version checking in a transaction.
if err = r.db.Update(sync.Object, options.WithNamespace(sync.Object.Namespace)); err != nil {
Collaborator

Is this the right place to track a skip or stomp?

Collaborator Author

It is - and now that we have the update type functionality, I've added a TODO here for that tracking.

// NOTE: we must check if it's later than our local version in case a
// concurrent Put (a stomp) or concurrent synchronization updated it.
// NOTE: honu.Update performs the version checking in a transaction.
if err := r.db.Update(sync.Object, options.WithNamespace(sync.Object.Namespace)); err != nil {
Collaborator

Looks like this is a place where we'd increment skips or stomps? I guess we'll have to change honu to let us know if there was a PID tiebreaker (stomp) or not (skip)?

Comment on lines -203 to -204
GossipInterval: 1 * time.Second,
GossipSigma: 100 * time.Millisecond,
Collaborator

Removed because we just want to use the default values here?

Collaborator Author

Yes, we're no longer testing the replica in trtl - the tests are in replica, so this configuration was unnecessary. Removed it for clarity.

Collaborator

@pdeziel pdeziel left a comment

I think this is a very good improvement over the previous iteration in terms of understandability, I appreciate the thorough docstrings and splitting it up by role (initiator, remote) makes sense to me since it's somewhat analogous to client/server interactions. I also like that we are passing along the log contexts, which seems like it will be helpful for tracking down the bugs.

// Start phase 1: receive object version vectors from the initiator. This go routine
// will kick off phase 2: sending unchecked versions back to the initiator.
wg.Add(1)
go r.remotePhase1(stream.Context(), wg, logctx, stream, sender)
Collaborator

I wish we had better names for phase1 and phase2. I feel like we have a mutual understanding of what phase1 and phase2 mean after our discussions but it might be difficult for others to understand without the accompanying diagram. Unfortunately, I don't have any better suggestions right now.

Msg("a replication error occurred")
} else {
log.Warn().Str("error", sync.Error).Msg("a replication error occurred")
}
Collaborator

Why do we continue if there is an error?

Collaborator Author

This particular error is related to a single object - so if the object has been corrupted, an admin may be able to fix it, but we want to continue with the rest of the synchronization so that all other objects are replicated. If this was a critical error, then instead of sending a Sync_ERROR, the initiator would simply close the stream with a gRPC error.

src := iter.Key()
dst := make([]byte, len(src))
copy(dst, src)
keys = append(keys, dst)
Collaborator

Is this the "duplicate" key fix?

Collaborator Author

It is!
