Ver Fonte

HDFS-3863. Track last "committed" txid in QJM. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1380976 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon há 12 anos atrás
pai
commit
8021d9199f
16 ficheiros alterados com 421 adições e 43 exclusões
  1. 2 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
  2. 7 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
  3. 12 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
  4. 14 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
  5. 15 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
  6. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
  7. 12 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java
  8. 4 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
  9. 9 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
  10. 78 25
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
  11. 117 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java
  12. 12 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
  13. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
  14. 44 8
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
  15. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
  16. 86 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt

@@ -36,3 +36,5 @@ HDFS-3839. QJM: hadoop-daemon.sh should be updated to accept "journalnode" (eli)
 HDFS-3845. Fixes for edge cases in QJM recovery protocol (todd)
 
 HDFS-3877. QJM: Provide defaults for dfs.journalnode.*address (eli)
+
+HDFS-3863. Track last "committed" txid in QJM (todd)

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java

@@ -123,6 +123,13 @@ interface AsyncLogger {
    */
   public void setEpoch(long e);
 
+  /**
+   * Let the logger know the highest committed txid across all loggers in the
+   * set. This txid may be higher than the last committed txid for <em>this</em>
+   * logger. See HDFS-3863 for details.
+   */
+  public void setCommittedTxId(long txid);
+
   /**
    * Build an HTTP URL to fetch the log segment with the given startTxId.
    */

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java

@@ -99,6 +99,18 @@ class AsyncLoggerSet {
     }
   }
 
