|
@@ -0,0 +1,478 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.hdfs;
|
|
|
+
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileFilter;
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.FileOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.RandomAccessFile;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Random;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
+
|
|
|
+import org.apache.commons.io.FileUtils;
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.FileUtil;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
+import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
+import org.apache.log4j.Level;
|
|
|
+import org.junit.AfterClass;
|
|
|
+import org.junit.Assert;
|
|
|
+import org.junit.BeforeClass;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Test randomly mixing append, snapshot and truncate operations.
|
|
|
+ * Use local file system to simulate the each operation and verify
|
|
|
+ * the correctness.
|
|
|
+ */
|
|
|
+public class TestAppendSnapshotTruncate {
|
|
|
+ static {
|
|
|
+ GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
|
|
|
+ }
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestAppendSnapshotTruncate.class);
|
|
|
+ private static final int BLOCK_SIZE = 1024;
|
|
|
+ private static final int DATANODE_NUM = 3;
|
|
|
+ private static final short REPLICATION = 3;
|
|
|
+
|
|
|
+ static final int SHORT_HEARTBEAT = 1;
|
|
|
+ static final String[] EMPTY_STRINGS = {};
|
|
|
+
|
|
|
+ static Configuration conf;
|
|
|
+ static MiniDFSCluster cluster;
|
|
|
+ static DistributedFileSystem dfs;
|
|
|
+
|
|
|
+ @BeforeClass
|
|
|
+ public static void startUp() throws IOException {
|
|
|
+ conf = new HdfsConfiguration();
|
|
|
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, SHORT_HEARTBEAT);
|
|
|
+ conf.setLong(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf)
|
|
|
+ .format(true)
|
|
|
+ .numDataNodes(DATANODE_NUM)
|
|
|
+ .nameNodePort(NameNode.DEFAULT_PORT)
|
|
|
+ .waitSafeMode(true)
|
|
|
+ .build();
|
|
|
+ dfs = cluster.getFileSystem();
|
|
|
+ }
|
|
|
+
|
|
|
+ @AfterClass
|
|
|
+ public static void tearDown() throws IOException {
|
|
|
+ if(dfs != null) {
|
|
|
+ dfs.close();
|
|
|
+ }
|
|
|
+ if(cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /** Test randomly mixing append, snapshot and truncate operations. */
|
|
|
+ @Test
|
|
|
+ public void testAST() throws Exception {
|
|
|
+ final String dirPathString = "/dir";
|
|
|
+ final Path dir = new Path(dirPathString);
|
|
|
+ dfs.mkdirs(dir);
|
|
|
+ dfs.allowSnapshot(dir);
|
|
|
+
|
|
|
+ final File localDir = new File(
|
|
|
+ System.getProperty("test.build.data", "target/test/data")
|
|
|
+ + dirPathString);
|
|
|
+ if (localDir.exists()) {
|
|
|
+ FileUtil.fullyDelete(localDir);
|
|
|
+ }
|
|
|
+ localDir.mkdirs();
|
|
|
+
|
|
|
+ final DirWorker w = new DirWorker(dir, localDir, 3);
|
|
|
+ w.startAllFiles();
|
|
|
+ w.start();
|
|
|
+ Worker.sleep(10L*1000);
|
|
|
+ w.stop();
|
|
|
+ w.stoptAllFiles();
|
|
|
+ w.checkEverything();
|
|
|
+ }
|
|
|
+
|
|
|
+ static final FileFilter FILE_ONLY = new FileFilter() {
|
|
|
+ @Override
|
|
|
+ public boolean accept(File f) {
|
|
|
+ return f.isFile();
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ static class DirWorker extends Worker {
|
|
|
+ final Path dir;
|
|
|
+ final File localDir;
|
|
|
+
|
|
|
+ final FileWorker[] files;
|
|
|
+
|
|
|
+ private Map<String, Path> snapshotPaths = new HashMap<String, Path>();
|
|
|
+ private AtomicInteger snapshotCount = new AtomicInteger();
|
|
|
+
|
|
|
+ DirWorker(Path dir, File localDir, int nFiles) throws IOException {
|
|
|
+ super(dir.getName());
|
|
|
+ this.dir = dir;
|
|
|
+ this.localDir = localDir;
|
|
|
+
|
|
|
+ this.files = new FileWorker[nFiles];
|
|
|
+ for(int i = 0; i < files.length; i++) {
|
|
|
+ files[i] = new FileWorker(dir, localDir, String.format("file%02d", i));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static String getSnapshotName(int n) {
|
|
|
+ return String.format("s%02d", n);
|
|
|
+ }
|
|
|
+
|
|
|
+ String createSnapshot(String snapshot) throws IOException {
|
|
|
+ final StringBuilder b = new StringBuilder("createSnapshot: ")
|
|
|
+ .append(snapshot).append(" for ").append(dir);
|
|
|
+
|
|
|
+ {
|
|
|
+ //copy all local files to a sub dir to simulate snapshot.
|
|
|
+ final File subDir = new File(localDir, snapshot);
|
|
|
+ Assert.assertFalse(subDir.exists());
|
|
|
+ subDir.mkdir();
|
|
|
+
|
|
|
+ for(File f : localDir.listFiles(FILE_ONLY)) {
|
|
|
+ FileUtils.copyFile(f, new File(subDir, f.getName()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ final Path p = dfs.createSnapshot(dir, snapshot);
|
|
|
+ snapshotPaths.put(snapshot, p);
|
|
|
+ return b.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ String checkSnapshot(String snapshot) throws IOException {
|
|
|
+ final StringBuilder b = new StringBuilder("checkSnapshot: ")
|
|
|
+ .append(snapshot);
|
|
|
+
|
|
|
+ final File subDir = new File(localDir, snapshot);
|
|
|
+ Assert.assertTrue(subDir.exists());
|
|
|
+
|
|
|
+ final File[] localFiles = subDir.listFiles(FILE_ONLY);
|
|
|
+ final Path p = snapshotPaths.get(snapshot);
|
|
|
+ final FileStatus[] statuses = dfs.listStatus(p);
|
|
|
+ Assert.assertEquals(localFiles.length, statuses.length);
|
|
|
+ b.append(p).append(" vs ").append(subDir).append(", ")
|
|
|
+ .append(statuses.length).append(" entries");
|
|
|
+
|
|
|
+ Arrays.sort(localFiles);
|
|
|
+ Arrays.sort(statuses);
|
|
|
+ for(int i = 0; i < statuses.length; i++) {
|
|
|
+ FileWorker.checkFullFile(statuses[i].getPath(), localFiles[i]);
|
|
|
+ }
|
|
|
+ return b.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ String deleteSnapshot(String snapshot) throws IOException {
|
|
|
+ final StringBuilder b = new StringBuilder("deleteSnapshot: ")
|
|
|
+ .append(snapshot).append(" from ").append(dir);
|
|
|
+ FileUtil.fullyDelete(new File(localDir, snapshot));
|
|
|
+ dfs.deleteSnapshot(dir, snapshot);
|
|
|
+ snapshotPaths.remove(snapshot);
|
|
|
+ return b.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String call() throws Exception {
|
|
|
+ final Random r = DFSUtil.getRandom();
|
|
|
+ final int op = r.nextInt(6);
|
|
|
+ if (op <= 1) {
|
|
|
+ pauseAllFiles();
|
|
|
+ try {
|
|
|
+ final String snapshot = getSnapshotName(snapshotCount.getAndIncrement());
|
|
|
+ return createSnapshot(snapshot);
|
|
|
+ } finally {
|
|
|
+ startAllFiles();
|
|
|
+ }
|
|
|
+ } else if (op <= 3) {
|
|
|
+ final String[] keys = snapshotPaths.keySet().toArray(EMPTY_STRINGS);
|
|
|
+ if (keys.length == 0) {
|
|
|
+ return "NO-OP";
|
|
|
+ }
|
|
|
+ final String snapshot = keys[r.nextInt(keys.length)];
|
|
|
+ final String s = checkSnapshot(snapshot);
|
|
|
+
|
|
|
+ if (op == 2) {
|
|
|
+ return deleteSnapshot(snapshot);
|
|
|
+ }
|
|
|
+ return s;
|
|
|
+ } else {
|
|
|
+ return "NO-OP";
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void pauseAllFiles() {
|
|
|
+ for(FileWorker f : files) {
|
|
|
+ f.pause();
|
|
|
+ }
|
|
|
+
|
|
|
+ for(int i = 0; i < files.length; ) {
|
|
|
+ sleep(100);
|
|
|
+ for(; i < files.length && files[i].isPaused(); i++);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void startAllFiles() {
|
|
|
+ for(FileWorker f : files) {
|
|
|
+ f.start();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void stoptAllFiles() throws InterruptedException {
|
|
|
+ for(FileWorker f : files) {
|
|
|
+ f.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void checkEverything() throws IOException {
|
|
|
+ LOG.info("checkEverything");
|
|
|
+ for(FileWorker f : files) {
|
|
|
+ f.checkFullFile();
|
|
|
+ Preconditions.checkState(f.state.get() != State.ERROR);
|
|
|
+ }
|
|
|
+ for(String snapshot : snapshotPaths.keySet()) {
|
|
|
+ checkSnapshot(snapshot);
|
|
|
+ }
|
|
|
+ Preconditions.checkState(state.get() != State.ERROR);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static class FileWorker extends Worker {
|
|
|
+ final Path file;
|
|
|
+ final File localFile;
|
|
|
+
|
|
|
+ FileWorker(Path dir, File localDir, String filename) throws IOException {
|
|
|
+ super(filename);
|
|
|
+ this.file = new Path(dir, filename);
|
|
|
+ this.localFile = new File(localDir, filename);
|
|
|
+
|
|
|
+ localFile.createNewFile();
|
|
|
+ dfs.create(file, false, 4096, REPLICATION, BLOCK_SIZE).close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String call() throws IOException {
|
|
|
+ final Random r = DFSUtil.getRandom();
|
|
|
+ final int op = r.nextInt(9);
|
|
|
+ if (op == 0) {
|
|
|
+ return checkFullFile();
|
|
|
+ } else {
|
|
|
+ final int nBlocks = r.nextInt(4) + 1;
|
|
|
+ final int lastBlockSize = r.nextInt(BLOCK_SIZE) + 1;
|
|
|
+ final int nBytes = nBlocks*BLOCK_SIZE + lastBlockSize;
|
|
|
+
|
|
|
+ if (op <= 4) {
|
|
|
+ return append(nBytes);
|
|
|
+ } else if (op <= 6) {
|
|
|
+ return truncateArbitrarily(nBytes);
|
|
|
+ } else {
|
|
|
+ return truncateToBlockBoundary(nBlocks);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ String append(int n) throws IOException {
|
|
|
+ final StringBuilder b = new StringBuilder("append ")
|
|
|
+ .append(n).append(" bytes to ").append(file.getName());
|
|
|
+
|
|
|
+ final byte[] bytes = new byte[n];
|
|
|
+ DFSUtil.getRandom().nextBytes(bytes);
|
|
|
+
|
|
|
+ { // write to local file
|
|
|
+ final FileOutputStream out = new FileOutputStream(localFile, true);
|
|
|
+ out.write(bytes, 0, bytes.length);
|
|
|
+ out.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ {
|
|
|
+ final FSDataOutputStream out = dfs.append(file);
|
|
|
+ out.write(bytes, 0, bytes.length);
|
|
|
+ out.close();
|
|
|
+ }
|
|
|
+ return b.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ String truncateArbitrarily(int nBytes) throws IOException {
|
|
|
+ Preconditions.checkArgument(nBytes > 0);
|
|
|
+ final int length = checkLength();
|
|
|
+ final StringBuilder b = new StringBuilder("truncateArbitrarily: ")
|
|
|
+ .append(nBytes).append(" bytes from ").append(file.getName())
|
|
|
+ .append(", length=" + length);
|
|
|
+
|
|
|
+ truncate(length > nBytes? length - nBytes: 0, b);
|
|
|
+ return b.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ String truncateToBlockBoundary(int nBlocks) throws IOException {
|
|
|
+ Preconditions.checkArgument(nBlocks > 0);
|
|
|
+ final int length = checkLength();
|
|
|
+ final StringBuilder b = new StringBuilder("truncateToBlockBoundary: ")
|
|
|
+ .append(nBlocks).append(" blocks from ").append(file.getName())
|
|
|
+ .append(", length=" + length);
|
|
|
+ final int n = (nBlocks - 1)*BLOCK_SIZE + (length%BLOCK_SIZE);
|
|
|
+ Preconditions.checkState(truncate(length > n? length - n: 0, b), b);
|
|
|
+ return b.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean truncate(long newLength, StringBuilder b) throws IOException {
|
|
|
+ final RandomAccessFile raf = new RandomAccessFile(localFile, "rw");
|
|
|
+ raf.setLength(newLength);
|
|
|
+ raf.close();
|
|
|
+
|
|
|
+ final boolean isReady = dfs.truncate(file, newLength);
|
|
|
+ b.append(", newLength=").append(newLength)
|
|
|
+ .append(", isReady=").append(isReady);
|
|
|
+ if (!isReady) {
|
|
|
+ TestFileTruncate.checkBlockRecovery(file, dfs);
|
|
|
+ }
|
|
|
+ return isReady;
|
|
|
+ }
|
|
|
+
|
|
|
+ int checkLength() throws IOException {
|
|
|
+ return checkLength(file, localFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ static int checkLength(Path file, File localFile) throws IOException {
|
|
|
+ final long length = dfs.getFileStatus(file).getLen();
|
|
|
+ Assert.assertEquals(localFile.length(), length);
|
|
|
+ Assert.assertTrue(length <= Integer.MAX_VALUE);
|
|
|
+ return (int)length;
|
|
|
+ }
|
|
|
+
|
|
|
+ String checkFullFile() throws IOException {
|
|
|
+ return checkFullFile(file, localFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ static String checkFullFile(Path file, File localFile) throws IOException {
|
|
|
+ final StringBuilder b = new StringBuilder("checkFullFile: ")
|
|
|
+ .append(file.getName()).append(" vs ").append(localFile);
|
|
|
+ final byte[] bytes = new byte[checkLength(file, localFile)];
|
|
|
+ b.append(", length=").append(bytes.length);
|
|
|
+
|
|
|
+ final FileInputStream in = new FileInputStream(localFile);
|
|
|
+ for(int n = 0; n < bytes.length; ) {
|
|
|
+ n += in.read(bytes, n, bytes.length - n);
|
|
|
+ }
|
|
|
+ in.close();
|
|
|
+
|
|
|
+ AppendTestUtil.checkFullFile(dfs, file, bytes.length, bytes,
|
|
|
+ "File content mismatch: " + b, false);
|
|
|
+ return b.toString();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static abstract class Worker implements Callable<String> {
|
|
|
+ enum State {
|
|
|
+ IDLE(false), RUNNING(false), STOPPED(true), ERROR(true);
|
|
|
+
|
|
|
+ final boolean isTerminated;
|
|
|
+ State(boolean isTerminated) {
|
|
|
+ this.isTerminated = isTerminated;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ final String name;
|
|
|
+ final AtomicReference<State> state = new AtomicReference<State>(State.IDLE);
|
|
|
+ final AtomicBoolean isCalling = new AtomicBoolean();
|
|
|
+ final AtomicReference<Thread> thread = new AtomicReference<Thread>();
|
|
|
+
|
|
|
+ Worker(String name) {
|
|
|
+ this.name = name;
|
|
|
+ }
|
|
|
+
|
|
|
+ void start() {
|
|
|
+ Preconditions.checkState(state.compareAndSet(State.IDLE, State.RUNNING));
|
|
|
+
|
|
|
+ if (thread.get() == null) {
|
|
|
+ final Thread t = new Thread(null, new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ final Random r = DFSUtil.getRandom();
|
|
|
+ for(State s; (s = state.get()) == State.RUNNING || s == State.IDLE;) {
|
|
|
+ if (s == State.RUNNING) {
|
|
|
+ isCalling.set(true);
|
|
|
+ try {
|
|
|
+ LOG.info(call());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Worker " + name + " failed.", e);
|
|
|
+ state.set(State.ERROR);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ isCalling.set(false);
|
|
|
+ }
|
|
|
+ sleep(r.nextInt(100) + 50);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, name);
|
|
|
+ Preconditions.checkState(thread.compareAndSet(null, t));
|
|
|
+ t.start();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isPaused() {
|
|
|
+ return state.get() == State.IDLE && !isCalling.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ void pause() {
|
|
|
+ Preconditions.checkState(state.compareAndSet(State.RUNNING, State.IDLE));
|
|
|
+ }
|
|
|
+
|
|
|
+ void stop() throws InterruptedException {
|
|
|
+ if (state.get() == State.ERROR) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ state.set(State.STOPPED);
|
|
|
+ thread.get().join();
|
|
|
+ }
|
|
|
+
|
|
|
+ static void sleep(final long sleepTimeMs) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(sleepTimeMs);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|