Browse Source

HDFS-2197. Refactor RPC call implementations out of NameNode class. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1169868 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 years ago
parent
commit
dacd3e5b18
42 changed files with 1226 additions and 1055 deletions
  1. 1 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 85 68
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
  3. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
  4. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
  5. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
  6. 33 838
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  7. 920 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  8. 6 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
  9. 4 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
  10. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
  11. 17 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  12. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
  13. 9 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
  14. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
  15. 5 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java
  16. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
  17. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
  18. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java
  19. 6 6
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
  20. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
  21. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
  22. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
  23. 16 14
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
  24. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
  25. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
  26. 33 30
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
  27. 0 8
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
  28. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
  29. 11 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
  30. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
  31. 15 11
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
  32. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
  33. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
  34. 10 9
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
  35. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
  36. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java
  37. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeJspHelper.java
  38. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
  39. 10 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
  40. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
  41. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java
  42. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -684,6 +684,7 @@ Release 0.23.0 - Unreleased
 
     HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
                HdfsConstants. (Harsh J Chouraria via atm)
+    HDFS-2197. Refactor RPC call implementations out of NameNode class (todd)
 
   OPTIMIZATIONS
 

+ 85 - 68
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java

@@ -52,7 +52,7 @@ import org.apache.hadoop.net.NetUtils;
  * </ol>
  */
 @InterfaceAudience.Private