+
+  /**
+   * Set the highest successfully committed txid seen by the writer.
+   * This should be called after a successful write to a quorum, and is used
+   * for extra sanity checks against the protocol. See HDFS-3863.
+   */
+  public void setCommittedTxId(long txid) {
+    for (AsyncLogger logger : loggers) {
+      logger.setCommittedTxId(txid);
+    }
+  }
+
   /**
    * @return the epoch number for this writer. This may only be called after
    * a successful call to {@link #createNewUniqueEpoch(NamespaceInfo)}.

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java

@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
@@ -73,6 +74,8 @@ public class IPCLoggerChannel implements AsyncLogger {
   private final ListeningExecutorService executor;
   private long ipcSerial = 0;
   private long epoch = -1;
+  private long committedTxId = HdfsConstants.INVALID_TXID;
+  
   private final String journalId;
   private final NamespaceInfo nsInfo;
   private int httpPort = -1;
@@ -115,11 +118,20 @@ public class IPCLoggerChannel implements AsyncLogger {
     executor = MoreExecutors.listeningDecorator(
         createExecutor());
   }
+  
   @Override
   public synchronized void setEpoch(long epoch) {
     this.epoch = epoch;
   }
   
+  @Override
+  public synchronized void setCommittedTxId(long txid) {
+    Preconditions.checkArgument(txid >= committedTxId,
+        "Trying to move committed txid backwards in client " +
+         "old: %s new: %s", committedTxId, txid);
+    this.committedTxId = txid;
+  }
+  
   @Override
   public void close() {
     // No more tasks may be submitted after this point.
@@ -183,7 +195,8 @@ public class IPCLoggerChannel implements AsyncLogger {
 
   private synchronized RequestInfo createReqInfo() {
     Preconditions.checkState(epoch > 0, "bad epoch: " + epoch);
-    return new RequestInfo(journalId, epoch, ipcSerial++);
+    return new RequestInfo(journalId, epoch, ipcSerial++,
+        committedTxId);
   }
 
   @VisibleForTesting

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java

@@ -265,6 +265,21 @@ public class QuorumJournalManager implements JournalManager {
     SegmentStateProto logToSync = bestResponse.getSegmentState();
     assert segmentTxId == logToSync.getStartTxId();
     
+    // Sanity check: none of the loggers should be aware of a higher
+    // txid than the txid we intend to truncate to
+    for (Map.Entry<AsyncLogger, PrepareRecoveryResponseProto> e :
+         prepareResponses.entrySet()) {
+      AsyncLogger logger = e.getKey();
+      PrepareRecoveryResponseProto resp = e.getValue();
+
+      if (resp.hasLastCommittedTxId() &&
+          resp.getLastCommittedTxId() > logToSync.getEndTxId()) {
+        throw new AssertionError("Decided to synchronize log to " + logToSync +
+            " but logger " + logger + " had seen txid " +
+            resp.getLastCommittedTxId() + " committed");
+      }
+    }
+    
     URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId);
     
     QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java

@@ -102,6 +102,11 @@ class QuorumOutputStream extends EditLogOutputStream {
           segmentTxId, firstTxToFlush,
           numReadyTxns, data);
       loggers.waitForWriteQuorum(qcall, 20000); // TODO: configurable timeout
+      
+      // Since we successfully wrote this batch, let the loggers know. Any future
+      // RPCs will thus let the loggers know of the most recent transaction, even
+      // if a logger has fallen behind.
+      loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
     }
   }
 }

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java

@@ -18,17 +18,21 @@
 package org.apache.hadoop.hdfs.qjournal.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
 @InterfaceAudience.Private
 public class RequestInfo {
   private String jid;
   private long epoch;
   private long ipcSerialNumber;
+  private long committedTxId;
   
-  public RequestInfo(String jid, long epoch, long ipcSerialNumber) {
+  public RequestInfo(String jid, long epoch, long ipcSerialNumber,
+      long committedTxId) {
     this.jid = jid;
     this.epoch = epoch;
     this.ipcSerialNumber = ipcSerialNumber;
+    this.committedTxId = committedTxId;
   }
 
   public long getEpoch() {
@@ -51,4 +55,11 @@ public class RequestInfo {
     this.ipcSerialNumber = ipcSerialNumber;
   }
 
+  public long getCommittedTxId() {
+    return committedTxId;
+  }
+
+  public boolean hasCommittedTxId() {
+    return (committedTxId != HdfsConstants.INVALID_TXID);
+  }
 }

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.qjournal.protocolPB;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
@@ -199,6 +200,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
     return new RequestInfo(
         reqInfo.getJournalId().getIdentifier(),
         reqInfo.getEpoch(),
-        reqInfo.getIpcSerialNumber());
+        reqInfo.getIpcSerialNumber(),
+        reqInfo.hasCommittedTxId() ?
+          reqInfo.getCommittedTxId() : HdfsConstants.INVALID_TXID);
   }
 }

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRec
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.RequestInfoProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
@@ -143,11 +144,14 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
 
   private QJournalProtocolProtos.RequestInfoProto convert(
       RequestInfo reqInfo) {
-    return QJournalProtocolProtos.RequestInfoProto.newBuilder()
-      .setJournalId(convertJournalId(reqInfo.getJournalId()))
-      .setEpoch(reqInfo.getEpoch())
-      .setIpcSerialNumber(reqInfo.getIpcSerialNumber())
-      .build();
+    RequestInfoProto.Builder builder = RequestInfoProto.newBuilder()
+        .setJournalId(convertJournalId(reqInfo.getJournalId()))
+        .setEpoch(reqInfo.getEpoch())
+        .setIpcSerialNumber(reqInfo.getIpcSerialNumber());
+    if (reqInfo.hasCommittedTxId()) {
+      builder.setCommittedTxId(reqInfo.getCommittedTxId());
+    }
+    return builder.build();
   }
 
   @Override

+ 78 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java

@@ -46,10 +46,13 @@ import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
+import org.apache.hadoop.hdfs.util.BestEffortLongFile;
 import org.apache.hadoop.hdfs.util.PersistentLongFile;
 import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.Ranges;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.TextFormat;
 
@@ -85,9 +88,18 @@ class Journal implements Closeable {
    */
   private PersistentLongFile lastWriterEpoch;
   
+  /**
+   * Lower-bound on the last committed transaction ID. This is not
+   * depended upon for correctness, but acts as a sanity check
+   * during the recovery procedures, and as a visibility mark
+   * for clients reading in-progress logs.
+   */
+  private BestEffortLongFile committedTxnId;
+  
   private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
   private static final String LAST_WRITER_EPOCH = "last-writer-epoch";
