diff --git a/.github/workflows/build_test_pr.yml b/.github/workflows/build_test_pr.yml new file mode 100644 index 0000000..3c31914 --- /dev/null +++ b/.github/workflows/build_test_pr.yml @@ -0,0 +1,66 @@ +name: "Pull Request Build & Test" + +on: + pull_request: + types: [opened, synchronize, reopened] + +jobs: + build: + runs-on: ubuntu-22.04 + timeout-minutes: 5 + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + token: ${{ secrets.GITHUB_TOKEN }} + fetch-depth: 0 + + - name: Set up JDK 8 + uses: actions/setup-java@v3 + with: + distribution: corretto + java-version: 8 + + - name: Restore local Maven repository from cache + uses: actions/cache@v3 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles( 'project.clj' ) }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Validate SNAPSHOT version + env: + SNAPSHOT_REGEX: ^[0-9]{1,2}[.][0-9]{1,2}[.][0-9]{1,3}-SNAPSHOT$ + if: github.ref != 'refs/heads/master' + run: | + lein pom + export VERSION=$(less pom.xml | grep "" | head -1 | cut -d ">" -f2 | cut -d "<" -f1) + echo "Version is:" $VERSION + if [[ !("$VERSION" =~ $SNAPSHOT_REGEX) ]] + then + echo "Version isn't a SNAPSHOT version:" $VERSION + exit 0 + fi + + - name: Setup linter + uses: DeLaGuardo/setup-clj-kondo@master + with: + version: '2023.09.07' + - name: Lint + run: clj-kondo --lint src test --config '{:lint-as {clojure.test.check.properties/for-all clojure.core/let}}' + + test: + needs: build + uses: ./.github/workflows/test.yml + + event_file: + needs: test + name: "Event File" + runs-on: ubuntu-latest + steps: + - name: Upload + uses: actions/upload-artifact@v2 + with: + name: Event File + path: ${{ github.event_path }} diff --git a/.github/workflows/ci_branch.yml b/.github/workflows/ci_branch.yml new file mode 100644 index 0000000..6e0b167 --- /dev/null +++ b/.github/workflows/ci_branch.yml @@ -0,0 +1,101 @@ +name: "Push CI - branches" + +on: + push: + branches-ignore: + - master + +jobs: + build: + runs-on: ubuntu-22.04 + timeout-minutes: 5 + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + token: ${{ secrets.GITHUB_TOKEN }} + fetch-depth: 0 + + - name: Set up JDK 11 + uses: actions/setup-java@v3 + with: + distribution: corretto + java-version: 11 + + - name: Restore local Maven repository from cache + uses: actions/cache@v3 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles( 'project.clj' ) }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Validate SNAPSHOT version + env: + SNAPSHOT_REGEX: ^[0-9]{1,2}[.][0-9]{1,2}[.][0-9]{1,3}-SNAPSHOT$ + if: github.ref != 'refs/heads/master' + run: | + lein pom + export VERSION=$(less pom.xml | grep "" | head -1 | cut -d ">" -f2 | cut -d "<" -f1) + echo "Version is:" $VERSION + if [[ !("$VERSION" =~ $SNAPSHOT_REGEX) ]] + then + echo "Version isn't a SNAPSHOT version:" $VERSION + exit 0 + fi + + - name: Setup linter + uses: DeLaGuardo/setup-clj-kondo@master + with: + version: '2023.09.07' + - name: Lint + run: clj-kondo --lint src test --config '{:lint-as {clojure.test.check.properties/for-all clojure.core/let}}' + + test: + needs: build + uses: ./.github/workflows/test.yml + + deploy: + needs: test + runs-on: ubuntu-22.04 + timeout-minutes: 5 + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + token: ${{ secrets.GITHUB_TOKEN }} + fetch-depth: 0 + + - name: Set up JDK 11 + uses: actions/setup-java@v3 + with: + distribution: corretto + java-version: 11 + + - name: Restore local Maven repository from cache + uses: actions/cache@v3 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles( 'project.clj' ) }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Deploy SNAPSHOT version + env: + SNAPSHOT_REGEX: ^[0-9]{1,2}[.][0-9]{1,2}[.][0-9]{1,3}-SNAPSHOT$ + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + CLOJARS_USER: ${{ secrets.CLOJARS_USER }} + CLOJARS_PASS: ${{ secrets.CLOJARS_PASS }} + run: | + git config --global user.name "github-actions-bot" + git config --global user.email "<>" + lein pom + export SNAPSHOT_VERSION=$(less pom.xml | grep "" | head -1 | cut -d ">" -f2 | cut -d "<" -f1) + echo "SNAPSHOT version is:" $SNAPSHOT_VERSION + if [[ !("$SNAPSHOT_VERSION" =~ $SNAPSHOT_REGEX) ]] + then + echo "Version isn't a SNAPSHOT version:" $SNAPSHOT_VERSION ", skipping deployment to Clojars..." + exit 0 + fi + lein deploy + echo "SNAPSHOT version:" $SNAPSHOT_VERSION"; commit: "${{github.sha}}"; successfully deployed to Clojars" diff --git a/.github/workflows/ci_master.yml b/.github/workflows/ci_master.yml new file mode 100644 index 0000000..caac8bd --- /dev/null +++ b/.github/workflows/ci_master.yml @@ -0,0 +1,112 @@ +name: "Push CI - master" + +permissions: + contents: write + +on: + push: + branches: + - master + +jobs: + build: + runs-on: ubuntu-22.04 + timeout-minutes: 5 + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + token: ${{ secrets.GITHUB_TOKEN }} + fetch-depth: 0 + + - name: Set up JDK 11 + uses: actions/setup-java@v3 + with: + distribution: corretto + java-version: 11 + + - name: Restore local Maven repository from cache + uses: actions/cache@v3 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles( 'project.clj' ) }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Validate SNAPSHOT version + env: + SNAPSHOT_REGEX: ^[0-9]{1,2}[.][0-9]{1,2}[.][0-9]{1,3}-SNAPSHOT$ + if: github.ref != 'refs/heads/master' + run: | + lein pom + export VERSION=$(less pom.xml | grep "" | head -1 | cut -d ">" -f2 | cut -d "<" -f1) + echo "Version is:" $VERSION + if [[ !("$VERSION" =~ $SNAPSHOT_REGEX) ]] + then + echo "Version isn't a SNAPSHOT version:" $VERSION + exit 0 + fi + + - name: Setup linter + uses: DeLaGuardo/setup-clj-kondo@master + with: + version: '2023.09.07' + - name: Lint + run: clj-kondo --lint src test --config '{:lint-as {clojure.test.check.properties/for-all clojure.core/let}}' + + test: + needs: build + uses: ./.github/workflows/test.yml + + deploy: + needs: test + runs-on: ubuntu-22.04 + timeout-minutes: 5 + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + token: ${{ secrets.GITHUB_TOKEN }} + fetch-depth: 0 + + - name: Set up JDK 11 + uses: actions/setup-java@v3 + with: + distribution: corretto + java-version: 11 + + - name: Restore local Maven repository from cache + uses: actions/cache@v3 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles( 'project.clj' ) }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Deploy release version + env: + RELEASE_REGEX: ^[0-9]{1,2}[.][0-9]{1,2}[.][0-9]{1,3}$ + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + CLOJARS_USER: ${{ secrets.CLOJARS_USER }} + CLOJARS_PASS: ${{ secrets.CLOJARS_PASS }} + run: | + git config --global user.name "github-actions-bot" + git config --global user.email "<>" + git config --global push.followTags true + lein pom + export ORIGINAL_VERSION=$(less pom.xml | grep "" | head -1 | cut -d ">" -f2 | cut -d "<" -f1) + echo "Original version is:" $ORIGINAL_VERSION + lein change version leiningen.release/bump-version release + lein do vcs commit, install + lein pom + export RELEASE_VERSION=$(less pom.xml | grep "" | head -1 | cut -d ">" -f2 | cut -d "<" -f1) + echo "Release version is:" $RELEASE_VERSION + if [[ !("$RELEASE_VERSION" =~ $RELEASE_REGEX) ]] + then + echo "Version isn't a release version:" $RELEASE_VERSION ", skipping deployment to Clojars..." + exit 0 + fi + lein deploy + echo "Release version:" $RELEASE_VERSION"; commit: "${{github.sha}}"; successfully deployed to Clojars" + git tag -a $RELEASE_VERSION -m "Release version $RELEASE_VERSION" + git push origin master diff --git a/.github/workflows/dev-build.yml b/.github/workflows/dev-build.yml deleted file mode 100644 index 0787ffb..0000000 --- a/.github/workflows/dev-build.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: Dev - -on: - push: - branches: - - dev - pull_request: - -jobs: - build: - name: Build - runs-on: ubuntu-20.04 - steps: - - name: Checkout - uses: actions/checkout@v2 - with: - persist-credentials: false - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: 11 - - name: Setup linter - uses: DeLaGuardo/setup-clj-kondo@master - with: - version: '2021.06.18' - - name: Lint - run: clj-kondo --lint src test --config '{:lint-as {clojure.test.check.properties/for-all clojure.core/let}}' - - name: Test - run: lein cloverage --lcov --junit diff --git a/.github/workflows/main-build.yml b/.github/workflows/main-build.yml deleted file mode 100644 index 1fcb055..0000000 --- a/.github/workflows/main-build.yml +++ /dev/null @@ -1,45 +0,0 @@ -name: Release - -on: - push: - branches: - - master - -jobs: - build: - name: Build - runs-on: ubuntu-20.04 - permissions: write-all - steps: - - name: Checkout - uses: actions/checkout@v2 - with: - persist-credentials: false - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: 11 - - name: Lint - uses: DeLaGuardo/clojure-lint-action@master - with: - github_token: ${{ secrets.GITHUB_TOKEN }} - clj-kondo-args: --lint src test --config '{:lint-as {clojure.test.check.properties/for-all clojure.core/let}}' - - name: Test - run: lein cloverage --lcov --junit - - name: Publish Unit Test Results - uses: EnricoMi/publish-unit-test-result-action@v1.17 - if: always() - with: - github_token: ${{ secrets.GITHUB_TOKEN }} - files: "target/coverage/junit.xml" - - name: Coveralls - uses: coverallsapp/github-action@master - with: - github-token: ${{ secrets.GITHUB_TOKEN }} - path-to-lcov: "target/coverage/lcov.info" - - name: Deploy - env: - CLOJARS_USER: ${{ secrets.CLOJARS_USER }} - CLOJARS_PASS: ${{ secrets.CLOJARS_PASS }} - run: lein deploy diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..7cf5214 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,29 @@ +on: [workflow_call] + +jobs: + test: + runs-on: ubuntu-22.04 + timeout-minutes: 5 + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + token: ${{ secrets.GITHUB_TOKEN }} + fetch-depth: 0 + + - name: Set up JDK 11 + uses: actions/setup-java@v3 + with: + distribution: corretto + java-version: 11 + + - name: Restore local Maven repository from cache + uses: actions/cache@v3 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles( 'project.clj' ) }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Unit and Integration tests + run: lein eftest :all diff --git a/.github/workflows/test_results.yml b/.github/workflows/test_results.yml new file mode 100644 index 0000000..23490b1 --- /dev/null +++ b/.github/workflows/test_results.yml @@ -0,0 +1,44 @@ +name: Test Results + +on: + workflow_run: + workflows: ["Pull Request Build & Test"] + types: + - completed + +permissions: {} + +jobs: + test-results: + name: Test Results + runs-on: ubuntu-latest + if: github.event.workflow_run.conclusion != 'skipped' + + permissions: + checks: write + pull-requests: write + actions: read + + steps: + - name: Download and Extract Artifacts + env: + GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}} + run: | + mkdir -p artifacts && cd artifacts + + artifacts_url=${{ github.event.workflow_run.artifacts_url }} + + gh api "$artifacts_url" -q '.artifacts[] | [.name, .archive_download_url] | @tsv' | while read artifact + do + IFS=$'\t' read name url <<< "$artifact" + gh api $url > "$name.zip" + unzip -d "$name" "$name.zip" + done + + - name: Publish Test Results + uses: EnricoMi/publish-unit-test-result-action@v1.39 + with: + commit: ${{ github.event.workflow_run.head_sha }} + event_file: artifacts/Event File/event.json + event_name: ${{ github.event.workflow_run.event }} + files: "artifacts/**/*.xml" \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 84f3595..9fe0331 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [[1.1.0]](https://github.com/AppsFlyer/ketu/pull/18) - 2024-07-29 + +### Added +- consumer decorator support. + ## [1.0.0] - 2023-01-10 ### Added diff --git a/README.md b/README.md index 0067a26..8899605 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ A Clojure Apache Kafka client with core.async api ```clojure -[com.appsflyer/ketu "1.0.0"] +[com.appsflyer/ketu "1.1.0"] ``` ## Features @@ -28,23 +28,23 @@ Consume a name string from kafka and produce a greeting string for that name bac (:require [clojure.core.async :refer [chan close! !!]] [ketu.async.source :as source] [ketu.async.sink :as sink])) - -(let [greets (chan 10) - sink-opts {:name "greeter-producer" - :brokers "broker2:9091" - :topic "greetings" - :value-type :string - :shape :value} - sink (sink/sink >greets sink-opts)] + >greets (chan 10) + sink-opts {:name "greeter-producer" + :brokers "broker2:9091" + :topic "greetings" + :value-type :string + :shape :value} + sink (sink/sink >greets sink-opts)] ;; Consume a name and produce a greeting. You could also do this with e.g. clojure.core.async/pipeline. (->> (]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | + +| Key | Type | Req? | Notes | +|---------------------------------|-----------------------------------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| :ketu.source/consumer-decorator | `fn [consumer-context poll-fn] -> Iterable` | optional | Decorates the internal poll function. when provided the decorator will be called with the following params:
consumer-context: {:ketu.source/consumer consumer}
pool-fn: fn [] -> Iterable
Returns an iterable collection of consumerRecord.
The decorator should call the poll-fn on behalf of the consumer source.
| #### Producer-sink options -| Key | Type | Req? | Notes | -|-------------------|------------------------------------------------------------------------------------------------------------------|------------|------------------------------------------------------------------------------------------------| -| :shape | `:value`, `[:vector ]`,`[:map ]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#data-shapes) | -| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"` | optional | Default `"none"`, values are same as "compression.type" of the java producer | -| :workers | int | optional | Default `1`, number of threads that take from the channel and invoke the internal producer | + +| Key | Type | Req? | Notes | +|-------------------|------------------------------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------| +| :shape | `:value`, `[:vector ]`,`[:map ]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#data-shapes) | +| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"` | optional | Default `"none"`, values are same as "compression.type" of the java producer | +| :workers | int | optional | Default `1`, number of threads that take from the channel and invoke the internal producer | ## Data shapes You don't have to deal with ConsumerRecord or ProducerRecord objects.
To get a clojure data structure with any of the ConsumerRecord fields, configure the consumer shape: + ```clojure ; Value only: -{:topic "names" - :key-type :string +{:topic "names" + :key-type :string :value-type :string - :shape :value} + :shape :value} ( "v" @@ -111,12 +117,14 @@ To get a clojure data structure with any of the ConsumerRecord fields, configure ( {:key "k", :value "v", :topic "names"} ``` + Similarly, to put a clojure data structure on the producer channel: + ```clojure ; Value only: -{:key-type :string +{:key-type :string :value-type :string - :shape :value} + :shape :value} (>!! producer-chan "v") ; Vector: @@ -129,6 +137,90 @@ Similarly, to put a clojure data structure on the producer channel: (>!! producer-chan ["k2" "v2" "events"]) ``` +## Consumer Decorator + +The consumer decorator allows running custom logic on the consumer polling thread. +This allows custom control on the consumer behavior including manual offset management. +Custom decorator logic may require different consumer configurations. +for example when managing the offset manually, auto-commit should usually set to false. + +In this example we use the decorator to run commands in the polling thread context. +The consumer is paused/resumed based on commands sent from the application. +The decorator processes all immediately available commands in the commands-chan, and only then calls (poll-fn). +```clojure +(ns consumer-decorator-example + (:require [clojure.core.async :as async] + [ketu.async.source :as source] + [ketu.async.sink :as sink])) + +(let [commands-chan (async/chan 10) + consumer-chan (async/chan 10) + consumer-opts {:name "consumer-example" + :brokers "broker1:9092" + :topic "example" + :group-id "example" + :value-type :string + :shape :value + :ketu.source/consumer-decorator (fn [consumer-ctx poll-fn] + (loop [] + (when-let [command (async/poll! commands-chan)] + (command consumer-ctx) + (recur))) + (poll-fn))} + source (source/source consumer-chan consumer-opts) + + producer-chan (async/chan 10) + sink-opts {:name "producer-example" + :brokers "broker1:9092" + :topic "example" + :value-type :string + :shape :value} + sink (sink/sink producer-chan sink-opts) + + ; periodically produce data to the topic + producing (future + (dotimes [i 20] + (async/>!! producer-chan (str i)) + (Thread/sleep 300)) + (async/>!! producer-chan "done") + (async/close! producer-chan)) + + ; read from the consumer channel and print to the screen + processing (future + (loop [] + (let [message (async/!! commands-chan (fn [{consumer :ketu.source/consumer}] + (.pause consumer (.assignment consumer)) + (deliver paused true))) + + @paused + (println "consumer is paused") + (Thread/sleep 2000) + + ; Send the commands channel a function that will resume the consumer + (async/>!! commands-chan (fn [{consumer :ketu.source/consumer}] + (.resume consumer (.paused consumer)) + (deliver resumed true))) + + @resumed + (println "consumer is resumed") + + ; Wait for all futures to finish + @producing + @processing) + (finally + (source/stop! source)))) +``` + ## Development & Contribution We welcome feedback and would love to hear about use-cases other than ours. You can open issues, send pull requests, diff --git a/project.clj b/project.clj index 0d23595..b5488dc 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.appsflyer/ketu "1.0.1-SNAPSHOT" +(defproject com.appsflyer/ketu "1.1.0-SNAPSHOT" :description "Clojure Apache Kafka client with core.async api" :url "https://github.com/AppsFlyer/ketu" :license {:name "Apache License, Version 2.0" @@ -21,7 +21,8 @@ :profiles {;; REPL, development and testing :dev {:source-paths ["dev"] - :plugins [[lein-cloverage "1.2.4"]] + :plugins [[lein-cloverage "1.2.4"] + [lein-eftest "0.5.9"]] :dependencies [[org.clojure/tools.namespace "1.3.0"] ;For repl refresh [tortue/spy "2.13.0"] [metosin/sieppari "0.0.0-alpha13"] @@ -31,7 +32,10 @@ ; Kafka (docker in docker) [org.testcontainers/kafka "1.17.6"] [clj-test-containers "0.7.4"]] - :jvm-opts ["-Dlogback.configurationFile=dev-logback.xml"]} + :jvm-opts ["-Dlogback.configurationFile=dev-logback.xml"] + :eftest {:multithread? false + :report eftest.report.junit/report + :report-to-file "target/junit.xml"}} ;; Tests only, silent logs :test diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index 558b51e..5918b6b 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -73,21 +73,22 @@ (fn [consumer] (consumer/assign! consumer (consumer/topic-partitions topic partitions)))))) -(defn- poll-fn [^Consumer consumer opts] - (let [source-name (:ketu/name opts) - poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts)) - catching-poll? (:ketu.source.legacy/catching-poll? opts)] - (if catching-poll? - ;TODO Eliminate catching poll ASAP. - ; Just in case of a production issue and generic error handling wasn't implemented yet. - (fn [] - (try - (consumer/poll! consumer poll-timeout-duration) - (catch Exception e - (log/error logger "[source={}] Caught poll exception" source-name e) - nil))) - (fn [] - (consumer/poll! consumer poll-timeout-duration))))) +(defn- poll-fn [^Consumer consumer should-poll? opts] + (when @should-poll? + (let [source-name (:ketu/name opts) + poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts)) + catching-poll? (:ketu.source.legacy/catching-poll? opts)] + (if catching-poll? + ;TODO Eliminate catching poll ASAP. + ; Just in case of a production issue and generic error handling wasn't implemented yet. + (fn [] + (try + (consumer/poll! consumer poll-timeout-duration) + (catch Exception e + (log/error logger "[source={}] Caught poll exception" source-name e) + []))) + (fn [] + (consumer/poll! consumer poll-timeout-duration)))))) (defn- ->data-fn [{:keys [ketu.source/shape] :as opts}] (cond @@ -102,13 +103,18 @@ close-out-chan? (:ketu.source/close-out-chan? opts) ^long close-consumer? (:ketu.source/close-consumer? opts) consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts) - should-poll? (volatile! true) + decorator-fn (some-> (:ketu.source/consumer-decorator opts) + (partial {:ketu.source/consumer consumer})) + abort-pending-put (async/chan) done-putting (async/chan) subscribe! (or (subscribe-fn opts) (assign-fn opts)) - poll! (poll-fn consumer opts) + poll-impl (poll-fn consumer should-poll? opts) + poll! (if (some? decorator-fn) + (partial decorator-fn poll-impl) + poll-impl) ->data (->data-fn opts) put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put)) @@ -120,9 +126,10 @@ (subscribe! consumer) - (while @should-poll? - (let [records (poll!)] - (run! put! records))) + (loop [] + (when-let [records (poll!)] + (run! put! records) + (recur))) (catch WakeupException e ; We wakeup the consumer on graceful shutdown after should-poll? is false. diff --git a/src/ketu/spec.clj b/src/ketu/spec.clj index 4e1db2a..abe78a3 100644 --- a/src/ketu/spec.clj +++ b/src/ketu/spec.clj @@ -2,7 +2,8 @@ (:require [clojure.set] [clojure.spec.alpha :as s] [clojure.string] - [expound.alpha :as expound]) + [expound.alpha :as expound] + [clojure.core.async.impl.protocols]) (:import (java.util.regex Pattern) (org.apache.kafka.clients.producer Callback) (org.apache.kafka.common.serialization Deserializer Serializer))) @@ -27,7 +28,7 @@ (s/def :ketu.source/close-out-chan? boolean?) (s/def :ketu.source/close-consumer? boolean?) (s/def :ketu.source/create-rebalance-listener-obj fn?) - +(s/def :ketu.source/consumer-decorator fn?) (s/def :ketu.source.assign/topic :ketu/topic) (s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?)) (s/def :ketu.source/assign-single-topic-partitions @@ -76,7 +77,8 @@ :ketu.source/consumer-close-timeout-ms :ketu.source/consumer-thread-timeout-ms :ketu.source/close-out-chan? - :ketu.source/close-consumer?])) + :ketu.source/close-consumer? + :ketu.source/consumer-decorator])) (s/def :ketu.apache.producer/config map?) (s/def :ketu.sink/sender-threads-num pos-int?) diff --git a/test/ketu/async/integration_test.clj b/test/ketu/async/integration_test.clj index 6ae4e41..bec34a5 100644 --- a/test/ketu/async/integration_test.clj +++ b/test/ketu/async/integration_test.clj @@ -12,7 +12,7 @@ (:import (java.time Duration) (org.apache.kafka.common PartitionInfo TopicPartition) (org.apache.kafka.clients.producer RecordMetadata) - (org.apache.kafka.clients.consumer Consumer) + (org.apache.kafka.clients.consumer Consumer ConsumerRecord) (org.apache.kafka.clients.admin AdminClient NewTopic) (java.util.concurrent TimeUnit))) @@ -211,3 +211,39 @@ (source/stop! source-rebalance) (source/stop! source) (.close ^AdminClient admin-client))))) + +(deftest consumer-decorator + (let [consumer-chan (async/chan 10) + result-chan (async/chan 100) + clicks-consumer-opts {:name "clicks-consumer" + :brokers (kafka-setup/get-bootstrap-servers) + :topic "clicks" + :group-id "clicks-test-consumer" + :auto-offset-reset "earliest" + :shape :value + :ketu.source/consumer-decorator (fn [{_consumer :ketu.source/consumer} poll-fn] + (let [records (poll-fn)] + (doseq [^ConsumerRecord record records] + (async/>!! result-chan (String. ^"[B" (.value record)))) + records))} + source (source/source consumer-chan clicks-consumer-opts) + clicks-producer-opts {:name "clicks-producer" + :brokers (kafka-setup/get-bootstrap-servers) + :topic "clicks" + :key-type :string + :internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"} + :shape [:vector :key :value]} + producer-chan (async/chan 10) + sink (sink/sink producer-chan clicks-producer-opts) + input-values #{"1" "2" "3"}] + (try + (doseq [value input-values] + (async/>!! producer-chan ["1" value])) + (is (= input-values (into #{} (repeatedly 3 #(u/try-take! result-chan))))) + (is (= input-values (into #{} (map #(String. ^"[B" %)) (repeatedly 3 #(u/try-take! consumer-chan))))) + (finally + (Thread/sleep 2000) + (source/stop! source) + (async/close! producer-chan) + (sink/stop! sink))))) +