فهرست منبع

MAPREDUCE-7391. TestLocalDistributedCacheManager failing after HADOOP-16202 (#4472)

Fixing a mockito-based test which broke when HADOOP-16202
changed the methods being invoked.

Contributed by Steve Loughran
Steve Loughran 2 سال پیش
والد
کامیت
c9ddbd210c

+ 134 - 70
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.mapred;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -31,9 +30,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.ArrayList;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -44,22 +45,31 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+/**
+ * Test the LocalDistributedCacheManager using mocking.
+ * This suite is brittle to changes in the class under test.
+ */
 @SuppressWarnings("deprecation")
 public class TestLocalDistributedCacheManager {
 
+  private static final byte[] TEST_DATA = "This is a test file\n".getBytes();
+
   private static FileSystem mockfs;
 
   public static class MockFileSystem extends FilterFileSystem {
@@ -70,6 +80,14 @@ public class TestLocalDistributedCacheManager {
 
   private File localDir;
 
+  /**
+   * Recursive delete of a path.
+   * For safety, paths of length under 5 are rejected.
+   * @param file path to delete.
+   * @throws IOException never, it is just "a dummy in the method signature"
+   * @throws IllegalArgumentException path too short
+   * @throws RuntimeException File.delete() failed.
+   */
   private static void delete(File file) throws IOException {
     if (file.getAbsolutePath().length() < 5) {
       throw new IllegalArgumentException(
@@ -109,9 +127,9 @@ public class TestLocalDistributedCacheManager {
    * Mock input stream based on a byte array so that it can be used by a
    * FSDataInputStream.
    */
-  private static class MockInputStream extends ByteArrayInputStream
+  private static final class MockInputStream extends ByteArrayInputStream
       implements Seekable, PositionedReadable {
-    public MockInputStream(byte[] buf) {
+    private MockInputStream(byte[] buf) {
       super(buf);
     }
 
@@ -134,47 +152,45 @@ public class TestLocalDistributedCacheManager {
     when(mockfs.getUri()).thenReturn(mockBase);
     Path working = new Path("mock://test-nn1/user/me/");
     when(mockfs.getWorkingDirectory()).thenReturn(working);
-    when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
-      @Override
-      public Path answer(InvocationOnMock args) throws Throwable {
-        return (Path) args.getArguments()[0];
-      }
-    });
+    when(mockfs.resolvePath(any(Path.class))).thenAnswer(
+        (Answer<Path>) args -> (Path) args.getArguments()[0]);
 
     final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
     final Path filePath = new Path(file);
     File link = new File("link");
 
+    // return a filestatus for the file "*/file.txt"; raise FNFE for anything else
     when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
       @Override
       public FileStatus answer(InvocationOnMock args) throws Throwable {
         Path p = (Path)args.getArguments()[0];
         if("file.txt".equals(p.getName())) {
-          return new FileStatus(201, false, 1, 500, 101, 101,
-              FsPermission.getDefault(), "me", "me", filePath);
+          return createMockTestFileStatus(filePath);
         }  else {
-          throw new FileNotFoundException(p+" not supported by mocking");
+          throw notMocked(p);
         }
       }
     });
 
     when(mockfs.getConf()).thenReturn(conf);
     final FSDataInputStream in =
-        new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
-    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
-      @Override
-      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
-        Path src = (Path)args.getArguments()[0];
-        if ("file.txt".equals(src.getName())) {
-          return in;
-        } else {
-          throw new FileNotFoundException(src+" not supported by mocking");
-        }
-      }
-    });
+        new FSDataInputStream(new MockInputStream(TEST_DATA));
+
+    // file.txt: return an openfile builder which will eventually return the data,
+    // anything else: FNFE
+    when(mockfs.openFile(any(Path.class))).thenAnswer(
+        (Answer<FutureDataInputStreamBuilder>) args -> {
+          Path src = (Path)args.getArguments()[0];
+          if ("file.txt".equals(src.getName())) {
+            return new MockOpenFileBuilder(mockfs, src,
+                () -> CompletableFuture.completedFuture(in));
+          } else {
+            throw notMocked(src);
+          }
+        });
 
     Job.addCacheFile(file, conf);
-    Map<String, Boolean> policies = new HashMap<String, Boolean>();
+    Map<String, Boolean> policies = new HashMap<>();
     policies.put(file.toString(), true);
     Job.setFileSharedCacheUploadPolicies(conf, policies);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
@@ -191,6 +207,12 @@ public class TestLocalDistributedCacheManager {
     assertFalse(link.exists());
   }
 
+  /**
+   * This test case sets the mock FS to raise FNFE
+   * on any getFileStatus/openFile calls.
+   * If the manager successfully starts up, it means that
+   * no files were probed for/opened.
+   */
   @Test
   public void testEmptyDownload() throws Exception {
     JobID jobId = new JobID();
@@ -201,30 +223,21 @@ public class TestLocalDistributedCacheManager {
     when(mockfs.getUri()).thenReturn(mockBase);
     Path working = new Path("mock://test-nn1/user/me/");
     when(mockfs.getWorkingDirectory()).thenReturn(working);
-    when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
-      @Override
-      public Path answer(InvocationOnMock args) throws Throwable {
-        return (Path) args.getArguments()[0];
-      }
-    });
+    when(mockfs.resolvePath(any(Path.class))).thenAnswer(
+        (Answer<Path>) args -> (Path) args.getArguments()[0]);
 
-    when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
-      @Override
-      public FileStatus answer(InvocationOnMock args) throws Throwable {
-        Path p = (Path)args.getArguments()[0];
-        throw new FileNotFoundException(p+" not supported by mocking");
-      }
-    });
+    when(mockfs.getFileStatus(any(Path.class))).thenAnswer(
+        (Answer<FileStatus>) args -> {
+          Path p = (Path)args.getArguments()[0];
+          throw notMocked(p);
+        });
 
     when(mockfs.getConf()).thenReturn(conf);
-    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
-      @Override
-      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
-        Path src = (Path)args.getArguments()[0];
-        throw new FileNotFoundException(src+" not supported by mocking");
-      }
-    });
-
+    when(mockfs.openFile(any(Path.class))).thenAnswer(
+        (Answer<FutureDataInputStreamBuilder>) args -> {
+          Path src = (Path)args.getArguments()[0];
+            throw notMocked(src);
+        });
     conf.set(MRJobConfig.CACHE_FILES, "");
     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