-
+  private static final String COMMITTED_TXID_FILENAME = "committed-txid";
+  
   private final FileJournalManager fjm;
 
   Journal(File logDir, StorageErrorReporter errorReporter) throws IOException {
@@ -98,7 +110,10 @@ class Journal implements Closeable {
         new File(currentDir, LAST_PROMISED_FILENAME), 0);
     this.lastWriterEpoch = new PersistentLongFile(
         new File(currentDir, LAST_WRITER_EPOCH), 0);
-
+    this.committedTxnId = new BestEffortLongFile(
+        new File(currentDir, COMMITTED_TXID_FILENAME),
+        HdfsConstants.INVALID_TXID);
+    
     this.fjm = storage.getJournalManager();
   }
   
@@ -113,22 +128,21 @@ class Journal implements Closeable {
     }
     LOG.info("Scanning storage " + fjm);
     List<EditLogFile> files = fjm.getLogFiles(0);
-    if (files.isEmpty()) {
-      curSegmentTxId = HdfsConstants.INVALID_TXID;
-      return;
-    }
+    curSegmentTxId = HdfsConstants.INVALID_TXID;
     
-    EditLogFile latestLog = files.get(files.size() - 1);
-    latestLog.validateLog();
-    LOG.info("Latest log is " + latestLog);
-    if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) {
-      // the log contains no transactions
-      LOG.warn("Latest log " + latestLog + " has no transactions. " +
-          "moving it aside");
-      latestLog.moveAsideEmptyFile();
-      curSegmentTxId = HdfsConstants.INVALID_TXID;
-    } else {
-      curSegmentTxId = latestLog.getFirstTxId();
+    while (!files.isEmpty()) {
+      EditLogFile latestLog = files.remove(files.size() - 1);
+      latestLog.validateLog();
+      LOG.info("Latest log is " + latestLog);
+      if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) {
+        // the log contains no transactions
+        LOG.warn("Latest log " + latestLog + " has no transactions. " +
+            "moving it aside and looking for previous log");
+        latestLog.moveAsideEmptyFile();
+      } else {
+        curSegmentTxId = latestLog.getFirstTxId();
+        break;
+      }
     }
   }
 
