refactor: record events in memory in mrbv2 (#27734)
pl authored Jan 23, 2025
1 parent 7d585a2 commit 5f172d6
Showing 21 changed files with 2,027 additions and 571 deletions.
4 changes: 4 additions & 0 deletions plugin-server/src/config/config.ts
@@ -213,6 +213,10 @@ export function getDefaultConfig(): PluginsServerConfig {
INGESTION_CONSUMER_CONSUME_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION,
INGESTION_CONSUMER_OVERFLOW_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
INGESTION_CONSUMER_DLQ_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,

// Session recording V2
SESSION_RECORDING_MAX_BATCH_SIZE_KB: 100 * 1024, // 100MB
SESSION_RECORDING_MAX_BATCH_AGE_MS: 10 * 1000, // 10 seconds
}
}
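
Note: these two new defaults bound how much session data is buffered in memory before a batch is flushed. A minimal sketch of the size-or-age flush rule they presumably feed (the BatchState shape and shouldFlush helper below are illustrative, not code from this commit):

interface BatchState {
    sizeBytes: number // bytes recorded into the current in-memory batch
    createdAtMs: number // when the current batch was started
}

function shouldFlush(batch: BatchState, maxBatchSizeBytes: number, maxBatchAgeMs: number, nowMs: number): boolean {
    // Flush when the batch is either too large or too old, whichever comes first
    return batch.sizeBytes >= maxBatchSizeBytes || nowMs - batch.createdAtMs >= maxBatchAgeMs
}

// With the defaults above: 100 * 1024 KB (100MB), or 10 seconds of batch age.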

@@ -36,9 +36,9 @@ export class DefaultBatchConsumerFactory implements BatchConsumerFactory {
groupId,
topic,
eachBatch,
callEachBatchWhenEmpty: true, // Useful as we will still want to account for flushing sessions
autoCommit: true,
autoOffsetStore: true, // TODO: remove this once we implement our own offset store logic
callEachBatchWhenEmpty: true, // Required, as we want to flush session batches periodically
autoCommit: false,
autoOffsetStore: false,
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.serverConfig.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
// the largest size of a message that can be fetched by the consumer.
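
With autoCommit and autoOffsetStore now disabled, the consumer no longer stores offsets automatically; they are committed explicitly once a session batch has been flushed. A sketch of the commit callback shape (the real wiring is in SessionRecordingIngester below; makeCommitCallback is an illustrative helper, not part of this commit):

import { KafkaConsumer, TopicPartitionOffset } from 'node-rdkafka'

function makeCommitCallback(consumer: KafkaConsumer) {
    return async (offsets: TopicPartitionOffset[]): Promise<void> => {
        // commitSync is synchronous, so wrap it to expose a Promise-based API
        await new Promise<void>((resolve, reject) => {
            try {
                consumer.commitSync(offsets)
                resolve()
            } catch (error) {
                reject(error)
            }
        })
    }
}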
@@ -17,14 +17,17 @@ import {
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
} from './constants'
import { KafkaMessageParser } from './kafka/message-parser'
import { KafkaMetrics } from './kafka/metrics'
import { KafkaParser } from './kafka/parser'
import { KafkaOffsetManager } from './kafka/offset-manager'
import { SessionRecordingMetrics } from './metrics'
import { PromiseScheduler } from './promise-scheduler'
import { BlackholeSessionBatchWriter } from './sessions/blackhole-session-batch-writer'
import { SessionBatchManager } from './sessions/session-batch-manager'
import { SessionBatchRecorder, SessionBatchRecorderInterface } from './sessions/session-batch-recorder'
import { TeamFilter } from './teams/team-filter'
import { TeamService } from './teams/team-service'
import { MessageWithTeam } from './teams/types'
import { BatchMessageProcessor } from './types'
import { CaptureIngestionWarningFn } from './types'
import { getPartitionsForTopic } from './utils'
import { LibVersionMonitor } from './versions/lib-version-monitor'
@@ -41,43 +44,57 @@ export class SessionRecordingIngester {
isStopping = false

private isDebugLoggingEnabled: ValueMatcher<number>
private readonly messageProcessor: BatchMessageProcessor<Message, MessageWithTeam>
private readonly metrics: SessionRecordingMetrics
private readonly promiseScheduler: PromiseScheduler
private readonly batchConsumerFactory: BatchConsumerFactory
private readonly sessionBatchManager: SessionBatchManager
private readonly kafkaParser: KafkaMessageParser
private readonly teamFilter: TeamFilter
private readonly libVersionMonitor?: LibVersionMonitor

constructor(
private config: PluginsServerConfig,
private consumeOverflow: boolean,
batchConsumerFactory: BatchConsumerFactory,
ingestionWarningProducer?: KafkaProducerWrapper
) {
this.isDebugLoggingEnabled = buildIntegerMatcher(config.SESSION_RECORDING_DEBUG_PARTITION, true)
const kafkaMetrics = KafkaMetrics.getInstance()
const kafkaParser = new KafkaParser(kafkaMetrics)
const teamService = new TeamService()
this.metrics = SessionRecordingMetrics.getInstance()
this.promiseScheduler = new PromiseScheduler()
this.topic = consumeOverflow
? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
this.batchConsumerFactory = batchConsumerFactory

const teamFilter = new TeamFilter(teamService, kafkaParser)
this.messageProcessor = teamFilter
this.isDebugLoggingEnabled = buildIntegerMatcher(config.SESSION_RECORDING_DEBUG_PARTITION, true)

this.promiseScheduler = new PromiseScheduler()

this.kafkaParser = new KafkaMessageParser(KafkaMetrics.getInstance())
this.teamFilter = new TeamFilter(new TeamService())
if (ingestionWarningProducer) {
const captureWarning: CaptureIngestionWarningFn = async (teamId, type, details, debounce) => {
await captureIngestionWarning(ingestionWarningProducer, teamId, type, details, debounce)
}

this.messageProcessor = new LibVersionMonitor<Message>(
teamFilter,
captureWarning,
VersionMetrics.getInstance()
)
this.libVersionMonitor = new LibVersionMonitor(captureWarning, VersionMetrics.getInstance())
}

this.topic = consumeOverflow
? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
this.metrics = SessionRecordingMetrics.getInstance()

const offsetManager = new KafkaOffsetManager(async (offsets) => {
await new Promise<void>((resolve, reject) => {
try {
this.batchConsumer!.consumer.commitSync(offsets)
resolve()
} catch (error) {
reject(error)
}
})
}, this.topic)
this.sessionBatchManager = new SessionBatchManager({
maxBatchSizeBytes: (config.SESSION_RECORDING_MAX_BATCH_SIZE_KB ?? 0) * 1024,
maxBatchAgeMs: config.SESSION_RECORDING_MAX_BATCH_AGE_MS ?? 1000,
createBatch: () => new SessionBatchRecorder(new BlackholeSessionBatchWriter()),
offsetManager,
})

this.consumerGroupId = this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID
}

@@ -90,6 +107,24 @@ export class SessionRecordingIngester {
}
}

public async handleEachBatch(messages: Message[], context: { heartbeat: () => void }): Promise<void> {
context.heartbeat()

if (messages.length > 0) {
logger.info('🔁', `blob_ingester_consumer_v2 - handling batch`, {
size: messages.length,
partitionsInBatch: [...new Set(messages.map((x) => x.partition))],
assignedPartitions: this.assignedPartitions,
})
}

await runInstrumentedFunction({
statsKey: `recordingingesterv2.handleEachBatch`,
sendTimeoutGuardToSentry: false,
func: async () => this.processBatchMessages(messages, context),
})
}

private async processBatchMessages(messages: Message[], context: { heartbeat: () => void }): Promise<void> {
// Increment message received counter for each message
messages.forEach((message) => {
@@ -98,50 +133,71 @@ export class SessionRecordingIngester {

const batchSize = messages.length
const batchSizeKb = messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024

this.metrics.observeKafkaBatchSize(batchSize)
this.metrics.observeKafkaBatchSizeKb(batchSizeKb)

const parsedMessages = await runInstrumentedFunction({
const processedMessages = await runInstrumentedFunction({
statsKey: `recordingingesterv2.handleEachBatch.parseBatch`,
func: async () => {
return this.messageProcessor.parseBatch(messages)
const parsedMessages = await this.kafkaParser.parseBatch(messages)
const messagesWithTeam = await this.teamFilter.filterBatch(parsedMessages)
const processedMessages = this.libVersionMonitor
? await this.libVersionMonitor.processBatch(messagesWithTeam)
: messagesWithTeam
return processedMessages
},
})

context.heartbeat()

await runInstrumentedFunction({
statsKey: `recordingingesterv2.handleEachBatch.processMessages`,
func: async () => this.processMessages(parsedMessages),
func: async () => this.processMessages(processedMessages, context.heartbeat),
})
}

private async processMessages(parsedMessages: MessageWithTeam[]): Promise<void> {
if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) {
await Promise.all(parsedMessages.map((m) => this.consume(m)))
} else {
private async processMessages(parsedMessages: MessageWithTeam[], heartbeat: () => void) {
await this.sessionBatchManager.withBatch(async (batch) => {
for (const message of parsedMessages) {
await this.consume(message)
this.consume(message, batch)
}
}
return Promise.resolve()
})

heartbeat()

await this.sessionBatchManager.flushIfNeeded()
}

public async handleEachBatch(messages: Message[], context: { heartbeat: () => void }): Promise<void> {
context.heartbeat()
private consume(message: MessageWithTeam, batch: SessionBatchRecorderInterface) {
// we have to reset this counter once we're consuming messages since then we know we're not re-balancing
// otherwise the consumer continues to report however many sessions were revoked at the last re-balance forever
this.metrics.resetSessionsRevoked()
const { team, message: parsedMessage } = message
const debugEnabled = this.isDebugLoggingEnabled(parsedMessage.metadata.partition)

if (messages.length > 0) {
logger.info('🔁', `blob_ingester_consumer_v2 - handling batch`, {
size: messages.length,
partitionsInBatch: [...new Set(messages.map((x) => x.partition))],
assignedPartitions: this.assignedPartitions,
if (debugEnabled) {
logger.debug('🔄', 'processing_session_recording', {
partition: parsedMessage.metadata.partition,
offset: parsedMessage.metadata.offset,
distinct_id: parsedMessage.distinct_id,
session_id: parsedMessage.session_id,
raw_size: parsedMessage.metadata.rawSize,
})
}

await runInstrumentedFunction({
statsKey: `recordingingesterv2.handleEachBatch`,
sendTimeoutGuardToSentry: false,
func: async () => this.processBatchMessages(messages, context),
})
const { partition } = parsedMessage.metadata
const isDebug = this.isDebugLoggingEnabled(partition)
if (isDebug) {
logger.info('🔁', '[blob_ingester_consumer_v2] - [PARTITION DEBUG] - consuming event', {
...parsedMessage.metadata,
team_id: team.teamId,
session_id: parsedMessage.session_id,
})
}

this.metrics.observeSessionInfo(parsedMessage.metadata.rawSize)
batch.record(message)
}

public async start(): Promise<void> {
@@ -233,38 +289,6 @@ export class SessionRecordingIngester {
return this.assignedTopicPartitions.map((x) => x.partition)
}

private async consume(messageWithTeam: MessageWithTeam): Promise<void> {
// we have to reset this counter once we're consuming messages since then we know we're not re-balancing
// otherwise the consumer continues to report however many sessions were revoked at the last re-balance forever
this.metrics.resetSessionsRevoked()
const { team, message } = messageWithTeam
const debugEnabled = this.isDebugLoggingEnabled(message.metadata.partition)

if (debugEnabled) {
logger.debug('🔄', 'processing_session_recording', {
partition: message.metadata.partition,
offset: message.metadata.offset,
distinct_id: message.distinct_id,
session_id: message.session_id,
raw_size: message.metadata.rawSize,
})
}

const { partition } = message.metadata
const isDebug = this.isDebugLoggingEnabled(partition)
if (isDebug) {
logger.info('🔁', '[blob_ingester_consumer_v2] - [PARTITION DEBUG] - consuming event', {
...message.metadata,
team_id: team.teamId,
session_id: message.session_id,
})
}

this.metrics.observeSessionInfo(message.metadata.rawSize)

return Promise.resolve()
}

private async onRevokePartitions(topicPartitions: TopicPartition[]): Promise<void> {
/**
* The revoke_partitions indicates that the consumer group has had partitions revoked.
@@ -277,7 +301,6 @@ export class SessionRecordingIngester {
}

this.metrics.resetSessionsHandled()

return Promise.resolve()
await this.sessionBatchManager.discardPartitions(revokedPartitions)
}
}
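
Taken together, the ingester's new per-batch flow is: parse the Kafka messages, resolve teams, optionally run the lib-version monitor, record each message into the in-memory session batch, then flush once the size or age limit is hit. A condensed sketch of that flow (the narrow Pipeline interfaces below are assumptions made to keep the sketch self-contained; they are not the PR's actual types):

import { Message } from 'node-rdkafka'

interface MessageWithTeam {
    team: { teamId: number }
    message: unknown
}

interface SessionBatchRecorderInterface {
    record(message: MessageWithTeam): number
}

interface Pipeline {
    parser: { parseBatch(messages: Message[]): Promise<unknown[]> }
    teamFilter: { filterBatch(parsed: unknown[]): Promise<MessageWithTeam[]> }
    libVersionMonitor?: { processBatch(messages: MessageWithTeam[]): Promise<MessageWithTeam[]> }
    batchManager: {
        withBatch(fn: (batch: SessionBatchRecorderInterface) => Promise<void>): Promise<void>
        flushIfNeeded(): Promise<void>
    }
}

async function handleBatch(p: Pipeline, messages: Message[], heartbeat: () => void): Promise<void> {
    const parsed = await p.parser.parseBatch(messages) // decompress and parse snapshot items
    const withTeam = await p.teamFilter.filterBatch(parsed) // resolve the team, dropping unmatched messages
    const processed = p.libVersionMonitor ? await p.libVersionMonitor.processBatch(withTeam) : withTeam

    await p.batchManager.withBatch(async (batch) => {
        for (const message of processed) {
            batch.record(message) // buffer the event in the current in-memory session batch
        }
    })

    heartbeat()
    await p.batchManager.flushIfNeeded() // flush (and commit offsets) once the size/age limits are reached
}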
@@ -10,10 +10,15 @@ import { ParsedMessageData } from './types'
const GZIP_HEADER = Uint8Array.from([0x1f, 0x8b, 0x08, 0x00])
const decompressWithGzip = promisify(gunzip)

export class KafkaParser {
export class KafkaMessageParser {
constructor(private readonly metrics: KafkaMetrics) {}

public async parseMessage(message: Message): Promise<ParsedMessageData | null> {
public async parseBatch(messages: Message[]): Promise<ParsedMessageData[]> {
const parsedMessages = await Promise.all(messages.map((message) => this.parseMessage(message)))
return parsedMessages.filter((msg) => msg !== null) as ParsedMessageData[]
}

private async parseMessage(message: Message): Promise<ParsedMessageData | null> {
const dropMessage = (reason: string, extra?: Record<string, any>) => {
this.metrics.incrementMessageDropped('session_recordings_blob_ingestion', reason)

@@ -0,0 +1,73 @@
import { TopicPartitionOffset } from 'node-rdkafka'

import { SessionBatchRecorderInterface } from '../sessions/session-batch-recorder'
import { MessageWithTeam } from '../teams/types'

interface PartitionOffset {
partition: number
offset: number
}

type CommitOffsetsCallback = (offsets: TopicPartitionOffset[]) => Promise<void>

class OffsetTrackingSessionBatchRecorderWrapper implements SessionBatchRecorderInterface {
constructor(
private readonly recorder: SessionBatchRecorderInterface,
private readonly offsetManager: KafkaOffsetManager
) {}

public record(message: MessageWithTeam): number {
const bytesWritten = this.recorder.record(message)
this.offsetManager.trackOffset(message.message.metadata)
return bytesWritten
}

public async flush(): Promise<void> {
await this.recorder.flush()
}

public discardPartition(partition: number): void {
this.recorder.discardPartition(partition)
this.offsetManager.discardPartition(partition)
}

public get size(): number {
return this.recorder.size
}
}

export class KafkaOffsetManager {
private partitionOffsets: Map<number, number> = new Map()

constructor(private readonly commitOffsets: CommitOffsetsCallback, private readonly topic: string) {}

public wrapBatch(recorder: SessionBatchRecorderInterface): SessionBatchRecorderInterface {
return new OffsetTrackingSessionBatchRecorderWrapper(recorder, this)
}

public trackOffset({ partition, offset }: PartitionOffset): void {
// We track the next offset to process
this.partitionOffsets.set(partition, offset + 1)
}

public discardPartition(partition: number): void {
this.partitionOffsets.delete(partition)
}

public async commit(): Promise<void> {
const topicPartitionOffsets: TopicPartitionOffset[] = []

for (const [partition, offset] of this.partitionOffsets.entries()) {
topicPartitionOffsets.push({
topic: this.topic,
partition,
offset,
})
}

if (topicPartitionOffsets.length > 0) {
await this.commitOffsets(topicPartitionOffsets)
this.partitionOffsets.clear()
}
}
}
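
An illustrative usage of the offset manager, assuming a node-rdkafka consumer and an underlying batch recorder are already available (the exampleUsage function, its parameters, the topic string, and the import paths — which assume the example sits alongside the ingester — are placeholders; the real wiring lives in SessionRecordingIngester and SessionBatchManager):

import { KafkaConsumer } from 'node-rdkafka'

import { KafkaOffsetManager } from './kafka/offset-manager'
import { SessionBatchRecorderInterface } from './sessions/session-batch-recorder'
import { MessageWithTeam } from './teams/types'

async function exampleUsage(
    consumer: KafkaConsumer,
    underlyingRecorder: SessionBatchRecorderInterface,
    messages: MessageWithTeam[]
): Promise<void> {
    const offsetManager = new KafkaOffsetManager(async (offsets) => {
        consumer.commitSync(offsets) // commits happen only when explicitly requested
    }, 'example_topic')

    // Wrapping the recorder makes every record() call also track the next offset per partition
    const trackedBatch = offsetManager.wrapBatch(underlyingRecorder)
    for (const message of messages) {
        trackedBatch.record(message)
    }

    // After a successful flush, commit publishes the tracked offsets and clears them
    await trackedBatch.flush()
    await offsetManager.commit()
}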
@@ -0,0 +1,12 @@
import { PassThrough } from 'stream'

import { SessionBatchWriter, StreamWithFinish } from './session-batch-recorder'

export class BlackholeSessionBatchWriter implements SessionBatchWriter {
public async open(): Promise<StreamWithFinish> {
return Promise.resolve({
stream: new PassThrough(),
finish: async () => Promise.resolve(),
})
}
}
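
A small usage sketch of the writer interface, inferred from the open()/finish() contract (the writeBatch helper is illustrative; the real call sites are in SessionBatchRecorder, whose implementation is not shown in this excerpt):

import { BlackholeSessionBatchWriter } from './sessions/blackhole-session-batch-writer'

async function writeBatch(payload: Buffer): Promise<void> {
    const writer = new BlackholeSessionBatchWriter()
    const { stream, finish } = await writer.open()

    // Nothing reads from the blackhole's PassThrough, so switch it to flowing mode
    // to discard written data instead of buffering it (a sketch-only concern)
    stream.resume()

    // Write the serialized session batch, then signal completion
    stream.end(payload)
    await finish()
}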