@@ -16,8 +16,10 @@
  * limitations under the License.
  */
 
-#include "hdfs/hdfs.h"
-#include "hdfs_test.h"
+#include "expect.h"
+#include "hdfs/hdfs.h"
+#include "hdfs_test.h"
+#include "native_mini_dfs.h"
 #include "platform.h"
 
 #include <inttypes.h>
@@ -59,7 +61,18 @@ void permission_disp(short permissions, char *rtr) {
       strncpy(rtr, perm, 3);
       rtr+=3;
     }
-}
+}
+
+/**
+ * Shutdown and free the given mini cluster, and then exit with the provided exit_code. This method is meant to be
+ * called with a non-zero exit code, which is why we ignore the return status of calling MiniDFSCluster#shutdown since
+ * the process is going to fail anyway.
+ */
+void shutdown_and_exit(struct NativeMiniDfsCluster* cl, int exit_code) {
+    nmdShutdown(cl);
+    nmdFree(cl);
+    exit(exit_code);
+}
 
 int main(int argc, char **argv) {
     const char *writePath = "/tmp/testfile.txt";
@@ -88,16 +101,47 @@ int main(int argc, char **argv) {
     short newPerm = 0666;
     tTime newMtime, newAtime;
 
-    fs = hdfsConnectNewInstance("default", 0);
+    // Create and start the mini cluster
+    struct NativeMiniDfsCluster* cl;
+    struct NativeMiniDfsConf conf = {
+        1, /* doFormat */
+    };
+
+    cl = nmdCreate(&conf);
+    EXPECT_NONNULL(cl);
+    EXPECT_ZERO(nmdWaitClusterUp(cl));
+    tPort port;
+    port = (tPort) nmdGetNameNodePort(cl);
+
+    // Create a hdfs connection to the mini cluster
+    struct hdfsBuilder *bld;
+    bld = hdfsNewBuilder();
+    EXPECT_NONNULL(bld);
+
+    hdfsBuilderSetForceNewInstance(bld);
+    hdfsBuilderSetNameNode(bld, "localhost");
+    hdfsBuilderSetNameNodePort(bld, port);
+    // The HDFS append tests require setting this property otherwise the tests fail with:
+    //
+    // IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being
+    // available to try. The current failed datanode replacement policy is DEFAULT, and a client may configure this
+    // via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
+    //
+    // It seems that when operating against a mini DFS cluster, some HDFS append tests require setting this property
+    // (for example, see TestFileAppend#testMultipleAppends)
+    hdfsBuilderConfSetStr(bld, "dfs.client.block.write.replace-datanode-on-failure.enable", "false");
+
+    fs = hdfsBuilderConnect(bld);
+
     if(!fs) {
         fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
-        exit(-1);
+        shutdown_and_exit(cl, -1);
     }
 
     lfs = hdfsConnectNewInstance(NULL, 0);
     if(!lfs) {
         fprintf(stderr, "Oops! Failed to connect to 'local' hdfs!\n");
-        exit(-1);
+        shutdown_and_exit(cl, -1);
    }
 
     {
@@ -106,7 +150,7 @@ int main(int argc, char **argv) {
         writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
         if(!writeFile) {
             fprintf(stderr, "Failed to open %s for writing!\n", writePath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "Opened %s for writing successfully...\n", writePath);
         num_written_bytes =
@@ -115,7 +159,7 @@ int main(int argc, char **argv) {
         if (num_written_bytes != strlen(fileContents) + 1) {
             fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n",
                     (int)(strlen(fileContents) + 1), (int)num_written_bytes);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);
 
@@ -124,19 +168,19 @@ int main(int argc, char **argv) {
             fprintf(stderr,
                     "Failed to get current file position correctly! Got %" PRId64 "!\n",
                     currentPos);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "Current position: %" PRId64 "\n", currentPos);
 
         if (hdfsFlush(fs, writeFile)) {
             fprintf(stderr, "Failed to 'flush' %s\n", writePath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "Flushed %s successfully!\n", writePath);
 
         if (hdfsHFlush(fs, writeFile)) {
             fprintf(stderr, "Failed to 'hflush' %s\n", writePath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "HFlushed %s successfully!\n", writePath);
 
@@ -150,20 +194,20 @@ int main(int argc, char **argv) {
 
         if (exists) {
             fprintf(stderr, "Failed to validate existence of %s\n", readPath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
 
         readFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0);
         if (!readFile) {
             fprintf(stderr, "Failed to open %s for reading!\n", readPath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
 
         if (!hdfsFileIsOpenForRead(readFile)) {
             fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file "
                     "with O_RDONLY, and it did not show up as 'open for "
                     "read'\n");
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
 
         fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, readFile));
@@ -171,7 +215,7 @@ int main(int argc, char **argv) {
         seekPos = 1;
         if(hdfsSeek(fs, readFile, seekPos)) {
             fprintf(stderr, "Failed to seek %s for reading!\n", readPath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
 
         currentPos = -1;
@@ -179,14 +223,14 @@ int main(int argc, char **argv) {
             fprintf(stderr,
                     "Failed to get current file position correctly! Got %" PRId64 "!\n",
                     currentPos);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "Current position: %" PRId64 "\n", currentPos);
 
         if (!hdfsFileUsesDirectRead(readFile)) {
             fprintf(stderr, "Direct read support incorrectly not detected "
                     "for HDFS filesystem\n");
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
 
         fprintf(stderr, "Direct read support detected for HDFS\n");
@@ -194,7 +238,7 @@ int main(int argc, char **argv) {
         // Test the direct read path
         if(hdfsSeek(fs, readFile, 0)) {
            fprintf(stderr, "Failed to seek %s for reading!\n", readPath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         memset(buffer, 0, sizeof(buffer));
         num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
@@ -202,13 +246,13 @@ int main(int argc, char **argv) {
         if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
             fprintf(stderr, "Failed to read (direct). Expected %s but got %s (%d bytes)\n",
                     fileContents, buffer, num_read_bytes);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "Read (direct) following %d bytes:\n%s\n",
                 num_read_bytes, buffer);
         if (hdfsSeek(fs, readFile, 0L)) {
             fprintf(stderr, "Failed to seek to file start!\n");
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
 
         // Disable the direct read path so that we really go through the slow
@@ -233,7 +277,7 @@ int main(int argc, char **argv) {
         localFile = hdfsOpenFile(lfs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
         if(!localFile) {
             fprintf(stderr, "Failed to open %s for writing!\n", writePath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
 
         num_written_bytes = hdfsWrite(lfs, localFile, (void*)fileContents,
@@ -245,7 +289,7 @@ int main(int argc, char **argv) {
         if (hdfsFileUsesDirectRead(localFile)) {
             fprintf(stderr, "Direct read support incorrectly detected for local "
                     "filesystem\n");
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
 
         hdfsCloseFile(lfs, localFile);
@@ -425,7 +469,7 @@ int main(int argc, char **argv) {
         appendFile = hdfsOpenFile(fs, appendPath, O_WRONLY, 0, 0, 0);
         if(!appendFile) {
             fprintf(stderr, "Failed to open %s for writing!\n", appendPath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "Opened %s for writing successfully...\n", appendPath);
 
@@ -435,10 +479,10 @@ int main(int argc, char **argv) {
         fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);
 
         if (hdfsFlush(fs, appendFile)) {
-            fprintf(stderr, "Failed to 'flush' %s\n", appendPath);
-            exit(-1);
+            fprintf(stderr, "Failed to 'flush' %s\n", appendPath);
+            shutdown_and_exit(cl, -1);
         }
-      fprintf(stderr, "Flushed %s successfully!\n", appendPath);
+        fprintf(stderr, "Flushed %s successfully!\n", appendPath);
 
         hdfsCloseFile(fs, appendFile);
 
@@ -446,7 +490,7 @@ int main(int argc, char **argv) {
         appendFile = hdfsOpenFile(fs, appendPath, O_WRONLY|O_APPEND, 0, 0, 0);
         if(!appendFile) {
             fprintf(stderr, "Failed to open %s for writing!\n", appendPath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "Opened %s for writing successfully...\n", appendPath);
 
@@ -456,10 +500,10 @@ int main(int argc, char **argv) {
         fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);
 
         if (hdfsFlush(fs, appendFile)) {
-            fprintf(stderr, "Failed to 'flush' %s\n", appendPath);
-            exit(-1);
+            fprintf(stderr, "Failed to 'flush' %s\n", appendPath);
+            shutdown_and_exit(cl, -1);
         }
-      fprintf(stderr, "Flushed %s successfully!\n", appendPath);
+        fprintf(stderr, "Flushed %s successfully!\n", appendPath);
 
         hdfsCloseFile(fs, appendFile);
 
@@ -472,11 +516,11 @@ int main(int argc, char **argv) {
         readFile = hdfsOpenFile(fs, appendPath, O_RDONLY, 0, 0, 0);
         if (!readFile) {
             fprintf(stderr, "Failed to open %s for reading!\n", appendPath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
 
         num_read_bytes = hdfsRead(fs, readFile, (void*)rdbuffer, sizeof(rdbuffer));
-        fprintf(stderr, "Read following %d bytes:\n%s\n",
+        fprintf(stderr, "Read following %d bytes:\n%s\n",
                 num_read_bytes, rdbuffer);
 
         fprintf(stderr, "read == Hello, World %s\n", ((result = (strcmp(rdbuffer, "Hello, World"))) == 0 ? "Success!" : "Failed!"));
@@ -496,16 +540,16 @@ int main(int argc, char **argv) {
         // the actual fs user capabilities. Thus just create a file and read
         // the owner is correct.
 
-        fs = hdfsConnectAsUserNewInstance("default", 0, tuser);
+        fs = hdfsConnectAsUserNewInstance("localhost", port, tuser);
         if(!fs) {
             fprintf(stderr, "Oops! Failed to connect to hdfs as user %s!\n",tuser);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
 
         userFile = hdfsOpenFile(fs, userPath, O_WRONLY|O_CREAT, 0, 0, 0);
         if(!userFile) {
             fprintf(stderr, "Failed to open %s for writing!\n", userPath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "Opened %s for writing successfully...\n", userPath);
 
@@ -515,7 +559,7 @@ int main(int argc, char **argv) {
 
         if (hdfsFlush(fs, userFile)) {
             fprintf(stderr, "Failed to 'flush' %s\n", userPath);
-            exit(-1);
+            shutdown_and_exit(cl, -1);
         }
         fprintf(stderr, "Flushed %s successfully!\n", userPath);
 
@@ -528,6 +572,9 @@ int main(int argc, char **argv) {
 
     totalResult += (hdfsDisconnect(fs) != 0);
 
+    EXPECT_ZERO(nmdShutdown(cl));
+    nmdFree(cl);
+
     if (totalResult != 0) {
         return -1;
     } else {
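
For reference, the cluster lifecycle this patch threads through test_libhdfs_ops.c can be restated as a small standalone sketch. This is not part of the patch; it only illustrates the pattern the diff applies (start the native mini DFS cluster, connect through an hdfsBuilder, and shut down and free the cluster on every exit path), assuming expect.h, native_mini_dfs.h, and libhdfs are available to compile and link against:

/*
 * Minimal sketch (not part of the patch) of the mini-cluster lifecycle used above.
 * Assumes the libhdfs test headers expect.h and native_mini_dfs.h are on the
 * include path and the binary links against libhdfs and the native mini-DFS
 * test library.
 */
#include "expect.h"
#include "hdfs/hdfs.h"
#include "native_mini_dfs.h"

#include <stdio.h>
#include <stdlib.h>

int main(void) {
    struct NativeMiniDfsConf conf = {
        1, /* doFormat */
    };

    // Start an in-process mini DFS cluster and wait until it is ready.
    struct NativeMiniDfsCluster* cl = nmdCreate(&conf);
    EXPECT_NONNULL(cl);
    EXPECT_ZERO(nmdWaitClusterUp(cl));
    tPort port = (tPort) nmdGetNameNodePort(cl);

    // Build a client connection against the cluster's NameNode.
    // hdfsBuilderConnect frees the builder whether or not it succeeds.
    struct hdfsBuilder *bld = hdfsNewBuilder();
    EXPECT_NONNULL(bld);
    hdfsBuilderSetForceNewInstance(bld);
    hdfsBuilderSetNameNode(bld, "localhost");
    hdfsBuilderSetNameNodePort(bld, port);
    hdfsFS fs = hdfsBuilderConnect(bld);
    if (!fs) {
        // Mirror the patch: always tear the cluster down before exiting.
        nmdShutdown(cl);
        nmdFree(cl);
        exit(-1);
    }

    // ... run test operations against fs here ...

    // Tear down: disconnect the client, then stop and free the cluster.
    int ret = hdfsDisconnect(fs);
    EXPECT_ZERO(nmdShutdown(cl));
    nmdFree(cl);
    return ret == 0 ? 0 : -1;
}

The point the patch enforces with shutdown_and_exit is the same one shown inline here: no exit path leaves the mini cluster running without nmdShutdown/nmdFree.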