فهرست منبع

HDFS-15567. [SBN Read] HDFS should expose msync() API to allow downstream applications call it explicitly. Contributed by Konstantin V Shvachko.

(cherry picked from commit b3786d6c3cc13b0b92b9f42da1731c4ce35c9ded)
Konstantin V Shvachko 4 سال پیش
والد
کامیت
104dd85ad8

+ 13 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java

@@ -844,6 +844,19 @@ public abstract class AbstractFileSystem {
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException;
 
+  /**
+   * Synchronize client metadata state.
+   * <p>In some FileSystem implementations, such as HDFS, metadata
+   * synchronization is essential to guarantee consistency of read requests,
+   * particularly in an HA setting.
+   * @throws IOException never by this base body; overrides may throw it
+   * @throws UnsupportedOperationException always, in this base implementation
+   */
+  public void msync() throws IOException, UnsupportedOperationException {
+    throw new UnsupportedOperationException(getClass().getCanonicalName() +
+        " does not support method msync");
+  }
+
   /**
    * The specification of this method matches that of
    * {@link FileContext#access(Path, FsAction)}

+ 10 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

@@ -1188,6 +1188,16 @@ public class FileContext {
     }.resolve(this, absF);
   }
 
+  /**
+   * Synchronize client metadata state by calling {@code defaultFS.msync()}.
+   *
+   * @throws IOException if {@code defaultFS.msync()} throws it
+   * @throws UnsupportedOperationException if {@code defaultFS.msync()} does
+   */
+  public void msync() throws IOException, UnsupportedOperationException {
+    defaultFS.msync();
+  }
+
   /**
    * Checks if the user can access a path.  The mode specifies which access
    * checks to perform.  If the requested permissions are granted, then the

+ 13 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -2507,6 +2507,19 @@ public abstract class FileSystem extends Configured implements Closeable {
    */
   public abstract FileStatus getFileStatus(Path f) throws IOException;
 
+  /**
+   * Synchronize client metadata state.
+   * <p>In some FileSystem implementations, such as HDFS, metadata
+   * synchronization is essential to guarantee consistency of read requests,
+   * particularly in an HA setting.
+   * @throws IOException never by this base body; overrides may throw it
+   * @throws UnsupportedOperationException always, in this base implementation
+   */
+  public void msync() throws IOException, UnsupportedOperationException {
+    throw new UnsupportedOperationException(getClass().getCanonicalName() +
+        " does not support method msync");
+  }
+
   /**
    * Checks if the user can access a path.  The mode specifies which access
    * checks to perform.  If the requested permissions are granted, then the

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -446,6 +446,11 @@ public class FilterFileSystem extends FileSystem {
     return fs.getFileStatus(f);
   }
 
+  @Override
+  public void msync() throws IOException, UnsupportedOperationException {
+    fs.msync(); // forward the call unchanged to the fs field
+  }
+
   @Override
   public void access(Path path, FsAction mode) throws AccessControlException,
       FileNotFoundException, IOException {

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java

@@ -122,6 +122,11 @@ public abstract class FilterFs extends AbstractFileSystem {
     return myFs.getFileStatus(f);
   }
 
+  @Override
+  public void msync() throws IOException, UnsupportedOperationException {
+    myFs.msync(); // forward the call unchanged to the myFs field
+  }
+
   @Override
   public void access(Path path, FsAction mode) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, IOException {

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java

@@ -673,6 +673,11 @@ public class HarFileSystem extends FileSystem {
     return hstatus;
   }
 
+  @Override
+  public void msync() throws IOException, UnsupportedOperationException {
+    fs.msync(); // forward the call unchanged to the fs field
+  }
+
   /**
    * @return null since no checksum algorithm is implemented.
    */

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java

@@ -137,7 +137,18 @@ public class Hdfs extends AbstractFileSystem {
       throw new FileNotFoundException("File does not exist: " + f.toString());
     }
   }
-  
+
+  /**
+   * Synchronize client metadata state with the Active NameNode.
+   * <p>In HA the client synchronizes its state with the Active NameNode
+   * in order to guarantee subsequent read consistency from Observer Nodes.
+   * @throws IOException if the {@code dfs.msync()} call throws it
+   */
+  @Override
+  public void msync() throws IOException {
+    dfs.msync();
+  }
+
   @Override
   public FileStatus getFileLinkStatus(Path f) 
       throws IOException, UnresolvedLinkException {

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -1536,6 +1536,17 @@ public class DistributedFileSystem extends FileSystem
     }.resolve(this, absF);
   }
 
