|
@@ -19,7 +19,10 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hdfs.util.Canceler;
|
|
|
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
|
|
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
|
@@ -42,6 +45,7 @@ public class ContainerDataScanner extends Thread {
|
|
|
private final ContainerController controller;
|
|
|
private final DataTransferThrottler throttler;
|
|
|
private final Canceler canceler;
|
|
|
+ private final ContainerDataScrubberMetrics metrics;
|
|
|
|
|
|
/**
|
|
|
* True if the thread is stopping.<p/>
|
|
@@ -50,12 +54,15 @@ public class ContainerDataScanner extends Thread {
|
|
|
private volatile boolean stopping = false;
|
|
|
|
|
|
|
|
|
/**
 * Builds a daemon scanner thread for a single volume.
 *
 * @param conf configuration handed to the scrubber metrics factory
 * @param controller controller that is asked for the volume's containers
 *                   and told about unhealthy ones
 * @param volume volume whose containers this thread scans; its string form
 *               is used in the thread name and passed to the metrics factory
 * @param bytesPerSec bandwidth value passed to the data transfer throttler
 */
public ContainerDataScanner(Configuration conf,
    ContainerController controller, HddsVolume volume, long bytesPerSec) {
  this.volume = volume;
  this.controller = controller;
  this.canceler = new Canceler();
  // The throttler reads the metrics field only when throttle() is invoked,
  // so creating it before the metrics object is assigned is safe.
  this.throttler = new HddsDataTransferThrottler(bytesPerSec);
  this.metrics =
      ContainerDataScrubberMetrics.create(conf, volume.toString());
  setDaemon(true);
  setName("ContainerDataScanner(" + volume + ")");
}
|
|
@@ -65,26 +72,54 @@ public class ContainerDataScanner extends Thread {
|
|
|
LOG.trace("{}: thread starting.", this);
|
|
|
try {
|
|
|
while (!stopping) {
|
|
|
- Iterator<Container> itr = controller.getContainers(volume);
|
|
|
- while (!stopping && itr.hasNext()) {
|
|
|
- Container c = itr.next();
|
|
|
- try {
|
|
|
- if (c.shouldScanData()) {
|
|
|
- if(!c.scanData(throttler, canceler)) {
|
|
|
- controller.markContainerUnhealthy(
|
|
|
- c.getContainerData().getContainerID());
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (IOException ex) {
|
|
|
- long containerId = c.getContainerData().getContainerID();
|
|
|
- LOG.warn("Unexpected exception while scanning container "
|
|
|
- + containerId, ex);
|
|
|
- }
|
|
|
- }
|
|
|
+ runIteration();
|
|
|
+ metrics.resetNumContainersScanned();
|
|
|
+ metrics.resetNumUnhealthyContainers();
|
|
|
}
|
|
|
LOG.info("{} exiting.", this);
|
|
|
} catch (Throwable e) {
|
|
|
LOG.error("{} exiting because of exception ", this, e);
|
|
|
+ } finally {
|
|
|
+ if (metrics != null) {
|
|
|
+ metrics.unregister();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public void runIteration() {
|
|
|
+ long startTime = System.nanoTime();
|
|
|
+ Iterator<Container> itr = controller.getContainers(volume);
|
|
|
+ while (!stopping && itr.hasNext()) {
|
|
|
+ Container c = itr.next();
|
|
|
+ if (c.shouldScanData()) {
|
|
|
+ try {
|
|
|
+ if (!c.scanData(throttler, canceler)) {
|
|
|
+ metrics.incNumUnHealthyContainers();
|
|
|
+ controller.markContainerUnhealthy(
|
|
|
+ c.getContainerData().getContainerID());
|
|
|
+ }
|
|
|
+ } catch (IOException ex) {
|
|
|
+ long containerId = c.getContainerData().getContainerID();
|
|
|
+ LOG.warn("Unexpected exception while scanning container "
|
|
|
+ + containerId, ex);
|
|
|
+ } finally {
|
|
|
+ metrics.incNumContainersScanned();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ long totalDuration = System.nanoTime() - startTime;
|
|
|
+ if (!stopping) {
|
|
|
+ metrics.incNumScanIterations();
|
|
|
+ LOG.info("Completed an iteration of container data scrubber in" +
|
|
|
+ " {} minutes." +
|
|
|
+ " Number of iterations (since the data-node restart) : {}" +
|
|
|
+ ", Number of containers scanned in this iteration : {}" +
|
|
|
+ ", Number of unhealthy containers found in this iteration : {}",
|
|
|
+ TimeUnit.NANOSECONDS.toMinutes(totalDuration),
|
|
|
+ metrics.getNumScanIterations(),
|
|
|
+ metrics.getNumContainersScanned(),
|
|
|
+ metrics.getNumUnHealthyContainers());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -100,9 +135,32 @@ public class ContainerDataScanner extends Thread {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public ContainerDataScrubberMetrics getMetrics() {
|
|
|
+ return metrics;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
return "ContainerDataScanner(" + volume +
|
|
|
", " + volume.getStorageID() + ")";
|
|
|
}
|
|
|
+
|
|
|
+ private class HddsDataTransferThrottler extends DataTransferThrottler {
|
|
|
+ HddsDataTransferThrottler(long bandwidthPerSec) {
|
|
|
+ super(bandwidthPerSec);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void throttle(long numOfBytes) {
|
|
|
+ ContainerDataScanner.this.metrics.incNumBytesScanned(numOfBytes);
|
|
|
+ super.throttle(numOfBytes);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void throttle(long numOfBytes, Canceler c) {
|
|
|
+ ContainerDataScanner.this.metrics.incNumBytesScanned(numOfBytes);
|
|
|
+ super.throttle(numOfBytes, c);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|