|
@@ -53,6 +53,8 @@ public class DatanodeStateMachine implements Closeable {
|
|
|
private final CommandDispatcher commandDispatcher;
|
|
|
private long commandsHandled;
|
|
|
private AtomicLong nextHB;
|
|
|
+ private Thread stateMachineThread = null;
|
|
|
+ private Thread cmdProcessThread = null;
|
|
|
|
|
|
/**
|
|
|
* Constructs a a datanode state machine.
|
|
@@ -136,6 +138,8 @@ public class DatanodeStateMachine implements Closeable {
|
|
|
if (now < nextHB.get()) {
|
|
|
Thread.sleep(nextHB.get() - now);
|
|
|
}
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // Ignore this exception.
|
|
|
} catch (Exception e) {
|
|
|
LOG.error("Unable to finish the execution.", e);
|
|
|
}
|
|
@@ -173,6 +177,12 @@ public class DatanodeStateMachine implements Closeable {
|
|
|
*/
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
+ if (stateMachineThread != null) {
|
|
|
+ stateMachineThread.interrupt();
|
|
|
+ }
|
|
|
+ if (cmdProcessThread != null) {
|
|
|
+ cmdProcessThread.interrupt();
|
|
|
+ }
|
|
|
context.setState(DatanodeStates.getLastState());
|
|
|
executorService.shutdown();
|
|
|
try {
|
|
@@ -189,8 +199,8 @@ public class DatanodeStateMachine implements Closeable {
|
|
|
Thread.currentThread().interrupt();
|
|
|
}
|
|
|
|
|
|
- for (EndpointStateMachine endPoint : connectionManager.getValues()) {
|
|
|
- endPoint.close();
|
|
|
+ if (connectionManager != null) {
|
|
|
+ connectionManager.close();
|
|
|
}
|
|
|
|
|
|
if(container != null) {
|
|
@@ -275,11 +285,11 @@ public class DatanodeStateMachine implements Closeable {
|
|
|
LOG.error("Unable to start the DatanodeState Machine", ex);
|
|
|
}
|
|
|
};
|
|
|
- Thread thread = new ThreadFactoryBuilder()
|
|
|
+ stateMachineThread = new ThreadFactoryBuilder()
|
|
|
.setDaemon(true)
|
|
|
.setNameFormat("Datanode State Machine Thread - %d")
|
|
|
.build().newThread(startStateMachineTask);
|
|
|
- thread.start();
|
|
|
+ stateMachineThread.start();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -344,7 +354,7 @@ public class DatanodeStateMachine implements Closeable {
|
|
|
};
|
|
|
|
|
|
// We will have only one thread for command processing in a datanode.
|
|
|
- Thread cmdProcessThread = new Thread(processCommandQueue);
|
|
|
+ cmdProcessThread = new Thread(processCommandQueue);
|
|
|
cmdProcessThread.setDaemon(true);
|
|
|
cmdProcessThread.setName("Command processor thread");
|
|
|
cmdProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
|