
HDFS-6634. inotify in HDFS. Contributed by James Thomas.

(cherry picked from commit faa4455be512e070fa420084be8d1be5c72f3b08)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
Andrew Wang, 10 years ago
commit 958c9b5080
36 changed files with 2215 additions and 59 deletions
  1. +2 -0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. +9 -0    hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
  3. +1 -0    hadoop-hdfs-project/hadoop-hdfs/pom.xml
  4. +5 -0    hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
  5. +9 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  6. +6 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  7. +220 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
  8. +9 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  9. +50 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
  10. +452 -0 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
  11. +63 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java
  12. +54 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java
  13. +18 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  14. +25 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  15. +25 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  16. +245 -0 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  17. +54 -26 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
  18. +2 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
  19. +5 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
  20. +5 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
  21. +6 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
  22. +45 -22 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
  23. +13 -3  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
  24. +146 -0 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
  25. +36 -2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
  26. +114 -0 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  27. +5 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
  28. +4 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
  29. +20 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
  30. +117 -0 hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
  31. +10 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  32. +430 -0 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
  33. +4 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
  34. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
  35. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
  36. +4 -0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -171,6 +171,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-6774. Make FsDataset and DataStore support removing volumes. (Lei Xu
     via atm)
 
+    HDFS-6634. inotify in HDFS. (James Thomas via wang)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml

@@ -106,6 +106,15 @@
        <Field name="metrics" />
        <Bug pattern="IS2_INCONSISTENT_SYNC" />
      </Match>
+    <!--
+     We use a separate lock to protect modifications to journalSet so that
+     FSEditLog#selectInputStreams does not need to be a synchronized method.
+    -->
+    <Match>
+        <Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
+        <Field name="journalSet" />
+        <Bug pattern="IS2_INCONSISTENT_SYNC" />
+    </Match>
     <!--
      This method isn't performance-critical and is much clearer to write as it's written.
      -->

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/pom.xml

@@ -453,6 +453,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
                   <include>fsimage.proto</include>
                   <include>hdfs.proto</include>
                   <include>encryption.proto</include>
+                  <include>inotify.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java

@@ -168,6 +168,11 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     reader.setMaxOpSize(maxOpSize);
   }
 
+  @Override
+  public boolean isLocalLog() {
+    return false;
+  }
+
   /**
    * Input stream implementation which can be used by 
    * FSEditLogOp.Reader

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -2990,6 +2990,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
+    return new DFSInotifyEventInputStream(namenode);
+  }
+
+  public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
+      throws IOException {
+    return new DFSInotifyEventInputStream(namenode, lastReadTxid);
+  }
+
   @Override // RemotePeerFactory
   public Peer newConnectedPeer(InetSocketAddress addr,
       Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -677,4 +677,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
     "dfs.datanode.slow.io.warning.threshold.ms";
   public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
+
+  public static final String DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY =
+      "dfs.namenode.inotify.max.events.per.rpc";
+  public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT =
+      1000;
+
 }

+ 220 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java

@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.MissingEventsException;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Stream for reading inotify events. DFSInotifyEventInputStreams should not
+ * be shared among multiple threads.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class DFSInotifyEventInputStream {
+  public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream
+      .class);
+
+  private final ClientProtocol namenode;
+  private Iterator<Event> it;
+  private long lastReadTxid;
+  /**
+   * The most recent txid the NameNode told us it has sync'ed -- helps us
+   * determine how far behind we are in the edit stream.
+   */
+  private long syncTxid;
+  /**
+   * Used to generate wait times in {@link DFSInotifyEventInputStream#take()}.
+   */
+  private Random rng = new Random();
+
+  private static final int INITIAL_WAIT_MS = 10;
+
+  DFSInotifyEventInputStream(ClientProtocol namenode) throws IOException {
+    this(namenode, namenode.getCurrentEditLogTxid()); // only consider new txn's
+  }
+
+  DFSInotifyEventInputStream(ClientProtocol namenode, long lastReadTxid)
+      throws IOException {
+    this.namenode = namenode;
+    this.it = Iterators.emptyIterator();
+    this.lastReadTxid = lastReadTxid;
+  }
+
+  /**
+   * Returns the next event in the stream or null if no new events are currently
+   * available.
+   *
+   * @throws IOException because of network error or edit log
+   * corruption. Also possible if JournalNodes are unresponsive in the
+   * QJM setting (even one unresponsive JournalNode is enough in rare cases),
+   * so catching this exception and retrying at least a few times is
+   * recommended.
+   * @throws MissingEventsException if we cannot return the next event in the
+   * stream because the data for the event (and possibly some subsequent events)
+   * has been deleted (generally because this stream is a very large number of
+   * events behind the current state of the NameNode). It is safe to continue
+   * reading from the stream after this exception is thrown -- the next
+   * available event will be returned.
+   */
+  public Event poll() throws IOException, MissingEventsException {
+    // need to keep retrying until the NN sends us the latest committed txid
+    if (lastReadTxid == -1) {
+      LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
+      lastReadTxid = namenode.getCurrentEditLogTxid();
+      return null;
+    }
+    if (!it.hasNext()) {
+      EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1);
+      if (el.getLastTxid() != -1) {
+        // we only want to set syncTxid when we were actually able to read some
+        // edits on the NN -- otherwise it will seem like edits are being
+        // generated faster than we can read them when the problem is really
+        // that we are temporarily unable to read edits
+        syncTxid = el.getSyncTxid();
+        it = el.getEvents().iterator();
+        long formerLastReadTxid = lastReadTxid;
+        lastReadTxid = el.getLastTxid();
+        if (el.getFirstTxid() != formerLastReadTxid + 1) {
+          throw new MissingEventsException(formerLastReadTxid + 1,
+              el.getFirstTxid());
+        }
+      } else {
+        LOG.debug("poll(): read no edits from the NN when requesting edits " +
+          "after txid {}", lastReadTxid);
+        return null;
+      }
+    }
+
+    if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
+      // newly seen edit log ops actually got converted to events
+      return it.next();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Return an estimate of how many events behind the NameNode's current state
+   * this stream is. Clients should periodically call this method and check if
+   * its result is steadily increasing, which indicates that they are falling
+   * behind (i.e. events are being generated faster than the client is reading
+   * them). If a client falls too far behind events may be deleted before the
+   * client can read them.
+   * <p/>
+   * A return value of -1 indicates that an estimate could not be produced, and
+   * should be ignored. The value returned by this method is really only useful
+   * when compared to previous or subsequent returned values.
+   */
+  public long getEventsBehindEstimate() {
+    if (syncTxid == 0) {
+      return -1;
+    } else {
+      assert syncTxid >= lastReadTxid;
+      // this gives the difference between the last txid we have fetched to the
+      // client and syncTxid at the time we last fetched events from the
+      // NameNode
+      return syncTxid - lastReadTxid;
+    }
+  }
+
+  /**
+   * Returns the next event in the stream, waiting up to the specified amount of
+   * time for a new event. Returns null if a new event is not available at the
+   * end of the specified amount of time. The time before the method returns may
+   * exceed the specified amount of time by up to the time required for an RPC
+   * to the NameNode.
+   *
+   * @param time number of units of the given TimeUnit to wait
+   * @param tu the desired TimeUnit
+   * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
+   * @throws MissingEventsException
+   * see {@link DFSInotifyEventInputStream#poll()}
+   * @throws InterruptedException if the calling thread is interrupted
+   */
+  public Event poll(long time, TimeUnit tu) throws IOException,
+      InterruptedException, MissingEventsException {
+    long initialTime = Time.monotonicNow();
+    long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
+    long nextWait = INITIAL_WAIT_MS;
+    Event next = null;
+    while ((next = poll()) == null) {
+      long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
+      if (timeLeft <= 0) {
+        LOG.debug("timed poll(): timed out");
+        break;
+      } else if (timeLeft < nextWait * 2) {
+        nextWait = timeLeft;
+      } else {
+        nextWait *= 2;
+      }
+      LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
+          nextWait);
+      Thread.sleep(nextWait);
+    }
+
+    return next;
+  }
+
+  /**
+   * Returns the next event in the stream, waiting indefinitely if a new event
+   * is not immediately available.
+   *
+   * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
+   * @throws MissingEventsException see
+   * {@link DFSInotifyEventInputStream#poll()}
+   * @throws InterruptedException if the calling thread is interrupted
+   */
+  public Event take() throws IOException, InterruptedException,
+      MissingEventsException {
+    Event next = null;
+    int nextWaitMin = INITIAL_WAIT_MS;
+    while ((next = poll()) == null) {
+      // sleep for a random period between nextWaitMin and nextWaitMin * 2
+      // to avoid stampedes at the NN if there are multiple clients
+      int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
+      LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
+      Thread.sleep(sleepTime);
+      // the maximum sleep is 2 minutes
+      nextWaitMin = Math.min(60000, nextWaitMin * 2);
+    }
+
+    return next;
+  }
+}
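
Taken together, the poll() and take() methods above define the intended consumption pattern: poll() is non-blocking and may return null, take() blocks with randomized exponential backoff, and MissingEventsException signals that some transactions were purged while leaving the stream usable. A minimal, hypothetical consumer sketch along those lines (the InotifyTailer class and its logging are illustrative only, not part of this commit):

    import java.io.IOException;

    import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
    import org.apache.hadoop.hdfs.inotify.Event;
    import org.apache.hadoop.hdfs.inotify.MissingEventsException;

    public class InotifyTailer {
      /** Hypothetical consumer loop built on the stream API added above. */
      public static void tailEvents(DFSInotifyEventInputStream stream)
          throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Event event = stream.take(); // blocks, backing off with jitter
            System.out.println("saw " + event.getEventType() + " event");
          } catch (MissingEventsException e) {
            // Some events were purged before we could read them; the stream
            // stays usable, so note the gap and continue with the next event.
            System.err.println("missed events: " + e);
          } catch (IOException e) {
            // Transient NN/JournalNode trouble; the poll() javadoc recommends
            // catching this and retrying a few times.
            System.err.println("transient read error, retrying: " + e);
          }
        }
      }
    }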

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -1988,4 +1988,13 @@ public class DistributedFileSystem extends FileSystem {
       }
     }.resolve(this, absF);
   }
+
+  public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
+    return dfs.getInotifyEventStream();
+  }
+
+  public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
+      throws IOException {
+    return dfs.getInotifyEventStream(lastReadTxid);
+  }
 }

+ 50 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -275,4 +276,53 @@ public class HdfsAdmin {
       throws IOException {
     return dfs.listEncryptionZones();
   }
+
+  /**
+   * Exposes a stream of namesystem events. Only events occurring after the
+   * stream is created are available.
+   * See {@link org.apache.hadoop.hdfs.DFSInotifyEventInputStream}
+   * for information on stream usage.
+   * See {@link org.apache.hadoop.hdfs.inotify.Event}
+   * for information on the available events.
+   * <p/>
+   * Inotify users may want to tune the following HDFS parameters to
+   * ensure that enough extra HDFS edits are saved to support inotify clients
+   * that fall behind the current state of the namespace while reading events.
+   * The default parameter values should generally be reasonable. If edits are
+   * deleted before their corresponding events can be read, clients will see a
+   * {@link org.apache.hadoop.hdfs.inotify.MissingEventsException} on
+   * {@link org.apache.hadoop.hdfs.DFSInotifyEventInputStream} method calls.
+   *
+   * It should generally be sufficient to tune these parameters:
+   * dfs.namenode.num.extra.edits.retained
+   * dfs.namenode.max.extra.edits.segments.retained
+   *
+   * Parameters that affect the number of created segments and the number of
+   * edits that are considered necessary (i.e. do not count towards the
+   * dfs.namenode.num.extra.edits.retained quota):
+   * dfs.namenode.checkpoint.period
+   * dfs.namenode.checkpoint.txns
+   * dfs.namenode.num.checkpoints.retained
+   * dfs.ha.log-roll.period
+   * <p/>
+   * It is recommended that local journaling be configured
+   * (dfs.namenode.edits.dir) for inotify (in addition to a shared journal)
+   * so that edit transfers from the shared journal can be avoided.
+   *
+   * @throws IOException If there was an error obtaining the stream.
+   */
+  public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
+    return dfs.getInotifyEventStream();
+  }
+
+  /**
+   * A version of {@link HdfsAdmin#getInotifyEventStream()} meant for advanced
+   * users who are aware of HDFS edits up to lastReadTxid (e.g. because they
+   * have access to an FSImage inclusive of lastReadTxid) and only want to read
+   * events after this point.
+   */
+  public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
+      throws IOException {
+    return dfs.getInotifyEventStream(lastReadTxid);
+  }
 }
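
The javadoc above covers retention tuning; as a hedged sketch of how a client would actually obtain the stream through this new entry point (the HdfsAdmin(URI, Configuration) constructor and the cluster URI are assumptions for illustration, not part of this diff):

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;

    public class InotifyStreamFactory {
      /** Hypothetical helper: open an inotify stream for new events only. */
      public static DFSInotifyEventInputStream openStream(Configuration conf)
          throws IOException {
        // Assumed NameNode URI; point this at your own cluster.
        HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://namenode:8020"), conf);
        // Only events generated after this call are visible on the stream; use
        // the getInotifyEventStream(lastReadTxid) overload to start earlier.
        return admin.getInotifyEventStream();
      }
    }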

+ 452 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java

@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.util.List;
+
+/**
+ * Events sent by the inotify system. Note that no events are necessarily sent
+ * when a file is opened for read (although a MetadataUpdateEvent will be sent
+ * if the atime is updated).
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class Event {
+  public static enum EventType {
+    CREATE, CLOSE, APPEND, RENAME, METADATA, UNLINK
+  }
+
+  private EventType eventType;
+
+  public EventType getEventType() {
+    return eventType;
+  }
+
+  public Event(EventType eventType) {
+    this.eventType = eventType;
+  }
+
+  /**
+   * Sent when a file is closed after append or create.
+   */
+  public static class CloseEvent extends Event {
+    private String path;
+    private long fileSize;
+    private long timestamp;
+
+    public CloseEvent(String path, long fileSize, long timestamp) {
+      super(EventType.CLOSE);
+      this.path = path;
+      this.fileSize = fileSize;
+      this.timestamp = timestamp;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    /**
+     * The size of the closed file in bytes. May be -1 if the size is not
+     * available (e.g. in the case of a close generated by a concat operation).
+     */
+    public long getFileSize() {
+      return fileSize;
+    }
+
+    /**
+     * The time when this event occurred, in milliseconds since the epoch.
+     */
+    public long getTimestamp() {
+      return timestamp;
+    }
+  }
+
+  /**
+   * Sent when a new file is created (including overwrite).
+   */
+  public static class CreateEvent extends Event {
+
+    public static enum INodeType {
+      FILE, DIRECTORY, SYMLINK;
+    }
+
+    private INodeType iNodeType;
+    private String path;
+    private long ctime;
+    private int replication;
+    private String ownerName;
+    private String groupName;
+    private FsPermission perms;
+    private String symlinkTarget;
+
+    public static class Builder {
+      private INodeType iNodeType;
+      private String path;
+      private long ctime;
+      private int replication;
+      private String ownerName;
+      private String groupName;
+      private FsPermission perms;
+      private String symlinkTarget;
+
+      public Builder iNodeType(INodeType type) {
+        this.iNodeType = type;
+        return this;
+      }
+
+      public Builder path(String path) {
+        this.path = path;
+        return this;
+      }
+
+      public Builder ctime(long ctime) {
+        this.ctime = ctime;
+        return this;
+      }
+
+      public Builder replication(int replication) {
+        this.replication = replication;
+        return this;
+      }
+
+      public Builder ownerName(String ownerName) {
+        this.ownerName = ownerName;
+        return this;
+      }
+
+      public Builder groupName(String groupName) {
+        this.groupName = groupName;
+        return this;
+      }
+
+      public Builder perms(FsPermission perms) {
+        this.perms = perms;
+        return this;
+      }
+
+      public Builder symlinkTarget(String symlinkTarget) {
+        this.symlinkTarget = symlinkTarget;
+        return this;
+      }
+
+      public CreateEvent build() {
+        return new CreateEvent(this);
+      }
+    }
+
+    private CreateEvent(Builder b) {
+      super(EventType.CREATE);
+      this.iNodeType = b.iNodeType;
+      this.path = b.path;
+      this.ctime = b.ctime;
+      this.replication = b.replication;
+      this.ownerName = b.ownerName;
+      this.groupName = b.groupName;
+      this.perms = b.perms;
+      this.symlinkTarget = b.symlinkTarget;
+    }
+
+    public INodeType getiNodeType() {
+      return iNodeType;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    /**
+     * Creation time of the file, directory, or symlink.
+     */
+    public long getCtime() {
+      return ctime;
+    }
+
+    /**
+     * Replication is zero if the CreateEvent iNodeType is directory or symlink.
+     */
+    public int getReplication() {
+      return replication;
+    }
+
+    public String getOwnerName() {
+      return ownerName;
+    }
+
+    public String getGroupName() {
+      return groupName;
+    }
+
+    public FsPermission getPerms() {
+      return perms;
+    }
+
+    /**
+     * Symlink target is null if the CreateEvent iNodeType is not symlink.
+     */
+    public String getSymlinkTarget() {
+      return symlinkTarget;
+    }
+  }
+
+  /**
+   * Sent when there is an update to directory or file (none of the metadata
+   * tracked here applies to symlinks) that is not associated with another
+   * inotify event. The tracked metadata includes atime/mtime, replication,
+   * owner/group, permissions, ACLs, and XAttributes. Fields not relevant to the
+   * metadataType of the MetadataUpdateEvent will be null or will have their default
+   * values.
+   */
+  public static class MetadataUpdateEvent extends Event {
+
+    public static enum MetadataType {
+      TIMES, REPLICATION, OWNER, PERMS, ACLS, XATTRS;
+    }
+
+    private String path;
+    private MetadataType metadataType;
+    private long mtime;
+    private long atime;
+    private int replication;
+    private String ownerName;
+    private String groupName;
+    private FsPermission perms;
+    private List<AclEntry> acls;
+    private List<XAttr> xAttrs;
+    private boolean xAttrsRemoved;
+
+    public static class Builder {
+      private String path;
+      private MetadataType metadataType;
+      private long mtime;
+      private long atime;
+      private int replication;
+      private String ownerName;
+      private String groupName;
+      private FsPermission perms;
+      private List<AclEntry> acls;
+      private List<XAttr> xAttrs;
+      private boolean xAttrsRemoved;
+
+      public Builder path(String path) {
+        this.path = path;
+        return this;
+      }
+
+      public Builder metadataType(MetadataType type) {
+        this.metadataType = type;
+        return this;
+      }
+
+      public Builder mtime(long mtime) {
+        this.mtime = mtime;
+        return this;
+      }
+
+      public Builder atime(long atime) {
+        this.atime = atime;
+        return this;
+      }
+
+      public Builder replication(int replication) {
+        this.replication = replication;
+        return this;
+      }
+
+      public Builder ownerName(String ownerName) {
+        this.ownerName = ownerName;
+        return this;
+      }
+
+      public Builder groupName(String groupName) {
+        this.groupName = groupName;
+        return this;
+      }
+
+      public Builder perms(FsPermission perms) {
+        this.perms = perms;
+        return this;
+      }
+
+      public Builder acls(List<AclEntry> acls) {
+        this.acls = acls;
+        return this;
+      }
+
+      public Builder xAttrs(List<XAttr> xAttrs) {
+        this.xAttrs = xAttrs;
+        return this;
+      }
+
+      public Builder xAttrsRemoved(boolean xAttrsRemoved) {
+        this.xAttrsRemoved = xAttrsRemoved;
+        return this;
+      }
+
+      public MetadataUpdateEvent build() {
+        return new MetadataUpdateEvent(this);
+      }
+    }
+
+    private MetadataUpdateEvent(Builder b) {
+      super(EventType.METADATA);
+      this.path = b.path;
+      this.metadataType = b.metadataType;
+      this.mtime = b.mtime;
+      this.atime = b.atime;
+      this.replication = b.replication;
+      this.ownerName = b.ownerName;
+      this.groupName = b.groupName;
+      this.perms = b.perms;
+      this.acls = b.acls;
+      this.xAttrs = b.xAttrs;
+      this.xAttrsRemoved = b.xAttrsRemoved;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public MetadataType getMetadataType() {
+      return metadataType;
+    }
+
+    public long getMtime() {
+      return mtime;
+    }
+
+    public long getAtime() {
+      return atime;
+    }
+
+    public int getReplication() {
+      return replication;
+    }
+
+    public String getOwnerName() {
+      return ownerName;
+    }
+
+    public String getGroupName() {
+      return groupName;
+    }
+
+    public FsPermission getPerms() {
+      return perms;
+    }
+
+    /**
+     * The full set of ACLs currently associated with this file or directory.
+     * May be null if all ACLs were removed.
+     */
+    public List<AclEntry> getAcls() {
+      return acls;
+    }
+
+    public List<XAttr> getxAttrs() {
+      return xAttrs;
+    }
+
+    /**
+     * Whether the xAttrs returned by getxAttrs() were removed (as opposed to
+     * added).
+     */
+    public boolean isxAttrsRemoved() {
+      return xAttrsRemoved;
+    }
+
+  }
+
+  /**
+   * Sent when a file, directory, or symlink is renamed.
+   */
+  public static class RenameEvent extends Event {
+    private String srcPath;
+    private String dstPath;
+    private long timestamp;
+
+    public RenameEvent(String srcPath, String dstPath, long timestamp) {
+      super(EventType.RENAME);
+      this.srcPath = srcPath;
+      this.dstPath = dstPath;
+      this.timestamp = timestamp;
+    }
+
+    public String getSrcPath() {
+      return srcPath;
+    }
+
+    public String getDstPath() {
+      return dstPath;
+    }
+
+    /**
+     * The time when this event occurred, in milliseconds since the epoch.
+     */
+    public long getTimestamp() {
+      return timestamp;
+    }
+  }
+
+  /**
+   * Sent when an existing file is opened for append.
+   */
+  public static class AppendEvent extends Event {
+    private String path;
+
+    public AppendEvent(String path) {
+      super(EventType.APPEND);
+      this.path = path;
+    }
+
+    public String getPath() {
+      return path;
+    }
+  }
+
+  /**
+   * Sent when a file, directory, or symlink is deleted.
+   */
+  public static class UnlinkEvent extends Event {
+    private String path;
+    private long timestamp;
+
+    public UnlinkEvent(String path, long timestamp) {
+      super(EventType.UNLINK);
+      this.path = path;
+      this.timestamp = timestamp;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    /**
+     * The time when this event occurred, in milliseconds since the epoch.
+     */
+    public long getTimestamp() {
+      return timestamp;
+    }
+  }
+}
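
Since Event itself only carries a type tag, consumers are expected to switch on getEventType() and downcast to the concrete subclasses above; a small hypothetical dispatch sketch (the EventPrinter class is illustrative only, not part of this commit):

    import org.apache.hadoop.hdfs.inotify.Event;

    public class EventPrinter {
      /** Hypothetical helper: map each event subclass to a one-line summary. */
      public static String describe(Event event) {
        switch (event.getEventType()) {
        case CREATE:
          Event.CreateEvent create = (Event.CreateEvent) event;
          return "created " + create.getPath() + " (" + create.getiNodeType() + ")";
        case CLOSE:
          Event.CloseEvent close = (Event.CloseEvent) event;
          return "closed " + close.getPath() + " at " + close.getFileSize() + " bytes";
        case APPEND:
          return "reopened " + ((Event.AppendEvent) event).getPath() + " for append";
        case RENAME:
          Event.RenameEvent rename = (Event.RenameEvent) event;
          return "renamed " + rename.getSrcPath() + " to " + rename.getDstPath();
        case METADATA:
          Event.MetadataUpdateEvent meta = (Event.MetadataUpdateEvent) event;
          return "metadata (" + meta.getMetadataType() + ") updated on " + meta.getPath();
        case UNLINK:
          return "deleted " + ((Event.UnlinkEvent) event).getPath();
        default:
          return "unknown event type " + event.getEventType();
        }
      }
    }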

+ 63 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.List;
+
+/**
+ * Contains a set of events, the transaction ID in the edit log up to which we
+ * read to produce these events, and the first txid we observed when producing
+ * these events (the last of which is for the purpose of determining whether we
+ * have missed events due to edit deletion). Also contains the most recent txid
+ * that the NameNode has sync'ed, so the client can determine how far behind in
+ * the edit log it is.
+ */
+@InterfaceAudience.Private
+public class EventsList {
+  private List<Event> events;
+  private long firstTxid;
+  private long lastTxid;
+  private long syncTxid;
+
+  public EventsList(List<Event> events, long firstTxid, long lastTxid,
+      long syncTxid) {
+    this.events = events;
+    this.firstTxid = firstTxid;
+    this.lastTxid = lastTxid;
+    this.syncTxid = syncTxid;
+  }
+
+  public List<Event> getEvents() {
+    return events;
+  }
+
+  public long getFirstTxid() {
+    return firstTxid;
+  }
+
+  public long getLastTxid() {
+    return lastTxid;
+  }
+
+  public long getSyncTxid() {
+    return syncTxid;
+  }
+}

+ 54 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MissingEventsException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  private long expectedTxid;
+  private long actualTxid;
+
+  public MissingEventsException() {}
+
+  public MissingEventsException(long expectedTxid, long actualTxid) {
+    this.expectedTxid = expectedTxid;
+    this.actualTxid = actualTxid;
+  }
+
+  public long getExpectedTxid() {
+    return expectedTxid;
+  }
+
+  public long getActualTxid() {
+    return actualTxid;
+  }
+
+  @Override
+  public String toString() {
+    return "We expected the next batch of events to start with transaction ID "
+        + expectedTxid + ", but it instead started with transaction ID " +
+        actualTxid + ". Most likely the intervening transactions were cleaned "
+        + "up as part of checkpointing.";
+  }
+}

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -43,10 +43,13 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventsList;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@@ -1372,4 +1375,19 @@ public interface ClientProtocol {
    */
   @Idempotent
   public void checkAccess(String path, FsAction mode) throws IOException;
+
+  /**
+   * Get the highest txid the NameNode knows has been written to the edit
+   * log, or -1 if the NameNode's edit log is not yet open for write. Used as
+   * the starting point for the inotify event stream.
+   */
+  @Idempotent
+  public long getCurrentEditLogTxid() throws IOException;
+
+  /**
+   * Get an ordered list of events corresponding to the edit log transactions
+   * from txid onwards.
+   */
+  @Idempotent
+  public EventsList getEditsFromTxid(long txid) throws IOException;
 }

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -91,12 +91,16 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlo
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
@@ -1408,4 +1412,25 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     }
     return VOID_CHECKACCESS_RESPONSE;
   }
+
+  public GetCurrentEditLogTxidResponseProto getCurrentEditLogTxid(RpcController controller,
+      GetCurrentEditLogTxidRequestProto req) throws ServiceException {
+    try {
+      return GetCurrentEditLogTxidResponseProto.newBuilder().setTxid(
+          server.getCurrentEditLogTxid()).build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetEditsFromTxidResponseProto getEditsFromTxid(RpcController controller,
+      GetEditsFromTxidRequestProto req) throws ServiceException {
+    try {
+      return PBHelper.convertEditsResponse(server.getEditsFromTxid(
+          req.getTxid()));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.inotify.EventsList;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -95,10 +96,12 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdd
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
@@ -159,6 +162,7 @@ import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@@ -1430,4 +1434,25 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  public long getCurrentEditLogTxid() throws IOException {
+    GetCurrentEditLogTxidRequestProto req = GetCurrentEditLogTxidRequestProto
+        .getDefaultInstance();
+    try {
+      return rpcProxy.getCurrentEditLogTxid(null, req).getTxid();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public EventsList getEditsFromTxid(long txid) throws IOException {
+    GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder()
+        .setTxid(txid).build();
+    try {
+      return PBHelper.convert(rpcProxy.getEditsFromTxid(null, req));
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }

+ 245 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -45,6 +45,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventsList;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -95,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheP
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
@@ -157,6 +160,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
+import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
@@ -2334,6 +2338,247 @@ public class PBHelper {
     return new ShmId(shmId.getHi(), shmId.getLo());
   }
 
+  private static Event.CreateEvent.INodeType createTypeConvert(InotifyProtos.INodeType
+      type) {
+    switch (type) {
+    case I_TYPE_DIRECTORY:
+      return Event.CreateEvent.INodeType.DIRECTORY;
+    case I_TYPE_FILE:
+      return Event.CreateEvent.INodeType.FILE;
+    case I_TYPE_SYMLINK:
+      return Event.CreateEvent.INodeType.SYMLINK;
+    default:
+      return null;
+    }
+  }
+
+  private static InotifyProtos.MetadataUpdateType metadataUpdateTypeConvert(
+      Event.MetadataUpdateEvent.MetadataType type) {
+    switch (type) {
+    case TIMES:
+      return InotifyProtos.MetadataUpdateType.META_TYPE_TIMES;
+    case REPLICATION:
+      return InotifyProtos.MetadataUpdateType.META_TYPE_REPLICATION;
+    case OWNER:
+      return InotifyProtos.MetadataUpdateType.META_TYPE_OWNER;
+    case PERMS:
+      return InotifyProtos.MetadataUpdateType.META_TYPE_PERMS;
+    case ACLS:
+      return InotifyProtos.MetadataUpdateType.META_TYPE_ACLS;
+    case XATTRS:
+      return InotifyProtos.MetadataUpdateType.META_TYPE_XATTRS;
+    default:
+      return null;
+    }
+  }
+
+  private static Event.MetadataUpdateEvent.MetadataType metadataUpdateTypeConvert(
+      InotifyProtos.MetadataUpdateType type) {
+    switch (type) {
+    case META_TYPE_TIMES:
+      return Event.MetadataUpdateEvent.MetadataType.TIMES;
+    case META_TYPE_REPLICATION:
+      return Event.MetadataUpdateEvent.MetadataType.REPLICATION;
+    case META_TYPE_OWNER:
+      return Event.MetadataUpdateEvent.MetadataType.OWNER;
+    case META_TYPE_PERMS:
+      return Event.MetadataUpdateEvent.MetadataType.PERMS;
+    case META_TYPE_ACLS:
+      return Event.MetadataUpdateEvent.MetadataType.ACLS;
+    case META_TYPE_XATTRS:
+      return Event.MetadataUpdateEvent.MetadataType.XATTRS;
+    default:
+      return null;
+    }
+  }
+
+  private static InotifyProtos.INodeType createTypeConvert(Event.CreateEvent.INodeType
+      type) {
+    switch (type) {
+    case DIRECTORY:
+      return InotifyProtos.INodeType.I_TYPE_DIRECTORY;
+    case FILE:
+      return InotifyProtos.INodeType.I_TYPE_FILE;
+    case SYMLINK:
+      return InotifyProtos.INodeType.I_TYPE_SYMLINK;
+    default:
+      return null;
+    }
+  }
+
+  public static EventsList convert(GetEditsFromTxidResponseProto resp) throws
+    IOException {
+    List<Event> events = Lists.newArrayList();
+    for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) {
+      switch(p.getType()) {
+      case EVENT_CLOSE:
+        InotifyProtos.CloseEventProto close =
+            InotifyProtos.CloseEventProto.parseFrom(p.getContents());
+        events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(),
+            close.getTimestamp()));
+        break;
+      case EVENT_CREATE:
+        InotifyProtos.CreateEventProto create =
+            InotifyProtos.CreateEventProto.parseFrom(p.getContents());
+        events.add(new Event.CreateEvent.Builder()
+            .iNodeType(createTypeConvert(create.getType()))
+            .path(create.getPath())
+            .ctime(create.getCtime())
+            .ownerName(create.getOwnerName())
+            .groupName(create.getGroupName())
+            .perms(convert(create.getPerms()))
+            .replication(create.getReplication())
+            .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
+            create.getSymlinkTarget()).build());
+        break;
+      case EVENT_METADATA:
+        InotifyProtos.MetadataUpdateEventProto meta =
+            InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
+        events.add(new Event.MetadataUpdateEvent.Builder()
+            .path(meta.getPath())
+            .metadataType(metadataUpdateTypeConvert(meta.getType()))
+            .mtime(meta.getMtime())
+            .atime(meta.getAtime())
+            .replication(meta.getReplication())
+            .ownerName(
+                meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
+            .groupName(
+                meta.getGroupName().isEmpty() ? null : meta.getGroupName())
+            .perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
+            .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
+                meta.getAclsList()))
+            .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
+                meta.getXAttrsList()))
+            .xAttrsRemoved(meta.getXAttrsRemoved())
+            .build());
+        break;
+      case EVENT_RENAME:
+        InotifyProtos.RenameEventProto rename =
+            InotifyProtos.RenameEventProto.parseFrom(p.getContents());
+        events.add(new Event.RenameEvent(rename.getSrcPath(), rename.getDestPath(),
+            rename.getTimestamp()));
+        break;
+      case EVENT_APPEND:
+        InotifyProtos.AppendEventProto reopen =
+            InotifyProtos.AppendEventProto.parseFrom(p.getContents());
+        events.add(new Event.AppendEvent(reopen.getPath()));
+        break;
+      case EVENT_UNLINK:
+        InotifyProtos.UnlinkEventProto unlink =
+            InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
+        events.add(new Event.UnlinkEvent(unlink.getPath(), unlink.getTimestamp()));
+        break;
+      default:
+        throw new RuntimeException("Unexpected inotify event type: " +
+            p.getType());
+      }
+    }
+    return new EventsList(events, resp.getEventsList().getFirstTxid(),
+        resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid());
+  }
+
+  public static GetEditsFromTxidResponseProto convertEditsResponse(EventsList el) {
+    InotifyProtos.EventsListProto.Builder builder =
+        InotifyProtos.EventsListProto.newBuilder();
+    for (Event e : el.getEvents()) {
+      switch(e.getEventType()) {
+      case CLOSE:
+        Event.CloseEvent ce = (Event.CloseEvent) e;
+        builder.addEvents(InotifyProtos.EventProto.newBuilder()
+            .setType(InotifyProtos.EventType.EVENT_CLOSE)
+            .setContents(
+                InotifyProtos.CloseEventProto.newBuilder()
+                    .setPath(ce.getPath())
+                    .setFileSize(ce.getFileSize())
+                    .setTimestamp(ce.getTimestamp()).build().toByteString()
+            ).build());
+        break;
+      case CREATE:
+        Event.CreateEvent ce2 = (Event.CreateEvent) e;
+        builder.addEvents(InotifyProtos.EventProto.newBuilder()
+            .setType(InotifyProtos.EventType.EVENT_CREATE)
+            .setContents(
+                InotifyProtos.CreateEventProto.newBuilder()
+                    .setType(createTypeConvert(ce2.getiNodeType()))
+                    .setPath(ce2.getPath())
+                    .setCtime(ce2.getCtime())
+                    .setOwnerName(ce2.getOwnerName())
+                    .setGroupName(ce2.getGroupName())
+                    .setPerms(convert(ce2.getPerms()))
+                    .setReplication(ce2.getReplication())
+                    .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
+                        "" : ce2.getSymlinkTarget()).build().toByteString()
+            ).build());
+        break;
+      case METADATA:
+        Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
+        InotifyProtos.MetadataUpdateEventProto.Builder metaB =
+            InotifyProtos.MetadataUpdateEventProto.newBuilder()
+                .setPath(me.getPath())
+                .setType(metadataUpdateTypeConvert(me.getMetadataType()))
+                .setMtime(me.getMtime())
+                .setAtime(me.getAtime())
+                .setReplication(me.getReplication())
+                .setOwnerName(me.getOwnerName() == null ? "" :
+                    me.getOwnerName())
+                .setGroupName(me.getGroupName() == null ? "" :
+                    me.getGroupName())
+                .addAllAcls(me.getAcls() == null ?
+                    Lists.<AclEntryProto>newArrayList() :
+                    convertAclEntryProto(me.getAcls()))
+                .addAllXAttrs(me.getxAttrs() == null ?
+                    Lists.<XAttrProto>newArrayList() :
+                    convertXAttrProto(me.getxAttrs()))
+                .setXAttrsRemoved(me.isxAttrsRemoved());
+        if (me.getPerms() != null) {
+          metaB.setPerms(convert(me.getPerms()));
+        }
+        builder.addEvents(InotifyProtos.EventProto.newBuilder()
+            .setType(InotifyProtos.EventType.EVENT_METADATA)
+            .setContents(metaB.build().toByteString())
+            .build());
+        break;
+      case RENAME:
+        Event.RenameEvent re = (Event.RenameEvent) e;
+        builder.addEvents(InotifyProtos.EventProto.newBuilder()
+            .setType(InotifyProtos.EventType.EVENT_RENAME)
+            .setContents(
+                InotifyProtos.RenameEventProto.newBuilder()
+                    .setSrcPath(re.getSrcPath())
+                    .setDestPath(re.getDstPath())
+                    .setTimestamp(re.getTimestamp()).build().toByteString()
+            ).build());
+        break;
+      case APPEND:
+        Event.AppendEvent re2 = (Event.AppendEvent) e;
+        builder.addEvents(InotifyProtos.EventProto.newBuilder()
+            .setType(InotifyProtos.EventType.EVENT_APPEND)
+            .setContents(
+                InotifyProtos.AppendEventProto.newBuilder()
+                    .setPath(re2.getPath()).build().toByteString()
+            ).build());
+        break;
+      case UNLINK:
+        Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
+        builder.addEvents(InotifyProtos.EventProto.newBuilder()
+            .setType(InotifyProtos.EventType.EVENT_UNLINK)
+            .setContents(
+                InotifyProtos.UnlinkEventProto.newBuilder()
+                    .setPath(ue.getPath())
+                    .setTimestamp(ue.getTimestamp()).build().toByteString()
+            ).build());
+        break;
+      default:
+        throw new RuntimeException("Unexpected inotify event: " + e);
+      }
+    }
+    builder.setFirstTxid(el.getFirstTxid());
+    builder.setLastTxid(el.getLastTxid());
+    builder.setSyncTxid(el.getSyncTxid());
+    return GetEditsFromTxidResponseProto.newBuilder().setEventsList(
+        builder.build()).build();
+  }
+
   public static HdfsProtos.CipherSuite convert(CipherSuite suite) {
   public static HdfsProtos.CipherSuite convert(CipherSuite suite) {
     switch (suite) {
     switch (suite) {
     case UNKNOWN:
     case UNKNOWN:

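A minimal usage sketch of the conversions above: it builds one CreateEvent, wraps it in an EventsList, and round-trips it through the new wire format. The builder setters and the EventsList/PBHelper signatures are taken from this patch; the generated-proto import path and the txid values are assumptions for illustration only.

import java.util.Arrays;

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

public class InotifyPBRoundTrip {
  public static void main(String[] args) throws Exception {
    Event create = new Event.CreateEvent.Builder()
        .iNodeType(Event.CreateEvent.INodeType.FILE)
        .path("/demo/file")
        .ctime(System.currentTimeMillis())
        .ownerName("hdfs")
        .groupName("supergroup")
        .perms(new FsPermission((short) 0644))
        .replication(3)
        .build();
    // firstTxid = lastTxid = syncTxid = 1 purely for illustration.
    EventsList el = new EventsList(Arrays.<Event>asList(create), 1L, 1L, 1L);
    GetEditsFromTxidResponseProto resp = PBHelper.convertEditsResponse(el);
    EventsList roundTripped = PBHelper.convert(resp);
    System.out.println("events after round trip: "
        + roundTripped.getEvents().size());
  }
}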
+ 54 - 26
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java

@@ -79,7 +79,17 @@ public class IPCLoggerChannel implements AsyncLogger {
   protected final InetSocketAddress addr;
   protected final InetSocketAddress addr;
   private QJournalProtocol proxy;
   private QJournalProtocol proxy;
 
 
-  private final ListeningExecutorService executor;
+  /**
+   * Executes tasks submitted to it serially, on a single thread, in FIFO order
+   * (generally used for write tasks that should not be reordered).
+   */
+  private final ListeningExecutorService singleThreadExecutor;
+  /**
+   * Executes tasks submitted to it in parallel with each other and with those
+   * submitted to singleThreadExecutor (generally used for read tasks that can
+   * be safely reordered and interleaved with writes).
+   */
+  private final ListeningExecutorService parallelExecutor;
   private long ipcSerial = 0;
   private long ipcSerial = 0;
   private long epoch = -1;
   private long epoch = -1;
   private long committedTxId = HdfsConstants.INVALID_TXID;
   private long committedTxId = HdfsConstants.INVALID_TXID;
@@ -160,8 +170,10 @@ public class IPCLoggerChannel implements AsyncLogger {
         DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY,
         DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY,
         DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT);
         DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT);
     
     
-    executor = MoreExecutors.listeningDecorator(
-        createExecutor());
+    singleThreadExecutor = MoreExecutors.listeningDecorator(
+        createSingleThreadExecutor());
+    parallelExecutor = MoreExecutors.listeningDecorator(
+        createParallelExecutor());
     
     
     metrics = IPCLoggerChannelMetrics.create(this);
     metrics = IPCLoggerChannelMetrics.create(this);
   }
   }
@@ -183,7 +195,8 @@ public class IPCLoggerChannel implements AsyncLogger {
   @Override
   @Override
   public void close() {
   public void close() {
     // No more tasks may be submitted after this point.
     // No more tasks may be submitted after this point.
-    executor.shutdown();
+    singleThreadExecutor.shutdown();
+    parallelExecutor.shutdown();
     if (proxy != null) {
     if (proxy != null) {
       // TODO: this can hang for quite some time if the client
       // TODO: this can hang for quite some time if the client
       // is currently in the middle of a call to a downed JN.
       // is currently in the middle of a call to a downed JN.
@@ -230,15 +243,30 @@ public class IPCLoggerChannel implements AsyncLogger {
    * Separated out for easy overriding in tests.
    * Separated out for easy overriding in tests.
    */
    */
   @VisibleForTesting
   @VisibleForTesting
-  protected ExecutorService createExecutor() {
+  protected ExecutorService createSingleThreadExecutor() {
     return Executors.newSingleThreadExecutor(
     return Executors.newSingleThreadExecutor(
         new ThreadFactoryBuilder()
         new ThreadFactoryBuilder()
           .setDaemon(true)
           .setDaemon(true)
-          .setNameFormat("Logger channel to " + addr)
+          .setNameFormat("Logger channel (from single-thread executor) to " +
+              addr)
           .setUncaughtExceptionHandler(
           .setUncaughtExceptionHandler(
               UncaughtExceptionHandlers.systemExit())
               UncaughtExceptionHandlers.systemExit())
           .build());
           .build());
   }
   }
+
+  /**
+   * Separated out for easy overriding in tests.
+   */
+  @VisibleForTesting
+  protected ExecutorService createParallelExecutor() {
+    return Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("Logger channel (from parallel executor) to " + addr)
+            .setUncaughtExceptionHandler(
+                UncaughtExceptionHandlers.systemExit())
+            .build());
+  }
   
   
   @Override
   @Override
   public URL buildURLToFetchLogs(long segmentTxId) {
   public URL buildURLToFetchLogs(long segmentTxId) {
@@ -286,7 +314,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   @VisibleForTesting
   @VisibleForTesting
   void waitForAllPendingCalls() throws InterruptedException {
   void waitForAllPendingCalls() throws InterruptedException {
     try {
     try {
-      executor.submit(new Runnable() {
+      singleThreadExecutor.submit(new Runnable() {
         @Override
         @Override
         public void run() {
         public void run() {
         }
         }
@@ -299,7 +327,7 @@ public class IPCLoggerChannel implements AsyncLogger {
 
 
   @Override
   @Override
   public ListenableFuture<Boolean> isFormatted() {
   public ListenableFuture<Boolean> isFormatted() {
-    return executor.submit(new Callable<Boolean>() {
+    return singleThreadExecutor.submit(new Callable<Boolean>() {
       @Override
       @Override
       public Boolean call() throws IOException {
       public Boolean call() throws IOException {
         return getProxy().isFormatted(journalId);
         return getProxy().isFormatted(journalId);
@@ -309,7 +337,7 @@ public class IPCLoggerChannel implements AsyncLogger {
 
 
   @Override
   @Override
   public ListenableFuture<GetJournalStateResponseProto> getJournalState() {
   public ListenableFuture<GetJournalStateResponseProto> getJournalState() {
-    return executor.submit(new Callable<GetJournalStateResponseProto>() {
+    return singleThreadExecutor.submit(new Callable<GetJournalStateResponseProto>() {
       @Override
       @Override
       public GetJournalStateResponseProto call() throws IOException {
       public GetJournalStateResponseProto call() throws IOException {
         GetJournalStateResponseProto ret =
         GetJournalStateResponseProto ret =
@@ -323,7 +351,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   @Override
   @Override
   public ListenableFuture<NewEpochResponseProto> newEpoch(
   public ListenableFuture<NewEpochResponseProto> newEpoch(
       final long epoch) {
       final long epoch) {
-    return executor.submit(new Callable<NewEpochResponseProto>() {
+    return singleThreadExecutor.submit(new Callable<NewEpochResponseProto>() {
       @Override
       @Override
       public NewEpochResponseProto call() throws IOException {
       public NewEpochResponseProto call() throws IOException {
         return getProxy().newEpoch(journalId, nsInfo, epoch);
         return getProxy().newEpoch(journalId, nsInfo, epoch);
@@ -347,7 +375,7 @@ public class IPCLoggerChannel implements AsyncLogger {
     
     
     ListenableFuture<Void> ret = null;
     ListenableFuture<Void> ret = null;
     try {
     try {
-      ret = executor.submit(new Callable<Void>() {
+      ret = singleThreadExecutor.submit(new Callable<Void>() {
         @Override
         @Override
         public Void call() throws IOException {
         public Void call() throws IOException {
           throwIfOutOfSync();
           throwIfOutOfSync();
@@ -464,7 +492,7 @@ public class IPCLoggerChannel implements AsyncLogger {
 
 
   @Override
   @Override
   public ListenableFuture<Void> format(final NamespaceInfo nsInfo) {
   public ListenableFuture<Void> format(final NamespaceInfo nsInfo) {
-    return executor.submit(new Callable<Void>() {
+    return singleThreadExecutor.submit(new Callable<Void>() {
       @Override
       @Override
       public Void call() throws Exception {
       public Void call() throws Exception {
         getProxy().format(journalId, nsInfo);
         getProxy().format(journalId, nsInfo);
@@ -476,7 +504,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   @Override
   @Override
   public ListenableFuture<Void> startLogSegment(final long txid,
   public ListenableFuture<Void> startLogSegment(final long txid,
       final int layoutVersion) {
       final int layoutVersion) {
-    return executor.submit(new Callable<Void>() {
+    return singleThreadExecutor.submit(new Callable<Void>() {
       @Override
       @Override
       public Void call() throws IOException {
       public Void call() throws IOException {
         getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
         getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
@@ -497,7 +525,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   @Override
   @Override
   public ListenableFuture<Void> finalizeLogSegment(
   public ListenableFuture<Void> finalizeLogSegment(
       final long startTxId, final long endTxId) {
       final long startTxId, final long endTxId) {
-    return executor.submit(new Callable<Void>() {
+    return singleThreadExecutor.submit(new Callable<Void>() {
       @Override
       @Override
       public Void call() throws IOException {
       public Void call() throws IOException {
         throwIfOutOfSync();
         throwIfOutOfSync();
@@ -510,7 +538,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   
   
   @Override
   @Override
   public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
   public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
-    return executor.submit(new Callable<Void>() {
+    return singleThreadExecutor.submit(new Callable<Void>() {
       @Override
       @Override
       public Void call() throws Exception {
       public Void call() throws Exception {
         getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
         getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
@@ -522,7 +550,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   @Override
   @Override
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
       final long fromTxnId, final boolean inProgressOk) {
       final long fromTxnId, final boolean inProgressOk) {
-    return executor.submit(new Callable<RemoteEditLogManifest>() {
+    return parallelExecutor.submit(new Callable<RemoteEditLogManifest>() {
       @Override
       @Override
       public RemoteEditLogManifest call() throws IOException {
       public RemoteEditLogManifest call() throws IOException {
         GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
         GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
@@ -538,7 +566,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   @Override
   @Override
   public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(
   public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(
       final long segmentTxId) {
       final long segmentTxId) {
-    return executor.submit(new Callable<PrepareRecoveryResponseProto>() {
+    return singleThreadExecutor.submit(new Callable<PrepareRecoveryResponseProto>() {
       @Override
       @Override
       public PrepareRecoveryResponseProto call() throws IOException {
       public PrepareRecoveryResponseProto call() throws IOException {
         if (!hasHttpServerEndPoint()) {
         if (!hasHttpServerEndPoint()) {
@@ -556,7 +584,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   @Override
   @Override
   public ListenableFuture<Void> acceptRecovery(
   public ListenableFuture<Void> acceptRecovery(
       final SegmentStateProto log, final URL url) {
       final SegmentStateProto log, final URL url) {
-    return executor.submit(new Callable<Void>() {
+    return singleThreadExecutor.submit(new Callable<Void>() {
       @Override
       @Override
       public Void call() throws IOException {
       public Void call() throws IOException {
         getProxy().acceptRecovery(createReqInfo(), log, url);
         getProxy().acceptRecovery(createReqInfo(), log, url);
@@ -567,7 +595,7 @@ public class IPCLoggerChannel implements AsyncLogger {
 
 
   @Override
   @Override
   public ListenableFuture<Void> discardSegments(final long startTxId) {
   public ListenableFuture<Void> discardSegments(final long startTxId) {
-    return executor.submit(new Callable<Void>() {
+    return singleThreadExecutor.submit(new Callable<Void>() {
       @Override
       @Override
       public Void call() throws IOException {
       public Void call() throws IOException {
         getProxy().discardSegments(journalId, startTxId);
         getProxy().discardSegments(journalId, startTxId);
@@ -578,7 +606,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   
   
   @Override
   @Override
   public ListenableFuture<Void> doPreUpgrade() {
   public ListenableFuture<Void> doPreUpgrade() {
-    return executor.submit(new Callable<Void>() {
+    return singleThreadExecutor.submit(new Callable<Void>() {
       @Override
       @Override
       public Void call() throws IOException {
       public Void call() throws IOException {
         getProxy().doPreUpgrade(journalId);
         getProxy().doPreUpgrade(journalId);
@@ -589,7 +617,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   
   
   @Override
   @Override
   public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) {
   public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) {
-    return executor.submit(new Callable<Void>() {
+    return singleThreadExecutor.submit(new Callable<Void>() {
       @Override
       @Override
       public Void call() throws IOException {
       public Void call() throws IOException {
         getProxy().doUpgrade(journalId, sInfo);
         getProxy().doUpgrade(journalId, sInfo);
@@ -600,7 +628,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   
   
   @Override
   @Override
   public ListenableFuture<Void> doFinalize() {
   public ListenableFuture<Void> doFinalize() {
-    return executor.submit(new Callable<Void>() {
+    return singleThreadExecutor.submit(new Callable<Void>() {
       @Override
       @Override
       public Void call() throws IOException {
       public Void call() throws IOException {
         getProxy().doFinalize(journalId);
         getProxy().doFinalize(journalId);
@@ -612,7 +640,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   @Override
   @Override
   public ListenableFuture<Boolean> canRollBack(final StorageInfo storage,
   public ListenableFuture<Boolean> canRollBack(final StorageInfo storage,
       final StorageInfo prevStorage, final int targetLayoutVersion) {
       final StorageInfo prevStorage, final int targetLayoutVersion) {
-    return executor.submit(new Callable<Boolean>() {
+    return singleThreadExecutor.submit(new Callable<Boolean>() {
       @Override
       @Override
       public Boolean call() throws IOException {
       public Boolean call() throws IOException {
         return getProxy().canRollBack(journalId, storage, prevStorage,
         return getProxy().canRollBack(journalId, storage, prevStorage,
@@ -623,7 +651,7 @@ public class IPCLoggerChannel implements AsyncLogger {
 
 
   @Override
   @Override
   public ListenableFuture<Void> doRollback() {
   public ListenableFuture<Void> doRollback() {
-    return executor.submit(new Callable<Void>() {
+    return singleThreadExecutor.submit(new Callable<Void>() {
       @Override
       @Override
       public Void call() throws IOException {
       public Void call() throws IOException {
         getProxy().doRollback(journalId);
         getProxy().doRollback(journalId);
@@ -631,10 +659,10 @@ public class IPCLoggerChannel implements AsyncLogger {
       }
       }
     });
     });
   }
   }
-  
+
   @Override
   @Override
   public ListenableFuture<Long> getJournalCTime() {
   public ListenableFuture<Long> getJournalCTime() {
-    return executor.submit(new Callable<Long>() {
+    return singleThreadExecutor.submit(new Callable<Long>() {
       @Override
       @Override
       public Long call() throws IOException {
       public Long call() throws IOException {
         return getProxy().getJournalCTime(journalId);
         return getProxy().getJournalCTime(journalId);

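The executor split above keeps ordered journal writes on a dedicated single thread while letting manifest fetches run in parallel with them. A self-contained sketch of the same pattern using only Guava and the JDK follows; the class and task names are illustrative, not part of the patch.

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class OrderedWritesParallelReads {
  // Single-threaded and FIFO: submitted writes can never be reordered.
  private final ListeningExecutorService singleThreadExecutor =
      MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
          new ThreadFactoryBuilder().setDaemon(true)
              .setNameFormat("writer-%d").build()));
  // Cached pool: reads may interleave freely with writes and with each other.
  private final ListeningExecutorService parallelExecutor =
      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(
          new ThreadFactoryBuilder().setDaemon(true)
              .setNameFormat("reader-%d").build()));

  ListenableFuture<Void> write(Runnable journalWrite) {
    return singleThreadExecutor.submit(journalWrite, (Void) null);
  }

  <T> ListenableFuture<T> read(Callable<T> manifestFetch) {
    return parallelExecutor.submit(manifestFetch);
  }
}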
+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java

@@ -651,7 +651,8 @@ public class Journal implements Closeable {
         }
         }
       }
       }
       if (log != null && log.isInProgress()) {
       if (log != null && log.isInProgress()) {
-        logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId()));
+        logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId(),
+            true));
       }
       }
     }
     }
     
     

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java

@@ -147,4 +147,9 @@ class EditLogBackupInputStream extends EditLogInputStream {
   public void setMaxOpSize(int maxOpSize) {
   public void setMaxOpSize(int maxOpSize) {
     reader.setMaxOpSize(maxOpSize);
     reader.setMaxOpSize(maxOpSize);
   }
   }
+
+  @Override
+  public boolean isLocalLog() {
+    return true;
+  }
 }
 }

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java

@@ -507,4 +507,9 @@ public class EditLogFileInputStream extends EditLogInputStream {
       reader.setMaxOpSize(maxOpSize);
       reader.setMaxOpSize(maxOpSize);
     }
     }
   }
   }
+
+  @Override
+  public boolean isLocalLog() {
+    return log instanceof FileLog;
+  }
 }
 }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java

@@ -203,4 +203,10 @@ public abstract class EditLogInputStream implements Closeable {
    * Set the maximum opcode size in bytes.
    * Set the maximum opcode size in bytes.
    */
    */
   public abstract void setMaxOpSize(int maxOpSize);
   public abstract void setMaxOpSize(int maxOpSize);
+
+  /**
+   * Returns true if we are currently reading the log from a local disk or an
+   * even faster data source (e.g. a byte buffer).
+   */
+  public abstract boolean isLocalLog();
 }
 }

+ 45 - 22
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -188,6 +188,13 @@ public class FSEditLog implements LogsPurgeable {
    */
    */
   private final List<URI> sharedEditsDirs;
   private final List<URI> sharedEditsDirs;
 
 
+  /**
+   * Take this lock when adding journals to or closing the JournalSet. Allows
+   * us to ensure that the JournalSet isn't closed or updated underneath us
+   * in selectInputStreams().
+   */
+  private final Object journalSetLock = new Object();
+
   private static class TransactionId {
   private static class TransactionId {
     public long txid;
     public long txid;
 
 
@@ -252,20 +259,22 @@ public class FSEditLog implements LogsPurgeable {
         DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
         DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
         DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
         DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
 
 
-    journalSet = new JournalSet(minimumRedundantJournals);
-
-    for (URI u : dirs) {
-      boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
-          .contains(u);
-      if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
-        StorageDirectory sd = storage.getStorageDirectory(u);
-        if (sd != null) {
-          journalSet.add(new FileJournalManager(conf, sd, storage),
-              required, sharedEditsDirs.contains(u));
+    synchronized(journalSetLock) {
+      journalSet = new JournalSet(minimumRedundantJournals);
+
+      for (URI u : dirs) {
+        boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
+            .contains(u);
+        if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
+          StorageDirectory sd = storage.getStorageDirectory(u);
+          if (sd != null) {
+            journalSet.add(new FileJournalManager(conf, sd, storage),
+                required, sharedEditsDirs.contains(u));
+          }
+        } else {
+          journalSet.add(createJournal(u), required,
+              sharedEditsDirs.contains(u));
         }
         }
-      } else {
-        journalSet.add(createJournal(u), required,
-            sharedEditsDirs.contains(u));
       }
       }
     }
     }
  
  
@@ -349,7 +358,9 @@ public class FSEditLog implements LogsPurgeable {
     } finally {
     } finally {
       if (journalSet != null && !journalSet.isEmpty()) {
       if (journalSet != null && !journalSet.isEmpty()) {
         try {
         try {
-          journalSet.close();
+          synchronized(journalSetLock) {
+            journalSet.close();
+          }
         } catch (IOException ioe) {
         } catch (IOException ioe) {
           LOG.warn("Error closing journalSet", ioe);
           LOG.warn("Error closing journalSet", ioe);
         }
         }
@@ -606,7 +617,9 @@ public class FSEditLog implements LogsPurgeable {
                 "due to " + e.getMessage() + ". " +
                 "due to " + e.getMessage() + ". " +
                 "Unsynced transactions: " + (txid - synctxid);
                 "Unsynced transactions: " + (txid - synctxid);
             LOG.fatal(msg, new Exception());
             LOG.fatal(msg, new Exception());
-            IOUtils.cleanup(LOG, journalSet);
+            synchronized(journalSetLock) {
+              IOUtils.cleanup(LOG, journalSet);
+            }
             terminate(1, msg);
             terminate(1, msg);
           }
           }
         } finally {
         } finally {
@@ -630,7 +643,9 @@ public class FSEditLog implements LogsPurgeable {
               "Could not sync enough journals to persistent storage. "
               "Could not sync enough journals to persistent storage. "
               + "Unsynced transactions: " + (txid - synctxid);
               + "Unsynced transactions: " + (txid - synctxid);
           LOG.fatal(msg, new Exception());
           LOG.fatal(msg, new Exception());
-          IOUtils.cleanup(LOG, journalSet);
+          synchronized(journalSetLock) {
+            IOUtils.cleanup(LOG, journalSet);
+          }
           terminate(1, msg);
           terminate(1, msg);
         }
         }
       }
       }
@@ -1268,9 +1283,8 @@ public class FSEditLog implements LogsPurgeable {
 
 
   /**
   /**
    * Return the txid of the last synced transaction.
    * Return the txid of the last synced transaction.
-   * For test use only
    */
    */
-  synchronized long getSyncTxId() {
+  public synchronized long getSyncTxId() {
     return synctxid;
     return synctxid;
   }
   }
 
 
@@ -1307,7 +1321,9 @@ public class FSEditLog implements LogsPurgeable {
     
     
     LOG.info("Registering new backup node: " + bnReg);
     LOG.info("Registering new backup node: " + bnReg);
     BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
     BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
-    journalSet.add(bjm, true);
+    synchronized(journalSetLock) {
+      journalSet.add(bjm, false);
+    }
   }
   }
   
   
   synchronized void releaseBackupStream(NamenodeRegistration registration)
   synchronized void releaseBackupStream(NamenodeRegistration registration)
@@ -1315,7 +1331,9 @@ public class FSEditLog implements LogsPurgeable {
     BackupJournalManager bjm = this.findBackupJournal(registration);
     BackupJournalManager bjm = this.findBackupJournal(registration);
     if (bjm != null) {
     if (bjm != null) {
       LOG.info("Removing backup journal " + bjm);
       LOG.info("Removing backup journal " + bjm);
-      journalSet.remove(bjm);
+      synchronized(journalSetLock) {
+        journalSet.remove(bjm);
+      }
     }
     }
   }
   }
   
   
@@ -1443,11 +1461,16 @@ public class FSEditLog implements LogsPurgeable {
    * @param recovery recovery context
    * @param recovery recovery context
    * @param inProgressOk set to true if in-progress streams are OK
    * @param inProgressOk set to true if in-progress streams are OK
    */
    */
-  public synchronized Collection<EditLogInputStream> selectInputStreams(
+  public Collection<EditLogInputStream> selectInputStreams(
       long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
       long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
       boolean inProgressOk) throws IOException {
       boolean inProgressOk) throws IOException {
+
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    selectInputStreams(streams, fromTxId, inProgressOk);
+    synchronized(journalSetLock) {
+      Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
+          "selectInputStreams() on closed FSEditLog");
+      selectInputStreams(streams, fromTxId, inProgressOk);
+    }
 
 
     try {
     try {
       checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
       checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);

+ 13 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java

@@ -186,17 +186,27 @@ public class FileJournalManager implements JournalManager {
     List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
     List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
         allLogFiles.size());
         allLogFiles.size());
-
     for (EditLogFile elf : allLogFiles) {
     for (EditLogFile elf : allLogFiles) {
       if (elf.hasCorruptHeader() || (!inProgressOk && elf.isInProgress())) {
       if (elf.hasCorruptHeader() || (!inProgressOk && elf.isInProgress())) {
         continue;
         continue;
       }
       }
+      if (elf.isInProgress()) {
+        try {
+          elf.validateLog();
+        } catch (IOException e) {
+          LOG.error("got IOException while trying to validate header of " +
+              elf + ".  Skipping.", e);
+          continue;
+        }
+      }
       if (elf.getFirstTxId() >= firstTxId) {
       if (elf.getFirstTxId() >= firstTxId) {
-        ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
+        ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId,
+            elf.isInProgress()));
       } else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
       } else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
         // If the firstTxId is in the middle of an edit log segment. Return this
         // If the firstTxId is in the middle of an edit log segment. Return this
         // anyway and let the caller figure out whether it wants to use it.
         // anyway and let the caller figure out whether it wants to use it.
-        ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
+        ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId,
+            elf.isInProgress()));
       }
       }
     }
     }
     
     

+ 146 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java

@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+import java.util.List;
+
+/**
+ * Translates from edit log ops to inotify events.
+ */
+@InterfaceAudience.Private
+public class InotifyFSEditLogOpTranslator {
+
+  private static long getSize(FSEditLogOp.AddCloseOp acOp) {
+    long size = 0;
+    for (Block b : acOp.getBlocks()) {
+      size += b.getNumBytes();
+    }
+    return size;
+  }
+
+  public static Event[] translate(FSEditLogOp op) {
+    switch(op.opCode) {
+    case OP_ADD:
+      FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op;
+      if (addOp.blocks.length == 0) { // create
+        return new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
+            .ctime(addOp.atime)
+            .replication(addOp.replication)
+            .ownerName(addOp.permissions.getUserName())
+            .groupName(addOp.permissions.getGroupName())
+            .perms(addOp.permissions.getPermission())
+            .iNodeType(Event.CreateEvent.INodeType.FILE).build() };
+      } else {
+        return new Event[] { new Event.AppendEvent(addOp.path) };
+      }
+    case OP_CLOSE:
+      FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
+      return new Event[] {
+          new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) };
+    case OP_SET_REPLICATION:
+      FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION)
+          .path(setRepOp.path)
+          .replication(setRepOp.replication).build() };
+    case OP_CONCAT_DELETE:
+      FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op;
+      List<Event> events = Lists.newArrayList();
+      events.add(new Event.AppendEvent(cdOp.trg));
+      for (String src : cdOp.srcs) {
+        events.add(new Event.UnlinkEvent(src, cdOp.timestamp));
+      }
+      events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp));
+      return events.toArray(new Event[0]);
+    case OP_RENAME_OLD:
+      FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op;
+      return new Event[] {
+          new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) };
+    case OP_RENAME:
+      FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op;
+      return new Event[] {
+          new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) };
+    case OP_DELETE:
+      FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op;
+      return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) };
+    case OP_MKDIR:
+      FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op;
+      return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
+          .ctime(mkOp.timestamp)
+          .ownerName(mkOp.permissions.getUserName())
+          .groupName(mkOp.permissions.getGroupName())
+          .perms(mkOp.permissions.getPermission())
+          .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() };
+    case OP_SET_PERMISSIONS:
+      FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS)
+          .path(permOp.src)
+          .perms(permOp.permissions).build() };
+    case OP_SET_OWNER:
+      FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER)
+          .path(ownOp.src)
+          .ownerName(ownOp.username).groupName(ownOp.groupname).build() };
+    case OP_TIMES:
+      FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES)
+          .path(timesOp.path)
+          .atime(timesOp.atime).mtime(timesOp.mtime).build() };
+    case OP_SYMLINK:
+      FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op;
+      return new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
+          .ctime(symOp.atime)
+          .ownerName(symOp.permissionStatus.getUserName())
+          .groupName(symOp.permissionStatus.getGroupName())
+          .perms(symOp.permissionStatus.getPermission())
+          .symlinkTarget(symOp.value)
+          .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() };
+    case OP_REMOVE_XATTR:
+      FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
+          .path(rxOp.src)
+          .xAttrs(rxOp.xAttrs)
+          .xAttrsRemoved(true).build() };
+    case OP_SET_XATTR:
+      FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
+          .path(sxOp.src)
+          .xAttrs(sxOp.xAttrs)
+          .xAttrsRemoved(false).build() };
+    case OP_SET_ACL:
+      FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS)
+          .path(saOp.src)
+          .acls(saOp.aclEntries).build() };
+    default:
+      return null;
+    }
+  }
+}

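A concat-delete is the one op above that fans out into several events: an append on the target, an unlink per source, and a close with size -1 because the combined length is not recorded in the op. A small sketch of the resulting sequence, built with the public Event constructors from this patch (the paths and timestamp are made up):

import java.util.List;

import com.google.common.collect.Lists;
import org.apache.hadoop.hdfs.inotify.Event;

public class ConcatDeleteEvents {
  public static void main(String[] args) {
    long ts = 1000L; // illustrative timestamp
    List<Event> expected = Lists.<Event>newArrayList(
        new Event.AppendEvent("/target"),         // target is reopened
        new Event.UnlinkEvent("/src1", ts),       // each source disappears
        new Event.UnlinkEvent("/src2", ts),
        new Event.CloseEvent("/target", -1, ts)); // -1: size not in the op
    for (Event e : expected) {
      System.out.println(e.getEventType());
    }
  }
}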
+ 36 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java

@@ -57,6 +57,17 @@ import com.google.common.collect.Sets;
 public class JournalSet implements JournalManager {
 public class JournalSet implements JournalManager {
 
 
   static final Log LOG = LogFactory.getLog(FSEditLog.class);
   static final Log LOG = LogFactory.getLog(FSEditLog.class);
+
+  private static final Comparator<EditLogInputStream>
+    LOCAL_LOG_PREFERENCE_COMPARATOR = new Comparator<EditLogInputStream>() {
+    @Override
+    public int compare(EditLogInputStream elis1, EditLogInputStream elis2) {
+      // we want local logs to be ordered earlier in the collection, and true
+      // is considered larger than false, so we want to invert the booleans here
+      return ComparisonChain.start().compare(!elis1.isLocalLog(),
+          !elis2.isLocalLog()).result();
+    }
+  };
   
   
   static final public Comparator<EditLogInputStream>
   static final public Comparator<EditLogInputStream>
     EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator<EditLogInputStream>() {
     EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator<EditLogInputStream>() {
@@ -181,6 +192,8 @@ public class JournalSet implements JournalManager {
   private final List<JournalAndStream> journals =
   private final List<JournalAndStream> journals =
       new CopyOnWriteArrayList<JournalSet.JournalAndStream>();
       new CopyOnWriteArrayList<JournalSet.JournalAndStream>();
   final int minimumRedundantJournals;
   final int minimumRedundantJournals;
+
+  private boolean closed;
   
   
   JournalSet(int minimumRedundantResources) {
   JournalSet(int minimumRedundantResources) {
     this.minimumRedundantJournals = minimumRedundantResources;
     this.minimumRedundantJournals = minimumRedundantResources;
@@ -234,6 +247,11 @@ public class JournalSet implements JournalManager {
         jas.close();
         jas.close();
       }
       }
     }, "close journal");
     }, "close journal");
+    closed = true;
+  }
+
+  public boolean isOpen() {
+    return !closed;
   }
   }
 
 
   /**
   /**
@@ -282,10 +300,25 @@ public class JournalSet implements JournalManager {
       if (acc.isEmpty()) {
       if (acc.isEmpty()) {
         acc.add(elis);
         acc.add(elis);
       } else {
       } else {
-        long accFirstTxId = acc.get(0).getFirstTxId();
+        EditLogInputStream accFirst = acc.get(0);
+        long accFirstTxId = accFirst.getFirstTxId();
         if (accFirstTxId == elis.getFirstTxId()) {
         if (accFirstTxId == elis.getFirstTxId()) {
-          acc.add(elis);
+          // if we have a finalized log segment available at this txid,
+          // we should throw out all in-progress segments at this txid
+          if (elis.isInProgress()) {
+            if (accFirst.isInProgress()) {
+              acc.add(elis);
+            }
+          } else {
+            if (accFirst.isInProgress()) {
+              acc.clear();
+            }
+            acc.add(elis);
+          }
         } else if (accFirstTxId < elis.getFirstTxId()) {
         } else if (accFirstTxId < elis.getFirstTxId()) {
+          // try to read from the local logs first since the throughput should
+          // be higher
+          Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
           outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
           outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
           acc.clear();
           acc.clear();
           acc.add(elis);
           acc.add(elis);
@@ -297,6 +330,7 @@ public class JournalSet implements JournalManager {
       }
       }
     }
     }
     if (!acc.isEmpty()) {
     if (!acc.isEmpty()) {
+      Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
       outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
       outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
       acc.clear();
       acc.clear();
     }
     }

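The comparator above relies on Booleans ordering false before true, so negating isLocalLog() floats local streams to the front of each redundant group. A tiny sketch of the same trick with a stand-in Stream type (not the real EditLogInputStream):

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import com.google.common.collect.ComparisonChain;

public class LocalFirstOrdering {
  static final class Stream {
    final String name;
    final boolean local;
    Stream(String name, boolean local) { this.name = name; this.local = local; }
    boolean isLocalLog() { return local; }
  }

  public static void main(String[] args) {
    List<Stream> acc = Arrays.asList(
        new Stream("journal-node", false), new Stream("local-disk", true));
    // false sorts before true, so the negated flags put local streams first.
    Collections.sort(acc, new Comparator<Stream>() {
      @Override
      public int compare(Stream a, Stream b) {
        return ComparisonChain.start()
            .compare(!a.isLocalLog(), !b.isLocalLog()).result();
      }
    });
    System.out.println(acc.get(0).name); // prints "local-disk"
  }
}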
+ 114 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -34,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Set;
 import java.util.Set;
 
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
@@ -66,6 +67,8 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventsList;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -1466,5 +1469,116 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void checkAccess(String path, FsAction mode) throws IOException {
   public void checkAccess(String path, FsAction mode) throws IOException {
     namesystem.checkAccess(path, mode);
     namesystem.checkAccess(path, mode);
   }
   }
+
+  @Override // ClientProtocol
+  public long getCurrentEditLogTxid() throws IOException {
+    namesystem.checkOperation(OperationCategory.READ); // only active
+    namesystem.checkSuperuserPrivilege();
+    // if it's not yet open for write, we may be in the process of transitioning
+    // from standby to active and may not yet know what the latest committed
+    // txid is
+    return namesystem.getEditLog().isOpenForWrite() ?
+        namesystem.getEditLog().getLastWrittenTxId() : -1;
+  }
+
+  private static FSEditLogOp readOp(EditLogInputStream elis)
+      throws IOException {
+    try {
+      return elis.readOp();
+      // we can get the below two exceptions if a segment is deleted
+      // (because we have accumulated too many edits) or (for the local journal/
+      // no-QJM case only) if an in-progress segment is finalized under us ...
+      // no need to throw an exception back to the client in this case
+    } catch (FileNotFoundException e) {
+      LOG.debug("Tried to read from deleted or moved edit log segment", e);
+      return null;
+    } catch (TransferFsImage.HttpGetFailedException e) {
+      LOG.debug("Tried to read from deleted edit log segment", e);
+      return null;
+    }
+  }
+
+  @Override // ClientProtocol
+  public EventsList getEditsFromTxid(long txid) throws IOException {
+    namesystem.checkOperation(OperationCategory.READ); // only active
+    namesystem.checkSuperuserPrivilege();
+    int maxEventsPerRPC = nn.conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
+        DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
+    FSEditLog log = namesystem.getFSImage().getEditLog();
+    long syncTxid = log.getSyncTxId();
+    // If we haven't synced anything yet, we can only read finalized
+    // segments since we can't reliably determine which txns in in-progress
+    // segments have actually been committed (e.g. written to a quorum of JNs).
+    // If we have synced txns, we can definitely read up to syncTxid since
+    // syncTxid is only updated after a transaction is committed to all
+    // journals. (In-progress segments written by old writers are already
+    // discarded for us, so if we read any in-progress segments they are
+    // guaranteed to have been written by this NameNode.)
+    boolean readInProgress = syncTxid > 0;
+
+    List<Event> events = Lists.newArrayList();
+    long maxSeenTxid = -1;
+    long firstSeenTxid = -1;
+
+    if (syncTxid > 0 && txid > syncTxid) {
+      // we can't read past syncTxid, so there's no point in going any further
+      return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+    }
+
+    Collection<EditLogInputStream> streams = null;
+    try {
+      streams = log.selectInputStreams(txid, 0, null, readInProgress);
+    } catch (IllegalStateException e) { // can happen if we have
+      // transitioned out of active and haven't yet transitioned to standby
+      // and are using QJM -- the edit log will be closed and this exception
+      // will result
+      LOG.info("NN is transitioning from active to standby and FSEditLog " +
+        "is closed -- could not read edits");
+      return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+    }
+
+    boolean breakOuter = false;
+    for (EditLogInputStream elis : streams) {
+      // our assumption in this code is that the EditLogInputStreams are
+      // ordered by starting txid
+      try {
+        FSEditLogOp op = null;
+        while ((op = readOp(elis)) != null) {
+          // break out of here in the unlikely event that syncTxid is so
+          // out of date that its segment has already been deleted, so the first
+          // txid we get is greater than syncTxid
+          if (syncTxid > 0 && op.getTransactionId() > syncTxid) {
+            breakOuter = true;
+            break;
+          }
+
+          Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op);
+          if (eventsFromOp != null) {
+            events.addAll(Arrays.asList(eventsFromOp));
+          }
+          if (op.getTransactionId() > maxSeenTxid) {
+            maxSeenTxid = op.getTransactionId();
+          }
+          if (firstSeenTxid == -1) {
+            firstSeenTxid = op.getTransactionId();
+          }
+          if (events.size() >= maxEventsPerRPC || (syncTxid > 0 &&
+              op.getTransactionId() == syncTxid)) {
+            // we're done
+            breakOuter = true;
+            break;
+          }
+        }
+      } finally {
+        elis.close();
+      }
+      if (breakOuter) {
+        break;
+      }
+    }
+
+    return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+  }
 }
 }
 
 

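A rough sketch of driving this RPC directly from a ClientProtocol proxy, advancing the read position with EventsList.getLastTxid(). It assumes an already-constructed proxy and omits the back-off, retry, and missing-event handling that DFSInotifyEventInputStream layers on top.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;

public class EditsPoller {
  /** Prints events starting at startTxid until the NameNode has nothing newer. */
  static void drain(ClientProtocol namenode, long startTxid) throws IOException {
    long nextTxid = startTxid;
    while (true) {
      EventsList el = namenode.getEditsFromTxid(nextTxid);
      List<Event> events = el.getEvents();
      if (events.isEmpty()) {
        break; // nothing new yet; a real client would wait and retry
      }
      for (Event e : events) {
        System.out.println(e.getEventType());
      }
      // getLastTxid() is the highest txid whose events were just returned.
      nextTxid = el.getLastTxid() + 1;
    }
  }
}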
+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java

@@ -279,4 +279,9 @@ class RedundantEditLogInputStream extends EditLogInputStream {
       elis.setMaxOpSize(maxOpSize);
       elis.setMaxOpSize(maxOpSize);
     }
     }
   }
   }
+
+  @Override
+  public boolean isLocalLog() {
+    return streams[curIdx].isLocalLog();
+  }
 }
 }

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java

@@ -63,7 +63,7 @@ import org.apache.http.client.utils.URIBuilder;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Lists;
-
+import org.mortbay.jetty.EofException;
 
 
 /**
 /**
  * This class provides fetching a specified file from the NameNode.
  * This class provides fetching a specified file from the NameNode.
@@ -370,6 +370,9 @@ public class TransferFsImage {
           throttler.throttle(num, canceler);
           throttler.throttle(num, canceler);
         }
         }
       }
       }
+    } catch (EofException e) {
+      LOG.info("Connection closed by client");
+      out = null; // so we don't close in the finally
     } finally {
     } finally {
       if (out != null) {
       if (out != null) {
         out.close();
         out.close();

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto

@@ -33,6 +33,7 @@ import "hdfs.proto";
 import "acl.proto";
 import "acl.proto";
 import "xattr.proto";
 import "xattr.proto";
 import "encryption.proto";
 import "encryption.proto";
+import "inotify.proto";
 
 
 /**
 /**
  * The ClientNamenodeProtocol Service defines the interface between a client 
  * The ClientNamenodeProtocol Service defines the interface between a client 
@@ -664,6 +665,21 @@ message CheckAccessRequestProto {
 message CheckAccessResponseProto { // void response
 message CheckAccessResponseProto { // void response
 }
 }
 
 
+message GetCurrentEditLogTxidRequestProto {
+}
+
+message GetCurrentEditLogTxidResponseProto {
+  required int64 txid = 1;
+}
+
+message GetEditsFromTxidRequestProto {
+  required int64 txid = 1;
+}
+
+message GetEditsFromTxidResponseProto {
+  required EventsListProto eventsList = 1;
+}
+
 service ClientNamenodeProtocol {
 service ClientNamenodeProtocol {
   rpc getBlockLocations(GetBlockLocationsRequestProto)
   rpc getBlockLocations(GetBlockLocationsRequestProto)
       returns(GetBlockLocationsResponseProto);
       returns(GetBlockLocationsResponseProto);
@@ -801,4 +817,8 @@ service ClientNamenodeProtocol {
       returns(ListEncryptionZonesResponseProto);
       returns(ListEncryptionZonesResponseProto);
   rpc getEZForPath(GetEZForPathRequestProto)
   rpc getEZForPath(GetEZForPathRequestProto)
       returns(GetEZForPathResponseProto);
       returns(GetEZForPathResponseProto);
+  rpc getCurrentEditLogTxid(GetCurrentEditLogTxidRequestProto)
+      returns(GetCurrentEditLogTxidResponseProto);
+  rpc getEditsFromTxid(GetEditsFromTxidRequestProto)
+      returns(GetEditsFromTxidResponseProto);
 }
 }

+ 117 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+// This file contains protocol buffers used to communicate edits to clients
+// as part of the inotify system.
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "InotifyProtos";
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs;
+
+import "acl.proto";
+import "xattr.proto";
+import "hdfs.proto";
+
+enum EventType {
+  EVENT_CREATE = 0x0;
+  EVENT_CLOSE = 0x1;
+  EVENT_APPEND = 0x2;
+  EVENT_RENAME = 0x3;
+  EVENT_METADATA = 0x4;
+  EVENT_UNLINK = 0x5;
+}
+
+message EventProto {
+  required EventType type = 1;
+  required bytes contents = 2;
+}
+
+enum INodeType {
+  I_TYPE_FILE = 0x0;
+  I_TYPE_DIRECTORY = 0x1;
+  I_TYPE_SYMLINK = 0x2;
+}
+
+enum MetadataUpdateType {
+  META_TYPE_TIMES = 0x0;
+  META_TYPE_REPLICATION = 0x1;
+  META_TYPE_OWNER = 0x2;
+  META_TYPE_PERMS = 0x3;
+  META_TYPE_ACLS = 0x4;
+  META_TYPE_XATTRS = 0x5;
+}
+
+message CreateEventProto {
+  required INodeType type = 1;
+  required string path = 2;
+  required int64 ctime = 3;
+  required string ownerName = 4;
+  required string groupName = 5;
+  required FsPermissionProto perms = 6;
+  optional int32 replication = 7;
+  optional string symlinkTarget = 8;
+}
+
+message CloseEventProto {
+  required string path = 1;
+  required int64 fileSize = 2;
+  required int64 timestamp = 3;
+}
+
+message AppendEventProto {
+  required string path = 1;
+}
+
+message RenameEventProto {
+  required string srcPath = 1;
+  required string destPath = 2;
+  required int64 timestamp = 3;
+}
+
+message MetadataUpdateEventProto {
+  required string path = 1;
+  required MetadataUpdateType type = 2;
+  optional int64 mtime = 3;
+  optional int64 atime = 4;
+  optional int32 replication = 5;
+  optional string ownerName = 6;
+  optional string groupName = 7;
+  optional FsPermissionProto perms = 8;
+  repeated AclEntryProto acls = 9;
+  repeated XAttrProto xAttrs = 10;
+  optional bool xAttrsRemoved = 11;
+}
+
+message UnlinkEventProto {
+  required string path = 1;
+  required int64 timestamp = 2;
+}
+
+message EventsListProto {
+  repeated EventProto events = 1;
+  required int64 firstTxid = 2;
+  required int64 lastTxid = 3;
+  required int64 syncTxid = 4;
+}
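Note on the wire format above: EventProto is a tagged union, where the type field identifies which of the per-event messages (CreateEventProto, CloseEventProto, and so on) is serialized into the contents bytes, and EventsListProto adds the transaction-id bookkeeping (firstTxid, lastTxid, syncTxid) that lets a reader estimate how far behind the edit log it is. A minimal consumer sketch against the client API added by this patch, assuming HdfsAdmin.getInotifyEventStream() returns the DFSInotifyEventInputStream introduced above and that poll(time, unit) returns null on timeout; the NameNode URI is a placeholder:

import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;

public class InotifyTail {
  public static void main(String[] args) throws Exception {
    // Placeholder URI; point this at the active NameNode.
    HdfsAdmin admin = new HdfsAdmin(new URI("hdfs://namenode:8020"), new Configuration());
    DFSInotifyEventInputStream stream = admin.getInotifyEventStream();
    while (true) {
      // Waits for at most one second; null means no new edits were seen.
      Event event = stream.poll(1, TimeUnit.SECONDS);
      if (event == null) {
        continue;
      }
      switch (event.getEventType()) {
        case CREATE:
          System.out.println("created " + ((Event.CreateEvent) event).getPath());
          break;
        case UNLINK:
          System.out.println("removed " + ((Event.UnlinkEvent) event).getPath());
          break;
        default:
          System.out.println("event " + event.getEventType());
      }
    }
  }
}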

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2058,4 +2058,14 @@
  </description>
</property>

+<property>
+  <name>dfs.namenode.inotify.max.events.per.rpc</name>
+  <value>1000</value>
+  <description>Maximum number of events that will be sent to an inotify client
+    in a single RPC response. The default value attempts to amortize away
+    the overhead for this RPC while avoiding huge memory requirements for the
+    client and NameNode (1000 events should consume no more than 1 MB.)
+  </description>
+</property>
+
 </configuration>
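The new property caps how many translated edits the NameNode packs into a single response (presumably the getEditsFromTxid response added above), trading per-RPC overhead against response size and heap use on both ends. It is a NameNode-side setting and would normally be overridden in the NameNode's hdfs-site.xml; a small sketch of the equivalent programmatic override, with 500 as a purely illustrative value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class InotifyBatchSizeTuning {
  public static void main(String[] args) {
    // HdfsConfiguration pulls in hdfs-default.xml and hdfs-site.xml;
    // in a real deployment this override lives in the NameNode's hdfs-site.xml.
    Configuration conf = new HdfsConfiguration();
    conf.setInt("dfs.namenode.inotify.max.events.per.rpc", 500);
    System.out.println("inotify events per RPC: "
        + conf.getInt("dfs.namenode.inotify.max.events.per.rpc", 1000));
  }
}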

+ 430 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java

@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.MissingEventsException;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TestDFSInotifyEventInputStream {
+
+  private static final int BLOCK_SIZE = 1024;
+  private static final Log LOG = LogFactory.getLog(
+      TestDFSInotifyEventInputStream.class);
+
+  private static Event waitForNextEvent(DFSInotifyEventInputStream eis)
+    throws IOException, MissingEventsException {
+    Event next = null;
+    while ((next = eis.poll()) == null);
+    return next;
+  }
+
+  /**
+   * If this test fails, check whether the newly added op should map to an
+   * inotify event, and if so, establish the mapping in
+   * {@link org.apache.hadoop.hdfs.server.namenode.InotifyFSEditLogOpTranslator}
+   * and update testBasic() to include the new op.
+   */
+  @Test
+  public void testOpcodeCount() {
+    Assert.assertTrue(FSEditLogOpCodes.values().length == 46);
+  }
+
+
+  /**
+   * Tests all FsEditLogOps that are converted to inotify events.
+   */
+  @Test(timeout = 120000)
+  @SuppressWarnings("deprecation")
+  public void testBasic() throws IOException, URISyntaxException,
+      InterruptedException, MissingEventsException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    // so that we can get an atime change
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 1);
+
+    MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
+    builder.getDfsBuilder().numDataNodes(2);
+    MiniQJMHACluster cluster = builder.build();
+
+    try {
+      cluster.getDfsCluster().waitActive();
+      cluster.getDfsCluster().transitionToActive(0);
+      DFSClient client = new DFSClient(cluster.getDfsCluster().getNameNode(0)
+          .getNameNodeAddress(), conf);
+      FileSystem fs = cluster.getDfsCluster().getFileSystem(0);
+      DFSTestUtil.createFile(fs, new Path("/file"), BLOCK_SIZE, (short) 1, 0L);
+      DFSTestUtil.createFile(fs, new Path("/file3"), BLOCK_SIZE, (short) 1, 0L);
+      DFSTestUtil.createFile(fs, new Path("/file5"), BLOCK_SIZE, (short) 1, 0L);
+      DFSInotifyEventInputStream eis = client.getInotifyEventStream();
+      client.rename("/file", "/file4", null); // RenameOp -> RenameEvent
+      client.rename("/file4", "/file2"); // RenameOldOp -> RenameEvent
+      // DeleteOp, AddOp -> UnlinkEvent, CreateEvent
+      OutputStream os = client.create("/file2", true, (short) 2, BLOCK_SIZE);
+      os.write(new byte[BLOCK_SIZE]);
+      os.close(); // CloseOp -> CloseEvent
+      // AddOp -> AppendEvent
+      os = client.append("/file2", BLOCK_SIZE, null, null);
+      os.write(new byte[BLOCK_SIZE]);
+      os.close(); // CloseOp -> CloseEvent
+      Thread.sleep(10); // so that the atime will get updated on the next line
+      client.open("/file2").read(new byte[1]); // TimesOp -> MetadataUpdateEvent
+      // SetReplicationOp -> MetadataUpdateEvent
+      client.setReplication("/file2", (short) 1);
+      // ConcatDeleteOp -> AppendEvent, UnlinkEvent, CloseEvent
+      client.concat("/file2", new String[]{"/file3"});
+      client.delete("/file2", false); // DeleteOp -> UnlinkEvent
+      client.mkdirs("/dir", null, false); // MkdirOp -> CreateEvent
+      // SetPermissionsOp -> MetadataUpdateEvent
+      client.setPermission("/dir", FsPermission.valueOf("-rw-rw-rw-"));
+      // SetOwnerOp -> MetadataUpdateEvent
+      client.setOwner("/dir", "username", "groupname");
+      client.createSymlink("/dir", "/dir2", false); // SymlinkOp -> CreateEvent
+      client.setXAttr("/file5", "user.field", "value".getBytes(), EnumSet.of(
+          XAttrSetFlag.CREATE)); // SetXAttrOp -> MetadataUpdateEvent
+      // RemoveXAttrOp -> MetadataUpdateEvent
+      client.removeXAttr("/file5", "user.field");
+      // SetAclOp -> MetadataUpdateEvent
+      client.setAcl("/file5", AclEntry.parseAclSpec(
+          "user::rwx,user:foo:rw-,group::r--,other::---", true));
+      client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent
+
+      Event next = null;
+
+      // RenameOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
+      Event.RenameEvent re = (Event.RenameEvent) next;
+      Assert.assertTrue(re.getDstPath().equals("/file4"));
+      Assert.assertTrue(re.getSrcPath().equals("/file"));
+      Assert.assertTrue(re.getTimestamp() > 0);
+
+      long eventsBehind = eis.getEventsBehindEstimate();
+
+      // RenameOldOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
+      Event.RenameEvent re2 = (Event.RenameEvent) next;
+      Assert.assertTrue(re2.getDstPath().equals("/file2"));
+      Assert.assertTrue(re2.getSrcPath().equals("/file4"));
+      Assert.assertTrue(re.getTimestamp() > 0);
+
+      // DeleteOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
+      Assert.assertTrue(((Event.UnlinkEvent) next).getPath().equals("/file2"));
+
+      // AddOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+      Event.CreateEvent ce = (Event.CreateEvent) next;
+      Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
+      Assert.assertTrue(ce.getPath().equals("/file2"));
+      Assert.assertTrue(ce.getCtime() > 0);
+      Assert.assertTrue(ce.getReplication() > 0);
+      Assert.assertTrue(ce.getSymlinkTarget() == null);
+
+      // CloseOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
+      Event.CloseEvent ce2 = (Event.CloseEvent) next;
+      Assert.assertTrue(ce2.getPath().equals("/file2"));
+      Assert.assertTrue(ce2.getFileSize() > 0);
+      Assert.assertTrue(ce2.getTimestamp() > 0);
+
+      // AddOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
+      Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
+
+      // CloseOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
+      Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2"));
+
+      // TimesOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue.getPath().equals("/file2"));
+      Assert.assertTrue(mue.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.TIMES);
+
+      // SetReplicationOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue2.getPath().equals("/file2"));
+      Assert.assertTrue(mue2.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.REPLICATION);
+      Assert.assertTrue(mue2.getReplication() == 1);
+
+      // ConcatDeleteOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
+      Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
+      Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next;
+      Assert.assertTrue(ue2.getPath().equals("/file3"));
+      Assert.assertTrue(ue2.getTimestamp() > 0);
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
+      Event.CloseEvent ce3 = (Event.CloseEvent) next;
+      Assert.assertTrue(ce3.getPath().equals("/file2"));
+      Assert.assertTrue(ce3.getTimestamp() > 0);
+
+      // DeleteOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
+      Event.UnlinkEvent ue = (Event.UnlinkEvent) next;
+      Assert.assertTrue(ue.getPath().equals("/file2"));
+      Assert.assertTrue(ue.getTimestamp() > 0);
+
+      // MkdirOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+      Event.CreateEvent ce4 = (Event.CreateEvent) next;
+      Assert.assertTrue(ce4.getiNodeType() ==
+          Event.CreateEvent.INodeType.DIRECTORY);
+      Assert.assertTrue(ce4.getPath().equals("/dir"));
+      Assert.assertTrue(ce4.getCtime() > 0);
+      Assert.assertTrue(ce4.getReplication() == 0);
+      Assert.assertTrue(ce4.getSymlinkTarget() == null);
+
+      // SetPermissionsOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue3.getPath().equals("/dir"));
+      Assert.assertTrue(mue3.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.PERMS);
+      Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-"));
+
+      // SetOwnerOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue4.getPath().equals("/dir"));
+      Assert.assertTrue(mue4.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.OWNER);
+      Assert.assertTrue(mue4.getOwnerName().equals("username"));
+      Assert.assertTrue(mue4.getGroupName().equals("groupname"));
+
+      // SymlinkOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+      Event.CreateEvent ce5 = (Event.CreateEvent) next;
+      Assert.assertTrue(ce5.getiNodeType() ==
+          Event.CreateEvent.INodeType.SYMLINK);
+      Assert.assertTrue(ce5.getPath().equals("/dir2"));
+      Assert.assertTrue(ce5.getCtime() > 0);
+      Assert.assertTrue(ce5.getReplication() == 0);
+      Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir"));
+
+      // SetXAttrOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue5.getPath().equals("/file5"));
+      Assert.assertTrue(mue5.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.XATTRS);
+      Assert.assertTrue(mue5.getxAttrs().size() == 1);
+      Assert.assertTrue(mue5.getxAttrs().get(0).getName().contains("field"));
+      Assert.assertTrue(!mue5.isxAttrsRemoved());
+
+      // RemoveXAttrOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue6.getPath().equals("/file5"));
+      Assert.assertTrue(mue6.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.XATTRS);
+      Assert.assertTrue(mue6.getxAttrs().size() == 1);
+      Assert.assertTrue(mue6.getxAttrs().get(0).getName().contains("field"));
+      Assert.assertTrue(mue6.isxAttrsRemoved());
+
+      // SetAclOp (1)
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue7.getPath().equals("/file5"));
+      Assert.assertTrue(mue7.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.ACLS);
+      Assert.assertTrue(mue7.getAcls().contains(
+          AclEntry.parseAclEntry("user::rwx", true)));
+
+      // SetAclOp (2)
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue8.getPath().equals("/file5"));
+      Assert.assertTrue(mue8.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.ACLS);
+      Assert.assertTrue(mue8.getAcls() == null);
+
+      // Returns null when there are no further events
+      Assert.assertTrue(eis.poll() == null);
+
+      // make sure the estimate hasn't changed since the above assertion
+      // tells us that we are fully caught up to the current namesystem state
+      // and we should not have been behind at all when eventsBehind was set
+      // either, since there were few enough events that they should have all
+      // been read to the client during the first poll() call
+      Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testNNFailover() throws IOException, URISyntaxException,
+      MissingEventsException {
+    Configuration conf = new HdfsConfiguration();
+    MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();
+
+    try {
+      cluster.getDfsCluster().waitActive();
+      cluster.getDfsCluster().transitionToActive(0);
+      DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
+          (cluster.getDfsCluster(), conf)).dfs;
+      DFSInotifyEventInputStream eis = client.getInotifyEventStream();
+      for (int i = 0; i < 10; i++) {
+        client.mkdirs("/dir" + i, null, false);
+      }
+      cluster.getDfsCluster().shutdownNameNode(0);
+      cluster.getDfsCluster().transitionToActive(1);
+      Event next = null;
+      // we can read all of the edits logged by the old active from the new
+      // active
+      for (int i = 0; i < 10; i++) {
+        next = waitForNextEvent(eis);
+        Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+        Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
+            i));
+      }
+      Assert.assertTrue(eis.poll() == null);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testTwoActiveNNs() throws IOException, MissingEventsException {
+    Configuration conf = new HdfsConfiguration();
+    MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();
+
+    try {
+      cluster.getDfsCluster().waitActive();
+      cluster.getDfsCluster().transitionToActive(0);
+      DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
+          .getNameNodeAddress(), conf);
+      DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
+          .getNameNodeAddress(), conf);
+      DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
+      for (int i = 0; i < 10; i++) {
+        client0.mkdirs("/dir" + i, null, false);
+      }
+
+      cluster.getDfsCluster().transitionToActive(1);
+      for (int i = 10; i < 20; i++) {
+        client1.mkdirs("/dir" + i, null, false);
+      }
+
+      // make sure that the old active can't read any further than the edits
+      // it logged itself (it has no idea whether the in-progress edits from
+      // the other writer have actually been committed)
+      Event next = null;
+      for (int i = 0; i < 10; i++) {
+        next = waitForNextEvent(eis);
+        Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+        Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
+            i));
+      }
+      Assert.assertTrue(eis.poll() == null);
+    } finally {
+      try {
+        cluster.shutdown();
+      } catch (ExitUtil.ExitException e) {
+        // expected because the old active will be unable to flush the
+        // end-of-segment op since it is fenced
+      }
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testReadEventsWithTimeout() throws IOException,
+      InterruptedException, MissingEventsException {
+    Configuration conf = new HdfsConfiguration();
+    MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();
+
+    try {
+      cluster.getDfsCluster().waitActive();
+      cluster.getDfsCluster().transitionToActive(0);
+      final DFSClient client = new DFSClient(cluster.getDfsCluster()
+          .getNameNode(0).getNameNodeAddress(), conf);
+      DFSInotifyEventInputStream eis = client.getInotifyEventStream();
+      ScheduledExecutorService ex = Executors
+          .newSingleThreadScheduledExecutor();
+      ex.schedule(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            client.mkdirs("/dir", null, false);
+          } catch (IOException e) {
+            // test will fail
+            LOG.error("Unable to create /dir", e);
+          }
+        }
+      }, 1, TimeUnit.SECONDS);
+      // a very generous wait period -- the edit will definitely have been
+      // processed by the time this is up
+      Event next = eis.poll(5, TimeUnit.SECONDS);
+      Assert.assertTrue(next != null);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+      Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir"));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+}
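The tests above also illustrate what a long-running consumer has to cope with: poll() returns null once the reader is caught up, and MissingEventsException is thrown when there is a gap between the transactions the reader expects and what the NameNode can still serve (testNNFailover and testTwoActiveNNs probe the HA edges of this). A hedged sketch of a consumer loop built on those observations; resyncFromFullScan() is a hypothetical application hook, not part of this patch:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public class ResilientInotifyConsumer {
  // Polls forever; on a gap it falls back to an application-defined resync
  // rather than silently missing namespace changes.
  static void consume(DFSInotifyEventInputStream stream) throws Exception {
    while (true) {
      try {
        Event event = stream.poll(1, TimeUnit.SECONDS);
        if (event != null) {
          handle(event);
        }
      } catch (MissingEventsException e) {
        // Hypothetical recovery hook; whether to keep polling or reopen the
        // stream afterwards is an application decision.
        resyncFromFullScan();
      }
    }
  }

  static void handle(Event event) {
    System.out.println(event.getEventType());
  }

  static void resyncFromFullScan() {
    // application-specific; intentionally empty in this sketch
  }
}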

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java

@@ -56,7 +56,9 @@ public class MiniQJMHACluster {
    
    public Builder(Configuration conf) {
      this.conf = conf;
-      this.dfsBuilder = new MiniDFSCluster.Builder(conf);
+      // most QJMHACluster tests don't need DataNodes, so we'll make
+      // this the default
+      this.dfsBuilder = new MiniDFSCluster.Builder(conf).numDataNodes(0);
    }

    public MiniDFSCluster.Builder getDfsBuilder() {
@@ -102,7 +104,7 @@ public class MiniQJMHACluster {
        cluster = builder.dfsBuilder.nnTopology(topology)
            .manageNameDfsSharedDirs(false).build();
        cluster.waitActive();
-        cluster.shutdown();
+        cluster.shutdownNameNodes();

        // initialize the journal nodes
        Configuration confNN0 = cluster.getConfiguration(0);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java

@@ -382,7 +382,7 @@ public class TestQJMWithFaults {
    }

    @Override
-    protected ExecutorService createExecutor() {
+    protected ExecutorService createSingleThreadExecutor() {
      return MoreExecutors.sameThreadExecutor();
    }
  }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java

@@ -939,7 +939,7 @@ public class TestQuorumJournalManager {
      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
          String journalId, InetSocketAddress addr) {
        AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
-          protected ExecutorService createExecutor() {
+          protected ExecutorService createSingleThreadExecutor() {
            // Don't parallelize calls to the quorum in the tests.
            // This makes the tests more deterministic.
            return MoreExecutors.sameThreadExecutor();

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -916,6 +916,10 @@ public class TestEditLog {
    public void setMaxOpSize(int maxOpSize) {
      reader.setMaxOpSize(maxOpSize);
    }
+
+    @Override public boolean isLocalLog() {
+      return true;
+    }
  }

  @Test