|
@@ -41,10 +41,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
-import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
-import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
-import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
@@ -69,6 +65,9 @@ public class TestAppendSnapshotTruncate {
|
|
|
private static final int BLOCK_SIZE = 1024;
|
|
|
private static final int DATANODE_NUM = 3;
|
|
|
private static final short REPLICATION = 3;
|
|
|
+ private static final int FILE_WORKER_NUM = 3;
|
|
|
+ private static final long TEST_TIME_SECOND = 10;
|
|
|
+ private static final long TEST_TIMEOUT_SECOND = TEST_TIME_SECOND + 60;
|
|
|
|
|
|
static final int SHORT_HEARTBEAT = 1;
|
|
|
static final String[] EMPTY_STRINGS = {};
|
|
@@ -106,7 +105,7 @@ public class TestAppendSnapshotTruncate {
|
|
|
|
|
|
|
|
|
/** Test randomly mixing append, snapshot and truncate operations. */
|
|
|
- @Test
|
|
|
+ @Test(timeout=TEST_TIMEOUT_SECOND*1000)
|
|
|
public void testAST() throws Exception {
|
|
|
final String dirPathString = "/dir";
|
|
|
final Path dir = new Path(dirPathString);
|
|
@@ -121,12 +120,12 @@ public class TestAppendSnapshotTruncate {
|
|
|
}
|
|
|
localDir.mkdirs();
|
|
|
|
|
|
- final DirWorker w = new DirWorker(dir, localDir, 3);
|
|
|
+ final DirWorker w = new DirWorker(dir, localDir, FILE_WORKER_NUM);
|
|
|
w.startAllFiles();
|
|
|
w.start();
|
|
|
- Worker.sleep(10L*1000);
|
|
|
+ Worker.sleep(TEST_TIME_SECOND * 1000);
|
|
|
w.stop();
|
|
|
- w.stoptAllFiles();
|
|
|
+ w.stopAllFiles();
|
|
|
w.checkEverything();
|
|
|
}
|
|
|
|
|
@@ -259,7 +258,7 @@ public class TestAppendSnapshotTruncate {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void stoptAllFiles() throws InterruptedException {
|
|
|
+ void stopAllFiles() throws InterruptedException {
|
|
|
for(FileWorker f : files) {
|
|
|
f.stop();
|
|
|
}
|
|
@@ -269,12 +268,12 @@ public class TestAppendSnapshotTruncate {
|
|
|
LOG.info("checkEverything");
|
|
|
for(FileWorker f : files) {
|
|
|
f.checkFullFile();
|
|
|
- Preconditions.checkState(f.state.get() != State.ERROR);
|
|
|
+ f.checkErrorState();
|
|
|
}
|
|
|
for(String snapshot : snapshotPaths.keySet()) {
|
|
|
checkSnapshot(snapshot);
|
|
|
}
|
|
|
- Preconditions.checkState(state.get() != State.ERROR);
|
|
|
+ checkErrorState();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -364,7 +363,7 @@ public class TestAppendSnapshotTruncate {
|
|
|
b.append(", newLength=").append(newLength)
|
|
|
.append(", isReady=").append(isReady);
|
|
|
if (!isReady) {
|
|
|
- TestFileTruncate.checkBlockRecovery(file, dfs);
|
|
|
+ TestFileTruncate.checkBlockRecovery(file, dfs, 100, 300L);
|
|
|
}
|
|
|
return isReady;
|
|
|
}
|
|
@@ -407,6 +406,7 @@ public class TestAppendSnapshotTruncate {
|
|
|
IDLE(false), RUNNING(false), STOPPED(true), ERROR(true);
|
|
|
|
|
|
final boolean isTerminated;
|
|
|
+
|
|
|
State(boolean isTerminated) {
|
|
|
this.isTerminated = isTerminated;
|
|
|
}
|
|
@@ -416,11 +416,29 @@ public class TestAppendSnapshotTruncate {
|
|
|
final AtomicReference<State> state = new AtomicReference<State>(State.IDLE);
|
|
|
final AtomicBoolean isCalling = new AtomicBoolean();
|
|
|
final AtomicReference<Thread> thread = new AtomicReference<Thread>();
|
|
|
-
|
|
|
+
|
|
|
+ private Throwable thrown = null;
|
|
|
+
|
|
|
Worker(String name) {
|
|
|
this.name = name;
|
|
|
}
|
|
|
|
|
|
+ State checkErrorState() {
|
|
|
+ final State s = state.get();
|
|
|
+ if (s == State.ERROR) {
|
|
|
+ throw new IllegalStateException(name + " has " + s, thrown);
|
|
|
+ }
|
|
|
+ return s;
|
|
|
+ }
|
|
|
+
|
|
|
+ void setErrorState(Throwable t) {
|
|
|
+ checkErrorState();
|
|
|
+
|
|
|
+ LOG.error("Worker " + name + " failed.", t);
|
|
|
+ state.set(State.ERROR);
|
|
|
+ thrown = t;
|
|
|
+ }
|
|
|
+
|
|
|
void start() {
|
|
|
Preconditions.checkState(state.compareAndSet(State.IDLE, State.RUNNING));
|
|
|
|
|
@@ -429,14 +447,13 @@ public class TestAppendSnapshotTruncate {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
final Random r = DFSUtil.getRandom();
|
|
|
- for(State s; (s = state.get()) == State.RUNNING || s == State.IDLE;) {
|
|
|
+ for(State s; !(s = checkErrorState()).isTerminated;) {
|
|
|
if (s == State.RUNNING) {
|
|
|
isCalling.set(true);
|
|
|
try {
|
|
|
LOG.info(call());
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Worker " + name + " failed.", e);
|
|
|
- state.set(State.ERROR);
|
|
|
+ } catch(Throwable t) {
|
|
|
+ setErrorState(t);
|
|
|
return;
|
|
|
}
|
|
|
isCalling.set(false);
|
|
@@ -451,7 +468,11 @@ public class TestAppendSnapshotTruncate {
|
|
|
}
|
|
|
|
|
|
boolean isPaused() {
|
|
|
- return state.get() == State.IDLE && !isCalling.get();
|
|
|
+ final State s = checkErrorState();
|
|
|
+ if (s == State.STOPPED) {
|
|
|
+ throw new IllegalStateException(name + " is " + s);
|
|
|
+ }
|
|
|
+ return s == State.IDLE && !isCalling.get();
|
|
|
}
|
|
|
|
|
|
void pause() {
|
|
@@ -459,9 +480,7 @@ public class TestAppendSnapshotTruncate {
|
|
|
}
|
|
|
|
|
|
void stop() throws InterruptedException {
|
|
|
- if (state.get() == State.ERROR) {
|
|
|
- return;
|
|
|
- }
|
|
|
+ checkErrorState();
|
|
|
|
|
|
state.set(State.STOPPED);
|
|
|
thread.get().join();
|