|
@@ -19,13 +19,13 @@ package org.apache.hadoop.dfs;
|
|
|
|
|
|
import org.apache.commons.logging.*;
|
|
import org.apache.commons.logging.*;
|
|
|
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.ipc.*;
|
|
import org.apache.hadoop.ipc.*;
|
|
import org.apache.hadoop.conf.*;
|
|
import org.apache.hadoop.conf.*;
|
|
import org.apache.hadoop.metrics.MetricsUtil;
|
|
import org.apache.hadoop.metrics.MetricsUtil;
|
|
import org.apache.hadoop.net.DNS;
|
|
import org.apache.hadoop.net.DNS;
|
|
|
|
+import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NodeBase;
|
|
import org.apache.hadoop.net.NodeBase;
|
|
import org.apache.hadoop.util.*;
|
|
import org.apache.hadoop.util.*;
|
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
@@ -83,30 +83,12 @@ public class DataNode implements FSConstants, Runnable {
|
|
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
|
|
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Util method to build socket addr from either:
|
|
|
|
- * <host>:<post>
|
|
|
|
- * <fs>://<host>:<port>/<path>
|
|
|
|
|
|
+ * Use {@link NetUtils#createSocketAddr(String)} instead.
|
|
*/
|
|
*/
|
|
|
|
+ @Deprecated
|
|
public static InetSocketAddress createSocketAddr(String target
|
|
public static InetSocketAddress createSocketAddr(String target
|
|
) throws IOException {
|
|
) throws IOException {
|
|
- int colonIndex = target.indexOf(':');
|
|
|
|
- if (colonIndex < 0) {
|
|
|
|
- throw new RuntimeException("Not a host:port pair: " + target);
|
|
|
|
- }
|
|
|
|
- String hostname;
|
|
|
|
- int port;
|
|
|
|
- if (!target.contains("/")) {
|
|
|
|
- // must be the old style <host>:<port>
|
|
|
|
- hostname = target.substring(0, colonIndex);
|
|
|
|
- port = Integer.parseInt(target.substring(colonIndex + 1));
|
|
|
|
- } else {
|
|
|
|
- // a new uri
|
|
|
|
- URI addr = new Path(target).toUri();
|
|
|
|
- hostname = addr.getHost();
|
|
|
|
- port = addr.getPort();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return new InetSocketAddress(hostname, port);
|
|
|
|
|
|
+ return NetUtils.createSocketAddr(target);
|
|
}
|
|
}
|
|
|
|
|
|
DatanodeProtocol namenode = null;
|
|
DatanodeProtocol namenode = null;
|
|
@@ -242,13 +224,15 @@ public class DataNode implements FSConstants, Runnable {
|
|
machineName = DNS.getDefaultHost(
|
|
machineName = DNS.getDefaultHost(
|
|
conf.get("dfs.datanode.dns.interface","default"),
|
|
conf.get("dfs.datanode.dns.interface","default"),
|
|
conf.get("dfs.datanode.dns.nameserver","default"));
|
|
conf.get("dfs.datanode.dns.nameserver","default"));
|
|
- InetSocketAddress nameNodeAddr = createSocketAddr(
|
|
|
|
- conf.get("fs.default.name", "local"));
|
|
|
|
|
|
+ InetSocketAddress nameNodeAddr = NetUtils.createSocketAddr(
|
|
|
|
+ conf.get("fs.default.name", "local"));
|
|
|
|
|
|
this.defaultBytesPerChecksum =
|
|
this.defaultBytesPerChecksum =
|
|
Math.max(conf.getInt("io.bytes.per.checksum", 512), 1);
|
|
Math.max(conf.getInt("io.bytes.per.checksum", 512), 1);
|
|
|
|
|
|
- int tmpPort = conf.getInt("dfs.datanode.port", 50010);
|
|
|
|
|
|
+ String address = conf.get("dfs.datanode.bindAddress", "0.0.0.0:50010");
|
|
|
|
+ InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
|
|
|
|
+ int tmpPort = socAddr.getPort();
|
|
storage = new DataStorage();
|
|
storage = new DataStorage();
|
|
// construct registration
|
|
// construct registration
|
|
this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
|
|
this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
|
|
@@ -289,19 +273,11 @@ public class DataNode implements FSConstants, Runnable {
|
|
}
|
|
}
|
|
|
|
|
|
// find free port
|
|
// find free port
|
|
- ServerSocket ss = null;
|
|
|
|
- String bindAddress = conf.get("dfs.datanode.bindAddress", "0.0.0.0");
|
|
|
|
- while (ss == null) {
|
|
|
|
- try {
|
|
|
|
- ss = new ServerSocket(tmpPort, 0, InetAddress.getByName(bindAddress));
|
|
|
|
- LOG.info("Opened server at " + tmpPort);
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- LOG.info("Could not open server at " + tmpPort + ", trying new port");
|
|
|
|
- tmpPort++;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ ServerSocket ss = new ServerSocket(tmpPort, 0, socAddr.getAddress());
|
|
// adjust machine name with the actual port
|
|
// adjust machine name with the actual port
|
|
|
|
+ tmpPort = ss.getLocalPort();
|
|
this.dnRegistration.setName(machineName + ":" + tmpPort);
|
|
this.dnRegistration.setName(machineName + ":" + tmpPort);
|
|
|
|
+ LOG.info("Opened server at " + tmpPort);
|
|
|
|
|
|
this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
|
|
this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
|
|
|
|
|
|
@@ -313,9 +289,11 @@ public class DataNode implements FSConstants, Runnable {
|
|
DataNode.nameNodeAddr = nameNodeAddr;
|
|
DataNode.nameNodeAddr = nameNodeAddr;
|
|
|
|
|
|
//create a servlet to serve full-file content
|
|
//create a servlet to serve full-file content
|
|
- int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
|
|
|
|
- String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
|
|
|
|
- this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
|
|
|
|
|
|
+ String infoAddr = conf.get("dfs.datanode.http.bindAddress", "0.0.0.0:50075");
|
|
|
|
+ InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
|
|
|
|
+ String infoHost = infoSocAddr.getHostName();
|
|
|
|
+ int tmpInfoPort = infoSocAddr.getPort();
|
|
|
|
+ this.infoServer = new StatusHttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0);
|
|
this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
|
|
this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
|
|
this.infoServer.start();
|
|
this.infoServer.start();
|
|
// adjust info port
|
|
// adjust info port
|
|
@@ -380,7 +358,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
return "<namenode>";
|
|
return "<namenode>";
|
|
}
|
|
}
|
|
|
|
|
|
- private void setNewStorageID(DatanodeRegistration dnReg) {
|
|
|
|
|
|
+ static void setNewStorageID(DatanodeRegistration dnReg) {
|
|
/* Return
|
|
/* Return
|
|
* "DS-randInt-ipaddr-currentTimeMillis"
|
|
* "DS-randInt-ipaddr-currentTimeMillis"
|
|
* It is considered extermely rare for all these numbers to match
|
|
* It is considered extermely rare for all these numbers to match
|
|
@@ -965,7 +943,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
InetSocketAddress mirrorTarget = null;
|
|
InetSocketAddress mirrorTarget = null;
|
|
// Connect to backup machine
|
|
// Connect to backup machine
|
|
mirrorNode = targets[0].getName();
|
|
mirrorNode = targets[0].getName();
|
|
- mirrorTarget = createSocketAddr(mirrorNode);
|
|
|
|
|
|
+ mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
|
|
mirrorSock = new Socket();
|
|
mirrorSock = new Socket();
|
|
try {
|
|
try {
|
|
mirrorSock.connect(mirrorTarget, READ_TIMEOUT);
|
|
mirrorSock.connect(mirrorTarget, READ_TIMEOUT);
|
|
@@ -1098,7 +1076,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
blockSender = new BlockSender(block, 0, -1, false, false);
|
|
blockSender = new BlockSender(block, 0, -1, false, false);
|
|
|
|
|
|
// get the output stream to the target
|
|
// get the output stream to the target
|
|
- InetSocketAddress targetAddr = createSocketAddr(target.getName());
|
|
|
|
|
|
+ InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
|
|
targetSock = new Socket();
|
|
targetSock = new Socket();
|
|
targetSock.connect(targetAddr, READ_TIMEOUT);
|
|
targetSock.connect(targetAddr, READ_TIMEOUT);
|
|
targetSock.setSoTimeout(READ_TIMEOUT);
|
|
targetSock.setSoTimeout(READ_TIMEOUT);
|
|
@@ -1661,7 +1639,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
try {
|
|
try {
|
|
InetSocketAddress curTarget =
|
|
InetSocketAddress curTarget =
|
|
- createSocketAddr(targets[0].getName());
|
|
|
|
|
|
+ NetUtils.createSocketAddr(targets[0].getName());
|
|
sock = new Socket();
|
|
sock = new Socket();
|
|
sock.connect(curTarget, READ_TIMEOUT);
|
|
sock.connect(curTarget, READ_TIMEOUT);
|
|
sock.setSoTimeout(targets.length*READ_TIMEOUT);
|
|
sock.setSoTimeout(targets.length*READ_TIMEOUT);
|