@@ -150,6 +164,8 @@ class Journal implements Closeable {
   @Override // Closeable
   public void close() throws IOException {
     storage.close();
+    
+    IOUtils.closeStream(committedTxnId);
   }
   
   JNStorage getStorage() {
@@ -164,6 +180,10 @@ class Journal implements Closeable {
     checkFormatted();
     return lastPromisedEpoch.get();
   }
+  
+  synchronized long getCommittedTxnIdForTests() throws IOException {
+    return committedTxnId.get();
+  }
 
   /**
    * Try to create a new epoch for this journal.
@@ -213,8 +233,8 @@ class Journal implements Closeable {
   synchronized void journal(RequestInfo reqInfo,
       long segmentTxId, long firstTxnId,
       int numTxns, byte[] records) throws IOException {
-    checkWriteRequest(reqInfo);
     checkFormatted();
+    checkWriteRequest(reqInfo);
     
     // TODO: if a JN goes down and comes back up, then it will throw
     // this exception on every edit. We should instead send back
@@ -245,6 +265,7 @@ class Journal implements Closeable {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Writing txid " + firstTxnId + "-" + (firstTxnId + numTxns - 1));
     }
+    
     curSegment.writeRaw(records, 0, records.length);
     curSegment.setReadyToFlush();
     curSegment.flush();
@@ -270,6 +291,15 @@ class Journal implements Closeable {
     
     // TODO: some check on serial number that they only increase from a given
     // client
+
+    if (reqInfo.hasCommittedTxId()) {
+      Preconditions.checkArgument(
+          reqInfo.getCommittedTxId() >= committedTxnId.get(),
+          "Client trying to move committed txid backward from " +
+          committedTxnId.get() + " to " + reqInfo.getCommittedTxId());
+      
+      committedTxnId.set(reqInfo.getCommittedTxId());
+    }
   }
   
   private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException {
@@ -296,8 +326,8 @@ class Journal implements Closeable {
   public synchronized void startLogSegment(RequestInfo reqInfo, long txid)
       throws IOException {
     assert fjm != null;
-    checkRequest(reqInfo);
     checkFormatted();
+    checkRequest(reqInfo);
     
     if (curSegment != null) {
       LOG.warn("Client is requesting a new log segment " + txid + 
@@ -352,8 +382,8 @@ class Journal implements Closeable {
    */
   public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
       long endTxId) throws IOException {
-    checkRequest(reqInfo);
     checkFormatted();
+    checkRequest(reqInfo);
 
     if (startTxId == curSegmentTxId) {
       if (curSegment != null) {
@@ -397,8 +427,8 @@ class Journal implements Closeable {
    */
   public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
       long minTxIdToKeep) throws IOException {
-    checkRequest(reqInfo);
     checkFormatted();
+    checkRequest(reqInfo);
     
     fjm.purgeLogsOlderThan(minTxIdToKeep);
     purgePaxosDecisionsOlderThan(minTxIdToKeep);
@@ -492,8 +522,8 @@ class Journal implements Closeable {
    */
   public synchronized PrepareRecoveryResponseProto prepareRecovery(
       RequestInfo reqInfo, long segmentTxId) throws IOException {
-    checkRequest(reqInfo);
     checkFormatted();
+    checkRequest(reqInfo);
     
     PrepareRecoveryResponseProto.Builder builder =
         PrepareRecoveryResponseProto.newBuilder();
@@ -519,6 +549,9 @@ class Journal implements Closeable {
     }
     
     builder.setLastWriterEpoch(lastWriterEpoch.get());
+    if (committedTxnId.get() != HdfsConstants.INVALID_TXID) {
+      builder.setLastCommittedTxId(committedTxnId.get());
+    }
     
     PrepareRecoveryResponseProto resp = builder.build();
     LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
@@ -532,8 +565,8 @@ class Journal implements Closeable {
   public synchronized void acceptRecovery(RequestInfo reqInfo,
       SegmentStateProto segment, URL fromUrl)
       throws IOException {
-    checkRequest(reqInfo);
     checkFormatted();
+    checkRequest(reqInfo);
     long segmentTxId = segment.getStartTxId();
 
     // TODO: right now, a recovery of a segment when the log is
@@ -563,8 +596,22 @@ class Journal implements Closeable {
             ": no current segment in place");
       } else {
         LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
-            ": old segment " + TextFormat.shortDebugString(segment) + " is " +
-            "not the right length");
+            ": old segment " + TextFormat.shortDebugString(currentSegment) +
+            " is not the right length");
+        
+        // Paranoid sanity check: if the new log is shorter than the log we
+        // currently have, we should not end up discarding any transactions
+        // which are already Committed.
+        if (txnRange(currentSegment).contains(committedTxnId.get()) &&
+            !txnRange(segment).contains(committedTxnId.get())) {
+          throw new AssertionError(
+              "Cannot replace segment " +
+              TextFormat.shortDebugString(currentSegment) +
+              " with new segment " +
+              TextFormat.shortDebugString(segment) + 
+              ": would discard already-committed txn " +
+              committedTxnId.get());
+        }
       }
       syncLog(reqInfo, segment, fromUrl);
     } else {
@@ -581,6 +628,12 @@ class Journal implements Closeable {
         TextFormat.shortDebugString(newData));
   }
 
+  private Range<Long> txnRange(SegmentStateProto seg) {
+    Preconditions.checkArgument(seg.hasEndTxId(),
+        "invalid segment: %s", seg);
+    return Ranges.closed(seg.getStartTxId(), seg.getEndTxId());
+  }
+
   /**
    * Synchronize a log segment from another JournalNode.
    * @param reqInfo the request info for the recovery IPC

+ 117 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.io.Files;
+import com.google.common.primitives.Longs;
+
+/**
+ * Class that represents a file on disk which stores a single <code>long</code>
+ * value, but does not make any effort to make it truly durable. This is in
+ * contrast to {@link PersistentLongFile} which fsync()s the value on every
+ * change.
+ * 
+ * This should be used for values which are updated frequently (such that
+ * performance is important) and not required to be up-to-date for correctness.
+ * 
+ * This class also differs in that it stores the value as binary data instead
+ * of a textual string.
+ */
+@InterfaceAudience.Private
+public class BestEffortLongFile implements Closeable {
+
+  private final File file;
+  private final long defaultVal;
+
+  private long value;
+  
+  private FileChannel ch = null;
+  
+  private ByteBuffer buf = ByteBuffer.allocate(Long.SIZE/8);
+  
+  public BestEffortLongFile(File file, long defaultVal) {
+    this.file = file;
+    this.defaultVal = defaultVal;
+  }
+  
+  public long get() throws IOException {
+    lazyOpen();
+    return value;
+  }
+
+  public void set(long newVal) throws IOException {
+    lazyOpen();
+    buf.clear();
+    buf.putLong(newVal);
+    buf.flip();
+    IOUtils.writeFully(ch, buf, 0);
+    value = newVal;
+  }
+  
+  private void lazyOpen() throws IOException {
+    if (ch != null) {
+      return;
+    }
+
+    // Load current value.
+    byte[] data = null;
+    try {
+      data = Files.toByteArray(file);
+    } catch (FileNotFoundException fnfe) {
+      // Expected - this will use default value.
+    }
+
+    if (data != null && data.length != 0) {
+      if (data.length != Longs.BYTES) {
+        throw new IOException("File " + file + " had invalid length: " +
+            data.length);
+      }
+      value = Longs.fromByteArray(data);
+    } else {
+      value = defaultVal;
+    }
+    
+    // Now open file for future writes.
+    RandomAccessFile raf = new RandomAccessFile(file, "rw");
+    try {
+      ch = raf.getChannel();
+    } finally {
+      if (ch == null) {
+        IOUtils.closeStream(raf);
+      }
+    }
+  }
+  
+  @Override
+  public void close() throws IOException {
+    if (ch != null) {
+      ch.close();
+    }
+  }
+}

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto

@@ -31,6 +31,13 @@ message RequestInfoProto {
   required JournalIdProto journalId = 1;
   required uint64 epoch = 2;
   required uint64 ipcSerialNumber = 3;
+
+  // Whenever a writer makes a request, it informs
+  // the node of the latest committed txid. This may
+  // be higher than the transaction data included in the
+  // request itself, eg in the case that the node has
+  // fallen behind.
+  optional uint64 committedTxId = 4;
 }
 
 message SegmentStateProto {
@@ -163,6 +170,11 @@ message PrepareRecoveryResponseProto {
   optional SegmentStateProto segmentState = 1;
   optional uint64 acceptedInEpoch = 2;
   required uint64 lastWriterEpoch = 3;
+
+  // The highest committed txid that this logger has ever seen.
+  // This may be higher than the data it actually has, in the case
+  // that it was lagging before the old writer crashed.
+  optional uint64 lastCommittedTxId = 4;
 }
 
 /**

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java

@@ -186,6 +186,8 @@ public class TestQuorumJournalManagerUnit {
     Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits(
         anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
     stm.flush();
+    
+    Mockito.verify(spyLoggers.get(0)).setCommittedTxId(1L);
   }
 
   private EditLogOutputStream createLogSegment() throws IOException {

+ 44 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assume;
@@ -70,6 +71,11 @@ public class TestJournal {
       .reportErrorOnFile(Mockito.<File>any());
   }
   
+  @After
+  public void cleanup() {
+    IOUtils.closeStream(journal);
+  }
+  
   @Test
   public void testEpochHandling() throws Exception {
     assertEquals(0, journal.getLastPromisedEpoch());
@@ -88,28 +94,41 @@ public class TestJournal {
           "Proposed epoch 3 <= last promise 3", ioe);
     }
     try {
-      journal.startLogSegment(new RequestInfo(JID, 1L, 1L),
-          12345L);
+      journal.startLogSegment(makeRI(1), 12345L);
       fail("Should have rejected call from prior epoch");
     } catch (IOException ioe) {
       GenericTestUtils.assertExceptionContains(
           "epoch 1 is less than the last promised epoch 3", ioe);
     }
     try {
-      journal.journal(new RequestInfo(JID, 1L, 1L),
-          12345L, 100L, 0, new byte[0]);
+      journal.journal(makeRI(1), 12345L, 100L, 0, new byte[0]);
       fail("Should have rejected call from prior epoch");
     } catch (IOException ioe) {
       GenericTestUtils.assertExceptionContains(
           "epoch 1 is less than the last promised epoch 3", ioe);
     }
   }
-
+  
+  @Test
+  public void testMaintainCommittedTxId() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(makeRI(1), 1);
+    // Send txids 1-3, with a request indicating only 0 committed
+    journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3,
+        QJMTestUtil.createTxnData(1, 3));
+    assertEquals(0, journal.getCommittedTxnIdForTests());
+    
+    // Send 4-6, with request indicating that through 3 is committed.
+    journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3,
+        QJMTestUtil.createTxnData(4, 6));
+    assertEquals(3, journal.getCommittedTxnIdForTests());    
+  }
+  
   @Test
   public void testRestartJournal() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);
-    journal.startLogSegment(new RequestInfo("j", 1, 1), 1);
-    journal.journal(new RequestInfo("j", 1, 2), 1, 1, 2, 
+    journal.startLogSegment(makeRI(1), 1);
+    journal.journal(makeRI(2), 1, 1, 2, 
         QJMTestUtil.createTxnData(1, 2));
     // Don't finalize.
     
@@ -129,6 +148,23 @@ public class TestJournal {
     assertEquals(1, newEpoch.getLastSegmentTxId());
   }
   
+  /**
+   * Test that, if the writer crashes at the very beginning of a segment,
+   * before any transactions are written, that the next newEpoch() call
+   * returns the prior segment txid as its most recent segment.
+   */
+  @Test
+  public void testNewEpochAtBeginningOfSegment() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(makeRI(1), 1);
+    journal.journal(makeRI(2), 1, 1, 2, 
+        QJMTestUtil.createTxnData(1, 2));
+    journal.finalizeLogSegment(makeRI(3), 1, 2);
+    journal.startLogSegment(makeRI(3), 3);
+    NewEpochResponseProto resp = journal.newEpoch(FAKE_NSINFO, 2);
+    assertEquals(1, resp.getLastSegmentTxId());
+  }
+  
   @Test
   public void testJournalLocking() throws Exception {
     Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
@@ -289,7 +325,7 @@ public class TestJournal {
   }
   
   private static RequestInfo makeRI(int serial) {
-    return new RequestInfo(JID, 1, serial);
+    return new RequestInfo(JID, 1, serial, 0);
   }
   
   @Test

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java

@@ -119,8 +119,8 @@ public class TestJournalNode {
     response = ch.newEpoch(4).get();
     ch.setEpoch(4);
     // Because the new segment is empty, it is equivalent to not having
-    // started writing it.
-    assertEquals(0, response.getLastSegmentTxId());
+    // started writing it. Hence, we should return the prior segment txid.
+    assertEquals(1, response.getLastSegmentTxId());
   }
   
   @Test

+ 86 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestBestEffortLongFile {
+
+  private static final File FILE = new File(MiniDFSCluster.getBaseDirectory() +
+      File.separatorChar + "TestBestEffortLongFile");
+
+  @Before
+  public void cleanup() {
+    if (FILE.exists()) {
+      assertTrue(FILE.delete());
+    }
+    FILE.getParentFile().mkdirs();
+  }
+  
+  @Test
+  public void testGetSet() throws IOException {
+    BestEffortLongFile f = new BestEffortLongFile(FILE, 12345L);
+    try {
+      // Before the file exists, should return default.
+      assertEquals(12345L, f.get());
+      
+      // And first access should open it.
+      assertTrue(FILE.exists());
+  
+      Random r = new Random();
+      for (int i = 0; i < 100; i++) {
+        long newVal = r.nextLong();
+        // Changing the value should be reflected in the next get() call.
+        f.set(newVal);
+        assertEquals(newVal, f.get());
+        
+        // And should be reflected in a new instance (ie it actually got
+        // written to the file)
+        BestEffortLongFile f2 = new BestEffortLongFile(FILE, 999L);
+        try {
+          assertEquals(newVal, f2.get());
+        } finally {
+          IOUtils.closeStream(f2);
+        }
+      }
+    } finally {
+      IOUtils.closeStream(f);
+    }
+  }
+  
+  @Test
+  public void testTruncatedFileReturnsDefault() throws IOException {
+    assertTrue(FILE.createNewFile());
+    assertEquals(0, FILE.length());
+    BestEffortLongFile f = new BestEffortLongFile(FILE, 12345L);
+    try {
+      assertEquals(12345L, f.get());
+    } finally {
+      f.close();
+    }
+  }
+}