|
@@ -305,6 +305,87 @@ public class TestEditLog {
|
|
|
return loader.loadFSEdits(new EditLogByteInputStream(data), 1);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testMultiStreamsLoadEditWithConfMaxTxns()
|
|
|
+ throws IOException {
|
|
|
+ Configuration conf = getConf();
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ FileSystem fileSystem = null;
|
|
|
+ FSImage writeFsImage = null;
|
|
|
+ try {
|
|
|
+ cluster = new MiniDFSCluster
|
|
|
+ .Builder(conf)
|
|
|
+ .numDataNodes(NUM_DATA_NODES)
|
|
|
+ .build();
|
|
|
+ cluster.waitActive();
|
|
|
+ fileSystem = cluster.getFileSystem();
|
|
|
+ final FSNamesystem namesystem = cluster.getNamesystem();
|
|
|
+ writeFsImage = namesystem.getFSImage();
|
|
|
+ for (Iterator<URI> it = cluster.getNameDirs(0)
|
|
|
+ .iterator(); it.hasNext();) {
|
|
|
+ File dir = new File(it.next().getPath());
|
|
|
+ System.out.println(dir);
|
|
|
+ }
|
|
|
+ // Roll log so new output buffer size takes effect
|
|
|
+ // we should now be writing to edits_inprogress_3
|
|
|
+ long originalLastInodeId = namesystem.dir.getLastInodeId();
|
|
|
+ // Reopen some files as for append
|
|
|
+ Transactions trans = new Transactions(
|
|
|
+ namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2);
|
|
|
+ trans.run();
|
|
|
+ // Roll another time to finalize edits_inprogress_3
|
|
|
+ writeFsImage.rollEditLog(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
|
|
+ Transactions trans1 = new Transactions(
|
|
|
+ namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2);
|
|
|
+ trans1.run();
|
|
|
+ writeFsImage.rollEditLog(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
|
|
+ namesystem.dir.resetLastInodeIdWithoutChecking(originalLastInodeId);
|
|
|
+ for(Iterator<StorageDirectory> it = writeFsImage.getStorage().
|
|
|
+ dirIterator(NameNodeDirType.EDITS); it.hasNext();){
|
|
|
+ long expectedTxns = (2 * NUM_TRANSACTIONS) + 2;
|
|
|
+ File editFile = NNStorage.getFinalizedEditsFile(it.next(),
|
|
|
+ 1, expectedTxns);
|
|
|
+ File editFile1 = NNStorage.getFinalizedEditsFile(it.next(),
|
|
|
+ 203, 404);
|
|
|
+ assertTrue("Expect " + editFile + " exists", editFile.exists());
|
|
|
+ assertTrue("Expect " + editFile1 + " exists", editFile1.exists());
|
|
|
+ EditLogFileInputStream editLogFileInputStream1 =
|
|
|
+ new EditLogFileInputStream(editFile, 1, 202, false);
|
|
|
+ EditLogFileInputStream editLogFileInputStream2 =
|
|
|
+ new EditLogFileInputStream(editFile1, 203, 404, false);
|
|
|
+ List<EditLogInputStream> editStreams = Lists.newArrayList();
|
|
|
+ editStreams.add(editLogFileInputStream1);
|
|
|
+ editStreams.add(editLogFileInputStream2);
|
|
|
+ FSImage readFsImage = new FSImage(conf);
|
|
|
+ try {
|
|
|
+ readFsImage.loadEdits(editStreams, namesystem, 100, null, null);
|
|
|
+ } catch (Exception e){
|
|
|
+ LOG.error("There appears to be an out-of-order edit in the edit log",
|
|
|
+ e.getMessage());
|
|
|
+ fail("no exception should be thrown");
|
|
|
+ } finally {
|
|
|
+ if (readFsImage != null) {
|
|
|
+ readFsImage.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ try {
|
|
|
+ if(fileSystem != null) {
|
|
|
+ fileSystem.close();
|
|
|
+ }
|
|
|
+ if(cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ if(writeFsImage != null){
|
|
|
+ writeFsImage.close();
|
|
|
+ }
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Couldn't shut down cleanly", t);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Simple test for writing to and rolling the edit log.
|
|
|
*/
|