|
@@ -5,33 +5,34 @@
|
|
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
|
|
* "License"); you may not use this file except in compliance with the License.
|
|
|
* You may obtain a copy of the License at
|
|
|
- *
|
|
|
+ * <p>
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
- *
|
|
|
+ * <p>
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
* License for the specific language governing permissions and limitations under
|
|
|
* the License.
|
|
|
*/
|
|
|
-
|
|
|
package org.apache.hadoop.ozone.scm.block;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
-import com.google.common.base.Preconditions;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
|
import org.apache.hadoop.ozone.OzoneConsts;
|
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
|
|
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
|
|
|
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
|
|
|
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
|
|
|
import org.apache.hadoop.ozone.scm.container.Mapping;
|
|
|
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
|
|
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
|
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
|
|
import org.apache.hadoop.scm.ScmConfigKeys;
|
|
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
|
|
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
|
|
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
|
|
|
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.utils.BatchOperation;
|
|
@@ -43,35 +44,16 @@ import org.slf4j.LoggerFactory;
|
|
|
import javax.management.ObjectName;
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
-
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.locks.Lock;
|
|
|
-import java.util.concurrent.locks.ReentrantLock;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
-import java.util.stream.Collectors;
|
|
|
import java.util.UUID;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.locks.Lock;
|
|
|
+import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
-import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
|
|
|
-import static org.apache.hadoop.ozone.OzoneConsts.OPEN_CONTAINERS_DB;
|
|
|
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
|
|
|
- CHILL_MODE_EXCEPTION;
|
|
|
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
|
|
|
- FAILED_TO_ALLOCATE_CONTAINER;
|
|
|
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
|
|
|
- FAILED_TO_FIND_CONTAINER;
|
|
|
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
|
|
|
- FAILED_TO_FIND_CONTAINER_WITH_SPACE;
|
|
|
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
|
|
|
- FAILED_TO_FIND_BLOCK;
|
|
|
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
|
|
|
- FAILED_TO_LOAD_OPEN_CONTAINER;
|
|
|
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
|
|
|
- INVALID_BLOCK_SIZE;
|
|
|
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
|
|
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
|
|
|
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
|
@@ -80,13 +62,22 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
|
|
|
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
|
|
|
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
|
|
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
|
|
|
-
|
|
|
-/**
|
|
|
- * Block Manager manages the block access for SCM.
|
|
|
- */
|
|
|
+import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
|
|
|
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
|
|
|
+ .CHILL_MODE_EXCEPTION;
|
|
|
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
|
|
|
+ .FAILED_TO_FIND_BLOCK;
|
|
|
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
|
|
|
+ .INVALID_BLOCK_SIZE;
|
|
|
+
|
|
|
+/** Block Manager manages the block access for SCM. */
|
|
|
public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(BlockManagerImpl.class);
|
|
|
+ // TODO : FIX ME : Hard coding the owner.
|
|
|
+ // Currently only user of the block service is Ozone, CBlock manages blocks
|
|
|
+ // by itself and does not rely on the Block service offered by SCM.
|
|
|
+ private final Owner owner = Owner.OZONE;
|
|
|
|
|
|
private final NodeManager nodeManager;
|
|
|
private final Mapping containerManager;
|
|
@@ -96,20 +87,16 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
private final long containerSize;
|
|
|
private final long cacheSize;
|
|
|
|
|
|
- // Track all containers owned by block service.
|
|
|
- private final MetadataStore containerStore;
|
|
|
private final DeletedBlockLog deletedBlockLog;
|
|
|
private final SCMBlockDeletingService blockDeletingService;
|
|
|
|
|
|
- private Map<OzoneProtos.LifeCycleState,
|
|
|
- Map<String, BlockContainerInfo>> containers;
|
|
|
private final int containerProvisionBatchSize;
|
|
|
private final Random rand;
|
|
|
private ObjectName mxBean;
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* Constructor.
|
|
|
+ *
|
|
|
* @param conf - configuration.
|
|
|
* @param nodeManager - node manager.
|
|
|
* @param containerManager - container manager.
|
|
@@ -122,34 +109,26 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
this.nodeManager = nodeManager;
|
|
|
this.containerManager = containerManager;
|
|
|
this.cacheSize = cacheSizeMB;
|
|
|
- File metaDir = OzoneUtils.getScmMetadirPath(conf);
|
|
|
- String scmMetaDataDir = metaDir.getPath();
|
|
|
-
|
|
|
- // Write the block key to container name mapping.
|
|
|
- File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB);
|
|
|
- blockStore = MetadataStoreBuilder.newBuilder()
|
|
|
- .setConf(conf)
|
|
|
- .setDbFile(blockContainerDbPath)
|
|
|
- .setCacheSize(this.cacheSize * OzoneConsts.MB)
|
|
|
- .build();
|
|
|
|
|
|
this.containerSize = OzoneConsts.GB * conf.getInt(
|
|
|
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
|
|
|
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
|
|
|
+ File metaDir = OzoneUtils.getScmMetadirPath(conf);
|
|
|
+ String scmMetaDataDir = metaDir.getPath();
|
|
|
|
|
|
- // Load store of all open contains for block allocation
|
|
|
- File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB);
|
|
|
- containerStore = MetadataStoreBuilder.newBuilder()
|
|
|
- .setConf(conf)
|
|
|
- .setDbFile(openContainsDbPath)
|
|
|
- .setCacheSize(this.cacheSize * OzoneConsts.MB)
|
|
|
- .build();
|
|
|
-
|
|
|
- loadAllocatedContainers();
|
|
|
-
|
|
|
- this.containerProvisionBatchSize = conf.getInt(
|
|
|
- ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
|
|
|
- ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
|
|
|
+ // Write the block key to container name mapping.
|
|
|
+ File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB);
|
|
|
+ blockStore =
|
|
|
+ MetadataStoreBuilder.newBuilder()
|
|
|
+ .setConf(conf)
|
|
|
+ .setDbFile(blockContainerDbPath)
|
|
|
+ .setCacheSize(this.cacheSize * OzoneConsts.MB)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ this.containerProvisionBatchSize =
|
|
|
+ conf.getInt(
|
|
|
+ ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
|
|
|
+ ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
|
|
|
rand = new Random();
|
|
|
this.lock = new ReentrantLock();
|
|
|
|
|
@@ -157,18 +136,24 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
|
|
|
// SCM block deleting transaction log and deleting service.
|
|
|
deletedBlockLog = new DeletedBlockLogImpl(conf);
|
|
|
- int svcInterval = conf.getInt(
|
|
|
- OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
|
|
|
- OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
|
|
|
- long serviceTimeout = conf.getTimeDuration(
|
|
|
- OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
|
|
|
- OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
|
|
|
- blockDeletingService = new SCMBlockDeletingService(deletedBlockLog,
|
|
|
- containerManager, nodeManager, svcInterval, serviceTimeout);
|
|
|
+ int svcInterval =
|
|
|
+ conf.getInt(
|
|
|
+ OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
|
|
|
+ OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
|
|
|
+ long serviceTimeout =
|
|
|
+ conf.getTimeDuration(
|
|
|
+ OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
|
|
|
+ OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
|
|
|
+ TimeUnit.MILLISECONDS);
|
|
|
+ blockDeletingService =
|
|
|
+ new SCMBlockDeletingService(
|
|
|
+ deletedBlockLog, containerManager, nodeManager, svcInterval,
|
|
|
+ serviceTimeout);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Start block manager services.
|
|
|
+ *
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void start() throws IOException {
|
|
@@ -177,6 +162,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
|
|
|
/**
|
|
|
* Shutdown block manager services.
|
|
|
+ *
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void stop() throws IOException {
|
|
@@ -184,59 +170,17 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
this.close();
|
|
|
}
|
|
|
|
|
|
- // TODO: close full (or almost full) containers with a separate thread.
|
|
|
- /**
|
|
|
- * Load allocated containers from persistent store.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private void loadAllocatedContainers() throws IOException {
|
|
|
- // Pre-allocate empty map entry by state to avoid null check
|
|
|
- containers = new ConcurrentHashMap<>();
|
|
|
- for (OzoneProtos.LifeCycleState state :
|
|
|
- OzoneProtos.LifeCycleState.values()) {
|
|
|
- containers.put(state, new ConcurrentHashMap());
|
|
|
- }
|
|
|
- try {
|
|
|
- containerStore.iterate(null, (key, value) -> {
|
|
|
- try {
|
|
|
- String containerName = DFSUtil.bytes2String(key);
|
|
|
- Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
|
|
|
- ContainerInfo containerInfo =
|
|
|
- containerManager.getContainer(containerName);
|
|
|
- // TODO: remove the container from block manager's container DB
|
|
|
- // Most likely the allocated container is timeout and cleaned up
|
|
|
- // by SCM, we should clean up correspondingly instead of just skip it.
|
|
|
- if (containerInfo == null) {
|
|
|
- LOG.warn("Container {} allocated by block service" +
|
|
|
- "can't be found in SCM", containerName);
|
|
|
- return true;
|
|
|
- }
|
|
|
- Map<String, BlockContainerInfo> containersByState =
|
|
|
- containers.get(containerInfo.getState());
|
|
|
- containersByState.put(containerName,
|
|
|
- new BlockContainerInfo(containerInfo, containerUsed));
|
|
|
- LOG.debug("Loading allocated container: {} used : {} state: {}",
|
|
|
- containerName, containerUsed, containerInfo.getState());
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Failed loading allocated container, continue next...");
|
|
|
- }
|
|
|
- return true;
|
|
|
- });
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Loading open container store failed." + e);
|
|
|
- throw new SCMException("Failed to load open container store",
|
|
|
- FAILED_TO_LOAD_OPEN_CONTAINER);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Pre allocate specified count of containers for block creation.
|
|
|
- * @param count - number of containers to allocate.
|
|
|
- * @return list of container names allocated.
|
|
|
+ *
|
|
|
+ * @param count - Number of containers to allocate.
|
|
|
+ * @param type - Type of containers
|
|
|
+ * @param factor - how many copies needed for this container.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private List<String> allocateContainers(int count) throws IOException {
|
|
|
- List<String> results = new ArrayList();
|
|
|
+ private void preAllocateContainers(int count, ReplicationType type,
|
|
|
+ ReplicationFactor factor)
|
|
|
+ throws IOException {
|
|
|
lock.lock();
|
|
|
try {
|
|
|
for (int i = 0; i < count; i++) {
|
|
@@ -244,210 +188,177 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
ContainerInfo containerInfo = null;
|
|
|
try {
|
|
|
// TODO: Fix this later when Ratis is made the Default.
|
|
|
- containerInfo = containerManager.allocateContainer(
|
|
|
- OzoneProtos.ReplicationType.STAND_ALONE,
|
|
|
- OzoneProtos.ReplicationFactor.ONE,
|
|
|
- containerName);
|
|
|
+ containerInfo = containerManager.allocateContainer(type, factor,
|
|
|
+ containerName, owner);
|
|
|
|
|
|
if (containerInfo == null) {
|
|
|
LOG.warn("Unable to allocate container.");
|
|
|
continue;
|
|
|
}
|
|
|
} catch (IOException ex) {
|
|
|
- LOG.warn("Unable to allocate container: " + ex);
|
|
|
+ LOG.warn("Unable to allocate container: {}", ex);
|
|
|
continue;
|
|
|
}
|
|
|
- Map<String, BlockContainerInfo> containersByState =
|
|
|
- containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
|
|
|
- Preconditions.checkNotNull(containersByState);
|
|
|
- containersByState.put(containerName,
|
|
|
- new BlockContainerInfo(containerInfo, 0));
|
|
|
- containerStore.put(DFSUtil.string2Bytes(containerName),
|
|
|
- DFSUtil.string2Bytes(Long.toString(0L)));
|
|
|
- results.add(containerName);
|
|
|
}
|
|
|
} finally {
|
|
|
lock.unlock();
|
|
|
}
|
|
|
- return results;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Filter container by states and size.
|
|
|
- * @param state the state of the container.
|
|
|
- * @param size the minimal available size of the container
|
|
|
- * @return allocated containers satisfy both state and size.
|
|
|
- */
|
|
|
- private List <String> filterContainers(OzoneProtos.LifeCycleState state,
|
|
|
- long size) {
|
|
|
- Map<String, BlockContainerInfo> containersByState =
|
|
|
- this.containers.get(state);
|
|
|
- return containersByState.entrySet().parallelStream()
|
|
|
- .filter(e -> ((e.getValue().getAllocated() + size < containerSize)))
|
|
|
- .map(e -> e.getKey())
|
|
|
- .collect(Collectors.toList());
|
|
|
- }
|
|
|
-
|
|
|
- private BlockContainerInfo getContainer(OzoneProtos.LifeCycleState state,
|
|
|
- String name) {
|
|
|
- Map<String, BlockContainerInfo> containersByState =
|
|
|
- this.containers.get(state);
|
|
|
- return containersByState.get(name);
|
|
|
- }
|
|
|
-
|
|
|
- // Relies on the caller such as allocateBlock() to hold the lock
|
|
|
- // to ensure containers map consistent.
|
|
|
- private void updateContainer(OzoneProtos.LifeCycleState oldState, String name,
|
|
|
- OzoneProtos.LifeCycleState newState) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Update container {} from state {} to state {}",
|
|
|
- name, oldState, newState);
|
|
|
- }
|
|
|
- Map<String, BlockContainerInfo> containersInOldState =
|
|
|
- this.containers.get(oldState);
|
|
|
- BlockContainerInfo containerInfo = containersInOldState.get(name);
|
|
|
- Preconditions.checkNotNull(containerInfo);
|
|
|
- containersInOldState.remove(name);
|
|
|
- Map<String, BlockContainerInfo> containersInNewState =
|
|
|
- this.containers.get(newState);
|
|
|
- containersInNewState.put(name, containerInfo);
|
|
|
- }
|
|
|
-
|
|
|
- // Refresh containers that have been allocated.
|
|
|
- // We may not need to track all the states, just the creating/open/close
|
|
|
- // should be enough for now.
|
|
|
- private void refreshContainers() {
|
|
|
- Map<String, BlockContainerInfo> containersByState =
|
|
|
- this.containers.get(OzoneProtos.LifeCycleState.CREATING);
|
|
|
- for (String containerName : containersByState.keySet()) {
|
|
|
- try {
|
|
|
- ContainerInfo containerInfo =
|
|
|
- containerManager.getContainer(containerName);
|
|
|
- if (containerInfo == null) {
|
|
|
- // TODO: clean up containers that has been deleted on SCM but
|
|
|
- // TODO: still in ALLOCATED state in block manager.
|
|
|
- LOG.debug("Container {} allocated by block service"
|
|
|
- + "can't be found in SCM", containerName);
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (containerInfo.getState() == OzoneProtos.LifeCycleState.OPEN) {
|
|
|
- updateContainer(OzoneProtos.LifeCycleState.CREATING, containerName,
|
|
|
- containerInfo.getState());
|
|
|
- }
|
|
|
- // TODO: check containers in other state and refresh as needed.
|
|
|
- // TODO: ALLOCATED container that is timeout and DELETED. (unit test)
|
|
|
- // TODO: OPEN container that is CLOSE.
|
|
|
- } catch (IOException ex) {
|
|
|
- LOG.debug("Failed to get container info for: {}", containerName);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Allocates a new block for a given size.
|
|
|
+ * Allocates a block in a container and returns that info.
|
|
|
*
|
|
|
- * SCM choose one of the open containers and returns that as the location for
|
|
|
- * the new block. An open container is a container that is actively written to
|
|
|
- * via replicated log.
|
|
|
- * @param size - size of the block to be allocated
|
|
|
- * @return - the allocated pipeline and key for the block
|
|
|
- * @throws IOException
|
|
|
+ * @param size - Block Size
|
|
|
+ * @param type Replication Type
|
|
|
+ * @param factor - Replication Factor
|
|
|
+ * @return Allocated block
|
|
|
+ * @throws IOException on failure.
|
|
|
*/
|
|
|
@Override
|
|
|
- public AllocatedBlock allocateBlock(final long size) throws IOException {
|
|
|
- boolean createContainer = false;
|
|
|
+ public AllocatedBlock allocateBlock(
|
|
|
+ final long size, ReplicationType type, ReplicationFactor factor)
|
|
|
+ throws IOException {
|
|
|
+ LOG.trace("Size;{} , type : {}, factor : {} ", size, type, factor);
|
|
|
+
|
|
|
if (size < 0 || size > containerSize) {
|
|
|
- throw new SCMException("Unsupported block size", INVALID_BLOCK_SIZE);
|
|
|
+ LOG.warn("Invalid block size requested : {}", size);
|
|
|
+ throw new SCMException("Unsupported block size: " + size,
|
|
|
+ INVALID_BLOCK_SIZE);
|
|
|
}
|
|
|
+
|
|
|
if (!nodeManager.isOutOfNodeChillMode()) {
|
|
|
+ LOG.warn("Not out of Chill mode.");
|
|
|
throw new SCMException("Unable to create block while in chill mode",
|
|
|
CHILL_MODE_EXCEPTION);
|
|
|
}
|
|
|
|
|
|
lock.lock();
|
|
|
try {
|
|
|
- refreshContainers();
|
|
|
- List<String> candidates;
|
|
|
- candidates = filterContainers(OzoneProtos.LifeCycleState.OPEN, size);
|
|
|
- if (candidates.size() == 0) {
|
|
|
- candidates =
|
|
|
- filterContainers(OzoneProtos.LifeCycleState.ALLOCATED, size);
|
|
|
- if (candidates.size() == 0) {
|
|
|
- try {
|
|
|
- candidates = allocateContainers(containerProvisionBatchSize);
|
|
|
- } catch (IOException ex) {
|
|
|
- LOG.error("Unable to allocate container for the block.");
|
|
|
- throw new SCMException("Unable to allocate container for the block",
|
|
|
- FAILED_TO_ALLOCATE_CONTAINER);
|
|
|
- }
|
|
|
- }
|
|
|
- // now we should have some candidates in ALLOCATE state
|
|
|
- if (candidates.size() == 0) {
|
|
|
- throw new SCMException(
|
|
|
- "Fail to find any container to allocate block " + "of size "
|
|
|
- + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SPACE);
|
|
|
- }
|
|
|
+ /*
|
|
|
+ Here is the high level logic.
|
|
|
+
|
|
|
+ 1. First we check if there are containers in ALLOCATED state,
|
|
|
+ that is
|
|
|
+ SCM has allocated them in the SCM namespace but the
|
|
|
+ corresponding
|
|
|
+ container has not been created in the Datanode yet. If we
|
|
|
+ have any
|
|
|
+ in that state, we will return that to the client, which allows
|
|
|
+ client to finish creating those containers. This is a sort of
|
|
|
+ greedy
|
|
|
+ algorithm, our primary purpose is to get as many containers as
|
|
|
+ possible.
|
|
|
+
|
|
|
+ 2. If there are no allocated containers -- Then we find an Open
|
|
|
+ container that matches that pattern.
|
|
|
+
|
|
|
+ 3. If both of them fail, then we will pre-allocate a bunch of
|
|
|
+ containers in SCM and try again.
|
|
|
+
|
|
|
+ TODO : Support random picking of two containers from the list.
|
|
|
+ So we
|
|
|
+ can use different kind of policies.
|
|
|
+ */
|
|
|
+
|
|
|
+ BlockContainerInfo containerInfo = null;
|
|
|
+
|
|
|
+ // Look for ALLOCATED container that matches all other parameters.
|
|
|
+ containerInfo =
|
|
|
+ containerManager
|
|
|
+ .getStateManager()
|
|
|
+ .getMatchingContainer(
|
|
|
+ size, owner, type, factor, OzoneProtos.LifeCycleState
|
|
|
+ .ALLOCATED);
|
|
|
+ if (containerInfo != null) {
|
|
|
+ return newBlock(containerInfo, OzoneProtos.LifeCycleState.ALLOCATED);
|
|
|
}
|
|
|
|
|
|
- // Candidates list now should include only ALLOCATE or OPEN containers
|
|
|
- int randomIdx = rand.nextInt(candidates.size());
|
|
|
- String containerName = candidates.get(randomIdx);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Find {} candidates: {}, picking: {}", candidates.size(),
|
|
|
- candidates.toString(), containerName);
|
|
|
+ // Since we found no allocated containers that match our criteria, let us
|
|
|
+ // look for OPEN containers that match the criteria.
|
|
|
+ containerInfo =
|
|
|
+ containerManager
|
|
|
+ .getStateManager()
|
|
|
+ .getMatchingContainer(size, owner, type, factor, OzoneProtos
|
|
|
+ .LifeCycleState.OPEN);
|
|
|
+ if (containerInfo != null) {
|
|
|
+ return newBlock(containerInfo, OzoneProtos.LifeCycleState.OPEN);
|
|
|
}
|
|
|
|
|
|
- ContainerInfo containerInfo =
|
|
|
- containerManager.getContainer(containerName);
|
|
|
- if (containerInfo == null) {
|
|
|
- LOG.debug("Unable to find container for the block");
|
|
|
- throw new SCMException("Unable to find container to allocate block",
|
|
|
- FAILED_TO_FIND_CONTAINER);
|
|
|
+ // We found neither ALLOCATED nor OPEN Containers. This generally means
|
|
|
+ // that most of our containers are full or we have not allocated
|
|
|
+ // containers of the type and replication factor. So let us go and
|
|
|
+ // allocate some.
|
|
|
+ preAllocateContainers(containerProvisionBatchSize, type, factor);
|
|
|
+
|
|
|
+ // Since we just allocated a set of containers this should work
|
|
|
+ containerInfo =
|
|
|
+ containerManager
|
|
|
+ .getStateManager()
|
|
|
+ .getMatchingContainer(
|
|
|
+ size, owner, type, factor, OzoneProtos.LifeCycleState
|
|
|
+ .ALLOCATED);
|
|
|
+ if (containerInfo != null) {
|
|
|
+ return newBlock(containerInfo, OzoneProtos.LifeCycleState.ALLOCATED);
|
|
|
}
|
|
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Candidate {} state {}", containerName,
|
|
|
- containerInfo.getState());
|
|
|
- }
|
|
|
- // Container must be either OPEN or ALLOCATE state
|
|
|
- if (containerInfo.getState() == OzoneProtos.LifeCycleState.ALLOCATED) {
|
|
|
- createContainer = true;
|
|
|
- }
|
|
|
-
|
|
|
- // TODO: make block key easier to debug (e.g., seq no)
|
|
|
- // Allocate key for the block
|
|
|
- String blockKey = UUID.randomUUID().toString();
|
|
|
- AllocatedBlock.Builder abb = new AllocatedBlock.Builder().setKey(blockKey)
|
|
|
- .setPipeline(containerInfo.getPipeline())
|
|
|
- .setShouldCreateContainer(createContainer);
|
|
|
- if (containerInfo.getPipeline().getMachines().size() > 0) {
|
|
|
- blockStore.put(DFSUtil.string2Bytes(blockKey),
|
|
|
- DFSUtil.string2Bytes(containerName));
|
|
|
-
|
|
|
- // update the container usage information
|
|
|
- BlockContainerInfo containerInfoUpdate =
|
|
|
- getContainer(containerInfo.getState(), containerName);
|
|
|
- Preconditions.checkNotNull(containerInfoUpdate);
|
|
|
- containerInfoUpdate.addAllocated(size);
|
|
|
- containerStore.put(DFSUtil.string2Bytes(containerName), DFSUtil
|
|
|
- .string2Bytes(Long.toString(containerInfoUpdate.getAllocated())));
|
|
|
- if (createContainer) {
|
|
|
- OzoneProtos.LifeCycleState newState = containerManager
|
|
|
- .updateContainerState(containerName,
|
|
|
- OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
|
|
|
- updateContainer(containerInfo.getState(), containerName, newState);
|
|
|
- }
|
|
|
- return abb.build();
|
|
|
- }
|
|
|
+ // we have tried all strategies we know, but somehow we are not able
|
|
|
+ // to get a container for this block. Log that info and return a null.
|
|
|
+ LOG.error(
|
|
|
+ "Unable to allocate a block for the size: {}, type: {}, " +
|
|
|
+ "factor: {}",
|
|
|
+ size,
|
|
|
+ type,
|
|
|
+ factor);
|
|
|
+ return null;
|
|
|
} finally {
|
|
|
lock.unlock();
|
|
|
}
|
|
|
- return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * newBlock - returns a new block assigned to a container.
|
|
|
*
|
|
|
+ * @param containerInfo - Container Info.
|
|
|
+ * @param state - Current state of the container.
|
|
|
+ * @return AllocatedBlock
|
|
|
+ */
|
|
|
+ private AllocatedBlock newBlock(
|
|
|
+ BlockContainerInfo containerInfo, OzoneProtos.LifeCycleState state)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ // TODO : Replace this with Block ID.
|
|
|
+ String blockKey = UUID.randomUUID().toString();
|
|
|
+ boolean createContainer = (state == OzoneProtos.LifeCycleState.ALLOCATED);
|
|
|
+
|
|
|
+ AllocatedBlock.Builder abb =
|
|
|
+ new AllocatedBlock.Builder()
|
|
|
+ .setKey(blockKey)
|
|
|
+ // TODO : Use containerinfo instead of pipeline.
|
|
|
+ .setPipeline(containerInfo.getPipeline())
|
|
|
+ .setShouldCreateContainer(createContainer);
|
|
|
+ LOG.trace("New block allocated : {} Container ID: {}", blockKey,
|
|
|
+ containerInfo.toString());
|
|
|
+
|
|
|
+ if (containerInfo.getPipeline().getMachines().size() == 0) {
|
|
|
+ LOG.error("Pipeline Machine count is zero.");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Persist this block info to the blockStore DB, so getBlock(key) can
|
|
|
+ // find which container the block lives in.
|
|
|
+ // TODO : Remove this DB in future
|
|
|
+ // and make this a KSM operation. Category: SCALABILITY.
|
|
|
+ if (containerInfo.getPipeline().getMachines().size() > 0) {
|
|
|
+ blockStore.put(
|
|
|
+ DFSUtil.string2Bytes(blockKey),
|
|
|
+ DFSUtil.string2Bytes(containerInfo.getPipeline().getContainerName()));
|
|
|
+ }
|
|
|
+ return abb.build();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* Given a block key, return the Pipeline information.
|
|
|
+ *
|
|
|
* @param key - block key assigned by SCM.
|
|
|
* @return Pipeline (list of DNs and leader) to access the block.
|
|
|
* @throws IOException
|
|
@@ -462,14 +373,15 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
"Specified block key does not exist. key : " + key,
|
|
|
FAILED_TO_FIND_BLOCK);
|
|
|
}
|
|
|
+
|
|
|
String containerName = DFSUtil.bytes2String(containerBytes);
|
|
|
- ContainerInfo containerInfo =
|
|
|
- containerManager.getContainer(containerName);
|
|
|
+ ContainerInfo containerInfo = containerManager.getContainer(
|
|
|
+ containerName);
|
|
|
if (containerInfo == null) {
|
|
|
- LOG.debug(
|
|
|
- "Container {} allocated by block service" + "can't be found in SCM",
|
|
|
- containerName);
|
|
|
- throw new SCMException("Unable to find container for the block",
|
|
|
+ LOG.debug("Container {} allocated by block service"
|
|
|
+ + "can't be found in SCM", containerName);
|
|
|
+ throw new SCMException(
|
|
|
+ "Unable to find container for the block",
|
|
|
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
|
|
|
}
|
|
|
return containerInfo.getPipeline();
|
|
@@ -479,13 +391,14 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Deletes a list of blocks in an atomic operation. Internally, SCM
|
|
|
- * writes these blocks into a {@link DeletedBlockLog} and deletes them
|
|
|
- * from SCM DB. If this is successful, given blocks are entering pending
|
|
|
- * deletion state and becomes invisible from SCM namespace.
|
|
|
+ * Deletes a list of blocks in an atomic operation. Internally, SCM writes
|
|
|
+ * these blocks into a
|
|
|
+ * {@link DeletedBlockLog} and deletes them from SCM DB. If this is
|
|
|
+ * successful, given blocks are
|
|
|
+ * entering pending deletion state and become invisible from SCM namespace.
|
|
|
*
|
|
|
- * @param blockIDs block IDs. This is often the list of blocks of
|
|
|
- * a particular object key.
|
|
|
+ * @param blockIDs block IDs. This is often the list of blocks of a
|
|
|
+ * particular object key.
|
|
|
* @throws IOException if exception happens, non of the blocks is deleted.
|
|
|
*/
|
|
|
@Override
|
|
@@ -546,16 +459,20 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
// to be invisible from namespace but actual data are not removed.
|
|
|
// We log an error here so admin can manually check and fix such
|
|
|
// errors.
|
|
|
- LOG.error("Blocks might be in inconsistent state because"
|
|
|
+ LOG.error(
|
|
|
+ "Blocks might be in inconsistent state because"
|
|
|
+ " they were moved to pending deletion state in SCM DB but"
|
|
|
+ " not written into delLog. Admin can manually add them"
|
|
|
+ " into delLog for deletions. Inconsistent block list: {}",
|
|
|
- String.join(",", blockIDs), e);
|
|
|
+ String.join(",", blockIDs),
|
|
|
+ e);
|
|
|
throw rollbackException;
|
|
|
}
|
|
|
- throw new IOException("Skip writing the deleted blocks info to"
|
|
|
- + " the delLog because addTransaction fails. Batch skipped: "
|
|
|
- + String.join(",", blockIDs), e);
|
|
|
+ throw new IOException(
|
|
|
+ "Skip writing the deleted blocks info to"
|
|
|
+ + " the delLog because addTransaction fails. Batch skipped: "
|
|
|
+ + String.join(",", blockIDs),
|
|
|
+ e);
|
|
|
}
|
|
|
// TODO: Container report handling of the deleted blocks:
|
|
|
// Remove tombstone and update open container usage.
|
|
@@ -577,6 +494,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
|
|
|
/**
|
|
|
* Close the resources for BlockManager.
|
|
|
+ *
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
@Override
|
|
@@ -584,9 +502,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
if (blockStore != null) {
|
|
|
blockStore.close();
|
|
|
}
|
|
|
- if (containerStore != null) {
|
|
|
- containerStore.close();
|
|
|
- }
|
|
|
if (deletedBlockLog != null) {
|
|
|
deletedBlockLog.close();
|
|
|
}
|
|
@@ -599,6 +514,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|
|
|
|
|
@Override
|
|
|
public int getOpenContainersNo() {
|
|
|
- return containers.get(OzoneProtos.LifeCycleState.OPEN).size();
|
|
|
+ return 0;
|
|
|
+ // TODO : FIX ME : The open container being a single number does not make
|
|
|
+ // sense.
|
|
|
+ // We have to get open containers by Replication Type and Replication
|
|
|
+ // factor. Hence returning 0 for now.
|
|
|
+ // containers.get(OzoneProtos.LifeCycleState.OPEN).size();
|
|
|
}
|
|
|
}
|