|
@@ -21,12 +21,13 @@ package org.apache.hadoop.hdds.scm.container;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.protobuf.GeneratedMessage;
|
|
|
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.hdds.conf.ConfigType;
|
|
|
+import org.apache.hadoop.hdds.conf.ConfigGroup;
|
|
|
+import org.apache.hadoop.hdds.conf.Config;
|
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
|
|
import org.apache.hadoop.hdds.protocol.proto
|
|
|
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
|
|
|
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
|
|
import org.apache.hadoop.hdds.scm.container.placement.algorithms
|
|
|
.ContainerPlacementPolicy;
|
|
|
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
|
@@ -108,15 +109,9 @@ public class ReplicationManager {
|
|
|
private final Thread replicationMonitor;
|
|
|
|
|
|
/**
|
|
|
- * The frequency in which ReplicationMonitor thread should run.
|
|
|
+ * ReplicationManager-specific configuration.
|
|
|
*/
|
|
|
- private final long interval;
|
|
|
-
|
|
|
- /**
|
|
|
- * Timeout for container replication & deletion command issued by
|
|
|
- * ReplicationManager.
|
|
|
- */
|
|
|
- private final long eventTimeout;
|
|
|
+ private final ReplicationManagerConfiguration conf;
|
|
|
|
|
|
/**
|
|
|
* Flag used for checking if the ReplicationMonitor thread is running or
|
|
@@ -132,27 +127,21 @@ public class ReplicationManager {
|
|
|
* @param containerPlacement ContainerPlacementPolicy
|
|
|
* @param eventPublisher EventPublisher
|
|
|
*/
|
|
|
- public ReplicationManager(final Configuration conf,
|
|
|
+ public ReplicationManager(final ReplicationManagerConfiguration conf,
|
|
|
final ContainerManager containerManager,
|
|
|
final ContainerPlacementPolicy containerPlacement,
|
|
|
- final EventPublisher eventPublisher) {
|
|
|
+ final EventPublisher eventPublisher,
|
|
|
+ final LockManager lockManager) {
|
|
|
this.containerManager = containerManager;
|
|
|
this.containerPlacement = containerPlacement;
|
|
|
this.eventPublisher = eventPublisher;
|
|
|
- this.lockManager = new LockManager<>(conf);
|
|
|
+ this.lockManager = lockManager;
|
|
|
this.inflightReplication = new HashMap<>();
|
|
|
this.inflightDeletion = new HashMap<>();
|
|
|
this.replicationMonitor = new Thread(this::run);
|
|
|
this.replicationMonitor.setName("ReplicationMonitor");
|
|
|
this.replicationMonitor.setDaemon(true);
|
|
|
- this.interval = conf.getTimeDuration(
|
|
|
- ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL,
|
|
|
- ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT,
|
|
|
- TimeUnit.MILLISECONDS);
|
|
|
- this.eventTimeout = conf.getTimeDuration(
|
|
|
- ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT,
|
|
|
- ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT_DEFAULT,
|
|
|
- TimeUnit.MILLISECONDS);
|
|
|
+ this.conf = conf;
|
|
|
this.running = false;
|
|
|
}
|
|
|
|
|
@@ -217,7 +206,7 @@ public class ReplicationManager {
|
|
|
" processing {} containers.", Time.monotonicNow() - start,
|
|
|
containerIds.size());
|
|
|
|
|
|
- wait(interval);
|
|
|
+ wait(conf.getInterval());
|
|
|
}
|
|
|
} catch (Throwable t) {
|
|
|
// When we get runtime exception, we should terminate SCM.
|
|
@@ -337,7 +326,7 @@ public class ReplicationManager {
|
|
|
final Map<ContainerID, List<InflightAction>> inflightActions,
|
|
|
final Predicate<InflightAction> filter) {
|
|
|
final ContainerID id = container.containerID();
|
|
|
- final long deadline = Time.monotonicNow() - eventTimeout;
|
|
|
+ final long deadline = Time.monotonicNow() - conf.getEventTimeout();
|
|
|
if (inflightActions.containsKey(id)) {
|
|
|
final List<InflightAction> actions = inflightActions.get(id);
|
|
|
actions.removeIf(action -> action.time < deadline);
|
|
@@ -754,4 +743,41 @@ public class ReplicationManager {
|
|
|
this.time = time;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Configuration used by the Replication Manager.
|
|
|
+ */
|
|
|
+ @ConfigGroup(prefix = "hdds.scm.replication")
|
|
|
+ public static class ReplicationManagerConfiguration {
|
|
|
+ /**
|
|
|
+ * Interval, in milliseconds, between ReplicationMonitor thread runs.
|
|
|
+ */
|
|
|
+ private long interval = 5 * 60 * 1000;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Timeout, in milliseconds, for container replication and deletion
|
|
|
+ * commands issued by ReplicationManager.
|
|
|
+ */
|
|
|
+ private long eventTimeout = 10 * 60 * 1000;
|
|
|
+
|
|
|
+ @Config(key = "thread.interval", type = ConfigType.TIME, timeUnit =
|
|
|
+ TimeUnit.MILLISECONDS)
|
|
|
+ public void setInterval(long interval) {
|
|
|
+ this.interval = interval;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Config(key = "event.timeout", type = ConfigType.TIME, timeUnit =
|
|
|
+ TimeUnit.MILLISECONDS)
|
|
|
+ public void setEventTimeout(long eventTimeout) {
|
|
|
+ this.eventTimeout = eventTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getInterval() {
|
|
|
+ return interval;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getEventTimeout() {
|
|
|
+ return eventTimeout;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|