TestFuseDFS.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. import java.io.*;
  19. import java.net.URI;
  20. import java.util.ArrayList;
  21. import java.util.concurrent.atomic.*;
  22. import org.apache.log4j.Level;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.fs.*;
  27. import org.apache.hadoop.fs.permission.*;
  28. import org.apache.hadoop.hdfs.*;
  29. import org.apache.hadoop.io.IOUtils;
  30. import org.apache.hadoop.test.GenericTestUtils;
  31. import org.apache.hadoop.util.StringUtils;
  32. import org.junit.Test;
  33. import org.junit.BeforeClass;
  34. import org.junit.AfterClass;
  35. import static org.junit.Assert.*;
  36. /**
  37. * Basic functional tests on a fuse-dfs mount.
  38. */
  39. public class TestFuseDFS {
  40. private static MiniDFSCluster cluster;
  41. private static FileSystem fs;
  42. private static Process fuseProcess;
  43. private static Runtime r;
  44. private static String mountPoint;
  45. private static final Log LOG = LogFactory.getLog(TestFuseDFS.class);
  46. {
  47. GenericTestUtils.setLogLevel(LOG, Level.ALL);
  48. }
  49. /** Dump the given intput stream to stderr */
  50. private static void dumpInputStream(InputStream is) throws IOException {
  51. int len;
  52. do {
  53. byte b[] = new byte[is.available()];
  54. len = is.read(b);
  55. System.out.println("Read "+len+" bytes");
  56. System.out.write(b, 0, b.length);
  57. } while (len > 0);
  58. }
  59. /**
  60. * Wait for the given process to return and check that it exited
  61. * as required. Log if the process failed.
  62. */
  63. private static void checkProcessRet(Process p, boolean expectPass)
  64. throws IOException {
  65. try {
  66. int ret = p.waitFor();
  67. if (ret != 0) {
  68. dumpInputStream(p.getErrorStream());
  69. }
  70. if (expectPass) {
  71. assertEquals(0, ret);
  72. } else {
  73. assertTrue(ret != 0);
  74. }
  75. } catch (InterruptedException ie) {
  76. fail("Process interrupted: "+ie.getMessage());
  77. }
  78. }
  79. /** Exec the given command and assert it executed successfully */
  80. private static void execWaitRet(String cmd) throws IOException {
  81. LOG.debug("EXEC "+cmd);
  82. Process p = r.exec(cmd);
  83. try {
  84. p.waitFor();
  85. } catch (InterruptedException ie) {
  86. fail("Process interrupted: "+ie.getMessage());
  87. }
  88. }
  89. /** Exec the given command and assert it executed successfully */
  90. private static void execIgnoreRet(String cmd) throws IOException {
  91. LOG.debug("EXEC "+cmd);
  92. r.exec(cmd);
  93. }
  94. /** Exec the given command and assert it executed successfully */
  95. private static void execAssertSucceeds(String cmd) throws IOException {
  96. LOG.debug("EXEC "+cmd);
  97. checkProcessRet(r.exec(cmd), true);
  98. }
  99. /** Exec the given command, assert it returned an error code */
  100. private static void execAssertFails(String cmd) throws IOException {
  101. LOG.debug("EXEC "+cmd);
  102. checkProcessRet(r.exec(cmd), false);
  103. }
  104. /** Create and write the given file */
  105. private static void createFile(File f, String s) throws IOException {
  106. InputStream is = new ByteArrayInputStream(s.getBytes());
  107. FileOutputStream fos = new FileOutputStream(f);
  108. IOUtils.copyBytes(is, fos, s.length(), true);
  109. }
  110. /** Check that the given file exists with the given contents */
  111. private static void checkFile(File f, String expectedContents)
  112. throws IOException {
  113. FileInputStream fi = new FileInputStream(f);
  114. int len = expectedContents.length();
  115. byte[] b = new byte[len];
  116. try {
  117. IOUtils.readFully(fi, b, 0, len);
  118. } catch (IOException ie) {
  119. fail("Reading "+f.getName()+" failed with "+ie.getMessage());
  120. } finally {
  121. fi.close(); // NB: leaving f unclosed prevents unmount
  122. }
  123. String s = new String(b, 0, len);
  124. assertEquals("File content differs", expectedContents, s);
  125. }
  126. private static class RedirectToStdoutThread extends Thread {
  127. private InputStream is;
  128. RedirectToStdoutThread(InputStream is) {
  129. this.is = is;
  130. }
  131. public void run() {
  132. try {
  133. InputStreamReader isr = new InputStreamReader(is);
  134. BufferedReader br = new BufferedReader(isr);
  135. String line=null;
  136. while ( (line = br.readLine()) != null) {
  137. LOG.error("FUSE_LINE:" + line);
  138. }
  139. } catch (IOException e) {
  140. e.printStackTrace();
  141. }
  142. }
  143. }
  144. /** Run a fuse-dfs process to mount the given DFS */
  145. private static Process establishMount(URI uri) throws IOException {
  146. Runtime r = Runtime.getRuntime();
  147. String cp = System.getProperty("java.class.path");
  148. String buildTestDir = System.getProperty("build.test");
  149. String fuseCmd = buildTestDir + "/../fuse_dfs";
  150. String libHdfs = buildTestDir + "/../../../c++/lib";
  151. String arch = System.getProperty("os.arch");
  152. String jvm = System.getProperty("java.home") + "/lib/" + arch + "/server";
  153. String lp = System.getProperty("LD_LIBRARY_PATH")+":"+libHdfs+":"+jvm;
  154. LOG.debug("LD_LIBRARY_PATH=" + lp);
  155. String nameNode =
  156. "dfs://" + uri.getHost() + ":" + String.valueOf(uri.getPort());
  157. // NB: We're mounting via an unprivileged user, therefore
  158. // user_allow_other needs to be set in /etc/fuse.conf, which also
  159. // needs to be world readable.
  160. String mountCmd[] = {
  161. fuseCmd, nameNode, mountPoint,
  162. // "-odebug", // Don't daemonize
  163. "-obig_writes", // Allow >4kb writes
  164. "-oentry_timeout=0.1", // Don't cache dents long
  165. "-oattribute_timeout=0.1", // Don't cache attributes long
  166. "-ononempty", // Don't complain about junk in mount point
  167. "-f", // Don't background the process
  168. "-ordbuffer=32768", // Read buffer size in kb
  169. "rw"
  170. };
  171. String [] env = {
  172. "CLASSPATH="+cp,
  173. "LD_LIBRARY_PATH="+lp,
  174. "PATH=/usr/bin:/bin"
  175. };
  176. execWaitRet("fusermount -u " + mountPoint);
  177. execAssertSucceeds("rm -rf " + mountPoint);
  178. execAssertSucceeds("mkdir -p " + mountPoint);
  179. // Mount the mini cluster
  180. String cmdStr = "";
  181. for (String c : mountCmd) {
  182. cmdStr += (" " + c);
  183. }
  184. LOG.info("now mounting with:" + cmdStr);
  185. Process fuseProcess = r.exec(mountCmd, env);
  186. RedirectToStdoutThread stdoutThread =
  187. new RedirectToStdoutThread(fuseProcess.getInputStream());
  188. RedirectToStdoutThread stderrThread =
  189. new RedirectToStdoutThread(fuseProcess.getErrorStream());
  190. stdoutThread.start();
  191. stderrThread.start();
  192. // Wait for fusermount to start up, so that we know we're operating on the
  193. // FUSE FS when we run the tests.
  194. try {
  195. Thread.sleep(50000);
  196. } catch (InterruptedException e) {
  197. }
  198. return fuseProcess;
  199. }
  200. /** Tear down the fuse-dfs process and mount */
  201. private static void teardownMount() throws IOException {
  202. execWaitRet("fusermount -u " + mountPoint);
  203. try {
  204. assertEquals(0, fuseProcess.waitFor()); // fuse_dfs should exit cleanly
  205. } catch (InterruptedException e) {
  206. fail("interrupted while waiting for fuse_dfs process to exit.");
  207. }
  208. }
  209. @BeforeClass
  210. public static void startUp() throws IOException {
  211. Configuration conf = new HdfsConfiguration();
  212. r = Runtime.getRuntime();
  213. mountPoint = System.getProperty("build.test") + "/mnt";
  214. conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
  215. cluster = new MiniDFSCluster.Builder(conf).build();
  216. cluster.waitClusterUp();
  217. fs = cluster.getFileSystem();
  218. fuseProcess = establishMount(fs.getUri());
  219. }
  220. @AfterClass
  221. public static void tearDown() throws IOException {
  222. // Unmount before taking down the mini cluster
  223. // so no outstanding operations hang.
  224. teardownMount();
  225. if (fs != null) {
  226. fs.close();
  227. }
  228. if (cluster != null) {
  229. cluster.shutdown();
  230. }
  231. }
  232. /** Test basic directory creation, access, removal */
  233. @Test
  234. public void testBasicDir() throws IOException {
  235. File d = new File(mountPoint, "dir1");
  236. // Mkdir, access and rm via the mount
  237. execAssertSucceeds("mkdir " + d.getAbsolutePath());
  238. execAssertSucceeds("ls " + d.getAbsolutePath());
  239. execAssertSucceeds("rmdir " + d.getAbsolutePath());
  240. // The dir should no longer exist
  241. execAssertFails("ls " + d.getAbsolutePath());
  242. }
  243. /** Test basic file creation and writing */
  244. @Test
  245. public void testCreate() throws IOException {
  246. final String contents = "hello world";
  247. File f = new File(mountPoint, "file1");
  248. // Create and access via the mount
  249. createFile(f, contents);
  250. // XX avoids premature EOF
  251. try {
  252. Thread.sleep(1000);
  253. } catch (InterruptedException ie) { }
  254. checkFile(f, contents);
  255. // Cat, stat and delete via the mount
  256. execAssertSucceeds("cat " + f.getAbsolutePath());
  257. execAssertSucceeds("stat " + f.getAbsolutePath());
  258. execAssertSucceeds("rm " + f.getAbsolutePath());
  259. // The file should no longer exist
  260. execAssertFails("ls " + f.getAbsolutePath());
  261. }
  262. /** Test creating a file via touch */
  263. @Test
  264. public void testTouch() throws IOException {
  265. File f = new File(mountPoint, "file1");
  266. execAssertSucceeds("touch " + f.getAbsolutePath());
  267. execAssertSucceeds("rm " + f.getAbsolutePath());
  268. }
  269. /** Test random access to a file */
  270. @Test
  271. public void testRandomAccess() throws IOException {
  272. final String contents = "hello world";
  273. File f = new File(mountPoint, "file1");
  274. createFile(f, contents);
  275. RandomAccessFile raf = new RandomAccessFile(f, "rw");
  276. raf.seek(f.length());
  277. try {
  278. raf.write('b');
  279. } catch (IOException e) {
  280. // Expected: fuse-dfs not yet support append
  281. assertEquals("Operation not supported", e.getMessage());
  282. } finally {
  283. raf.close();
  284. }
  285. raf = new RandomAccessFile(f, "rw");
  286. raf.seek(0);
  287. try {
  288. raf.write('b');
  289. fail("Over-wrote existing bytes");
  290. } catch (IOException e) {
  291. // Expected: can-not overwrite a file
  292. assertEquals("Invalid argument", e.getMessage());
  293. } finally {
  294. raf.close();
  295. }
  296. execAssertSucceeds("rm " + f.getAbsolutePath());
  297. }
  298. /** Test copying a set of files from the mount to itself */
  299. @Test
  300. public void testCopyFiles() throws IOException {
  301. final String contents = "hello world";
  302. File d1 = new File(mountPoint, "dir1");
  303. File d2 = new File(mountPoint, "dir2");
  304. // Create and populate dir1 via the mount
  305. execAssertSucceeds("mkdir " + d1.getAbsolutePath());
  306. for (int i = 0; i < 5; i++) {
  307. createFile(new File(d1, "file"+i), contents);
  308. }
  309. assertEquals(5, d1.listFiles().length);
  310. // Copy dir from the mount to the mount
  311. execAssertSucceeds("cp -r " + d1.getAbsolutePath() +
  312. " " + d2.getAbsolutePath());
  313. assertEquals(5, d2.listFiles().length);
  314. // Access all the files in the dirs and remove them
  315. execAssertSucceeds("find " + d1.getAbsolutePath());
  316. execAssertSucceeds("find " + d2.getAbsolutePath());
  317. execAssertSucceeds("rm -r " + d1.getAbsolutePath());
  318. execAssertSucceeds("rm -r " + d2.getAbsolutePath());
  319. }
  320. /** Test concurrent creation and access of the mount */
  321. @Test
  322. public void testMultipleThreads() throws IOException {
  323. ArrayList<Thread> threads = new ArrayList<Thread>();
  324. final AtomicReference<String> errorMessage = new AtomicReference<String>();
  325. for (int i = 0; i < 10; i++) {
  326. Thread t = new Thread() {
  327. public void run() {
  328. try {
  329. File d = new File(mountPoint, "dir"+getId());
  330. execWaitRet("mkdir " + d.getAbsolutePath());
  331. for (int j = 0; j < 10; j++) {
  332. File f = new File(d, "file"+j);
  333. final String contents = "thread "+getId()+" "+j;
  334. createFile(f, contents);
  335. }
  336. for (int j = 0; j < 10; j++) {
  337. File f = new File(d, "file"+j);
  338. execWaitRet("cat " + f.getAbsolutePath());
  339. execWaitRet("rm " + f.getAbsolutePath());
  340. }
  341. execWaitRet("rmdir " + d.getAbsolutePath());
  342. } catch (IOException ie) {
  343. errorMessage.set(
  344. String.format("Exception %s",
  345. StringUtils.stringifyException(ie)));
  346. }
  347. }
  348. };
  349. t.start();
  350. threads.add(t);
  351. }
  352. for (Thread t : threads) {
  353. try {
  354. t.join();
  355. } catch (InterruptedException ie) {
  356. fail("Thread interrupted: "+ie.getMessage());
  357. }
  358. }
  359. assertNull(errorMessage.get(), errorMessage.get());
  360. }
  361. }