|
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.om.ratis;
|
|
|
import java.io.IOException;
|
|
|
import java.util.Queue;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
@@ -64,7 +65,7 @@ public class OzoneManagerDoubleBuffer {
|
|
|
private final OMMetadataManager omMetadataManager;
|
|
|
private final AtomicLong flushedTransactionCount = new AtomicLong(0);
|
|
|
private final AtomicLong flushIterations = new AtomicLong(0);
|
|
|
- private volatile boolean isRunning;
|
|
|
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
|
|
private OzoneManagerDoubleBufferMetrics ozoneManagerDoubleBufferMetrics;
|
|
|
private long maxFlushedTransactionsInOneIteration;
|
|
|
|
|
@@ -79,7 +80,7 @@ public class OzoneManagerDoubleBuffer {
|
|
|
this.ozoneManagerDoubleBufferMetrics =
|
|
|
OzoneManagerDoubleBufferMetrics.create();
|
|
|
|
|
|
- isRunning = true;
|
|
|
+ isRunning.set(true);
|
|
|
// Daemon thread which runs in back ground and flushes transactions to DB.
|
|
|
daemon = new Daemon(this::flushTransactions);
|
|
|
daemon.setName("OMDoubleBufferFlushThread");
|
|
@@ -92,7 +93,7 @@ public class OzoneManagerDoubleBuffer {
|
|
|
* and commit to DB.
|
|
|
*/
|
|
|
private void flushTransactions() {
|
|
|
- while(isRunning) {
|
|
|
+ while (isRunning.get()) {
|
|
|
try {
|
|
|
if (canFlush()) {
|
|
|
setReadyBuffer();
|
|
@@ -140,7 +141,7 @@ public class OzoneManagerDoubleBuffer {
|
|
|
}
|
|
|
} catch (InterruptedException ex) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
- if (isRunning) {
|
|
|
+ if (isRunning.get()) {
|
|
|
final String message = "OMDoubleBuffer flush thread " +
|
|
|
Thread.currentThread().getName() + " encountered Interrupted " +
|
|
|
"exception while running";
|
|
@@ -201,11 +202,16 @@ public class OzoneManagerDoubleBuffer {
|
|
|
/**
|
|
|
* Stop OM DoubleBuffer flush thread.
|
|
|
*/
|
|
|
- public synchronized void stop() {
|
|
|
- if (isRunning) {
|
|
|
+ public void stop() {
|
|
|
+ if (isRunning.compareAndSet(true, false)) {
|
|
|
LOG.info("Stopping OMDoubleBuffer flush thread");
|
|
|
- isRunning = false;
|
|
|
daemon.interrupt();
|
|
|
+ try {
|
|
|
+ // Wait for daemon thread to exit
|
|
|
+ daemon.join();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.error("Interrupted while waiting for daemon to exit.");
|
|
|
+ }
|
|
|
|
|
|
// stop metrics.
|
|
|
ozoneManagerDoubleBufferMetrics.unRegister();
|