|
@@ -66,6 +66,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
@@ -83,6 +85,9 @@ import org.apache.hadoop.test.PathUtils;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
import org.apache.log4j.Level;
|
|
|
+import org.apache.log4j.AppenderSkeleton;
|
|
|
+import org.apache.log4j.LogManager;
|
|
|
+import org.apache.log4j.spi.LoggingEvent;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.Mockito;
|
|
|
import org.xml.sax.ContentHandler;
|
|
@@ -1223,7 +1228,8 @@ public class TestEditLog {
|
|
|
TXNS_PER_ROLL*11);
|
|
|
|
|
|
for (EditLogInputStream edits : editStreams) {
|
|
|
- FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits);
|
|
|
+ FSEditLogLoader.EditLogValidation val =
|
|
|
+ FSEditLogLoader.validateEditLog(edits, Long.MAX_VALUE);
|
|
|
long read = (val.getEndTxId() - edits.getFirstTxId()) + 1;
|
|
|
LOG.info("Loading edits " + edits + " read " + read);
|
|
|
assertEquals(startTxId, edits.getFirstTxId());
|
|
@@ -1573,4 +1579,99 @@ public class TestEditLog {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ class TestAppender extends AppenderSkeleton {
|
|
|
+ private final List<LoggingEvent> log = new ArrayList<>();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean requiresLayout() {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void append(final LoggingEvent loggingEvent) {
|
|
|
+ log.add(loggingEvent);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() {
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<LoggingEvent> getLog() {
|
|
|
+ return new ArrayList<>(log);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testReadActivelyUpdatedLog() throws Exception {
|
|
|
+ final TestAppender appender = new TestAppender();
|
|
|
+ LogManager.getRootLogger().addAppender(appender);
|
|
|
+ Configuration conf = new HdfsConfiguration();
|
|
|
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
|
|
|
+ // Set single handler thread, so all transactions hit same thread-local ops.
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 1);
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ try {
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
+ cluster.waitActive();
|
|
|
+ FSImage fsimage = cluster.getNamesystem().getFSImage();
|
|
|
+ StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
|
|
|
+
|
|
|
+ final DistributedFileSystem fileSys = cluster.getFileSystem();
|
|
|
+ DFSInotifyEventInputStream events = fileSys.getInotifyEventStream();
|
|
|
+ fileSys.mkdirs(new Path("/test"));
|
|
|
+ fileSys.mkdirs(new Path("/test/dir1"));
|
|
|
+ fileSys.delete(new Path("/test/dir1"), true);
|
|
|
+ fsimage.getEditLog().logSync();
|
|
|
+ fileSys.mkdirs(new Path("/test/dir2"));
|
|
|
+
|
|
|
+
|
|
|
+ final File inProgressEdit = NNStorage.getInProgressEditsFile(sd, 1);
|
|
|
+ assertTrue(inProgressEdit.exists());
|
|
|
+ EditLogFileInputStream elis = new EditLogFileInputStream(inProgressEdit);
|
|
|
+ FSEditLogOp op;
|
|
|
+ long pos = 0;
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ op = elis.readOp();
|
|
|
+ if (op != null && op.opCode != FSEditLogOpCodes.OP_INVALID) {
|
|
|
+ pos = elis.getPosition();
|
|
|
+ } else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ elis.close();
|
|
|
+ assertTrue(pos > 0);
|
|
|
+
|
|
|
+ RandomAccessFile rwf = new RandomAccessFile(inProgressEdit, "rw");
|
|
|
+ rwf.seek(pos);
|
|
|
+ assertEquals(rwf.readByte(), (byte) -1);
|
|
|
+
|
|
|
+ rwf.seek(pos + 1);
|
|
|
+ rwf.writeByte(2);
|
|
|
+
|
|
|
+ rwf.close();
|
|
|
+
|
|
|
+ events.poll();
|
|
|
+ String pattern = "Caught exception after reading (.*) ops";
|
|
|
+ Pattern r = Pattern.compile(pattern);
|
|
|
+ final List<LoggingEvent> log = appender.getLog();
|
|
|
+ for (LoggingEvent event : log) {
|
|
|
+ Matcher m = r.matcher(event.getRenderedMessage());
|
|
|
+ if (m.find()) {
|
|
|
+ fail("Should not try to read past latest syned edit log op");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ LogManager.getRootLogger().removeAppender(appender);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|