Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.slf4j.Logger;
Expand Down Expand Up @@ -226,6 +228,7 @@ public class Balancer {
private final long maxSizeToMove;
private final long defaultBlockSize;
private final boolean sortTopNodes;
private final BalancerMetrics metrics;

// all data node lists
private final Collection<Source> overUtilized = new LinkedList<Source>();
Expand Down Expand Up @@ -357,6 +360,7 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
this.defaultBlockSize = getLongBytes(conf,
DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
this.metrics = BalancerMetrics.create(this);
}

private static long getCapacity(DatanodeStorageReport report, StorageType t) {
Expand Down Expand Up @@ -454,6 +458,8 @@ private long init(List<DatanodeStorageReport> reports) {
}

logUtilizationCollections();
metrics.setNumOfOverUtilizedNodes(overUtilized.size());
metrics.setNumOfUnderUtilizedNodes(underUtilized.size());

Preconditions.checkState(dispatcher.getStorageGroupMap().size()
== overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
Expand Down Expand Up @@ -636,7 +642,11 @@ void resetData(Configuration conf) {
this.belowAvgUtilized.clear();
this.underUtilized.clear();
this.policy.reset();
dispatcher.reset(conf);;
dispatcher.reset(conf);
}

NameNodeConnector getNnc() {
return nnc;
}

static class Result {
Expand Down Expand Up @@ -710,8 +720,10 @@ Result newResult(ExitStatus exitStatus) {
/** Run an iteration for all datanodes. */
Result runOneIteration() {
try {
metrics.setIterateRunning(true);
final List<DatanodeStorageReport> reports = dispatcher.init();
final long bytesLeftToMove = init(reports);
metrics.setBytesLeftToMove(bytesLeftToMove);
if (bytesLeftToMove == 0) {
return newResult(ExitStatus.SUCCESS, bytesLeftToMove, 0);
} else {
Expand Down Expand Up @@ -766,6 +778,7 @@ Result runOneIteration() {
System.out.println(e + ". Exiting ...");
return newResult(ExitStatus.INTERRUPTED);
} finally {
metrics.setIterateRunning(false);
dispatcher.shutdownNow();
}
}
Expand Down Expand Up @@ -848,6 +861,10 @@ static int run(Collection<URI> namenodes, final BalancerParameters p,
static int run(Collection<URI> namenodes, Collection<String> nsIds,
final BalancerParameters p, Configuration conf)
throws IOException, InterruptedException {
DefaultMetricsSystem.initialize("Balancer");
JvmMetrics.create("Balancer",
conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
DefaultMetricsSystem.instance());
if (!p.getRunAsService()) {
return doBalance(namenodes, nsIds, p, conf);
}
Expand Down Expand Up @@ -893,6 +910,8 @@ static int run(Collection<URI> namenodes, Collection<String> nsIds,
time2Str(scheduleInterval));
Thread.sleep(scheduleInterval);
}
DefaultMetricsSystem.shutdown();

// normal stop
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

/**
* Metrics for individual Balancer.
*/
@Metrics(about="Balancer metrics", context="dfs")
final class BalancerMetrics {

private final Balancer balancer;

@Metric("If a balancer iterate is running")
private MutableGaugeInt iterateRunning;

@Metric("Bytes left to move to make cluster balanced")
private MutableGaugeLong bytesLeftToMove;

@Metric("Number of under utilized nodes")
private MutableGaugeInt numOfUnderUtilizedNodes;

@Metric("Number of over utilized nodes")
private MutableGaugeInt numOfOverUtilizedNodes;

private BalancerMetrics(Balancer b) {
this.balancer = b;
}

public static BalancerMetrics create(Balancer b) {
BalancerMetrics m = new BalancerMetrics(b);
return DefaultMetricsSystem.instance().register(
m.getName(), null, m);
}

String getName() {
return "Balancer-" + balancer.getNnc().getBlockpoolID();
}

@Metric("Bytes that already moved in current doBalance run.")
public long getBytesMovedInCurrentRun() {
return balancer.getNnc().getBytesMoved().get();
}

void setIterateRunning(boolean iterateRunning) {
this.iterateRunning.set(iterateRunning ? 1 : 0);
}

void setBytesLeftToMove(long bytesLeftToMove) {
this.bytesLeftToMove.set(bytesLeftToMove);
}

void setNumOfUnderUtilizedNodes(int numOfUnderUtilizedNodes) {
this.numOfUnderUtilizedNodes.set(numOfUnderUtilizedNodes);
}

void setNumOfOverUtilizedNodes(int numOfOverUtilizedNodes) {
this.numOfOverUtilizedNodes.set(numOfOverUtilizedNodes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.Tool;
import org.junit.Test;

Expand Down Expand Up @@ -129,8 +132,24 @@ public void testBalancerServiceBalanceTwice() throws Exception {
newBalancerService(conf, new String[] {"-asService"});
balancerThread.start();

// Check metrics
final String balancerMetricsName = "Balancer-"
+ cluster.getNameNode(0).getNamesystem().getBlockPoolId();
GenericTestUtils.waitFor(() -> {
// Validate metrics after metrics system initiated.
if (DefaultMetricsSystem.instance().getSource(balancerMetricsName) == null) {
return false;
}
MetricsRecordBuilder rb = MetricsAsserts.getMetrics(balancerMetricsName);
return rb != null && MetricsAsserts.getLongGauge("BytesLeftToMove", rb) > 0;
}, 100, 2000);

TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);

MetricsRecordBuilder rb = MetricsAsserts.getMetrics(balancerMetricsName);
assertTrue(MetricsAsserts.getLongGauge("BytesMovedInCurrentRun", rb) > 0);

cluster.triggerHeartbeats();
cluster.triggerBlockReports();

Expand Down