
HDFS-1971. Send block report from datanode to both active and standby namenodes. (sanjay, todd via suresh)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1208925 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 13 years ago
parent
commit
1e346aa829

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt

@@ -29,3 +29,5 @@ HDFS-2582. Scope dfs.ha.namenodes config by nameservice (todd)
 HDFS-2591. MiniDFSCluster support to mix and match federation with HA (todd)
 
 HDFS-1975. Support for sharing the namenode state from active to standby. (jitendra, atm, todd)
+
+HDFS-1971. Send block report from datanode to both active and standby namenodes. (sanjay, todd via suresh)

+ 271 - 526
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -17,62 +17,43 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
- * A thread per namenode to perform:
- * <ul>
- * <li> Pre-registration handshake with namenode</li>
- * <li> Registration with namenode</li>
- * <li> Send periodic heartbeats to the namenode</li>
- * <li> Handle commands received from the namenode</li>
- * </ul>
+ * One instance per block-pool/namespace on the DN, which handles the
+ * heartbeats to the active and standby NNs for that namespace.
+ * This class manages an instance of {@link BPServiceActor} for each NN,
+ * and delegates calls to both NNs. 
+ * It also maintains the state about which of the NNs is considered active.
  */
 @InterfaceAudience.Private
-class BPOfferService implements Runnable {
+class BPOfferService {
   static final Log LOG = DataNode.LOG;
   
-  final InetSocketAddress nnAddr;
-  
   /**
    * Information about the namespace that this service
    * is registering with. This is assigned after
@@ -87,27 +68,25 @@ class BPOfferService implements Runnable {
    */
   DatanodeRegistration bpRegistration;
   
-  long lastBlockReport = 0;
-  long lastDeletedReport = 0;
-
-  boolean resetBlockReportTime = true;
-
-  Thread bpThread;
-  DatanodeProtocol bpNamenode;
-  private long lastHeartbeat = 0;
-  private volatile boolean initialized = false;
-  private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList 
-    = new LinkedList<ReceivedDeletedBlockInfo>();
-  private volatile int pendingReceivedRequests = 0;
-  private volatile boolean shouldServiceRun = true;
   UpgradeManagerDatanode upgradeManager = null;
   private final DataNode dn;
-  private final DNConf dnConf;
 
-  BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
+  private BPServiceActor bpServiceToActive;
+  private List<BPServiceActor> bpServices =
+    new CopyOnWriteArrayList<BPServiceActor>();
+
+  BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
+    Preconditions.checkArgument(!nnAddrs.isEmpty(),
+        "Must pass at least one NN.");
     this.dn = dn;
-    this.nnAddr = nnAddr;
-    this.dnConf = dn.getDnConf();
+
+    for (InetSocketAddress addr : nnAddrs) {
+      this.bpServices.add(new BPServiceActor(addr, this));
+    }
+    // TODO(HA): currently we just make the first one the initial
+    // active. In reality it should start in an unknown state and then
+    // as we figure out which is active, designate one as such.
+    this.bpServiceToActive = this.bpServices.get(0);
   }
 
   /**
@@ -115,15 +94,18 @@ class BPOfferService implements Runnable {
    * and has registered with the corresponding namenode
    * @return true if initialized
    */
-  public boolean isInitialized() {
-    return initialized;
+  boolean isInitialized() {
+    // TODO(HA) is this right?
+    return bpServiceToActive != null && bpServiceToActive.isInitialized();
   }
   
-  public boolean isAlive() {
-    return shouldServiceRun && bpThread.isAlive();
+  boolean isAlive() {
+    // TODO: should || all the bp actors probably?
+    return bpServiceToActive != null &&
+      bpServiceToActive.isAlive();
   }
   
-  public String getBlockPoolId() {
+  String getBlockPoolId() {
     if (bpNSInfo != null) {
       return bpNSInfo.getBlockPoolID();
     } else {
@@ -133,10 +115,11 @@ class BPOfferService implements Runnable {
     }
   }
   
-  public NamespaceInfo getNamespaceInfo() {
+  NamespaceInfo getNamespaceInfo() {
     return bpNSInfo;
   }
   
+  @Override
   public String toString() {
     if (bpNSInfo == null) {
       // If we haven't yet connected to our NN, we don't yet know our
@@ -148,519 +131,279 @@ class BPOfferService implements Runnable {
         storageId = "unknown";
       }
       return "Block pool <registering> (storage id " + storageId +
-        ") connecting to " + nnAddr;
+        ")";
     } else {
       return "Block pool " + getBlockPoolId() +
         " (storage id " + dn.getStorageId() +
-        ") registered with " + nnAddr;
+        ")";
     }
   }
   
-  InetSocketAddress getNNSocketAddress() {
-    return nnAddr;
-  }
-
-  /**
-   * Used to inject a spy NN in the unit tests.
-   */
-  @VisibleForTesting
-  void setNameNode(DatanodeProtocol dnProtocol) {
-    bpNamenode = dnProtocol;
-  }
-
-  /**
-   * Perform the first part of the handshake with the NameNode.
-   * This calls <code>versionRequest</code> to determine the NN's
-   * namespace and version info. It automatically retries until
-   * the NN responds or the DN is shutting down.
-   * 
-   * @return the NamespaceInfo
-   * @throws IncorrectVersionException if the remote NN does not match
-   * this DN's version
-   */
-  NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException {
-    NamespaceInfo nsInfo = null;
-    while (shouldRun()) {
-      try {
-        nsInfo = bpNamenode.versionRequest();
-        LOG.debug(this + " received versionRequest response: " + nsInfo);
-        break;
-      } catch(SocketTimeoutException e) {  // namenode is busy
-        LOG.warn("Problem connecting to server: " + nnAddr);
-      } catch(IOException e ) {  // namenode is not available
-        LOG.warn("Problem connecting to server: " + nnAddr);
-      }
-      
-      // try again in a second
-      sleepAndLogInterrupts(5000, "requesting version info from NN");
-    }
-    
-    if (nsInfo != null) {
-      checkNNVersion(nsInfo);        
-    }
-    return nsInfo;
-  }
-
-  private void checkNNVersion(NamespaceInfo nsInfo)
-      throws IncorrectVersionException {
-    // build and layout versions should match
-    String nsBuildVer = nsInfo.getBuildVersion();
-    String stBuildVer = Storage.getBuildVersion();
-    if (!nsBuildVer.equals(stBuildVer)) {
-      LOG.warn("Data-node and name-node Build versions must be the same. " +
-        "Namenode build version: " + nsBuildVer + "Datanode " +
-        "build version: " + stBuildVer);
-      throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
-    }
-
-    if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
-      LOG.warn("Data-node and name-node layout versions must be the same." +
-        " Expected: "+ HdfsConstants.LAYOUT_VERSION +
-        " actual "+ bpNSInfo.getLayoutVersion());
-      throw new IncorrectVersionException(
-          bpNSInfo.getLayoutVersion(), "namenode");
-    }
-  }
-
-  private void connectToNNAndHandshake() throws IOException {
-    // get NN proxy
-    bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
-          DatanodeProtocol.versionID, nnAddr, dn.getConf());
-
-    // First phase of the handshake with NN - get the namespace
-    // info.
-    bpNSInfo = retrieveNamespaceInfo();
-    
-    // Now that we know the namespace ID, etc, we can pass this to the DN.
-    // The DN can now initialize its local storage if we are the
-    // first BP to handshake, etc.
-    dn.initBlockPool(this);
-    
-    // Second phase of the handshake with the NN.
-    register();
-  }
-  
-  /**
-   * This methods  arranges for the data node to send the block report at 
-   * the next heartbeat.
-   */
-  void scheduleBlockReport(long delay) {
-    if (delay > 0) { // send BR after random delay
-      lastBlockReport = System.currentTimeMillis()
-      - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
-    } else { // send at next heartbeat
-      lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
-    }
-    resetBlockReportTime = true; // reset future BRs for randomness
-  }
-
   void reportBadBlocks(ExtendedBlock block) {
-    DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
-    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
-    
-    try {
-      bpNamenode.reportBadBlocks(blocks);  
-    } catch (IOException e){
-      /* One common reason is that NameNode could be in safe mode.
-       * Should we keep on retrying in that case?
-       */
-      LOG.warn("Failed to report bad block " + block + " to namenode : "
-          + " Exception", e);
+    checkBlock(block);
+    for (BPServiceActor actor : bpServices) {
+      actor.reportBadBlocks(block);
     }
-    
   }
   
-  /**
-   * Report received blocks and delete hints to the Namenode
-   * 
-   * @throws IOException
-   */
-  private void reportReceivedDeletedBlocks() throws IOException {
-
-    // check if there are newly received blocks
-    ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
-    int currentReceivedRequestsCounter;
-    synchronized (receivedAndDeletedBlockList) {
-      currentReceivedRequestsCounter = pendingReceivedRequests;
-      int numBlocks = receivedAndDeletedBlockList.size();
-      if (numBlocks > 0) {
-        //
-        // Send newly-received and deleted blockids to namenode
-        //
-        receivedAndDeletedBlockArray = receivedAndDeletedBlockList
-            .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
-      }
-    }
-    if (receivedAndDeletedBlockArray != null) {
-      bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(),
-          receivedAndDeletedBlockArray);
-      synchronized (receivedAndDeletedBlockList) {
-        for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
-          receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
-        }
-        pendingReceivedRequests -= currentReceivedRequestsCounter;
-      }
-    }
-  }
-
   /*
    * Informing the name node could take a long long time! Should we wait
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
   void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
-    if (block == null || delHint == null) {
-      throw new IllegalArgumentException(block == null ? "Block is null"
-          : "delHint is null");
-    }
-
-    if (!block.getBlockPoolId().equals(getBlockPoolId())) {
-      LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
-          + getBlockPoolId());
-      return;
+    checkBlock(block);
+    checkDelHint(delHint);
+    ReceivedDeletedBlockInfo bInfo = 
+               new ReceivedDeletedBlockInfo(block.getLocalBlock(), delHint);
+    for (BPServiceActor actor : bpServices) {
+      actor.notifyNamenodeReceivedBlock(bInfo);
     }
+  }
 
-    synchronized (receivedAndDeletedBlockList) {
-      receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
-          .getLocalBlock(), delHint));
-      pendingReceivedRequests++;
-      receivedAndDeletedBlockList.notifyAll();
-    }
+  private void checkBlock(ExtendedBlock block) {
+    Preconditions.checkArgument(block != null,
+        "block is null");
+    Preconditions.checkArgument(block.getBlockPoolId().equals(getBlockPoolId()),
+        "block belongs to BP %s instead of BP %s",
+        block.getBlockPoolId(), getBlockPoolId());
+  }
+  
+  private void checkDelHint(String delHint) {
+    Preconditions.checkArgument(delHint != null,
+        "delHint is null");
   }
 
   void notifyNamenodeDeletedBlock(ExtendedBlock block) {
-    if (block == null) {
-      throw new IllegalArgumentException("Block is null");
-    }
-
-    if (!block.getBlockPoolId().equals(getBlockPoolId())) {
-      LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
-          + getBlockPoolId());
-      return;
-    }
-
-    synchronized (receivedAndDeletedBlockList) {
-      receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
-          .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT));
+    checkBlock(block);
+    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(block
+          .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT);
+    
+    for (BPServiceActor actor : bpServices) {
+      actor.notifyNamenodeDeletedBlock(bInfo);
     }
   }
 
-
-  /**
-   * Report the list blocks to the Namenode
-   * @throws IOException
-   */
-  DatanodeCommand blockReport() throws IOException {
-    // send block report if timer has expired.
-    DatanodeCommand cmd = null;
-    long startTime = now();
-    if (startTime - lastBlockReport > dnConf.blockReportInterval) {
-
-      // Create block report
-      long brCreateStartTime = now();
-      BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId());
-
-      // Send block report
-      long brSendStartTime = now();
-      cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
-          .getBlockListAsLongs());
-
-      // Log the block report processing stats from Datanode perspective
-      long brSendCost = now() - brSendStartTime;
-      long brCreateCost = brSendStartTime - brCreateStartTime;
-      dn.metrics.addBlockReport(brSendCost);
-      LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
-          + " blocks took " + brCreateCost + " msec to generate and "
-          + brSendCost + " msecs for RPC and NN processing");
-
-      // If we have sent the first block report, then wait a random
-      // time before we start the periodic block reports.
-      if (resetBlockReportTime) {
-        lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
-        resetBlockReportTime = false;
-      } else {
-        /* say the last block report was at 8:20:14. The current report
-         * should have started around 9:20:14 (default 1 hour interval).
-         * If current time is :
-         *   1) normal like 9:20:18, next report should be at 10:20:14
-         *   2) unexpected like 11:35:43, next report should be at 12:20:14
-         */
-        lastBlockReport += (now() - lastBlockReport) /
-        dnConf.blockReportInterval * dnConf.blockReportInterval;
-      }
-      LOG.info("sent block report, processed command:" + cmd);
-    }
-    return cmd;
-  }
-  
-  
-  DatanodeCommand [] sendHeartBeat() throws IOException {
-    return bpNamenode.sendHeartbeat(bpRegistration,
-        dn.data.getCapacity(),
-        dn.data.getDfsUsed(),
-        dn.data.getRemaining(),
-        dn.data.getBlockPoolUsed(getBlockPoolId()),
-        dn.xmitsInProgress.get(),
-        dn.getXceiverCount(), dn.data.getNumFailedVolumes());
-  }
-  
   //This must be called only by blockPoolManager
   void start() {
-    if ((bpThread != null) && (bpThread.isAlive())) {
-      //Thread is started already
-      return;
+    for (BPServiceActor actor : bpServices) {
+      actor.start();
     }
-    bpThread = new Thread(this, formatThreadName());
-    bpThread.setDaemon(true); // needed for JUnit testing
-    bpThread.start();
-  }
-  
-  private String formatThreadName() {
-    Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
-    return "DataNode: [" +
-      StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " +
-      " heartbeating to " + nnAddr;
   }
   
   //This must be called only by blockPoolManager.
   void stop() {
-    shouldServiceRun = false;
-    if (bpThread != null) {
-        bpThread.interrupt();
+    for (BPServiceActor actor : bpServices) {
+      actor.stop();
     }
   }
   
   //This must be called only by blockPoolManager
   void join() {
-    try {
-      if (bpThread != null) {
-        bpThread.join();
-      }
-    } catch (InterruptedException ie) { }
+    for (BPServiceActor actor : bpServices) {
+      actor.join();
+    }
+  }
+
+  synchronized UpgradeManagerDatanode getUpgradeManager() {
+    if(upgradeManager == null)
+      upgradeManager = 
+        new UpgradeManagerDatanode(dn, getBlockPoolId());
+    
+    return upgradeManager;
   }
   
-  //Cleanup method to be called by current thread before exiting.
-  private synchronized void cleanUp() {
+  void processDistributedUpgradeCommand(UpgradeCommand comm)
+  throws IOException {
+    UpgradeManagerDatanode upgradeManager = getUpgradeManager();
+    upgradeManager.processUpgradeCommand(comm);
+  }
+
+  /**
+   * Start distributed upgrade if it should be initiated by the data-node.
+   */
+  synchronized void startDistributedUpgradeIfNeeded() throws IOException {
+    UpgradeManagerDatanode um = getUpgradeManager();
     
-    if(upgradeManager != null)
-      upgradeManager.shutdownUpgrade();
-    shouldServiceRun = false;
-    RPC.stopProxy(bpNamenode);
-    dn.shutdownBlockPool(this);
+    if(!um.getUpgradeState())
+      return;
+    um.setUpgradeState(false, um.getUpgradeVersion());
+    um.startUpgrade();
+    return;
+  }
+  
+  DataNode getDataNode() {
+    return dn;
   }
 
   /**
-   * Main loop for each BP thread. Run until shutdown,
-   * forever calling remote NameNode functions.
+   * Called by the BPServiceActors when they handshake to a NN.
+   * If this is the first NN connection, this sets the namespace info
+   * for this BPOfferService. If it's a connection to a new NN, it
+   * verifies that this namespace matches (eg to prevent a misconfiguration
+   * where a StandbyNode from a different cluster is specified)
    */
-  private void offerService() throws Exception {
-    LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
-        + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
-        + dnConf.blockReportInterval + "msec" + " Initial delay: "
-        + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
-        + dnConf.heartBeatInterval);
-
-    //
-    // Now loop for a long time....
-    //
-    while (shouldRun()) {
-      try {
-        long startTime = now();
-
-        //
-        // Every so often, send heartbeat or block-report
-        //
-        if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
-          //
-          // All heartbeat messages include following info:
-          // -- Datanode name
-          // -- data transfer port
-          // -- Total capacity
-          // -- Bytes remaining
-          //
-          lastHeartbeat = startTime;
-          if (!dn.areHeartbeatsDisabledForTests()) {
-            DatanodeCommand[] cmds = sendHeartBeat();
-            dn.metrics.addHeartbeat(now() - startTime);
-
-            long startProcessCommands = now();
-            if (!processCommand(cmds))
-              continue;
-            long endProcessCommands = now();
-            if (endProcessCommands - startProcessCommands > 2000) {
-              LOG.info("Took " + (endProcessCommands - startProcessCommands) +
-                  "ms to process " + cmds.length + " commands from NN");
-            }
-          }
-        }
-        if (pendingReceivedRequests > 0
-            || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
-          reportReceivedDeletedBlocks();
-          lastDeletedReport = startTime;
-        }
+  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+    if (this.bpNSInfo == null) {
+      this.bpNSInfo = nsInfo;
+      
+      // Now that we know the namespace ID, etc, we can pass this to the DN.
+      // The DN can now initialize its local storage if we are the
+      // first BP to handshake, etc.
+      dn.initBlockPool(this);
+      return;
+    } else {
+      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
+          "Blockpool ID");
+      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
+          "Namespace ID");
+      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
+          "Cluster ID");
+    }
+  }
 
-        DatanodeCommand cmd = blockReport();
-        processCommand(cmd);
+  /**
+   * After one of the BPServiceActors registers successfully with the
+   * NN, it calls this function to verify that the NN it connected to
+   * is consistent with other NNs serving the block-pool.
+   */
+  void registrationSucceeded(BPServiceActor bpServiceActor,
+      DatanodeRegistration reg) throws IOException {
+    if (bpRegistration != null) {
+      checkNSEquality(bpRegistration.storageInfo.getNamespaceID(),
+          reg.storageInfo.getNamespaceID(), "namespace ID");
+      checkNSEquality(bpRegistration.storageInfo.getClusterID(),
+          reg.storageInfo.getClusterID(), "cluster ID");
+    } else {
+      bpRegistration = reg;
+    }
+  }
 
-        // Now safe to start scanning the block pool
-        if (dn.blockScanner != null) {
-          dn.blockScanner.addBlockPool(this.getBlockPoolId());
-        }
+  /**
+   * Verify equality of two namespace-related fields, throwing
+   * an exception if they are unequal.
+   */
+  private static void checkNSEquality(
+      Object ourID, Object theirID,
+      String idHelpText) throws IOException {
+    if (!ourID.equals(theirID)) {
+      throw new IOException(idHelpText + " mismatch: " +
+          "previously connected to " + idHelpText + " " + ourID + 
+          " but now connected to " + idHelpText + " " + theirID);
+    }
+  }
 
-        //
-        // There is no work to do;  sleep until hearbeat timer elapses, 
-        // or work arrives, and then iterate again.
-        //
-        long waitTime = dnConf.heartBeatInterval - 
-        (System.currentTimeMillis() - lastHeartbeat);
-        synchronized(receivedAndDeletedBlockList) {
-          if (waitTime > 0 && pendingReceivedRequests == 0) {
-            try {
-              receivedAndDeletedBlockList.wait(waitTime);
-            } catch (InterruptedException ie) {
-              LOG.warn("BPOfferService for " + this + " interrupted");
-            }
-          }
-        } // synchronized
-      } catch(RemoteException re) {
-        String reClass = re.getClassName();
-        if (UnregisteredNodeException.class.getName().equals(reClass) ||
-            DisallowedDatanodeException.class.getName().equals(reClass) ||
-            IncorrectVersionException.class.getName().equals(reClass)) {
-          LOG.warn(this + " is shutting down", re);
-          shouldServiceRun = false;
-          return;
-        }
-        LOG.warn("RemoteException in offerService", re);
-        try {
-          long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
-          Thread.sleep(sleepTime);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
-      } catch (IOException e) {
-        LOG.warn("IOException in offerService", e);
-      }
-    } // while (shouldRun())
-  } // offerService
+  DatanodeRegistration createRegistration() {
+    Preconditions.checkState(bpNSInfo != null,
+        "getRegistration() can only be called after initial handshake");
+    return dn.createBPRegistration(bpNSInfo);
+  }
 
   /**
-   * Register one bp with the corresponding NameNode
-   * <p>
-   * The bpDatanode needs to register with the namenode on startup in order
-   * 1) to report which storage it is serving now and 
-   * 2) to receive a registrationID
-   *  
-   * issued by the namenode to recognize registered datanodes.
-   * 
-   * @see FSNamesystem#registerDatanode(DatanodeRegistration)
-   * @throws IOException
+   * Called when an actor shuts down. If this is the last actor
+   * to shut down, shuts down the whole blockpool in the DN.
    */
-  void register() throws IOException {
-    Preconditions.checkState(bpNSInfo != null,
-        "register() should be called after handshake()");
-    
-    // The handshake() phase loaded the block pool storage
-    // off disk - so update the bpRegistration object from that info
-    bpRegistration = dn.createBPRegistration(bpNSInfo);
+  void shutdownActor(BPServiceActor actor) {
+    if (bpServiceToActive == actor) {
+      bpServiceToActive = null;
+    }
 
-    LOG.info(this + " beginning handshake with NN");
+    bpServices.remove(actor);
 
-    while (shouldRun()) {
-      try {
-        // Use returned registration from namenode with updated machine name.
-        bpRegistration = bpNamenode.registerDatanode(bpRegistration);
-        break;
-      } catch(SocketTimeoutException e) {  // namenode is busy
-        LOG.info("Problem connecting to server: " + nnAddr);
-        sleepAndLogInterrupts(1000, "connecting to server");
-      }
+    // TODO: synchronization should be a little better here
+    if (bpServices.isEmpty()) {
+      dn.shutdownBlockPool(this);
+      
+      if(upgradeManager != null)
+        upgradeManager.shutdownUpgrade();
     }
-    
-    LOG.info("Block pool " + this + " successfully registered with NN");
-    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
-
-    // random short delay - helps scatter the BR from all DNs
-    scheduleBlockReport(dnConf.initialBlockReportDelay);
   }
 
+  @Deprecated
+  InetSocketAddress getNNSocketAddress() {
+    // TODO(HA) this doesn't make sense anymore
+    return bpServiceToActive.getNNSocketAddress();
+  }
 
-  private void sleepAndLogInterrupts(int millis,
-      String stateString) {
-    try {
-      Thread.sleep(millis);
-    } catch (InterruptedException ie) {
-      LOG.info("BPOfferService " + this +
-          " interrupted while " + stateString);
+  /**
+   * Called by the DN to report an error to the NNs.
+   */
+  void trySendErrorReport(int errCode, String errMsg) {
+    for (BPServiceActor actor : bpServices) {
+      actor.trySendErrorReport(errCode, errMsg);
     }
   }
 
   /**
-   * No matter what kind of exception we get, keep retrying to offerService().
-   * That's the loop that connects to the NameNode and provides basic DataNode
-   * functionality.
-   *
-   * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
-   * happen either at shutdown or due to refreshNamenodes.
+   * Ask each of the actors to schedule a block report after
+   * the specified delay.
    */
-  @Override
-  public void run() {
-    LOG.info(this + " starting to offer service");
+  void scheduleBlockReport(long delay) {
+    for (BPServiceActor actor : bpServices) {
+      actor.scheduleBlockReport(delay);
+    }
+  }
 
-    try {
-      // init stuff
+  /**
+   * Ask each of the actors to report a bad block hosted on another DN.
+   */
+  void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) {
+    for (BPServiceActor actor : bpServices) {
       try {
-        // setup storage
-        connectToNNAndHandshake();
-      } catch (IOException ioe) {
-        // Initial handshake, storage recovery or registration failed
-        // End BPOfferService thread
-        LOG.fatal("Initialization failed for block pool " + this, ioe);
-        return;
-      }
-
-      initialized = true; // bp is initialized;
-      
-      while (shouldRun()) {
-        try {
-          startDistributedUpgradeIfNeeded();
-          offerService();
-        } catch (Exception ex) {
-          LOG.error("Exception in BPOfferService for " + this, ex);
-          sleepAndLogInterrupts(5000, "offering service");
-        }
+        actor.reportRemoteBadBlock(dnInfo, block);
+      } catch (IOException e) {
+        LOG.warn("Couldn't report bad block " + block + " to " + actor,
+            e);
       }
-    } catch (Throwable ex) {
-      LOG.warn("Unexpected exception in block pool " + this, ex);
-    } finally {
-      LOG.warn("Ending block pool service for: " + this);
-      cleanUp();
     }
   }
 
-  private boolean shouldRun() {
-    return shouldServiceRun && dn.shouldRun();
+  /**
+   * TODO: this is still used in a few places where we need to sort out
+   * what to do in HA!
+   * @return a proxy to the active NN
+   */
+  @Deprecated
+  DatanodeProtocol getActiveNN() {
+    return bpServiceToActive.bpNamenode;
   }
 
   /**
-   * Process an array of datanode commands
-   * 
-   * @param cmds an array of datanode commands
-   * @return true if further processing may be required or false otherwise. 
+   * @return true if the given NN address is one of the NNs for this
+   * block pool
    */
-  private boolean processCommand(DatanodeCommand[] cmds) {
-    if (cmds != null) {
-      for (DatanodeCommand cmd : cmds) {
-        try {
-          if (processCommand(cmd) == false) {
-            return false;
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Error processing datanode Command", ioe);
-        }
+  boolean containsNN(InetSocketAddress addr) {
+    for (BPServiceActor actor : bpServices) {
+      if (actor.getNNSocketAddress().equals(addr)) {
+        return true;
       }
     }
-    return true;
+    return false;
+  }
+  
+  @VisibleForTesting
+  int countNameNodes() {
+    return bpServices.size();
+  }
+
+  /**
+   * Run an immediate block report on this thread. Used by tests.
+   */
+  @VisibleForTesting
+  void triggerBlockReportForTests() throws IOException {
+    for (BPServiceActor actor : bpServices) {
+      actor.triggerBlockReportForTests();
+    }
+  }
+
+  boolean processCommandFromActor(DatanodeCommand cmd,
+      BPServiceActor actor) throws IOException {
+    assert bpServices.contains(actor);
+    if (actor == bpServiceToActive) {
+      return processCommandFromActive(cmd, actor);
+    } else {
+      return processCommandFromStandby(cmd, actor);
+    }
   }
 
   /**
@@ -669,7 +412,8 @@ class BPOfferService implements Runnable {
    * @return true if further processing may be required or false otherwise. 
    * @throws IOException
    */
-  private boolean processCommand(DatanodeCommand cmd) throws IOException {
+  private boolean processCommandFromActive(DatanodeCommand cmd,
+      BPServiceActor actor) throws IOException {
     if (cmd == null)
       return true;
     final BlockCommand bcmd = 
@@ -700,19 +444,12 @@ class BPOfferService implements Runnable {
       dn.metrics.incrBlocksRemoved(toDelete.length);
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
-      // shut down the data node
-      shouldServiceRun = false;
-      return false;
+      // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
+      throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
     case DatanodeProtocol.DNA_REGISTER:
       // namenode requested a registration - at start or if NN lost contact
       LOG.info("DatanodeCommand action: DNA_REGISTER");
-      if (shouldRun()) {
-        // re-retrieve namespace info to make sure that, if the NN
-        // was restarted, we still match its version (HDFS-2120)
-        retrieveNamespaceInfo();
-        // and re-register
-        register();
-      }
+      actor.reRegister();
       break;
     case DatanodeProtocol.DNA_FINALIZE:
       String bp = ((FinalizeCommand) cmd).getBlockPoolId(); 
@@ -732,7 +469,8 @@ class BPOfferService implements Runnable {
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
       if (dn.isBlockTokenEnabled) {
-        dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(), 
+        dn.blockPoolTokenSecretManager.setKeys(
+            getBlockPoolId(), 
             ((KeyUpdateCommand) cmd).getExportedKeys());
       }
       break;
@@ -751,32 +489,39 @@ class BPOfferService implements Runnable {
     }
     return true;
   }
-  
-  private void processDistributedUpgradeCommand(UpgradeCommand comm)
-  throws IOException {
-    UpgradeManagerDatanode upgradeManager = getUpgradeManager();
-    upgradeManager.processUpgradeCommand(comm);
+ 
+  private boolean processCommandFromStandby(DatanodeCommand cmd,
+      BPServiceActor actor) throws IOException {
+    if (cmd == null)
+      return true;
+    switch(cmd.getAction()) {
+    case DatanodeProtocol.DNA_REGISTER:
+      // namenode requested a registration - at start or if NN lost contact
+      LOG.info("DatanodeCommand action: DNA_REGISTER");
+      actor.reRegister();
+      return true;
+    case DatanodeProtocol.DNA_TRANSFER:
+    case DatanodeProtocol.DNA_INVALIDATE:
+    case DatanodeProtocol.DNA_SHUTDOWN:
+    case DatanodeProtocol.DNA_RECOVERBLOCK:
+    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+    case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+      LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
+      return true;   
+    default:
+      LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
+    }
+    return true;
   }
 
-  synchronized UpgradeManagerDatanode getUpgradeManager() {
-    if(upgradeManager == null)
-      upgradeManager = 
-        new UpgradeManagerDatanode(dn, getBlockPoolId());
-    
-    return upgradeManager;
-  }
-  
   /**
-   * Start distributed upgrade if it should be initiated by the data-node.
+   * Connect to the NN at the given address. This is separated out for ease
+   * of testing.
    */
-  private void startDistributedUpgradeIfNeeded() throws IOException {
-    UpgradeManagerDatanode um = getUpgradeManager();
-    
-    if(!um.getUpgradeState())
-      return;
-    um.setUpgradeState(false, um.getUpgradeVersion());
-    um.startUpgrade();
-    return;
+  DatanodeProtocol connectToNN(InetSocketAddress nnAddr)
+      throws IOException {
+    return (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
+        DatanodeProtocol.versionID, nnAddr, dn.getConf());
   }
 
-}
+}
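
To make the shape of the refactor above easier to see in isolation: BPOfferService is now a thin fan-out layer, one instance per block pool, that forwards each notification to one BPServiceActor per NameNode and remembers which actor is (for now, by convention) treated as active. The following is a minimal sketch of that pattern under made-up names (FanOutSketch, OfferService, Actor, notifyBlockReceived); it is an illustration, not the real HDFS classes or API.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class FanOutSketch {

  static class Actor {                       // stands in for BPServiceActor
    final String nnAddress;
    Actor(String nnAddress) { this.nnAddress = nnAddress; }
    void reportBlock(String block) {
      System.out.println("sending " + block + " to NN at " + nnAddress);
    }
  }

  static class OfferService {                // stands in for BPOfferService
    private final List<Actor> actors = new CopyOnWriteArrayList<Actor>();
    private volatile Actor active;           // which NN we currently treat as active

    OfferService(List<String> nnAddresses) {
      if (nnAddresses.isEmpty()) {
        throw new IllegalArgumentException("Must pass at least one NN");
      }
      for (String addr : nnAddresses) {
        actors.add(new Actor(addr));
      }
      // Mirrors the TODO(HA) above: the first NN is simply assumed active.
      active = actors.get(0);
    }

    // Block notifications fan out to every NN, active and standby alike,
    // which is the point of HDFS-1971.
    void notifyBlockReceived(String block) {
      for (Actor a : actors) {
        a.reportBlock(block);
      }
    }
  }

  public static void main(String[] args) {
    OfferService bpos = new OfferService(
        Arrays.asList("nn1.example.com:8020", "nn2.example.com:8020"));
    bpos.notifyBlockReceived("blk_42");
  }
}

The CopyOnWriteArrayList matches the real class's choice, presumably because the fan-out loops read the actor list far more often than it changes: actors are added once at construction and removed only when one shuts down, so iteration stays lock-free.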

+ 633 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -0,0 +1,633 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A thread per active or standby namenode to perform:
+ * <ul>
+ * <li> Pre-registration handshake with namenode</li>
+ * <li> Registration with namenode</li>
+ * <li> Send periodic heartbeats to the namenode</li>
+ * <li> Handle commands received from the namenode</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+class BPServiceActor implements Runnable {
+  
+  static final Log LOG = DataNode.LOG;
+  final InetSocketAddress nnAddr;
+
+  BPOfferService bpos;
+  
+  long lastBlockReport = 0;
+  long lastDeletedReport = 0;
+
+  boolean resetBlockReportTime = true;
+
+  Thread bpThread;
+  DatanodeProtocol bpNamenode;
+  private long lastHeartbeat = 0;
+  private volatile boolean initialized = false;
+  private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList 
+    = new LinkedList<ReceivedDeletedBlockInfo>();
+  private volatile int pendingReceivedRequests = 0;
+  private volatile boolean shouldServiceRun = true;
+  private final DataNode dn;
+  private final DNConf dnConf;
+
+  private DatanodeRegistration bpRegistration;
+
+  BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
+    this.bpos = bpos;
+    this.dn = bpos.getDataNode();
+    this.nnAddr = nnAddr;
+    this.dnConf = dn.getDnConf();
+  }
+
+  /**
+   * returns true if BP thread has completed initialization of storage
+   * and has registered with the corresponding namenode
+   * @return true if initialized
+   */
+  boolean isInitialized() {
+    return initialized;
+  }
+  
+  boolean isAlive() {
+    return shouldServiceRun && bpThread.isAlive();
+  }
+
+  @Override
+  public String toString() {
+    return bpos.toString() + " service to " + nnAddr;
+  }
+  
+  InetSocketAddress getNNSocketAddress() {
+    return nnAddr;
+  }
+
+  /**
+   * Used to inject a spy NN in the unit tests.
+   */
+  @VisibleForTesting
+  void setNameNode(DatanodeProtocol dnProtocol) {
+    bpNamenode = dnProtocol;
+  }
+
+  /**
+   * Perform the first part of the handshake with the NameNode.
+   * This calls <code>versionRequest</code> to determine the NN's
+   * namespace and version info. It automatically retries until
+   * the NN responds or the DN is shutting down.
+   * 
+   * @return the NamespaceInfo
+   */
+  @VisibleForTesting
+  NamespaceInfo retrieveNamespaceInfo() throws IOException {
+    NamespaceInfo nsInfo = null;
+    while (shouldRun()) {
+      try {
+        nsInfo = bpNamenode.versionRequest();
+        LOG.debug(this + " received versionRequest response: " + nsInfo);
+        break;
+      } catch(SocketTimeoutException e) {  // namenode is busy
+        LOG.warn("Problem connecting to server: " + nnAddr);
+      } catch(IOException e ) {  // namenode is not available
+        LOG.warn("Problem connecting to server: " + nnAddr);
+      }
+      
+      // try again in a second
+      sleepAndLogInterrupts(5000, "requesting version info from NN");
+    }
+    
+    if (nsInfo != null) {
+      checkNNVersion(nsInfo);
+    } else {
+      throw new IOException("DN shut down before block pool connected");
+    }
+    return nsInfo;
+  }
+
+  private void checkNNVersion(NamespaceInfo nsInfo)
+      throws IncorrectVersionException {
+    // build and layout versions should match
+    String nsBuildVer = nsInfo.getBuildVersion();
+    String stBuildVer = Storage.getBuildVersion();
+    if (!nsBuildVer.equals(stBuildVer)) {
+      LOG.warn("Data-node and name-node Build versions must be the same. " +
+        "Namenode build version: " + nsBuildVer + "Datanode " +
+        "build version: " + stBuildVer);
+      throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
+    }
+
+    if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
+      LOG.warn("Data-node and name-node layout versions must be the same." +
+        " Expected: "+ HdfsConstants.LAYOUT_VERSION +
+        " actual "+ nsInfo.getLayoutVersion());
+      throw new IncorrectVersionException(
+          nsInfo.getLayoutVersion(), "namenode");
+    }
+  }
+
+  private void connectToNNAndHandshake() throws IOException {
+    // get NN proxy
+    bpNamenode = bpos.connectToNN(nnAddr);
+
+    // First phase of the handshake with NN - get the namespace
+    // info.
+    NamespaceInfo nsInfo = retrieveNamespaceInfo();
+    
+    // Verify that this matches the other NN in this HA pair.
+    // This also initializes our block pool in the DN if we are
+    // the first NN connection for this BP.
+    bpos.verifyAndSetNamespaceInfo(nsInfo);
+    
+    // Second phase of the handshake with the NN.
+    register();
+  }
+  
+  /**
+   * This methods  arranges for the data node to send the block report at 
+   * the next heartbeat.
+   */
+  void scheduleBlockReport(long delay) {
+    if (delay > 0) { // send BR after random delay
+      lastBlockReport = System.currentTimeMillis()
+      - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
+    } else { // send at next heartbeat
+      lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
+    }
+    resetBlockReportTime = true; // reset future BRs for randomness
+  }
+
+  void reportBadBlocks(ExtendedBlock block) {
+    DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
+    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
+    
+    try {
+      bpNamenode.reportBadBlocks(blocks);  
+    } catch (IOException e){
+      /* One common reason is that NameNode could be in safe mode.
+       * Should we keep on retrying in that case?
+       */
+      LOG.warn("Failed to report bad block " + block + " to namenode : "
+          + " Exception", e);
+    }
+  }
+  
+  /**
+   * Report received blocks and delete hints to the Namenode
+   * 
+   * @throws IOException
+   */
+  private void reportReceivedDeletedBlocks() throws IOException {
+
+    // check if there are newly received blocks
+    ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
+    int currentReceivedRequestsCounter;
+    synchronized (receivedAndDeletedBlockList) {
+      currentReceivedRequestsCounter = pendingReceivedRequests;
+      int numBlocks = receivedAndDeletedBlockList.size();
+      if (numBlocks > 0) {
+        //
+        // Send newly-received and deleted blockids to namenode
+        //
+        receivedAndDeletedBlockArray = receivedAndDeletedBlockList
+            .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
+      }
+    }
+    if (receivedAndDeletedBlockArray != null) {
+      bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
+          receivedAndDeletedBlockArray);
+      synchronized (receivedAndDeletedBlockList) {
+        for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
+          receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
+        }
+        pendingReceivedRequests -= currentReceivedRequestsCounter;
+      }
+    }
+  }
+
+  /*
+   * Informing the name node could take a long long time! Should we wait
+   * till namenode is informed before responding with success to the
+   * client? For now we don't.
+   */
+  void notifyNamenodeReceivedBlock(ReceivedDeletedBlockInfo bInfo) {
+    synchronized (receivedAndDeletedBlockList) {
+      receivedAndDeletedBlockList.add(bInfo);
+      pendingReceivedRequests++;
+      receivedAndDeletedBlockList.notifyAll();
+    }
+  }
+
+  void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
+    synchronized (receivedAndDeletedBlockList) {
+      receivedAndDeletedBlockList.add(bInfo);
+    }
+  }
+
+  /**
+   * Run an immediate block report on this thread. Used by tests.
+   */
+  @VisibleForTesting
+  void triggerBlockReportForTests() throws IOException {
+      lastBlockReport = 0;
+      blockReport();
+  }
+
+  /**
+   * Report the list blocks to the Namenode
+   * @throws IOException
+   */
+  DatanodeCommand blockReport() throws IOException {
+    // send block report if timer has expired.
+    DatanodeCommand cmd = null;
+    long startTime = now();
+    if (startTime - lastBlockReport > dnConf.blockReportInterval) {
+
+      // Create block report
+      long brCreateStartTime = now();
+      BlockListAsLongs bReport = dn.getFSDataset().getBlockReport(
+          bpos.getBlockPoolId());
+
+      // Send block report
+      long brSendStartTime = now();
+      cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), bReport
+          .getBlockListAsLongs());
+
+      // Log the block report processing stats from Datanode perspective
+      long brSendCost = now() - brSendStartTime;
+      long brCreateCost = brSendStartTime - brCreateStartTime;
+      dn.getMetrics().addBlockReport(brSendCost);
+      LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+          + " blocks took " + brCreateCost + " msec to generate and "
+          + brSendCost + " msecs for RPC and NN processing");
+
+      // If we have sent the first block report, then wait a random
+      // time before we start the periodic block reports.
+      if (resetBlockReportTime) {
+        lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
+        resetBlockReportTime = false;
+      } else {
+        /* say the last block report was at 8:20:14. The current report
+         * should have started around 9:20:14 (default 1 hour interval).
+         * If current time is :
+         *   1) normal like 9:20:18, next report should be at 10:20:14
+         *   2) unexpected like 11:35:43, next report should be at 12:20:14
+         */
+        lastBlockReport += (now() - lastBlockReport) /
+        dnConf.blockReportInterval * dnConf.blockReportInterval;
+      }
+      LOG.info("sent block report, processed command:" + cmd);
+    }
+    return cmd;
+  }
+  
+  
+  DatanodeCommand [] sendHeartBeat() throws IOException {
+    LOG.info("heartbeat: " + this);
+    // TODO: saw an NPE here - maybe if the two BPOS register at
+    // same time, this one won't block on the other one?
+    return bpNamenode.sendHeartbeat(bpRegistration,
+        dn.getFSDataset().getCapacity(),
+        dn.getFSDataset().getDfsUsed(),
+        dn.getFSDataset().getRemaining(),
+        dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId()),
+        dn.getXmitsInProgress(),
+        dn.getXceiverCount(), dn.getFSDataset().getNumFailedVolumes());
+  }
+  
+  //This must be called only by BPOfferService
+  void start() {
+    if ((bpThread != null) && (bpThread.isAlive())) {
+      //Thread is started already
+      return;
+    }
+    bpThread = new Thread(this, formatThreadName());
+    bpThread.setDaemon(true); // needed for JUnit testing
+    bpThread.start();
+  }
+  
+  private String formatThreadName() {
+    Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
+    return "DataNode: [" +
+      StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " +
+      " heartbeating to " + nnAddr;
+  }
+  
+  //This must be called only by blockPoolManager.
+  void stop() {
+    shouldServiceRun = false;
+    if (bpThread != null) {
+        bpThread.interrupt();
+    }
+  }
+  
+  //This must be called only by blockPoolManager
+  void join() {
+    try {
+      if (bpThread != null) {
+        bpThread.join();
+      }
+    } catch (InterruptedException ie) { }
+  }
+  
+  //Cleanup method to be called by current thread before exiting.
+  private synchronized void cleanUp() {
+    
+    shouldServiceRun = false;
+    RPC.stopProxy(bpNamenode);
+    bpos.shutdownActor(this);
+  }
+
+  /**
+   * Main loop for each BP thread. Run until shutdown,
+   * forever calling remote NameNode functions.
+   */
+  private void offerService() throws Exception {
+    LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
+        + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
+        + dnConf.blockReportInterval + "msec" + " Initial delay: "
+        + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
+        + dnConf.heartBeatInterval);
+
+    //
+    // Now loop for a long time....
+    //
+    while (shouldRun()) {
+      try {
+        long startTime = now();
+
+        //
+        // Every so often, send heartbeat or block-report
+        //
+        if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
+          //
+          // All heartbeat messages include following info:
+          // -- Datanode name
+          // -- data transfer port
+          // -- Total capacity
+          // -- Bytes remaining
+          //
+          lastHeartbeat = startTime;
+          if (!dn.areHeartbeatsDisabledForTests()) {
+            DatanodeCommand[] cmds = sendHeartBeat();
+            dn.getMetrics().addHeartbeat(now() - startTime);
+
+            long startProcessCommands = now();
+            if (!processCommand(cmds))
+              continue;
+            long endProcessCommands = now();
+            if (endProcessCommands - startProcessCommands > 2000) {
+              LOG.info("Took " + (endProcessCommands - startProcessCommands) +
+                  "ms to process " + cmds.length + " commands from NN");
+            }
+          }
+        }
+        if (pendingReceivedRequests > 0
+            || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
+          reportReceivedDeletedBlocks();
+          lastDeletedReport = startTime;
+        }
+
+        DatanodeCommand cmd = blockReport();
+        processCommand(new DatanodeCommand[]{ cmd });
+
+        // Now safe to start scanning the block pool
+        // TODO(HA): this doesn't seem quite right
+        if (dn.blockScanner != null) {
+          dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
+        }
+
+        //
+        // There is no work to do;  sleep until hearbeat timer elapses, 
+        // or work arrives, and then iterate again.
+        //
+        long waitTime = dnConf.heartBeatInterval - 
+        (System.currentTimeMillis() - lastHeartbeat);
+        synchronized(receivedAndDeletedBlockList) {
+          if (waitTime > 0 && pendingReceivedRequests == 0) {
+            try {
+              receivedAndDeletedBlockList.wait(waitTime);
+            } catch (InterruptedException ie) {
+              LOG.warn("BPOfferService for " + this + " interrupted");
+            }
+          }
+        } // synchronized
+      } catch(RemoteException re) {
+        String reClass = re.getClassName();
+        if (UnregisteredNodeException.class.getName().equals(reClass) ||
+            DisallowedDatanodeException.class.getName().equals(reClass) ||
+            IncorrectVersionException.class.getName().equals(reClass)) {
+          LOG.warn(this + " is shutting down", re);
+          shouldServiceRun = false;
+          return;
+        }
+        LOG.warn("RemoteException in offerService", re);
+        try {
+          long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      } catch (IOException e) {
+        LOG.warn("IOException in offerService", e);
+      }
+    } // while (shouldRun())
+  } // offerService
+
+  /**
+   * Register one bp with the corresponding NameNode
+   * <p>
+   * The bpDatanode needs to register with the namenode on startup in order
+   * 1) to report which storage it is serving now and 
+   * 2) to receive a registrationID
+   *  
+   * issued by the namenode to recognize registered datanodes.
+   * 
+   * @see FSNamesystem#registerDatanode(DatanodeRegistration)
+   * @throws IOException
+   */
+  void register() throws IOException {
+    // The handshake() phase loaded the block pool storage
+    // off disk - so update the bpRegistration object from that info
+    bpRegistration = bpos.createRegistration();
+
+    LOG.info(this + " beginning handshake with NN");
+
+    while (shouldRun()) {
+      try {
+        // Use returned registration from namenode with updated machine name.
+        bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+        break;
+      } catch(SocketTimeoutException e) {  // namenode is busy
+        LOG.info("Problem connecting to server: " + nnAddr);
+        sleepAndLogInterrupts(1000, "connecting to server");
+      }
+    }
+    
+    LOG.info("Block pool " + this + " successfully registered with NN");
+    bpos.registrationSucceeded(this, bpRegistration);
+
+    // random short delay - helps scatter the BR from all DNs
+    scheduleBlockReport(dnConf.initialBlockReportDelay);
+  }
+
+
+  private void sleepAndLogInterrupts(int millis,
+      String stateString) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException ie) {
+      LOG.info("BPOfferService " + this +
+          " interrupted while " + stateString);
+    }
+  }
+
+  /**
+   * No matter what kind of exception we get, keep retrying to offerService().
+   * That's the loop that connects to the NameNode and provides basic DataNode
+   * functionality.
+   *
+   * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
+   * happen either at shutdown or due to refreshNamenodes.
+   */
+  @Override
+  public void run() {
+    LOG.info(this + " starting to offer service");
+
+    try {
+      // init stuff
+      try {
+        // setup storage
+        connectToNNAndHandshake();
+      } catch (IOException ioe) {
+        // Initial handshake, storage recovery or registration failed
+        // End BPOfferService thread
+        LOG.fatal("Initialization failed for block pool " + this, ioe);
+        return;
+      }
+
+      initialized = true; // bp is initialized;
+      
+      while (shouldRun()) {
+        try {
+          bpos.startDistributedUpgradeIfNeeded();
+          offerService();
+        } catch (Exception ex) {
+          LOG.error("Exception in BPOfferService for " + this, ex);
+          sleepAndLogInterrupts(5000, "offering service");
+        }
+      }
+    } catch (Throwable ex) {
+      LOG.warn("Unexpected exception in block pool " + this, ex);
+    } finally {
+      LOG.warn("Ending block pool service for: " + this);
+      cleanUp();
+    }
+  }
+
+  private boolean shouldRun() {
+    return shouldServiceRun && dn.shouldRun();
+  }
+
+  /**
+   * Process an array of datanode commands
+   * 
+   * @param cmds an array of datanode commands
+   * @return true if further processing may be required or false otherwise. 
+   */
+  boolean processCommand(DatanodeCommand[] cmds) {
+    if (cmds != null) {
+      for (DatanodeCommand cmd : cmds) {
+        try {
+          if (!bpos.processCommandFromActor(cmd, this)) {
+            return false;
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Error processing datanode Command", ioe);
+        }
+      }
+    }
+    return true;
+  }
+
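+  /**
+   * Send an error report to the NameNode; failures are logged rather than
+   * rethrown.
+   */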
+  void trySendErrorReport(int errCode, String errMsg) {
+    try {
+      bpNamenode.errorReport(bpRegistration, errCode, errMsg);
+    } catch(IOException e) {
+      LOG.warn("Error reporting an error to NameNode " + nnAddr,
+          e);
+    }
+  }
+
+  /**
+   * Report a bad block from another DN in this cluster.
+   */
+  void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block)
+      throws IOException {
+    LocatedBlock lb = new LocatedBlock(block, 
+                                    new DatanodeInfo[] {dnInfo});
+    bpNamenode.reportBadBlocks(new LocatedBlock[] {lb});
+  }
+
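+  /**
+   * Re-fetch the namespace info and register with the NN again
+   * (e.g. when the NN requests re-registration).
+   */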
+  void reRegister() throws IOException {
+    if (shouldRun()) {
+      // re-retrieve namespace info to make sure that, if the NN
+      // was restarted, we still match its version (HDFS-2120)
+      retrieveNamespaceInfo();
+      // and re-register
+      register();
+    }
+  }
+
+}

+ 1 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -320,7 +320,6 @@ class BlockReceiver implements Closeable {
   private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
                              byte[] checksumBuf, int checksumOff ) 
                              throws IOException {
-    DatanodeProtocol nn = datanode.getBPNamenode(block.getBlockPoolId());
     while (len > 0) {
       int chunkLen = Math.min(len, bytesPerChecksum);
       
@@ -331,9 +330,7 @@ class BlockReceiver implements Closeable {
           try {
             LOG.info("report corrupt block " + block + " from datanode " +
                       srcDataNode + " to namenode");
-            LocatedBlock lb = new LocatedBlock(block, 
-                                            new DatanodeInfo[] {srcDataNode});
-            nn.reportBadBlocks(new LocatedBlock[] {lb});
+            datanode.reportRemoteBadBlock(srcDataNode, block);
           } catch (IOException e) {
             LOG.warn("Failed to report bad block " + block + 
                       " from datanode " + srcDataNode + " to namenode");

+ 90 - 47
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -71,6 +71,7 @@ import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -243,7 +244,7 @@ public class DataNode extends Configured
   @InterfaceAudience.Private
   class BlockPoolManager {
     private final Map<String, BPOfferService> bpMapping;
-    private final Map<InetSocketAddress, BPOfferService> nameNodeThreads;
+    private final List<BPOfferService> offerServices;
  
     //This lock is used only to ensure exclusion of refreshNamenodes
     private final Object refreshNamenodesLock = new Object();
@@ -251,31 +252,26 @@ public class DataNode extends Configured
     BlockPoolManager(Configuration conf)
         throws IOException {
       bpMapping = new HashMap<String, BPOfferService>();
-      nameNodeThreads = new HashMap<InetSocketAddress, BPOfferService>();
+      offerServices = new ArrayList<BPOfferService>();
   
       Map<String, Map<String, InetSocketAddress>> map =
         DFSUtil.getNNServiceRpcAddresses(conf);
       for (Entry<String, Map<String, InetSocketAddress>> entry :
            map.entrySet()) {
         List<InetSocketAddress> nnList = Lists.newArrayList(entry.getValue().values());
-        // TODO(HA) when HDFS-1971 (dual BRs) is done, pass all of the NNs
-        // to BPOS
-        InetSocketAddress isa = nnList.get(0);
-        BPOfferService bpos = new BPOfferService(isa, DataNode.this);
-        nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
+        BPOfferService bpos = new BPOfferService(nnList, DataNode.this);
+        offerServices.add(bpos);
       }
     }
     
-    synchronized void addBlockPool(BPOfferService t) {
-      if (nameNodeThreads.get(t.getNNSocketAddress()) == null) {
-        throw new IllegalArgumentException(
-            "Unknown BPOfferService thread for namenode address:"
-                + t.getNNSocketAddress());
-      }
-      if (t.getBlockPoolId() == null) {
+    synchronized void addBlockPool(BPOfferService bpos) {
+      Preconditions.checkArgument(offerServices.contains(bpos),
+          "Unknown BPOS: %s", bpos);
+      if (bpos.getBlockPoolId() == null) {
         throw new IllegalArgumentException("Null blockpool id");
       }
-      bpMapping.put(t.getBlockPoolId(), t);
+      LOG.info("===> registering in bpmapping: " + bpos);
+      bpMapping.put(bpos.getBlockPoolId(), bpos);
     }
     
     /**
@@ -283,21 +279,26 @@ public class DataNode extends Configured
      * Caution: The BPOfferService returned could be shutdown any time.
      */
     synchronized BPOfferService[] getAllNamenodeThreads() {
-      BPOfferService[] bposArray = new BPOfferService[nameNodeThreads.values()
-          .size()];
-      return nameNodeThreads.values().toArray(bposArray);
+      BPOfferService[] bposArray = new BPOfferService[offerServices.size()];
+      return offerServices.toArray(bposArray);
     }
-    
-    synchronized BPOfferService get(InetSocketAddress addr) {
-      return nameNodeThreads.get(addr);
-    }
-    
+        
     synchronized BPOfferService get(String bpid) {
       return bpMapping.get(bpid);
     }
     
+    // TODO(HA) would be good to kill this
+    synchronized BPOfferService get(InetSocketAddress addr) {
+      for (BPOfferService bpos : offerServices) {
+        if (bpos.containsNN(addr)) {
+          return bpos;
+        }
+      }
+      return null;
+    }
+
     synchronized void remove(BPOfferService t) {
-      nameNodeThreads.remove(t.getNNSocketAddress());
+      offerServices.remove(t);
       bpMapping.remove(t.getBlockPoolId());
     }
     
@@ -318,7 +319,7 @@ public class DataNode extends Configured
         UserGroupInformation.getLoginUser().doAs(
             new PrivilegedExceptionAction<Object>() {
               public Object run() throws Exception {
-                for (BPOfferService bpos : nameNodeThreads.values()) {
+                for (BPOfferService bpos : offerServices) {
                   bpos.start();
                 }
                 return null;
@@ -339,6 +340,10 @@ public class DataNode extends Configured
     
     void refreshNamenodes(Configuration conf)
         throws IOException {
+      throw new UnsupportedOperationException("TODO(HA)");
+/*
+ * TODO(HA)
+
       LOG.info("Refresh request received for nameservices: "
           + conf.get(DFS_FEDERATION_NAMESERVICES));
       
@@ -355,20 +360,20 @@ public class DataNode extends Configured
       List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>();
       synchronized (refreshNamenodesLock) {
         synchronized (this) {
-          for (InetSocketAddress nnaddr : nameNodeThreads.keySet()) {
+          for (InetSocketAddress nnaddr : offerServices.keySet()) {
             if (!(newAddresses.contains(nnaddr))) {
-              toShutdown.add(nameNodeThreads.get(nnaddr));
+              toShutdown.add(offerServices.get(nnaddr));
             }
           }
           for (InetSocketAddress nnaddr : newAddresses) {
-            if (!(nameNodeThreads.containsKey(nnaddr))) {
+            if (!(offerServices.containsKey(nnaddr))) {
               toStart.add(nnaddr);
             }
           }
 
           for (InetSocketAddress nnaddr : toStart) {
             BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this);
-            nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
+            offerServices.put(bpos.getNNSocketAddress(), bpos);
           }
         }
 
@@ -383,7 +388,9 @@ public class DataNode extends Configured
         // Now start the threads that are not already running.
         startAll();
       }
+      */
     }
+
   }
   
   volatile boolean shouldRun = true;
@@ -685,13 +692,44 @@ public class DataNode extends Configured
     }
   }
   
+  /**
+   * Report a bad block which is hosted on the local DN.
+   */
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
+    BPOfferService bpos = getBPOSForBlock(block);
+    bpos.reportBadBlocks(block);
+  }
+
+  /**
+   * Report a bad block on another DN (e.g. if we received a corrupt replica
+   * from a remote host).
+   * @param srcDataNode the DN hosting the bad block
+   * @param block the block itself
+   */
+  public void reportRemoteBadBlock(DatanodeInfo srcDataNode, ExtendedBlock block)
+      throws IOException {
+    BPOfferService bpos = getBPOSForBlock(block);
+    bpos.reportRemoteBadBlock(srcDataNode, block);
+  }
+  
+  /**
+   * Return the BPOfferService instance corresponding to the given block.
+   * @param block the block whose block pool is to be looked up
+   * @return the BPOS
+   * @throws IOException if no such BPOS can be found
+   */
+  private BPOfferService getBPOSForBlock(ExtendedBlock block)
+      throws IOException {
+    Preconditions.checkNotNull(block);
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
-    if(bpos == null || bpos.bpNamenode == null) {
-      throw new IOException("cannot locate OfferService thread for bp="+block.getBlockPoolId());
+    if (bpos == null) {
+      throw new IOException("cannot locate OfferService thread for bp="+
+          block.getBlockPoolId());
     }
-    bpos.reportBadBlocks(block);
+    return bpos;
   }
+
+
   
   // used only for testing
   void setHeartbeatsDisabledForTests(
@@ -1006,11 +1044,15 @@ public class DataNode extends Configured
   
   /**
    * get BP registration by machine and port name (host:port)
-   * @param mName
+   * @param mName - the name that the NN used
    * @return BP registration 
    * @throws IOException 
    */
   DatanodeRegistration getDNRegistrationByMachineName(String mName) {
+    // TODO: all the BPs should have the same name (they all come from
+    // getName() here), and the only callers are tests that just pass
+    // getName(), so we could probably make this method return the first
+    // BPOS's registration.
     BPOfferService [] bposArray = blockPoolManager.getAllNamenodeThreads();
     for (BPOfferService bpos : bposArray) {
       if(bpos.bpRegistration.getName().equals(mName))
@@ -1055,6 +1097,8 @@ public class DataNode extends Configured
    * @return namenode address corresponding to the bpid
    */
   public InetSocketAddress getNameNodeAddr(String bpid) {
+    // TODO(HA): this function doesn't make sense (it is used by upgrade code).
+    // Should it return just the active NN's address, or the whole BPOfferService?
     BPOfferService bp = blockPoolManager.get(bpid);
     if (bp != null) {
       return bp.getNNSocketAddress();
@@ -1288,12 +1332,7 @@ public class DataNode extends Configured
 
     //inform NameNodes
     for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
-      DatanodeProtocol nn = bpos.bpNamenode;
-      try {
-        nn.errorReport(bpos.bpRegistration, dpError, errMsgr);
-      } catch(IOException e) {
-        LOG.warn("Error reporting disk failure to NameNode", e);
-      }
+      bpos.trySendErrorReport(dpError, errMsgr);
     }
     
     if(hasEnoughResources) {
@@ -1309,6 +1348,10 @@ public class DataNode extends Configured
   int getXceiverCount() {
     return threadGroup == null ? 0 : threadGroup.activeCount();
   }
+  
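+  /** Number of block transfers currently in progress on this DN,
+   *  as reported to the NN in heartbeats. */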
+  int getXmitsInProgress() {
+    return xmitsInProgress.get();
+  }
     
   UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
     BPOfferService bpos = blockPoolManager.get(bpid);
@@ -1321,14 +1364,15 @@ public class DataNode extends Configured
   private void transferBlock( ExtendedBlock block, 
                               DatanodeInfo xferTargets[] 
                               ) throws IOException {
-    DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
+    BPOfferService bpos = getBPOSForBlock(block);
     DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
     
     if (!data.isValidBlock(block)) {
       // block does not exist or is under-construction
       String errStr = "Can't send invalid block " + block;
       LOG.info(errStr);
-      nn.errorReport(bpReg, DatanodeProtocol.INVALID_BLOCK, errStr);
+      
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errStr);
       return;
     }
 
@@ -1336,9 +1380,7 @@ public class DataNode extends Configured
     long onDiskLength = data.getLength(block);
     if (block.getNumBytes() > onDiskLength) {
       // Shorter on-disk len indicates corruption so report NN the corrupt block
-      nn.reportBadBlocks(new LocatedBlock[]{
-          new LocatedBlock(block, new DatanodeInfo[] {
-              new DatanodeInfo(bpReg)})});
+      bpos.reportBadBlocks(block);
       LOG.warn("Can't replicate block " + block
           + " because on-disk length " + onDiskLength 
           + " is shorter than NameNode recorded length " + block.getNumBytes());
@@ -1991,10 +2033,10 @@ public class DataNode extends Configured
    */
   public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
-    if(bpos == null || bpos.bpNamenode == null) {
+    if (bpos == null) {
       throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
     }
-    return bpos.bpNamenode;
+    return bpos.getActiveNN();
   }
 
   /** Block synchronization */
@@ -2013,6 +2055,7 @@ public class DataNode extends Configured
     // or their replicas have 0 length.
     // The block can be deleted.
     if (syncList.isEmpty()) {
+      // TODO: how does this work in HA??
       nn.commitBlockSynchronization(block, recoveryId, 0,
           true, true, DatanodeID.EMPTY_ARRAY);
       return;
@@ -2229,7 +2272,7 @@ public class DataNode extends Configured
   public String getNamenodeAddresses() {
     final Map<String, String> info = new HashMap<String, String>();
     for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
-      if (bpos != null && bpos.bpThread != null) {
+      if (bpos != null) {
         info.put(bpos.getNNSocketAddress().getHostName(), bpos.getBlockPoolId());
       }
     }

+ 282 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
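+/**
+ * Unit tests for BPOfferService, using mock NameNode proxies instead of a
+ * MiniDFSCluster.
+ */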
+public class TestBPOfferService {
+
+  private static final String FAKE_BPID = "fake bpid";
+  private static final String FAKE_CLUSTERID = "fake cluster";
+  protected static final Log LOG = LogFactory.getLog(
+      TestBPOfferService.class);
+  private static final ExtendedBlock FAKE_BLOCK =
+    new ExtendedBlock(FAKE_BPID, 12345L);
+
+  static {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private DatanodeProtocol mockNN1;
+  private DatanodeProtocol mockNN2;
+  private DataNode mockDn;
+  private FSDatasetInterface mockFSDataset;
+  
+  @Before
+  public void setupMocks() throws Exception {
+    mockNN1 = setupNNMock();
+    mockNN2 = setupNNMock();
+
+    // Set up a mock DN with the bare-bones configuration
+    // objects, etc.
+    mockDn = Mockito.mock(DataNode.class);
+    Mockito.doReturn(true).when(mockDn).shouldRun();
+    Configuration conf = new Configuration();
+    Mockito.doReturn(conf).when(mockDn).getConf();
+    Mockito.doReturn(new DNConf(conf)).when(mockDn).getDnConf();
+    Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
+        .when(mockDn).getMetrics();
+
+    // Set up a simulated dataset with our fake BP
+    mockFSDataset = Mockito.spy(new SimulatedFSDataset(conf));
+    mockFSDataset.addBlockPool(FAKE_BPID, conf);
+
+    // Wire the dataset to the DN.
+    Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
+  }
+
+  /**
+   * Set up a mock NN with the bare minimum for a DN to register to it.
+   */
+  private DatanodeProtocol setupNNMock() throws Exception {
+    DatanodeProtocol mock = Mockito.mock(DatanodeProtocol.class);
+    Mockito.doReturn(
+        new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID,
+            0, HdfsConstants.LAYOUT_VERSION))
+      .when(mock).versionRequest();
+    return mock;
+  }
+  
+  /**
+   * Test that the BPOS can register to talk to two different NNs,
+   * sends block reports to both, etc.
+   */
+  @Test
+  public void testBasicFunctionality() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      
+      // The DN should have registered with both NNs.
+      Mockito.verify(mockNN1).registerDatanode(
+          (DatanodeRegistration) Mockito.anyObject());
+      Mockito.verify(mockNN2).registerDatanode(
+          (DatanodeRegistration) Mockito.anyObject());
+      
+      // Should get block reports from both NNs
+      waitForBlockReport(mockNN1);
+      waitForBlockReport(mockNN2);
+
+      // When we receive a block, it should report it to both NNs
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "");
+
+      ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
+      assertEquals(1, ret.length);
+      assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
+      
+      ret = waitForBlockReceived(FAKE_BLOCK, mockNN2);
+      assertEquals(1, ret.length);
+      assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
+
+    } finally {
+      bpos.stop();
+    }
+  }
+
+  /**
+   * Test that DNA_INVALIDATE commands from the standby are ignored.
+   */
+  @Test
+  public void testIgnoreDeletionsFromNonActive() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+
+    // Ask to invalidate FAKE_BLOCK when block report hits the
+    // standby
+    Mockito.doReturn(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
+        FAKE_BPID, new Block[] { FAKE_BLOCK.getLocalBlock() }))
+        .when(mockNN2).blockReport(
+            Mockito.<DatanodeRegistration>anyObject(),  
+            Mockito.eq(FAKE_BPID),
+            Mockito.<long[]>anyObject());
+
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      
+      // Should get block reports from both NNs
+      waitForBlockReport(mockNN1);
+      waitForBlockReport(mockNN2);
+
+    } finally {
+      bpos.stop();
+    }
+    
+    // Should ignore the delete command from the standby
+    Mockito.verify(mockFSDataset, Mockito.never())
+      .invalidate(Mockito.eq(FAKE_BPID),
+          (Block[]) Mockito.anyObject());
+  }
+
+  /**
+   * Ensure that, if the two NNs configured for a block pool report
+   * conflicting namespace info (here, different cluster IDs), only one
+   * of them ends up registered with the DN.
+   */
+  @Test
+  public void testNNsFromDifferentClusters() throws Exception {
+    Mockito.doReturn(
+        new NamespaceInfo(1, "fake foreign cluster", FAKE_BPID,
+            0, HdfsConstants.LAYOUT_VERSION))
+      .when(mockNN1).versionRequest();
+        
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForOneToFail(bpos);
+    } finally {
+      bpos.stop();
+    }
+  }
+
+  private void waitForOneToFail(final BPOfferService bpos)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return bpos.countNameNodes() == 1;
+      }
+    }, 100, 10000);
+  }
+
+  /**
+   * Create a BPOfferService which registers and heartbeats with the
+   * specified namenode proxy objects.
+   */
+  private BPOfferService setupBPOSForNNs(DatanodeProtocol ... nns) {
+    // Set up some fake InetAddresses, then override the connectToNN
+    // function to return the corresponding proxies.
+
+    final Map<InetSocketAddress, DatanodeProtocol> nnMap = Maps.newLinkedHashMap();
+    for (int port = 0; port < nns.length; port++) {
+      nnMap.put(new InetSocketAddress(port), nns[port]);
+    }
+
+    return new BPOfferService(Lists.newArrayList(nnMap.keySet()), mockDn) {
+      @Override
+      DatanodeProtocol connectToNN(InetSocketAddress nnAddr) throws IOException {
+        DatanodeProtocol nn = nnMap.get(nnAddr);
+        if (nn == null) {
+          throw new AssertionError("bad NN addr: " + nnAddr);
+        }
+        return nn;
+      }
+    };
+  }
+
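+  /** Wait until the BPOfferService reports itself alive and initialized. */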
+  private void waitForInitialization(final BPOfferService bpos)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return bpos.isAlive() && bpos.isInitialized();
+      }
+    }, 100, 10000);
+  }
+  
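+  /** Poll until the given mock NN has received a block report for FAKE_BPID. */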
+  private void waitForBlockReport(final DatanodeProtocol mockNN)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          Mockito.verify(mockNN).blockReport(
+              Mockito.<DatanodeRegistration>anyObject(),  
+              Mockito.eq(FAKE_BPID),
+              Mockito.<long[]>anyObject());
+          return true;
+        } catch (Throwable t) {
+          LOG.info("waiting on block report: " + t.getMessage());
+          return false;
+        }
+      }
+    }, 500, 10000);
+  }
+  
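+  /**
+   * Wait for the given mock NN to receive a blockReceivedAndDeleted() call
+   * for FAKE_BPID, and return the reported block infos.
+   */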
+  private ReceivedDeletedBlockInfo[] waitForBlockReceived(
+      ExtendedBlock fakeBlock,
+      final DatanodeProtocol mockNN) throws Exception {
+    final ArgumentCaptor<ReceivedDeletedBlockInfo[]> captor =
+      ArgumentCaptor.forClass(ReceivedDeletedBlockInfo[].class);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+      @Override
+      public Boolean get() {
+        try {
+          Mockito.verify(mockNN).blockReceivedAndDeleted(
+            Mockito.<DatanodeRegistration>anyObject(),
+            Mockito.eq(FAKE_BPID),
+            captor.capture());
+          return true;
+        } catch (Throwable t) {
+          return false;
+        }
+      }
+    }, 100, 10000);
+    return captor.getValue();
+  }
+
+}

+ 7 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java

@@ -93,23 +93,22 @@ public class TestDataNodeMultipleRegistrations {
       assertEquals("number of volumes is wrong", 2, volInfos.size());
 
       for (BPOfferService bpos : dn.getAllBpOs()) {
-        LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name + "; sid="
-            + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr);
+        LOG.info("BP: " + bpos);
       }
 
       BPOfferService bpos1 = dn.getAllBpOs()[0];
       BPOfferService bpos2 = dn.getAllBpOs()[1];
 
       // The order of bpos is not guaranteed, so fix the order
-      if (bpos1.nnAddr.equals(nn2.getNameNodeAddress())) {
+      if (bpos1.getNNSocketAddress().equals(nn2.getNameNodeAddress())) {
         BPOfferService tmp = bpos1;
         bpos1 = bpos2;
         bpos2 = tmp;
       }
 
-      assertEquals("wrong nn address", bpos1.nnAddr,
+      assertEquals("wrong nn address", bpos1.getNNSocketAddress(),
           nn1.getNameNodeAddress());
-      assertEquals("wrong nn address", bpos2.nnAddr,
+      assertEquals("wrong nn address", bpos2.getNNSocketAddress(),
           nn2.getNameNodeAddress());
       assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
       assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2);
@@ -156,15 +155,14 @@ public class TestDataNodeMultipleRegistrations {
 
       for (BPOfferService bpos : dn.getAllBpOs()) {
         LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name + "; sid="
-            + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr);
+            + bpos.bpRegistration.storageID + "; nna=" + bpos.getNNSocketAddress());
       }
 
       // try block report
       BPOfferService bpos1 = dn.getAllBpOs()[0];
-      bpos1.lastBlockReport = 0;
-      bpos1.blockReport();
+      bpos1.triggerBlockReportForTests();
 
-      assertEquals("wrong nn address", bpos1.nnAddr,
+      assertEquals("wrong nn address", bpos1.getNNSocketAddress(),
           nn1.getNameNodeAddress());
       assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
       assertEquals("wrong cid", dn.getClusterId(), cid1);

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java

@@ -42,17 +42,19 @@ public class TestDatanodeRegister {
     DataNode mockDN = mock(DataNode.class);
     Mockito.doReturn(true).when(mockDN).shouldRun();
     
-    BPOfferService bpos = new BPOfferService(INVALID_ADDR, mockDN);
+    BPOfferService mockBPOS = Mockito.mock(BPOfferService.class);
+    Mockito.doReturn(mockDN).when(mockBPOS).getDataNode();
+    
+    BPServiceActor actor = new BPServiceActor(INVALID_ADDR, mockBPOS);
 
     NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class);
     when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion");
     DatanodeProtocol fakeDNProt = mock(DatanodeProtocol.class);
     when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo);
 
-    bpos.setNameNode( fakeDNProt );
-    bpos.bpNSInfo = fakeNSInfo;
+    actor.setNameNode( fakeDNProt );
     try {   
-      bpos.retrieveNamespaceInfo();
+      actor.retrieveNamespaceInfo();
       fail("register() did not throw exception! " +
            "Expected: IncorrectVersionException");
     } catch (IncorrectVersionException ie) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java

@@ -72,7 +72,7 @@ public class TestRefreshNamenodes {
         InetSocketAddress addr = cluster.getNameNode(i).getNameNodeAddress();
         boolean found = false;
         for (int j = 0; j < bpoList.length; j++) {
-          if (bpoList[j] != null && addr.equals(bpoList[j].nnAddr)) {
+          if (bpoList[j] != null && addr.equals(bpoList[j].getNNSocketAddress())) {
             found = true;
             bpoList[j] = null; // Erase the address that matched
             break;