
Kafka Connectors Shared Logic

Ahmed Elbahtemy edited this page Apr 26, 2019 · 24 revisions


Configuration

The configuration properties below are shared across all Kafka connectors in Brooklin.

Remember:

Property Description Default

defaultKeySerde

  • Name of SerDe (Serializer/Deserializer)

    to use with Kafka topic key

  • Name has to match one of the values

(None)

defaultValueSerde

  • Name of SerDe (Serializer/Deserializer)

    to use with Kafka topic value

  • Name has to match one of the values

(None)

isGroupIdHashingEnabled

  • A flag indicating whether the Kafka consumer group ID should be hashed

  • If true, the consumer group ID for a datastream is set to <clusterName>.<groupIdMD5Hash>, where:

  • If false, the consumer group ID for a datastream is set to <sourceConnectionString>-to-<destConnectionString>, where:

false

commitIntervalMs

  • The time duration (in milliseconds) between successive

    offset commits to Kafka

  • Must be in the range [0, Long.MAX_VALUE] (inclusive)

60000

(=1 min)

pollTimeoutMs

  • The timeout (in milliseconds) to spend waiting

    in poll calls to Kafka if no data is available

  • Specifying a value of 0 causes the KafkaConsumer

    to return immediately if no data is available

  • Must be in the range [0, Long.MAX_VALUE] (inclusive)

30000

(=30 sec)

retryCount

The maximum number of times to attempt sending data

to Kafka in case of failure

5

retrySleepDurationMs

The time duration (in milliseconds) to wait between

successive attempts of sending data to Kafka in case

of failure

5000

(=5 sec)

pausePartitionOnError

A flag indicating whether to auto-pause a topic partition

if sending messages to Kafka fails after exhausting

the max attempts specified by retryCount

false

pauseErrorPartitionDurationMs

The time duration (in milliseconds) to keep a topic partition

paused after encountering send errors, before attempting to

auto-resume

600000

(=10 min)

daemonThreadIntervalInSeconds

  • The time duration (in seconds) between successive attempts to restart unhealthy DatastreamTasks

  • Also used as the initial delay before checking

    the DatastreamTasks health status for the first time

300

(=5 min)

nonGoodStateThresholdMs

  • The maximum time duration (in milliseconds) to allow

    between successive polls from Kafka, before

    a DatastreamTask is deemed unhealthy

  • Must be in the range [60000, Long.MAX_VALUE] (inclusive)

600000

(=10 min)

processingDelayLogThreshold

The maximum time duration (in milliseconds) to allow between consuming data from Kafka and dispatching it for delivery to the destination, before incrementing the numProcessingOverThreshold metric

60000

(=1 min)

enableKafkaPositionTracker

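A flag indicating whether to enable the Kafka position tracker, which is required for offset-based position diagnostics (position?offsets=true)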

false

consumerFactoryClassName

KafkaConsumerFactoryImpl

consumer.*

Kafka consumer configuration properties

  • enable.auto.commit: false

  • auto.offset.reset: none
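The defaults and ranges listed above can be collected into a small validation helper. The following is a minimal Python sketch, not Brooklin code: the names `DEFAULTS`, `RANGES`, and `resolve_config` are hypothetical, only properties whose defaults or ranges are stated above are included, and Long.MAX_VALUE is taken to be 2^63 - 1, as in Java.

```python
# Hypothetical helper, not part of Brooklin: merges user overrides with the
# documented defaults and enforces the documented inclusive ranges.
LONG_MAX = 2**63 - 1  # Java's Long.MAX_VALUE

# Defaults as listed in the configuration table.
DEFAULTS = {
    "isGroupIdHashingEnabled": False,
    "commitIntervalMs": 60_000,
    "pollTimeoutMs": 30_000,
    "retryCount": 5,
    "retrySleepDurationMs": 5_000,
    "pausePartitionOnError": False,
    "pauseErrorPartitionDurationMs": 600_000,
    "daemonThreadIntervalInSeconds": 300,
    "nonGoodStateThresholdMs": 600_000,
    "processingDelayLogThreshold": 60_000,
    "enableKafkaPositionTracker": False,
}

# Inclusive (low, high) bounds for the properties that document a range.
RANGES = {
    "commitIntervalMs": (0, LONG_MAX),
    "pollTimeoutMs": (0, LONG_MAX),
    "nonGoodStateThresholdMs": (60_000, LONG_MAX),
}


def resolve_config(overrides):
    """Return the effective settings, raising ValueError on a range violation."""
    config = {**DEFAULTS, **overrides}
    for name, (low, high) in RANGES.items():
        value = config[name]
        if not low <= value <= high:
            raise ValueError(f"{name}={value} is outside [{low}, {high}]")
    return config
```

For example, `resolve_config({"pollTimeoutMs": 0})` keeps every other default, while `resolve_config({"nonGoodStateThresholdMs": 59999})` raises `ValueError` because the value is below the documented minimum of 60000.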

Diagnostics

The diagnostic endpoints below are shared across all Kafka connectors in Brooklin.

URL

GET /diag?q=:query&type=:component&scope=:scope&content=:componentQuery

URL Params

Required

  • query

    • Possible values are: status or allStatus

    • status retrieves data for a single Brooklin instance

    • allStatus retrieves aggregated data for all Brooklin instances in the cluster

  • type: only supported value is connector

  • scope: Name of the connector to query, as specified in brooklin.server.connectorNames

  • content

    • Represents a subquery to the component (connector) in question

    • Possible values are: datastream_state and position

      Subquery

      datastream_state

      Subquery Params

      Required

      datastream: Name of datastream to query

      Example

      datastream_state?datastream=:datastreamName

      Constraints

      • Datastream must exist

      Response

      Content-Type

      application/json

      Schema
      {
        "elements": [
        ],
        "paging": {
          "count": "[int]",
          "start": "[int]",
          "links": "[array]"
        }
      }

      Subquery

      position

      Subquery Params

      Optional

      • offsets

        • A boolean indicating whether to return offset-based (true) or time-based (false) position data for the retrieved DatastreamTasks

        • Default is false (time-based)

          Example

          position?offsets=true

          Constraints

          Setting it to true requires enableKafkaPositionTracker to be true

      • datastream: Name of datastream to limit results to

        Example

        position?datastream=:datastreamName

        Constraints

        Datastream must exist

      • topic: Name of Kafka topic to limit results to

        Example

        position?topic=:topicName

        Constraints

        Topic must exist

      Response

      Content-Type

      application/json

      Schema
      {
        "elements": [
        ],
        "paging": {
          "count": "[int]",
          "start": "[int]",
          "links": "[array]"
        }
      }
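The URL template and parameters above can be assembled programmatically. The following is a minimal Python sketch under stated assumptions: `build_diag_url` is a hypothetical helper, the base URL and connector name in the usage note are placeholders, and percent-encoding the nested subquery inside `content` is an assumption about what the server accepts rather than documented behavior.

```python
from urllib.parse import urlencode


def build_diag_url(base_url, query, scope, subquery, subquery_params=None):
    """Build a GET /diag URL for a Kafka connector (hypothetical helper).

    base_url        -- scheme, host, and port of a Brooklin instance (placeholder)
    query           -- "status" (single instance) or "allStatus" (whole cluster)
    scope           -- connector name, as listed in brooklin.server.connectorNames
    subquery        -- "datastream_state" or "position"
    subquery_params -- optional dict, e.g. {"datastream": "..."} or {"offsets": "true"}
    """
    if query not in ("status", "allStatus"):
        raise ValueError("query must be 'status' or 'allStatus'")
    if subquery not in ("datastream_state", "position"):
        raise ValueError("subquery must be 'datastream_state' or 'position'")

    # The subquery carries its own parameters, e.g. position?offsets=true.
    content = subquery
    if subquery_params:
        content += "?" + urlencode(subquery_params)

    # Assumption: the nested subquery is percent-encoded as one parameter value.
    params = {"q": query, "type": "connector", "scope": scope, "content": content}
    return f"{base_url}/diag?" + urlencode(params)
```

A call such as `build_diag_url("http://localhost:8080", "status", "myKafkaConnector", "position", {"offsets": "true"})` produces a URL whose `content` parameter decodes to `position?offsets=true`.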

Metrics

The metrics below are shared across all Kafka connectors in Brooklin.

General Metrics

General metrics prefix: <connectorName>.

Metric Name Description

numDatastreams

The number of datastreams using the connector in the entire cluster

numDatastreamTasks

The number of datastream tasks that belong to datastreams using the connector in the entire cluster

Aggregate Metrics

  • Aggregate metrics cover all datastreams in a single Brooklin instance.

  • Aggregate metrics prefix: <connectorName>.<connectorTask>.aggregate.

Metric Name Description

clientPollOverTimeout

The number of times a call to KafkaConsumer::poll(long) takes longer than pollTimeoutMs by more than 1 sec

errorRate

The rate of errors encountered when data is dispatched for delivery to the destination system

eventsByteProcessedRate

The rate of bytes processed and dispatched for delivery to destination

eventsProcessedRate

The rate of Kafka record consumption

numAutoPausedPartitionsAwaitingDestTopic

The number of auto-paused topic partitions awaiting destination topic creation

numAutoPausedPartitionsOnError

The number of auto-paused topic partitions due to errors encountered during dispatch for delivery

numAutoPausedPartitionsOnInFlightMessages

The number of auto-paused topic partitions due to exceeding their maximum in-flight messages thresholds

numConfigPausedPartitions

The number of topic partitions paused manually

numPartitions

The number of Kafka topic partitions

numProcessingOverThreshold

The number of times dispatching records to destination exceeds processingDelayLogThreshold

numTopics

The number of Kafka topics

pollIntervalOverSessionTimeout

The number of polls exceeding the maximum session timeout <TODO:?>

rebalanceRate

The rate of rebalances seen by the Kafka consumer

stuckPartitions

The number of stuck topic partitions

Datastream-Specific Metrics

  • Datastream-specific metrics prefix: <connectorName>.<connectorTask>.<datastreamName>.

Metric Name Description

errorRate

The rate of errors encountered when data is dispatched for delivery to the destination system

eventCountsPerPoll

The distribution (histogram) of the number of records retrieved from Kafka in every poll

eventsByteProcessedRate

The rate of bytes processed and dispatched for delivery to destination

eventsProcessedRate

The rate of Kafka record consumption

numAutoPausedPartitionsAwaitingDestTopic

The number of auto-paused topic partitions awaiting destination topic creation

numAutoPausedPartitionsOnError

The number of auto-paused topic partitions due to errors encountered during dispatch for delivery

numAutoPausedPartitionsOnInFlightMessages

The number of auto-paused topic partitions due to exceeding their maximum in-flight messages thresholds

numConfigPausedPartitions

The number of topic partitions paused manually

numPartitions

The number of Kafka topic partitions

numPolls

The rate of polls performed using the Kafka consumer

numProcessingOverThreshold

The number of times dispatching records to destination exceeds processingDelayLogThreshold

numTopics

The number of Kafka topics

pollIntervalOverSessionTimeout

The number of polls exceeding the maximum session timeout <TODO:?>

rebalanceRate

The rate of rebalances seen by the Kafka consumer

stuckPartitions

The number of stuck topic partitions

timeSinceLastEventReceivedMs

The time duration (in milliseconds) since the last non-empty ConsumerRecord was fetched using the Kafka consumer
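The three metric prefixes described in this section compose into fully qualified metric names in a uniform way. The following is a minimal Python sketch of that composition; the function names are hypothetical, and the connector, task, and datastream values used in the usage note are placeholders rather than real Brooklin identifiers.

```python
def general_metric(connector_name, metric):
    """General metrics: <connectorName>.<metric>"""
    return f"{connector_name}.{metric}"


def aggregate_metric(connector_name, connector_task, metric):
    """Aggregate metrics: <connectorName>.<connectorTask>.aggregate.<metric>"""
    return f"{connector_name}.{connector_task}.aggregate.{metric}"


def datastream_metric(connector_name, connector_task, datastream_name, metric):
    """Datastream-specific metrics:
    <connectorName>.<connectorTask>.<datastreamName>.<metric>"""
    return f"{connector_name}.{connector_task}.{datastream_name}.{metric}"
```

With placeholder names, `aggregate_metric("myConnector", "MyConnectorTask", "rebalanceRate")` yields `myConnector.MyConnectorTask.aggregate.rebalanceRate`, and `datastream_metric("myConnector", "MyConnectorTask", "my-datastream", "numPolls")` yields `myConnector.MyConnectorTask.my-datastream.numPolls`.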

REST API Data Models

KafkaDatastreamStatesResponse

Field Name Type Description

datastream

string

Datastream name

assignedTopicPartitions

array

Assigned topic partitions

autoPausedPartitions

map

Associates each auto-paused topic partition with metadata about that paused partition

manualPausedPartitions

map

Associates each topic with a list of manually paused partitions

inFlightMessageCounts

map

Associates each topic partition with the number of in-flight messages
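A client might load a KafkaDatastreamStatesResponse into a typed structure before inspecting it. The following is a minimal Python sketch, assuming the JSON keys match the field names above exactly. The class name `KafkaDatastreamStates`, the helper `paused_partition_count`, and the shape of the map keys and values are hypothetical and are not taken from Brooklin.

```python
import json
from dataclasses import dataclass, field


@dataclass
class KafkaDatastreamStates:
    """Client-side view of the response fields listed above (hypothetical)."""
    datastream: str
    assigned_topic_partitions: list = field(default_factory=list)
    auto_paused_partitions: dict = field(default_factory=dict)
    manual_paused_partitions: dict = field(default_factory=dict)
    in_flight_message_counts: dict = field(default_factory=dict)

    @classmethod
    def from_json(cls, text):
        # Assumption: JSON keys are spelled exactly like the documented fields.
        raw = json.loads(text)
        return cls(
            datastream=raw["datastream"],
            assigned_topic_partitions=raw.get("assignedTopicPartitions", []),
            auto_paused_partitions=raw.get("autoPausedPartitions", {}),
            manual_paused_partitions=raw.get("manualPausedPartitions", {}),
            in_flight_message_counts=raw.get("inFlightMessageCounts", {}),
        )

    def paused_partition_count(self):
        # Auto-paused map: one entry per topic partition.
        # Manually paused map: one entry per topic, holding a list of partitions.
        manual = sum(len(parts) for parts in self.manual_paused_partitions.values())
        return len(self.auto_paused_partitions) + manual
```

The two pause maps are counted differently because, per the table above, one is keyed by topic partition and the other by topic.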

DatastreamPositionResponse

Field Name Type Description

datastreamToPhysicalSources

map

  • Associates a datastream name with a list of topic partitions

  • Key is datastream name

  • Value is a PhysicalSources object

PhysicalSources

Field Name Type Description

physicalSourceToPosition

map

  • Associates a topic partition with the following position information:

    • positionType

    • sourceQueriedTimeMs

      The time (in milliseconds from the Unix epoch) at which data was most recently fetched from the Kafka consumer

    • sourcePosition

      The latest/newest position available in the source Kafka topic

    • consumerProcessedTimeMs

      The time (in milliseconds from the Unix epoch) at which the consumer processed data from the source Kafka topic

    • consumerPosition

      The position of the last message processed by the datastream's consumer
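One use of the position data is estimating how far a consumer trails its source. The following is a minimal Python sketch under stated assumptions: the JSON nesting is assumed to mirror the field names above, `sourcePosition` and `consumerPosition` are assumed to be numeric (or numeric strings), and `position_gaps` is a hypothetical helper. The unit of the result (messages or milliseconds) depends on `positionType`.

```python
def position_gaps(response):
    """Map (datastream, topic partition) to sourcePosition - consumerPosition.

    `response` is a decoded DatastreamPositionResponse, assumed to look like:
    {"datastreamToPhysicalSources":
        {<datastream>: {"physicalSourceToPosition": {<topic partition>: {...}}}}}
    """
    gaps = {}
    for datastream, sources in response["datastreamToPhysicalSources"].items():
        for partition, position in sources["physicalSourceToPosition"].items():
            # int() tolerates positions serialized as either numbers or strings.
            newest = int(position["sourcePosition"])
            consumed = int(position["consumerPosition"])
            gaps[(datastream, partition)] = newest - consumed
    return gaps
```

A gap of zero would suggest the consumer had caught up at the time the source was queried. Because `sourceQueriedTimeMs` and `consumerProcessedTimeMs` are recorded separately, the two positions may not be simultaneous, so the gap is an approximation.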