/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import junit.framework.TestCase;
import junit.framework.AssertionFailedError;

import org.apache.commons.logging.*;

import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.conf.Configuration;

import java.io.File;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.ListIterator;
import java.util.Random;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

/**
 * Test DFS.
 * ClusterTestDFS is a JUnit test for DFS using "pseudo multiprocessing" (or,
 * more strictly, pseudo distributed), meaning all daemons run in one process
 * and sockets are used to communicate between daemons.  The test permutes
 * various block sizes, numbers of files, file sizes, and numbers of
 * datanodes.  After creating one or more files and filling them with random
 * data, one datanode is shut down, and then the files are verified.
 * Next, all the random test files are deleted and we test for leakage
 * (non-deletion) by directly checking the real directories corresponding
 * to the datanodes still running.
 * <p>
 * Usage notes: TEST_PERMUTATION_MAX can be adjusted to perform more or
 * less testing of permutations.  The ceiling of useful permutations is
 * TEST_PERMUTATION_MAX_CEILING.
 * <p>
 * DFSClient emits many messages that can be ignored, like:
 * "Failed to connect to *:7000:java.net.ConnectException: Connection refused: connect",
 * because a datanode is forced to close during testing.
 * <p>
 * Warnings about "Zero targets found" can be ignored (these are naggingly
 * emitted even though it is not possible to achieve the desired replication
 * level with the number of active datanodes).
 * <p>
 * Possible extensions:
 * <p>Bring a datanode down and restart it to verify reconnection to namenode.
 * <p>Simulate running out of disk space on one datanode only.
 * <p>Bring the namenode down and restart it to verify that datanodes reconnect.
 * <p>
 * <p>For another approach to filesystem testing, see the high level
 * (HadoopFS level) test {@link org.apache.hadoop.fs.TestFileSystem}.
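 * <p>
 * The test can also be run standalone via the JUnit 3 text runner,
 * mirroring {@link #main(String[])}:
 * <pre>
 *   String[] testargs = {"org.apache.hadoop.dfs.ClusterTestDFS"};
 *   junit.textui.TestRunner.main(testargs);
 * </pre>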
 */
public class ClusterTestDFS extends TestCase implements FSConstants {
  private static final Log LOG =
    LogFactory.getLog("org.apache.hadoop.dfs.ClusterTestDFS");

  private static Configuration conf = new Configuration();
  private static int BUFFER_SIZE =
    conf.getInt("io.file.buffer.size", 4096);

  private static int testCycleNumber = 0;

  /** All DFS test files go under this base directory. */
  private static String baseDirSpecified;

  /** Base dir as a File. */
  private static File baseDir;

  /** DFS block sizes to permute over in multiple test cycles
   * (array length should be prime).
   */
  private static final int[] BLOCK_SIZES = {100000, 4096};

  /** DFS file sizes to permute over in multiple test cycles
   * (array length should be prime).
   */
  private static final int[] FILE_SIZES =
    {100000, 100001, 4095, 4096, 4097, 1000000, 1000001};

  /** DFS file counts to permute over in multiple test cycles
   * (array length should be prime).
   */
  private static final int[] FILE_COUNTS = {1, 10, 100};
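
  // Each test cycle indexes every array above by (testCycleNumber % length),
  // so keeping the array lengths relatively prime makes successive cycles
  // visit distinct (blockSize, fileSize, fileCount) combinations instead of
  // repeating the same tuple early.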

  /** Number of useful permutations or test cycles.
   * (The factor of 2 represents the alternating 2 or 3 datanodes
   * started.)
   */
  private static final int TEST_PERMUTATION_MAX_CEILING =
    BLOCK_SIZES.length * FILE_SIZES.length * FILE_COUNTS.length * 2;

  /** Number of permutations of DFS test parameters to perform.
   * If this is greater than TEST_PERMUTATION_MAX_CEILING, then the
   * ceiling value is used.
   */
  private static final int TEST_PERMUTATION_MAX = 3;

  private Constructor randomDataGeneratorCtor = null;

  static {
    baseDirSpecified = System.getProperty("test.dfs.data", "/tmp/dfs_test");
    baseDir = new File(baseDirSpecified);
  }

  protected void setUp() throws Exception {
    super.setUp();
    conf.setBoolean("test.dfs.same.host.targets.allowed", true);
  }

  /**
   * Remove old files from the temp area used by this test case, and make
   * sure the base temp directory can be created.
   */
  protected void prepareTempFileSpace() {
    if (baseDir.exists()) {
      try { // start from a blank slate
        FileUtil.fullyDelete(baseDir);
      } catch (Exception ignored) {
      }
    }
    baseDir.mkdirs();
    if (!baseDir.isDirectory()) {
      throw new RuntimeException(
          "Value of root directory property test.dfs.data for dfs test is not a directory: "
          + baseDirSpecified);
    }
  }

  /**
   * Pseudo Distributed FS Test.
   * Test DFS by running all the necessary daemons in one process.
   * Test various block sizes, numbers of files, disk space consumption,
   * and leakage.
   *
   * @throws Exception
   */
  public void testFsPseudoDistributed()
      throws Exception {
    while (testCycleNumber < TEST_PERMUTATION_MAX &&
           testCycleNumber < TEST_PERMUTATION_MAX_CEILING) {
      int blockSize = BLOCK_SIZES[testCycleNumber % BLOCK_SIZES.length];
      int numFiles = FILE_COUNTS[testCycleNumber % FILE_COUNTS.length];
      int fileSize = FILE_SIZES[testCycleNumber % FILE_SIZES.length];
      prepareTempFileSpace();
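      // (testCycleNumber % 2) + 2 alternates between 2 and 3 datanodes
      // from one cycle to the next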
      testFsPseudoDistributed(fileSize, numFiles, blockSize,
                              (testCycleNumber % 2) + 2);
    }
  }

  /**
   * Pseudo Distributed FS Testing.
   * Do one test cycle with the given parameters.
   *
   * @param nBytes number of bytes to write to each file.
   * @param numFiles number of files to create.
   * @param blockSize block size to use for this test cycle.
   * @param initialDNcount number of datanodes to create.
   * @throws Exception
   */
  public void testFsPseudoDistributed(long nBytes, int numFiles,
                                      int blockSize, int initialDNcount)
      throws Exception {
    long startTime = System.currentTimeMillis();
    int bufferSize = Math.min(BUFFER_SIZE, blockSize);
    boolean checkDataDirsEmpty = false;
    int iDatanodeClosed = 0;
    Random randomDataGenerator = makeRandomDataGenerator();
    final int currentTestCycleNumber = testCycleNumber;
    msg("using randomDataGenerator=" +
        randomDataGenerator.getClass().getName());

    //
    //          modify config for test
    //
    // set given config param to override other config settings
    conf.setInt("test.dfs.block_size", blockSize);
    // verify that config changed
    assertTrue(blockSize == conf.getInt("test.dfs.block_size", 2)); // 2 is an intentional obviously-wrong block size
    // downsize for testing (just to save resources)
    conf.setInt("dfs.namenode.handler.count", 3);
    if (false) { // use MersenneTwister, if present
      conf.set("hadoop.random.class",
               "org.apache.hadoop.util.MersenneTwister");
    }
    conf.setLong("dfs.blockreport.intervalMsec", 50 * 1000L);
    conf.setLong("dfs.datanode.startupMsec", 15 * 1000L);

    String nameFSDir = baseDirSpecified + "/name";
    msg("----Start Test Cycle=" + currentTestCycleNumber +
        " test.dfs.block_size=" + blockSize +
        " nBytes=" + nBytes +
        " numFiles=" + numFiles +
        " initialDNcount=" + initialDNcount);

    //
    //          start a NameNode
    int nameNodePort = 9000 + testCycleNumber++; // ToDo: settable base port
    String nameNodeSocketAddr = "localhost:" + nameNodePort;
    conf.set("dfs.name.dir", nameFSDir);
    NameNode nameNodeDaemon = new NameNode(nameNodeSocketAddr, conf);
    DFSClient dfsClient = null;
    try {
      //
      //          start some DataNodes
      //
      ArrayList<DataNode> listOfDataNodeDaemons = new ArrayList<DataNode>();
      conf.set("fs.default.name", nameNodeSocketAddr);
      for (int i = 0; i < initialDNcount; i++) {
        // uniquely config real fs path for data storage for this datanode
        String[] dataDirs = new String[1];
        dataDirs[0] = baseDirSpecified + "/datanode" + i;
        conf.set("dfs.data.dir", dataDirs[0]);
        DataNode dn = DataNode.makeInstance(dataDirs, conf);
        if (dn != null) {
          listOfDataNodeDaemons.add(dn);
          (new Thread(dn, "DataNode" + i + ": " + dataDirs[0])).start();
        }
      }
      try {
        assertTrue("insufficient datanodes for test to continue",
                   (listOfDataNodeDaemons.size() >= 2));

        //
        //          wait for datanodes to report in
        awaitQuiescence();

        // act as if namenode is a remote process
        dfsClient = new DFSClient(new InetSocketAddress("localhost", nameNodePort),
                                  conf);

        //
        //          write nBytes of data using randomDataGenerator to numFiles
        //
        ArrayList<UTF8> testfilesList = new ArrayList<UTF8>();
        byte[] buffer = new byte[bufferSize];
        UTF8 testFileName = null;
        for (int iFileNumber = 0; iFileNumber < numFiles; iFileNumber++) {
          testFileName = new UTF8("/f" + iFileNumber);
          testfilesList.add(testFileName);
          OutputStream nos = dfsClient.create(testFileName.toString(), false);
          try {
            for (long nBytesWritten = 0L;
                 nBytesWritten < nBytes;
                 nBytesWritten += buffer.length) {
              if ((nBytesWritten + buffer.length) > nBytes) {
                // calculate byte count needed to exactly hit nBytes in length
                //   to keep randomDataGenerator in sync during the verify step
                int pb = (int) (nBytes - nBytesWritten);
                byte[] bufferPartial = new byte[pb];
                randomDataGenerator.nextBytes(bufferPartial);
                nos.write(bufferPartial);
              } else {
                randomDataGenerator.nextBytes(buffer);
                nos.write(buffer);
              }
            }
          } finally {
            nos.flush();
            nos.close();
          }
        }

        //
        // No need to wait for blocks to be replicated because replication
        //   is supposed to be complete when the file is closed.
        //

        //
        //          take one datanode down
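        // (rotate which datanode is shut down so each position in the list
        //   gets exercised across test cycles)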
        iDatanodeClosed =
            currentTestCycleNumber % listOfDataNodeDaemons.size();
        DataNode dn = (DataNode) listOfDataNodeDaemons.get(iDatanodeClosed);
        msg("shutdown datanode daemon " + iDatanodeClosed +
            " dn=" + dn.data);
        try {
          dn.shutdown();
        } catch (Exception e) {
          msg("ignoring datanode shutdown exception=" + e);
        }

        //
        //          verify that all of the data is intact by re-reading each
        //          file against a "rewound" randomDataGenerator
        long lastLong = randomDataGenerator.nextLong();
        randomDataGenerator = makeRandomDataGenerator(); // restart (make new) PRNG
        ListIterator li = testfilesList.listIterator();
        while (li.hasNext()) {
          testFileName = (UTF8) li.next();
          FSInputStream nis = dfsClient.open(testFileName.toString());
          byte[] bufferGolden = new byte[bufferSize];
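          // start m at an arbitrary non-negative sentinel so the read loop
          //   below is entered at least once; read() returns -1 at end of stream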
          int m = 42;
          try {
            while (m != -1) {
              m = nis.read(buffer);
              if (m == buffer.length) {
                randomDataGenerator.nextBytes(bufferGolden);
                assertBytesEqual(buffer, bufferGolden, buffer.length);
              } else if (m > 0) {
                byte[] bufferGoldenPartial = new byte[m];
                randomDataGenerator.nextBytes(bufferGoldenPartial);
                assertBytesEqual(buffer, bufferGoldenPartial,
                                 bufferGoldenPartial.length);
              }
            }
          } finally {
            nis.close();
          }
        }
        // verify the last randomDataGenerator value to ensure the length of
        //   the last file was fully checked
        long lastLongAgain = randomDataGenerator.nextLong();
        assertEquals(lastLong, lastLongAgain);
        msg("Finished validating all file contents");

        //
        //          now delete all the created files
        msg("Delete all random test files under DFS via remaining datanodes");
        li = testfilesList.listIterator();
        while (li.hasNext()) {
          testFileName = (UTF8) li.next();
          assertTrue(dfsClient.delete(testFileName.toString(), true));
        }

        //
        //          wait for delete to be propagated
        //          (unlike writing files, delete is lazy)
        msg("Test thread sleeping while datanodes propagate delete...");
        awaitQuiescence();
        msg("Test thread awakens to verify block deletion");

        //
        //          check that the datanode's block directory is empty
        //          (except for the datanode that had the forced shutdown)
        checkDataDirsEmpty = true; // do it during finally clause
      } catch (AssertionFailedError afe) {
        throw afe;
      } catch (Throwable t) {
        msg("Unexpected exception_b: " + t);
        t.printStackTrace();
      } finally {
        //
        // shut down datanode daemons (this takes advantage of being same-process)
        msg("begin shutdown of all datanode daemons for test cycle " +
            currentTestCycleNumber);
        for (int i = 0; i < listOfDataNodeDaemons.size(); i++) {
          DataNode dataNode = (DataNode) listOfDataNodeDaemons.get(i);
          if (i != iDatanodeClosed) {
            try {
              if (checkDataDirsEmpty) {
                assertNoBlocks(dataNode);
              }
              dataNode.shutdown();
            } catch (Exception e) {
              msg("ignoring exception during (all) datanode shutdown, e=" + e);
            }
          }
        }
      }
  344. msg("finished shutdown of all datanode daemons for test cycle " +
  345. currentTestCycleNumber);
  346. if (dfsClient != null) {
  347. try {
  348. msg("close down subthreads of DFSClient");
  349. dfsClient.close();
  350. } catch (Exception ignored) { }
  351. msg("finished close down of DFSClient");
  352. }
  353. } catch (AssertionFailedError afe) {
  354. throw afe;
  355. } catch (Throwable t) {
  356. msg("Unexpected exception_a: " + t);
  357. t.printStackTrace();
  358. } finally {
  359. // shut down namenode daemon (this takes advantage of being same-process)
  360. msg("begin shutdown of namenode daemon for test cycle " +
  361. currentTestCycleNumber);
  362. try {
  363. nameNodeDaemon.stop();
  364. } catch (Exception e) {
  365. msg("ignoring namenode shutdown exception=" + e);
  366. }
  367. msg("finished shutdown of namenode daemon for test cycle " +
  368. currentTestCycleNumber);
  369. }
  370. msg("test cycle " + currentTestCycleNumber + " elapsed time=" +
  371. (System.currentTimeMillis() - startTime) / 1000. + "sec");
  372. msg("threads still running (look for stragglers): ");
  373. msg(summarizeThreadGroup());
  374. }

  private void assertNoBlocks(DataNode dn) {
    Block[] blocks = dn.data.getBlockReport();
    // if this fails, the delete did not propagate because either
    //   awaitQuiescence() returned before the disk images were removed
    //   or a real failure was detected.
    assertTrue(" data dir not empty: " + dn.data, blocks.length == 0);
  }

  /**
   * Make a data generator.
   * Allows optional use of a high quality PRNG by setting the property
   * hadoop.random.class to the fully qualified class name of a subclass of
   * java.util.Random such as "...util.MersenneTwister".
   * The property test.dfs.random.seed can supply a seed for reproducible
   * testing (a default is set here if the property is not set.)
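   * <p>
   * For example, a reproducible run with the alternative PRNG named above
   * could be configured as:
   * <pre>
   *   conf.set("hadoop.random.class", "org.apache.hadoop.util.MersenneTwister");
   *   conf.setLong("test.dfs.random.seed", 0xB437EFL);
   * </pre>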
   */
  private Random makeRandomDataGenerator() {
    long seed = conf.getLong("test.dfs.random.seed", 0xB437EF);
    try {
      if (randomDataGeneratorCtor == null) {
        // lazy init
        String rndDataGenClassname =
            conf.get("hadoop.random.class", "java.util.Random");
        Class<?> clazz = Class.forName(rndDataGenClassname);
        randomDataGeneratorCtor = clazz.getConstructor(Long.TYPE);
      }
      if (randomDataGeneratorCtor != null) {
        Object[] arg = {new Long(seed)};
        return (Random) randomDataGeneratorCtor.newInstance(arg);
      }
    } catch (ClassNotFoundException absorb) {
    } catch (NoSuchMethodException absorb) {
    } catch (SecurityException absorb) {
    } catch (InstantiationException absorb) {
    } catch (IllegalAccessException absorb) {
    } catch (IllegalArgumentException absorb) {
    } catch (InvocationTargetException absorb) {
    }
    // last resort
    return new java.util.Random(seed);
  }

  /** Wait for the DFS datanodes to become quiescent.
   * The initial implementation is to sleep for some fixed amount of time,
   * but a better implementation would be to really detect when distributed
   * operations are completed.
   * @throws InterruptedException
   */
  private void awaitQuiescence() throws InterruptedException {
    // ToDo: Need observer pattern, not static sleep
    //   Doug suggested that the block report interval could be made shorter
    //   and then observing that would be a good way to know when an operation
    //   was complete (quiescence detect).
    sleepAtLeast(60000);
  }

  private void assertBytesEqual(byte[] buffer, byte[] bufferGolden, int len) {
    for (int i = 0; i < len; i++) {
      assertEquals(buffer[i], bufferGolden[i]);
    }
  }

  private void msg(String s) {
    //System.out.println(s);
    LOG.info(s);
  }
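
  /** Sleep for at least tmsec milliseconds.
   * Unlike a bare Thread.sleep(), this keeps re-sleeping after an
   * InterruptedException until the full requested time has elapsed.
   */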
  public static void sleepAtLeast(int tmsec) {
    long t0 = System.currentTimeMillis();
    long t1 = t0;
    long tslept = t1 - t0;
    while (tmsec > tslept) {
      try {
        long tsleep = tmsec - tslept;
        Thread.sleep(tsleep);
        t1 = System.currentTimeMillis();
      } catch (InterruptedException ie) {
        t1 = System.currentTimeMillis();
      }
      tslept = t1 - t0;
    }
  }
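
  /** List all live threads, one per line.
   * The enumeration array is grown (by a factor of 10) until it is large
   * enough to hold every thread reported by Thread.enumerate().
   */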
  public static String summarizeThreadGroup() {
    int n = 10;
    int k = 0;
    Thread[] tarray = null;
    StringBuffer sb = new StringBuffer(500);
    do {
      n = n * 10;
      tarray = new Thread[n];
      k = Thread.enumerate(tarray);
    } while (k == n); // while array is too small...
    for (int i = 0; i < k; i++) {
      Thread thread = tarray[i];
      sb.append(thread.toString());
      sb.append("\n");
    }
    return sb.toString();
  }

  public static void main(String[] args) throws Exception {
    String usage = "Usage: ClusterTestDFS (no args)";
    if (args.length != 0) {
      System.err.println(usage);
      System.exit(-1);
    }
    String[] testargs = {"org.apache.hadoop.dfs.ClusterTestDFS"};
    junit.textui.TestRunner.main(testargs);
  }
}