Add deterministic upload mode

This is to avoid #600.

In this mode, decisions about whether to upload files are *only* based on
properties of the input messages themselves: timestamps and input message
payload size.  We don't care about real-world time, disk file timestamps, or log
file size; we don't support upload on shutdown; and we check for uploads after
every message.

Configuration:

- Set secor.upload.deterministic=true.
- Configure at least one of secor.max.file.timestamp.range.millis and
  secor.max.input.payload.size.bytes.
- If you've configured secor.max.file.timestamp.range.millis, you must
  set kafka.useTimestamp=true and ensure that your FileReader/FileWriter
  supports timestamps.
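
For example (illustrative values, not defaults), a setup that uploads once a
file spans an hour of message timestamps or accumulates 200 MiB of input:

    secor.upload.deterministic=true
    secor.max.file.timestamp.range.millis=3600000
    secor.max.input.payload.size.bytes=209715200
    kafka.useTimestamp=true

Whichever limit is reached first triggers the upload; leaving a limit at its
default of 0 disables that limit.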
glasser committed Mar 1, 2019
1 parent 0c96000 commit 19dfa6f
Showing 7 changed files with 222 additions and 18 deletions.
21 changes: 21 additions & 0 deletions src/main/config/secor.common.properties
@@ -451,6 +451,27 @@ secor.upload.minute_mark=0
# appropriate grace period to allow a full upload before a forced termination.
secor.upload.on.shutdown=false

# If true, uploads are entirely deterministic, which can avoid some race conditions
# that can lead to messages being backed up multiple times. This is incompatible with
# secor.upload.on.shutdown=true, and ignores the values of secor.max.file.size.bytes,
# secor.max.file.age.seconds, and secor.upload.minute_mark.
#
# In deterministic mode, you must set one or both of secor.max.file.timestamp.range.millis and
# secor.max.input.payload.size.bytes. These determine when to upload, and are ignored outside
# of deterministic mode.
secor.upload.deterministic=false

# If this is set, upload files when the range between the earliest and latest
# timestamps in messages read reaches this value. You must set kafka.useTimestamp
# to true and you must use a secor.file.reader.writer.factory which supports
# timestamps. This is ignored outside of deterministic mode.
secor.max.file.timestamp.range.millis=0

# If this is set, upload files when the total size of *input* payloads (not
# output log files) reaches this value. This is ignored outside of deterministic
# mode.
secor.max.input.payload.size.bytes=0

# File age per topic and per partition is checked against secor.max.file.age.seconds by looking at
# the youngest file when true or at the oldest file when false. Setting it to true ensures that files
# are uploaded when data stops coming and size-based policy cannot trigger. Setting it to false
108 changes: 108 additions & 0 deletions src/main/java/com/pinterest/secor/common/DeterministicUploadPolicyTracker.java
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.pinterest.secor.common;

import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.message.Message;

import java.util.HashMap;
import java.util.Map;

/**
 * DeterministicUploadPolicyTracker stores the range of timestamps and the total input payload
 * size seen so far for each TopicPartition. It lets us implement a "time-based" upload policy
 * that is still deterministic.
 */
public class DeterministicUploadPolicyTracker {
    private final long mMaxFileTimestampRangeMillis;
    private final long mMaxInputPayloadSizeBytes;

    private static class TopicPartitionStats {
        long earliestTimestamp;
        long latestTimestamp;
        long totalInputPayloadSizeBytes;

        TopicPartitionStats(long earliestTimestamp, long latestTimestamp, long totalInputPayloadSizeBytes) {
            this.earliestTimestamp = earliestTimestamp;
            this.latestTimestamp = latestTimestamp;
            this.totalInputPayloadSizeBytes = totalInputPayloadSizeBytes;
        }
    }

    private Map<TopicPartition, TopicPartitionStats> mStats;

    public DeterministicUploadPolicyTracker(long maxFileTimestampRangeMillis, long maxInputPayloadSizeBytes) {
        if (maxFileTimestampRangeMillis > 0) {
            mMaxFileTimestampRangeMillis = maxFileTimestampRangeMillis;
        } else {
            mMaxFileTimestampRangeMillis = Long.MAX_VALUE;
        }
        if (maxInputPayloadSizeBytes > 0) {
            mMaxInputPayloadSizeBytes = maxInputPayloadSizeBytes;
        } else {
            mMaxInputPayloadSizeBytes = Long.MAX_VALUE;
        }
        if (mMaxFileTimestampRangeMillis == Long.MAX_VALUE && mMaxInputPayloadSizeBytes == Long.MAX_VALUE) {
            throw new RuntimeException("When secor.upload.deterministic is true, you must set either " +
                    "secor.max.file.timestamp.range.millis or secor.max.input.payload.size.bytes");
        }
        mStats = new HashMap<>();
    }

    public void track(Message message) {
        track(new TopicPartition(message.getTopic(), message.getKafkaPartition()),
                message.getTimestamp(),
                message.getPayload().length);
    }

    public void track(TopicPartition topicPartition, KeyValue kv) {
        track(topicPartition, kv.getTimestamp(), kv.getValue().length);
    }

    private void track(TopicPartition topicPartition, long timestamp, long inputPayloadSizeBytes) {
        if (timestamp <= 0 && mMaxFileTimestampRangeMillis != Long.MAX_VALUE) {
            throw new RuntimeException(
                    "Message without timestamp incompatible with secor.max.file.timestamp.range.millis");
        }
        final TopicPartitionStats stats = mStats.get(topicPartition);
        if (stats == null) {
            mStats.put(topicPartition, new TopicPartitionStats(timestamp, timestamp, inputPayloadSizeBytes));
        } else {
            stats.earliestTimestamp = Math.min(stats.earliestTimestamp, timestamp);
            stats.latestTimestamp = Math.max(stats.latestTimestamp, timestamp);
            stats.totalInputPayloadSizeBytes += inputPayloadSizeBytes;
        }
    }

    public void reset(TopicPartition topicPartition) {
        mStats.remove(topicPartition);
    }

    public boolean shouldUpload(TopicPartition topicPartition) {
        final TopicPartitionStats stats = mStats.get(topicPartition);
        if (stats == null) {
            // No messages at all: definitely not ready for upload.
            return false;
        }
        return stats.totalInputPayloadSizeBytes >= mMaxInputPayloadSizeBytes
                || (stats.latestTimestamp - stats.earliestTimestamp) >= mMaxFileTimestampRangeMillis;
    }
}
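
Taken together, the tracker's contract is: track() each message after it is written,
consult shouldUpload() after every message, and reset() once the partition's files are
uploaded or rewritten. A condensed sketch of that lifecycle (wiring simplified; the real
call sites are in Consumer and Uploader below, and the thresholds here are illustrative):

    // Sketch only, not the actual wiring.
    DeterministicUploadPolicyTracker tracker = new DeterministicUploadPolicyTracker(
            3600000L, // secor.max.file.timestamp.range.millis (one hour)
            0L);      // secor.max.input.payload.size.bytes (0 = disabled)
    TopicPartition tp = new TopicPartition("some_topic", 0);
    // After each message for tp is written to a local file:
    //   tracker.track(message);      // Message variant, from Consumer
    //   tracker.track(tp, keyValue); // KeyValue variant, from Uploader while trimming
    if (tracker.shouldUpload(tp)) {
        // ... upload and delete the local files for tp, then:
        tracker.reset(tp);
    }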
12 changes: 12 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -277,6 +277,18 @@ public boolean getUploadOnShutdown() {
return getBoolean("secor.upload.on.shutdown");
}

public boolean getDeterministicUpload() {
return getBoolean("secor.upload.deterministic");
}

public long getMaxFileTimestampRangeMillis() {
return getLong("secor.max.file.timestamp.range.millis");
}

public long getMaxInputPayloadSizeBytes() {
return getLong("secor.max.input.payload.size.bytes");
}

public boolean getFileAgeYoungest() {
return getBoolean("secor.file.age.youngest");
}
29 changes: 24 additions & 5 deletions src/main/java/com/pinterest/secor/consumer/Consumer.java
@@ -18,6 +18,7 @@
*/
package com.pinterest.secor.consumer;

import com.pinterest.secor.common.DeterministicUploadPolicyTracker;
import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
@@ -57,6 +58,7 @@ public class Consumer extends Thread {
protected MessageWriter mMessageWriter;
protected MessageParser mMessageParser;
protected OffsetTracker mOffsetTracker;
private DeterministicUploadPolicyTracker mDeterministicUploadPolicyTracker;
protected MessageTransformer mMessageTransformer;
protected Uploader mUploader;
// TODO(pawel): we should keep a count per topic partition.
@@ -73,21 +75,32 @@ public Consumer(SecorConfig config) {

private void init() throws Exception {
mOffsetTracker = new OffsetTracker();
if (mConfig.getDeterministicUpload()) {
mDeterministicUploadPolicyTracker = new DeterministicUploadPolicyTracker(
mConfig.getMaxFileTimestampRangeMillis(), mConfig.getMaxInputPayloadSizeBytes()
);
} else {
mDeterministicUploadPolicyTracker = null;
}
mMessageReader = new MessageReader(mConfig, mOffsetTracker);
mMetricCollector = ReflectionUtil.createMetricCollector(mConfig.getMetricsCollectorClass());

FileRegistry fileRegistry = new FileRegistry(mConfig);
UploadManager uploadManager = ReflectionUtil.createUploadManager(mConfig.getUploadManagerClass(), mConfig);

mUploader = ReflectionUtil.createUploader(mConfig.getUploaderClass());
- mUploader.init(mConfig, mOffsetTracker, fileRegistry, uploadManager, mMessageReader, mMetricCollector);
- mMessageWriter = new MessageWriter(mConfig, mOffsetTracker, fileRegistry);
+ mUploader.init(mConfig, mOffsetTracker, fileRegistry, uploadManager, mMessageReader, mMetricCollector,
+         mDeterministicUploadPolicyTracker);
+ mMessageWriter = new MessageWriter(mConfig, mOffsetTracker, fileRegistry, mDeterministicUploadPolicyTracker);
mMessageParser = ReflectionUtil.createMessageParser(mConfig.getMessageParserClass(), mConfig);
mMessageTransformer = ReflectionUtil.createMessageTransformer(mConfig.getMessageTransformerClass(), mConfig);
mUnparsableMessages = 0.;

mUploadOnShutdown = mConfig.getUploadOnShutdown();
if (mUploadOnShutdown) {
if (mDeterministicUploadPolicyTracker != null) {
throw new RuntimeException("Can't set secor.upload.on.shutdown with secor.upload.deterministic!");
}
Runtime.getRuntime().addShutdownHook(this.new FinalUploadShutdownHook());
}
}
@@ -143,14 +156,17 @@ public void run() {
}

long now = System.currentTimeMillis();
- if (nMessages++ % checkMessagesPerSecond == 0 ||
+ if (mDeterministicUploadPolicyTracker != null ||
+         nMessages++ % checkMessagesPerSecond == 0 ||
(now - lastChecked) > checkEveryNSeconds * 1000) {
lastChecked = now;
checkUploadPolicy(false);
}
}
LOG.info("Done reading messages; uploading what we have");
checkUploadPolicy(true);
if (mDeterministicUploadPolicyTracker == null) {
LOG.info("Done reading messages; uploading what we have");
checkUploadPolicy(true);
}
LOG.info("Consumer thread done");
} catch (Throwable t) {
LOG.error("Thread failed", t);
@@ -224,6 +240,9 @@ protected boolean consumeNextMessage() {
}
throw new RuntimeException("Failed to write message " + parsedMessage.toTruncatedString(), e);
}
if (mDeterministicUploadPolicyTracker != null) {
mDeterministicUploadPolicyTracker.track(rawMessage);
}
}
}
return true;
58 changes: 47 additions & 11 deletions src/main/java/com/pinterest/secor/uploader/Uploader.java
@@ -19,6 +19,7 @@
package com.pinterest.secor.uploader;

import com.google.common.base.Joiner;
import com.pinterest.secor.common.DeterministicUploadPolicyTracker;
import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.OffsetTracker;
@@ -60,6 +61,7 @@ public class Uploader {
protected ZookeeperConnector mZookeeperConnector;
protected UploadManager mUploadManager;
protected MessageReader mMessageReader;
private DeterministicUploadPolicyTracker mDeterministicUploadPolicyTracker;
protected String mTopicFilter;

private boolean isOffsetsStorageKafka = false;
@@ -75,15 +77,17 @@ public class Uploader {
* @param metricCollector component that ingests metrics into the monitoring system
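* @param deterministicUploadPolicyTracker tracker implementing the deterministic upload policy, or null if secor.upload.deterministic is false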
*/
public void init(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry,
- UploadManager uploadManager, MessageReader messageReader, MetricCollector metricCollector) {
+ UploadManager uploadManager, MessageReader messageReader, MetricCollector metricCollector,
+ DeterministicUploadPolicyTracker deterministicUploadPolicyTracker) {
init(config, offsetTracker, fileRegistry, uploadManager, messageReader,
- new ZookeeperConnector(config), metricCollector);
+ new ZookeeperConnector(config), metricCollector, deterministicUploadPolicyTracker);
}

// For testing use only.
public void init(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry,
UploadManager uploadManager, MessageReader messageReader,
- ZookeeperConnector zookeeperConnector, MetricCollector metricCollector) {
+ ZookeeperConnector zookeeperConnector, MetricCollector metricCollector,
+ DeterministicUploadPolicyTracker deterministicUploadPolicyTracker) {
mConfig = config;
mOffsetTracker = offsetTracker;
mFileRegistry = fileRegistry;
@@ -92,6 +96,7 @@ public void init(SecorConfig config, OffsetTracker offsetTracker, FileRegistry f
mZookeeperConnector = zookeeperConnector;
mTopicFilter = mConfig.getKafkaTopicUploadAtMinuteMarkFilter();
mMetricCollector = metricCollector;
mDeterministicUploadPolicyTracker = deterministicUploadPolicyTracker;
if (mConfig.getOffsetsStorage().equals(SecorConstants.KAFKA_OFFSETS_STORAGE_KAFKA)) {
isOffsetsStorageKafka = true;
}
@@ -128,12 +133,19 @@ protected void uploadFiles(TopicPartition topicPartition) throws Exception {
uploadHandle.get();
}
mFileRegistry.deleteTopicPartition(topicPartition);
if (mDeterministicUploadPolicyTracker != null) {
mDeterministicUploadPolicyTracker.reset(topicPartition);
}
mZookeeperConnector.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1);
mOffsetTracker.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1);
if (isOffsetsStorageKafka) {
mMessageReader.commit(topicPartition, lastSeenOffset + 1);
}
mMetricCollector.increment("uploader.file_uploads.count", paths.size(), topicPartition.getTopic());
} else {
LOG.warn("Zookeeper committed offset didn't match for topic {} partition {}: {} vs {}",
topicPartition.getTopic(), topicPartition.getPartition(), zookeeperCommittedOffsetCount,
committedOffsetCount);
}
} finally {
mZookeeperConnector.unlock(lockPath);
Expand All @@ -157,8 +169,12 @@ protected FileReader createReader(LogFilePath srcPath, CompressionCodec codec) t
}

private void trim(LogFilePath srcPath, long startOffset) throws Exception {
+ final TopicPartition topicPartition = new TopicPartition(srcPath.getTopic(), srcPath.getKafkaPartition());
  if (startOffset == srcPath.getOffset()) {
- return;
+ // If *all* the files had the right offset already, trimFiles would have returned
+ // before resetting the tracker. If just some do, we don't want to rewrite files in place
+ // (it's probably safe but let's not stress it), but this shouldn't happen anyway.
+ throw new RuntimeException("Some LogFilePath has unchanged offset, but they don't all? " + srcPath);
}
FileReader reader = null;
FileWriter writer = null;
@@ -188,6 +204,9 @@ private void trim(LogFilePath srcPath, long startOffset) throws Exception {
codec);
}
writer.write(keyVal);
if (mDeterministicUploadPolicyTracker != null) {
mDeterministicUploadPolicyTracker.track(topicPartition, keyVal);
}
copiedMessages++;
}
}
@@ -207,6 +226,14 @@

protected void trimFiles(TopicPartition topicPartition, long startOffset) throws Exception {
Collection<LogFilePath> paths = mFileRegistry.getPaths(topicPartition);
if (paths.stream().allMatch(srcPath -> srcPath.getOffset() == startOffset)) {
// We thought we needed to trim, but we were wrong: we already had started at the right offset.
// (Probably because we don't initialize the offset from ZK on startup.)
return;
}
if (mDeterministicUploadPolicyTracker != null) {
mDeterministicUploadPolicyTracker.reset(topicPartition);
}
for (LogFilePath path : paths) {
trim(path, startOffset);
}
@@ -232,13 +259,19 @@ private boolean isRequiredToUploadAtTime(TopicPartition topicPartition) throws E
}

protected void checkTopicPartition(TopicPartition topicPartition, boolean forceUpload) throws Exception {
- final long size = mFileRegistry.getSize(topicPartition);
- final long modificationAgeSec = mFileRegistry.getModificationAgeSec(topicPartition);
- LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec);
- if (forceUpload ||
-         size >= mConfig.getMaxFileSizeBytes() ||
-         modificationAgeSec >= mConfig.getMaxFileAgeSeconds() ||
-         isRequiredToUploadAtTime(topicPartition)) {
+ boolean shouldUpload;
+ if (mDeterministicUploadPolicyTracker != null) {
+     shouldUpload = mDeterministicUploadPolicyTracker.shouldUpload(topicPartition);
+ } else {
+     final long size = mFileRegistry.getSize(topicPartition);
+     final long modificationAgeSec = mFileRegistry.getModificationAgeSec(topicPartition);
+     LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec);
+     shouldUpload = forceUpload ||
+             size >= mConfig.getMaxFileSizeBytes() ||
+             modificationAgeSec >= mConfig.getMaxFileAgeSeconds() ||
+             isRequiredToUploadAtTime(topicPartition);
+ }
+ if (shouldUpload) {
long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition);
long oldOffsetCount = mOffsetTracker.setCommittedOffsetCount(topicPartition,
newOffsetCount);
@@ -252,6 +285,9 @@ protected void checkTopicPartition(TopicPartition topicPartition, boolean forceU
// There was a rebalancing event and someone committed an offset beyond that of the
// current message. We need to delete the local file.
mFileRegistry.deleteTopicPartition(topicPartition);
if (mDeterministicUploadPolicyTracker != null) {
mDeterministicUploadPolicyTracker.reset(topicPartition);
}
} else { // oldOffsetCount < newOffsetCount <= lastSeenOffset
LOG.debug("previous committed offset count {} is lower than committed offset {} is lower than or equal to last seen offset {}. " +
"Trimming files in topic {} partition {}",
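To make the shouldUpload() arithmetic concrete, a worked check of the timestamp-range
rule, assuming secor.max.file.timestamp.range.millis=60000 and the payload limit left
at its default of 0 (disabled):

    // Timestamps tracked so far: 990000, 1000000, 1030000. Arrival order doesn't
    // matter: the tracker keeps min and max, not first and last seen.
    //   earliest = 990000, latest = 1030000, range = 40000  -> no upload yet
    // Next message stamped 1050001:
    //   earliest = 990000, latest = 1050001, range = 60001 >= 60000 -> upload, then reset()

Since the decision depends only on message timestamps and payload sizes, reprocessing
the same input after a restart reproduces the same upload boundaries.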