|
@@ -17,39 +17,18 @@
|
|
*/
|
|
*/
|
|
package org.apache.hadoop.hdfs.server.journalservice;
|
|
package org.apache.hadoop.hdfs.server.journalservice;
|
|
|
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
|
|
|
|
-
|
|
|
|
-import java.io.File;
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
-import java.net.URI;
|
|
|
|
-import java.util.ArrayList;
|
|
|
|
-import java.util.Collection;
|
|
|
|
-import java.util.HashMap;
|
|
|
|
-import java.util.Iterator;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Map;
|
|
|
|
-import java.util.Properties;
|
|
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
|
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
|
|
|
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
|
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
|
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
|
|
|
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|
|
|
-import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
|
|
|
|
-import org.apache.hadoop.hdfs.server.common.Storage;
|
|
|
|
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|
|
|
-import org.apache.hadoop.hdfs.server.common.Util;
|
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
|
|
|
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
|
|
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
|
|
import org.apache.hadoop.hdfs.server.protocol.FencedException;
|
|
import org.apache.hadoop.hdfs.server.protocol.FencedException;
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
|
@@ -65,8 +44,6 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.protobuf.BlockingService;
|
|
import com.google.protobuf.BlockingService;
|
|
|
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* This class interfaces with the namenode using {@link JournalProtocol} over
|
|
* This class interfaces with the namenode using {@link JournalProtocol} over
|
|
* RPC. It has two modes: <br>
|
|
* RPC. It has two modes: <br>
|
|
@@ -85,7 +62,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY;
|
|
public class JournalService implements JournalProtocol {
|
|
public class JournalService implements JournalProtocol {
|
|
public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
|
|
public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
|
|
|
|
|
|
- private final JournalListener listener;
|
|
|
|
private final InetSocketAddress nnAddress;
|
|
private final InetSocketAddress nnAddress;
|
|
private NamenodeRegistration registration;
|
|
private NamenodeRegistration registration;
|
|
private final NamenodeProtocol namenode;
|
|
private final NamenodeProtocol namenode;
|
|
@@ -93,11 +69,10 @@ public class JournalService implements JournalProtocol {
|
|
private final RPC.Server rpcServer;
|
|
private final RPC.Server rpcServer;
|
|
private long epoch = 0;
|
|
private long epoch = 0;
|
|
private String fencerInfo;
|
|
private String fencerInfo;
|
|
- private StorageInfo storageInfo;
|
|
|
|
- private Configuration conf;
|
|
|
|
- private FSEditLog editLog;
|
|
|
|
- private FSImage image;
|
|
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ private final Journal journal;
|
|
|
|
+ private final JournalListener listener;
|
|
|
|
+
|
|
enum State {
|
|
enum State {
|
|
/** The service is initialized and ready to start. */
|
|
/** The service is initialized and ready to start. */
|
|
INIT(false, false),
|
|
INIT(false, false),
|
|
@@ -193,66 +168,30 @@ public class JournalService implements JournalProtocol {
|
|
throws IOException {
|
|
throws IOException {
|
|
this.nnAddress = nnAddr;
|
|
this.nnAddress = nnAddr;
|
|
this.listener = listener;
|
|
this.listener = listener;
|
|
|
|
+ this.journal = new Journal(conf);
|
|
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
|
|
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
|
|
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
|
|
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
|
|
.getProxy();
|
|
.getProxy();
|
|
this.rpcServer = createRpcServer(conf, serverAddress, this);
|
|
this.rpcServer = createRpcServer(conf, serverAddress, this);
|
|
- this.conf = conf;
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- initializeJournalStorage(conf);
|
|
|
|
- } catch (IOException ioe) {
|
|
|
|
- LOG.info("Exception in initialize: " + ioe.getMessage());
|
|
|
|
- throw ioe;
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- /** This routine initializes the storage directory. It is possible that
|
|
|
|
- * the directory is not formatted. In this case, it creates dummy entries
|
|
|
|
- * and storage is later formatted.
|
|
|
|
- */
|
|
|
|
- private void initializeJournalStorage(Configuration conf) throws IOException {
|
|
|
|
-
|
|
|
|
- boolean isFormatted = false;
|
|
|
|
- Collection<URI> dirsToFormat = new ArrayList<URI>();
|
|
|
|
- List<URI> editUrisToFormat = getJournalEditsDirs(conf);
|
|
|
|
-
|
|
|
|
- // Load the newly formatted image, using all of the directories
|
|
|
|
- image = new FSImage(conf, dirsToFormat, editUrisToFormat);
|
|
|
|
- Map<StorageDirectory, StorageState> dataDirStates =
|
|
|
|
- new HashMap<StorageDirectory, StorageState>();
|
|
|
|
- isFormatted = image
|
|
|
|
- .recoverStorageDirs(StartupOption.REGULAR, dataDirStates);
|
|
|
|
-
|
|
|
|
- if (isFormatted == true) {
|
|
|
|
- // Directory has been formatted. So, it should have a versionfile.
|
|
|
|
- this.editLog = image.getEditLog();
|
|
|
|
- Iterator<StorageDirectory> sdit = image.getStorage().dirIterator(
|
|
|
|
- NNStorage.NameNodeDirType.IMAGE);
|
|
|
|
- StorageDirectory sd = sdit.next();
|
|
|
|
-
|
|
|
|
- Properties props = Storage.readPropertiesFile(sd.getVersionFile());
|
|
|
|
- String cid = props.getProperty("clusterID");
|
|
|
|
- String nsid = props.getProperty("namespaceID");
|
|
|
|
- String layout = props.getProperty("layoutVersion");
|
|
|
|
- storageInfo = new StorageInfo(Integer.parseInt(layout),
|
|
|
|
- Integer.parseInt(nsid), cid, 0);
|
|
|
|
|
|
+ Journal getJournal() {
|
|
|
|
+ return journal;
|
|
|
|
+ }
|
|
|
|
|
|
- LOG.info("JournalService constructor, nsid "
|
|
|
|
- + storageInfo.getNamespaceID() + " cluster id "
|
|
|
|
- + storageInfo.getClusterID());
|
|
|
|
|
|
+ synchronized NamenodeRegistration getRegistration() {
|
|
|
|
+ if (!journal.isFormatted()) {
|
|
|
|
+ throw new IllegalStateException("Journal is not formatted.");
|
|
|
|
+ }
|
|
|
|
|
|
- String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
|
|
|
|
- registration = new NamenodeRegistration(addr, "", storageInfo,
|
|
|
|
- NamenodeRole.BACKUP);
|
|
|
|
- } else {
|
|
|
|
- // Storage directory has not been formatted. So create dummy entries for now.
|
|
|
|
- image = new FSImage(conf);
|
|
|
|
- storageInfo = image.getStorage();
|
|
|
|
- editLog = null;
|
|
|
|
|
|
+ if (registration == null) {
|
|
|
|
+ registration = new NamenodeRegistration(
|
|
|
|
+ NetUtils.getHostPortString(rpcServer.getListenerAddress()), "",
|
|
|
|
+ journal.getStorageInfo(), NamenodeRole.BACKUP);
|
|
}
|
|
}
|
|
|
|
+ return registration;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Start the service.
|
|
* Start the service.
|
|
*/
|
|
*/
|
|
@@ -304,9 +243,10 @@ public class JournalService implements JournalProtocol {
|
|
* Stop the service. For applications whose RPC Server is managed outside, the
|
|
* Stop the service. For applications whose RPC Server is managed outside, the
|
|
* RPC Server must be stopped by the application.
|
|
* RPC Server must be stopped by the application.
|
|
*/
|
|
*/
|
|
- public void stop() {
|
|
|
|
|
|
+ public void stop() throws IOException {
|
|
if (!stateHandler.isStopped()) {
|
|
if (!stateHandler.isStopped()) {
|
|
rpcServer.stop();
|
|
rpcServer.stop();
|
|
|
|
+ journal.close();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -332,47 +272,15 @@ public class JournalService implements JournalProtocol {
|
|
listener.startLogSegment(this, txid);
|
|
listener.startLogSegment(this, txid);
|
|
stateHandler.startLogSegment();
|
|
stateHandler.startLogSegment();
|
|
}
|
|
}
|
|
-
|
|
|
|
- private void setupStorage(JournalInfo jinfo) throws IOException {
|
|
|
|
- formatStorage(jinfo.getNamespaceId(), jinfo.getClusterId());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void setupStorage(NamespaceInfo nsinfo) throws IOException {
|
|
|
|
- formatStorage(nsinfo.getNamespaceID(), nsinfo.getClusterID());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Will format the edits dir and save the nsid + cluster id.
|
|
|
|
- * @param nsId
|
|
|
|
- * @param clusterId
|
|
|
|
- * @throws IOException
|
|
|
|
- */
|
|
|
|
- private void formatStorage(int nsId, String clusterId) throws IOException {
|
|
|
|
- Collection<URI> dirsToFormat = new ArrayList<URI>();
|
|
|
|
- List<URI> editUrisToFormat = getJournalEditsDirs(conf);
|
|
|
|
- NNStorage nnStorage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
|
|
|
|
- LOG.info("Setting up storage for nsid " + nsId + " clusterid " + clusterId);
|
|
|
|
- nnStorage.format(new NamespaceInfo(nsId, clusterId, "journalservice", 0, 0));
|
|
|
|
- image = new FSImage(conf, dirsToFormat, editUrisToFormat);
|
|
|
|
- this.editLog = image.getEditLog();
|
|
|
|
-
|
|
|
|
- storageInfo = new StorageInfo(LayoutVersion.getCurrentLayoutVersion(),
|
|
|
|
- nsId, clusterId, 0);
|
|
|
|
- registration = new NamenodeRegistration(
|
|
|
|
- NetUtils.getHostPortString(rpcServer.getListenerAddress()), "",
|
|
|
|
- storageInfo, NamenodeRole.BACKUP);
|
|
|
|
- }
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public FenceResponse fence(JournalInfo journalInfo, long epoch,
|
|
public FenceResponse fence(JournalInfo journalInfo, long epoch,
|
|
String fencerInfo) throws IOException {
|
|
String fencerInfo) throws IOException {
|
|
LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
|
|
LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
|
|
|
|
|
|
- // This implies that this is the first fence on the journal service
|
|
|
|
- // It does not have any nsid or cluster id info.
|
|
|
|
- if ((storageInfo.getClusterID() == null)
|
|
|
|
- || (storageInfo.getNamespaceID() == 0)) {
|
|
|
|
- setupStorage(journalInfo);
|
|
|
|
|
|
+ // It is the first fence if the journal is not formatted.
|
|
|
|
+ if (!journal.isFormatted()) {
|
|
|
|
+ journal.format(journalInfo.getNamespaceId(), journalInfo.getClusterId());
|
|
}
|
|
}
|
|
verifyFence(epoch, fencerInfo);
|
|
verifyFence(epoch, fencerInfo);
|
|
verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
|
|
verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
|
|
@@ -422,19 +330,18 @@ public class JournalService implements JournalProtocol {
|
|
*/
|
|
*/
|
|
private void verify(int nsid, String clusid) throws IOException {
|
|
private void verify(int nsid, String clusid) throws IOException {
|
|
String errorMsg = null;
|
|
String errorMsg = null;
|
|
- int expectedNamespaceID = registration.getNamespaceID();
|
|
|
|
|
|
+ final NamenodeRegistration reg = getRegistration();
|
|
|
|
|
|
- if (nsid != expectedNamespaceID) {
|
|
|
|
|
|
+ if (nsid != reg.getNamespaceID()) {
|
|
errorMsg = "Invalid namespaceID in journal request - expected "
|
|
errorMsg = "Invalid namespaceID in journal request - expected "
|
|
- + expectedNamespaceID + " actual " + nsid;
|
|
|
|
|
|
+ + reg.getNamespaceID() + " actual " + nsid;
|
|
LOG.warn(errorMsg);
|
|
LOG.warn(errorMsg);
|
|
throw new UnregisteredNodeException(errorMsg);
|
|
throw new UnregisteredNodeException(errorMsg);
|
|
}
|
|
}
|
|
if ((clusid == null)
|
|
if ((clusid == null)
|
|
- || (!clusid.equals(registration.getClusterID()))) {
|
|
|
|
|
|
+ || (!clusid.equals(reg.getClusterID()))) {
|
|
errorMsg = "Invalid clusterId in journal request - incoming "
|
|
errorMsg = "Invalid clusterId in journal request - incoming "
|
|
- + clusid + " expected "
|
|
|
|
- + registration.getClusterID();
|
|
|
|
|
|
+ + clusid + " expected " + reg.getClusterID();
|
|
LOG.warn(errorMsg);
|
|
LOG.warn(errorMsg);
|
|
throw new UnregisteredNodeException(errorMsg);
|
|
throw new UnregisteredNodeException(errorMsg);
|
|
}
|
|
}
|
|
@@ -452,7 +359,7 @@ public class JournalService implements JournalProtocol {
|
|
* Register this service with the active namenode.
|
|
* Register this service with the active namenode.
|
|
*/
|
|
*/
|
|
private void registerWithNamenode() throws IOException {
|
|
private void registerWithNamenode() throws IOException {
|
|
- NamenodeRegistration nnReg = namenode.register(registration);
|
|
|
|
|
|
+ NamenodeRegistration nnReg = namenode.register(getRegistration());
|
|
String msg = null;
|
|
String msg = null;
|
|
if(nnReg == null) { // consider as a rejection
|
|
if(nnReg == null) { // consider as a rejection
|
|
msg = "Registration rejected by " + nnAddress;
|
|
msg = "Registration rejected by " + nnAddress;
|
|
@@ -472,9 +379,8 @@ public class JournalService implements JournalProtocol {
|
|
// If this is the first initialization of the journal service, then the storage
|
|
// If this is the first initialization of the journal service, then the storage
|
|
// directory will be set up. Otherwise, the nsid and clusterid have to match
|
|
// directory will be set up. Otherwise, the nsid and clusterid have to match
|
|
// the info saved in the edits dir.
|
|
// the info saved in the edits dir.
|
|
- if ((storageInfo.getClusterID() == null)
|
|
|
|
- || (storageInfo.getNamespaceID() == 0)) {
|
|
|
|
- setupStorage(nsInfo);
|
|
|
|
|
|
+ if (!journal.isFormatted()) {
|
|
|
|
+ journal.format(nsInfo.getNamespaceID(), nsInfo.getClusterID());
|
|
} else {
|
|
} else {
|
|
verify(nsInfo.getNamespaceID(), nsInfo.getClusterID());
|
|
verify(nsInfo.getNamespaceID(), nsInfo.getClusterID());
|
|
}
|
|
}
|
|
@@ -484,17 +390,4 @@ public class JournalService implements JournalProtocol {
|
|
long getEpoch() {
|
|
long getEpoch() {
|
|
return epoch;
|
|
return epoch;
|
|
}
|
|
}
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Returns edit directories that are shared between primary and secondary.
|
|
|
|
- * @param conf
|
|
|
|
- * @return Collection of edit directories.
|
|
|
|
- */
|
|
|
|
- public List<URI> getJournalEditsDirs(Configuration conf) {
|
|
|
|
- // don't use getStorageDirs here, because we want an empty default
|
|
|
|
- // rather than the dir in /tmp
|
|
|
|
- Collection<String> dirNames = conf.getTrimmedStringCollection(
|
|
|
|
- DFS_JOURNAL_EDITS_DIR_KEY);
|
|
|
|
- return Util.stringCollectionAsURIs(dirNames);
|
|
|
|
- }
|
|
|
|
}
|
|
}
|