
HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)

(cherry picked from commit 24f6a7c9563757234f53ca23e12f9c9208b53082)
Colin Patrick Mccabe 9 years ago
parent
commit
b52ae6140b
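
In essence, this patch stops trusting the on-disk size of each edit-log op: the reader now bounds the declared length and verifies the checksum before allocating buffers or constructing an Op, so corrupt or garbage input can no longer trigger an OutOfMemoryError in the NameNode or JournalNode. A minimal standalone sketch of that defensive pattern (all names here are illustrative, not from the patch):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;

    class FrameValidator {
      // Illustrative cap; HDFS uses DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT.
      static final int MAX_OP_SIZE = 50 * 1024 * 1024;

      // Hypothetical helper: read one length-prefixed, checksummed frame safely.
      static byte[] readFrame(DataInputStream in) throws IOException {
        int length = in.readInt();
        // Reject absurd lengths *before* allocating, so bad input cannot
        // cause a huge allocation in a long-lived daemon.
        if (length < 0 || length > MAX_OP_SIZE) {
          throw new IOException("Frame has size " + length +
              ", but maxOpSize = " + MAX_OP_SIZE);
        }
        byte[] body = new byte[length];
        in.readFully(body);
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        if (in.readInt() != (int) crc.getValue()) {
          throw new IOException("Frame is corrupt: checksum mismatch");
        }
        return body;
      }
    }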

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -527,6 +527,8 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8946. Improve choosing datanode storage for block placement. (yliu)
 
+    HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java

@@ -83,7 +83,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
     DataInputStream in = new DataInputStream(tracker);
 
-    reader = new FSEditLogOp.Reader(in, tracker, logVersion);
+    reader = FSEditLogOp.Reader.create(in, tracker, logVersion);
   }
 
   @Override

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java

@@ -87,7 +87,7 @@ public class LayoutVersion {
     FSIMAGE_COMPRESSION(-25, "Support for fsimage compression"),
     FSIMAGE_CHECKSUM(-26, "Support checksum for fsimage"),
     REMOVE_REL13_DISK_LAYOUT_SUPPORT(-27, "Remove support for 0.13 disk layout"),
-    EDITS_CHESKUM(-28, "Support checksum for editlog"),
+    EDITS_CHECKSUM(-28, "Support checksum for editlog"),
     UNUSED(-29, "Skipped version"),
     FSIMAGE_NAME_OPTIMIZATION(-30, "Store only last part of path in fsimage"),
     RESERVED_REL20_203(-31, -19, "Reserved for release 0.20.203", true,

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java

@@ -119,7 +119,7 @@ class EditLogBackupInputStream extends EditLogInputStream {
 
     this.version = version;
 
-    reader = new FSEditLogOp.Reader(in, tracker, version);
+    reader = FSEditLogOp.Reader.create(in, tracker, version);
   }
 
   void clear() throws IOException {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java

@@ -158,7 +158,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
               "flags from log");
         }
       }
-      reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
+      reader = FSEditLogOp.Reader.create(dataIn, tracker, logVersion);
       reader.setMaxOpSize(maxOpSize);
       state = State.OPEN;
     } finally {
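
Every call site in this patch migrates the same way: the direct Reader constructor is replaced by the Reader.create() factory, which picks an implementation by layout version. A condensed sketch of the before/after for any such caller (a ByteArrayInputStream and an invented layout version stand in for a real edit-log stream, just to keep the sketch self-contained):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;

    import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
    import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;

    class ReaderFactoryExample {
      static FSEditLogOp.Reader open(byte[] logBytes, int logVersion) {
        // PositionTrackingInputStream implements StreamLimiter, so it serves
        // as both the byte source and the limiter, as at the real call sites.
        FSEditLogLoader.PositionTrackingInputStream tracker =
            new FSEditLogLoader.PositionTrackingInputStream(
                new ByteArrayInputStream(logBytes));
        DataInputStream in = new DataInputStream(tracker);
        // Before this patch: new FSEditLogOp.Reader(in, tracker, logVersion)
        // After: the factory returns a LengthPrefixedReader, ChecksummedReader,
        // or LegacyReader depending on logVersion (see FSEditLogOp.java below).
        return FSEditLogOp.Reader.create(in, tracker, logVersion);
      }
    }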

+ 254 - 100
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java

@@ -4520,42 +4520,46 @@ public abstract class FSEditLogOp {
   /**
    * Class for reading editlog ops from a stream
    */
-  public static class Reader {
-    private final DataInputStream in;
-    private final StreamLimiter limiter;
-    private final int logVersion;
-    private final Checksum checksum;
-    private final OpInstanceCache cache;
-    private int maxOpSize;
-    private final boolean supportEditLogLength;
+  public abstract static class Reader {
+    final DataInputStream in;
+    final StreamLimiter limiter;
+    final OpInstanceCache cache;
+    final byte[] temp = new byte[4096];
+    final int logVersion;
+    int maxOpSize;
+
+    public static Reader create(DataInputStream in, StreamLimiter limiter,
+                                int logVersion) {
+      if (logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION) {
+        // Use the LengthPrefixedReader on edit logs which are newer than what
+        // we can parse.  (Newer layout versions are represented by smaller
+        // negative integers, for historical reasons.) Even though we can't
+        // parse the Ops contained in them, we should still be able to call
+        // scanOp on them.  This is important for the JournalNode during rolling
+        // upgrade.
+        return new LengthPrefixedReader(in, limiter, logVersion);
+      } else if (NameNodeLayoutVersion.supports(
+              NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)) {
+        return new LengthPrefixedReader(in, limiter, logVersion);
+      } else if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITS_CHECKSUM, logVersion)) {
+        Checksum checksum = DataChecksum.newCrc32();
+        return new ChecksummedReader(checksum, in, limiter, logVersion);
+      } else {
+        return new LegacyReader(in, limiter, logVersion);
+      }
+    }
 
     /**
      * Construct the reader
-     * @param in The stream to read from.
-     * @param logVersion The version of the data coming from the stream.
+     * @param in            The stream to read from.
+     * @param limiter       The limiter for this stream.
+     * @param logVersion    The version of the data coming from the stream.
      */
-    public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
-      this.logVersion = logVersion;
-      if (NameNodeLayoutVersion.supports(
-          LayoutVersion.Feature.EDITS_CHESKUM, logVersion)) {
-        this.checksum = DataChecksum.newCrc32();
-      } else {
-        this.checksum = null;
-      }
-      // It is possible that the logVersion is actually a future layoutversion
-      // during the rolling upgrade (e.g., the NN gets upgraded first). We
-      // assume future layout will also support length of editlog op.
-      this.supportEditLogLength = NameNodeLayoutVersion.supports(
-          NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)
-          || logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION;
-
-      if (this.checksum != null) {
-        this.in = new DataInputStream(
-            new CheckedInputStream(in, this.checksum));
-      } else {
-        this.in = in;
-      }
+    Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
+      this.in = in;
       this.limiter = limiter;
+      this.logVersion = logVersion;
      this.cache = new OpInstanceCache();
      this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
    }
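
One subtlety in the first branch of create() above: HDFS layout versions are negative integers that decrease as features are added, so a log "newer than what we can parse" compares as numerically less than the current version. A toy illustration (both constants invented):

    class LayoutVersionOrdering {
      public static void main(String[] args) {
        final int currentLayoutVersion = -63; // invented: what this node parses
        final int logVersion = -64;           // invented: from a newer NameNode
        // "Newer" means a smaller (more negative) number, hence the
        // `logVersion < CURRENT_LAYOUT_VERSION` test in Reader.create().
        System.out.println("from a future layout? "
            + (logVersion < currentLayoutVersion)); // prints: true
      }
    }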
@@ -4608,26 +4612,25 @@ public abstract class FSEditLogOp {
       }
     }
 
-    private void verifyTerminator() throws IOException {
+    void verifyTerminator() throws IOException {
       /** The end of the edit log should contain only 0x00 or 0xff bytes.
        * If it contains other bytes, the log itself may be corrupt.
        * It is important to check this; if we don't, a stray OP_INVALID byte 
        * could make us stop reading the edit log halfway through, and we'd never
        * know that we had lost data.
        */
-      byte[] buf = new byte[4096];
       limiter.clearLimit();
       int numRead = -1, idx = 0;
       while (true) {
         try {
           numRead = -1;
           idx = 0;
-          numRead = in.read(buf);
+          numRead = in.read(temp);
           if (numRead == -1) {
             return;
           }
           while (idx < numRead) {
-            if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
+            if ((temp[idx] != (byte)0) && (temp[idx] != (byte)-1)) {
               throw new IOException("Read extra bytes after " +
                 "the terminator!");
             }
@@ -4640,7 +4643,7 @@ public abstract class FSEditLogOp {
           if (numRead != -1) { 
             in.reset();
             IOUtils.skipFully(in, idx);
-            in.mark(buf.length + 1);
+            in.mark(temp.length + 1);
             IOUtils.skipFully(in, 1);
           }
         }
@@ -4655,14 +4658,164 @@ public abstract class FSEditLogOp {
      * If an exception is thrown, the stream's mark will be set to the first
      * problematic byte.  This usually means the beginning of the opcode.
      */
-    private FSEditLogOp decodeOp() throws IOException {
-      limiter.setLimit(maxOpSize);
+    public abstract FSEditLogOp decodeOp() throws IOException;
+
+    /**
+     * Similar to decodeOp(), but we only retrieve the transaction ID of the
+     * Op rather than reading it.  If the edit log format supports length
+     * prefixing, this can be much faster than full decoding.
+     *
+     * @return the last txid of the segment, or INVALID_TXID on EOF.
+     */
+    public abstract long scanOp() throws IOException;
+  }
+
+  /**
+   * Reads edit logs which are prefixed with a length.  These edit logs also
+   * include a checksum and transaction ID.
+   */
+  private static class LengthPrefixedReader extends Reader {
+    /**
+     * The minimum length of a length-prefixed Op.
+     *
+     * The minimum Op has:
+     * 1-byte opcode
+     * 4-byte length
+     * 8-byte txid
+     * 0-byte body
+     * 4-byte checksum
+     */
+    private static final int MIN_OP_LENGTH = 17;
+
+    /**
+     * The op id length.
+     *
+     * Not included in the stored length.
+     */
+    private static final int OP_ID_LENGTH = 1;
+
+    /**
+     * The checksum length.
+     *
+     * Not included in the stored length.
+     */
+    private static final int CHECKSUM_LENGTH = 4;
+
+    private final Checksum checksum;
+
+    LengthPrefixedReader(DataInputStream in, StreamLimiter limiter,
+                         int logVersion) {
+      super(in, limiter, logVersion);
+      this.checksum = DataChecksum.newCrc32();
+    }
+
+    @Override
+    public FSEditLogOp decodeOp() throws IOException {
+      long txid = decodeOpFrame();
+      if (txid == HdfsServerConstants.INVALID_TXID) {
+        return null;
+      }
+      in.reset();
       in.mark(maxOpSize);
+      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(in.readByte());
+      FSEditLogOp op = cache.get(opCode);
+      if (op == null) {
+        throw new IOException("Read invalid opcode " + opCode);
+      }
+      op.setTransactionId(txid);
+      IOUtils.skipFully(in, 4 + 8); // skip length and txid
+      op.readFields(in, logVersion);
+      // skip over the checksum, which we validated above.
+      IOUtils.skipFully(in, CHECKSUM_LENGTH);
+      return op;
+    }
+
+    @Override
+    public long scanOp() throws IOException {
+      return decodeOpFrame();
+    }
 
-      if (checksum != null) {
-        checksum.reset();
+    /**
+     * Decode the opcode "frame".  This includes reading the opcode and
+     * transaction ID, and validating the checksum and length.  It does not
+     * include reading the opcode-specific fields.
+     * The input stream will be advanced to the end of the op at the end of this
+     * function.
+     *
+     * @return        the txid of the op, or INVALID_TXID if we hit EOF.
+     */
+    private long decodeOpFrame() throws IOException {
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
+      byte opCodeByte;
+      try {
+        opCodeByte = in.readByte();
+      } catch (EOFException eof) {
+        // EOF at an opcode boundary is expected.
+        return HdfsServerConstants.INVALID_TXID;
       }
+      if (opCodeByte == FSEditLogOpCodes.OP_INVALID.getOpCode()) {
+        verifyTerminator();
+        return HdfsServerConstants.INVALID_TXID;
+      }
+      // Here, we verify that the Op size makes sense and that the
+      // data matches its checksum before attempting to construct an Op.
+      // This is important because otherwise we may encounter an
+      // OutOfMemoryError which could bring down the NameNode or
+      // JournalNode when reading garbage data.
+      int opLength = in.readInt() + OP_ID_LENGTH + CHECKSUM_LENGTH;
+      if (opLength > maxOpSize) {
+        throw new IOException("Op " + (int)opCodeByte + " has size " +
+            opLength + ", but maxOpSize = " + maxOpSize);
+      } else if (opLength < MIN_OP_LENGTH) {
+        throw new IOException("Op " + (int)opCodeByte + " has size " +
+            opLength + ", but the minimum op size is " + MIN_OP_LENGTH);
+      }
+      long txid = in.readLong();
+      // Verify checksum
+      in.reset();
+      in.mark(maxOpSize);
+      checksum.reset();
+      for (int rem = opLength - CHECKSUM_LENGTH; rem > 0;) {
+        int toRead = Math.min(temp.length, rem);
+        IOUtils.readFully(in, temp, 0, toRead);
+        checksum.update(temp, 0, toRead);
+        rem -= toRead;
+      }
+      int expectedChecksum = in.readInt();
+      int calculatedChecksum = (int)checksum.getValue();
+      if (expectedChecksum != calculatedChecksum) {
+        throw new ChecksumException(
+            "Transaction is corrupt. Calculated checksum is " +
+            calculatedChecksum + " but read checksum " +
+            expectedChecksum, txid);
+      }
+      return txid;
+    }
+  }
+
+  /**
+   * Read edit logs which have a checksum and a transaction ID, but not a
+   * length.
+   */
+  private static class ChecksummedReader extends Reader {
+    private final Checksum checksum;
 
+    ChecksummedReader(Checksum checksum, DataInputStream in,
+                      StreamLimiter limiter, int logVersion) {
+      super(new DataInputStream(
+          new CheckedInputStream(in, checksum)), limiter, logVersion);
+      this.checksum = checksum;
+    }
+
+    @Override
+    public FSEditLogOp decodeOp() throws IOException {
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
+      // Reset the checksum.  Since we are using a CheckedInputStream, each
+      // subsequent read from the stream will update the checksum.
+      checksum.reset();
       byte opCodeByte;
       try {
         opCodeByte = in.readByte();
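
Spelling out the frame arithmetic that decodeOpFrame() above relies on (the offsets are derived from the reads and constants in the hunk, not stated this way in the patch):

    offset 0    : opcode        (1 byte)   -- OP_ID_LENGTH
    offset 1    : stored length (4 bytes)  -- covers length field + txid + body
    offset 5    : txid          (8 bytes)
    offset 13   : body          (n bytes)
    offset 13+n : checksum      (4 bytes)  -- CHECKSUM_LENGTH; CRC32 over all
                                              preceding bytes of the frame

So opLength = storedLength + OP_ID_LENGTH + CHECKSUM_LENGTH, and the smallest legal frame (an empty body) is 1 + 4 + 8 + 0 + 4 = 17 bytes, matching MIN_OP_LENGTH.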
@@ -4670,88 +4823,89 @@ public abstract class FSEditLogOp {
         // EOF at an opcode boundary is expected.
         return null;
       }
-
       FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
       if (opCode == OP_INVALID) {
         verifyTerminator();
         return null;
       }
-
       FSEditLogOp op = cache.get(opCode);
       if (op == null) {
         throw new IOException("Read invalid opcode " + opCode);
       }
-
-      if (supportEditLogLength) {
-        in.readInt();
+      op.setTransactionId(in.readLong());
+      op.readFields(in, logVersion);
+      // Verify checksum
+      int calculatedChecksum = (int)checksum.getValue();
+      int expectedChecksum = in.readInt();
+      if (expectedChecksum != calculatedChecksum) {
+        throw new ChecksumException(
+            "Transaction is corrupt. Calculated checksum is " +
+                calculatedChecksum + " but read checksum " +
+                expectedChecksum, op.txid);
       }
+      return op;
+    }
 
+    @Override
+    public long scanOp() throws IOException {
+      // Edit logs of this age don't have any length prefix, so we just have
+      // to read the entire Op.
+      FSEditLogOp op = decodeOp();
+      return op == null ?
+          HdfsServerConstants.INVALID_TXID : op.getTransactionId();
+    }
+  }
+
+  /**
+   * Read older edit logs which may or may not have transaction IDs and other
+   * features.  This code is used during upgrades and to allow HDFS INotify to
+   * read older edit log files.
+   */
+  private static class LegacyReader extends Reader {
+    LegacyReader(DataInputStream in,
+                      StreamLimiter limiter, int logVersion) {
+      super(in, limiter, logVersion);
+    }
+
+    @Override
+    public FSEditLogOp decodeOp() throws IOException {
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
+      byte opCodeByte;
+      try {
+        opCodeByte = in.readByte();
+      } catch (EOFException eof) {
+        // EOF at an opcode boundary is expected.
+        return null;
+      }
+      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
+      if (opCode == OP_INVALID) {
+        verifyTerminator();
+        return null;
+      }
+      FSEditLogOp op = cache.get(opCode);
+      if (op == null) {
+        throw new IOException("Read invalid opcode " + opCode);
+      }
       if (NameNodeLayoutVersion.supports(
-          LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
-        // Read the txid
+            LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
         op.setTransactionId(in.readLong());
       } else {
         op.setTransactionId(HdfsServerConstants.INVALID_TXID);
       }
-
       op.readFields(in, logVersion);
-
-      validateChecksum(in, checksum, op.txid);
       return op;
     }
 
-    /**
-     * Similar with decodeOp(), but instead of doing the real decoding, we skip
-     * the content of the op if the length of the editlog is supported.
-     * @return the last txid of the segment, or INVALID_TXID on exception
-     */
+    @Override
     public long scanOp() throws IOException {
-      if (supportEditLogLength) {
-        limiter.setLimit(maxOpSize);
-        in.mark(maxOpSize);
-
-        final byte opCodeByte;
-        try {
-          opCodeByte = in.readByte(); // op code
-        } catch (EOFException e) {
-          return HdfsServerConstants.INVALID_TXID;
-        }
-
-        FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
-        if (opCode == OP_INVALID) {
-          verifyTerminator();
-          return HdfsServerConstants.INVALID_TXID;
-        }
-
-        int length = in.readInt(); // read the length of the op
-        long txid = in.readLong(); // read the txid
-
-        // skip the remaining content
-        IOUtils.skipFully(in, length - 8); 
-        // TODO: do we want to verify checksum for JN? For now we don't.
-        return txid;
-      } else {
-        FSEditLogOp op = decodeOp();
-        return op == null ? HdfsServerConstants.INVALID_TXID : op.getTransactionId();
-      }
-    }
-
-    /**
-     * Validate a transaction's checksum
-     */
-    private void validateChecksum(DataInputStream in,
-                                  Checksum checksum,
-                                  long txid)
-        throws IOException {
-      if (checksum != null) {
-        int calculatedChecksum = (int)checksum.getValue();
-        int readChecksum = in.readInt(); // read in checksum
-        if (readChecksum != calculatedChecksum) {
-          throw new ChecksumException(
-              "Transaction is corrupt. Calculated checksum is " +
-              calculatedChecksum + " but read checksum " + readChecksum, txid);
-        }
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
+        throw new IOException("Can't scan a pre-transactional edit log.");
       }
+      FSEditLogOp op = decodeOp();
+      return op == null ?
+          HdfsServerConstants.INVALID_TXID : op.getTransactionId();
     }
   }
 
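With the three readers in place, callers locate the last transaction ID of a segment through one uniform loop: on length-prefixed logs scanOp() validates the frame length and checksum without decoding the op body, while the older formats fall back to a full decode. A hedged sketch of such a loop (a corrupt frame surfaces as an IOException out of scanOp, as the new test below demonstrates):

    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
    import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;

    class SegmentScanExample {
      // Returns the last txid in the segment, or INVALID_TXID if it is empty.
      static long lastTxId(FSEditLogOp.Reader reader) throws IOException {
        long lastTxId = HdfsServerConstants.INVALID_TXID;
        while (true) {
          long txid = reader.scanOp();  // throws on a corrupt frame
          if (txid == HdfsServerConstants.INVALID_TXID) {
            return lastTxId;            // clean EOF at an opcode boundary
          }
          lastTxId = txid;
        }
      }
    }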

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -875,7 +875,7 @@ public class TestEditLog {
       tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
       in = new DataInputStream(tracker);
             
-      reader = new FSEditLogOp.Reader(in, tracker, version);
+      reader = FSEditLogOp.Reader.create(in, tracker, version);
     }
   
     @Override

+ 80 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java

@@ -24,19 +24,35 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.EnumMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.util.Holder;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestEditLogFileInputStream {
+  private static final Log LOG =
+      LogFactory.getLog(TestEditLogFileInputStream.class);
   private static final byte[] FAKE_LOG_DATA = TestEditLog.HADOOP20_SOME_EDITS;
 
+  private final static File TEST_DIR = PathUtils
+      .getTestDir(TestEditLogFileInputStream.class);
+
   @Test
   public void testReadURL() throws Exception {
     HttpURLConnection conn = mock(HttpURLConnection.class);
@@ -62,4 +78,68 @@ public class TestEditLogFileInputStream {
     assertEquals(FAKE_LOG_DATA.length, elis.length());
     elis.close();
   }
+
+  /**
+   * Regression test for HDFS-8965 which verifies that
+   * EditLogFileInputStream#scanOp verifies Op checksums.
+   */
+  @Test(timeout=60000)
+  public void testScanCorruptEditLog() throws Exception {
+    Configuration conf = new Configuration();
+    File editLog = new File(System.getProperty(
+        "test.build.data", "/tmp"), "testCorruptEditLog");
+
+    LOG.debug("Creating test edit log file: " + editLog);
+    EditLogFileOutputStream elos = new EditLogFileOutputStream(conf,
+        editLog.getAbsoluteFile(), 8192);
+    elos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    FSEditLogOp.OpInstanceCache cache = new FSEditLogOp.OpInstanceCache();
+    FSEditLogOp.MkdirOp mkdirOp = FSEditLogOp.MkdirOp.getInstance(cache);
+    mkdirOp.reset();
+    mkdirOp.setRpcCallId(123);
+    mkdirOp.setTransactionId(1);
+    mkdirOp.setInodeId(789L);
+    mkdirOp.setPath("/mydir");
+    PermissionStatus perms = PermissionStatus.createImmutable(
+        "myuser", "mygroup", FsPermission.createImmutable((short)0777));
+    mkdirOp.setPermissionStatus(perms);
+    elos.write(mkdirOp);
+    mkdirOp.reset();
+    mkdirOp.setRpcCallId(456);
+    mkdirOp.setTransactionId(2);
+    mkdirOp.setInodeId(123L);
+    mkdirOp.setPath("/mydir2");
+    perms = PermissionStatus.createImmutable(
+        "myuser", "mygroup", FsPermission.createImmutable((short)0666));
+    mkdirOp.setPermissionStatus(perms);
+    elos.write(mkdirOp);
+    elos.setReadyToFlush();
+    elos.flushAndSync(false);
+    elos.close();
+    long fileLen = editLog.length();
+
+    LOG.debug("Corrupting last 4 bytes of edit log file " + editLog +
+        ", whose length is " + fileLen);
+    RandomAccessFile rwf = new RandomAccessFile(editLog, "rw");
+    rwf.seek(fileLen - 4);
+    int b = rwf.readInt();
+    rwf.seek(fileLen - 4);
+    rwf.writeInt(b + 1);
+    rwf.close();
+
+    EditLogFileInputStream elis = new EditLogFileInputStream(editLog);
+    Assert.assertEquals(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
+        elis.getVersion(true));
+    Assert.assertEquals(1, elis.scanNextOp());
+    LOG.debug("Read transaction 1 from " + editLog);
+    try {
+      elis.scanNextOp();
+      Assert.fail("Expected scanNextOp to fail when op checksum was corrupt.");
+    } catch (IOException e) {
+      LOG.debug("Caught expected checksum error when reading corrupt " +
+          "transaction 2", e);
+      GenericTestUtils.assertExceptionContains("Transaction is corrupt.", e);
+    }
+    elis.close();
+  }
 }