
Revert "HDFS-14986. ReplicaCachingGetSpaceUsed throws ConcurrentModificationException. Contributed by Aiphago."

This reverts commit 26b51f3e2295b9a85ee1fc9a7f475cb3dc181933.
Eric Badger · 5 years ago · commit 17fc8e4a64
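For context, HDFS-14986 was filed because a snapshot of the DataNode's replica map can be taken while another thread is mutating it, which may throw ConcurrentModificationException; this revert restores the earlier lock-free deepCopyReplica and the sleep-first refresh loop that the change had modified. Below is a minimal, self-contained sketch of that failure mode — it is not Hadoop code, and every name in it is illustrative: copying a HashSet races with a concurrent writer because new HashSet<>(source) iterates source.

// Minimal sketch (not Hadoop code; all names illustrative) of the failure mode
// behind HDFS-14986: new HashSet<>(source) iterates `source`, so a copy taken
// while another thread is adding entries may throw
// ConcurrentModificationException.
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;

public class ConcurrentCopyDemo {
  public static void main(String[] args) throws InterruptedException {
    Set<Integer> replicas = new HashSet<>();
    Thread writer = new Thread(() -> {
      for (int i = 0; i < 1_000_000; i++) {
        replicas.add(i);                       // mutator, e.g. block creation
      }
    });
    writer.start();
    for (int attempt = 0; attempt < 100; attempt++) {
      try {
        // Unsynchronized snapshot, analogous to a lock-free deep copy.
        Set<Integer> snapshot = new HashSet<>(replicas);
        System.out.println("snapshot size " + snapshot.size());
      } catch (ConcurrentModificationException e) {
        System.out.println("copy raced with the writer: " + e);
        break;
      }
    }
    writer.join();
  }
}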

+ 3 - 31
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java

@@ -47,7 +47,6 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
   private final long jitter;
   private final String dirPath;
   private Thread refreshUsed;
-  private boolean shouldFirstRefresh;
 
   /**
    * This is the constructor used by the builder.
@@ -80,30 +79,16 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
     this.refreshInterval = interval;
     this.jitter = jitter;
     this.used.set(initialUsed);
-    this.shouldFirstRefresh = true;
   }
 
   void init() {
     if (used.get() < 0) {
       used.set(0);
-      if (!shouldFirstRefresh) {
-        // Skip initial refresh operation, so we need to do first refresh
-        // operation immediately in refresh thread.
-        initRefeshThread(true);
-        return;
-      }
       refresh();
     }
-    initRefeshThread(false);
-  }
 
-  /**
-   * RunImmediately should set true, if we skip the first refresh.
-   * @param runImmediately The param default should be false.
-   */
-  private void initRefeshThread (boolean runImmediately) {
     if (refreshInterval > 0) {
-      refreshUsed = new Thread(new RefreshThread(this, runImmediately),
+      refreshUsed = new Thread(new RefreshThread(this),
           "refreshUsed-" + dirPath);
       refreshUsed.setDaemon(true);
       refreshUsed.start();
@@ -115,14 +100,6 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
 
   protected abstract void refresh();
 
-  /**
-   * Reset that if we need to do the first refresh.
-   * @param shouldFirstRefresh The flag value to set.
-   */
-  protected void setShouldFirstRefresh(boolean shouldFirstRefresh) {
-    this.shouldFirstRefresh = shouldFirstRefresh;
-  }
-
   /**
    * @return an estimate of space used in the directory path.
    */
@@ -179,11 +156,9 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
   private static final class RefreshThread implements Runnable {
 
     final CachingGetSpaceUsed spaceUsed;
-    private boolean runImmediately;
 
-    RefreshThread(CachingGetSpaceUsed spaceUsed, boolean runImmediately) {
+    RefreshThread(CachingGetSpaceUsed spaceUsed) {
       this.spaceUsed = spaceUsed;
-      this.runImmediately = runImmediately;
     }
 
     @Override
@@ -201,10 +176,7 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
           }
           // Make sure that after the jitter we didn't end up at 0.
           refreshInterval = Math.max(refreshInterval, 1);
-          if (!runImmediately) {
-            Thread.sleep(refreshInterval);
-          }
-          runImmediately = false;
+          Thread.sleep(refreshInterval);
           // update the used variable
           spaceUsed.refresh();
         } catch (InterruptedException e) {
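With the hunk above applied, RefreshThread no longer carries a runImmediately flag: every iteration computes a jittered interval, sleeps, and only then calls refresh(). The following is a simplified, stand-alone paraphrase of that loop; the class name, field names, and the exact jitter computation are illustrative assumptions, not a verbatim copy of CachingGetSpaceUsed.

// Simplified sketch of the refresh loop as it stands after this revert
// (paraphrased from CachingGetSpaceUsed.RefreshThread; names and jitter
// handling here are illustrative, not copied from the Hadoop source).
final class RefreshLoopSketch implements Runnable {
  private final long refreshIntervalMs;
  private final long jitterMs;
  private final Runnable refresh;   // stands in for spaceUsed.refresh()

  RefreshLoopSketch(long refreshIntervalMs, long jitterMs, Runnable refresh) {
    this.refreshIntervalMs = refreshIntervalMs;
    this.jitterMs = jitterMs;
    this.refresh = refresh;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      long interval = refreshIntervalMs;
      if (jitterMs > 0) {
        // Spread refreshes out so many threads don't hit the disk at once.
        interval += java.util.concurrent.ThreadLocalRandom.current()
            .nextLong(-jitterMs, jitterMs + 1);
      }
      // Make sure that after the jitter we didn't end up at 0.
      interval = Math.max(interval, 1);
      try {
        Thread.sleep(interval);     // always sleep first: no immediate refresh
        refresh.run();              // then recompute the cached usage
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}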

+ 0 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -661,11 +661,5 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   AutoCloseableLock acquireDatasetLock();
 
-  /**
-   * Deep copy the replica info belonging to given block pool.
-   * @param bpid Specified block pool id.
-   * @return A set of replica info.
-   * @throws IOException
-   */
   Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
 }

+ 7 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -198,14 +198,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  /**
+   * The deepCopyReplica call doesn't use the dataset lock since doing so can
+   * lead to a deadlock with the {@link FsVolumeList#addBlockPool} call.
+   */
   @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
-    Set<? extends Replica> replicas = null;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
-      replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
-          EMPTY_SET : volumeMap.replicas(bpid));
-    }
+    Set<? extends Replica> replicas =
+        new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
+            : volumeMap.replicas(bpid));
     return Collections.unmodifiableSet(replicas);
   }
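The restored deepCopyReplica therefore snapshots the replica map without taking datasetLock, trading the deadlock risk with FsVolumeList#addBlockPool for an unsynchronized copy — the very race HDFS-14986 had set out to close. Below is a hedged, stand-alone sketch of that snapshot pattern; the class and method names are hypothetical.

// Hypothetical helper illustrating the lock-free snapshot pattern restored
// above: copy the (possibly null) replica collection into a fresh HashSet and
// hand out a read-only view. Names are illustrative, not Hadoop APIs.
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class SnapshotUtil {
  private SnapshotUtil() {}

  static <T> Set<T> readOnlySnapshot(Collection<? extends T> source) {
    Set<T> copy = source == null
        ? new HashSet<>()           // treat a missing block pool as empty
        : new HashSet<>(source);    // copied without holding the dataset lock
    return Collections.unmodifiableSet(copy);
  }
}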
 

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java

@@ -59,7 +59,6 @@ public class ReplicaCachingGetSpaceUsed extends FSCachingGetSpaceUsed {
 
   public ReplicaCachingGetSpaceUsed(Builder builder) throws IOException {
     super(builder);
-    setShouldFirstRefresh(false);
     volume = builder.getVolume();
     bpid = builder.getBpid();
   }

+ 0 - 55
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import org.apache.commons.lang.math.RandomUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CachingGetSpaceUsed;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -28,11 +27,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.Replica;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -40,7 +36,6 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
-import java.util.Set;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
 import static org.junit.Assert.assertEquals;
@@ -150,54 +145,4 @@ public class TestReplicaCachingGetSpaceUsed {
 
     fs.delete(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"), true);
   }
-
-  @Test(timeout = 15000)
-  public void testFsDatasetImplDeepCopyReplica() {
-    FsDatasetSpi<?> fsDataset = dataNode.getFSDataset();
-    ModifyThread modifyThread = new ModifyThread();
-    modifyThread.start();
-    String bpid = cluster.getNamesystem(0).getBlockPoolId();
-    int retryTimes = 10;
-
-    while (retryTimes > 0) {
-      try {
-        Set<? extends Replica> replicas = fsDataset.deepCopyReplica(bpid);
-        if (replicas.size() > 0) {
-          retryTimes--;
-        }
-      } catch (IOException e) {
-        modifyThread.setShouldRun(false);
-        Assert.fail("Encounter IOException when deep copy replica.");
-      }
-    }
-    modifyThread.setShouldRun(false);
-  }
-
-  private class ModifyThread extends Thread {
-    private boolean shouldRun = true;
-
-    @Override
-    public void run() {
-      FSDataOutputStream os = null;
-      while (shouldRun) {
-        try {
-          int id = RandomUtils.nextInt();
-          os = fs.create(new Path("/testFsDatasetImplDeepCopyReplica/" + id));
-          byte[] bytes = new byte[2048];
-          InputStream is = new ByteArrayInputStream(bytes);
-          IOUtils.copyBytes(is, os, bytes.length);
-          os.hsync();
-          os.close();
-        } catch (IOException e) {}
-      }
-
-      try {
-        fs.delete(new Path("/testFsDatasetImplDeepCopyReplica"), true);
-      } catch (IOException e) {}
-    }
-
-    private void setShouldRun(boolean shouldRun) {
-      this.shouldRun = shouldRun;
-    }
-  }
 }