|
@@ -30,13 +30,13 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
|
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
|
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
|
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
|
|
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
|
|
import org.apache.hadoop.scm.ScmConfigKeys;
|
|
import org.apache.hadoop.scm.ScmConfigKeys;
|
|
-import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
|
|
|
|
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
|
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
+import java.io.Closeable;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
@@ -57,7 +57,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
|
|
|
|
|
|
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
|
|
|
|
|
|
+import static org.apache.hadoop.ozone.scm.exceptions
|
|
|
|
+ .SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
|
|
|
|
|
|
/**
|
|
/**
|
|
* A container state manager keeps track of container states and returns
|
|
* A container state manager keeps track of container states and returns
|
|
@@ -116,7 +117,7 @@ import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.FA
|
|
* TimeOut Delete Container State Machine - if the container creating times out,
|
|
* TimeOut Delete Container State Machine - if the container creating times out,
|
|
* then Container State manager decides to delete the container.
|
|
* then Container State manager decides to delete the container.
|
|
*/
|
|
*/
|
|
-public class ContainerStateManager {
|
|
|
|
|
|
+public class ContainerStateManager implements Closeable {
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(ContainerStateManager.class);
|
|
LoggerFactory.getLogger(ContainerStateManager.class);
|
|
|
|
|
|
@@ -130,8 +131,8 @@ public class ContainerStateManager {
|
|
// A map that maintains the ContainerKey to Containers of that type ordered
|
|
// A map that maintains the ContainerKey to Containers of that type ordered
|
|
// by last access time.
|
|
// by last access time.
|
|
private final ReadWriteLock lock;
|
|
private final ReadWriteLock lock;
|
|
- private final Queue<BlockContainerInfo> containerCloseQueue;
|
|
|
|
- private Map<ContainerKey, PriorityQueue<BlockContainerInfo>> containers;
|
|
|
|
|
|
+ private final Queue<ContainerInfo> containerCloseQueue;
|
|
|
|
+ private Map<ContainerKey, PriorityQueue<ContainerInfo>> containers;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Constructs a Container State Manager that tracks all containers owned by
|
|
* Constructs a Container State Manager that tracks all containers owned by
|
|
@@ -167,7 +168,7 @@ public class ContainerStateManager {
|
|
containers = new HashMap<>();
|
|
containers = new HashMap<>();
|
|
initializeContainerMaps();
|
|
initializeContainerMaps();
|
|
loadExistingContainers(containerMapping);
|
|
loadExistingContainers(containerMapping);
|
|
- containerCloseQueue = new ConcurrentLinkedQueue<BlockContainerInfo>();
|
|
|
|
|
|
+ containerCloseQueue = new ConcurrentLinkedQueue<>();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -191,7 +192,7 @@ public class ContainerStateManager {
|
|
for (ReplicationFactor factor : ReplicationFactor.values()) {
|
|
for (ReplicationFactor factor : ReplicationFactor.values()) {
|
|
for (LifeCycleState state : LifeCycleState.values()) {
|
|
for (LifeCycleState state : LifeCycleState.values()) {
|
|
ContainerKey key = new ContainerKey(owner, type, factor, state);
|
|
ContainerKey key = new ContainerKey(owner, type, factor, state);
|
|
- PriorityQueue<BlockContainerInfo> queue = new PriorityQueue<>();
|
|
|
|
|
|
+ PriorityQueue<ContainerInfo> queue = new PriorityQueue<>();
|
|
containers.put(key, queue);
|
|
containers.put(key, queue);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -212,9 +213,7 @@ public class ContainerStateManager {
|
|
ContainerKey key = new ContainerKey(container.getOwner(),
|
|
ContainerKey key = new ContainerKey(container.getOwner(),
|
|
container.getPipeline().getType(),
|
|
container.getPipeline().getType(),
|
|
container.getPipeline().getFactor(), container.getState());
|
|
container.getPipeline().getFactor(), container.getState());
|
|
- BlockContainerInfo blockContainerInfo =
|
|
|
|
- new BlockContainerInfo(container, 0);
|
|
|
|
- ((PriorityQueue) containers.get(key)).add(blockContainerInfo);
|
|
|
|
|
|
+ containers.get(key).add(container);
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
if (!e.getMessage().equals("No container exists in current db")) {
|
|
if (!e.getMessage().equals("No container exists in current db")) {
|
|
@@ -289,28 +288,31 @@ public class ContainerStateManager {
|
|
|
|
|
|
Pipeline pipeline = selector.getReplicationPipeline(type,
|
|
Pipeline pipeline = selector.getReplicationPipeline(type,
|
|
replicationFactor, containerName);
|
|
replicationFactor, containerName);
|
|
- ContainerInfo info = new ContainerInfo.Builder()
|
|
|
|
|
|
+ ContainerInfo containerInfo = new ContainerInfo.Builder()
|
|
.setContainerName(containerName)
|
|
.setContainerName(containerName)
|
|
.setState(OzoneProtos.LifeCycleState.ALLOCATED)
|
|
.setState(OzoneProtos.LifeCycleState.ALLOCATED)
|
|
.setPipeline(pipeline)
|
|
.setPipeline(pipeline)
|
|
|
|
+ // This is bytes allocated for blocks inside container, not the
|
|
|
|
+ // container size
|
|
|
|
+ .setAllocatedBytes(0)
|
|
|
|
+ .setUsedBytes(0)
|
|
|
|
+ .setNumberOfKeys(0)
|
|
.setStateEnterTime(Time.monotonicNow())
|
|
.setStateEnterTime(Time.monotonicNow())
|
|
.setOwner(owner)
|
|
.setOwner(owner)
|
|
.build();
|
|
.build();
|
|
- Preconditions.checkNotNull(info);
|
|
|
|
- BlockContainerInfo blockInfo = new BlockContainerInfo(info, 0);
|
|
|
|
- blockInfo.setLastUsed(Time.monotonicNow());
|
|
|
|
|
|
+ Preconditions.checkNotNull(containerInfo);
|
|
lock.writeLock().lock();
|
|
lock.writeLock().lock();
|
|
try {
|
|
try {
|
|
ContainerKey key = new ContainerKey(owner, type, replicationFactor,
|
|
ContainerKey key = new ContainerKey(owner, type, replicationFactor,
|
|
- blockInfo.getState());
|
|
|
|
- PriorityQueue<BlockContainerInfo> queue = containers.get(key);
|
|
|
|
|
|
+ containerInfo.getState());
|
|
|
|
+ PriorityQueue<ContainerInfo> queue = containers.get(key);
|
|
Preconditions.checkNotNull(queue);
|
|
Preconditions.checkNotNull(queue);
|
|
- queue.add(blockInfo);
|
|
|
|
- LOG.trace("New container allocated: {}", blockInfo);
|
|
|
|
|
|
+ queue.add(containerInfo);
|
|
|
|
+ LOG.trace("New container allocated: {}", containerInfo);
|
|
} finally {
|
|
} finally {
|
|
lock.writeLock().unlock();
|
|
lock.writeLock().unlock();
|
|
}
|
|
}
|
|
- return info;
|
|
|
|
|
|
+ return containerInfo;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -318,20 +320,14 @@ public class ContainerStateManager {
|
|
*
|
|
*
|
|
* @param info - ContainerInfo
|
|
* @param info - ContainerInfo
|
|
* @param event - LifeCycle Event
|
|
* @param event - LifeCycle Event
|
|
- * @return New state of the container.
|
|
|
|
|
|
+ * @return Updated ContainerInfo.
|
|
* @throws SCMException
|
|
* @throws SCMException
|
|
*/
|
|
*/
|
|
- public OzoneProtos.LifeCycleState updateContainerState(BlockContainerInfo
|
|
|
|
|
|
+ public ContainerInfo updateContainerState(ContainerInfo
|
|
info, OzoneProtos.LifeCycleEvent event) throws SCMException {
|
|
info, OzoneProtos.LifeCycleEvent event) throws SCMException {
|
|
- LifeCycleState newState = null;
|
|
|
|
- boolean shouldLease = false;
|
|
|
|
|
|
+ LifeCycleState newState;
|
|
try {
|
|
try {
|
|
newState = this.stateMachine.getNextState(info.getState(), event);
|
|
newState = this.stateMachine.getNextState(info.getState(), event);
|
|
- if(newState == LifeCycleState.CREATING) {
|
|
|
|
- // if we are moving into a Creating State, it is possible that clients
|
|
|
|
- // could timeout therefore we need to use a lease.
|
|
|
|
- shouldLease = true;
|
|
|
|
- }
|
|
|
|
} catch (InvalidStateTransitionException ex) {
|
|
} catch (InvalidStateTransitionException ex) {
|
|
String error = String.format("Failed to update container state %s, " +
|
|
String error = String.format("Failed to update container state %s, " +
|
|
"reason: invalid state transition from state: %s upon event: %s.",
|
|
"reason: invalid state transition from state: %s upon event: %s.",
|
|
@@ -352,7 +348,7 @@ public class ContainerStateManager {
|
|
lock.writeLock().lock();
|
|
lock.writeLock().lock();
|
|
try {
|
|
try {
|
|
|
|
|
|
- PriorityQueue<BlockContainerInfo> currentQueue = containers.get(oldKey);
|
|
|
|
|
|
+ PriorityQueue<ContainerInfo> currentQueue = containers.get(oldKey);
|
|
// This should never happen, since we have initialized the map and
|
|
// This should never happen, since we have initialized the map and
|
|
// queues to all possible states. No harm in asserting that info.
|
|
// queues to all possible states. No harm in asserting that info.
|
|
Preconditions.checkNotNull(currentQueue);
|
|
Preconditions.checkNotNull(currentQueue);
|
|
@@ -368,14 +364,23 @@ public class ContainerStateManager {
|
|
currentQueue.remove(info);
|
|
currentQueue.remove(info);
|
|
}
|
|
}
|
|
|
|
|
|
- info.setState(newState);
|
|
|
|
- PriorityQueue<BlockContainerInfo> nextQueue = containers.get(newKey);
|
|
|
|
|
|
+ PriorityQueue<ContainerInfo> nextQueue = containers.get(newKey);
|
|
Preconditions.checkNotNull(nextQueue);
|
|
Preconditions.checkNotNull(nextQueue);
|
|
|
|
|
|
- info.setLastUsed(Time.monotonicNow());
|
|
|
|
- nextQueue.add(info);
|
|
|
|
-
|
|
|
|
- return newState;
|
|
|
|
|
|
+ ContainerInfo containerInfo = new ContainerInfo.Builder()
|
|
|
|
+ .setContainerName(info.getContainerName())
|
|
|
|
+ .setState(newState)
|
|
|
|
+ .setPipeline(info.getPipeline())
|
|
|
|
+ .setAllocatedBytes(info.getAllocatedBytes())
|
|
|
|
+ .setUsedBytes(info.getUsedBytes())
|
|
|
|
+ .setNumberOfKeys(info.getNumberOfKeys())
|
|
|
|
+ .setStateEnterTime(Time.monotonicNow())
|
|
|
|
+ .setOwner(info.getOwner())
|
|
|
|
+ .build();
|
|
|
|
+ Preconditions.checkNotNull(containerInfo);
|
|
|
|
+ nextQueue.add(containerInfo);
|
|
|
|
+
|
|
|
|
+ return containerInfo;
|
|
} finally {
|
|
} finally {
|
|
lock.writeLock().unlock();
|
|
lock.writeLock().unlock();
|
|
}
|
|
}
|
|
@@ -389,43 +394,34 @@ public class ContainerStateManager {
|
|
* @param type - Replication Type {StandAlone, Ratis}
|
|
* @param type - Replication Type {StandAlone, Ratis}
|
|
* @param factor - Replication Factor {ONE, THREE}
|
|
* @param factor - Replication Factor {ONE, THREE}
|
|
* @param state - State of the Container-- {Open, Allocated etc.}
|
|
* @param state - State of the Container-- {Open, Allocated etc.}
|
|
- * @return BlockContainerInfo
|
|
|
|
|
|
+ * @return ContainerInfo
|
|
*/
|
|
*/
|
|
- public BlockContainerInfo getMatchingContainer(final long size,
|
|
|
|
|
|
+ public ContainerInfo getMatchingContainer(final long size,
|
|
Owner owner, ReplicationType type, ReplicationFactor factor,
|
|
Owner owner, ReplicationType type, ReplicationFactor factor,
|
|
LifeCycleState state) {
|
|
LifeCycleState state) {
|
|
ContainerKey key = new ContainerKey(owner, type, factor, state);
|
|
ContainerKey key = new ContainerKey(owner, type, factor, state);
|
|
lock.writeLock().lock();
|
|
lock.writeLock().lock();
|
|
try {
|
|
try {
|
|
- PriorityQueue<BlockContainerInfo> queue = containers.get(key);
|
|
|
|
|
|
+ PriorityQueue<ContainerInfo> queue = containers.get(key);
|
|
if (queue.size() == 0) {
|
|
if (queue.size() == 0) {
|
|
// We don't have any Containers of this type.
|
|
// We don't have any Containers of this type.
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
- Iterator<BlockContainerInfo> iter = queue.iterator();
|
|
|
|
|
|
+ Iterator<ContainerInfo> iter = queue.iterator();
|
|
// Two assumptions here.
|
|
// Two assumptions here.
|
|
// 1. The Iteration on the heap is in ordered by the last used time.
|
|
// 1. The Iteration on the heap is in ordered by the last used time.
|
|
// 2. We remove and add the node back to push the node to the end of
|
|
// 2. We remove and add the node back to push the node to the end of
|
|
// the queue.
|
|
// the queue.
|
|
|
|
|
|
while (iter.hasNext()) {
|
|
while (iter.hasNext()) {
|
|
- BlockContainerInfo info = iter.next();
|
|
|
|
- if (info.canAllocate(size, this.containerSize)) {
|
|
|
|
|
|
+ ContainerInfo info = iter.next();
|
|
|
|
+ if (info.getAllocatedBytes() + size <= this.containerSize) {
|
|
queue.remove(info);
|
|
queue.remove(info);
|
|
- info.addAllocated(size);
|
|
|
|
- info.setLastUsed(Time.monotonicNow());
|
|
|
|
|
|
+ info.allocate(size);
|
|
|
|
+ info.updateLastUsedTime();
|
|
queue.add(info);
|
|
queue.add(info);
|
|
|
|
|
|
return info;
|
|
return info;
|
|
- } else {
|
|
|
|
- if (info.getState() != LifeCycleState.CLOSED) {
|
|
|
|
- // We should close this container.
|
|
|
|
- LOG.info("Moving {} to containerCloseQueue.", info.toString());
|
|
|
|
- info.setState(LifeCycleState.CLOSED);
|
|
|
|
- containerCloseQueue.add(info);
|
|
|
|
- //TODO: Next JIRA will handle these containers to close.
|
|
|
|
- //TODO: move container to right queue
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -436,13 +432,13 @@ public class ContainerStateManager {
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- public List<BlockContainerInfo> getMatchingContainers(Owner owner,
|
|
|
|
|
|
+ public List<ContainerInfo> getMatchingContainers(Owner owner,
|
|
ReplicationType type, ReplicationFactor factor, LifeCycleState state) {
|
|
ReplicationType type, ReplicationFactor factor, LifeCycleState state) {
|
|
ContainerKey key = new ContainerKey(owner, type, factor, state);
|
|
ContainerKey key = new ContainerKey(owner, type, factor, state);
|
|
lock.readLock().lock();
|
|
lock.readLock().lock();
|
|
try {
|
|
try {
|
|
- return Arrays.asList((BlockContainerInfo[]) containers.get(key)
|
|
|
|
- .toArray(new BlockContainerInfo[0]));
|
|
|
|
|
|
+ return Arrays.asList((ContainerInfo[]) containers.get(key)
|
|
|
|
+ .toArray(new ContainerInfo[0]));
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
LOG.error("Could not get matching containers", e);
|
|
LOG.error("Could not get matching containers", e);
|
|
} finally {
|
|
} finally {
|
|
@@ -451,6 +447,11 @@ public class ContainerStateManager {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public void close() throws IOException {
|
|
|
|
+ //TODO: update container metadata db with actual allocated bytes values.
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Class that acts as the container Key.
|
|
* Class that acts as the container Key.
|
|
*/
|
|
*/
|