|
@@ -34,11 +34,6 @@ import java.util.Map;
|
|
|
import java.util.Queue;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
|
-import java.util.concurrent.ArrayBlockingQueue;
|
|
|
-import java.util.concurrent.BlockingQueue;
|
|
|
-import java.util.concurrent.Callable;
|
|
|
-import java.util.concurrent.ExecutionException;
|
|
|
-import java.util.concurrent.FutureTask;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
@@ -96,7 +91,6 @@ import org.apache.hadoop.metrics2.util.MBeans;
|
|
|
import org.apache.hadoop.net.Node;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
-import org.apache.hadoop.util.ExitUtil;
|
|
|
import org.apache.hadoop.util.LightWeightGSet;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
|
|
@@ -198,10 +192,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
/** Replication thread. */
|
|
|
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
|
|
|
|
|
|
- /** Block report thread for handling async reports. */
|
|
|
- private final BlockReportProcessingThread blockReportThread =
|
|
|
- new BlockReportProcessingThread();
|
|
|
-
|
|
|
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
|
|
|
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
|
|
|
|
|
@@ -493,7 +483,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
datanodeManager.activate(conf);
|
|
|
this.replicationThread.setName("ReplicationMonitor");
|
|
|
this.replicationThread.start();
|
|
|
- this.blockReportThread.start();
|
|
|
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
|
|
|
bmSafeMode.activate(blockTotal);
|
|
|
}
|
|
@@ -502,9 +491,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
bmSafeMode.close();
|
|
|
try {
|
|
|
replicationThread.interrupt();
|
|
|
- blockReportThread.interrupt();
|
|
|
replicationThread.join(3000);
|
|
|
- blockReportThread.join(3000);
|
|
|
} catch (InterruptedException ie) {
|
|
|
}
|
|
|
datanodeManager.close();
|
|
@@ -1890,7 +1877,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
try {
|
|
|
node = datanodeManager.getDatanode(nodeID);
|
|
|
- if (node == null || !node.isRegistered()) {
|
|
|
+ if (node == null || !node.isAlive()) {
|
|
|
throw new IOException(
|
|
|
"ProcessReport from dead or unregistered node: " + nodeID);
|
|
|
}
|
|
@@ -3242,23 +3229,17 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
|
|
final StorageReceivedDeletedBlocks srdb) throws IOException {
|
|
|
assert namesystem.hasWriteLock();
|
|
|
+ int received = 0;
|
|
|
+ int deleted = 0;
|
|
|
+ int receiving = 0;
|
|
|
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
|
|
|
- if (node == null || !node.isRegistered()) {
|
|
|
+ if (node == null || !node.isAlive()) {
|
|
|
blockLog.warn("BLOCK* processIncrementalBlockReport"
|
|
|
+ " is received from dead or unregistered node {}", nodeID);
|
|
|
throw new IOException(
|
|
|
"Got incremental block report from unregistered or dead node");
|
|
|
}
|
|
|
- try {
|
|
|
- processIncrementalBlockReport(node, srdb);
|
|
|
- } catch (Exception ex) {
|
|
|
- node.setForceRegistration(true);
|
|
|
- throw ex;
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- private void processIncrementalBlockReport(final DatanodeDescriptor node,
|
|
|
- final StorageReceivedDeletedBlocks srdb) throws IOException {
|
|
|
DatanodeStorageInfo storageInfo =
|
|
|
node.getStorageInfo(srdb.getStorage().getStorageID());
|
|
|
if (storageInfo == null) {
|
|
@@ -3270,10 +3251,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
storageInfo = node.updateStorage(srdb.getStorage());
|
|
|
}
|
|
|
|
|
|
- int received = 0;
|
|
|
- int deleted = 0;
|
|
|
- int receiving = 0;
|
|
|
-
|
|
|
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
|
|
|
switch (rdbi.getStatus()) {
|
|
|
case DELETED_BLOCK:
|
|
@@ -3291,17 +3268,17 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
break;
|
|
|
default:
|
|
|
String msg =
|
|
|
- "Unknown block status code reported by " + node +
|
|
|
+ "Unknown block status code reported by " + nodeID +
|
|
|
": " + rdbi;
|
|
|
blockLog.warn(msg);
|
|
|
assert false : msg; // if assertions are enabled, throw.
|
|
|
break;
|
|
|
}
|
|
|
blockLog.debug("BLOCK* block {}: {} is received from {}",
|
|
|
- rdbi.getStatus(), rdbi.getBlock(), node);
|
|
|
+ rdbi.getStatus(), rdbi.getBlock(), nodeID);
|
|
|
}
|
|
|
blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
|
|
|
- + "{} receiving: {}, received: {}, deleted: {}", node, receiving,
|
|
|
+ + "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving,
|
|
|
received, deleted);
|
|
|
}
|
|
|
|
|
@@ -3902,119 +3879,4 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- // async processing of an action, used for IBRs.
|
|
|
- public void enqueueBlockOp(final Runnable action) throws IOException {
|
|
|
- try {
|
|
|
- blockReportThread.enqueue(action);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- throw new IOException(ie);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // sync batch processing for a full BR.
|
|
|
- public <T> T runBlockOp(final Callable<T> action)
|
|
|
- throws IOException {
|
|
|
- final FutureTask<T> future = new FutureTask<T>(action);
|
|
|
- enqueueBlockOp(future);
|
|
|
- try {
|
|
|
- return future.get();
|
|
|
- } catch (ExecutionException ee) {
|
|
|
- Throwable cause = ee.getCause();
|
|
|
- if (cause == null) {
|
|
|
- cause = ee;
|
|
|
- }
|
|
|
- if (!(cause instanceof IOException)) {
|
|
|
- cause = new IOException(cause);
|
|
|
- }
|
|
|
- throw (IOException)cause;
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- throw new IOException(ie);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @VisibleForTesting
|
|
|
- public void flushBlockOps() throws IOException {
|
|
|
- runBlockOp(new Callable<Void>(){
|
|
|
- @Override
|
|
|
- public Void call() {
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- public int getBlockOpQueueLength() {
|
|
|
- return blockReportThread.queue.size();
|
|
|
- }
|
|
|
-
|
|
|
- private class BlockReportProcessingThread extends Thread {
|
|
|
- private static final long MAX_LOCK_HOLD_MS = 4;
|
|
|
- private long lastFull = 0;
|
|
|
-
|
|
|
- private final BlockingQueue<Runnable> queue =
|
|
|
- new ArrayBlockingQueue<Runnable>(1024);
|
|
|
-
|
|
|
- BlockReportProcessingThread() {
|
|
|
- super("Block report processor");
|
|
|
- setDaemon(true);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- processQueue();
|
|
|
- } catch (Throwable t) {
|
|
|
- ExitUtil.terminate(1,
|
|
|
- getName() + " encountered fatal exception: " + t);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void processQueue() {
|
|
|
- while (namesystem.isRunning()) {
|
|
|
- NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
|
|
|
- try {
|
|
|
- Runnable action = queue.take();
|
|
|
- // batch as many operations in the write lock until the queue
|
|
|
- // runs dry, or the max lock hold is reached.
|
|
|
- int processed = 0;
|
|
|
- namesystem.writeLock();
|
|
|
- metrics.setBlockOpsQueued(queue.size() + 1);
|
|
|
- try {
|
|
|
- long start = Time.monotonicNow();
|
|
|
- do {
|
|
|
- processed++;
|
|
|
- action.run();
|
|
|
- if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) {
|
|
|
- break;
|
|
|
- }
|
|
|
- action = queue.poll();
|
|
|
- } while (action != null);
|
|
|
- } finally {
|
|
|
- namesystem.writeUnlock();
|
|
|
- metrics.addBlockOpsBatched(processed - 1);
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // ignore unless thread was specifically interrupted.
|
|
|
- if (Thread.interrupted()) {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- queue.clear();
|
|
|
- }
|
|
|
-
|
|
|
- void enqueue(Runnable action) throws InterruptedException {
|
|
|
- if (!queue.offer(action)) {
|
|
|
- if (!isAlive() && namesystem.isRunning()) {
|
|
|
- ExitUtil.terminate(1, getName()+" is not running");
|
|
|
- }
|
|
|
- long now = Time.monotonicNow();
|
|
|
- if (now - lastFull > 4000) {
|
|
|
- lastFull = now;
|
|
|
- LOG.info("Block report queue is full");
|
|
|
- }
|
|
|
- queue.put(action);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
}
|