|
@@ -36,10 +36,8 @@ import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.EnumSet;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
@@ -58,7 +56,6 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
|
|
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
|
|
|
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
@@ -172,8 +169,6 @@ public class DataNode extends Configured
|
|
|
|
|
|
volatile boolean shouldRun = true;
|
|
|
private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
|
|
|
- /** list of blocks being recovered */
|
|
|
- private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
|
|
|
private LinkedList<String> delHints = new LinkedList<String>();
|
|
|
public final static String EMPTY_DEL_HINT = "";
|
|
|
AtomicInteger xmitsInProgress = new AtomicInteger();
|
|
@@ -1587,55 +1582,36 @@ public class DataNode extends Configured
|
|
|
Block block = rBlock.getBlock();
|
|
|
DatanodeInfo[] targets = rBlock.getLocations();
|
|
|
DatanodeID[] datanodeids = (DatanodeID[])targets;
|
|
|
- // If the block is already being recovered, then skip recovering it.
|
|
|
- // This can happen if the namenode and client start recovering the same
|
|
|
- // file at the same time.
|
|
|
- synchronized (ongoingRecovery) {
|
|
|
- if (ongoingRecovery.get(new Block(block.getBlockId(), block.getNumBytes(),
|
|
|
- GenerationStamp.WILDCARD_STAMP)) != null) {
|
|
|
- String msg = "Block " + block + " is already being recovered, " +
|
|
|
- " ignoring this request to recover it.";
|
|
|
- LOG.info(msg);
|
|
|
- throw new IOException(msg);
|
|
|
- }
|
|
|
- ongoingRecovery.put(block, block);
|
|
|
- }
|
|
|
- try {
|
|
|
- List<BlockRecord> syncList = new ArrayList<BlockRecord>();
|
|
|
- long minlength = Long.MAX_VALUE;
|
|
|
- int errorCount = 0;
|
|
|
+ List<BlockRecord> syncList = new ArrayList<BlockRecord>();
|
|
|
+ long minlength = Long.MAX_VALUE;
|
|
|
+ int errorCount = 0;
|
|
|
|
|
|
- //check generation stamps
|
|
|
- for(DatanodeID id : datanodeids) {
|
|
|
- try {
|
|
|
- InterDatanodeProtocol datanode = dnRegistration.equals(id)?
|
|
|
- this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
|
|
|
- BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
|
|
|
- if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
|
|
|
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
|
|
|
- if (info.getNumBytes() < minlength) {
|
|
|
- minlength = info.getNumBytes();
|
|
|
- }
|
|
|
+ //check generation stamps
|
|
|
+ for(DatanodeID id : datanodeids) {
|
|
|
+ try {
|
|
|
+ InterDatanodeProtocol datanode = dnRegistration.equals(id)?
|
|
|
+ this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
|
|
|
+ BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
|
|
|
+ if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
|
|
|
+ syncList.add(new BlockRecord(id, datanode, new Block(info)));
|
|
|
+ if (info.getNumBytes() < minlength) {
|
|
|
+ minlength = info.getNumBytes();
|
|
|
}
|
|
|
- } catch (IOException e) {
|
|
|
- ++errorCount;
|
|
|
- InterDatanodeProtocol.LOG.warn(
|
|
|
- "Failed to getBlockMetaDataInfo for block (=" + block
|
|
|
- + ") from datanode (=" + id + ")", e);
|
|
|
}
|
|
|
+ } catch (IOException e) {
|
|
|
+ ++errorCount;
|
|
|
+ InterDatanodeProtocol.LOG.warn(
|
|
|
+ "Failed to getBlockMetaDataInfo for block (=" + block
|
|
|
+ + ") from datanode (=" + id + ")", e);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if (syncList.isEmpty() && errorCount > 0) {
|
|
|
- throw new IOException("All datanodes failed: block=" + block
|
|
|
- + ", datanodeids=" + Arrays.asList(datanodeids));
|
|
|
- }
|
|
|
- block.setNumBytes(minlength);
|
|
|
- return syncBlock(rBlock, syncList);
|
|
|
- } finally {
|
|
|
- synchronized (ongoingRecovery) {
|
|
|
- ongoingRecovery.remove(block);
|
|
|
- }
|
|
|
+ if (syncList.isEmpty() && errorCount > 0) {
|
|
|
+ throw new IOException("All datanodes failed: block=" + block
|
|
|
+ + ", datanodeids=" + Arrays.asList(datanodeids));
|
|
|
}
|
|
|
+ block.setNumBytes(minlength);
|
|
|
+ return syncBlock(rBlock, syncList);
|
|
|
}
|
|
|
|
|
|
/** Block synchronization */
|
|
@@ -1706,16 +1682,6 @@ public class DataNode extends Configured
|
|
|
+ syncList.size() + " datanodes success {" + b + "\n}");
|
|
|
}
|
|
|
|
|
|
- // ClientDataNodeProtocol implementation
|
|
|
- /** {@inheritDoc} */
|
|
|
- @SuppressWarnings("deprecation")
|
|
|
- public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
|
|
|
- ) throws IOException {
|
|
|
- logRecoverBlock("Client", block, targets);
|
|
|
- assert false : "ClientDatanodeProtocol.recoverBlock: should never be called.";
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
private static void logRecoverBlock(String who,
|
|
|
Block block, DatanodeID[] targets) {
|
|
|
StringBuilder msg = new StringBuilder(targets[0].getName());
|
|
@@ -1726,8 +1692,9 @@ public class DataNode extends Configured
|
|
|
+ ", targets=[" + msg + "])");
|
|
|
}
|
|
|
|
|
|
+ // ClientDataNodeProtocol implementation
|
|
|
/** {@inheritDoc} */
|
|
|
- @Override
|
|
|
+ @Override // ClientDataNodeProtocol
|
|
|
public long getReplicaVisibleLength(final Block block) throws IOException {
|
|
|
final Replica replica = data.getReplica(block.getBlockId());
|
|
|
if (replica == null) {
|