Selaa lähdekoodia

HDFS-7531. Improve the concurrent access on FsVolumeList (Lei Xu via Colin P. McCabe)

Colin Patrick Mccabe 10 vuotta sitten
vanhempi
commit
3b173d9517

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -465,6 +465,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7528. Consolidate symlink-related implementation into a single class.
     (wheat9)
 
+    HDFS-7531. Improve the concurrent access on FsVolumeList (Lei Xu via Colin
+    P. McCabe)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

+ 16 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -126,7 +126,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public List<FsVolumeImpl> getVolumes() {
-    return volumes.volumes;
+    return volumes.getVolumes();
   }
 
   @Override
@@ -139,9 +139,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     StorageReport[] reports;
     synchronized (statsLock) {
-      reports = new StorageReport[volumes.volumes.size()];
+      List<FsVolumeImpl> curVolumes = getVolumes();
+      reports = new StorageReport[curVolumes.size()];
       int i = 0;
-      for (FsVolumeImpl volume : volumes.volumes) {
+      for (FsVolumeImpl volume : curVolumes) {
         reports[i++] = new StorageReport(volume.toDatanodeStorage(),
                                          false,
                                          volume.getCapacity(),
@@ -1393,7 +1394,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     Map<String, ArrayList<ReplicaInfo>> uc =
         new HashMap<String, ArrayList<ReplicaInfo>>();
 
-    for (FsVolumeSpi v : volumes.volumes) {
+    List<FsVolumeImpl> curVolumes = getVolumes();
+    for (FsVolumeSpi v : curVolumes) {
       finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
       uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
     }
@@ -1420,7 +1422,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
-    for (FsVolumeImpl v : volumes.volumes) {
+    for (FsVolumeImpl v : curVolumes) {
       ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
       ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
       blockReportsMap.put(v.toDatanodeStorage(),
@@ -2283,7 +2285,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   private Collection<VolumeInfo> getVolumeInfo() {
     Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
-    for (FsVolumeImpl volume : volumes.volumes) {
+    for (FsVolumeImpl volume : getVolumes()) {
       long used = 0;
       long free = 0;
       try {
@@ -2317,8 +2319,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override //FsDatasetSpi
   public synchronized void deleteBlockPool(String bpid, boolean force)
       throws IOException {
+    List<FsVolumeImpl> curVolumes = getVolumes();
     if (!force) {
-      for (FsVolumeImpl volume : volumes.volumes) {
+      for (FsVolumeImpl volume : curVolumes) {
         if (!volume.isBPDirEmpty(bpid)) {
           LOG.warn(bpid + " has some block files, cannot delete unless forced");
           throw new IOException("Cannot delete block pool, "
@@ -2326,7 +2329,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         }
       }
     }
-    for (FsVolumeImpl volume : volumes.volumes) {
+    for (FsVolumeImpl volume : curVolumes) {
       volume.deleteBPDirectories(bpid, force);
     }
   }
@@ -2344,13 +2347,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
       long[] blockIds) throws IOException {
+    List<FsVolumeImpl> curVolumes = getVolumes();
     // List of VolumeIds, one per volume on the datanode
-    List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
+    List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size());
     // List of indexes into the list of VolumeIds, pointing at the VolumeId of
     // the volume that the block is on
     List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
     // Initialize the list of VolumeIds simply by enumerating the volumes
-    for (int i = 0; i < volumes.volumes.size(); i++) {
+    for (int i = 0; i < curVolumes.size(); i++) {
       blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
     }
     // Determine the index of the VolumeId of each block's volume, by comparing 
@@ -2363,7 +2367,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       int volumeIndex = 0;
       if (info != null) {
         FsVolumeSpi blockVolume = info.getVolume();
-        for (FsVolumeImpl volume : volumes.volumes) {
+        for (FsVolumeImpl volume : curVolumes) {
           // This comparison of references should be safe
           if (blockVolume == volume) {
             isValid = true;
@@ -2616,7 +2620,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       // Don't worry about fragmentation for now. We don't expect more than one
       // transient volume per DN.
-      for (FsVolumeImpl v : volumes.volumes) {
+      for (FsVolumeImpl v : getVolumes()) {
         if (v.isTransientStorage()) {
           capacity += v.getCapacity();
           free += v.getAvailable();

+ 92 - 44
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -31,11 +34,8 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
 class FsVolumeList {
-  /**
-   * Read access to this unmodifiable list is not synchronized.
-   * This list is replaced on modification holding "this" lock.
-   */
-  volatile List<FsVolumeImpl> volumes = null;
+  private final AtomicReference<FsVolumeImpl[]> volumes =
+      new AtomicReference<>(new FsVolumeImpl[0]);
   private Object checkDirsMutex = new Object();
 
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
@@ -50,19 +50,27 @@ class FsVolumeList {
   int numberOfFailedVolumes() {
     return numFailedVolumes;
   }
-  
+
+  /**
+   * Return an immutable list view of all the volumes.
+   */
+  List<FsVolumeImpl> getVolumes() {
+    return Collections.unmodifiableList(Arrays.asList(volumes.get()));
+  }
+
   /** 
-   * Get next volume. Synchronized to ensure {@link #curVolume} is updated
-   * by a single thread and next volume is chosen with no concurrent
-   * update to {@link #volumes}.
+   * Get next volume.
+   *
    * @param blockSize free space needed on the volume
    * @param storageType the desired {@link StorageType} 
    * @return next volume to store the block in.
    */
-  synchronized FsVolumeImpl getNextVolume(StorageType storageType,
-      long blockSize) throws IOException {
-    final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
-    for(FsVolumeImpl v : volumes) {
+  FsVolumeImpl getNextVolume(StorageType storageType, long blockSize)
+      throws IOException {
+    // Get a snapshot of currently available volumes.
+    final FsVolumeImpl[] curVolumes = volumes.get();
+    final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.length);
+    for(FsVolumeImpl v : curVolumes) {
       if (v.getStorageType() == storageType) {
         list.add(v);
       }
@@ -71,16 +79,16 @@ class FsVolumeList {
   }
 
   /**
-   * Get next volume. Synchronized to ensure {@link #curVolume} is updated
-   * by a single thread and next volume is chosen with no concurrent
-   * update to {@link #volumes}.
+   * Get next volume.
+   *
    * @param blockSize free space needed on the volume
    * @return next volume to store the block in.
    */
-  synchronized FsVolumeImpl getNextTransientVolume(
-      long blockSize) throws IOException {
-    final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
-    for(FsVolumeImpl v : volumes) {
+  FsVolumeImpl getNextTransientVolume(long blockSize) throws IOException {
+    // Get a snapshot of currently available volumes.
+    final List<FsVolumeImpl> curVolumes = getVolumes();
+    final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.size());
+    for(FsVolumeImpl v : curVolumes) {
       if (v.isTransientStorage()) {
         list.add(v);
       }
@@ -90,7 +98,7 @@ class FsVolumeList {
 
   long getDfsUsed() throws IOException {
     long dfsUsed = 0L;
-    for (FsVolumeImpl v : volumes) {
+    for (FsVolumeImpl v : volumes.get()) {
       dfsUsed += v.getDfsUsed();
     }
     return dfsUsed;
@@ -98,7 +106,7 @@ class FsVolumeList {
 
   long getBlockPoolUsed(String bpid) throws IOException {
     long dfsUsed = 0L;
-    for (FsVolumeImpl v : volumes) {
+    for (FsVolumeImpl v : volumes.get()) {
       dfsUsed += v.getBlockPoolUsed(bpid);
     }
     return dfsUsed;
@@ -106,7 +114,7 @@ class FsVolumeList {
 
   long getCapacity() {
     long capacity = 0L;
-    for (FsVolumeImpl v : volumes) {
+    for (FsVolumeImpl v : volumes.get()) {
       capacity += v.getCapacity();
     }
     return capacity;
@@ -114,7 +122,7 @@ class FsVolumeList {
     
   long getRemaining() throws IOException {
     long remaining = 0L;
-    for (FsVolumeSpi vol : volumes) {
+    for (FsVolumeSpi vol : volumes.get()) {
       remaining += vol.getAvailable();
     }
     return remaining;
@@ -128,7 +136,7 @@ class FsVolumeList {
     final List<IOException> exceptions = Collections.synchronizedList(
         new ArrayList<IOException>());
     List<Thread> replicaAddingThreads = new ArrayList<Thread>();
-    for (final FsVolumeImpl v : volumes) {
+    for (final FsVolumeImpl v : volumes.get()) {
       Thread t = new Thread() {
         public void run() {
           try {
@@ -177,7 +185,7 @@ class FsVolumeList {
       ArrayList<FsVolumeImpl> removedVols = null;
       
       // Make a copy of volumes for performing modification 
-      final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
+      final List<FsVolumeImpl> volumeList = getVolumes();
 
       for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
         final FsVolumeImpl fsv = i.next();
@@ -189,7 +197,7 @@ class FsVolumeList {
             removedVols = new ArrayList<FsVolumeImpl>(1);
           }
           removedVols.add(fsv);
-          removeVolume(fsv.getBasePath());
+          removeVolume(fsv);
           numFailedVolumes++;
         }
       }
@@ -212,31 +220,71 @@ class FsVolumeList {
    * Dynamically add new volumes to the existing volumes that this DN manages.
    * @param newVolume the instance of new FsVolumeImpl.
    */
-  synchronized void addVolume(FsVolumeImpl newVolume) {
+  void addVolume(FsVolumeImpl newVolume) {
     // Make a copy of volumes to add new volumes.
-    final List<FsVolumeImpl> volumeList = volumes == null ?
-        new ArrayList<FsVolumeImpl>() :
-        new ArrayList<FsVolumeImpl>(volumes);
-    volumeList.add(newVolume);
-    volumes = Collections.unmodifiableList(volumeList);
+    while (true) {
+      final FsVolumeImpl[] curVolumes = volumes.get();
+      final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
+      volumeList.add(newVolume);
+      if (volumes.compareAndSet(curVolumes,
+          volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+        break;
+      } else {
+        if (FsDatasetImpl.LOG.isDebugEnabled()) {
+          FsDatasetImpl.LOG.debug(
+              "The volume list has been changed concurrently, " +
+                  "retry to remove volume: " + newVolume);
+        }
+      }
+    }
+
     FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
   }
 
   /**
-   * Dynamically remove volume to the list.
+   * Dynamically remove a volume in the list.
+   * @param target the volume instance to be removed.
+   */
+  private void removeVolume(FsVolumeImpl target) {
+    while (true) {
+      final FsVolumeImpl[] curVolumes = volumes.get();
+      final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
+      if (volumeList.remove(target)) {
+        if (volumes.compareAndSet(curVolumes,
+            volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+          target.shutdown();
+          FsDatasetImpl.LOG.info("Removed volume: " + target);
+          break;
+        } else {
+          if (FsDatasetImpl.LOG.isDebugEnabled()) {
+            FsDatasetImpl.LOG.debug(
+                "The volume list has been changed concurrently, " +
+                "retry to remove volume: " + target);
+          }
+        }
+      } else {
+        if (FsDatasetImpl.LOG.isDebugEnabled()) {
+          FsDatasetImpl.LOG.debug("Volume " + target +
+              " does not exist or is removed by others.");
+        }
+        break;
+      }
+    }
+  }
+
+  /**
+   * Dynamically remove volume in the list.
    * @param volume the volume to be removed.
    */
-  synchronized void removeVolume(String volume) {
+  void removeVolume(String volume) {
     // Make a copy of volumes to remove one volume.
-    final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
+    final FsVolumeImpl[] curVolumes = volumes.get();
+    final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
     for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
       FsVolumeImpl fsVolume = it.next();
       if (fsVolume.getBasePath().equals(volume)) {
-        fsVolume.shutdown();
-        it.remove();
-        volumes = Collections.unmodifiableList(volumeList);
-        FsDatasetImpl.LOG.info("Removed volume: " + volume);
-        break;
+        // Make sure the removed volume is the one in the curVolumes.
+        removeVolume(fsVolume);
       }
     }
   }
@@ -247,7 +295,7 @@ class FsVolumeList {
     final List<IOException> exceptions = Collections.synchronizedList(
         new ArrayList<IOException>());
     List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
-    for (final FsVolumeImpl v : volumes) {
+    for (final FsVolumeImpl v : volumes.get()) {
       Thread t = new Thread() {
         public void run() {
           try {
@@ -285,13 +333,13 @@ class FsVolumeList {
   }
   
   void removeBlockPool(String bpid) {
-    for (FsVolumeImpl v : volumes) {
+    for (FsVolumeImpl v : volumes.get()) {
       v.shutdownBlockPool(bpid);
     }
   }
 
   void shutdown() {
-    for (FsVolumeImpl volume : volumes) {
+    for (FsVolumeImpl volume : volumes.get()) {
       if(volume != null) {
         volume.shutdown();
       }

+ 62 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -32,12 +32,15 @@ import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.IOException;
@@ -47,11 +50,17 @@ import java.util.List;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -98,9 +107,9 @@ public class TestFsDatasetImpl {
 
   @Before
   public void setUp() throws IOException {
-    datanode = Mockito.mock(DataNode.class);
-    storage = Mockito.mock(DataStorage.class);
-    scanner = Mockito.mock(DataBlockScanner.class);
+    datanode = mock(DataNode.class);
+    storage = mock(DataStorage.class);
+    scanner = mock(DataBlockScanner.class);
     this.conf = new Configuration();
     final DNConf dnConf = new DNConf(conf);
 
@@ -197,4 +206,53 @@ public class TestFsDatasetImpl {
     verify(scanner, times(BLOCK_POOL_IDS.length))
         .deleteBlocks(anyString(), any(Block[].class));
   }
+
+  @Test(timeout = 5000)
+  public void testChangeVolumeWithRunningCheckDirs() throws IOException {
+    RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
+        new RoundRobinVolumeChoosingPolicy<>();
+    final FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+    final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
+
+    // Initialize FsVolumeList with 5 mock volumes.
+    final int NUM_VOLUMES = 5;
+    for (int i = 0; i < NUM_VOLUMES; i++) {
+      FsVolumeImpl volume = mock(FsVolumeImpl.class);
+      oldVolumes.add(volume);
+      when(volume.getBasePath()).thenReturn("data" + i);
+      volumeList.addVolume(volume);
+    }
+
+    // When call checkDirs() on the 2nd volume, anther "thread" removes the 5th
+    // volume and add another volume. It does not affect checkDirs() running.
+    final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
+    FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock)
+          throws Throwable {
+        volumeList.removeVolume("data4");
+        volumeList.addVolume(newVolume);
+        return null;
+      }
+    }).when(blockedVolume).checkDirs();
+
+    FsVolumeImpl brokenVolume = volumeList.getVolumes().get(2);
+    doThrow(new DiskChecker.DiskErrorException("broken"))
+        .when(brokenVolume).checkDirs();
+
+    volumeList.checkDirs();
+
+    // Since FsVolumeImpl#checkDirs() get a snapshot of the list of volumes
+    // before running removeVolume(), it is supposed to run checkDirs() on all
+    // the old volumes.
+    for (FsVolumeImpl volume : oldVolumes) {
+      verify(volume).checkDirs();
+    }
+    // New volume is not visible to checkDirs() process.
+    verify(newVolume, never()).checkDirs();
+    assertTrue(volumeList.getVolumes().contains(newVolume));
+    assertFalse(volumeList.getVolumes().contains(brokenVolume));
+    assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
+  }
 }