@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * File append tests for HDFS-200 and HDFS-142, focused on using
+ * append()/sync() to recover block information after a writer crash.
+ */
+public class TestFileAppend4 {
+  static final Log LOG = LogFactory.getLog(TestFileAppend4.class);
+  static final long BLOCK_SIZE = 1024;
+  static final long BBW_SIZE = 500; // deliberately not aligned on a checksum chunk boundary
+
+  static final Object[] NO_ARGS = new Object[]{};
+
+  Configuration conf;
+  MiniDFSCluster cluster;
+  Path file1;
+  FSDataOutputStream stm;
+  boolean simulatedStorage = false;
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
+
+    // lower heartbeat interval for fast recognition of DN death
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
+    // handle under-replicated blocks quickly (for replication asserts)
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+
+    // handle failures in the DFSClient pipeline quickly
+    // (for cluster.shutdown(); fs.close() idiom)
+    conf.setInt("ipc.client.connect.max.retries", 1);
+  }
+
+  /**
+   * Recover the file by opening it in append mode, which takes over
+   * the lease held by the crashed writer. Once we hold the lease,
+   * close the file; this lets a subsequent reader see everything up
+   * to the last sync.
+   * NOTE: This is the same algorithm that HBase uses for file recovery.
+   * @param fs filesystem to use for recovery
+   * @throws Exception
+   */
+  private void recoverFile(final FileSystem fs) throws Exception {
+    LOG.info("Recovering File Lease");
+
+    // set the soft limit to be 1 second so that the
+    // namenode triggers lease recovery upon append request
+    cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD);
+
+    // Trying recovery
+    int tries = 60;
+    boolean recovered = false;
+    FSDataOutputStream out = null;
+    while (!recovered && tries-- > 0) {
+      try {
+        out = fs.append(file1);
+        LOG.info("Successfully opened for appends");
+        recovered = true;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, waiting on lease recovery");
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // ignore it and try again
+        }
+      }
+    }
+    if (out != null) {
+      out.close();
+    }
+    if (!recovered) {
+      fail("Recovery should take < 1 min");
+    }
+    LOG.info("Lease recovery complete");
+  }
+
+  /**
+   * Test case that stops a writer after finalizing a block but
+   * before calling completeFile, and then tries to recover
+   * the lease from another thread.
+   */
+  @Test(timeout=60000)
+  public void testRecoverFinalizedBlock() throws Throwable {
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+
+    try {
+      cluster.waitActive();
+      NameNode preSpyNN = cluster.getNameNode();
+      NameNode spyNN = spy(preSpyNN);
+
+      // Delay completeFile so the block is finalized on the DNs
+      // while the file remains open on the NN
+      DelayAnswer delayer = new DelayAnswer();
+      doAnswer(delayer).when(spyNN).complete(
+          anyString(), anyString(), (Block)anyObject());
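+      // complete() calls through spyNN will now block in DelayAnswer
+      // until delayer.proceed() is invoked below.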
+
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+      file1 = new Path("/testRecoverFinalized");
+      final OutputStream stm = client.create("/testRecoverFinalized", true);
+
+      // write 1/2 block
+      AppendTestUtil.write(stm, 0, 4096);
+      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+      Thread t = new Thread() {
+        public void run() {
+          try {
+            stm.close();
+          } catch (Throwable t) {
+            err.set(t);
+          }
+        }};
+      t.start();
+      LOG.info("Waiting for close to get to latch...");
+      delayer.waitForCall();
+
+      // At this point, the block is finalized on the DNs, but the file
+      // has not been completed in the NN.
+      // Lose the leases
+      LOG.info("Killing lease checker");
+      client.leasechecker.interruptAndJoin();
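+      // With lease renewal stopped, the soft limit lowered in
+      // recoverFile() can expire, letting the NN reassign the lease.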
+
+      FileSystem fs1 = cluster.getFileSystem();
+      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
+          fs1.getConf());
+
+      LOG.info("Recovering file");
+      recoverFile(fs2);
+
+      LOG.info("Telling close to proceed.");
+      delayer.proceed();
+      LOG.info("Waiting for close to finish.");
+      t.join();
+      LOG.info("Close finished.");
+
+      // We expect that close will get a "No lease on ..." error
+      // from the NameNode.
+      Throwable thrownByClose = err.get();
+      assertNotNull(thrownByClose);
+      assertTrue(thrownByClose instanceof IOException);
+      if (!thrownByClose.getMessage().contains(
+          "No lease on /testRecoverFinalized")) {
+        throw thrownByClose;
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test case that stops a writer after finalizing a block but
+   * before calling completeFile, recovers a file from another writer,
+   * starts writing from that writer, and then has the old lease holder
+   * call completeFile.
+   */
+  @Test(timeout=60000)
+  public void testCompleteOtherLeaseHoldersFile() throws Throwable {
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+
+    try {
+      cluster.waitActive();
+      NameNode preSpyNN = cluster.getNameNode();
+      NameNode spyNN = spy(preSpyNN);
+
+      // Delay completeFile, as in testRecoverFinalizedBlock
+      DelayAnswer delayer = new DelayAnswer();
+      doAnswer(delayer).when(spyNN).complete(
+          anyString(), anyString(), (Block)anyObject());
+
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+      file1 = new Path("/testCompleteOtherLease");
+      final OutputStream stm = client.create("/testCompleteOtherLease", true);
+
+      // write 1/2 block
+      AppendTestUtil.write(stm, 0, 4096);
+      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+      Thread t = new Thread() {
+        public void run() {
+          try {
+            stm.close();
+          } catch (Throwable t) {
+            err.set(t);
+          }
+        }};
+      t.start();
+      LOG.info("Waiting for close to get to latch...");
+      delayer.waitForCall();
+
+      // At this point, the block is finalized on the DNs, but the file
+      // has not been completed in the NN.
+      // Lose the leases
+      LOG.info("Killing lease checker");
+      client.leasechecker.interruptAndJoin();
+
+      FileSystem fs1 = cluster.getFileSystem();
+      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
+          fs1.getConf());
+
+      LOG.info("Recovering file");
+      recoverFile(fs2);
+
+      LOG.info("Opening file for append from new fs");
+      FSDataOutputStream appenderStream = fs2.append(file1);
+
+      LOG.info("Writing some data from new appender");
+      AppendTestUtil.write(appenderStream, 0, 4096);
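+      // The appender now holds the lease, so the original writer's
+      // delayed complete() below should get a lease mismatch error.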
+
+      LOG.info("Telling old close to proceed.");
+      delayer.proceed();
+      LOG.info("Waiting for close to finish.");
+      t.join();
+      LOG.info("Close finished.");
+
+      // We expect that close will get a "Lease mismatch"
+      // error.
+      Throwable thrownByClose = err.get();
+      assertNotNull(thrownByClose);
+      assertTrue(thrownByClose instanceof IOException);
+      if (!thrownByClose.getMessage().contains(
+          "Lease mismatch")) {
+        throw thrownByClose;
+      }
+
+      // The appender should be able to close properly
+      appenderStream.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Mockito answer helper that triggers one latch as soon as the
+   * method is called, then waits on another before continuing.
+   */
+  @SuppressWarnings("unchecked")
+  private static class DelayAnswer implements Answer {
+    private final CountDownLatch fireLatch = new CountDownLatch(1);
+    private final CountDownLatch waitLatch = new CountDownLatch(1);
+
+    /**
+     * Wait until the method is called.
+     */
+    public void waitForCall() throws InterruptedException {
+      fireLatch.await();
+    }
+
+    /**
+     * Tell the method to proceed.
+     * This should only be called after waitForCall()
+     */
+    public void proceed() {
+      waitLatch.countDown();
+    }
+
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      LOG.info("DelayAnswer firing fireLatch");
+      fireLatch.countDown();
+      try {
+        LOG.info("DelayAnswer waiting on waitLatch");
+        waitLatch.await();
+        LOG.info("DelayAnswer delay complete");
+      } catch (InterruptedException ie) {
+        throw new IOException("Interrupted waiting on latch", ie);
+      }
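+      // The latch has been released; invoke the real (spied) method.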
+      return invocation.callRealMethod();
+    }
+  }
+}