|
@@ -50,7 +50,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.server.common.Util.now;
|
|
|
|
|
|
import java.io.BufferedOutputStream;
|
|
|
import java.io.DataInputStream;
|
|
@@ -61,7 +60,6 @@ import java.io.OutputStream;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.ServerSocket;
|
|
|
import java.net.Socket;
|
|
|
-import java.net.SocketTimeoutException;
|
|
|
import java.net.URI;
|
|
|
import java.net.UnknownHostException;
|
|
|
import java.nio.channels.ServerSocketChannel;
|
|
@@ -74,7 +72,6 @@ import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
@@ -94,7 +91,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
@@ -103,7 +99,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
|
|
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
|
|
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
|
|
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
|
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
|
@@ -117,31 +112,21 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
|
|
-import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
|
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
|
|
-import org.apache.hadoop.hdfs.server.common.Storage;
|
|
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|
|
import org.apache.hadoop.hdfs.server.common.Util;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
|
|
-import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
|
|
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
|
|
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
|
|
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
|
-import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
|
|
-import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|
|
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
|
|
import org.apache.hadoop.hdfs.web.resources.Param;
|
|
|
import org.apache.hadoop.http.HttpServer;
|
|
@@ -170,7 +155,6 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
|
import org.mortbay.util.ajax.JSON;
|
|
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
|
|
|
|
|
@@ -393,8 +377,6 @@ public class DataNode extends Configured
|
|
|
|
|
|
private volatile String hostName; // Host name of this datanode
|
|
|
|
|
|
- private static String dnThreadName;
|
|
|
-
|
|
|
boolean isBlockTokenEnabled;
|
|
|
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
|
|
|
|
|
@@ -666,701 +648,8 @@ public class DataNode extends Configured
|
|
|
this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * A thread per namenode to perform:
|
|
|
- * <ul>
|
|
|
- * <li> Pre-registration handshake with namenode</li>
|
|
|
- * <li> Registration with namenode</li>
|
|
|
- * <li> Send periodic heartbeats to the namenode</li>
|
|
|
- * <li> Handle commands received from the datanode</li>
|
|
|
- * </ul>
|
|
|
- */
|
|
|
- @InterfaceAudience.Private
|
|
|
- static class BPOfferService implements Runnable {
|
|
|
- final InetSocketAddress nnAddr;
|
|
|
-
|
|
|
- /**
|
|
|
- * Information about the namespace that this service
|
|
|
- * is registering with. This is assigned after
|
|
|
- * the first phase of the handshake.
|
|
|
- */
|
|
|
- NamespaceInfo bpNSInfo;
|
|
|
-
|
|
|
- /**
|
|
|
- * The registration information for this block pool.
|
|
|
- * This is assigned after the second phase of the
|
|
|
- * handshake.
|
|
|
- */
|
|
|
- DatanodeRegistration bpRegistration;
|
|
|
-
|
|
|
- long lastBlockReport = 0;
|
|
|
-
|
|
|
- boolean resetBlockReportTime = true;
|
|
|
-
|
|
|
- private Thread bpThread;
|
|
|
- private DatanodeProtocol bpNamenode;
|
|
|
- private long lastHeartbeat = 0;
|
|
|
- private volatile boolean initialized = false;
|
|
|
- private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
|
|
|
- private final LinkedList<String> delHints = new LinkedList<String>();
|
|
|
- private volatile boolean shouldServiceRun = true;
|
|
|
- UpgradeManagerDatanode upgradeManager = null;
|
|
|
- private final DataNode dn;
|
|
|
- private final DNConf dnConf;
|
|
|
-
|
|
|
- BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
|
|
|
- this.dn = dn;
|
|
|
- this.nnAddr = nnAddr;
|
|
|
- this.dnConf = dn.getDnConf();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * returns true if BP thread has completed initialization of storage
|
|
|
- * and has registered with the corresponding namenode
|
|
|
- * @return true if initialized
|
|
|
- */
|
|
|
- public boolean isInitialized() {
|
|
|
- return initialized;
|
|
|
- }
|
|
|
-
|
|
|
- public boolean isAlive() {
|
|
|
- return shouldServiceRun && bpThread.isAlive();
|
|
|
- }
|
|
|
-
|
|
|
- public String getBlockPoolId() {
|
|
|
- if (bpNSInfo != null) {
|
|
|
- return bpNSInfo.getBlockPoolID();
|
|
|
- } else {
|
|
|
- LOG.warn("Block pool ID needed, but service not yet registered with NN",
|
|
|
- new Exception("trace"));
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public NamespaceInfo getNamespaceInfo() {
|
|
|
- return bpNSInfo;
|
|
|
- }
|
|
|
-
|
|
|
- public String toString() {
|
|
|
- if (bpNSInfo == null) {
|
|
|
- // If we haven't yet connected to our NN, we don't yet know our
|
|
|
- // own block pool ID.
|
|
|
- // If _none_ of the block pools have connected yet, we don't even
|
|
|
- // know the storage ID of this DN.
|
|
|
- String storageId = dn.getStorageId();
|
|
|
- if (storageId == null || "".equals(storageId)) {
|
|
|
- storageId = "unknown";
|
|
|
- }
|
|
|
- return "Block pool <registering> (storage id " + storageId +
|
|
|
- ") connecting to " + nnAddr;
|
|
|
- } else {
|
|
|
- return "Block pool " + getBlockPoolId() +
|
|
|
- " (storage id " + dn.getStorageId() +
|
|
|
- ") registered with " + nnAddr;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private InetSocketAddress getNNSocketAddress() {
|
|
|
- return nnAddr;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Used to inject a spy NN in the unit tests.
|
|
|
- */
|
|
|
- @VisibleForTesting
|
|
|
- void setNameNode(DatanodeProtocol dnProtocol) {
|
|
|
- bpNamenode = dnProtocol;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Perform the first part of the handshake with the NameNode.
|
|
|
- * This calls <code>versionRequest</code> to determine the NN's
|
|
|
- * namespace and version info. It automatically retries until
|
|
|
- * the NN responds or the DN is shutting down.
|
|
|
- *
|
|
|
- * @return the NamespaceInfo
|
|
|
- * @throws IncorrectVersionException if the remote NN does not match
|
|
|
- * this DN's version
|
|
|
- */
|
|
|
- NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException {
|
|
|
- NamespaceInfo nsInfo = null;
|
|
|
- while (shouldRun()) {
|
|
|
- try {
|
|
|
- nsInfo = bpNamenode.versionRequest();
|
|
|
- LOG.debug(this + " received versionRequest response: " + nsInfo);
|
|
|
- break;
|
|
|
- } catch(SocketTimeoutException e) { // namenode is busy
|
|
|
- LOG.warn("Problem connecting to server: " + nnAddr);
|
|
|
- } catch(IOException e ) { // namenode is not available
|
|
|
- LOG.warn("Problem connecting to server: " + nnAddr);
|
|
|
- }
|
|
|
-
|
|
|
- // try again in a second
|
|
|
- sleepAndLogInterrupts(5000, "requesting version info from NN");
|
|
|
- }
|
|
|
-
|
|
|
- if (nsInfo != null) {
|
|
|
- checkNNVersion(nsInfo);
|
|
|
- }
|
|
|
- return nsInfo;
|
|
|
- }
|
|
|
-
|
|
|
- private void checkNNVersion(NamespaceInfo nsInfo)
|
|
|
- throws IncorrectVersionException {
|
|
|
- // build and layout versions should match
|
|
|
- String nsBuildVer = nsInfo.getBuildVersion();
|
|
|
- String stBuildVer = Storage.getBuildVersion();
|
|
|
- if (!nsBuildVer.equals(stBuildVer)) {
|
|
|
- LOG.warn("Data-node and name-node Build versions must be the same. " +
|
|
|
- "Namenode build version: " + nsBuildVer + "Datanode " +
|
|
|
- "build version: " + stBuildVer);
|
|
|
- throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
|
|
|
- }
|
|
|
-
|
|
|
- if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
|
|
|
- LOG.warn("Data-node and name-node layout versions must be the same." +
|
|
|
- " Expected: "+ HdfsConstants.LAYOUT_VERSION +
|
|
|
- " actual "+ bpNSInfo.getLayoutVersion());
|
|
|
- throw new IncorrectVersionException(
|
|
|
- bpNSInfo.getLayoutVersion(), "namenode");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void connectToNNAndHandshake() throws IOException {
|
|
|
- // get NN proxy
|
|
|
- bpNamenode =
|
|
|
- (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
|
|
|
- DatanodeProtocol.versionID, nnAddr, dn.getConf());
|
|
|
-
|
|
|
- // First phase of the handshake with NN - get the namespace
|
|
|
- // info.
|
|
|
- bpNSInfo = retrieveNamespaceInfo();
|
|
|
-
|
|
|
- // Now that we know the namespace ID, etc, we can pass this to the DN.
|
|
|
- // The DN can now initialize its local storage if we are the
|
|
|
- // first BP to handshake, etc.
|
|
|
- dn.initBlockPool(this);
|
|
|
-
|
|
|
- // Second phase of the handshake with the NN.
|
|
|
- register();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * This methods arranges for the data node to send the block report at
|
|
|
- * the next heartbeat.
|
|
|
- */
|
|
|
- void scheduleBlockReport(long delay) {
|
|
|
- if (delay > 0) { // send BR after random delay
|
|
|
- lastBlockReport = System.currentTimeMillis()
|
|
|
- - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
|
|
|
- } else { // send at next heartbeat
|
|
|
- lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
|
|
|
- }
|
|
|
- resetBlockReportTime = true; // reset future BRs for randomness
|
|
|
- }
|
|
|
-
|
|
|
- private void reportBadBlocks(ExtendedBlock block) {
|
|
|
- DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
|
|
|
- LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };
|
|
|
-
|
|
|
- try {
|
|
|
- bpNamenode.reportBadBlocks(blocks);
|
|
|
- } catch (IOException e){
|
|
|
- /* One common reason is that NameNode could be in safe mode.
|
|
|
- * Should we keep on retrying in that case?
|
|
|
- */
|
|
|
- LOG.warn("Failed to report bad block " + block + " to namenode : "
|
|
|
- + " Exception", e);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Report received blocks and delete hints to the Namenode
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private void reportReceivedBlocks() throws IOException {
|
|
|
- //check if there are newly received blocks
|
|
|
- Block [] blockArray=null;
|
|
|
- String [] delHintArray=null;
|
|
|
- synchronized(receivedBlockList) {
|
|
|
- synchronized(delHints){
|
|
|
- int numBlocks = receivedBlockList.size();
|
|
|
- if (numBlocks > 0) {
|
|
|
- if(numBlocks!=delHints.size()) {
|
|
|
- LOG.warn("Panic: receiveBlockList and delHints are not of " +
|
|
|
- "the same length" );
|
|
|
- }
|
|
|
- //
|
|
|
- // Send newly-received blockids to namenode
|
|
|
- //
|
|
|
- blockArray = receivedBlockList.toArray(new Block[numBlocks]);
|
|
|
- delHintArray = delHints.toArray(new String[numBlocks]);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (blockArray != null) {
|
|
|
- if(delHintArray == null || delHintArray.length != blockArray.length ) {
|
|
|
- LOG.warn("Panic: block array & delHintArray are not the same" );
|
|
|
- }
|
|
|
- bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray,
|
|
|
- delHintArray);
|
|
|
- synchronized(receivedBlockList) {
|
|
|
- synchronized(delHints){
|
|
|
- for(int i=0; i<blockArray.length; i++) {
|
|
|
- receivedBlockList.remove(blockArray[i]);
|
|
|
- delHints.remove(delHintArray[i]);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * Informing the name node could take a long long time! Should we wait
|
|
|
- * till namenode is informed before responding with success to the
|
|
|
- * client? For now we don't.
|
|
|
- */
|
|
|
- void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
|
|
|
- if(block==null || delHint==null) {
|
|
|
- throw new IllegalArgumentException(
|
|
|
- block==null?"Block is null":"delHint is null");
|
|
|
- }
|
|
|
-
|
|
|
- if (!block.getBlockPoolId().equals(getBlockPoolId())) {
|
|
|
- LOG.warn("BlockPool mismatch " + block.getBlockPoolId() +
|
|
|
- " vs. " + getBlockPoolId());
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (receivedBlockList) {
|
|
|
- synchronized (delHints) {
|
|
|
- receivedBlockList.add(block.getLocalBlock());
|
|
|
- delHints.add(delHint);
|
|
|
- receivedBlockList.notifyAll();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * Report the list blocks to the Namenode
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- DatanodeCommand blockReport() throws IOException {
|
|
|
- // send block report if timer has expired.
|
|
|
- DatanodeCommand cmd = null;
|
|
|
- long startTime = now();
|
|
|
- if (startTime - lastBlockReport > dnConf.blockReportInterval) {
|
|
|
-
|
|
|
- // Create block report
|
|
|
- long brCreateStartTime = now();
|
|
|
- BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId());
|
|
|
-
|
|
|
- // Send block report
|
|
|
- long brSendStartTime = now();
|
|
|
- cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
|
|
|
- .getBlockListAsLongs());
|
|
|
-
|
|
|
- // Log the block report processing stats from Datanode perspective
|
|
|
- long brSendCost = now() - brSendStartTime;
|
|
|
- long brCreateCost = brSendStartTime - brCreateStartTime;
|
|
|
- dn.metrics.addBlockReport(brSendCost);
|
|
|
- LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
|
|
|
- + " blocks took " + brCreateCost + " msec to generate and "
|
|
|
- + brSendCost + " msecs for RPC and NN processing");
|
|
|
-
|
|
|
- // If we have sent the first block report, then wait a random
|
|
|
- // time before we start the periodic block reports.
|
|
|
- if (resetBlockReportTime) {
|
|
|
- lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
|
|
|
- resetBlockReportTime = false;
|
|
|
- } else {
|
|
|
- /* say the last block report was at 8:20:14. The current report
|
|
|
- * should have started around 9:20:14 (default 1 hour interval).
|
|
|
- * If current time is :
|
|
|
- * 1) normal like 9:20:18, next report should be at 10:20:14
|
|
|
- * 2) unexpected like 11:35:43, next report should be at 12:20:14
|
|
|
- */
|
|
|
- lastBlockReport += (now() - lastBlockReport) /
|
|
|
- dnConf.blockReportInterval * dnConf.blockReportInterval;
|
|
|
- }
|
|
|
- LOG.info("sent block report, processed command:" + cmd);
|
|
|
- }
|
|
|
- return cmd;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- DatanodeCommand [] sendHeartBeat() throws IOException {
|
|
|
- return bpNamenode.sendHeartbeat(bpRegistration,
|
|
|
- dn.data.getCapacity(),
|
|
|
- dn.data.getDfsUsed(),
|
|
|
- dn.data.getRemaining(),
|
|
|
- dn.data.getBlockPoolUsed(getBlockPoolId()),
|
|
|
- dn.xmitsInProgress.get(),
|
|
|
- dn.getXceiverCount(), dn.data.getNumFailedVolumes());
|
|
|
- }
|
|
|
-
|
|
|
- //This must be called only by blockPoolManager
|
|
|
- void start() {
|
|
|
- if ((bpThread != null) && (bpThread.isAlive())) {
|
|
|
- //Thread is started already
|
|
|
- return;
|
|
|
- }
|
|
|
- bpThread = new Thread(this, dnThreadName);
|
|
|
- bpThread.setDaemon(true); // needed for JUnit testing
|
|
|
- bpThread.start();
|
|
|
- }
|
|
|
-
|
|
|
- //This must be called only by blockPoolManager.
|
|
|
- void stop() {
|
|
|
- shouldServiceRun = false;
|
|
|
- if (bpThread != null) {
|
|
|
- bpThread.interrupt();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //This must be called only by blockPoolManager
|
|
|
- void join() {
|
|
|
- try {
|
|
|
- if (bpThread != null) {
|
|
|
- bpThread.join();
|
|
|
- }
|
|
|
- } catch (InterruptedException ie) { }
|
|
|
- }
|
|
|
-
|
|
|
- //Cleanup method to be called by current thread before exiting.
|
|
|
- private synchronized void cleanUp() {
|
|
|
-
|
|
|
- if(upgradeManager != null)
|
|
|
- upgradeManager.shutdownUpgrade();
|
|
|
- shouldServiceRun = false;
|
|
|
- RPC.stopProxy(bpNamenode);
|
|
|
- dn.shutdownBlockPool(this);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Main loop for each BP thread. Run until shutdown,
|
|
|
- * forever calling remote NameNode functions.
|
|
|
- */
|
|
|
- private void offerService() throws Exception {
|
|
|
- LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
|
|
|
- + dnConf.blockReportInterval + "msec" + " Initial delay: "
|
|
|
- + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
|
|
|
- + dnConf.heartBeatInterval);
|
|
|
-
|
|
|
- //
|
|
|
- // Now loop for a long time....
|
|
|
- //
|
|
|
- while (shouldRun()) {
|
|
|
- try {
|
|
|
- long startTime = now();
|
|
|
-
|
|
|
- //
|
|
|
- // Every so often, send heartbeat or block-report
|
|
|
- //
|
|
|
- if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
|
|
|
- //
|
|
|
- // All heartbeat messages include following info:
|
|
|
- // -- Datanode name
|
|
|
- // -- data transfer port
|
|
|
- // -- Total capacity
|
|
|
- // -- Bytes remaining
|
|
|
- //
|
|
|
- lastHeartbeat = startTime;
|
|
|
- if (!dn.heartbeatsDisabledForTests) {
|
|
|
- DatanodeCommand[] cmds = sendHeartBeat();
|
|
|
- dn.metrics.addHeartbeat(now() - startTime);
|
|
|
-
|
|
|
- long startProcessCommands = now();
|
|
|
- if (!processCommand(cmds))
|
|
|
- continue;
|
|
|
- long endProcessCommands = now();
|
|
|
- if (endProcessCommands - startProcessCommands > 2000) {
|
|
|
- LOG.info("Took " + (endProcessCommands - startProcessCommands) +
|
|
|
- "ms to process " + cmds.length + " commands from NN");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- reportReceivedBlocks();
|
|
|
-
|
|
|
- DatanodeCommand cmd = blockReport();
|
|
|
- processCommand(cmd);
|
|
|
-
|
|
|
- // Now safe to start scanning the block pool
|
|
|
- if (dn.blockScanner != null) {
|
|
|
- dn.blockScanner.addBlockPool(this.getBlockPoolId());
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // There is no work to do; sleep until hearbeat timer elapses,
|
|
|
- // or work arrives, and then iterate again.
|
|
|
- //
|
|
|
- long waitTime = dnConf.heartBeatInterval -
|
|
|
- (System.currentTimeMillis() - lastHeartbeat);
|
|
|
- synchronized(receivedBlockList) {
|
|
|
- if (waitTime > 0 && receivedBlockList.size() == 0) {
|
|
|
- try {
|
|
|
- receivedBlockList.wait(waitTime);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- LOG.warn("BPOfferService for " + this + " interrupted");
|
|
|
- }
|
|
|
- }
|
|
|
- } // synchronized
|
|
|
- } catch(RemoteException re) {
|
|
|
- String reClass = re.getClassName();
|
|
|
- if (UnregisteredNodeException.class.getName().equals(reClass) ||
|
|
|
- DisallowedDatanodeException.class.getName().equals(reClass) ||
|
|
|
- IncorrectVersionException.class.getName().equals(reClass)) {
|
|
|
- LOG.warn(this + " is shutting down", re);
|
|
|
- shouldServiceRun = false;
|
|
|
- return;
|
|
|
- }
|
|
|
- LOG.warn("RemoteException in offerService", re);
|
|
|
- try {
|
|
|
- long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
|
|
|
- Thread.sleep(sleepTime);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("IOException in offerService", e);
|
|
|
- }
|
|
|
- } // while (shouldRun())
|
|
|
- } // offerService
|
|
|
-
|
|
|
- /**
|
|
|
- * Register one bp with the corresponding NameNode
|
|
|
- * <p>
|
|
|
- * The bpDatanode needs to register with the namenode on startup in order
|
|
|
- * 1) to report which storage it is serving now and
|
|
|
- * 2) to receive a registrationID
|
|
|
- *
|
|
|
- * issued by the namenode to recognize registered datanodes.
|
|
|
- *
|
|
|
- * @see FSNamesystem#registerDatanode(DatanodeRegistration)
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- void register() throws IOException {
|
|
|
- Preconditions.checkState(bpNSInfo != null,
|
|
|
- "register() should be called after handshake()");
|
|
|
-
|
|
|
- // The handshake() phase loaded the block pool storage
|
|
|
- // off disk - so update the bpRegistration object from that info
|
|
|
- bpRegistration = dn.createBPRegistration(bpNSInfo);
|
|
|
-
|
|
|
- LOG.info(this + " beginning handshake with NN");
|
|
|
-
|
|
|
- while (shouldRun()) {
|
|
|
- try {
|
|
|
- // Use returned registration from namenode with updated machine name.
|
|
|
- bpRegistration = bpNamenode.registerDatanode(bpRegistration);
|
|
|
- break;
|
|
|
- } catch(SocketTimeoutException e) { // namenode is busy
|
|
|
- LOG.info("Problem connecting to server: " + nnAddr);
|
|
|
- sleepAndLogInterrupts(1000, "connecting to server");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- LOG.info("Block pool " + this + " successfully registered with NN");
|
|
|
- dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
|
|
|
-
|
|
|
- // random short delay - helps scatter the BR from all DNs
|
|
|
- scheduleBlockReport(dnConf.initialBlockReportDelay);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private void sleepAndLogInterrupts(int millis,
|
|
|
- String stateString) {
|
|
|
- try {
|
|
|
- Thread.sleep(millis);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- LOG.info("BPOfferService " + this +
|
|
|
- " interrupted while " + stateString);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * No matter what kind of exception we get, keep retrying to offerService().
|
|
|
- * That's the loop that connects to the NameNode and provides basic DataNode
|
|
|
- * functionality.
|
|
|
- *
|
|
|
- * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
|
|
|
- * happen either at shutdown or due to refreshNamenodes.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- LOG.info(this + " starting to offer service");
|
|
|
-
|
|
|
- try {
|
|
|
- // init stuff
|
|
|
- try {
|
|
|
- // setup storage
|
|
|
- connectToNNAndHandshake();
|
|
|
- } catch (IOException ioe) {
|
|
|
- // Initial handshake, storage recovery or registration failed
|
|
|
- // End BPOfferService thread
|
|
|
- LOG.fatal("Initialization failed for block pool " + this, ioe);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- initialized = true; // bp is initialized;
|
|
|
-
|
|
|
- while (shouldRun()) {
|
|
|
- try {
|
|
|
- startDistributedUpgradeIfNeeded();
|
|
|
- offerService();
|
|
|
- } catch (Exception ex) {
|
|
|
- LOG.error("Exception in BPOfferService for " + this, ex);
|
|
|
- sleepAndLogInterrupts(5000, "offering service");
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Throwable ex) {
|
|
|
- LOG.warn("Unexpected exception in block pool " + this, ex);
|
|
|
- } finally {
|
|
|
- LOG.warn("Ending block pool service for: " + this);
|
|
|
- cleanUp();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private boolean shouldRun() {
|
|
|
- return shouldServiceRun && dn.shouldRun();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Process an array of datanode commands
|
|
|
- *
|
|
|
- * @param cmds an array of datanode commands
|
|
|
- * @return true if further processing may be required or false otherwise.
|
|
|
- */
|
|
|
- private boolean processCommand(DatanodeCommand[] cmds) {
|
|
|
- if (cmds != null) {
|
|
|
- for (DatanodeCommand cmd : cmds) {
|
|
|
- try {
|
|
|
- if (processCommand(cmd) == false) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- } catch (IOException ioe) {
|
|
|
- LOG.warn("Error processing datanode Command", ioe);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- *
|
|
|
- * @param cmd
|
|
|
- * @return true if further processing may be required or false otherwise.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private boolean processCommand(DatanodeCommand cmd) throws IOException {
|
|
|
- if (cmd == null)
|
|
|
- return true;
|
|
|
- final BlockCommand bcmd =
|
|
|
- cmd instanceof BlockCommand? (BlockCommand)cmd: null;
|
|
|
-
|
|
|
- switch(cmd.getAction()) {
|
|
|
- case DatanodeProtocol.DNA_TRANSFER:
|
|
|
- // Send a copy of a block to another datanode
|
|
|
- dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
|
|
|
- dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
|
|
|
- break;
|
|
|
- case DatanodeProtocol.DNA_INVALIDATE:
|
|
|
- //
|
|
|
- // Some local block(s) are obsolete and can be
|
|
|
- // safely garbage-collected.
|
|
|
- //
|
|
|
- Block toDelete[] = bcmd.getBlocks();
|
|
|
- try {
|
|
|
- if (dn.blockScanner != null) {
|
|
|
- dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
|
|
|
- }
|
|
|
- // using global fsdataset
|
|
|
- dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
|
|
|
- } catch(IOException e) {
|
|
|
- dn.checkDiskError();
|
|
|
- throw e;
|
|
|
- }
|
|
|
- dn.metrics.incrBlocksRemoved(toDelete.length);
|
|
|
- break;
|
|
|
- case DatanodeProtocol.DNA_SHUTDOWN:
|
|
|
- // shut down the data node
|
|
|
- shouldServiceRun = false;
|
|
|
- return false;
|
|
|
- case DatanodeProtocol.DNA_REGISTER:
|
|
|
- // namenode requested a registration - at start or if NN lost contact
|
|
|
- LOG.info("DatanodeCommand action: DNA_REGISTER");
|
|
|
- if (shouldRun()) {
|
|
|
- // re-retrieve namespace info to make sure that, if the NN
|
|
|
- // was restarted, we still match its version (HDFS-2120)
|
|
|
- retrieveNamespaceInfo();
|
|
|
- // and re-register
|
|
|
- register();
|
|
|
- }
|
|
|
- break;
|
|
|
- case DatanodeProtocol.DNA_FINALIZE:
|
|
|
- dn.storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd)
|
|
|
- .getBlockPoolId());
|
|
|
- break;
|
|
|
- case UpgradeCommand.UC_ACTION_START_UPGRADE:
|
|
|
- // start distributed upgrade here
|
|
|
- processDistributedUpgradeCommand((UpgradeCommand)cmd);
|
|
|
- break;
|
|
|
- case DatanodeProtocol.DNA_RECOVERBLOCK:
|
|
|
- dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
|
|
|
- break;
|
|
|
- case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
|
|
|
- LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
|
|
|
- if (dn.isBlockTokenEnabled) {
|
|
|
- dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(),
|
|
|
- ((KeyUpdateCommand) cmd).getExportedKeys());
|
|
|
- }
|
|
|
- break;
|
|
|
- case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
|
|
|
- LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
|
|
|
- long bandwidth =
|
|
|
- ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
|
|
|
- if (bandwidth > 0) {
|
|
|
- DataXceiverServer dxcs =
|
|
|
- (DataXceiverServer) dn.dataXceiverServer.getRunnable();
|
|
|
- dxcs.balanceThrottler.setBandwidth(bandwidth);
|
|
|
- }
|
|
|
- break;
|
|
|
- default:
|
|
|
- LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- private void processDistributedUpgradeCommand(UpgradeCommand comm)
|
|
|
- throws IOException {
|
|
|
- UpgradeManagerDatanode upgradeManager = getUpgradeManager();
|
|
|
- upgradeManager.processUpgradeCommand(comm);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized UpgradeManagerDatanode getUpgradeManager() {
|
|
|
- if(upgradeManager == null)
|
|
|
- upgradeManager =
|
|
|
- new UpgradeManagerDatanode(dn, getBlockPoolId());
|
|
|
-
|
|
|
- return upgradeManager;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Start distributed upgrade if it should be initiated by the data-node.
|
|
|
- */
|
|
|
- private void startDistributedUpgradeIfNeeded() throws IOException {
|
|
|
- UpgradeManagerDatanode um = getUpgradeManager();
|
|
|
-
|
|
|
- if(!um.getUpgradeState())
|
|
|
- return;
|
|
|
- um.setUpgradeState(false, um.getUpgradeVersion());
|
|
|
- um.startUpgrade();
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
+ boolean areHeartbeatsDisabledForTests() {
|
|
|
+ return this.heartbeatsDisabledForTests;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1430,7 +719,7 @@ public class DataNode extends Configured
|
|
|
* sets the storage ID based on this registration.
|
|
|
* Also updates the block pool's state in the secret manager.
|
|
|
*/
|
|
|
- private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
|
|
|
+ synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
|
|
|
String blockPoolId)
|
|
|
throws IOException {
|
|
|
hostName = bpRegistration.getHost();
|
|
@@ -1487,7 +776,7 @@ public class DataNode extends Configured
|
|
|
/**
|
|
|
* Remove the given block pool from the block scanner, dataset, and storage.
|
|
|
*/
|
|
|
- private void shutdownBlockPool(BPOfferService bpos) {
|
|
|
+ void shutdownBlockPool(BPOfferService bpos) {
|
|
|
blockPoolManager.remove(bpos);
|
|
|
|
|
|
String bpId = bpos.getBlockPoolId();
|
|
@@ -1962,7 +1251,7 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void transferBlocks(String poolId, Block blocks[],
|
|
|
+ void transferBlocks(String poolId, Block blocks[],
|
|
|
DatanodeInfo xferTargets[][]) {
|
|
|
for (int i = 0; i < blocks.length; i++) {
|
|
|
try {
|
|
@@ -2254,9 +1543,7 @@ public class DataNode extends Configured
|
|
|
System.exit(-1);
|
|
|
}
|
|
|
Collection<URI> dataDirs = getStorageDirs(conf);
|
|
|
- dnThreadName = "DataNode: [" +
|
|
|
- StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "]";
|
|
|
- UserGroupInformation.setConfiguration(conf);
|
|
|
+ UserGroupInformation.setConfiguration(conf);
|
|
|
SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
|
|
|
DFS_DATANODE_USER_NAME_KEY);
|
|
|
return makeInstance(dataDirs, conf, resources);
|
|
@@ -2788,6 +2075,14 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Finalize a pending upgrade in response to DNA_FINALIZE.
|
|
|
+ * @param blockPoolId the block pool to finalize
|
|
|
+ */
|
|
|
+ void finalizeUpgradeForPool(String blockPoolId) throws IOException {
|
|
|
+ storage.finalizeUpgrade(blockPoolId);
|
|
|
+ }
|
|
|
+
|
|
|
// Determine a Datanode's streaming address
|
|
|
public static InetSocketAddress getStreamingAddr(Configuration conf) {
|
|
|
return NetUtils.createSocketAddr(
|