Skip to content

Commit

Permalink
fix(backup-restore): fix backup restore jobs usage of system-metadata (
Browse files Browse the repository at this point in the history
  • Loading branch information
Dexter Lee authored Oct 25, 2021
1 parent d5a42e3 commit 87d344f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.datahub.upgrade.restorebackup.backupreader;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeContext;
import java.io.IOException;
import java.util.Optional;
Expand Down Expand Up @@ -41,7 +42,7 @@ public EbeanAspectBackupIterator getBackupIterator(UpgradeContext context) {

try {
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(path.get())).build();
return new ParquetEbeanAspectBackupIterator(reader);
return new ParquetEbeanAspectBackupIterator(ImmutableList.of(reader));
} catch (IOException e) {
throw new RuntimeException(String.format("Failed to build ParquetReader: %s", e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.ParquetReader;

Expand All @@ -14,34 +16,51 @@
* Iterator to retrieve EbeanAspectV2 objects from the ParquetReader
* Converts the avro GenericRecord object into EbeanAspectV2
*/
@Slf4j
@RequiredArgsConstructor
public class ParquetEbeanAspectBackupIterator implements EbeanAspectBackupIterator {
private final ParquetReader<GenericRecord> _parquetReader;
private final List<ParquetReader<GenericRecord>> _parquetReaders;
private int currentReaderIndex = 0;

@Override
public EbeanAspectV2 next() {

if (currentReaderIndex >= _parquetReaders.size()) {
return null;
}
ParquetReader<GenericRecord> parquetReader = _parquetReaders.get(currentReaderIndex);

try {
GenericRecord record = _parquetReader.read();
GenericRecord record = parquetReader.read();
if (record == null) {
return null;
log.info("Record is null, moving to next reader {} {}", currentReaderIndex, _parquetReaders.size());
parquetReader.close();
currentReaderIndex++;
return next();
}
return convertRecord(record);
} catch (IOException e) {
e.printStackTrace();
log.error("Error while reading backed up aspect", e);
return null;
}
}

@Override
public void close() throws IOException {
_parquetReader.close();
public void close() {
_parquetReaders.forEach(reader -> {
try {
reader.close();
} catch (IOException e) {
log.error("Error while closing parquet reader", e);
}
});
}

private EbeanAspectV2 convertRecord(GenericRecord record) {
return new EbeanAspectV2(record.get("urn").toString(), record.get("aspect").toString(),
(Long) record.get("version"), record.get("metadata").toString(),
Timestamp.from(Instant.ofEpochMilli((Long) record.get("createdon") / 1000)), record.get("createdby").toString(),
Optional.ofNullable(record.get("createdfor")).map(Object::toString).orElse(null),
record.get("systemMetadata").toString());
Optional.ofNullable(record.get("systemmetadata")).map(Object::toString).orElse(null));
}
}

0 comments on commit 87d344f

Please sign in to comment.