
svn merge -c 1240653 from trunk for HDFS-2880.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1241595 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 13 years ago
parent
commit
9de9d30160

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -41,8 +41,8 @@ Release 0.23-PB - Unreleased
     HDFS-2697. Move RefreshAuthPolicy, RefreshUserMappings, GetUserMappings 
     protocol to protocol buffers. (jitendra)
 
-    HDFS-2801. Provide a method in client side translators to check for a 
-    methods supported in underlying protocol. (jitendra)
+    HDFS-2880. Protobuf changes in DatanodeProtocol to add multiple storages.
+    (suresh)
 
   IMPROVEMENTS
 
@@ -82,6 +82,9 @@ Release 0.23-PB - Unreleased
 
     HDFS-2669 Enable protobuf rpc for ClientNamenodeProtocol
 
+    HDFS-2801. Provide a method in client side translators to check for a 
+    methods supported in underlying protocol. (jitendra)
+
   BUG FIXES
 
     HDFS-2481 Unknown protocol: org.apache.hadoop.hdfs.protocol.ClientProtocol (sanjay)

+ 27 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java

@@ -46,6 +46,9 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgra
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -169,11 +172,16 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xmitsInProgress, int xceiverCount, int failedVolumes)
       throws IOException {
-    HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
-        .setRegistration(PBHelper.convert(registration)).setCapacity(capacity)
+    StorageReportProto report = StorageReportProto.newBuilder()
+        .setBlockPoolUsed(blockPoolUsed).setCapacity(capacity)
         .setDfsUsed(dfsUsed).setRemaining(remaining)
-        .setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress)
-        .setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build();
+        .setStorageID(registration.getStorageID()).build();
+    
+    HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(registration)).addReports(report)
+        .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
+        .setFailedVolumes(failedVolumes)
+        .build();
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, req);
@@ -192,15 +200,17 @@ public class DatanodeProtocolClientSideTranslatorPB implements
   @Override
   public DatanodeCommand blockReport(DatanodeRegistration registration,
       String poolId, long[] blocks) throws IOException {
-    BlockReportRequestProto.Builder builder = BlockReportRequestProto
-        .newBuilder().setRegistration(PBHelper.convert(registration))
-        .setBlockPoolId(poolId);
+    StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
+        .newBuilder().setStorageID(registration.getStorageID());
+    
     if (blocks != null) {
       for (int i = 0; i < blocks.length; i++) {
-        builder.addBlocks(blocks[i]);
+        reportBuilder.addBlocks(blocks[i]);
       }
     }
-    BlockReportRequestProto req = builder.build();
+    BlockReportRequestProto req = BlockReportRequestProto
+        .newBuilder().setRegistration(PBHelper.convert(registration))
+        .setBlockPoolId(poolId).addReports(reportBuilder.build()).build();
     BlockReportResponseProto resp;
     try {
       resp = rpcProxy.blockReport(NULL_CONTROLLER, req);
@@ -211,19 +221,21 @@ public class DatanodeProtocolClientSideTranslatorPB implements
   }
 
   @Override
-  public void blockReceivedAndDeleted(DatanodeRegistration registration,
+  public void blockReceivedAndDeleted(DatanodeRegistration reg,
       String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
       throws IOException {
-    BlockReceivedAndDeletedRequestProto.Builder builder = 
-        BlockReceivedAndDeletedRequestProto.newBuilder()
-        .setRegistration(PBHelper.convert(registration))
-        .setBlockPoolId(poolId);
+    StorageReceivedDeletedBlocksProto.Builder builder = 
+        StorageReceivedDeletedBlocksProto.newBuilder()
+        .setStorageID(reg.getStorageID());
     if (receivedAndDeletedBlocks != null) {
       for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
         builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i]));
       }
     }
-    BlockReceivedAndDeletedRequestProto req = builder.build();
+    BlockReceivedAndDeletedRequestProto req = 
+        BlockReceivedAndDeletedRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(reg))
+        .setBlockPoolId(poolId).addBlocks(builder.build()).build();
     try {
       rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, req);
     } catch (ServiceException se) {

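For reference, below is a minimal sketch (not part of the patch) of the per-storage report the client-side translator now builds: the usage counters that used to sit directly on HeartbeatRequestProto are wrapped in a single StorageReportProto keyed by the registration's storage ID. It assumes the generated DatanodeProtocolProtos classes produced from this patch are on the classpath; the storage ID and sizes are made-up values.

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;

public class StorageReportSketch {
  public static void main(String[] args) {
    // Only storageID is required in the new message; the usage counters are
    // optional and default to 0 (see StorageReportProto in DatanodeProtocol.proto).
    StorageReportProto report = StorageReportProto.newBuilder()
        .setStorageID("DS-example-storage")        // hypothetical storage ID
        .setCapacity(100L * 1024 * 1024 * 1024)    // illustrative: 100 GB total
        .setDfsUsed(40L * 1024 * 1024 * 1024)      // illustrative: 40 GB used
        .setRemaining(60L * 1024 * 1024 * 1024)    // illustrative: 60 GB free
        .setBlockPoolUsed(40L * 1024 * 1024 * 1024)
        .build();
    System.out.println(report);
  }
}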
+ 7 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java

@@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportR
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterData
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
@@ -98,9 +98,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements
       HeartbeatRequestProto request) throws ServiceException {
     DatanodeCommand[] cmds = null;
     try {
+      StorageReportProto report = request.getReports(0);
       cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
-          request.getCapacity(), request.getDfsUsed(), request.getRemaining(),
-          request.getBlockPoolUsed(), request.getXmitsInProgress(),
+          report.getCapacity(), report.getDfsUsed(), report.getRemaining(),
+          report.getBlockPoolUsed(), request.getXmitsInProgress(),
           request.getXceiverCount(), request.getFailedVolumes());
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -121,7 +122,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
   public BlockReportResponseProto blockReport(RpcController controller,
       BlockReportRequestProto request) throws ServiceException {
     DatanodeCommand cmd = null;
-    List<Long> blockIds = request.getBlocksList();
+    List<Long> blockIds = request.getReports(0).getBlocksList();
     long[] blocks = new long[blockIds.size()];
     for (int i = 0; i < blockIds.size(); i++) {
       blocks[i] = blockIds.get(i);
@@ -144,7 +145,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
   public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
       RpcController controller, BlockReceivedAndDeletedRequestProto request)
       throws ServiceException {
-    List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocksList();
+    List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocks(0)
+        .getBlocksList();
     ReceivedDeletedBlockInfo[] info = 
         new ReceivedDeletedBlockInfo[rdbip.size()];
     for (int i = 0; i < rdbip.size(); i++) {

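The server-side translator above reads only getReports(0), since branch-0.23-PB still models a single storage per datanode. As an illustration of the repeated field itself (not code from the patch), a receiver could aggregate across all reports as in the sketch below, assuming the generated classes produced from this patch:

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;

public class HeartbeatAggregationSketch {
  // Sums the reported capacity over every storage in one heartbeat request.
  static long totalCapacity(HeartbeatRequestProto request) {
    long capacity = 0;
    for (StorageReportProto report : request.getReportsList()) {
      capacity += report.getCapacity();
    }
    return capacity;
  }
}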
+ 45 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto

@@ -35,6 +35,19 @@ message DatanodeRegistrationProto {
   required ExportedBlockKeysProto keys = 3;   // Block keys
 }
 
+/**
+ * Represents a storage available on the datanode
+ */
+message DatanodeStorageProto {
+  enum StorageState {
+    NORMAL = 0;
+    READ_ONLY = 1;
+  }
+
+  required string storageID = 1;    // Unique identifier for the storage
+  optional StorageState state = 2 [default = NORMAL];
+}
+
 /**
  * Commands sent from namenode to the datanodes
  */
@@ -136,6 +149,7 @@ message UpgradeCommandProto {
  */
 message RegisterDatanodeRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
+  repeated DatanodeStorageProto storages = 2; // Storages on the datanode
 }
 
 /**
@@ -159,13 +173,19 @@ message RegisterDatanodeResponseProto {
  */
 message HeartbeatRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
-  required uint64 capacity = 2;
-  required uint64 dfsUsed = 3;
-  required uint64 remaining = 4;
-  required uint64 blockPoolUsed = 5;
-  required uint32 xmitsInProgress = 6;
-  required uint32 xceiverCount = 7;
-  required uint32 failedVolumes = 8;
+  repeated StorageReportProto reports = 2;
+  optional uint32 xmitsInProgress = 3 [ default = 0 ];
+  optional uint32 xceiverCount = 4 [ default = 0 ];
+  optional uint32 failedVolumes = 5 [ default = 0 ];
+}
+
+message StorageReportProto {
+  required string storageID = 1;
+  optional bool failed = 2 [ default = false ];
+  optional uint64 capacity = 3 [ default = 0 ];
+  optional uint64 dfsUsed = 4 [ default = 0 ];
+  optional uint64 remaining = 5 [ default = 0 ];
+  optional uint64 blockPoolUsed = 6 [ default = 0 ];
 }
 
 /**
@@ -185,7 +205,15 @@ message HeartbeatResponseProto {
 message BlockReportRequestProto {
   required DatanodeRegistrationProto registration = 1;
   required string blockPoolId = 2;
-  repeated uint64 blocks = 3 [packed=true];
+  repeated StorageBlockReportProto reports = 3;
+}
+
+/**
+ * Report of blocks in a storage
+ */
+message StorageBlockReportProto {
+  required string storageID = 1;    // Storage ID
+  repeated uint64 blocks = 2 [packed=true];
 }
 
 /**
@@ -207,6 +235,14 @@ message ReceivedDeletedBlockInfoProto {
   optional string deleteHint = 2;
 }
 
+/**
+ * List of blocks received and deleted for a storage.
+ */
+message StorageReceivedDeletedBlocksProto {
+  required string storageID = 1;
+  repeated ReceivedDeletedBlockInfoProto blocks = 2;
+}
+
 /**
  * registration - datanode registration information
  * blockPoolID  - block pool ID of the reported blocks
@@ -215,7 +251,7 @@ message ReceivedDeletedBlockInfoProto {
 message BlockReceivedAndDeletedRequestProto {
   required DatanodeRegistrationProto registration = 1;
   required string blockPoolId = 2;
-  repeated ReceivedDeletedBlockInfoProto blocks = 3;
+  repeated StorageReceivedDeletedBlocksProto blocks = 3;
 }
 
 /**
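To make the new wire shape concrete, here is a small sketch (not part of the patch) that builds the per-storage block report defined above; the repeated block values no longer sit directly on BlockReportRequestProto but inside a StorageBlockReportProto. The storage ID and the blocks entries are invented for illustration, assuming the generated classes produced from this patch.

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;

public class StorageBlockReportSketch {
  public static void main(String[] args) {
    StorageBlockReportProto.Builder report = StorageBlockReportProto.newBuilder()
        .setStorageID("DS-example-storage");           // hypothetical storage ID
    for (long value : new long[] {1001L, 1002L, 1003L}) {  // illustrative entries for the repeated 'blocks' field
      report.addBlocks(value);
    }
    System.out.println(report.build());
  }
}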