|
@@ -170,6 +170,9 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
|
import org.mortbay.util.ajax.JSON;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
+
|
|
|
|
|
|
/**********************************************************
|
|
|
* DataNode is a class (and program) that stores a set of
|
|
@@ -675,15 +678,27 @@ public class DataNode extends Configured
|
|
|
@InterfaceAudience.Private
|
|
|
static class BPOfferService implements Runnable {
|
|
|
final InetSocketAddress nnAddr;
|
|
|
- DatanodeRegistration bpRegistration;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Information about the namespace that this service
|
|
|
+ * is registering with. This is assigned after
|
|
|
+ * the first phase of the handshake.
|
|
|
+ */
|
|
|
NamespaceInfo bpNSInfo;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The registration information for this block pool.
|
|
|
+ * This is assigned after the second phase of the
|
|
|
+ * handshake.
|
|
|
+ */
|
|
|
+ DatanodeRegistration bpRegistration;
|
|
|
+
|
|
|
long lastBlockReport = 0;
|
|
|
|
|
|
boolean resetBlockReportTime = true;
|
|
|
|
|
|
private Thread bpThread;
|
|
|
private DatanodeProtocol bpNamenode;
|
|
|
- private String blockPoolId;
|
|
|
private long lastHeartbeat = 0;
|
|
|
private volatile boolean initialized = false;
|
|
|
private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
|
|
@@ -695,7 +710,6 @@ public class DataNode extends Configured
|
|
|
|
|
|
BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
|
|
|
this.dn = dn;
|
|
|
- this.bpRegistration = dn.createRegistration();
|
|
|
this.nnAddr = nnAddr;
|
|
|
this.dnConf = dn.getDnConf();
|
|
|
}
|
|
@@ -705,7 +719,7 @@ public class DataNode extends Configured
|
|
|
* and has registered with the corresponding namenode
|
|
|
* @return true if initialized
|
|
|
*/
|
|
|
- public boolean initialized() {
|
|
|
+ public boolean isInitialized() {
|
|
|
return initialized;
|
|
|
}
|
|
|
|
|
@@ -714,41 +728,67 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
|
|
|
public String getBlockPoolId() {
|
|
|
- return blockPoolId;
|
|
|
+ if (bpNSInfo != null) {
|
|
|
+ return bpNSInfo.getBlockPoolID();
|
|
|
+ } else {
|
|
|
+ LOG.warn("Block pool ID needed, but service not yet registered with NN",
|
|
|
+ new Exception("trace"));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
+ * Returns the namespace info this service obtained from its NameNode.
+ * The backing field is only assigned once the first phase of the
+ * handshake has completed, so callers must be prepared for
+ * <code>null</code> before that point.
+ *
+ * @return the NamespaceInfo, or null if not yet retrieved
+ */
+ public NamespaceInfo getNamespaceInfo() {
|
|
|
+ return bpNSInfo;
|
|
|
+ }
|
|
|
+
|
|
|
+    /**
+     * Describe this block pool service for use in log messages.
+     *
+     * Before the first handshake phase completes the block pool ID is not
+     * known, so a "registering" form is produced instead; in that state the
+     * DataNode's storage ID may also be missing and is shown as "unknown".
+     *
+     * @return a human-readable description including the NN address
+     */
+    @Override
+    public String toString() {
+      // Read the field once so both the null check and the ID lookup see
+      // the same value.
+      final NamespaceInfo nsInfo = bpNSInfo;
+      if (nsInfo == null) {
+        // If we haven't yet connected to our NN, we don't yet know our
+        // own block pool ID.
+        // If _none_ of the block pools have connected yet, we don't even
+        // know the storage ID of this DN.
+        String storageId = dn.getStorageId();
+        if (storageId == null || "".equals(storageId)) {
+          storageId = "unknown";
+        }
+        return "Block pool <registering> (storage id " + storageId +
+            ") connecting to " + nnAddr;
+      } else {
+        return "Block pool " + nsInfo.getBlockPoolID() +
+            " (storage id " + dn.getStorageId() +
+            ") registered with " + nnAddr;
+      }
     }
|
|
|
|
|
|
private InetSocketAddress getNNSocketAddress() {
|
|
|
return nnAddr;
|
|
|
}
|
|
|
|
|
|
- void setNamespaceInfo(NamespaceInfo nsinfo) {
|
|
|
- bpNSInfo = nsinfo;
|
|
|
- this.blockPoolId = nsinfo.getBlockPoolID();
|
|
|
- }
|
|
|
-
|
|
|
+ /**
|
|
|
+ * Used to inject a spy NN in the unit tests.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
void setNameNode(DatanodeProtocol dnProtocol) {
|
|
|
- bpNamenode = dnProtocol;
|
|
|
+ bpNamenode = dnProtocol;
|
|
|
}
|
|
|
|
|
|
- private NamespaceInfo handshake() throws IOException {
|
|
|
- NamespaceInfo nsInfo = new NamespaceInfo();
|
|
|
- while (dn.shouldRun && shouldServiceRun) {
|
|
|
+ /**
|
|
|
+ * Perform the first part of the handshake with the NameNode.
|
|
|
+ * This calls <code>versionRequest</code> to determine the NN's
|
|
|
+ * namespace and version info. It automatically retries until
|
|
|
+ * the NN responds or the DN is shutting down.
|
|
|
+ *
|
|
|
+ * @return the NamespaceInfo
|
|
|
+ * @throws IncorrectVersionException if the remote NN does not match
|
|
|
+ * this DN's version
|
|
|
+ */
|
|
|
+ NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException {
|
|
|
+ NamespaceInfo nsInfo = null;
|
|
|
+ while (shouldRun()) {
|
|
|
try {
|
|
|
nsInfo = bpNamenode.versionRequest();
|
|
|
- // verify build version
|
|
|
- String nsVer = nsInfo.getBuildVersion();
|
|
|
- String stVer = Storage.getBuildVersion();
|
|
|
- LOG.info("handshake: namespace info = " + nsInfo);
|
|
|
-
|
|
|
- if(! nsVer.equals(stVer)) {
|
|
|
- String errorMsg = "Incompatible build versions: bp = " + blockPoolId +
|
|
|
- "namenode BV = " + nsVer + "; datanode BV = " + stVer;
|
|
|
- LOG.warn(errorMsg);
|
|
|
- bpNamenode.errorReport( bpRegistration,
|
|
|
- DatanodeProtocol.NOTIFY, errorMsg );
|
|
|
- } else {
|
|
|
- break;
|
|
|
- }
|
|
|
+ LOG.debug(this + " received versionRequest response: " + nsInfo);
|
|
|
+ break;
|
|
|
} catch(SocketTimeoutException e) { // namenode is busy
|
|
|
LOG.warn("Problem connecting to server: " + nnAddr);
|
|
|
} catch(IOException e ) { // namenode is not available
|
|
@@ -756,40 +796,53 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
|
|
|
// try again in a second
|
|
|
- try {
|
|
|
- Thread.sleep(5000);
|
|
|
- } catch (InterruptedException ie) {}
|
|
|
+ sleepAndLogInterrupts(5000, "requesting version info from NN");
|
|
|
}
|
|
|
|
|
|
- assert HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
|
|
|
- "Data-node and name-node layout versions must be the same."
|
|
|
- + "Expected: "+ HdfsConstants.LAYOUT_VERSION
|
|
|
- + " actual "+ nsInfo.getLayoutVersion();
|
|
|
+ if (nsInfo != null) {
|
|
|
+ checkNNVersion(nsInfo);
|
|
|
+ }
|
|
|
return nsInfo;
|
|
|
}
|
|
|
|
|
|
- void setupBP(Configuration conf)
|
|
|
- throws IOException {
|
|
|
+    /**
+     * Verify that the NameNode described by <code>nsInfo</code> runs the
+     * same build version and the same storage layout version as this
+     * DataNode.
+     *
+     * Only the <code>nsInfo</code> argument is consulted. This method is
+     * called from retrieveNamespaceInfo() before its result has been stored
+     * in <code>bpNSInfo</code>, so that field may still be null (first
+     * handshake) or describe a previous NN instance (re-registration).
+     *
+     * @param nsInfo the NN's response to versionRequest; must not be null
+     * @throws IncorrectVersionException if the build version or the layout
+     *         version reported by the NN differs from this DN's
+     */
+    private void checkNNVersion(NamespaceInfo nsInfo)
+        throws IncorrectVersionException {
+      // build and layout versions should match
+      String nsBuildVer = nsInfo.getBuildVersion();
+      String stBuildVer = Storage.getBuildVersion();
+      if (!nsBuildVer.equals(stBuildVer)) {
+        LOG.warn("Data-node and name-node Build versions must be the same. " +
+            "Namenode build version: " + nsBuildVer + "; Datanode " +
+            "build version: " + stBuildVer);
+        throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
+      }
+
+      // Report the version we were actually handed, not the cached field.
+      int nsLayoutVer = nsInfo.getLayoutVersion();
+      if (HdfsConstants.LAYOUT_VERSION != nsLayoutVer) {
+        LOG.warn("Data-node and name-node layout versions must be the same." +
+            " Expected: "+ HdfsConstants.LAYOUT_VERSION +
+            " actual "+ nsLayoutVer);
+        throw new IncorrectVersionException(nsLayoutVer, "namenode");
+      }
+    }
|
|
|
+
|
|
|
+ private void connectToNNAndHandshake() throws IOException {
|
|
|
// get NN proxy
|
|
|
- DatanodeProtocol dnp =
|
|
|
+ bpNamenode =
|
|
|
(DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
|
|
|
- DatanodeProtocol.versionID, nnAddr, conf);
|
|
|
- setNameNode(dnp);
|
|
|
+ DatanodeProtocol.versionID, nnAddr, dn.getConf());
|
|
|
|
|
|
- // handshake with NN
|
|
|
- NamespaceInfo nsInfo = handshake();
|
|
|
- setNamespaceInfo(nsInfo);
|
|
|
- dn.initBlockPool(this, nsInfo);
|
|
|
+ // First phase of the handshake with NN - get the namespace
|
|
|
+ // info.
|
|
|
+ bpNSInfo = retrieveNamespaceInfo();
|
|
|
|
|
|
- bpRegistration.setStorageID(dn.getStorageId());
|
|
|
- StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId);
|
|
|
- if (storageInfo == null) {
|
|
|
- // it's null in the case of SimulatedDataSet
|
|
|
- bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
|
|
|
- bpRegistration.setStorageInfo(nsInfo);
|
|
|
- } else {
|
|
|
- bpRegistration.setStorageInfo(storageInfo);
|
|
|
- }
|
|
|
+ // Now that we know the namespace ID, etc, we can pass this to the DN.
|
|
|
+ // The DN can now initialize its local storage if we are the
|
|
|
+ // first BP to handshake, etc.
|
|
|
+ dn.initBlockPool(this);
|
|
|
+
|
|
|
+ // Second phase of the handshake with the NN.
|
|
|
+ register();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -850,7 +903,7 @@ public class DataNode extends Configured
|
|
|
if(delHintArray == null || delHintArray.length != blockArray.length ) {
|
|
|
LOG.warn("Panic: block array & delHintArray are not the same" );
|
|
|
}
|
|
|
- bpNamenode.blockReceived(bpRegistration, blockPoolId, blockArray,
|
|
|
+ bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray,
|
|
|
delHintArray);
|
|
|
synchronized(receivedBlockList) {
|
|
|
synchronized(delHints){
|
|
@@ -874,9 +927,9 @@ public class DataNode extends Configured
|
|
|
block==null?"Block is null":"delHint is null");
|
|
|
}
|
|
|
|
|
|
- if (!block.getBlockPoolId().equals(blockPoolId)) {
|
|
|
+ if (!block.getBlockPoolId().equals(getBlockPoolId())) {
|
|
|
LOG.warn("BlockPool mismatch " + block.getBlockPoolId() +
|
|
|
- " vs. " + blockPoolId);
|
|
|
+ " vs. " + getBlockPoolId());
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -902,11 +955,11 @@ public class DataNode extends Configured
|
|
|
|
|
|
// Create block report
|
|
|
long brCreateStartTime = now();
|
|
|
- BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId);
|
|
|
+ BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId());
|
|
|
|
|
|
// Send block report
|
|
|
long brSendStartTime = now();
|
|
|
- cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
|
|
|
+ cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
|
|
|
.getBlockListAsLongs());
|
|
|
|
|
|
// Log the block report processing stats from Datanode perspective
|
|
@@ -943,7 +996,7 @@ public class DataNode extends Configured
|
|
|
dn.data.getCapacity(),
|
|
|
dn.data.getDfsUsed(),
|
|
|
dn.data.getRemaining(),
|
|
|
- dn.data.getBlockPoolUsed(blockPoolId),
|
|
|
+ dn.data.getBlockPoolUsed(getBlockPoolId()),
|
|
|
dn.xmitsInProgress.get(),
|
|
|
dn.getXceiverCount(), dn.data.getNumFailedVolumes());
|
|
|
}
|
|
@@ -999,7 +1052,7 @@ public class DataNode extends Configured
|
|
|
//
|
|
|
// Now loop for a long time....
|
|
|
//
|
|
|
- while (dn.shouldRun && shouldServiceRun) {
|
|
|
+ while (shouldRun()) {
|
|
|
try {
|
|
|
long startTime = now();
|
|
|
|
|
@@ -1037,7 +1090,7 @@ public class DataNode extends Configured
|
|
|
|
|
|
// Now safe to start scanning the block pool
|
|
|
if (dn.blockScanner != null) {
|
|
|
- dn.blockScanner.addBlockPool(this.blockPoolId);
|
|
|
+ dn.blockScanner.addBlockPool(this.getBlockPoolId());
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -1051,8 +1104,7 @@ public class DataNode extends Configured
|
|
|
try {
|
|
|
receivedBlockList.wait(waitTime);
|
|
|
} catch (InterruptedException ie) {
|
|
|
- LOG.warn("BPOfferService for block pool="
|
|
|
- + this.getBlockPoolId() + " received exception:" + ie);
|
|
|
+ LOG.warn("BPOfferService for " + this + " interrupted");
|
|
|
}
|
|
|
}
|
|
|
} // synchronized
|
|
@@ -1061,7 +1113,7 @@ public class DataNode extends Configured
|
|
|
if (UnregisteredNodeException.class.getName().equals(reClass) ||
|
|
|
DisallowedDatanodeException.class.getName().equals(reClass) ||
|
|
|
IncorrectVersionException.class.getName().equals(reClass)) {
|
|
|
- LOG.warn("blockpool " + blockPoolId + " is shutting down", re);
|
|
|
+ LOG.warn(this + " is shutting down", re);
|
|
|
shouldServiceRun = false;
|
|
|
return;
|
|
|
}
|
|
@@ -1075,7 +1127,7 @@ public class DataNode extends Configured
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("IOException in offerService", e);
|
|
|
}
|
|
|
- } // while (shouldRun && shouldServiceRun)
|
|
|
+ } // while (shouldRun())
|
|
|
} // offerService
|
|
|
|
|
|
/**
|
|
@@ -1091,54 +1143,44 @@ public class DataNode extends Configured
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
void register() throws IOException {
|
|
|
- LOG.info("in register: sid=" + bpRegistration.getStorageID() + ";SI="
|
|
|
- + bpRegistration.storageInfo);
|
|
|
-
|
|
|
- // build and layout versions should match
|
|
|
- String nsBuildVer = bpNamenode.versionRequest().getBuildVersion();
|
|
|
- String stBuildVer = Storage.getBuildVersion();
|
|
|
+ Preconditions.checkState(bpNSInfo != null,
|
|
|
+ "register() should be called after handshake()");
|
|
|
+
|
|
|
+ // The handshake() phase loaded the block pool storage
|
|
|
+ // off disk - so update the bpRegistration object from that info
|
|
|
+ bpRegistration = dn.createBPRegistration(bpNSInfo);
|
|
|
|
|
|
- if (!nsBuildVer.equals(stBuildVer)) {
|
|
|
- LOG.warn("Data-node and name-node Build versions must be " +
|
|
|
- "the same. Namenode build version: " + nsBuildVer + "Datanode " +
|
|
|
- "build version: " + stBuildVer);
|
|
|
- throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
|
|
|
- }
|
|
|
+ LOG.info(this + " beginning handshake with NN");
|
|
|
|
|
|
- if (HdfsConstants.LAYOUT_VERSION != bpNSInfo.getLayoutVersion()) {
|
|
|
- LOG.warn("Data-node and name-node layout versions must be " +
|
|
|
- "the same. Expected: "+ HdfsConstants.LAYOUT_VERSION +
|
|
|
- " actual "+ bpNSInfo.getLayoutVersion());
|
|
|
- throw new IncorrectVersionException
|
|
|
- (bpNSInfo.getLayoutVersion(), "namenode");
|
|
|
- }
|
|
|
-
|
|
|
- while(dn.shouldRun && shouldServiceRun) {
|
|
|
+ while (shouldRun()) {
|
|
|
try {
|
|
|
// Use returned registration from namenode with updated machine name.
|
|
|
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
|
|
|
-
|
|
|
- LOG.info("bpReg after =" + bpRegistration.storageInfo +
|
|
|
- ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
|
|
|
-
|
|
|
break;
|
|
|
} catch(SocketTimeoutException e) { // namenode is busy
|
|
|
LOG.info("Problem connecting to server: " + nnAddr);
|
|
|
- try {
|
|
|
- Thread.sleep(1000);
|
|
|
- } catch (InterruptedException ie) {}
|
|
|
+ sleepAndLogInterrupts(1000, "connecting to server");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- dn.bpRegistrationSucceeded(bpRegistration, blockPoolId);
|
|
|
-
|
|
|
- LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
|
|
|
+ LOG.info("Block pool " + this + " successfully registered with NN");
|
|
|
+ dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
|
|
|
|
|
|
// random short delay - helps scatter the BR from all DNs
|
|
|
scheduleBlockReport(dnConf.initialBlockReportDelay);
|
|
|
}
|
|
|
|
|
|
|
|
|
+ /**
+ * Sleep for the given time; if the sleep is interrupted, log the fact
+ * at INFO level and return early. The InterruptedException is neither
+ * rethrown nor is the thread's interrupt status set again here, so the
+ * callers' retry loops decide whether to continue.
+ *
+ * @param millis how long to sleep, in milliseconds
+ * @param stateString description of the current activity, appended to
+ * the log message after "interrupted while "
+ */
+ private void sleepAndLogInterrupts(int millis,
|
|
|
+ String stateString) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(millis);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.info("BPOfferService " + this +
|
|
|
+ " interrupted while " + stateString);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* No matter what kind of exception we get, keep retrying to offerService().
|
|
|
* That's the loop that connects to the NameNode and provides basic DataNode
|
|
@@ -1149,49 +1191,43 @@ public class DataNode extends Configured
|
|
|
*/
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data
|
|
|
- + ";bp=" + blockPoolId);
|
|
|
+ LOG.info(this + " starting to offer service");
|
|
|
|
|
|
try {
|
|
|
// init stuff
|
|
|
try {
|
|
|
// setup storage
|
|
|
- setupBP(dn.conf);
|
|
|
- register();
|
|
|
+ connectToNNAndHandshake();
|
|
|
} catch (IOException ioe) {
|
|
|
// Initial handshake, storage recovery or registration failed
|
|
|
// End BPOfferService thread
|
|
|
- LOG.fatal(bpRegistration + " initialization failed for block pool "
|
|
|
- + blockPoolId, ioe);
|
|
|
+ LOG.fatal("Initialization failed for block pool " + this, ioe);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
initialized = true; // bp is initialized;
|
|
|
|
|
|
- while (dn.shouldRun && shouldServiceRun) {
|
|
|
+ while (shouldRun()) {
|
|
|
try {
|
|
|
startDistributedUpgradeIfNeeded();
|
|
|
offerService();
|
|
|
} catch (Exception ex) {
|
|
|
- LOG.error("Exception in BPOfferService", ex);
|
|
|
- if (dn.shouldRun && shouldServiceRun) {
|
|
|
- try {
|
|
|
- Thread.sleep(5000);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- LOG.warn("Received exception", ie);
|
|
|
- }
|
|
|
- }
|
|
|
+ LOG.error("Exception in BPOfferService for " + this, ex);
|
|
|
+ sleepAndLogInterrupts(5000, "offering service");
|
|
|
}
|
|
|
}
|
|
|
} catch (Throwable ex) {
|
|
|
- LOG.warn("Unexpected exception", ex);
|
|
|
+ LOG.warn("Unexpected exception in block pool " + this, ex);
|
|
|
} finally {
|
|
|
- LOG.warn(bpRegistration + " ending block pool service for: "
|
|
|
- + blockPoolId + " thread " + Thread.currentThread().getId());
|
|
|
+ LOG.warn("Ending block pool service for: " + this);
|
|
|
cleanUp();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+    /**
+     * Tell whether this block pool service should keep going.
+     *
+     * @return true only while this service's own run flag is set and the
+     *         owning DataNode also reports that it should run
+     */
+    private boolean shouldRun() {
+      if (!shouldServiceRun) {
+        // This service was told to stop; the DataNode's state is irrelevant.
+        return false;
+      }
+      return dn.shouldRun();
+    }
|
|
|
+
|
|
|
/**
|
|
|
* Process an array of datanode commands
|
|
|
*
|
|
@@ -1256,7 +1292,11 @@ public class DataNode extends Configured
|
|
|
case DatanodeProtocol.DNA_REGISTER:
|
|
|
// namenode requested a registration - at start or if NN lost contact
|
|
|
LOG.info("DatanodeCommand action: DNA_REGISTER");
|
|
|
- if (dn.shouldRun && shouldServiceRun) {
|
|
|
+ if (shouldRun()) {
|
|
|
+ // re-retrieve namespace info to make sure that, if the NN
|
|
|
+ // was restarted, we still match its version (HDFS-2120)
|
|
|
+ retrieveNamespaceInfo();
|
|
|
+ // and re-register
|
|
|
register();
|
|
|
}
|
|
|
break;
|
|
@@ -1274,7 +1314,7 @@ public class DataNode extends Configured
|
|
|
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
|
|
|
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
|
|
|
if (dn.isBlockTokenEnabled) {
|
|
|
- dn.blockPoolTokenSecretManager.setKeys(blockPoolId,
|
|
|
+ dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(),
|
|
|
((KeyUpdateCommand) cmd).getExportedKeys());
|
|
|
}
|
|
|
break;
|
|
@@ -1303,7 +1343,7 @@ public class DataNode extends Configured
|
|
|
synchronized UpgradeManagerDatanode getUpgradeManager() {
|
|
|
if(upgradeManager == null)
|
|
|
upgradeManager =
|
|
|
- new UpgradeManagerDatanode(dn, blockPoolId);
|
|
|
+ new UpgradeManagerDatanode(dn, getBlockPoolId());
|
|
|
|
|
|
return upgradeManager;
|
|
|
}
|
|
@@ -1320,6 +1360,7 @@ public class DataNode extends Configured
|
|
|
um.startUpgrade();
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1363,6 +1404,26 @@ public class DataNode extends Configured
|
|
|
blockPoolManager = new BlockPoolManager(conf);
|
|
|
}
|
|
|
|
|
|
+  /**
+   * Build the DatanodeRegistration for one block pool.
+   *
+   * The registration starts out from createUnknownBPRegistration(), is
+   * stamped with this DataNode's storage ID, and then receives its storage
+   * info: from the block pool's own storage when that exists, otherwise
+   * from the namespace info supplied by the NN.
+   *
+   * @param nsInfo the namespace info from the first part of the NN handshake
+   * @return the populated registration for the block pool named in nsInfo
+   */
+  DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
+    final DatanodeRegistration reg = createUnknownBPRegistration();
+    final String bpid = nsInfo.getBlockPoolID();
+    reg.setStorageID(getStorageId());
+
+    final StorageInfo bpStorage = storage.getBPStorage(bpid);
+    if (bpStorage != null) {
+      reg.setStorageInfo(bpStorage);
+      return reg;
+    }
+
+    // No storage for this pool - it's null in the case of SimulatedDataSet.
+    reg.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
+    reg.setStorageInfo(nsInfo);
+    return reg;
+  }
|
|
|
+
|
|
|
/**
|
|
|
* Check that the registration returned from a NameNode is consistent
|
|
|
* with the information in the storage. If the storage is fresh/unformatted,
|
|
@@ -1443,11 +1504,27 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void initBlockPool(BPOfferService bpOfferService,
|
|
|
- NamespaceInfo nsInfo) throws IOException {
|
|
|
+ /**
|
|
|
+ * One of the Block Pools has successfully connected to its NN.
|
|
|
+ * This initializes the local storage for that block pool,
|
|
|
+ * checks consistency of the NN's cluster ID, etc.
|
|
|
+ *
|
|
|
+ * If this is the first block pool to register, this also initializes
|
|
|
+ * the datanode-scoped storage.
|
|
|
+ *
|
|
|
+ * @param bpos the block pool service that connected; its namespace info (the handshake response from the NN) must already have been retrieved.
|
|
|
+ * @throws IOException if the NN is inconsistent with the local storage.
|
|
|
+ */
|
|
|
+ void initBlockPool(BPOfferService bpos) throws IOException {
|
|
|
+ NamespaceInfo nsInfo = bpos.getNamespaceInfo();
|
|
|
+ Preconditions.checkState(nsInfo != null,
|
|
|
+ "Block pool " + bpos + " should have retrieved " +
|
|
|
+ "its namespace info before calling initBlockPool.");
|
|
|
+
|
|
|
String blockPoolId = nsInfo.getBlockPoolID();
|
|
|
|
|
|
- blockPoolManager.addBlockPool(bpOfferService);
|
|
|
+ // Register the new block pool with the BP manager.
|
|
|
+ blockPoolManager.addBlockPool(bpos);
|
|
|
|
|
|
synchronized (this) {
|
|
|
// we do not allow namenode from different cluster to register
|
|
@@ -1478,12 +1555,21 @@ public class DataNode extends Configured
|
|
|
+ blockPoolId + ";lv=" + storage.getLayoutVersion() +
|
|
|
";nsInfo=" + nsInfo);
|
|
|
}
|
|
|
+
|
|
|
+ // In the case that this is the first block pool to connect, initialize
|
|
|
+ // the dataset, block scanners, etc.
|
|
|
initFsDataSet();
|
|
|
- initPeriodicScanners(conf);
|
|
|
- data.addBlockPool(nsInfo.getBlockPoolID(), conf);
|
|
|
+ initPeriodicScanners(conf);
|
|
|
+
|
|
|
+ data.addBlockPool(blockPoolId, conf);
|
|
|
}
|
|
|
|
|
|
- private DatanodeRegistration createRegistration() {
|
|
|
+ /**
|
|
|
+ * Create a DatanodeRegistration object with no valid StorageInfo.
|
|
|
+ * This is used when reporting an error during handshake - ie
|
|
|
+ * before we can load any specific block pool.
|
|
|
+ */
|
|
|
+ private DatanodeRegistration createUnknownBPRegistration() {
|
|
|
DatanodeRegistration reg = new DatanodeRegistration(getMachineName());
|
|
|
reg.setInfoPort(infoServer.getPort());
|
|
|
reg.setIpcPort(getIpcPort());
|
|
@@ -2513,16 +2599,6 @@ public class DataNode extends Configured
|
|
|
return bpos.bpNamenode;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * To be used by tests only to set a mock namenode in BPOfferService
|
|
|
- */
|
|
|
- void setBPNamenode(String bpid, DatanodeProtocol namenode) {
|
|
|
- BPOfferService bp = blockPoolManager.get(bpid);
|
|
|
- if (bp != null) {
|
|
|
- bp.setNameNode(namenode);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/** Block synchronization */
|
|
|
void syncBlock(RecoveringBlock rBlock,
|
|
|
List<BlockRecord> syncList) throws IOException {
|
|
@@ -2748,7 +2824,7 @@ public class DataNode extends Configured
|
|
|
final Map<String, String> info = new HashMap<String, String>();
|
|
|
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
|
|
if (bpos != null && bpos.bpThread != null) {
|
|
|
- info.put(bpos.getNNSocketAddress().getHostName(), bpos.blockPoolId);
|
|
|
+ info.put(bpos.getNNSocketAddress().getHostName(), bpos.getBlockPoolId());
|
|
|
}
|
|
|
}
|
|
|
return JSON.toString(info);
|
|
@@ -2836,7 +2912,7 @@ public class DataNode extends Configured
|
|
|
*/
|
|
|
public boolean isDatanodeFullyStarted() {
|
|
|
for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) {
|
|
|
- if (!bp.initialized() || !bp.isAlive()) {
|
|
|
+ if (!bp.isInitialized() || !bp.isAlive()) {
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -2863,4 +2939,8 @@ public class DataNode extends Configured
|
|
|
DNConf getDnConf() {
|
|
|
return dnConf;
|
|
|
}
|
|
|
+
|
|
|
+ /**
+ * Accessor for the DataNode-wide run flag. BPOfferService combines this
+ * with its own per-service flag to decide whether to keep looping.
+ *
+ * @return the current value of the shouldRun field
+ */
+ boolean shouldRun() {
|
|
|
+ return shouldRun;
|
|
|
+ }
|
|
|
}
|