|
@@ -17,12 +17,15 @@
|
|
*/
|
|
*/
|
|
package org.apache.hadoop.hdfs.server.journalservice;
|
|
package org.apache.hadoop.hdfs.server.journalservice;
|
|
|
|
|
|
|
|
+import java.io.File;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
|
|
+import java.util.Collection;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
|
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
|
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
|
@@ -34,9 +37,12 @@ import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolServerSideTranslatorPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolServerSideTranslatorPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolTranslatorPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolTranslatorPB;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
|
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
|
|
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
|
|
import org.apache.hadoop.hdfs.server.protocol.FencedException;
|
|
import org.apache.hadoop.hdfs.server.protocol.FencedException;
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalServiceProtocols;
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalServiceProtocols;
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
@@ -47,6 +53,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|
import org.apache.hadoop.ipc.RPC;
|
|
import org.apache.hadoop.ipc.RPC;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
+import org.apache.hadoop.util.Daemon;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.protobuf.BlockingService;
|
|
import com.google.protobuf.BlockingService;
|
|
@@ -68,14 +75,20 @@ import com.google.protobuf.BlockingService;
|
|
*/
|
|
*/
|
|
public class JournalService implements JournalServiceProtocols {
|
|
public class JournalService implements JournalServiceProtocols {
|
|
public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
|
|
public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
|
|
-
|
|
|
|
private final InetSocketAddress nnAddress;
|
|
private final InetSocketAddress nnAddress;
|
|
private NamenodeRegistration registration;
|
|
private NamenodeRegistration registration;
|
|
private final NamenodeProtocol namenode;
|
|
private final NamenodeProtocol namenode;
|
|
private final StateHandler stateHandler = new StateHandler();
|
|
private final StateHandler stateHandler = new StateHandler();
|
|
private final RPC.Server rpcServer;
|
|
private final RPC.Server rpcServer;
|
|
|
|
+ private final JournalHttpServer httpServer;
|
|
private long epoch = 0;
|
|
private long epoch = 0;
|
|
private String fencerInfo;
|
|
private String fencerInfo;
|
|
|
|
+ private Daemon syncThread = null;
|
|
|
|
+ private Configuration conf;
|
|
|
|
+
|
|
|
|
+ // Flags to indicate whether to start sync
|
|
|
|
+ private boolean toStartSync = false;
|
|
|
|
+ private long syncSinceTxid = -1;
|
|
|
|
|
|
private final Journal journal;
|
|
private final Journal journal;
|
|
private final JournalListener listener;
|
|
private final JournalListener listener;
|
|
@@ -128,12 +141,35 @@ public class JournalService implements JournalServiceProtocols {
|
|
current = State.WAITING_FOR_ROLL;
|
|
current = State.WAITING_FOR_ROLL;
|
|
}
|
|
}
|
|
|
|
|
|
- synchronized void startLogSegment() {
|
|
|
|
|
|
+ synchronized State startLogSegment() {
|
|
|
|
+ State prevState = current;
|
|
if (current == State.WAITING_FOR_ROLL) {
|
|
if (current == State.WAITING_FOR_ROLL) {
|
|
current = State.SYNCING;
|
|
current = State.SYNCING;
|
|
}
|
|
}
|
|
|
|
+ return prevState;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+    /**
+     * Attempts the SYNCING to IN_SYNC transition after a round of syncing.
+     *
+     * @return the state after the attempt. Any value other than IN_SYNC means
+     *         the transition did not happen and the caller should treat the
+     *         sync as not completed.
+     * @throws IllegalStateException if the handler is already IN_SYNC
+     */
+    synchronized State inSync() {
+      final State before = current;
+      if (before == State.IN_SYNC) {
+        throw new IllegalStateException("Service cannot be in " + before
+            + " state.");
+      }
+      if (before != State.SYNCING) {
+        // Not syncing any more: report the state and leave it untouched.
+        return before;
+      }
+      current = State.IN_SYNC;
+      return current;
+    }
|
|
|
|
+
|
|
|
|
+    /**
+     * Puts the handler back into WAITING_FOR_ROLL, whatever state it was in.
+     */
+    synchronized void fence() {
+      this.current = State.WAITING_FOR_ROLL;
+    }
|
|
|
|
+
|
|
synchronized void isStartLogSegmentAllowed() throws IOException {
|
|
synchronized void isStartLogSegmentAllowed() throws IOException {
|
|
if (!current.isStartLogSegmentAllowed) {
|
|
if (!current.isStartLogSegmentAllowed) {
|
|
throw new IOException("Cannot start log segment in " + current
|
|
throw new IOException("Cannot start log segment in " + current
|
|
@@ -168,18 +204,21 @@ public class JournalService implements JournalServiceProtocols {
|
|
* {@code server} is a valid server that is managed out side this
|
|
* {@code server} is a valid server that is managed out side this
|
|
* service.
|
|
* service.
|
|
* @param listener call-back interface to listen to journal activities
|
|
* @param listener call-back interface to listen to journal activities
|
|
|
|
+ * @param journal the journal used by both Listener and JournalService
|
|
* @throws IOException on error
|
|
* @throws IOException on error
|
|
*/
|
|
*/
|
|
JournalService(Configuration conf, InetSocketAddress nnAddr,
|
|
JournalService(Configuration conf, InetSocketAddress nnAddr,
|
|
- InetSocketAddress serverAddress, JournalListener listener)
|
|
|
|
- throws IOException {
|
|
|
|
|
|
+ InetSocketAddress serverAddress, InetSocketAddress httpAddress,
|
|
|
|
+ JournalListener listener, Journal journal) throws IOException {
|
|
this.nnAddress = nnAddr;
|
|
this.nnAddress = nnAddr;
|
|
this.listener = listener;
|
|
this.listener = listener;
|
|
- this.journal = new Journal(conf);
|
|
|
|
|
|
+ this.journal = journal;
|
|
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
|
|
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
|
|
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
|
|
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
|
|
.getProxy();
|
|
.getProxy();
|
|
this.rpcServer = createRpcServer(conf, serverAddress, this);
|
|
this.rpcServer = createRpcServer(conf, serverAddress, this);
|
|
|
|
+ this.httpServer = new JournalHttpServer(conf, journal, httpAddress);
|
|
|
|
+ this.conf = conf;
|
|
}
|
|
}
|
|
|
|
|
|
Journal getJournal() {
|
|
Journal getJournal() {
|
|
@@ -201,13 +240,18 @@ public class JournalService implements JournalServiceProtocols {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Start the service.
|
|
* Start the service.
|
|
|
|
+ * @throws IOException on error
|
|
*/
|
|
*/
|
|
- public void start() {
|
|
|
|
|
|
+ public void start() throws IOException {
|
|
stateHandler.start();
|
|
stateHandler.start();
|
|
|
|
|
|
// Start the RPC server
|
|
// Start the RPC server
|
|
LOG.info("Starting journal service rpc server");
|
|
LOG.info("Starting journal service rpc server");
|
|
rpcServer.start();
|
|
rpcServer.start();
|
|
|
|
+
|
|
|
|
+ // Start the HTTP server
|
|
|
|
+ LOG.info("Starting journal service http server");
|
|
|
|
+ httpServer.start();
|
|
|
|
|
|
for (boolean registered = false, handshakeComplete = false;;) {
|
|
for (boolean registered = false, handshakeComplete = false;;) {
|
|
try {
|
|
try {
|
|
@@ -239,6 +283,14 @@ public class JournalService implements JournalServiceProtocols {
|
|
}
|
|
}
|
|
|
|
|
|
stateHandler.waitForRoll();
|
|
stateHandler.waitForRoll();
|
|
|
|
+
|
|
|
|
+ // Create a never ending daemon to sync journal segments
|
|
|
|
+ // TODO: remove the assumption that "won't delete logs"
|
|
|
|
+ // use 3 because NN rolls with txid=3 when the first journal service joins.
|
|
|
|
+ // need to fix this after NN is modified to ignore its local storage dir
|
|
|
|
+ syncThread = new Daemon(new JournalSync(this));
|
|
|
|
+ syncThread.start();
|
|
|
|
+
|
|
try {
|
|
try {
|
|
namenode.rollEditLog();
|
|
namenode.rollEditLog();
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -249,9 +301,12 @@ public class JournalService implements JournalServiceProtocols {
|
|
/**
|
|
/**
|
|
* Stop the service. For application with RPC Server managed outside, the
|
|
* Stop the service. For application with RPC Server managed outside, the
|
|
* RPC Server must be stopped the application.
|
|
* RPC Server must be stopped the application.
|
|
|
|
+ * @throws IOException on error
|
|
*/
|
|
*/
|
|
public void stop() throws IOException {
|
|
public void stop() throws IOException {
|
|
if (!stateHandler.isStopped()) {
|
|
if (!stateHandler.isStopped()) {
|
|
|
|
+ syncThread.interrupt();
|
|
|
|
+ httpServer.stop();
|
|
rpcServer.stop();
|
|
rpcServer.stop();
|
|
journal.close();
|
|
journal.close();
|
|
}
|
|
}
|
|
@@ -277,7 +332,11 @@ public class JournalService implements JournalServiceProtocols {
|
|
stateHandler.isStartLogSegmentAllowed();
|
|
stateHandler.isStartLogSegmentAllowed();
|
|
verify(epoch, journalInfo);
|
|
verify(epoch, journalInfo);
|
|
listener.startLogSegment(this, txid);
|
|
listener.startLogSegment(this, txid);
|
|
- stateHandler.startLogSegment();
|
|
|
|
|
|
+
|
|
|
|
+ if (stateHandler.startLogSegment() == State.WAITING_FOR_ROLL) {
|
|
|
|
+ LOG.info("Notify syncThread to re-sync with txid:" + syncSinceTxid);
|
|
|
|
+ startSync(syncSinceTxid);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -294,7 +353,8 @@ public class JournalService implements JournalServiceProtocols {
|
|
long previousEpoch = epoch;
|
|
long previousEpoch = epoch;
|
|
this.epoch = epoch;
|
|
this.epoch = epoch;
|
|
this.fencerInfo = fencerInfo;
|
|
this.fencerInfo = fencerInfo;
|
|
-
|
|
|
|
|
|
+ stateHandler.fence();
|
|
|
|
+
|
|
// TODO:HDFS-3092 set lastTransId and inSync
|
|
// TODO:HDFS-3092 set lastTransId and inSync
|
|
return new FenceResponse(previousEpoch, 0, false);
|
|
return new FenceResponse(previousEpoch, 0, false);
|
|
}
|
|
}
|
|
@@ -310,8 +370,8 @@ public class JournalService implements JournalServiceProtocols {
|
|
}
|
|
}
|
|
verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
|
|
verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
|
|
|
|
|
|
- //journal has only one storage directory
|
|
|
|
- return journal.getRemoteEditLogs(sinceTxId);
|
|
|
|
|
|
+ // Journal has only one storage directory
|
|
|
|
+ return journal.getEditLogManifest(sinceTxId);
|
|
}
|
|
}
|
|
|
|
|
|
/** Create an RPC server. */
|
|
/** Create an RPC server. */
|
|
@@ -435,4 +495,150 @@ public class JournalService implements JournalServiceProtocols {
|
|
|
|
|
|
return new JournalSyncProtocolTranslatorPB((JournalSyncProtocolPB) proxy);
|
|
return new JournalSyncProtocolTranslatorPB((JournalSyncProtocolPB) proxy);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+  /**
+   * Only invoked by the sync thread. Blocks until {@link #startSync(long)}
+   * has posted a request for a round of syncing, then consumes that request.
+   *
+   * @return txid to start syncing from
+   * @throws InterruptedException if the thread is interrupted while waiting
+   */
+  synchronized long waitForStartSync() throws InterruptedException {
+    while (true) {
+      if (toStartSync) {
+        // Consume the request so that a new one can be posted.
+        toStartSync = false;
+        return syncSinceTxid;
+      }
+      wait();
+    }
+  }
|
|
|
|
+
|
|
|
|
+  /**
+   * Posts a request for another round of syncing and wakes up a thread
+   * waiting in {@link #waitForStartSync()}. A request that arrives while an
+   * earlier one is still pending is dropped, so the pending txid is kept.
+   *
+   * @param sinceTxid the transaction id the next sync round should start from
+   */
+  synchronized void startSync(long sinceTxid) {
+    if (!toStartSync) {
+      syncSinceTxid = sinceTxid;
+      toStartSync = true;
+      notify();
+    } else {
+      LOG.trace("toStartSync is already set.");
+    }
+  }
|
|
|
|
+
|
|
|
|
+  /**
+   * JournalSync downloads missing journal segments from the other configured
+   * journal services. Its {@link #run()} loop blocks in
+   * {@link JournalService#waitForStartSync()} until a round of syncing is
+   * requested, and exits when the thread is interrupted.
+   */
+  class JournalSync implements Runnable {
+    /** Pause, in milliseconds, after a failed round of syncing. */
+    private static final long FAILURE_BACKOFF_MS = 60000;
+
+    private final JournalInfo journalInfo;
+    private final JournalService journalService;
+
+    /**
+     * Constructor
+     * @param journalService Local journal service
+     */
+    JournalSync(JournalService journalService) {
+      NNStorage storage = journalService.getJournal().getStorage();
+      this.journalInfo = new JournalInfo(storage.layoutVersion,
+          storage.clusterID, storage.namespaceID);
+      this.journalService = journalService;
+    }
+
+    /**
+     * Waits for sync requests and serves them one at a time. A failed round
+     * is logged and followed by a pause; an interrupt ends the loop.
+     */
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          long sinceTxid = journalService.waitForStartSync();
+          syncAllJournalSegments(journalService.conf, journalInfo, sinceTxid);
+        } catch (IOException e) {
+          // Pass the exception as the throwable so the stack trace is kept.
+          LOG.error("Unable to sync for "
+              + journalService.getRegistration().getHttpAddress()
+              + " with exception: " + e, e);
+          try {
+            Thread.sleep(FAILURE_BACKOFF_MS);
+          } catch (InterruptedException ie) {
+            // Restore the interrupt status before leaving the loop.
+            Thread.currentThread().interrupt();
+            break;
+          }
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+      LOG.info("Stopping the JournalSync thread");
+    }
+
+    @Override
+    public String toString() {
+      return "JournalSync for "
+          + journalService.getRegistration().getHttpAddress();
+    }
+
+    /**
+     * Contact journal services one by one to get all missed journal segments.
+     * Stops at the first peer after which the local segments are complete.
+     *
+     * @param conf Configuration
+     * @param journalInfo the JournalInfo of the local journal service
+     * @param sinceTxid the transaction id to start with
+     * @throws IOException if the edits directory is not configured, or if
+     *           the segments are still incomplete after trying every peer
+     */
+    void syncAllJournalSegments(Configuration conf, JournalInfo journalInfo,
+        long sinceTxid) throws IOException {
+
+      // Get a list of configured journal services
+      Collection<InetSocketAddress> addrList = DFSUtil
+          .getJournalNodeHttpAddresses(conf);
+      FSEditLog editLog = journalService.journal.getEditLog();
+
+      // new File(null) would throw a NullPointerException, which run() does
+      // not catch; report the missing setting as an IOException instead.
+      String editsDir = conf.get(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY);
+      if (editsDir == null) {
+        throw new IOException(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY
+            + " is not configured.");
+      }
+      File currentDir = new File(editsDir);
+
+      if (editLog.hasCompleteJournalSegments(sinceTxid, currentDir)) {
+        LOG.trace("Nothing to sync.");
+        return;
+      }
+
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      InetSocketAddress selfAddr = journalService.httpServer.getHttpAddress();
+      boolean synced = false;
+      for (InetSocketAddress addr : addrList) {
+        // Skip itself
+        if (isSameEndpoint(addr, selfAddr)) {
+          continue;
+        }
+        try {
+          // Download journal segments
+          InetSocketAddress rpcAddr = DFSUtil.getJournalRpcAddrFromHostName(
+              conf, addr.getHostName());
+          JournalSyncProtocol syncProxy = createProxyWithJournalSyncProtocol(
+              rpcAddr, conf, ugi);
+          RemoteEditLogManifest manifest = syncProxy.getEditLogManifest(
+              journalInfo, sinceTxid);
+          journalService.httpServer.downloadEditFiles(
+              NetUtils.getHostPortString(addr), manifest);
+        } catch (IOException e) {
+          // Report the peer that failed, then try the next journal service
+          LOG.debug("Sync failed for " + addr + " with exception ", e);
+        }
+
+        if (editLog.hasCompleteJournalSegments(sinceTxid, currentDir)) {
+          synced = true;
+          break;
+        }
+      }
+
+      if (!synced) {
+        throw new IOException("Journal sync failed.");
+      }
+
+      // Journal service may not be in SYNCING state
+      // NOTE(review): inSync() throws IllegalStateException when the service
+      // is already IN_SYNC, and run() does not catch it - confirm that this
+      // cannot happen here.
+      State jState = journalService.stateHandler.inSync();
+      if (jState != State.IN_SYNC) {
+        LOG.debug("Journal service state changed during syncing : " + jState);
+      } else {
+        LOG.debug("Journal sync is done.");
+        // TODO: report IN_SYNC state to NN. Note that, it's ok if state changes
+        // to another state because NN could reject the IN_SYNC report
+      }
+    }
+
+    /** Returns true when both addresses have the same host name and port. */
+    private boolean isSameEndpoint(InetSocketAddress a, InetSocketAddress b) {
+      return a.getHostName().equals(b.getHostName())
+          && a.getPort() == b.getPort();
+    }
+  }
|
|
}
|
|
}
|