Просмотр исходного кода

HDFS-11835. Block Storage: Overwrite of blocks fails. Contributed by Mukul Kumar Singh.

Chen Liang 8 лет назад
Родитель
Сommit
914ceb2587

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
     .ReadContainerResponseProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
     .ReadContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 
 import java.io.IOException;
@@ -199,11 +200,14 @@ public final class ContainerProtocolCalls {
         .setPipeline(client.getPipeline().getProtobufMessage())
         .setKeyData(containerKeyData);
 
+    KeyValue keyValue = KeyValue.newBuilder()
+        .setKey("OverWriteRequested").setValue("true").build();
     ChunkInfo chunk = ChunkInfo
         .newBuilder()
         .setChunkName(key + "_chunk")
         .setOffset(0)
         .setLen(data.length)
+        .addMetadata(keyValue)
         .build();
 
     PutSmallFileRequestProto putSmallFileRequest = PutSmallFileRequestProto

+ 55 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java

@@ -391,4 +391,59 @@ public class TestBufferManager {
     Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
     Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
   }
+
+  @Test
+  public void testRepeatedBlockWrites() throws IOException,
+      InterruptedException, TimeoutException{
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(createContainerAndGetPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    Thread fllushListenerThread = new Thread(flusher);
+    fllushListenerThread.setDaemon(true);
+    fllushListenerThread.start();
+    cache.start();
+    for (int i = 0; i < 512; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(512, metrics.getNumWriteOps());
+    Assert.assertEquals(512, metrics.getNumBlockBufferUpdates());
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+    Thread.sleep(5000);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+
+
+    for (int i = 0; i < 512; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(1024, metrics.getNumWriteOps());
+    Assert.assertEquals(1024, metrics.getNumBlockBufferUpdates());
+    Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
+
+    Thread.sleep(5000);
+    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
+    Assert.assertEquals(2, metrics.getNumBlockBufferFlushCompleted());
+  }
 }