Bläddra i källkod

HDFS-14311. Multi-threading conflict at layoutVersion when loading block pool storage. Contributed by Yicong Cai.

(cherry picked from commit fbe87eddbc30fe5191c008b496fb83e51ef4ee4a)
Wei-Chiu Chuang 5 år sedan
förälder
incheckning
77fd690c05

+ 14 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -445,22 +445,23 @@ public class DataStorage extends Storage {
       StartupOption startOpt, ExecutorService executor) throws IOException {
     final String bpid = nsInfo.getBlockPoolID();
     final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
+    Map<StorageLocation, List<Callable<StorageDirectory>>> upgradeCallableMap =
+        new HashMap<>();
     final List<StorageDirectory> success = Lists.newArrayList();
     final List<UpgradeTask> tasks = Lists.newArrayList();
     for (StorageLocation dataDir : dataDirs) {
       dataDir.makeBlockPoolDir(bpid, null);
       try {
-        final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
+        final List<Callable<StorageDirectory>> sdCallables =
+            Lists.newArrayList();
         final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
-            nsInfo, dataDir, startOpt, callables, datanode.getConf());
-        if (callables.isEmpty()) {
+            nsInfo, dataDir, startOpt, sdCallables, datanode.getConf());
+        if (sdCallables.isEmpty()) {
           for(StorageDirectory sd : dirs) {
             success.add(sd);
           }
         } else {
-          for(Callable<StorageDirectory> c : callables) {
-            tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
-          }
+          upgradeCallableMap.put(dataDir, sdCallables);
         }
       } catch (IOException e) {
         LOG.warn("Failed to add storage directory {} for block pool {}",
@@ -468,6 +469,13 @@ public class DataStorage extends Storage {
       }
     }
 
+    for (Map.Entry<StorageLocation, List<Callable<StorageDirectory>>> entry :
+        upgradeCallableMap.entrySet()) {
+      for(Callable<StorageDirectory> c : entry.getValue()) {
+        tasks.add(new UpgradeTask(entry.getKey(), executor.submit(c)));
+      }
+    }
+
     if (!tasks.isEmpty()) {
       LOG.info("loadBlockPoolSliceStorage: {} upgrade tasks", tasks.size());
       for(UpgradeTask t : tasks) {