/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import java.io.*;
import java.net.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.net.NetworkTopology;

/**
 * This class creates a single-process DFS cluster for JUnit testing.
 * One thread is created for each server.
 * The data directories for DFS are under the testing directory.
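 *
 * <p>A minimal usage sketch (illustrative only; it uses the non-deprecated
 * <code>MiniDFSCluster(Configuration, int, boolean, String[])</code>
 * constructor and the public accessors defined below):</p>
 * <pre>
 *   Configuration conf = new Configuration();
 *   MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
 *   try {
 *     cluster.waitActive();                    // block until the datanodes report in
 *     FileSystem fs = cluster.getFileSystem(); // client handle to the mini DFS
 *     // ... exercise the file system under test ...
 *   } finally {
 *     cluster.shutdown();
 *   }
 * </pre>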
 *
 * @author Owen O'Malley
 */
public class MiniDFSCluster {

  private Configuration conf;
  int nDatanodes;
  private Thread nameNodeThread;
  private Thread dataNodeThreads[];
  private NameNodeRunner nameNode;
  private DataNodeRunner dataNodes[];
  private int nameNodePort = 0;
  private int nameNodeInfoPort = 0;

  /**
   * An inner class that runs a name node.
   */
  class NameNodeRunner implements Runnable {
    private NameNode node;
    private volatile boolean isInitialized = false;
    private boolean isCrashed = false;
    private boolean isRunning = true;

    public InetSocketAddress getAddress() {
      return node.getNameNodeAddress();
    }

    synchronized public boolean isInitialized() {
      return isInitialized;
    }

    synchronized public boolean isCrashed() {
      return isCrashed;
    }

    public boolean isUp() {
      if (node == null) {
        return false;
      }
      try {
        long[] sizes = node.getStats();
        boolean isUp = false;
        synchronized (this) {
          isUp = (isInitialized && !node.isInSafeMode() && sizes[0] != 0);
        }
        return isUp;
      } catch (IOException ie) {
        return false;
      }
    }

    /**
     * Create the name node and run it.
     */
    public void run() {
      try {
        synchronized (this) {
          if (isRunning) {
            node = new NameNode(conf);
          }
          isInitialized = true;
        }
      } catch (Throwable e) {
        node = null;
        System.err.println("Name node crashed:");
        e.printStackTrace();
        synchronized (this) {
          isCrashed = true;
        }
      }
    }

    /**
     * Shutdown the name node and wait for it to finish.
     */
    public synchronized void shutdown() {
      isRunning = false;
      if (node != null) {
        node.stop();
        node.join();
      }
    }
  }

  /**
   * An inner class to run the data node.
   */
  class DataNodeRunner implements Runnable {
    private DataNode node;
    Configuration conf = null;
    private boolean isRunning = true;

    public DataNodeRunner(Configuration conf, File dataDir, int index) {
      this.conf = new Configuration(conf);
      this.conf.set("dfs.data.dir",
          new File(dataDir, "data" + (2 * index + 1)).getPath() + "," +
          new File(dataDir, "data" + (2 * index + 2)).getPath());
    }

    public DataNodeRunner(Configuration conf, File dataDir,
                          String networkLoc, int index) {
      this(conf, dataDir, index);
      this.conf.set("dfs.datanode.rack", networkLoc);
    }

    /**
     * Create and run the data node.
     */
    public void run() {
      try {
        String[] dirs = conf.getStrings("dfs.data.dir");
        for (int idx = 0; idx < dirs.length; idx++) {
          File dataDir = new File(dirs[idx]);
          synchronized (DataNodeRunner.class) {
            if (!dataDir.mkdirs()) {
              if (!dataDir.isDirectory()) {
                throw new RuntimeException("Mkdirs failed to create directory " +
                    dataDir.toString());
              }
            }
          }
        }
        synchronized (this) {
          if (isRunning) {
            node = new DataNode(conf, conf.get("dfs.datanode.rack",
                NetworkTopology.DEFAULT_RACK), dirs);
          }
        }
        if (node != null) {
          node.run();
        }
      } catch (Throwable e) {
        // The DataNode constructor may have thrown before node was assigned,
        // so guard against a null node here.
        if (node != null) {
          node.shutdown();
        }
        node = null;
        System.err.println("Data node crashed:");
        e.printStackTrace();
      }
    }

    /**
     * Shut down the server and wait for it to finish.
     */
    public synchronized void shutdown() {
      isRunning = false;
      if (node != null) {
        node.shutdown();
      }
    }
  }

  /**
   * Create the config and start up the servers; the rpc and info ports are
   * chosen automatically (use getNameNodePort() to find the port actually used).
   */
  public MiniDFSCluster(Configuration conf,
                        int nDatanodes,
                        boolean formatNamenode,
                        String[] racks) throws IOException {
    this(0, conf, nDatanodes, false, formatNamenode, racks);
  }

  /**
   * Create the config and start up the servers.  If either the rpc or info port is already
   * in use, we will try new ports.
   * @param namenodePort suggestion for which rpc port to use.  caller should use
   *        getNameNodePort() to get the actual port used.
   * @param dataNodeFirst should the datanode be brought up before the namenode?
   * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
   */
  public MiniDFSCluster(int namenodePort,
                        Configuration conf,
                        boolean dataNodeFirst) throws IOException {
    this(namenodePort, conf, 1, dataNodeFirst, true, null);
  }

  /**
   * Create the config and start up only the namenode.  If either the rpc or info port is already
   * in use, we will try new ports.
   * @param namenodePort suggestion for which rpc port to use.  caller should use
   *        getNameNodePort() to get the actual port used.
   * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
   */
  public MiniDFSCluster(int namenodePort,
                        Configuration conf,
                        int numRetries,
                        int numRetriesPerPort) throws IOException {
    this(namenodePort, conf, 0, false, false, null);
  }

  /**
   * Create the config and start up the servers.  If either the rpc or info port is already
   * in use, we will try new ports.
   * @param namenodePort suggestion for which rpc port to use.  caller should use
   *        getNameNodePort() to get the actual port used.
   * @param nDatanodes Number of datanodes
   * @param dataNodeFirst should the datanode be brought up before the namenode?
   * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
   */
  public MiniDFSCluster(int namenodePort,
                        Configuration conf,
                        int nDatanodes,
                        boolean dataNodeFirst) throws IOException {
    this(namenodePort, conf, nDatanodes, dataNodeFirst, true, null);
  }

  /**
   * Create the config and start up the servers.  If either the rpc or info port is already
   * in use, we will try new ports.
   * @param namenodePort suggestion for which rpc port to use.  caller should use
   *        getNameNodePort() to get the actual port used.
   * @param nDatanodes Number of datanodes
   * @param dataNodeFirst should the datanode be brought up before the namenode?
   * @param formatNamenode should the namenode be formatted before starting up?
   * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
   */
  public MiniDFSCluster(int namenodePort,
                        Configuration conf,
                        int nDatanodes,
                        boolean dataNodeFirst,
                        boolean formatNamenode) throws IOException {
    this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode, null);
  }

  /**
   * Create the config and start up the servers.  If either the rpc or info port is already
   * in use, we will try new ports.
   * @param namenodePort suggestion for which rpc port to use.  caller should use
   *        getNameNodePort() to get the actual port used.
   * @param nDatanodes Number of datanodes
   * @param dataNodeFirst should the datanode be brought up before the namenode?
   * @param formatNamenode should the namenode be formatted before starting up?
   * @param racks array of strings indicating racks that each datanode is on
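   *        (rack names here are only an illustration, e.g. {"/rack1", "/rack1",
   *        "/rack2"}; when racks is null or shorter than nDatanodes, the
   *        remaining datanodes fall back to the default rack)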
   */
  public MiniDFSCluster(int namenodePort,
                        Configuration conf,
                        int nDatanodes,
                        boolean dataNodeFirst,
                        boolean formatNamenode,
                        String[] racks) throws IOException {
    this.conf = conf;
    this.nDatanodes = nDatanodes;
    this.nameNodePort = namenodePort;
    this.conf.set("fs.default.name",
                  "localhost:" + Integer.toString(nameNodePort));
    this.conf.setInt("dfs.info.port", nameNodeInfoPort);
    this.conf.setInt("dfs.datanode.info.port", 0);
    File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
    File data_dir = new File(base_dir, "data");
    this.conf.set("dfs.name.dir", new File(base_dir, "name1").getPath() + "," +
                  new File(base_dir, "name2").getPath());
    this.conf.setInt("dfs.replication", Math.min(3, nDatanodes));
    this.conf.setInt("dfs.safemode.extension", 0);

    // Create the NameNode
    if (formatNamenode) { NameNode.format(conf); }
    nameNode = new NameNodeRunner();
    nameNodeThread = new Thread(nameNode);

    //
    // Start the MiniDFSCluster
    //
    if (dataNodeFirst) {
      startDataNodes(conf, racks, data_dir);
    }

    // Start the namenode and wait for it to be initialized
    nameNodeThread.start();
    while (!nameNode.isCrashed() && !nameNode.isInitialized()) {
      try {                                     // let daemons get started
        System.err.println("Waiting for the NameNode to initialize...");
        Thread.sleep(1000);
      } catch (InterruptedException e) {
      }
      if (nameNode.isCrashed()) {
        throw new RuntimeException("Namenode crashed");
      }
    }

    // Set up the right ports for the datanodes
    InetSocketAddress nnAddr = nameNode.getAddress();
    nameNodePort = nnAddr.getPort();
    this.conf.set("fs.default.name",
                  nnAddr.getHostName() + ":" + Integer.toString(nameNodePort));

    // Start the datanodes
    if (!dataNodeFirst) {
      startDataNodes(conf, racks, data_dir);
    }

    while (!nameNode.isCrashed() && !nameNode.isUp()) {
      try {                                     // let daemons get started
        System.err.println("Waiting for the Mini HDFS Cluster to start...");
        Thread.sleep(1000);
      } catch (InterruptedException e) {
      }
    }
    if (nameNode.isCrashed()) {
      throw new RuntimeException("Namenode crashed");
    }
  }

  private void startDataNodes(Configuration conf, String[] racks, File data_dir) {
    // Create the DataNodes & start them
    dataNodes = new DataNodeRunner[nDatanodes];
    dataNodeThreads = new Thread[nDatanodes];
    for (int idx = 0; idx < nDatanodes; idx++) {
      if (racks == null || idx >= racks.length) {
        dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
      } else {
        dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx);
      }
      dataNodeThreads[idx] = new Thread(dataNodes[idx]);
      dataNodeThreads[idx].start();
    }
  }

  /**
   * Returns the rpc port used by the mini cluster, because the caller-supplied
   * port is not necessarily the actual port used.
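   *
   * <p>For example (purely illustrative, mirroring what {@link #waitActive()}
   * does internally):</p>
   * <pre>
   *   int port = cluster.getNameNodePort();
   *   DFSClient client = new DFSClient(new InetSocketAddress("localhost", port), conf);
   * </pre>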
   */
  public int getNameNodePort() {
    return nameNode.getAddress().getPort();
  }

  /**
   * Shut down the servers.
   */
  public void shutdown() {
    System.out.println("Shutting down the cluster");
    for (int idx = 0; idx < nDatanodes; idx++) {
      dataNodes[idx].shutdown();
    }
    nameNode.shutdown();
    for (int idx = 0; idx < nDatanodes; idx++) {
      try {
        dataNodeThreads[idx].join();
      } catch (InterruptedException e) {
      }
    }
    try {
      nameNodeThread.join();
    } catch (InterruptedException e) {
    }
  }

  /**
   * Get a client handle to the DFS cluster.
   */
  public FileSystem getFileSystem() throws IOException {
    return FileSystem.get(conf);
  }

  /**
   * Get the directories where the namenode stores its image.
   */
  public File[] getNameDirs() {
    return NameNode.getDirs(conf);
  }

  /**
   * Wait till the cluster is active and running.
   */
  public void waitActive() throws IOException {
    InetSocketAddress addr = new InetSocketAddress("localhost",
                                                   getNameNodePort());
    DFSClient client = new DFSClient(addr, conf);

    //
    // get initial state of datanodes
    //
    DatanodeInfo[] oldinfo = client.datanodeReport();
    while (oldinfo.length != nDatanodes) {
      try {
        Thread.sleep(500);
      } catch (Exception e) {
      }
      oldinfo = client.datanodeReport();
    }

    //
    // wait till all datanodes send at least one more heartbeat, i.e. until
    // every lastUpdate timestamp has advanced past the initial report
    //
    int numdead;
    do {
      try {
        Thread.sleep(500);
      } catch (Exception e) {
      }
      DatanodeInfo[] info = client.datanodeReport();
      if (info.length != nDatanodes) {
        numdead = nDatanodes;   // not all datanodes have reported yet
        continue;
      }
      numdead = 0;
      for (int i = 0; i < info.length; i++) {
        if (oldinfo[i].getLastUpdate() >= info[i].getLastUpdate()) {
          numdead++;
        }
      }
    } while (numdead > 0);
    client.close();
  }
}