Kaynağa Gözat

HDFS-2642. Protobuf translators for DatanodeProtocol.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1212606 13f79535-47bb-0310-9956-ffa450edef68
Jitendra Nath Pandey 13 yıl önce
ebeveyn
işleme
38a19bc293
17 değiştirilmiş dosya ile 2499 ekleme ve 1497 silme
  1. 2 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 190 187
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DatanodeProtocolProtos.java
  3. 1190 405
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/HdfsProtos.java
  4. 63 849
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/NamenodeProtocolProtos.java
  5. 302 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
  6. 48 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolPB.java
  7. 263 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
  8. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolPB.java
  9. 3 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
  10. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
  11. 282 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  12. 7 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
  13. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/UpgradeCommand.java
  14. 12 9
      hadoop-hdfs-project/hadoop-hdfs/src/proto/DatanodeProtocol.proto
  15. 0 13
      hadoop-hdfs-project/hadoop-hdfs/src/proto/NamenodeProtocol.proto
  16. 16 3
      hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto
  17. 118 11
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -27,6 +27,8 @@ Trunk (unreleased changes)
     HDFS-2178. Contributing Hoop to HDFS, replacement for HDFS proxy with 
     read/write capabilities. (tucu)
 
+    HDFS-2642. Protobuf translators for DatanodeProtocol. (jitendra)
+
   IMPROVEMENTS
 
     HADOOP-7524 Change RPC to allow multiple protocols including multuple 

Dosya farkı çok büyük olduğundan ihmal edildi
+ 190 - 187
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DatanodeProtocolProtos.java


Dosya farkı çok büyük olduğundan ihmal edildi
+ 1190 - 405
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/HdfsProtos.java


Dosya farkı çok büyük olduğundan ihmal edildi
+ 63 - 849
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/NamenodeProtocolProtos.java


+ 302 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java

