Skip to content

Commit

Permalink
opensearchapi: Add Point in Time integration test
Browse files Browse the repository at this point in the history
Signed-off-by: Jakob Hahn <jakob.hahn@hetzner.com>
  • Loading branch information
Jakob3xD committed Mar 24, 2023
1 parent 9c2a3a6 commit a92d115
Showing 1 changed file with 93 additions and 29 deletions.
122 changes: 93 additions & 29 deletions opensearchapi/opensearchapi_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ import (
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
)

func createTestIndex(client *opensearch.Client, index string) error {
var buf bytes.Buffer
// Index data
//
for j := 1; j <= 1000; j++ {
meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d" } }%s`, j, "\n"))
data := []byte(`{"content":"` + strings.Repeat("ABC", 100) + `"}`)
data = append(data, "\n"...)

buf.Grow(len(meta) + len(data))
buf.Write(meta)
buf.Write(data)
}
_, err := client.Bulk(bytes.NewReader(buf.Bytes()), client.Bulk.WithIndex(index), client.Bulk.WithRefresh("true"))
return err
}

func TestAPI(t *testing.T) {
t.Run("Search", func(t *testing.T) {
client, err := opensearch.NewDefaultClient()
Expand Down Expand Up @@ -84,8 +101,6 @@ func TestAPI(t *testing.T) {

t.Run("OpaqueID", func(t *testing.T) {
var (
buf bytes.Buffer

res *opensearchapi.Response
err error

Expand All @@ -103,20 +118,10 @@ func TestAPI(t *testing.T) {

// Index data
//
for j := 1; j <= 1000; j++ {
meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d" } }%s`, j, "\n"))
data := []byte(`{"content":"` + strings.Repeat("ABC", 100) + `"}`)
data = append(data, "\n"...)

buf.Grow(len(meta) + len(data))
buf.Write(meta)
buf.Write(data)
}
res, err = client.Bulk(bytes.NewReader(buf.Bytes()), client.Bulk.WithIndex("test"), client.Bulk.WithRefresh("true"))
err = createTestIndex(client, "test")
if err != nil {
t.Fatalf("Failed to index data: %s", err)
}
defer res.Body.Close()

// Launch reindexing task with wait_for_completion=false
//
Expand Down Expand Up @@ -228,23 +233,10 @@ func TestAPI(t *testing.T) {

// Index data
//
var buf bytes.Buffer
for j := 1; j <= 1000; j++ {
meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d" } }%s`, j, "\n"))
data := []byte(`{"content":"` + strings.Repeat("ABC", 100) + `"}`)
data = append(data, "\n"...)

buf.Grow(len(meta) + len(data))
buf.Write(meta)
buf.Write(data)
}

bulkReq := &opensearchapi.BulkRequest{
Body: bytes.NewReader(buf.Bytes()),
Index: "test",
Refresh: "true",
err = createTestIndex(client, "test")
if err != nil {
t.Fatalf("Failed to index data: %s", err)
}
opensearchDo(ctx, client, bulkReq, "index data", t)

// Test Snapshot functions
//
Expand Down Expand Up @@ -313,4 +305,76 @@ func TestAPI(t *testing.T) {
opensearchDo(ctx, client, sDeleteReq, "delete Snapshots", t)
opensearchDo(ctx, client, iDeleteReq, "index data", t)
})
t.Run("Point_in_Time", func(t *testing.T) {
var (
err error
major, minor int64
data opensearchapi.InfoResp
)
index := "test"

// Create Client
//
client, err := opensearch.NewDefaultClient()
if err != nil {
t.Fatalf("Error creating the client: %s\n", err)
}

// Skip test if Cluster version is below 2.4.0
infoResp, err := client.Info()
if err != nil {
t.Fatalf("Error getting the cluster info: %s\n", err)
}
if err = json.NewDecoder(infoResp.Body).Decode(&data); err != nil {
t.Fatalf("Error parsing the cluster info: %s\n", err)
}
major, minor, _, err = opensearch.ParseVersion(data.Version.Number)
if err != nil {
t.Fatalf("Error parsing the cluster version")
}
if major <= 2 && minor < 4 {
return
}

// Cleanup all existing Pits
//
resp, _, err := client.PointInTime.Delete(client.PointInTime.Delete.WithPitID("_all"))
if err != nil {
if resp != nil && resp.StatusCode != 404 {
t.Fatalf("Failed to Delete all Pits: %s", err)
}
}

// Index data
//
err = createTestIndex(client, index)
if err != nil {
t.Fatalf("Failed to index data: %s", err)
}

// Create a Pit
//
keepAlive, _ := time.ParseDuration("5m")
_, pitCreateResp, err := client.PointInTime.Create(client.PointInTime.Create.WithKeepAlive(keepAlive), client.PointInTime.Create.WithIndex(index))
if err != nil {
t.Fatalf("Failed to create Pit: %s", err)
}

// Get all Pits
//
_, pitGetResp, err := client.PointInTime.Get()
if err != nil {
t.Fatalf("Failed to get Pits: %s", err)
}

// Delete the create Pit
//
_, pitDeleteResp, err := client.PointInTime.Delete(client.PointInTime.Delete.WithPitID(pitCreateResp.PitID))
if err != nil {
t.Fatalf("Failed to delete Pit: %s", err)
}
if (pitCreateResp.PitID != pitGetResp.Pits[0].PitID) || (pitCreateResp.PitID != pitDeleteResp.Pits[0].PitID) {
t.Fatalf("The create Pit does not match the Get Pit or Deleted Pit")
}
})
}

0 comments on commit a92d115

Please sign in to comment.