Procházet zdrojové kódy

HADOOP-17657: implement StreamCapabilities in SequenceFile.Writer and fall back to flush, if hflush is not supported (#2949)

Co-authored-by: Kishen Das <kishen@cloudera.com>
Reviewed-by: Steve Loughran <stevel@apache.org>
kishendas před 4 roky
rodič
revize
e571025f5b

+ 18 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java

@@ -27,6 +27,7 @@ import java.security.MessageDigest;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.util.Options;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -834,7 +835,8 @@ public class SequenceFile {
   }
   
   /** Write key/value pairs to a sequence-format file. */
-  public static class Writer implements java.io.Closeable, Syncable {
+  public static class Writer implements java.io.Closeable, Syncable,
+                  Flushable, StreamCapabilities {
     private Configuration conf;
     FSDataOutputStream out;
     boolean ownOutputStream = true;
@@ -1367,6 +1369,21 @@ public class SequenceFile {
         out.hflush();
       }
     }
+
+    @Override
+    public void flush() throws IOException {
+      if (out != null) {
+        out.flush();
+      }
+    }
+
+    @Override
+    public boolean hasCapability(String capability) {
+      if (out !=null && capability != null) {
+        return out.hasCapability(capability);
+      }
+      return false;
+    }
     
     /** Returns the configuration of this file. */
     Configuration getConf() { return conf; }

+ 26 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.conf.*;
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -730,6 +731,31 @@ public class TestSequenceFile {
     }
   }
 
+  @Test
+  public void testSequenceFileWriter() throws Exception {
+    Configuration conf = new Configuration();
+    // This test only works with Raw File System and not Local File System
+    FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    Path p = new Path(GenericTestUtils
+      .getTempPath("testSequenceFileWriter.seq"));
+    try(SequenceFile.Writer writer = SequenceFile.createWriter(
+            fs, conf, p, LongWritable.class, Text.class)) {
+      Assertions.assertThat(writer.hasCapability
+        (StreamCapabilities.HSYNC)).isEqualTo(true);
+      Assertions.assertThat(writer.hasCapability(
+        StreamCapabilities.HFLUSH)).isEqualTo(true);
+      LongWritable key = new LongWritable();
+      key.set(1);
+      Text value = new Text();
+      value.set("somevalue");
+      writer.append(key, value);
+      writer.flush();
+      writer.hflush();
+      writer.hsync();
+      Assertions.assertThat(fs.getFileStatus(p).getLen()).isGreaterThan(0);
+    }
+  }
+
   /** For debugging and testing. */
   public static void main(String[] args) throws Exception {
     int count = 1024 * 1024;