|
@@ -36,8 +36,6 @@ import java.util.SortedSet;
|
|
import java.util.TreeSet;
|
|
import java.util.TreeSet;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CountDownLatch;
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
|
-import java.util.concurrent.Executors;
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -73,7 +71,6 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
import org.apache.hadoop.util.Preconditions;
|
|
import org.apache.hadoop.util.Preconditions;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
@@ -103,8 +100,6 @@ class BPServiceActor implements Runnable {
|
|
|
|
|
|
volatile long lastCacheReport = 0;
|
|
volatile long lastCacheReport = 0;
|
|
private final Scheduler scheduler;
|
|
private final Scheduler scheduler;
|
|
- private final Object sendIBRLock;
|
|
|
|
- private final ExecutorService ibrExecutorService;
|
|
|
|
|
|
|
|
Thread bpThread;
|
|
Thread bpThread;
|
|
DatanodeProtocolClientSideTranslatorPB bpNamenode;
|
|
DatanodeProtocolClientSideTranslatorPB bpNamenode;
|
|
@@ -161,10 +156,6 @@ class BPServiceActor implements Runnable {
|
|
}
|
|
}
|
|
commandProcessingThread = new CommandProcessingThread(this);
|
|
commandProcessingThread = new CommandProcessingThread(this);
|
|
commandProcessingThread.start();
|
|
commandProcessingThread.start();
|
|
- sendIBRLock = new Object();
|
|
|
|
- ibrExecutorService = Executors.newSingleThreadExecutor(
|
|
|
|
- new ThreadFactoryBuilder().setDaemon(true)
|
|
|
|
- .setNameFormat("ibr-executor-%d").build());
|
|
|
|
}
|
|
}
|
|
|
|
|
|
public DatanodeRegistration getBpRegistration() {
|
|
public DatanodeRegistration getBpRegistration() {
|
|
@@ -397,10 +388,8 @@ class BPServiceActor implements Runnable {
|
|
// we have a chance that we will miss the delHint information
|
|
// we have a chance that we will miss the delHint information
|
|
// or we will report an RBW replica after the BlockReport already reports
|
|
// or we will report an RBW replica after the BlockReport already reports
|
|
// a FINALIZED one.
|
|
// a FINALIZED one.
|
|
- synchronized (sendIBRLock) {
|
|
|
|
- ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
|
|
|
- bpos.getBlockPoolId(), getRpcMetricSuffix());
|
|
|
|
- }
|
|
|
|
|
|
+ ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
|
|
|
+ bpos.getBlockPoolId(), getRpcMetricSuffix());
|
|
|
|
|
|
long brCreateStartTime = monotonicNow();
|
|
long brCreateStartTime = monotonicNow();
|
|
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
|
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
|
@@ -633,9 +622,6 @@ class BPServiceActor implements Runnable {
|
|
if (commandProcessingThread != null) {
|
|
if (commandProcessingThread != null) {
|
|
commandProcessingThread.interrupt();
|
|
commandProcessingThread.interrupt();
|
|
}
|
|
}
|
|
- if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) {
|
|
|
|
- ibrExecutorService.shutdownNow();
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
//This must be called only by blockPoolManager
|
|
//This must be called only by blockPoolManager
|
|
@@ -650,18 +636,13 @@ class BPServiceActor implements Runnable {
|
|
} catch (InterruptedException ie) { }
|
|
} catch (InterruptedException ie) { }
|
|
}
|
|
}
|
|
|
|
|
|
- // Cleanup method to be called by current thread before exiting.
|
|
|
|
- // Any Thread / ExecutorService started by BPServiceActor can be shutdown
|
|
|
|
- // here.
|
|
|
|
|
|
+ //Cleanup method to be called by current thread before exiting.
|
|
private synchronized void cleanUp() {
|
|
private synchronized void cleanUp() {
|
|
|
|
|
|
shouldServiceRun = false;
|
|
shouldServiceRun = false;
|
|
IOUtils.cleanupWithLogger(null, bpNamenode);
|
|
IOUtils.cleanupWithLogger(null, bpNamenode);
|
|
IOUtils.cleanupWithLogger(null, lifelineSender);
|
|
IOUtils.cleanupWithLogger(null, lifelineSender);
|
|
bpos.shutdownActor(this);
|
|
bpos.shutdownActor(this);
|
|
- if (!ibrExecutorService.isShutdown()) {
|
|
|
|
- ibrExecutorService.shutdownNow();
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
|
|
private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
|
|
@@ -757,6 +738,11 @@ class BPServiceActor implements Runnable {
|
|
isSlownode = resp.getIsSlownode();
|
|
isSlownode = resp.getIsSlownode();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ if (!dn.areIBRDisabledForTests() &&
|
|
|
|
+ (ibrManager.sendImmediately()|| sendHeartbeat)) {
|
|
|
|
+ ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
|
|
|
+ bpos.getBlockPoolId(), getRpcMetricSuffix());
|
|
|
|
+ }
|
|
|
|
|
|
List<DatanodeCommand> cmds = null;
|
|
List<DatanodeCommand> cmds = null;
|
|
boolean forceFullBr =
|
|
boolean forceFullBr =
|
|
@@ -923,10 +909,6 @@ class BPServiceActor implements Runnable {
|
|
initialRegistrationComplete.countDown();
|
|
initialRegistrationComplete.countDown();
|
|
}
|
|
}
|
|
|
|
|
|
- // IBR tasks to be handled separately from offerService() in order to
|
|
|
|
- // improve performance of offerService(), which can now focus only on
|
|
|
|
- // FBR and heartbeat.
|
|
|
|
- ibrExecutorService.submit(new IBRTaskHandler());
|
|
|
|
while (shouldRun()) {
|
|
while (shouldRun()) {
|
|
try {
|
|
try {
|
|
offerService();
|
|
offerService();
|
|
@@ -1159,34 +1141,6 @@ class BPServiceActor implements Runnable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- class IBRTaskHandler implements Runnable {
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void run() {
|
|
|
|
- LOG.info("Starting IBR Task Handler.");
|
|
|
|
- while (shouldRun()) {
|
|
|
|
- try {
|
|
|
|
- final long startTime = scheduler.monotonicNow();
|
|
|
|
- final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
|
|
|
|
- if (!dn.areIBRDisabledForTests() &&
|
|
|
|
- (ibrManager.sendImmediately() || sendHeartbeat)) {
|
|
|
|
- synchronized (sendIBRLock) {
|
|
|
|
- ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
|
|
|
- bpos.getBlockPoolId(), getRpcMetricSuffix());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- // There is no work to do; sleep until heartbeat timer elapses,
|
|
|
|
- // or work arrives, and then iterate again.
|
|
|
|
- ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
|
|
|
|
- } catch (Throwable t) {
|
|
|
|
- LOG.error("Exception in IBRTaskHandler.", t);
|
|
|
|
- sleepAndLogInterrupts(5000, "offering IBR service");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Utility class that wraps the timestamp computations for scheduling
|
|
* Utility class that wraps the timestamp computations for scheduling
|
|
* heartbeats and block reports.
|
|
* heartbeats and block reports.
|