|
@@ -18,6 +18,7 @@
|
|
|
package org.apache.hadoop.ozone.scm.block;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
@@ -29,6 +30,8 @@ import org.apache.hadoop.ozone.scm.node.NodeManager;
|
|
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
|
|
import org.apache.hadoop.scm.ScmConfigKeys;
|
|
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
|
|
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
|
|
+import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
|
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.utils.BatchOperation;
|
|
@@ -83,8 +86,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
private final long containerSize;
|
|
|
private final long cacheSize;
|
|
|
|
|
|
- private final MetadataStore openContainerStore;
|
|
|
- private Map<String, Long> openContainers;
|
|
|
+ // Track all containers owned by block service.
|
|
|
+ private final MetadataStore containerStore;
|
|
|
+
|
|
|
+ private Map<OzoneProtos.LifeCycleState,
|
|
|
+ Map<String, BlockContainerInfo>> containers;
|
|
|
private final int containerProvisionBatchSize;
|
|
|
private final Random rand;
|
|
|
private final ObjectName mxBean;
|
|
@@ -121,14 +127,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
|
|
|
// Load store of all open contains for block allocation
|
|
|
File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB);
|
|
|
- openContainerStore = MetadataStoreBuilder.newBuilder()
|
|
|
+ containerStore = MetadataStoreBuilder.newBuilder()
|
|
|
.setConf(conf)
|
|
|
.setDbFile(openContainsDbPath)
|
|
|
.setCacheSize(this.cacheSize * OzoneConsts.MB)
|
|
|
.build();
|
|
|
|
|
|
- openContainers = new ConcurrentHashMap<>();
|
|
|
- loadOpenContainers();
|
|
|
+ loadAllocatedContainers();
|
|
|
|
|
|
this.containerProvisionBatchSize = conf.getInt(
|
|
|
ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
|
|
@@ -141,20 +146,39 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
|
|
|
// TODO: close full (or almost full) containers with a separate thread.
|
|
|
/**
|
|
|
- * Load open containers from persistent store.
|
|
|
+ * Load allocated containers from persistent store.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private void loadOpenContainers() throws IOException {
|
|
|
+ private void loadAllocatedContainers() throws IOException {
|
|
|
+ // Pre-allocate empty map entry by state to avoid null check
|
|
|
+ containers = new ConcurrentHashMap<>();
|
|
|
+ for (OzoneProtos.LifeCycleState state :
|
|
|
+ OzoneProtos.LifeCycleState.values()) {
|
|
|
+ containers.put(state, new ConcurrentHashMap());
|
|
|
+ }
|
|
|
try {
|
|
|
- openContainerStore.iterate(null, (key, value) -> {
|
|
|
+ containerStore.iterate(null, (key, value) -> {
|
|
|
try {
|
|
|
String containerName = DFSUtil.bytes2String(key);
|
|
|
Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
|
|
|
- openContainers.put(containerName, containerUsed);
|
|
|
- LOG.debug("Loading open container: {} used : {}", containerName,
|
|
|
- containerUsed);
|
|
|
+ ContainerInfo containerInfo =
|
|
|
+ containerManager.getContainer(containerName);
|
|
|
+ // TODO: remove the container from block manager's container DB
|
|
|
+ // Most likely the allocated container is timeout and cleaned up
|
|
|
+ // by SCM, we should clean up correspondingly instead of just skip it.
|
|
|
+ if (containerInfo == null) {
|
|
|
+ LOG.warn("Container {} allocated by block service" +
|
|
|
+ "can't be found in SCM", containerName);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ Map<String, BlockContainerInfo> containersByState =
|
|
|
+ containers.get(containerInfo.getState());
|
|
|
+ containersByState.put(containerName,
|
|
|
+ new BlockContainerInfo(containerInfo, containerUsed));
|
|
|
+ LOG.debug("Loading allocated container: {} used : {} state: {}",
|
|
|
+ containerName, containerUsed, containerInfo.getState());
|
|
|
} catch (Exception e) {
|
|
|
- LOG.warn("Failed loading open container, continue next...");
|
|
|
+ LOG.warn("Failed loading allocated container, continue next...");
|
|
|
}
|
|
|
return true;
|
|
|
});
|
|
@@ -166,25 +190,26 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Pre-provision specified count of containers for block creation.
|
|
|
- * @param count - number of containers to create.
|
|
|
- * @return list of container names created.
|
|
|
+ * Pre allocate specified count of containers for block creation.
|
|
|
+ * @param count - number of containers to allocate.
|
|
|
+ * @return list of container names allocated.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private List<String> provisionContainers(int count) throws IOException {
|
|
|
+ private List<String> allocateContainers(int count) throws IOException {
|
|
|
List<String> results = new ArrayList();
|
|
|
lock.lock();
|
|
|
try {
|
|
|
for (int i = 0; i < count; i++) {
|
|
|
String containerName = UUID.randomUUID().toString();
|
|
|
+ ContainerInfo containerInfo = null;
|
|
|
try {
|
|
|
// TODO: Fix this later when Ratis is made the Default.
|
|
|
- Pipeline pipeline = containerManager.allocateContainer(
|
|
|
+ containerInfo = containerManager.allocateContainer(
|
|
|
OzoneProtos.ReplicationType.STAND_ALONE,
|
|
|
OzoneProtos.ReplicationFactor.ONE,
|
|
|
containerName);
|
|
|
|
|
|
- if (pipeline == null) {
|
|
|
+ if (containerInfo == null) {
|
|
|
LOG.warn("Unable to allocate container.");
|
|
|
continue;
|
|
|
}
|
|
@@ -192,8 +217,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
LOG.warn("Unable to allocate container: " + ex);
|
|
|
continue;
|
|
|
}
|
|
|
- openContainers.put(containerName, 0L);
|
|
|
- openContainerStore.put(DFSUtil.string2Bytes(containerName),
|
|
|
+ Map<String, BlockContainerInfo> containersByState =
|
|
|
+ containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
|
|
|
+ Preconditions.checkNotNull(containersByState);
|
|
|
+ containersByState.put(containerName,
|
|
|
+ new BlockContainerInfo(containerInfo, 0));
|
|
|
+ containerStore.put(DFSUtil.string2Bytes(containerName),
|
|
|
DFSUtil.string2Bytes(Long.toString(0L)));
|
|
|
results.add(containerName);
|
|
|
}
|
|
@@ -203,6 +232,76 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
return results;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Filter container by states and size.
|
|
|
+ * @param state the state of the container.
|
|
|
+ * @param size the minimal available size of the container
|
|
|
+ * @return allocated containers satisfy both state and size.
|
|
|
+ */
|
|
|
+ private List <String> filterContainers(OzoneProtos.LifeCycleState state,
|
|
|
+ long size) {
|
|
|
+ Map<String, BlockContainerInfo> containersByState =
|
|
|
+ this.containers.get(state);
|
|
|
+ return containersByState.entrySet().parallelStream()
|
|
|
+ .filter(e -> ((e.getValue().getAllocated() + size < containerSize)))
|
|
|
+ .map(e -> e.getKey())
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ }
|
|
|
+
|
|
|
+ private BlockContainerInfo getContainer(OzoneProtos.LifeCycleState state,
|
|
|
+ String name) {
|
|
|
+ Map<String, BlockContainerInfo> containersByState = this.containers.get(state);
|
|
|
+ return containersByState.get(name);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Relies on the caller such as allocateBlock() to hold the lock
|
|
|
+ // to ensure containers map consistent.
|
|
|
+ private void updateContainer(OzoneProtos.LifeCycleState oldState, String name,
|
|
|
+ OzoneProtos.LifeCycleState newState) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Update container {} from state {} to state {}",
|
|
|
+ name, oldState, newState);
|
|
|
+ }
|
|
|
+ Map<String, BlockContainerInfo> containersInOldState =
|
|
|
+ this.containers.get(oldState);
|
|
|
+ BlockContainerInfo containerInfo = containersInOldState.get(name);
|
|
|
+ Preconditions.checkNotNull(containerInfo);
|
|
|
+ containersInOldState.remove(name);
|
|
|
+ Map<String, BlockContainerInfo> containersInNewState =
|
|
|
+ this.containers.get(newState);
|
|
|
+ containersInNewState.put(name, containerInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Refresh containers that have been allocated.
|
|
|
+ // We may not need to track all the states, just the creating/open/close
|
|
|
+ // should be enough for now.
|
|
|
+ private void refreshContainers() {
|
|
|
+ Map<String, BlockContainerInfo> containersByState =
|
|
|
+ this.containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
|
|
|
+ for (String containerName: containersByState.keySet()) {
|
|
|
+ try {
|
|
|
+ ContainerInfo containerInfo =
|
|
|
+ containerManager.getContainer(containerName);
|
|
|
+ if (containerInfo == null) {
|
|
|
+ // TODO: clean up containers that has been deleted on SCM but
|
|
|
+ // TODO: still in ALLOCATED state in block manager.
|
|
|
+ LOG.debug("Container {} allocated by block service" +
|
|
|
+ "can't be found in SCM", containerName);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (containerInfo.getState() == OzoneProtos.LifeCycleState.OPEN) {
|
|
|
+ updateContainer(OzoneProtos.LifeCycleState.ALLOCATED, containerName,
|
|
|
+ containerInfo.getState());
|
|
|
+ }
|
|
|
+ // TODO: check containers in other state and refresh as needed.
|
|
|
+ // TODO: ALLOCATED container that is timeout and DELETED. (unit test)
|
|
|
+ // TODO: OPEN container that is CLOSE.
|
|
|
+ } catch (IOException ex) {
|
|
|
+ LOG.debug("Failed to get container info for: {}", containerName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Allocates a new block for a given size.
|
|
|
*
|
|
@@ -215,8 +314,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
*/
|
|
|
@Override
|
|
|
public AllocatedBlock allocateBlock(final long size) throws IOException {
|
|
|
- boolean createContainer;
|
|
|
- Pipeline pipeline;
|
|
|
+ boolean createContainer = false;
|
|
|
if (size < 0 || size > containerSize) {
|
|
|
throw new SCMException("Unsupported block size",
|
|
|
INVALID_BLOCK_SIZE);
|
|
@@ -228,37 +326,29 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
|
|
|
lock.lock();
|
|
|
try {
|
|
|
+ refreshContainers();
|
|
|
List<String> candidates;
|
|
|
- if (openContainers.size() == 0) {
|
|
|
- try {
|
|
|
- candidates = provisionContainers(containerProvisionBatchSize);
|
|
|
- } catch (IOException ex) {
|
|
|
- throw new SCMException("Unable to allocate container for the block",
|
|
|
- FAILED_TO_ALLOCATE_CONTAINER);
|
|
|
- }
|
|
|
- createContainer = true;
|
|
|
- } else {
|
|
|
- candidates = openContainers.entrySet().parallelStream()
|
|
|
- .filter(e -> (e.getValue() + size < containerSize))
|
|
|
- .map(e -> e.getKey())
|
|
|
- .collect(Collectors.toList());
|
|
|
- createContainer = false;
|
|
|
- }
|
|
|
-
|
|
|
+ candidates = filterContainers(OzoneProtos.LifeCycleState.OPEN, size);
|
|
|
if (candidates.size() == 0) {
|
|
|
- try {
|
|
|
- candidates = provisionContainers(containerProvisionBatchSize);
|
|
|
- } catch (IOException ex) {
|
|
|
- throw new SCMException("Unable to allocate container for the block",
|
|
|
- FAILED_TO_ALLOCATE_CONTAINER);
|
|
|
+ candidates = filterContainers(OzoneProtos.LifeCycleState.ALLOCATED,
|
|
|
+ size);
|
|
|
+ if (candidates.size() == 0) {
|
|
|
+ try {
|
|
|
+ candidates = allocateContainers(containerProvisionBatchSize);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ LOG.error("Unable to allocate container for the block.");
|
|
|
+ throw new SCMException("Unable to allocate container for the block",
|
|
|
+ FAILED_TO_ALLOCATE_CONTAINER);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // now we should have some candidates in ALLOCATE state
|
|
|
+ if (candidates.size() == 0) {
|
|
|
+ throw new SCMException("Fail to find any container to allocate block " +
|
|
|
+ "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (candidates.size() == 0) {
|
|
|
- throw new SCMException("Fail to find any container to allocate block " +
|
|
|
- "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
|
|
|
- }
|
|
|
-
|
|
|
+ // Candidates list now should include only ALLOCATE or OPEN containers
|
|
|
int randomIdx = rand.nextInt(candidates.size());
|
|
|
String containerName = candidates.get(randomIdx);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -266,28 +356,46 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
candidates.toString(), containerName);
|
|
|
}
|
|
|
|
|
|
- pipeline = containerManager.getContainer(containerName);
|
|
|
- if (pipeline == null) {
|
|
|
+ ContainerInfo containerInfo =
|
|
|
+ containerManager.getContainer(containerName);
|
|
|
+ if (containerInfo == null) {
|
|
|
LOG.debug("Unable to find container for the block");
|
|
|
throw new SCMException("Unable to find container to allocate block",
|
|
|
FAILED_TO_FIND_CONTAINER);
|
|
|
}
|
|
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Candidate {} state {}", containerName,
|
|
|
+ containerInfo.getState());
|
|
|
+ }
|
|
|
+ // Container must be either OPEN or ALLOCATE state
|
|
|
+ if (containerInfo.getState() == OzoneProtos.LifeCycleState.ALLOCATED) {
|
|
|
+ createContainer = true;
|
|
|
+ }
|
|
|
+
|
|
|
// TODO: make block key easier to debug (e.g., seq no)
|
|
|
// Allocate key for the block
|
|
|
String blockKey = UUID.randomUUID().toString();
|
|
|
AllocatedBlock.Builder abb = new AllocatedBlock.Builder()
|
|
|
- .setKey(blockKey).setPipeline(pipeline)
|
|
|
+ .setKey(blockKey).setPipeline(containerInfo.getPipeline())
|
|
|
.setShouldCreateContainer(createContainer);
|
|
|
- if (pipeline.getMachines().size() > 0) {
|
|
|
+ if (containerInfo.getPipeline().getMachines().size() > 0) {
|
|
|
blockStore.put(DFSUtil.string2Bytes(blockKey),
|
|
|
DFSUtil.string2Bytes(containerName));
|
|
|
|
|
|
// update the container usage information
|
|
|
- Long newUsed = openContainers.get(containerName) + size;
|
|
|
- openContainers.put(containerName, newUsed);
|
|
|
- openContainerStore.put(DFSUtil.string2Bytes(containerName),
|
|
|
- DFSUtil.string2Bytes(Long.toString(newUsed)));
|
|
|
+ BlockContainerInfo containerInfoUpdate =
|
|
|
+ getContainer(containerInfo.getState(), containerName);
|
|
|
+ Preconditions.checkNotNull(containerInfoUpdate);
|
|
|
+ containerInfoUpdate.addAllocated(size);
|
|
|
+ containerStore.put(DFSUtil.string2Bytes(containerName),
|
|
|
+ DFSUtil.string2Bytes(Long.toString(containerInfoUpdate.getAllocated())));
|
|
|
+ if (createContainer) {
|
|
|
+ OzoneProtos.LifeCycleState newState =
|
|
|
+ containerManager.updateContainerState(containerName,
|
|
|
+ OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
|
|
|
+ updateContainer(containerInfo.getState(), containerName, newState);
|
|
|
+ }
|
|
|
return abb.build();
|
|
|
}
|
|
|
} finally {
|
|
@@ -312,8 +420,16 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
throw new SCMException("Specified block key does not exist. key : " +
|
|
|
key, FAILED_TO_FIND_BLOCK);
|
|
|
}
|
|
|
- return containerManager.getContainer(
|
|
|
- DFSUtil.bytes2String(containerBytes));
|
|
|
+ String containerName = DFSUtil.bytes2String(containerBytes);
|
|
|
+ ContainerInfo containerInfo = containerManager.getContainer(
|
|
|
+ containerName);
|
|
|
+ if (containerInfo == null) {
|
|
|
+ LOG.debug("Container {} allocated by block service" +
|
|
|
+ "can't be found in SCM", containerName);
|
|
|
+ throw new SCMException("Unable to find container for the block",
|
|
|
+ SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
|
|
|
+ }
|
|
|
+ return containerInfo.getPipeline();
|
|
|
} finally {
|
|
|
lock.unlock();
|
|
|
}
|
|
@@ -338,8 +454,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
throw new SCMException("Specified block key does not exist. key : " +
|
|
|
key, FAILED_TO_FIND_BLOCK);
|
|
|
}
|
|
|
+ // TODO: track the block size info so that we can reclaim the container
|
|
|
+ // TODO: used space when the block is deleted.
|
|
|
BatchOperation batch = new BatchOperation();
|
|
|
- containerManager.getContainer(DFSUtil.bytes2String(containerBytes));
|
|
|
String deletedKeyName = getDeletedKeyName(key);
|
|
|
// Add a tombstone for the deleted key
|
|
|
batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
|
|
@@ -370,8 +487,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
if (blockStore != null) {
|
|
|
blockStore.close();
|
|
|
}
|
|
|
- if (openContainerStore != null) {
|
|
|
- openContainerStore.close();
|
|
|
+ if (containerStore != null) {
|
|
|
+ containerStore.close();
|
|
|
}
|
|
|
|
|
|
MBeans.unregister(mxBean);
|
|
@@ -379,6 +496,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
|
|
|
@Override
|
|
|
public int getOpenContainersNo() {
|
|
|
- return openContainers.size();
|
|
|
+ return containers.get(OzoneProtos.LifeCycleState.OPEN).size();
|
|
|
}
|
|
|
}
|