HDFS-4906. HDFS Output streams should not accept writes after being closed. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1494308 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers, 12 years ago
commit 8614595722

+ 16 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
+import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -368,6 +369,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
     private FSDataOutputStream datas;    
     private FSDataOutputStream sums;
     private static final float CHKSUM_AS_FRACTION = 0.01f;
+    private boolean isClosed = false;
     
     public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
                           Path file, 
@@ -391,9 +393,13 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
     
     @Override
     public void close() throws IOException {
-      flushBuffer();
-      sums.close();
-      datas.close();
+      try {
+        flushBuffer();
+        sums.close();
+        datas.close();
+      } finally {
+        isClosed = true;
+      }
     }
     
     @Override
@@ -402,6 +408,13 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       datas.write(b, offset, len);
       sums.write(checksum);
     }
+
+    @Override
+    protected void checkClosed() throws IOException {
+      if (isClosed) {
+        throw new ClosedChannelException();
+      }
+    }
   }
 
   @Override
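
For illustration, here is a minimal sketch of the behavior this hunk enforces, exercised through LocalFileSystem (which extends ChecksumFileSystem, so its output streams are the ChecksumFSOutputSummer patched above). The path and payload are invented for the example and are not part of the patch:

import java.nio.channels.ClosedChannelException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteAfterCloseSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    FSDataOutputStream out = fs.create(new Path("/tmp/write-after-close-demo"));
    out.write("payload".getBytes());
    out.close();                      // the finally block above sets isClosed = true
    try {
      out.write("more".getBytes());  // FSOutputSummer.write() now calls checkClosed() first
    } catch (ClosedChannelException expected) {
      // Rejected, as intended; before this patch the write could be accepted silently.
    }
  }
}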

+ 16 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
 
 import java.io.*;
 import java.net.URISyntaxException;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -325,6 +326,7 @@ public abstract class ChecksumFs extends FilterFs {
     private FSDataOutputStream datas;    
     private FSDataOutputStream sums;
     private static final float CHKSUM_AS_FRACTION = 0.01f;
+    private boolean isClosed = false;
     
     
     public ChecksumFSOutputSummer(final ChecksumFs fs, final Path file, 
@@ -356,9 +358,13 @@ public abstract class ChecksumFs extends FilterFs {
     
     @Override
     public void close() throws IOException {
-      flushBuffer();
-      sums.close();
-      datas.close();
+      try {
+        flushBuffer();
+        sums.close();
+        datas.close();
+      } finally {
+        isClosed = true;
+      }
     }
     
     @Override
@@ -367,6 +373,13 @@ public abstract class ChecksumFs extends FilterFs {
       datas.write(b, offset, len);
       sums.write(checksum);
     }
+
+    @Override
+    protected void checkClosed() throws IOException {
+      if (isClosed) {
+        throw new ClosedChannelException();
+      }
+    }
   }
 
   @Override

+ 13 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java

@@ -53,6 +53,15 @@ abstract public class FSOutputSummer extends OutputStream {
    */
   protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
   throws IOException;
+  
+  /**
+   * Check if the implementing OutputStream is closed and should no longer
+   * accept writes. Implementations should do nothing if this stream is not
+   * closed, and should throw an {@link IOException} if it is closed.
+   * 
+   * @throws IOException if this stream is already closed.
+   */
+  protected abstract void checkClosed() throws IOException;
 
   /** Write one byte */
   @Override
@@ -84,7 +93,10 @@ abstract public class FSOutputSummer extends OutputStream {
    */
   @Override
   public synchronized void write(byte b[], int off, int len)
-  throws IOException {
+      throws IOException {
+    
+    checkClosed();
+    
     if (off < 0 || len < 0 || off > b.length - len) {
       throw new ArrayIndexOutOfBoundsException();
     }
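
To make the new contract concrete, the following hypothetical subclass (the class name, chunk size, and checksum size are invented for illustration; the constructor signature is assumed to match FSOutputSummer in this branch) shows how checkClosed() pairs with close():

import java.io.IOException;
import java.nio.channels.ClosedChannelException;

import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.util.PureJavaCrc32;

class SketchOutputSummer extends FSOutputSummer {
  private boolean closed = false;

  SketchOutputSummer() {
    // 512-byte chunks carrying 4-byte CRC32 checksums; both values are illustrative.
    super(new PureJavaCrc32(), 512, 4);
  }

  @Override
  protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
      throws IOException {
    // Deliver the chunk and its checksum to the underlying medium.
  }

  @Override
  protected void checkClosed() throws IOException {
    if (closed) {
      // ClosedChannelException extends IOException, so it satisfies the contract.
      throw new ClosedChannelException();
    }
  }

  @Override
  public void close() throws IOException {
    try {
      flushBuffer();  // flush any buffered partial chunk
    } finally {
      closed = true;  // mark closed even if the flush fails
    }
  }
}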

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -364,6 +364,9 @@ Release 2.1.0-beta - UNRELEASED
 
     HDFS-4910. TestPermission failed in branch-2. (Chuan Liu via cnauroth)
 
+    HDFS-4906. HDFS Output streams should not accept writes after being
+    closed. (atm)
+
   BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
 
     HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.

+ 11 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -30,6 +30,7 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.BufferOverflowException;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -1303,10 +1304,10 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     return sock;
   }
 
-  private void isClosed() throws IOException {
+  protected void checkClosed() throws IOException {
     if (closed) {
       IOException e = lastException;
-      throw e != null ? e : new IOException("DFSOutputStream is closed");
+      throw e != null ? e : new ClosedChannelException();
     }
   }
 
@@ -1475,7 +1476,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
           break;
         }
       }
-      isClosed();
+      checkClosed();
       queueCurrentPacket();
     }
   }
@@ -1485,7 +1486,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
                                                         throws IOException {
     dfsClient.checkOpen();
-    isClosed();
+    checkClosed();
 
     int cklen = checksum.length;
     int bytesPerChecksum = this.checksum.getBytesPerChecksum(); 
@@ -1617,7 +1618,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
       throws IOException {
     dfsClient.checkOpen();
-    isClosed();
+    checkClosed();
     try {
       long toWaitFor;
       long lastBlockLength = -1L;
@@ -1704,7 +1705,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
           // If we got an error here, it might be because some other thread called
           // close before our hflush completed. In that case, we should throw an
           // exception that the stream is closed.
-          isClosed();
+          checkClosed();
           // If we aren't closed but failed to sync, we should expose that to the
           // caller.
           throw ioe;
@@ -1749,7 +1750,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
    */
   public synchronized int getCurrentBlockReplication() throws IOException {
     dfsClient.checkOpen();
-    isClosed();
+    checkClosed();
     if (streamer == null) {
       return blockReplication; // no pipeline, return repl factor of file
     }
@@ -1768,7 +1769,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     long toWaitFor;
     synchronized (this) {
       dfsClient.checkOpen();
-      isClosed();
+      checkClosed();
       //
       // If there is data in the current buffer, send it across
       //
@@ -1785,7 +1786,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     }
     synchronized (dataQueue) {
       while (!closed) {
-        isClosed();
+        checkClosed();
         if (lastAckedSeqno >= seqno) {
           break;
         }
@@ -1797,7 +1798,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
         }
       }
     }
-    isClosed();
+    checkClosed();
   }
 
   private synchronized void start() {
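
On the caller side, the typed exception means code no longer needs to string-match the old "DFSOutputStream is closed" message, as the TestMultiThreadedHflush change below shows. A minimal sketch, where out is assumed to be an HDFS output stream:

try {
  out.hflush();
} catch (ClosedChannelException e) {
  // The stream was closed, possibly by another thread; stop writing.
}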

+ 64 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClose.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.ClosedChannelException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+public class TestClose {
+
+  @Test
+  public void testWriteAfterClose() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .build();
+    
+    try {
+      final byte[] data = "foo".getBytes();
+      
+      FileSystem fs = FileSystem.get(conf);
+      OutputStream out = fs.create(new Path("/test"));
+      
+      out.write(data);
+      out.close();
+      try {
+        // Should fail.
+        out.write(data);
+        fail("Should not have been able to write more data after file is closed.");
+      } catch (ClosedChannelException cce) {
+        // We got the correct exception. Ignoring.
+      }
+      // Should succeed. Double closes are OK.
+      out.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+}

+ 4 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -169,12 +170,9 @@ public class TestMultiThreadedHflush {
                 while (true) {
                   try {
                     stm.hflush();
-                  } catch (IOException ioe) {
-                    if (!ioe.toString().contains("DFSOutputStream is closed")) {
-                      throw ioe;
-                    } else {
-                      return;
-                    }
+                  } catch (ClosedChannelException ioe) {
+                    // Expected exception caught. Ignoring.
+                    return;
                   }
                 }
               } catch (Throwable t) {