|
@@ -17,21 +17,41 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.nfs.nfs3;
|
|
|
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
+
|
|
|
import java.io.IOException;
|
|
|
+import java.net.InetAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.ConcurrentNavigableMap;
|
|
|
|
|
|
import junit.framework.Assert;
|
|
|
|
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
|
+import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
+import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
|
|
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
|
|
|
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
|
|
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
|
|
|
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
|
|
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
|
|
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
|
|
|
+import org.apache.hadoop.nfs.nfs3.request.CREATE3Request;
|
|
|
+import org.apache.hadoop.nfs.nfs3.request.READ3Request;
|
|
|
+import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
|
|
|
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
|
|
|
+import org.apache.hadoop.nfs.nfs3.response.CREATE3Response;
|
|
|
+import org.apache.hadoop.nfs.nfs3.response.READ3Response;
|
|
|
+import org.apache.hadoop.oncrpc.XDR;
|
|
|
+import org.apache.hadoop.oncrpc.security.SecurityHandler;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.Mockito;
|
|
|
|
|
@@ -105,7 +125,7 @@ public class TestWrites {
|
|
|
Assert.assertTrue(limit - position == 1);
|
|
|
Assert.assertTrue(appendedData.get(position) == (byte) 19);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test
|
|
|
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
|
|
|
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
|
|
@@ -162,4 +182,117 @@ public class TestWrites {
|
|
|
ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
|
|
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
|
|
}
|
|
|
+
|
|
|
+ private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
|
|
|
+ throws InterruptedException {
|
|
|
+ int waitedTime = 0;
|
|
|
+ ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager()
|
|
|
+ .getOpenFileMap();
|
|
|
+ OpenFileCtx ctx = openFileMap.get(handle);
|
|
|
+ assertTrue(ctx != null);
|
|
|
+ do {
|
|
|
+ Thread.sleep(3000);
|
|
|
+ waitedTime += 3000;
|
|
|
+ if (ctx.getPendingWritesForTest().size() == 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } while (waitedTime < maxWaitTime);
|
|
|
+
|
|
|
+ fail("Write can't finish.");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testWriteStableHow() throws IOException, InterruptedException {
|
|
|
+ HdfsConfiguration config = new HdfsConfiguration();
|
|
|
+ DFSClient client = null;
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ RpcProgramNfs3 nfsd;
|
|
|
+ SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
|
|
|
+ Mockito.when(securityHandler.getUser()).thenReturn(
|
|
|
+ System.getProperty("user.name"));
|
|
|
+
|
|
|
+ try {
|
|
|
+ cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
|
|
|
+ cluster.waitActive();
|
|
|
+ client = new DFSClient(NameNode.getAddress(config), config);
|
|
|
+
|
|
|
+ // Start nfs
|
|
|
+ List<String> exports = new ArrayList<String>();
|
|
|
+ exports.add("/");
|
|
|
+ Nfs3 nfs3 = new Nfs3(exports, config);
|
|
|
+ nfs3.start(false);
|
|
|
+ nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
|
|
|
+
|
|
|
+ HdfsFileStatus status = client.getFileInfo("/");
|
|
|
+ FileHandle rootHandle = new FileHandle(status.getFileId());
|
|
|
+ // Create file1
|
|
|
+ CREATE3Request createReq = new CREATE3Request(rootHandle, "file1",
|
|
|
+ Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
|
|
|
+ XDR createXdr = new XDR();
|
|
|
+ createReq.serialize(createXdr);
|
|
|
+ CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
|
|
|
+ securityHandler, InetAddress.getLocalHost());
|
|
|
+ FileHandle handle = createRsp.getObjHandle();
|
|
|
+
|
|
|
+ // Test DATA_SYNC
|
|
|
+ byte[] buffer = new byte[10];
|
|
|
+ for (int i = 0; i < 10; i++) {
|
|
|
+ buffer[i] = (byte) i;
|
|
|
+ }
|
|
|
+ WRITE3Request writeReq = new WRITE3Request(handle, 0, 10,
|
|
|
+ WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
|
|
|
+ XDR writeXdr = new XDR();
|
|
|
+ writeReq.serialize(writeXdr);
|
|
|
+ nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
|
|
|
+ InetAddress.getLocalHost());
|
|
|
+
|
|
|
+ waitWrite(nfsd, handle, 60000);
|
|
|
+
|
|
|
+ // Readback
|
|
|
+ READ3Request readReq = new READ3Request(handle, 0, 10);
|
|
|
+ XDR readXdr = new XDR();
|
|
|
+ readReq.serialize(readXdr);
|
|
|
+ READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
|
|
|
+ securityHandler, InetAddress.getLocalHost());
|
|
|
+
|
|
|
+ assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
|
|
|
+
|
|
|
+ // Test FILE_SYNC
|
|
|
+
|
|
|
+ // Create file2
|
|
|
+ CREATE3Request createReq2 = new CREATE3Request(rootHandle, "file2",
|
|
|
+ Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
|
|
|
+ XDR createXdr2 = new XDR();
|
|
|
+ createReq2.serialize(createXdr2);
|
|
|
+ CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(),
|
|
|
+ securityHandler, InetAddress.getLocalHost());
|
|
|
+ FileHandle handle2 = createRsp2.getObjHandle();
|
|
|
+
|
|
|
+ WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10,
|
|
|
+ WriteStableHow.FILE_SYNC, ByteBuffer.wrap(buffer));
|
|
|
+ XDR writeXdr2 = new XDR();
|
|
|
+ writeReq2.serialize(writeXdr2);
|
|
|
+ nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler,
|
|
|
+ InetAddress.getLocalHost());
|
|
|
+
|
|
|
+ waitWrite(nfsd, handle2, 60000);
|
|
|
+
|
|
|
+ // Readback
|
|
|
+ READ3Request readReq2 = new READ3Request(handle2, 0, 10);
|
|
|
+ XDR readXdr2 = new XDR();
|
|
|
+ readReq2.serialize(readXdr2);
|
|
|
+ READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(),
|
|
|
+ securityHandler, InetAddress.getLocalHost());
|
|
|
+
|
|
|
+ assertTrue(Arrays.equals(buffer, readRsp2.getData().array()));
|
|
|
+ // FILE_SYNC should sync the file size
|
|
|
+ status = client.getFileInfo("/file2");
|
|
|
+ assertTrue(status.getLen() == 10);
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|