Browse Source

Revert "HDFS-13164. File not closed if streamer fail with DSQuotaExceededException."

This reverts commit 8763d07f97c4667566badabc2ec2e2cd9ae92c0e.
Jason Lowe 7 years ago
parent
commit
23a658c4e7

+ 8 - 55
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -815,19 +815,7 @@ public class DFSOutputStream extends FSOutputSummer
 
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
-      LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
-          closed, getStreamer().streamerClosed());
-      try {
-        getStreamer().getLastException().check(true);
-      } catch (IOException ioe) {
-        cleanupAndRethrowIOException(ioe);
-      } finally {
-        if (!closed) {
-          // If stream is not closed but streamer closed, clean up the stream.
-          // Most importantly, end the file lease.
-          closeThreads(true);
-        }
-      }
+      getStreamer().getLastException().check(true);
       return;
     }
 
@@ -842,12 +830,14 @@ public class DFSOutputStream extends FSOutputSummer
         setCurrentPacketToEmpty();
       }
 
-      try {
-        flushInternal();             // flush all data to Datanodes
-      } catch (IOException ioe) {
-        cleanupAndRethrowIOException(ioe);
+      flushInternal();             // flush all data to Datanodes
+      // get last block before destroying the streamer
+      ExtendedBlock lastBlock = getStreamer().getBlock();
+
+      try (TraceScope ignored =
+               dfsClient.getTracer().newScope("completeFile")) {
+        completeFile(lastBlock);
       }
-      completeFile();
     } catch (ClosedChannelException ignored) {
     } finally {
       // Failures may happen when flushing data.
@@ -859,43 +849,6 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
-  private void completeFile() throws IOException {
-    // get last block before destroying the streamer
-    ExtendedBlock lastBlock = getStreamer().getBlock();
-    try (TraceScope ignored =
-        dfsClient.getTracer().newScope("completeFile")) {
-      completeFile(lastBlock);
-    }
-  }
-
-  /**
-   * Determines whether an IOException thrown needs extra cleanup on the stream.
-   * Space quota exceptions will be thrown when getting new blocks, so the
-   * open HDFS file need to be closed.
-   *
-   * @param ioe the IOException
-   * @return whether the stream needs cleanup for the given IOException
-   */
-  private boolean exceptionNeedsCleanup(IOException ioe) {
-    return ioe instanceof DSQuotaExceededException
-        || ioe instanceof QuotaByStorageTypeExceededException;
-  }
-
-  private void cleanupAndRethrowIOException(IOException ioe)
-      throws IOException {
-    if (exceptionNeedsCleanup(ioe)) {
-      final MultipleIOException.Builder b = new MultipleIOException.Builder();
-      b.add(ioe);
-      try {
-        completeFile();
-      } catch (IOException e) {
-        b.add(e);
-        throw b.build();
-      }
-    }
-    throw ioe;
-  }
-
   // should be called holding (this) lock since setTestFilename() may
   // be called during unit tests
   protected void completeFile(ExtendedBlock last) throws IOException {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java

@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 public class LeaseRenewer {
-  public static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
+  static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
 
   private static long leaseRenewerGraceDefault = 60*1000L;
   static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;

+ 1 - 108
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java

@@ -35,7 +35,6 @@ import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.Scanner;
 
-import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -44,7 +43,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -60,21 +58,14 @@ import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.event.Level;
-import org.slf4j.LoggerFactory;
 
 /** A class for testing quota-related commands */
 public class TestQuota {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestQuota.class);
-
+  
   private static Configuration conf = null;
   private static final ByteArrayOutputStream OUT_STREAM = new ByteArrayOutputStream();
   private static final ByteArrayOutputStream ERR_STREAM = new ByteArrayOutputStream();
@@ -86,9 +77,6 @@ public class TestQuota {
   /* set a smaller block size so that we can test with smaller space quotas */
   private static final int DEFAULT_BLOCK_SIZE = 512;
 
-  @Rule
-  public final Timeout testTestout = new Timeout(120000);
-
   @BeforeClass
   public static void setUpClass() throws Exception {
     conf = new HdfsConfiguration();
@@ -1474,101 +1462,6 @@ public class TestQuota {
         "clrSpaceQuota");
   }
 
-  @Test
-  public void testSpaceQuotaExceptionOnClose() throws Exception {
-    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
-    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
-    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
-    final Path dir = new Path(PathUtils.getTestPath(getClass()),
-        GenericTestUtils.getMethodName());
-    assertTrue(dfs.mkdirs(dir));
-    final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
-    assertEquals(0, ToolRunner.run(dfsAdmin, args));
-
-    final Path testFile = new Path(dir, "file");
-    final FSDataOutputStream stream = dfs.create(testFile);
-    stream.write("whatever".getBytes());
-    try {
-      stream.close();
-      fail("close should fail");
-    } catch (DSQuotaExceededException expected) {
-    }
-
-    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
-  }
-
-  @Test
-  public void testSpaceQuotaExceptionOnFlush() throws Exception {
-    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
-    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
-    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE);
-    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
-    final Path dir = new Path(PathUtils.getTestPath(getClass()),
-        GenericTestUtils.getMethodName());
-    assertTrue(dfs.mkdirs(dir));
-    final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
-    assertEquals(0, ToolRunner.run(dfsAdmin, args));
-
-    Path testFile = new Path(dir, "file");
-    FSDataOutputStream stream = dfs.create(testFile);
-    // get the lease renewer now so we can verify it later without calling
-    // getLeaseRenewer, which will automatically add the client into it.
-    final LeaseRenewer leaseRenewer = dfs.getClient().getLeaseRenewer();
-    stream.write("whatever".getBytes());
-    try {
-      stream.hflush();
-      fail("flush should fail");
-    } catch (DSQuotaExceededException expected) {
-    }
-    // even if we close the stream in finially, it won't help.
-    try {
-      stream.close();
-      fail("close should fail too");
-    } catch (DSQuotaExceededException expected) {
-    }
-
-    GenericTestUtils.setLogLevel(LeaseRenewer.LOG, Level.TRACE);
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        LOG.info("LeaseRenewer: {}", leaseRenewer);
-        return leaseRenewer.isEmpty();
-      }
-    }, 100, 10000);
-    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
-  }
-
-  @Test
-  public void testSpaceQuotaExceptionOnAppend() throws Exception {
-    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
-    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
-    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
-    final Path dir = new Path(PathUtils.getTestPath(getClass()),
-        GenericTestUtils.getMethodName());
-    dfs.delete(dir, true);
-    assertTrue(dfs.mkdirs(dir));
-    final String[] args =
-        new String[] {"-setSpaceQuota", "4000", dir.toString()};
-    ToolRunner.run(dfsAdmin, args);
-
-    final Path testFile = new Path(dir, "file");
-    OutputStream stream = dfs.create(testFile);
-    stream.write("whatever".getBytes());
-    stream.close();
-
-    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
-
-    stream = dfs.append(testFile);
-    byte[] buf = AppendTestUtil.initBuffer(4096);
-    stream.write(buf);
-    try {
-      stream.close();
-      fail("close after append should fail");
-    } catch (DSQuotaExceededException expected) {
-    }
-    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
-  }
-
   private void testSetAndClearSpaceQuotaNoAccessInternal(
       final String[] args,
       final int cmdRet,