|
@@ -17,18 +17,10 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.qjournal.server;
|
|
|
|
|
|
-import static org.apache.hadoop.util.ExitUtil.terminate;
|
|
|
-
|
|
|
-import java.io.File;
|
|
|
-import java.io.FileFilter;
|
|
|
-import java.io.IOException;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.Map;
|
|
|
-
|
|
|
-import javax.management.ObjectName;
|
|
|
-
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import com.google.common.collect.Maps;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -47,14 +39,22 @@ import org.apache.hadoop.metrics2.util.MBeans;
|
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.tracing.TraceUtils;
|
|
|
import org.apache.hadoop.util.DiskChecker;
|
|
|
+import static org.apache.hadoop.util.ExitUtil.terminate;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.Tool;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
|
import org.apache.htrace.core.Tracer;
|
|
|
import org.eclipse.jetty.util.ajax.JSON;
|
|
|
|
|
|
-import com.google.common.base.Preconditions;
|
|
|
-import com.google.common.collect.Maps;
|
|
|
+import javax.management.ObjectName;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileFilter;
|
|
|
+import java.io.IOException;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
/**
|
|
|
* The JournalNode is a daemon which allows namenodes using
|
|
@@ -74,7 +74,7 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
|
|
.newHashMap();
|
|
|
private ObjectName journalNodeInfoBeanName;
|
|
|
private String httpServerURI;
|
|
|
- private File localDir;
|
|
|
+ private final ArrayList<File> localDir = Lists.newArrayList();
|
|
|
Tracer tracer;
|
|
|
|
|
|
static {
|
|
@@ -94,11 +94,10 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
|
|
|
|
|
Journal journal = journalsById.get(jid);
|
|
|
if (journal == null) {
|
|
|
- File logDir = getLogDir(jid);
|
|
|
- LOG.info("Initializing journal in directory " + logDir);
|
|
|
+ File logDir = getLogDir(jid, nameServiceId);
|
|
|
+ LOG.info("Initializing journal in directory " + logDir);
|
|
|
journal = new Journal(conf, logDir, jid, startOpt, new ErrorReporter());
|
|
|
journalsById.put(jid, journal);
|
|
|
-
|
|
|
// Start SyncJouranl thread, if JournalNode Sync is enabled
|
|
|
if (conf.getBoolean(
|
|
|
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
|
|
@@ -148,9 +147,34 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
|
|
@Override
|
|
|
public void setConf(Configuration conf) {
|
|
|
this.conf = conf;
|
|
|
- this.localDir = new File(
|
|
|
- conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
|
|
- DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT).trim());
|
|
|
+
|
|
|
+ String journalNodeDir = null;
|
|
|
+ Collection<String> nameserviceIds;
|
|
|
+
|
|
|
+ nameserviceIds = conf.getTrimmedStringCollection(
|
|
|
+ DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
|
|
|
+
|
|
|
+ if (nameserviceIds.size() == 0) {
|
|
|
+ nameserviceIds = conf.getTrimmedStringCollection(
|
|
|
+ DFSConfigKeys.DFS_NAMESERVICES);
|
|
|
+ }
|
|
|
+
|
|
|
+ //if nameservicesIds size is less than 2, it means it is not a federated
|
|
|
+ // setup
|
|
|
+ if (nameserviceIds.size() < 2) {
|
|
|
+ // Check in HA, if journal edit dir is set by appending with
|
|
|
+ // nameserviceId
|
|
|
+ for (String nameService : nameserviceIds) {
|
|
|
+ journalNodeDir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY +
|
|
|
+ "." + nameService);
|
|
|
+ }
|
|
|
+ if (journalNodeDir == null) {
|
|
|
+ journalNodeDir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
|
|
+ DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
|
|
|
+ }
|
|
|
+ localDir.add(new File(journalNodeDir.trim()));
|
|
|
+ }
|
|
|
+
|
|
|
if (this.tracer == null) {
|
|
|
this.tracer = new Tracer.Builder("JournalNode").
|
|
|
conf(TraceUtils.wrapHadoopConf("journalnode.htrace", conf)).
|
|
@@ -158,12 +182,13 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void validateAndCreateJournalDir(File dir) throws IOException {
|
|
|
+ private static void validateAndCreateJournalDir(File dir)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
if (!dir.isAbsolute()) {
|
|
|
throw new IllegalArgumentException(
|
|
|
"Journal dir '" + dir + "' should be an absolute path");
|
|
|
}
|
|
|
-
|
|
|
DiskChecker.checkDir(dir);
|
|
|
}
|
|
|
|
|
@@ -186,8 +211,9 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
|
|
|
|
|
try {
|
|
|
|
|
|
- validateAndCreateJournalDir(localDir);
|
|
|
-
|
|
|
+ for (File journalDir : localDir) {
|
|
|
+ validateAndCreateJournalDir(journalDir);
|
|
|
+ }
|
|
|
DefaultMetricsSystem.initialize("JournalNode");
|
|
|
JvmMetrics.create("JournalNode",
|
|
|
conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
|
|
@@ -297,16 +323,33 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
|
|
* @param jid the journal identifier
|
|
|
* @return the file, which may or may not exist yet
|
|
|
*/
|
|
|
- private File getLogDir(String jid) {
|
|
|
- String dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
|
|
- DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
|
|
|
+ private File getLogDir(String jid, String nameServiceId) throws IOException{
|
|
|
+ String dir = null;
|
|
|
+ if (nameServiceId != null) {
|
|
|
+ dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY + "." +
|
|
|
+ nameServiceId);
|
|
|
+ }
|
|
|
+ if (dir == null) {
|
|
|
+ dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
|
|
|
+ DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
|
|
|
+ }
|
|
|
+
|
|
|
+ File journalDir = new File(dir.trim());
|
|
|
+ if (!localDir.contains(journalDir)) {
|
|
|
+ //It is a federated setup, we need to validate journalDir
|
|
|
+ validateAndCreateJournalDir(journalDir);
|
|
|
+ localDir.add(journalDir);
|
|
|
+ }
|
|
|
+
|
|
|
Preconditions.checkArgument(jid != null &&
|
|
|
!jid.isEmpty(),
|
|
|
"bad journal identifier: %s", jid);
|
|
|
assert jid != null;
|
|
|
- return new File(new File(dir), jid);
|
|
|
+ return new File(journalDir, jid);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
@Override // JournalNodeMXBean
|
|
|
public String getJournalsStatus() {
|
|
|
// jid:{Formatted:True/False}
|
|
@@ -328,20 +371,22 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
|
|
// Also note that we do not need to check localDir here since
|
|
|
// validateAndCreateJournalDir has been called before we register the
|
|
|
// MXBean.
|
|
|
- File[] journalDirs = localDir.listFiles(new FileFilter() {
|
|
|
- @Override
|
|
|
- public boolean accept(File file) {
|
|
|
- return file.isDirectory();
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- if (journalDirs != null) {
|
|
|
- for (File journalDir : journalDirs) {
|
|
|
- String jid = journalDir.getName();
|
|
|
- if (!status.containsKey(jid)) {
|
|
|
- Map<String, String> jMap = new HashMap<String, String>();
|
|
|
- jMap.put("Formatted", "true");
|
|
|
- status.put(jid, jMap);
|
|
|
+ for (File jDir : localDir) {
|
|
|
+ File[] journalDirs = jDir.listFiles(new FileFilter() {
|
|
|
+ @Override
|
|
|
+ public boolean accept(File file) {
|
|
|
+ return file.isDirectory();
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ if (journalDirs != null) {
|
|
|
+ for (File journalDir : journalDirs) {
|
|
|
+ String jid = journalDir.getName();
|
|
|
+ if (!status.containsKey(jid)) {
|
|
|
+ Map<String, String> jMap = new HashMap<String, String>();
|
|
|
+ jMap.put("Formatted", "true");
|
|
|
+ status.put(jid, jMap);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|