
[FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors #1

Merged: 18 commits, Dec 9, 2022

Conversation

reta (Member) commented Sep 23, 2022

Signed-off-by: Andriy Redko andriy.redko@aiven.io

What is the purpose of the change

The goal of this change is to provide dedicated Opensearch connectors [1], [2], [3], [4].

Brief change log

The implementation is largely based on the existing Elasticsearch 7 connector with a few notable changes (besides the dependencies and APIs):

  • any mentions and uses of mapping types have been removed: it is a deprecated feature, scheduled for removal (indices with mapping types cannot be created in or migrated to Opensearch 1.x and beyond)
  • any mentions and uses of the deprecated TransportClient have been removed: it is scheduled for removal (only the HighLevelRestClient is used)
  • the default distributions of Opensearch come with HTTPS turned on, using self-signed certificates: to simplify integration, a new option allow-insecure has been added to suppress certificate validation for development and testing purposes
  • the old streaming APIs are also supported, to facilitate the migration of existing applications from Elasticsearch 7/6 to Opensearch (the classes will change but the familiar model will stay); see the DataStream sketch after the SQL example below

The new connector name is opensearch and it follows the existing conventions:

CREATE TABLE users ( ... ) WITH (
  'connector' = 'opensearch', 
  'hosts' = 'https://localhost:9200',
  'index' = 'users', 
  'allow-insecure' = 'true', 
  'username' = 'admin', 
  'password' = 'admin');
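
For the DataStream API, usage is expected to mirror the Elasticsearch 7 sink. A minimal sketch, assuming the builder and emitter follow the Elasticsearch connector's conventions and that setAllowInsecure is the DataStream counterpart of the allow-insecure option (names here are illustrative, not confirmed by this PR):

    import java.util.Collections;

    import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.http.HttpHost;
    import org.opensearch.action.index.IndexRequest;

    public class UsersSinkExample {
        public static void addSink(DataStream<String> stream) {
            stream.sinkTo(
                    new OpensearchSinkBuilder<String>()
                            // the default distributions ship with HTTPS turned on
                            .setHosts(new HttpHost("localhost", 9200, "https"))
                            .setConnectionUsername("admin")
                            .setConnectionPassword("admin")
                            // assumed counterpart of 'allow-insecure': skips certificate
                            // validation for self-signed certs (dev/test only)
                            .setAllowInsecure(true)
                            .setEmitter(
                                    (element, context, indexer) ->
                                            indexer.add(
                                                    new IndexRequest("users")
                                                            .source(Collections.singletonMap("data", element))))
                            .build());
        }
    }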

Verifying this change

This change added comprehensive tests and can be verified as follows (largely ported from the existing unit and integration tests for Elasticsearch 7):

  • Added unit tests
  • Added integration tests
  • Added end-to-end tests
  • Manually verified the connector by running it against a local Opensearch cluster

Does this pull request potentially affect one of the following parts:

  • Dependencies: yes (the latest Opensearch 1.2.4 APIs as of this moment)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? (docs - in progress, JavaDocs)

Huge thanks to @snuyanzin for the help.
Retargeting apache/flink#18541 to a separate repository.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-243%3A+Dedicated+Opensearch+connectors
[2] https://www.mail-archive.com/dev@flink.apache.org/msg58911.html
[3] https://lists.apache.org/thread/jls0vqc7jb84jp14j4jok1pqfgo2cl30
[4] https://lists.apache.org/thread/4bms24983g38q956rp8qmm4bpdo4361s

reta (Member, Author) commented Oct 3, 2022

@MartijnVisser could you please take a look? thank you :-)

reta force-pushed the FLINK-25756 branch 3 times, most recently from 35abeac to 71d9810 on November 3, 2022 12:42
reta (Member, Author) commented Nov 4, 2022

@MartijnVisser doing my due diligence by pinging once per month :-)

reta (Member, Author) commented Nov 14, 2022

@MartijnVisser the POMs and CI scripts have been updated accordingly

zentol (Contributor) commented Nov 15, 2022

Can you split the commit such that one adds the workflows and another the rest? We can then merge the workflow commit first and actually get CI in this PR.

zentol (Contributor) left a comment:

Made a pass over infrastructure/build/legal files.

Review threads (now resolved) on flink-connector-opensearch-e2e-tests/pom.xml, flink-connector-opensearch/pom.xml, and flink-sql-connector-opensearch/pom.xml.
zentol self-assigned this Nov 15, 2022
reta (Member, Author) commented Nov 15, 2022

> Can you split the commit such that one adds the workflows and another the rest? We can then merge the workflow commit first and actually get CI in this PR.

Sure, #2, thanks @zentol

reta force-pushed the FLINK-25756 branch 11 times, most recently from 2ffb58c to 8500c3e on November 16, 2022 13:47
reta added 3 commits November 16, 2022 11:59
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
reta force-pushed the FLINK-25756 branch 2 times, most recently from c8936e2 to 49d2539 on November 16, 2022 17:16
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
reta (Member, Author) commented Nov 28, 2022

@zentol thanks a lot for the review, I think I addressed all your comments; please let me know if I missed any, thanks!

> @reta There are some unresolved comments, like #1 (comment), #1 (comment), #1 (comment).

Thanks @zentol, addressed and/or answered.

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
testHarness.processElement(new StreamRecord<>("msg-1"));

// Await for flush to be complete
awaitForFlushToFinish(1);
zentol (Contributor) commented:

You can avoid this kind of busy waiting quite easily by using OneShotLatches.

    private OneShotLatch addResponse(Consumer<HttpResponse> consumer) {
        OneShotLatch oneShotLatch = new OneShotLatch();
        responses.add(
                response -> {
                    consumer.accept(response);
                    oneShotLatch.trigger();
                });
        return oneShotLatch;
    }
....
      OneShotLatch firstResponse = addResponse(
                createResponse(
                        new BulkItemResponse(
                                1,
                                OpType.INDEX,
                                new IndexResponse(
                                        new ShardId("test", "-", 0), "_doc", "1", 0, 0, 1, true))));
...
    firstResponse.await();

That being said, this whole "wait for server response to be consumed" approach is a bit sketchy?
We're only waiting for the server to send the response, not for the connector to process it.

reta (Member, Author) replied:

I would argue there is definitely no need for an additional latch: the queue is the "latch" by itself, since we know when it is drained. We could also use https://github.com/awaitility/awaitility to make the waiting logic cleaner.

> That being said, this whole "wait for server response to be consumed" approach is a bit sketchy?

We are dealing with an asynchronous system, so I think this is to be expected: we are waiting for some events to happen at some point (be it a latch, a queue, or other means). The sink does not provide any means to hook inside; for example, we could observe the "process it" part only through getNumPendingRequests.
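
For illustration, the Awaitility variant mentioned above could look roughly like this (a sketch; responses stands in for the test's pending-response queue):

    import static org.awaitility.Awaitility.await;

    import java.time.Duration;
    import java.util.Queue;

    class FlushWaitSketch {
        // Wait until every queued server response has been handed out,
        // i.e. the queue-as-latch has been drained.
        static void awaitResponsesDrained(Queue<?> responses) {
            await().atMost(Duration.ofSeconds(5)).until(responses::isEmpty);
        }
    }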

zentol (Contributor) replied:

The point was to not wait by looping over a condition to save CI resources.
This is especially a problem when the condition is never fulfilled and we just churn through CPU cycles for no good reason.

reta (Member, Author) replied:

> The point was to not wait by looping over a condition to save CI resources.

Sure, replaced with a Condition; that should be simpler than per-response monitors.

> This is especially a problem when the condition is never fulfilled and we just churn through CPU cycles for no good reason.

All tests are timeboxed to 5 seconds

zentol (Contributor) replied:

> All tests are timeboxed to 5 seconds

a) In our experience such small timeouts actually cause tests to be unstable.
b) Code has a tendency to travel. Arguing that "this code is fine because of that code over there" basically means "this code is unsafe to copy", which isn't a good state to be in.

reta (Member, Author) replied:

The tests seem to be very stable; I ran them continuously for 2 hours and have not seen any flakiness.

zentol (Contributor) replied:

It's more of a CI problem. Time jumping ahead and things like that. 😩

reta (Member, Author) replied:

I think we can always update the test cases if instability shows up; the 5-second limit is a 3x buffer over what the test case should take.

testHarness.processElement(new StreamRecord<>("msg"));

// current number of pending requests should be 1 due to the re-add
assertThat(sink.getNumPendingRequests()).isEqualTo(1);
zentol (Contributor) commented:

This assertion sometimes fails locally when the test is run in a loop.

expected: 1L
 but was: 0L

reta (Member, Author) replied:

Aha, I think I know why, will fix that
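
For illustration, one way to de-flake an assertion like this is to wait for the asynchronous re-add before checking the count, e.g. with the test's awaitForCondition helper (a sketch, not necessarily the exact fix applied here):

    // The re-add happens asynchronously, so poll for the expected state
    // first instead of asserting immediately after processElement(...).
    awaitForCondition(() -> sink.getNumPendingRequests() == 1);
    assertThat(sink.getNumPendingRequests()).isEqualTo(1);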

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
};
}

private static void awaitForCondition(Supplier<Boolean> condition) throws InterruptedException {
reta (Member, Author) commented:

There is only one place where we need that, because failureRequestIndexer does not immediately re-add the requests but does so only on the next batch interval.

lock.lock();
try {
    responses.poll().accept(resp);
    flushed.signalAll();
zentol (Contributor) commented:

Or you could just use a OneShotLatch which makes all of this logic here a one-liner and gives you more control to wait for a specific message to be consumed 🤷
(and you wouldn't need awaitForCondition)

reta (Member, Author) replied Nov 30, 2022:

awaitForCondition is not checking the flushing part; it is checking numPendingRequests, which will eventually be updated. For the test it is sufficient to just know that the flush has happened (at least, I don't see a reason to complicate this part with per-response latches).
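
For context, the Condition-based wait discussed in this thread might be shaped roughly like the following (a sketch of the pattern, not the PR's exact code):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Supplier;

    class ConditionWaitSketch {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition flushed = lock.newCondition();

        // Response-serving side: signal after handing a response to the client.
        void signalFlushed() {
            lock.lock();
            try {
                flushed.signalAll();
            } finally {
                lock.unlock();
            }
        }

        // Waiting side: block on the condition instead of spinning; each signal
        // (plus a short timeout guard) re-checks the supplied predicate, e.g.
        // () -> sink.getNumPendingRequests() == expected.
        void awaitForCondition(Supplier<Boolean> condition) throws InterruptedException {
            lock.lock();
            try {
                while (!condition.get()) {
                    flushed.await(100, TimeUnit.MILLISECONDS);
                }
            } finally {
                lock.unlock();
            }
        }
    }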

reta (Member, Author) commented Dec 5, 2022

@zentol thanks once more for the review; I think none of your comments were left unattended, thanks!

wbeckler commented Dec 7, 2022

It would be amazing if this could be based on the opensearch-java client rather than RHLC, since that is where all of the improvements are going. Is that how this is set up?

reta (Member, Author) commented Dec 7, 2022

> It would be amazing if this could be based on the opensearch-java client rather than RHLC, since that is where all of the improvements are going. Is that how this is set up?

Not for now; please see opensearch-project/opensearch-java#181 (this is what the connector uses).

zentol (Contributor) left a suggested change on pom.xml (lines +437 to +438), adjusting the indentation of the closing </plugin> and </plugins> tags.


reta (Member, Author) commented Dec 8, 2022

> Just noticed that we're lacking documentation; like https://github.com/apache/flink-connector-elasticsearch/blob/main/docs/content/docs/connectors/datastream/elasticsearch.md.

Thanks @zentol, I have it ready and thought to open a separate pull request (to split the docs from the code); does that make sense to you? (#3)

zentol (Contributor) commented Dec 9, 2022

ok

zentol merged commit 48b0c0f into apache:main on Dec 9, 2022
ypark2103 commented

Hi @zentol @reta,
I see that flink-opensearch-connector was merged to master recently, but I don't see it registered in the Maven repository.
When will it be added to Maven so I can use it from pom.xml? I also don't see the Opensearch connector in the official Flink documentation. Can this library be used with Java 8 as well?

reta (Member, Author) commented Dec 21, 2022

> I see that flink-opensearch-connector was merged to master recently, but I don't see it registered in the Maven repository. When will it be added to Maven so I can use it from pom.xml?

Hey @ypark2103, the vote was ongoing and has just been approved [1]; expect the artifacts to be available in the coming days.

[1] https://lists.apache.org/list?dev@flink.apache.org:lte=1M:Opensearch

> I also don't see the Opensearch connector in the official Flink documentation.

The Opensearch documentation is available here [2], but I am not sure how it is going to be integrated into the Flink documentation; @zentol, maybe you could shed some light?

[2] https://github.com/apache/flink-connector-opensearch/tree/main/docs

> Can this library be used with Java 8 as well?

Yes

ypark2103 commented Dec 21, 2022

@reta Is there a way we can install this library via pom.xml? I don't see the Opensearch connector in the Maven repository yet, unlike the Elasticsearch connector (https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7). By the artifacts becoming available in the coming days, do you mean this library being published to the Maven repository?

reta (Member, Author) commented Dec 21, 2022

> By the artifacts becoming available in the coming days, do you mean this library being published to the Maven repository?

@ypark2103 Correct

MartijnVisser (Contributor) commented

> The Opensearch documentation is available here [2], but I am not sure how it is going to be integrated into the Flink documentation; @zentol, maybe you could shed some light?

They will be integrated via apache/flink#21518 when the final steps for the release process have been completed :)

ypark2103 commented

@reta @MartijnVisser Is this flink-connector-opensearch only available for Flink 1.16? What about Flink 1.11 or 1.12?

reta (Member, Author) commented Dec 27, 2022

@ypark2103 that is correct; the Flink Opensearch connector follows Flink's externalization model for connectors, and the necessary scaffolding is only available in Flink 1.16 and above.

ypark2103 commented

@reta I heard Java 8 was deprecated starting with Flink 1.15. Does that mean I need to upgrade to Java 11 to use this connector, or can I still use Java 8 with Flink 1.16?

reta (Member, Author) commented Dec 28, 2022

> I heard Java 8 was deprecated starting with Flink 1.15. Does that mean I need to upgrade to Java 11 to use this connector, or can I still use Java 8 with Flink 1.16?

@ypark2103 the connector itself has a JDK 8 baseline, so you don't need JDK 11 to use it as-is (it uses the 1.3.x OpenSearch client); if you plan to upgrade to JDK 11, even better.
