Ver Fonte

svn merge -c 1214128 from trunk for HDFS-2669.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1230897 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze há 13 anos atrás
pai
commit
5fd8fb6a7d

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -74,6 +74,8 @@ Release 0.23-PB - Unreleased
 
     HDFS-2650. Replace @inheritDoc with @Override. (Hari Mankude via suresh).
 
+    HDFS-2669 Enable protobuf rpc for ClientNamenodeProtocol
+
   BUG FIXES
  
     HDFS-2481 Unknown protocol: org.apache.hadoop.hdfs.protocol.ClientProtocol (sanjay)

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -649,12 +649,12 @@ public class DFSUtil {
       Configuration conf, UserGroupInformation ugi) throws IOException {
     /** 
      * Currently we have simply burnt-in support for a SINGLE
-     * protocol - protocolR23Compatible. This will be replaced
+     * protocol - protocolPB. This will be replaced
      * by a way to pick the right protocol based on the 
      * version of the target server.  
      */
-    return new org.apache.hadoop.hdfs.protocolR23Compatible.
-        ClientNamenodeProtocolTranslatorR23(nameNodeAddr, conf, ugi);
+    return new org.apache.hadoop.hdfs.protocolPB.
+        ClientNamenodeProtocolTranslatorPB(nameNodeAddr, conf, ugi);
   }
 
   /** Create a {@link NameNode} proxy */

+ 113 - 79
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -19,11 +19,16 @@ package org.apache.hadoop.hdfs.protocolPB;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -124,9 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
 import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.io.Text;
@@ -218,9 +221,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       Builder builder = GetBlockLocationsResponseProto
           .newBuilder();
       if (b != null) {
-        builder.setLocations(
-            PBHelper.convert(server.getBlockLocations(req.getSrc(),
-                req.getOffset(), req.getLength()))).build();
+        builder.setLocations(PBHelper.convert(b)).build();
       }
       return builder.build();
     } catch (IOException e) {
@@ -233,14 +234,19 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, GetServerDefaultsRequestProto req)
       throws ServiceException {
     try {
+      FsServerDefaults result = server.getServerDefaults();
       return GetServerDefaultsResponseProto.newBuilder()
-          .setServerDefaults(PBHelper.convert(server.getServerDefaults()))
+          .setServerDefaults(PBHelper.convert(result))
           .build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
   }
 
+  
+  static final CreateResponseProto VOID_CREATE_RESPONSE = 
+      CreateResponseProto.newBuilder().build();
+  
   @Override
   public CreateResponseProto create(RpcController controller,
       CreateRequestProto req) throws ServiceException {
@@ -252,19 +258,22 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     } catch (IOException e) {
       throw new ServiceException(e);
     }
-    return CreateResponseProto.newBuilder().build();
-
+    return VOID_CREATE_RESPONSE;
   }
   
