Skip to content

Commit

Permalink
Add support to upload last seen offset onto ZK to solve the determini…
Browse files Browse the repository at this point in the history
…stic upload problem mentioned in #600

When there was a failed attempt uploading for this topic partition, we might end up on S3 with:
                    //     s3n://topic/partition/day/hour=0/offset1
                    //     s3n://topic/partition/day/hour=1/offset1
If this attempt eventually failed and we resumed the processing on another node, we might end up with the following upload if the upload was triggered too early because of the time-based upload policy:
                    // might have less files to upload, e.g.
                    //     localfs://topic/partition/day/hour=0/offset1
                    // If we continue uploading, we will upload this file:
                    //     s3n://topic/partition/day/hour=0/offset1
                    // But the next file to be uploaded will become:
                    //     s3n://topic/partition/day/hour=1/offset2
                    // So we will end up with 2 different files for hour=1/

We should wait a bit longer, until the in-memory offset has at least reached the lastSeen offset stored in ZK.

Added config property secor.upload.last.seen.offset to support the following upload sequence:

1. acquire ZK lock for the current topic/partition
--- new ---> 1.1 check whether the in-memory lastSeenOffset >= ZK's lastSeenOffset, if not skip the rest;
--- new ---> 1.2 persist the in-memory lastSeenOffset onto ZK lastSeenOffsetPath
2. upload a list of files for the current topic/partition
3. persist the current offset to ZK committedOffset path
4. release ZK lock
  • Loading branch information
Henry Cai committed May 26, 2020
1 parent cae002d commit 31e7f20
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 7 deletions.
3 changes: 3 additions & 0 deletions src/main/config/secor.common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,9 @@ secor.upload.minute_mark=0
# appropriate grace period to allow a full upload before a forced termination.
secor.upload.on.shutdown=false

# If true, uploads lastSeen offset onto ZK, see https://github.com/pinterest/secor/issues/600 for details
secor.upload.last.seen.offset=false

# If true, uploads are entirely deterministic, which can avoid some race conditions
# which can lead to messages being backed up multiple times. This is incompatible with
# secor.upload.on.shutdown=true, and ignores the values of secor.max.file.size.bytes,
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ public boolean getUploadOnShutdown() {
return getBoolean("secor.upload.on.shutdown");
}

/**
 * Reads the {@code secor.upload.last.seen.offset} flag.
 *
 * @return the configured value; {@code false} is passed to the lookup as the fallback
 */
public boolean getUploadLastSeenOffset() {
    final boolean uploadLastSeenOffset = getBoolean("secor.upload.last.seen.offset", false);
    return uploadLastSeenOffset;
}

/**
 * Reads the {@code secor.upload.deterministic} flag.
 *
 * @return the configured value of {@code secor.upload.deterministic}
 */