+  /**
+   * Synchronize client metadata state with the Active NameNode.
+   * <p>In HA the client synchronizes its state with the Active NameNode
+   * in order to guarantee subsequent read consistency from Observer Nodes.
+   * @throws IOException if the {@code dfs.msync()} call throws it
+   */
+  @Override
+  public void msync() throws IOException {
+    dfs.msync();
+  }
+
   @SuppressWarnings("deprecation")
   @Override
   public void createSymlink(final Path target, final Path link,

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -2656,7 +2656,8 @@ public class MiniDFSCluster implements AutoCloseable {
   public void rollEditLogAndTail(int nnIndex) throws Exception {
     getNameNode(nnIndex).getRpcServer().rollEditLog(); // roll on NameNode nnIndex
     for (int i = 2; i < getNumNameNodes(); i++) { // NOTE(review): starts at index 2, presumably the observers -- confirm
-      getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits();
+      long el = getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits();
+      LOG.info("editsLoaded " + el); // el is the value returned by doTailEdits()
     }
   }
 

+ 39 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java

@@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.base.Supplier;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
@@ -31,9 +33,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -110,7 +115,8 @@ public class TestConsistentReadsObserver {
     final int observerIdx = 2;
     NameNode nn = dfsCluster.getNameNode(observerIdx);
     int port = nn.getNameNodeAddress().getPort();
-    Configuration configuration = dfsCluster.getConfiguration(observerIdx);
+    Configuration originalConf = dfsCluster.getConfiguration(observerIdx);
+    Configuration configuration = new Configuration(originalConf);
     String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
     configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
         TestRpcScheduler.class.getName());
@@ -127,6 +133,8 @@ public class TestConsistentReadsObserver {
     // be triggered and client should retry active NN.
     dfs.getFileStatus(testPath);
     assertSentTo(0);
+    // reset the original call queue
+    NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
   }
 
   @Test
@@ -207,7 +215,7 @@ public class TestConsistentReadsObserver {
           // Therefore, the subsequent getFileStatus call should succeed.
           if (!autoMsync) {
             // If not testing auto-msync, perform an explicit one here
-            dfs2.getClient().msync();
+            dfs2.msync();
           } else if (autoMsyncPeriodMs > 0) {
             Thread.sleep(autoMsyncPeriodMs);
           }
@@ -413,6 +421,35 @@ public class TestConsistentReadsObserver {
     }
   }
 
+  /**
+   * Verifies that a directory created through {@link FileContext} can be
+   * read back after {@code fc.msync()} and an edit-log roll/tail: the final
+   * {@code getFileStatus} must not throw {@link FileNotFoundException}, and
+   * the whole test must finish within the 10 second timeout.
+   */
+  @Test(timeout=10000)
+  public void testMsyncFileContext() throws Exception {
+    NameNode nn0 = dfsCluster.getNameNode(0);
+    NameNode nn2 = dfsCluster.getNameNode(2);
+    HAServiceStatus st = nn0.getRpcServer().getServiceStatus();
+    assertEquals("nn0 is not active", HAServiceState.ACTIVE, st.getState());
+    st = nn2.getRpcServer().getServiceStatus();
+    assertEquals("nn2 is not observer", HAServiceState.OBSERVER, st.getState());
+
+    FileContext fc = FileContext.getFileContext(conf);
+    // initialize observer proxy for FileContext
+    fc.getFsStatus(testPath);
+
+    Path p = new Path(testPath, "testMsyncFileContext");
+    fc.mkdir(p, FsPermission.getDefault(), true);
+    fc.msync();
+    dfsCluster.rollEditLogAndTail(0);
+    LOG.info("State id active = {}, State id observer = {}",
+        nn0.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId(),
+        nn2.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId());
+    try {
+      // if getFileStatus is taking too long due to server requeueing
+      // the test will time out
+      fc.getFileStatus(p);
+    } catch (FileNotFoundException e) {
+      // keep the original exception as the cause so its stack trace is reported
+      throw new AssertionError("File should exist on Observer after msync", e);
+    }
+  }
+
   private void assertSentTo(int nnIdx) throws IOException {
     assertTrue("Request was not sent to the expected namenode " + nnIdx,
         HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));