@@ -236,6 +249,9 @@ public class TestLocalDistributedCacheManager {
   }
 
 
+  /**
+   * The same file can be added to the cache twice.
+   */
   @Test
   public void testDuplicateDownload() throws Exception {
     JobID jobId = new JobID();
@@ -246,12 +262,8 @@ public class TestLocalDistributedCacheManager {
     when(mockfs.getUri()).thenReturn(mockBase);
     Path working = new Path("mock://test-nn1/user/me/");
     when(mockfs.getWorkingDirectory()).thenReturn(working);
-    when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
-      @Override
-      public Path answer(InvocationOnMock args) throws Throwable {
-        return (Path) args.getArguments()[0];
-      }
-    });
+    when(mockfs.resolvePath(any(Path.class))).thenAnswer(
+        (Answer<Path>) args -> (Path) args.getArguments()[0]);
 
     final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
     final Path filePath = new Path(file);
@@ -262,32 +274,30 @@ public class TestLocalDistributedCacheManager {
       public FileStatus answer(InvocationOnMock args) throws Throwable {
         Path p = (Path)args.getArguments()[0];
         if("file.txt".equals(p.getName())) {
-          return new FileStatus(201, false, 1, 500, 101, 101,
-              FsPermission.getDefault(), "me", "me", filePath);
+          return createMockTestFileStatus(filePath);
         }  else {
-          throw new FileNotFoundException(p+" not supported by mocking");
+          throw notMocked(p);
         }
       }
     });
 
     when(mockfs.getConf()).thenReturn(conf);
     final FSDataInputStream in =
