|
@@ -105,15 +105,15 @@ public class ReplicationManager {
|
|
|
private final Map<ContainerID, List<InflightAction>> inflightDeletion;
|
|
|
|
|
|
/**
|
|
|
- * ReplicationMonitor thread is the one which wakes up at configured
|
|
|
- * interval and processes all the containers.
|
|
|
+ * ReplicationManager specific configuration.
|
|
|
*/
|
|
|
- private final Thread replicationMonitor;
|
|
|
+ private final ReplicationManagerConfiguration conf;
|
|
|
|
|
|
/**
|
|
|
- * ReplicationManager specific configuration.
|
|
|
+ * ReplicationMonitor thread is the one which wakes up at configured
|
|
|
+ * interval and processes all the containers.
|
|
|
*/
|
|
|
- private final ReplicationManagerConfiguration conf;
|
|
|
+ private Thread replicationMonitor;
|
|
|
|
|
|
/**
|
|
|
* Flag used for checking if the ReplicationMonitor thread is running or
|
|
@@ -132,28 +132,28 @@ public class ReplicationManager {
|
|
|
public ReplicationManager(final ReplicationManagerConfiguration conf,
|
|
|
final ContainerManager containerManager,
|
|
|
final ContainerPlacementPolicy containerPlacement,
|
|
|
- final EventPublisher eventPublisher,
|
|
|
- final LockManager lockManager) {
|
|
|
+ final EventPublisher eventPublisher,
|
|
|
+ final LockManager<ContainerID> lockManager) {
|
|
|
this.containerManager = containerManager;
|
|
|
this.containerPlacement = containerPlacement;
|
|
|
this.eventPublisher = eventPublisher;
|
|
|
this.lockManager = lockManager;
|
|
|
- this.inflightReplication = new HashMap<>();
|
|
|
- this.inflightDeletion = new HashMap<>();
|
|
|
- this.replicationMonitor = new Thread(this::run);
|
|
|
- this.replicationMonitor.setName("ReplicationMonitor");
|
|
|
- this.replicationMonitor.setDaemon(true);
|
|
|
this.conf = conf;
|
|
|
this.running = false;
|
|
|
+ this.inflightReplication = new HashMap<>();
|
|
|
+ this.inflightDeletion = new HashMap<>();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Starts Replication Monitor thread.
|
|
|
*/
|
|
|
public synchronized void start() {
|
|
|
- if (!running) {
|
|
|
+ if (!isRunning()) {
|
|
|
LOG.info("Starting Replication Monitor Thread.");
|
|
|
running = true;
|
|
|
+ replicationMonitor = new Thread(this::run);
|
|
|
+ replicationMonitor.setName("ReplicationMonitor");
|
|
|
+ replicationMonitor.setDaemon(true);
|
|
|
replicationMonitor.start();
|
|
|
} else {
|
|
|
LOG.info("Replication Monitor Thread is already running.");
|
|
@@ -166,7 +166,13 @@ public class ReplicationManager {
|
|
|
* @return true if running, false otherwise
|
|
|
*/
|
|
|
public boolean isRunning() {
|
|
|
- return replicationMonitor.isAlive();
|
|
|
+ if (!running) {
|
|
|
+ synchronized (this) {
|
|
|
+ return replicationMonitor != null
|
|
|
+ && replicationMonitor.isAlive();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -185,6 +191,8 @@ public class ReplicationManager {
|
|
|
public synchronized void stop() {
|
|
|
if (running) {
|
|
|
LOG.info("Stopping Replication Monitor Thread.");
|
|
|
+ inflightReplication.clear();
|
|
|
+ inflightDeletion.clear();
|
|
|
running = false;
|
|
|
notify();
|
|
|
} else {
|