瀏覽代碼

HDFS-6978. Directory scanner should correctly reconcile blocks on RAM disk. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
arp 10 年之前
父節點
當前提交
14edbc9419

+ 17 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -83,6 +83,7 @@ public class DirectoryScanner implements Runnable {
     long missingBlockFile = 0;
     long missingMemoryBlocks = 0;
     long mismatchBlocks = 0;
+    long duplicateBlocks = 0;
     
     public Stats(String bpid) {
       this.bpid = bpid;
@@ -440,7 +441,7 @@ public class DirectoryScanner implements Runnable {
         int d = 0; // index for blockpoolReport
         int m = 0; // index for memReprot
         while (m < memReport.length && d < blockpoolReport.length) {
-          Block memBlock = memReport[Math.min(m, memReport.length - 1)];
+          FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
           ScanInfo info = blockpoolReport[Math.min(
               d, blockpoolReport.length - 1)];
           if (info.getBlockId() < memBlock.getBlockId()) {
@@ -468,9 +469,23 @@ public class DirectoryScanner implements Runnable {
             // or block file length is different than expected
             statsRecord.mismatchBlocks++;
             addDifference(diffRecord, statsRecord, info);
+          } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
+            // volumeMap record and on-disk files don't match.
+            statsRecord.duplicateBlocks++;
+            addDifference(diffRecord, statsRecord, info);
           }
           d++;
-          m++;
+
+          if (d < blockpoolReport.length) {
+            // There may be multiple on-disk records for the same block, don't increment
+            // the memory record pointer if so.
+            ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)];
+            if (nextInfo.getBlockId() != info.blockId) {
+              ++m;
+            }
+          } else {
+            ++m;
+          }
         }
         while (m < memReport.length) {
           FinalizedReplica current = memReport[m++];

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -269,15 +269,16 @@ class BlockPoolSlice {
   /**
    * Save the given replica to persistent storage.
    *
-   * @param replicaInfo
    * @return The saved meta and block files, in that order.
    * @throws IOException
    */
-  File[] lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
+  File[] lazyPersistReplica(long blockId, long genStamp,
+                            File srcMeta, File srcFile) throws IOException {
     if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
       FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
     }
-    File targetFiles[] = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir);
+    File targetFiles[] = FsDatasetImpl.copyBlockFiles(
+        blockId, genStamp, srcMeta, srcFile, lazypersistDir);
     dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
     return targetFiles;
   }
@@ -510,7 +511,7 @@ class BlockPoolSlice {
    * @return the replica that is retained.
    * @throws IOException
    */
-  private ReplicaInfo resolveDuplicateReplicas(
+  ReplicaInfo resolveDuplicateReplicas(
       final ReplicaInfo replica1, final ReplicaInfo replica2,
       final ReplicaMap volumeMap) throws IOException {
 

+ 38 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -703,17 +703,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
-   * Copy the block and meta files for the given block from the given
+   * Copy the block and meta files for the given block to the given destination.
    * @return the new meta and block files.
    * @throws IOException
    */
-  static File[] copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
+  static File[] copyBlockFiles(long blockId, long genStamp,
+                               File srcMeta, File srcFile, File destRoot)
       throws IOException {
-    final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId());
-    final File dstFile = new File(destDir, replicaInfo.getBlockName());
-    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, replicaInfo.getGenerationStamp());
-    final File srcMeta = replicaInfo.getMetaFile();
-    final File srcFile = replicaInfo.getBlockFile();
+    final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
+    final File dstFile = new File(destDir, srcFile.getName());
+    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
     try {
       FileUtils.copyFile(srcMeta, dstMeta);
     } catch (IOException e) {
@@ -1847,10 +1846,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       File memFile = memBlockInfo.getBlockFile();
       if (memFile.exists()) {
         if (memFile.compareTo(diskFile) != 0) {
-          LOG.warn("Block file " + memFile.getAbsolutePath()
-              + " does not match file found by scan "
-              + diskFile.getAbsolutePath());
-          // TODO: Should the diskFile be deleted?
+          if (diskMetaFile.exists()) {
+            if (memBlockInfo.getMetaFile().exists()) {
+              // We have two sets of block+meta files. Decide which one to
+              // keep.
+              ReplicaInfo diskBlockInfo = new FinalizedReplica(
+                  blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile());
+              ((FsVolumeImpl) vol).getBlockPoolSlice(bpid).resolveDuplicateReplicas(
+                  memBlockInfo, diskBlockInfo, volumeMap);
+            }
+          } else {
+            if (!diskFile.delete()) {
+              LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan");
+            }
+          }
         }
       } else {
         // Block refers to a block file that does not exist.
@@ -2315,6 +2324,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       FsVolumeImpl targetVolume;
       ReplicaInfo replicaInfo;
+      BlockPoolSlice bpSlice;
+      File srcFile, srcMeta;
+      long genStamp;
 
       synchronized (FsDatasetImpl.this) {
         replicaInfo = volumeMap.get(bpid, blockId);
@@ -2336,10 +2348,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         }
 
         lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
-        File[] savedFiles = targetVolume.getBlockPoolSlice(bpid)
-                                        .lazyPersistReplica(replicaInfo);
-        lazyWriteReplicaTracker.recordEndLazyPersist(
-            bpid, blockId, savedFiles[0], savedFiles[1]);
+        bpSlice = targetVolume.getBlockPoolSlice(bpid);
+        srcMeta = replicaInfo.getMetaFile();
+        srcFile = replicaInfo.getBlockFile();
+        genStamp = replicaInfo.getGenerationStamp();
+      }
+
+      // Drop the FsDatasetImpl lock for the file copy.
+      File[] savedFiles =
+          bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
+
+      synchronized (FsDatasetImpl.this) {
+        lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
@@ -2360,7 +2380,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       try {
         replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
         if (replicaState != null) {
-          // Move the replica outside the lock.
           moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
         }
         succeeded = true;
@@ -2459,9 +2478,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         // Remove the old replicas from transient storage.
         if (blockFile.delete() || !blockFile.exists()) {
           ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
-        }
-        if (metaFile.delete() || !metaFile.exists()) {
-          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+          if (metaFile.delete() || !metaFile.exists()) {
+            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+          }
         }
 
         // If deletion failed then the directory scanner will cleanup the blocks

+ 8 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java

@@ -161,9 +161,13 @@ class LazyWriteReplicaTracker {
     replicaState.lazyPersistVolume = checkpointVolume;
   }
 
+  /**
+   * @param bpid
+   * @param blockId
+   * @param savedFiles The saved meta and block files, in that order.
+   */
   synchronized void recordEndLazyPersist(
-      final String bpid, final long blockId,
-      final File savedMetaFile, final File savedBlockFile) {
+      final String bpid, final long blockId, final File[] savedFiles) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
     ReplicaState replicaState = map.get(blockId);
 
@@ -172,8 +176,8 @@ class LazyWriteReplicaTracker {
           bpid + "; blockId=" + blockId);
     }
     replicaState.state = State.LAZY_PERSIST_COMPLETE;
-    replicaState.savedMetaFile = savedMetaFile;
-    replicaState.savedBlockFile = savedBlockFile;
+    replicaState.savedMetaFile = savedFiles[0];
+    replicaState.savedBlockFile = savedFiles[1];
 
     if (replicasNotPersisted.peek() == replicaState) {
       // Common case.

+ 158 - 13
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -31,22 +33,21 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
 /**
@@ -60,22 +61,29 @@ public class TestDirectoryScanner {
 
   private MiniDFSCluster cluster;
   private String bpid;
+  private DFSClient client;
   private FsDatasetSpi<? extends FsVolumeSpi> fds = null;
   private DirectoryScanner scanner = null;
   private final Random rand = new Random();
   private final Random r = new Random();
+  private static final int BLOCK_LENGTH = 100;
 
   static {
-    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
+    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
     CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
     CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
   }
 
   /** create a file with a length of <code>fileLen</code> */
-  private void createFile(String fileName, long fileLen) throws IOException {
+  private List<LocatedBlock> createFile(String fileNamePrefix,
+                                        long fileLen,
+                                        boolean isLazyPersist) throws IOException {
     FileSystem fs = cluster.getFileSystem();
-    Path filePath = new Path(fileName);
-    DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, r.nextLong());
+    Path filePath = new Path("/" + fileNamePrefix + ".dat");
+    DFSTestUtil.createFile(
+        fs, filePath, isLazyPersist, 1024, fileLen,
+        BLOCK_LENGTH, (short) 1, r.nextLong(), false);
+    return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks();
   }
 
   /** Truncate a block file */
@@ -134,6 +142,43 @@ public class TestDirectoryScanner {
     return 0;
   }
 
+  /**
+   * Duplicate the given block on all volumes.
+   * @param blockId
+   * @throws IOException
+   */
+  private void duplicateBlock(long blockId) throws IOException {
+    synchronized (fds) {
+      ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
+      for (FsVolumeSpi v : fds.getVolumes()) {
+        if (v.getStorageID().equals(b.getVolume().getStorageID())) {
+          continue;
+        }
+
+        // Volume without a copy of the block. Make a copy now.
+        File sourceBlock = b.getBlockFile();
+        File sourceMeta = b.getMetaFile();
+        String sourceRoot = b.getVolume().getBasePath();
+        String destRoot = v.getBasePath();
+
+        String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()).getPath();
+        String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()).getPath();
+
+        File destBlock = new File(destRoot, relativeBlockPath);
+        File destMeta = new File(destRoot, relativeMetaPath);
+
+        destBlock.getParentFile().mkdirs();
+        FileUtils.copyFile(sourceBlock, destBlock);
+        FileUtils.copyFile(sourceMeta, destMeta);
+
+        if (destBlock.exists() && destMeta.exists()) {
+          LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
+          LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
+        }
+      }
+    }
+  }
+
   /** Get a random blockId that is not used already */
   private long getFreeBlockId() {
     long id = rand.nextLong();
@@ -216,6 +261,12 @@ public class TestDirectoryScanner {
 
   private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
       long missingMemoryBlocks, long mismatchBlocks) throws IOException {
+    scan(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
+         missingMemoryBlocks, mismatchBlocks, 0);
+  }
+
+    private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
+      long missingMemoryBlocks, long mismatchBlocks, long duplicateBlocks) throws IOException {
     scanner.reconcile();
     
     assertTrue(scanner.diffs.containsKey(bpid));
@@ -229,9 +280,92 @@ public class TestDirectoryScanner {
     assertEquals(missingBlockFile, stats.missingBlockFile);
     assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
     assertEquals(mismatchBlocks, stats.mismatchBlocks);
+    assertEquals(duplicateBlocks, stats.duplicateBlocks);
+  }
+
+  @Test (timeout=300000)
+  public void testRetainBlockOnPersistentStorage() throws Exception {
+    cluster = new MiniDFSCluster
+        .Builder(CONF)
+        .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+        .numDataNodes(1)
+        .build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      scanner = new DirectoryScanner(fds, CONF);
+      scanner.setRetainDiffs(true);
+      FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+      // Add a file with 1 block
+      List<LocatedBlock> blocks =
+          createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
+
+      // Ensure no difference between volumeMap and disk.
+      scan(1, 0, 0, 0, 0, 0);
+
+      // Make a copy of the block on RAM_DISK and ensure that it is
+      // picked up by the scanner.
+      duplicateBlock(blocks.get(0).getBlock().getBlockId());
+      scan(2, 1, 0, 0, 0, 0, 1);
+      verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
+      scan(1, 0, 0, 0, 0, 0);
+
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+      cluster = null;
+    }
   }
 
-  @Test
+  @Test (timeout=300000)
+  public void testDeleteBlockOnTransientStorage() throws Exception {
+    cluster = new MiniDFSCluster
+        .Builder(CONF)
+        .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+        .numDataNodes(1)
+        .build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      scanner = new DirectoryScanner(fds, CONF);
+      scanner.setRetainDiffs(true);
+      FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+      // Create a file file on RAM_DISK
+      List<LocatedBlock> blocks =
+          createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
+
+      // Ensure no difference between volumeMap and disk.
+      scan(1, 0, 0, 0, 0, 0);
+
+      // Make a copy of the block on DEFAULT storage and ensure that it is
+      // picked up by the scanner.
+      duplicateBlock(blocks.get(0).getBlock().getBlockId());
+      scan(2, 1, 0, 0, 0, 0, 1);
+
+      // Ensure that the copy on RAM_DISK was deleted.
+      verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
+      scan(1, 0, 0, 0, 0, 0);
+
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test (timeout=600000)
   public void testDirectoryScanner() throws Exception {
     // Run the test with and without parallel scanning
     for (int parallelism = 1; parallelism < 3; parallelism++) {
@@ -245,16 +379,17 @@ public class TestDirectoryScanner {
       cluster.waitActive();
       bpid = cluster.getNamesystem().getBlockPoolId();
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                   parallelism);
       scanner = new DirectoryScanner(fds, CONF);
       scanner.setRetainDiffs(true);
 
       // Add files with 100 blocks
-      createFile("/tmp/t1", 10000);
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 100, false);
       long totalBlocks = 100;
 
-      // Test1: No difference between in-memory and disk
+      // Test1: No difference between volumeMap and disk
       scan(100, 0, 0, 0, 0, 0);
 
       // Test2: block metafile is missing
@@ -355,7 +490,10 @@ public class TestDirectoryScanner {
       assertFalse(scanner.getRunStatus());
       
     } finally {
-      scanner.shutdown();
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
       cluster.shutdown();
     }
   }
@@ -389,6 +527,13 @@ public class TestDirectoryScanner {
     assertEquals(genStamp, memBlock.getGenerationStamp());
   }
   
+  private void verifyStorageType(long blockId, boolean expectTransient) {
+    final ReplicaInfo memBlock;
+    memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
+    assertNotNull(memBlock);
+    assertThat(memBlock.getVolume().isTransientStorage(), is(expectTransient));
+  }
+
   private static class TestFsVolumeSpi implements FsVolumeSpi {
     @Override
     public String[] getBlockPoolList() {

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java

@@ -23,6 +23,7 @@ import java.util.Collection;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 
@@ -62,4 +63,13 @@ public class FsDatasetTestUtil {
       String bpid) {
     return ((FsDatasetImpl)fsd).volumeMap.replicas(bpid);
   }
+
+  /**
+   * Stop the lazy writer daemon that saves RAM disk files to persistent storage.
+   * @param dn
+   */
+  public static void stopLazyWriter(DataNode dn) {
+    FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
+    ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
+  }
 }

+ 5 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java

@@ -368,6 +368,8 @@ public class TestLazyPersistFiles {
           // Found a persisted copy for this block!
           boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
           assertThat(added, is(true));
+        } else {
+          LOG.error(blockFile + " not found");
         }
       }
     }
@@ -423,7 +425,7 @@ public class TestLazyPersistFiles {
     final int SEED = 0XFADED;
 
     // Stop lazy writer to ensure block for path1 is not persisted to disk.
-    stopLazyWriter(cluster.getDataNodes().get(0));
+    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
 
     makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
@@ -488,7 +490,7 @@ public class TestLazyPersistFiles {
     throws IOException, InterruptedException {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    stopLazyWriter(cluster.getDataNodes().get(0));
+    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
 
     Path path = new Path("/" + METHOD_NAME + ".dat");
     makeTestFile(path, BLOCK_SIZE, true);
@@ -682,7 +684,7 @@ public class TestLazyPersistFiles {
       throws IOException, InterruptedException {
 
     startUpCluster(true, 1);
-    stopLazyWriter(cluster.getDataNodes().get(0));
+    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
 
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@@ -794,12 +796,6 @@ public class TestLazyPersistFiles {
     return locatedBlocks;
   }
 
-  private void stopLazyWriter(DataNode dn) {
-    // Stop the lazyWriter daemon.
-    FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
-    ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
-  }
-
   private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
                                   long seed) throws IOException {
     DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,