|
@@ -28,8 +28,19 @@ import java.io.InputStream;
|
|
|
import java.io.OutputStreamWriter;
|
|
|
import java.io.RandomAccessFile;
|
|
|
import java.io.Writer;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Comparator;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Queue;
|
|
|
import java.util.Scanner;
|
|
|
+import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.ForkJoinPool;
|
|
|
+import java.util.concurrent.ForkJoinTask;
|
|
|
+import java.util.concurrent.RecursiveAction;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
@@ -52,8 +63,8 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
|
|
-
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.io.MultipleIOException;
|
|
|
import org.apache.hadoop.util.AutoCloseableLock;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.DiskChecker;
|
|
@@ -96,6 +107,17 @@ class BlockPoolSlice {
|
|
|
private final int maxDataLength;
|
|
|
private final FileIoProvider fileIoProvider;
|
|
|
|
|
|
  // Fork-join pool shared by every BlockPoolSlice instance; used to add
  // replicas to the replica map in parallel during DataNode startup.
  // Lazily created by initializeAddReplicaPool() and shut down from the
  // dfs-usage shutdown hook.
  private static ForkJoinPool addReplicaThreadPool = null;
  // Fallback pool size: one worker per available processor.
  private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
      .getRuntime().availableProcessors();
  // Sorts block files by name; addToReplicasMap() sorts each directory
  // listing with this before looking up generation stamps by index.
  private static final Comparator<File> FILE_COMPARATOR =
      new Comparator<File>() {
        @Override
        public int compare(File f1, File f2) {
          return f1.getName().compareTo(f2.getName());
        }
      };
|
|
|
+
|
|
|
// TODO:FEDERATION scalability issue - a thread per DU is needed
|
|
|
private final GetSpaceUsed dfsUsage;
|
|
|
|
|
@@ -161,13 +183,15 @@ class BlockPoolSlice {
|
|
|
.setConf(conf)
|
|
|
.setInitialUsed(loadDfsUsed())
|
|
|
.build();
|
|
|
-
|
|
|
+ // initialize add replica fork join pool
|
|
|
+ initializeAddReplicaPool(conf);
|
|
|
// Make the dfs usage to be saved during shutdown.
|
|
|
shutdownHook = new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
if (!dfsUsedSaved) {
|
|
|
saveDfsUsed();
|
|
|
+ addReplicaThreadPool.shutdownNow();
|
|
|
}
|
|
|
}
|
|
|
};
|
|
@@ -175,6 +199,21 @@ class BlockPoolSlice {
|
|
|
SHUTDOWN_HOOK_PRIORITY);
|
|
|
}
|
|
|
|
|
|
+ private synchronized void initializeAddReplicaPool(Configuration conf) {
|
|
|
+ if (addReplicaThreadPool == null) {
|
|
|
+ FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset();
|
|
|
+ int numberOfBlockPoolSlice = dataset.getVolumeCount()
|
|
|
+ * dataset.getBPServiceCount();
|
|
|
+ int poolsize = Math.max(numberOfBlockPoolSlice,
|
|
|
+ VOLUMES_REPLICA_ADD_THREADPOOL_SIZE);
|
|
|
+ // Default pool sizes is max of (volume * number of bp_service) and
|
|
|
+ // number of processor.
|
|
|
+ addReplicaThreadPool = new ForkJoinPool(conf.getInt(
|
|
|
+ DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
|
|
|
+ poolsize));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
File getDirectory() {
|
|
|
return currentDir.getParentFile();
|
|
|
}
|
|
@@ -374,10 +413,55 @@ class BlockPoolSlice {
|
|
|
|
|
|
boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
|
|
|
if (!success) {
|
|
|
+ List<IOException> exceptions = Collections
|
|
|
+ .synchronizedList(new ArrayList<IOException>());
|
|
|
+ Queue<RecursiveAction> subTaskQueue =
|
|
|
+ new ConcurrentLinkedQueue<RecursiveAction>();
|
|
|
+
|
|
|
// add finalized replicas
|
|
|
- addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
|
|
|
+ AddReplicaProcessor task = new AddReplicaProcessor(volumeMap,
|
|
|
+ finalizedDir, lazyWriteReplicaMap, true, exceptions, subTaskQueue);
|
|
|
+ ForkJoinTask<Void> finalizedTask = addReplicaThreadPool.submit(task);
|
|
|
+
|
|
|
// add rbw replicas
|
|
|
- addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
|
|
|
+ task = new AddReplicaProcessor(volumeMap, rbwDir, lazyWriteReplicaMap,
|
|
|
+ false, exceptions, subTaskQueue);
|
|
|
+ ForkJoinTask<Void> rbwTask = addReplicaThreadPool.submit(task);
|
|
|
+
|
|
|
+ try {
|
|
|
+ finalizedTask.get();
|
|
|
+ rbwTask.get();
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ exceptions.add(new IOException(
|
|
|
+ "Failed to start sub tasks to add replica in replica map :"
|
|
|
+ + e.getMessage()));
|
|
|
+ }
|
|
|
+
|
|
|
+ //wait for all the tasks to finish.
|
|
|
+ waitForSubTaskToFinish(subTaskQueue, exceptions);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait till all the recursive task for add replica to volume completed.
|
|
|
+ *
|
|
|
+ * @param subTaskQueue
|
|
|
+ * {@link AddReplicaProcessor} tasks list.
|
|
|
+ * @param exceptions
|
|
|
+ * exceptions occurred in sub tasks.
|
|
|
+ * @throws IOException
|
|
|
+ * throw if any sub task or multiple sub tasks failed.
|
|
|
+ */
|
|
|
+ private void waitForSubTaskToFinish(Queue<RecursiveAction> subTaskQueue,
|
|
|
+ List<IOException> exceptions) throws IOException {
|
|
|
+ while (!subTaskQueue.isEmpty()) {
|
|
|
+ RecursiveAction task = subTaskQueue.poll();
|
|
|
+ if (task != null) {
|
|
|
+ task.join();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!exceptions.isEmpty()) {
|
|
|
+ throw MultipleIOException.createIOException(exceptions);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -526,10 +610,10 @@ class BlockPoolSlice {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
|
|
|
- if (oldReplica == null) {
|
|
|
- volumeMap.add(bpid, newReplica);
|
|
|
- } else {
|
|
|
+ ReplicaInfo tmpReplicaInfo = volumeMap.addAndGet(bpid, newReplica);
|
|
|
+ ReplicaInfo oldReplica = (tmpReplicaInfo == newReplica) ? null
|
|
|
+ : tmpReplicaInfo;
|
|
|
+ if (oldReplica != null) {
|
|
|
// We have multiple replicas of the same block so decide which one
|
|
|
// to keep.
|
|
|
newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
|
|
@@ -558,15 +642,23 @@ class BlockPoolSlice {
|
|
|
* storage.
|
|
|
* @param isFinalized true if the directory has finalized replicas;
|
|
|
* false if the directory has rbw replicas
|
|
|
+ * @param exceptions list of exception which need to return to parent thread.
|
|
|
+ * @param subTaskQueue queue of sub tasks
|
|
|
*/
|
|
|
void addToReplicasMap(ReplicaMap volumeMap, File dir,
|
|
|
- final RamDiskReplicaTracker lazyWriteReplicaMap,
|
|
|
- boolean isFinalized)
|
|
|
+ final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
|
|
|
+ List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue)
|
|
|
throws IOException {
|
|
|
File[] files = fileIoProvider.listFiles(volume, dir);
|
|
|
- for (File file : files) {
|
|
|
+ Arrays.sort(files, FILE_COMPARATOR);
|
|
|
+ for (int i = 0; i < files.length; i++) {
|
|
|
+ File file = files[i];
|
|
|
if (file.isDirectory()) {
|
|
|
- addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
|
|
|
+ // Launch new sub task.
|
|
|
+ AddReplicaProcessor subTask = new AddReplicaProcessor(volumeMap, file,
|
|
|
+ lazyWriteReplicaMap, isFinalized, exceptions, subTaskQueue);
|
|
|
+ subTask.fork();
|
|
|
+ subTaskQueue.add(subTask);
|
|
|
}
|
|
|
|
|
|
if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
|
|
@@ -581,7 +673,7 @@ class BlockPoolSlice {
|
|
|
}
|
|
|
|
|
|
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
|
|
|
- files, file);
|
|
|
+ files, file, i);
|
|
|
long blockId = Block.filename2id(file.getName());
|
|
|
Block block = new Block(blockId, file.length(), genStamp);
|
|
|
addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
|
|
@@ -886,4 +978,63 @@ class BlockPoolSlice {
|
|
|
public long getNumOfBlocks() {
|
|
|
return numOfBlocks.get();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Recursive action for add replica in map.
|
|
|
+ */
|
|
|
+ class AddReplicaProcessor extends RecursiveAction {
|
|
|
+
|
|
|
+ private ReplicaMap volumeMap;
|
|
|
+ private File dir;
|
|
|
+ private RamDiskReplicaTracker lazyWriteReplicaMap;
|
|
|
+ private boolean isFinalized;
|
|
|
+ private List<IOException> exceptions;
|
|
|
+ private Queue<RecursiveAction> subTaskQueue;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param volumeMap
|
|
|
+ * the replicas map
|
|
|
+ * @param dir
|
|
|
+ * an input directory
|
|
|
+ * @param lazyWriteReplicaMap
|
|
|
+ * Map of replicas on transient storage.
|
|
|
+ * @param isFinalized
|
|
|
+ * true if the directory has finalized replicas; false if the
|
|
|
+ * directory has rbw replicas
|
|
|
+ * @param exceptions
|
|
|
+ * List of exception which need to return to parent thread.
|
|
|
+ * @param subTaskQueue
|
|
|
+ * queue of sub tasks
|
|
|
+ */
|
|
|
+ AddReplicaProcessor(ReplicaMap volumeMap, File dir,
|
|
|
+ RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
|
|
|
+ List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue) {
|
|
|
+ this.volumeMap = volumeMap;
|
|
|
+ this.dir = dir;
|
|
|
+ this.lazyWriteReplicaMap = lazyWriteReplicaMap;
|
|
|
+ this.isFinalized = isFinalized;
|
|
|
+ this.exceptions = exceptions;
|
|
|
+ this.subTaskQueue = subTaskQueue;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void compute() {
|
|
|
+ try {
|
|
|
+ addToReplicasMap(volumeMap, dir, lazyWriteReplicaMap, isFinalized,
|
|
|
+ exceptions, subTaskQueue);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Caught exception while adding replicas from " + volume
|
|
|
+ + " in subtask. Will throw later.", e);
|
|
|
+ exceptions.add(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
  /**
   * Return the size of the fork pool used for adding replicas to the
   * replica map.
   *
   * NOTE(review): assumes the pool has already been created by
   * constructing at least one BlockPoolSlice; calling this before any
   * initialization would NPE on the static field — confirm test callers
   * only use it after startup.
   */
  @VisibleForTesting
  public static int getAddReplicaForkPoolSize() {
    return addReplicaThreadPool.getPoolSize();
  }
|
|
|
}
|