+  static final AppendResponseProto NULL_APPEND_RESPONSE = 
+      AppendResponseProto.newBuilder().build();
+  
   @Override
   public AppendResponseProto append(RpcController controller,
       AppendRequestProto req) throws ServiceException {
     try {
-      return AppendResponseProto
-          .newBuilder()
-          .setBlock(
-              PBHelper.convert(server.append(req.getSrc(), req.getClientName())))
-          .build();
+      LocatedBlock result = server.append(req.getSrc(), req.getClientName());
+      if (result != null) {
+        return AppendResponseProto.newBuilder()
+            .setBlock(PBHelper.convert(result)).build();
+      }
+      return NULL_APPEND_RESPONSE;
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -274,18 +283,16 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public SetReplicationResponseProto setReplication(RpcController controller,
       SetReplicationRequestProto req) throws ServiceException {
     try {
-      return SetReplicationResponseProto
-          .newBuilder()
-          .setResult(
-              server.setReplication(req.getSrc(), (short) req.getReplication()))
-          .build();
+      boolean result = 
+          server.setReplication(req.getSrc(), (short) req.getReplication());
+      return SetReplicationResponseProto.newBuilder().setResult(result).build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
   }
 
 
-  static final SetPermissionResponseProto SET_PERM_RESPONSE = 
+  static final SetPermissionResponseProto VOID_SET_PERM_RESPONSE = 
       SetPermissionResponseProto.newBuilder().build();
 
   @Override
@@ -296,24 +303,26 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     } catch (IOException e) {
       throw new ServiceException(e);
     }
-    return SET_PERM_RESPONSE;
+    return VOID_SET_PERM_RESPONSE;
   }
 
-  static final SetOwnerResponseProto SET_OWNER_RESPONSE = 
+  static final SetOwnerResponseProto VOID_SET_OWNER_RESPONSE = 
       SetOwnerResponseProto.newBuilder().build();
 
   @Override
   public SetOwnerResponseProto setOwner(RpcController controller,
       SetOwnerRequestProto req) throws ServiceException {
     try {
-      server.setOwner(req.getSrc(), req.getUsername(), req.getGroupname());
+      server.setOwner(req.getSrc(), 
+          req.hasUsername() ? req.getUsername() : null,
+          req.hasGroupname() ? req.getGroupname() : null);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
-    return SET_OWNER_RESPONSE;
+    return VOID_SET_OWNER_RESPONSE;
   }
 
-  static final AbandonBlockResponseProto ABD_BLOCK_RESPONSE = 
+  static final AbandonBlockResponseProto VOID_ADD_BLOCK_RESPONSE = 
       AbandonBlockResponseProto.newBuilder().build();
 
   @Override
@@ -325,20 +334,22 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     } catch (IOException e) {
       throw new ServiceException(e);
     }
-    return ABD_BLOCK_RESPONSE;
+    return VOID_ADD_BLOCK_RESPONSE;
   }
 
   @Override
   public AddBlockResponseProto addBlock(RpcController controller,
       AddBlockRequestProto req) throws ServiceException {
-    try {
+    
+    try {
+      List<DatanodeInfoProto> excl = req.getExcludeNodesList();
+      LocatedBlock result = server.addBlock(req.getSrc(), req.getClientName(),
+          req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
+          (excl == null || 
+           excl.size() == 0) ? null : 
+            PBHelper.convert(excl.toArray(new DatanodeInfoProto[excl.size()])));
       return AddBlockResponseProto.newBuilder().setBlock(
-          PBHelper.convert(
-          server.addBlock(req.getSrc(), req.getClientName(), 
-                req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null, 
-                PBHelper.convert(
-                  (DatanodeInfoProto[]) req.getExcludeNodesList().toArray()))))
-           .build();
+          PBHelper.convert(result)).build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -349,15 +360,17 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, GetAdditionalDatanodeRequestProto req)
       throws ServiceException {
     try {
+      List<DatanodeInfoProto> existingList = req.getExistingsList();
+      List<DatanodeInfoProto> excludesList = req.getExcludesList();
+      LocatedBlock result = server.getAdditionalDatanode(
+          req.getSrc(), PBHelper.convert(req.getBlk()),
+          PBHelper.convert(existingList.toArray(
+              new DatanodeInfoProto[existingList.size()])),
+          PBHelper.convert(excludesList.toArray(
+              new DatanodeInfoProto[excludesList.size()])), 
+              req.getNumAdditionalNodes(), req.getClientName());
       return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
-          PBHelper.convert(
-              server.getAdditionalDatanode(req.getSrc(),
-                  PBHelper.convert(req.getBlk()), 
-                  PBHelper.convert((DatanodeInfoProto[]) req.getExistingsList()
-                      .toArray()), PBHelper
-                  .convert((DatanodeInfoProto[]) req.getExcludesList()
-                      .toArray()), req.getNumAdditionalNodes(), req
-                  .getClientName())))
+          PBHelper.convert(result))
           .build();
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -368,10 +381,10 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public CompleteResponseProto complete(RpcController controller,
       CompleteRequestProto req) throws ServiceException {
     try {
-      return CompleteResponseProto.newBuilder().setResult(
-                server.complete(req.getSrc(), req.getClientName(),
-                PBHelper.convert(req.getLast())))
-          .build();
+      boolean result = 
+          server.complete(req.getSrc(), req.getClientName(),
+          req.hasLast() ? PBHelper.convert(req.getLast()) : null);
+      return CompleteResponseProto.newBuilder().setResult(result).build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -384,8 +397,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
       ReportBadBlocksRequestProto req) throws ServiceException {
     try {
+      List<LocatedBlockProto> bl = req.getBlocksList();
       server.reportBadBlocks(PBHelper.convertLocatedBlock(
-          (LocatedBlockProto[]) req.getBlocksList().toArray()));
+              bl.toArray(new LocatedBlockProto[bl.size()])));
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -399,7 +413,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public ConcatResponseProto concat(RpcController controller,
       ConcatRequestProto req) throws ServiceException {
     try {
-      server.concat(req.getTrg(), (String[])req.getSrcsList().toArray());
+      List<String> srcs = req.getSrcsList();
+      server.concat(req.getTrg(), srcs.toArray(new String[srcs.size()]));
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -456,14 +471,21 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     }
   }
 
+  static final GetListingResponseProto NULL_GETLISTING_RESPONSE = 
+      GetListingResponseProto.newBuilder().build();
   @Override
   public GetListingResponseProto getListing(RpcController controller,
       GetListingRequestProto req) throws ServiceException {
     try {
-      DirectoryListingProto result = PBHelper.convert(server.getListing(
+      DirectoryListing result = server.getListing(
           req.getSrc(), req.getStartAfter().toByteArray(),
-          req.getNeedLocation()));
-      return GetListingResponseProto.newBuilder().setDirList(result).build();
+          req.getNeedLocation());
+      if (result !=null) {
+        return GetListingResponseProto.newBuilder().setDirList(
+          PBHelper.convert(result)).build();
+      } else {
+        return NULL_GETLISTING_RESPONSE;
+      }
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -494,6 +516,19 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     }
   }
   
+  @Override
+  public RestoreFailedStorageResponseProto restoreFailedStorage(
+      RpcController controller, RestoreFailedStorageRequestProto req)
+      throws ServiceException {
+    try {
+      boolean result = server.restoreFailedStorage(req.getArg());
+      return RestoreFailedStorageResponseProto.newBuilder().setResult(result)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
   @Override
   public GetFsStatsResponseProto getFsStats(RpcController controller,
       GetFsStatusRequestProto req) throws ServiceException {
@@ -557,19 +592,6 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
 
   }
 
-  @Override
-  public RestoreFailedStorageResponseProto restoreFailedStorage(
-      RpcController controller, RestoreFailedStorageRequestProto req)
-      throws ServiceException {
-    try {
-      boolean result = server.restoreFailedStorage(req.getArg());
-      return RestoreFailedStorageResponseProto.newBuilder().setResult(result)
-          .build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
   static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE = 
       RefreshNodesResponseProto.newBuilder().build();
 
@@ -622,9 +644,10 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, ListCorruptFileBlocksRequestProto req)
       throws ServiceException {
     try {
-      CorruptFileBlocksProto result = PBHelper.convert(server
-          .listCorruptFileBlocks(req.getPath(), req.getCookie()));
-      return ListCorruptFileBlocksResponseProto.newBuilder().setCorrupt(result)
+      CorruptFileBlocks result = server.listCorruptFileBlocks(
+          req.getPath(), req.hasCookie() ? req.getCookie(): null);
+      return ListCorruptFileBlocksResponseProto.newBuilder()
+          .setCorrupt(PBHelper.convert(result))
           .build();
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -646,29 +669,40 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
 
   }
 
+  static final GetFileInfoResponseProto NULL_GETFILEINFO_RESPONSE = 
+      GetFileInfoResponseProto.newBuilder().build();
   @Override
   public GetFileInfoResponseProto getFileInfo(RpcController controller,
       GetFileInfoRequestProto req) throws ServiceException {
     try {
-      HdfsFileStatus res = server.getFileInfo(req.getSrc());
-      GetFileInfoResponseProto.Builder builder = 
-          GetFileInfoResponseProto.newBuilder();
-      if (res != null) {
-        builder.setFs(PBHelper.convert(res));
+      HdfsFileStatus result = server.getFileInfo(req.getSrc());
+ 
+      if (result != null) {
+        return GetFileInfoResponseProto.newBuilder().setFs(
+            PBHelper.convert(result)).build();
       }
-      return builder.build();
+      return NULL_GETFILEINFO_RESPONSE;      
     } catch (IOException e) {
       throw new ServiceException(e);
     }
   }
 
+  static final GetFileLinkInfoResponseProto NULL_GETFILELINKINFO_RESPONSE = 
+      GetFileLinkInfoResponseProto.newBuilder().build();
   @Override
   public GetFileLinkInfoResponseProto getFileLinkInfo(RpcController controller,
       GetFileLinkInfoRequestProto req) throws ServiceException {
     try {
-      HdfsFileStatusProto result = 
-          PBHelper.convert(server.getFileLinkInfo(req.getSrc()));
-      return GetFileLinkInfoResponseProto.newBuilder().setFs(result).build();
+      HdfsFileStatus result = server.getFileLinkInfo(req.getSrc());
+      if (result != null) {
+        System.out.println("got non null result for getFileLinkInfo for " + req.getSrc());
+        return GetFileLinkInfoResponseProto.newBuilder().setFs(
+            PBHelper.convert(result)).build();
+      } else {
+        System.out.println("got  null result for getFileLinkInfo for " + req.getSrc());
+        return NULL_GETFILELINKINFO_RESPONSE;      
+      }
+
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -679,10 +713,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, GetContentSummaryRequestProto req)
       throws ServiceException {
     try {
-      ContentSummaryProto result = 
-          PBHelper.convert(server.getContentSummary(req.getPath()));
-      return 
-        GetContentSummaryResponseProto.newBuilder().setSummary(result).build();
+      ContentSummary result = server.getContentSummary(req.getPath());
+      return GetContentSummaryResponseProto.newBuilder()
+          .setSummary(PBHelper.convert(result)).build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -780,10 +813,11 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public UpdatePipelineResponseProto updatePipeline(RpcController controller,
       UpdatePipelineRequestProto req) throws ServiceException {
     try {
+      List<DatanodeIDProto> newNodes = req.getNewNodesList();
       server
           .updatePipeline(req.getClientName(), PBHelper.convert(req
               .getOldBlock()), PBHelper.convert(req.getNewBlock()), PBHelper
-              .convert((DatanodeIDProto[]) req.getNewNodesList().toArray()));
+              .convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()])));
       return VOID_UPDATEPIPELINE_RESPONSE;
     } catch (IOException e) {
       throw new ServiceException(e);

+ 40 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@@ -95,9 +96,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDel
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
@@ -121,6 +124,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSaf
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ServiceException;
@@ -263,7 +268,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setClientName(clientName)
         .build();
     try {
-      return PBHelper.convert(rpcProxy.append(null, req).getBlock());
+      AppendResponseProto res = rpcProxy.append(null, req);
+      return res.hasBlock() ? PBHelper.convert(res.getBlock()) : null;
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -304,13 +310,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
   public void setOwner(String src, String username, String groupname)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
-    SetOwnerRequestProto req = SetOwnerRequestProto.newBuilder()
-        .setSrc(src)
-        .setUsername(username)
-        .setGroupname(groupname)
-        .build();
+    SetOwnerRequestProto.Builder req = SetOwnerRequestProto.newBuilder()
+        .setSrc(src);
+    if (username != null)
+        req.setUsername(username);
+    if (groupname != null)
+        req.setGroupname(groupname);
     try {
-      rpcProxy.setOwner(null, req);
+      rpcProxy.setOwner(null, req.build());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -335,15 +342,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException {
-    AddBlockRequestProto.Builder builder = AddBlockRequestProto.newBuilder();
-    builder.setSrc(src)
-        .setClientName(clientName)
-        .addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)));
-    if (previous != null) {
-      builder.setPrevious(PBHelper.convert(previous));
-    }
+    AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder().setSrc(src)
+        .setClientName(clientName);
+    if (previous != null) 
+      req.setPrevious(PBHelper.convert(previous)); 
+    if (excludeNodes != null) 
+      req.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)));
     try {
-      return PBHelper.convert(rpcProxy.addBlock(null, builder.build()).getBlock());
+      return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -376,13 +382,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
   public boolean complete(String src, String clientName, ExtendedBlock last)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
-    CompleteRequestProto req = CompleteRequestProto.newBuilder()
+    CompleteRequestProto.Builder req = CompleteRequestProto.newBuilder()
         .setSrc(src)
-        .setClientName(clientName)
-        .setLast(PBHelper.convert(last))
-        .build();
+        .setClientName(clientName);   
+    if (last != null)
+      req.setLast(PBHelper.convert(last));
     try {
-      return rpcProxy.complete(null, req).getResult();
+      return rpcProxy.complete(null, req.build()).getResult();
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -493,7 +499,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setStartAfter(ByteString.copyFrom(startAfter))
         .setNeedLocation(needLocation).build();
     try {
-      return PBHelper.convert(rpcProxy.getListing(null, req).getDirList());
+      GetListingResponseProto result = rpcProxy.getListing(null, req);
+      
+      if (result.hasDirList()) {
+        return PBHelper.convert(result.getDirList());
+      }
+      return null;
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -635,11 +646,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
   @Override
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {
-    ListCorruptFileBlocksRequestProto req = ListCorruptFileBlocksRequestProto
-        .newBuilder().setPath(path).setCookie(cookie).build();
+    ListCorruptFileBlocksRequestProto.Builder req = 
+        ListCorruptFileBlocksRequestProto.newBuilder().setPath(path);   
+    if (cookie != null) 
+      req.setCookie(cookie);
     try {
       return PBHelper.convert(
-          rpcProxy.listCorruptFileBlocks(null, req).getCorrupt());
+          rpcProxy.listCorruptFileBlocks(null, req.build()).getCorrupt());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -676,7 +689,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
     GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder()
         .setSrc(src).build();
     try {
-      return PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs());
+      GetFileLinkInfoResponseProto result = rpcProxy.getFileLinkInfo(null, req);
+      return result.hasFs() ?  
+          PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs()) : null;
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }

+ 15 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -974,6 +974,13 @@ public class PBHelper {
     if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) {
       result.add(CreateFlag.APPEND);
     }
+    if ((flag & CreateFlagProto.CREATE_VALUE) == CreateFlagProto.CREATE_VALUE) {
+      result.add(CreateFlag.CREATE);
+    }
+    if ((flag & CreateFlagProto.OVERWRITE_VALUE) 
+        == CreateFlagProto.OVERWRITE_VALUE) {
+      result.add(CreateFlag.OVERWRITE);
+    }
     return new EnumSetWritable<CreateFlag>(result);
   }
   
@@ -1005,7 +1012,7 @@ public class PBHelper {
   public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
     if (fs == null)
       return null;
-    FileType fType = FileType.IS_DIR;;
+    FileType fType = FileType.IS_FILE;
     if (fs.isDir()) {
       fType = FileType.IS_DIR;
     } else if (fs.isSymlink()) {
@@ -1024,8 +1031,7 @@ public class PBHelper {
       setOwner(fs.getOwner()).
       setGroup(fs.getGroup()).
       setPath(ByteString.copyFrom(fs.getLocalNameInBytes()));
-    
-    if (fs.getSymlink() != null) {
+    if (fs.isSymlink())  {
       builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
     }
     if (fs instanceof HdfsLocatedFileStatus) {
@@ -1052,7 +1058,7 @@ public class PBHelper {
     final int len = fs.length;
     HdfsFileStatus[] result = new HdfsFileStatus[len];
     for (int i = 0; i < len; ++i) {
-      PBHelper.convert(fs[i]);
+      result[i] = PBHelper.convert(fs[i]);
     }
     return result;
   }
@@ -1060,9 +1066,11 @@ public class PBHelper {
   public static DirectoryListing convert(DirectoryListingProto dl) {
     if (dl == null)
       return null;
-    return new DirectoryListing(
-        PBHelper.convert((HdfsFileStatusProto[]) 
-            dl.getPartialListingList().toArray()),
+    List<HdfsFileStatusProto> partList =  dl.getPartialListingList();
+    return new DirectoryListing( 
+        partList.isEmpty() ? new HdfsFileStatus[0] 
+          : PBHelper.convert(
+              partList.toArray(new HdfsFileStatusProto[partList.size()])),
         dl.getRemainingEntries());
   }
 

+ 20 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -59,14 +59,15 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
-import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeProtocolServerSideTranslatorR23;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
@@ -90,6 +91,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
@@ -142,9 +144,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
       conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, 
                   DFS_DATANODE_HANDLER_COUNT_DEFAULT);
     InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
-    ClientNamenodeProtocolServerSideTranslatorR23 
-    clientProtocolServerTranslator = 
-        new ClientNamenodeProtocolServerSideTranslatorR23(this);
+		RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
+         ProtobufRpcEngine.class);
+     ClientNamenodeProtocolServerSideTranslatorPB 
+       clientProtocolServerTranslator = 
+         new ClientNamenodeProtocolServerSideTranslatorPB(this);
+     BlockingService clientNNPbService = ClientNamenodeProtocol.
+         newReflectiveBlockingService(clientProtocolServerTranslator);
     
     DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator = 
         new DatanodeProtocolServerSideTranslatorPB(this);
@@ -153,8 +159,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
 
     NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = 
         new NamenodeProtocolServerSideTranslatorPB(this);
-    BlockingService service = NamenodeProtocolService
-        .newReflectiveBlockingService(namenodeProtocolXlator);
+	  BlockingService NNPbService = NamenodeProtocolService
+          .newReflectiveBlockingService(namenodeProtocolXlator);
     
     InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
     if (dnSocketAddr != null) {
@@ -163,8 +169,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
                     DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
       // Add all the RPC protocols that the namenode implements
       this.serviceRpcServer = 
-          RPC.getServer(org.apache.hadoop.hdfs.protocolR23Compatible.
-              ClientNamenodeWireProtocol.class, clientProtocolServerTranslator,
+          RPC.getServer(org.apache.hadoop.hdfs.protocolPB.
+              ClientNamenodeProtocolPB.class, clientNNPbService,
           dnSocketAddr.getHostName(), dnSocketAddr.getPort(), 
           serviceHandlerCount,
           false, conf, namesystem.getDelegationTokenSecretManager());
@@ -174,7 +180,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
           RefreshUserMappingsProtocol.class, this);
       this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, 
           GetUserMappingsProtocol.class, this);
-      DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
+      DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
           serviceRpcServer);
       DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
           serviceRpcServer);
@@ -187,9 +193,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
     }
     // Add all the RPC protocols that the namenode implements
     this.clientRpcServer = RPC.getServer(
-            org.apache.hadoop.hdfs.protocolR23Compatible.
-            ClientNamenodeWireProtocol.class,
-            clientProtocolServerTranslator, socAddr.getHostName(),
+        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class, 
+        clientNNPbService, socAddr.getHostName(),
             socAddr.getPort(), handlerCount, false, conf,
             namesystem.getDelegationTokenSecretManager());
     this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
@@ -198,7 +203,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
         RefreshUserMappingsProtocol.class, this);
     this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
         GetUserMappingsProtocol.class, this);
-    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
+    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
         clientRpcServer);
     DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
         clientRpcServer);
@@ -260,7 +265,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
                                  long clientVersion) throws IOException {
     if (protocol.equals(ClientProtocol.class.getName())) {
       throw new IOException("Old Namenode Client protocol is not supported:" + 
-      protocol + "Switch your clientside to " + ClientNamenodeWireProtocol.class); 
+      protocol + "Switch your clientside to " + ClientNamenodeProtocol.class); 
     } else if (protocol.equals(DatanodeProtocol.class.getName())){
       return DatanodeProtocol.versionID;
     } else if (protocol.equals(NamenodeProtocol.class.getName())){

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto

@@ -74,7 +74,7 @@ message AppendRequestProto {
 }
 
 message AppendResponseProto {
-  required LocatedBlockProto block = 1;
+  optional LocatedBlockProto block = 1;
 }
 
 message SetReplicationRequestProto {
@@ -96,8 +96,8 @@ message SetPermissionResponseProto { // void response
 
 message SetOwnerRequestProto {
   required string src = 1;
-  required string username = 2;
-  required string groupname = 3;
+  optional string username = 2;
+  optional string groupname = 3;
 }
 
 message SetOwnerResponseProto { // void response
@@ -139,7 +139,7 @@ message GetAdditionalDatanodeResponseProto {
 message CompleteRequestProto {
   required string src = 1;
   required string clientName = 2;
-  required ExtendedBlockProto last = 3;
+  optional ExtendedBlockProto last = 3;
 }
 
 message CompleteResponseProto {
@@ -204,7 +204,7 @@ message GetListingRequestProto {
   required bool needLocation = 3;
 }
 message GetListingResponseProto {
-  required DirectoryListingProto dirList = 1;
+  optional DirectoryListingProto dirList = 1;
 }
 
 message RenewLeaseRequestProto {
@@ -311,7 +311,7 @@ message DistributedUpgradeProgressResponseProto {
 
 message ListCorruptFileBlocksRequestProto {
   required string path = 1;
-  required string cookie = 2;
+  optional string cookie = 2;
 }
 
 message ListCorruptFileBlocksResponseProto {
@@ -338,7 +338,7 @@ message GetFileLinkInfoRequestProto {
 }
 
 message GetFileLinkInfoResponseProto {
-  required HdfsFileStatusProto fs = 1;
+  optional HdfsFileStatusProto fs = 1;
 }
 
 message GetContentSummaryRequestProto {