From d8f28c057e6084ad03f3927dcec3f1e75bd266f3 Mon Sep 17 00:00:00 2001
From: Gaole Meng
Date: Wed, 10 Jul 2024 20:45:16 +0000
Subject: [PATCH] Add profiler for request execution details. The usage of the new API will be added in the next PR

---
 .../bigquery/storage/v1/RequestProfiler.java | 269 ++++++++++++++++++
 .../storage/v1/RequestProfilerTest.java      | 209 ++++++++++++++
 2 files changed, 478 insertions(+)
 create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/RequestProfiler.java
 create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java

diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/RequestProfiler.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/RequestProfiler.java
new file mode 100644
index 0000000000..41be3bf9af
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/RequestProfiler.java
@@ -0,0 +1,269 @@
+package com.google.cloud.bigquery.storage.v1;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * A profiler that periodically generates a report of the execution latencies of the slowest
+ * requests seen during the past period. This is used for debugging only.
+ *
+ * The report contains the execution details of the TOP_K slowest requests, for example:
+ * ```
+ * INFO: At system time 1720566109971, in total 2 finished during the last 60000 milliseconds, the top 10 long latency requests details report:
+ * -----------------------------
+ * Request uuid: request_1 with total time 1000 milliseconds
+ * Operation name json_to_proto_conversion starts at: 1720566109971, ends at: 1720566109971, total time: 200 milliseconds
+ * Operation name backend_latency starts at: 1720566109971, ends at: 1720566109971, total time: 800 milliseconds
+ * -----------------------------
+ * Request uuid: request_2 with total time 500 milliseconds
+ * Operation name json_to_proto_conversion starts at: 1720566109971, ends at: 1720566109971, total time: 250 milliseconds
+ * Operation name backend_latency starts at: 1720566109971, ends at: 1720566109971, total time: 250 milliseconds
+ * ```
+ */
+public class RequestProfiler {
+  enum OperationName {
+    // The total end-to-end latency for a request.
+    TOTAL_REQUEST("total_request_time"),
+    // Json to proto conversion time.
+    JSON_TO_PROTO_CONVERSION("json_to_proto_conversion"),
+    // Time spent to fetch the table schema when the user didn't provide it.
+    SCHEMA_FECTCHING("schema_fetching"),
+    // Time spent within the wait queue before the request gets picked up.
+    WAIT_QUEUE("wait_queue"),
+    // Time spent within the backend to process the request.
+    BACKEND_LATENCY("backend_latency");
+    private final String operationName;
+
+    OperationName(String operationName) {
+      this.operationName = operationName;
+    }
+  }
+
+  private static final Logger log = Logger.getLogger(RequestProfiler.class.getName());
+
+  // Singleton for easier access.
+  public static final RequestProfiler REQUEST_PROFILER_SINGLETON = new RequestProfiler();
+
+  // Tunable static variable that indicates how many of the longest-latency requests to report.
+  private static int TOP_K = 10;
+
+  // Tunable static variable that indicates how often the report should be generated.
+  private static Duration FLUSH_PERIOD = Duration.ofMinutes(1);
+
+  // From request uuid to the profiler of the individual request. This is cleaned up periodically.
+  private final Map<String, IndividualRequestProfiler> idToIndividualOperation =
+      new ConcurrentHashMap<>();
+
+  private Thread flushThread;
+
+  // Marks the start of an operation for the given request id.
+  void startOperation(OperationName operationName, String requestUniqueId) {
+    idToIndividualOperation.putIfAbsent(
+        requestUniqueId, new IndividualRequestProfiler(requestUniqueId));
+    idToIndividualOperation.get(requestUniqueId).startOperation(operationName);
+  }
+
+  // Marks the end of an operation for the given request id.
+  void endOperation(OperationName operationName, String requestUniqueId) {
+    idToIndividualOperation.get(requestUniqueId).endOperation(operationName);
+  }
+
+  void flushReport() {
+    log.info(flushAndGenerateReportText());
+  }
+
+  void startPeriodicalReportFlushing() {
+    this.flushThread =
+        new Thread(
+            new Runnable() {
+              @Override
+              public void run() {
+                while (true) {
+                  try {
+                    TimeUnit.MILLISECONDS.sleep(FLUSH_PERIOD.toMillis());
+                  } catch (InterruptedException e) {
+                    log.warning("Flush report thread is interrupted by " + e.toString());
+                    throw new RuntimeException(e);
+                  }
+                  flushReport();
+                }
+              }
+            });
+    this.flushThread.start();
+  }
+
+  String flushAndGenerateReportText() {
+    RequestProfilerComparator comparator = new RequestProfilerComparator();
+
+    // Find the top k requests with the longest latency.
+    PriorityQueue<IndividualRequestProfiler> minHeap =
+        new PriorityQueue<IndividualRequestProfiler>(comparator);
+    Iterator<Entry<String, IndividualRequestProfiler>> iterator =
+        idToIndividualOperation.entrySet().iterator();
+    int finishedRequestCount = 0;
+    while (iterator.hasNext()) {
+      Entry<String, IndividualRequestProfiler> individualRequestProfiler = iterator.next();
+      if (!individualRequestProfiler.getValue().finalized) {
+        continue;
+      }
+      finishedRequestCount++;
+      if (minHeap.size() < TOP_K
+          || individualRequestProfiler.getValue().totalTime > minHeap.peek().totalTime) {
+        minHeap.add(individualRequestProfiler.getValue());
+      }
+      if (minHeap.size() > TOP_K) {
+        minHeap.poll();
+      }
+      // Removing entries through the iterator while iterating is safe.
+      iterator.remove();
+    }
+
+    String reportText =
+        String.format(
+            "At system time %s, in total %s finished during the "
+                + "last %s milliseconds, the top %s long latency requests details report:\n",
+            System.currentTimeMillis(), finishedRequestCount, FLUSH_PERIOD.toMillis(), TOP_K);
+    if (minHeap.isEmpty()) {
+      reportText += "-----------------------------\n";
+      reportText += "\t0 requests finished during the last period.";
+    } else {
+      // Print the report for the top k requests.
+      ArrayList<String> reportList = new ArrayList<>();
+      while (minHeap.size() > 0) {
+        reportList.add("-----------------------------\n" + minHeap.poll().generateReport());
+      }
+      // Output in reverse order so that the longest-latency request shows up first.
+      for (int i = 0; i < reportList.size(); i++) {
+        reportText += reportList.get(reportList.size() - i - 1);
+      }
+    }
+    return reportText;
+  }
+
+  // Min-heap comparator that orders profilers by total request time.
+  class RequestProfilerComparator implements Comparator<IndividualRequestProfiler> {
+    @Override
+    public int compare(IndividualRequestProfiler x, IndividualRequestProfiler y) {
+      if (x.totalTime > y.totalTime) {
+        return 1;
+      } else if (x.totalTime < y.totalTime) {
+        return -1;
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Records the profiling information for an individual request. Acts like a buffer of past
+   * requests, either finished or not finished.
+   */
+  private static final class IndividualRequestProfiler {
+    // From operation name to the list of times spent on each occurrence of the operation,
+    // e.g. an operation retried two times results in two entries recorded in the queue.
+    private final Map<OperationName, Queue<Long>> timeRecorderMap;
+
+    // All currently finished operations.
+    private final List<IndividualOperation> finishedOperations;
+
+    private final String requestUniqueId;
+
+    // TOTAL_REQUEST has been marked as finished for this request. In this state `finalized` is
+    // true and totalTime has a non-zero value.
+    private long totalTime;
+    private boolean finalized;
+
+    IndividualRequestProfiler(String requestUniqueId) {
+      this.timeRecorderMap = new ConcurrentHashMap<>();
+      this.finishedOperations =
+          Collections.synchronizedList(new ArrayList<IndividualOperation>());
+      this.requestUniqueId = requestUniqueId;
+    }
+
+    void startOperation(OperationName operationName) {
+      timeRecorderMap.putIfAbsent(operationName, new ConcurrentLinkedDeque<>());
+      // Please be aware that System.currentTimeMillis() is not accurate on Windows systems.
+      timeRecorderMap.get(operationName).add(System.currentTimeMillis());
+    }
+
+    void endOperation(OperationName operationName) {
+      if (!timeRecorderMap.containsKey(operationName)
+          || timeRecorderMap.get(operationName).isEmpty()) {
+        String warningMessage =
+            String.format(
+                "Operation %s ignored for request %s because startOperation() was not called "
+                    + "before calling endOperation().",
+                operationName, requestUniqueId);
+        log.warning(warningMessage);
+        return;
+      }
+      long startTime = timeRecorderMap.get(operationName).poll();
+      long endTime = System.currentTimeMillis();
+      long totalTime = endTime - startTime;
+      finishedOperations.add(
+          new IndividualOperation(operationName, startTime, endTime, totalTime));
+      if (operationName == OperationName.TOTAL_REQUEST) {
+        finalized = true;
+        this.totalTime = totalTime;
+      }
+    }
+
+    String generateReport() {
+      String message =
+          "\tRequest uuid: " + requestUniqueId + " with total time " + this.totalTime
+              + " milliseconds\n";
+      for (int i = 0; i < finishedOperations.size(); i++) {
+        if (finishedOperations.get(i).operationName == OperationName.TOTAL_REQUEST) {
+          continue;
+        }
+        message += "\t\t";
+        message += finishedOperations.get(i).format();
+        message += "\n";
+      }
+      return message;
+    }
+
+    // Records the stats of an individual operation.
+    private static final class IndividualOperation {
+      OperationName operationName;
+
+      // Runtime stats for the individual operation.
+      long totalTime;
+      long startTimestamp;
+      long endTimestamp;
+
+      IndividualOperation(
+          OperationName operationName,
+          long startTimestamp,
+          long endTimestamp,
+          long totalTime) {
+        this.operationName = operationName;
+        this.startTimestamp = startTimestamp;
+        this.endTimestamp = endTimestamp;
+        this.totalTime = totalTime;
+      }
+
+      String format() {
+        return String.format(
+            "Operation name %s starts at: %s, ends at: %s, total time: %s milliseconds",
+            operationName.operationName, startTimestamp, endTimestamp, totalTime);
+      }
+    }
+  }
+
+  // Sets how many of the top-latency requests to log during every report period.
+  public static void setTopKRequestsToLog(int topK) {
+    TOP_K = topK;
+  }
+
+  // Sets the report period of the profiler.
+  public static void setReportPeriod(Duration flushPeriod) {
+    FLUSH_PERIOD = flushPeriod;
+  }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java
new file mode 100644
index 0000000000..8dc7f1afae
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java
@@ -0,0 +1,209 @@
+package com.google.cloud.bigquery.storage.v1;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.bigquery.storage.v1.RequestProfiler.OperationName;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class RequestProfilerTest {
+  private static final Logger log = Logger.getLogger(RequestProfiler.class.getName());
+
+  @Test
+  public void testNormalCase() throws Exception {
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.TOTAL_REQUEST, "request_1");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.JSON_TO_PROTO_CONVERSION, "request_1");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+        OperationName.JSON_TO_PROTO_CONVERSION, "request_1");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.BACKEND_LATENCY, "request_1");
+
+    // Another request starts in the middle.
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.TOTAL_REQUEST, "request_2");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.JSON_TO_PROTO_CONVERSION, "request_2");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+        OperationName.JSON_TO_PROTO_CONVERSION, "request_2");
+
+    // Continue request 1.
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+        OperationName.BACKEND_LATENCY, "request_1");
+
+    // Continue request 2.
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.BACKEND_LATENCY, "request_2");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+        OperationName.BACKEND_LATENCY, "request_2");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+        OperationName.TOTAL_REQUEST, "request_2");
+
+    // Continue request 1.
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+        OperationName.TOTAL_REQUEST, "request_1");
+
+    // Test the report generated.
+    String reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
+    assertTrue(reportText.contains("Request uuid: request_1 with total time"));
+    assertTrue(reportText.contains("Operation name json_to_proto_conversion starts at"));
+    assertTrue(reportText.contains("Operation name backend_latency starts at"));
+    assertTrue(reportText.contains("Request uuid: request_2 with total time"));
+
+    // The second time flush is called, it should generate an empty report.
+    reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
+    assertTrue(reportText.contains("0 requests finished during"));
+  }
+
+  @Test
+  public void mixFinishedAndUnfinishedRequest() throws Exception {
+    // Start request 1.
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.TOTAL_REQUEST, "request_1");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.JSON_TO_PROTO_CONVERSION, "request_1");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+        OperationName.JSON_TO_PROTO_CONVERSION, "request_1");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.BACKEND_LATENCY, "request_1");
+
+    // Another request starts in the middle.
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.TOTAL_REQUEST, "request_2");
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+        OperationName.JSON_TO_PROTO_CONVERSION, "request_2");
+
+    // The first report should be empty since no requests have ended.
+    String reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
+    assertTrue(reportText.contains("0 requests finished during"));
+
+    // End one of them.
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+        OperationName.TOTAL_REQUEST, "request_1");
+    reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
+    assertTrue(reportText.contains("Request uuid: request_1 with total time"));
+
+    // End the other; expect the first request's log not to show up again.
+    RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+        OperationName.TOTAL_REQUEST, "request_2");
+    reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
+    assertTrue(!reportText.contains("Request uuid: request_1 with total time"));
+    assertTrue(reportText.contains("Request uuid: request_2 with total time"));
+
+    // Flushing again produces an empty report.
+    reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
+    assertTrue(reportText.contains("0 requests finished during"));
+  }
+
+  @Test
+  public void concurrentProfilingTest_1000ReqsRunTogether() throws Exception {
+    int totalRequest = 1000;
+    ListeningExecutorService threadPool =
+        MoreExecutors.listeningDecorator(
+            Executors.newCachedThreadPool(
+                new ThreadFactoryBuilder()
+                    .setDaemon(true)
+                    .setNameFormat("TestThread")
+                    .build()));
+
+    List<Future<?>> futures = new ArrayList<>();
+    // Make some requests particularly slower than the others.
+    ImmutableSet<Integer> slowRequestIndex = ImmutableSet.of(10, 15, 20, 25, 30, 40, 50);
+    for (int i = 0; i < totalRequest; i++) {
+      int finalI = i;
+      futures.add(
+          threadPool.submit(
+              () -> {
+                String uuid = String.format("request_%s", finalI);
+                RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+                    OperationName.TOTAL_REQUEST, uuid);
+                RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+                    OperationName.JSON_TO_PROTO_CONVERSION, uuid);
+                if (slowRequestIndex.contains(finalI)) {
+                  try {
+                    TimeUnit.MILLISECONDS.sleep(finalI * 100);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                }
+                RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+                    OperationName.JSON_TO_PROTO_CONVERSION, uuid);
+                RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+                    OperationName.JSON_TO_PROTO_CONVERSION, uuid);
+                RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+                    OperationName.JSON_TO_PROTO_CONVERSION, uuid);
+                RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+                    OperationName.WAIT_QUEUE, uuid);
+                RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+                    OperationName.WAIT_QUEUE, uuid);
+                RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+                    OperationName.TOTAL_REQUEST, uuid);
+              }));
+    }
+
+    // Wait for all requests to finish.
+    for (int i = 0; i < futures.size(); i++) {
+      futures.get(i).get();
+    }
+    String reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
+    assertTrue(reportText.contains("in total 1000 finished during the last 60000 milliseconds"));
+    assertTrue(reportText.contains("Request uuid: request_50 with total time"));
+    assertTrue(reportText.contains("Request uuid: request_40 with total time"));
+    assertTrue(reportText.contains("Request uuid: request_30 with total time"));
+    assertTrue(reportText.contains("Request uuid: request_25 with total time"));
+    assertTrue(reportText.contains("Request uuid: request_20 with total time"));
+  }
+
+  @Test
+  public void concurrentProfilingTest_RunWhileFlushing() throws Exception {
+    int totalRequest = 1000;
+    ListeningExecutorService threadPool =
+        MoreExecutors.listeningDecorator(
+            Executors.newCachedThreadPool(
+                new ThreadFactoryBuilder()
+                    .setDaemon(true)
+                    .setNameFormat("TestThread")
+                    .build()));
+
+    List<Future<?>> futures = new ArrayList<>();
+    // Make some requests particularly slower than the others.
+    ImmutableSet<Integer> slowRequestIndex = ImmutableSet.of(10, 15, 20, 25, 30, 40, 50);
+    for (int i = 0; i < totalRequest; i++) {
+      int finalI = i;
+      futures.add(
+          threadPool.submit(
+              () -> {
+                try {
+                  String uuid = String.format("request_%s", finalI);
+                  RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+                      OperationName.TOTAL_REQUEST, uuid);
+                  RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+                      OperationName.JSON_TO_PROTO_CONVERSION, uuid);
+                  if (slowRequestIndex.contains(finalI)) {
+                    TimeUnit.MILLISECONDS.sleep(finalI * 100);
+                  }
+                  RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+                      OperationName.JSON_TO_PROTO_CONVERSION, uuid);
+                  RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+                      OperationName.JSON_TO_PROTO_CONVERSION, uuid);
+                  RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+                      OperationName.JSON_TO_PROTO_CONVERSION, uuid);
+                  RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
+                      OperationName.WAIT_QUEUE, uuid);
+                  RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+                      OperationName.WAIT_QUEUE, uuid);
+                  RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
+                      OperationName.TOTAL_REQUEST, uuid);
+                  String unused =
+                      RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+              }));
+    }
+
+    // Wait for all requests to finish.
+    for (int i = 0; i < futures.size(); i++) {
+      futures.get(i).get();
+    }
+    String reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
+    assertTrue(reportText.contains("0 requests finished during"));
+  }
+}
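Note on usage: this patch only adds the profiler; no call sites are wired up yet (per the commit subject, usage lands in the next PR). The following is a minimal sketch, assuming the intended call pattern is to wrap each write request with TOTAL_REQUEST and nest the per-step operations inside it. The class name, the setup placement, and the elided conversion/RPC steps are hypothetical and not part of this patch; only the RequestProfiler members shown above are real. Because startOperation/endOperation are package-private, the real call sites will have to live inside com.google.cloud.bigquery.storage.v1.

```java
package com.google.cloud.bigquery.storage.v1;

import com.google.cloud.bigquery.storage.v1.RequestProfiler.OperationName;
import java.time.Duration;
import java.util.UUID;

// Hypothetical caller used only to illustrate the API added in this patch.
final class RequestProfilerUsageSketch {

  // One-time setup, e.g. when a stream connection is created.
  static void enableProfiling() {
    RequestProfiler.setTopKRequestsToLog(5); // default is 10
    RequestProfiler.setReportPeriod(Duration.ofSeconds(30)); // default is one minute
    // Starts the background thread that logs one report per period.
    RequestProfiler.REQUEST_PROFILER_SINGLETON.startPeriodicalReportFlushing();
  }

  // Per-request instrumentation of a (hypothetical) JSON append path.
  void profileOneAppend() {
    String requestUniqueId = "request_" + UUID.randomUUID();
    RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
        OperationName.TOTAL_REQUEST, requestUniqueId);
    try {
      RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
          OperationName.JSON_TO_PROTO_CONVERSION, requestUniqueId);
      // ... convert the JSON rows to proto here ...
      RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
          OperationName.JSON_TO_PROTO_CONVERSION, requestUniqueId);

      RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
          OperationName.BACKEND_LATENCY, requestUniqueId);
      // ... issue the AppendRows RPC and wait for the response here ...
      RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
          OperationName.BACKEND_LATENCY, requestUniqueId);
    } finally {
      // Finishing TOTAL_REQUEST finalizes the entry so it becomes eligible for the next report.
      RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
          OperationName.TOTAL_REQUEST, requestUniqueId);
    }
  }
}
```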