|
@@ -41,6 +41,7 @@ import java.security.MessageDigest;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
+import java.util.Random;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -79,8 +80,10 @@ import org.apache.hadoop.ipc.Server;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
import org.apache.log4j.Level;
|
|
|
+import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.Mockito;
|
|
|
import org.mockito.internal.stubbing.answers.ThrowsException;
|
|
@@ -842,6 +845,8 @@ public class TestDFSClientRetries {
|
|
|
final Path dir = new Path("/testNamenodeRestart");
|
|
|
|
|
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
|
|
|
+ conf.setInt(MiniDFSCluster.DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY, 5000);
|
|
|
|
|
|
final short numDatanodes = 3;
|
|
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
@@ -864,11 +869,38 @@ public class TestDFSClientRetries {
|
|
|
final FileStatus s1 = fs.getFileStatus(file1);
|
|
|
assertEquals(length, s1.getLen());
|
|
|
|
|
|
+ //create file4, write some data but not close
|
|
|
+ final Path file4 = new Path(dir, "file4");
|
|
|
+ final FSDataOutputStream out4 = fs.create(file4, false, 4096,
|
|
|
+ fs.getDefaultReplication(file4), 1024L, null);
|
|
|
+ final byte[] bytes = new byte[1000];
|
|
|
+ new Random().nextBytes(bytes);
|
|
|
+ out4.write(bytes);
|
|
|
+ out4.write(bytes);
|
|
|
+ out4.hflush();
|
|
|
+
|
|
|
//shutdown namenode
|
|
|
assertTrue(HdfsUtils.isHealthy(uri));
|
|
|
cluster.shutdownNameNode(0);
|
|
|
assertFalse(HdfsUtils.isHealthy(uri));
|
|
|
|
|
|
+ //namenode is down, continue writing file4 in a thread
|
|
|
+ final Thread file4thread = new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ //write some more data and then close the file
|
|
|
+ out4.write(bytes);
|
|
|
+ out4.write(bytes);
|
|
|
+ out4.write(bytes);
|
|
|
+ out4.close();
|
|
|
+ } catch (Exception e) {
|
|
|
+ exceptions.add(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ file4thread.start();
|
|
|
+
|
|
|
//namenode is down, read the file in a thread
|
|
|
final Thread reader = new Thread(new Runnable() {
|
|
|
@Override
|
|
@@ -927,10 +959,26 @@ public class TestDFSClientRetries {
|
|
|
|
|
|
//check file1 and file3
|
|
|
thread.join();
|
|
|
+ assertEmpty(exceptions);
|
|
|
assertEquals(s1.getLen(), fs.getFileStatus(file3).getLen());
|
|
|
assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file3));
|
|
|
|
|
|
reader.join();
|
|
|
+ assertEmpty(exceptions);
|
|
|
+
|
|
|
+ //check file4
|
|
|
+ file4thread.join();
|
|
|
+ assertEmpty(exceptions);
|
|
|
+ {
|
|
|
+ final FSDataInputStream in = fs.open(file4);
|
|
|
+ int count = 0;
|
|
|
+ for(int r; (r = in.read()) != -1; count++) {
|
|
|
+ Assert.assertEquals(String.format("count=%d", count),
|
|
|
+ bytes[count % bytes.length], (byte)r);
|
|
|
+ }
|
|
|
+ Assert.assertEquals(5 * bytes.length, count);
|
|
|
+ in.close();
|
|
|
+ }
|
|
|
|
|
|
//enter safe mode
|
|
|
assertTrue(HdfsUtils.isHealthy(uri));
|
|
@@ -970,18 +1018,27 @@ public class TestDFSClientRetries {
|
|
|
LOG.info("GOOD!", fnfe);
|
|
|
}
|
|
|
|
|
|
- if (!exceptions.isEmpty()) {
|
|
|
- LOG.error("There are " + exceptions.size() + " exception(s):");
|
|
|
- for(int i = 0; i < exceptions.size(); i++) {
|
|
|
- LOG.error("Exception " + i, exceptions.get(i));
|
|
|
- }
|
|
|
- fail();
|
|
|
- }
|
|
|
+ assertEmpty(exceptions);
|
|
|
} finally {
|
|
|
cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ static void assertEmpty(final List<Exception> exceptions) {
|
|
|
+ if (!exceptions.isEmpty()) {
|
|
|
+ final StringBuilder b = new StringBuilder("There are ")
|
|
|
+ .append(exceptions.size())
|
|
|
+ .append(" exception(s):");
|
|
|
+ for(int i = 0; i < exceptions.size(); i++) {
|
|
|
+ b.append("\n Exception ")
|
|
|
+ .append(i)
|
|
|
+ .append(": ")
|
|
|
+ .append(StringUtils.stringifyException(exceptions.get(i)));
|
|
|
+ }
|
|
|
+ fail(b.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static FileSystem createFsWithDifferentUsername(
|
|
|
final Configuration conf, final boolean isWebHDFS
|
|
|
) throws IOException, InterruptedException {
|