소스 검색

HDFS-14250. [SBN read]. msync should always direct to active NameNode to get latest stateID. Contributed by Chao Sun.

Erik Krogen 6 년 전
부모
커밋
45cd309a40

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -1754,6 +1754,6 @@ public interface ClientProtocol {
    * @throws IOException
    * @throws IOException
    */
    */
   @Idempotent
   @Idempotent
-  @ReadOnly(isCoordinated = true)
+  @ReadOnly(activeOnly = true)
   void msync() throws IOException;
   void msync() throws IOException;
 }
 }

+ 46 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java

@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -150,7 +151,51 @@ public class TestConsistentReadsObserver {
     assertEquals(1, readStatus.get());
     assertEquals(1, readStatus.get());
   }
   }
 
 
-  // @Ignore("Move to another test file")
+  @Test
+  public void testMsync() throws Exception {
+    // 0 == not completed, 1 == succeeded, -1 == failed
+    AtomicInteger readStatus = new AtomicInteger(0);
+    Configuration conf2 = new Configuration(conf);
+
+    // Disable FS cache so two different DFS clients will be used.
+    conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+    DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);
+
+    // Initialize the proxies for Observer Node.
+    dfs.getClient().getHAServiceState();
+    dfs2.getClient().getHAServiceState();
+
+    // Advance Observer's state ID so it is ahead of client's.
+    dfs.mkdir(new Path("/test"), FsPermission.getDefault());
+    dfsCluster.rollEditLogAndTail(0);
+
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    assertSentTo(0);
+
+    Thread reader = new Thread(() -> {
+      try {
+        // After msync, client should have the latest state ID from active.
+        // Therefore, the subsequent getFileStatus call should succeed.
+        dfs2.getClient().msync();
+        dfs2.getFileStatus(testPath);
+        readStatus.set(1);
+      } catch (IOException e) {
+        e.printStackTrace();
+        readStatus.set(-1);
+      }
+    });
+
+    reader.start();
+
+    Thread.sleep(100);
+    assertEquals(0, readStatus.get());
+
+    dfsCluster.rollEditLogAndTail(0);
+
+    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
+    assertEquals(1, readStatus.get());
+  }
+
   @Test
   @Test
   public void testUncoordinatedCall() throws Exception {
   public void testUncoordinatedCall() throws Exception {
     // make a write call so that client will be ahead of
     // make a write call so that client will be ahead of