|
@@ -35,6 +35,11 @@ import java.util.Map;
|
|
|
import java.util.Queue;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
|
+import java.util.concurrent.ArrayBlockingQueue;
|
|
|
+import java.util.concurrent.BlockingQueue;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.FutureTask;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
@@ -100,6 +105,7 @@ import org.apache.hadoop.net.Node;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
+import org.apache.hadoop.util.ExitUtil;
|
|
|
import org.apache.hadoop.util.LightWeightGSet;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
|
|
@@ -201,6 +207,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
/** Replication thread. */
|
|
|
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
|
|
|
|
|
|
+ /** Block report thread for handling async reports. */
|
|
|
+ private final BlockReportProcessingThread blockReportThread =
|
|
|
+ new BlockReportProcessingThread();
|
|
|
+
|
|
|
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
|
|
|
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
|
|
|
|
|
@@ -506,6 +516,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
datanodeManager.activate(conf);
|
|
|
this.replicationThread.setName("ReplicationMonitor");
|
|
|
this.replicationThread.start();
|
|
|
+ this.blockReportThread.start();
|
|
|
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
|
|
|
bmSafeMode.activate(blockTotal);
|
|
|
}
|
|
@@ -514,7 +525,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
bmSafeMode.close();
|
|
|
try {
|
|
|
replicationThread.interrupt();
|
|
|
+ blockReportThread.interrupt();
|
|
|
replicationThread.join(3000);
|
|
|
+ blockReportThread.join(3000);
|
|
|
} catch (InterruptedException ie) {
|
|
|
}
|
|
|
datanodeManager.close();
|
|
@@ -2042,7 +2055,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
try {
|
|
|
node = datanodeManager.getDatanode(nodeID);
|
|
|
- if (node == null || !node.isAlive()) {
|
|
|
+ if (node == null || !node.isRegistered()) {
|
|
|
throw new IOException(
|
|
|
"ProcessReport from dead or unregistered node: " + nodeID);
|
|
|
}
|
|
@@ -3550,17 +3563,23 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
|
|
final StorageReceivedDeletedBlocks srdb) throws IOException {
|
|
|
assert namesystem.hasWriteLock();
|
|
|
- int received = 0;
|
|
|
- int deleted = 0;
|
|
|
- int receiving = 0;
|
|
|
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
|
|
|
- if (node == null || !node.isAlive()) {
|
|
|
+ if (node == null || !node.isRegistered()) {
|
|
|
blockLog.warn("BLOCK* processIncrementalBlockReport"
|
|
|
+ " is received from dead or unregistered node {}", nodeID);
|
|
|
throw new IOException(
|
|
|
"Got incremental block report from unregistered or dead node");
|
|
|
}
|
|
|
+ try {
|
|
|
+ processIncrementalBlockReport(node, srdb);
|
|
|
+ } catch (Exception ex) {
|
|
|
+ node.setForceRegistration(true);
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ private void processIncrementalBlockReport(final DatanodeDescriptor node,
|
|
|
+ final StorageReceivedDeletedBlocks srdb) throws IOException {
|
|
|
DatanodeStorageInfo storageInfo =
|
|
|
node.getStorageInfo(srdb.getStorage().getStorageID());
|
|
|
if (storageInfo == null) {
|
|
@@ -3572,6 +3591,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
storageInfo = node.updateStorage(srdb.getStorage());
|
|
|
}
|
|
|
|
|
|
+ int received = 0;
|
|
|
+ int deleted = 0;
|
|
|
+ int receiving = 0;
|
|
|
+
|
|
|
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
|
|
|
switch (rdbi.getStatus()) {
|
|
|
case DELETED_BLOCK:
|
|
@@ -3589,17 +3612,17 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
break;
|
|
|
default:
|
|
|
String msg =
|
|
|
- "Unknown block status code reported by " + nodeID +
|
|
|
+ "Unknown block status code reported by " + node +
|
|
|
": " + rdbi;
|
|
|
blockLog.warn(msg);
|
|
|
assert false : msg; // if assertions are enabled, throw.
|
|
|
break;
|
|
|
}
|
|
|
blockLog.debug("BLOCK* block {}: {} is received from {}",
|
|
|
- rdbi.getStatus(), rdbi.getBlock(), nodeID);
|
|
|
+ rdbi.getStatus(), rdbi.getBlock(), node);
|
|
|
}
|
|
|
blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
|
|
|
- + "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving,
|
|
|
+ + "{} receiving: {}, received: {}, deleted: {}", node, receiving,
|
|
|
received, deleted);
|
|
|
}
|
|
|
|
|
@@ -4266,4 +4289,119 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+  /**
+   * Queues an action for asynchronous execution by the block report
+   * processing thread; used for incremental block reports (IBRs).
+   *
+   * @param action the operation to hand to the block report thread
+   * @throws IOException if the calling thread is interrupted while waiting
+   *         for space in the queue; the interrupt status is restored first
+   */
+  public void enqueueBlockOp(final Runnable action) throws IOException {
+    try {
+      blockReportThread.enqueue(action);
+    } catch (InterruptedException ie) {
+      // Restore the interrupt status, as runBlockOp does, so callers further
+      // up the stack can still observe that the thread was interrupted.
+      Thread.currentThread().interrupt();
+      throw new IOException(ie);
+    }
+  }
|
|
|
+
|
|
|
+  /**
+   * Runs an action on the block report processing thread and waits for its
+   * result; used for synchronous batch processing of a full block report.
+   *
+   * @param action the operation to execute
+   * @param <T> the result type of the operation
+   * @return the value produced by the action
+   * @throws IOException the action's own IOException, any other failure of
+   *         the action wrapped in an IOException, or a wrapped
+   *         InterruptedException if the caller is interrupted while waiting
+   *         (the interrupt status is restored in that case)
+   */
+  public <T> T runBlockOp(final Callable<T> action)
+      throws IOException {
+    final FutureTask<T> task = new FutureTask<T>(action);
+    enqueueBlockOp(task);
+    try {
+      return task.get();
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new IOException(ie);
+    } catch (ExecutionException ee) {
+      // Surface the underlying failure; fall back to the wrapper itself when
+      // no cause was recorded.
+      final Throwable failure = (ee.getCause() != null) ? ee.getCause() : ee;
+      if (failure instanceof IOException) {
+        throw (IOException) failure;
+      }
+      throw new IOException(failure);
+    }
+  }
|
|
|
+
|
|
|
+  /**
+   * Blocks until a no-op submitted through {@link #runBlockOp(Callable)} has
+   * been executed by the block report processing thread.
+   *
+   * @throws IOException if submitting or waiting for the no-op fails
+   */
+  @VisibleForTesting
+  public void flushBlockOps() throws IOException {
+    final Callable<Void> noOp = new Callable<Void>() {
+      @Override
+      public Void call() {
+        return null;
+      }
+    };
+    runBlockOp(noOp);
+  }
|
|
|
+
|
|
|
+  /** @return the number of block operations currently held in the queue. */
+  public int getBlockOpQueueLength() {
+    final BlockingQueue<Runnable> pending = blockReportThread.queue;
+    return pending.size();
+  }
|
|
|
+
|
|
|
+ private class BlockReportProcessingThread extends Thread {
|
|
|
+ private static final long MAX_LOCK_HOLD_MS = 4;
|
|
|
+ private long lastFull = 0;
|
|
|
+
|
|
|
+ private final BlockingQueue<Runnable> queue =
|
|
|
+ new ArrayBlockingQueue<Runnable>(1024);
|
|
|
+
|
|
|
+ BlockReportProcessingThread() {
|
|
|
+ super("Block report processor");
|
|
|
+ setDaemon(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ processQueue();
|
|
|
+ } catch (Throwable t) {
|
|
|
+ ExitUtil.terminate(1,
|
|
|
+ getName() + " encountered fatal exception: " + t);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processQueue() {
|
|
|
+ while (namesystem.isRunning()) {
|
|
|
+ NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
|
|
|
+ try {
|
|
|
+ Runnable action = queue.take();
|
|
|
+ // batch as many operations in the write lock until the queue
|
|
|
+ // runs dry, or the max lock hold is reached.
|
|
|
+ int processed = 0;
|
|
|
+ namesystem.writeLock();
|
|
|
+ metrics.setBlockOpsQueued(queue.size() + 1);
|
|
|
+ try {
|
|
|
+ long start = Time.monotonicNow();
|
|
|
+ do {
|
|
|
+ processed++;
|
|
|
+ action.run();
|
|
|
+ if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ action = queue.poll();
|
|
|
+ } while (action != null);
|
|
|
+ } finally {
|
|
|
+ namesystem.writeUnlock();
|
|
|
+ metrics.addBlockOpsBatched(processed - 1);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // ignore unless thread was specifically interrupted.
|
|
|
+ if (Thread.interrupted()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ queue.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ void enqueue(Runnable action) throws InterruptedException {
|
|
|
+ if (!queue.offer(action)) {
|
|
|
+ if (!isAlive() && namesystem.isRunning()) {
|
|
|
+ ExitUtil.terminate(1, getName()+" is not running");
|
|
|
+ }
|
|
|
+ long now = Time.monotonicNow();
|
|
|
+ if (now - lastFull > 4000) {
|
|
|
+ lastFull = now;
|
|
|
+ LOG.info("Block report queue is full");
|
|
|
+ }
|
|
|
+ queue.put(action);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|