|
@@ -20,6 +20,9 @@ import com.google.common.base.Preconditions;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
import org.apache.hadoop.ozone.OzoneConsts;
|
|
import org.apache.hadoop.ozone.OzoneConsts;
|
|
|
|
+import org.apache.hadoop.ozone.lease.Lease;
|
|
|
|
+import org.apache.hadoop.ozone.lease.LeaseException;
|
|
|
|
+import org.apache.hadoop.ozone.lease.LeaseManager;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
|
|
@@ -28,6 +31,7 @@ import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
|
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
|
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
|
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
|
|
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
|
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
|
|
|
+import org.apache.hadoop.scm.ScmConfigKeys;
|
|
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
|
|
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
|
|
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
|
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
@@ -65,6 +69,7 @@ public class ContainerMapping implements Mapping {
|
|
private final MetadataStore containerStore;
|
|
private final MetadataStore containerStore;
|
|
private final PipelineSelector pipelineSelector;
|
|
private final PipelineSelector pipelineSelector;
|
|
private final ContainerStateManager containerStateManager;
|
|
private final ContainerStateManager containerStateManager;
|
|
|
|
+ private final LeaseManager<ContainerInfo> containerLeaseManager;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Constructs a mapping class that creates mapping between container names
|
|
* Constructs a mapping class that creates mapping between container names
|
|
@@ -105,6 +110,13 @@ public class ContainerMapping implements Mapping {
|
|
this.containerStateManager = new ContainerStateManager(conf, +this
|
|
this.containerStateManager = new ContainerStateManager(conf, +this
|
|
.cacheSize * OzoneConsts.MB);
|
|
.cacheSize * OzoneConsts.MB);
|
|
LOG.trace("Container State Manager created.");
|
|
LOG.trace("Container State Manager created.");
|
|
|
|
+
|
|
|
|
+ long containerCreationLeaseTimeout = conf.getLong(
|
|
|
|
+ ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
|
|
|
|
+ ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT);
|
|
|
|
+ LOG.trace("Starting Container Lease Manager.");
|
|
|
|
+ containerLeaseManager = new LeaseManager<>(containerCreationLeaseTimeout);
|
|
|
|
+ containerLeaseManager.start();
|
|
}
|
|
}
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
/** {@inheritDoc} */
|
|
@@ -278,6 +290,21 @@ public class ContainerMapping implements Mapping {
|
|
|
|
|
|
Preconditions.checkNotNull(containerInfo);
|
|
Preconditions.checkNotNull(containerInfo);
|
|
|
|
|
|
|
|
+ if (event == OzoneProtos.LifeCycleEvent.BEGIN_CREATE) {
|
|
|
|
+ // Acquire lease on container
|
|
|
|
+ Lease<ContainerInfo> containerLease =
|
|
|
|
+ containerLeaseManager.acquire(containerInfo);
|
|
|
|
+ // Register callback to be executed in case of timeout
|
|
|
|
+ containerLease.registerCallBack(() -> {
|
|
|
|
+ containerStateManager.updateContainerState(
|
|
|
|
+ new BlockContainerInfo(containerInfo, 0),
|
|
|
|
+ OzoneProtos.LifeCycleEvent.TIMEOUT);
|
|
|
|
+ return null;
|
|
|
|
+ });
|
|
|
|
+ } else if (event == OzoneProtos.LifeCycleEvent.COMPLETE_CREATE) {
|
|
|
|
+ // Release the lease on container
|
|
|
|
+ containerLeaseManager.release(containerInfo);
|
|
|
|
+ }
|
|
// TODO: Actual used will be updated via Container Reports later.
|
|
// TODO: Actual used will be updated via Container Reports later.
|
|
containerInfo.setState(
|
|
containerInfo.setState(
|
|
containerStateManager.updateContainerState(
|
|
containerStateManager.updateContainerState(
|
|
@@ -285,6 +312,8 @@ public class ContainerMapping implements Mapping {
|
|
|
|
|
|
containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
|
|
containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
|
|
return containerInfo.getState();
|
|
return containerInfo.getState();
|
|
|
|
+ } catch (LeaseException e) {
|
|
|
|
+ throw new IOException("Lease Exception.", e);
|
|
} finally {
|
|
} finally {
|
|
lock.unlock();
|
|
lock.unlock();
|
|
}
|
|
}
|
|
@@ -315,6 +344,9 @@ public class ContainerMapping implements Mapping {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public void close() throws IOException {
|
|
public void close() throws IOException {
|
|
|
|
+ if (containerLeaseManager != null) {
|
|
|
|
+ containerLeaseManager.shutdown();
|
|
|
|
+ }
|
|
if (containerStore != null) {
|
|
if (containerStore != null) {
|
|
containerStore.close();
|
|
containerStore.close();
|
|
}
|
|
}
|