Forráskód Böngészése

HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1365788 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 éve
szülő
commit
939f4a9f92

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt

@@ -2,3 +2,5 @@ Changes for HDFS-3077 branch.
 This will be merged into the main CHANGES.txt when the branch is merged.
 
 HDFS-3077. Quorum-based protocol for reading and writing edit logs. Contributed by Todd Lipcon based on initial work from Brandon Li and Hari Mankude.
+
+HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd)

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java

@@ -20,12 +20,12 @@ package org.apache.hadoop.hdfs.qjournal.client;
 import java.net.URL;
 
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -81,7 +81,7 @@ interface AsyncLogger {
   /**
    * Fetch the list of edit logs available on the remote node.
    */
-  public ListenableFuture<GetEditLogManifestResponseProto> getEditLogManifest(
+  public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
       long fromTxnId);
 
   /**

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -232,13 +233,13 @@ class AsyncLoggerSet {
     return QuorumCall.create(calls);
   }
 
-  public QuorumCall<AsyncLogger,GetEditLogManifestResponseProto>
+  public QuorumCall<AsyncLogger, RemoteEditLogManifest>
       getEditLogManifest(long fromTxnId) {
     Map<AsyncLogger,
-        ListenableFuture<GetEditLogManifestResponseProto>> calls
+        ListenableFuture<RemoteEditLogManifest>> calls
         = Maps.newHashMap();
     for (AsyncLogger logger : loggers) {
-      ListenableFuture<GetEditLogManifestResponseProto> future =
+      ListenableFuture<RemoteEditLogManifest> future =
           logger.getEditLogManifest(fromTxnId);
       calls.put(logger, future);
     }

+ 11 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java

@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 
@@ -290,12 +292,17 @@ public class IPCLoggerChannel implements AsyncLogger {
   }
   
   @Override
-  public ListenableFuture<GetEditLogManifestResponseProto> getEditLogManifest(
+  public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
       final long fromTxnId) {
-    return executor.submit(new Callable<GetEditLogManifestResponseProto>() {
+    return executor.submit(new Callable<RemoteEditLogManifest>() {
       @Override
-      public GetEditLogManifestResponseProto call() throws IOException {
-        return getProxy().getEditLogManifest(journalId, fromTxnId);
+      public RemoteEditLogManifest call() throws IOException {
+        GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
+            journalId, fromTxnId);
+        // Update the http port, since we need this to build URLs to any of the
+        // returned logs.
+        httpPort = ret.getHttpPort();
+        return PBHelper.convert(ret.getManifest());
       }
     });
   }

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
@@ -334,9 +335,9 @@ public class QuorumJournalManager implements JournalManager {
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxnId, boolean inProgressOk) {
 
-    QuorumCall<AsyncLogger,GetEditLogManifestResponseProto> q =
+    QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
         loggers.getEditLogManifest(fromTxnId);
-    Map<AsyncLogger, GetEditLogManifestResponseProto> resps;
+    Map<AsyncLogger, RemoteEditLogManifest> resps;
     try {
       resps = loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs);
     } catch (IOException ioe) {
@@ -344,16 +345,15 @@ public class QuorumJournalManager implements JournalManager {
       throw new RuntimeException(ioe);
     }
     
-    LOG.info("selectInputStream manifests:\n" +
-        QuorumCall.mapToString(resps));
+    LOG.debug("selectInputStream manifests:\n" +
+        Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
     
     final PriorityQueue<EditLogInputStream> allStreams = 
         new PriorityQueue<EditLogInputStream>(64,
             JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
-    for (Map.Entry<AsyncLogger, GetEditLogManifestResponseProto> e : resps.entrySet()) {
+    for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
       AsyncLogger logger = e.getKey();
-      GetEditLogManifestResponseProto response = e.getValue();
-      RemoteEditLogManifest manifest = PBHelper.convert(response.getManifest());
+      RemoteEditLogManifest manifest = e.getValue();
       
       for (RemoteEditLog remoteLog : manifest.getLogs()) {
         URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java

@@ -281,7 +281,8 @@ class Journal implements Closeable {
    */
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
       throws IOException {
-    // TODO: check fencing info?
+    // No need to checkRequest() here - anyone may ask for the list
+    // of segments.
     RemoteEditLogManifest manifest = new RemoteEditLogManifest(
         fjm.getRemoteEditLogs(sinceTxId));
     return manifest;

+ 69 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java

@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -35,11 +34,15 @@ import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
 import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -97,6 +100,69 @@ public class TestQuorumJournalManager {
     checkRecovery(cluster, 4, 4);
   }
   
+  @Test
+  public void testReaderWhileAnotherWrites() throws Exception {
+    
+    QuorumJournalManager readerQjm = createSpyingQJM();
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    readerQjm.selectInputStreams(streams, 0, false);
+    assertEquals(0, streams.size());
+    writeSegment(qjm, 1, 3, true);
+
+    readerQjm.selectInputStreams(streams, 0, false);
+    try {
+      assertEquals(1, streams.size());
+      // Validate the actual stream contents.
+      EditLogInputStream stream = streams.get(0);
+      assertEquals(1, stream.getFirstTxId());
+      assertEquals(3, stream.getLastTxId());
+      
+      for (int i = 1; i <= 3; i++) {
+        FSEditLogOp op = stream.readOp();
+        assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
+        assertEquals(i, op.getTransactionId());
+      }
+      assertNull(stream.readOp());
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      streams.clear();
+    }
+    
+    // Ensure correct results when there is a stream in-progress, but we don't
+    // ask for in-progress.
+    writeSegment(qjm, 4, 3, false);
+    readerQjm.selectInputStreams(streams, 0, false);
+    try {
+      assertEquals(1, streams.size());
+      EditLogInputStream stream = streams.get(0);
+      assertEquals(1, stream.getFirstTxId());
+      assertEquals(3, stream.getLastTxId());
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      streams.clear();
+    }
+    
+    // TODO: check results for selectInputStreams with inProgressOK = true.
+    // This doesn't currently work, due to a bug where RedundantEditInputStream
+    // throws an exception if there are any unvalidated in-progress edits in the list!
+    // But, it shouldn't be necessary for current use cases.
+    
+    qjm.finalizeLogSegment(4, 6);
+    readerQjm.selectInputStreams(streams, 0, false);
+    try {
+      assertEquals(2, streams.size());
+      assertEquals(4, streams.get(1).getFirstTxId());
+      assertEquals(6, streams.get(1).getLastTxId());
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      streams.clear();
+    }
+  }
+  
+  /**
+   * TODO: this test needs to be fleshed out to be an exhaustive failure test
+   * @throws Exception
+   */
   @Test
   public void testOrchestratedFailures() throws Exception {
     writeSegment(qjm, 1, 3, true);
@@ -287,7 +353,6 @@ public class TestQuorumJournalManager {
         conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO) {
           @Override
           protected List<AsyncLogger> createLoggers() throws IOException {
-            LOG.info("===> make spies");
             List<AsyncLogger> realLoggers = super.createLoggers();
             List<AsyncLogger> spies = Lists.newArrayList();
             for (AsyncLogger logger : realLoggers) {