|
@@ -17,12 +17,19 @@
|
|
*/
|
|
*/
|
|
package org.apache.hadoop.hdfs.server.journalservice;
|
|
package org.apache.hadoop.hdfs.server.journalservice;
|
|
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
|
|
|
|
+
|
|
|
|
+import java.io.File;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
|
|
+import java.util.HashMap;
|
|
|
|
+import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.Properties;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -34,10 +41,15 @@ import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtoc
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
|
|
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
|
|
|
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|
|
|
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
|
|
|
|
+import org.apache.hadoop.hdfs.server.common.Storage;
|
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|
|
|
+import org.apache.hadoop.hdfs.server.common.Util;
|
|
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
|
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
|
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
|
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
|
|
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
|
|
import org.apache.hadoop.hdfs.server.protocol.FencedException;
|
|
import org.apache.hadoop.hdfs.server.protocol.FencedException;
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
|
@@ -53,6 +65,8 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.protobuf.BlockingService;
|
|
import com.google.protobuf.BlockingService;
|
|
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* This class interfaces with the namenode using {@link JournalProtocol} over
|
|
* This class interfaces with the namenode using {@link JournalProtocol} over
|
|
* RPC. It has two modes: <br>
|
|
* RPC. It has two modes: <br>
|
|
@@ -81,6 +95,8 @@ public class JournalService implements JournalProtocol {
|
|
private String fencerInfo;
|
|
private String fencerInfo;
|
|
private StorageInfo storageInfo;
|
|
private StorageInfo storageInfo;
|
|
private Configuration conf;
|
|
private Configuration conf;
|
|
|
|
+ private FSEditLog editLog;
|
|
|
|
+ private FSImage image;
|
|
|
|
|
|
enum State {
|
|
enum State {
|
|
/** The service is initialized and ready to start. */
|
|
/** The service is initialized and ready to start. */
|
|
@@ -182,16 +198,59 @@ public class JournalService implements JournalProtocol {
|
|
.getProxy();
|
|
.getProxy();
|
|
this.rpcServer = createRpcServer(conf, serverAddress, this);
|
|
this.rpcServer = createRpcServer(conf, serverAddress, this);
|
|
this.conf = conf;
|
|
this.conf = conf;
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ initializeJournalStorage(conf);
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ LOG.info("Exception in initialize: " + ioe.getMessage());
|
|
|
|
+ throw ioe;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ /** This routine initializes the storage directory. It is possible that
|
|
|
|
+ * the directory is not formatted. In this case, it creates dummy entries
|
|
|
|
+ * and storage is later formatted.
|
|
|
|
+ */
|
|
|
|
+ private void initializeJournalStorage(Configuration conf) throws IOException {
|
|
|
|
+
|
|
|
|
+ boolean isFormatted = false;
|
|
|
|
+ Collection<URI> dirsToFormat = new ArrayList<URI>();
|
|
|
|
+ List<URI> editUrisToFormat = getJournalEditsDirs(conf);
|
|
|
|
+
|
|
// Load the newly formatted image, using all of the directories
|
|
// Load the newly formatted image, using all of the directories
|
|
- FSImage image = new FSImage(conf);
|
|
|
|
- storageInfo = image.getStorage();
|
|
|
|
- LOG.info("JournalService constructor, nsid " + storageInfo.getNamespaceID()
|
|
|
|
- + " cluster id " + storageInfo.getClusterID());
|
|
|
|
-
|
|
|
|
- String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
|
|
|
|
- registration = new NamenodeRegistration(addr, "", storageInfo,
|
|
|
|
- NamenodeRole.BACKUP);
|
|
|
|
|
|
+ image = new FSImage(conf, dirsToFormat, editUrisToFormat);
|
|
|
|
+ Map<StorageDirectory, StorageState> dataDirStates =
|
|
|
|
+ new HashMap<StorageDirectory, StorageState>();
|
|
|
|
+ isFormatted = image
|
|
|
|
+ .recoverStorageDirs(StartupOption.REGULAR, dataDirStates);
|
|
|
|
+
|
|
|
|
+ if (isFormatted == true) {
|
|
|
|
+ // Directory has been formatted. So, it should have a versionfile.
|
|
|
|
+ this.editLog = image.getEditLog();
|
|
|
|
+ Iterator<StorageDirectory> sdit = image.getStorage().dirIterator(
|
|
|
|
+ NNStorage.NameNodeDirType.IMAGE);
|
|
|
|
+ StorageDirectory sd = sdit.next();
|
|
|
|
+
|
|
|
|
+ Properties props = Storage.readPropertiesFile(sd.getVersionFile());
|
|
|
|
+ String cid = props.getProperty("clusterID");
|
|
|
|
+ String nsid = props.getProperty("namespaceID");
|
|
|
|
+ String layout = props.getProperty("layoutVersion");
|
|
|
|
+ storageInfo = new StorageInfo(Integer.parseInt(layout),
|
|
|
|
+ Integer.parseInt(nsid), cid, 0);
|
|
|
|
+
|
|
|
|
+ LOG.info("JournalService constructor, nsid "
|
|
|
|
+ + storageInfo.getNamespaceID() + " cluster id "
|
|
|
|
+ + storageInfo.getClusterID());
|
|
|
|
+
|
|
|
|
+ String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
|
|
|
|
+ registration = new NamenodeRegistration(addr, "", storageInfo,
|
|
|
|
+ NamenodeRole.BACKUP);
|
|
|
|
+ } else {
|
|
|
|
+ // Storage directory has not been formatted. So create dummy entries for now.
|
|
|
|
+ image = new FSImage(conf);
|
|
|
|
+ storageInfo = image.getStorage();
|
|
|
|
+ editLog = null;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -275,24 +334,27 @@ public class JournalService implements JournalProtocol {
|
|
}
|
|
}
|
|
|
|
|
|
private void setupStorage(JournalInfo jinfo) throws IOException {
|
|
private void setupStorage(JournalInfo jinfo) throws IOException {
|
|
- setupStorage(jinfo.getNamespaceId(), jinfo.getClusterId());
|
|
|
|
|
|
+ formatStorage(jinfo.getNamespaceId(), jinfo.getClusterId());
|
|
}
|
|
}
|
|
|
|
|
|
private void setupStorage(NamespaceInfo nsinfo) throws IOException {
|
|
private void setupStorage(NamespaceInfo nsinfo) throws IOException {
|
|
- setupStorage(nsinfo.getNamespaceID(), nsinfo.getClusterID());
|
|
|
|
|
|
+ formatStorage(nsinfo.getNamespaceID(), nsinfo.getClusterID());
|
|
}
|
|
}
|
|
|
|
|
|
- // Will format the edits dir and save the nsid + cluster id.
|
|
|
|
- private void setupStorage(int nsId, String clusterId) throws IOException {
|
|
|
|
- // For now, use the namespacedirs and edits dir to save the journal info.
|
|
|
|
- // Going forward, this can be modified to extract journal specific edits
|
|
|
|
- // dir.
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Will format the edits dir and save the nsid + cluster id.
|
|
|
|
+ * @param nsId
|
|
|
|
+ * @param clusterId
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private void formatStorage(int nsId, String clusterId) throws IOException {
|
|
Collection<URI> dirsToFormat = new ArrayList<URI>();
|
|
Collection<URI> dirsToFormat = new ArrayList<URI>();
|
|
- List<URI> editUrisToFormat = FSNamesystem
|
|
|
|
- .getNamespaceEditsDirs(conf, false);
|
|
|
|
|
|
+ List<URI> editUrisToFormat = getJournalEditsDirs(conf);
|
|
NNStorage nnStorage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
|
|
NNStorage nnStorage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
|
|
LOG.info("Setting up storage for nsid " + nsId + " clusterid " + clusterId);
|
|
LOG.info("Setting up storage for nsid " + nsId + " clusterid " + clusterId);
|
|
nnStorage.format(new NamespaceInfo(nsId, clusterId, "journalservice", 0, 0));
|
|
nnStorage.format(new NamespaceInfo(nsId, clusterId, "journalservice", 0, 0));
|
|
|
|
+ image = new FSImage(conf, dirsToFormat, editUrisToFormat);
|
|
|
|
+ this.editLog = image.getEditLog();
|
|
|
|
|
|
storageInfo = new StorageInfo(LayoutVersion.getCurrentLayoutVersion(),
|
|
storageInfo = new StorageInfo(LayoutVersion.getCurrentLayoutVersion(),
|
|
nsId, clusterId, 0);
|
|
nsId, clusterId, 0);
|
|
@@ -422,4 +484,17 @@ public class JournalService implements JournalProtocol {
|
|
long getEpoch() {
|
|
long getEpoch() {
|
|
return epoch;
|
|
return epoch;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Returns edit directories that are shared between primary and secondary.
|
|
|
|
+ * @param conf
|
|
|
|
+ * @return Collection of edit directories.
|
|
|
|
+ */
|
|
|
|
+ public List<URI> getJournalEditsDirs(Configuration conf) {
|
|
|
|
+ // don't use getStorageDirs here, because we want an empty default
|
|
|
|
+ // rather than the dir in /tmp
|
|
|
|
+ Collection<String> dirNames = conf.getTrimmedStringCollection(
|
|
|
|
+ DFS_JOURNAL_EDITS_DIR_KEY);
|
|
|
|
+ return Util.stringCollectionAsURIs(dirNames);
|
|
|
|
+ }
|
|
}
|
|
}
|