
HDFS-2880. Protobuf changes in DatanodeProtocol to add multiple storages. Contributed by Suresh Srinivas.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1240653 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 13 years ago
commit f88574acde

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -45,6 +45,9 @@ Trunk (unreleased changes)
     HDFS-2697. Move RefreshAuthPolicy, RefreshUserMappings, GetUserMappings 
     protocol to protocol buffers. (jitendra)
 
+    HDFS-2880. Protobuf changes in DatanodeProtocol to add multiple storages.
+    (suresh)
+
   IMPROVEMENTS
 
     HADOOP-7524 Change RPC to allow multiple protocols including multiple 

+ 27 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java

@@ -46,6 +46,9 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgra
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -169,11 +172,16 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xmitsInProgress, int xceiverCount, int failedVolumes)
       throws IOException {
-    HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
-        .setRegistration(PBHelper.convert(registration)).setCapacity(capacity)
+    StorageReportProto report = StorageReportProto.newBuilder()
+        .setBlockPoolUsed(blockPoolUsed).setCapacity(capacity)
         .setDfsUsed(dfsUsed).setRemaining(remaining)
-        .setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress)
-        .setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build();
+        .setStorageID(registration.getStorageID()).build();
+    
+    HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(registration)).addReports(report)
+        .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
+        .setFailedVolumes(failedVolumes)
+        .build();
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, req);
@@ -192,15 +200,17 @@ public class DatanodeProtocolClientSideTranslatorPB implements
   @Override
   public DatanodeCommand blockReport(DatanodeRegistration registration,
       String poolId, long[] blocks) throws IOException {
-    BlockReportRequestProto.Builder builder = BlockReportRequestProto
-        .newBuilder().setRegistration(PBHelper.convert(registration))
-        .setBlockPoolId(poolId);
+    StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
+        .newBuilder().setStorageID(registration.getStorageID());
+    
     if (blocks != null) {
       for (int i = 0; i < blocks.length; i++) {
-        builder.addBlocks(blocks[i]);
+        reportBuilder.addBlocks(blocks[i]);
       }
     }
-    BlockReportRequestProto req = builder.build();
+    BlockReportRequestProto req = BlockReportRequestProto
+        .newBuilder().setRegistration(PBHelper.convert(registration))
+        .setBlockPoolId(poolId).addReports(reportBuilder.build()).build();
     BlockReportResponseProto resp;
     try {
       resp = rpcProxy.blockReport(NULL_CONTROLLER, req);
@@ -211,19 +221,21 @@ public class DatanodeProtocolClientSideTranslatorPB implements
   }
 
   @Override
-  public void blockReceivedAndDeleted(DatanodeRegistration registration,
+  public void blockReceivedAndDeleted(DatanodeRegistration reg,
       String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
       throws IOException {
-    BlockReceivedAndDeletedRequestProto.Builder builder = 
-        BlockReceivedAndDeletedRequestProto.newBuilder()
-        .setRegistration(PBHelper.convert(registration))
-        .setBlockPoolId(poolId);
+    StorageReceivedDeletedBlocksProto.Builder builder = 
+        StorageReceivedDeletedBlocksProto.newBuilder()
+        .setStorageID(reg.getStorageID());
     if (receivedAndDeletedBlocks != null) {
       for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
         builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i]));
       }
     }
-    BlockReceivedAndDeletedRequestProto req = builder.build();
+    BlockReceivedAndDeletedRequestProto req = 
+        BlockReceivedAndDeletedRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(reg))
+        .setBlockPoolId(poolId).addBlocks(builder.build()).build();
     try {
       rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, req);
     } catch (ServiceException se) {

+ 7 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java

@@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportR
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterData
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
@@ -98,9 +98,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements
       HeartbeatRequestProto request) throws ServiceException {
     DatanodeCommand[] cmds = null;
     try {
+      StorageReportProto report = request.getReports(0);
       cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
-          request.getCapacity(), request.getDfsUsed(), request.getRemaining(),
-          request.getBlockPoolUsed(), request.getXmitsInProgress(),
+          report.getCapacity(), report.getDfsUsed(), report.getRemaining(),
+          report.getBlockPoolUsed(), request.getXmitsInProgress(),
           request.getXceiverCount(), request.getFailedVolumes());
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -121,7 +122,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
   public BlockReportResponseProto blockReport(RpcController controller,
       BlockReportRequestProto request) throws ServiceException {
     DatanodeCommand cmd = null;
-    List<Long> blockIds = request.getBlocksList();
+    List<Long> blockIds = request.getReports(0).getBlocksList();
     long[] blocks = new long[blockIds.size()];
     for (int i = 0; i < blockIds.size(); i++) {
       blocks[i] = blockIds.get(i);
@@ -144,7 +145,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
   public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
       RpcController controller, BlockReceivedAndDeletedRequestProto request)
       throws ServiceException {
-    List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocksList();
+    List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocks(0)
+        .getBlocksList();
     ReceivedDeletedBlockInfo[] info = 
         new ReceivedDeletedBlockInfo[rdbip.size()];
     for (int i = 0; i < rdbip.size(); i++) {
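
Editorial note: the server-side translator still assumes exactly one report per request (getReports(0), getBlocks(0)), so the wire format gains multi-storage capacity before the handlers do. A hedged sketch of what aggregating across all heartbeat reports might look like in a follow-up change (not part of this commit; it relies only on the getReportsList() accessor that protobuf-java generates for a repeated field):

    // Hypothetical follow-up: sum the per-storage reports instead of
    // reading only the first one.
    long capacity = 0, dfsUsed = 0, remaining = 0, blockPoolUsed = 0;
    for (StorageReportProto r : request.getReportsList()) {
      capacity      += r.getCapacity();
      dfsUsed       += r.getDfsUsed();
      remaining     += r.getRemaining();
      blockPoolUsed += r.getBlockPoolUsed();
    }
    cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
        capacity, dfsUsed, remaining, blockPoolUsed,
        request.getXmitsInProgress(), request.getXceiverCount(),
        request.getFailedVolumes());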

+ 45 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto

@@ -35,6 +35,19 @@ message DatanodeRegistrationProto {
   required ExportedBlockKeysProto keys = 3;   // Block keys
 }
 
+/**
+ * Represents a storage available on the datanode
+ */
+message DatanodeStorageProto {
+  enum StorageState {
+    NORMAL = 0;
+    READ_ONLY = 1;
+  }
+
+  required string storageID = 1;    // Unique identifier for the storage
+  optional StorageState state = 2 [default = NORMAL];
+}
+
 /**
  * Commands sent from namenode to the datanodes
  */
@@ -136,6 +149,7 @@ message UpgradeCommandProto {
  */
 message RegisterDatanodeRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
+  repeated DatanodeStorageProto storages = 2; // Storages on the datanode
 }
 
 /**
@@ -159,13 +173,19 @@ message RegisterDatanodeResponseProto {
  */
 message HeartbeatRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
-  required uint64 capacity = 2;
-  required uint64 dfsUsed = 3;
-  required uint64 remaining = 4;
-  required uint64 blockPoolUsed = 5;
-  required uint32 xmitsInProgress = 6;
-  required uint32 xceiverCount = 7;
-  required uint32 failedVolumes = 8;
+  repeated StorageReportProto reports = 2;
+  optional uint32 xmitsInProgress = 3 [ default = 0 ];
+  optional uint32 xceiverCount = 4 [ default = 0 ];
+  optional uint32 failedVolumes = 5 [ default = 0 ];
+}
+
+message StorageReportProto {
+  required string storageID = 1;
+  optional bool failed = 2 [ default = false ];
+  optional uint64 capacity = 3 [ default = 0 ];
+  optional uint64 dfsUsed = 4 [ default = 0 ];
+  optional uint64 remaining = 5 [ default = 0 ];
+  optional uint64 blockPoolUsed = 6 [ default = 0 ];
 }
 
 /**
@@ -185,7 +205,15 @@ message HeartbeatResponseProto {
 message BlockReportRequestProto {
   required DatanodeRegistrationProto registration = 1;
   required string blockPoolId = 2;
-  repeated uint64 blocks = 3 [packed=true];
+  repeated StorageBlockReportProto reports = 3;
+}
+
+/**
+ * Report of blocks in a storage
+ */
+message StorageBlockReportProto {
+  required string storageID = 1;    // Storage ID
+  repeated uint64 blocks = 2 [packed=true];
 }
 
 /**
@@ -207,6 +235,14 @@ message ReceivedDeletedBlockInfoProto {
   optional string deleteHint = 2;
 }
 
+/**
+ * List of blocks received and deleted for a storage.
+ */
+message StorageReceivedDeletedBlocksProto {
+  required string storageID = 1;
+  repeated ReceivedDeletedBlockInfoProto blocks = 2;
+}
+
 /**
  * registration - datanode registration information
  * blockPoolID  - block pool ID of the reported blocks
@@ -215,7 +251,7 @@ message ReceivedDeletedBlockInfoProto {
 message BlockReceivedAndDeletedRequestProto {
   required DatanodeRegistrationProto registration = 1;
   required string blockPoolId = 2;
-  repeated ReceivedDeletedBlockInfoProto blocks = 3;
+  repeated StorageReceivedDeletedBlocksProto blocks = 3;
 }
 
 /**
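
Editorial note: taken together, the schema now lets one request carry per-storage payloads: a heartbeat carries StorageReportProtos, a block report carries StorageBlockReportProtos, and blockReceivedAndDeleted carries StorageReceivedDeletedBlocksProtos. A minimal sketch of a two-storage block report under the new messages (the storage IDs and long values are placeholders; the longs use the same opaque block-list encoding the old flat blocks field carried):

    // Illustrative only: two storages reporting under the new schema.
    StorageBlockReportProto disk0 = StorageBlockReportProto.newBuilder()
        .setStorageID("DS-1234-example")       // placeholder storage ID
        .addBlocks(1001L).addBlocks(1002L)     // placeholder encoded longs
        .build();
    StorageBlockReportProto disk1 = StorageBlockReportProto.newBuilder()
        .setStorageID("DS-5678-example")
        .addBlocks(2001L)
        .build();
    BlockReportRequestProto req = BlockReportRequestProto.newBuilder()
        .setRegistration(PBHelper.convert(registration))
        .setBlockPoolId(poolId)
        .addReports(disk0)
        .addReports(disk1)
        .build();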