Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
Feature/elasticsearch routing properly (#423)
Browse files Browse the repository at this point in the history
* Routing added to ensure Parent and Routing is on the same shard

* updated readme

* patching up deletes as well to include routing using the pID

* update tests
added test scenarios where parent-child is updated
added test scenarios where parent-child is deleted
added test scenarios where parent-child _routing is detected and mapped properly
  • Loading branch information
johnjjung authored and jipperinbham committed Sep 30, 2017
1 parent f3388b4 commit 612eda1
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 14 deletions.
9 changes: 9 additions & 0 deletions adaptor/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ Be sure to add your [parent-child mapping](https://www.elastic.co/guide/en/elast

Check that after you add your parent-child mapping, that data is getting inserted properly.

**Update**

About Routing:

Elasticsearch recommends that routing values default to `_id` if there are no parent values specified. However, if a `parent` is specified, the parent and child should use the same routing value which is the parent `_id`. In our case, we will be using `parent_id` as our routing value to that it ensures both parent and child documents are on the same shard.

https://www.elastic.co/guide/en/elasticsearch/guide/current/indexing-parent-child.html


### Example of Parent-Child Mapping:

This step is manual, you must set your mapping manually using a `PUT` request to Elasticsearch.
Expand Down
8 changes: 7 additions & 1 deletion adaptor/elasticsearch/clients/v5/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,24 @@ func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error
// we need to flush any pending writes here or this could fail because we're using
// more than 1 worker
w.bp.Flush()
br = elastic.NewBulkDeleteRequest().Index(w.index).Type(indexType).Id(id)
indexReq := elastic.NewBulkDeleteRequest().Index(w.index).Type(indexType).Id(id)
if pID != "" {
indexReq.Routing(pID)
}
br = indexReq
case ops.Insert:
indexReq := elastic.NewBulkIndexRequest().Index(w.index).Type(indexType).Id(id)
if pID != "" {
indexReq.Parent(pID)
indexReq.Routing(pID)
}
indexReq.Doc(msg.Data())
br = indexReq
case ops.Update:
indexReq := elastic.NewBulkUpdateRequest().Index(w.index).Type(indexType).Id(id)
if pID != "" {
indexReq.Parent(pID)
indexReq.Routing(pID)
}
indexReq.Doc(msg.Data())
br = indexReq
Expand Down
77 changes: 64 additions & 13 deletions adaptor/elasticsearch/clients/v5/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func clearTestData() error {
req, _ := http.NewRequest(http.MethodDelete, fullURL(""), nil)
resp, err := http.DefaultClient.Do(req)
log.Debugf("clearTestData response, %+v", resp)
parentReq, _ := http.NewRequest(http.MethodDelete, parentFullURL(""), nil)
parentResp, err := http.DefaultClient.Do(parentReq)
log.Debugf("clearTestData response, %+v", parentResp)
return err
}
func createMapping() error {
Expand Down Expand Up @@ -78,10 +81,22 @@ func shutdown() {
log.Debugln("tests shutdown complete")
}

type countResponse struct {
type elasticResponse struct {
Count int `json:"count"`
Hits struct {
Hits []struct {
ID string `json:"_id"`
Parent string `json:"_parent"`
Routing string `json:"_routing"`
Name string `json:"name"`
Type string `json:"_type"`
} `json:"hits"`
} `json:"hits"`
}

/**
* This tests non-parent-child insert,update,delete
*/
func TestWriter(t *testing.T) {
confirms, cleanup := adaptor.MockConfirmWrites()
defer adaptor.VerifyWriteConfirmed(cleanup, t)
Expand Down Expand Up @@ -124,13 +139,16 @@ func TestWriter(t *testing.T) {
t.Fatalf("_count request failed, %s", err)
}
defer resp.Body.Close()
var r countResponse
var r elasticResponse
json.NewDecoder(resp.Body).Decode(&r)
if r.Count != 1 {
t.Errorf("mismatched doc count, expected 1, got %d", r.Count)
}
}

/**
* This tests parent-child inserts and updates
*/
func TestWithParentWriter(t *testing.T) {
confirms, cleanup := adaptor.MockConfirmWrites()
defer adaptor.VerifyWriteConfirmed(cleanup, t)
Expand All @@ -140,19 +158,28 @@ func TestWithParentWriter(t *testing.T) {
Index: parentDefaultIndex,
ParentID: "parent_id",
}
// create mapping
createMapping()
vc := clients.Clients["v5"]
w, _ := vc.Creator(opts)
// insert parent
w.Write(
message.WithConfirms(
confirms,
message.From(ops.Insert, "company", map[string]interface{}{"_id": "9g2g", "name": "gingerbreadhouse"})),
)(nil)
// insert child
w.Write(
message.WithConfirms(
confirms,
message.From(ops.Insert, "employee", map[string]interface{}{"_id": "9g6g", "name": "witch", "parent_id": "gingerbreadhouse"})),
)(nil)
// update child
w.Write(
message.WithConfirms(
confirms,
message.From(ops.Update, "employee", map[string]interface{}{"_id": "9g6g", "name": "wickedwitch", "parent_id": "gingerbreadhouse"})),
)(nil)
w.(client.Closer).Close()
if _, err := http.Get(parentFullURL("/_refresh")); err != nil {
t.Fatalf("_refresh request failed, %s", err)
Expand All @@ -163,8 +190,10 @@ func TestWithParentWriter(t *testing.T) {
t.Fatalf("_count request failed, %s", err)
}
defer countResp.Body.Close()
var r countResponse
var r elasticResponse
json.NewDecoder(countResp.Body).Decode(&r)

// both parent and child should've gotten inserted correctly
if r.Count != 2 {
t.Errorf("mismatched doc count, expected 2, got %d", r.Count)
}
Expand All @@ -173,18 +202,40 @@ func TestWithParentWriter(t *testing.T) {
t.Fatalf("_count request failed, %s", err)
}
defer employeeResp.Body.Close()
type Employee struct {
Hits struct {
Hits []struct {
ID string `json:"_id"`
Parent string `json:"_parent"`
Type string `json:"_type"`
} `json:"hits"`
} `json:"hits"`
}
var par Employee

var par elasticResponse
// decode and make sure that _parent is in the json response
json.NewDecoder(employeeResp.Body).Decode(&par)
if par.Hits.Hits[0].Parent != "gingerbreadhouse" {
t.Errorf("mismatched _parent, got %d", par.Hits.Hits[0].Parent)
}
// decode and make sure that _parent and _routing is in the json response
if par.Hits.Hits[0].Routing != par.Hits.Hits[0].Parent {
t.Errorf("mismatched _routing does not equal _parent, got %d", par.Hits.Hits[0].Parent)
}
// decode and make sure that _parent and _routing is in the json response
if par.Hits.Hits[0].Name == "wickedwitch" {
t.Errorf("mismatched _routing does not equal _parent, got %d", par.Hits.Hits[0].Parent)
}

w2, _ := vc.Creator(opts)
// delete child
w2.Write(
message.WithConfirms(
confirms,
message.From(ops.Delete, "employee", map[string]interface{}{"_id": "9g6g", "name": "wickedwitch", "parent_id": "gingerbreadhouse"})),
)(nil)
w2.(client.Closer).Close()
time.Sleep(1 * time.Second)
deletedCountResp, err := http.Get(parentFullURL("/employee/_count"))
if err != nil {
t.Fatalf("_count request failed, %s", err)
}
defer deletedCountResp.Body.Close()
// make sure count is 1
var dr elasticResponse
json.NewDecoder(deletedCountResp.Body).Decode(&dr)
if dr.Count != 0 {
t.Errorf("mismatched doc count, expected 0, got %d", dr.Count)
}
}

0 comments on commit 612eda1

Please sign in to comment.