HDFS-11727. Block Storage: Retry Blocks should be requeued when cblock is restarted. Contributed by Mukul Kumar Singh.

Chen Liang, 8 years ago
Parent
Commit 5bba3ce765

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java

@@ -166,6 +166,21 @@ public final class CBlockConfigKeys {
       "dfs.cblock.cache.leveldb.cache.size.mb";
   public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256;
 
+  /**
+   * The cache makes a best-effort attempt to write a block to a container.
+   * At some point we will need to handle the case where we have tried
+   * 64K times and are still not able to write to the container.
+   *
+   * TODO: We will need the cBlock server to allow us to remap the
+   * block location in case of failures; at that point we should reduce the
+   * retry count to a more normal number. 64K attempts is approximately
+   * 18 hours of retrying.
+   */
+  public static final String DFS_CBLOCK_CACHE_MAX_RETRY_KEY =
+      "dfs.cblock.cache.max.retry";
+  public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
+      64 * 1024;
+
   private CBlockConfigKeys() {
 
   }
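
The default of 64 * 1024 = 65,536 attempts matches the "18 hours" note in the Javadoc if each retry cycle takes roughly one second (65,536 s ≈ 18.2 h). A minimal sketch, mirroring how ContainerCacheFlusher consumes this key further down, of reading and overriding it (the override value here is illustrative only):

    Configuration config = new OzoneConfiguration();
    // Hypothetical override, e.g. to fail faster in a test environment:
    config.setInt(CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY, 8 * 1024);
    int maxRetryCount = config.getInt(
        CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY,
        CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT);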

+ 22 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java

@@ -20,10 +20,12 @@ package org.apache.hadoop.cblock.jscsiHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.LevelDBStore;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -41,7 +43,7 @@ public class BlockWriterTask implements Runnable {
   private final ContainerCacheFlusher flusher;
   private final String dbPath;
   private final String fileName;
-  private static final String RETRY_LOG_PREFIX = "RetryLog";
+  private final int maxRetryCount;
 
   /**
    * Constructs a BlockWriterTask.
@@ -50,12 +52,13 @@ public class BlockWriterTask implements Runnable {
    * @param flusher - ContainerCacheFlusher.
    */
   public BlockWriterTask(LogicalBlock block, ContainerCacheFlusher flusher,
-      String dbPath, String fileName) {
+      String dbPath, int tryCount, String fileName, int maxRetryCount) {
     this.block = block;
     this.flusher = flusher;
     this.dbPath = dbPath;
-    tryCount = 0;
+    this.tryCount = tryCount;
     this.fileName = fileName;
+    this.maxRetryCount = maxRetryCount;
   }
 
   /**
@@ -73,6 +76,7 @@ public class BlockWriterTask implements Runnable {
   public void run() {
     String containerName = null;
     XceiverClientSpi client = null;
+    LevelDBStore levelDBStore = null;
     flusher.getLOG().debug(
         "Writing block to remote. block ID: {}", block.getBlockID());
     try {
@@ -83,7 +87,9 @@ public class BlockWriterTask implements Runnable {
       byte[] keybuf = Longs.toByteArray(block.getBlockID());
       byte[] data;
       long startTime = Time.monotonicNow();
-      data = flusher.getCacheDB(this.dbPath).get(keybuf);
+      levelDBStore = flusher.getCacheDB(this.dbPath);
+      data = levelDBStore.get(keybuf);
+      Preconditions.checkNotNull(data);
       long endTime = Time.monotonicNow();
       Preconditions.checkState(data.length > 0, "Block data is zero length");
       startTime = Time.monotonicNow();
@@ -99,17 +105,23 @@ public class BlockWriterTask implements Runnable {
       flusher.incrementRemoteIO();
 
     } catch (Exception ex) {
-      flusher.getLOG().error("Writing of block failed, We have attempted " +
+      flusher.getLOG().error("Writing of block:{} failed, We have attempted " +
               "to write this block {} times to the container {}.Trace ID:{}",
-          this.getTryCount(), containerName, "", ex);
+          block.getBlockID(), this.getTryCount(), containerName, "", ex);
       writeRetryBlock(block);
       if (ex instanceof IOException) {
         flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
       } else {
         flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks();
       }
+      if (this.getTryCount() >= maxRetryCount) {
+        flusher.getTargetMetrics().incNumWriteMaxRetryBlocks();
+      }
     } finally {
       flusher.incFinishCount(fileName);
+      if (levelDBStore != null) {
+        flusher.releaseCacheDB(dbPath);
+      }
       if(client != null) {
         flusher.getXceiverClientManager().releaseClient(client);
       }
@@ -120,8 +132,8 @@ public class BlockWriterTask implements Runnable {
   private void writeRetryBlock(LogicalBlock currentBlock) {
     boolean append = false;
     String retryFileName =
-        String.format("%s.%d.%s", RETRY_LOG_PREFIX, currentBlock.getBlockID(),
-            Time.monotonicNow());
+        String.format("%s.%d.%s.%s", AsyncBlockWriter.RETRY_LOG_PREFIX,
+            currentBlock.getBlockID(), Time.monotonicNow(), tryCount);
     File logDir = new File(this.dbPath);
     if (!logDir.exists() && !logDir.mkdirs()) {
       flusher.getLOG().error(
@@ -131,12 +143,14 @@ public class BlockWriterTask implements Runnable {
     String log = Paths.get(this.dbPath, retryFileName).toString();
     ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
     buffer.putLong(currentBlock.getBlockID());
+    buffer.flip();
     try {
       FileChannel channel = new FileOutputStream(log, append).getChannel();
       channel.write(buffer);
       channel.close();
       flusher.processDirtyBlocks(this.dbPath, retryFileName);
     } catch (IOException e) {
+      flusher.getTargetMetrics().incNumFailedRetryLogFileWrites();
       flusher.getLOG().error("Unable to write the retry block. Block ID: {}",
           currentBlock.getBlockID(), e);
     }
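
The buffer.flip() added above is a genuine fix, not cosmetic: after putLong() the buffer's position sits at its limit, so channel.write(buffer) would see zero remaining bytes and silently write an empty retry log entry. A standalone illustration (not part of the patch):

    import java.nio.ByteBuffer;

    public class FlipDemo {
      public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
        buffer.putLong(42L);
        // Without flip(): position == 8, remaining == 0, so write() emits nothing.
        System.out.println("before flip: remaining = " + buffer.remaining());
        buffer.flip();
        // After flip(): position == 0, limit == 8, so the full long is written.
        System.out.println("after flip: remaining = " + buffer.remaining());
      }
    }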

+ 50 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java

@@ -48,6 +48,8 @@ public class CBlockTargetMetrics {
   @Metric private MutableCounterLong numBlockBufferFlushCompleted;
   @Metric private MutableCounterLong numBlockBufferFlushTriggered;
   @Metric private MutableCounterLong numBlockBufferUpdates;
+  @Metric private MutableCounterLong numRetryLogBlockRead;
+  @Metric private MutableCounterLong numBytesRetryLogRead;
 
   // Failure Metrics
   @Metric private MutableCounterLong numReadLostBlocks;
@@ -59,6 +61,9 @@ public class CBlockTargetMetrics {
   @Metric private MutableCounterLong numFailedDirtyLogFileDeletes;
   @Metric private MutableCounterLong numFailedBlockBufferFlushes;
   @Metric private MutableCounterLong numInterruptedBufferWaits;
+  @Metric private MutableCounterLong numFailedRetryLogFileWrites;
+  @Metric private MutableCounterLong numWriteMaxRetryBlocks;
+  @Metric private MutableCounterLong numFailedReleaseLevelDB;
 
   // Latency based Metrics
   @Metric private MutableRate dbReadLatency;
@@ -138,6 +143,14 @@ public class CBlockTargetMetrics {
     numBlockBufferUpdates.incr();
   }
 
+  public void incNumRetryLogBlockRead() {
+    numRetryLogBlockRead.incr();
+  }
+
+  public void incNumBytesRetryLogRead(int bytes) {
+    numBytesRetryLogRead.incr(bytes);
+  }
+
   public void incNumBytesDirtyLogWritten(int bytes) {
     numBytesDirtyLogWritten.incr(bytes);
   }
@@ -158,6 +171,18 @@ public class CBlockTargetMetrics {
     numFailedDirtyLogFileDeletes.incr();
   }
 
+  public void incNumFailedRetryLogFileWrites() {
+    numFailedRetryLogFileWrites.incr();
+  }
+
+  public void incNumWriteMaxRetryBlocks() {
+    numWriteMaxRetryBlocks.incr();
+  }
+
+  public void incNumFailedReleaseLevelDB() {
+    numFailedReleaseLevelDB.incr();
+  }
+
   public void updateDBReadLatency(long latency) {
     dbReadLatency.add(latency);
   }
@@ -257,6 +282,16 @@ public class CBlockTargetMetrics {
     return numBlockBufferUpdates.value();
   }
 
+  @VisibleForTesting
+  public long getNumRetryLogBlockRead() {
+    return numRetryLogBlockRead.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBytesRetryLogReads() {
+    return numBytesRetryLogRead.value();
+  }
+
   @VisibleForTesting
   public long getNumBytesDirtyLogWritten() {
     return numBytesDirtyLogWritten.value();
@@ -281,4 +316,19 @@ public class CBlockTargetMetrics {
   public long getNumFailedDirtyLogFileDeletes() {
     return numFailedDirtyLogFileDeletes.value();
   }
+
+  @VisibleForTesting
+  public long getNumFailedRetryLogFileWrites() {
+    return numFailedRetryLogFileWrites.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteMaxRetryBlocks() {
+    return numWriteMaxRetryBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumFailedReleaseLevelDB() {
+    return numFailedReleaseLevelDB.value();
+  }
 }

+ 49 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java

@@ -20,7 +20,9 @@ package org.apache.hadoop.cblock.jscsiHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.cblock.CBlockConfigKeys;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter;
 import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -100,6 +102,7 @@ public class ContainerCacheFlusher implements Runnable {
   private final CBlockTargetMetrics metrics;
   private AtomicBoolean shutdown;
   private final long levelDBCacheSize;
+  private final int maxRetryCount;
 
   private final ConcurrentMap<String, FinishCounter> finishCountMap;
 
@@ -152,28 +155,33 @@ public class ContainerCacheFlusher implements Runnable {
     this.remoteIO = new AtomicLong();
 
     this.finishCountMap = new ConcurrentHashMap<>();
+    this.maxRetryCount =
+        config.getInt(CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY,
+            CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT);
   }
 
-  private void checkExistingDirtyLog(File dbPath) {
+  private void checkExistingLog(String prefixFileName, File dbPath) {
     if (!dbPath.exists()) {
       LOG.debug("No existing dirty log found at {}", dbPath);
       return;
     }
     LOG.debug("Need to check and requeue existing dirty log {}", dbPath);
     HashMap<String, ArrayList<String>> allFiles = new HashMap<>();
-    traverse(dbPath, allFiles);
+    traverse(prefixFileName, dbPath, allFiles);
     for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) {
       String parentPath = entry.getKey();
       for (String fileName : entry.getValue()) {
-        LOG.info("found this {} with {}", parentPath, fileName);
+        LOG.info("found {} {} with prefix {}",
+            parentPath, fileName, prefixFileName);
         processDirtyBlocks(parentPath, fileName);
       }
     }
   }
 
-  private void traverse(File path, HashMap<String, ArrayList<String>> files) {
+  private void traverse(String prefixFileName, File path,
+                        HashMap<String, ArrayList<String>> files) {
     if (path.isFile()) {
-      if (path.getName().startsWith("DirtyLog")) {
+      if (path.getName().startsWith(prefixFileName)) {
         LOG.debug("found this {} with {}", path.getParent(), path.getName());
         if (!files.containsKey(path.getParent())) {
           files.put(path.getParent(), new ArrayList<>());
@@ -184,7 +192,7 @@ public class ContainerCacheFlusher implements Runnable {
       File[] listFiles = path.listFiles();
       if (listFiles != null) {
         for (File subPath : listFiles) {
-          traverse(subPath, files);
+          traverse(prefixFileName, subPath, files);
         }
       }
     }
@@ -274,19 +282,28 @@ public class ContainerCacheFlusher implements Runnable {
   public void register(String dbPath, Pipeline[] containerList) {
     File dbFile = Paths.get(dbPath).toFile();
     pipelineMap.put(dbPath, containerList);
-    checkExistingDirtyLog(dbFile);
+    checkExistingLog(AsyncBlockWriter.DIRTY_LOG_PREFIX, dbFile);
+    checkExistingLog(AsyncBlockWriter.RETRY_LOG_PREFIX, dbFile);
   }
 
   private String getDBFileName(String dbPath) {
     return dbPath + ".db";
   }
 
-  LevelDBStore getCacheDB(String dbPath) {
-    return dbMap.get(dbPath).db;
+  public LevelDBStore getCacheDB(String dbPath) throws IOException {
+    return openDB(dbPath);
   }
 
+  public void releaseCacheDB(String dbPath) {
+    try {
+      closeDB(dbPath);
+    } catch (Exception e) {
+      metrics.incNumFailedReleaseLevelDB();
+      LOG.error("LevelDB close failed, dbPath:" + dbPath, e);
+    }
+  }
   /**
-   * Close the DB if we don't have any outstanding refrences.
+   * Close the DB if we don't have any outstanding references.
    *
    * @param dbPath - dbPath
    * @throws IOException
@@ -348,18 +365,28 @@ public class ContainerCacheFlusher implements Runnable {
             message.getDbPath(), message.getFileName());
         String fullPath = Paths.get(message.getDbPath(),
             message.getFileName()).toString();
+        String[] fileNameParts = message.getFileName().split("\\.");
+        Preconditions.checkState(fileNameParts.length > 1);
+        String fileType = fileNameParts[0];
+        boolean isDirtyLogFile =
+            fileType.equalsIgnoreCase(AsyncBlockWriter.DIRTY_LOG_PREFIX);
         ReadableByteChannel fileChannel = new FileInputStream(fullPath)
             .getChannel();
         // TODO: We can batch and unique the IOs here. First getting the code
         // to work, we will add those later.
         int bytesRead = fileChannel.read(blockIDBuffer);
+        fileChannel.close();
         LOG.debug("Read blockID log of size: {} position {} remaining {}",
             bytesRead, blockIDBuffer.position(), blockIDBuffer.remaining());
         // current position of in the buffer in bytes, divided by number of
         // bytes per long (which is calculated by number of bits per long
         // divided by number of bits per byte) gives the number of blocks
         int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE);
-        getTargetMetrics().incNumBytesDirtyLogRead(bytesRead);
+        if (isDirtyLogFile) {
+          getTargetMetrics().incNumBytesDirtyLogRead(bytesRead);
+        } else {
+          getTargetMetrics().incNumBytesRetryLogRead(bytesRead);
+        }
         if (finishCountMap.containsKey(message.getFileName())) {
           // In theory this should never happen. But if it happened,
           // we need to know it...
@@ -375,14 +402,22 @@ public class ContainerCacheFlusher implements Runnable {
         // should be flip instead of rewind, because we also need to make sure
         // the end position is correct.
         blockIDBuffer.flip();
-        LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
+        LOG.info("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
             blockCount);
         while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) {
-          getTargetMetrics().incNumDirtyLogBlockRead();
           long blockID = blockIDBuffer.getLong();
+          int retryCount = 0;
+          if (isDirtyLogFile) {
+            getTargetMetrics().incNumDirtyLogBlockRead();
+          } else {
+            getTargetMetrics().incNumRetryLogBlockRead();
+            Preconditions.checkState(fileNameParts.length == 4);
+            retryCount = Integer.parseInt(fileNameParts[3]);
+          }
           LogicalBlock block = new DiskBlock(blockID, null, false);
           BlockWriterTask blockWriterTask = new BlockWriterTask(block, this,
-              message.getDbPath(), message.getFileName());
+              message.getDbPath(), retryCount, message.getFileName(),
+              maxRetryCount);
           threadPoolExecutor.submit(blockWriterTask);
         }
         blockIDBuffer.clear();
@@ -491,7 +526,6 @@ public class ContainerCacheFlusher implements Runnable {
       this.currentCount = new AtomicLong(0);
       this.fileDeleted = new AtomicBoolean(false);
       this.flusher = flusher;
-      this.flusher.openDB(dbPath);
     }
 
     public boolean isFileDeleted() {
@@ -505,7 +539,6 @@ public class ContainerCacheFlusher implements Runnable {
         LOG.debug(
             "Deleting {} with count {} {}", filePath, count, expectedCount);
         try {
-          flusher.closeDB(dbPath);
           Path path = Paths.get(filePath);
           Files.delete(path);
           // the following part tries to remove the directory if it is empty
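
When the flusher replays a log file, it distinguishes the two kinds purely by file-name prefix, and for retry logs it recovers the attempt count from the fourth dot-separated field. A minimal sketch of that convention, using a hypothetical file name:

    // Dirty logs:  DirtyLog.<...>                        (retry count starts at 0)
    // Retry logs:  RetryLog.<blockID>.<timestamp>.<tryCount>
    String fileName = "RetryLog.42.123456789.3";  // hypothetical example
    String[] fileNameParts = fileName.split("\\.");
    boolean isDirtyLogFile =
        fileNameParts[0].equalsIgnoreCase(AsyncBlockWriter.DIRTY_LOG_PREFIX);
    int retryCount = isDirtyLogFile ? 0 : Integer.parseInt(fileNameParts[3]);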

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java

@@ -69,6 +69,7 @@ public class AsyncBlockWriter {
   private final CBlockLocalCache parentCache;
   private final BlockBufferManager blockBufferManager;
   public final static String DIRTY_LOG_PREFIX = "DirtyLog";
+  public static final String RETRY_LOG_PREFIX = "RetryLog";
   private AtomicLong localIoCount;
 
   /**

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java

@@ -157,10 +157,9 @@ public class CBlockLocalCache implements CacheModule {
       throw new IllegalArgumentException("Unable to create paths. Path: " +
           dbPath);
     }
-    cacheDB = flusher.openDB(dbPath.toString());
+    cacheDB = flusher.getCacheDB(dbPath.toString());
     this.containerList = containerPipelines.toArray(new
         Pipeline[containerPipelines.size()]);
-    flusher.register(dbPath.toString(), containerList);
     this.ipAddressString = getHostIP();
     this.tracePrefix = ipAddressString + ":" + this.volumeName;
     this.volumeSize = volumeSize;
@@ -298,6 +297,7 @@ public class CBlockLocalCache implements CacheModule {
 
   @Override
   public void start() throws IOException {
+    flusher.register(getDbPath().getPath(), containerList);
     blockWriter.start();
   }
 
@@ -309,7 +309,7 @@ public class CBlockLocalCache implements CacheModule {
   public void close() throws IOException {
     blockReader.shutdown();
     blockWriter.shutdown();
-    this.flusher.closeDB(dbPath.toString());
+    this.flusher.releaseCacheDB(dbPath.toString());
     if (this.traceEnabled) {
       getTracer().info("Task=ShutdownCache");
     }
@@ -593,7 +593,7 @@ public class CBlockLocalCache implements CacheModule {
             "relies on private data on the pipeline, null data found.");
       }
 
-      Preconditions.checkNotNull(clientManager, "Client Manager canoot be " +
+      Preconditions.checkNotNull(clientManager, "Client Manager cannot be " +
           "null");
       Preconditions.checkState(blockSize > 0, " Block size has to be a " +
           "number greater than 0");

+ 95 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java

@@ -62,6 +62,10 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
     DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
 import static org.apache.hadoop.cblock.CBlockConfigKeys.
     DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
 
 /**
  * Tests for local cache.
@@ -518,7 +522,7 @@ public class TestLocalBlockCache {
     flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
     flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
     flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
-
+    flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
     XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
     String userName = "user" + RandomStringUtils.randomNumeric(4);
@@ -561,9 +565,11 @@ public class TestLocalBlockCache {
     Assert.assertEquals(512, metrics.getNumWriteOps());
     Thread.sleep(5000);
     flusher.shutdown();
+    Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
     Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
     Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
-
+    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
     // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
     flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
     CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
@@ -594,4 +600,91 @@ public class TestLocalBlockCache {
     newCache.close();
     newFlusher.shutdown();
   }
+
+  @Test
+  public void testRetryLog() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this test writes its metafile to a new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
+
+    int numblocks = 10;
+    flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+
+    List<Pipeline> fakeContainerPipelines = new LinkedList<>();
+    Pipeline fakePipeline = new Pipeline("fake");
+    fakePipeline.setData(Longs.toByteArray(1));
+    fakeContainerPipelines.add(fakePipeline);
+
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(fakeContainerPipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Thread flushListenerThread = new Thread(flusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
+
+    for (int i = 0; i < numblocks; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(numblocks, metrics.getNumWriteOps());
+    Thread.sleep(3000);
+
+    // all the writes to the container will fail because of fake pipelines
+    Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
+    Assert.assertTrue(
+        metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
+    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
+    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
+    cache.close();
+    flusher.shutdown();
+
+    // restart cache with correct pipelines, now blocks should be uploaded
+    // correctly
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig,
+            xceiverClientManager, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Thread newFlushListenerThread = new Thread(newFlusher);
+    newFlushListenerThread.setDaemon(true);
+    newFlushListenerThread.start();
+    Thread.sleep(3000);
+    Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
+    Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks());
+    Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
+  }
 }
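
To exercise just the new retry path, the single test can be run with the standard Maven surefire test selector (module path as in the file headers above):

    mvn test -Dtest=TestLocalBlockCache#testRetryLog -pl hadoop-hdfs-project/hadoop-hdfs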