This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Adapter: Implements RethinkDB as a source of documents #64

Merged
14 commits merged into compose:master on Mar 17, 2015

Conversation

alindeman
Contributor

Similar to the MongoDB adapter, this proposed RethinkDB source sends all documents in the table, then (if configured via the tail configuration parameter) watches for changes via RethinkDB's Changefeeds feature.
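For anyone unfamiliar with Changefeeds, here is a rough, standalone sketch of the copy-then-tail flow using gorethink. The table name, connection details, and logging are placeholders for illustration only; the real adapter sends each document down the transporter pipe instead of logging it.

```go
package main

import (
	"log"

	gorethink "github.com/dancannon/gorethink" // import path varies across gorethink versions
)

func main() {
	// Connection details are placeholders for illustration.
	session, err := gorethink.Connect(gorethink.ConnectOpts{
		Address:  "127.0.0.1:28015",
		Database: "test",
	})
	if err != nil {
		log.Fatal(err)
	}

	// 1. Send every existing document in the table.
	cursor, err := gorethink.Table("widgets").Run(session)
	if err != nil {
		log.Fatal(err)
	}
	var doc map[string]interface{}
	for cursor.Next(&doc) {
		log.Printf("initial document: %v", doc) // the adapter would send this down the pipe
	}

	// 2. With tail enabled, watch the changefeed for subsequent changes.
	changes, err := gorethink.Table("widgets").Changes().Run(session)
	if err != nil {
		log.Fatal(err)
	}
	var change struct {
		NewVal map[string]interface{} `gorethink:"new_val"`
		OldVal map[string]interface{} `gorethink:"old_val"`
	}
	for changes.Next(&change) {
		if change.NewVal == nil {
			log.Printf("deleted: %v", change.OldVal) // maps to a Delete op
		} else {
			log.Printf("inserted/updated: %v", change.NewVal)
		}
	}
}
```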

Also implemented is a small change to the ElasticSearch adapter to support Delete operations.

👀 in the form of code review definitely welcomed.

/cc: @brandon-beacher

@nstott
Contributor

nstott commented Mar 16, 2015

Hey, thanks for this! Getting rethink working as a source will be great.

One of the things with mongo that's different than rethink is that with mongo we have the timestamp associated with the operation, so we're able to resume streaming ops from a specific point in time if transporter crashes. From what I understand of rethink replication, we aren't able to get that information from the changefeed?

The session handling for this is mostly unfinished right now, so I don't see that as a blocker to getting this in.

@alindeman
Contributor Author

One of the things with mongo that's different than rethink is that with mongo we have the timestamp associated with the operation, so we're able to resume streaming ops from a specific point in time if transporter crashes. From what I understand of rethink replication, we aren't able to get that information from the changefeed?

You are right, there is no support yet for restarting a changes feed in RethinkDB. There's at least one proposal (rethinkdb/rethinkdb#3471), but it is not yet implemented.

That said, as far as I can tell, this is a potential problem with the MongoDB adapter too. Imagine this scenario:

  • At 00:00, the transporter crashes or loses its connection to MongoDB (for whatever reason: a crash, MongoDB downtime, network issues, hardware issues, etc.)
  • At 00:10, the transporter starts again. The oplogTime is not persisted between runs; it is instead captured as nowAsMongoTimestamp() at startup (see the sketch below), so, as far as I can tell, those ~10 minutes of oplog will never be read.
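As an aside, here is a minimal sketch of the pattern described in the second bullet. This is not a verbatim excerpt from the MongoDB adapter; the helper name comes from the comment above, and the body is just the standard MongoDB timestamp packing applied to the wall clock at startup.

```go
package oplog // illustrative package name

import (
	"time"

	"gopkg.in/mgo.v2/bson"
)

// nowAsMongoTimestamp builds a resume point from the wall clock at startup,
// packing the seconds value into the high 32 bits as MongoDB timestamps do.
// Because nothing is persisted between runs, oplog entries written while the
// process was down fall before this timestamp and are never replayed.
func nowAsMongoTimestamp() bson.MongoTimestamp {
	return bson.MongoTimestamp(time.Now().Unix() << 32)
}
```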

I believe it's mitigated by the fact that the MongoDB source (and this proposed RethinkDB source too) sends all documents before tailing the changes feed. Otherwise, we'd need to keep state around between runs of the transporter and implement some kind of snapshotting mechanism. It might be a good idea to consider this at some point, if only because sending all documents is expensive for large tables, but it's also something that would require some architectural rethinking and would make transporter a stateful service, where it's now stateless.

The most obvious place that it falls down is deletions. If the transporter restarts and a document is deleted during the downtime, we will not be notified of it and will therefore not send a Delete operation through the transporter pipe. In the case of MongoDB, it's because we don't rewind the oplog to the time when it crashed, and in the case of RethinkDB, it's because this kind of feature is not supported.

I decided not to tackle the "deletions during downtime" problem in this PR because I think it will require more fundamental architectural changes, and it is already not well supported in MongoDB. I recommend that we 🏈 it to a new PR, though some documentation around that failure scenario might be appropriate in the meantime.

What do you think? :)

@jipperinbham
Contributor

That said, as far as I can tell, this is a potential problem with the MongoDB adapter too.

That is true at the moment. Some work has begun on the adaptor-state branch toward optionally adding state persistence to transporter, but other work and changes to message.Msg caused us to hold off on it for now.

It might be a good idea to consider this at some point, if only because sending all documents is expensive for large tables, but it's also something that would require some architectural rethinking and would make transporter a stateful service, where it's now stateless.

The current proposal (not really documented beyond what has been implemented in the branch) is that adaptors would mostly not know or care about State, except during startup, when the last known good state would be injected into them. The gathering of State happens within the Pipeline, and persistence is controlled by the implemented SessionStore. So, ideally, adaptors would not change much at all other than performing some Resume process on startup.
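To make that concrete, here is a minimal sketch of the shape being described. The package, type, and method names are hypothetical (the adaptor-state branch may well differ); the point is only that the Pipeline gathers State, a SessionStore persists it, and an adaptor sees the last known good State at startup.

```go
package state // hypothetical package and type names, sketching the idea only

import "time"

// State is a last-known-good position for a source adaptor, e.g. a MongoDB
// oplog timestamp; RethinkDB has no equivalent resume token until
// rethinkdb/rethinkdb#3471 lands.
type State struct {
	Identifier string    // opaque, adaptor-specific resume token
	Timestamp  time.Time // when the state was captured by the Pipeline
}

// SessionStore persists State between transporter runs; an implementation
// might write to local disk, a key/value store, etc.
type SessionStore interface {
	Set(path string, state State) error
	Get(path string) (State, error)
}

// Resumer is what an adaptor could implement so the last known good State
// can be injected into it at startup; adaptors that cannot resume simply
// start from scratch as they do today.
type Resumer interface {
	Resume(last State) error
}
```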

All that to say: this PR should not be held up by that implementation now. Even after we have the concept of State, the RethinkDB adaptor will be unable to resume from a point in time until rethinkdb/rethinkdb#3471 is available.

I have one, maybe two, suggestions/changes, but I'll make them inline relative to the code.

@@ -31,10 +33,24 @@ type Rethinkdb struct {
	client *gorethink.Session
}

// rethinkDbConfig provides custom configuration options for the RethinkDB adapter
type rethinkDbConfig struct {
@jipperinbham
Contributor

This needs to be publicly accessible so that it can be properly "Registered" here as:

Register("rethinkdb", "a rethinkdb sink adaptor", NewRethinkdb, RethinkdbConfig{})
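For illustration, the exported version might look something like the following. The field names and doc tags here are assumptions modeled on the other adaptors' config structs, not the PR's exact code.

```go
package rethinkdb

// RethinkdbConfig provides custom configuration options for the RethinkDB adapter.
// Field names and doc tags are illustrative assumptions.
type RethinkdbConfig struct {
	URI       string `json:"uri" doc:"the uri to connect to, e.g. rethink://127.0.0.1:28015/database"`
	Namespace string `json:"namespace" doc:"rethink namespace to read/write, e.g. database.table"`
	Debug     bool   `json:"debug" doc:"if true, verbose debugging output is enabled"`
	Tail      bool   `json:"tail" doc:"if true, the changefeed is tailed after the initial copy"`
}
```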

@alindeman
Contributor Author

@jipperinbham Thanks for the 👀! I've pushed some changes that I believe address your comments.

@nstott
Contributor

nstott commented Mar 17, 2015

👍

nstott added a commit that referenced this pull request Mar 17, 2015
Adapter: Implements RethinkDB as a source of documents
nstott merged commit 253c2cc into compose:master on Mar 17, 2015