@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.DatanodeWireProtocol;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link DatanodeProtocolProtocol} interfaces to the RPC server implementing
+ * {@link DatanodeProtocolProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class DatanodeProtocolClientSideTranslatorPB implements DatanodeProtocol,
+    Closeable {
+  
+  /** RpcController is not used and hence is set to null */
+  private final static RpcController NULL_CONTROLLER = null;
+  private final DatanodeProtocolPB rpcProxy;
+  private static final VersionRequestProto VERSION_REQUEST = 
+      VersionRequestProto.newBuilder().build();
+  
+  public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
+      Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi));
+  }
+
+  private static DatanodeProtocolPB createNamenode(
+      InetSocketAddress nameNodeAddr, Configuration conf,
+      UserGroupInformation ugi) throws IOException {
+    return RPC.getProxy(DatanodeProtocolPB.class,
+        RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
+        conf, NetUtils.getSocketFactory(conf, DatanodeWireProtocol.class));
+  }
+
+  /** Create a {@link NameNode} proxy */
+  static DatanodeProtocolPB createNamenodeWithRetry(
+      DatanodeProtocolPB rpcNamenode) {
+    RetryPolicy createPolicy = RetryPolicies
+        .retryUpToMaximumCountWithFixedSleep(5,
+            HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
+
+    Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap = 
+        new HashMap<Class<? extends Exception>, RetryPolicy>();
+    remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
+        createPolicy);
+
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+        new HashMap<Class<? extends Exception>, RetryPolicy>();
+    exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
+        .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+            remoteExceptionToPolicyMap));
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
+
+    methodNameToPolicyMap.put("create", methodPolicy);
+
+    return (DatanodeProtocolPB) RetryProxy.create(DatanodeProtocolPB.class,
+        rpcNamenode, methodNameToPolicyMap);
+  }
+  
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return rpcProxy.getProtocolVersion(protocol, clientVersion);
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocolName,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
+        protocolName, clientVersion, clientMethodsHash));
+  }
+
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  @Override
+  public DatanodeRegistration registerDatanode(DatanodeRegistration registration)
+      throws IOException {
+    RegisterDatanodeRequestProto req = RegisterDatanodeRequestProto
+        .newBuilder().setRegistration(PBHelper.convert(registration)).build();
+    RegisterDatanodeResponseProto resp;
+    try {
+      resp = rpcProxy.registerDatanode(NULL_CONTROLLER, req);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+    return PBHelper.convert(resp.getRegistration());
+  }
+
+  @Override
+  public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xmitsInProgress, int xceiverCount, int failedVolumes)
+      throws IOException {
+    HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(registration)).setCapacity(capacity)
+        .setCapacity(dfsUsed).setRemaining(remaining)
+        .setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress)
+        .setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build();
+    HeartbeatResponseProto resp;
+    try {
+      resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, req);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+    DatanodeCommand[] cmds = new DatanodeCommand[resp.getCmdsList().size()];
+    int index = 0;
+    for (DatanodeCommandProto p : resp.getCmdsList()) {
+      cmds[index] = PBHelper.convert(p);
+      index++;
+    }
+    return cmds;
+  }
+
+  @Override
+  public DatanodeCommand blockReport(DatanodeRegistration registration,
+      String poolId, long[] blocks) throws IOException {
+    BlockReportRequestProto.Builder builder = BlockReportRequestProto
+        .newBuilder().setRegistration(PBHelper.convert(registration))
+        .setBlockPoolId(poolId);
+    if (blocks != null) {
+      for (int i = 0; i < blocks.length; i++) {
+        builder.setBlocks(i, blocks[i]);
+      }
+    }
+    BlockReportRequestProto req = builder.build();
+    BlockReportResponseProto resp;
+    try {
+      resp = rpcProxy.blockReport(NULL_CONTROLLER, req);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+    return PBHelper.convert(resp.getCmd());
+  }
+
+  @Override
+  public void blockReceivedAndDeleted(DatanodeRegistration registration,
+      String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
+      throws IOException {
+    BlockReceivedAndDeletedRequestProto.Builder builder = 
+        BlockReceivedAndDeletedRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(registration))
+        .setBlockPoolId(poolId);
+    if (receivedAndDeletedBlocks != null) {
+      for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
+        builder.setBlocks(i, PBHelper.convert(receivedAndDeletedBlocks[i]));
+      }
+    }
+    BlockReceivedAndDeletedRequestProto req = builder.build();
+    try {
+      rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, req);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  @Override
+  public void errorReport(DatanodeRegistration registration, int errorCode,
+      String msg) throws IOException {
+    ErrorReportRequestProto req = ErrorReportRequestProto.newBuilder()
+        .setRegistartion(PBHelper.convert(registration))
+        .setErrorCode(errorCode).setMsg(msg).build();
+    try {
+      rpcProxy.errorReport(NULL_CONTROLLER, req);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  @Override
+  public NamespaceInfo versionRequest() throws IOException {
+    try {
+      return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER,
+          VERSION_REQUEST).getInfo());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public UpgradeCommand processUpgradeCommand(UpgradeCommand comm)
+      throws IOException {
+    ProcessUpgradeRequestProto req = ProcessUpgradeRequestProto.newBuilder()
+        .setCmd(PBHelper.convert(comm)).build();
+    ProcessUpgradeResponseProto resp;
+    try {
+      resp = rpcProxy.processUpgrade(NULL_CONTROLLER, req);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+    return PBHelper.convert(resp.getCmd());
+  }
+
+  @Override
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    ReportBadBlocksRequestProto.Builder builder = ReportBadBlocksRequestProto
+        .newBuilder();
+    for (int i = 0; i < blocks.length; i++) {
+      builder.addBlocks(i, PBHelper.convert(blocks[i]));
+    }
+    ReportBadBlocksRequestProto req = builder.build();
+    try {
+      rpcProxy.reportBadBlocks(NULL_CONTROLLER, req);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  @Override
+  public void commitBlockSynchronization(ExtendedBlock block,
+      long newgenerationstamp, long newlength, boolean closeFile,
+      boolean deleteblock, DatanodeID[] newtargets) throws IOException {
+    CommitBlockSynchronizationRequestProto.Builder builder = 
+        CommitBlockSynchronizationRequestProto.newBuilder()
+        .setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp)
+        .setNewLength(newlength).setCloseFile(closeFile)
+        .setDeleteBlock(deleteblock);
+    for (int i = 0; i < newtargets.length; i++) {
+      builder.setNewTaragets(i, PBHelper.convert(newtargets[i]));
+    }
+    CommitBlockSynchronizationRequestProto req = builder.build();
+    try {
+      rpcProxy.commitBlockSynchronization(NULL_CONTROLLER, req);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+}

+ 48 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolPB.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, 
+    clientPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
+@ProtocolInfo(
+    protocolName = "org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol", 
+    protocolVersion = 1)
+@InterfaceAudience.Private
+public interface DatanodeProtocolPB extends
+    DatanodeProtocolService.BlockingInterface, VersionedProtocol {
+  
+  /**
+   * This method is defined to get the protocol signature using 
+   * the R23 protocol - hence we have added the suffix of 2 the method name
+   * to avoid conflict.
+   */
+  public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException;
+}

+ 263 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java

@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class DatanodeProtocolServerSideTranslatorPB implements
+    DatanodeProtocolPB {
+
+  private final DatanodeProtocol impl;
+  private static final ErrorReportResponseProto ERROR_REPORT_RESPONSE_PROTO = 
+      ErrorReportResponseProto.newBuilder().build();
+  private static final BlockReceivedAndDeletedResponseProto 
+      BLOCK_RECEIVED_AND_DELETE_RESPONSE = 
+          BlockReceivedAndDeletedResponseProto.newBuilder().build();
+  private static final ReportBadBlocksResponseProto REPORT_BAD_BLOCK_RESPONSE = 
+      ReportBadBlocksResponseProto.newBuilder().build();
+  private static final CommitBlockSynchronizationResponseProto 
+      COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO =
+          CommitBlockSynchronizationResponseProto.newBuilder().build();
+
+  public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public RegisterDatanodeResponseProto registerDatanode(
+      RpcController controller, RegisterDatanodeRequestProto request)
+      throws ServiceException {
+    DatanodeRegistration registration = PBHelper.convert(request
+        .getRegistration());
+    DatanodeRegistration registrationResp;
+    try {
+      registrationResp = impl.registerDatanode(registration);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return RegisterDatanodeResponseProto.newBuilder()
+        .setRegistration(PBHelper.convert(registrationResp)).build();
+  }
+
+  @Override
+  public HeartbeatResponseProto sendHeartbeat(RpcController controller,
+      HeartbeatRequestProto request) throws ServiceException {
+    DatanodeCommand[] cmds;
+    try {
+      cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
+          request.getCapacity(), request.getDfsUsed(), request.getRemaining(),
+          request.getBlockPoolUsed(), request.getXmitsInProgress(),
+          request.getXceiverCount(), request.getFailedVolumes());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    HeartbeatResponseProto.Builder builder = HeartbeatResponseProto
+        .newBuilder();
+    if (cmds != null) {
+      for (int i = 0; i < cmds.length; i++) {
+        builder.addCmds(i, PBHelper.convert(cmds[i]));
+      }
+    }
+    return builder.build();
+  }
+
+  @Override
+  public BlockReportResponseProto blockReport(RpcController controller,
+      BlockReportRequestProto request) throws ServiceException {
+    DatanodeCommand cmd;
+    List<Long> blockIds = request.getBlocksList();
+    long[] blocks = new long[blockIds.size()];
+    for (int i = 0; i < blockIds.size(); i++) {
+      blocks[i] = blockIds.get(i);
+    }
+    try {
+      cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
+          request.getBlockPoolId(), blocks);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return BlockReportResponseProto.newBuilder().setCmd(PBHelper.convert(cmd))
+        .build();
+  }
+
+  @Override
+  public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
+      RpcController controller, BlockReceivedAndDeletedRequestProto request)
+      throws ServiceException {
+    List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocksList();
+    ReceivedDeletedBlockInfo[] info = 
+        new ReceivedDeletedBlockInfo[rdbip.size()];
+    for (int i = 0; i < rdbip.size(); i++) {
+      info[i] = PBHelper.convert(rdbip.get(i));
+    }
+    try {
+      impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()),
+          request.getBlockPoolId(), info);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return BLOCK_RECEIVED_AND_DELETE_RESPONSE;
+  }
+
+  @Override
+  public ErrorReportResponseProto errorReport(RpcController controller,
+      ErrorReportRequestProto request) throws ServiceException {
+    try {
+      impl.errorReport(PBHelper.convert(request.getRegistartion()),
+          request.getErrorCode(), request.getMsg());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return ERROR_REPORT_RESPONSE_PROTO;
+  }
+
+  @Override
+  public VersionResponseProto versionRequest(RpcController controller,
+      VersionRequestProto request) throws ServiceException {
+    NamespaceInfo info;
+    try {
+      info = impl.versionRequest();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return VersionResponseProto.newBuilder()
+        .setInfo(PBHelper.convert(info)).build();
+  }
+
+  @Override
+  public ProcessUpgradeResponseProto processUpgrade(RpcController controller,
+      ProcessUpgradeRequestProto request) throws ServiceException {
+    UpgradeCommand cmd;
+    try {
+      cmd = impl.processUpgradeCommand(PBHelper.convert(request.getCmd()));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return ProcessUpgradeResponseProto.newBuilder()
+        .setCmd(PBHelper.convert(cmd)).build();
+  }
+
+  @Override
+  public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
+      ReportBadBlocksRequestProto request) throws ServiceException {
+    List<LocatedBlockProto> lbps = request.getBlocksList();
+    LocatedBlock [] blocks = new LocatedBlock [lbps.size()];
+    for(int i=0; i<lbps.size(); i++) {
+      blocks[i] = PBHelper.convert(lbps.get(i));
+    }
+    try {
+      impl.reportBadBlocks(blocks);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return REPORT_BAD_BLOCK_RESPONSE;
+  }
+
+  @Override
+  public CommitBlockSynchronizationResponseProto commitBlockSynchronization(
+      RpcController controller, CommitBlockSynchronizationRequestProto request)
+      throws ServiceException {
+    List<DatanodeIDProto> dnprotos = request.getNewTaragetsList();
+    DatanodeID[] dns = new DatanodeID[dnprotos.size()];
+    for (int i = 0; i < dnprotos.size(); i++) {
+      dns[i] = PBHelper.convert(dnprotos.get(i));
+    }
+    try {
+      impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()),
+          request.getNewGenStamp(), request.getNewLength(),
+          request.getCloseFile(), request.getDeleteBlock(), dns);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO;
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return RPC.getProtocolVersion(DatanodeProtocolPB.class);
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    /**
+     * Don't forward this to the server. The protocol version and signature is
+     * that of {@link DatanodeProtocol}
+     */
+    if (!protocol.equals(RPC.getProtocolName(DatanodeProtocolPB.class))) {
+      throw new IOException("Namenode Serverside implements " +
+          RPC.getProtocolName(DatanodeProtocolPB.class) +
+          ". The following requested protocol is unknown: " + protocol);
+    }
+
+    return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+        RPC.getProtocolVersion(DatanodeProtocolPB.class),
+        DatanodeProtocolPB.class);
+  }
+
+  @Override
+  public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    /**
+     * Don't forward this to the server. The protocol version and signature is
+     * that of {@link DatanodeProtocolPB}
+     */
+    return ProtocolSignatureWritable.convert(
+        this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
+  }
+
+}

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolPB.java

@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hdfs.protocolPB;
 
 import java.io.IOException;

+ 3 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java

@@ -20,7 +20,8 @@ package org.apache.hadoop.hdfs.protocolPB;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
@@ -39,8 +40,6 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogR
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionResponseProto;
 import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
@@ -240,14 +239,6 @@ public class NamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
     return VersionResponseProto.newBuilder()
-        .setInfo(convert(info)).build();
-  }
-
-  private NamespaceInfoProto convert(NamespaceInfo info) {
-    return NamespaceInfoProto.newBuilder()
-        .setBlockPoolID(info.getBlockPoolID())
-        .setBuildVersion(info.getBuildVersion())
-        .setDistUpgradeVersion(info.getDistributedUpgradeVersion())
-        .setStorageInfo(PBHelper.convert(info)).build();
+        .setInfo(PBHelper.convert(info)).build();
   }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransacti
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;

+ 282 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -27,6 +27,16 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
@@ -37,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto.AdminState;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@@ -57,15 +68,26 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 
@@ -337,10 +359,10 @@ class PBHelper {
   }
   
   public static DatanodeInfoProto convert(DatanodeInfo info) {
-    return DatanodeInfoProto.newBuilder()
-        .setAdminState(PBHelper.convert(info.getAdminState()))
-        .setBlockPoolUsed(info.getBlockPoolUsed())
-        .setCapacity(info.getCapacity())
+    DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
+    builder.setBlockPoolUsed(info.getBlockPoolUsed());
+    builder.setAdminState(PBHelper.convert(info.getAdminState()));
+    builder.setCapacity(info.getCapacity())
         .setDfsUsed(info.getDfsUsed())
         .setHostName(info.getHostName())
         .setId(PBHelper.convert((DatanodeID)info))
@@ -349,6 +371,7 @@ class PBHelper {
         .setRemaining(info.getRemaining())
         .setXceiverCount(info.getXceiverCount())
         .build();
+    return builder.build();
   }
 
   public static AdminStates convert(AdminState adminState) {
@@ -378,13 +401,25 @@ class PBHelper {
   public static LocatedBlockProto convert(LocatedBlock b) {
     Builder builder = LocatedBlockProto.newBuilder();
     DatanodeInfo[] locs = b.getLocations();
-    for(DatanodeInfo loc : locs) {
-      builder.addLocs(PBHelper.convert(loc));
+    for (int i = 0; i < locs.length; i++) {
+      builder.addLocs(i, PBHelper.convert(locs[i]));
     }
     return builder.setB(PBHelper.convert(b.getBlock()))
         .setBlockToken(PBHelper.convert(b.getBlockToken()))
         .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
   }
+  
+  public static LocatedBlock convert(LocatedBlockProto proto) {
+    List<DatanodeInfoProto> locs = proto.getLocsList();
+    DatanodeInfo[] targets = new DatanodeInfo[locs.size()];
+    for (int i = 0; i < locs.size(); i++) {
+      targets[i] = PBHelper.convert(locs.get(i));
+    }
+    LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
+        proto.getOffset(), proto.getCorrupt());
+    lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
+    return lb;
+  }
 
   public static BlockTokenIdentifierProto convert(
       Token<BlockTokenIdentifier> token) {
@@ -417,4 +452,245 @@ class PBHelper {
       return ReplicaState.FINALIZED;
     }
   }
+  
+  public static DatanodeRegistrationProto convert(
+      DatanodeRegistration registration) {
+    DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto
+        .newBuilder();
+    return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration))
+        .setStorageInfo(PBHelper.convert(registration.storageInfo))
+        .setKeys(PBHelper.convert(registration.exportedKeys)).build();
+  }
+
+  public static DatanodeRegistration convert(DatanodeRegistrationProto proto) {
+    return new DatanodeRegistration(PBHelper.convert(proto.getDatanodeID()),
+        PBHelper.convert(proto.getStorageInfo()), PBHelper.convert(proto
+            .getKeys()));
+  }
+
+  public static DatanodeCommand convert(DatanodeCommandProto proto) {
+    switch (proto.getCmdType()) {
+    case BalancerBandwidthCommand:
+      return PBHelper.convert(proto.getBalancerCmd());
+    case BlockCommand:
+      return PBHelper.convert(proto.getBlkCmd());
+    case BlockRecoveryCommand:
+      return PBHelper.convert(proto.getRecoveryCmd());
+    case FinalizeCommand:
+      return PBHelper.convert(proto.getFinalizeCmd());
+    case KeyUpdateCommand:
+      return PBHelper.convert(proto.getKeyUpdateCmd());
+    case RegisterCommand:
+      return PBHelper.convert(proto.getRegisterCmd());
+    case UpgradeCommand:
+      return PBHelper.convert(proto.getUpgradeCmd());
+    }
+    return null;
+  }
+  
+  public static BalancerBandwidthCommandProto convert(
+      BalancerBandwidthCommand bbCmd) {
+    return BalancerBandwidthCommandProto.newBuilder()
+        .setBandwidth(bbCmd.getBalancerBandwidthValue()).build();
+  }
+
+  public static KeyUpdateCommandProto convert(KeyUpdateCommand cmd) {
+    return KeyUpdateCommandProto.newBuilder()
+        .setKeys(PBHelper.convert(cmd.getExportedKeys())).build();
+  }
+
+  public static BlockRecoveryCommandProto convert(BlockRecoveryCommand cmd) {
+    BlockRecoveryCommandProto.Builder builder = BlockRecoveryCommandProto
+        .newBuilder();
+    for (RecoveringBlock b : cmd.getRecoveringBlocks()) {
+      builder.addBlocks(PBHelper.convert(b));
+    }
+    return builder.build();
+  }
+
+  public static FinalizeCommandProto convert(FinalizeCommand cmd) {
+    return FinalizeCommandProto.newBuilder()
+        .setBlockPoolId(cmd.getBlockPoolId()).build();
+  }
+  
+  public static RegisterCommandProto convert(RegisterCommand cmd) {
+    return RegisterCommandProto.newBuilder().build();
+  }
+
+  public static BlockCommandProto convert(BlockCommand cmd) {
+    BlockCommandProto.Builder builder = BlockCommandProto.newBuilder()
+        .setBlockPoolId(cmd.getBlockPoolId());
+    switch (cmd.getAction()) {
+    case DatanodeProtocol.DNA_TRANSFER:
+      builder.setAction(BlockCommandProto.Action.TRANSFER);
+      break;
+    case DatanodeProtocol.DNA_INVALIDATE:
+      builder.setAction(BlockCommandProto.Action.INVALIDATE);
+      break;
+    }
+    Block[] blocks = cmd.getBlocks();
+    for (int i = 0; i < blocks.length; i++) {
+      builder.addBlocks(PBHelper.convert(blocks[i]));
+    }
+    DatanodeInfo[][] infos = cmd.getTargets();
+    for (int i = 0; i < infos.length; i++) {
+      builder.addTargets(PBHelper.convert(infos[i]));
+    }
+    return builder.build();
+  }
+
+  public static DatanodeInfosProto convert(DatanodeInfo[] datanodeInfos) {
+    DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
+    for (int i = 0; i < datanodeInfos.length; i++) {
+      builder.addDatanodes(PBHelper.convert(datanodeInfos[i]));
+    }
+    return builder.build();
+  }
+
+  public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
+    DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder();
+    switch (datanodeCommand.getAction()) {
+    case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+      builder.setCmdType(DatanodeCommandProto.Type.BalancerBandwidthCommand)
+          .setBalancerCmd(
+              PBHelper.convert((BalancerBandwidthCommand) datanodeCommand));
+      break;
+    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+      builder
+          .setCmdType(DatanodeCommandProto.Type.KeyUpdateCommand)
+          .setKeyUpdateCmd(PBHelper.convert((KeyUpdateCommand) datanodeCommand));
+      break;
+    case DatanodeProtocol.DNA_RECOVERBLOCK:
+      builder.setCmdType(DatanodeCommandProto.Type.BlockRecoveryCommand)
+          .setRecoveryCmd(
+              PBHelper.convert((BlockRecoveryCommand) datanodeCommand));
+      break;
+    case DatanodeProtocol.DNA_FINALIZE:
+      builder.setCmdType(DatanodeCommandProto.Type.FinalizeCommand)
+          .setFinalizeCmd(PBHelper.convert((FinalizeCommand) datanodeCommand));
+      break;
+    case DatanodeProtocol.DNA_REGISTER:
+      builder.setCmdType(DatanodeCommandProto.Type.RegisterCommand)
+          .setRegisterCmd(PBHelper.convert((RegisterCommand) datanodeCommand));
+      break;
+    case DatanodeProtocol.DNA_TRANSFER:
+    case DatanodeProtocol.DNA_INVALIDATE:
+      builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
+          PBHelper.convert((BlockCommand) datanodeCommand));
+      break;
+    case DatanodeProtocol.DNA_SHUTDOWN: //Not expected
+    case DatanodeProtocol.DNA_UNKNOWN: //Not expected
+    }
+    return builder.build();
+  }
+
+  public static UpgradeCommand convert(UpgradeCommandProto upgradeCmd) {
+    int action = UpgradeCommand.UC_ACTION_UNKNOWN;
+    switch (upgradeCmd.getAction()) {
+    case REPORT_STATUS:
+      action = UpgradeCommand.UC_ACTION_REPORT_STATUS;
+      break;
+    case START_UPGRADE:
+      action = UpgradeCommand.UC_ACTION_START_UPGRADE;
+    }
+    return new UpgradeCommand(action, upgradeCmd.getVersion(),
+        (short) upgradeCmd.getUpgradeStatus());
+  }
+
+  public static RegisterCommand convert(RegisterCommandProto registerCmd) {
+    return new RegisterCommand();
+  }
+
+  public static KeyUpdateCommand convert(KeyUpdateCommandProto keyUpdateCmd) {
+    return new KeyUpdateCommand(PBHelper.convert(keyUpdateCmd.getKeys()));
+  }
+
+  public static FinalizeCommand convert(FinalizeCommandProto finalizeCmd) {
+    return new FinalizeCommand(finalizeCmd.getBlockPoolId());
+  }
+
+  public static BlockRecoveryCommand convert(
+      BlockRecoveryCommandProto recoveryCmd) {
+    List<RecoveringBlockProto> list = recoveryCmd.getBlocksList();
+    List<RecoveringBlock> recoveringBlocks = new ArrayList<RecoveringBlock>(
+        list.size());
+    for (int i = 0; i < list.size(); i++) {
+      recoveringBlocks.add(PBHelper.convert(list.get(0)));
+    }
+    return new BlockRecoveryCommand(recoveringBlocks);
+  }
+
+  public static BlockCommand convert(BlockCommandProto blkCmd) {
+    List<BlockProto> blockProtoList = blkCmd.getBlocksList();
+    List<DatanodeInfosProto> targetList = blkCmd.getTargetsList();
+    DatanodeInfo[][] targets = new DatanodeInfo[blockProtoList.size()][];
+    Block[] blocks = new Block[blockProtoList.size()];
+    for (int i = 0; i < blockProtoList.size(); i++) {
+      targets[i] = PBHelper.convert(targetList.get(i));
+      blocks[i] = PBHelper.convert(blockProtoList.get(i));
+    }
+    int action = DatanodeProtocol.DNA_UNKNOWN;
+    switch (blkCmd.getAction()) {
+    case TRANSFER:
+      action = DatanodeProtocol.DNA_TRANSFER;
+      break;
+    case INVALIDATE:
+      action = DatanodeProtocol.DNA_INVALIDATE;
+      break;
+    }
+    return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
+  }
+
+  public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {
+    List<DatanodeInfoProto> proto = datanodeInfosProto.getDatanodesList();
+    DatanodeInfo[] infos = new DatanodeInfo[proto.size()];
+    for (int i = 0; i < infos.length; i++) {
+      infos[i] = PBHelper.convert(proto.get(i));
+    }
+    return infos;
+  }
+
+  public static BalancerBandwidthCommand convert(
+      BalancerBandwidthCommandProto balancerCmd) {
+    return new BalancerBandwidthCommand(balancerCmd.getBandwidth());
+  }
+
+  public static ReceivedDeletedBlockInfoProto convert(
+      ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
+    return ReceivedDeletedBlockInfoProto.newBuilder()
+        .setBlock(PBHelper.convert(receivedDeletedBlockInfo.getBlock()))
+        .setDeleteHint(receivedDeletedBlockInfo.getDelHints()).build();
+  }
+
+  public static UpgradeCommandProto convert(UpgradeCommand comm) {
+    UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder()
+        .setVersion(comm.getVersion())
+        .setUpgradeStatus(comm.getCurrentStatus());
+    switch (comm.getAction()) {
+    case UpgradeCommand.UC_ACTION_REPORT_STATUS:
+      builder.setAction(UpgradeCommandProto.Action.REPORT_STATUS);
+      break;
+    case UpgradeCommand.UC_ACTION_START_UPGRADE:
+      builder.setAction(UpgradeCommandProto.Action.START_UPGRADE);
+      break;
+    default:
+      builder.setAction(UpgradeCommandProto.Action.UNKNOWN);
+      break;
+    }
+    return builder.build();
+  }
+
+  public static ReceivedDeletedBlockInfo convert(
+      ReceivedDeletedBlockInfoProto proto) {
+    return new ReceivedDeletedBlockInfo(PBHelper.convert(proto.getBlock()),
+        proto.getDeleteHint());
+  }
+  
+  public static NamespaceInfoProto convert(NamespaceInfo info) {
+    return NamespaceInfoProto.newBuilder()
+        .setBlockPoolID(info.getBlockPoolID())
+        .setBuildVersion(info.getBuildVersion())
+        .setDistUpgradeVersion(info.getDistributedUpgradeVersion())
+        .setStorageInfo(PBHelper.convert((StorageInfo)info)).build();
+  }
 }

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java

@@ -66,6 +66,13 @@ implements Writable, NodeRegistration {
     this(nodeName, new StorageInfo(), new ExportedBlockKeys());
   }
   
+  public DatanodeRegistration(DatanodeID dn, StorageInfo info,
+      ExportedBlockKeys keys) {
+    super(dn);
+    this.storageInfo = info;
+    this.exportedKeys = keys;
+  }
+  
   public DatanodeRegistration(String nodeName, StorageInfo info,
       ExportedBlockKeys keys) {
     super(nodeName);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/UpgradeCommand.java

@@ -40,7 +40,7 @@ import org.apache.hadoop.io.WritableFactory;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class UpgradeCommand extends DatanodeCommand {
-  final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN;
+  public final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN;
   public final static int UC_ACTION_REPORT_STATUS = 100; // report upgrade status
   public final static int UC_ACTION_START_UPGRADE = 101; // start upgrade
 

+ 12 - 9
hadoop-hdfs-project/hadoop-hdfs/src/proto/DatanodeProtocol.proto

@@ -31,7 +31,7 @@ import "hdfs.proto";
  */
 message DatanodeRegistrationProto {
   required DatanodeIDProto datanodeID = 1;    // Datanode information
-  required StorageInfoProto storateInfo = 2;  // Node information
+  required StorageInfoProto storageInfo = 2;  // Node information
   required ExportedBlockKeysProto keys = 3;   // Block keys
 }
 
@@ -55,7 +55,7 @@ message DatanodeCommandProto {
   // cmdType is set
   optional BalancerBandwidthCommandProto balancerCmd = 2;
   optional BlockCommandProto blkCmd = 3;
-  optional BlockRecoveryCommndProto recoveryCmd = 4;
+  optional BlockRecoveryCommandProto recoveryCmd = 4;
   optional FinalizeCommandProto finalizeCmd = 5;
   optional KeyUpdateCommandProto keyUpdateCmd = 6;
   optional RegisterCommandProto registerCmd = 7;
@@ -77,22 +77,20 @@ message BalancerBandwidthCommandProto {
  * on the given set of blocks.
  */
 message BlockCommandProto {
-  enum Action {
-    UNKNOWN = 0;    // Unknown action   
+  enum Action {  
     TRANSFER = 1;   // Transfer blocks to another datanode
     INVALIDATE = 2; // Invalidate blocks
-    SHUTDOWN = 3;   // Shutdown node
   }
-  required uint32 action = 1;
+  required Action action = 1;
   required string blockPoolId = 2;
   repeated BlockProto blocks = 3;
-  repeated DatanodeIDsProto targets = 4;
+  repeated DatanodeInfosProto targets = 4;
 }
 
 /**
  * List of blocks to be recovered by the datanode
  */
-message BlockRecoveryCommndProto {
+message BlockRecoveryCommandProto {
   repeated RecoveringBlockProto blocks = 1;
 }
 
@@ -126,7 +124,7 @@ message UpgradeCommandProto {
     REPORT_STATUS = 100;  // Report upgrade status
     START_UPGRADE = 101;  // Start upgrade
   }
-  required uint32 action = 1;  // Upgrade action
+  required Action action = 1;  // Upgrade action
   required uint32 version = 2; // Version of the upgrade
   required uint32 upgradeStatus = 3; // % completed in range 0 & 100
 }
@@ -324,6 +322,11 @@ service DatanodeProtocolService {
    * Used for debugging.
    */
   rpc errorReport(ErrorReportRequestProto) returns(ErrorReportResponseProto);
+  
+  /**
+   * Request the version
+   */
+  rpc versionRequest(VersionRequestProto) returns(VersionResponseProto);
 
   /**
    * Generic way to send commands from datanode to namenode during

+ 0 - 13
hadoop-hdfs-project/hadoop-hdfs/src/proto/NamenodeProtocol.proto

@@ -84,19 +84,6 @@ message RollEditLogResponseProto {
   required CheckpointSignatureProto signature = 1;
 }
 
-/**
- * void request
- */
-message VersionRequestProto {
-}
-
-/**
- * void request
- */
-message VersionResponseProto {
-  required NamespaceInfoProto info = 1;
-}
-
 /**
  * registration - Namenode reporting the error
  * errorCode - error code indicating the error

+ 16 - 3
hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto

@@ -54,10 +54,10 @@ message DatanodeIDProto {
 }
 
 /**
- * DatanodeID array
+ * DatanodeInfo array
  */
-message DatanodeIDsProto {
-  repeated DatanodeIDProto datanodes = 1;
+message DatanodeInfosProto {
+  repeated DatanodeInfoProto datanodes = 1;
 }
 
 /**
@@ -345,3 +345,16 @@ message RecoveringBlockProto {
   required LocatedBlockProto block = 2; // Block to be recovered
 }
 
+/**
+ * void request
+ */
+message VersionRequestProto {
+}
+
+/**
+ * Version response from namenode.
+ */
+message VersionResponseProto {
+  required NamespaceInfoProto info = 1;
+}
+

+ 118 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -17,29 +17,34 @@
  */
 package org.apache.hadoop.hdfs.protocolPB;
 
-import static junit.framework.Assert.*;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
@@ -47,14 +52,17 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.Text;
@@ -120,6 +128,10 @@ public class TestPBHelper {
     DatanodeID dn = new DatanodeID("node", "sid", 1, 2);
     DatanodeIDProto dnProto = PBHelper.convert(dn);
     DatanodeID dn2 = PBHelper.convert(dnProto);
+    compare(dn, dn2);
+  }
+  
+  void compare(DatanodeID dn, DatanodeID dn2) {
     assertEquals(dn.getHost(), dn2.getHost());
     assertEquals(dn.getInfoPort(), dn2.getInfoPort());
     assertEquals(dn.getIpcPort(), dn2.getIpcPort());
@@ -177,7 +189,6 @@ public class TestPBHelper {
     assertEquals(k1.getExpiryDate(), k2.getExpiryDate());
     assertEquals(k1.getKeyId(), k2.getKeyId());
     assertTrue(Arrays.equals(k1.getEncodedKey(), k2.getEncodedKey()));
-
   }
 
   @Test
@@ -195,7 +206,10 @@ public class TestPBHelper {
         getBlockKey(1), keys);
     ExportedBlockKeysProto expKeysProto = PBHelper.convert(expKeys);
     ExportedBlockKeys expKeys1 = PBHelper.convert(expKeysProto);
-
+    compare(expKeys, expKeys1);
+  }
+  
+  void compare(ExportedBlockKeys expKeys, ExportedBlockKeys expKeys1) {
     BlockKey[] allKeys = expKeys.getAllKeys();
     BlockKey[] allKeys1 = expKeys1.getAllKeys();
     assertEquals(allKeys.length, allKeys1.length);
@@ -314,15 +328,108 @@ public class TestPBHelper {
   }
   
   @Test
-  public void testBlockTokenIdentifier() {
+  public void testConvertBlockToken() {
     Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>(
         "identifier".getBytes(), "password".getBytes(), new Text("kind"),
         new Text("service"));
     BlockTokenIdentifierProto tokenProto = PBHelper.convert(token);
     Token<BlockTokenIdentifier> token2 = PBHelper.convert(tokenProto);
-    assertTrue(Arrays.equals(token.getIdentifier(), token2.getIdentifier()));
-    assertTrue(Arrays.equals(token.getPassword(), token2.getPassword()));
-    assertEquals(token.getKind(), token2.getKind());
-    assertEquals(token.getService(), token2.getService());
+    compare(token, token2);
+  }
+  
+  @Test
+  public void testConvertNamespaceInfo() {
+    NamespaceInfo info = new NamespaceInfo(37, "clusterID", "bpID", 2300, 53);
+    NamespaceInfoProto proto = PBHelper.convert(info);
+    NamespaceInfo info2 = PBHelper.convert(proto);
+    compare(info, info2); //Compare the StorageInfo
+    assertEquals(info.getBlockPoolID(), info2.getBlockPoolID());
+    assertEquals(info.getBuildVersion(), info2.getBuildVersion());
+    assertEquals(info.getDistributedUpgradeVersion(),
+        info2.getDistributedUpgradeVersion());
+  }
+
+  private void compare(StorageInfo expected, StorageInfo actual) {
+    assertEquals(expected.clusterID, actual.clusterID);
+    assertEquals(expected.namespaceID, actual.namespaceID);
+    assertEquals(expected.cTime, actual.cTime);
+    assertEquals(expected.layoutVersion, actual.layoutVersion);
+  }
+
+  private void compare(Token<BlockTokenIdentifier> expected,
+      Token<BlockTokenIdentifier> actual) {
+    assertTrue(Arrays.equals(expected.getIdentifier(), actual.getIdentifier()));
+    assertTrue(Arrays.equals(expected.getPassword(), actual.getPassword()));
+    assertEquals(expected.getKind(), actual.getKind());
+    assertEquals(expected.getService(), actual.getService());
+  }
+  
+  @Test
+  public void testConvertLocatedBlock() {
+    DatanodeInfo [] dnInfos = new DatanodeInfo[3];
+    dnInfos[0] = new DatanodeInfo("host0", "0", 5000, 5001, 20000, 10001, 9999,
+        59, 69, 32, "local", "host0", AdminStates.DECOMMISSION_INPROGRESS);
+    dnInfos[1] = new DatanodeInfo("host1", "1", 5000, 5001, 20000, 10001, 9999,
+        59, 69, 32, "local", "host1", AdminStates.DECOMMISSIONED);
+    dnInfos[2] = new DatanodeInfo("host2", "2", 5000, 5001, 20000, 10001, 9999,
+        59, 69, 32, "local", "host1", AdminStates.NORMAL);
+    LocatedBlock lb = new LocatedBlock(
+        new ExtendedBlock("bp12", 12345, 10, 53), dnInfos, 5, false);
+    LocatedBlockProto lbProto = PBHelper.convert(lb);
+    LocatedBlock lb2 = PBHelper.convert(lbProto);
+    assertEquals(lb.getBlock(), lb2.getBlock());
+    compare(lb.getBlockToken(), lb2.getBlockToken());
+    assertEquals(lb.getStartOffset(), lb2.getStartOffset());
+    assertEquals(lb.isCorrupt(), lb2.isCorrupt());
+    DatanodeInfo [] dnInfos2 = lb2.getLocations();
+    assertEquals(dnInfos.length, dnInfos2.length);
+    for (int i = 0; i < dnInfos.length ; i++) {
+      compare(dnInfos[i], dnInfos2[i]);
+    }
+  }
+  
+  @Test
+  public void testConvertDatanodeRegistration() {
+    DatanodeID dnId = new DatanodeID("host", "xyz", 1, 0);
+    BlockKey[] keys = new BlockKey[] { getBlockKey(2), getBlockKey(3) };
+    ExportedBlockKeys expKeys = new ExportedBlockKeys(true, 9, 10,
+        getBlockKey(1), keys);
+    DatanodeRegistration reg = new DatanodeRegistration(dnId,
+        new StorageInfo(), expKeys);
+    DatanodeRegistrationProto proto = PBHelper.convert(reg);
+    DatanodeRegistration reg2 = PBHelper.convert(proto);
+    compare(reg.storageInfo, reg2.storageInfo);
+    compare(reg.exportedKeys, reg2.exportedKeys);
+    compare((DatanodeID)reg, (DatanodeID)reg2);
+  }
+  
+  @Test
+  public void testConvertBlockCommand() {
+    Block[] blocks = new Block[] { new Block(21), new Block(22) };
+    DatanodeInfo[][] dnInfos = new DatanodeInfo[][] { new DatanodeInfo[1],
+        new DatanodeInfo[2] };
+    dnInfos[0][0] = new DatanodeInfo();
+    dnInfos[1][0] = new DatanodeInfo();
+    dnInfos[1][1] = new DatanodeInfo();
+    BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
+        blocks, dnInfos);
+    BlockCommandProto bcProto = PBHelper.convert(bc);
+    BlockCommand bc2 = PBHelper.convert(bcProto);
+    assertEquals(bc.getAction(), bc2.getAction());
+    assertEquals(bc.getBlocks().length, bc2.getBlocks().length);
+    Block[] blocks2 = bc2.getBlocks();
+    for (int i = 0; i < blocks.length; i++) {
+      assertEquals(blocks[i], blocks2[i]);
+    }
+    DatanodeInfo[][] dnInfos2 = bc2.getTargets();
+    assertEquals(dnInfos.length, dnInfos2.length);
+    for (int i = 0; i < dnInfos.length; i++) {
+      DatanodeInfo[] d1 = dnInfos[i];
+      DatanodeInfo[] d2 = dnInfos2[i];
+      assertEquals(d1.length, d2.length);
+      for (int j = 0; j < d1.length; j++) {
+        compare(d1[j], d2[j]);
+      }
+    }
   }
 }

Bu fark içinde çok fazla dosya değişikliği olduğu için bazı dosyalar gösterilmiyor