-public class BackupNode extends NameNode implements JournalProtocol {
+public class BackupNode extends NameNode {
   private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
   private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT;
   private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
@@ -95,18 +95,20 @@ public class BackupNode extends NameNode implements JournalProtocol {
   }
 
   @Override // NameNode
-  protected void setRpcServerAddress(Configuration conf) {
-    conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(rpcAddress));
+  protected void setRpcServerAddress(Configuration conf,
+      InetSocketAddress addr) {
+    conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(addr));
   }
   
   @Override // Namenode
-  protected void setRpcServiceServerAddress(Configuration conf) {
-    conf.set(BN_SERVICE_RPC_ADDRESS_KEY, getHostPortString(serviceRPCAddress));
+  protected void setRpcServiceServerAddress(Configuration conf,
+      InetSocketAddress addr) {
+    conf.set(BN_SERVICE_RPC_ADDRESS_KEY,  getHostPortString(addr));
   }
 
   @Override // NameNode
   protected InetSocketAddress getHttpServerAddress(Configuration conf) {
-    assert rpcAddress != null : "rpcAddress should be calculated first";
+    assert getNameNodeAddress() != null : "rpcAddress should be calculated first";
     String addr = conf.get(BN_HTTP_ADDRESS_NAME_KEY, BN_HTTP_ADDRESS_DEFAULT);
     return NetUtils.createSocketAddr(addr);
   }
@@ -145,6 +147,12 @@ public class BackupNode extends NameNode implements JournalProtocol {
     runCheckpointDaemon(conf);
   }
 
+  @Override
+  protected NameNodeRpcServer createRpcServer(Configuration conf)
+      throws IOException {
+    return new BackupNodeRpcServer(conf, this);
+  }
+
   @Override // NameNode
   public void stop() {
     if(checkpointManager != null) {
@@ -177,74 +185,83 @@ public class BackupNode extends NameNode implements JournalProtocol {
     super.stop();
   }
 
-  
-  @Override
-  public long getProtocolVersion(String protocol, long clientVersion)
-      throws IOException {
-    if (protocol.equals(JournalProtocol.class.getName())) {
-      return JournalProtocol.versionID;
-    } else {
-      return super.getProtocolVersion(protocol, clientVersion);
+  static class BackupNodeRpcServer extends NameNodeRpcServer implements JournalProtocol {
+    private final String nnRpcAddress;
+    
+    private BackupNodeRpcServer(Configuration conf, BackupNode nn)
+        throws IOException {
+      super(conf, nn);
+      nnRpcAddress = nn.nnRpcAddress;
     }
-  }
-
-  /////////////////////////////////////////////////////
-  // NamenodeProtocol implementation for backup node.
-  /////////////////////////////////////////////////////
-  @Override // NamenodeProtocol
-  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
-  throws IOException {
-    throw new UnsupportedActionException("getBlocks");
-  }
-
-  // Only active name-node can register other nodes.
-  @Override // NamenodeProtocol
-  public NamenodeRegistration register(NamenodeRegistration registration
-  ) throws IOException {
-    throw new UnsupportedActionException("register");
-  }
-
-  @Override // NamenodeProtocol
-  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
-  throws IOException {
-    throw new UnsupportedActionException("startCheckpoint");
-  }
 
-  @Override // NamenodeProtocol
-  public void endCheckpoint(NamenodeRegistration registration,
-                            CheckpointSignature sig) throws IOException {
-    throw new UnsupportedActionException("endCheckpoint");
-  }  
-
-  /////////////////////////////////////////////////////
-  // BackupNodeProtocol implementation for backup node.
-  /////////////////////////////////////////////////////
-
-  @Override
-  public void journal(NamenodeRegistration nnReg,
-      long firstTxId, int numTxns,
-      byte[] records) throws IOException {
-    verifyRequest(nnReg);
-    if(!nnRpcAddress.equals(nnReg.getAddress()))
-      throw new IOException("Journal request from unexpected name-node: "
-          + nnReg.getAddress() + " expecting " + nnRpcAddress);
-    getBNImage().journal(firstTxId, numTxns, records);
-  }
-
-  @Override
-  public void startLogSegment(NamenodeRegistration registration, long txid)
-      throws IOException {
-    verifyRequest(registration);
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      if (protocol.equals(JournalProtocol.class.getName())) {
+        return JournalProtocol.versionID;
+      } else {
+        return super.getProtocolVersion(protocol, clientVersion);
+      }
+    }
   
-    getBNImage().namenodeStartedLogSegment(txid);
-  }
-
-  //////////////////////////////////////////////////////
+    /////////////////////////////////////////////////////
+    // NamenodeProtocol implementation for backup node.
+    /////////////////////////////////////////////////////
+    @Override // NamenodeProtocol
+    public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+    throws IOException {
+      throw new UnsupportedActionException("getBlocks");
+    }
   
+    // Only active name-node can register other nodes.
+    @Override // NamenodeProtocol
+    public NamenodeRegistration register(NamenodeRegistration registration
+    ) throws IOException {
+      throw new UnsupportedActionException("register");
+    }
+  
+    @Override // NamenodeProtocol
+    public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+    throws IOException {
+      throw new UnsupportedActionException("startCheckpoint");
+    }
   
-  BackupImage getBNImage() {
-    return (BackupImage)getFSImage();
+    @Override // NamenodeProtocol
+    public void endCheckpoint(NamenodeRegistration registration,
+                              CheckpointSignature sig) throws IOException {
+      throw new UnsupportedActionException("endCheckpoint");
+    }  
+  
+    /////////////////////////////////////////////////////
+    // BackupNodeProtocol implementation for backup node.
+    /////////////////////////////////////////////////////
+  
+    @Override
+    public void journal(NamenodeRegistration nnReg,
+        long firstTxId, int numTxns,
+        byte[] records) throws IOException {
+      verifyRequest(nnReg);
+      if(!nnRpcAddress.equals(nnReg.getAddress()))
+        throw new IOException("Journal request from unexpected name-node: "
+            + nnReg.getAddress() + " expecting " + rpcAddress);
+      getBNImage().journal(firstTxId, numTxns, records);
+    }
+  
+    @Override
+    public void startLogSegment(NamenodeRegistration registration, long txid)
+        throws IOException {
+      verifyRequest(registration);
+    
+      getBNImage().namenodeStartedLogSegment(txid);
+    }
+    
+    private BackupImage getBNImage() {
+      return (BackupImage)nn.getFSImage();
+    }
   }
+  
+  //////////////////////////////////////////////////////
+  
 
   boolean shouldCheckpointAtStartup() {
     FSImage fsImage = getFSImage();

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java

@@ -69,7 +69,7 @@ public class CancelDelegationTokenServlet extends DfsServlet {
     try {
       ugi.doAs(new PrivilegedExceptionAction<Void>() {
         public Void run() throws Exception {
-          nn.cancelDelegationToken(token);
+          nn.getRpcServer().cancelDelegationToken(token);
           return null;
         }
       });

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java

@@ -73,7 +73,7 @@ abstract class DfsServlet extends HttpServlet {
     // rpc
     NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
     if (nn != null) {
-      return nn;
+      return nn.getRpcServer();
     }
     InetSocketAddress nnAddr =
       NameNodeHttpServer.getNameNodeAddressFromContext(context);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java

@@ -75,7 +75,7 @@ public class GetDelegationTokenServlet extends DfsServlet {
           + ":" + NameNode.getAddress(conf).getPort();
 
           Token<DelegationTokenIdentifier> token = 
-            nn.getDelegationToken(new Text(renewerFinal));
+            nn.getRpcServer().getDelegationToken(new Text(renewerFinal));
           if(token == null) {
             throw new Exception("couldn't get the token for " +s);
           }

File diff suppressed because it is too large
+ 33 - 838
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java


+ 920 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -0,0 +1,920 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_DEPTH;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_LENGTH;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
+
+/**
+ * This class is responsible for handling all of the RPC calls to the NameNode.
+ * It is created, started, and stopped by {@link NameNode}.
+ */
+class NameNodeRpcServer implements NamenodeProtocols {
+  
+  private static final Log LOG = NameNode.LOG;
+  private static final Log stateChangeLog = NameNode.stateChangeLog;
+  
+  // Dependencies from other parts of NN.
+  private final FSNamesystem namesystem;
+  protected final NameNode nn;
+  private final NameNodeMetrics metrics;
+  
+  private final boolean serviceAuthEnabled;
+
+  /** The RPC server that listens to requests from DataNodes */
+  private final RPC.Server serviceRpcServer;
+  private final InetSocketAddress serviceRPCAddress;
+  
+  /** The RPC server that listens to requests from clients */
+  protected final RPC.Server server;
+  protected final InetSocketAddress rpcAddress;
+
+  public NameNodeRpcServer(Configuration conf, NameNode nn)
+      throws IOException {
+    this.nn = nn;
+    this.namesystem = nn.getNamesystem();
+    this.metrics = NameNode.getNameNodeMetrics();
+    
+    int handlerCount = 
+      conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, 
+                  DFS_DATANODE_HANDLER_COUNT_DEFAULT);
+    InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
+
+    InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
+    if (dnSocketAddr != null) {
+      int serviceHandlerCount =
+        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
+                    DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
+      this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
+          dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
+          false, conf, namesystem.getDelegationTokenSecretManager());
+      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
+      nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
+    } else {
+      serviceRpcServer = null;
+      serviceRPCAddress = null;
+    }
+    this.server = RPC.getServer(NamenodeProtocols.class, this,
+                                socAddr.getHostName(), socAddr.getPort(),
+                                handlerCount, false, conf, 
+                                namesystem.getDelegationTokenSecretManager());
+
+    // set service-level authorization security policy
+    if (serviceAuthEnabled =
+          conf.getBoolean(
+            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      this.server.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      if (this.serviceRpcServer != null) {
+        this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      }
+    }
+
+    // The rpc-server port can be ephemeral... ensure we have the correct info
+    this.rpcAddress = this.server.getListenerAddress(); 
+    nn.setRpcServerAddress(conf, rpcAddress);
+  }
+  
+  /**
+   * Actually start serving requests.
+   */
+  void start() {
+    server.start();  //start RPC server
+    if (serviceRpcServer != null) {
+      serviceRpcServer.start();      
+    }
+  }
+  
+  /**
+   * Wait until the RPC server has shut down.
+   */
+  void join() throws InterruptedException {
+    this.server.join();
+  }
+  
+  void stop() {
+    if(server != null) server.stop();
+    if(serviceRpcServer != null) serviceRpcServer.stop();
+  }
+  
+  InetSocketAddress getServiceRpcAddress() {
+    return serviceRPCAddress;
+  }
+
+  InetSocketAddress getRpcAddress() {
+    return rpcAddress;
+  }
+  
+  @Override // VersionedProtocol
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(
+        this, protocol, clientVersion, clientMethodsHash);
+  }
+  
+  @Override
+  public long getProtocolVersion(String protocol, 
+                                 long clientVersion) throws IOException {
+    if (protocol.equals(ClientProtocol.class.getName())) {
+      return ClientProtocol.versionID; 
+    } else if (protocol.equals(DatanodeProtocol.class.getName())){
+      return DatanodeProtocol.versionID;
+    } else if (protocol.equals(NamenodeProtocol.class.getName())){
+      return NamenodeProtocol.versionID;
+    } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
+      return RefreshAuthorizationPolicyProtocol.versionID;
+    } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){
+      return RefreshUserMappingsProtocol.versionID;
+    } else if (protocol.equals(GetUserMappingsProtocol.class.getName())){
+      return GetUserMappingsProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol to name node: " + protocol);
+    }
+  }
+
+  /////////////////////////////////////////////////////
+  // NamenodeProtocol
+  /////////////////////////////////////////////////////
+  @Override // NamenodeProtocol
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+  throws IOException {
+    if(size <= 0) {
+      throw new IllegalArgumentException(
+        "Unexpected not positive size: "+size);
+    }
+
+    return namesystem.getBlockManager().getBlocks(datanode, size); 
+  }
+
+  @Override // NamenodeProtocol
+  public ExportedBlockKeys getBlockKeys() throws IOException {
+    return namesystem.getBlockManager().getBlockKeys();
+  }
+
+  @Override // NamenodeProtocol
+  public void errorReport(NamenodeRegistration registration,
+                          int errorCode, 
+                          String msg) throws IOException {
+    verifyRequest(registration);
+    LOG.info("Error report from " + registration + ": " + msg);
+    if(errorCode == FATAL)
+      namesystem.releaseBackupNode(registration);
+  }
+
+  @Override // NamenodeProtocol
+  public NamenodeRegistration register(NamenodeRegistration registration)
+  throws IOException {
+    verifyVersion(registration.getVersion());
+    NamenodeRegistration myRegistration = nn.setRegistration();
+    namesystem.registerBackupNode(registration, myRegistration);
+    return myRegistration;
+  }
+
+  @Override // NamenodeProtocol
+  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+  throws IOException {
+    verifyRequest(registration);
+    if(!nn.isRole(NamenodeRole.NAMENODE))
+      throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
+    return namesystem.startCheckpoint(registration, nn.setRegistration());
+  }
+
+  @Override // NamenodeProtocol
+  public void endCheckpoint(NamenodeRegistration registration,
+                            CheckpointSignature sig) throws IOException {
+    verifyRequest(registration);
+    if(!nn.isRole(NamenodeRole.NAMENODE))
+      throw new IOException("Only an ACTIVE node can invoke endCheckpoint.");
+    namesystem.endCheckpoint(registration, sig);
+  }
+
+  @Override // ClientProtocol
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+      throws IOException {
+    return namesystem.getDelegationToken(renewer);
+  }
+
+  @Override // ClientProtocol
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
+      throws InvalidToken, IOException {
+    return namesystem.renewDelegationToken(token);
+  }
+
+  @Override // ClientProtocol
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
+      throws IOException {
+    namesystem.cancelDelegationToken(token);
+  }
+  
+  @Override // ClientProtocol
+  public LocatedBlocks getBlockLocations(String src, 
+                                          long offset, 
+                                          long length) 
+      throws IOException {
+    metrics.incrGetBlockLocations();
+    return namesystem.getBlockLocations(getClientMachine(), 
+                                        src, offset, length);
+  }
+  
+  @Override // ClientProtocol
+  public FsServerDefaults getServerDefaults() throws IOException {
+    return namesystem.getServerDefaults();
+  }
+
+  @Override // ClientProtocol
+  public void create(String src, 
+                     FsPermission masked,
+                     String clientName, 
+                     EnumSetWritable<CreateFlag> flag,
+                     boolean createParent,
+                     short replication,
+                     long blockSize) throws IOException {
+    String clientMachine = getClientMachine();
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.create: file "
+                         +src+" for "+clientName+" at "+clientMachine);
+    }
+    if (!checkPathLength(src)) {
+      throw new IOException("create: Pathname too long.  Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    namesystem.startFile(src,
+        new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
+            null, masked),
+        clientName, clientMachine, flag.get(), createParent, replication, blockSize);
+    metrics.incrFilesCreated();
+    metrics.incrCreateFileOps();
+  }
+
+  @Override // ClientProtocol
+  public LocatedBlock append(String src, String clientName) 
+      throws IOException {
+    String clientMachine = getClientMachine();
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.append: file "
+          +src+" for "+clientName+" at "+clientMachine);
+    }
+    LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
+    metrics.incrFilesAppended();
+    return info;
+  }
+
+  @Override // ClientProtocol
+  public boolean recoverLease(String src, String clientName) throws IOException {
+    String clientMachine = getClientMachine();
+    return namesystem.recoverLease(src, clientName, clientMachine);
+  }
+
+  @Override // ClientProtocol
+  public boolean setReplication(String src, short replication) 
+    throws IOException {  
+    return namesystem.setReplication(src, replication);
+  }
+    
+  @Override // ClientProtocol
+  public void setPermission(String src, FsPermission permissions)
+      throws IOException {
+    namesystem.setPermission(src, permissions);
+  }
+
+  @Override // ClientProtocol
+  public void setOwner(String src, String username, String groupname)
+      throws IOException {
+    namesystem.setOwner(src, username, groupname);
+  }
+
+  @Override // ClientProtocol
+  public LocatedBlock addBlock(String src,
+                               String clientName,
+                               ExtendedBlock previous,
+                               DatanodeInfo[] excludedNodes)
+      throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+          +src+" for "+clientName);
+    }
+    HashMap<Node, Node> excludedNodesSet = null;
+    if (excludedNodes != null) {
+      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+      for (Node node:excludedNodes) {
+        excludedNodesSet.put(node, node);
+      }
+    }
+    LocatedBlock locatedBlock = 
+      namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
+    if (locatedBlock != null)
+      metrics.incrAddBlockOps();
+    return locatedBlock;
+  }
+
+  @Override // ClientProtocol
+  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAdditionalDatanode: src=" + src
+          + ", blk=" + blk
+          + ", existings=" + Arrays.asList(existings)
+          + ", excludes=" + Arrays.asList(excludes)
+          + ", numAdditionalNodes=" + numAdditionalNodes
+          + ", clientName=" + clientName);
+    }
+
+    metrics.incrGetAdditionalDatanodeOps();
+
+    HashMap<Node, Node> excludeSet = null;
+    if (excludes != null) {
+      excludeSet = new HashMap<Node, Node>(excludes.length);
+      for (Node node : excludes) {
+        excludeSet.put(node, node);
+      }
+    }
+    return namesystem.getAdditionalDatanode(src, blk,
+        existings, excludeSet, numAdditionalNodes, clientName);
+  }
+
+  /**
+   * The client needs to give up on the block.
+   */
+  public void abandonBlock(ExtendedBlock b, String src, String holder)
+      throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
+          +b+" of file "+src);
+    }
+    if (!namesystem.abandonBlock(b, src, holder)) {
+      throw new IOException("Cannot abandon block during write to " + src);
+    }
+  }
+
+  @Override // ClientProtocol
+  public boolean complete(String src, String clientName, ExtendedBlock last)
+      throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.complete: "
+          + src + " for " + clientName);
+    }
+    return namesystem.completeFile(src, clientName, last);
+  }
+
+  /**
+   * The client has detected an error on the specified located blocks 
+   * and is reporting them to the server.  For now, the namenode will 
+   * mark the block as corrupt.  In the future we might 
+   * check the blocks are actually corrupt. 
+   */
+  @Override
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
+    for (int i = 0; i < blocks.length; i++) {
+      ExtendedBlock blk = blocks[i].getBlock();
+      DatanodeInfo[] nodes = blocks[i].getLocations();
+      for (int j = 0; j < nodes.length; j++) {
+        DatanodeInfo dn = nodes[j];
+        namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn);
+      }
+    }
+  }
+
+  @Override // ClientProtocol
+  public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
+      throws IOException {
+    return namesystem.updateBlockForPipeline(block, clientName);
+  }
+
+
+  @Override // ClientProtocol
+  public void updatePipeline(String clientName, ExtendedBlock oldBlock,
+      ExtendedBlock newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
+  }
+  
+  @Override // DatanodeProtocol
+  public void commitBlockSynchronization(ExtendedBlock block,
+      long newgenerationstamp, long newlength,
+      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
+      throws IOException {
+    namesystem.commitBlockSynchronization(block,
+        newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
+  }
+  
+  @Override // ClientProtocol
+  public long getPreferredBlockSize(String filename) 
+      throws IOException {
+    return namesystem.getPreferredBlockSize(filename);
+  }
+    
+  @Deprecated
+  @Override // ClientProtocol
+  public boolean rename(String src, String dst) throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
+    }
+    if (!checkPathLength(dst)) {
+      throw new IOException("rename: Pathname too long.  Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    boolean ret = namesystem.renameTo(src, dst);
+    if (ret) {
+      metrics.incrFilesRenamed();
+    }
+    return ret;
+  }
+  
+  @Override // ClientProtocol
+  public void concat(String trg, String[] src) throws IOException {
+    namesystem.concat(trg, src);
+  }
+  
+  @Override // ClientProtocol
+  public void rename(String src, String dst, Options.Rename... options)
+      throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
+    }
+    if (!checkPathLength(dst)) {
+      throw new IOException("rename: Pathname too long.  Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    namesystem.renameTo(src, dst, options);
+    metrics.incrFilesRenamed();
+  }
+
+  @Deprecated
+  @Override // ClientProtocol
+  public boolean delete(String src) throws IOException {
+    return delete(src, true);
+  }
+
+  @Override // ClientProtocol
+  public boolean delete(String src, boolean recursive) throws IOException {
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+          + ", recursive=" + recursive);
+    }
+    boolean ret = namesystem.delete(src, recursive);
+    if (ret) 
+      metrics.incrDeleteFileOps();
+    return ret;
+  }
+
+  /**
+   * Check path length does not exceed maximum.  Returns true if
+   * length and depth are okay.  Returns false if length is too long 
+   * or depth is too great.
+   */
+  private boolean checkPathLength(String src) {
+    Path srcPath = new Path(src);
+    return (src.length() <= MAX_PATH_LENGTH &&
+            srcPath.depth() <= MAX_PATH_DEPTH);
+  }
+    
+  @Override // ClientProtocol
+  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
+      throws IOException {
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
+    }
+    if (!checkPathLength(src)) {
+      throw new IOException("mkdirs: Pathname too long.  Limit " 
+                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    return namesystem.mkdirs(src,
+        new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
+            null, masked), createParent);
+  }
+
+  @Override // ClientProtocol
+  public void renewLease(String clientName) throws IOException {
+    namesystem.renewLease(clientName);        
+  }
+
+  @Override // ClientProtocol
+  public DirectoryListing getListing(String src, byte[] startAfter,
+      boolean needLocation)
+  throws IOException {
+    DirectoryListing files = namesystem.getListing(
+        src, startAfter, needLocation);
+    if (files != null) {
+      metrics.incrGetListingOps();
+      metrics.incrFilesInGetListingOps(files.getPartialListing().length);
+    }
+    return files;
+  }
+
+  @Override // ClientProtocol
+  public HdfsFileStatus getFileInfo(String src)  throws IOException {
+    metrics.incrFileInfoOps();
+    return namesystem.getFileInfo(src, true);
+  }
+
+  @Override // ClientProtocol
+  public HdfsFileStatus getFileLinkInfo(String src) throws IOException { 
+    metrics.incrFileInfoOps();
+    return namesystem.getFileInfo(src, false);
+  }
+  
+  @Override
+  public long[] getStats() {
+    return namesystem.getStats();
+  }
+
+  @Override // ClientProtocol
+  public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
+      throws IOException {
+    DatanodeInfo results[] = namesystem.datanodeReport(type);
+    if (results == null ) {
+      throw new IOException("Cannot find datanode report");
+    }
+    return results;
+  }
+    
+  @Override // ClientProtocol
+  public boolean setSafeMode(SafeModeAction action) throws IOException {
+    return namesystem.setSafeMode(action);
+  }
+
+  @Override // ClientProtocol
+  public boolean restoreFailedStorage(String arg) 
+      throws AccessControlException {
+    return namesystem.restoreFailedStorage(arg);
+  }
+
+  @Override // ClientProtocol
+  public void saveNamespace() throws IOException {
+    namesystem.saveNamespace();
+  }
+
+  @Override // ClientProtocol
+  public void refreshNodes() throws IOException {
+    namesystem.getBlockManager().getDatanodeManager().refreshNodes(
+        new HdfsConfiguration());
+  }
+
+  @Override // NamenodeProtocol
+  public long getTransactionID() {
+    return namesystem.getEditLog().getSyncTxId();
+  }
+
+  @Override // NamenodeProtocol
+  public CheckpointSignature rollEditLog() throws IOException {
+    return namesystem.rollEditLog();
+  }
+  
+  @Override
+  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+  throws IOException {
+    return namesystem.getEditLog().getEditLogManifest(sinceTxId);
+  }
+    
+  @Override // ClientProtocol
+  public void finalizeUpgrade() throws IOException {
+    namesystem.finalizeUpgrade();
+  }
+
+  @Override // ClientProtocol
+  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
+      throws IOException {
+    return namesystem.distributedUpgradeProgress(action);
+  }
+
+  @Override // ClientProtocol
+  public void metaSave(String filename) throws IOException {
+    namesystem.metaSave(filename);
+  }
+
+  @Override // ClientProtocol
+  public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
+      throws IOException {
+    Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
+      namesystem.listCorruptFileBlocks(path, cookie);
+    
+    String[] files = new String[fbs.size()];
+    String lastCookie = "";
+    int i = 0;
+    for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
+      files[i++] = fb.path;
+      lastCookie = fb.block.getBlockName();
+    }
+    return new CorruptFileBlocks(files, lastCookie);
+  }
+
+  /**
+   * Tell all datanodes to use a new, non-persistent bandwidth value for
+   * dfs.datanode.balance.bandwidthPerSec.
+   * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
+   * @throws IOException
+   */
+  @Override // ClientProtocol
+  public void setBalancerBandwidth(long bandwidth) throws IOException {
+    namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
+  }
+  
+  @Override // ClientProtocol
+  public ContentSummary getContentSummary(String path) throws IOException {
+    return namesystem.getContentSummary(path);
+  }
+
+  @Override // ClientProtocol
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota) 
+      throws IOException {
+    namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
+  }
+  
+  @Override // ClientProtocol
+  public void fsync(String src, String clientName) throws IOException {
+    namesystem.fsync(src, clientName);
+  }
+
+  @Override // ClientProtocol
+  public void setTimes(String src, long mtime, long atime) 
+      throws IOException {
+    namesystem.setTimes(src, mtime, atime);
+  }
+
+  @Override // ClientProtocol
+  public void createSymlink(String target, String link, FsPermission dirPerms,
+      boolean createParent) throws IOException {
+    metrics.incrCreateSymlinkOps();
+    /* We enforce the MAX_PATH_LENGTH limit even though a symlink target 
+     * URI may refer to a non-HDFS file system. 
+     */
+    if (!checkPathLength(link)) {
+      throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH +
+                            " character limit");
+                            
+    }
+    if ("".equals(target)) {
+      throw new IOException("Invalid symlink target");
+    }
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    namesystem.createSymlink(target, link,
+      new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
+  }
+
+  @Override // ClientProtocol
+  public String getLinkTarget(String path) throws IOException {
+    metrics.incrGetLinkTargetOps();
+    /* Resolves the first symlink in the given path, returning a
+     * new path consisting of the target of the symlink and any 
+     * remaining path components from the original path.
+     */
+    try {
+      HdfsFileStatus stat = namesystem.getFileInfo(path, false);
+      if (stat != null) {
+        // NB: getSymlink throws IOException if !stat.isSymlink() 
+        return stat.getSymlink();
+      }
+    } catch (UnresolvedPathException e) {
+      return e.getResolvedPath().toString();
+    } catch (UnresolvedLinkException e) {
+      // The NameNode should only throw an UnresolvedPathException
+      throw new AssertionError("UnresolvedLinkException thrown");
+    }
+    return null;
+  }
+
+
+  @Override // DatanodeProtocol
+  public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
+      throws IOException {
+    verifyVersion(nodeReg.getVersion());
+    namesystem.registerDatanode(nodeReg);
+      
+    return nodeReg;
+  }
+
+  @Override // DatanodeProtocol
+  public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xmitsInProgress, int xceiverCount, int failedVolumes)
+      throws IOException {
+    verifyRequest(nodeReg);
+    return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
+        blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes);
+  }
+
+  @Override // DatanodeProtocol
+  public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
+      String poolId, long[] blocks) throws IOException {
+    verifyRequest(nodeReg);
+    BlockListAsLongs blist = new BlockListAsLongs(blocks);
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+           + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
+           + " blocks");
+    }
+
+    namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
+    if (nn.getFSImage().isUpgradeFinalized())
+      return new DatanodeCommand.Finalize(poolId);
+    return null;
+  }
+
+  @Override // DatanodeProtocol
+  public void blockReceived(DatanodeRegistration nodeReg, String poolId,
+      Block blocks[], String delHints[]) throws IOException {
+    verifyRequest(nodeReg);
+    if(stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
+          +"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
+    }
+    for (int i = 0; i < blocks.length; i++) {
+      namesystem.getBlockManager().blockReceived(
+          nodeReg, poolId, blocks[i], delHints[i]);
+    }
+  }
+  
+  @Override // DatanodeProtocol
+  public void errorReport(DatanodeRegistration nodeReg,
+                          int errorCode, String msg) throws IOException { 
+    String dnName = (nodeReg == null ? "unknown DataNode" : nodeReg.getName());
+
+    if (errorCode == DatanodeProtocol.NOTIFY) {
+      LOG.info("Error report from " + dnName + ": " + msg);
+      return;
+    }
+    verifyRequest(nodeReg);
+
+    if (errorCode == DatanodeProtocol.DISK_ERROR) {
+      LOG.warn("Disk error on " + dnName + ": " + msg);
+    } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
+      LOG.warn("Fatal disk error on " + dnName + ": " + msg);
+      namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);            
+    } else {
+      LOG.info("Error report from " + dnName + ": " + msg);
+    }
+  }
+    
+  @Override // DatanodeProtocol, NamenodeProtocol
+  public NamespaceInfo versionRequest() throws IOException {
+    return namesystem.getNamespaceInfo();
+  }
+
+  @Override // DatanodeProtocol
+  public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
+    return namesystem.processDistributedUpgradeCommand(comm);
+  }
+
+  /** 
+   * Verify request.
+   * 
+   * Verifies correctness of the datanode version, registration ID, and 
+   * if the datanode does not need to be shutdown.
+   * 
+   * @param nodeReg data node registration
+   * @throws IOException
+   */
+  void verifyRequest(NodeRegistration nodeReg) throws IOException {
+    verifyVersion(nodeReg.getVersion());
+    if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID())) {
+      LOG.warn("Invalid registrationID - expected: "
+          + namesystem.getRegistrationID() + " received: "
+          + nodeReg.getRegistrationID());
+      throw new UnregisteredNodeException(nodeReg);
+    }
+  }
+    
+
+  @Override // RefreshAuthorizationPolicyProtocol
+  public void refreshServiceAcl() throws IOException {
+    if (!serviceAuthEnabled) {
+      throw new AuthorizationException("Service Level Authorization not enabled!");
+    }
+
+    this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
+    if (this.serviceRpcServer != null) {
+      this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
+    }
+  }
+
+  @Override // RefreshAuthorizationPolicyProtocol
+  public void refreshUserToGroupsMappings() throws IOException {
+    LOG.info("Refreshing all user-to-groups mappings. Requested by user: " + 
+             UserGroupInformation.getCurrentUser().getShortUserName());
+    Groups.getUserToGroupsMappingService().refresh();
+  }
+
+  @Override // RefreshAuthorizationPolicyProtocol
+  public void refreshSuperUserGroupsConfiguration() {
+    LOG.info("Refreshing SuperUser proxy group mapping list ");
+
+    ProxyUsers.refreshSuperUserGroupsConfiguration();
+  }
+  
+  @Override // GetUserMappingsProtocol
+  public String[] getGroupsForUser(String user) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Getting groups for user " + user);
+    }
+    return UserGroupInformation.createRemoteUser(user).getGroupNames();
+  }
+
+
+  /**
+   * Verify version.
+   * 
+   * @param version
+   * @throws IOException
+   */
+  void verifyVersion(int version) throws IOException {
+    if (version != HdfsConstants.LAYOUT_VERSION)
+      throw new IncorrectVersionException(version, "data node");
+  }
+
+  private static String getClientMachine() {
+    String clientMachine = Server.getRemoteAddress();
+    if (clientMachine == null) {
+      clientMachine = "";
+    }
+    return clientMachine;
+  }
+}

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -173,7 +173,7 @@ public class NamenodeFsck {
       out.println(msg);
       namenode.getNamesystem().logFsckEvent(path, remoteAddress);
 
-      final HdfsFileStatus file = namenode.getFileInfo(path);
+      final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path);
       if (file != null) {
 
         if (showCorruptFileBlocks) {
@@ -250,7 +250,8 @@ public class NamenodeFsck {
       res.totalDirs++;
       do {
         assert lastReturnedName != null;
-        thisListing = namenode.getListing(path, lastReturnedName, false);
+        thisListing = namenode.getRpcServer().getListing(
+            path, lastReturnedName, false);
         if (thisListing == null) {
           return;
         }
@@ -385,7 +386,7 @@ public class NamenodeFsck {
         break;
       case FIXING_DELETE:
         if (!isOpen)
-          namenode.delete(path, true);
+          namenode.getRpcServer().delete(path, true);
       }
     }
     if (showFiles) {
@@ -414,7 +415,8 @@ public class NamenodeFsck {
     String target = lostFound + fullName;
     String errmsg = "Failed to move " + fullName + " to /lost+found";
     try {
-      if (!namenode.mkdirs(target, file.getPermission(), true)) {
+      if (!namenode.getRpcServer().mkdirs(
+          target, file.getPermission(), true)) {
         LOG.warn(errmsg);
         return;
       }

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -354,7 +355,7 @@ class NamenodeJspHelper {
     }
   }
 
-  static String getDelegationToken(final NameNode nn,
+  static String getDelegationToken(final NamenodeProtocols nn,
       HttpServletRequest request, Configuration conf,
       final UserGroupInformation ugi) throws IOException, InterruptedException {
     Token<DelegationTokenIdentifier> token = ugi
@@ -381,7 +382,8 @@ class NamenodeJspHelper {
         .getAttribute(JspHelper.CURRENT_CONF);
     final DatanodeID datanode = getRandomDatanode(nn);
     UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
-    String tokenString = getDelegationToken(nn, request, conf, ugi);
+    String tokenString = getDelegationToken(
+        nn.getRpcServer(), request, conf, ugi);
     // if the user is defined, get a delegation token and stringify it
     final String redirectLocation;
     final String nodeToRedirect;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java

@@ -70,7 +70,7 @@ public class RenewDelegationTokenServlet extends DfsServlet {
     try {
       long result = ugi.doAs(new PrivilegedExceptionAction<Long>() {
         public Long run() throws Exception {
-          return nn.renewDelegationToken(token);
+          return nn.getRpcServer().renewDelegationToken(token);
         }
       });
       PrintStream os = new PrintStream(resp.getOutputStream());

+ 17 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -1025,6 +1025,14 @@ public class MiniDFSCluster {
     return getNameNode(0);
   }
   
+  /**
+   * Get an instance of the NameNode's RPC handler.
+   */
+  public NamenodeProtocols getNameNodeRpc() {
+    checkSingleNameNode();
+    return getNameNode(0).getRpcServer();
+  }
+  
   /**
    * Gets the NameNode for the index.  May be null.
    */
@@ -1361,7 +1369,15 @@ public class MiniDFSCluster {
     if (nameNode == null) {
       return false;
     }
-    long[] sizes = nameNode.getStats();
+    long[] sizes;
+    try {
+      sizes = nameNode.getRpcServer().getStats();
+    } catch (IOException ioe) {
+      // This method above should never throw.
+      // It only throws IOE since it is exposed via RPC
+      throw new AssertionError("Unexpected IOE thrown: "
+          + StringUtils.stringifyException(ioe));
+    }
     boolean isUp = false;
     synchronized (this) {
       isUp = ((!nameNode.isInSafeMode() || !waitSafeMode) && sizes[0] != 0);

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
 
@@ -45,7 +46,7 @@ public class TestClientProtocolForPipelineRecovery {
     try {
       cluster.waitActive();
       FileSystem fileSys = cluster.getFileSystem();
-      NameNode namenode = cluster.getNameNode();
+      NamenodeProtocols namenode = cluster.getNameNodeRpc();
 
       /* Test writing to finalized replicas */
       Path file = new Path("dataprotocol.dat");    

+ 9 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Client;
@@ -190,7 +191,7 @@ public class TestDFSClientRetries extends TestCase {
     final int maxRetries = 1; // Allow one retry (total of two calls)
     conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries);
     
-    NameNode mockNN = mock(NameNode.class);
+    NamenodeProtocols mockNN = mock(NamenodeProtocols.class);
     Answer<Object> answer = new ThrowsException(new IOException()) {
       int retryCount = 0;
       
@@ -240,8 +241,8 @@ public class TestDFSClientRetries extends TestCase {
     try {
       cluster.waitActive();
       FileSystem fs = cluster.getFileSystem();
-      NameNode preSpyNN = cluster.getNameNode();
-      NameNode spyNN = spy(preSpyNN);
+      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+      NamenodeProtocols spyNN = spy(preSpyNN);
       DFSClient client = new DFSClient(null, spyNN, conf, null);
       int maxBlockAcquires = client.getMaxBlockAcquireFailures();
       assertTrue(maxBlockAcquires > 0);
@@ -305,11 +306,11 @@ public class TestDFSClientRetries extends TestCase {
    */
   private static class FailNTimesAnswer implements Answer<LocatedBlocks> {
     private int failuresLeft;
-    private NameNode realNN;
+    private NamenodeProtocols realNN;
 
-    public FailNTimesAnswer(NameNode realNN, int timesToFail) {
+    public FailNTimesAnswer(NamenodeProtocols preSpyNN, int timesToFail) {
       failuresLeft = timesToFail;
-      this.realNN = realNN;
+      this.realNN = preSpyNN;
     }
 
     public LocatedBlocks answer(InvocationOnMock invocation) throws IOException {
@@ -603,7 +604,8 @@ public class TestDFSClientRetries extends TestCase {
 
       //stop the first datanode
       final List<LocatedBlock> locatedblocks = DFSClient.callGetBlockLocations(
-          cluster.getNameNode(), f, 0, Long.MAX_VALUE).getLocatedBlocks();
+          cluster.getNameNodeRpc(), f, 0, Long.MAX_VALUE)
+            .getLocatedBlocks();
       final DatanodeInfo first = locatedblocks.get(0).getLocations()[0];
       cluster.stopDataNode(first.getName());
 

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java

@@ -293,10 +293,11 @@ public class TestDecommission {
   }
   
   private void verifyStats(NameNode namenode, FSNamesystem fsn,
-      DatanodeInfo node, boolean decommissioning) throws InterruptedException {
+      DatanodeInfo node, boolean decommissioning)
+      throws InterruptedException, IOException {
     // Do the stats check over 10 iterations
     for (int i = 0; i < 10; i++) {
-      long[] newStats = namenode.getStats();
+      long[] newStats = namenode.getRpcServer().getStats();
 
       // For decommissioning nodes, ensure capacity of the DN is no longer
       // counted. Only used space of the DN is counted in cluster capacity

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java

@@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Before;
@@ -151,8 +151,8 @@ public class TestFileAppend4 {
  
     try {
       cluster.waitActive();
-      NameNode preSpyNN = cluster.getNameNode();
-      NameNode spyNN = spy(preSpyNN);
+      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+      NamenodeProtocols spyNN = spy(preSpyNN);
  
       // Delay completeFile
       GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
@@ -222,8 +222,8 @@ public class TestFileAppend4 {
  
     try {
       cluster.waitActive();
-      NameNode preSpyNN = cluster.getNameNode();
-      NameNode spyNN = spy(preSpyNN);
+      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+      NamenodeProtocols spyNN = spy(preSpyNN);
  
       // Delay completeFile
       GenericTestUtils.DelayAnswer delayer =

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -420,7 +420,7 @@ public class TestFileCreation extends junit.framework.TestCase {
       final Path f = new Path("/foo.txt");
       createFile(dfs, f, 3);
       try {
-        cluster.getNameNode().addBlock(f.toString(), 
+        cluster.getNameNodeRpc().addBlock(f.toString(), 
             client.clientName, null, null);
         fail();
       } catch(IOException ioe) {

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java

@@ -106,7 +106,7 @@ public class TestLeaseRecovery extends junit.framework.TestCase {
 
 
       DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
-      cluster.getNameNode().append(filestr, dfs.dfs.clientName);
+      cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName);
 
       // expire lease to trigger block recovery.
       waitLeaseRecovery(cluster);
@@ -129,14 +129,14 @@ public class TestLeaseRecovery extends junit.framework.TestCase {
       filestr = "/foo.safemode";
       filepath = new Path(filestr);
       dfs.create(filepath, (short)1);
-      cluster.getNameNode().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+      cluster.getNameNodeRpc().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
       assertTrue(dfs.dfs.exists(filestr));
       DFSTestUtil.waitReplication(dfs, filepath, (short)1);
       waitLeaseRecovery(cluster);
       // verify that we still cannot recover the lease
       LeaseManager lm = NameNodeAdapter.getLeaseManager(cluster.getNamesystem());
       assertTrue("Found " + lm.countLease() + " lease, expected 1", lm.countLease() == 1);
-      cluster.getNameNode().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      cluster.getNameNodeRpc().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
     }
     finally {
       if (cluster != null) {cluster.shutdown();}

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java

@@ -100,7 +100,7 @@ public class TestPipelines {
     ofs.writeBytes("Some more stuff to write");
     ((DFSOutputStream) ofs.getWrappedStream()).hflush();
 
-    List<LocatedBlock> lb = cluster.getNameNode().getBlockLocations(
+    List<LocatedBlock> lb = cluster.getNameNodeRpc().getBlockLocations(
       filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks();
 
     String bpid = cluster.getNamesystem().getBlockPoolId();

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java

@@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 
 /**
  * This class defines a number of static helper methods used by the
@@ -121,7 +121,7 @@ public class UpgradeUtilities {
                                    .manageNameDfsDirs(false)
                                    .build();
         
-      NameNode namenode = cluster.getNameNode();
+      NamenodeProtocols namenode = cluster.getNameNodeRpc();
       namenodeStorageNamespaceID = namenode.versionRequest().getNamespaceID();
       namenodeStorageFsscTime = namenode.versionRequest().getCTime();
       namenodeStorageClusterID = namenode.versionRequest().getClusterID();
@@ -517,7 +517,7 @@ public class UpgradeUtilities {
    */
   public static int getCurrentNamespaceID(MiniDFSCluster cluster) throws IOException {
     if (cluster != null) {
-      return cluster.getNameNode().versionRequest().getNamespaceID();
+      return cluster.getNameNodeRpc().versionRequest().getNamespaceID();
     }
     return namenodeStorageNamespaceID;
   }
@@ -528,7 +528,7 @@ public class UpgradeUtilities {
    */
   public static String getCurrentClusterID(MiniDFSCluster cluster) throws IOException {
     if (cluster != null) {
-      return cluster.getNameNode().versionRequest().getClusterID();
+      return cluster.getNameNodeRpc().versionRequest().getClusterID();
     }
     return namenodeStorageClusterID;
   }
@@ -539,7 +539,7 @@ public class UpgradeUtilities {
    */
   public static String getCurrentBlockPoolID(MiniDFSCluster cluster) throws IOException {
     if (cluster != null) {
-      return cluster.getNameNode().versionRequest().getBlockPoolID();
+      return cluster.getNameNodeRpc().versionRequest().getBlockPoolID();
     }
     return namenodeStorageBlockPoolID;
   }
@@ -554,7 +554,7 @@ public class UpgradeUtilities {
    */
   public static long getCurrentFsscTime(MiniDFSCluster cluster) throws IOException {
     if (cluster != null) {
-      return cluster.getNameNode().versionRequest().getCTime();
+      return cluster.getNameNodeRpc().versionRequest().getCTime();
     }
     return namenodeStorageFsscTime;
   }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java

@@ -375,11 +375,11 @@ public class TestBlockToken {
       Path filePath = new Path(fileName);
       FSDataOutputStream out = fs.create(filePath, (short) 1);
       out.write(new byte[1000]);
-      LocatedBlocks locatedBlocks = cluster.getNameNode().getBlockLocations(
+      LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(
           fileName, 0, 1000);
       while (locatedBlocks.getLastLocatedBlock() == null) {
         Thread.sleep(100);
-        locatedBlocks = cluster.getNameNode().getBlockLocations(fileName, 0,
+        locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(fileName, 0,
             1000);
       }
       Token<BlockTokenIdentifier> token = locatedBlocks.getLastLocatedBlock()

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java

@@ -89,7 +89,7 @@ public class TestBalancerWithMultipleNameNodes {
       this.cluster = cluster;
       clients = new ClientProtocol[nNameNodes];
       for(int i = 0; i < nNameNodes; i++) {
-        clients[i] = cluster.getNameNode(i);
+        clients[i] = cluster.getNameNode(i).getRpcServer();
       }
       replication = (short)Math.max(1, nDataNodes - 1);
     }

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
@@ -314,6 +315,7 @@ public class TestBlockTokenWithDFS {
       assertEquals(numDataNodes, cluster.getDataNodes().size());
 
       final NameNode nn = cluster.getNameNode();
+      final NamenodeProtocols nnProto = nn.getRpcServer();
       final BlockManager bm = nn.getNamesystem().getBlockManager();
       final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
 
@@ -344,7 +346,7 @@ public class TestBlockTokenWithDFS {
 
       new DFSClient(new InetSocketAddress("localhost",
           cluster.getNameNodePort()), conf);
-      List<LocatedBlock> locatedBlocks = nn.getBlockLocations(
+      List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
           FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
       LocatedBlock lblock = locatedBlocks.get(0); // first block
       Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();

+ 16 - 14
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java

@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -139,7 +138,7 @@ public class TestBlockReport {
     DataNode dn = cluster.getDataNodes().get(DN_N0);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    cluster.getNameNode().blockReport(dnR, poolId,
+    cluster.getNameNodeRpc().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     List<LocatedBlock> blocksAfterReport =
@@ -181,9 +180,10 @@ public class TestBlockReport {
 
     List<ExtendedBlock> blocks2Remove = new ArrayList<ExtendedBlock>();
     List<Integer> removedIndex = new ArrayList<Integer>();
-    List<LocatedBlock> lBlocks = cluster.getNameNode().getBlockLocations(
-      filePath.toString(), FILE_START,
-      FILE_SIZE).getLocatedBlocks();
+    List<LocatedBlock> lBlocks =
+      cluster.getNameNodeRpc().getBlockLocations(
+          filePath.toString(), FILE_START,
+          FILE_SIZE).getLocatedBlocks();
 
     while (removedIndex.size() != 2) {
       int newRemoveIndex = rand.nextInt(lBlocks.size());
@@ -218,7 +218,7 @@ public class TestBlockReport {
     DataNode dn = cluster.getDataNodes().get(DN_N0);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    cluster.getNameNode().blockReport(dnR, poolId,
+    cluster.getNameNodeRpc().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
@@ -258,7 +258,8 @@ public class TestBlockReport {
     DataNode dn = cluster.getDataNodes().get(DN_N0);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    DatanodeCommand dnCmd = cluster.getNameNode().blockReport(dnR, poolId,
+    DatanodeCommand dnCmd =
+      cluster.getNameNodeRpc().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     if(LOG.isDebugEnabled()) {
       LOG.debug("Got the command: " + dnCmd);
@@ -310,7 +311,7 @@ public class TestBlockReport {
     DataNode dn = cluster.getDataNodes().get(DN_N1);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    cluster.getNameNode().blockReport(dnR, poolId,
+    cluster.getNameNodeRpc().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
     assertEquals("Wrong number of PendingReplication Blocks",
@@ -359,7 +360,7 @@ public class TestBlockReport {
     DataNode dn = cluster.getDataNodes().get(DN_N1);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    cluster.getNameNode().blockReport(dnR, poolId,
+    cluster.getNameNodeRpc().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
     assertEquals("Wrong number of Corrupted blocks",
@@ -381,7 +382,7 @@ public class TestBlockReport {
       LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
     }
     
-    cluster.getNameNode().blockReport(dnR, poolId,
+    cluster.getNameNodeRpc().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
 
@@ -431,7 +432,7 @@ public class TestBlockReport {
       DataNode dn = cluster.getDataNodes().get(DN_N1);
       String poolId = cluster.getNamesystem().getBlockPoolId();
       DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-      cluster.getNameNode().blockReport(dnR, poolId,
+      cluster.getNameNodeRpc().blockReport(dnR, poolId,
           new BlockListAsLongs(blocks, null).getBlockListAsLongs());
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
@@ -477,7 +478,7 @@ public class TestBlockReport {
       DataNode dn = cluster.getDataNodes().get(DN_N1);
       String poolId = cluster.getNamesystem().getBlockPoolId();
       DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-      cluster.getNameNode().blockReport(dnR, poolId,
+      cluster.getNameNodeRpc().blockReport(dnR, poolId,
           new BlockListAsLongs(blocks, null).getBlockListAsLongs());
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
@@ -590,7 +591,7 @@ public class TestBlockReport {
     DFSTestUtil.createFile(fs, filePath, fileSize,
       REPL_FACTOR, rand.nextLong());
 
-    return locatedToBlocks(cluster.getNameNode()
+    return locatedToBlocks(cluster.getNameNodeRpc()
       .getBlockLocations(filePath.toString(), FILE_START,
         fileSize).getLocatedBlocks(), null);
   }
@@ -707,7 +708,8 @@ public class TestBlockReport {
   private Block findBlock(Path path, long size) throws IOException {
     Block ret;
       List<LocatedBlock> lbs =
-        cluster.getNameNode().getBlockLocations(path.toString(),
+        cluster.getNameNodeRpc()
+        .getBlockLocations(path.toString(),
           FILE_START, size).getLocatedBlocks();
       LocatedBlock lb = lbs.get(lbs.size() - 1);
 

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -40,8 +40,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
 
@@ -144,7 +144,7 @@ public class TestDataNodeVolumeFailure {
     String bpid = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
     long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs();
-    cluster.getNameNode().blockReport(dnR, bpid, bReport);
+    cluster.getNameNodeRpc().blockReport(dnR, bpid, bReport);
 
     // verify number of blocks and files...
     verify(filename, filesize);
@@ -216,7 +216,7 @@ public class TestDataNodeVolumeFailure {
    * @throws IOException
    */
   private void triggerFailure(String path, long size) throws IOException {
-    NameNode nn = cluster.getNameNode();
+    NamenodeProtocols nn = cluster.getNameNodeRpc();
     List<LocatedBlock> locatedBlocks =
       nn.getBlockLocations(path, 0, size).getLocatedBlocks();
     
@@ -291,7 +291,7 @@ public class TestDataNodeVolumeFailure {
     throws IOException {
     int total = 0;
     
-    NameNode nn = cluster.getNameNode();
+    NamenodeProtocols nn = cluster.getNameNodeRpc();
     List<LocatedBlock> locatedBlocks = 
       nn.getBlockLocations(path, 0, size).getLocatedBlocks();
     //System.out.println("Number of blocks: " + locatedBlocks.size()); 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java

@@ -109,7 +109,7 @@ public class TestTransferRbw {
         
         final DatanodeInfo oldnodeinfo;
         {
-          final DatanodeInfo[] datatnodeinfos = cluster.getNameNode(
+          final DatanodeInfo[] datatnodeinfos = cluster.getNameNodeRpc(
               ).getDatanodeReport(DatanodeReportType.LIVE);
           Assert.assertEquals(2, datatnodeinfos.length);
           int i = 0;

+ 33 - 30
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -100,6 +101,7 @@ public class NNThroughputBenchmark {
 
   static Configuration config;
   static NameNode nameNode;
+  static NamenodeProtocols nameNodeProto;
 
   NNThroughputBenchmark(Configuration conf) throws IOException, LoginException {
     config = conf;
@@ -119,6 +121,7 @@ public class NNThroughputBenchmark {
     // Start the NameNode
     String[] argv = new String[] {};
     nameNode = NameNode.createNameNode(argv, config);
+    nameNodeProto = nameNode.getRpcServer();
   }
 
   void close() throws IOException {
@@ -264,9 +267,9 @@ public class NNThroughputBenchmark {
     }
 
     void cleanUp() throws IOException {
-      nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       if(!keepResults)
-        nameNode.delete(getBaseDir(), true);
+        nameNodeProto.delete(getBaseDir(), true);
     }
 
     int getNumOpsExecuted() {
@@ -397,7 +400,7 @@ public class NNThroughputBenchmark {
     void benchmarkOne() throws IOException {
       for(int idx = 0; idx < opsPerThread; idx++) {
         if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0)
-          nameNode.refreshUserToGroupsMappings();
+          nameNodeProto.refreshUserToGroupsMappings();
         long stat = statsOp.executeOp(daemonId, idx, arg1);
         localNumOpsExecuted++;
         localCumulativeTime += stat;
@@ -458,9 +461,9 @@ public class NNThroughputBenchmark {
      */
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
-      nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       long start = System.currentTimeMillis();
-      nameNode.delete(BASE_DIR_NAME, true);
+      nameNodeProto.delete(BASE_DIR_NAME, true);
       long end = System.currentTimeMillis();
       return end-start;
     }
@@ -522,7 +525,7 @@ public class NNThroughputBenchmark {
 
     void generateInputs(int[] opsPerThread) throws IOException {
       assert opsPerThread.length == numThreads : "Error opsPerThread.length"; 
-      nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       // int generatedFileIdx = 0;
       LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
       fileNames = new String[numThreads][];
@@ -554,12 +557,12 @@ public class NNThroughputBenchmark {
     throws IOException {
       long start = System.currentTimeMillis();
       // dummyActionNoSynch(fileIdx);
-      nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
+      nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
                       clientName, new EnumSetWritable<CreateFlag>(EnumSet
               .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       for(boolean written = !closeUponCreate; !written; 
-        written = nameNode.complete(fileNames[daemonId][inputIdx],
+        written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
                                     clientName, null));
       return end-start;
     }
@@ -627,11 +630,11 @@ public class NNThroughputBenchmark {
       }
       // use the same files for open
       super.generateInputs(opsPerThread);
-      if(nameNode.getFileInfo(opCreate.getBaseDir()) != null
-          && nameNode.getFileInfo(getBaseDir()) == null) {
-        nameNode.rename(opCreate.getBaseDir(), getBaseDir());
+      if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null
+          && nameNodeProto.getFileInfo(getBaseDir()) == null) {
+        nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir());
       }
-      if(nameNode.getFileInfo(getBaseDir()) == null) {
+      if(nameNodeProto.getFileInfo(getBaseDir()) == null) {
         throw new IOException(getBaseDir() + " does not exist.");
       }
     }
@@ -642,7 +645,7 @@ public class NNThroughputBenchmark {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = System.currentTimeMillis();
-      nameNode.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
+      nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       return end-start;
     }
@@ -670,7 +673,7 @@ public class NNThroughputBenchmark {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = System.currentTimeMillis();
-      nameNode.delete(fileNames[daemonId][inputIdx], false);
+      nameNodeProto.delete(fileNames[daemonId][inputIdx], false);
       long end = System.currentTimeMillis();
       return end-start;
     }
@@ -698,7 +701,7 @@ public class NNThroughputBenchmark {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = System.currentTimeMillis();
-      nameNode.getFileInfo(fileNames[daemonId][inputIdx]);
+      nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]);
       long end = System.currentTimeMillis();
       return end-start;
     }
@@ -740,7 +743,7 @@ public class NNThroughputBenchmark {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = System.currentTimeMillis();
-      nameNode.rename(fileNames[daemonId][inputIdx],
+      nameNodeProto.rename(fileNames[daemonId][inputIdx],
                       destNames[daemonId][inputIdx]);
       long end = System.currentTimeMillis();
       return end-start;
@@ -787,11 +790,11 @@ public class NNThroughputBenchmark {
 
     void register() throws IOException {
       // get versions from the namenode
-      nsInfo = nameNode.versionRequest();
+      nsInfo = nameNodeProto.versionRequest();
       dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
       DataNode.setNewStorageID(dnRegistration);
       // register datanode
-      dnRegistration = nameNode.registerDatanode(dnRegistration);
+      dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
     }
 
     /**
@@ -801,7 +804,7 @@ public class NNThroughputBenchmark {
     void sendHeartbeat() throws IOException {
       // register datanode
       // TODO:FEDERATION currently a single block pool is supported
-      DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration,
+      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0);
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
@@ -846,7 +849,7 @@ public class NNThroughputBenchmark {
     int replicateBlocks() throws IOException {
       // register datanode
       // TODO:FEDERATION currently a single block pool is supported
-      DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration,
+      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0);
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
@@ -877,7 +880,7 @@ public class NNThroughputBenchmark {
           receivedDNReg.setStorageInfo(
                           new DataStorage(nsInfo, dnInfo.getStorageID()));
           receivedDNReg.setInfoPort(dnInfo.getInfoPort());
-          nameNode.blockReceived( receivedDNReg, 
+          nameNodeProto.blockReceived( receivedDNReg, 
                                   nameNode.getNamesystem().getBlockPoolId(),
                                   new Block[] {blocks[i]},
                                   new String[] {DataNode.EMPTY_DEL_HINT});
@@ -968,14 +971,14 @@ public class NNThroughputBenchmark {
       FileNameGenerator nameGenerator;
       nameGenerator = new FileNameGenerator(getBaseDir(), 100);
       String clientName = getClientName(007);
-      nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
-        nameNode.create(fileName, FsPermission.getDefault(), clientName,
+        nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
             new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
             BLOCK_SIZE);
         ExtendedBlock lastBlock = addBlocks(fileName, clientName);
-        nameNode.complete(fileName, clientName, lastBlock);
+        nameNodeProto.complete(fileName, clientName, lastBlock);
       }
       // prepare block reports
       for(int idx=0; idx < nrDatanodes; idx++) {
@@ -987,12 +990,12 @@ public class NNThroughputBenchmark {
     throws IOException {
       ExtendedBlock prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
-        LocatedBlock loc = nameNode.addBlock(fileName, clientName, prevBlock, null);
+        LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName, prevBlock, null);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
           datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
-          nameNode.blockReceived(
+          nameNodeProto.blockReceived(
               datanodes[dnIdx].dnRegistration, 
               loc.getBlock().getBlockPoolId(),
               new Block[] {loc.getBlock().getLocalBlock()},
@@ -1013,7 +1016,7 @@ public class NNThroughputBenchmark {
       assert daemonId < numThreads : "Wrong daemonId.";
       TinyDatanode dn = datanodes[daemonId];
       long start = System.currentTimeMillis();
-      nameNode.blockReport(dn.dnRegistration, nameNode.getNamesystem()
+      nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
           .getBlockPoolId(), dn.getBlockReportList());
       long end = System.currentTimeMillis();
       return end-start;
@@ -1146,7 +1149,7 @@ public class NNThroughputBenchmark {
         LOG.info("Datanode " + dn.getName() + " is decommissioned.");
       }
       excludeFile.close();
-      nameNode.refreshNodes();
+      nameNodeProto.refreshNodes();
     }
 
     /**
@@ -1160,8 +1163,8 @@ public class NNThroughputBenchmark {
       assert daemonId < numThreads : "Wrong daemonId.";
       long start = System.currentTimeMillis();
       // compute data-node work
-      int work = BlockManagerTestUtil.getComputedDatanodeWork(nameNode
-          .getNamesystem().getBlockManager());
+      int work = BlockManagerTestUtil.getComputedDatanodeWork(
+          nameNode.getNamesystem().getBlockManager());
       long end = System.currentTimeMillis();
       numPendingBlocks += work;
       if(work == 0)

+ 0 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -47,14 +47,6 @@ public class NameNodeAdapter {
         src, offset, length, false, true);
   }
 
-  /**
-   * Get the internal RPC server instance.
-   * @return rpc server
-   */
-  public static Server getRpcServer(NameNode namenode) {
-    return namenode.server;
-  }
-
   public static DelegationTokenSecretManager getDtSecretManager(
       final FSNamesystem ns) {
     return ns.getDelegationTokenSecretManager();

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java

@@ -239,10 +239,10 @@ public class OfflineEditsViewerHelper {
         LOG.info("Innocuous exception", e);
       }
       locatedBlocks = DFSClientAdapter.callGetBlockLocations(
-          cluster.getNameNode(), filePath, 0L, bytes.length);
+          cluster.getNameNodeRpc(), filePath, 0L, bytes.length);
     } while (locatedBlocks.isUnderConstruction());
 
     // Force a roll so we get an OP_END_LOG_SEGMENT txn
-    return cluster.getNameNode().rollEditLog();
+    return cluster.getNameNodeRpc().rollEditLog();
   }
 }

+ 11 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -128,12 +129,13 @@ public class TestBackupNode extends TestCase {
       fileSys = cluster.getFileSystem();
       backup = startBackupNode(conf, StartupOption.BACKUP, 1);
       
-      BackupImage bnImage = backup.getBNImage();
+      BackupImage bnImage = (BackupImage) backup.getFSImage();
       testBNInSync(cluster, backup, 1);
       
       // Force a roll -- BN should roll with NN.
       NameNode nn = cluster.getNameNode();
-      nn.rollEditLog();
+      NamenodeProtocols nnRpc = nn.getRpcServer();
+      nnRpc.rollEditLog();
       assertEquals(bnImage.getEditLog().getCurSegmentTxId(),
           nn.getFSImage().getEditLog().getCurSegmentTxId());
       
@@ -207,7 +209,9 @@ public class TestBackupNode extends TestCase {
           LOG.info("Checking for " + src + " on BN");
           try {
             boolean hasFile = backup.getNamesystem().getFileInfo(src, false) != null;
-            boolean txnIdMatch = backup.getTransactionID() == nn.getTransactionID();
+            boolean txnIdMatch =
+              backup.getRpcServer().getTransactionID() ==
+              nn.getRpcServer().getTransactionID();
             return hasFile && txnIdMatch;
           } catch (Exception e) {
             throw new RuntimeException(e);
@@ -264,7 +268,7 @@ public class TestBackupNode extends TestCase {
       //
       // Take a checkpoint
       //
-      long txid = cluster.getNameNode().getTransactionID();
+      long txid = cluster.getNameNodeRpc().getTransactionID();
       backup = startBackupNode(conf, op, 1);
       waitCheckpointDone(cluster, backup, txid);
     } catch(IOException e) {
@@ -300,18 +304,18 @@ public class TestBackupNode extends TestCase {
       // Take a checkpoint
       //
       backup = startBackupNode(conf, op, 1);
-      long txid = cluster.getNameNode().getTransactionID();
+      long txid = cluster.getNameNodeRpc().getTransactionID();
       waitCheckpointDone(cluster, backup, txid);
 
       for (int i = 0; i < 10; i++) {
         fileSys.mkdirs(new Path("file_" + i));
       }
 
-      txid = cluster.getNameNode().getTransactionID();
+      txid = cluster.getNameNodeRpc().getTransactionID();
       backup.doCheckpoint();
       waitCheckpointDone(cluster, backup, txid);
 
-      txid = cluster.getNameNode().getTransactionID();
+      txid = cluster.getNameNodeRpc().getTransactionID();
       backup.doCheckpoint();
       waitCheckpointDone(cluster, backup, txid);
 

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -152,7 +153,7 @@ public class TestBlockUnderConstruction {
    */
   @Test
   public void testGetBlockLocations() throws IOException {
-    final NameNode namenode = cluster.getNameNode();
+    final NamenodeProtocols namenode = cluster.getNameNodeRpc();
     final Path p = new Path(BASE_DIR, "file2.dat");
     final String src = p.toString();
     final FSDataOutputStream out = TestFileCreation.createFile(hdfs, p, 3);

+ 15 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.CheckpointStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -982,11 +983,12 @@ public class TestCheckpoint extends TestCase {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
         .format(true).build();
     NameNode nn = cluster.getNameNode();
+    NamenodeProtocols nnRpc = nn.getRpcServer();
 
     SecondaryNameNode secondary = startSecondaryNameNode(conf);
     // prepare checkpoint image
     secondary.doCheckpoint();
-    CheckpointSignature sig = nn.rollEditLog();
+    CheckpointSignature sig = nnRpc.rollEditLog();
     // manipulate the CheckpointSignature fields
     sig.setBlockpoolID("somerandomebpid");
     sig.clusterID = "somerandomcid";
@@ -1073,8 +1075,10 @@ public class TestCheckpoint extends TestCase {
         .nameNodePort(9928).build();
     Configuration snConf1 = new HdfsConfiguration(cluster.getConfiguration(0));
     Configuration snConf2 = new HdfsConfiguration(cluster.getConfiguration(1));
-    InetSocketAddress nn1RpcAddress = cluster.getNameNode(0).rpcAddress;
-    InetSocketAddress nn2RpcAddress = cluster.getNameNode(1).rpcAddress;
+    InetSocketAddress nn1RpcAddress =
+      cluster.getNameNode(0).getNameNodeAddress();
+    InetSocketAddress nn2RpcAddress =
+      cluster.getNameNode(1).getNameNodeAddress();
     String nn1 = nn1RpcAddress.getHostName() + ":" + nn1RpcAddress.getPort();
     String nn2 = nn2RpcAddress.getHostName() + ":" + nn2RpcAddress.getPort();
 
@@ -1444,9 +1448,9 @@ public class TestCheckpoint extends TestCase {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
           .format(true).build();
       
-      NameNode nn = cluster.getNameNode();
-      String fsName = NameNode.getHostPortString(nn.getHttpAddress());
-
+      NamenodeProtocols nn = cluster.getNameNodeRpc();
+      String fsName = NameNode.getHostPortString(
+          cluster.getNameNode().getHttpAddress());
 
       // Make a finalized log on the server side. 
       nn.rollEditLog();
@@ -1515,8 +1519,8 @@ public class TestCheckpoint extends TestCase {
 
       // Now primary NN experiences failure of a volume -- fake by
       // setting its current dir to a-x permissions
-      NameNode nn = cluster.getNameNode();
-      NNStorage storage = nn.getFSImage().getStorage();
+      NamenodeProtocols nn = cluster.getNameNodeRpc();
+      NNStorage storage = cluster.getNameNode().getFSImage().getStorage();
       StorageDirectory sd0 = storage.getStorageDir(0);
       StorageDirectory sd1 = storage.getStorageDir(1);
       
@@ -1590,8 +1594,8 @@ public class TestCheckpoint extends TestCase {
 
       // Now primary NN experiences failure of its only name dir -- fake by
       // setting its current dir to a-x permissions
-      NameNode nn = cluster.getNameNode();
-      NNStorage storage = nn.getFSImage().getStorage();
+      NamenodeProtocols nn = cluster.getNameNodeRpc();
+      NNStorage storage = cluster.getNameNode().getFSImage().getStorage();
       StorageDirectory sd0 = storage.getStorageDir(0);
       assertEquals(NameNodeDirType.IMAGE, sd0.getStorageDirType());
       currentDir = sd0.getCurrentDir();
@@ -1704,7 +1708,7 @@ public class TestCheckpoint extends TestCase {
       secondary.doCheckpoint();
 
       // Now primary NN saves namespace 3 times
-      NameNode nn = cluster.getNameNode();
+      NamenodeProtocols nn = cluster.getNameNodeRpc();
       nn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
       for (int i = 0; i < 3; i++) {
         nn.saveNamespace();

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java

@@ -102,7 +102,7 @@ public class TestDeadDatanode {
     dn.shutdown();
     waitForDatanodeState(reg.getStorageID(), false, 20000);
 
-    DatanodeProtocol dnp = cluster.getNameNode();
+    DatanodeProtocol dnp = cluster.getNameNodeRpc();
     
     Block[] blocks = new Block[] { new Block(0) };
     String[] delHints = new String[] { "" };

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSck;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -481,7 +482,7 @@ public class TestFsck extends TestCase {
       }
 
       // wait for the namenode to see the corruption
-      final NameNode namenode = cluster.getNameNode();
+      final NamenodeProtocols namenode = cluster.getNameNodeRpc();
       CorruptFileBlocks corruptFileBlocks = namenode
           .listCorruptFileBlocks("/corruptData", null);
       int numCorrupt = corruptFileBlocks.getFiles().length;

+ 10 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
@@ -52,7 +53,7 @@ public class TestHDFSConcat {
   private static final short REPL_FACTOR = 2;
   
   private MiniDFSCluster cluster;
-  private NameNode nn;
+  private NamenodeProtocols nn;
   private DistributedFileSystem dfs;
 
   private static long blockSize = 512;
@@ -72,7 +73,7 @@ public class TestHDFSConcat {
     cluster.waitClusterUp();
     dfs = (DistributedFileSystem) cluster.getFileSystem();
     assertNotNull("Failed to get FileSystem", dfs);
-    nn = cluster.getNameNode();
+    nn = cluster.getNameNodeRpc();
     assertNotNull("Failed to get NameNode", nn);
   }
 
@@ -283,7 +284,7 @@ public class TestHDFSConcat {
     Path filePath1 = new Path(name1);
     DFSTestUtil.createFile(dfs, filePath1, trgFileLen, REPL_FACTOR, 1);
     
-    HdfsFileStatus fStatus = cluster.getNameNode().getFileInfo(name1);
+    HdfsFileStatus fStatus = nn.getFileInfo(name1);
     long fileLen = fStatus.getLen();
     assertEquals(fileLen, trgFileLen);
     
@@ -293,11 +294,11 @@ public class TestHDFSConcat {
     stm.readFully(0, byteFile1);
     stm.close();
     
-    LocatedBlocks lb1 = cluster.getNameNode().getBlockLocations(name1, 0, trgFileLen);
+    LocatedBlocks lb1 = nn.getBlockLocations(name1, 0, trgFileLen);
     
     Path filePath2 = new Path(name2);
     DFSTestUtil.createFile(dfs, filePath2, srcFileLen, REPL_FACTOR, 1);
-    fStatus = cluster.getNameNode().getFileInfo(name2);
+    fStatus = nn.getFileInfo(name2);
     fileLen = fStatus.getLen();
     assertEquals(srcFileLen, fileLen);
     
@@ -307,7 +308,7 @@ public class TestHDFSConcat {
     stm.readFully(0, byteFile2);
     stm.close();
     
-    LocatedBlocks lb2 = cluster.getNameNode().getBlockLocations(name2, 0, srcFileLen);
+    LocatedBlocks lb2 = nn.getBlockLocations(name2, 0, srcFileLen);
     
     
     System.out.println("trg len="+trgFileLen+"; src len="+srcFileLen);
@@ -316,7 +317,7 @@ public class TestHDFSConcat {
     dfs.concat(filePath1, new Path [] {filePath2});
     
     long totalLen = trgFileLen + srcFileLen;
-    fStatus = cluster.getNameNode().getFileInfo(name1);
+    fStatus = nn.getFileInfo(name1);
     fileLen = fStatus.getLen();
     
     // read the resulting file
@@ -325,7 +326,7 @@ public class TestHDFSConcat {
     stm.readFully(0, byteFileConcat);
     stm.close();
     
-    LocatedBlocks lbConcat = cluster.getNameNode().getBlockLocations(name1, 0, fileLen);
+    LocatedBlocks lbConcat = nn.getBlockLocations(name1, 0, fileLen);
     
     //verifications
     // 1. number of blocks
@@ -337,7 +338,7 @@ public class TestHDFSConcat {
     assertEquals(fileLen, totalLen);
     
     // 3. removal of the src file
-    fStatus = cluster.getNameNode().getFileInfo(name2);
+    fStatus = nn.getFileInfo(name2);
     assertNull("File "+name2+ "still exists", fStatus); // file shouldn't exist
   
     // 4. content

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java

@@ -147,7 +147,7 @@ public class TestListCorruptFileBlocks {
       conf.setFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
                     0f);
       cluster = new MiniDFSCluster.Builder(conf).waitSafeMode(false).build();
-      cluster.getNameNode().
+      cluster.getNameNodeRpc().
         setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       FileSystem fs = cluster.getFileSystem();
 
@@ -244,7 +244,7 @@ public class TestListCorruptFileBlocks {
                  cluster.getNameNode().isInSafeMode());
 
       // now leave safe mode so that we can clean up
-      cluster.getNameNode().
+      cluster.getNameNodeRpc().
         setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
 
       util.cleanup(fs, "/srcdat10");

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java

@@ -148,8 +148,8 @@ public class TestNNStorageRetentionFunctional {
 
   private static void doSaveNamespace(NameNode nn) throws IOException {
     LOG.info("Saving namespace...");
-    nn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-    nn.saveNamespace();
-    nn.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+    nn.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    nn.getRpcServer().saveNamespace();
+    nn.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
   }
 }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeJspHelper.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Assert;
@@ -54,7 +55,7 @@ public class TestNameNodeJspHelper {
 
   @Test
   public void testDelegationToken() throws IOException, InterruptedException {
-    NameNode nn = cluster.getNameNode();
+    NamenodeProtocols nn = cluster.getNameNodeRpc();
     HttpServletRequest request = mock(HttpServletRequest.class);
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser("auser");
     String tokenString = NamenodeJspHelper.getDelegationToken(nn, request,

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java

@@ -108,7 +108,7 @@ public class TestParallelImageWrite extends TestCase {
       files.cleanup(fs, dir);
       files.createFiles(fs, dir);
       fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-      cluster.getNameNode().saveNamespace();
+      cluster.getNameNodeRpc().saveNamespace();
       final String checkAfterModify = checkImages(fsn, numNamenodeDirs);
       assertFalse("Modified namespace should change fsimage contents. " +
           "was: " + checkAfterRestart + " now: " + checkAfterModify,

+ 10 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.StringUtils;
@@ -379,9 +380,10 @@ public class TestStartup extends TestCase {
     NameNode namenode = new NameNode(conf);
     namenode.getNamesystem().mkdirs("/test",
         new PermissionStatus("hairong", null, FsPermission.getDefault()), true);
-    assertTrue(namenode.getFileInfo("/test").isDir());
-    namenode.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-    namenode.saveNamespace();
+    NamenodeProtocols nnRpc = namenode.getRpcServer();
+    assertTrue(nnRpc.getFileInfo("/test").isDir());
+    nnRpc.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    nnRpc.saveNamespace();
     namenode.stop();
     namenode.join();
 
@@ -408,9 +410,10 @@ public class TestStartup extends TestCase {
 
   private void checkNameSpace(Configuration conf) throws IOException {
     NameNode namenode = new NameNode(conf);
-    assertTrue(namenode.getFileInfo("/test").isDir());
-    namenode.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-    namenode.saveNamespace();
+    NamenodeProtocols nnRpc = namenode.getRpcServer();
+    assertTrue(nnRpc.getFileInfo("/test").isDir());
+    nnRpc.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    nnRpc.saveNamespace();
     namenode.stop();
     namenode.join();
   }
@@ -515,7 +518,7 @@ public class TestStartup extends TestCase {
       cluster.waitActive();
   
       cluster.restartNameNode();
-      NameNode nn = cluster.getNameNode();
+      NamenodeProtocols nn = cluster.getNameNodeRpc();
       assertNotNull(nn);
       Assert.assertTrue(cluster.isDataNodeUp());
       

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java

@@ -339,7 +339,7 @@ public class TestStorageRestore {
       
       // Simulate a 2NN beginning a checkpoint, but not finishing. This will
       // cause name1 to be restored.
-      cluster.getNameNode().rollEditLog();
+      cluster.getNameNodeRpc().rollEditLog();
       
       printStorages(fsImage);
       

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java

@@ -74,9 +74,9 @@ public class TestNNMetricFilesInGetListingOps extends TestCase {
     createFile("/tmp1/t2", 3200, (short)3);
     createFile("/tmp2/t1", 3200, (short)3);
     createFile("/tmp2/t2", 3200, (short)3);
-    cluster.getNameNode().getListing("/tmp1", HdfsFileStatus.EMPTY_NAME, false);
+    cluster.getNameNodeRpc().getListing("/tmp1", HdfsFileStatus.EMPTY_NAME, false);
     assertCounter("FilesInGetListingOps", 2L, getMetrics(NN_METRICS));
-    cluster.getNameNode().getListing("/tmp2", HdfsFileStatus.EMPTY_NAME, false);
+    cluster.getNameNodeRpc().getListing("/tmp2", HdfsFileStatus.EMPTY_NAME, false);
     assertCounter("FilesInGetListingOps", 4L, getMetrics(NN_METRICS));
   }
 }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java

@@ -124,8 +124,8 @@ public class TestOfflineImageViewer extends TestCase {
       }
 
       // Write results to the fsimage file
-      cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-      cluster.getNameNode().saveNamespace();
+      cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      cluster.getNameNodeRpc().saveNamespace();
       
       // Determine location of fsimage file
       orig = FSImageTestUtil.findLatestImageFile(

Some files were not shown because too many files changed in this diff