|
@@ -447,6 +447,8 @@ public class DataStorage extends Storage {
|
|
|
StartupOption startOpt, ExecutorService executor) throws IOException {
|
|
|
final String bpid = nsInfo.getBlockPoolID();
|
|
|
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
|
|
|
+ Map<StorageLocation, List<Callable<StorageDirectory>>> upgradeCallableMap =
|
|
|
+ new HashMap<>();
|
|
|
final List<StorageDirectory> success = Lists.newArrayList();
|
|
|
final List<UpgradeTask> tasks = Lists.newArrayList();
|
|
|
for (StorageLocation dataDir : dataDirs) {
|
|
@@ -456,17 +458,16 @@ public class DataStorage extends Storage {
|
|
|
try {
|
|
|
makeBlockPoolDataDir(bpDataDirs, null);
|
|
|
|
|
|
- final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
|
|
|
+ final List<Callable<StorageDirectory>> sdCallables =
|
|
|
+ Lists.newArrayList();
|
|
|
final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
|
|
|
- nsInfo, bpDataDirs, startOpt, callables, datanode.getConf());
|
|
|
- if (callables.isEmpty()) {
|
|
|
+ nsInfo, bpDataDirs, startOpt, sdCallables, datanode.getConf());
|
|
|
+ if (sdCallables.isEmpty()) {
|
|
|
for(StorageDirectory sd : dirs) {
|
|
|
success.add(sd);
|
|
|
}
|
|
|
} else {
|
|
|
- for(Callable<StorageDirectory> c : callables) {
|
|
|
- tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
|
|
|
- }
|
|
|
+ upgradeCallableMap.put(dataDir, sdCallables);
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Failed to add storage directory " + dataDir
|
|
@@ -474,6 +475,13 @@ public class DataStorage extends Storage {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ for (Map.Entry<StorageLocation, List<Callable<StorageDirectory>>> entry :
|
|
|
+ upgradeCallableMap.entrySet()) {
|
|
|
+ for(Callable<StorageDirectory> c : entry.getValue()) {
|
|
|
+ tasks.add(new UpgradeTask(entry.getKey(), executor.submit(c)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
if (!tasks.isEmpty()) {
|
|
|
LOG.info("loadBlockPoolSliceStorage: " + tasks.size() + " upgrade tasks");
|
|
|
for(UpgradeTask t : tasks) {
|