|
@@ -958,41 +958,49 @@ public class NameNode implements NameNodeStatusMXBean {
|
|
|
FSEditLog sourceEditLog = fsns.getFSImage().editLog;
|
|
|
|
|
|
long fromTxId = fsns.getFSImage().getMostRecentCheckpointTxId();
|
|
|
- Collection<EditLogInputStream> streams = sourceEditLog.selectInputStreams(
|
|
|
- fromTxId+1, 0);
|
|
|
-
|
|
|
- // Set the nextTxid to the CheckpointTxId+1
|
|
|
- newSharedEditLog.setNextTxId(fromTxId + 1);
|
|
|
|
|
|
- // Copy all edits after last CheckpointTxId to shared edits dir
|
|
|
- for (EditLogInputStream stream : streams) {
|
|
|
- LOG.debug("Beginning to copy stream " + stream + " to shared edits");
|
|
|
- FSEditLogOp op;
|
|
|
- boolean segmentOpen = false;
|
|
|
- while ((op = stream.readOp()) != null) {
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("copying op: " + op);
|
|
|
- }
|
|
|
- if (!segmentOpen) {
|
|
|
- newSharedEditLog.startLogSegment(op.txid, false);
|
|
|
- segmentOpen = true;
|
|
|
+ Collection<EditLogInputStream> streams = null;
|
|
|
+ try {
|
|
|
+ streams = sourceEditLog.selectInputStreams(fromTxId + 1, 0);
|
|
|
+
|
|
|
+ // Set the nextTxid to the CheckpointTxId+1
|
|
|
+ newSharedEditLog.setNextTxId(fromTxId + 1);
|
|
|
+
|
|
|
+ // Copy all edits after last CheckpointTxId to shared edits dir
|
|
|
+ for (EditLogInputStream stream : streams) {
|
|
|
+ LOG.debug("Beginning to copy stream " + stream + " to shared edits");
|
|
|
+ FSEditLogOp op;
|
|
|
+ boolean segmentOpen = false;
|
|
|
+ while ((op = stream.readOp()) != null) {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("copying op: " + op);
|
|
|
+ }
|
|
|
+ if (!segmentOpen) {
|
|
|
+ newSharedEditLog.startLogSegment(op.txid, false);
|
|
|
+ segmentOpen = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ newSharedEditLog.logEdit(op);
|
|
|
+
|
|
|
+ if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
|
|
|
+ newSharedEditLog.logSync();
|
|
|
+ newSharedEditLog.endCurrentLogSegment(false);
|
|
|
+ LOG.debug("ending log segment because of END_LOG_SEGMENT op in "
|
|
|
+ + stream);
|
|
|
+ segmentOpen = false;
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- newSharedEditLog.logEdit(op);
|
|
|
|
|
|
- if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
|
|
|
+ if (segmentOpen) {
|
|
|
+ LOG.debug("ending log segment because of end of stream in " + stream);
|
|
|
newSharedEditLog.logSync();
|
|
|
newSharedEditLog.endCurrentLogSegment(false);
|
|
|
- LOG.debug("ending log segment because of END_LOG_SEGMENT op in " + stream);
|
|
|
segmentOpen = false;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- if (segmentOpen) {
|
|
|
- LOG.debug("ending log segment because of end of stream in " + stream);
|
|
|
- newSharedEditLog.logSync();
|
|
|
- newSharedEditLog.endCurrentLogSegment(false);
|
|
|
- segmentOpen = false;
|
|
|
+ } finally {
|
|
|
+ if (streams != null) {
|
|
|
+ FSEditLog.closeAllStreams(streams);
|
|
|
}
|
|
|
}
|
|
|
}
|