Prechádzať zdrojové kódy

HDFS-14211. [SBN Read]. Add a configurable flag to enable always-msync mode to ObserverReadProxyProvider. Contributed by Erik Krogen.

Erik Krogen 6 rokov pred
rodič
commit
11fee2d4e1

+ 64 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java

@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Proxy;
 import java.net.URI;
 import java.net.URI;
+import java.util.concurrent.TimeUnit;
 import java.util.List;
 import java.util.List;
 
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -43,6 +44,7 @@ import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RpcInvocationHandler;
 import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
@@ -68,6 +70,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   private static final Logger LOG = LoggerFactory.getLogger(
   private static final Logger LOG = LoggerFactory.getLogger(
       ObserverReadProxyProvider.class);
       ObserverReadProxyProvider.class);
 
 
+  /**
+   * Configuration key prefix for {@link #autoMsyncPeriodMs}. The full key is
+   * this prefix followed by "." and the nameservice ID.
+   */
+  static final String AUTO_MSYNC_PERIOD_KEY_PREFIX =
+      HdfsClientConfigKeys.Failover.PREFIX + "observer.auto-msync-period";
+  /** Auto-msync disabled by default. */
+  static final long AUTO_MSYNC_PERIOD_DEFAULT = -1;
+
   /** Client-side context for syncing with the NameNode server side. */
   /** Client-side context for syncing with the NameNode server side. */
   private final AlignmentContext alignmentContext;
   private final AlignmentContext alignmentContext;
 
 
@@ -87,6 +95,24 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
    */
    */
   private boolean observerReadEnabled;
   private boolean observerReadEnabled;
 
 
+  /**
+   * This adjusts how frequently this proxy provider should auto-msync to the
+   * Active NameNode, automatically performing an msync() call to the active
+   * to fetch the current transaction ID before submitting read requests to
+   * observer nodes. See HDFS-14211 for more description of this feature.
+   * If this is below 0, never auto-msync. If this is 0, perform an msync on
+   * every read operation. If this is above 0, perform an msync after this many
+   * ms have elapsed since the last msync.
+   */
+  private final long autoMsyncPeriodMs;
+
+  /**
+   * The time, in milliseconds as reported by {@link Time#monotonicNow()}
+   * (not wall-clock epoch time), that the last msync operation was
+   * performed. This includes any implicit msync (any operation which is
+   * serviced by the Active NameNode).
+   */
+  private volatile long lastMsyncTimeMs = -1;
+
   /**
   /**
    * A client using an ObserverReadProxyProvider should first sync with the
    * A client using an ObserverReadProxyProvider should first sync with the
    * active NameNode on startup. This ensures that the client reads data which
    * active NameNode on startup. This ensures that the client reads data which
@@ -154,6 +180,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
         ObserverReadInvocationHandler.class.getClassLoader(),
         ObserverReadInvocationHandler.class.getClassLoader(),
         new Class<?>[] {xface}, new ObserverReadInvocationHandler());
         new Class<?>[] {xface}, new ObserverReadInvocationHandler());
     combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
     combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
+
+    autoMsyncPeriodMs = conf.getTimeDuration(
+        // The host of the URI is the nameservice ID
+        AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(),
+        AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+
     // TODO : make this configurable or remove this variable
     // TODO : make this configurable or remove this variable
     this.observerReadEnabled = true;
     this.observerReadEnabled = true;
   }
   }
@@ -247,6 +279,35 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     }
     }
     failoverProxy.getProxy().proxy.msync();
     failoverProxy.getProxy().proxy.msync();
     msynced = true;
     msynced = true;
+    lastMsyncTimeMs = Time.monotonicNow();
+  }
+
+  /**
+   * This will call {@link ClientProtocol#msync()} on the active NameNode
+   * (via the {@link #failoverProxy}) to update the state of this client, only
+   * if more than {@link #autoMsyncPeriodMs} ms has elapsed since the last time
+   * an msync was performed.
+   *
+   * <ul>
+   *   <li>Period below 0: nothing is done.</li>
+   *   <li>Period of 0: an msync is issued on every call, without taking the
+   *   lock and without reading or updating {@link #lastMsyncTimeMs}.</li>
+   *   <li>Period above 0: an msync is issued only when strictly more than
+   *   the period has elapsed, measured with {@link Time#monotonicNow()},
+   *   since {@link #lastMsyncTimeMs}; that field is then refreshed.</li>
+   * </ul>
+   *
+   * @throws IOException declared by this method; presumably raised when the
+   *     msync call to the active NameNode fails
+   * @see #autoMsyncPeriodMs
+   */
+  private void autoMsyncIfNecessary() throws IOException {
+    if (autoMsyncPeriodMs == 0) {
+      // Always msync; the elapsed-time bookkeeping is not consulted here.
+      failoverProxy.getProxy().proxy.msync();
+    } else if (autoMsyncPeriodMs > 0) {
+      if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+        synchronized (this) {
+          // Use a synchronized block so that only one thread will msync
+          // if many operations are submitted around the same time.
+          // Re-check the entry criterion since another thread may have
+          // refreshed lastMsyncTimeMs while this one waited for the lock.
+          if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+            failoverProxy.getProxy().proxy.msync();
+            lastMsyncTimeMs = Time.monotonicNow();
+          }
+        }
+      }
+    }
   }
   }
 
 
   /**
   /**
@@ -273,6 +334,8 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
           // An msync() must first be performed to ensure that this client is
           // An msync() must first be performed to ensure that this client is
           // up-to-date with the active's state. This will only be done once.
           // up-to-date with the active's state. This will only be done once.
           initializeMsync();
           initializeMsync();
+        } else {
+          autoMsyncIfNecessary();
         }
         }
 
 
         int failedObserverCount = 0;
         int failedObserverCount = 0;
@@ -349,6 +412,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       // If this was reached, the request reached the active, so the
       // If this was reached, the request reached the active, so the
       // state is up-to-date with active and no further msync is needed.
       // state is up-to-date with active and no further msync is needed.
       msynced = true;
       msynced = true;
+      lastMsyncTimeMs = Time.monotonicNow();
       lastProxy = activeProxy;
       lastProxy = activeProxy;
       return retVal;
       return retVal;
     }
     }

+ 46 - 2
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md

@@ -61,9 +61,12 @@ ID, which is implemented using transaction ID within NameNode, is
 introduced in RPC headers. When a client performs write through Active
 introduced in RPC headers. When a client performs write through Active
 NameNode, it updates its state ID using the latest transaction ID from
 NameNode, it updates its state ID using the latest transaction ID from
 the NameNode. When performing a subsequent read, the client passes this
 the NameNode. When performing a subsequent read, the client passes this
-state ID to Observe NameNode, which will then check against its own
+state ID to Observer NameNode, which will then check against its own
 transaction ID, and will ensure its own transaction ID has caught up
 transaction ID, and will ensure its own transaction ID has caught up
-with the request's state ID, before serving the read request.
+with the request's state ID, before serving the read request. This ensures
+"read your own writes" semantics from a single client. Maintaining
+consistency between multiple clients in the face of out-of-band communication
+is discussed in the "Maintaining Client Consistency" section below.
 
 
 Edit log tailing is critical for Observer NameNode as it directly affects
 Edit log tailing is critical for Observer NameNode as it directly affects
 the latency between when a transaction is applied in Active NameNode and
 the latency between when a transaction is applied in Active NameNode and
@@ -83,6 +86,32 @@ available in the cluster, and only fall back to Active NameNode if all
 of the former failed. Similarly, ObserverReadProxyProviderWithIPFailover
 of the former failed. Similarly, ObserverReadProxyProviderWithIPFailover
 is introduced to replace IPFailoverProxyProvider in an IP failover setup.
 is introduced to replace IPFailoverProxyProvider in an IP failover setup.
 
 
+### Maintaining Client Consistency
+
+As discussed above, a client 'foo' will update its state ID upon every request
+to the Active NameNode, which includes all write operations. Any request
+directed to an Observer NameNode will wait until the Observer has seen
+this transaction ID, ensuring that the client is able to read all of its own
+writes. However, if 'foo' sends an out-of-band (i.e., non-HDFS) message to
+client 'bar' telling it that a write has been performed, a subsequent read by
+'bar' may not see the recent write by 'foo'. To prevent this inconsistent
+behavior, a new `msync()`, or "metadata sync", command has been added. When
+`msync()` is called on a client, it will update its state ID against the
+Active NameNode -- a very lightweight operation -- so that subsequent reads
+are guaranteed to be consistent up to the point of the `msync()`. Thus as long
+as 'bar' calls `msync()` before performing its read, it is guaranteed to see
+the write made by 'foo'.
+
+To make use of `msync()`, an application does not necessarily have to make any
+code changes. Upon startup, a client will automatically call `msync()` before
+performing any reads against an Observer, so that any writes performed prior
+to the initialization of the client will be visible. In addition, there is
+a configurable "auto-msync" mode supported by ObserverReadProxyProvider which
+will automatically perform an `msync()` before a read whenever a configurable
+period has elapsed since the client last synced with the Active NameNode, to
+prevent a client from ever seeing data that is more stale than a time bound.
+There is some overhead associated with this, as each refresh requires an RPC
+to the Active NameNode, so it is disabled by default.
+
 Deployment
 Deployment
 -----------
 -----------
 
 
@@ -185,3 +214,18 @@ implementation, in the client-side **hdfs-site.xml** configuration file:
 Clients who do not wish to use Observer NameNode can still use the
 Clients who do not wish to use Observer NameNode can still use the
 existing ConfiguredFailoverProxyProvider and should not see any behavior
 existing ConfiguredFailoverProxyProvider and should not see any behavior
 change.
 change.
+
+Clients who wish to make use of the "auto-msync" functionality should adjust
+the configuration below. This will specify some time period after which,
+if the client's state ID has not been updated from the Active NameNode, an
+`msync()` will automatically be performed. If this is specified as 0, an
+`msync()` will be performed before _every_ read operation. If this is a
+positive time duration, an `msync()` will be performed every time a read
+operation is requested and the Active has not been contacted for longer than
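+that period. If this is negative (the default), no automatic `msync()` will
+be performed. The `<nameservice>` suffix of the property name is the ID of
+the nameservice the setting applies to.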
+
+    <property>
+        <name>dfs.client.failover.observer.auto-msync-period.<nameservice></name>
+        <value>500ms</value>
+    </property>

+ 46 - 10
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java

@@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.fail;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.ipc.RpcScheduler;
 import org.apache.hadoop.ipc.RpcScheduler;
 import org.apache.hadoop.ipc.Schedulable;
 import org.apache.hadoop.ipc.Schedulable;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -57,7 +60,7 @@ public class TestConsistentReadsObserver {
   private static Configuration conf;
   private static Configuration conf;
   private static MiniQJMHACluster qjmhaCluster;
   private static MiniQJMHACluster qjmhaCluster;
   private static MiniDFSCluster dfsCluster;
   private static MiniDFSCluster dfsCluster;
-  private static DistributedFileSystem dfs;
+  private DistributedFileSystem dfs;
 
 
   private final Path testPath= new Path("/TestConsistentReadsObserver");
   private final Path testPath= new Path("/TestConsistentReadsObserver");
 
 
@@ -74,7 +77,7 @@ public class TestConsistentReadsObserver {
 
 
   @Before
   @Before
   public void setUp() throws Exception {
   public void setUp() throws Exception {
-    setObserverRead(true);
+    dfs = setObserverRead(true);
   }
   }
 
 
   @After
   @After
@@ -106,8 +109,7 @@ public class TestConsistentReadsObserver {
     configuration.setBoolean(prefix
     configuration.setBoolean(prefix
         + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
         + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
 
 
-    dfsCluster.restartNameNode(observerIdx);
-    dfsCluster.transitionToObserver(observerIdx);
+    NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
 
 
     dfs.create(testPath, (short)1).close();
     dfs.create(testPath, (short)1).close();
     assertSentTo(0);
     assertSentTo(0);
@@ -151,18 +153,26 @@ public class TestConsistentReadsObserver {
     assertEquals(1, readStatus.get());
     assertEquals(1, readStatus.get());
   }
   }
 
 
-  @Test
-  public void testMsync() throws Exception {
+  private void testMsync(boolean autoMsync, long autoMsyncPeriodMs)
+      throws Exception {
     // 0 == not completed, 1 == succeeded, -1 == failed
     // 0 == not completed, 1 == succeeded, -1 == failed
     AtomicInteger readStatus = new AtomicInteger(0);
     AtomicInteger readStatus = new AtomicInteger(0);
     Configuration conf2 = new Configuration(conf);
     Configuration conf2 = new Configuration(conf);
 
 
     // Disable FS cache so two different DFS clients will be used.
     // Disable FS cache so two different DFS clients will be used.
     conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
     conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+    if (autoMsync) {
+      conf2.setTimeDuration(
+          ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX
+              + "." + dfs.getUri().getHost(),
+          autoMsyncPeriodMs, TimeUnit.MILLISECONDS);
+    }
     DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);
     DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);
 
 
     // Initialize the proxies for Observer Node.
     // Initialize the proxies for Observer Node.
     dfs.getClient().getHAServiceState();
     dfs.getClient().getHAServiceState();
+    // This initialization will perform the msync-on-startup, so that another
+    // form of msync is required later
     dfs2.getClient().getHAServiceState();
     dfs2.getClient().getHAServiceState();
 
 
     // Advance Observer's state ID so it is ahead of client's.
     // Advance Observer's state ID so it is ahead of client's.
@@ -176,7 +186,12 @@ public class TestConsistentReadsObserver {
       try {
       try {
         // After msync, client should have the latest state ID from active.
         // After msync, client should have the latest state ID from active.
         // Therefore, the subsequent getFileStatus call should succeed.
         // Therefore, the subsequent getFileStatus call should succeed.
-        dfs2.getClient().msync();
+        if (!autoMsync) {
+          // If not testing auto-msync, perform an explicit one here
+          dfs2.getClient().msync();
+        } else if (autoMsyncPeriodMs > 0) {
+          Thread.sleep(autoMsyncPeriodMs);
+        }
         dfs2.getFileStatus(testPath);
         dfs2.getFileStatus(testPath);
         if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
         if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
           readStatus.set(1);
           readStatus.set(1);
@@ -196,10 +211,31 @@ public class TestConsistentReadsObserver {
 
 
     dfsCluster.rollEditLogAndTail(0);
     dfsCluster.rollEditLogAndTail(0);
 
 
-    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
+    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 3000);
     assertEquals(1, readStatus.get());
     assertEquals(1, readStatus.get());
   }
   }
 
 
+  @Test
+  public void testExplicitMsync() throws Exception {
+    testMsync(false, -1);  // auto-msync not configured; explicit msync() used
+  }
+
+  @Test
+  public void testAutoMsyncPeriod0() throws Exception {
+    testMsync(true, 0);  // period 0: no explicit msync and no sleep before read
+  }
+
+  @Test
+  public void testAutoMsyncPeriod5() throws Exception {
+    testMsync(true, 5);  // period 5 ms: reader sleeps 5 ms, then reads
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void testAutoMsyncLongPeriod() throws Exception {
+    /* This should fail since the auto-msync is never activated.
+     * NOTE(review): for a positive period the reader in testMsync calls
+     * Thread.sleep(autoMsyncPeriodMs) before its read, so with Long.MAX_VALUE
+     * the read itself is presumably never issued -- confirm that the
+     * TimeoutException (presumably from GenericTestUtils.waitFor) really
+     * exercises the auto-msync check rather than just the sleep. */
+    testMsync(true, Long.MAX_VALUE);
+  }
+
   // A new client should first contact the active, before using an observer,
   // A new client should first contact the active, before using an observer,
   // to ensure that it is up-to-date with the current state
   // to ensure that it is up-to-date with the current state
   @Test
   @Test
@@ -313,8 +349,8 @@ public class TestConsistentReadsObserver {
         HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
         HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
   }
   }
 
 
-  private static void setObserverRead(boolean flag) throws Exception {
-    dfs = HATestUtil.configureObserverReadFs(
+  private DistributedFileSystem setObserverRead(boolean flag) throws Exception {
+    return HATestUtil.configureObserverReadFs(
         dfsCluster, conf, ObserverReadProxyProvider.class, flag);
         dfsCluster, conf, ObserverReadProxyProvider.class, flag);
   }
   }