Skip to content

Commit

Permalink
kafka param methods + doc
Browse files Browse the repository at this point in the history
  • Loading branch information
tanguyvda committed May 29, 2021
1 parent ad9be8f commit cae76fa
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 53 deletions.
31 changes: 30 additions & 1 deletion modules/centreon-stream-connectors-lib/sc_params.lua
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ function ScParams:param_override(user_params)
end

for param_name, param_value in pairs(user_params) do
if self.params[param_name] then
if self.params[param_name] or string.find(param_name, "^_sc_kafka_") ~= nil then
self.params[param_name] = param_value
self.logger:notice("[sc_params:param_override]: overriding parameter: " .. tostring(param_name) .. " with value: " .. tostring(param_value))
else
Expand Down Expand Up @@ -254,4 +254,33 @@ function ScParams:check_params()
self.params.enable_service_status_dedup = self.common:check_boolean_number_option_syntax(self.params.enable_service_status_dedup, 0)
end

--- get_kafka_params: retrieve the kafka parameters and store them the self.params.kafka table
-- @param kafka_config (object) object instance of kafka_config
-- @param params (table) the list of parameters from broker web configuration
function ScParams:get_kafka_params(kafka_config, params)
for param_name, param_value in pairs(params) do
-- check if param starts with sc_kafka (meaning it is a parameter for kafka)
if string.find(param_name, "^sc_kafka_") ~= nil then
-- remove the _sc_kafka_ prefix and store the param in a dedicated kafka table
kafka_config[string.gsub(param_name, "_sc_kafka_", "")] = param_value
end
end
end

--- is_mandatory_config_set: check if the mandatory parameters required by a stream connector are set
-- @param mandatory_params (table) the list of mandatory parameters
-- @param params (table) the list of parameters from broker web configuration
-- @eturn true|false (boolean)
function ScParmas:is_mandatory_config_set(mandatory_params, params)
for index, mandatory_param in ipairs(mandatory_params) do
if not params[mandatory_param] then
self.logger:error("[sc_param:is_mandatory_config_set]: " .. tostring(mandatory_param)
.. " parameter is not set in the stream connector web configuration")
return false
end
end

return true
end

return sc_params
Loading

0 comments on commit cae76fa

Please sign in to comment.