-
Notifications
You must be signed in to change notification settings - Fork 566
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka support #1510
Kafka support #1510
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have commented on a few issues.
Please have a look at https://github.com/oracle/helidon/blob/master/DEV-GUIDELINES.md and fix your PR accordingly.
Please add module-info.java
to your module.
messaging/kafka/src/main/java/io/helidon/messaging/kafka/connector/KafkaConnectorFactory.java
Outdated
Show resolved
Hide resolved
messaging/kafka/src/main/java/io/helidon/messaging/kafka/SimpleKafkaConsumer.java
Outdated
Show resolved
Hide resolved
messaging/kafka/src/main/java/io/helidon/messaging/kafka/SimpleKafkaConsumer.java
Outdated
Show resolved
Hide resolved
messaging/kafka/src/main/java/io/helidon/messaging/kafka/SimpleKafkaConsumer.java
Outdated
Show resolved
Hide resolved
messaging/kafka/src/main/java/io/helidon/messaging/kafka/SimpleKafkaConsumer.java
Outdated
Show resolved
Hide resolved
messaging/kafka/src/main/java/io/helidon/messaging/kafka/SimpleKafkaConsumer.java
Outdated
Show resolved
Hide resolved
public void close() { | ||
// Stops pooling | ||
consumer.wakeup(); | ||
while (running.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if this while is acceptable because of CPU usage. However I put it there because the time frame must be very small (few milliseconds) or zero.
Let me know if you prefer other way, with count down latch for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will do this with a synchronize.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case I think it is much better to use a lock.
@Override | ||
public void request(long n) { | ||
// Pushing Kafka consumer doesn't support requests. | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uf this was my super bad idea, we have to do something about this, it basically ignores backpressure. Something like this would be much better:
https://github.com/oracle/helidon/blob/1e5ae594bc356ecd1283e487a7e7f85e26355ee9/microprofile/reactive-streams/src/main/java/io/helidon/microprofile/reactive/EmittingPublisher.java
But that depends on protected RS with SequentialSubscriber,
I expect David to remove SequentialSubscriber from RS implemetation in #1511 so it gets little more complicated then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I integrated the EmittingSubscriber
...nectors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/BasicKafkaConsumer.java
Outdated
Show resolved
Hide resolved
...nectors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/BasicKafkaConsumer.java
Outdated
Show resolved
Hide resolved
...nectors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/BasicKafkaConsumer.java
Outdated
Show resolved
Hide resolved
...nectors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/BasicKafkaConsumer.java
Outdated
Show resolved
Hide resolved
...nectors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/BasicKafkaConsumer.java
Outdated
Show resolved
Hide resolved
...tors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/KafkaConnectorFactory.java
Outdated
Show resolved
Hide resolved
...tors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/KafkaConnectorFactory.java
Outdated
Show resolved
Hide resolved
...tors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/KafkaConnectorFactory.java
Outdated
Show resolved
Hide resolved
...tors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/KafkaConnectorFactory.java
Outdated
Show resolved
Hide resolved
...tors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/KafkaConnectorFactory.java
Outdated
Show resolved
Hide resolved
/oca-checked |
aad54c4
to
ede74fc
Compare
...nnectors/kafka/src/main/java/io/helidon/microprofile/connectors/kafka/EmittingPublisher.java
Outdated
Show resolved
Hide resolved
89106e8
to
3aac549
Compare
6ce484a
to
5a1897e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
3d22f6c
to
9f76d6c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Requested changes through DM.
c556cc8
to
e17c7ab
Compare
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
Signed-off-by: Tomas Langer <tomas.langer@oracle.com>
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
…DifferentPartitions test Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
Find here the spec of this:
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.pdf
It was mostly done by @danielkec months ago. I only took his changes and I did some small fixes and some tests.