TestFuseDFS.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. import java.io.*;
  19. import java.net.URI;
  20. import java.util.ArrayList;
  21. import java.util.concurrent.atomic.*;
  22. import org.apache.log4j.Level;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.commons.logging.impl.Log4JLogger;
  26. import org.apache.hadoop.conf.Configuration;
  27. import org.apache.hadoop.fs.*;
  28. import org.apache.hadoop.fs.permission.*;
  29. import org.apache.hadoop.hdfs.*;
  30. import org.apache.hadoop.io.IOUtils;
  31. import org.apache.hadoop.util.StringUtils;
  32. import org.junit.Test;
  33. import org.junit.BeforeClass;
  34. import org.junit.AfterClass;
  35. import static org.junit.Assert.*;
  36. /**
  37. * Basic functional tests on a fuse-dfs mount.
  38. */
  39. public class TestFuseDFS {
  /** Mini DFS cluster started in {@link #startUp()} and shut down in {@link #tearDown()}. */
  private static MiniDFSCluster cluster;
  /** File system handle obtained from the mini cluster; its URI is what gets mounted. */
  private static FileSystem fs;
  /** Runtime used to launch the external commands run by the helpers and tests. */
  private static Runtime r;
  /** Local directory on which the mini cluster is mounted. */
  private static String mountPoint;

  private static final Log LOG = LogFactory.getLog(TestFuseDFS.class);

  // Static rather than instance initializer: an instance initializer only
  // runs when a test object is constructed, which happens after the
  // @BeforeClass method, so debug output emitted while starting the cluster
  // and establishing the mount would not yet be enabled.
  static {
    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
  }
  48. /** Dump the given intput stream to stderr */
  49. private static void dumpInputStream(InputStream is) throws IOException {
  50. int len;
  51. do {
  52. byte b[] = new byte[is.available()];
  53. len = is.read(b);
  54. System.out.println("Read "+len+" bytes");
  55. System.out.write(b, 0, b.length);
  56. } while (len > 0);
  57. }
  58. /**
  59. * Wait for the given process to return and check that it exited
  60. * as required. Log if the process failed.
  61. */
  62. private static void checkProcessRet(Process p, boolean expectPass)
  63. throws IOException {
  64. try {
  65. int ret = p.waitFor();
  66. if (ret != 0) {
  67. dumpInputStream(p.getErrorStream());
  68. }
  69. if (expectPass) {
  70. assertEquals(0, ret);
  71. } else {
  72. assertTrue(ret != 0);
  73. }
  74. } catch (InterruptedException ie) {
  75. fail("Process interrupted: "+ie.getMessage());
  76. }
  77. }
  78. /** Exec the given command and assert it executed successfully */
  79. private static void execWaitRet(String cmd) throws IOException {
  80. LOG.debug("EXEC "+cmd);
  81. Process p = r.exec(cmd);
  82. try {
  83. p.waitFor();
  84. } catch (InterruptedException ie) {
  85. fail("Process interrupted: "+ie.getMessage());
  86. }
  87. }
  88. /** Exec the given command and assert it executed successfully */
  89. private static void execIgnoreRet(String cmd) throws IOException {
  90. LOG.debug("EXEC "+cmd);
  91. r.exec(cmd);
  92. }
  93. /** Exec the given command and assert it executed successfully */
  94. private static void execAssertSucceeds(String cmd) throws IOException {
  95. LOG.debug("EXEC "+cmd);
  96. checkProcessRet(r.exec(cmd), true);
  97. }
  98. /** Exec the given command, assert it returned an error code */
  99. private static void execAssertFails(String cmd) throws IOException {
  100. LOG.debug("EXEC "+cmd);
  101. checkProcessRet(r.exec(cmd), false);
  102. }
  103. /** Create and write the given file */
  104. private static void createFile(File f, String s) throws IOException {
  105. InputStream is = new ByteArrayInputStream(s.getBytes());
  106. FileOutputStream fos = new FileOutputStream(f);
  107. IOUtils.copyBytes(is, fos, s.length(), true);
  108. }
  109. /** Check that the given file exists with the given contents */
  110. private static void checkFile(File f, String expectedContents)
  111. throws IOException {
  112. FileInputStream fi = new FileInputStream(f);
  113. int len = expectedContents.length();
  114. byte[] b = new byte[len];
  115. try {
  116. IOUtils.readFully(fi, b, 0, len);
  117. } catch (IOException ie) {
  118. fail("Reading "+f.getName()+" failed with "+ie.getMessage());
  119. } finally {
  120. fi.close(); // NB: leaving f unclosed prevents unmount
  121. }
  122. String s = new String(b, 0, len);
  123. assertEquals("File content differs", expectedContents, s);
  124. }
  125. /** Run a fuse-dfs process to mount the given DFS */
  126. private static void establishMount(URI uri) throws IOException {
  127. Runtime r = Runtime.getRuntime();
  128. String cp = System.getProperty("java.class.path");
  129. String buildTestDir = System.getProperty("build.test");
  130. String fuseCmd = buildTestDir + "/../fuse_dfs";
  131. String libHdfs = buildTestDir + "/../../../c++/lib";
  132. String arch = System.getProperty("os.arch");
  133. String jvm = System.getProperty("java.home") + "/lib/" + arch + "/server";
  134. String lp = System.getProperty("LD_LIBRARY_PATH")+":"+libHdfs+":"+jvm;
  135. LOG.debug("LD_LIBRARY_PATH=" + lp);
  136. String nameNode =
  137. "dfs://" + uri.getHost() + ":" + String.valueOf(uri.getPort());
  138. // NB: We're mounting via an unprivileged user, therefore
  139. // user_allow_other needs to be set in /etc/fuse.conf, which also
  140. // needs to be world readable.
  141. String mountCmd[] = {
  142. fuseCmd, nameNode, mountPoint,
  143. // "-odebug", // Don't daemonize
  144. "-obig_writes", // Allow >4kb writes
  145. "-oentry_timeout=0.1", // Don't cache dents long
  146. "-oattribute_timeout=0.1", // Don't cache attributes long
  147. "-ordbuffer=32768", // Read buffer size in kb
  148. "rw"
  149. };
  150. String [] env = {
  151. "CLASSPATH="+cp,
  152. "LD_LIBRARY_PATH="+lp,
  153. "PATH=/usr/bin:/bin"
  154. };
  155. execWaitRet("fusermount -u " + mountPoint);
  156. execAssertSucceeds("rm -rf " + mountPoint);
  157. execAssertSucceeds("mkdir -p " + mountPoint);
  158. // Mount the mini cluster
  159. try {
  160. Process fuseProcess = r.exec(mountCmd, env);
  161. assertEquals(0, fuseProcess.waitFor());
  162. } catch (InterruptedException ie) {
  163. fail("Failed to mount");
  164. }
  165. }
  166. /** Tear down the fuse-dfs process and mount */
  167. private static void teardownMount() throws IOException {
  168. execWaitRet("fusermount -u " + mountPoint);
  169. }
  170. @BeforeClass
  171. public static void startUp() throws IOException {
  172. Configuration conf = new HdfsConfiguration();
  173. r = Runtime.getRuntime();
  174. mountPoint = System.getProperty("build.test") + "/mnt";
  175. conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
  176. cluster = new MiniDFSCluster.Builder(conf).build();
  177. cluster.waitClusterUp();
  178. fs = cluster.getFileSystem();
  179. establishMount(fs.getUri());
  180. }
  181. @AfterClass
  182. public static void tearDown() throws IOException {
  183. // Unmount before taking down the mini cluster
  184. // so no outstanding operations hang.
  185. teardownMount();
  186. if (fs != null) {
  187. fs.close();
  188. }
  189. if (cluster != null) {
  190. cluster.shutdown();
  191. }
  192. }
  193. /** Test basic directory creation, access, removal */
  194. @Test
  195. public void testBasicDir() throws IOException {
  196. File d = new File(mountPoint, "dir1");
  197. // Mkdir, access and rm via the mount
  198. execAssertSucceeds("mkdir " + d.getAbsolutePath());
  199. execAssertSucceeds("ls " + d.getAbsolutePath());
  200. execAssertSucceeds("rmdir " + d.getAbsolutePath());
  201. // The dir should no longer exist
  202. execAssertFails("ls " + d.getAbsolutePath());
  203. }
  204. /** Test basic file creation and writing */
  205. @Test
  206. public void testCreate() throws IOException {
  207. final String contents = "hello world";
  208. File f = new File(mountPoint, "file1");
  209. // Create and access via the mount
  210. createFile(f, contents);
  211. // XX avoids premature EOF
  212. try {
  213. Thread.sleep(1000);
  214. } catch (InterruptedException ie) { }
  215. checkFile(f, contents);
  216. // Cat, stat and delete via the mount
  217. execAssertSucceeds("cat " + f.getAbsolutePath());
  218. execAssertSucceeds("stat " + f.getAbsolutePath());
  219. execAssertSucceeds("rm " + f.getAbsolutePath());
  220. // The file should no longer exist
  221. execAssertFails("ls " + f.getAbsolutePath());
  222. }
  223. /** Test creating a file via touch */
  224. @Test
  225. public void testTouch() throws IOException {
  226. File f = new File(mountPoint, "file1");
  227. execAssertSucceeds("touch " + f.getAbsolutePath());
  228. execAssertSucceeds("rm " + f.getAbsolutePath());
  229. }
  230. /** Test random access to a file */
  231. @Test
  232. public void testRandomAccess() throws IOException {
  233. final String contents = "hello world";
  234. File f = new File(mountPoint, "file1");
  235. createFile(f, contents);
  236. RandomAccessFile raf = new RandomAccessFile(f, "rw");
  237. raf.seek(f.length());
  238. try {
  239. raf.write('b');
  240. } catch (IOException e) {
  241. // Expected: fuse-dfs not yet support append
  242. assertEquals("Operation not supported", e.getMessage());
  243. } finally {
  244. raf.close();
  245. }
  246. raf = new RandomAccessFile(f, "rw");
  247. raf.seek(0);
  248. try {
  249. raf.write('b');
  250. fail("Over-wrote existing bytes");
  251. } catch (IOException e) {
  252. // Expected: can-not overwrite a file
  253. assertEquals("Invalid argument", e.getMessage());
  254. } finally {
  255. raf.close();
  256. }
  257. execAssertSucceeds("rm " + f.getAbsolutePath());
  258. }
  259. /** Test copying a set of files from the mount to itself */
  260. @Test
  261. public void testCopyFiles() throws IOException {
  262. final String contents = "hello world";
  263. File d1 = new File(mountPoint, "dir1");
  264. File d2 = new File(mountPoint, "dir2");
  265. // Create and populate dir1 via the mount
  266. execAssertSucceeds("mkdir " + d1.getAbsolutePath());
  267. for (int i = 0; i < 5; i++) {
  268. createFile(new File(d1, "file"+i), contents);
  269. }
  270. assertEquals(5, d1.listFiles().length);
  271. // Copy dir from the mount to the mount
  272. execAssertSucceeds("cp -r " + d1.getAbsolutePath() +
  273. " " + d2.getAbsolutePath());
  274. assertEquals(5, d2.listFiles().length);
  275. // Access all the files in the dirs and remove them
  276. execAssertSucceeds("find " + d1.getAbsolutePath());
  277. execAssertSucceeds("find " + d2.getAbsolutePath());
  278. execAssertSucceeds("rm -r " + d1.getAbsolutePath());
  279. execAssertSucceeds("rm -r " + d2.getAbsolutePath());
  280. }
  281. /** Test concurrent creation and access of the mount */
  282. @Test
  283. public void testMultipleThreads() throws IOException {
  284. ArrayList<Thread> threads = new ArrayList<Thread>();
  285. final AtomicReference<String> errorMessage = new AtomicReference<String>();
  286. for (int i = 0; i < 10; i++) {
  287. Thread t = new Thread() {
  288. public void run() {
  289. try {
  290. File d = new File(mountPoint, "dir"+getId());
  291. execWaitRet("mkdir " + d.getAbsolutePath());
  292. for (int j = 0; j < 10; j++) {
  293. File f = new File(d, "file"+j);
  294. final String contents = "thread "+getId()+" "+j;
  295. createFile(f, contents);
  296. }
  297. for (int j = 0; j < 10; j++) {
  298. File f = new File(d, "file"+j);
  299. execWaitRet("cat " + f.getAbsolutePath());
  300. execWaitRet("rm " + f.getAbsolutePath());
  301. }
  302. execWaitRet("rmdir " + d.getAbsolutePath());
  303. } catch (IOException ie) {
  304. errorMessage.set(
  305. String.format("Exception %s",
  306. StringUtils.stringifyException(ie)));
  307. }
  308. }
  309. };
  310. t.start();
  311. threads.add(t);
  312. }
  313. for (Thread t : threads) {
  314. try {
  315. t.join();
  316. } catch (InterruptedException ie) {
  317. fail("Thread interrupted: "+ie.getMessage());
  318. }
  319. }
  320. assertNull(errorMessage.get(), errorMessage.get());
  321. }
  322. }