public boolean getDeterministicUpload() {
    final boolean deterministicUpload = getBoolean("secor.upload.deterministic");
    return deterministicUpload;
}
Expand Down
105 changes: 98 additions & 7 deletions src/main/java/com/pinterest/secor/common/ZookeeperConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class ZookeeperConnector implements Closeable {
private CuratorFramework mCurator;
private HashMap<String, InterProcessMutex> mLocks;
private String mCommittedOffsetGroupPath;
private String mLastSeenOffsetGroupPath;

protected ZookeeperConnector() {
}
Expand Down Expand Up @@ -116,29 +117,52 @@ public void unlock(String lockPath) {
mLocks.remove(lockPath);
}

/**
 * Builds the ZooKeeper path {@code [/<chroot>]/consumers/<group>/<subPath>} for this
 * consumer group.
 *
 * @param subPath last path segment, e.g. {@code "offsets"} or {@code "lastSeen"}
 * @return the absolute path, always starting with "/"
 */
protected String getOffsetGroupPath(String subPath) {
    // Drop leading/trailing slashes so "/", "/chroot" and "/chroot/" are all handled uniformly.
    String chroot = StringUtils.strip(mConfig.getKafkaZookeeperPath(), "/");
    if (chroot.isEmpty()) {
        // A null segment is skipped by the joiner, avoiding a doubled slash for the root path.
        chroot = null;
    }
    // The leading empty segment yields the initial "/".
    return Joiner.on("/").skipNulls().join(
        "",
        chroot,
        "consumers",
        mConfig.getKafkaGroup(),
        subPath);
}

/**
 * Returns the ZooKeeper path under which committed offsets of this consumer group are
 * stored, computing it on first use and caching it in {@code mCommittedOffsetGroupPath}.
 *
 * @return the {@code .../consumers/<group>/offsets} path
 */
protected String getCommittedOffsetGroupPath() {
    if (Strings.isNullOrEmpty(mCommittedOffsetGroupPath)) {
        // Path construction lives in getOffsetGroupPath; do not duplicate it here.
        mCommittedOffsetGroupPath = getOffsetGroupPath("offsets");
    }
    return mCommittedOffsetGroupPath;
}

/**
 * Returns the ZooKeeper path under which last-seen offsets of this consumer group are
 * stored, computing it on first use and caching it in {@code mLastSeenOffsetGroupPath}.
 *
 * @return the {@code .../consumers/<group>/lastSeen} path
 */
protected String getLastSeenOffsetGroupPath() {
    if (Strings.isNullOrEmpty(mLastSeenOffsetGroupPath)) {
        mLastSeenOffsetGroupPath = getOffsetGroupPath("lastSeen");
    }
    return mLastSeenOffsetGroupPath;
}

/** Returns the committed-offset path of {@code topic}: {@code <committed group path>/<topic>}. */
private String getCommittedOffsetTopicPath(String topic) {
    final String groupPath = getCommittedOffsetGroupPath();
    return groupPath + "/" + topic;
}

/** Returns the last-seen-offset path of {@code topic}: {@code <lastSeen group path>/<topic>}. */
private String getLastSeenOffsetTopicPath(String topic) {
    final String groupPath = getLastSeenOffsetGroupPath();
    return groupPath + "/" + topic;
}

/** Returns the committed-offset path of a partition: {@code <topic path>/<partition>}. */
private String getCommittedOffsetPartitionPath(TopicPartition topicPartition) {
    final String topicPath = getCommittedOffsetTopicPath(topicPartition.getTopic());
    return topicPath + "/" + topicPartition.getPartition();
}

/** Returns the last-seen-offset path of a partition: {@code <topic path>/<partition>}. */
private String getLastSeenOffsetPartitionPath(TopicPartition topicPartition) {
    final String topicPath = getLastSeenOffsetTopicPath(topicPartition.getTopic());
    return topicPath + "/" + topicPartition.getPartition();
}

public long getCommittedOffsetCount(TopicPartition topicPartition) throws Exception {
String offsetPath = getCommittedOffsetPartitionPath(topicPartition);
try {
Expand All @@ -150,6 +174,17 @@ public long getCommittedOffsetCount(TopicPartition topicPartition) throws Except
}
}

/**
 * Reads the last-seen offset stored in ZooKeeper for the given topic partition.
 *
 * @param topicPartition partition whose last-seen offset node is read
 * @return the stored value parsed as a decimal long, or {@code -1} if the node does not exist
 * @throws Exception if the ZooKeeper read fails for any other reason or the payload is not
 *         a valid long
 */
public long getLastSeenOffsetCount(TopicPartition topicPartition) throws Exception {
    String offsetPath = getLastSeenOffsetPartitionPath(topicPartition);
    try {
        byte[] data = mCurator.getData().forPath(offsetPath);
        // Decode with an explicit charset so the result does not depend on the JVM default.
        return Long.parseLong(new String(data, java.nio.charset.StandardCharsets.UTF_8));
    } catch (KeeperException.NoNodeException exception) {
        LOG.warn("path {} does not exist in zookeeper", offsetPath);
        return -1;
    }
}

public List<Integer> getCommittedOffsetPartitions(String topic) throws Exception {
String topicPath = getCommittedOffsetTopicPath(topic);
List<String> partitions = mCurator.getChildren().forPath(topicPath);
Expand All @@ -162,6 +197,18 @@ public List<Integer> getCommittedOffsetPartitions(String topic) throws Exception
return result;
}

/**
 * Lists the partitions that have a last-seen offset node under the given topic.
 *
 * @param topic topic whose child nodes are listed
 * @return partition ids, in the order ZooKeeper returned the children
 * @throws Exception if the children cannot be listed or a child name is not an integer
 */
public List<Integer> getLastSeenOffsetPartitions(String topic) throws Exception {
    List<String> children = mCurator.getChildren().forPath(getLastSeenOffsetTopicPath(topic));
    LinkedList<Integer> partitionIds = new LinkedList<Integer>();
    for (String child : children) {
        // Keep only the last path segment of the child entry.
        String[] segments = child.split("/");
        partitionIds.add(Integer.valueOf(segments[segments.length - 1]));
    }
    return partitionIds;
}

public List<String> getCommittedOffsetTopics() throws Exception {
String offsetPath = getCommittedOffsetGroupPath();
List<String> topics = mCurator.getChildren().forPath(offsetPath);
Expand All @@ -174,6 +221,18 @@ public List<String> getCommittedOffsetTopics() throws Exception {
return result;
}

/**
 * Lists the topics that have last-seen offset nodes for this consumer group.
 *
 * @return topic names, in the order ZooKeeper returned the children
 * @throws Exception if the children of the last-seen group path cannot be listed
 */
public List<String> getLastSeenOffsetTopics() throws Exception {
    List<String> children = mCurator.getChildren().forPath(getLastSeenOffsetGroupPath());
    LinkedList<String> topicNames = new LinkedList<String>();
    for (String child : children) {
        // Keep only the last path segment of the child entry.
        String[] segments = child.split("/");
        topicNames.add(segments[segments.length - 1]);
    }
    return topicNames;
}

private void createMissingParents(String path) throws Exception {
Stat stat = mCurator.checkExists().forPath(path);
if (stat == null) {
Expand All @@ -200,6 +259,21 @@ public void setCommittedOffsetCount(TopicPartition topicPartition, long count)
}
}

/**
 * Stores {@code count} as the last-seen offset of the given topic partition in ZooKeeper,
 * creating missing parent nodes first.
 *
 * <p>A {@code NoNodeException} raised by the write is logged and swallowed; any other
 * failure propagates to the caller.
 *
 * @param topicPartition partition whose last-seen offset node is written
 * @param count value to store, serialized as a decimal string
 * @throws Exception if creating the parents or writing the data fails
 */
public void setLastSeenOffsetCount(TopicPartition topicPartition, long count)
        throws Exception {
    String offsetPath = getLastSeenOffsetPartitionPath(topicPartition);
    LOG.info("creating missing parents for zookeeper path {}", offsetPath);
    createMissingParents(offsetPath);
    // Encode with an explicit charset so the stored bytes do not depend on the JVM default.
    byte[] data = Long.toString(count).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    try {
        LOG.info("setting zookeeper path {} value {}", offsetPath, count);
        // No explicit version is passed here.
        // NOTE(review): Curator presumably defaults to "any version" (-1) - confirm.
        mCurator.setData().forPath(offsetPath, data);
    } catch (KeeperException.NoNodeException exception) {
        LOG.warn("Failed to set value to path {}", offsetPath, exception);
    }
}

public void deleteCommittedOffsetTopicCount(String topic) throws Exception {
List<Integer> partitions = getCommittedOffsetPartitions(topic);
for (Integer partition : partitions) {
Expand All @@ -210,13 +284,30 @@ public void deleteCommittedOffsetTopicCount(String topic) throws Exception {
}
}

/**
 * Deletes the last-seen offset node of every partition recorded under {@code topic}.
 *
 * <p>Last-seen offsets are only written when the uploader has the feature enabled, so the
 * topic node may legitimately be absent. That case is logged and treated as "nothing to
 * delete" instead of failing the whole cleanup.
 *
 * @param topic topic whose last-seen offset nodes are removed
 * @throws Exception if listing or deleting fails for a reason other than a missing topic node
 */
public void deleteLastSeenOffsetTopicCount(String topic) throws Exception {
    List<Integer> partitions;
    try {
        partitions = getLastSeenOffsetPartitions(topic);
    } catch (KeeperException.NoNodeException exception) {
        LOG.warn("no lastSeen offsets found in zookeeper for topic {}, nothing to delete", topic);
        return;
    }
    for (Integer partition : partitions) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        String offsetPath = getLastSeenOffsetPartitionPath(topicPartition);
        LOG.info("deleting path {}", offsetPath);
        mCurator.delete().forPath(offsetPath);
    }
}

/**
 * Deletes the committed-offset node of the given topic partition from ZooKeeper.
 *
 * @param topicPartition partition whose committed-offset node is removed
 * @throws Exception if the delete fails
 */
public void deleteCommittedOffsetPartitionCount(TopicPartition topicPartition)
        throws Exception {
    final String partitionPath = getCommittedOffsetPartitionPath(topicPartition);
    LOG.info("deleting path {}", partitionPath);
    mCurator.delete().forPath(partitionPath);
}

/**
 * Deletes the last-seen offset node of the given topic partition from ZooKeeper.
 *
 * <p>Last-seen offsets are only written when the uploader has the feature enabled, so the
 * node may legitimately be absent. That case is logged and ignored rather than failing
 * the caller.
 *
 * @param topicPartition partition whose last-seen offset node is removed
 * @throws Exception if the delete fails for a reason other than a missing node
 */
public void deleteLastSeenOffsetPartitionCount(TopicPartition topicPartition)
        throws Exception {
    String offsetPath = getLastSeenOffsetPartitionPath(topicPartition);
    LOG.info("deleting path {}", offsetPath);
    try {
        mCurator.delete().forPath(offsetPath);
    } catch (KeeperException.NoNodeException exception) {
        LOG.warn("path {} does not exist in zookeeper, nothing to delete", offsetPath);
    }
}

/**
 * Replaces the configuration used by this connector.
 *
 * @param config configuration to use from now on
 */
protected void setConfig(SecorConfig config) {
    mConfig = config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ public static void main(String[] args) {
((Number) commandLine.getParsedOptionValue("partition")).intValue();
TopicPartition topicPartition = new TopicPartition(topic, partition);
zookeeperConnector.deleteCommittedOffsetPartitionCount(topicPartition);
zookeeperConnector.deleteLastSeenOffsetPartitionCount(topicPartition);
} else {
zookeeperConnector.deleteCommittedOffsetTopicCount(topic);
zookeeperConnector.deleteLastSeenOffsetTopicCount(topic);
}
} catch (Throwable t) {
LOG.error("Zookeeper client failed", t);
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/com/pinterest/secor/uploader/Uploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class Uploader {
protected UploadManager mUploadManager;
protected MessageReader mMessageReader;
private DeterministicUploadPolicyTracker mDeterministicUploadPolicyTracker;
private boolean mUploadLastSeenOffset;
protected String mTopicFilter;

private boolean isOffsetsStorageKafka = false;
Expand Down Expand Up @@ -100,6 +101,7 @@ public void init(SecorConfig config, OffsetTracker offsetTracker, FileRegistry f
if (mConfig.getOffsetsStorage().equals(SecorConstants.KAFKA_OFFSETS_STORAGE_KAFKA)) {
isOffsetsStorageKafka = true;
}
mUploadLastSeenOffset = mConfig.getUploadLastSeenOffset();
}

protected void uploadFiles(TopicPartition topicPartition) throws Exception {
Expand All @@ -121,6 +123,32 @@ protected void uploadFiles(TopicPartition topicPartition) throws Exception {
long zookeeperCommittedOffsetCount = mZookeeperConnector.getCommittedOffsetCount(
topicPartition);
if (zookeeperCommittedOffsetCount == committedOffsetCount) {
if (mUploadLastSeenOffset) {
long zkLastSeenOffset = mZookeeperConnector.getLastSeenOffsetCount(topicPartition);
// If the in-memory lastSeenOffset is less than ZK's, this means there was a failed
// attempt uploading for this topic partition and we already have some files uploaded
// on S3, e.g.
// s3n://topic/partition/day/hour=0/offset1
// s3n://topic/partition/day/hour=1/offset1
// Since our in-memory accumulated files (offsets) is less than what's on ZK, we
// might have less files to upload, e.g.
// localfs://topic/partition/day/hour=0/offset1
// If we continue uploading, we will upload this file:
// s3n://topic/partition/day/hour=0/offset1
// But the next file to be uploaded will become:
// s3n://topic/partition/day/hour=1/offset2
// So we will end up with 2 different files for hour=1/
// We should wait a bit longer to have at least getting to the same offset as ZK's
//
// Note We use offset + 1 throughout secor when we persist to ZK
if (lastSeenOffset + 1 < zkLastSeenOffset) {
LOG.warn("TP {}, ZK lastSeenOffset {}, in-memory lastSeenOffset {}, skip uploading this time",
topicPartition, zkLastSeenOffset, lastSeenOffset + 1);
mMetricCollector.increment("uploader.offset_delays", topicPartition.getTopic());
}
LOG.info("Setting lastSeenOffset for {} with {}", topicPartition, lastSeenOffset + 1);
mZookeeperConnector.setLastSeenOffsetCount(topicPartition, lastSeenOffset + 1);
}
LOG.info("uploading topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition());
// Deleting writers closes their streams flushing all pending data to the disk.
mFileRegistry.deleteWriters(topicPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,21 @@ protected void verify(String zookeeperPath, String expectedOffsetPath) {
zookeeperConnector.setConfig(secorConfig);
Assert.assertEquals(expectedOffsetPath, zookeeperConnector.getCommittedOffsetGroupPath());
}

/** Checks the lastSeen group path for a root chroot and for chroots with/without a trailing slash. */
@Test
public void testGetLastSeenOffsetGroupPath() throws Exception {
    // Each row: { kafka.zookeeper.path, expected lastSeen group path }.
    String[][] cases = {
        {"/", "/consumers/secor_cg/lastSeen"},
        {"/chroot", "/chroot/consumers/secor_cg/lastSeen"},
        {"/chroot/", "/chroot/consumers/secor_cg/lastSeen"},
    };
    for (String[] testCase : cases) {
        verifyLastSeen(testCase[0], testCase[1]);
    }
}

/**
 * Asserts that a connector configured with the given ZooKeeper chroot and the consumer
 * group {@code secor_cg} reports the expected lastSeen group path.
 *
 * @param zookeeperPath value for {@code kafka.zookeeper.path}
 * @param expectedOffsetPath path the connector is expected to return
 */
protected void verifyLastSeen(String zookeeperPath, String expectedOffsetPath) {
    PropertiesConfiguration props = new PropertiesConfiguration();
    props.setProperty("kafka.zookeeper.path", zookeeperPath);
    props.setProperty("secor.kafka.group", "secor_cg");

    ZookeeperConnector connector = new ZookeeperConnector();
    connector.setConfig(new SecorConfig(props));

    String actualPath = connector.getLastSeenOffsetGroupPath();
    Assert.assertEquals(expectedOffsetPath, actualPath);
}
}

0 comments on commit 31e7f20

Please sign in to comment.