|
@@ -19,6 +19,10 @@ package org.apache.hadoop.hdfs.server.journalservice;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.net.URI;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.List;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -31,6 +35,9 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
|
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.FencedException;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
|
@@ -66,12 +73,14 @@ public class JournalService implements JournalProtocol {
|
|
|
|
|
|
private final JournalListener listener;
|
|
|
private final InetSocketAddress nnAddress;
|
|
|
- private final NamenodeRegistration registration;
|
|
|
+ private NamenodeRegistration registration;
|
|
|
private final NamenodeProtocol namenode;
|
|
|
private final StateHandler stateHandler = new StateHandler();
|
|
|
private final RPC.Server rpcServer;
|
|
|
private long epoch = 0;
|
|
|
private String fencerInfo;
|
|
|
+ private StorageInfo storageInfo;
|
|
|
+ private Configuration conf;
|
|
|
|
|
|
enum State {
|
|
|
/** The service is initialized and ready to start. */
|
|
@@ -172,11 +181,16 @@ public class JournalService implements JournalProtocol {
|
|
|
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
|
|
|
.getProxy();
|
|
|
this.rpcServer = createRpcServer(conf, serverAddress, this);
|
|
|
+ this.conf = conf;
|
|
|
+
|
|
|
+ // Load the newly formatted image, using all of the directories
|
|
|
+ FSImage image = new FSImage(conf);
|
|
|
+ storageInfo = image.getStorage();
|
|
|
+ LOG.info("JournalService constructor, nsid " + storageInfo.getNamespaceID()
|
|
|
+ + " cluster id " + storageInfo.getClusterID());
|
|
|
|
|
|
String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
|
|
|
- StorageInfo storage = new StorageInfo(
|
|
|
- LayoutVersion.getCurrentLayoutVersion(), 0, "", 0);
|
|
|
- registration = new NamenodeRegistration(addr, "", storage,
|
|
|
+ registration = new NamenodeRegistration(addr, "", storageInfo,
|
|
|
NamenodeRole.BACKUP);
|
|
|
}
|
|
|
|
|
@@ -187,10 +201,10 @@ public class JournalService implements JournalProtocol {
|
|
|
stateHandler.start();
|
|
|
|
|
|
// Start the RPC server
|
|
|
- LOG.info("Starting rpc server");
|
|
|
+ LOG.info("Starting journal service rpc server");
|
|
|
rpcServer.start();
|
|
|
|
|
|
- for(boolean registered = false, handshakeComplete = false; ; ) {
|
|
|
+ for (boolean registered = false, handshakeComplete = false;;) {
|
|
|
try {
|
|
|
// Perform handshake
|
|
|
if (!handshakeComplete) {
|
|
@@ -198,7 +212,7 @@ public class JournalService implements JournalProtocol {
|
|
|
handshakeComplete = true;
|
|
|
LOG.info("handshake completed");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Register with the namenode
|
|
|
if (!registered) {
|
|
|
registerWithNamenode();
|
|
@@ -211,7 +225,7 @@ public class JournalService implements JournalProtocol {
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Encountered exception ", e);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
try {
|
|
|
Thread.sleep(1000);
|
|
|
} catch (InterruptedException ie) {
|
|
@@ -259,13 +273,47 @@ public class JournalService implements JournalProtocol {
|
|
|
listener.rollLogs(this, txid);
|
|
|
stateHandler.startLogSegment();
|
|
|
}
|
|
|
+
|
|
|
+ private void setupStorage(JournalInfo jinfo) throws IOException {
|
|
|
+ setupStorage(jinfo.getNamespaceId(), jinfo.getClusterId());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setupStorage(NamespaceInfo nsinfo) throws IOException {
|
|
|
+ setupStorage(nsinfo.getNamespaceID(), nsinfo.getClusterID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Will format the edits dir and save the nsid + cluster id.
|
|
|
+ private void setupStorage(int nsId, String clusterId) throws IOException {
|
|
|
+ // For now, use the namespacedirs and edits dir to save the journal info.
|
|
|
+ // Going forward, this can be modified to extract journal specific edits
|
|
|
+ // dir.
|
|
|
+ Collection<URI> dirsToFormat = new ArrayList<URI>();
|
|
|
+ List<URI> editUrisToFormat = FSNamesystem
|
|
|
+ .getNamespaceEditsDirs(conf, false);
|
|
|
+ NNStorage nnStorage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
|
|
|
+ LOG.info("Setting up storage for nsid " + nsId + " clusterid " + clusterId);
|
|
|
+ nnStorage.format(new NamespaceInfo(nsId, clusterId, "journalservice", 0, 0));
|
|
|
+
|
|
|
+ storageInfo = new StorageInfo(LayoutVersion.getCurrentLayoutVersion(),
|
|
|
+ nsId, clusterId, 0);
|
|
|
+ registration = new NamenodeRegistration(
|
|
|
+ NetUtils.getHostPortString(rpcServer.getListenerAddress()), "",
|
|
|
+ storageInfo, NamenodeRole.BACKUP);
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public FenceResponse fence(JournalInfo journalInfo, long epoch,
|
|
|
String fencerInfo) throws IOException {
|
|
|
LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
|
|
|
+
|
|
|
+ // This implies that this is the first fence on the journal service
|
|
|
+ // It does not have any nsid or cluster id info.
|
|
|
+ if ((storageInfo.getClusterID() == null)
|
|
|
+ || (storageInfo.getNamespaceID() == 0)) {
|
|
|
+ setupStorage(journalInfo);
|
|
|
+ }
|
|
|
verifyFence(epoch, fencerInfo);
|
|
|
- verify(journalInfo);
|
|
|
+ verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
|
|
|
long previousEpoch = epoch;
|
|
|
this.epoch = epoch;
|
|
|
this.fencerInfo = fencerInfo;
|
|
@@ -310,20 +358,23 @@ public class JournalService implements JournalProtocol {
|
|
|
/**
|
|
|
* Verifies a journal request
|
|
|
*/
|
|
|
- private void verify(JournalInfo journalInfo) throws IOException {
|
|
|
+ private void verify(int nsid, String clusid) throws IOException {
|
|
|
String errorMsg = null;
|
|
|
int expectedNamespaceID = registration.getNamespaceID();
|
|
|
- if (journalInfo.getNamespaceId() != expectedNamespaceID) {
|
|
|
- errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
|
|
|
- + " actual " + journalInfo.getNamespaceId();
|
|
|
+
|
|
|
+ if (nsid != expectedNamespaceID) {
|
|
|
+ errorMsg = "Invalid namespaceID in journal request - expected "
|
|
|
+ + expectedNamespaceID + " actual " + nsid;
|
|
|
LOG.warn(errorMsg);
|
|
|
- throw new UnregisteredNodeException(journalInfo);
|
|
|
- }
|
|
|
- if (!journalInfo.getClusterId().equals(registration.getClusterID())) {
|
|
|
- errorMsg = "Invalid clusterId in journal request - expected "
|
|
|
- + journalInfo.getClusterId() + " actual " + registration.getClusterID();
|
|
|
+ throw new UnregisteredNodeException(errorMsg);
|
|
|
+ }
|
|
|
+ if ((clusid == null)
|
|
|
+ || (!clusid.equals(registration.getClusterID()))) {
|
|
|
+ errorMsg = "Invalid clusterId in journal request - incoming "
|
|
|
+ + clusid + " expected "
|
|
|
+ + registration.getClusterID();
|
|
|
LOG.warn(errorMsg);
|
|
|
- throw new UnregisteredNodeException(journalInfo);
|
|
|
+ throw new UnregisteredNodeException(errorMsg);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -332,7 +383,7 @@ public class JournalService implements JournalProtocol {
|
|
|
*/
|
|
|
private void verify(long e, JournalInfo journalInfo) throws IOException {
|
|
|
verifyEpoch(e);
|
|
|
- verify(journalInfo);
|
|
|
+ verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -355,7 +406,16 @@ public class JournalService implements JournalProtocol {
|
|
|
private void handshake() throws IOException {
|
|
|
NamespaceInfo nsInfo = namenode.versionRequest();
|
|
|
listener.verifyVersion(this, nsInfo);
|
|
|
- registration.setStorageInfo(nsInfo);
|
|
|
+
|
|
|
+ // If this is the first initialization of journal service, then storage
|
|
|
+ // directory will be setup. Otherwise, nsid and clusterid has to match with
|
|
|
+ // the info saved in the edits dir.
|
|
|
+ if ((storageInfo.getClusterID() == null)
|
|
|
+ || (storageInfo.getNamespaceID() == 0)) {
|
|
|
+ setupStorage(nsInfo);
|
|
|
+ } else {
|
|
|
+ verify(nsInfo.getNamespaceID(), nsInfo.getClusterID());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|