|
@@ -18,11 +18,13 @@
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.EnumSet;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -30,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
|
|
|
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
@@ -195,19 +198,26 @@ public class TestDFSStripedOutputStream {
|
|
|
public void testStreamFlush() throws Exception {
|
|
|
final byte[] bytes = StripedFileTestUtil.generateBytes(blockSize *
|
|
|
dataBlocks * 3 + cellSize * dataBlocks + cellSize + 123);
|
|
|
- FSDataOutputStream os = fs.create(new Path("/ec-file-1"));
|
|
|
- assertFalse("DFSStripedOutputStream should not have hflush() " +
|
|
|
- "capability yet!", os.hasCapability(
|
|
|
- StreamCapability.HFLUSH.getValue()));
|
|
|
- assertFalse("DFSStripedOutputStream should not have hsync() " +
|
|
|
- "capability yet!", os.hasCapability(
|
|
|
- StreamCapability.HSYNC.getValue()));
|
|
|
- InputStream is = new ByteArrayInputStream(bytes);
|
|
|
- IOUtils.copyBytes(is, os, bytes.length);
|
|
|
- os.hflush();
|
|
|
- IOUtils.copyBytes(is, os, bytes.length);
|
|
|
- os.hsync();
|
|
|
- os.close();
|
|
|
+ try (FSDataOutputStream os = fs.create(new Path("/ec-file-1"))) {
|
|
|
+ assertFalse(
|
|
|
+ "DFSStripedOutputStream should not have hflush() capability yet!",
|
|
|
+ os.hasCapability(StreamCapability.HFLUSH.getValue()));
|
|
|
+ assertFalse(
|
|
|
+ "DFSStripedOutputStream should not have hsync() capability yet!",
|
|
|
+ os.hasCapability(StreamCapability.HSYNC.getValue()));
|
|
|
+ try (InputStream is = new ByteArrayInputStream(bytes)) {
|
|
|
+ IOUtils.copyBytes(is, os, bytes.length);
|
|
|
+ os.hflush();
|
|
|
+ IOUtils.copyBytes(is, os, bytes.length);
|
|
|
+ os.hsync();
|
|
|
+ IOUtils.copyBytes(is, os, bytes.length);
|
|
|
+ }
|
|
|
+ assertTrue("stream is not a DFSStripedOutputStream",
|
|
|
+ os.getWrappedStream() instanceof DFSStripedOutputStream);
|
|
|
+ final DFSStripedOutputStream dfssos =
|
|
|
+ (DFSStripedOutputStream) os.getWrappedStream();
|
|
|
+ dfssos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void testOneFile(String src, int writeBytes) throws Exception {
|