From f9f678b4582a64f4ad573d1a18717d5100e51c3c Mon Sep 17 00:00:00 2001 From: Jakob Hahn Date: Thu, 23 Mar 2023 16:34:10 +0100 Subject: [PATCH] opensearchapi: Add Point in Time integration test Signed-off-by: Jakob Hahn --- .../opensearchapi_integration_test.go | 79 ++++++++++++++++--- 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/opensearchapi/opensearchapi_integration_test.go b/opensearchapi/opensearchapi_integration_test.go index 8690b112b..ab43e7bc1 100644 --- a/opensearchapi/opensearchapi_integration_test.go +++ b/opensearchapi/opensearchapi_integration_test.go @@ -41,6 +41,23 @@ import ( "github.com/opensearch-project/opensearch-go/v2/opensearchapi" ) +func createTestIndex(client *opensearch.Client, index string) error { + var buf bytes.Buffer + // Index data + // + for j := 1; j <= 1000; j++ { + meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d" } }%s`, j, "\n")) + data := []byte(`{"content":"` + strings.Repeat("ABC", 100) + `"}`) + data = append(data, "\n"...) + + buf.Grow(len(meta) + len(data)) + buf.Write(meta) + buf.Write(data) + } + _, err := client.Bulk(bytes.NewReader(buf.Bytes()), client.Bulk.WithIndex(index), client.Bulk.WithRefresh("true")) + return err +} + func TestAPI(t *testing.T) { t.Run("Search", func(t *testing.T) { client, err := opensearch.NewDefaultClient() @@ -82,8 +99,6 @@ func TestAPI(t *testing.T) { t.Run("OpaqueID", func(t *testing.T) { var ( - buf bytes.Buffer - res *opensearchapi.Response err error @@ -101,20 +116,10 @@ func TestAPI(t *testing.T) { // Index data // - for j := 1; j <= 1000; j++ { - meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d" } }%s`, j, "\n")) - data := []byte(`{"content":"` + strings.Repeat("ABC", 100) + `"}`) - data = append(data, "\n"...) - - buf.Grow(len(meta) + len(data)) - buf.Write(meta) - buf.Write(data) - } - res, err = client.Bulk(bytes.NewReader(buf.Bytes()), client.Bulk.WithIndex("test"), client.Bulk.WithRefresh("true")) + err = createTestIndex(client, "test") if err != nil { t.Fatalf("Failed to index data: %s", err) } - defer res.Body.Close() // Launch reindexing task with wait_for_completion=false // @@ -192,4 +197,52 @@ func TestAPI(t *testing.T) { } } }) + t.Run("Point_in_Time", func(t *testing.T) { + var err error + index := "test" + // Create Client + // + client, err := opensearch.NewDefaultClient() + if err != nil { + t.Fatalf("Error creating the client: %s\n", err) + } + resp, _, err := client.PointInTime.Delete(client.PointInTime.Delete.WithAllPits(true)) + if err != nil { + if resp != nil && resp.StatusCode != 404 { + t.Fatalf("Failed to Delete all Pits: %s", err) + } + } + + // Index data + // + err = createTestIndex(client, index) + if err != nil { + t.Fatalf("Failed to index data: %s", err) + } + + // Create a Pit + // + keepAlive, _ := time.ParseDuration("5m") + _, pitCreateResp, err := client.PointInTime.Create(client.PointInTime.Create.WithKeepAlive(keepAlive), client.PointInTime.Create.WithIndex(index)) + if err != nil { + t.Fatalf("Failed to create Pit: %s", err) + } + + // Get all Pits + // + _, pitGetResp, err := client.PointInTime.Get() + if err != nil { + t.Fatalf("Failed to get Pits: %s", err) + } + + // Delete the create Pit + // + _, pitDeleteResp, err := client.PointInTime.Delete(client.PointInTime.Delete.WithPitID(pitCreateResp.PitID)) + if err != nil { + t.Fatalf("Failed to delete Pit: %s", err) + } + if (pitCreateResp.PitID != pitGetResp.Pits[0].PitID) || (pitCreateResp.PitID != pitDeleteResp.Pits[0].PitID) { + t.Fatalf("The create Pit does not match the Get Pit or Deleted Pit") + } + }) }