|
@@ -22,6 +22,7 @@ import com.google.common.primitives.Longs;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
|
|
|
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
|
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
|
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
|
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
|
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
|
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
|
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
|
@@ -44,6 +45,7 @@ import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.NavigableSet;
|
|
import java.util.Objects;
|
|
import java.util.Objects;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.locks.Lock;
|
|
import java.util.concurrent.locks.Lock;
|
|
@@ -67,6 +69,7 @@ public class SCMContainerManager implements ContainerManager {
|
|
private final MetadataStore containerStore;
|
|
private final MetadataStore containerStore;
|
|
private final PipelineManager pipelineManager;
|
|
private final PipelineManager pipelineManager;
|
|
private final ContainerStateManager containerStateManager;
|
|
private final ContainerStateManager containerStateManager;
|
|
|
|
+ private final int numContainerPerOwnerInPipeline;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Constructs a mapping class that creates mapping between container names
|
|
* Constructs a mapping class that creates mapping between container names
|
|
@@ -100,6 +103,9 @@ public class SCMContainerManager implements ContainerManager {
|
|
this.lock = new ReentrantLock();
|
|
this.lock = new ReentrantLock();
|
|
this.pipelineManager = pipelineManager;
|
|
this.pipelineManager = pipelineManager;
|
|
this.containerStateManager = new ContainerStateManager(conf);
|
|
this.containerStateManager = new ContainerStateManager(conf);
|
|
|
|
+ this.numContainerPerOwnerInPipeline = conf
|
|
|
|
+ .getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
|
|
|
|
+ ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
|
|
|
|
|
|
loadExistingContainers();
|
|
loadExistingContainers();
|
|
}
|
|
}
|
|
@@ -201,6 +207,7 @@ public class SCMContainerManager implements ContainerManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Allocates a new container.
|
|
* Allocates a new container.
|
|
*
|
|
*
|
|
@@ -215,23 +222,11 @@ public class SCMContainerManager implements ContainerManager {
|
|
throws IOException {
|
|
throws IOException {
|
|
lock.lock();
|
|
lock.lock();
|
|
try {
|
|
try {
|
|
- final ContainerInfo containerInfo; containerInfo = containerStateManager
|
|
|
|
- .allocateContainer(pipelineManager, type, replicationFactor, owner);
|
|
|
|
- try {
|
|
|
|
- final byte[] containerIDBytes = Longs.toByteArray(
|
|
|
|
- containerInfo.getContainerID());
|
|
|
|
- containerStore.put(containerIDBytes,
|
|
|
|
- containerInfo.getProtobuf().toByteArray());
|
|
|
|
- } catch (IOException ex) {
|
|
|
|
- // If adding to containerStore fails, we should remove the container
|
|
|
|
- // from in-memory map.
|
|
|
|
- try {
|
|
|
|
- containerStateManager.removeContainer(containerInfo.containerID());
|
|
|
|
- } catch (ContainerNotFoundException cnfe) {
|
|
|
|
- // No need to worry much, everything is going as planned.
|
|
|
|
- }
|
|
|
|
- throw ex;
|
|
|
|
- }
|
|
|
|
|
|
+ final ContainerInfo containerInfo =
|
|
|
|
+ containerStateManager.allocateContainer(pipelineManager, type,
|
|
|
|
+ replicationFactor, owner);
|
|
|
|
+ // Add container to DB.
|
|
|
|
+ addContainerToDB(containerInfo);
|
|
return containerInfo;
|
|
return containerInfo;
|
|
} finally {
|
|
} finally {
|
|
lock.unlock();
|
|
lock.unlock();
|
|
@@ -360,9 +355,45 @@ public class SCMContainerManager implements ContainerManager {
|
|
String owner, Pipeline pipeline) {
|
|
String owner, Pipeline pipeline) {
|
|
try {
|
|
try {
|
|
//TODO: #CLUTIL See if lock is required here
|
|
//TODO: #CLUTIL See if lock is required here
|
|
- return containerStateManager
|
|
|
|
- .getMatchingContainer(sizeRequired, owner, pipelineManager,
|
|
|
|
- pipeline);
|
|
|
|
|
|
+ NavigableSet<ContainerID> containerIDs =
|
|
|
|
+ pipelineManager.getContainersInPipeline(pipeline.getId());
|
|
|
|
+
|
|
|
|
+ containerIDs = getContainersForOwner(containerIDs, owner);
|
|
|
|
+ if (containerIDs.size() < numContainerPerOwnerInPipeline) {
|
|
|
|
+ synchronized (pipeline) {
|
|
|
|
+ // TODO: #CLUTIL Maybe we can add selection logic inside synchronized
|
|
|
|
+ // as well
|
|
|
|
+ containerIDs = getContainersForOwner(
|
|
|
|
+ pipelineManager.getContainersInPipeline(pipeline.getId()), owner);
|
|
|
|
+ if (containerIDs.size() < numContainerPerOwnerInPipeline) {
|
|
|
|
+ ContainerInfo containerInfo =
|
|
|
|
+ containerStateManager.allocateContainer(pipelineManager, owner,
|
|
|
|
+ pipeline);
|
|
|
|
+ // Add to DB
|
|
|
|
+ addContainerToDB(containerInfo);
|
|
|
|
+ containerStateManager.updateLastUsedMap(pipeline.getId(),
|
|
|
|
+ containerInfo.containerID(), owner);
|
|
|
|
+ return containerInfo;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ ContainerInfo containerInfo =
|
|
|
|
+ containerStateManager.getMatchingContainer(sizeRequired, owner,
|
|
|
|
+ pipeline.getId(), containerIDs);
|
|
|
|
+ if (containerInfo == null) {
|
|
|
|
+ synchronized (pipeline) {
|
|
|
|
+ containerInfo =
|
|
|
|
+ containerStateManager.allocateContainer(pipelineManager, owner,
|
|
|
|
+ pipeline);
|
|
|
|
+ // Add to DB
|
|
|
|
+ addContainerToDB(containerInfo);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ containerStateManager.updateLastUsedMap(pipeline.getId(),
|
|
|
|
+ containerInfo.containerID(), owner);
|
|
|
|
+ // TODO: #CLUTIL cleanup entries in lastUsedMap
|
|
|
|
+ return containerInfo;
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
LOG.warn("Container allocation failed for pipeline={} requiredSize={} {}",
|
|
LOG.warn("Container allocation failed for pipeline={} requiredSize={} {}",
|
|
pipeline, sizeRequired, e);
|
|
pipeline, sizeRequired, e);
|
|
@@ -370,6 +401,55 @@ public class SCMContainerManager implements ContainerManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Add newly allocated container to container DB.
|
|
|
|
+ * @param containerInfo
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private void addContainerToDB(ContainerInfo containerInfo)
|
|
|
|
+ throws IOException {
|
|
|
|
+ try {
|
|
|
|
+ final byte[] containerIDBytes = Longs.toByteArray(
|
|
|
|
+ containerInfo.getContainerID());
|
|
|
|
+ containerStore.put(containerIDBytes,
|
|
|
|
+ containerInfo.getProtobuf().toByteArray());
|
|
|
|
+ } catch (IOException ex) {
|
|
|
|
+ // If adding to containerStore fails, we should remove the container
|
|
|
|
+ // from in-memory map.
|
|
|
|
+ try {
|
|
|
|
+ containerStateManager.removeContainer(containerInfo.containerID());
|
|
|
|
+ } catch (ContainerNotFoundException cnfe) {
|
|
|
|
+ // This should not happen, as we are removing after adding in to
|
|
|
|
+ // container state cmap.
|
|
|
|
+ }
|
|
|
|
+ throw ex;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Returns the container ID's matching with specified owner.
|
|
|
|
+ * @param containerIDs
|
|
|
|
+ * @param owner
|
|
|
|
+ * @return NavigableSet<ContainerID>
|
|
|
|
+ */
|
|
|
|
+ private NavigableSet<ContainerID> getContainersForOwner(
|
|
|
|
+ NavigableSet<ContainerID> containerIDs, String owner) {
|
|
|
|
+ for (ContainerID cid : containerIDs) {
|
|
|
|
+ try {
|
|
|
|
+ if (!getContainer(cid).getOwner().equals(owner)) {
|
|
|
|
+ containerIDs.remove(cid);
|
|
|
|
+ }
|
|
|
|
+ } catch (ContainerNotFoundException e) {
|
|
|
|
+ LOG.error("Could not find container info for container id={} {}", cid,
|
|
|
|
+ e);
|
|
|
|
+ containerIDs.remove(cid);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return containerIDs;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Returns the latest list of DataNodes where replica for given containerId
|
|
* Returns the latest list of DataNodes where replica for given containerId
|
|
* exist. Throws an SCMException if no entry is found for given containerId.
|
|
* exist. Throws an SCMException if no entry is found for given containerId.
|