|
@@ -18,6 +18,7 @@
|
|
package org.apache.hadoop.cblock.storage;
|
|
package org.apache.hadoop.cblock.storage;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
+import org.apache.hadoop.cblock.CBlockConfigKeys;
|
|
import org.apache.hadoop.cblock.exception.CBlockException;
|
|
import org.apache.hadoop.cblock.exception.CBlockException;
|
|
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
|
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
|
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
|
|
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
|
|
@@ -37,25 +38,27 @@ import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
+import java.util.concurrent.ArrayBlockingQueue;
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
/**
|
|
/**
|
|
* This class maintains the key space of CBlock, more specifically, the
|
|
* This class maintains the key space of CBlock, more specifically, the
|
|
* volume to container mapping. The core data structure
|
|
* volume to container mapping. The core data structure
|
|
* is a map from users to their volumes info, where volume info is a handler
|
|
* is a map from users to their volumes info, where volume info is a handler
|
|
- * to a volume, containing information for IO on that volume.
|
|
|
|
- *
|
|
|
|
- * and a storage client responsible for talking to the SCM
|
|
|
|
- *
|
|
|
|
- * TODO : all the volume operations are fully serialized, which can potentially
|
|
|
|
- * be optimized.
|
|
|
|
- *
|
|
|
|
- * TODO : if the certain operations (e.g. create) failed, the failure-handling
|
|
|
|
- * logic may not be properly implemented currently.
|
|
|
|
|
|
+ * to a volume, containing information for IO on that volume and a storage
|
|
|
|
+ * client responsible for talking to the SCM.
|
|
*/
|
|
*/
|
|
public class StorageManager {
|
|
public class StorageManager {
|
|
private static final Logger LOGGER =
|
|
private static final Logger LOGGER =
|
|
LoggerFactory.getLogger(StorageManager.class);
|
|
LoggerFactory.getLogger(StorageManager.class);
|
|
private final ScmClient storageClient;
|
|
private final ScmClient storageClient;
|
|
|
|
+ private final int numThreads;
|
|
|
|
+ private static final int MAX_THREADS =
|
|
|
|
+ Runtime.getRuntime().availableProcessors() * 2;
|
|
|
|
+ private static final int MAX_QUEUE_CAPACITY = 1024;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* We will NOT have the situation where same kv pair getting
|
|
* We will NOT have the situation where same kv pair getting
|
|
* processed, but it is possible to have multiple kv pair being
|
|
* processed, but it is possible to have multiple kv pair being
|
|
@@ -78,6 +81,9 @@ public class StorageManager {
|
|
this.storageClient = storageClient;
|
|
this.storageClient = storageClient;
|
|
this.user2VolumeMap = new ConcurrentHashMap<>();
|
|
this.user2VolumeMap = new ConcurrentHashMap<>();
|
|
this.containerSizeB = storageClient.getContainerSize(null);
|
|
this.containerSizeB = storageClient.getContainerSize(null);
|
|
|
|
+ this.numThreads =
|
|
|
|
+ ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE,
|
|
|
|
+ CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -149,6 +155,127 @@ public class StorageManager {
|
|
makeVolumeReady(userName, volumeName, volumeDescriptor);
|
|
makeVolumeReady(userName, volumeName, volumeDescriptor);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private class CreateContainerTask implements Runnable {
|
|
|
|
+ private final VolumeDescriptor volume;
|
|
|
|
+ private final int containerIdx;
|
|
|
|
+ private final ArrayList<String> containerIds;
|
|
|
|
+ private final AtomicInteger numFailed;
|
|
|
|
+
|
|
|
|
+ CreateContainerTask(VolumeDescriptor volume, int containerIdx,
|
|
|
|
+ ArrayList<String> containerIds,
|
|
|
|
+ AtomicInteger numFailed) {
|
|
|
|
+ this.volume = volume;
|
|
|
|
+ this.containerIdx = containerIdx;
|
|
|
|
+ this.containerIds = containerIds;
|
|
|
|
+ this.numFailed = numFailed;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * When an object implementing interface <code>Runnable</code> is used
|
|
|
|
+ * to create a thread, starting the thread causes the object's
|
|
|
|
+ * <code>run</code> method to be called in that separately executing
|
|
|
|
+ * thread.
|
|
|
|
+ * <p>
|
|
|
|
+ * The general contract of the method <code>run</code> is that it may
|
|
|
|
+ * take any action whatsoever.
|
|
|
|
+ *
|
|
|
|
+ * @see Thread#run()
|
|
|
|
+ */
|
|
|
|
+ public void run() {
|
|
|
|
+ ContainerDescriptor container = null;
|
|
|
|
+ try {
|
|
|
|
+ Pipeline pipeline = storageClient.createContainer(
|
|
|
|
+ OzoneProtos.ReplicationType.STAND_ALONE,
|
|
|
|
+ OzoneProtos.ReplicationFactor.ONE,
|
|
|
|
+ KeyUtil.getContainerName(volume.getUserName(),
|
|
|
|
+ volume.getVolumeName(), containerIdx));
|
|
|
|
+
|
|
|
|
+ container = new ContainerDescriptor(pipeline.getContainerName());
|
|
|
|
+
|
|
|
|
+ container.setPipeline(pipeline);
|
|
|
|
+ container.setContainerIndex(containerIdx);
|
|
|
|
+ volume.addContainer(container);
|
|
|
|
+ containerIds.set(containerIdx, container.getContainerID());
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ numFailed.incrementAndGet();
|
|
|
|
+ if (container != null) {
|
|
|
|
+ LOGGER.error("Error creating container Container:{}:" +
|
|
|
|
+ " index:{} error:{}", container.getContainerID(),
|
|
|
|
+ containerIdx, e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private boolean createVolumeContainers(VolumeDescriptor volume) {
|
|
|
|
+ ArrayList<String> containerIds = new ArrayList<>();
|
|
|
|
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
|
|
|
|
+ MAX_THREADS, 1, TimeUnit.SECONDS,
|
|
|
|
+ new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
|
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
|
+
|
|
|
|
+ AtomicInteger numFailedCreates = new AtomicInteger(0);
|
|
|
|
+ long allocatedSize = 0;
|
|
|
|
+ int containerIdx = 0;
|
|
|
|
+ while (allocatedSize < volume.getVolumeSize()) {
|
|
|
|
+ // adding null to allocate space in ArrayList
|
|
|
|
+ containerIds.add(containerIdx, null);
|
|
|
|
+ Runnable task = new CreateContainerTask(volume, containerIdx,
|
|
|
|
+ containerIds, numFailedCreates);
|
|
|
|
+ executor.submit(task);
|
|
|
|
+ allocatedSize += containerSizeB;
|
|
|
|
+ containerIdx += 1;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // issue the command and then wait for it to finish
|
|
|
|
+ executor.shutdown();
|
|
|
|
+ try {
|
|
|
|
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ LOGGER.error("Error creating volume:{} error:{}",
|
|
|
|
+ volume.getVolumeName(), e);
|
|
|
|
+ executor.shutdownNow();
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ volume.setContainerIDs(containerIds);
|
|
|
|
+ return numFailedCreates.get() == 0;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void deleteContainer(String containerID, boolean force) {
|
|
|
|
+ try {
|
|
|
|
+ Pipeline pipeline = storageClient.getContainer(containerID);
|
|
|
|
+ storageClient.deleteContainer(pipeline, force);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOGGER.error("Error deleting container Container:{} error:{}",
|
|
|
|
+ containerID, e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void deleteVolumeContainers(List<String> containers, boolean force)
|
|
|
|
+ throws CBlockException {
|
|
|
|
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
|
|
|
|
+ MAX_THREADS, 1, TimeUnit.SECONDS,
|
|
|
|
+ new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
|
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
|
+
|
|
|
|
+ for (String deleteContainer : containers) {
|
|
|
|
+ if (deleteContainer != null) {
|
|
|
|
+ Runnable task = () -> deleteContainer(deleteContainer, force);
|
|
|
|
+ executor.submit(task);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // issue the command and then wait for it to finish
|
|
|
|
+ executor.shutdown();
|
|
|
|
+ try {
|
|
|
|
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ LOGGER.error("Error deleting containers error:{}", e);
|
|
|
|
+ executor.shutdownNow();
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Called by CBlock server when creating a fresh volume. The core
|
|
* Called by CBlock server when creating a fresh volume. The core
|
|
@@ -172,31 +299,13 @@ public class StorageManager {
|
|
throw new CBlockException("Volume size smaller than block size? " +
|
|
throw new CBlockException("Volume size smaller than block size? " +
|
|
"volume size:" + volumeSize + " block size:" + blockSize);
|
|
"volume size:" + volumeSize + " block size:" + blockSize);
|
|
}
|
|
}
|
|
- VolumeDescriptor volume;
|
|
|
|
- int containerIdx = 0;
|
|
|
|
- try {
|
|
|
|
- volume = new VolumeDescriptor(userName, volumeName,
|
|
|
|
- volumeSize, blockSize);
|
|
|
|
- long allocatedSize = 0;
|
|
|
|
- ArrayList<String> containerIds = new ArrayList<>();
|
|
|
|
- while (allocatedSize < volumeSize) {
|
|
|
|
- Pipeline pipeline = storageClient.createContainer(OzoneProtos
|
|
|
|
- .ReplicationType.STAND_ALONE,
|
|
|
|
- OzoneProtos.ReplicationFactor.ONE,
|
|
|
|
- KeyUtil.getContainerName(userName, volumeName, containerIdx));
|
|
|
|
- ContainerDescriptor container =
|
|
|
|
- new ContainerDescriptor(pipeline.getContainerName());
|
|
|
|
- container.setPipeline(pipeline);
|
|
|
|
- container.setContainerIndex(containerIdx);
|
|
|
|
- volume.addContainer(container);
|
|
|
|
- containerIds.add(container.getContainerID());
|
|
|
|
- allocatedSize += containerSizeB;
|
|
|
|
- containerIdx += 1;
|
|
|
|
- }
|
|
|
|
- volume.setContainerIDs(containerIds);
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- throw new CBlockException("Error when creating volume:" + e.getMessage());
|
|
|
|
- // TODO : delete already created containers? or re-try policy
|
|
|
|
|
|
+ VolumeDescriptor volume
|
|
|
|
+ = new VolumeDescriptor(userName, volumeName, volumeSize, blockSize);
|
|
|
|
+ boolean success = createVolumeContainers(volume);
|
|
|
|
+ if (!success) {
|
|
|
|
+ // cleanup the containers and throw the exception
|
|
|
|
+ deleteVolumeContainers(volume.getContainerIDsList(), true);
|
|
|
|
+ throw new CBlockException("Error when creating volume:" + volumeName);
|
|
}
|
|
}
|
|
makeVolumeReady(userName, volumeName, volume);
|
|
makeVolumeReady(userName, volumeName, volume);
|
|
}
|
|
}
|
|
@@ -223,16 +332,7 @@ public class StorageManager {
|
|
throw new CBlockException("Deleting a non-empty volume without force!");
|
|
throw new CBlockException("Deleting a non-empty volume without force!");
|
|
}
|
|
}
|
|
VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
|
|
VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
|
|
- for (String containerID : volume.getContainerIDsList()) {
|
|
|
|
- try {
|
|
|
|
- Pipeline pipeline = storageClient.getContainer(containerID);
|
|
|
|
- storageClient.deleteContainer(pipeline, force);
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOGGER.error("Error deleting container Container:{} error:{}",
|
|
|
|
- containerID, e);
|
|
|
|
- throw new CBlockException(e.getMessage());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ deleteVolumeContainers(volume.getContainerIDsList(), force);
|
|
if (user2VolumeMap.get(userName).size() == 0) {
|
|
if (user2VolumeMap.get(userName).size() == 0) {
|
|
user2VolumeMap.remove(userName);
|
|
user2VolumeMap.remove(userName);
|
|
}
|
|
}
|