Low-level Java client #905

Closed
RGB314 opened this issue Mar 27, 2024 · 12 comments
Labels
enhancement New feature or request untriaged

Comments


RGB314 commented Mar 27, 2024

Is your feature request related to a problem?

Java client - OpenSearch Documentation says…

The OpenSearch Java client allows you to interact with your OpenSearch clusters through Java methods and data structures rather than HTTP methods and raw JSON.

What solution would you like?

So is there another (low-level) client which would allow me to interact with raw JSON requests and responses?

What alternatives have you considered?

Not available

Do you have any additional context?

I'd like to run searches through a client like this, as it would help reduce latency by avoiding serialisation and deserialisation.

RGB314 added the enhancement (New feature or request) and untriaged labels on Mar 27, 2024
Member

dblock commented Mar 27, 2024

If you don't want to serialize/deserialize, any HTTP client can make raw JSON requests over HTTPS for you, e.g. the Apache HTTP client (https://stackoverflow.com/questions/1359689/how-to-send-http-request-in-java). There's nothing really special about OpenSearch as a web server.

Better raw JSON support in this client is #257 and #377, closing this one as a dup.
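
For illustration, a raw JSON request of the kind described above might be put together with the JDK's built-in `java.net.http` classes, as in the minimal sketch below. The base URL, index name and query are hypothetical placeholders, and the sketch only builds the request: authentication and request signing are not covered.

```kotlin
import java.net.URI
import java.net.http.HttpRequest

// Builds (but does not send) a POST request carrying a raw JSON body.
// `baseUrl` and `index` are hypothetical placeholders, not values from this thread.
fun rawSearchRequest(baseUrl: String, index: String, jsonBody: String): HttpRequest =
    HttpRequest.newBuilder(URI.create("$baseUrl/$index/_search"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build()

fun main() {
    val request = rawSearchRequest(
        "https://localhost:9200",
        "my-index",
        """{"query":{"match_all":{}}}"""
    )
    println("${request.method()} ${request.uri()}")
}
```

Sending it would be a call such as `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, which returns the response body as an unparsed string.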

Author

RGB314 commented Apr 8, 2024

@dblock
I was able to connect and then search using the Apache HTTP client. I can make the POST request work for _search (with org.opensearch.client.opensearch.core.SearchRequest) but not for _msearch as the latter request fails with the error: The msearch request must be terminated by a newline [\n].

Here's the simplified code:

import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.ByteArrayEntity
import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.impl.client.CloseableHttpClient
import java.io.ByteArrayOutputStream

// The msearch request: either an "org.opensearch.client.opensearch.core.MsearchRequest" object
// or the multiline JSON string as shown in https://opensearch.org/docs/2.13/api-reference/multi-search/
val data: Any = TODO("the msearch request")
val baos = ByteArrayOutputStream()
val generator = jsonpMapper.jsonProvider().createGenerator(baos)
jsonpMapper.serialize(data, generator)
generator.close()

val httpPost = HttpPost("/_msearch")
httpPost.entity = ByteArrayEntity(baos.toByteArray(), APPLICATION_JSON)
closeableHttpClient.execute(httpPost)

Things to note

  1. In case of the multiline JSON string, it works as-is on the dashboard but not in the code.
  2. The same org.opensearch.client.opensearch.core.MsearchRequest object works for org.opensearch.client.opensearch.OpenSearchClient#msearch(org.opensearch.client.opensearch.core.MsearchRequest, java.lang.Class<TDocument>)

What could I be missing here?

Member

dblock commented Apr 8, 2024

I believe msearch requires line-delimited JSON, which is just multiple lines of JSON (the body as a whole is not valid JSON), so you want to serialize each line as JSON, then concatenate them with newlines.
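
As a rough sketch of that shape: each search contributes a header line and a body line, every line ends with `\n`, and so the payload as a whole ends with `\n` too. `SearchLines` and `toLineDelimitedJson` are hypothetical names used only for this illustration, and the sketch assumes each string is already a single-line JSON document.

```kotlin
// One search in a multi-search body: a header line and a body line, each already
// serialized as a single-line JSON document (assumption: no embedded newlines).
data class SearchLines(val header: String, val body: String)

// Writes header and body on separate lines and terminates every line,
// including the last one, with "\n".
fun toLineDelimitedJson(searches: List<SearchLines>): String =
    searches.joinToString(separator = "") { "${it.header}\n${it.body}\n" }

fun main() {
    val payload = toLineDelimitedJson(
        listOf(
            SearchLines("""{"index":"index1"}""", """{"query":{"match_all":{}}}"""),
            SearchLines("""{"index":"index2"}""", """{"query":{"match_all":{}}}""")
        )
    )
    print(payload)
}
```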

Want to put a project up similar to https://github.com/dblock/opensearch-java-client-demo that has a working sample for regular search like the one above, and a non-working one for msearch? I can try to fix it.

Author

RGB314 commented Apr 8, 2024

Please find the sample Kotlin code below...

Apache HTTP client config:

import io.github.acm19.aws.interceptor.http.AwsRequestSigningApacheInterceptor
import org.apache.http.HttpRequestInterceptor
import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.impl.client.HttpClients
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.auth.signer.Aws4Signer
import software.amazon.awssdk.regions.Region

val httpRequestInterceptor: HttpRequestInterceptor = AwsRequestSigningApacheInterceptor(
            "es",
            Aws4Signer.create(),
            DefaultCredentialsProvider.create(),
            Region.US_EAST_1
        )

val httpClient: CloseableHttpClient = HttpClients.custom()
            .addInterceptorLast(httpRequestInterceptor)
            .build()

OpenSearchClient config:

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper
import org.opensearch.client.json.JsonpMapper
import org.opensearch.client.json.jackson.JacksonJsonpMapper
import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.transport.aws.AwsSdk2Transport
import org.opensearch.client.transport.aws.AwsSdk2TransportOptions
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.regions.Region
import java.io.ByteArrayOutputStream

val objectMapper = ObjectMapper()
val jsonFactory: JsonFactory by lazy { objectMapper.factory }
val jsonpMapper: JsonpMapper by lazy { JacksonJsonpMapper(objectMapper, jsonFactory) }

val transportOptions = AwsSdk2TransportOptions.builder()
    .setMapper(jsonpMapper)
    .setResponseCompression(true)
    .build()

val sdkHttpClient = ApacheHttpClient.builder()
    .maxConnections(50)
    .tcpKeepAlive(true)
    .expectContinueEnabled(true)
    .build()

val awsSdk2Transport = AwsSdk2Transport(
    sdkHttpClient,
    "host",
    Region.US_EAST_1,
    transportOptions
)
val openSearchClient = OpenSearchClient(
    awsSdk2Transport,
    transportOptions
)

fun toByteArray(data: Any): ByteArray {
    val baos = ByteArrayOutputStream()
    val generator = jsonpMapper.jsonProvider().createGenerator(baos)
    jsonpMapper.serialize(data, generator)
    generator.close()
    return baos.toByteArray()
}

MsearchRequest creation:

import org.opensearch.client.opensearch._types.SearchType.DfsQueryThenFetch
import org.opensearch.client.opensearch._types.query_dsl.MultiMatchQuery
import org.opensearch.client.opensearch._types.query_dsl.Query
import org.opensearch.client.opensearch._types.query_dsl.TextQueryType.BoolPrefix
import org.opensearch.client.opensearch.core.MsearchRequest
import org.opensearch.client.opensearch.core.msearch.MultisearchBody
import org.opensearch.client.opensearch.core.msearch.MultisearchHeader
import org.opensearch.client.opensearch.core.msearch.RequestItem

fun msearchRequest(
    indices: List<String>,
    input: String
): MsearchRequest {
    val requestItems = mutableListOf<RequestItem>()
    for (index in indices) {
        val mmq = MultiMatchQuery.Builder()
            .query(input)
            .type(BoolPrefix)
            .fields(listOf("f1", "f2", "so on"))
            .build()
        val query = Query.Builder().multiMatch(mmq).build()
        val header = MultisearchHeader.Builder().index(index).build()
        val body = MultisearchBody.Builder().trackScores(true).query(query).build()
        requestItems.add(RequestItem.Builder().header(header).body(body).build())
    }
    val mSearchRequest = MsearchRequest.Builder()
        .index(indices)
        .searchType(DfsQueryThenFetch)
        .searches(requestItems)
        .build()
    return mSearchRequest
}

Working test:

import com.fasterxml.jackson.databind.node.ObjectNode
import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.opensearch.core.MsearchResponse

val response: MsearchResponse<out ObjectNode> = openSearchClient.msearch(msearchRequest, ObjectNode::class.java)

Failing tests:

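import org.apache.http.client.methods.CloseableHttpResponse
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.ByteArrayEntity
import org.apache.http.entity.ContentType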

val httpPost1 = HttpPost("https://.../_msearch")
httpPost1.entity = ByteArrayEntity(toByteArray(msearchRequest), ContentType.APPLICATION_JSON)
val response1: CloseableHttpResponse = httpClient.execute(httpPost1) // FAILS

val request = StringBuilder()
request.append("{\"index\": \"index1\", \"search_type\": \"dfs_query_then_fetch\"}")
request.append("{\"query\":{\"multi_match\":{\"query\":\"myquery\",\"type\":\"bool_prefix\",\"fields\":[\"f1\",\"f2\"],\"operator\":\"and\"}}}")
request.append("\n")
request.append("{\"index\": \"index2\", \"search_type\": \"dfs_query_then_fetch\"}")
request.append("{\"query\":{\"multi_match\":{\"query\":\"myquery\",\"type\":\"bool_prefix\",\"fields\":[\"f3\",\"f4\"],\"operator\":\"and\"}}}")
request.append("\n")

val httpPost2 = HttpPost("https://.../_msearch")
httpPost2.entity = ByteArrayEntity(toByteArray(request.toString()), ContentType.APPLICATION_JSON)
val response2: CloseableHttpResponse = httpClient.execute(httpPost2) // FAILS

Member

dblock commented Apr 8, 2024

I don't see anything wrong. Could you please make a new GitHub repo with working code that I can git clone and run with some environment variables? I promise to look at that.

Author

RGB314 commented Apr 8, 2024

Thanks a lot!

I have put all the sample code in this test class which you can just simply execute as 3 different tests. Apologies for not being able to create a repo (as I won't be able to push/upload the code from my company's network).

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.github.acm19.aws.interceptor.http.AwsRequestSigningApacheInterceptor
import org.apache.http.HttpRequestInterceptor
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.ByteArrayEntity
import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.impl.client.HttpClients
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Test
import org.opensearch.client.json.JsonpMapper
import org.opensearch.client.json.jackson.JacksonJsonpMapper
import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.opensearch._types.SearchType.DfsQueryThenFetch
import org.opensearch.client.opensearch._types.query_dsl.MultiMatchQuery
import org.opensearch.client.opensearch._types.query_dsl.Query
import org.opensearch.client.opensearch._types.query_dsl.TextQueryType.BoolPrefix
import org.opensearch.client.opensearch.core.MsearchRequest
import org.opensearch.client.opensearch.core.msearch.MultisearchBody
import org.opensearch.client.opensearch.core.msearch.MultisearchHeader
import org.opensearch.client.opensearch.core.msearch.RequestItem
import org.opensearch.client.transport.aws.AwsSdk2Transport
import org.opensearch.client.transport.aws.AwsSdk2TransportOptions
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider
import software.amazon.awssdk.auth.signer.Aws4Signer
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.regions.Region.US_EAST_1
import software.amazon.awssdk.utils.IoUtils.toUtf8String
import java.io.ByteArrayOutputStream

class MyMultiSearchTest {

    private val msearchRequest = msearchRequest(listOf("index1", "index2", "index3"), "AA")
    private val host = "cluster.host.name.com"
    private val uri = "https://$host/_msearch"

    private val httpRequestInterceptor: HttpRequestInterceptor = AwsRequestSigningApacheInterceptor(
        "es",
        Aws4Signer.create(),
        ProfileCredentialsProvider.builder().profileName("abcd").build(),
        US_EAST_1
    )

    private val httpClient: CloseableHttpClient = HttpClients.custom()
        .addInterceptorLast(httpRequestInterceptor)
        .build()

    private val objectMapper = ObjectMapper()
    private val jsonFactory: JsonFactory by lazy { objectMapper.factory }
    private val jsonpMapper: JsonpMapper by lazy { JacksonJsonpMapper(objectMapper, jsonFactory) }

    private val transportOptions = AwsSdk2TransportOptions.builder()
        .setMapper(jsonpMapper)
        .setResponseCompression(true)
        .build()

    private val sdkHttpClient = ApacheHttpClient.builder()
        .maxConnections(50)
        .tcpKeepAlive(true)
        .expectContinueEnabled(true)
        .build()

    private val awsSdk2Transport = AwsSdk2Transport(
        sdkHttpClient,
        host,
        US_EAST_1,
        transportOptions
    )
    val openSearchClient = OpenSearchClient(
        awsSdk2Transport,
        transportOptions
    )

    private fun toByteArray(data: Any): ByteArray {
        val baos = ByteArrayOutputStream()
        val generator = jsonpMapper.jsonProvider().createGenerator(baos)
        jsonpMapper.serialize(data, generator)
        generator.close()
        return baos.toByteArray()
    }

    private fun msearchRequest(
        indices: List<String>,
        input: String
    ): MsearchRequest {
        val requestItems = mutableListOf<RequestItem>()
        for (index in indices) {
            val mmq = MultiMatchQuery.Builder()
                .query(input)
                .type(BoolPrefix)
                .fields(lookupSearchFieldsByIndexName(index))
                .build()
            val query = Query.Builder().multiMatch(mmq).build()
            val header = MultisearchHeader.Builder().index(index).build()
            val body = MultisearchBody.Builder().trackScores(true).query(query).build()
            requestItems.add(RequestItem.Builder().header(header).body(body).build())
        }
        val mSearchRequest = MsearchRequest.Builder()
            .index(indices)
            .searchType(DfsQueryThenFetch)
            .searches(requestItems)
            .build()
        return mSearchRequest
    }

    @Test
    fun testOpenSearchClient() {
        val response = openSearchClient.msearch(msearchRequest, ObjectNode::class.java)
        assertNotNull(response)
        assertEquals(3, response.responses().size)
    }

    @Test
    fun testHttpClient1() {
        val request = HttpPost(uri)
        request.entity = ByteArrayEntity(toByteArray(msearchRequest), APPLICATION_JSON)
        val response = httpClient.execute(request)
        val content = response.entity.content
        println(toUtf8String(content))
        content.close()
        assertEquals(200, response.statusLine.statusCode)
        // {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"The msearch request must be terminated by a newline [\n]"}],"type":"illegal_argument_exception","reason":"The msearch request must be terminated by a newline [\n]"},"status":400}
    }

    @Test
    fun testHttpClient2() {
        val query = StringBuilder()
        query.append("{\"index\": \"index1\", \"search_type\": \"dfs_query_then_fetch\"}")
        query.append("{\"query\":{\"multi_match\":{\"query\":\"AA\",\"type\":\"bool_prefix\",\"fields\":[\"field1\"],\"operator\":\"and\"}}}")
        query.append("\n")
        query.append("{\"index\": \"index2\", \"search_type\": \"dfs_query_then_fetch\"}")
        query.append("{\"query\":{\"multi_match\":{\"query\":\"AA\",\"type\":\"bool_prefix\",\"fields\":[\"field1^2\",\"field2^2\",\"field3\",\"field4\"],\"operator\":\"and\"}}}")
        query.append("\n")
        query.append("{\"index\": \"index3\", \"search_type\": \"dfs_query_then_fetch\"}")
        query.append("{\"query\":{\"multi_match\":{\"query\":\"AA\",\"type\":\"bool_prefix\",\"fields\":[\"field1\"],\"operator\":\"and\"}}}")
        query.append("\n")
        val request = HttpPost(uri)
        request.entity = ByteArrayEntity(toByteArray(query.toString()), APPLICATION_JSON)
        val response = httpClient.execute(request)
        val content = response.entity.content
        println(toUtf8String(content))
        content.close()
        assertEquals(200, response.statusLine.statusCode)
        // {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"The msearch request must be terminated by a newline [\n]"}],"type":"illegal_argument_exception","reason":"The msearch request must be terminated by a newline [\n]"},"status":400}
    }
}

UPDATE: pom.xml

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-bom</artifactId>
                <version>1.9.22</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.21.7</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson</groupId>
                <artifactId>jackson-bom</artifactId>
                <version>2.14.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>


        <dependency>
            <groupId>org.opensearch.client</groupId>
            <artifactId>opensearch-java</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>io.github.acm19</groupId>
            <artifactId>aws-request-signing-apache-interceptor</artifactId>
            <version>2.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib-jdk8</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib-common</artifactId>
        </dependency>

        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sdk-core</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>auth</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>apache-client</artifactId>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-parameter-names</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jdk8</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>

dblock added a commit to dblock/opensearch-kotlin-client-demo that referenced this issue Apr 10, 2024
Signed-off-by: dblock <dblock@amazon.com>
Member

dblock commented Apr 10, 2024

@RGB314 Your code is missing lookupSearchFieldsByIndexName, and after I hacked around that I got many "Overload resolution ambiguity" errors. I put this code in https://github.com/dblock/opensearch-kotlin-java-demo. Please narrow it down (make PRs) to something I can just run.

Author

RGB314 commented Apr 11, 2024

@dblock Thanks. I have raised this PR.

Member

dblock commented Apr 11, 2024

@RGB314 The code is trying to convert the multiple lines of JSON (line-delimited JSON) into a byte array, which is not what the server expects. Use a StringEntity to keep it as a string.

dblock/opensearch-kotlin-client-demo@0fdcd11#diff-aecfd6054b44df2f66b1c5f4d0d004601a92282f5f0d10e416ab896864b5b5f0R174

request.entity = StringEntity(query.toString(), ContentType.create("application/json", "UTF-8"));

This runs the second test that uses StringBuilder for me.
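
One plausible reading of why the byte-array version failed, offered as an assumption rather than something confirmed in this thread: passing an already-built string through a JSON mapper serializes it as a JSON string value, so the payload gets wrapped in quotes and its newlines become the two characters `\` and `n`. The sketch below uses `jsonQuote`, a simplified stand-in for that behaviour (it is not the client's code), to show that the quoted payload no longer ends in a newline byte.

```kotlin
// Simplified stand-in for what a JSON mapper does when asked to serialize a String value:
// wrap it in quotes and escape special characters. Not the OpenSearch client's code.
fun jsonQuote(value: String): String {
    val sb = StringBuilder("\"")
    for (ch in value) {
        when (ch) {
            '"' -> sb.append("\\\"")
            '\\' -> sb.append("\\\\")
            '\n' -> sb.append("\\n")
            else -> sb.append(ch)
        }
    }
    return sb.append('"').toString()
}

fun main() {
    val ndjson = "{\"index\":\"index1\"}\n{\"query\":{\"match_all\":{}}}\n"

    // Bytes of the string sent as-is, versus bytes of the string after JSON quoting.
    val raw = ndjson.toByteArray(Charsets.UTF_8)
    val quoted = jsonQuote(ndjson).toByteArray(Charsets.UTF_8)

    println(raw.last() == '\n'.code.toByte())    // true: payload ends with a newline
    println(quoted.last() == '\n'.code.toByte()) // false: payload ends with a quote character
}
```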

For the first one you need to convert msearchRequest to ldjson like so. Post your code when you have that working.

Author

RGB314 commented Apr 12, 2024

@dblock Thanks a lot!

Author

RGB314 commented Apr 17, 2024

Quick point on #910:
I think sending requests as strings (as mentioned in generic.md) would not let users see which fields of a request are mandatory or non-null. They would only find out when an error response is received. What do you think?

Member

dblock commented Apr 17, 2024

@RGB314 Right, this is pure JSON in JSON out.
