Explorar o código

HDFS-5027. On startup, DN should scan volumes in parallel. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1508166 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers %!s(int64=11) %!d(string=hai) anos
pai
achega
01089216e7

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -270,6 +270,8 @@ Release 2.1.0-beta - 2013-07-02
 
     HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)
 
+    HDFS-5027. On startup, DN should scan volumes in parallel. (atm)
+
   BUG FIXES
 
     HDFS-4626. ClientProtocol#getLinkTarget should throw an exception for

+ 50 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -99,9 +99,19 @@ class FsVolumeList {
   }
   
   void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
+    long totalStartTime = System.currentTimeMillis();
     for (FsVolumeImpl v : volumes) {
+      FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
+          " on volume " + v + "...");
+      long startTime = System.currentTimeMillis();
       v.getVolumeMap(bpid, volumeMap);
+      long timeTaken = System.currentTimeMillis() - startTime;
+      FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
+          " on volume " + v + ": " + timeTaken + "ms");
     }
+    long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
+    FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
+        + totalTimeTaken + "ms");
   }
     
   /**
@@ -150,10 +160,47 @@ class FsVolumeList {
   }
 
 
-  void addBlockPool(String bpid, Configuration conf) throws IOException {
-    for (FsVolumeImpl v : volumes) {
-      v.addBlockPool(bpid, conf);
+  void addBlockPool(final String bpid, final Configuration conf) throws IOException {
+    long totalStartTime = System.currentTimeMillis();
+    
+    final List<IOException> exceptions = Collections.synchronizedList(
+        new ArrayList<IOException>());
+    List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
+    for (final FsVolumeImpl v : volumes) {
+      Thread t = new Thread() {
+        public void run() {
+          try {
+            FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
+                " on volume " + v + "...");
+            long startTime = System.currentTimeMillis();
+            v.addBlockPool(bpid, conf);
+            long timeTaken = System.currentTimeMillis() - startTime;
+            FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
+                " on " + v + ": " + timeTaken + "ms");
+          } catch (IOException ioe) {
+            FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
+                ". Will throw later.", ioe);
+            exceptions.add(ioe);
+          }
+        }
+      };
+      blockPoolAddingThreads.add(t);
+      t.start();
+    }
+    for (Thread t : blockPoolAddingThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
+    if (!exceptions.isEmpty()) {
+      throw exceptions.get(0);
+    }
+    
+    long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
+    FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
+        bpid + ": " + totalTimeTaken + "ms");
   }
   
   void removeBlockPool(String bpid) {