Skip to content

Commit

Permalink
feat(kafka-connect): Added settings so it can be configured like kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexnortung committed Nov 1, 2024
1 parent 68136e5 commit 9bbf08e
Showing 1 changed file with 125 additions and 39 deletions.
164 changes: 125 additions & 39 deletions src/modules/services/kafka-connect.nix
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,41 @@ let
cfg = config.services.kafka.connect;
types = lib.types;

stateDir = config.env.DEVENV_STATE + "/kafka/connect";

storageFile = stateDir + "/connect.offsets";

mkPropertyString =
let
render = {
bool = lib.boolToString;
int = toString;
list = lib.concatMapStringsSep "," mkPropertyString;
string = lib.id;
};
in
v: render.${builtins.typeOf v} v;

stringlyGeneric = (attrs:
lib.mapAttrs (_: mkPropertyString)
(lib.filterAttrs (_: v: v != null) attrs)
);

stringlySettings = stringlyGeneric cfg.settings;

generator = (pkgs.formats.javaProperties { }).generate;
in
{
options.services.kafka.connect = {
enable = lib.mkEnableOption "Kafka Connect";

listeners = lib.mkOption {
type = types.listOf types.str;
default = [ ];
description = ''
List of listeners for Kafka Connect
(By default Kafka Connect listens on http://localhost:8083)
'';
example = [ "http://localhost:8080" ];
};

pluginDirectories = lib.mkOption {
type = types.listOf types.str;
default = [ ];
description = ''
The list should consist of top level directories that include any combination of:
a) directories immediately containing jars with plugins and their dependencies
b) uber-jars with plugins and their dependencies
c) directories immediately containing the package directory structure of classes of plugins and their dependencies
Note: symlinks will be followed to discover dependencies or plugins.
'';
};

initialConnectors = lib.mkOption {
type = types.listOf (types.submodule {
freeformType = with lib.types; let
primitive = oneOf [ bool int str ];
in
lazyAttrsOf (nullOr (either primitive (listOf primitive)));

options = {
name = lib.mkOption {
type = types.str;
Expand All @@ -54,27 +60,107 @@ in
List of Kafka Connect connectors to set up initially
'';
};

settings = lib.mkOption {
description = ''
{file}`connect-standalone.properties`.
Note that .properties files contain mappings from string to string.
Keys with dots are NOT represented by nested attrs in these settings,
but instead as quoted strings (ie. `settings."broker.id"`, NOT
`settings.broker.id`).
'';
default = { };
type = lib.types.submodule {
freeformType = with lib.types; let
primitive = oneOf [ bool int str ];
in
lazyAttrsOf (nullOr (either primitive (listOf primitive)));

options = {
"listeners" = lib.mkOption {
type = types.nullOr (types.listOf types.str);
default = null;
description = ''
List of listeners for Kafka Connect
(By default Kafka Connect listens on http://localhost:8083)
'';
example = [ "http://localhost:8080" ];
};

"bootstrap.servers" = lib.mkOption {
type = types.listOf types.str;
description = ''
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
'';
default = [ "localhost:9092" ];
};

"plugin.path" = lib.mkOption {
type = types.nullOr (types.listOf (types.either types.str types.path));
description = ''
The list should consist of top level directories that include any combination of:
a) directories immediately containing jars with plugins and their dependencies
b) uber-jars with plugins and their dependencies
c) directories immediately containing the package directory structure of classes of plugins and their dependencies
Note: symlinks will be followed to discover dependencies or plugins.
'';
};

"offset.storage.file.filename" = lib.mkOption {
type = types.str;
default = storageFile;
};

"offset.flush.interval.ms" = lib.mkOption {
type = types.int;
default = 10000;
};

"key.converter" = lib.mkOption {
type = types.str;
default = "org.apache.kafka.connect.json.JsonConverter";
description = ''
The key converter to use for the connector.
'';
};

"value.converter" = lib.mkOption {
type = types.str;
default = "org.apache.kafka.connect.json.JsonConverter";
description = ''
The value converter to use for the connector.
'';
};

"key.converter.schemas.enable" = lib.mkOption {
type = types.bool;
default = true;
description = ''
Whether the key converter should include schema information in the message.
'';
};

"value.converter.schemas.enable" = lib.mkOption {
type = types.bool;
default = true;
description = ''
Whether the value converter should include schema information in the message.
'';
};
};
};
};
};

config =
let
pkg = kafkaCfg.package;
stateDir = config.env.DEVENV_STATE + "/kafka/connect";
storageFile = stateDir + "/connect.offsets";

configFile = pkgs.writeText "connect-standalone.properties" ''
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=${storageFile}
offset.flush.interval.ms=10000
${lib.optionalString (lib.lists.length cfg.listeners > 0) "listeners=${lib.concatStringsSep "," cfg.listeners}"}
${lib.optionalString (lib.lists.length cfg.pluginDirectories > 0) "plugin.path=${lib.concatStringsSep "," cfg.pluginDirectories}"}
'';

# Create a json file for each connector
configFile = generator "connect-standalone.properties" stringlySettings;

# TODO: make it work with .properties files?
# connectorFiles = lib.lists.map (c: generator "connector-${c.name}.properties" (stringlyGeneric c)) cfg.initialConnectors;
connectorFiles = lib.lists.map (c: pkgs.writeText "connector.json" (builtins.toJSON c)) cfg.initialConnectors;
connectorFilesConcatted = lib.concatStringsSep " " connectorFiles;

Expand All @@ -83,7 +169,7 @@ in
${pkg}/bin/connect-standalone.sh ${configFile} ${connectorFilesConcatted}
'';
in
lib.mkIf cfg.enable (lib.mkIf kafkaCfg.enable {
(lib.mkIf cfg.enable (lib.mkIf kafkaCfg.enable {
processes.kafka-connect.exec = "${startKafkaConnect}/bin/start-kafka-connect";
});
}));
}

0 comments on commit 9bbf08e

Please sign in to comment.