
HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu)

Lei Xu, 7 years ago
commit 6959db9c20
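
For context, a hedged sketch of the failure mode the commit message describes (hypothetical client code, not part of this commit; the path and helper names are made up): before this fix, a second close() on a striped stream with a failed streamer rethrew the streamer's stored exception instead of being a no-op.

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class DoubleCloseSketch {
      // Hypothetical client code illustrating HDFS-12612; 'fs' is assumed
      // to point at a cluster whose target directory is erasure-coded.
      static void writeAndCloseTwice(FileSystem fs) throws IOException {
        FSDataOutputStream out = fs.create(new Path("/ec/file"), true);
        out.write("some data".getBytes());
        try {
          out.close();   // a failed streamer may surface its exception here
        } catch (IOException e) {
          System.err.println("first close failed: " + e);
        }
        out.close();     // before the fix: threw again; now: a no-op
      }
    }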

+ 26 - 14
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java

@@ -82,6 +82,12 @@ public class DFSStripedOutputStream extends DFSOutputStream
     implements StreamCapabilities {
   private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
 
+  /**
+   * The last exception seen at the OutputStream level. It records a fatal,
+   * stream-wide failure of this stream, e.g. the stream being aborted.
+   */
+  private final ExceptionLastSeen exceptionLastSeen = new ExceptionLastSeen();
+
   static class MultipleBlockingQueue<T> {
     private final List<BlockingQueue<T>> queues;
 
@@ -971,12 +977,9 @@ public class DFSStripedOutputStream extends DFSOutputStream
       if (isClosed()) {
         return;
       }
-      for (StripedDataStreamer streamer : streamers) {
-        streamer.getLastException().set(
-            new IOException("Lease timeout of "
-                + (dfsClient.getConf().getHdfsTimeout() / 1000)
-                + " seconds expired."));
-      }
+      exceptionLastSeen.set(new IOException("Lease timeout of "
+          + (dfsClient.getConf().getHdfsTimeout() / 1000)
+          + " seconds expired."));
 
       try {
         closeThreads(true);
@@ -1133,18 +1136,26 @@ public class DFSStripedOutputStream extends DFSOutputStream
   @Override
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
+      exceptionLastSeen.check(true);
+
+      // Writing to at least {dataUnits} blocks can be considered a success;
+      // the remaining blocks can be recovered by erasure decoding.
+      final int minReplication = ecPolicy.getNumDataUnits();
+      int goodStreamers = 0;
       final MultipleIOException.Builder b = new MultipleIOException.Builder();
-      for(int i = 0; i < streamers.size(); i++) {
-        final StripedDataStreamer si = getStripedDataStreamer(i);
+      for (final StripedDataStreamer si : streamers) {
         try {
           si.getLastException().check(true);
+          goodStreamers++;
         } catch (IOException e) {
           b.add(e);
         }
       }
-      final IOException ioe = b.build();
-      if (ioe != null) {
-        throw ioe;
+      if (goodStreamers < minReplication) {
+        final IOException ioe = b.build();
+        if (ioe != null) {
+          throw ioe;
+        }
       }
       return;
     }
@@ -1183,9 +1194,10 @@ public class DFSStripedOutputStream extends DFSOutputStream
         }
       } finally {
         // Failures may happen when flushing data/parity data out. Exceptions
-        // may be thrown if more than 3 streamers fail, or updatePipeline RPC
-        // fails. Streamers may keep waiting for the new block/GS information.
-        // Thus need to force closing these threads.
+        // may be thrown if the number of failed streamers exceeds the
+        // number of parity blocks, or if the updatePipeline RPC fails.
+        // Streamers may keep waiting for the new block/GS information,
+        // so these threads must be force-closed.
         closeThreads(true);
       }
 

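The heart of the change above is the new tolerance rule in closeImpl(): a close on an already-closed stream now succeeds as long as at least numDataUnits streamers finished cleanly. A minimal standalone sketch of that decision, with the surrounding classes stubbed out and the MultipleIOException aggregation replaced by keeping just the first exception:

    import java.io.IOException;
    import java.util.List;

    class CloseDecisionSketch {
      // One entry per streamer: null if the streamer finished cleanly,
      // otherwise its last exception.
      static void checkClosable(List<IOException> lastExceptions,
          int numDataUnits) throws IOException {
        int goodStreamers = 0;
        IOException first = null;
        for (IOException e : lastExceptions) {
          if (e == null) {
            goodStreamers++;
          } else if (first == null) {
            first = e;   // stand-in for the MultipleIOException.Builder
          }
        }
        // With >= numDataUnits good streamers the missing blocks can be
        // rebuilt from data + parity, so the failures are tolerable.
        if (goodStreamers < numDataUnits && first != null) {
          throw first;
        }
      }
    }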
+ 7 - 24
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

@@ -285,39 +285,22 @@ class DataStreamer extends Daemon {
     packets.clear();
   }
 
-  class LastExceptionInStreamer {
-    private IOException thrown;
-
-    synchronized void set(Throwable t) {
-      assert t != null;
-      this.thrown = t instanceof IOException ?
-          (IOException) t : new IOException(t);
-    }
-
-    synchronized void clear() {
-      thrown = null;
-    }
-
-    /** Check if there already is an exception. */
+  class LastExceptionInStreamer extends ExceptionLastSeen {
+    /**
+     * Check if there already is an exception.
+     */
+    @Override
     synchronized void check(boolean resetToNull) throws IOException {
+      final IOException thrown = get();
       if (thrown != null) {
         if (LOG.isTraceEnabled()) {
           // wrap and print the exception to know when the check is called
           LOG.trace("Got Exception while checking, " + DataStreamer.this,
               new Throwable(thrown));
         }
-        final IOException e = thrown;
-        if (resetToNull) {
-          thrown = null;
-        }
-        throw e;
+        super.check(resetToNull);
       }
     }
-
-    synchronized void throwException4Close() throws IOException {
-      check(false);
-      throw new ClosedChannelException();
-    }
   }
 
   enum ErrorType {

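Design note on the refactoring above: the generic set/clear/check bookkeeping moves out of DataStreamer into the new ExceptionLastSeen class (next file), so the per-streamer state and the new stream-level exceptionLastSeen field share one implementation. LastExceptionInStreamer keeps only its trace logging and delegates the actual throw to super.check(resetToNull).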
+ 75 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExceptionLastSeen.java

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * The exception last seen by the {@link DataStreamer} or
+ * {@link DFSOutputStream}.
+ */
+@InterfaceAudience.Private
+class ExceptionLastSeen {
+  private IOException thrown;
+
+  /** Get the last seen exception. */
+  protected synchronized IOException get() {
+    return thrown;
+  }
+
+  /**
+   * Set the last seen exception.
+   * @param t the exception.
+   */
+  synchronized void set(Throwable t) {
+    assert t != null;
+    this.thrown = t instanceof IOException ?
+        (IOException) t : new IOException(t);
+  }
+
+  /** Clear the last seen exception. */
+  synchronized void clear() {
+    thrown = null;
+  }
+
+  /**
+   * Check if there already is an exception; throw it if it exists.
+   *
+   * @param resetToNull set to true to reset the exception to null after
+   *                    calling this function.
+   * @throws IOException the stored exception, if one exists.
+   */
+  synchronized void check(boolean resetToNull) throws IOException {
+    if (thrown != null) {
+      final IOException e = thrown;
+      if (resetToNull) {
+        thrown = null;
+      }
+      throw e;
+    }
+  }
+
+  synchronized void throwException4Close() throws IOException {
+    check(false);
+    throw new ClosedChannelException();
+  }
+}

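A quick, illustrative-only demo of the new helper's contract: set() wraps any non-IOException, and check(true) throws the stored exception once and clears it, so a later check is a no-op. (The class is package-private, so this demo assumes it is compiled in org.apache.hadoop.hdfs.)

    package org.apache.hadoop.hdfs;

    import java.io.IOException;

    class ExceptionLastSeenDemo {
      public static void main(String[] args) {
        ExceptionLastSeen last = new ExceptionLastSeen();
        last.set(new RuntimeException("injected"));  // non-IOE gets wrapped
        try {
          last.check(true);   // throws the stored IOException and resets it
        } catch (IOException expected) {
          System.out.println("got: " + expected.getCause());
        }
        try {
          last.check(true);   // nothing stored now, so this is a no-op
        } catch (IOException unreachable) {
          throw new AssertionError(unreachable);
        }
      }
    }
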
+ 76 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java

@@ -319,6 +319,82 @@ public class TestDFSStripedOutputStreamWithFailure {
     }
   }
 
+  private void testCloseWithExceptionsInStreamer(
+      int numFailures, boolean shouldFail) throws Exception {
+    assertTrue(numFailures <=
+        ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
+    final Path dirFile = new Path(dir, "ecfile-" + numFailures);
+    try (FSDataOutputStream out = dfs.create(dirFile, true)) {
+      out.write("idempotent close".getBytes());
+
+      // Expect the first close() call to raise an IOE; any following
+      // close() should be a no-op.
+      LambdaTestUtils.intercept(IOException.class,
+          out::close);
+
+      assertTrue(out.getWrappedStream() instanceof DFSStripedOutputStream);
+      DFSStripedOutputStream stripedOut =
+          (DFSStripedOutputStream) out.getWrappedStream();
+      for (int i = 0; i < numFailures; i++) {
+        // Inject a failure into each of the first numFailures streamers.
+        stripedOut.getStripedDataStreamer(i).getLastException().set(
+            new IOException("injected failure")
+        );
+      }
+      if (shouldFail) {
+        LambdaTestUtils.intercept(IOException.class, out::close);
+      }
+
+      // Close multiple times. All subsequent close() calls should have
+      // no side effect.
+      out.close();
+    }
+  }
+
+  // HDFS-12612
+  @Test
+  public void testIdempotentCloseWithFailedStreams() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    try {
+      setup(conf);
+      // Shut down datanodes until fewer than {dataBlocks} remain, so that
+      // a full group of data blocks can no longer be allocated.
+      while (cluster.getDataNodes().size() >= dataBlocks) {
+        cluster.stopDataNode(0);
+      }
+      cluster.restartNameNodes();
+      cluster.triggerHeartbeats();
+
+      testCloseWithExceptionsInStreamer(1, false);
+      testCloseWithExceptionsInStreamer(ecPolicy.getNumParityUnits(), false);
+      testCloseWithExceptionsInStreamer(ecPolicy.getNumParityUnits() + 1, true);
+      testCloseWithExceptionsInStreamer(ecPolicy.getNumDataUnits(), true);
+    } finally {
+      tearDown();
+    }
+  }
+
+  @Test
+  public void testCloseAfterAbort() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    try {
+      setup(conf);
+
+      final Path dirFile = new Path(dir, "ecfile");
+      FSDataOutputStream out = dfs.create(dirFile, true);
+      assertTrue(out.getWrappedStream() instanceof DFSStripedOutputStream);
+      DFSStripedOutputStream stripedOut =
+          (DFSStripedOutputStream) out.getWrappedStream();
+      stripedOut.abort();
+      LambdaTestUtils.intercept(IOException.class,
+          "Lease timeout", stripedOut::close);
+    } finally {
+      tearDown();
+    }
+  }
+
   @Test(timeout = 90000)
   public void testAddBlockWhenNoSufficientParityNumOfNodes()
       throws IOException {
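
To make the failure thresholds in testIdempotentCloseWithFailedStreams concrete (assuming the test's ecPolicy is the default RS(6,3) scheme): there are 6 + 3 = 9 streamers in total. Injecting up to 3 failures leaves at least 6 good streamers, which meets numDataUnits, so close() succeeds; injecting 4 (numParityUnits + 1) leaves only 5, so close() must throw.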