|
@@ -106,18 +106,20 @@ public class DataNode implements FSConstants, Runnable {
|
|
return new InetSocketAddress(hostname, port);
|
|
return new InetSocketAddress(hostname, port);
|
|
}
|
|
}
|
|
|
|
|
|
- DatanodeProtocol namenode;
|
|
|
|
- FSDataset data;
|
|
|
|
- DatanodeRegistration dnRegistration;
|
|
|
|
|
|
+ DatanodeProtocol namenode = null;
|
|
|
|
+ FSDataset data = null;
|
|
|
|
+ DatanodeRegistration dnRegistration = null;
|
|
private String networkLoc;
|
|
private String networkLoc;
|
|
volatile boolean shouldRun = true;
|
|
volatile boolean shouldRun = true;
|
|
Vector receivedBlockList = new Vector();
|
|
Vector receivedBlockList = new Vector();
|
|
int xmitsInProgress = 0;
|
|
int xmitsInProgress = 0;
|
|
Daemon dataXceiveServer = null;
|
|
Daemon dataXceiveServer = null;
|
|
long blockReportInterval;
|
|
long blockReportInterval;
|
|
|
|
+ long lastBlockReport = 0;
|
|
|
|
+ long lastHeartbeat = 0;
|
|
long heartBeatInterval;
|
|
long heartBeatInterval;
|
|
private DataStorage storage = null;
|
|
private DataStorage storage = null;
|
|
- private StatusHttpServer infoServer;
|
|
|
|
|
|
+ private StatusHttpServer infoServer = null;
|
|
private DataNodeMetrics myMetrics = new DataNodeMetrics();
|
|
private DataNodeMetrics myMetrics = new DataNodeMetrics();
|
|
private static InetSocketAddress nameNodeAddr;
|
|
private static InetSocketAddress nameNodeAddr;
|
|
private static DataNode datanodeObject = null;
|
|
private static DataNode datanodeObject = null;
|
|
@@ -187,73 +189,57 @@ public class DataNode implements FSConstants, Runnable {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Create the DataNode given a configuration and an array of dataDirs.
|
|
|
|
- * 'dataDirs' is where the blocks are stored.
|
|
|
|
|
|
+ * @deprecated
|
|
|
|
+ * TODO: only MiniDFSCluster needs it, should be removed
|
|
*/
|
|
*/
|
|
- DataNode(Configuration conf, String[] dataDirs) throws IOException {
|
|
|
|
- this(conf, NetworkTopology.DEFAULT_RACK, dataDirs );
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- DataNode(Configuration conf, String networkLoc, String[] dataDirs) throws IOException {
|
|
|
|
- this(networkLoc, dataDirs,
|
|
|
|
- createSocketAddr(conf.get("fs.default.name", "local")), conf);
|
|
|
|
- // register datanode
|
|
|
|
- int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
|
|
|
|
- String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
|
|
|
|
- this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
|
|
|
|
- //create a servlet to serve full-file content
|
|
|
|
- this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
|
|
|
|
- this.infoServer.start();
|
|
|
|
- this.dnRegistration.infoPort = this.infoServer.getPort();
|
|
|
|
- // register datanode
|
|
|
|
- try {
|
|
|
|
- register();
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- try {
|
|
|
|
- infoServer.stop();
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- }
|
|
|
|
- throw ie;
|
|
|
|
- }
|
|
|
|
- datanodeObject = this;
|
|
|
|
|
|
+ DataNode( Configuration conf, String networkLoc, String[] dataDirs ) throws IOException {
|
|
|
|
+ // networkLoc is ignored since it is already in the conf
|
|
|
|
+ this( conf, Storage.makeListOfFiles( dataDirs ) );
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * A DataNode can also be created with configuration information
|
|
|
|
- * explicitly given.
|
|
|
|
- *
|
|
|
|
- * @see DataStorage
|
|
|
|
|
|
+ * Create the DataNode given a configuration and an array of dataDirs.
|
|
|
|
+ * 'dataDirs' is where the blocks are stored.
|
|
*/
|
|
*/
|
|
- private DataNode(String networkLoc,
|
|
|
|
- String[] dataDirs,
|
|
|
|
- InetSocketAddress nameNodeAddr,
|
|
|
|
- Configuration conf ) throws IOException {
|
|
|
|
- File[] volumes = new File[dataDirs.length];
|
|
|
|
- for (int idx = 0; idx < dataDirs.length; idx++) {
|
|
|
|
- volumes[idx] = new File(dataDirs[idx]);
|
|
|
|
|
|
+ DataNode( Configuration conf,
|
|
|
|
+ AbstractList<File> dataDirs ) throws IOException {
|
|
|
|
+ try {
|
|
|
|
+ startDataNode( conf, dataDirs );
|
|
|
|
+ } catch (IOException ie) {
|
|
|
|
+ shutdown();
|
|
|
|
+ throw ie;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void startDataNode( Configuration conf,
|
|
|
|
+ AbstractList<File> dataDirs
|
|
|
|
+ ) throws IOException {
|
|
// use configured nameserver & interface to get local hostname
|
|
// use configured nameserver & interface to get local hostname
|
|
- machineName =
|
|
|
|
- DNS.getDefaultHost
|
|
|
|
- (conf.get("dfs.datanode.dns.interface","default"),
|
|
|
|
- conf.get("dfs.datanode.dns.nameserver","default"));
|
|
|
|
-
|
|
|
|
- // get storage info and lock the data dirs
|
|
|
|
- storage = new DataStorage( volumes );
|
|
|
|
- int numDirs = storage.getNumLocked();
|
|
|
|
- if (numDirs == 0) { // all data dirs are in use
|
|
|
|
- throw new IOException("Cannot start multiple Datanode instances "
|
|
|
|
- + "sharing the same data directories.\n"
|
|
|
|
- + StringUtils.arrayToString(dataDirs) + " are locked. ");
|
|
|
|
- }
|
|
|
|
- volumes = storage.getLockedDirs();
|
|
|
|
|
|
+ machineName = DNS.getDefaultHost(
|
|
|
|
+ conf.get("dfs.datanode.dns.interface","default"),
|
|
|
|
+ conf.get("dfs.datanode.dns.nameserver","default"));
|
|
|
|
+ InetSocketAddress nameNodeAddr = createSocketAddr(
|
|
|
|
+ conf.get("fs.default.name", "local"));
|
|
|
|
+
|
|
// connect to name node
|
|
// connect to name node
|
|
this.namenode = (DatanodeProtocol)
|
|
this.namenode = (DatanodeProtocol)
|
|
RPC.waitForProxy(DatanodeProtocol.class,
|
|
RPC.waitForProxy(DatanodeProtocol.class,
|
|
DatanodeProtocol.versionID,
|
|
DatanodeProtocol.versionID,
|
|
nameNodeAddr,
|
|
nameNodeAddr,
|
|
conf);
|
|
conf);
|
|
|
|
+ // get version and id info from the name-node
|
|
|
|
+ NamespaceInfo nsInfo = handshake();
|
|
|
|
+
|
|
|
|
+ // read storage info, lock data dirs and transition fs state if necessary
|
|
|
|
+ StartupOption startOpt = (StartupOption)conf.get( "dfs.datanode.startup",
|
|
|
|
+ StartupOption.REGULAR );
|
|
|
|
+ assert startOpt != null : "Startup option must be set.";
|
|
|
|
+ storage = new DataStorage();
|
|
|
|
+ storage.recoverTransitionRead( nsInfo, dataDirs, startOpt );
|
|
|
|
+
|
|
|
|
+ // initialize data node internal structure
|
|
|
|
+ this.data = new FSDataset( storage, conf );
|
|
|
|
+
|
|
// find free port
|
|
// find free port
|
|
ServerSocket ss = null;
|
|
ServerSocket ss = null;
|
|
int tmpPort = conf.getInt("dfs.datanode.port", 50010);
|
|
int tmpPort = conf.getInt("dfs.datanode.port", 50010);
|
|
@@ -268,15 +254,11 @@ public class DataNode implements FSConstants, Runnable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// construct registration
|
|
// construct registration
|
|
- this.dnRegistration = new DatanodeRegistration(
|
|
|
|
- DFS_CURRENT_VERSION,
|
|
|
|
- machineName + ":" + tmpPort,
|
|
|
|
- storage.getStorageID(),
|
|
|
|
- -1,
|
|
|
|
- "" );
|
|
|
|
- this.networkLoc = networkLoc;
|
|
|
|
- // initialize data node internal structure
|
|
|
|
- this.data = new FSDataset(volumes, conf);
|
|
|
|
|
|
+ this.dnRegistration = new DatanodeRegistration(
|
|
|
|
+ machineName + ":" + tmpPort,
|
|
|
|
+ -1, // info port determined later
|
|
|
|
+ storage );
|
|
|
|
+
|
|
this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
|
|
this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
|
|
|
|
|
|
long blockReportIntervalBasis =
|
|
long blockReportIntervalBasis =
|
|
@@ -284,7 +266,55 @@ public class DataNode implements FSConstants, Runnable {
|
|
this.blockReportInterval =
|
|
this.blockReportInterval =
|
|
blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
|
|
blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
|
|
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
|
|
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
|
|
- this.nameNodeAddr = nameNodeAddr;
|
|
|
|
|
|
+ DataNode.nameNodeAddr = nameNodeAddr;
|
|
|
|
+
|
|
|
|
+ //create a servlet to serve full-file content
|
|
|
|
+ int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
|
|
|
|
+ String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
|
|
|
|
+ this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
|
|
|
|
+ this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
|
|
|
|
+ this.infoServer.start();
|
|
|
|
+ this.dnRegistration.infoPort = this.infoServer.getPort();
|
|
|
|
+ // get network location
|
|
|
|
+ this.networkLoc = conf.get( "dfs.datanode.rack" );
|
|
|
|
+ if( networkLoc == null ) // exec network script or set the default rack
|
|
|
|
+ networkLoc = getNetworkLoc( conf );
|
|
|
|
+ // register datanode
|
|
|
|
+ register();
|
|
|
|
+ datanodeObject = this;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private NamespaceInfo handshake() throws IOException {
|
|
|
|
+ NamespaceInfo nsInfo;
|
|
|
|
+ while( true ) {
|
|
|
|
+ try {
|
|
|
|
+ nsInfo = namenode.versionRequest();
|
|
|
|
+ break;
|
|
|
|
+ } catch( SocketTimeoutException e ) { // namenode is busy
|
|
|
|
+ LOG.info("Problem connecting to server: " + getNameNodeAddr());
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(1000);
|
|
|
|
+ } catch (InterruptedException ie) {}
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ String errorMsg = null;
|
|
|
|
+ // verify build version
|
|
|
|
+ if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
|
|
|
|
+ errorMsg = "Incompatible build versions: namenode BV = "
|
|
|
|
+ + nsInfo.getBuildVersion() + "; datanode BV = "
|
|
|
|
+ + Storage.getBuildVersion();
|
|
|
|
+ LOG.fatal( errorMsg );
|
|
|
|
+ try {
|
|
|
|
+ namenode.errorReport( dnRegistration,
|
|
|
|
+ DatanodeProtocol.NOTIFY, errorMsg );
|
|
|
|
+ } catch( SocketTimeoutException e ) { // namenode is busy
|
|
|
|
+ LOG.info("Problem connecting to server: " + getNameNodeAddr());
|
|
|
|
+ }
|
|
|
|
+ throw new IOException( errorMsg );
|
|
|
|
+ }
|
|
|
|
+ assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
|
|
|
|
+ "Data-node and name-node layout versions must be the same.";
|
|
|
|
+ return nsInfo;
|
|
}
|
|
}
|
|
|
|
|
|
/** Return the DataNode object
|
|
/** Return the DataNode object
|
|
@@ -314,7 +344,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
* 2) to receive a registrationID
|
|
* 2) to receive a registrationID
|
|
* issued by the namenode to recognize registered datanodes.
|
|
* issued by the namenode to recognize registered datanodes.
|
|
*
|
|
*
|
|
- * @see FSNamesystem#registerDatanode(DatanodeRegistration)
|
|
|
|
|
|
+ * @see FSNamesystem#registerDatanode(DatanodeRegistration,String)
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
private void register() throws IOException {
|
|
private void register() throws IOException {
|
|
@@ -355,7 +385,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
}
|
|
}
|
|
if (storage != null) {
|
|
if (storage != null) {
|
|
try {
|
|
try {
|
|
- this.storage.closeAll();
|
|
|
|
|
|
+ this.storage.unlockAll();
|
|
} catch (IOException ie) {
|
|
} catch (IOException ie) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -388,7 +418,6 @@ public class DataNode implements FSConstants, Runnable {
|
|
*/
|
|
*/
|
|
public void offerService() throws Exception {
|
|
public void offerService() throws Exception {
|
|
|
|
|
|
- long lastHeartbeat = 0, lastBlockReport = 0;
|
|
|
|
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
|
|
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
|
|
|
|
|
|
//
|
|
//
|
|
@@ -410,47 +439,17 @@ public class DataNode implements FSConstants, Runnable {
|
|
// -- Total capacity
|
|
// -- Total capacity
|
|
// -- Bytes remaining
|
|
// -- Bytes remaining
|
|
//
|
|
//
|
|
- BlockCommand cmd = namenode.sendHeartbeat(dnRegistration,
|
|
|
|
|
|
+ DatanodeCommand cmd = namenode.sendHeartbeat( dnRegistration,
|
|
data.getCapacity(),
|
|
data.getCapacity(),
|
|
data.getRemaining(),
|
|
data.getRemaining(),
|
|
xmitsInProgress,
|
|
xmitsInProgress,
|
|
xceiverCount.getValue());
|
|
xceiverCount.getValue());
|
|
//LOG.info("Just sent heartbeat, with name " + localName);
|
|
//LOG.info("Just sent heartbeat, with name " + localName);
|
|
lastHeartbeat = now;
|
|
lastHeartbeat = now;
|
|
-
|
|
|
|
- if( cmd != null ) {
|
|
|
|
- switch( cmd.action ) {
|
|
|
|
- case DNA_TRANSFER:
|
|
|
|
- //
|
|
|
|
- // Send a copy of a block to another datanode
|
|
|
|
- //
|
|
|
|
- transferBlocks( cmd.getBlocks(), cmd.getTargets() );
|
|
|
|
- break;
|
|
|
|
- case DNA_INVALIDATE:
|
|
|
|
- //
|
|
|
|
- // Some local block(s) are obsolete and can be
|
|
|
|
- // safely garbage-collected.
|
|
|
|
- //
|
|
|
|
- Block toDelete[] = cmd.getBlocks();
|
|
|
|
- data.invalidate(toDelete);
|
|
|
|
- myMetrics.removedBlocks(toDelete.length);
|
|
|
|
- break;
|
|
|
|
- case DNA_SHUTDOWN:
|
|
|
|
- // shut down the data node
|
|
|
|
- this.shutdown();
|
|
|
|
- continue;
|
|
|
|
- case DNA_REGISTER:
|
|
|
|
- // namenode requested a registration
|
|
|
|
- register();
|
|
|
|
- lastHeartbeat=0;
|
|
|
|
- lastBlockReport=0;
|
|
|
|
- continue;
|
|
|
|
- default:
|
|
|
|
- LOG.warn( "Unknown BlockCommand action: " + cmd.action);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ if( ! processCommand( cmd ) )
|
|
|
|
+ continue;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
// send block report
|
|
// send block report
|
|
if (now - lastBlockReport > blockReportInterval) {
|
|
if (now - lastBlockReport > blockReportInterval) {
|
|
//
|
|
//
|
|
@@ -458,9 +457,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
// Get back a list of local block(s) that are obsolete
|
|
// Get back a list of local block(s) that are obsolete
|
|
// and can be safely GC'ed.
|
|
// and can be safely GC'ed.
|
|
//
|
|
//
|
|
- Block toDelete[] = namenode.blockReport(dnRegistration,
|
|
|
|
- data.getBlockReport());
|
|
|
|
- data.invalidate(toDelete);
|
|
|
|
|
|
+ DatanodeCommand cmd = namenode.blockReport( dnRegistration,
|
|
|
|
+ data.getBlockReport());
|
|
|
|
+ processCommand( cmd );
|
|
lastBlockReport = now;
|
|
lastBlockReport = now;
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
@@ -516,6 +515,51 @@ public class DataNode implements FSConstants, Runnable {
|
|
} // while (shouldRun)
|
|
} // while (shouldRun)
|
|
} // offerService
|
|
} // offerService
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ *
|
|
|
|
+ * @param cmd
|
|
|
|
+ * @return true if further processing may be required or false otherwise.
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private boolean processCommand( DatanodeCommand cmd ) throws IOException {
|
|
|
|
+ if( cmd == null )
|
|
|
|
+ return true;
|
|
|
|
+ switch( cmd.action ) {
|
|
|
|
+ case DNA_TRANSFER:
|
|
|
|
+ //
|
|
|
|
+ // Send a copy of a block to another datanode
|
|
|
|
+ //
|
|
|
|
+ BlockCommand bcmd = (BlockCommand)cmd;
|
|
|
|
+ transferBlocks( bcmd.getBlocks(), bcmd.getTargets() );
|
|
|
|
+ break;
|
|
|
|
+ case DNA_INVALIDATE:
|
|
|
|
+ //
|
|
|
|
+ // Some local block(s) are obsolete and can be
|
|
|
|
+ // safely garbage-collected.
|
|
|
|
+ //
|
|
|
|
+ Block toDelete[] = ((BlockCommand)cmd).getBlocks();
|
|
|
|
+ data.invalidate(toDelete);
|
|
|
|
+ myMetrics.removedBlocks(toDelete.length);
|
|
|
|
+ break;
|
|
|
|
+ case DNA_SHUTDOWN:
|
|
|
|
+ // shut down the data node
|
|
|
|
+ this.shutdown();
|
|
|
|
+ return false;
|
|
|
|
+ case DNA_REGISTER:
|
|
|
|
+ // namenode requested a registration
|
|
|
|
+ register();
|
|
|
|
+ lastHeartbeat=0;
|
|
|
|
+ lastBlockReport=0;
|
|
|
|
+ break;
|
|
|
|
+ case DNA_FINALIZE:
|
|
|
|
+ storage.finalizeUpgrade();
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ LOG.warn( "Unknown DatanodeCommand action: " + cmd.action);
|
|
|
|
+ }
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+
|
|
private void transferBlocks( Block blocks[],
|
|
private void transferBlocks( Block blocks[],
|
|
DatanodeInfo xferTargets[][]
|
|
DatanodeInfo xferTargets[][]
|
|
) throws IOException {
|
|
) throws IOException {
|
|
@@ -1074,9 +1118,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
/** Start datanode daemon.
|
|
/** Start datanode daemon.
|
|
*/
|
|
*/
|
|
- public static void run(Configuration conf, String networkLoc) throws IOException {
|
|
|
|
|
|
+ public static void run(Configuration conf) throws IOException {
|
|
String[] dataDirs = conf.getStrings("dfs.data.dir");
|
|
String[] dataDirs = conf.getStrings("dfs.data.dir");
|
|
- DataNode dn = makeInstance(networkLoc, dataDirs, conf);
|
|
|
|
|
|
+ DataNode dn = makeInstance(dataDirs, conf);
|
|
if (dn != null) {
|
|
if (dn != null) {
|
|
dataNodeList.add(dn);
|
|
dataNodeList.add(dn);
|
|
Thread t = new Thread(dn, "DataNode: [" +
|
|
Thread t = new Thread(dn, "DataNode: [" +
|
|
@@ -1087,25 +1131,34 @@ public class DataNode implements FSConstants, Runnable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Shut down all datanodes that where started via the
|
|
|
|
- * run(conf,networkLoc) method.
|
|
|
|
- * Returns only after shutdown is complete.
|
|
|
|
- */
|
|
|
|
- public static void shutdownAll(){
|
|
|
|
- while (!dataNodeList.isEmpty()) {
|
|
|
|
- dataNodeList.remove(0).shutdown();
|
|
|
|
- dataNodeThreadList.remove(0).interrupt();
|
|
|
|
- }
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Shut down all datanodes that where started via the
|
|
|
|
+ * run(conf,networkLoc) method.
|
|
|
|
+ * Returns only after shutdown is complete.
|
|
|
|
+ */
|
|
|
|
+ public static void shutdownAll(){
|
|
|
|
+ while (!dataNodeList.isEmpty()) {
|
|
|
|
+ dataNodeList.remove(0).shutdown();
|
|
|
|
+ dataNodeThreadList.remove(0).interrupt();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+ }
|
|
|
|
|
|
/** Start a single datanode daemon and wait for it to finish.
|
|
/** Start a single datanode daemon and wait for it to finish.
|
|
* If this thread is specifically interrupted, it will stop waiting.
|
|
* If this thread is specifically interrupted, it will stop waiting.
|
|
*/
|
|
*/
|
|
- private static void runAndWait(Configuration conf, String networkLoc)
|
|
|
|
- throws IOException {
|
|
|
|
- run(conf, networkLoc);
|
|
|
|
|
|
+ static DataNode createDataNode( String args[],
|
|
|
|
+ Configuration conf ) throws IOException {
|
|
|
|
+ if( conf == null )
|
|
|
|
+ conf = new Configuration();
|
|
|
|
+ if( ! parseArguments( args, conf )) {
|
|
|
|
+ printUsage();
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ run(conf);
|
|
|
|
+ return (DataNode)dataNodeList.get(0);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void join() {
|
|
if (dataNodeThreadList.size() > 0) {
|
|
if (dataNodeThreadList.size() > 0) {
|
|
Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);
|
|
Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);
|
|
try {
|
|
try {
|
|
@@ -1125,24 +1178,22 @@ public class DataNode implements FSConstants, Runnable {
|
|
* no directory from this directory list can be created.
|
|
* no directory from this directory list can be created.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- static DataNode makeInstance( String[] dataDirs, Configuration conf)
|
|
|
|
- throws IOException {
|
|
|
|
- return makeInstance(NetworkTopology.DEFAULT_RACK, dataDirs, conf );
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- static DataNode makeInstance(String networkLoc, String[] dataDirs, Configuration conf)
|
|
|
|
|
|
+ static DataNode makeInstance( String[] dataDirs, Configuration conf )
|
|
throws IOException {
|
|
throws IOException {
|
|
- ArrayList<String> dirs = new ArrayList<String>();
|
|
|
|
|
|
+ ArrayList<File> dirs = new ArrayList<File>();
|
|
for (int i = 0; i < dataDirs.length; i++) {
|
|
for (int i = 0; i < dataDirs.length; i++) {
|
|
File data = new File(dataDirs[i]);
|
|
File data = new File(dataDirs[i]);
|
|
try {
|
|
try {
|
|
DiskChecker.checkDir( data );
|
|
DiskChecker.checkDir( data );
|
|
- dirs.add(dataDirs[i]);
|
|
|
|
|
|
+ dirs.add(data);
|
|
} catch( DiskErrorException e ) {
|
|
} catch( DiskErrorException e ) {
|
|
LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() );
|
|
LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() );
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- return ((dirs.size() > 0) ? new DataNode(conf, networkLoc, dirs.toArray(new String[dirs.size()])) : null);
|
|
|
|
|
|
+ if( dirs.size() > 0 )
|
|
|
|
+ return new DataNode(conf, dirs);
|
|
|
|
+ LOG.error("All directories in dfs.data.dir are invalid." );
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
|
|
|
|
public String toString() {
|
|
public String toString() {
|
|
@@ -1154,11 +1205,49 @@ public class DataNode implements FSConstants, Runnable {
|
|
"}";
|
|
"}";
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static void printUsage() {
|
|
|
|
+ System.err.println("Usage: java DataNode");
|
|
|
|
+ System.err.println(" [-r, --rack <network location>] |");
|
|
|
|
+ System.err.println(" [-rollback]");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Parse and verify command line arguments and set configuration parameters.
|
|
|
|
+ *
|
|
|
|
+ * @return false if passed argements are incorrect
|
|
|
|
+ */
|
|
|
|
+ private static boolean parseArguments(String args[],
|
|
|
|
+ Configuration conf ) {
|
|
|
|
+ int argsLen = (args == null) ? 0 : args.length;
|
|
|
|
+ StartupOption startOpt = StartupOption.REGULAR;
|
|
|
|
+ String networkLoc = null;
|
|
|
|
+ for( int i=0; i < argsLen; i++ ) {
|
|
|
|
+ String cmd = args[i];
|
|
|
|
+ if( "-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd) ) {
|
|
|
|
+ if( i==args.length-1 )
|
|
|
|
+ return false;
|
|
|
|
+ networkLoc = args[++i];
|
|
|
|
+ if( networkLoc.startsWith("-") )
|
|
|
|
+ return false;
|
|
|
|
+ } else if( "-rollback".equalsIgnoreCase(cmd) ) {
|
|
|
|
+ startOpt = StartupOption.ROLLBACK;
|
|
|
|
+ } else if( "-regular".equalsIgnoreCase(cmd) ) {
|
|
|
|
+ startOpt = StartupOption.REGULAR;
|
|
|
|
+ } else
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ if( networkLoc != null )
|
|
|
|
+ conf.set( "dfs.datanode.rack", NodeBase.normalize( networkLoc ));
|
|
|
|
+ conf.setObject( "dfs.datanode.startup", startOpt );
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+
|
|
/* Get the network location by running a script configured in conf */
|
|
/* Get the network location by running a script configured in conf */
|
|
private static String getNetworkLoc( Configuration conf )
|
|
private static String getNetworkLoc( Configuration conf )
|
|
throws IOException {
|
|
throws IOException {
|
|
String locScript = conf.get("dfs.network.script" );
|
|
String locScript = conf.get("dfs.network.script" );
|
|
- if( locScript == null ) return null;
|
|
|
|
|
|
+ if( locScript == null )
|
|
|
|
+ return NetworkTopology.DEFAULT_RACK;
|
|
|
|
|
|
LOG.info( "Starting to run script to get datanode network location");
|
|
LOG.info( "Starting to run script to get datanode network location");
|
|
Process p = Runtime.getRuntime().exec( locScript );
|
|
Process p = Runtime.getRuntime().exec( locScript );
|
|
@@ -1222,49 +1311,13 @@ public class DataNode implements FSConstants, Runnable {
|
|
return networkLoc.toString();
|
|
return networkLoc.toString();
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
- /* Get the network location from the command line */
|
|
|
|
- private static String getNetworkLoc(String args[]) {
|
|
|
|
- for( int i=0; i< args.length; i++ ) {
|
|
|
|
- if ("-r".equals(args[i])||"--rack".equals(args[i]) ) {
|
|
|
|
- if( i==args.length-1 ) {
|
|
|
|
- printUsage();
|
|
|
|
- } else {
|
|
|
|
- return args[++i];
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /* Return the datanode's network location
|
|
|
|
- * either from the command line, from script, or a default value
|
|
|
|
- */
|
|
|
|
- private static String getNetworkLoc(String args[], Configuration conf)
|
|
|
|
- throws IOException {
|
|
|
|
- String networkLoc = getNetworkLoc( args );
|
|
|
|
- if( networkLoc == null ) {
|
|
|
|
- networkLoc = getNetworkLoc( conf );
|
|
|
|
- }
|
|
|
|
- if( networkLoc == null ) {
|
|
|
|
- return NetworkTopology.DEFAULT_RACK;
|
|
|
|
- } else {
|
|
|
|
- return NodeBase.normalize( networkLoc );
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private static void printUsage() {
|
|
|
|
- System.err.println(
|
|
|
|
- "Usage: java DataNode [-r, --rack <network location>]");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
*/
|
|
*/
|
|
- public static void main(String args[]) throws IOException {
|
|
|
|
|
|
+ public static void main(String args[]) {
|
|
try {
|
|
try {
|
|
- Configuration conf = new Configuration();
|
|
|
|
- runAndWait(conf, getNetworkLoc(args, conf));
|
|
|
|
|
|
+ DataNode datanode = createDataNode( args, null );
|
|
|
|
+ if( datanode != null )
|
|
|
|
+ datanode.join();
|
|
} catch ( Throwable e ) {
|
|
} catch ( Throwable e ) {
|
|
LOG.error( StringUtils.stringifyException( e ) );
|
|
LOG.error( StringUtils.stringifyException( e ) );
|
|
System.exit(-1);
|
|
System.exit(-1);
|