[FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors #1
Conversation
@MartijnVisser could you please take a look? Thank you :-)
Force-pushed from 35abeac to 71d9810.
@MartijnVisser doing my due diligence by pinging once per month :-)
@MartijnVisser the POMs and CI scripts have been updated accordingly
Can you split the commit such that one commit adds the workflows and another the rest? We can then merge the workflow commit first and actually get CI running in this PR.
Made a pass over infrastructure/build/legal files.
(Resolved review thread on flink-sql-connector-opensearch/src/main/resources/META-INF/NOTICE)
Force-pushed from 2ffb58c to 8500c3e.
Force-pushed from c8936e2 to 49d2539.
@zentol thanks a lot for the review, I think I addressed all your comments; please let me know if I missed any, thanks!
Thanks @zentol, addressed and/or answered.
testHarness.processElement(new StreamRecord<>("msg-1")); | ||
|
||
// Await for flush to be complete | ||
awaitForFlushToFinish(1); |
You can avoid this kind of busy waiting quite easily by using OneShotLatches:
```java
private OneShotLatch addResponse(Consumer<HttpResponse> consumer) {
    OneShotLatch oneShotLatch = new OneShotLatch();
    responses.add(
            response -> {
                consumer.accept(response);
                oneShotLatch.trigger();
            });
    return oneShotLatch;
}

// ...

OneShotLatch firstResponse =
        addResponse(
                createResponse(
                        new BulkItemResponse(
                                1,
                                OpType.INDEX,
                                new IndexResponse(
                                        new ShardId("test", "-", 0), "_doc", "1", 0, 0, 1, true))));

// ...

firstResponse.await();
```
That being said, this whole "wait for server response to be consumed" approach is a bit sketchy?
We're only waiting for the server to send the response, not for the connector to process it.
I would argue there is definitely no need for an additional latch: the queue is the "latch" by itself, since we know when it is drained. We could also use https://github.com/awaitility/awaitility to make the waiting logic clean.
> That being said, this whole "wait for server response to be consumed" approach is a bit sketchy?
We are dealing with an async system, so I think this is kind of expected: we are waiting for some events to happen at some point (be it a latch, a queue, or other means). The sink does not provide any means to hook inside; for example, we could observe the "process it" part only through getNumPendingRequests.
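As an illustration of the Awaitility approach mentioned above, here is a minimal sketch (the helper shape and the sink handle are assumptions; getNumPendingRequests is the method discussed in this thread):

```java
import static org.awaitility.Awaitility.await;

import java.time.Duration;

class FlushAwaitSketch {
    // Sketch: poll until the sink reports no pending requests, instead of a
    // hand-rolled busy-wait loop; Awaitility manages polling and the timeout.
    static void awaitNoPendingRequests(OpensearchSink<?> sink) {
        await().atMost(Duration.ofSeconds(5))
                .until(() -> sink.getNumPendingRequests() == 0);
    }
}
```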
The point was to not wait by looping over a condition to save CI resources.
This is especially a problem when the condition is never fulfilled and we just churn through CPU cycles for no good reason.
> The point was to not wait by looping over a condition to save CI resources.
Sure, replaced with Condition; it should be simpler than per-response monitors.
> This is especially a problem when the condition is never fulfilled and we just churn through CPU cycles for no good reason.
All tests are timeboxed to 5 seconds
> All tests are timeboxed to 5 seconds
a) In our experience such small timeouts actually cause tests to be unstable.
b) code has a tendency to travel. Arguing that "this code is fine because of that code over there" basically means "this code is unsafe to copy", which isn't a good state to be in.
The tests seem to be very stable; I was running them continuously for 2 hours and have not seen any flakiness.
It's more of a CI problem. Time jumping ahead and things like that. 😩
I think we could always update the test cases if instability creeps in; the 5-second limit is a 3x buffer over what a test case should take.
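For reference, a timebox of the kind described here might be declared like this under JUnit 5 (a sketch; the test name and body are hypothetical):

```java
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class TimeboxSketch {
    // The test fails automatically if it runs longer than 5 seconds,
    // so a never-fulfilled wait cannot spin forever on CI.
    @Test
    @Timeout(value = 5, unit = TimeUnit.SECONDS)
    void bulkFlushCompletes() throws Exception {
        // ... exercise the sink and await the flush ...
    }
}
```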
testHarness.processElement(new StreamRecord<>("msg")); | ||
|
||
// current number of pending request should be 1 due to the re-add | ||
assertThat(sink.getNumPendingRequests()).isEqualTo(1); |
This assertion sometimes fails locally when the test is run in a loop:

```
expected: 1L
 but was: 0L
```
Aha, I think I know why, will fix that
(Resolved review threads on ...earch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java)
```java
    };
}

private static void awaitForCondition(Supplier<Boolean> condition) throws InterruptedException {
```
There is only one place where we need that, because failureRequestIndexer does not immediately re-add the requests but does so only on the next batch interval.
```java
lock.lock();
try {
    responses.poll().accept(resp);
    flushed.signalAll();
```
Or you could just use a OneShotLatch, which makes all of this logic here a one-liner and gives you more control to wait for a specific message to be consumed 🤷 (and you wouldn't need awaitForCondition).
awaitForCondition is not checking the flushing part; it is checking numPendingRequest, which will eventually be updated. For the test it is sufficient to just know that the flush has happened (at least, I don't see a reason to complicate this part with per-response latches).
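For context, a minimal sketch of how such a Condition-based helper might look (the lock and flushed fields are assumed from the snippet above; this is an illustration, not the PR's actual code):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

class ConditionAwaitSketch {
    private static final ReentrantLock lock = new ReentrantLock();
    private static final Condition flushed = lock.newCondition();

    // Block until the condition holds, re-checking each time the server thread
    // signals `flushed` after handing a response to the client.
    private static void awaitForCondition(Supplier<Boolean> condition)
            throws InterruptedException {
        lock.lock();
        try {
            while (!condition.get()) {
                flushed.await();
            }
        } finally {
            lock.unlock();
        }
    }
}
```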
@zentol thanks one more time for the review, I think none of your comments were left unattended, thanks!
It would be amazing if this could be based on the opensearch-java client rather than the RHLC, since that is where all of the improvements are going. Is that how this is set up?
Not for now; please see opensearch-project/opensearch-java#181 (this is what the connector uses).
```xml
    </plugin>
</plugins>
```
Just noticed that we're lacking documentation, like https://github.com/apache/flink-connector-elasticsearch/blob/main/docs/content/docs/connectors/datastream/elasticsearch.md.
Thanks @zentol, I have it ready and thought to open a separate pull request (to split the MDs from the code); does that make sense to you? (#3)
ok
hi @zentol @reta |
Hey @ypark2103, the vote was ongoing and has just been approved [1]; expect artifacts to be available in the coming days. [1] https://lists.apache.org/list?dev@flink.apache.org:lte=1M:Opensearch
The Opensearch documentation is available here [2], but I am not sure how it is going to be integrated into the Flink documentation; @zentol, maybe you could shed some light? [2] https://github.com/apache/flink-connector-opensearch/tree/main/docs
Yes
@reta Is there a way we can install this library with pom.xml?
@ypark2103 Correct.
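For reference, pulling the connector in via pom.xml would look roughly like this (the version shown is an assumption; check Maven Central for the actual released coordinates):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-opensearch</artifactId>
    <!-- assumed first release version; verify against Maven Central -->
    <version>1.0.0-1.16</version>
</dependency>
```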
They will be integrated via apache/flink#21518 when the final steps for the release process have been completed :) |
@reta @MartijnVisser Is this flink-connector-opensearch only available for Flink 1.16? What about Flink 1.11 or 1.12?
@ypark2103 that is correct: the Flink Opensearch connector follows Flink's externalization model for connectors, and the necessary scaffolding is only available in Flink 1.16 and above.
@reta I heard Java 8 was deprecated starting with Flink 1.15. Does it mean I need to upgrade to Java 11 to use this connector? Or can I still use Java 8 with Flink 1.16?
@ypark2103 the connector itself has a baseline of JDK-8, so you don't need JDK-11 to use it as-is (it uses the 1.3.x OpenSearch client), but if you plan to upgrade to JDK-11, even better.
Signed-off-by: Andriy Redko andriy.redko@aiven.io
What is the purpose of the change
The goal of this change is to provide dedicated Opensearch connectors [1], [2], [3], [4].
Brief change log
The implementation is largely based on the existing Elasticsearch 7 connector, with a few notable changes (besides the dependencies and APIs):
- `HighLevelRestClient` is used
- `allow-insecure` has been added to suppress certificate validation for development and testing purposes
- the new connector name is `opensearch` and it follows the existing conventions (illustrated in the sketch below)
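As an illustration of the naming convention, a Table API sketch (the `'hosts'` and `'index'` option names are assumptions mirroring the Elasticsearch connector, not confirmed by this PR):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OpensearchDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'connector' = 'opensearch' is the new connector name; the other
        // option names are assumed to mirror the Elasticsearch connector.
        tableEnv.executeSql(
                "CREATE TABLE os_sink ("
                        + "  user_id STRING,"
                        + "  message STRING"
                        + ") WITH ("
                        + "  'connector' = 'opensearch',"
                        + "  'hosts' = 'http://localhost:9200',"
                        + "  'index' = 'users'"
                        + ")");
    }
}
```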
Verifying this change
This change added comprehensive tests, largely ported from the existing unit and integration tests for Elasticsearch 7, and can be verified by running them.
Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: yes

Documentation
Huge thanks to @snuyanzin for the help.
Retargeting apache/flink#18541 to a separate repository.
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-243%3A+Dedicated+Opensearch+connectors
[2] https://www.mail-archive.com/dev@flink.apache.org/msg58911.html
[3] https://lists.apache.org/thread/jls0vqc7jb84jp14j4jok1pqfgo2cl30
[4] https://lists.apache.org/thread/4bms24983g38q956rp8qmm4bpdo4361s