-
Notifications
You must be signed in to change notification settings - Fork 213
Conversation
559f011
to
39176ad
Compare
41dc0fc
to
021cab2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initial look through, I'll give it another look later today
|
||
// Closer takes care of receiving on the done channel and then properly cleaning up | ||
// the session and WaitGroup. | ||
func Closer(done chan struct{}, wg *sync.WaitGroup, s client.Session) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Closer
sounds like it should be an interface by its name. Maybe should be Close
type Creator func(chan struct{}, *sync.WaitGroup, *ClientOptions) (client.Writer, error) | ||
|
||
// Clients contains the map of versioned clients | ||
var Clients = map[string]*VersionedClient{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be private if we're going to delegate access through Add
below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
planning on addressing this in #234
@@ -0,0 +1,36 @@ | |||
package clients |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can see us having/wanting to use this sort of registry for other db types outside of ES. Let's make an issue for making this generic and have the other adaptors use the registry as well. I guess we would need to be able to get the version information through an API call like we do for ES. Should be possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue created to track that work, #234
var id string | ||
if _, ok := msg.Data()["_id"]; ok { | ||
id = msg.ID() | ||
msg.Data().Delete("_id") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will we have to restore the id
field after the write for nodes further down in the pipeline that may expect _id
to be present? I'm not sure if msg.Data()
returns a copy or not. I think not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
msg.Data()
does not return a copy and I hadn't really thought about the use case for chaining the adaptors in such a way, it does bring up the issue of whether the msg.Data should be immutable because in instances were multiple Sink
adaptors are attached to the same Source
, this code could cause a race condition with a concurrent map read/write.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see #233 (comment)
} | ||
|
||
func (w *Writer) Close() { | ||
log.Infoln("flushing BulkProcessor") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to have this be a little more descriptive (that it's an ES bulk processor)
log.Debugln("tests shutdown complete") | ||
} | ||
|
||
type CountResponse struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this private
var id string | ||
if _, ok := msg.Data()["_id"]; ok { | ||
id = msg.ID() | ||
msg.Data().Delete("_id") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as with v1, do we need to restore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed by making a copy of the original msg.Data()
in the adaptor.
@@ -0,0 +1,91 @@ | |||
package v2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know you're aware of all the code duplication, just want to make sure we track it in an issue to investigate clever ways to do the versioning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
hostBits := strings.Split(e.uri.Host, ":") | ||
if len(hostBits) > 1 { | ||
client.SetPort(hostBits[1]) | ||
for _, vc := range clients.Clients { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we make the registry generic, I think a function to return the proper client given the adaptor type and version would be better than having this logic in the adaptor. Even for just elasticsearch, I think it would be better to have this in the registry.
@@ -96,6 +98,10 @@ func (l logger) Errorf(format string, args ...interface{}) { | |||
l.entry.Errorf(format, args...) | |||
} | |||
|
|||
func (l logger) Printf(format string, args ...interface{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this necessary if we have logger.Infof
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is to fulfill the Logger
interface in the new elasticsearch lib, https://github.com/olivere/elastic/wiki/Logging
…ls Flush for Bulk, add es to travis
…st not do delete via bulk
…empty before flushing
c239adb
to
d9c82d1
Compare
} | ||
|
||
func newWriter(client *elastic.Client, done chan struct{}, wg *sync.WaitGroup) *Writer { | ||
p, _ := client.BulkProcessor(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what we'd do with the errors, but right now we're not catching any of the errors from the BulkProcessor. You can use a BulkAfterFunc and .After(callback) to get at the errors though. Would be bad to throw away errors without any output.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, thanks for the reminder, I wanted to log any errors so I'll get that in now
BulkActions(1000). // commit if # requests >= 1000 | ||
BulkSize(2 << 20). // commit if size of requests >= 2 MB | ||
FlushInterval(30 * time.Second). // commit every 30s | ||
Do() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here with the error handling
e.pipe.Stop() | ||
e.indexer.Stop() | ||
} | ||
log.With("path", e.path).Infoln("adaptor Stopping...") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah i noticed the e.path there, it will show the adaptor type, all good.
|
||
func newTransport(accessKeyID, secretAccessKey string) http.RoundTripper { | ||
t := http.DefaultTransport | ||
if accessKeyID != "" && secretAccessKey != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any way to test the creds and error sooner?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what would we be testing here? when adding support for AWS signed auth, this seemed like the best/cleanest route to go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was just wondering if there was a way to bail out early if the creds were bad
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gotcha, I honestly don't know
addresses multiple issues with sending data to elasticsearch, handling of _id, major API version differences, and performance fixes #209
switch from
elastigo
toelastic
library and create underlying clients for each major version of elasticsearch.fixes #209, #222, #167, and #159