
HDFS-16690. Automatically format unformatted JNs with JournalNodeSyncer (#6925). Contributed by Aswin M Prabhu.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Aswin M Prabhu, 9 months ago
Commit e2a0dca43b
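
In short: when dfs.journalnode.enable.sync.format is on, an unformatted JournalNode's syncer daemon asks its peer JNs for their StorageInfo over a new inter-JN RPC, skips peers that have no edit logs yet (to avoid racing a fresh `hdfs namenode -format`), and formats itself with the first valid namespace info it receives. Below is a minimal, self-contained sketch of that peer-selection logic; Peer and StorageInfo are hypothetical stand-ins, not the real Hadoop classes shown in the diff further down:

import java.io.IOException;
import java.util.List;

public class FormatWithSyncerSketch {

  // Hypothetical stand-in for a remote JournalNode proxy.
  interface Peer {
    boolean hasEditLogs() throws IOException;        // mirrors hasEditLogs() in the diff
    StorageInfo getStorageInfo() throws IOException; // mirrors the new getStorageInfo RPC
  }

  // Hypothetical stand-in for org.apache.hadoop.hdfs.server.common.StorageInfo.
  static class StorageInfo {
    final int namespaceId;
    StorageInfo(int namespaceId) { this.namespaceId = namespaceId; }
  }

  // Return the first usable StorageInfo from the peers, or null so the
  // caller can retry on the next sync interval.
  static StorageInfo pickStorageInfo(List<Peer> peers) {
    for (Peer p : peers) {
      try {
        if (!p.hasEditLogs()) {
          continue; // peer may itself be freshly formatted; don't trust it
        }
        StorageInfo info = p.getStorageInfo();
        if (info.namespaceId != 0) { // namespaceID == 0 means "never formatted"
          return info;
        }
      } catch (IOException e) {
        // unreachable peer: try the next one
      }
    }
    return null;
  }
}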

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -1471,6 +1471,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
       "dfs.journalnode.sync.interval";
   public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
+  public static final String DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY =
+      "dfs.journalnode.enable.sync.format";
+  public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT = false;
   public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
       "dfs.journalnode.edit-cache-size.bytes";
 

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.qjournal.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
 import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.security.KerberosInfo;
@@ -51,4 +52,13 @@ public interface InterQJournalProtocol {
       String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
       throws IOException;
 
+  /**
+   * Get the storage info for the specified journal.
+   * @param jid the journal identifier
+   * @param nameServiceId the name service id
+   * @return the storage info object
+   */
+  StorageInfoProto getStorageInfo(String jid, String nameServiceId)
+      throws IOException;
+
 }

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java

@@ -24,6 +24,8 @@ import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetStorageInfoRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 
@@ -60,4 +62,18 @@ public class InterQJournalProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public StorageInfoProto getStorageInfo(
+      RpcController controller, GetStorageInfoRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.getStorageInfo(
+          request.getJid().getIdentifier(),
+          request.hasNameServiceId() ? request.getNameServiceId() : null
+      );
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java

@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hdfs.qjournal.protocolPB;
 
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -75,6 +77,18 @@ public class InterQJournalProtocolTranslatorPB implements ProtocolMetaInterface,
         req.build()));
   }
 
+  @Override
+  public StorageInfoProto getStorageInfo(String jid, String nameServiceId)
+      throws IOException {
+    InterQJournalProtocolProtos.GetStorageInfoRequestProto.Builder req =
+        InterQJournalProtocolProtos.GetStorageInfoRequestProto.newBuilder()
+            .setJid(convertJournalId(jid));
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    return ipc(() -> rpcProxy.getStorageInfo(NULL_CONTROLLER, req.build()));
+  }
+
   private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
     return QJournalProtocolProtos.JournalIdProto.newBuilder()
         .setIdentifier(jid)

+ 18 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.qjournal.server;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
 import org.apache.hadoop.thirdparty.protobuf.BlockingService;
 import org.slf4j.Logger;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -71,14 +72,14 @@ public class JournalNodeRpcServer implements QJournalProtocol,
 
   JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
     this.jn = jn;
-    
+
     Configuration confCopy = new Configuration(conf);
-    
+
     // Ensure that nagling doesn't kick in, which could cause latency issues.
     confCopy.setBoolean(
         CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
         true);
-    
+
     InetSocketAddress addr = getAddress(confCopy);
     String bindHost = conf.getTrimmed(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, null);
     if (bindHost == null) {
@@ -104,7 +105,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
     this.handlerCount = confHandlerCount;
     LOG.info("The number of JournalNodeRpcServer handlers is {}.",
         this.handlerCount);
-    
+
     this.server = new RPC.Builder(confCopy)
         .setProtocol(QJournalProtocolPB.class)
         .setInstance(service)
@@ -149,15 +150,15 @@ public class JournalNodeRpcServer implements QJournalProtocol,
   public InetSocketAddress getAddress() {
     return server.getListenerAddress();
   }
-  
+
   void join() throws InterruptedException {
     this.server.join();
   }
-  
+
   void stop() {
     this.server.stop();
   }
-  
+
   static InetSocketAddress getAddress(Configuration conf) {
     String addr = conf.get(
         DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
@@ -211,7 +212,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
     jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
        .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
   }
-  
+
   @Override
   public void heartbeat(RequestInfo reqInfo) throws IOException {
     jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
@@ -245,10 +246,10 @@ public class JournalNodeRpcServer implements QJournalProtocol,
       String jid, String nameServiceId,
       long sinceTxId, boolean inProgressOk)
       throws IOException {
-    
+
     RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
         .getEditLogManifest(sinceTxId, inProgressOk);
-    
+
     return GetEditLogManifestResponseProto.newBuilder()
         .setManifest(PBHelper.convert(manifest))
         .setHttpPort(jn.getBoundHttpAddress().getPort())
@@ -256,6 +257,13 @@ public class JournalNodeRpcServer implements QJournalProtocol,
         .build();
   }
 
+  @Override
+  public StorageInfoProto getStorageInfo(String jid,
+      String nameServiceId) throws IOException {
+    StorageInfo storage = jn.getOrCreateJournal(jid, nameServiceId).getStorage();
+    return PBHelper.convert(storage);
+  }
+
   @Override
   public GetJournaledEditsResponseProto getJournaledEdits(String jid,
       String nameServiceId, long sinceTxId, int maxTxns) throws IOException {

+ 80 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java

@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.qjournal.server;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -79,6 +82,7 @@ public class JournalNodeSyncer {
   private int numOtherJNs;
   private int journalNodeIndexForSync = 0;
   private final long journalSyncInterval;
+  private final boolean tryFormatting;
   private final int logSegmentTransferTimeout;
   private final DataTransferThrottler throttler;
   private final JournalMetrics metrics;
@@ -98,6 +102,9 @@ public class JournalNodeSyncer {
     logSegmentTransferTimeout = conf.getInt(
         DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
         DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
+    tryFormatting = conf.getBoolean(
+        DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT);
     throttler = getThrottler(conf);
     metrics = journal.getMetrics();
     journalSyncerStarted = false;
@@ -171,6 +178,8 @@ public class JournalNodeSyncer {
       // Wait for journal to be formatted to create edits.sync directory
       while(!journal.isFormatted()) {
         try {
+          // Format the journal with namespace info from the other JNs if it is not formatted
+          formatWithSyncer();
           Thread.sleep(journalSyncInterval);
         } catch (InterruptedException e) {
           LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
@@ -187,7 +196,15 @@ public class JournalNodeSyncer {
       while(shouldSync) {
         try {
           if (!journal.isFormatted()) {
-            LOG.warn("Journal cannot sync. Not formatted.");
+            LOG.warn("Journal cannot sync. Not formatted. Trying to format with the syncer");
+            formatWithSyncer();
+            if (journal.isFormatted() && !createEditsSyncDir()) {
+              LOG.error("Failed to create directory for downloading log " +
+                      "segments: {}. Stopping Journal Node Sync.",
+                  journal.getStorage().getEditsSyncDir());
+              return;
+            }
+            continue;
           } else {
             syncJournals();
           }
@@ -233,6 +250,68 @@ public class JournalNodeSyncer {
     journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
   }
 
+  private void formatWithSyncer() {
+    if (!tryFormatting) {
+      return;
+    }
+    LOG.info("Trying to format the journal with the syncer");
+    try {
+      StorageInfo storage = null;
+      for (JournalNodeProxy jnProxy : otherJNProxies) {
+        if (!hasEditLogs(jnProxy)) {
+          // This avoids a race condition between `hdfs namenode -format` and
+          // JN syncer by checking if the other JN is not newly formatted.
+          continue;
+        }
+        try {
+          HdfsServerProtos.StorageInfoProto storageInfoResponse =
+              jnProxy.jnProxy.getStorageInfo(jid, nameServiceId);
+          storage = PBHelper.convert(
+              storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE
+          );
+          if (storage.getNamespaceID() == 0) {
+            LOG.error("Got invalid StorageInfo from " + jnProxy);
+            storage = null;
+            continue;
+          }
+          LOG.info("Got StorageInfo " + storage + " from " + jnProxy);
+          break;
+        } catch (IOException e) {
+          LOG.error("Could not get StorageInfo from " + jnProxy, e);
+        }
+      }
+      if (storage == null) {
+        LOG.error("Could not get StorageInfo from any JournalNode. " +
+            "JournalNodeSyncer cannot format the journal.");
+        return;
+      }
+      NamespaceInfo nsInfo = new NamespaceInfo(storage);
+      journal.format(nsInfo, true);
+    } catch (IOException e) {
+      LOG.error("Exception in formatting the journal with the syncer", e);
+    }
+  }
+
+  private boolean hasEditLogs(JournalNodeProxy journalProxy) {
+    GetEditLogManifestResponseProto editLogManifest;
+    try {
+      editLogManifest = journalProxy.jnProxy.getEditLogManifestFromJournal(
+          jid, nameServiceId, 0, false);
+    } catch (IOException e) {
+      LOG.error("Could not get edit log manifest from " + journalProxy, e);
+      return false;
+    }
+
+    List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert(
+        editLogManifest.getManifest()).getLogs();
+    if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) {
+      LOG.warn("Journal at " + journalProxy.jnAddr + " has no edit logs");
+      return false;
+    }
+
+    return true;
+  }
+
   private void syncWithJournalAtIndex(int index) {
     LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
         + jn.getBoundIpcAddress().getPort() + " with "

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto

@@ -31,8 +31,15 @@ package hadoop.hdfs.qjournal;
 import "HdfsServer.proto";
 import "QJournalProtocol.proto";
 
+message GetStorageInfoRequestProto {
+    required JournalIdProto jid = 1;
+    optional string nameServiceId = 2;
+}
 
 service InterQJournalProtocolService {
     rpc getEditLogManifestFromJournal(GetEditLogManifestRequestProto)
     returns (GetEditLogManifestResponseProto);
+
+    rpc getStorageInfo(GetStorageInfoRequestProto)
+    returns (StorageInfoProto);
 }

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -5071,6 +5071,16 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.journalnode.enable.sync.format</name>
+  <value>false</value>
+  <description>
+    If true, the JournalNode syncer daemon (which syncs edit logs between
+    journal nodes) will also try to format its journal if it is not yet
+    formatted, querying the other journal nodes for the required storage info.
+  </description>
+</property>
+
 <property>
   <name>dfs.journalnode.edit-cache-size.bytes</name>
   <value></value>
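
For reference, turning the feature on end to end takes both the existing syncer flag and the new one. A sketch of the corresponding hdfs-site.xml fragment, mirroring what the test below sets through DFSConfigKeys (key names taken from the diff and from DFS_JOURNALNODE_ENABLE_SYNC_KEY):

<property>
  <name>dfs.journalnode.enable.sync</name>
  <value>true</value>
</property>
<property>
  <name>dfs.journalnode.enable.sync.format</name>
  <value>true</value>
</property>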

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.qjournal.server;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.function.Supplier;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -75,6 +76,7 @@ public class TestJournalNodeSync {
     conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
+    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY, true);
     if (testName.getMethodName().equals(
         "testSyncAfterJNdowntimeWithoutQJournalQueue")) {
       conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0);
@@ -478,6 +480,33 @@ public class TestJournalNodeSync {
     }
   }
 
+  @Test(timeout=300_000)
+  public void testFormatWithSyncer() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    // Generate some edit logs
+    long firstTxId = generateEditLog();
+
+    // Delete them from the JN01
+    List<File> missingLogs = Lists.newArrayList();
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+
+    // Wait to ensure sync starts, delete the storage directory itself to simulate a disk wipe
+    // and ensure that the in-memory formatting state of JNStorage gets updated
+    Thread.sleep(2000);
+    FileUtils.deleteDirectory(firstJournalDir);
+    jCluster.getJournalNode(0).getOrCreateJournal(jid).getStorage().analyzeStorage();
+
+    // Wait for JN formatting with Syncer
+    GenericTestUtils.waitFor(jnFormatted(0), 500, 30000);
+    // Generate some more edit log so that the JN updates its committed tx id
+    generateEditLog();
+    // Check that the missing edit logs have been synced
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+  }
+
   private File deleteEditLog(File currentDir, long startTxId)
       throws IOException {
     EditLogFile logFile = getLogFile(currentDir, startTxId);
@@ -581,4 +610,19 @@ public class TestJournalNodeSync {
     };
     return supplier;
   }
+
+  private Supplier<Boolean> jnFormatted(int jnIndex) throws Exception {
+    Supplier<Boolean> supplier = new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return jCluster.getJournalNode(jnIndex).getOrCreateJournal(jid)
+              .isFormatted();
+        } catch (Exception e) {
+          return false;
+        }
+      }
+    };
+    return supplier;
+  }
 }
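
To exercise just the new test from a Hadoop source checkout (assuming the usual Maven/surefire setup):

cd hadoop-hdfs-project/hadoop-hdfs
mvn test -Dtest=TestJournalNodeSync#testFormatWithSyncer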