|
@@ -26,14 +26,17 @@ import static org.mockito.Mockito.spy;
|
|
import java.io.File;
|
|
import java.io.File;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
|
|
+import java.util.Collection;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
@@ -46,10 +49,14 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
|
|
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
|
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
|
|
+import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.log4j.Level;
|
|
import org.apache.log4j.Level;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
+import org.junit.runner.RunWith;
|
|
|
|
+import org.junit.runners.Parameterized;
|
|
|
|
+import org.junit.runners.Parameterized.Parameters;
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
import org.mockito.stubbing.Answer;
|
|
import org.mockito.stubbing.Answer;
|
|
|
|
|
|
@@ -57,15 +64,27 @@ import org.mockito.stubbing.Answer;
|
|
* This class tests various synchronization bugs in FSEditLog rolling
|
|
* This class tests various synchronization bugs in FSEditLog rolling
|
|
* and namespace saving.
|
|
* and namespace saving.
|
|
*/
|
|
*/
|
|
|
|
+@RunWith(Parameterized.class)
|
|
public class TestEditLogRace {
|
|
public class TestEditLogRace {
|
|
static {
|
|
static {
|
|
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
|
|
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
|
|
}
|
|
}
|
|
|
|
|
|
- private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
|
|
|
|
|
|
+ @Parameters
|
|
|
|
+ public static Collection<Object[]> data() {
|
|
|
|
+ Collection<Object[]> params = new ArrayList<Object[]>();
|
|
|
|
+ params.add(new Object[]{ false });
|
|
|
|
+ params.add(new Object[]{ true });
|
|
|
|
+ return params;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static boolean useAsyncEditLog;
|
|
|
|
|
|
- private static final String NAME_DIR =
|
|
|
|
- MiniDFSCluster.getBaseDirectory() + "name1";
|
|
|
|
|
|
+ public TestEditLogRace(boolean useAsyncEditLog) {
|
|
|
|
+ TestEditLogRace.useAsyncEditLog = useAsyncEditLog;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
|
|
|
|
|
|
// This test creates NUM_THREADS threads and each thread continuously writes
|
|
// This test creates NUM_THREADS threads and each thread continuously writes
|
|
// transactions
|
|
// transactions
|
|
@@ -94,21 +113,29 @@ public class TestEditLogRace {
|
|
* This value needs to be significantly longer than the average
|
|
* This value needs to be significantly longer than the average
|
|
* time for an fsync() or enterSafeMode().
|
|
* time for an fsync() or enterSafeMode().
|
|
*/
|
|
*/
|
|
- private static final int BLOCK_TIME = 10;
|
|
|
|
-
|
|
|
|
|
|
+ private static final int BLOCK_TIME = 4; // 4 sec pretty generous
|
|
|
|
+
|
|
//
|
|
//
|
|
// an object that does a bunch of transactions
|
|
// an object that does a bunch of transactions
|
|
//
|
|
//
|
|
static class Transactions implements Runnable {
|
|
static class Transactions implements Runnable {
|
|
final NamenodeProtocols nn;
|
|
final NamenodeProtocols nn;
|
|
|
|
+ final MiniDFSCluster cluster;
|
|
|
|
+ FileSystem fs;
|
|
short replication = 3;
|
|
short replication = 3;
|
|
long blockSize = 64;
|
|
long blockSize = 64;
|
|
volatile boolean stopped = false;
|
|
volatile boolean stopped = false;
|
|
volatile Thread thr;
|
|
volatile Thread thr;
|
|
final AtomicReference<Throwable> caught;
|
|
final AtomicReference<Throwable> caught;
|
|
|
|
|
|
- Transactions(NamenodeProtocols ns, AtomicReference<Throwable> caught) {
|
|
|
|
- nn = ns;
|
|
|
|
|
|
+ Transactions(MiniDFSCluster cluster, AtomicReference<Throwable> caught) {
|
|
|
|
+ this.cluster = cluster;
|
|
|
|
+ this.nn = cluster.getNameNodeRpc();
|
|
|
|
+ try {
|
|
|
|
+ this.fs = cluster.getFileSystem();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ caught.set(e);
|
|
|
|
+ }
|
|
this.caught = caught;
|
|
this.caught = caught;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -122,11 +149,23 @@ public class TestEditLogRace {
|
|
while (!stopped) {
|
|
while (!stopped) {
|
|
try {
|
|
try {
|
|
String dirname = "/thr-" + thr.getId() + "-dir-" + i;
|
|
String dirname = "/thr-" + thr.getId() + "-dir-" + i;
|
|
- nn.mkdirs(dirname, p, true);
|
|
|
|
- nn.delete(dirname, true);
|
|
|
|
|
|
+ if (i % 2 == 0) {
|
|
|
|
+ Path dirnamePath = new Path(dirname);
|
|
|
|
+ fs.mkdirs(dirnamePath);
|
|
|
|
+ fs.delete(dirnamePath, true);
|
|
|
|
+ } else {
|
|
|
|
+ nn.mkdirs(dirname, p, true);
|
|
|
|
+ nn.delete(dirname, true);
|
|
|
|
+ }
|
|
} catch (SafeModeException sme) {
|
|
} catch (SafeModeException sme) {
|
|
// This is OK - the tests will bring NN in and out of safemode
|
|
// This is OK - the tests will bring NN in and out of safemode
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
|
|
+ // This is OK - the tests will bring NN in and out of safemode
|
|
|
|
+ if (e instanceof RemoteException &&
|
|
|
|
+ ((RemoteException)e).getClassName()
|
|
|
|
+ .contains("SafeModeException")) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
LOG.warn("Got error in transaction thread", e);
|
|
LOG.warn("Got error in transaction thread", e);
|
|
caught.compareAndSet(null, e);
|
|
caught.compareAndSet(null, e);
|
|
break;
|
|
break;
|
|
@@ -144,11 +183,11 @@ public class TestEditLogRace {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void startTransactionWorkers(NamenodeProtocols namesystem,
|
|
|
|
|
|
+ private void startTransactionWorkers(MiniDFSCluster cluster,
|
|
AtomicReference<Throwable> caughtErr) {
|
|
AtomicReference<Throwable> caughtErr) {
|
|
// Create threads and make them run transactions concurrently.
|
|
// Create threads and make them run transactions concurrently.
|
|
for (int i = 0; i < NUM_THREADS; i++) {
|
|
for (int i = 0; i < NUM_THREADS; i++) {
|
|
- Transactions trans = new Transactions(namesystem, caughtErr);
|
|
|
|
|
|
+ Transactions trans = new Transactions(cluster, caughtErr);
|
|
new Thread(trans, "TransactionThread-" + i).start();
|
|
new Thread(trans, "TransactionThread-" + i).start();
|
|
workers.add(trans);
|
|
workers.add(trans);
|
|
}
|
|
}
|
|
@@ -174,21 +213,21 @@ public class TestEditLogRace {
|
|
@Test
|
|
@Test
|
|
public void testEditLogRolling() throws Exception {
|
|
public void testEditLogRolling() throws Exception {
|
|
// start a cluster
|
|
// start a cluster
|
|
- Configuration conf = new HdfsConfiguration();
|
|
|
|
- MiniDFSCluster cluster = null;
|
|
|
|
|
|
+ Configuration conf = getConf();
|
|
|
|
+ final MiniDFSCluster cluster =
|
|
|
|
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
|
|
FileSystem fileSys = null;
|
|
FileSystem fileSys = null;
|
|
|
|
|
|
|
|
|
|
AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
|
|
AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
|
|
try {
|
|
try {
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
|
|
|
|
cluster.waitActive();
|
|
cluster.waitActive();
|
|
fileSys = cluster.getFileSystem();
|
|
fileSys = cluster.getFileSystem();
|
|
final NamenodeProtocols nn = cluster.getNameNode().getRpcServer();
|
|
final NamenodeProtocols nn = cluster.getNameNode().getRpcServer();
|
|
FSImage fsimage = cluster.getNamesystem().getFSImage();
|
|
FSImage fsimage = cluster.getNamesystem().getFSImage();
|
|
StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
|
|
StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
|
|
|
|
|
|
- startTransactionWorkers(nn, caughtErr);
|
|
|
|
|
|
+ startTransactionWorkers(cluster, caughtErr);
|
|
|
|
|
|
long previousLogTxId = 1;
|
|
long previousLogTxId = 1;
|
|
|
|
|
|
@@ -256,7 +295,7 @@ public class TestEditLogRace {
|
|
@Test
|
|
@Test
|
|
public void testSaveNamespace() throws Exception {
|
|
public void testSaveNamespace() throws Exception {
|
|
// start a cluster
|
|
// start a cluster
|
|
- Configuration conf = new HdfsConfiguration();
|
|
|
|
|
|
+ Configuration conf = getConf();
|
|
MiniDFSCluster cluster = null;
|
|
MiniDFSCluster cluster = null;
|
|
FileSystem fileSys = null;
|
|
FileSystem fileSys = null;
|
|
|
|
|
|
@@ -266,12 +305,11 @@ public class TestEditLogRace {
|
|
cluster.waitActive();
|
|
cluster.waitActive();
|
|
fileSys = cluster.getFileSystem();
|
|
fileSys = cluster.getFileSystem();
|
|
final FSNamesystem namesystem = cluster.getNamesystem();
|
|
final FSNamesystem namesystem = cluster.getNamesystem();
|
|
- final NamenodeProtocols nn = cluster.getNameNodeRpc();
|
|
|
|
|
|
|
|
FSImage fsimage = namesystem.getFSImage();
|
|
FSImage fsimage = namesystem.getFSImage();
|
|
FSEditLog editLog = fsimage.getEditLog();
|
|
FSEditLog editLog = fsimage.getEditLog();
|
|
|
|
|
|
- startTransactionWorkers(nn, caughtErr);
|
|
|
|
|
|
+ startTransactionWorkers(cluster, caughtErr);
|
|
|
|
|
|
for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
|
|
for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
|
|
try {
|
|
try {
|
|
@@ -321,11 +359,13 @@ public class TestEditLogRace {
|
|
|
|
|
|
private Configuration getConf() {
|
|
private Configuration getConf() {
|
|
Configuration conf = new HdfsConfiguration();
|
|
Configuration conf = new HdfsConfiguration();
|
|
|
|
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
|
|
|
|
+ useAsyncEditLog);
|
|
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
|
|
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
|
|
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
|
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
|
- conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
|
|
|
|
- conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
|
|
|
|
- conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
|
|
|
|
|
|
+ //conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
|
|
|
|
+ //conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
|
|
|
|
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
|
|
return conf;
|
|
return conf;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -389,7 +429,7 @@ public class TestEditLogRace {
|
|
@Override
|
|
@Override
|
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
LOG.info("Flush called");
|
|
LOG.info("Flush called");
|
|
- if (Thread.currentThread() == doAnEditThread) {
|
|
|
|
|
|
+ if (useAsyncEditLog || Thread.currentThread() == doAnEditThread) {
|
|
LOG.info("edit thread: Telling main thread we made it to flush section...");
|
|
LOG.info("edit thread: Telling main thread we made it to flush section...");
|
|
// Signal to main thread that the edit thread is in the racy section
|
|
// Signal to main thread that the edit thread is in the racy section
|
|
waitToEnterFlush.countDown();
|
|
waitToEnterFlush.countDown();
|
|
@@ -457,62 +497,52 @@ public class TestEditLogRace {
|
|
|
|
|
|
try {
|
|
try {
|
|
FSImage fsimage = namesystem.getFSImage();
|
|
FSImage fsimage = namesystem.getFSImage();
|
|
- FSEditLog editLog = spy(fsimage.getEditLog());
|
|
|
|
- DFSTestUtil.setEditLogForTesting(namesystem, editLog);
|
|
|
|
|
|
+ final FSEditLog editLog = fsimage.getEditLog();
|
|
|
|
|
|
final AtomicReference<Throwable> deferredException =
|
|
final AtomicReference<Throwable> deferredException =
|
|
new AtomicReference<Throwable>();
|
|
new AtomicReference<Throwable>();
|
|
- final CountDownLatch waitToEnterSync = new CountDownLatch(1);
|
|
|
|
-
|
|
|
|
|
|
+ final CountDownLatch sleepingBeforeSync = new CountDownLatch(1);
|
|
|
|
+
|
|
final Thread doAnEditThread = new Thread() {
|
|
final Thread doAnEditThread = new Thread() {
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
try {
|
|
try {
|
|
- LOG.info("Starting mkdirs");
|
|
|
|
- namesystem.mkdirs("/test",
|
|
|
|
- new PermissionStatus("test","test", new FsPermission((short)00755)),
|
|
|
|
- true);
|
|
|
|
- LOG.info("mkdirs complete");
|
|
|
|
|
|
+ LOG.info("Starting setOwner");
|
|
|
|
+ namesystem.writeLock();
|
|
|
|
+ try {
|
|
|
|
+ editLog.logSetOwner("/","test","test");
|
|
|
|
+ } finally {
|
|
|
|
+ namesystem.writeUnlock();
|
|
|
|
+ }
|
|
|
|
+ sleepingBeforeSync.countDown();
|
|
|
|
+ LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
|
|
|
|
+ Thread.sleep(BLOCK_TIME*1000);
|
|
|
|
+ editLog.logSync();
|
|
|
|
+ LOG.info("edit thread: logSync complete");
|
|
} catch (Throwable ioe) {
|
|
} catch (Throwable ioe) {
|
|
LOG.fatal("Got exception", ioe);
|
|
LOG.fatal("Got exception", ioe);
|
|
deferredException.set(ioe);
|
|
deferredException.set(ioe);
|
|
- waitToEnterSync.countDown();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
- Answer<Void> blockingSync = new Answer<Void>() {
|
|
|
|
- @Override
|
|
|
|
- public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
|
- LOG.info("logSync called");
|
|
|
|
- if (Thread.currentThread() == doAnEditThread) {
|
|
|
|
- LOG.info("edit thread: Telling main thread we made it just before logSync...");
|
|
|
|
- waitToEnterSync.countDown();
|
|
|
|
- LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
|
|
|
|
- Thread.sleep(BLOCK_TIME*1000);
|
|
|
|
- LOG.info("Going through to logSync. This will allow the main thread to continue.");
|
|
|
|
|
|
+ sleepingBeforeSync.countDown();
|
|
}
|
|
}
|
|
- invocation.callRealMethod();
|
|
|
|
- LOG.info("logSync complete");
|
|
|
|
- return null;
|
|
|
|
}
|
|
}
|
|
};
|
|
};
|
|
- doAnswer(blockingSync).when(editLog).logSync();
|
|
|
|
-
|
|
|
|
|
|
+ doAnEditThread.setDaemon(true);
|
|
doAnEditThread.start();
|
|
doAnEditThread.start();
|
|
LOG.info("Main thread: waiting to just before logSync...");
|
|
LOG.info("Main thread: waiting to just before logSync...");
|
|
- waitToEnterSync.await();
|
|
|
|
|
|
+ sleepingBeforeSync.await(200, TimeUnit.MILLISECONDS);
|
|
assertNull(deferredException.get());
|
|
assertNull(deferredException.get());
|
|
LOG.info("Main thread: detected that logSync about to be called.");
|
|
LOG.info("Main thread: detected that logSync about to be called.");
|
|
LOG.info("Trying to enter safe mode.");
|
|
LOG.info("Trying to enter safe mode.");
|
|
- LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits");
|
|
|
|
-
|
|
|
|
|
|
+
|
|
long st = Time.now();
|
|
long st = Time.now();
|
|
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
|
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
|
long et = Time.now();
|
|
long et = Time.now();
|
|
- LOG.info("Entered safe mode");
|
|
|
|
- // Make sure we really waited for the flush to complete!
|
|
|
|
- assertTrue(et - st > (BLOCK_TIME - 1)*1000);
|
|
|
|
|
|
+ LOG.info("Entered safe mode after "+(et-st)+"ms");
|
|
|
|
+
|
|
|
|
+ // Make sure we didn't wait for the thread that did a logEdit but
|
|
|
|
+ // not logSync. Going into safemode does a logSyncAll that will flush
|
|
|
|
+ // its edit.
|
|
|
|
+ assertTrue(et - st < (BLOCK_TIME/2)*1000);
|
|
|
|
|
|
// Once we're in safe mode, save namespace.
|
|
// Once we're in safe mode, save namespace.
|
|
namesystem.saveNamespace(0, 0);
|
|
namesystem.saveNamespace(0, 0);
|