|
@@ -33,8 +33,11 @@ import java.security.NoSuchAlgorithmException;
|
|
|
import java.security.SecureRandom;
|
|
|
import java.util.AbstractList;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Formatter;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.LinkedList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
|
|
@@ -50,11 +53,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
|
|
|
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
|
|
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
|
@@ -142,6 +145,8 @@ public class DataNode extends Configured
|
|
|
|
|
|
volatile boolean shouldRun = true;
|
|
|
private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
|
|
|
+ /** Blocks whose recovery is currently in progress; accessed only while synchronized on this map. */
|
|
|
+ private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
|
|
|
private LinkedList<String> delHints = new LinkedList<String>();
|
|
|
public final static String EMPTY_DEL_HINT = "";
|
|
|
int xmitsInProgress = 0;
|
|
@@ -1319,8 +1324,15 @@ public class DataNode extends Configured
|
|
|
|
|
|
public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
|
|
|
Daemon d = new Daemon(threadGroup, new Runnable() {
|
|
|
+ /**
+  * Recover a list of blocks. It is run by the primary datanode.
+  * Each blocks[i] is recovered against targets[i] with closeFile set to
+  * true. An IOException raised for one block is logged as a warning and
+  * the loop continues with the next block.
+  */
|
|
|
public void run() {
|
|
|
- LeaseManager.recoverBlocks(blocks, targets, DataNode.this, namenode, getConf());
|
|
|
+ for(int i = 0; i < blocks.length; i++) {
|
|
|
+ try {
|
|
|
+ recoverBlock(blocks[i], targets[i], true);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("recoverBlocks, i=" + i, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
d.start();
|
|
@@ -1353,6 +1365,129 @@ public class DataNode extends Configured
|
|
|
+ ": " + protocol);
|
|
|
}
|
|
|
|
|
|
+ /**
+  * A convenient class used in lease recovery. It groups a datanode ID, the
+  * InterDatanodeProtocol handle used to call that datanode, and a Block.
+  * All three fields are final and assigned once in the constructor.
+  */
|
|
|
+ private static class BlockRecord {
|
|
|
+ final DatanodeID id;
|
|
|
+ final InterDatanodeProtocol datanode;
|
|
|
+ final Block block;
|
|
|
+
|
|
|
+ BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
|
|
|
+ this.id = id;
|
|
|
+ this.datanode = datanode;
|
|
|
+ this.block = block;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** {@inheritDoc} Shows the block followed by the datanode ID. */
|
|
|
+ public String toString() {
|
|
|
+ return "block:" + block + " node:" + id;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
+  * Recover a block.
+  *
+  * The request is rejected with an IOException when a lookup in
+  * ongoingRecovery, using the same block id and length with a wildcard
+  * generation stamp, finds an entry. Otherwise the block is registered in
+  * ongoingRecovery for the duration of the call and removed again in the
+  * finally clause. Each datanode in datanodeids is then asked for its
+  * metadata of the block: this datanode is called directly, the others
+  * through an inter-datanode proxy. Replicas whose generation stamp is at
+  * least that of the requested block are collected, and the collected list
+  * is handed to syncBlock together with the smallest replica length seen.
+  * NOTE(review): the wildcard lookup relies on how Block implements
+  * equals and hashCode, which is not visible here - confirm.
+  *
+  * @param block the block to recover
+  * @param datanodeids the datanodes to query for the block
+  * @param closeFile passed through unchanged to syncBlock
+  * @return whatever syncBlock returns, which may be null
+  * @throws IOException if the block is already being recovered, if no
+  *         replica was collected and at least one datanode query failed,
+  *         or if syncBlock throws
+  */
|
|
|
+ private LocatedBlock recoverBlock(Block block, DatanodeID[] datanodeids,
|
|
|
+ boolean closeFile) throws IOException {
|
|
|
+
|
|
|
+ // If the block is already being recovered, then skip recovering it.
|
|
|
+ // This can happen if the namenode and client start recovering the same
|
|
|
+ // file at the same time.
|
|
|
+ synchronized (ongoingRecovery) {
|
|
|
+ Block tmp = new Block();
|
|
|
+ tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
|
|
|
+ if (ongoingRecovery.get(tmp) != null) {
|
|
|
+ String msg = "Block " + block + " is already being recovered, " +
|
|
|
+ " ignoring this request to recover it.";
|
|
|
+ LOG.info(msg);
|
|
|
+ throw new IOException(msg);
|
|
|
+ }
|
|
|
+ ongoingRecovery.put(block, block);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("block=" + block
|
|
|
+ + ", datanodeids=" + Arrays.asList(datanodeids));
|
|
|
+ }
|
|
|
+ List<BlockRecord> syncList = new ArrayList<BlockRecord>();
|
|
|
+ long minlength = Long.MAX_VALUE;
|
|
|
+ int errorCount = 0;
|
|
|
+
|
|
|
+ // Check generation stamps: keep replicas that are not older than the requested block and track the shortest length.
|
|
|
+ for(DatanodeID id : datanodeids) {
|
|
|
+ try {
|
|
|
+ InterDatanodeProtocol datanode = dnRegistration.equals(id)?
|
|
|
+ this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
|
|
|
+ BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
|
|
|
+ if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
|
|
|
+ syncList.add(new BlockRecord(id, datanode, new Block(info)));
|
|
|
+ if (info.getNumBytes() < minlength) {
|
|
|
+ minlength = info.getNumBytes();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ ++errorCount;
|
|
|
+ InterDatanodeProtocol.LOG.warn(
|
|
|
+ "Failed to getBlockMetaDataInfo for block (=" + block
|
|
|
+ + ") from datanode (=" + id + ")", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (syncList.isEmpty() && errorCount > 0) {
|
|
|
+ throw new IOException("All datanodes failed: block=" + block
|
|
|
+ + ", datanodeids=" + Arrays.asList(datanodeids));
|
|
|
+ }
|
|
|
+ return syncBlock(block, minlength, syncList, closeFile);
|
|
|
+ } finally {
|
|
|
+ synchronized (ongoingRecovery) {
|
|
|
+ ongoingRecovery.remove(block);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
+  * Block synchronization.
+  *
+  * With an empty syncList, commitBlockSynchronization is called on the
+  * namenode with a zero generation stamp, zero length and an empty datanode
+  * array, and null is returned. Otherwise a new generation stamp is
+  * obtained from the namenode, a new block is built from the original block
+  * id, minlength and that stamp, and updateBlock is invoked on every
+  * datanode in syncList with its old block and the new block. Datanodes
+  * whose update did not throw are reported to the namenode through
+  * commitBlockSynchronization; failed updates are only logged.
+  *
+  * @param block the block being synchronized
+  * @param minlength the length given to the new block
+  * @param syncList the datanode replicas to update
+  * @param closeFile forwarded to updateBlock and commitBlockSynchronization
+  * @return the new block with the datanodes that accepted the update, or
+  *         null when syncList is empty or every update failed
+  * @throws IOException propagated from the namenode calls
+  */
|
|
|
+ private LocatedBlock syncBlock(Block block, long minlength,
|
|
|
+ List<BlockRecord> syncList, boolean closeFile) throws IOException {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("block=" + block + ", minlength=" + minlength
|
|
|
+ + ", syncList=" + syncList + ", closeFile=" + closeFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ //syncList.isEmpty() means that all datanodes do not have the block
|
|
|
+ //so the block can be deleted.
|
|
|
+ if (syncList.isEmpty()) {
|
|
|
+ namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
|
|
|
+ DatanodeID.EMPTY_ARRAY);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<DatanodeID> successList = new ArrayList<DatanodeID>();
|
|
|
+
|
|
|
+ long generationstamp = namenode.nextGenerationStamp(block);
|
|
|
+ Block newblock = new Block(block.getBlockId(), minlength, generationstamp);
|
|
|
+
|
|
|
+ for(BlockRecord r : syncList) {
|
|
|
+ try {
|
|
|
+ r.datanode.updateBlock(r.block, newblock, closeFile);
|
|
|
+ successList.add(r.id);
|
|
|
+ } catch (IOException e) {
|
|
|
+ InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
|
|
|
+ + newblock + ", datanode=" + r.id + ")", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!successList.isEmpty()) {
|
|
|
+ DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
|
|
|
+
|
|
|
+ namenode.commitBlockSynchronization(block,
|
|
|
+ newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
|
|
|
+ nlist);
|
|
|
+ DatanodeInfo[] info = new DatanodeInfo[nlist.length];
|
|
|
+ for (int i = 0; i < nlist.length; i++) {
|
|
|
+ info[i] = new DatanodeInfo(nlist[i]);
|
|
|
+ }
|
|
|
+ return new LocatedBlock(newblock, info); // success
|
|
|
+ }
|
|
|
+ return null; // failed
|
|
|
+ }
|
|
|
+
|
|
|
// ClientDataNodeProtocol implementation
|
|
|
/** {@inheritDoc} */
|
|
|
public LocatedBlock recoverBlock(Block block, DatanodeInfo[] targets
|
|
@@ -1366,7 +1501,6 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
LOG.info("Client invoking recoverBlock for block " + block +
|
|
|
" on datanodes " + msg.toString());
|
|
|
- return LeaseManager.recoverBlock(block, targets, this, namenode,
|
|
|
- getConf(), false);
|
|
|
+ return recoverBlock(block, targets, false);
|
|
|
}
|
|
|
}
|