
HDFS-9255. Consolidate block recovery related implementation into a single class. Contributed by Walter Su.

Change-Id: I7a1c03f50123d79ac0a78c981d9721617e3229d1
Zhe Zhang 9 years ago
parent
commit
e287e7d14b
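
This commit moves the block recovery implementation that previously lived inside DataNode (recoverBlocks, recoverBlock, syncBlock, BlockRecord, and related helpers) into a new dedicated class, BlockRecoveryWorker. As a minimal sketch of the resulting call-site change (signatures taken from the BPOfferService and DataNode diffs below; "dn" is assumed to be an initialized DataNode, so this is illustrative rather than runnable on its own):

    import java.util.Collection;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
    import org.apache.hadoop.util.Daemon;

    // Before this commit, recovery was invoked directly on the DataNode:
    //   Daemon d = dn.recoverBlocks(who, blocks);
    // After it, callers go through the dedicated worker instead:
    Daemon startRecovery(DataNode dn, String who,
        Collection<RecoveringBlock> blocks) {
      return dn.getBlockRecoveryWorker().recoverBlocks(who, blocks);
    }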

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1596,6 +1596,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9311. Support optional offload of NameNode HA service health checks to
     a separate RPC server. (cnauroth)
 
+    HDFS-9255. Consolidate block recovery related implementation into a single
+    class. (Walter Su via zhz)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

+ 11 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
@@ -49,13 +50,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailur
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@@ -367,11 +363,16 @@ public class PBHelper {
   }
 
   public static RecoveringBlock convert(RecoveringBlockProto b) {
-    ExtendedBlock block = PBHelperClient.convert(b.getBlock().getB());
-    DatanodeInfo[] locs = PBHelperClient.convert(b.getBlock().getLocsList());
-    return (b.hasTruncateBlock()) ?
-        new RecoveringBlock(block, locs, PBHelperClient.convert(b.getTruncateBlock())) :
-        new RecoveringBlock(block, locs, b.getNewGenStamp());
+    LocatedBlock lb = PBHelperClient.convertLocatedBlockProto(b.getBlock());
+    RecoveringBlock rBlock;
+    if (b.hasTruncateBlock()) {
+      rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
+          PBHelperClient.convert(b.getTruncateBlock()));
+    } else {
+      rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
+          b.getNewGenStamp());
+    }
+    return rBlock;
   }
 
   public static ReplicaState convert(ReplicaStateProto state) {

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -1390,15 +1390,17 @@ public class DatanodeManager {
               // in block recovery.
               recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
             }
+            RecoveringBlock rBlock;
             if(truncateRecovery) {
               Block recoveryBlock = (copyOnTruncateRecovery) ? b :
                   uc.getTruncateBlock();
-              brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
-                                                recoveryBlock));
+              rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
+                  recoveryBlock);
             } else {
-              brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
-                                                uc.getBlockRecoveryId()));
+              rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
+                  uc.getBlockRecoveryId());
             }
+            brCommand.add(rBlock);
           }
           return new DatanodeCommand[] { brCommand };
         }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -700,7 +700,8 @@ class BPOfferService {
       break;
     case DatanodeProtocol.DNA_RECOVERBLOCK:
       String who = "NameNode at " + actor.getNNSocketAddress();
-      dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
+      dn.getBlockRecoveryWorker().recoverBlocks(who,
+          ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
       break;
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");

+ 330 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java

@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Daemon;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class handles the block recovery work commands.
+ */
+@InterfaceAudience.Private
+public class BlockRecoveryWorker {
+  public static final Log LOG = DataNode.LOG;
+
+  private final DataNode datanode;
+  private final Configuration conf;
+  private final DNConf dnConf;
+
+  BlockRecoveryWorker(DataNode datanode) {
+    this.datanode = datanode;
+    conf = datanode.getConf();
+    dnConf = datanode.getDnConf();
+  }
+
+  /** A convenient class used in block recovery. */
+  static class BlockRecord {
+    private final DatanodeID id;
+    private final InterDatanodeProtocol datanode;
+    private final ReplicaRecoveryInfo rInfo;
+
+    private String storageID;
+
+    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode,
+        ReplicaRecoveryInfo rInfo) {
+      this.id = id;
+      this.datanode = datanode;
+      this.rInfo = rInfo;
+    }
+
+    private void updateReplicaUnderRecovery(String bpid, long recoveryId,
+        long newBlockId, long newLength) throws IOException {
+      final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
+      storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId,
+          newLength);
+    }
+
+    @Override
+    public String toString() {
+      return "block:" + rInfo + " node:" + id;
+    }
+  }
+
+  /** A block recovery task for a contiguous block. */
+  class RecoveryTaskContiguous {
+    private final RecoveringBlock rBlock;
+    private final ExtendedBlock block;
+    private final String bpid;
+    private final DatanodeInfo[] locs;
+    private final long recoveryId;
+
+    RecoveryTaskContiguous(RecoveringBlock rBlock) {
+      this.rBlock = rBlock;
+      block = rBlock.getBlock();
+      bpid = block.getBlockPoolId();
+      locs = rBlock.getLocations();
+      recoveryId = rBlock.getNewGenerationStamp();
+    }
+
+    protected void recover() throws IOException {
+      List<BlockRecord> syncList = new ArrayList<>(locs.length);
+      int errorCount = 0;
+
+      //check generation stamps
+      for(DatanodeID id : locs) {
+        try {
+          DatanodeID bpReg = datanode.getBPOfferService(bpid).bpRegistration;
+          InterDatanodeProtocol proxyDN = bpReg.equals(id) ?
+              datanode : DataNode.createInterDataNodeProtocolProxy(id, conf,
+              dnConf.socketTimeout, dnConf.connectToDnViaHostname);
+          ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN, rBlock);
+          if (info != null &&
+              info.getGenerationStamp() >= block.getGenerationStamp() &&
+              info.getNumBytes() > 0) {
+            syncList.add(new BlockRecord(id, proxyDN, info));
+          }
+        } catch (RecoveryInProgressException ripE) {
+          InterDatanodeProtocol.LOG.warn(
+              "Recovery for replica " + block + " on data-node " + id
+                  + " is already in progress. Recovery id = "
+                  + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
+          return;
+        } catch (IOException e) {
+          ++errorCount;
+          InterDatanodeProtocol.LOG.warn(
+              "Failed to obtain replica info for block (=" + block
+                  + ") from datanode (=" + id + ")", e);
+        }
+      }
+
+      if (errorCount == locs.length) {
+        throw new IOException("All datanodes failed: block=" + block
+            + ", datanodeids=" + Arrays.asList(locs));
+      }
+
+      syncBlock(syncList);
+    }
+
+    /** Block synchronization. */
+    void syncBlock(List<BlockRecord> syncList) throws IOException {
+      DatanodeProtocolClientSideTranslatorPB nn =
+          getActiveNamenodeForBP(block.getBlockPoolId());
+
+      boolean isTruncateRecovery = rBlock.getNewBlock() != null;
+      long blockId = (isTruncateRecovery) ?
+          rBlock.getNewBlock().getBlockId() : block.getBlockId();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+            + "), syncList=" + syncList);
+      }
+
+      // syncList.isEmpty() means that all data-nodes do not have the block
+      // or their replicas have 0 length.
+      // The block can be deleted.
+      if (syncList.isEmpty()) {
+        nn.commitBlockSynchronization(block, recoveryId, 0,
+            true, true, DatanodeID.EMPTY_ARRAY, null);
+        return;
+      }
+
+      // Calculate the best available replica state.
+      ReplicaState bestState = ReplicaState.RWR;
+      long finalizedLength = -1;
+      for (BlockRecord r : syncList) {
+        assert r.rInfo.getNumBytes() > 0 : "zero length replica";
+        ReplicaState rState = r.rInfo.getOriginalReplicaState();
+        if (rState.getValue() < bestState.getValue()) {
+          bestState = rState;
+        }
+        if(rState == ReplicaState.FINALIZED) {
+          if (finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes()) {
+            throw new IOException("Inconsistent size of finalized replicas. " +
+                "Replica " + r.rInfo + " expected size: " + finalizedLength);
+          }
+          finalizedLength = r.rInfo.getNumBytes();
+        }
+      }
+
+      // Calculate list of nodes that will participate in the recovery
+      // and the new block size
+      List<BlockRecord> participatingList = new ArrayList<>();
+      final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId,
+          -1, recoveryId);
+      switch(bestState) {
+      case FINALIZED:
+        assert finalizedLength > 0 : "finalizedLength is not positive";
+        for(BlockRecord r : syncList) {
+          ReplicaState rState = r.rInfo.getOriginalReplicaState();
+          if (rState == ReplicaState.FINALIZED ||
+              rState == ReplicaState.RBW &&
+                  r.rInfo.getNumBytes() == finalizedLength) {
+            participatingList.add(r);
+          }
+        }
+        newBlock.setNumBytes(finalizedLength);
+        break;
+      case RBW:
+      case RWR:
+        long minLength = Long.MAX_VALUE;
+        for(BlockRecord r : syncList) {
+          ReplicaState rState = r.rInfo.getOriginalReplicaState();
+          if(rState == bestState) {
+            minLength = Math.min(minLength, r.rInfo.getNumBytes());
+            participatingList.add(r);
+          }
+        }
+        newBlock.setNumBytes(minLength);
+        break;
+      case RUR:
+      case TEMPORARY:
+        assert false : "bad replica state: " + bestState;
+      default:
+        break; // we have 'case' all enum values
+      }
+      if (isTruncateRecovery) {
+        newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes());
+      }
+
+      List<DatanodeID> failedList = new ArrayList<>();
+      final List<BlockRecord> successList = new ArrayList<>();
+      for (BlockRecord r : participatingList) {
+        try {
+          r.updateReplicaUnderRecovery(bpid, recoveryId, blockId,
+              newBlock.getNumBytes());
+          successList.add(r);
+        } catch (IOException e) {
+          InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+              + newBlock + ", datanode=" + r.id + ")", e);
+          failedList.add(r.id);
+        }
+      }
+
+      // If any of the data-nodes failed, the recovery fails, because
+      // we never know the actual state of the replica on failed data-nodes.
+      // The recovery should be started over.
+      if (!failedList.isEmpty()) {
+        StringBuilder b = new StringBuilder();
+        for(DatanodeID id : failedList) {
+          b.append("\n  " + id);
+        }
+        throw new IOException("Cannot recover " + block + ", the following "
+            + failedList.size() + " data-nodes failed {" + b + "\n}");
+      }
+
+      // Notify the name-node about successfully recovered replicas.
+      final DatanodeID[] datanodes = new DatanodeID[successList.size()];
+      final String[] storages = new String[datanodes.length];
+      for (int i = 0; i < datanodes.length; i++) {
+        final BlockRecord r = successList.get(i);
+        datanodes[i] = r.id;
+        storages[i] = r.storageID;
+      }
+      nn.commitBlockSynchronization(block,
+          newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
+          datanodes, storages);
+    }
+  }
+
+  private static void logRecoverBlock(String who, RecoveringBlock rb) {
+    ExtendedBlock block = rb.getBlock();
+    DatanodeInfo[] targets = rb.getLocations();
+
+    LOG.info(who + " calls recoverBlock(" + block
+        + ", targets=[" + Joiner.on(", ").join(targets) + "]"
+        + ", newGenerationStamp=" + rb.getNewGenerationStamp()
+        + ", newBlock=" + rb.getNewBlock()
+        + ", isStriped=" + rb.isStriped()
+        + ")");
+  }
+
+  /**
+   * Convenience method, which unwraps RemoteException.
+   * @throws IOException not a RemoteException.
+   */
+  private static ReplicaRecoveryInfo callInitReplicaRecovery(
+      InterDatanodeProtocol datanode, RecoveringBlock rBlock)
+      throws IOException {
+    try {
+      return datanode.initReplicaRecovery(rBlock);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException();
+    }
+  }
+
+  /**
+   * Get the NameNode corresponding to the given block pool.
+   *
+   * @param bpid Block pool Id
+   * @return Namenode corresponding to the bpid
+   * @throws IOException if unable to get the corresponding NameNode
+   */
+  DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(
+      String bpid) throws IOException {
+    BPOfferService bpos = datanode.getBPOfferService(bpid);
+    if (bpos == null) {
+      throw new IOException("No block pool offer service for bpid=" + bpid);
+    }
+
+    DatanodeProtocolClientSideTranslatorPB activeNN = bpos.getActiveNN();
+    if (activeNN == null) {
+      throw new IOException(
+          "Block pool " + bpid + " has not recognized an active NN");
+    }
+    return activeNN;
+  }
+
+  public Daemon recoverBlocks(final String who,
+      final Collection<RecoveringBlock> blocks) {
+    Daemon d = new Daemon(datanode.threadGroup, new Runnable() {
+      @Override
+      public void run() {
+        for(RecoveringBlock b : blocks) {
+          try {
+            logRecoverBlock(who, b);
+            RecoveryTaskContiguous task = new RecoveryTaskContiguous(b);
+            task.recover();
+          } catch (IOException e) {
+            LOG.warn("recoverBlocks FAILED: " + b, e);
+          }
+        }
+      }
+    });
+    d.start();
+    return d;
+  }
+}
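
The core of the new class is the length-selection rule in RecoveryTaskContiguous.syncBlock: pick the best (lowest-value) replica state among the surviving replicas, use the agreed finalized length if that state is FINALIZED, and otherwise truncate to the shortest replica in the best state. A standalone sketch of that rule, using simplified stand-in types rather than the Hadoop classes (assumes Java 16+ for records):

    import java.util.List;

    public class SyncLengthSketch {
      // Stand-in for ReplicaState: lower ordinal = more trustworthy state.
      enum State { FINALIZED, RBW, RWR }

      record Replica(State state, long numBytes) {}

      // Mirrors the new-length computation in syncBlock above.
      static long newBlockLength(List<Replica> syncList) {
        State best = State.RWR;
        long finalizedLength = -1;
        for (Replica r : syncList) {
          if (r.state().ordinal() < best.ordinal()) {
            best = r.state();
          }
          if (r.state() == State.FINALIZED) {
            finalizedLength = r.numBytes(); // finalized copies must all agree
          }
        }
        if (best == State.FINALIZED) {
          return finalizedLength;
        }
        long min = Long.MAX_VALUE; // RBW/RWR: recover to the shortest replica
        for (Replica r : syncList) {
          if (r.state() == best) {
            min = Math.min(min, r.numBytes());
          }
        }
        return min;
      }

      public static void main(String[] args) {
        // Two RBW replicas of different lengths recover to the shorter one.
        System.out.println(newBlockLength(List.of(
            new Replica(State.RBW, 4096), new Replica(State.RBW, 2048))));
      }
    }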

+ 17 - 275
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -123,7 +123,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -174,7 +173,6 @@ import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNS;
@@ -368,6 +366,7 @@ public class DataNode extends ReconfigurableBase
   private String supergroup;
   private boolean isPermissionEnabled;
   private String dnUserName = null;
+  private BlockRecoveryWorker blockRecoveryWorker;
   private ErasureCodingWorker ecWorker;
   private final Tracer tracer;
   private final TracerConfigurationManager tracerConfigurationManager;
@@ -706,7 +705,7 @@ public class DataNode extends ReconfigurableBase
 
   /**
    * Remove volumes from DataNode.
-   * See {@link removeVolumes(final Set<File>, boolean)} for details.
+   * See {@link #removeVolumes(Set, boolean)} for details.
    *
    * @param locations the StorageLocations of the volumes to be removed.
    * @throws IOException
@@ -730,7 +729,7 @@ public class DataNode extends ReconfigurableBase
    * <li>
    *   <ul>Remove volumes and block info from FsDataset.</ul>
    *   <ul>Remove volumes from DataStorage.</ul>
-   *   <ul>Reset configuration DATA_DIR and {@link dataDirs} to represent
+   *   <ul>Reset configuration DATA_DIR and {@link #dataDirs} to represent
    *   active volumes.</ul>
    * </li>
    * @param absoluteVolumePaths the absolute path of volumes.
@@ -856,7 +855,6 @@ public class DataNode extends ReconfigurableBase
       }
     }
   }
-  
 
   private void initIpcServer(Configuration conf) throws IOException {
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
@@ -1104,8 +1102,6 @@ public class DataNode extends ReconfigurableBase
     bpos.trySendErrorReport(errCode, errMsg);
   }
 
-
-  
   /**
    * Return the BPOfferService instance corresponding to the given block.
    * @return the BPOS
@@ -1122,8 +1118,6 @@ public class DataNode extends ReconfigurableBase
     return bpos;
   }
 
-
-  
   // used only for testing
   @VisibleForTesting
   void setHeartbeatsDisabledForTests(
@@ -1215,7 +1209,10 @@ public class DataNode extends ReconfigurableBase
 
     metrics = DataNodeMetrics.create(conf, getDisplayName());
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
-    
+
+    ecWorker = new ErasureCodingWorker(conf, this);
+    blockRecoveryWorker = new BlockRecoveryWorker(this);
+
     blockPoolManager = new BlockPoolManager(this);
     blockPoolManager.refreshNamenodes(conf);
 
@@ -1225,8 +1222,6 @@ public class DataNode extends ReconfigurableBase
     saslClient = new SaslDataTransferClient(dnConf.conf, 
         dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
-    // Initialize ErasureCoding worker
-    ecWorker = new ErasureCodingWorker(conf, this);
     startMetricsLogger(conf);
   }
 
@@ -1450,6 +1445,10 @@ public class DataNode extends ReconfigurableBase
   List<BPOfferService> getAllBpOs() {
     return blockPoolManager.getAllNamenodeThreads();
   }
+
+  BPOfferService getBPOfferService(String bpid){
+    return blockPoolManager.get(bpid);
+  }
   
   int getBpOsCount() {
     return blockPoolManager.getAllNamenodeThreads().size();
@@ -2626,49 +2625,13 @@ public class DataNode extends ReconfigurableBase
     secureMain(args, null);
   }
 
-  public Daemon recoverBlocks(
-      final String who,
-      final Collection<RecoveringBlock> blocks) {
-    
-    Daemon d = new Daemon(threadGroup, new Runnable() {
-      /** Recover a list of blocks. It is run by the primary datanode. */
-      @Override
-      public void run() {
-        for(RecoveringBlock b : blocks) {
-          try {
-            logRecoverBlock(who, b);
-            recoverBlock(b);
-          } catch (IOException e) {
-            LOG.warn("recoverBlocks FAILED: " + b, e);
-          }
-        }
-      }
-    });
-    d.start();
-    return d;
-  }
-
   // InterDataNodeProtocol implementation
   @Override // InterDatanodeProtocol
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
-  throws IOException {
+      throws IOException {
     return data.initReplicaRecovery(rBlock);
   }
 
-  /**
-   * Convenience method, which unwraps RemoteException.
-   * @throws IOException not a RemoteException.
-   */
-  private static ReplicaRecoveryInfo callInitReplicaRecovery(
-      InterDatanodeProtocol datanode,
-      RecoveringBlock rBlock) throws IOException {
-    try {
-      return datanode.initReplicaRecovery(rBlock);
-    } catch(RemoteException re) {
-      throw re.unwrapRemoteException();
-    }
-  }
-
   /**
    * Update replica with the new generation stamp and length.  
    */
@@ -2689,231 +2652,6 @@ public class DataNode extends ReconfigurableBase
     return storageID;
   }
 
-  /** A convenient class used in block recovery */
-  static class BlockRecord { 
-    final DatanodeID id;
-    final InterDatanodeProtocol datanode;
-    final ReplicaRecoveryInfo rInfo;
-    
-    private String storageID;
-
-    BlockRecord(DatanodeID id,
-                InterDatanodeProtocol datanode,
-                ReplicaRecoveryInfo rInfo) {
-      this.id = id;
-      this.datanode = datanode;
-      this.rInfo = rInfo;
-    }
-
-    void updateReplicaUnderRecovery(String bpid, long recoveryId,
-                                    long newBlockId, long newLength)
-        throws IOException {
-      final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
-      storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId,
-          newLength);
-    }
-
-    @Override
-    public String toString() {
-      return "block:" + rInfo + " node:" + id;
-    }
-  }
-
-  /** Recover a block */
-  private void recoverBlock(RecoveringBlock rBlock) throws IOException {
-    ExtendedBlock block = rBlock.getBlock();
-    String blookPoolId = block.getBlockPoolId();
-    DatanodeID[] datanodeids = rBlock.getLocations();
-    List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
-    int errorCount = 0;
-
-    //check generation stamps
-    for(DatanodeID id : datanodeids) {
-      try {
-        BPOfferService bpos = blockPoolManager.get(blookPoolId);
-        DatanodeRegistration bpReg = bpos.bpRegistration;
-        InterDatanodeProtocol datanode = bpReg.equals(id)?
-            this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
-                dnConf.socketTimeout, dnConf.connectToDnViaHostname);
-        ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
-        if (info != null &&
-            info.getGenerationStamp() >= block.getGenerationStamp() &&
-            info.getNumBytes() > 0) {
-          syncList.add(new BlockRecord(id, datanode, info));
-        }
-      } catch (RecoveryInProgressException ripE) {
-        InterDatanodeProtocol.LOG.warn(
-            "Recovery for replica " + block + " on data-node " + id
-            + " is already in progress. Recovery id = "
-            + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
-        return;
-      } catch (IOException e) {
-        ++errorCount;
-        InterDatanodeProtocol.LOG.warn(
-            "Failed to obtain replica info for block (=" + block 
-            + ") from datanode (=" + id + ")", e);
-      }
-    }
-
-    if (errorCount == datanodeids.length) {
-      throw new IOException("All datanodes failed: block=" + block
-          + ", datanodeids=" + Arrays.asList(datanodeids));
-    }
-
-    syncBlock(rBlock, syncList);
-  }
-
-  /**
-   * Get the NameNode corresponding to the given block pool.
-   *
-   * @param bpid Block pool Id
-   * @return Namenode corresponding to the bpid
-   * @throws IOException if unable to get the corresponding NameNode
-   */
-  public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid)
-      throws IOException {
-    BPOfferService bpos = blockPoolManager.get(bpid);
-    if (bpos == null) {
-      throw new IOException("No block pool offer service for bpid=" + bpid);
-    }
-    
-    DatanodeProtocolClientSideTranslatorPB activeNN = bpos.getActiveNN();
-    if (activeNN == null) {
-      throw new IOException(
-          "Block pool " + bpid + " has not recognized an active NN");
-    }
-    return activeNN;
-  }
-
-  /** Block synchronization */
-  void syncBlock(RecoveringBlock rBlock,
-                         List<BlockRecord> syncList) throws IOException {
-    ExtendedBlock block = rBlock.getBlock();
-    final String bpid = block.getBlockPoolId();
-    DatanodeProtocolClientSideTranslatorPB nn =
-      getActiveNamenodeForBP(block.getBlockPoolId());
-
-    long recoveryId = rBlock.getNewGenerationStamp();
-    boolean isTruncateRecovery = rBlock.getNewBlock() != null;
-    long blockId = (isTruncateRecovery) ?
-        rBlock.getNewBlock().getBlockId() : block.getBlockId();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
-          + "), syncList=" + syncList);
-    }
-
-    // syncList.isEmpty() means that all data-nodes do not have the block
-    // or their replicas have 0 length.
-    // The block can be deleted.
-    if (syncList.isEmpty()) {
-      nn.commitBlockSynchronization(block, recoveryId, 0,
-          true, true, DatanodeID.EMPTY_ARRAY, null);
-      return;
-    }
-
-    // Calculate the best available replica state.
-    ReplicaState bestState = ReplicaState.RWR;
-    long finalizedLength = -1;
-    for(BlockRecord r : syncList) {
-      assert r.rInfo.getNumBytes() > 0 : "zero length replica";
-      ReplicaState rState = r.rInfo.getOriginalReplicaState(); 
-      if(rState.getValue() < bestState.getValue())
-        bestState = rState;
-      if(rState == ReplicaState.FINALIZED) {
-        if(finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes())
-          throw new IOException("Inconsistent size of finalized replicas. " +
-              "Replica " + r.rInfo + " expected size: " + finalizedLength);
-        finalizedLength = r.rInfo.getNumBytes();
-      }
-    }
-
-    // Calculate list of nodes that will participate in the recovery
-    // and the new block size
-    List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
-    final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId,
-        -1, recoveryId);
-    switch(bestState) {
-    case FINALIZED:
-      assert finalizedLength > 0 : "finalizedLength is not positive";
-      for(BlockRecord r : syncList) {
-        ReplicaState rState = r.rInfo.getOriginalReplicaState();
-        if(rState == ReplicaState.FINALIZED ||
-           rState == ReplicaState.RBW &&
-                      r.rInfo.getNumBytes() == finalizedLength)
-          participatingList.add(r);
-      }
-      newBlock.setNumBytes(finalizedLength);
-      break;
-    case RBW:
-    case RWR:
-      long minLength = Long.MAX_VALUE;
-      for(BlockRecord r : syncList) {
-        ReplicaState rState = r.rInfo.getOriginalReplicaState();
-        if(rState == bestState) {
-          minLength = Math.min(minLength, r.rInfo.getNumBytes());
-          participatingList.add(r);
-        }
-      }
-      newBlock.setNumBytes(minLength);
-      break;
-    case RUR:
-    case TEMPORARY:
-      assert false : "bad replica state: " + bestState;
-    }
-    if(isTruncateRecovery)
-      newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes());
-
-    List<DatanodeID> failedList = new ArrayList<DatanodeID>();
-    final List<BlockRecord> successList = new ArrayList<BlockRecord>();
-    for(BlockRecord r : participatingList) {
-      try {
-        r.updateReplicaUnderRecovery(bpid, recoveryId, blockId,
-            newBlock.getNumBytes());
-        successList.add(r);
-      } catch (IOException e) {
-        InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
-            + newBlock + ", datanode=" + r.id + ")", e);
-        failedList.add(r.id);
-      }
-    }
-
-    // If any of the data-nodes failed, the recovery fails, because
-    // we never know the actual state of the replica on failed data-nodes.
-    // The recovery should be started over.
-    if(!failedList.isEmpty()) {
-      StringBuilder b = new StringBuilder();
-      for(DatanodeID id : failedList) {
-        b.append("\n  " + id);
-      }
-      throw new IOException("Cannot recover " + block + ", the following "
-          + failedList.size() + " data-nodes failed {" + b + "\n}");
-    }
-
-    // Notify the name-node about successfully recovered replicas.
-    final DatanodeID[] datanodes = new DatanodeID[successList.size()];
-    final String[] storages = new String[datanodes.length];
-    for(int i = 0; i < datanodes.length; i++) {
-      final BlockRecord r = successList.get(i);
-      datanodes[i] = r.id;
-      storages[i] = r.storageID;
-    }
-    nn.commitBlockSynchronization(block,
-        newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
-        datanodes, storages);
-  }
-  
-  private static void logRecoverBlock(String who, RecoveringBlock rb) {
-    ExtendedBlock block = rb.getBlock();
-    DatanodeInfo[] targets = rb.getLocations();
-    
-    LOG.info(who + " calls recoverBlock(" + block
-        + ", targets=[" + Joiner.on(", ").join(targets) + "]"
-        + ((rb.getNewBlock() == null) ? ", newGenerationStamp="
-            + rb.getNewGenerationStamp() : ", newBlock=" + rb.getNewBlock())
-        + ")");
-  }
-
   @Override // ClientDataNodeProtocol
   public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
     checkReadAccess(block);
@@ -3337,7 +3075,11 @@ public class DataNode extends ReconfigurableBase
     checkSuperuserPrivilege();
     tracerConfigurationManager.removeSpanReceiver(id);
   }
-  
+
+  public BlockRecoveryWorker getBlockRecoveryWorker(){
+    return blockRecoveryWorker;
+  }
+
   public ErasureCodingWorker getErasureCodingWorker(){
     return ecWorker;
   }

+ 51 - 23
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -64,7 +64,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
+import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker.BlockRecord;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -79,7 +79,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -98,6 +97,8 @@ public class TestBlockRecovery {
   private static final String DATA_DIR =
     MiniDFSCluster.getBaseDirectory() + "data";
   private DataNode dn;
+  private DataNode spyDN;
+  private BlockRecoveryWorker recoveryWorker;
   private Configuration conf;
   private boolean tearDownDone;
   private final static long RECOVERY_ID = 3000L;
@@ -179,6 +180,8 @@ public class TestBlockRecovery {
     };
     // Trigger a heartbeat so that it acknowledges the NN as active.
     dn.getAllBpOs().get(0).triggerHeartbeatForTests();
+    spyDN = spy(dn);
+    recoveryWorker = new BlockRecoveryWorker(spyDN);
   }
 
   /**
@@ -225,7 +228,10 @@ public class TestBlockRecovery {
         anyLong(), anyLong())).thenReturn("storage1");
     when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
         anyLong(), anyLong())).thenReturn("storage2");
-    dn.syncBlock(rBlock, syncList);
+
+    BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
+        recoveryWorker.new RecoveryTaskContiguous(rBlock);
+    RecoveryTaskContiguous.syncBlock(syncList);
   }
   
   /**
@@ -446,13 +452,17 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    DataNode spyDN = spy(dn);
     doThrow(new RecoveryInProgressException("Replica recovery is in progress")).
        when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
-    Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
-    d.join();
-    verify(spyDN, never()).syncBlock(
-        any(RecoveringBlock.class), anyListOf(BlockRecord.class));
+
+    for(RecoveringBlock rBlock: initRecoveringBlocks()){
+      BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
+          recoveryWorker.new RecoveryTaskContiguous(rBlock);
+      BlockRecoveryWorker.RecoveryTaskContiguous spyTask
+          = spy(RecoveryTaskContiguous);
+      spyTask.recover();
+      verify(spyTask, never()).syncBlock(anyListOf(BlockRecord.class));
+    }
   }
 
   /**
@@ -466,13 +476,21 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    DataNode spyDN = spy(dn);
     doThrow(new IOException()).
        when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
-    Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
-    d.join();
-    verify(spyDN, never()).syncBlock(
-        any(RecoveringBlock.class), anyListOf(BlockRecord.class));
+
+    for(RecoveringBlock rBlock: initRecoveringBlocks()){
+      BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
+          recoveryWorker.new RecoveryTaskContiguous(rBlock);
+      BlockRecoveryWorker.RecoveryTaskContiguous spyTask = spy(RecoveryTaskContiguous);
+      try {
+        spyTask.recover();
+        fail();
+      } catch(IOException e){
+        GenericTestUtils.assertExceptionContains("All datanodes failed", e);
+      }
+      verify(spyTask, never()).syncBlock(anyListOf(BlockRecord.class));
+    }
   }
 
   /**
@@ -485,13 +503,18 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    DataNode spyDN = spy(dn);
     doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0,
         block.getGenerationStamp(), ReplicaState.FINALIZED)).when(spyDN).
         initReplicaRecovery(any(RecoveringBlock.class));
-    Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
-    d.join();
-    DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
+
+    for(RecoveringBlock rBlock: initRecoveringBlocks()){
+      BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
+          recoveryWorker.new RecoveryTaskContiguous(rBlock);
+      BlockRecoveryWorker.RecoveryTaskContiguous spyTask
+          = spy(RecoveryTaskContiguous);
+      spyTask.recover();
+    }
+    DatanodeProtocol dnP = recoveryWorker.getActiveNamenodeForBP(POOL_ID);
     verify(dnP).commitBlockSynchronization(
         block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
   }
@@ -520,11 +543,12 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    DataNode spyDN = spy(dn);
     doThrow(new IOException()).when(spyDN).updateReplicaUnderRecovery(
         block, RECOVERY_ID, BLOCK_ID, block.getNumBytes());
     try {
-      spyDN.syncBlock(rBlock, initBlockRecords(spyDN));
+      BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
+          recoveryWorker.new RecoveryTaskContiguous(rBlock);
+      RecoveryTaskContiguous.syncBlock(initBlockRecords(spyDN));
       fail("Sync should fail");
     } catch (IOException e) {
       e.getMessage().startsWith("Cannot recover ");
@@ -542,13 +566,15 @@ public class TestBlockRecovery {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
     dn.data.createRbw(StorageType.DEFAULT, block, false);
+    BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
+        recoveryWorker.new RecoveryTaskContiguous(rBlock);
     try {
-      dn.syncBlock(rBlock, initBlockRecords(dn));
+      RecoveryTaskContiguous.syncBlock(initBlockRecords(dn));
       fail("Sync should fail");
     } catch (IOException e) {
       e.getMessage().startsWith("Cannot recover ");
     }
-    DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
+    DatanodeProtocol namenode = recoveryWorker.getActiveNamenodeForBP(POOL_ID);
     verify(namenode, never()).commitBlockSynchronization(
         any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
         anyBoolean(), any(DatanodeID[].class), any(String[].class));
@@ -572,13 +598,15 @@ public class TestBlockRecovery {
           DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
       streams.getChecksumOut().write('a');
       dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
+      BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
+          recoveryWorker.new RecoveryTaskContiguous(rBlock);
       try {
-        dn.syncBlock(rBlock, initBlockRecords(dn));
+        RecoveryTaskContiguous.syncBlock(initBlockRecords(dn));
         fail("Sync should fail");
       } catch (IOException e) {
         e.getMessage().startsWith("Cannot recover ");
       }
-      DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
+      DatanodeProtocol namenode = recoveryWorker.getActiveNamenodeForBP(POOL_ID);
       verify(namenode, never()).commitBlockSynchronization(
           any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
           anyBoolean(), any(DatanodeID[].class), any(String[].class));
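
A note on the test changes above: RecoveryTaskContiguous is a non-static inner class of BlockRecoveryWorker, so the tests construct it with Java's qualified inner-class creation syntax, recoveryWorker.new RecoveryTaskContiguous(rBlock), binding each task to a specific worker instance. A minimal standalone sketch of that syntax (hypothetical Outer/Inner names, not Hadoop code):

    public class Outer {
      class Inner { // non-static: each Inner is bound to an Outer instance
        void show() {
          System.out.println("bound to " + Outer.this);
        }
      }

      public static void main(String[] args) {
        Outer outer = new Outer();
        Outer.Inner inner = outer.new Inner(); // qualified creation expression
        inner.show();
      }
    }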