|
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
|
|
|
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.net.URI;
|
|
@@ -27,6 +28,7 @@ import java.util.concurrent.RejectedExecutionException;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import org.apache.commons.logging.impl.Log4JLogger;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -37,8 +39,6 @@ import org.junit.Rule;
|
|
|
import org.junit.Test;
|
|
|
import org.junit.rules.ExpectedException;
|
|
|
import org.mockito.Mockito;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
|
|
|
/**
|
|
|
* Tests the Native Azure file system (WASB) using parallel threads for rename and delete operations.
|
|
@@ -68,8 +68,8 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
fs.initialize(uri, conf);
|
|
|
|
|
|
// Capture logs
|
|
|
- logs = LogCapturer.captureLogs(
|
|
|
- LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
|
|
|
+ logs = LogCapturer.captureLogs(new Log4JLogger(org.apache.log4j.Logger
|
|
|
+ .getRootLogger()));
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -131,17 +131,19 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are created.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("ms with threads: " + expectedThreadsCreated));
|
|
|
+ assertInLog(content, "ms with threads: " + expectedThreadsCreated);
|
|
|
|
|
|
// Validate thread executions
|
|
|
for (int i = 0; i < expectedThreadsCreated; i++) {
|
|
|
- assertTrue(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i));
|
|
|
+ assertInLog(content,
|
|
|
+ "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i);
|
|
|
}
|
|
|
|
|
|
// Also ensure that we haven't spawned extra threads.
|
|
|
if (expectedThreadsCreated < renameThreads) {
|
|
|
for (int i = expectedThreadsCreated; i < renameThreads; i++) {
|
|
|
- assertFalse(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i));
|
|
|
+ assertNotInLog(content,
|
|
|
+ "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -158,11 +160,12 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are created.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("ms with threads: " + renameThreads));
|
|
|
+ assertInLog(content, "ms with threads: " + renameThreads);
|
|
|
|
|
|
// Validate thread executions
|
|
|
for (int i = 0; i < renameThreads; i++) {
|
|
|
- assertTrue(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i));
|
|
|
+ assertInLog(content,
|
|
|
+ "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -184,11 +187,45 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are disabled.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Disabling threads for Rename operation as thread count 0"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Disabling threads for Rename operation as thread count 0");
|
|
|
|
|
|
// Validate no thread executions
|
|
|
for (int i = 0; i < renameThreads; i++) {
|
|
|
- assertFalse(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i));
|
|
|
+ String term = "AzureBlobRenameThread-"
|
|
|
+ + Thread.currentThread().getName()
|
|
|
+ + "-" + i;
|
|
|
+ assertNotInLog(content, term);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Assert that a log contains the given term.
|
|
|
+ * @param content log output
|
|
|
+ * @param term search term
|
|
|
+ */
|
|
|
+ protected void assertInLog(String content, String term) {
|
|
|
+ assertTrue("Empty log", !content.isEmpty());
|
|
|
+ if (!content.contains(term)) {
|
|
|
+ String message = "No " + term + " found in logs";
|
|
|
+ LOG.error(message);
|
|
|
+ System.err.println(content);
|
|
|
+ fail(message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Assert that a log does not contain the given term.
|
|
|
+ * @param content log output
|
|
|
+ * @param term search term
|
|
|
+ */
|
|
|
+ protected void assertNotInLog(String content, String term) {
|
|
|
+ assertTrue("Empty log", !content.isEmpty());
|
|
|
+ if (content.contains(term)) {
|
|
|
+ String message = term + " found in logs";
|
|
|
+ LOG.error(message);
|
|
|
+ System.err.println(content);
|
|
|
+ fail(message);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -210,11 +247,13 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are disabled.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Disabling threads for Rename operation as thread count 1"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Disabling threads for Rename operation as thread count 1");
|
|
|
|
|
|
// Validate no thread executions
|
|
|
for (int i = 0; i < renameThreads; i++) {
|
|
|
- assertFalse(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i));
|
|
|
+ assertNotInLog(content,
|
|
|
+ "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -254,17 +293,19 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are enabled.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("ms with threads: " + expectedThreadsCreated));
|
|
|
+ assertInLog(content, "ms with threads: " + expectedThreadsCreated);
|
|
|
|
|
|
// Validate thread executions
|
|
|
for (int i = 0; i < expectedThreadsCreated; i++) {
|
|
|
- assertTrue(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i));
|
|
|
+ assertInLog(content,
|
|
|
+ "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i);
|
|
|
}
|
|
|
|
|
|
// Also ensure that we haven't spawned extra threads.
|
|
|
if (expectedThreadsCreated < deleteThreads) {
|
|
|
for (int i = expectedThreadsCreated; i < deleteThreads; i++) {
|
|
|
- assertFalse(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i));
|
|
|
+ assertNotInLog(content,
|
|
|
+ "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -280,11 +321,12 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are enabled.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("ms with threads: " + deleteThreads));
|
|
|
+ assertInLog(content, "ms with threads: " + deleteThreads);
|
|
|
|
|
|
// Validate thread executions
|
|
|
for (int i = 0; i < deleteThreads; i++) {
|
|
|
- assertTrue(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i));
|
|
|
+ assertInLog(content,
|
|
|
+ "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -304,11 +346,13 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are disabled.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Disabling threads for Delete operation as thread count 0"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Disabling threads for Delete operation as thread count 0");
|
|
|
|
|
|
// Validate no thread executions
|
|
|
for (int i = 0; i < deleteThreads; i++) {
|
|
|
- assertFalse(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i));
|
|
|
+ assertNotInLog(content,
|
|
|
+ "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -329,11 +373,13 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are disabled.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Disabling threads for Delete operation as thread count 1"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Disabling threads for Delete operation as thread count 1");
|
|
|
|
|
|
// Validate no thread executions
|
|
|
for (int i = 0; i < deleteThreads; i++) {
|
|
|
- assertFalse(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i));
|
|
|
+ assertNotInLog(content,
|
|
|
+ "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -361,8 +407,8 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are disabled.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Failed to create thread pool with threads"));
|
|
|
- assertTrue(content.contains("Serializing the Delete operation"));
|
|
|
+ assertInLog(content, "Failed to create thread pool with threads");
|
|
|
+ assertInLog(content, "Serializing the Delete operation");
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -393,8 +439,9 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are disabled.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Rejected execution of thread for Delete operation on blob"));
|
|
|
- assertTrue(content.contains("Serializing the Delete operation"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Rejected execution of thread for Delete operation on blob");
|
|
|
+ assertInLog(content, "Serializing the Delete operation");
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -427,8 +474,10 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are enabled and unused threads.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Using thread pool for Delete operation with threads 7"));
|
|
|
- assertTrue(content.contains("6 threads not used for Delete operation on blob"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Using thread pool for Delete operation with threads 7");
|
|
|
+ assertInLog(content,
|
|
|
+ "6 threads not used for Delete operation on blob");
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -472,9 +521,11 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are enabled and delete operation is failed.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Using thread pool for Delete operation with threads"));
|
|
|
- assertTrue(content.contains("Threads got interrupted Delete blob operation"));
|
|
|
- assertTrue(content.contains("Delete failed as operation on subfolders and files failed."));
|
|
|
+ assertInLog(content,
|
|
|
+ "Using thread pool for Delete operation with threads");
|
|
|
+ assertInLog(content, "Threads got interrupted Delete blob operation");
|
|
|
+ assertInLog(content,
|
|
|
+ "Delete failed as operation on subfolders and files failed.");
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -496,10 +547,12 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are enabled and delete operation failed.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Using thread pool for Delete operation with threads"));
|
|
|
- assertTrue(content.contains("Delete operation failed for file " + path));
|
|
|
- assertTrue(content.contains("Terminating execution of Delete operation now as some other thread already got exception or operation failed"));
|
|
|
- assertTrue(content.contains("Failed to delete files / subfolders in blob"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Using thread pool for Delete operation with threads");
|
|
|
+ assertInLog(content, "Delete operation failed for file " + path);
|
|
|
+ assertInLog(content,
|
|
|
+ "Terminating execution of Delete operation now as some other thread already got exception or operation failed");
|
|
|
+ assertInLog(content, "Failed to delete files / subfolders in blob");
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -528,9 +581,12 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are enabled and delete operation failed.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Using thread pool for Delete operation with threads"));
|
|
|
- assertTrue(content.contains("Encountered Exception for Delete operation for file " + path));
|
|
|
- assertTrue(content.contains("Terminating execution of Delete operation now as some other thread already got exception or operation failed"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Using thread pool for Delete operation with threads");
|
|
|
+ assertInLog(content,
|
|
|
+ "Encountered Exception for Delete operation for file " + path);
|
|
|
+ assertInLog(content,
|
|
|
+ "Terminating execution of Delete operation now as some other thread already got exception or operation failed");
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -556,8 +612,8 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are disabled.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Failed to create thread pool with threads"));
|
|
|
- assertTrue(content.contains("Serializing the Rename operation"));
|
|
|
+ assertInLog(content, "Failed to create thread pool with threads");
|
|
|
+ assertInLog(content, "Serializing the Rename operation");
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -587,8 +643,9 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are disabled.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Rejected execution of thread for Rename operation on blob"));
|
|
|
- assertTrue(content.contains("Serializing the Rename operation"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Rejected execution of thread for Rename operation on blob");
|
|
|
+ assertInLog(content, "Serializing the Rename operation");
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -621,8 +678,10 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are enabled and unused threads exists.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Using thread pool for Rename operation with threads 7"));
|
|
|
- assertTrue(content.contains("6 threads not used for Rename operation on blob"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Using thread pool for Rename operation with threads 7");
|
|
|
+ assertInLog(content,
|
|
|
+ "6 threads not used for Rename operation on blob");
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -666,9 +725,11 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are enabled and rename operation is failed.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Using thread pool for Rename operation with threads"));
|
|
|
- assertTrue(content.contains("Threads got interrupted Rename blob operation"));
|
|
|
- assertTrue(content.contains("Rename failed as operation on subfolders and files failed."));
|
|
|
+ assertInLog(content,
|
|
|
+ "Using thread pool for Rename operation with threads");
|
|
|
+ assertInLog(content, "Threads got interrupted Rename blob operation");
|
|
|
+ assertInLog(content,
|
|
|
+ "Rename failed as operation on subfolders and files failed.");
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -707,9 +768,12 @@ public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
|
|
|
|
|
|
// Validate from logs that threads are enabled and delete operation failed.
|
|
|
String content = logs.getOutput();
|
|
|
- assertTrue(content.contains("Using thread pool for Rename operation with threads"));
|
|
|
- assertTrue(content.contains("Encountered Exception for Rename operation for file " + path));
|
|
|
- assertTrue(content.contains("Terminating execution of Rename operation now as some other thread already got exception or operation failed"));
|
|
|
+ assertInLog(content,
|
|
|
+ "Using thread pool for Rename operation with threads");
|
|
|
+ assertInLog(content,
|
|
|
+ "Encountered Exception for Rename operation for file " + path);
|
|
|
+ assertInLog(content,
|
|
|
+ "Terminating execution of Rename operation now as some other thread already got exception or operation failed");
|
|
|
}
|
|
|
|
|
|
@Override
|