|
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|
|
|
|
|
import static org.apache.hadoop.util.Time.monotonicNow;
|
|
|
|
|
|
+import java.io.Closeable;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
@@ -28,6 +29,7 @@ import java.util.Collection;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
|
|
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
|
|
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
|
|
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
@@ -103,14 +106,20 @@ class BPServiceActor implements Runnable {
|
|
|
final LinkedList<BPServiceActorAction> bpThreadQueue
|
|
|
= new LinkedList<BPServiceActorAction>();
|
|
|
|
|
|
- BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
|
|
|
+ BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr,
|
|
|
+ BPOfferService bpos) {
|
|
|
this.bpos = bpos;
|
|
|
this.dn = bpos.getDataNode();
|
|
|
this.nnAddr = nnAddr;
|
|
|
+ this.lifelineSender = lifelineNnAddr != null ?
|
|
|
+ new LifelineSender(lifelineNnAddr) : null;
|
|
|
+ this.initialRegistrationComplete = lifelineNnAddr != null ?
|
|
|
+ new CountDownLatch(1) : null;
|
|
|
this.dnConf = dn.getDnConf();
|
|
|
this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
|
|
|
prevBlockReportId = ThreadLocalRandom.current().nextLong();
|
|
|
- scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
|
|
|
+ scheduler = new Scheduler(dnConf.heartBeatInterval,
|
|
|
+ dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
|
|
|
}
|
|
|
|
|
|
public DatanodeRegistration getBpRegistration() {
|
|
@@ -138,6 +147,9 @@ class BPServiceActor implements Runnable {
|
|
|
return nnAddr;
|
|
|
}
|
|
|
|
|
|
+ private final CountDownLatch initialRegistrationComplete;
|
|
|
+ private final LifelineSender lifelineSender;
|
|
|
+
|
|
|
/**
|
|
|
* Used to inject a spy NN in the unit tests.
|
|
|
*/
|
|
@@ -151,6 +163,20 @@ class BPServiceActor implements Runnable {
|
|
|
return bpNamenode;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Used to inject a spy NN in the unit tests.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ void setLifelineNameNode(
|
|
|
+ DatanodeLifelineProtocolClientSideTranslatorPB dnLifelineProtocol) {
|
|
|
+ lifelineSender.lifelineNamenode = dnLifelineProtocol;
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ DatanodeLifelineProtocolClientSideTranslatorPB getLifelineNameNodeProxy() {
|
|
|
+ return lifelineSender.lifelineNamenode;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Perform the first part of the handshake with the NameNode.
|
|
|
* This calls <code>versionRequest</code> to determine the NN's
|
|
@@ -420,29 +446,39 @@ class BPServiceActor implements Runnable {
|
|
|
//Thread is started already
|
|
|
return;
|
|
|
}
|
|
|
- bpThread = new Thread(this, formatThreadName());
|
|
|
+ bpThread = new Thread(this, formatThreadName("heartbeating", nnAddr));
|
|
|
bpThread.setDaemon(true); // needed for JUnit testing
|
|
|
bpThread.start();
|
|
|
+
|
|
|
+ if (lifelineSender != null) {
|
|
|
+ lifelineSender.start();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- private String formatThreadName() {
|
|
|
+ private String formatThreadName(String action, InetSocketAddress addr) {
|
|
|
Collection<StorageLocation> dataDirs =
|
|
|
DataNode.getStorageLocations(dn.getConf());
|
|
|
- return "DataNode: [" + dataDirs.toString() + "] " +
|
|
|
- " heartbeating to " + nnAddr;
|
|
|
+ return "DataNode: [" + dataDirs.toString() + "] " +
|
|
|
+ action + " to " + addr;
|
|
|
}
|
|
|
|
|
|
//This must be called only by blockPoolManager.
|
|
|
void stop() {
|
|
|
shouldServiceRun = false;
|
|
|
+ if (lifelineSender != null) {
|
|
|
+ lifelineSender.stop();
|
|
|
+ }
|
|
|
if (bpThread != null) {
|
|
|
- bpThread.interrupt();
|
|
|
+ bpThread.interrupt();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//This must be called only by blockPoolManager
|
|
|
void join() {
|
|
|
try {
|
|
|
+ if (lifelineSender != null) {
|
|
|
+ lifelineSender.join();
|
|
|
+ }
|
|
|
if (bpThread != null) {
|
|
|
bpThread.join();
|
|
|
}
|
|
@@ -454,6 +490,7 @@ class BPServiceActor implements Runnable {
|
|
|
|
|
|
shouldServiceRun = false;
|
|
|
IOUtils.cleanup(null, bpNamenode);
|
|
|
+ IOUtils.cleanup(null, lifelineSender);
|
|
|
bpos.shutdownActor(this);
|
|
|
}
|
|
|
|
|
@@ -480,7 +517,9 @@ class BPServiceActor implements Runnable {
|
|
|
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
|
|
|
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
|
|
|
+ " Initial delay: " + dnConf.initialBlockReportDelayMs + "msec"
|
|
|
- + "; heartBeatInterval=" + dnConf.heartBeatInterval);
|
|
|
+ + "; heartBeatInterval=" + dnConf.heartBeatInterval
|
|
|
+ + (lifelineSender != null ?
|
|
|
+ "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
|
|
|
long fullBlockReportLeaseId = 0;
|
|
|
|
|
|
//
|
|
@@ -684,6 +723,9 @@ class BPServiceActor implements Runnable {
|
|
|
}
|
|
|
|
|
|
runningState = RunningState.RUNNING;
|
|
|
+ if (initialRegistrationComplete != null) {
|
|
|
+ initialRegistrationComplete.countDown();
|
|
|
+ }
|
|
|
|
|
|
while (shouldRun()) {
|
|
|
try {
|
|
@@ -797,6 +839,135 @@ class BPServiceActor implements Runnable {
|
|
|
return scheduler;
|
|
|
}
|
|
|
|
|
|
+ private final class LifelineSender implements Runnable, Closeable {
|
|
|
+
|
|
|
+ private final InetSocketAddress lifelineNnAddr;
|
|
|
+ private Thread lifelineThread;
|
|
|
+ private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode;
|
|
|
+
|
|
|
+ public LifelineSender(InetSocketAddress lifelineNnAddr) {
|
|
|
+ this.lifelineNnAddr = lifelineNnAddr;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() {
|
|
|
+ stop();
|
|
|
+ try {
|
|
|
+ join();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ IOUtils.cleanup(null, lifelineNamenode);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ // The lifeline RPC depends on registration with the NameNode, so wait for
|
|
|
+ // initial registration to complete.
|
|
|
+ while (shouldRun()) {
|
|
|
+ try {
|
|
|
+ initialRegistrationComplete.await();
|
|
|
+ break;
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // The only way thread interruption can happen while waiting on this
|
|
|
+ // latch is if the state of the actor has been updated to signal
|
|
|
+ // shutdown. The next loop's call to shouldRun() will return false,
|
|
|
+ // and the thread will finish.
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // After initial NameNode registration has completed, execute the main
|
|
|
+ // loop for sending periodic lifeline RPCs if needed. This is done in a
|
|
|
+ // second loop to avoid a pointless wait on the above latch in every
|
|
|
+ // iteration of the main loop.
|
|
|
+ while (shouldRun()) {
|
|
|
+ try {
|
|
|
+ if (lifelineNamenode == null) {
|
|
|
+ lifelineNamenode = dn.connectToLifelineNN(lifelineNnAddr);
|
|
|
+ }
|
|
|
+ sendLifelineIfDue();
|
|
|
+ Thread.sleep(scheduler.getLifelineWaitTime());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("IOException in LifelineSender for " + BPServiceActor.this,
|
|
|
+ e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("LifelineSender for " + BPServiceActor.this + " exiting.");
|
|
|
+ }
|
|
|
+
|
|
|
+ public void start() {
|
|
|
+ lifelineThread = new Thread(this, formatThreadName("lifeline",
|
|
|
+ lifelineNnAddr));
|
|
|
+ lifelineThread.setDaemon(true);
|
|
|
+ lifelineThread.setUncaughtExceptionHandler(
|
|
|
+ new Thread.UncaughtExceptionHandler() {
|
|
|
+ @Override
|
|
|
+ public void uncaughtException(Thread thread, Throwable t) {
|
|
|
+ LOG.error(thread + " terminating on unexpected exception", t);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ lifelineThread.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void stop() {
|
|
|
+ if (lifelineThread != null) {
|
|
|
+ lifelineThread.interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void join() throws InterruptedException {
|
|
|
+ if (lifelineThread != null) {
|
|
|
+ lifelineThread.join();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendLifelineIfDue() throws IOException {
|
|
|
+ long startTime = scheduler.monotonicNow();
|
|
|
+ if (!scheduler.isLifelineDue(startTime)) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Skipping sending lifeline for " + BPServiceActor.this
|
|
|
+ + ", because it is not due.");
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (dn.areHeartbeatsDisabledForTests()) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Skipping sending lifeline for " + BPServiceActor.this
|
|
|
+ + ", because heartbeats are disabled for tests.");
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ sendLifeline();
|
|
|
+ dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime);
|
|
|
+ scheduler.scheduleNextLifeline(scheduler.monotonicNow());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendLifeline() throws IOException {
|
|
|
+ StorageReport[] reports =
|
|
|
+ dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Sending lifeline with " + reports.length + " storage " +
|
|
|
+ " reports from service actor: " + BPServiceActor.this);
|
|
|
+ }
|
|
|
+ VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
|
|
|
+ .getVolumeFailureSummary();
|
|
|
+ int numFailedVolumes = volumeFailureSummary != null ?
|
|
|
+ volumeFailureSummary.getFailedStorageLocations().length : 0;
|
|
|
+ lifelineNamenode.sendLifeline(bpRegistration,
|
|
|
+ reports,
|
|
|
+ dn.getFSDataset().getCacheCapacity(),
|
|
|
+ dn.getFSDataset().getCacheUsed(),
|
|
|
+ dn.getXmitsInProgress(),
|
|
|
+ dn.getXceiverCount(),
|
|
|
+ numFailedVolumes,
|
|
|
+ volumeFailureSummary);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Utility class that wraps the timestamp computations for scheduling
|
|
|
* heartbeats and block reports.
|
|
@@ -811,6 +982,9 @@ class BPServiceActor implements Runnable {
|
|
|
@VisibleForTesting
|
|
|
volatile long nextHeartbeatTime = monotonicNow();
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ volatile long nextLifelineTime = monotonicNow();
|
|
|
+
|
|
|
@VisibleForTesting
|
|
|
boolean resetBlockReportTime = true;
|
|
|
|
|
@@ -818,10 +992,13 @@ class BPServiceActor implements Runnable {
|
|
|
new AtomicBoolean(false);
|
|
|
|
|
|
private final long heartbeatIntervalMs;
|
|
|
+ private final long lifelineIntervalMs;
|
|
|
private final long blockReportIntervalMs;
|
|
|
|
|
|
- Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) {
|
|
|
+ Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
|
|
|
+ long blockReportIntervalMs) {
|
|
|
this.heartbeatIntervalMs = heartbeatIntervalMs;
|
|
|
+ this.lifelineIntervalMs = lifelineIntervalMs;
|
|
|
this.blockReportIntervalMs = blockReportIntervalMs;
|
|
|
}
|
|
|
|
|
@@ -835,19 +1012,31 @@ class BPServiceActor implements Runnable {
|
|
|
// Blockreport.
|
|
|
long scheduleHeartbeat() {
|
|
|
nextHeartbeatTime = monotonicNow();
|
|
|
+ scheduleNextLifeline(nextHeartbeatTime);
|
|
|
return nextHeartbeatTime;
|
|
|
}
|
|
|
|
|
|
long scheduleNextHeartbeat() {
|
|
|
// Numerical overflow is possible here and is okay.
|
|
|
nextHeartbeatTime = monotonicNow() + heartbeatIntervalMs;
|
|
|
+ scheduleNextLifeline(nextHeartbeatTime);
|
|
|
return nextHeartbeatTime;
|
|
|
}
|
|
|
|
|
|
+ long scheduleNextLifeline(long baseTime) {
|
|
|
+ // Numerical overflow is possible here and is okay.
|
|
|
+ nextLifelineTime = baseTime + lifelineIntervalMs;
|
|
|
+ return nextLifelineTime;
|
|
|
+ }
|
|
|
+
|
|
|
boolean isHeartbeatDue(long startTime) {
|
|
|
return (nextHeartbeatTime - startTime <= 0);
|
|
|
}
|
|
|
|
|
|
+ boolean isLifelineDue(long startTime) {
|
|
|
+ return (nextLifelineTime - startTime <= 0);
|
|
|
+ }
|
|
|
+
|
|
|
boolean isBlockReportDue(long curTime) {
|
|
|
return nextBlockReportTime - curTime <= 0;
|
|
|
}
|
|
@@ -903,6 +1092,10 @@ class BPServiceActor implements Runnable {
|
|
|
return nextHeartbeatTime - monotonicNow();
|
|
|
}
|
|
|
|
|
|
+ long getLifelineWaitTime() {
|
|
|
+ return nextLifelineTime - monotonicNow();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Wrapped for testing.
|
|
|
* @return
|