Browse Source

HDDS-246. Datanode should throw BlockNotCommittedException for uncommitted blocks to Ozone Client. Contributed by Shashikant Banerjee.

Mukul Kumar Singh 7 years ago
parent
commit
6b038f82da

+ 1 - 0
hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto

@@ -138,6 +138,7 @@ enum Result {
   CONTAINER_FILES_CREATE_ERROR = 32;
   CONTAINER_CHECKSUM_ERROR = 33;
   UNKNOWN_CONTAINER_TYPE = 34;
+  BLOCK_NOT_COMMITTED = 35;
 }
 
 /**

+ 12 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java

@@ -129,6 +129,18 @@ public class OpenContainerBlockMap {
         -> blocks.removeAndGetSize(blockID.getLocalID()) == 0? null: blocks);
   }
 
+  /**
+   * Returns true if the block exists in the map, false otherwise
+   *
+   * @param blockID
+   * @return True, if it exists, false otherwise
+   */
+  public boolean checkIfBlockExists(BlockID blockID) {
+    KeyDataMap keyDataMap = containers.get(blockID.getContainerID());
+    return keyDataMap == null ? false :
+        keyDataMap.get(blockID.getLocalID()) != null;
+  }
+
   @VisibleForTesting
   KeyDataMap getKeyDataMap(long containerId) {
     return containers.get(containerId);

+ 9 - 3
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

@@ -91,6 +91,8 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.GET_SMALL_FILE_ERROR;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.PUT_SMALL_FILE_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.BLOCK_NOT_COMMITTED;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Stage;
@@ -494,10 +496,14 @@ public class KeyValueHandler extends Handler {
 
     long blockLength;
     try {
-      BlockID blockID = BlockID.getFromProtobuf(
-          request.getGetCommittedBlockLength().getBlockID());
+      BlockID blockID = BlockID
+          .getFromProtobuf(request.getGetCommittedBlockLength().getBlockID());
+      // Check if it really exists in the openContainerBlockMap
+      if (openContainerBlockMap.checkIfBlockExists(blockID)) {
+        String msg = "Block " + blockID + " is not committed yet.";
+        throw new StorageContainerException(msg, BLOCK_NOT_COMMITTED);
+      }
       blockLength = keyManager.getCommittedBlockLength(kvContainer, blockID);
-
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
     } catch (IOException ex) {

+ 35 - 10
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java

@@ -104,16 +104,6 @@ public class TestCommittedBlockLengthAPI {
             .getWriteChunkRequest(container.getPipeline(), blockID,
                 data.length);
     client.sendCommand(writeChunkRequest);
-    try {
-      // since there is neither explicit putKey request made for the block,
-      // nor the container is closed, GetCommittedBlockLength request
-      // should fail here.
-      response = ContainerProtocolCalls
-          .getCommittedBlockLength(client, blockID, traceID);
-      Assert.fail("Expected exception not thrown");
-    } catch (StorageContainerException sce) {
-      Assert.assertTrue(sce.getMessage().contains("Unable to find the key"));
-    }
     // Now, explicitly make a putKey request for the block.
     ContainerProtos.ContainerCommandRequestProto putKeyRequest =
         ContainerTestHelper
@@ -188,4 +178,39 @@ public class TestCommittedBlockLengthAPI {
     }
     xceiverClientManager.releaseClient(client);
   }
+
+  @Test
+  public void testGetCommittedBlockLengthForOpenBlock() throws Exception {
+    String traceID = UUID.randomUUID().toString();
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(xceiverClientManager.getType(),
+            HddsProtos.ReplicationFactor.ONE, containerOwner);
+    long containerID = container.getContainerInfo().getContainerID();
+    XceiverClientSpi client = xceiverClientManager
+        .acquireClient(container.getPipeline(), containerID);
+    ContainerProtocolCalls
+        .createContainer(client, containerID, traceID);
+
+    BlockID blockID =
+        ContainerTestHelper.getTestBlockID(containerID);
+    ContainerProtos.ContainerCommandRequestProto requestProto =
+        ContainerTestHelper
+            .getWriteChunkRequest(container.getPipeline(), blockID, 1024);
+    client.sendCommand(requestProto);
+    try {
+      ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
+      Assert.fail("Expected Exception not thrown");
+    } catch (StorageContainerException sce) {
+      Assert.assertEquals(ContainerProtos.Result.BLOCK_NOT_COMMITTED,
+          sce.getResult());
+    }
+    // now close the container, it should auto commit pending open blocks
+    ContainerProtocolCalls
+        .closeContainer(client, containerID, traceID);
+    ContainerProtos.GetCommittedBlockLengthResponseProto response =
+        ContainerProtocolCalls
+            .getCommittedBlockLength(client, blockID, traceID);
+    Assert.assertTrue(response.getBlockLength() == 1024);
+    xceiverClientManager.releaseClient(client);
+  }
 }