|
@@ -79,7 +79,7 @@ import java.security.SecureRandom;
|
|
* information to clients or other DataNodes that might be interested.
|
|
* information to clients or other DataNodes that might be interested.
|
|
*
|
|
*
|
|
**********************************************************/
|
|
**********************************************************/
|
|
-public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
+public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
|
|
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
|
|
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -131,8 +131,8 @@ public class DataNode implements FSConstants, Runnable {
|
|
private int socketWriteTimeout = 0;
|
|
private int socketWriteTimeout = 0;
|
|
private boolean transferToAllowed = true;
|
|
private boolean transferToAllowed = true;
|
|
|
|
|
|
- private DataBlockScanner blockScanner;
|
|
|
|
- private Daemon blockScannerThread;
|
|
|
|
|
|
+ DataBlockScanner blockScanner;
|
|
|
|
+ Daemon blockScannerThread;
|
|
|
|
|
|
private static final Random R = new Random();
|
|
private static final Random R = new Random();
|
|
|
|
|
|
@@ -152,6 +152,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
long balanceBandwidth;
|
|
long balanceBandwidth;
|
|
private Throttler balancingThrottler;
|
|
private Throttler balancingThrottler;
|
|
|
|
|
|
|
|
+ // For InterDataNodeProtocol
|
|
|
|
+ Server ipcServer;
|
|
|
|
+
|
|
// Record all sockets opend for data transfer
|
|
// Record all sockets opend for data transfer
|
|
Map<Socket, Socket> childSockets = Collections.synchronizedMap(
|
|
Map<Socket, Socket> childSockets = Collections.synchronizedMap(
|
|
new HashMap<Socket, Socket>());
|
|
new HashMap<Socket, Socket>());
|
|
@@ -281,7 +284,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
|
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
|
tmpPort);
|
|
tmpPort);
|
|
this.dnRegistration.setName(machineName + ":" + tmpPort);
|
|
this.dnRegistration.setName(machineName + ":" + tmpPort);
|
|
- LOG.info("Opened server at " + tmpPort);
|
|
|
|
|
|
+ LOG.info("Opened info server at " + tmpPort);
|
|
|
|
|
|
this.threadGroup = new ThreadGroup("dataXceiveServer");
|
|
this.threadGroup = new ThreadGroup("dataXceiveServer");
|
|
this.dataXceiveServer = new Daemon(threadGroup, new DataXceiveServer(ss));
|
|
this.dataXceiveServer = new Daemon(threadGroup, new DataXceiveServer(ss));
|
|
@@ -347,6 +350,16 @@ public class DataNode implements FSConstants, Runnable {
|
|
// adjust info port
|
|
// adjust info port
|
|
this.dnRegistration.setInfoPort(this.infoServer.getPort());
|
|
this.dnRegistration.setInfoPort(this.infoServer.getPort());
|
|
myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
|
|
myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
|
|
|
|
+
|
|
|
|
+ //init ipc server
|
|
|
|
+ InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
|
|
|
|
+ conf.get("dfs.datanode.ipc.address"));
|
|
|
|
+ ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),
|
|
|
|
+ conf.getInt("dfs.datanode.handler.count", 3), false, conf);
|
|
|
|
+ ipcServer.start();
|
|
|
|
+ dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
|
|
|
|
+
|
|
|
|
+ LOG.info("dnRegistration = " + dnRegistration);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -397,6 +410,17 @@ public class DataNode implements FSConstants, Runnable {
|
|
return datanodeObject;
|
|
return datanodeObject;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ static InterDatanodeProtocol createInterDataNodeProtocolProxy(
|
|
|
|
+ DatanodeID datanodeid, Configuration conf) throws IOException {
|
|
|
|
+ InetSocketAddress addr = NetUtils.createSocketAddr(
|
|
|
|
+ datanodeid.getHost() + ":" + datanodeid.getIpcPort());
|
|
|
|
+ if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
|
|
|
|
+ InterDatanodeProtocol.LOG.info("InterDatanodeProtocol addr=" + addr);
|
|
|
|
+ }
|
|
|
|
+ return (InterDatanodeProtocol)RPC.waitForProxy(InterDatanodeProtocol.class,
|
|
|
|
+ InterDatanodeProtocol.versionID, addr, conf);
|
|
|
|
+ }
|
|
|
|
+
|
|
public InetSocketAddress getNameNodeAddr() {
|
|
public InetSocketAddress getNameNodeAddr() {
|
|
return nameNodeAddr;
|
|
return nameNodeAddr;
|
|
}
|
|
}
|
|
@@ -504,6 +528,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ if (ipcServer != null) {
|
|
|
|
+ ipcServer.stop();
|
|
|
|
+ }
|
|
this.shouldRun = false;
|
|
this.shouldRun = false;
|
|
if (dataXceiveServer != null) {
|
|
if (dataXceiveServer != null) {
|
|
((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
|
|
((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
|
|
@@ -2926,4 +2953,33 @@ public class DataNode implements FSConstants, Runnable {
|
|
System.exit(-1);
|
|
System.exit(-1);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // InterDataNodeProtocol implementation
|
|
|
|
+ /** {@inheritDoc} */
|
|
|
|
+ public BlockMetaDataInfo getBlockMetaDataInfo(Block block
|
|
|
|
+ ) throws IOException {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("block=" + block);
|
|
|
|
+ }
|
|
|
|
+ return BlockMetaDataInfo.getBlockMetaDataInfo(block, data, blockScanner);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** {@inheritDoc} */
|
|
|
|
+ public boolean updateGenerationStamp(Block block, GenerationStamp generationstamp) {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("block=" + block + ", generationstamp=" + generationstamp);
|
|
|
|
+ }
|
|
|
|
+ //TODO: update generation stamp here
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** {@inheritDoc} */
|
|
|
|
+ public long getProtocolVersion(String protocol, long clientVersion
|
|
|
|
+ ) throws IOException {
|
|
|
|
+ if (protocol.equals(InterDatanodeProtocol.class.getName())) {
|
|
|
|
+ return InterDatanodeProtocol.versionID;
|
|
|
|
+ } else {
|
|
|
|
+ throw new IOException("Unknown protocol to name node: " + protocol);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|