|
@@ -164,9 +164,9 @@ import org.apache.hadoop.util.VersionInfo;
|
|
|
import org.mortbay.util.ajax.JSON;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.protobuf.BlockingService;
|
|
|
|
|
|
-
|
|
|
/**********************************************************
|
|
|
* DataNode is a class (and program) that stores a set of
|
|
|
* blocks for a DFS deployment. A single deployment can
|
|
@@ -244,9 +244,10 @@ public class DataNode extends Configured
|
|
|
private DataStorage storage = null;
|
|
|
private HttpServer infoServer = null;
|
|
|
DataNodeMetrics metrics;
|
|
|
- private InetSocketAddress selfAddr;
|
|
|
+ private InetSocketAddress streamingAddr;
|
|
|
|
|
|
- private volatile String hostName; // Host name of this datanode
|
|
|
+ private String hostName;
|
|
|
+ private DatanodeID id;
|
|
|
|
|
|
boolean isBlockTokenEnabled;
|
|
|
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
|
|
@@ -288,6 +289,7 @@ public class DataNode extends Configured
|
|
|
.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
|
|
|
try {
|
|
|
hostName = getHostName(conf);
|
|
|
+ LOG.info("Configured hostname is " + hostName);
|
|
|
startDataNode(conf, dataDirs, resources);
|
|
|
} catch (IOException ie) {
|
|
|
shutdown();
|
|
@@ -305,16 +307,25 @@ public class DataNode extends Configured
|
|
|
clusterId = nsCid;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the hostname for this datanode. If the hostname is not
|
|
|
+ * explicitly configured in the given config, then it is determined
|
|
|
+ * via the DNS class.
|
|
|
+ *
|
|
|
+ * @param config
|
|
|
+ * @return the hostname (NB: may not be a FQDN)
|
|
|
+ * @throws UnknownHostException if the dfs.datanode.dns.interface
|
|
|
+ * option is used and the hostname can not be determined
|
|
|
+ */
|
|
|
private static String getHostName(Configuration config)
|
|
|
throws UnknownHostException {
|
|
|
- // use configured nameserver & interface to get local hostname
|
|
|
String name = config.get(DFS_DATANODE_HOST_NAME_KEY);
|
|
|
if (name == null) {
|
|
|
- name = DNS
|
|
|
- .getDefaultHost(config.get(DFS_DATANODE_DNS_INTERFACE_KEY,
|
|
|
- DFS_DATANODE_DNS_INTERFACE_DEFAULT), config.get(
|
|
|
- DFS_DATANODE_DNS_NAMESERVER_KEY,
|
|
|
- DFS_DATANODE_DNS_NAMESERVER_DEFAULT));
|
|
|
+ name = DNS.getDefaultHost(
|
|
|
+ config.get(DFS_DATANODE_DNS_INTERFACE_KEY,
|
|
|
+ DFS_DATANODE_DNS_INTERFACE_DEFAULT),
|
|
|
+ config.get(DFS_DATANODE_DNS_NAMESERVER_KEY,
|
|
|
+ DFS_DATANODE_DNS_NAMESERVER_DEFAULT));
|
|
|
}
|
|
|
return name;
|
|
|
}
|
|
@@ -485,23 +496,22 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
|
|
|
private void initDataXceiver(Configuration conf) throws IOException {
|
|
|
- InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf);
|
|
|
-
|
|
|
// find free port or use privileged port provided
|
|
|
ServerSocket ss;
|
|
|
- if(secureResources == null) {
|
|
|
+ if (secureResources == null) {
|
|
|
+ InetSocketAddress addr = DataNode.getStreamingAddr(conf);
|
|
|
ss = (dnConf.socketWriteTimeout > 0) ?
|
|
|
ServerSocketChannel.open().socket() : new ServerSocket();
|
|
|
- Server.bind(ss, streamingAddr, 0);
|
|
|
+ Server.bind(ss, addr, 0);
|
|
|
} else {
|
|
|
ss = secureResources.getStreamingSocket();
|
|
|
}
|
|
|
ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
|
|
|
- // adjust machine name with the actual port
|
|
|
- int tmpPort = ss.getLocalPort();
|
|
|
- selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
|
|
- tmpPort);
|
|
|
- LOG.info("Opened streaming server at " + selfAddr);
|
|
|
+
|
|
|
+ streamingAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
|
|
+ ss.getLocalPort());
|
|
|
+
|
|
|
+ LOG.info("Opened streaming server at " + streamingAddr);
|
|
|
this.threadGroup = new ThreadGroup("dataXceiverServer");
|
|
|
this.dataXceiverServer = new Daemon(threadGroup,
|
|
|
new DataXceiverServer(ss, conf, this));
|
|
@@ -646,7 +656,7 @@ public class DataNode extends Configured
|
|
|
this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
|
|
|
initIpcServer(conf);
|
|
|
|
|
|
- metrics = DataNodeMetrics.create(conf, getMachineName());
|
|
|
+ metrics = DataNodeMetrics.create(conf, getDisplayName());
|
|
|
|
|
|
blockPoolManager = new BlockPoolManager(this);
|
|
|
blockPoolManager.refreshNamenodes(conf);
|
|
@@ -657,14 +667,18 @@ public class DataNode extends Configured
|
|
|
* @param nsInfo the namespace info from the first part of the NN handshake
|
|
|
*/
|
|
|
DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
|
|
|
- DatanodeRegistration bpRegistration = createUnknownBPRegistration();
|
|
|
- String blockPoolId = nsInfo.getBlockPoolID();
|
|
|
-
|
|
|
+ final String xferIp = streamingAddr.getAddress().getHostAddress();
|
|
|
+ DatanodeRegistration bpRegistration = new DatanodeRegistration(xferIp);
|
|
|
+ bpRegistration.setXferPort(getXferPort());
|
|
|
+ bpRegistration.setInfoPort(getInfoPort());
|
|
|
+ bpRegistration.setIpcPort(getIpcPort());
|
|
|
+ bpRegistration.setHostName(hostName);
|
|
|
bpRegistration.setStorageID(getStorageId());
|
|
|
- StorageInfo storageInfo = storage.getBPStorage(blockPoolId);
|
|
|
+
|
|
|
+ StorageInfo storageInfo = storage.getBPStorage(nsInfo.getBlockPoolID());
|
|
|
if (storageInfo == null) {
|
|
|
// it's null in the case of SimulatedDataSet
|
|
|
- bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
|
|
|
+ bpRegistration.getStorageInfo().layoutVersion = HdfsConstants.LAYOUT_VERSION;
|
|
|
bpRegistration.setStorageInfo(nsInfo);
|
|
|
} else {
|
|
|
bpRegistration.setStorageInfo(storageInfo);
|
|
@@ -679,17 +693,18 @@ public class DataNode extends Configured
|
|
|
* Also updates the block pool's state in the secret manager.
|
|
|
*/
|
|
|
synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
|
|
|
- String blockPoolId)
|
|
|
- throws IOException {
|
|
|
- hostName = bpRegistration.getHost();
|
|
|
+ String blockPoolId) throws IOException {
|
|
|
+ // Set the ID if we haven't already
|
|
|
+ if (null == id) {
|
|
|
+ id = bpRegistration;
|
|
|
+ }
|
|
|
|
|
|
if (storage.getStorageID().equals("")) {
|
|
|
- // This is a fresh datanode -- take the storage ID provided by the
|
|
|
- // NN and persist it.
|
|
|
+ // This is a fresh datanode, persist the NN-provided storage ID
|
|
|
storage.setStorageID(bpRegistration.getStorageID());
|
|
|
storage.writeAll();
|
|
|
LOG.info("New storage id " + bpRegistration.getStorageID()
|
|
|
- + " is assigned to data-node " + bpRegistration.getName());
|
|
|
+ + " is assigned to data-node " + bpRegistration);
|
|
|
} else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
|
|
|
throw new IOException("Inconsistent storage IDs. Name-node returned "
|
|
|
+ bpRegistration.getStorageID()
|
|
@@ -708,7 +723,7 @@ public class DataNode extends Configured
|
|
|
*/
|
|
|
private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration,
|
|
|
String blockPoolId) throws IOException {
|
|
|
- ExportedBlockKeys keys = bpRegistration.exportedKeys;
|
|
|
+ ExportedBlockKeys keys = bpRegistration.getExportedKeys();
|
|
|
isBlockTokenEnabled = keys.isBlockTokenEnabled();
|
|
|
// TODO should we check that all federated nns are either enabled or
|
|
|
// disabled?
|
|
@@ -728,8 +743,8 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
|
|
|
blockPoolTokenSecretManager.setKeys(blockPoolId,
|
|
|
- bpRegistration.exportedKeys);
|
|
|
- bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
|
|
|
+ bpRegistration.getExportedKeys());
|
|
|
+ bpRegistration.setExportedKeys(ExportedBlockKeys.DUMMY_KEYS);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -783,18 +798,6 @@ public class DataNode extends Configured
|
|
|
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Create a DatanodeRegistration object with no valid StorageInfo.
|
|
|
- * This is used when reporting an error during handshake - ie
|
|
|
- * before we can load any specific block pool.
|
|
|
- */
|
|
|
- private DatanodeRegistration createUnknownBPRegistration() {
|
|
|
- DatanodeRegistration reg = new DatanodeRegistration(getMachineName());
|
|
|
- reg.setInfoPort(infoServer.getPort());
|
|
|
- reg.setIpcPort(getIpcPort());
|
|
|
- return reg;
|
|
|
- }
|
|
|
-
|
|
|
BPOfferService[] getAllBpOs() {
|
|
|
return blockPoolManager.getAllNamenodeThreads();
|
|
|
}
|
|
@@ -844,23 +847,37 @@ public class DataNode extends Configured
|
|
|
MBeans.register("DataNode", "DataNodeInfo", this);
|
|
|
}
|
|
|
|
|
|
- int getPort() {
|
|
|
- return selfAddr.getPort();
|
|
|
+ int getXferPort() {
|
|
|
+ return streamingAddr.getPort();
|
|
|
}
|
|
|
|
|
|
String getStorageId() {
|
|
|
return storage.getStorageID();
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Get host:port with host set to Datanode host and port set to the
|
|
|
- * port {@link DataXceiver} is serving.
|
|
|
- * @return host:port string
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return name useful for logging
|
|
|
*/
|
|
|
- public String getMachineName() {
|
|
|
- return hostName + ":" + getPort();
|
|
|
+ public String getDisplayName() {
|
|
|
+ // NB: our DatanodeID may not be set yet
|
|
|
+ return hostName + ":" + getIpcPort();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * NB: The datanode can perform data transfer on the streaming
|
|
|
+ * address however clients are given the IPC IP address for data
|
|
|
+ * transfer, and that may be be a different address.
|
|
|
+ *
|
|
|
+ * @return socket address for data transfer
|
|
|
+ */
|
|
|
+ public InetSocketAddress getXferAddress() {
|
|
|
+ return streamingAddr;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return the datanode's IPC port
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
public int getIpcPort() {
|
|
|
return ipcServer.getListenerAddress().getPort();
|
|
|
}
|
|
@@ -880,25 +897,6 @@ public class DataNode extends Configured
|
|
|
return bpos.bpRegistration;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * get BP registration by machine and port name (host:port)
|
|
|
- * @param mName - the name that the NN used
|
|
|
- * @return BP registration
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- DatanodeRegistration getDNRegistrationByMachineName(String mName) {
|
|
|
- // TODO: all the BPs should have the same name as each other, they all come
|
|
|
- // from getName() here! and the use cases only are in tests where they just
|
|
|
- // call with getName(). So we could probably just make this method return
|
|
|
- // the first BPOS's registration. See HDFS-2609.
|
|
|
- BPOfferService [] bposArray = blockPoolManager.getAllNamenodeThreads();
|
|
|
- for (BPOfferService bpos : bposArray) {
|
|
|
- if(bpos.bpRegistration.getName().equals(mName))
|
|
|
- return bpos.bpRegistration;
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Creates either NIO or regular depending on socketWriteTimeout.
|
|
|
*/
|
|
@@ -918,8 +916,8 @@ public class DataNode extends Configured
|
|
|
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
|
|
|
DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
|
|
|
throws IOException {
|
|
|
- final InetSocketAddress addr = NetUtils.createSocketAddr(
|
|
|
- datanodeid.getHost() + ":" + datanodeid.getIpcPort());
|
|
|
+ final InetSocketAddress addr =
|
|
|
+ NetUtils.createSocketAddr(datanodeid.getIpcAddr());
|
|
|
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
|
|
|
InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
|
|
|
}
|
|
@@ -936,10 +934,6 @@ public class DataNode extends Configured
|
|
|
throw new IOException(ie.getMessage());
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- public InetSocketAddress getSelfAddr() {
|
|
|
- return selfAddr;
|
|
|
- }
|
|
|
|
|
|
DataNodeMetrics getMetrics() {
|
|
|
return metrics;
|
|
@@ -947,7 +941,7 @@ public class DataNode extends Configured
|
|
|
|
|
|
public static void setNewStorageID(DatanodeID dnId) {
|
|
|
LOG.info("Datanode is " + dnId);
|
|
|
- dnId.storageID = createNewStorageId(dnId.getPort());
|
|
|
+ dnId.setStorageID(createNewStorageId(dnId.getXferPort()));
|
|
|
}
|
|
|
|
|
|
static String createNewStorageId(int port) {
|
|
@@ -1223,7 +1217,7 @@ public class DataNode extends Configured
|
|
|
if (LOG.isInfoEnabled()) {
|
|
|
StringBuilder xfersBuilder = new StringBuilder();
|
|
|
for (int i = 0; i < numTargets; i++) {
|
|
|
- xfersBuilder.append(xferTargets[i].getName());
|
|
|
+ xfersBuilder.append(xferTargets[i]);
|
|
|
xfersBuilder.append(" ");
|
|
|
}
|
|
|
LOG.info(bpReg + " Starting thread to transfer block " +
|
|
@@ -1381,7 +1375,7 @@ public class DataNode extends Configured
|
|
|
|
|
|
try {
|
|
|
InetSocketAddress curTarget =
|
|
|
- NetUtils.createSocketAddr(targets[0].getName());
|
|
|
+ NetUtils.createSocketAddr(targets[0].getXferAddr());
|
|
|
sock = newSocket();
|
|
|
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
|
|
|
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
|
|
@@ -1434,9 +1428,8 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
}
|
|
|
} catch (IOException ie) {
|
|
|
- LOG.warn(
|
|
|
- bpReg + ":Failed to transfer " + b + " to " + targets[0].getName()
|
|
|
- + " got ", ie);
|
|
|
+ LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
|
|
|
+ targets[0] + " got ", ie);
|
|
|
// check if there are any disk problem
|
|
|
checkDiskError();
|
|
|
|
|
@@ -1632,7 +1625,7 @@ public class DataNode extends Configured
|
|
|
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
- return "DataNode{data=" + data + ", localName='" + getMachineName()
|
|
|
+ return "DataNode{data=" + data + ", localName='" + getDisplayName()
|
|
|
+ "', storageID='" + getStorageId() + "', xmitsInProgress="
|
|
|
+ xmitsInProgress.get() + "}";
|
|
|
}
|
|
@@ -1990,15 +1983,14 @@ public class DataNode extends Configured
|
|
|
|
|
|
private static void logRecoverBlock(String who,
|
|
|
ExtendedBlock block, DatanodeID[] targets) {
|
|
|
- StringBuilder msg = new StringBuilder(targets[0].getName());
|
|
|
+ StringBuilder msg = new StringBuilder(targets[0].toString());
|
|
|
for (int i = 1; i < targets.length; i++) {
|
|
|
- msg.append(", " + targets[i].getName());
|
|
|
+ msg.append(", " + targets[i]);
|
|
|
}
|
|
|
LOG.info(who + " calls recoverBlock(block=" + block
|
|
|
+ ", targets=[" + msg + "])");
|
|
|
}
|
|
|
|
|
|
- // ClientDataNodeProtocol implementation
|
|
|
@Override // ClientDataNodeProtocol
|
|
|
public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
|
|
|
checkWriteAccess(block);
|
|
@@ -2076,8 +2068,7 @@ public class DataNode extends Configured
|
|
|
storage.finalizeUpgrade(blockPoolId);
|
|
|
}
|
|
|
|
|
|
- // Determine a Datanode's streaming address
|
|
|
- public static InetSocketAddress getStreamingAddr(Configuration conf) {
|
|
|
+ static InetSocketAddress getStreamingAddr(Configuration conf) {
|
|
|
return NetUtils.createSocketAddr(
|
|
|
conf.get(DFS_DATANODE_ADDRESS_KEY, DFS_DATANODE_ADDRESS_DEFAULT));
|
|
|
}
|
|
@@ -2099,8 +2090,11 @@ public class DataNode extends Configured
|
|
|
return this.getConf().get("dfs.datanode.info.port");
|
|
|
}
|
|
|
|
|
|
- public int getInfoPort(){
|
|
|
- return this.infoServer.getPort();
|
|
|
+ /**
|
|
|
+ * @return the datanode's http port
|
|
|
+ */
|
|
|
+ public int getInfoPort() {
|
|
|
+ return infoServer.getPort();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2142,7 +2136,7 @@ public class DataNode extends Configured
|
|
|
blockPoolManager.refreshNamenodes(conf);
|
|
|
}
|
|
|
|
|
|
- @Override //ClientDatanodeProtocol
|
|
|
+ @Override // ClientDatanodeProtocol
|
|
|
public void refreshNamenodes() throws IOException {
|
|
|
conf = new Configuration();
|
|
|
refreshNamenodes(conf);
|
|
@@ -2204,10 +2198,9 @@ public class DataNode extends Configured
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- /** Methods used by fault injection tests */
|
|
|
+ @VisibleForTesting
|
|
|
public DatanodeID getDatanodeId() {
|
|
|
- return new DatanodeID(getMachineName(), getStorageId(),
|
|
|
- infoServer.getPort(), getIpcPort());
|
|
|
+ return id;
|
|
|
}
|
|
|
|
|
|
/**
|