-        new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
-    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
-      @Override
-      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
-        Path src = (Path)args.getArguments()[0];
-        if ("file.txt".equals(src.getName())) {
-          return in;
-        } else {
-          throw new FileNotFoundException(src+" not supported by mocking");
-        }
-      }
-    });
+        new FSDataInputStream(new MockInputStream(TEST_DATA));
+    when(mockfs.openFile(any(Path.class))).thenAnswer(
+        (Answer<FutureDataInputStreamBuilder>) args -> {
+          Path src = (Path)args.getArguments()[0];
+          if ("file.txt".equals(src.getName())) {
+            return new MockOpenFileBuilder(mockfs, src,
+                () -> CompletableFuture.completedFuture(in));
+          } else {
+            throw notMocked(src);
+          }
+        });
 
     Job.addCacheFile(file, conf);
     Job.addCacheFile(file, conf);
-    Map<String, Boolean> policies = new HashMap<String, Boolean>();
+    Map<String, Boolean> policies = new HashMap<>();
     policies.put(file.toString(), true);
     Job.setFileSharedCacheUploadPolicies(conf, policies);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
@@ -306,7 +316,7 @@ public class TestLocalDistributedCacheManager {
 
   /**
    * This test tries to replicate the issue with the previous version of
-   * {@ref LocalDistributedCacheManager} when the resulting timestamp is
+   * {@link LocalDistributedCacheManager} when the resulting timestamp is
    * identical as that in another process.  Unfortunately, it is difficult
    * to mimic such behavior in a single process unit test.  And mocking
    * the unique id (timestamp previously, UUID otherwise) won't prove the
@@ -321,7 +331,7 @@ public class TestLocalDistributedCacheManager {
     final int threadCount = 10;
     final CyclicBarrier barrier = new CyclicBarrier(threadCount);
 
-    ArrayList<Callable<Void>> setupCallable = new ArrayList<>();
+    List<Callable<Void>> setupCallable = new ArrayList<>();
     for (int i = 0; i < threadCount; ++i) {
       setupCallable.add(() -> {
         barrier.await();
@@ -340,4 +350,58 @@ public class TestLocalDistributedCacheManager {
       manager.close();
     }
   }
+
+  /**
+   * Create test file status using test data as the length.
+   * @param filePath path to the file
+   * @return a file status.
+   */
+  private FileStatus createMockTestFileStatus(final Path filePath) {
+    return new FileStatus(TEST_DATA.length, false, 1, 500, 101, 101,
+        FsPermission.getDefault(), "me", "me", filePath);
+  }
+
+  /**
+   * Exception to throw on a not mocked path.
+   * @return a FileNotFoundException
+   */
+  private FileNotFoundException notMocked(final Path p) {
+    return new FileNotFoundException(p + " not supported by mocking");
+  }
+
+  /**
+   * Openfile builder where the build operation is a l-expression
+   * supplied in the constructor.
+   */
+  private static final class MockOpenFileBuilder extends
+      FutureDataInputStreamBuilderImpl {
+
+    /**
+     * Operation to invoke to build the result.
+     */
+    private final CallableRaisingIOE<CompletableFuture<FSDataInputStream>>
+        buildTheResult;
+
+    /**
+     * Create the builder. the FS and path must be non-null.
+     * FileSystem.getConf() is the only method invoked of the FS by
+     * the superclass.
+     * @param fileSystem fs
+     * @param path path to open
+     * @param buildTheResult builder operation.
+     */
+    private MockOpenFileBuilder(final FileSystem fileSystem, Path path,
+        final CallableRaisingIOE<CompletableFuture<FSDataInputStream>> buildTheResult) {
+      super(fileSystem, path);
+      this.buildTheResult = buildTheResult;
+    }
+
+    @Override
+    public CompletableFuture<FSDataInputStream> build()
+        throws IllegalArgumentException, UnsupportedOperationException,
+               IOException {
+      return buildTheResult.apply();
+    }
+  }
+
 }