|
@@ -18,7 +18,6 @@
|
|
|
package org.apache.hadoop.cblock.storage;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
-import org.apache.hadoop.cblock.CBlockConfigKeys;
|
|
|
import org.apache.hadoop.cblock.exception.CBlockException;
|
|
|
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
|
|
|
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
|
|
@@ -38,27 +37,25 @@ import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.ArrayBlockingQueue;
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
/**
|
|
|
* This class maintains the key space of CBlock, more specifically, the
|
|
|
* volume to container mapping. The core data structure
|
|
|
* is a map from users to their volumes info, where volume info is a handler
|
|
|
- * to a volume, containing information for IO on that volume and a storage
|
|
|
- * client responsible for talking to the SCM.
|
|
|
+ * to a volume, containing information for IO on that volume.
|
|
|
+ *
|
|
|
+ * This class also holds a storage client responsible for talking to the SCM.
|
|
|
+ *
|
|
|
+ * TODO : all the volume operations are fully serialized, which can potentially
|
|
|
+ * be optimized.
|
|
|
+ *
|
|
|
+ * TODO : if certain operations (e.g. create) fail, the failure-handling
|
|
|
+ * logic may not be properly implemented currently.
|
|
|
*/
|
|
|
public class StorageManager {
|
|
|
private static final Logger LOGGER =
|
|
|
LoggerFactory.getLogger(StorageManager.class);
|
|
|
private final ScmClient storageClient;
|
|
|
- private final int numThreads;
|
|
|
- private static final int MAX_THREADS =
|
|
|
- Runtime.getRuntime().availableProcessors() * 2;
|
|
|
- private static final int MAX_QUEUE_CAPACITY = 1024;
|
|
|
-
|
|
|
/**
|
|
|
* We will NOT have the situation where same kv pair getting
|
|
|
* processed, but it is possible to have multiple kv pair being
|
|
@@ -81,9 +78,6 @@ public class StorageManager {
|
|
|
this.storageClient = storageClient;
|
|
|
this.user2VolumeMap = new ConcurrentHashMap<>();
|
|
|
this.containerSizeB = storageClient.getContainerSize(null);
|
|
|
- this.numThreads =
|
|
|
- ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE,
|
|
|
- CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -155,127 +149,6 @@ public class StorageManager {
|
|
|
makeVolumeReady(userName, volumeName, volumeDescriptor);
|
|
|
}
|
|
|
|
|
|
- private class CreateContainerTask implements Runnable {
|
|
|
- private final VolumeDescriptor volume;
|
|
|
- private final int containerIdx;
|
|
|
- private final ArrayList<String> containerIds;
|
|
|
- private final AtomicInteger numFailed;
|
|
|
-
|
|
|
- CreateContainerTask(VolumeDescriptor volume, int containerIdx,
|
|
|
- ArrayList<String> containerIds,
|
|
|
- AtomicInteger numFailed) {
|
|
|
- this.volume = volume;
|
|
|
- this.containerIdx = containerIdx;
|
|
|
- this.containerIds = containerIds;
|
|
|
- this.numFailed = numFailed;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * When an object implementing interface <code>Runnable</code> is used
|
|
|
- * to create a thread, starting the thread causes the object's
|
|
|
- * <code>run</code> method to be called in that separately executing
|
|
|
- * thread.
|
|
|
- * <p>
|
|
|
- * The general contract of the method <code>run</code> is that it may
|
|
|
- * take any action whatsoever.
|
|
|
- *
|
|
|
- * @see Thread#run()
|
|
|
- */
|
|
|
- public void run() {
|
|
|
- ContainerDescriptor container = null;
|
|
|
- try {
|
|
|
- Pipeline pipeline = storageClient.createContainer(
|
|
|
- OzoneProtos.ReplicationType.STAND_ALONE,
|
|
|
- OzoneProtos.ReplicationFactor.ONE,
|
|
|
- KeyUtil.getContainerName(volume.getUserName(),
|
|
|
- volume.getVolumeName(), containerIdx));
|
|
|
-
|
|
|
- container = new ContainerDescriptor(pipeline.getContainerName());
|
|
|
-
|
|
|
- container.setPipeline(pipeline);
|
|
|
- container.setContainerIndex(containerIdx);
|
|
|
- volume.addContainer(container);
|
|
|
- containerIds.set(containerIdx, container.getContainerID());
|
|
|
- } catch (Exception e) {
|
|
|
- numFailed.incrementAndGet();
|
|
|
- if (container != null) {
|
|
|
- LOGGER.error("Error creating container Container:{}:" +
|
|
|
- " index:{} error:{}", container.getContainerID(),
|
|
|
- containerIdx, e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private boolean createVolumeContainers(VolumeDescriptor volume) {
|
|
|
- ArrayList<String> containerIds = new ArrayList<>();
|
|
|
- ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
|
|
|
- MAX_THREADS, 1, TimeUnit.SECONDS,
|
|
|
- new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
|
|
|
- new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
-
|
|
|
- AtomicInteger numFailedCreates = new AtomicInteger(0);
|
|
|
- long allocatedSize = 0;
|
|
|
- int containerIdx = 0;
|
|
|
- while (allocatedSize < volume.getVolumeSize()) {
|
|
|
- // adding null to allocate space in ArrayList
|
|
|
- containerIds.add(containerIdx, null);
|
|
|
- Runnable task = new CreateContainerTask(volume, containerIdx,
|
|
|
- containerIds, numFailedCreates);
|
|
|
- executor.submit(task);
|
|
|
- allocatedSize += containerSizeB;
|
|
|
- containerIdx += 1;
|
|
|
- }
|
|
|
-
|
|
|
- // issue the command and then wait for it to finish
|
|
|
- executor.shutdown();
|
|
|
- try {
|
|
|
- executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- LOGGER.error("Error creating volume:{} error:{}",
|
|
|
- volume.getVolumeName(), e);
|
|
|
- executor.shutdownNow();
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- }
|
|
|
-
|
|
|
- volume.setContainerIDs(containerIds);
|
|
|
- return numFailedCreates.get() == 0;
|
|
|
- }
|
|
|
-
|
|
|
- private void deleteContainer(String containerID, boolean force) {
|
|
|
- try {
|
|
|
- Pipeline pipeline = storageClient.getContainer(containerID);
|
|
|
- storageClient.deleteContainer(pipeline, force);
|
|
|
- } catch (Exception e) {
|
|
|
- LOGGER.error("Error deleting container Container:{} error:{}",
|
|
|
- containerID, e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void deleteVolumeContainers(List<String> containers, boolean force)
|
|
|
- throws CBlockException {
|
|
|
- ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
|
|
|
- MAX_THREADS, 1, TimeUnit.SECONDS,
|
|
|
- new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
|
|
|
- new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
-
|
|
|
- for (String deleteContainer : containers) {
|
|
|
- if (deleteContainer != null) {
|
|
|
- Runnable task = () -> deleteContainer(deleteContainer, force);
|
|
|
- executor.submit(task);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // issue the command and then wait for it to finish
|
|
|
- executor.shutdown();
|
|
|
- try {
|
|
|
- executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- LOGGER.error("Error deleting containers error:{}", e);
|
|
|
- executor.shutdownNow();
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* Called by CBlock server when creating a fresh volume. The core
|
|
@@ -299,13 +172,31 @@ public class StorageManager {
|
|
|
throw new CBlockException("Volume size smaller than block size? " +
|
|
|
"volume size:" + volumeSize + " block size:" + blockSize);
|
|
|
}
|
|
|
- VolumeDescriptor volume
|
|
|
- = new VolumeDescriptor(userName, volumeName, volumeSize, blockSize);
|
|
|
- boolean success = createVolumeContainers(volume);
|
|
|
- if (!success) {
|
|
|
- // cleanup the containers and throw the exception
|
|
|
- deleteVolumeContainers(volume.getContainerIDsList(), true);
|
|
|
- throw new CBlockException("Error when creating volume:" + volumeName);
|
|
|
+ VolumeDescriptor volume;
|
|
|
+ int containerIdx = 0;
|
|
|
+ try {
|
|
|
+ volume = new VolumeDescriptor(userName, volumeName,
|
|
|
+ volumeSize, blockSize);
|
|
|
+ long allocatedSize = 0;
|
|
|
+ ArrayList<String> containerIds = new ArrayList<>();
|
|
|
+ while (allocatedSize < volumeSize) {
|
|
|
+ Pipeline pipeline = storageClient.createContainer(OzoneProtos
|
|
|
+ .ReplicationType.STAND_ALONE,
|
|
|
+ OzoneProtos.ReplicationFactor.ONE,
|
|
|
+ KeyUtil.getContainerName(userName, volumeName, containerIdx));
|
|
|
+ ContainerDescriptor container =
|
|
|
+ new ContainerDescriptor(pipeline.getContainerName());
|
|
|
+ container.setPipeline(pipeline);
|
|
|
+ container.setContainerIndex(containerIdx);
|
|
|
+ volume.addContainer(container);
|
|
|
+ containerIds.add(container.getContainerID());
|
|
|
+ allocatedSize += containerSizeB;
|
|
|
+ containerIdx += 1;
|
|
|
+ }
|
|
|
+ volume.setContainerIDs(containerIds);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new CBlockException("Error when creating volume:" + e.getMessage());
|
|
|
+ // TODO : delete already created containers? or re-try policy
|
|
|
}
|
|
|
makeVolumeReady(userName, volumeName, volume);
|
|
|
}
|
|
@@ -332,7 +223,16 @@ public class StorageManager {
|
|
|
throw new CBlockException("Deleting a non-empty volume without force!");
|
|
|
}
|
|
|
VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
|
|
|
- deleteVolumeContainers(volume.getContainerIDsList(), force);
|
|
|
+ for (String containerID : volume.getContainerIDsList()) {
|
|
|
+ try {
|
|
|
+ Pipeline pipeline = storageClient.getContainer(containerID);
|
|
|
+ storageClient.deleteContainer(pipeline, force);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOGGER.error("Error deleting container Container:{} error:{}",
|
|
|
+ containerID, e);
|
|
|
+ throw new CBlockException(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
if (user2VolumeMap.get(userName).size() == 0) {
|
|
|
user2VolumeMap.remove(userName);
|
|
|
}
|