
HDFS-2629. Implement protobuf service for InterDatanodeProtocol. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1211206 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas, 13 years ago · commit 7a59150bff

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -17,6 +17,8 @@ Trunk (unreleased changes)
 
     HDFS-2618. Implement protobuf service for NamenodeProtocol. (suresh)
 
+    HDFS-2629. Implement protobuf service for InterDatanodeProtocol. (suresh)
+
   IMPROVEMENTS
 
     HADOOP-7524 Change RPC to allow multiple protocols including multiple 

+ 15 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/HdfsProtos.java

@@ -8,7 +8,7 @@ public final class HdfsProtos {
   public static void registerAllExtensions(
       com.google.protobuf.ExtensionRegistry registry) {
   }
-  public enum ReplicaState
+  public enum ReplicaStateProto
       implements com.google.protobuf.ProtocolMessageEnum {
     FINALIZED(0, 0),
     RBW(1, 1),
@@ -26,7 +26,7 @@ public final class HdfsProtos {
     
     public final int getNumber() { return value; }
     
-    public static ReplicaState valueOf(int value) {
+    public static ReplicaStateProto valueOf(int value) {
       switch (value) {
         case 0: return FINALIZED;
         case 1: return RBW;
@@ -37,15 +37,15 @@ public final class HdfsProtos {
       }
     }
     
-    public static com.google.protobuf.Internal.EnumLiteMap<ReplicaState>
+    public static com.google.protobuf.Internal.EnumLiteMap<ReplicaStateProto>
         internalGetValueMap() {
       return internalValueMap;
     }
-    private static com.google.protobuf.Internal.EnumLiteMap<ReplicaState>
+    private static com.google.protobuf.Internal.EnumLiteMap<ReplicaStateProto>
         internalValueMap =
-          new com.google.protobuf.Internal.EnumLiteMap<ReplicaState>() {
-            public ReplicaState findValueByNumber(int number) {
-              return ReplicaState.valueOf(number);
+          new com.google.protobuf.Internal.EnumLiteMap<ReplicaStateProto>() {
+            public ReplicaStateProto findValueByNumber(int number) {
+              return ReplicaStateProto.valueOf(number);
             }
           };
     
@@ -62,11 +62,11 @@ public final class HdfsProtos {
       return org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.getDescriptor().getEnumTypes().get(0);
     }
     
-    private static final ReplicaState[] VALUES = {
+    private static final ReplicaStateProto[] VALUES = {
       FINALIZED, RBW, RWR, RUR, TEMPORARY, 
     };
     
-    public static ReplicaState valueOf(
+    public static ReplicaStateProto valueOf(
         com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
       if (desc.getType() != getDescriptor()) {
         throw new java.lang.IllegalArgumentException(
@@ -78,12 +78,12 @@ public final class HdfsProtos {
     private final int index;
     private final int value;
     
-    private ReplicaState(int index, int value) {
+    private ReplicaStateProto(int index, int value) {
       this.index = index;
       this.value = value;
     }
     
-    // @@protoc_insertion_point(enum_scope:ReplicaState)
+    // @@protoc_insertion_point(enum_scope:ReplicaStateProto)
   }
   
   public interface ExtendedBlockProtoOrBuilder
@@ -20192,10 +20192,10 @@ public final class HdfsProtos {
       "ntKey\030\004 \002(\0132\016.BlockKeyProto\022\037\n\007allKeys\030\005" +
       " \003(\0132\016.BlockKeyProto\"N\n\024RecoveringBlockP" +
       "roto\022\023\n\013newGenStamp\030\001 \002(\004\022!\n\005block\030\002 \002(\013" +
-      "2\022.LocatedBlockProto*G\n\014ReplicaState\022\r\n\t" +
-      "FINALIZED\020\000\022\007\n\003RBW\020\001\022\007\n\003RWR\020\002\022\007\n\003RUR\020\003\022\r" +
-      "\n\tTEMPORARY\020\004B6\n%org.apache.hadoop.hdfs.",
-      "protocol.protoB\nHdfsProtos\240\001\001"
+      "2\022.LocatedBlockProto*L\n\021ReplicaStateProt" +
+      "o\022\r\n\tFINALIZED\020\000\022\007\n\003RBW\020\001\022\007\n\003RWR\020\002\022\007\n\003RU" +
+      "R\020\003\022\r\n\tTEMPORARY\020\004B6\n%org.apache.hadoop.",
+      "hdfs.protocol.protoB\nHdfsProtos\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

+ 30 - 29
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/InterDatanodeProtocolProtos.java

@@ -484,9 +484,9 @@ public final class InterDatanodeProtocolProtos {
   public interface InitReplicaRecoveryResponseProtoOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
     
-    // required .ReplicaState state = 1;
+    // required .ReplicaStateProto state = 1;
     boolean hasState();
-    org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState getState();
+    org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto getState();
     
     // required .BlockProto block = 2;
     boolean hasBlock();
@@ -522,13 +522,13 @@ public final class InterDatanodeProtocolProtos {
     }
     
     private int bitField0_;
-    // required .ReplicaState state = 1;
+    // required .ReplicaStateProto state = 1;
     public static final int STATE_FIELD_NUMBER = 1;
-    private org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState state_;
+    private org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto state_;
     public boolean hasState() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
-    public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState getState() {
+    public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto getState() {
       return state_;
     }
     
@@ -546,7 +546,7 @@ public final class InterDatanodeProtocolProtos {
     }
     
     private void initFields() {
-      state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState.FINALIZED;
+      state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto.FINALIZED;
       block_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
@@ -763,7 +763,7 @@ public final class InterDatanodeProtocolProtos {
       
       public Builder clear() {
         super.clear();
-        state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState.FINALIZED;
+        state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto.FINALIZED;
         bitField0_ = (bitField0_ & ~0x00000001);
         if (blockBuilder_ == null) {
           block_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto.getDefaultInstance();
@@ -888,7 +888,7 @@ public final class InterDatanodeProtocolProtos {
             }
             case 8: {
               int rawValue = input.readEnum();
-              org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState value = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState.valueOf(rawValue);
+              org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto value = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto.valueOf(rawValue);
               if (value == null) {
                 unknownFields.mergeVarintField(1, rawValue);
               } else {
@@ -912,15 +912,15 @@ public final class InterDatanodeProtocolProtos {
       
       private int bitField0_;
       
-      // required .ReplicaState state = 1;
-      private org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState.FINALIZED;
+      // required .ReplicaStateProto state = 1;
+      private org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto.FINALIZED;
       public boolean hasState() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
-      public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState getState() {
+      public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto getState() {
         return state_;
       }
-      public Builder setState(org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState value) {
+      public Builder setState(org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto value) {
         if (value == null) {
           throw new NullPointerException();
         }
@@ -931,7 +931,7 @@ public final class InterDatanodeProtocolProtos {
       }
       public Builder clearState() {
         bitField0_ = (bitField0_ & ~0x00000001);
-        state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState.FINALIZED;
+        state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto.FINALIZED;
         onChanged();
         return this;
       }
@@ -2448,22 +2448,23 @@ public final class InterDatanodeProtocolProtos {
     java.lang.String[] descriptorData = {
       "\n\033InterDatanodeProtocol.proto\032\nhdfs.prot" +
       "o\"G\n\037InitReplicaRecoveryRequestProto\022$\n\005" +
-      "block\030\001 \002(\0132\025.RecoveringBlockProto\"\\\n In" +
-      "itReplicaRecoveryResponseProto\022\034\n\005state\030" +
-      "\001 \002(\0162\r.ReplicaState\022\032\n\005block\030\002 \002(\0132\013.Bl" +
-      "ockProto\"s\n&UpdateReplicaUnderRecoveryRe" +
-      "questProto\022\"\n\005block\030\001 \002(\0132\023.ExtendedBloc" +
-      "kProto\022\022\n\nrecoveryId\030\002 \002(\004\022\021\n\tnewLength\030" +
-      "\003 \002(\004\"M\n\'UpdateReplicaUnderRecoveryRespo" +
-      "nseProto\022\"\n\005block\030\001 \002(\0132\023.ExtendedBlockP",
-      "roto2\353\001\n\034InterDatanodeProtocolService\022Z\n" +
-      "\023initReplicaRecovery\022 .InitReplicaRecove" +
-      "ryRequestProto\032!.InitReplicaRecoveryResp" +
-      "onseProto\022o\n\032updateReplicaUnderRecovery\022" +
-      "\'.UpdateReplicaUnderRecoveryRequestProto" +
-      "\032(.UpdateReplicaUnderRecoveryResponsePro" +
-      "toBJ\n%org.apache.hadoop.hdfs.protocol.pr" +
-      "otoB\033InterDatanodeProtocolProtos\210\001\001\240\001\001"
+      "block\030\001 \002(\0132\025.RecoveringBlockProto\"a\n In" +
+      "itReplicaRecoveryResponseProto\022!\n\005state\030" +
+      "\001 \002(\0162\022.ReplicaStateProto\022\032\n\005block\030\002 \002(\013" +
+      "2\013.BlockProto\"s\n&UpdateReplicaUnderRecov" +
+      "eryRequestProto\022\"\n\005block\030\001 \002(\0132\023.Extende" +
+      "dBlockProto\022\022\n\nrecoveryId\030\002 \002(\004\022\021\n\tnewLe" +
+      "ngth\030\003 \002(\004\"M\n\'UpdateReplicaUnderRecovery" +
+      "ResponseProto\022\"\n\005block\030\001 \002(\0132\023.ExtendedB",
+      "lockProto2\353\001\n\034InterDatanodeProtocolServi" +
+      "ce\022Z\n\023initReplicaRecovery\022 .InitReplicaR" +
+      "ecoveryRequestProto\032!.InitReplicaRecover" +
+      "yResponseProto\022o\n\032updateReplicaUnderReco" +
+      "very\022\'.UpdateReplicaUnderRecoveryRequest" +
+      "Proto\032(.UpdateReplicaUnderRecoveryRespon" +
+      "seProtoBJ\n%org.apache.hadoop.hdfs.protoc" +
+      "ol.protoB\033InterDatanodeProtocolProtos\210\001\001" +
+      "\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

+ 47 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolPB.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
+@ProtocolInfo(protocolName = 
+    "org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol",
+    protocolVersion = 1)
+@InterfaceAudience.Private
+public interface InterDatanodeProtocolPB extends
+    InterDatanodeProtocolService.BlockingInterface, VersionedProtocol {
+
+  /**
+   * This method is defined to get the protocol signature using 
+   * the R23 protocol - hence we have added the suffix 2 to the method name
+   * to avoid a conflict.
+   */
+  public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException;
+}

+ 130 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java

@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryResponseProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Implementation for protobuf service that forwards requests
+ * received on {@link InterDatanodeProtocolPB} to the
+ * {@link InterDatanodeProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class InterDatanodeProtocolServerSideTranslatorPB implements
+    InterDatanodeProtocolPB {
+  private final InterDatanodeProtocol impl;
+
+  public InterDatanodeProtocolServerSideTranslatorPB(InterDatanodeProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public InitReplicaRecoveryResponseProto initReplicaRecovery(
+      RpcController unused, InitReplicaRecoveryRequestProto request)
+      throws ServiceException {
+    RecoveringBlock b = PBHelper.convert(request.getBlock());
+    ReplicaRecoveryInfo r;
+    try {
+      r = impl.initReplicaRecovery(b);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return InitReplicaRecoveryResponseProto.newBuilder()
+        .setBlock(PBHelper.convert(r)).build();
+  }
+
+  @Override
+  public UpdateReplicaUnderRecoveryResponseProto updateReplicaUnderRecovery(
+      RpcController unused, UpdateReplicaUnderRecoveryRequestProto request)
+      throws ServiceException {
+    ExtendedBlock b;
+    try {
+      b = impl.updateReplicaUnderRecovery(PBHelper.convert(request.getBlock()),
+          request.getRecoveryId(), request.getNewLength());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return UpdateReplicaUnderRecoveryResponseProto.newBuilder()
+        .setBlock(PBHelper.convert(b)).build();
+  }
+
+  /** @see VersionedProtocol#getProtocolVersion */
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return RPC.getProtocolVersion(InterDatanodeProtocolPB.class);
+  }
+  
+  /**
+   * The client side will redirect getProtocolSignature to
+   * getProtocolSignature2.
+   * 
+   * However, the RPC layer below on the server side will call getProtocolVersion
+   * and possibly, in the future, getProtocolSignature. Hence we still implement
+   * it even though the end client will never call this method.
+   * 
+   * @see VersionedProtocol#getProtocolVersion
+   */
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    /**
+     * Don't forward this to the server. The protocol version and signature are
+     * those of {@link InterDatanodeProtocol}
+     */
+    if (!protocol.equals(RPC.getProtocolName(InterDatanodeProtocol.class))) {
+      throw new IOException("Namenode Serverside implements " +
+          RPC.getProtocolName(InterDatanodeProtocol.class) +
+          ". The following requested protocol is unknown: " + protocol);
+    }
+
+    return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+        RPC.getProtocolVersion(InterDatanodeProtocolPB.class),
+        InterDatanodeProtocol.class);
+  }
+
+  @Override
+  public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    /**
+     * Don't forward this to the server. The protocol version and signature are
+     * those of {@link InterDatanodeProtocol}
+     */
+    return ProtocolSignatureWritable.convert(
+        this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
+  }
+}

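For context, a server that exposes this protocol wraps its local InterDatanodeProtocol implementation in the translator above and hands the resulting protobuf BlockingService to the RPC layer. A minimal sketch, assuming the standard protoc-generated newReflectiveBlockingService factory; how the DataNode registers the service with its RPC server is version-dependent and not part of this patch:

    import com.google.protobuf.BlockingService;
    import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
    import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;

    // The translator qualifies as an implementation because
    // InterDatanodeProtocolPB extends InterDatanodeProtocolService.BlockingInterface.
    static BlockingService newInterDatanodeService(InterDatanodeProtocol impl) {
      InterDatanodeProtocolServerSideTranslatorPB translator =
          new InterDatanodeProtocolServerSideTranslatorPB(impl);
      return InterDatanodeProtocolService.newReflectiveBlockingService(translator);
    }
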
+ 114 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java

@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link InterDatanodeProtocol} interfaces to the RPC server implementing
+ * {@link InterDatanodeProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class InterDatanodeProtocolTranslatorPB implements
+    InterDatanodeProtocol, Closeable {
+  /** RpcController is not used and hence is set to null */
+  private static final RpcController NULL_CONTROLLER = null;
+  private final InterDatanodeProtocolPB rpcProxy;
+
+  public InterDatanodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
+      Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, InterDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    rpcProxy = RPC.getProxy(InterDatanodeProtocolPB.class,
+        RPC.getProtocolVersion(InterDatanodeProtocolPB.class), nameNodeAddr,
+        conf);
+  }
+
+  @Override
+  public void close() {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  @Override
+  public long getProtocolVersion(String protocolName, long clientVersion)
+      throws IOException {
+    return rpcProxy.getProtocolVersion(protocolName, clientVersion);
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
+        protocol, clientVersion, clientMethodsHash));
+  }
+
+  @Override
+  public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+      throws IOException {
+    InitReplicaRecoveryRequestProto req = InitReplicaRecoveryRequestProto
+        .newBuilder().setBlock(PBHelper.convert(rBlock)).build();
+    InitReplicaRecoveryResponseProto resp;
+    try {
+      resp = rpcProxy.initReplicaRecovery(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    BlockProto b = resp.getBlock();
+    return new ReplicaRecoveryInfo(b.getBlockId(), b.getNumBytes(),
+        b.getGenStamp(), PBHelper.convert(resp.getState()));
+  }
+
+  @Override
+  public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+      long recoveryId, long newLength) throws IOException {
+    UpdateReplicaUnderRecoveryRequestProto req = 
+        UpdateReplicaUnderRecoveryRequestProto.newBuilder()
+        .setBlock(PBHelper.convert(oldBlock))
+        .setNewLength(newLength).setRecoveryId(recoveryId).build();
+    try {
+      return PBHelper.convert(rpcProxy.updateReplicaUnderRecovery(
+          NULL_CONTROLLER, req).getBlock());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+}

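On the client side the translator is a drop-in InterDatanodeProtocol, so calling code never sees protobuf types. A minimal usage sketch; the host, port, and wrapper method are illustrative, and rBlock would come from a real BlockRecoveryCommand:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
    import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;

    static ReplicaRecoveryInfo recoverOn(String host, int port,
        RecoveringBlock rBlock, Configuration conf) throws IOException {
      InterDatanodeProtocolTranslatorPB proxy =
          new InterDatanodeProtocolTranslatorPB(new InetSocketAddress(host, port), conf);
      try {
        return proxy.initReplicaRecovery(rBlock);  // plain Java call, protobuf underneath
      } finally {
        proxy.close();  // stops the underlying RPC proxy
      }
    }
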
+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.protocolPB;
 
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
@@ -37,6 +38,7 @@ import com.google.protobuf.ServiceException;
  * received on {@link JournalProtocolPB} to the 
  * {@link JournalProtocol} server implementation.
  */
+@InterfaceAudience.Private
 public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB {
   /** Server side implementation to delegate the requests to */
   private final JournalProtocol impl;
@@ -118,4 +120,4 @@ public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB
     return ProtocolSignatureWritable.convert(
         this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
   }
-}
+}

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
@@ -150,7 +151,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
       throws IOException {
     GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
-        .setDatanode(PBHelper.convert(datanode)).setSize(size)
+        .setDatanode(PBHelper.convert((DatanodeID)datanode)).setSize(size)
         .build();
     try {
       return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)

+ 139 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -23,26 +23,41 @@ import java.util.List;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto.AdminState;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockKey;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -51,6 +66,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
 
@@ -155,7 +171,13 @@ class PBHelper {
   }
 
   public static BlocksWithLocations convert(BlocksWithLocationsProto blocks) {
-    return new BlocksWithLocations(convert(blocks.getBlocksList()));
+    List<BlockWithLocationsProto> b = blocks.getBlocksList();
+    BlockWithLocations[] ret = new BlockWithLocations[b.size()];
+    int i = 0;
+    for (BlockWithLocationsProto entry : b) {
+      ret[i++] = convert(entry);
+    }
+    return new BlocksWithLocations(ret);
   }
 
   public static BlockKeyProto convert(BlockKey key) {
@@ -247,15 +269,6 @@ class PBHelper {
     return NamenodeCommandProto.newBuilder().setAction(cmd.getAction()).build();
   }
 
-  public static BlockWithLocations[] convert(List<BlockWithLocationsProto> b) {
-    BlockWithLocations[] ret = new BlockWithLocations[b.size()];
-    int i = 0;
-    for (BlockWithLocationsProto entry : b) {
-      ret[i++] = convert(entry);
-    }
-    return ret;
-  }
-
   public static BlockKey[] convertBlockKeys(List<BlockKeyProto> list) {
     BlockKey[] ret = new BlockKey[list.size()];
     int i = 0;
@@ -281,4 +294,119 @@ class PBHelper {
       return new NamenodeCommand(cmd.getAction());
     }
   }
-}
+
+  public static ExtendedBlockProto convert(ExtendedBlock b) {
+    return ExtendedBlockProto.newBuilder().setBlockId(b.getBlockId())
+        .setGenerationStamp(b.getGenerationStamp())
+        .setNumBytes(b.getNumBytes()).setPoolId(b.getBlockPoolId()).build();
+  }
+
+  public static ExtendedBlock convert(ExtendedBlockProto b) {
+    return new ExtendedBlock(b.getPoolId(), b.getBlockId(), b.getNumBytes(),
+        b.getGenerationStamp());
+  }
+
+  public static RecoveringBlockProto convert(RecoveringBlock b) {
+    LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
+    return RecoveringBlockProto.newBuilder().setBlock(lb)
+        .setNewGenStamp(b.getNewGenerationStamp()).build();
+  }
+
+  public static RecoveringBlock convert(RecoveringBlockProto b) {
+    ExtendedBlock block = convert(b.getBlock().getB());
+    DatanodeInfo[] locs = convert(b.getBlock().getLocsList());
+    return new RecoveringBlock(block, locs, b.getNewGenStamp());
+  }
+
+  public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {
+    DatanodeInfo[] info = new DatanodeInfo[list.size()];
+    for (int i = 0; i < info.length; i++) {
+      info[i] = convert(list.get(i));
+    }
+    return info;
+  }
+
+  public static DatanodeInfo convert(DatanodeInfoProto info) {
+    DatanodeIDProto dnId = info.getId();
+    return new DatanodeInfo(dnId.getName(), dnId.getStorageID(),
+        dnId.getInfoPort(), dnId.getIpcPort(), info.getCapacity(),
+        info.getDfsUsed(), info.getRemaining(), info.getBlockPoolUsed(),
+        info.getLastUpdate(), info.getXceiverCount(), info.getLocation(),
+        info.getHostName(), convert(info.getAdminState()));
+  }
+  
+  public static DatanodeInfoProto convert(DatanodeInfo info) {
+    return DatanodeInfoProto.newBuilder()
+        .setAdminState(PBHelper.convert(info.getAdminState()))
+        .setBlockPoolUsed(info.getBlockPoolUsed())
+        .setCapacity(info.getCapacity())
+        .setDfsUsed(info.getDfsUsed())
+        .setHostName(info.getHostName())
+        .setId(PBHelper.convert((DatanodeID)info))
+        .setLastUpdate(info.getLastUpdate())
+        .setLocation(info.getNetworkLocation())
+        .setRemaining(info.getRemaining())
+        .setXceiverCount(info.getXceiverCount())
+        .build();
+  }
+
+  public static AdminStates convert(AdminState adminState) {
+    switch(adminState) {
+    case DECOMMISSION_INPROGRESS:
+      return AdminStates.DECOMMISSION_INPROGRESS;
+    case DECOMMISSIONED:
+      return AdminStates.DECOMMISSIONED;
+    case NORMAL:
+    default:
+      return AdminStates.NORMAL;
+    }
+  }
+  
+  public static AdminState convert(AdminStates adminState) {
+    switch(adminState) {
+    case DECOMMISSION_INPROGRESS:
+      return AdminState.DECOMMISSION_INPROGRESS;
+    case DECOMMISSIONED:
+      return AdminState.DECOMMISSIONED;
+    case NORMAL:
+    default:
+      return AdminState.NORMAL;
+    }
+  }
+
+  public static LocatedBlockProto convert(LocatedBlock b) {
+    Builder builder = LocatedBlockProto.newBuilder();
+    DatanodeInfo[] locs = b.getLocations();
+    for(DatanodeInfo loc : locs) {
+      builder.addLocs(PBHelper.convert(loc));
+    }
+    return builder.setB(PBHelper.convert(b.getBlock()))
+        .setBlockToken(PBHelper.convert(b.getBlockToken()))
+        .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
+  }
+
+  public static BlockTokenIdentifierProto convert(
+      Token<BlockTokenIdentifier> token) {
+    ByteString tokenId = ByteString.copyFrom(token.getIdentifier());
+    ByteString password = ByteString.copyFrom(token.getPassword());
+    return BlockTokenIdentifierProto.newBuilder().setIdentifier(tokenId)
+        .setKind(token.getKind().toString()).setPassword(password)
+        .setService(token.getService().toString()).build();
+  }
+
+  public static ReplicaState convert(ReplicaStateProto state) {
+    switch (state) {
+    case RBW:
+      return ReplicaState.RBW;
+    case RUR:
+      return ReplicaState.RUR;
+    case RWR:
+      return ReplicaState.RWR;
+    case TEMPORARY:
+      return ReplicaState.TEMPORARY;
+    case FINALIZED:
+    default:
+      return ReplicaState.FINALIZED;
+    }
+  }
+}

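The new conversions are intended to be symmetric, so a value should survive a round trip through its proto form; the tests added below rely on this. A small illustrative check (field values arbitrary; PBHelper is package-private, so this only compiles inside org.apache.hadoop.hdfs.protocolPB):

    ExtendedBlock b = new ExtendedBlock("bpid", 1, 100, 2);
    ExtendedBlockProto proto = PBHelper.convert(b);    // POJO -> protobuf
    ExtendedBlock back = PBHelper.convert(proto);      // protobuf -> POJO
    assert b.equals(back);                             // round trip preserves the block
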
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/proto/InterDatanodeProtocol.proto

@@ -38,7 +38,7 @@ message InitReplicaRecoveryRequestProto {
 * Replica recovery information
  */
 message InitReplicaRecoveryResponseProto {
-  required ReplicaState state = 1; // State fo the replica
+  required ReplicaStateProto state = 1; // State of the replica
   required BlockProto block = 2;   // block information
 }
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto

@@ -329,7 +329,7 @@ message ExportedBlockKeysProto {
 /**
  * State of a block replica at a datanode
  */
-enum ReplicaState {
+enum ReplicaStateProto {
   FINALIZED = 0;  // State of a replica when it is not modified
   RBW = 1;        // State of replica that is being written to
   RWR = 2;        // State of replica that is waiting to be recovered

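The Proto suffix keeps the generated Java enum distinct from the existing server-side HdfsServerConstants.ReplicaState, so both can now be imported side by side and bridged through the new PBHelper.convert. Illustrative fragment only:

    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;

    ReplicaState internal = PBHelper.convert(ReplicaStateProto.RBW);  // yields ReplicaState.RBW
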
+ 61 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -25,6 +25,8 @@ import java.util.List;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
@@ -33,8 +35,10 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
@@ -43,11 +47,13 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
 /**
@@ -241,4 +247,59 @@ public class TestPBHelper {
       compare(logs.get(i), logs1.get(i));
     }
   }
+  
+  public ExtendedBlock getExtendedBlock() {
+    return new ExtendedBlock("bpid", 1, 100, 2);
+  }
+  
+  public DatanodeInfo getDNInfo() {
+    return new DatanodeInfo(new DatanodeID("node", "sid", 1, 2));
+  }
+  
+  private void compare(DatanodeInfo dn1, DatanodeInfo dn2) {
+    assertEquals(dn1.getAdminState(), dn2.getAdminState());
+    assertEquals(dn1.getBlockPoolUsed(), dn2.getBlockPoolUsed());
+    assertEquals(dn1.getBlockPoolUsedPercent(), dn2.getBlockPoolUsedPercent());
+    assertEquals(dn1.getCapacity(), dn2.getCapacity());
+    assertEquals(dn1.getDatanodeReport(), dn2.getDatanodeReport());
+    assertEquals(dn1.getDfsUsed(), dn2.getDfsUsed());
+    assertEquals(dn1.getDfsUsedPercent(), dn2.getDfsUsedPercent());
+    assertEquals(dn1.getHost(), dn2.getHost());
+    assertEquals(dn1.getHostName(), dn2.getHostName());
+    assertEquals(dn1.getInfoPort(), dn2.getInfoPort());
+    assertEquals(dn1.getIpcPort(), dn2.getIpcPort());
+    assertEquals(dn1.getLastUpdate(), dn2.getLastUpdate());
+    assertEquals(dn1.getLevel(), dn2.getLevel());
+    assertEquals(dn1.getNetworkLocation(), dn2.getNetworkLocation());
+  }
+  
+  @Test
+  public void testConvertExtendedBlock() {
+    ExtendedBlock b = getExtendedBlock();
+    ExtendedBlockProto bProto = PBHelper.convert(b);
+    ExtendedBlock b1 = PBHelper.convert(bProto);
+    assertEquals(b, b1);
+  }
+  
+  @Test
+  public void testConvertRecoveringBlock() {
+    DatanodeInfo[] dnInfo = new DatanodeInfo[] { getDNInfo(), getDNInfo() };
+    RecoveringBlock b = new RecoveringBlock(getExtendedBlock(), dnInfo, 3);
+    RecoveringBlockProto bProto = PBHelper.convert(b);
+    RecoveringBlock b1 = PBHelper.convert(bProto);
+    assertEquals(b.getBlock(), b1.getBlock());
+    DatanodeInfo[] dnInfo1 = b1.getLocations();
+    assertEquals(dnInfo.length, dnInfo1.length);
+    for (int i=0; i < dnInfo.length; i++) {
+      compare(dnInfo[i], dnInfo1[i]);
+    }
+  }
+  
+  @Test
+  public void testConvertText() {
+    Text t = new Text("abc".getBytes());
+    String s = t.toString();
+    Text t1 = new Text(s);
+    assertEquals(t, t1);
+  }
 }