浏览代码

HDFS-4363. Merge change r1431088 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1431101 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 12 年之前
父节点
当前提交
fa72c8f6eb
共有 19 个文件被更改,包括 138 次插入306 次删除
  1. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 10 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  3. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  4. 2 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
  5. 2 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
  6. 0 180
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
  7. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java
  8. 7 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
  9. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
  10. 18 19
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
  11. 5 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
  12. 2 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  13. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  14. 53 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  15. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
  16. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  17. 6 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  18. 0 42
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestHdfsProtoUtil.java
  19. 18 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -176,6 +176,9 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-4035. LightWeightGSet and LightWeightHashSet increment a
     volatile without synchronization. (eli)
+    
+    HDFS-4363. Combine PBHelper and HdfsProtoUtil and remove redundant
+    methods. (suresh)
 
   OPTIMIZATIONS
 

+ 10 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -116,7 +116,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -130,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -364,7 +364,7 @@ public class DFSClient implements java.io.Closeable {
 
   /**
    * Same as this(nameNodeUri, conf, null);
-   * @see #DFSClient(InetSocketAddress, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
+   * @see #DFSClient(URI, Configuration, FileSystem.Statistics)
    */
   public DFSClient(URI nameNodeUri, Configuration conf
       ) throws IOException {
@@ -373,7 +373,7 @@ public class DFSClient implements java.io.Closeable {
 
   /**
    * Same as this(nameNodeUri, null, conf, stats);
-   * @see #DFSClient(InetSocketAddress, ClientProtocol, Configuration, org.apache.hadoop.fs.FileSystem.Statistics) 
+   * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics) 
    */
   public DFSClient(URI nameNodeUri, Configuration conf,
                    FileSystem.Statistics stats)
@@ -1183,7 +1183,7 @@ public class DFSClient implements java.io.Closeable {
 
   /**
    * Call {@link #create(String, FsPermission, EnumSet, short, long, 
-   * Progressable, int)} with default <code>permission</code>
+   * Progressable, int, ChecksumOpt)} with default <code>permission</code>
    * {@link FsPermission#getDefault()}.
    * 
    * @param src File name
@@ -1294,7 +1294,7 @@ public class DFSClient implements java.io.Closeable {
   
   /**
    * Same as {{@link #create(String, FsPermission, EnumSet, short, long,
-   *  Progressable, int)} except that the permission
+   *  Progressable, int, ChecksumOpt)} except that the permission
    *  is absolute (ie has already been masked with umask.
    */
   public DFSOutputStream primitiveCreate(String src, 
@@ -1479,7 +1479,7 @@ public class DFSClient implements java.io.Closeable {
   }
   /**
    * Delete file or directory.
-   * See {@link ClientProtocol#delete(String)}. 
+   * See {@link ClientProtocol#delete(String, boolean)}. 
    */
   @Deprecated
   public boolean delete(String src) throws IOException {
@@ -1704,7 +1704,7 @@ public class DFSClient implements java.io.Closeable {
           new Sender(out).blockChecksum(block, lb.getBlockToken());
 
           final BlockOpResponseProto reply =
-            BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
+            BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
 
           if (reply.getStatus() != Status.SUCCESS) {
             if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
@@ -1751,8 +1751,8 @@ public class DFSClient implements java.io.Closeable {
           md5.write(md5out);
           
           // read crc-type
-          final DataChecksum.Type ct = HdfsProtoUtil.
-              fromProto(checksumData.getCrcType());
+          final DataChecksum.Type ct = PBHelper.convert(checksumData
+              .getCrcType());
           if (i == 0) { // first block
             crcType = ct;
           } else if (crcType != DataChecksum.Type.MIXED
@@ -1914,7 +1914,7 @@ public class DFSClient implements java.io.Closeable {
    * @param isChecked
    *          If true, then check only active namenode's safemode status, else
    *          check first namenode's status.
-   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeActio,boolean)
+   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
    */
   public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{
     return namenode.setSafeMode(action, isChecked);    

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -66,6 +65,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -888,7 +888,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
 
         //ack
         BlockOpResponseProto response =
-          BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
+          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
         if (SUCCESS != response.getStatus()) {
           throw new IOException("Failed to add a datanode");
         }
@@ -1078,7 +1078,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-              HdfsProtoUtil.vintPrefixed(blockReplyStream));
+              PBHelper.vintPrefixed(blockReplyStream));
           pipelineStatus = resp.getStatus();
           firstBadLink = resp.getFirstBadLink();
           

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -39,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.io.IOUtils;
@@ -392,7 +391,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
                                 bufferSize));
     
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        vintPrefixed(in));
+        PBHelper.vintPrefixed(in));
     RemoteBlockReader2.checkSuccess(status, sock, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -43,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatus
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.net.SocketInputWrapper;
@@ -401,7 +400,7 @@ public class RemoteBlockReader2  implements BlockReader {
     DataInputStream in = new DataInputStream(ioStreams.in);
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        vintPrefixed(in));
+        PBHelper.vintPrefixed(in));
     checkSuccess(status, sock, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();

+ 0 - 180
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java

@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
-import org.apache.hadoop.security.token.Token;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedInputStream;
-
-/**
- * Utilities for converting to and from protocol buffers used in the
- * HDFS wire protocol, as well as some generic utilities useful
- * for dealing with protocol buffers.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public abstract class HdfsProtoUtil {
-  
-  //// Block Token ////
-  
-  public static TokenProto toProto(Token<?> blockToken) {
-    return TokenProto.newBuilder()
-      .setIdentifier(ByteString.copyFrom(blockToken.getIdentifier()))
-      .setPassword(ByteString.copyFrom(blockToken.getPassword()))
-      .setKind(blockToken.getKind().toString())
-      .setService(blockToken.getService().toString())
-      .build();
-  }
-
-  public static Token<BlockTokenIdentifier> fromProto(TokenProto proto) {
-    return new Token<BlockTokenIdentifier>(proto.getIdentifier().toByteArray(),
-        proto.getPassword().toByteArray(),
-        new Text(proto.getKind()),
-        new Text(proto.getService()));
-  }
-
-  //// Extended Block ////
-  
-  public static HdfsProtos.ExtendedBlockProto toProto(ExtendedBlock block) {
-    return HdfsProtos.ExtendedBlockProto.newBuilder()
-      .setBlockId(block.getBlockId())
-      .setPoolId(block.getBlockPoolId())
-      .setNumBytes(block.getNumBytes())
-      .setGenerationStamp(block.getGenerationStamp())
-      .build();
-  }
-    
-  public static ExtendedBlock fromProto(HdfsProtos.ExtendedBlockProto proto) {
-    return new ExtendedBlock(
-        proto.getPoolId(), proto.getBlockId(),
-        proto.getNumBytes(), proto.getGenerationStamp());
-  }
-
-  //// DatanodeID ////
-  
-  private static HdfsProtos.DatanodeIDProto toProto(
-      DatanodeID dni) {
-    return HdfsProtos.DatanodeIDProto.newBuilder()
-      .setIpAddr(dni.getIpAddr())
-      .setHostName(dni.getHostName())
-      .setStorageID(dni.getStorageID())
-      .setXferPort(dni.getXferPort())
-      .setInfoPort(dni.getInfoPort())
-      .setIpcPort(dni.getIpcPort())
-      .build();
-  }
-  
-  private static DatanodeID fromProto(HdfsProtos.DatanodeIDProto idProto) {
-    return new DatanodeID(
-        idProto.getIpAddr(),
-        idProto.getHostName(),
-        idProto.getStorageID(),
-        idProto.getXferPort(),
-        idProto.getInfoPort(),
-        idProto.getIpcPort());
-  }
-  
-  //// DatanodeInfo ////
-  
-  public static HdfsProtos.DatanodeInfoProto toProto(DatanodeInfo dni) {
-    return HdfsProtos.DatanodeInfoProto.newBuilder()
-      .setId(toProto((DatanodeID)dni))
-      .setCapacity(dni.getCapacity())
-      .setDfsUsed(dni.getDfsUsed())
-      .setRemaining(dni.getRemaining())
-      .setBlockPoolUsed(dni.getBlockPoolUsed())
-      .setLastUpdate(dni.getLastUpdate())
-      .setXceiverCount(dni.getXceiverCount())
-      .setLocation(dni.getNetworkLocation())
-      .setAdminState(HdfsProtos.DatanodeInfoProto.AdminState.valueOf(
-          dni.getAdminState().name()))
-      .build();
-  }
-
-  public static DatanodeInfo fromProto(HdfsProtos.DatanodeInfoProto dniProto) {
-    DatanodeInfo dniObj = new DatanodeInfo(fromProto(dniProto.getId()),
-        dniProto.getLocation());
-
-    dniObj.setCapacity(dniProto.getCapacity());
-    dniObj.setDfsUsed(dniProto.getDfsUsed());
-    dniObj.setRemaining(dniProto.getRemaining());
-    dniObj.setBlockPoolUsed(dniProto.getBlockPoolUsed());
-    dniObj.setLastUpdate(dniProto.getLastUpdate());
-    dniObj.setXceiverCount(dniProto.getXceiverCount());
-    dniObj.setAdminState(DatanodeInfo.AdminStates.valueOf(
-        dniProto.getAdminState().name()));
-    return dniObj;
-  }
-  
-  public static ArrayList<? extends HdfsProtos.DatanodeInfoProto> toProtos(
-      DatanodeInfo[] dnInfos, int startIdx) {
-    ArrayList<HdfsProtos.DatanodeInfoProto> protos =
-      Lists.newArrayListWithCapacity(dnInfos.length);
-    for (int i = startIdx; i < dnInfos.length; i++) {
-      protos.add(toProto(dnInfos[i]));
-    }
-    return protos;
-  }
-  
-  public static DatanodeInfo[] fromProtos(
-      List<HdfsProtos.DatanodeInfoProto> targetsList) {
-    DatanodeInfo[] ret = new DatanodeInfo[targetsList.size()];
-    int i = 0;
-    for (HdfsProtos.DatanodeInfoProto proto : targetsList) {
-      ret[i++] = fromProto(proto);
-    }
-    return ret;
-  }
-
-  public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) {
-    return DataChecksum.Type.valueOf(type.getNumber());
-  }
-
-  public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) {
-    return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
-  }
-
-  public static InputStream vintPrefixed(final InputStream input)
-  throws IOException {
-    final int firstByte = input.read();
-    if (firstByte == -1) {
-      throw new EOFException("Premature EOF: no length prefix available");
-    }
-    
-    int size = CodedInputStream.readRawVarint32(firstByte, input);
-    assert size >= 0;
-  
-    return new ExactSizeInputStream(input, size);
-  }
-}

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java

@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;

+ 7 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java

@@ -21,12 +21,12 @@ package org.apache.hadoop.hdfs.protocol.datatransfer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -41,18 +41,16 @@ import org.apache.hadoop.util.DataChecksum;
 public abstract class DataTransferProtoUtil {
   static BlockConstructionStage fromProto(
       OpWriteBlockProto.BlockConstructionStage stage) {
-    return BlockConstructionStage.valueOf(BlockConstructionStage.class,
-        stage.name());
+    return BlockConstructionStage.valueOf(stage.name());
   }
 
   static OpWriteBlockProto.BlockConstructionStage toProto(
       BlockConstructionStage stage) {
-    return OpWriteBlockProto.BlockConstructionStage.valueOf(
-        stage.name());
+    return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name());
   }
 
   public static ChecksumProto toProto(DataChecksum checksum) {
-    ChecksumTypeProto type = HdfsProtoUtil.toProto(checksum.getChecksumType());
+    ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType());
     // ChecksumType#valueOf never returns null
     return ChecksumProto.newBuilder()
       .setBytesPerChecksum(checksum.getBytesPerChecksum())
@@ -64,8 +62,7 @@ public abstract class DataTransferProtoUtil {
     if (proto == null) return null;
 
     int bytesPerChecksum = proto.getBytesPerChecksum();
-    DataChecksum.Type type = HdfsProtoUtil.fromProto(proto.getType());
-    
+    DataChecksum.Type type = PBHelper.convert(proto.getType());
     return DataChecksum.newDataChecksum(type, bytesPerChecksum);
   }
 
@@ -82,8 +79,8 @@ public abstract class DataTransferProtoUtil {
   static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
       Token<BlockTokenIdentifier> blockToken) {
     return BaseHeaderProto.newBuilder()
-      .setBlock(HdfsProtoUtil.toProto(blk))
-      .setToken(HdfsProtoUtil.toProto(blockToken))
+      .setBlock(PBHelper.convert(blk))
+      .setToken(PBHelper.convert(blockToken))
       .build();
   }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java

@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.IOException;
 import java.io.InputStream;

+ 18 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java

@@ -17,9 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProto;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProtos;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
 
 import java.io.DataInputStream;
@@ -33,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 
 /** Receiver */
 @InterfaceAudience.Private
@@ -85,8 +84,8 @@ public abstract class Receiver implements DataTransferProtocol {
   /** Receive OP_READ_BLOCK */
   private void opReadBlock() throws IOException {
     OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
-    readBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
-        fromProto(proto.getHeader().getBaseHeader().getToken()),
+    readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
         proto.getOffset(),
         proto.getLen());
@@ -95,11 +94,11 @@ public abstract class Receiver implements DataTransferProtocol {
   /** Receive OP_WRITE_BLOCK */
   private void opWriteBlock(DataInputStream in) throws IOException {
     final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
-    writeBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
-        fromProto(proto.getHeader().getBaseHeader().getToken()),
+    writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
-        fromProtos(proto.getTargetsList()),
-        fromProto(proto.getSource()),
+        PBHelper.convert(proto.getTargetsList()),
+        PBHelper.convert(proto.getSource()),
         fromProto(proto.getStage()),
         proto.getPipelineSize(),
         proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
@@ -111,33 +110,33 @@ public abstract class Receiver implements DataTransferProtocol {
   private void opTransferBlock(DataInputStream in) throws IOException {
     final OpTransferBlockProto proto =
       OpTransferBlockProto.parseFrom(vintPrefixed(in));
-    transferBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
-        fromProto(proto.getHeader().getBaseHeader().getToken()),
+    transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
-        fromProtos(proto.getTargetsList()));
+        PBHelper.convert(proto.getTargetsList()));
   }
 
   /** Receive OP_REPLACE_BLOCK */
   private void opReplaceBlock(DataInputStream in) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
-    replaceBlock(fromProto(proto.getHeader().getBlock()),
-        fromProto(proto.getHeader().getToken()),
+    replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
+        PBHelper.convert(proto.getHeader().getToken()),
         proto.getDelHint(),
-        fromProto(proto.getSource()));
+        PBHelper.convert(proto.getSource()));
   }
 
   /** Receive OP_COPY_BLOCK */
   private void opCopyBlock(DataInputStream in) throws IOException {
     OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
-    copyBlock(fromProto(proto.getHeader().getBlock()),
-        fromProto(proto.getHeader().getToken()));
+    copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
+        PBHelper.convert(proto.getHeader().getToken()));
   }
 
   /** Receive OP_BLOCK_CHECKSUM */
   private void opBlockChecksum(DataInputStream in) throws IOException {
     OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
     
-    blockChecksum(fromProto(proto.getHeader().getBlock()),
-        fromProto(proto.getHeader().getToken()));
+    blockChecksum(PBHelper.convert(proto.getHeader().getBlock()),
+        PBHelper.convert(proto.getHeader().getToken()));
   }
 }

+ 5 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto;
 
 import java.io.DataOutput;
@@ -37,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -105,7 +104,7 @@ public class Sender implements DataTransferProtocol {
 
     OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
       .setHeader(header)
-      .addAllTargets(toProtos(targets, 1))
+      .addAllTargets(PBHelper.convert(targets, 1))
       .setStage(toProto(stage))
       .setPipelineSize(pipelineSize)
       .setMinBytesRcvd(minBytesRcvd)
@@ -114,7 +113,7 @@ public class Sender implements DataTransferProtocol {
       .setRequestedChecksum(checksumProto);
     
     if (source != null) {
-      proto.setSource(toProto(source));
+      proto.setSource(PBHelper.convertDatanodeInfo(source));
     }
 
     send(out, Op.WRITE_BLOCK, proto.build());
@@ -129,7 +128,7 @@ public class Sender implements DataTransferProtocol {
     OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildClientHeader(
           blk, clientName, blockToken))
-      .addAllTargets(toProtos(targets, 0))
+      .addAllTargets(PBHelper.convert(targets))
       .build();
 
     send(out, Op.TRANSFER_BLOCK, proto);
@@ -143,7 +142,7 @@ public class Sender implements DataTransferProtocol {
     OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
       .setDelHint(delHint)
-      .setSource(toProto(source))
+      .setSource(PBHelper.convertDatanodeInfo(source))
       .build();
     
     send(out, Op.REPLACE_BLOCK, proto);

+ 2 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.protocolPB;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -131,7 +130,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.RpcController;
@@ -494,10 +492,10 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, GetDatanodeReportRequestProto req)
       throws ServiceException {
     try {
-      DatanodeInfoProto[] result = PBHelper.convert(server
+      List<? extends DatanodeInfoProto> result = PBHelper.convert(server
           .getDatanodeReport(PBHelper.convert(req.getType())));
       return GetDatanodeReportResponseProto.newBuilder()
-          .addAllDi(Arrays.asList(result)).build();
+          .addAllDi(result).build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -282,7 +282,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     if (previous != null) 
       req.setPrevious(PBHelper.convert(previous)); 
     if (excludeNodes != null) 
-      req.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)));
+      req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
     try {
       return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
     } catch (ServiceException e) {
@@ -300,8 +300,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .newBuilder()
         .setSrc(src)
         .setBlk(PBHelper.convert(blk))
-        .addAllExistings(Arrays.asList(PBHelper.convert(existings)))
-        .addAllExcludes(Arrays.asList(PBHelper.convert(excludes)))
+        .addAllExistings(PBHelper.convert(existings))
+        .addAllExcludes(PBHelper.convert(excludes))
         .setNumAdditionalNodes(numAdditionalNodes)
         .setClientName(clientName)
         .build();

+ 53 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.protocolPB;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -40,10 +43,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
@@ -127,15 +130,20 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
 
 /**
- * Utilities for converting protobuf classes to and from implementation classes.
+ * Utilities for converting protobuf classes to and from implementation classes
+ * and other helper utilities to help in dealing with protobuf.
  * 
  * Note that when converting from an internal type to protobuf type, the
  * converter never return null for protobuf type. The check for internal type
@@ -219,7 +227,8 @@ public class PBHelper {
 
   // Arrays of DatanodeId
   public static DatanodeIDProto[] convert(DatanodeID[] did) {
-    if (did == null) return null;
+    if (did == null)
+      return null;
     final int len = did.length;
     DatanodeIDProto[] result = new DatanodeIDProto[len];
     for (int i = 0; i < len; ++i) {
@@ -482,14 +491,26 @@ public class PBHelper {
     }    
     return result;
   }
+
+  public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
+      DatanodeInfo[] dnInfos) {
+    return convert(dnInfos, 0);
+  }
   
-  static public DatanodeInfoProto[] convert(DatanodeInfo[] di) {
-    if (di == null) return null;
-    DatanodeInfoProto[] result = new DatanodeInfoProto[di.length];
-    for (int i = 0; i < di.length; i++) {
-      result[i] = PBHelper.convertDatanodeInfo(di[i]);
+  /**
+   * Copy from {@code dnInfos} to a target of list of same size starting at
+   * {@code startIdx}.
+   */
+  public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
+      DatanodeInfo[] dnInfos, int startIdx) {
+    if (dnInfos == null)
+      return null;
+    ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
+        .newArrayListWithCapacity(dnInfos.length);
+    for (int i = startIdx; i < dnInfos.length; i++) {
+      protos.add(convert(dnInfos[i]));
     }
-    return result;
+    return protos;
   }
 
   public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {
@@ -694,7 +715,7 @@ public class PBHelper {
     DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
     for (int i = 0; i < targets.length; i++) {
       ret[i] = DatanodeInfosProto.newBuilder()
-          .addAllDatanodes(Arrays.asList(PBHelper.convert(targets[i]))).build();
+          .addAllDatanodes(PBHelper.convert(targets[i])).build();
     }
     return Arrays.asList(ret);
   }
@@ -963,7 +984,7 @@ public class PBHelper {
         fs.getFileBufferSize(),
         fs.getEncryptDataTransfer(),
         fs.getTrashInterval(),
-        HdfsProtoUtil.fromProto(fs.getChecksumType()));
+        PBHelper.convert(fs.getChecksumType()));
   }
   
   public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -976,7 +997,7 @@ public class PBHelper {
       .setFileBufferSize(fs.getFileBufferSize())
       .setEncryptDataTransfer(fs.getEncryptDataTransfer())
       .setTrashInterval(fs.getTrashInterval())
-      .setChecksumType(HdfsProtoUtil.toProto(fs.getChecksumType()))
+      .setChecksumType(PBHelper.convert(fs.getChecksumType()))
       .build();
   }
   
@@ -1314,4 +1335,24 @@ public class PBHelper {
         .setLayoutVersion(j.getLayoutVersion())
         .setNamespaceID(j.getNamespaceId()).build();
   }
+
+  public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
+    return DataChecksum.Type.valueOf(type.getNumber());
+  }
+
+  public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
+    return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
+  }
+
+  public static InputStream vintPrefixed(final InputStream input)
+      throws IOException {
+    final int firstByte = input.read();
+    if (firstByte == -1) {
+      throw new EOFException("Premature EOF: no length prefix available");
+    }
+
+    int size = CodedInputStream.readRawVarint32(firstByte, input);
+    assert size >= 0;
+    return new ExactSizeInputStream(input, size);
+  }
 }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -18,7 +18,8 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -99,7 +99,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
@@ -117,6 +116,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -1468,7 +1468,7 @@ public class DataNode extends Configured
         // read ack
         if (isClient) {
           DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
-              HdfsProtoUtil.vintPrefixed(in));
+              PBHelper.vintPrefixed(in));
           if (LOG.isDebugEnabled()) {
             LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
           }

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -42,7 +42,6 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
@@ -56,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatus
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -144,7 +144,7 @@ class DataXceiver extends Receiver implements Runnable {
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
   
-  private OutputStream getOutputStream() throws IOException {
+  private OutputStream getOutputStream() {
     return socketOut;
   }
 
@@ -284,7 +284,7 @@ class DataXceiver extends Receiver implements Runnable {
         // to respond with a Status enum.
         try {
           ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
-              HdfsProtoUtil.vintPrefixed(in));
+              PBHelper.vintPrefixed(in));
           if (!stat.hasStatus()) {
             LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
                      "code after reading. Will close connection.");
@@ -445,7 +445,7 @@ class DataXceiver extends Receiver implements Runnable {
           // read connect ack (only for clients, not for replication req)
           if (isClient) {
             BlockOpResponseProto connectAck =
-              BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn));
+              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
             mirrorInStatus = connectAck.getStatus();
             firstBadLink = connectAck.getFirstBadLink();
             if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
@@ -606,7 +606,7 @@ class DataXceiver extends Receiver implements Runnable {
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
-          .setCrcType(HdfsProtoUtil.toProto(checksum.getChecksumType()))
+          .setCrcType(PBHelper.convert(checksum.getChecksumType()))
           )
         .build()
         .writeDelimitedTo(out);
@@ -765,7 +765,7 @@ class DataXceiver extends Receiver implements Runnable {
       // receive the response from the proxy
       
       BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
-          HdfsProtoUtil.vintPrefixed(proxyReply));
+          PBHelper.vintPrefixed(proxyReply));
 
       if (copyResponse.getStatus() != SUCCESS) {
         if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {

+ 0 - 42
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestHdfsProtoUtil.java

@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol;
-
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
-import org.apache.hadoop.util.DataChecksum;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestHdfsProtoUtil {
-  @Test
-  public void testChecksumTypeProto() {
-    assertEquals(DataChecksum.Type.NULL,
-        HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
-    assertEquals(DataChecksum.Type.CRC32,
-        HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
-    assertEquals(DataChecksum.Type.CRC32C,
-        HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
-    assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.NULL),
-        HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL);
-    assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.CRC32),
-        HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32);
-    assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.CRC32C),
-        HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C);
-  }
-}

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
@@ -70,6 +71,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
@@ -471,4 +473,20 @@ public class TestPBHelper {
       }
     }
   }
+  
+  @Test
+  public void testChecksumTypeProto() {
+    assertEquals(DataChecksum.Type.NULL,
+        PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
+    assertEquals(DataChecksum.Type.CRC32,
+        PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
+    assertEquals(DataChecksum.Type.CRC32C,
+        PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
+    assertEquals(PBHelper.convert(DataChecksum.Type.NULL),
+        HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL);
+    assertEquals(PBHelper.convert(DataChecksum.Type.CRC32),
+        HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32);
+    assertEquals(PBHelper.convert(DataChecksum.Type.CRC32C),
+        HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C);
+  }
 }