diff --git a/priam/src/main/java/com/netflix/priam/backup/DirectorySize.java b/priam/src/main/java/com/netflix/priam/backup/DirectorySize.java index 1dc44ec35..62c748291 100644 --- a/priam/src/main/java/com/netflix/priam/backup/DirectorySize.java +++ b/priam/src/main/java/com/netflix/priam/backup/DirectorySize.java @@ -2,9 +2,10 @@ import com.google.inject.ImplementedBy; -/** estimates the number of bytes remaining to upload in a snapshot */ -@ImplementedBy(SnapshotDirectorySize.class) +/** estimates the number of bytes and files remaining to upload in a snapshot/backup */ public interface DirectorySize { - /** return the total bytes of all snapshot files south of location in the filesystem */ + /** return the total bytes of all snapshot/backup files south of location in the filesystem */ long getBytes(String location); + /** return the total files of all snapshot/backup files south of location in the filesystem */ + int getFiles(String location); } diff --git a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java index c757db36e..b8ab963a5 100644 --- a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java @@ -27,8 +27,10 @@ import com.netflix.priam.scheduler.SimpleTimer; import com.netflix.priam.scheduler.TaskTimer; import java.io.File; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Set; +import java.util.stream.Stream; import javax.inject.Inject; import javax.inject.Singleton; import org.apache.commons.io.FileUtils; diff --git a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackupDirectorySize.java b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackupDirectorySize.java new file mode 100644 index 000000000..e1fb7d87e --- /dev/null +++ b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackupDirectorySize.java @@ -0,0 +1,66 @@ +package com.netflix.priam.backup; + +import java.io.IOException; +import java.nio.file.*; +import java.nio.file.attribute.BasicFileAttributes; + +/** Estimates remaining bytes or files to upload in a backup by looking at the file system */ +public class IncrementalBackupDirectorySize implements DirectorySize { + + public long getBytes(String location) { + SummingFileVisitor fileVisitor = new SummingFileVisitor(); + try { + Files.walkFileTree(Paths.get(location), fileVisitor); + } catch (IOException e) { + // BackupFileVisitor is happy with an estimate and won't produce these in practice. + } + return fileVisitor.getTotalBytes(); + } + + public int getFiles(String location) { + SummingFileVisitor fileVisitor = new SummingFileVisitor(); + try { + Files.walkFileTree(Paths.get(location), fileVisitor); + } catch (IOException e) { + // BackupFileVisitor is happy with an estimate and won't produce these in practice. + } + return fileVisitor.getTotalFiles(); + } + + private static final class SummingFileVisitor implements FileVisitor { + private long totalBytes; + private int totalFiles; + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) { + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + if (file.toString().contains(AbstractBackup.INCREMENTAL_BACKUP_FOLDER) && attrs.isRegularFile()) { + totalBytes += attrs.size(); + totalFiles += 1; + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path file, IOException exc) { + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) { + return FileVisitResult.CONTINUE; + } + + long getTotalBytes() { + return totalBytes; + } + + int getTotalFiles() { + return totalFiles; + } + } +} diff --git a/priam/src/main/java/com/netflix/priam/backup/SnapshotDirectorySize.java b/priam/src/main/java/com/netflix/priam/backup/SnapshotDirectorySize.java index ab77cfe4a..e38a7d795 100644 --- a/priam/src/main/java/com/netflix/priam/backup/SnapshotDirectorySize.java +++ b/priam/src/main/java/com/netflix/priam/backup/SnapshotDirectorySize.java @@ -4,7 +4,7 @@ import java.nio.file.*; import java.nio.file.attribute.BasicFileAttributes; -/** Estimates remaining bytes to upload in a backup by looking at the file system */ +/** Estimates remaining bytes or files to upload in a backup by looking at the file system */ public class SnapshotDirectorySize implements DirectorySize { public long getBytes(String location) { @@ -17,8 +17,19 @@ public long getBytes(String location) { return fileVisitor.getTotalBytes(); } + public int getFiles(String location) { + SummingFileVisitor fileVisitor = new SummingFileVisitor(); + try { + Files.walkFileTree(Paths.get(location), fileVisitor); + } catch (IOException e) { + // BackupFileVisitor is happy with an estimate and won't produce these in practice. + } + return fileVisitor.getTotalFiles(); + } + private static final class SummingFileVisitor implements FileVisitor { private long totalBytes; + private int totalFiles; @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) { @@ -29,6 +40,7 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) { public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { if (file.toString().contains(AbstractBackup.SNAPSHOT_FOLDER) && attrs.isRegularFile()) { totalBytes += attrs.size(); + totalFiles += 1; } return FileVisitResult.CONTINUE; } @@ -46,5 +58,9 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) { long getTotalBytes() { return totalBytes; } + + int getTotalFiles() { + return totalFiles; + } } } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/BackupV2Service.java b/priam/src/main/java/com/netflix/priam/backupv2/BackupV2Service.java index 846574d8a..bf8d2995e 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/BackupV2Service.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/BackupV2Service.java @@ -17,6 +17,7 @@ package com.netflix.priam.backupv2; +import com.netflix.priam.backup.DirectorySize; import com.netflix.priam.backup.IncrementalBackup; import com.netflix.priam.config.IBackupRestoreConfig; import com.netflix.priam.config.IConfiguration; @@ -25,8 +26,13 @@ import com.netflix.priam.scheduler.PriamScheduler; import com.netflix.priam.scheduler.TaskTimer; import com.netflix.priam.tuner.CassandraTunerService; +import java.util.HashMap; +import java.util.Map; import javax.inject.Inject; import org.apache.commons.lang3.math.Fraction; +import com.netflix.priam.backup.SnapshotDirectorySize; +import com.netflix.priam.backup.IncrementalBackupDirectorySize; + /** * Encapsulate the backup service 2.0 - Execute all the tasks required to run backup service. @@ -39,6 +45,8 @@ public class BackupV2Service implements IService { private final SnapshotMetaTask snapshotMetaTask; private final CassandraTunerService cassandraTunerService; private final ITokenRetriever tokenRetriever; + private final DirectorySize snapshotDirectorySize = new SnapshotDirectorySize(); + private final DirectorySize incrementalBackupDirectorySize = new IncrementalBackupDirectorySize(); @Inject public BackupV2Service( @@ -101,4 +109,11 @@ public void updateServicePre() throws Exception { @Override public void updateServicePost() throws Exception {} + + public Map countPendingBackupFiles() throws Exception { + Map backupFiles = new HashMap(); + backupFiles.put("totalFiles", (snapshotDirectorySize.getFiles(configuration.getDataFileLocation()) + + incrementalBackupDirectorySize.getFiles(configuration.getDataFileLocation()))); + return backupFiles; + } } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java index 10afa1cec..09f4768dd 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java @@ -48,6 +48,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Stream; import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; diff --git a/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java b/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java index 70ea8bfd8..a6aea683d 100644 --- a/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java +++ b/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java @@ -17,6 +17,8 @@ package com.netflix.priam.resources; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.priam.PriamServer; import com.netflix.priam.backup.*; import com.netflix.priam.backupv2.BackupTTLTask; import com.netflix.priam.backupv2.BackupV2Service; @@ -29,7 +31,9 @@ import com.netflix.priam.utils.GsonJsonSerializer; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import javax.inject.Inject; @@ -55,6 +59,8 @@ public class BackupServletV2 { private final Provider pathProvider; private final BackupV2Service backupService; private final BackupNotificationMgr backupNotificationMgr; + private final PriamServer priamServer; + private static final String REST_SUCCESS = "[\"ok\"]"; @Inject @@ -68,7 +74,8 @@ public BackupServletV2( @Named("v2") IMetaProxy metaV2Proxy, Provider pathProvider, BackupV2Service backupService, - BackupNotificationMgr backupNotificationMgr) { + BackupNotificationMgr backupNotificationMgr, + PriamServer priamServer) { this.backupStatusMgr = backupStatusMgr; this.backupVerification = backupVerification; this.snapshotMetaService = snapshotMetaService; @@ -78,6 +85,7 @@ public BackupServletV2( this.pathProvider = pathProvider; this.backupService = backupService; this.backupNotificationMgr = backupNotificationMgr; + this.priamServer = priamServer; } @GET @@ -175,4 +183,26 @@ public Response list(@PathParam("daterange") String daterange) throws Exception files.stream().map(AbstractBackupPath::getRemotePath).collect(Collectors.toList()); return Response.ok(GsonJsonSerializer.getGson().toJson(remotePaths)).build(); } + + @GET + @Path("/state/{hours}") + public Response backupState(@PathParam("hours") int hours) throws Exception { + Map responseMap = new HashMap<>(); + + responseMap.put("tasksQueued", fs.getUploadTasksQueued()); + responseMap.put("queueSize", priamServer.getConfiguration().getBackupQueueSize()); + for (Map.Entry entry : + backupService.countPendingBackupFiles().entrySet()) { + responseMap.put(entry.getKey(), entry.getValue()); + } + + List latestBackupMetadata = + backupStatusMgr.getLatestBackupMetadata( + new DateRange(Instant.now().minus(hours, ChronoUnit.HOURS), Instant.now())); + responseMap.put("latestBackupMetadata", latestBackupMetadata); + + ObjectMapper mapper = new ObjectMapper(); + String jsonResponse = mapper.writeValueAsString(responseMap); + return Response.ok(jsonResponse).build(); + } } diff --git a/priam/src/test/java/com/netflix/priam/backup/TestBackupDynamicRateLimiter.java b/priam/src/test/java/com/netflix/priam/backup/TestBackupDynamicRateLimiter.java index 43318a4a2..9658f5e16 100644 --- a/priam/src/test/java/com/netflix/priam/backup/TestBackupDynamicRateLimiter.java +++ b/priam/src/test/java/com/netflix/priam/backup/TestBackupDynamicRateLimiter.java @@ -20,7 +20,8 @@ public class TestBackupDynamicRateLimiter { private static final Instant NOW = Instant.ofEpochMilli(1 << 16); private static final Instant LATER = NOW.plusMillis(Duration.ofHours(1).toMillis()); - private static final int DIR_SIZE = 1 << 16; + private static final int DIR_SIZE_BYTES = 1 << 16; + private static final int DIR_SIZE_FILES = 10; private BackupDynamicRateLimiter rateLimiter; private FakeConfiguration config; @@ -34,7 +35,7 @@ public void setUp() { @Test public void sunnyDay() { - rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES); Stopwatch timer = timePermitAcquisition(getBackupPath(), LATER, 21); Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtLeast(1_000); Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtMost(2_000); @@ -42,14 +43,14 @@ public void sunnyDay() { @Test public void targetSetToEpoch() { - rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES); Stopwatch timer = timePermitAcquisition(getBackupPath(), Instant.EPOCH, 20); assertNoRateLimiting(timer); } @Test public void pathIsNotASnapshot() { - rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES); AbstractBackupPath path = getBackupPath( "target/data/Keyspace1/Standard1/backups/Keyspace1-Standard1-ia-4-Data.db"); @@ -59,14 +60,14 @@ public void pathIsNotASnapshot() { @Test public void targetIsNow() { - rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES); Stopwatch timer = timePermitAcquisition(getBackupPath(), NOW, 20); assertNoRateLimiting(timer); } @Test public void targetIsInThePast() { - rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES); Instant target = NOW.minus(Duration.ofHours(1L)); Stopwatch timer = timePermitAcquisition(getBackupPath(), target, 20); assertNoRateLimiting(timer); @@ -74,32 +75,32 @@ public void targetIsInThePast() { @Test public void noBackupThreads() { - rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 0), NOW, DIR_SIZE); + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 0), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES); assertIllegalState(() -> timePermitAcquisition(getBackupPath(), LATER, 20)); } @Test public void negativeBackupThreads() { - rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", -1), NOW, DIR_SIZE); + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", -1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES); assertIllegalState(() -> timePermitAcquisition(getBackupPath(), LATER, 20)); } @Test public void noData() { - rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, 0); + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, 0, 0); Stopwatch timer = timePermitAcquisition(getBackupPath(), LATER, 20); assertNoRateLimiting(timer); } @Test public void noPermitsRequested() { - rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES); assertIllegalArgument(() -> timePermitAcquisition(getBackupPath(), LATER, 0)); } @Test public void negativePermitsRequested() { - rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE); + rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES); assertIllegalArgument(() -> timePermitAcquisition(getBackupPath(), LATER, -1)); } @@ -123,12 +124,12 @@ private Stopwatch timePermitAcquisition(AbstractBackupPath path, Instant now, in } private BackupDynamicRateLimiter getRateLimiter( - Map properties, Instant now, long directorySize) { + Map properties, Instant now, long directorySizeBytes, int directorySizeFiles) { properties.forEach(config::setFakeConfig); return new BackupDynamicRateLimiter( config, Clock.fixed(now, ZoneId.systemDefault()), - new FakeDirectorySize(directorySize)); + new FakeDirectorySize(directorySizeBytes, directorySizeFiles)); } private void assertNoRateLimiting(Stopwatch timer) { @@ -155,14 +156,21 @@ private void assertIllegalArgument(Runnable method) { private static final class FakeDirectorySize implements DirectorySize { private final long size; + private final int fileCount; - FakeDirectorySize(long size) { + FakeDirectorySize(long size, int fileCount) { this.size = size; + this.fileCount = fileCount; } @Override public long getBytes(String location) { return size; } + + @Override + public int getFiles(String location) { + return fileCount; + } } }