
HDFS-6833. DirectoryScanner should not register a deleting block with memory of DataNode. Contributed by Shinichi Yamashita

Tsz-Wo Nicholas Sze 10 years ago
parent commit a1fffc3fcc
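
What the patch fixes: FsDatasetImpl.invalidate() removes a replica from volumeMap and hands the block file to FsDatasetAsyncDiskService for asynchronous deletion. Until the file is actually unlinked, DirectoryScanner can still find it on disk, see no matching in-memory replica, and wrongly count it as missing from memory, re-registering a block that is already on its way out. The patch keeps a per-block-pool set of "deleting" block IDs and has the scanner skip them. A minimal standalone sketch of that bookkeeping, with an illustrative class name (the real methods live on FsDatasetImpl):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for the bookkeeping FsDatasetImpl gains in this patch.
class DeletingBlockTracker {
  // block pool id -> ids of blocks removed from volumeMap whose files
  // are still queued for asynchronous deletion
  private final Map<String, Set<Long>> deletingBlock =
      new HashMap<String, Set<Long>>();

  // invalidate() path: remember the block before the async delete runs.
  synchronized void addDeletingBlock(String bpid, long blockId) {
    Set<Long> s = deletingBlock.get(bpid);
    if (s == null) {
      s = new HashSet<Long>();
      deletingBlock.put(bpid, s);
    }
    s.add(blockId);
  }

  // DirectoryScanner path: consulted before flagging an on-disk block
  // as missing from memory.
  synchronized boolean isDeletingBlock(String bpid, long blockId) {
    Set<Long> s = deletingBlock.get(bpid);
    return s != null && s.contains(blockId);
  }

  // FsDatasetAsyncDiskService path: forget ids once files are deleted.
  synchronized void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
    Set<Long> s = deletingBlock.get(bpid);
    if (s != null) {
      s.removeAll(blockIds);
    }
  }
}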

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -824,6 +824,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7880. Remove the tests for legacy Web UI in branch-2.
     (Brahma Reddy Battula via aajisaka)
 
+    HDFS-6833.  DirectoryScanner should not register a deleting block with
+    memory of DataNode.  (Shinichi Yamashita via szetszwo)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

+ 12 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -443,13 +443,14 @@ public class DirectoryScanner implements Runnable {
         int d = 0; // index for blockpoolReport
        int m = 0; // index for memReport
         while (m < memReport.length && d < blockpoolReport.length) {
-          FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
-          ScanInfo info = blockpoolReport[Math.min(
-              d, blockpoolReport.length - 1)];
+          FinalizedReplica memBlock = memReport[m];
+          ScanInfo info = blockpoolReport[d];
           if (info.getBlockId() < memBlock.getBlockId()) {
-            // Block is missing in memory
-            statsRecord.missingMemoryBlocks++;
-            addDifference(diffRecord, statsRecord, info);
+            if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
+              // Block is missing in memory
+              statsRecord.missingMemoryBlocks++;
+              addDifference(diffRecord, statsRecord, info);
+            }
             d++;
             continue;
           }
@@ -495,8 +496,11 @@ public class DirectoryScanner implements Runnable {
                         current.getBlockId(), current.getVolume());
         }
         while (d < blockpoolReport.length) {
-          statsRecord.missingMemoryBlocks++;
-          addDifference(diffRecord, statsRecord, blockpoolReport[d++]);
+          if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
+            statsRecord.missingMemoryBlocks++;
+            addDifference(diffRecord, statsRecord, blockpoolReport[d]);
+          }
+          d++;
         }
         LOG.info(statsRecord.toString());
       } //end for
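
For context: memReport and blockpoolReport are both sorted by block ID, so the loop above is a two-pointer merge, and the Math.min clamps deleted here were redundant under the loop guard. A stripped-down sketch of the merge with the new guard, using a hypothetical isDeleting predicate in place of dataset.isDeletingBlock(bpid, ...):

import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

class MergeSketch {
  // Collect IDs present on disk but not in memory, skipping blocks that
  // are queued for deletion. Both input arrays must be sorted ascending.
  static List<Long> diskOnlyNotDeleting(
      long[] memIds, long[] diskIds, LongPredicate isDeleting) {
    List<Long> missing = new ArrayList<Long>();
    int m = 0; // index into the in-memory report
    int d = 0; // index into the on-disk report
    while (m < memIds.length && d < diskIds.length) {
      if (diskIds[d] < memIds[m]) {          // on disk only
        if (!isDeleting.test(diskIds[d])) {
          missing.add(diskIds[d]);
        }
        d++;
      } else if (diskIds[d] > memIds[m]) {   // in memory only; the real
        m++;                                 // scanner handles this case too
      } else {
        m++;                                 // present in both
        d++;
      }
    }
    while (d < diskIds.length) {             // trailing disk-only entries
      if (!isDeleting.test(diskIds[d])) {
        missing.add(diskIds[d]);
      }
      d++;
    }
    return missing;
  }
}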

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -543,4 +543,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * Check whether the block was pinned
    */
   public boolean getPinning(ExtendedBlock block) throws IOException;
+  
+  /**
+   * Check whether the block is being deleted
+   */
+  public boolean isDeletingBlock(String bpid, long blockId);
 }
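
Implementations that never delete block files asynchronously can simply return false here, as ExternalDatasetImpl does later in this commit; SimulatedFSDataset opts to throw UnsupportedOperationException instead.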

+ 30 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java

@@ -22,7 +22,10 @@ import java.io.File;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -64,9 +67,14 @@ class FsDatasetAsyncDiskService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60; 
   
   private final DataNode datanode;
+  private final FsDatasetImpl fsdatasetImpl;
   private final ThreadGroup threadGroup;
   private Map<File, ThreadPoolExecutor> executors
       = new HashMap<File, ThreadPoolExecutor>();
+  private Map<String, Set<Long>> deletedBlockIds 
+      = new HashMap<String, Set<Long>>();
+  private static final int MAX_DELETED_BLOCKS = 64;
+  private int numDeletedBlocks = 0;
   
   /**
   * Create an AsyncDiskServices with a set of volumes (specified by their
@@ -75,8 +83,9 @@ class FsDatasetAsyncDiskService {
    * The AsyncDiskServices uses one ThreadPool per volume to do the async
    * disk operations.
    */
-  FsDatasetAsyncDiskService(DataNode datanode) {
+  FsDatasetAsyncDiskService(DataNode datanode, FsDatasetImpl fsdatasetImpl) {
     this.datanode = datanode;
+    this.fsdatasetImpl = fsdatasetImpl;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
   }
 
@@ -286,7 +295,27 @@ class FsDatasetAsyncDiskService {
         LOG.info("Deleted " + block.getBlockPoolId() + " "
             + block.getLocalBlock() + " file " + blockFile);
       }
+      updateDeletedBlockId(block);
       IOUtils.cleanup(null, volumeRef);
     }
   }
+  
+  private synchronized void updateDeletedBlockId(ExtendedBlock block) {
+    Set<Long> blockIds = deletedBlockIds.get(block.getBlockPoolId());
+    if (blockIds == null) {
+      blockIds = new HashSet<Long>();
+      deletedBlockIds.put(block.getBlockPoolId(), blockIds);
+    }
+    blockIds.add(block.getBlockId());
+    numDeletedBlocks++;
+    if (numDeletedBlocks == MAX_DELETED_BLOCKS) {
+      for (Entry<String, Set<Long>> e : deletedBlockIds.entrySet()) {
+        String bpid = e.getKey();
+        Set<Long> bs = e.getValue();
+        fsdatasetImpl.removeDeletedBlocks(bpid, bs);
+        bs.clear();
+      }
+      numDeletedBlocks = 0;
+    }
+  }
 }
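
Note that the flush is count-based rather than time-based: updateDeletedBlockId buffers IDs per block pool and only calls fsdatasetImpl.removeDeletedBlocks once 64 deletions have accumulated across all pools, then clears the buffers. The new test below relies on exactly this threshold: after 63 invalidations a block is still reported as deleting, and the 64th triggers the flush. A toy mirror of that logic against the DeletingBlockTracker sketch above (illustrative names):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy mirror of updateDeletedBlockId's threshold-based flush.
class BatchedDeletionNotifier {
  private static final int MAX_DELETED_BLOCKS = 64; // same constant as the patch
  private final Map<String, Set<Long>> buffered =
      new HashMap<String, Set<Long>>();
  private int numDeletedBlocks = 0;
  private final DeletingBlockTracker tracker; // sketch class from above

  BatchedDeletionNotifier(DeletingBlockTracker tracker) {
    this.tracker = tracker;
  }

  // Called once per block file actually removed from disk.
  synchronized void onBlockFileDeleted(String bpid, long blockId) {
    Set<Long> ids = buffered.get(bpid);
    if (ids == null) {
      ids = new HashSet<Long>();
      buffered.put(bpid, ids);
    }
    ids.add(blockId);
    if (++numDeletedBlocks == MAX_DELETED_BLOCKS) {
      for (Map.Entry<String, Set<Long>> e : buffered.entrySet()) {
        tracker.removeDeletedBlocks(e.getKey(), e.getValue());
        e.getValue().clear();
      }
      numDeletedBlocks = 0;
    }
  }
}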

+ 39 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -239,6 +239,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private volatile boolean fsRunning;
 
   final ReplicaMap volumeMap;
+  final Map<String, Set<Long>> deletingBlock;
   final RamDiskReplicaTracker ramDiskReplicaTracker;
   final RamDiskAsyncLazyPersistService asyncLazyPersistService;
 
@@ -300,8 +301,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
         blockChooserImpl);
-    asyncDiskService = new FsDatasetAsyncDiskService(datanode);
+    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
+    deletingBlock = new HashMap<String, Set<Long>>();
 
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       addVolume(dataLocations, storage.getStorageDir(idx));
@@ -1797,7 +1799,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               +  ". Parent not found for file " + f);
           continue;
         }
-        volumeMap.remove(bpid, invalidBlks[i]);
+        ReplicaInfo removing = volumeMap.remove(bpid, invalidBlks[i]);
+        addDeletingBlock(bpid, removing.getBlockId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Block file " + removing.getBlockFile().getName()
+              + " is to be deleted");
+        }
       }
 
       if (v.isTransientStorage()) {
@@ -3005,5 +3012,35 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
     return fss.getPermission().getStickyBit();
   }
+  
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    synchronized(deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      return s != null ? s.contains(blockId) : false;
+    }
+  }
+  
+  public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
+    synchronized (deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      if (s != null) {
+        for (Long id : blockIds) {
+          s.remove(id);
+        }
+      }
+    }
+  }
+  
+  private void addDeletingBlock(String bpid, Long blockId) {
+    synchronized(deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      if (s == null) {
+        s = new HashSet<Long>();
+        deletingBlock.put(bpid, s);
+      }
+      s.add(blockId);
+    }
+  }
 }
 

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -1318,5 +1318,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public boolean getPinning(ExtendedBlock b) throws IOException {
     return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
   }
+  
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    throw new UnsupportedOperationException();
+  }
 }
 

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -429,4 +429,9 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   public boolean getPinning(ExtendedBlock block) throws IOException {
     return false;
   }
+  
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    return false;
+  }
 }

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -18,10 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -29,8 +33,11 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
@@ -88,6 +95,8 @@ public class TestFsDatasetImpl {
   private DataNode datanode;
   private DataStorage storage;
   private FsDatasetImpl dataset;
+  
+  private final static String BLOCKPOOL = "BP-TEST";
 
   private static Storage.StorageDirectory createStorageDirectory(File root) {
     Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
@@ -334,4 +343,54 @@ public class TestFsDatasetImpl {
 
     FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
   }
+  
+  @Test
+  public void testDeletingBlocks() throws IOException {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      
+      FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      FsVolumeImpl vol = ds.getVolumes().get(0);
+
+      ExtendedBlock eb;
+      ReplicaInfo info;
+      List<Block> blockList = new ArrayList<Block>();
+      for (int i = 1; i <= 63; i++) {
+        eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i);
+        info = new FinalizedReplica(
+            eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+        ds.volumeMap.add(BLOCKPOOL, info);
+        info.getBlockFile().createNewFile();
+        info.getMetaFile().createNewFile();
+        blockList.add(info);
+      }
+      ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Nothing to do
+      }
+      assertTrue(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
+
+      blockList.clear();
+      eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064);
+      info = new FinalizedReplica(
+          eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+      ds.volumeMap.add(BLOCKPOOL, info);
+      info.getBlockFile().createNewFile();
+      info.getMetaFile().createNewFile();
+      blockList.add(info);
+      ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Nothing to do
+      }
+      assertFalse(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }