|
@@ -26,6 +26,8 @@ import java.io.InputStream;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.EnumMap;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
@@ -69,6 +71,8 @@ import com.google.common.base.Joiner;
|
|
|
@InterfaceAudience.Private
|
|
|
@InterfaceStability.Evolving
|
|
|
public class FSEditLogLoader {
|
|
|
+ static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
|
|
|
+ static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
|
|
|
private final FSNamesystem fsNamesys;
|
|
|
|
|
|
public FSEditLogLoader(FSNamesystem fsNamesys) {
|
|
@@ -110,6 +114,10 @@ public class FSEditLogLoader {
|
|
|
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
|
|
|
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
|
|
|
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Acquiring write lock to replay edit log");
|
|
|
+ }
|
|
|
+
|
|
|
fsNamesys.writeLock();
|
|
|
fsDir.writeLock();
|
|
|
|
|
@@ -117,6 +125,16 @@ public class FSEditLogLoader {
|
|
|
Arrays.fill(recentOpcodeOffsets, -1);
|
|
|
|
|
|
long txId = expectedStartingTxId - 1;
|
|
|
+ long lastTxId = in.getLastTxId();
|
|
|
+ long numTxns = (lastTxId - expectedStartingTxId) + 1;
|
|
|
+
|
|
|
+ long lastLogTime = now();
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("edit log length: " + in.length() + ", start txid: "
|
|
|
+ + expectedStartingTxId + ", last txid: " + lastTxId);
|
|
|
+ }
|
|
|
+
|
|
|
try {
|
|
|
try {
|
|
|
while (true) {
|
|
@@ -153,6 +171,15 @@ public class FSEditLogLoader {
|
|
|
FSImage.LOG.error(errorMessage);
|
|
|
throw new IOException(errorMessage, t);
|
|
|
}
|
|
|
+
|
|
|
+ // log progress
|
|
|
+ if (now() - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
|
|
|
+ int percent = Math.round((float) txId / numTxns * 100);
|
|
|
+ LOG.info("replaying edit log: " + txId + "/" + numTxns
|
|
|
+ + " transactions completed. (" + percent + "%)");
|
|
|
+ lastLogTime = now();
|
|
|
+ }
|
|
|
+
|
|
|
numEdits++;
|
|
|
}
|
|
|
} catch (IOException ex) {
|
|
@@ -164,6 +191,11 @@ public class FSEditLogLoader {
|
|
|
} finally {
|
|
|
fsDir.writeUnlock();
|
|
|
fsNamesys.writeUnlock();
|
|
|
+
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("replaying edit log finished");
|
|
|
+ }
|
|
|
+
|
|
|
if (FSImage.LOG.isDebugEnabled()) {
|
|
|
dumpOpCounts(opCounts);
|
|
|
}
|
|
@@ -174,6 +206,11 @@ public class FSEditLogLoader {
|
|
|
@SuppressWarnings("deprecation")
|
|
|
private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
|
|
|
int logVersion) throws IOException {
|
|
|
+
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("replaying edit log: " + op);
|
|
|
+ }
|
|
|
+
|
|
|
switch (op.opCode) {
|
|
|
case OP_ADD: {
|
|
|
AddCloseOp addCloseOp = (AddCloseOp)op;
|