|
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Options.Rename;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
|
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
|
|
import org.apache.hadoop.util.PureJavaCrc32;
|
|
@@ -54,6 +56,8 @@ import org.xml.sax.ContentHandler;
|
|
|
import org.xml.sax.SAXException;
|
|
|
import org.xml.sax.helpers.AttributesImpl;
|
|
|
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
+
|
|
|
import java.io.DataInput;
|
|
|
import java.io.DataOutput;
|
|
|
import java.io.DataInputStream;
|
|
@@ -74,42 +78,44 @@ public abstract class FSEditLogOp {
|
|
|
|
|
|
|
|
|
@SuppressWarnings("deprecation")
|
|
|
- private static ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>> opInstances =
|
|
|
- new ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>>() {
|
|
|
- @Override
|
|
|
- protected EnumMap<FSEditLogOpCodes, FSEditLogOp> initialValue() {
|
|
|
- EnumMap<FSEditLogOpCodes, FSEditLogOp> instances
|
|
|
- = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
|
|
|
- instances.put(OP_ADD, new AddOp());
|
|
|
- instances.put(OP_CLOSE, new CloseOp());
|
|
|
- instances.put(OP_SET_REPLICATION, new SetReplicationOp());
|
|
|
- instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
|
|
|
- instances.put(OP_RENAME_OLD, new RenameOldOp());
|
|
|
- instances.put(OP_DELETE, new DeleteOp());
|
|
|
- instances.put(OP_MKDIR, new MkdirOp());
|
|
|
- instances.put(OP_SET_GENSTAMP, new SetGenstampOp());
|
|
|
- instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
|
|
|
- instances.put(OP_SET_OWNER, new SetOwnerOp());
|
|
|
- instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
|
|
|
- instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
|
|
|
- instances.put(OP_SET_QUOTA, new SetQuotaOp());
|
|
|
- instances.put(OP_TIMES, new TimesOp());
|
|
|
- instances.put(OP_SYMLINK, new SymlinkOp());
|
|
|
- instances.put(OP_RENAME, new RenameOp());
|
|
|
- instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
|
|
|
- instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
|
|
|
- instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
|
|
|
- instances.put(OP_CANCEL_DELEGATION_TOKEN,
|
|
|
- new CancelDelegationTokenOp());
|
|
|
- instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
|
|
|
- instances.put(OP_START_LOG_SEGMENT,
|
|
|
- new LogSegmentOp(OP_START_LOG_SEGMENT));
|
|
|
- instances.put(OP_END_LOG_SEGMENT,
|
|
|
- new LogSegmentOp(OP_END_LOG_SEGMENT));
|
|
|
- instances.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
|
|
|
- return instances;
|
|
|
- }
|
|
|
- };
|
|
|
+ final public static class OpInstanceCache {
|
|
|
+ private EnumMap<FSEditLogOpCodes, FSEditLogOp> inst =
|
|
|
+ new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
|
|
|
+
|
|
|
+ public OpInstanceCache() {
|
|
|
+ inst.put(OP_ADD, new AddOp());
|
|
|
+ inst.put(OP_CLOSE, new CloseOp());
|
|
|
+ inst.put(OP_SET_REPLICATION, new SetReplicationOp());
|
|
|
+ inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
|
|
|
+ inst.put(OP_RENAME_OLD, new RenameOldOp());
|
|
|
+ inst.put(OP_DELETE, new DeleteOp());
|
|
|
+ inst.put(OP_MKDIR, new MkdirOp());
|
|
|
+ inst.put(OP_SET_GENSTAMP, new SetGenstampOp());
|
|
|
+ inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
|
|
|
+ inst.put(OP_SET_OWNER, new SetOwnerOp());
|
|
|
+ inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
|
|
|
+ inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
|
|
|
+ inst.put(OP_SET_QUOTA, new SetQuotaOp());
|
|
|
+ inst.put(OP_TIMES, new TimesOp());
|
|
|
+ inst.put(OP_SYMLINK, new SymlinkOp());
|
|
|
+ inst.put(OP_RENAME, new RenameOp());
|
|
|
+ inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
|
|
|
+ inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
|
|
|
+ inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
|
|
|
+ inst.put(OP_CANCEL_DELEGATION_TOKEN,
|
|
|
+ new CancelDelegationTokenOp());
|
|
|
+ inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
|
|
|
+ inst.put(OP_START_LOG_SEGMENT,
|
|
|
+ new LogSegmentOp(OP_START_LOG_SEGMENT));
|
|
|
+ inst.put(OP_END_LOG_SEGMENT,
|
|
|
+ new LogSegmentOp(OP_END_LOG_SEGMENT));
|
|
|
+ inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
|
|
|
+ }
|
|
|
+
|
|
|
+ public FSEditLogOp get(FSEditLogOpCodes opcode) {
|
|
|
+ return inst.get(opcode);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Constructor for an EditLog Op. EditLog ops cannot be constructed
|
|
@@ -117,13 +123,22 @@ public abstract class FSEditLogOp {
|
|
|
*/
|
|
|
private FSEditLogOp(FSEditLogOpCodes opCode) {
|
|
|
this.opCode = opCode;
|
|
|
- this.txid = 0;
|
|
|
+ this.txid = HdfsConstants.INVALID_TXID;
|
|
|
}
|
|
|
|
|
|
public long getTransactionId() {
|
|
|
+ Preconditions.checkState(txid != HdfsConstants.INVALID_TXID);
|
|
|
return txid;
|
|
|
}
|
|
|
|
|
|
+ public String getTransactionIdStr() {
|
|
|
+ return (txid == HdfsConstants.INVALID_TXID) ? "(none)" : "" + txid;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean hasTransactionId() {
|
|
|
+ return (txid != HdfsConstants.INVALID_TXID);
|
|
|
+ }
|
|
|
+
|
|
|
public void setTransactionId(long txid) {
|
|
|
this.txid = txid;
|
|
|
}
|
|
@@ -373,8 +388,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_ADD);
|
|
|
}
|
|
|
|
|
|
- static AddOp getInstance() {
|
|
|
- return (AddOp)opInstances.get().get(OP_ADD);
|
|
|
+ static AddOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (AddOp)cache.get(OP_ADD);
|
|
|
}
|
|
|
|
|
|
public boolean shouldCompleteLastBlock() {
|
|
@@ -395,8 +410,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_CLOSE);
|
|
|
}
|
|
|
|
|
|
- static CloseOp getInstance() {
|
|
|
- return (CloseOp)opInstances.get().get(OP_CLOSE);
|
|
|
+ static CloseOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (CloseOp)cache.get(OP_CLOSE);
|
|
|
}
|
|
|
|
|
|
public boolean shouldCompleteLastBlock() {
|
|
@@ -420,9 +435,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_UPDATE_BLOCKS);
|
|
|
}
|
|
|
|
|
|
- static UpdateBlocksOp getInstance() {
|
|
|
- return (UpdateBlocksOp)opInstances.get()
|
|
|
- .get(OP_UPDATE_BLOCKS);
|
|
|
+ static UpdateBlocksOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -500,9 +514,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_SET_REPLICATION);
|
|
|
}
|
|
|
|
|
|
- static SetReplicationOp getInstance() {
|
|
|
- return (SetReplicationOp)opInstances.get()
|
|
|
- .get(OP_SET_REPLICATION);
|
|
|
+ static SetReplicationOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (SetReplicationOp)cache.get(OP_SET_REPLICATION);
|
|
|
}
|
|
|
|
|
|
SetReplicationOp setPath(String path) {
|
|
@@ -571,9 +584,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_CONCAT_DELETE);
|
|
|
}
|
|
|
|
|
|
- static ConcatDeleteOp getInstance() {
|
|
|
- return (ConcatDeleteOp)opInstances.get()
|
|
|
- .get(OP_CONCAT_DELETE);
|
|
|
+ static ConcatDeleteOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (ConcatDeleteOp)cache.get(OP_CONCAT_DELETE);
|
|
|
}
|
|
|
|
|
|
ConcatDeleteOp setTarget(String trg) {
|
|
@@ -697,9 +709,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_RENAME_OLD);
|
|
|
}
|
|
|
|
|
|
- static RenameOldOp getInstance() {
|
|
|
- return (RenameOldOp)opInstances.get()
|
|
|
- .get(OP_RENAME_OLD);
|
|
|
+ static RenameOldOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (RenameOldOp)cache.get(OP_RENAME_OLD);
|
|
|
}
|
|
|
|
|
|
RenameOldOp setSource(String src) {
|
|
@@ -790,9 +801,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_DELETE);
|
|
|
}
|
|
|
|
|
|
- static DeleteOp getInstance() {
|
|
|
- return (DeleteOp)opInstances.get()
|
|
|
- .get(OP_DELETE);
|
|
|
+ static DeleteOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (DeleteOp)cache.get(OP_DELETE);
|
|
|
}
|
|
|
|
|
|
DeleteOp setPath(String path) {
|
|
@@ -872,9 +882,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_MKDIR);
|
|
|
}
|
|
|
|
|
|
- static MkdirOp getInstance() {
|
|
|
- return (MkdirOp)opInstances.get()
|
|
|
- .get(OP_MKDIR);
|
|
|
+ static MkdirOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (MkdirOp)cache.get(OP_MKDIR);
|
|
|
}
|
|
|
|
|
|
MkdirOp setPath(String path) {
|
|
@@ -977,9 +986,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_SET_GENSTAMP);
|
|
|
}
|
|
|
|
|
|
- static SetGenstampOp getInstance() {
|
|
|
- return (SetGenstampOp)opInstances.get()
|
|
|
- .get(OP_SET_GENSTAMP);
|
|
|
+ static SetGenstampOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (SetGenstampOp)cache.get(OP_SET_GENSTAMP);
|
|
|
}
|
|
|
|
|
|
SetGenstampOp setGenerationStamp(long genStamp) {
|
|
@@ -1031,9 +1039,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_SET_PERMISSIONS);
|
|
|
}
|
|
|
|
|
|
- static SetPermissionsOp getInstance() {
|
|
|
- return (SetPermissionsOp)opInstances.get()
|
|
|
- .get(OP_SET_PERMISSIONS);
|
|
|
+ static SetPermissionsOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (SetPermissionsOp)cache.get(OP_SET_PERMISSIONS);
|
|
|
}
|
|
|
|
|
|
SetPermissionsOp setSource(String src) {
|
|
@@ -1098,9 +1105,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_SET_OWNER);
|
|
|
}
|
|
|
|
|
|
- static SetOwnerOp getInstance() {
|
|
|
- return (SetOwnerOp)opInstances.get()
|
|
|
- .get(OP_SET_OWNER);
|
|
|
+ static SetOwnerOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (SetOwnerOp)cache.get(OP_SET_OWNER);
|
|
|
}
|
|
|
|
|
|
SetOwnerOp setSource(String src) {
|
|
@@ -1179,9 +1185,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_SET_NS_QUOTA);
|
|
|
}
|
|
|
|
|
|
- static SetNSQuotaOp getInstance() {
|
|
|
- return (SetNSQuotaOp)opInstances.get()
|
|
|
- .get(OP_SET_NS_QUOTA);
|
|
|
+ static SetNSQuotaOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (SetNSQuotaOp)cache.get(OP_SET_NS_QUOTA);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1232,9 +1237,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_CLEAR_NS_QUOTA);
|
|
|
}
|
|
|
|
|
|
- static ClearNSQuotaOp getInstance() {
|
|
|
- return (ClearNSQuotaOp)opInstances.get()
|
|
|
- .get(OP_CLEAR_NS_QUOTA);
|
|
|
+ static ClearNSQuotaOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (ClearNSQuotaOp)cache.get(OP_CLEAR_NS_QUOTA);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1281,9 +1285,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_SET_QUOTA);
|
|
|
}
|
|
|
|
|
|
- static SetQuotaOp getInstance() {
|
|
|
- return (SetQuotaOp)opInstances.get()
|
|
|
- .get(OP_SET_QUOTA);
|
|
|
+ static SetQuotaOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (SetQuotaOp)cache.get(OP_SET_QUOTA);
|
|
|
}
|
|
|
|
|
|
SetQuotaOp setSource(String src) {
|
|
@@ -1360,9 +1363,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_TIMES);
|
|
|
}
|
|
|
|
|
|
- static TimesOp getInstance() {
|
|
|
- return (TimesOp)opInstances.get()
|
|
|
- .get(OP_TIMES);
|
|
|
+ static TimesOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (TimesOp)cache.get(OP_TIMES);
|
|
|
}
|
|
|
|
|
|
TimesOp setPath(String path) {
|
|
@@ -1458,9 +1460,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_SYMLINK);
|
|
|
}
|
|
|
|
|
|
- static SymlinkOp getInstance() {
|
|
|
- return (SymlinkOp)opInstances.get()
|
|
|
- .get(OP_SYMLINK);
|
|
|
+ static SymlinkOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (SymlinkOp)cache.get(OP_SYMLINK);
|
|
|
}
|
|
|
|
|
|
SymlinkOp setPath(String path) {
|
|
@@ -1579,9 +1580,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_RENAME);
|
|
|
}
|
|
|
|
|
|
- static RenameOp getInstance() {
|
|
|
- return (RenameOp)opInstances.get()
|
|
|
- .get(OP_RENAME);
|
|
|
+ static RenameOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (RenameOp)cache.get(OP_RENAME);
|
|
|
}
|
|
|
|
|
|
RenameOp setSource(String src) {
|
|
@@ -1723,9 +1723,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_REASSIGN_LEASE);
|
|
|
}
|
|
|
|
|
|
- static ReassignLeaseOp getInstance() {
|
|
|
- return (ReassignLeaseOp)opInstances.get()
|
|
|
- .get(OP_REASSIGN_LEASE);
|
|
|
+ static ReassignLeaseOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (ReassignLeaseOp)cache.get(OP_REASSIGN_LEASE);
|
|
|
}
|
|
|
|
|
|
ReassignLeaseOp setLeaseHolder(String leaseHolder) {
|
|
@@ -1798,9 +1797,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_GET_DELEGATION_TOKEN);
|
|
|
}
|
|
|
|
|
|
- static GetDelegationTokenOp getInstance() {
|
|
|
- return (GetDelegationTokenOp)opInstances.get()
|
|
|
- .get(OP_GET_DELEGATION_TOKEN);
|
|
|
+ static GetDelegationTokenOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (GetDelegationTokenOp)cache.get(OP_GET_DELEGATION_TOKEN);
|
|
|
}
|
|
|
|
|
|
GetDelegationTokenOp setDelegationTokenIdentifier(
|
|
@@ -1870,9 +1868,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_RENEW_DELEGATION_TOKEN);
|
|
|
}
|
|
|
|
|
|
- static RenewDelegationTokenOp getInstance() {
|
|
|
- return (RenewDelegationTokenOp)opInstances.get()
|
|
|
- .get(OP_RENEW_DELEGATION_TOKEN);
|
|
|
+ static RenewDelegationTokenOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (RenewDelegationTokenOp)cache.get(OP_RENEW_DELEGATION_TOKEN);
|
|
|
}
|
|
|
|
|
|
RenewDelegationTokenOp setDelegationTokenIdentifier(
|
|
@@ -1941,9 +1938,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_CANCEL_DELEGATION_TOKEN);
|
|
|
}
|
|
|
|
|
|
- static CancelDelegationTokenOp getInstance() {
|
|
|
- return (CancelDelegationTokenOp)opInstances.get()
|
|
|
- .get(OP_CANCEL_DELEGATION_TOKEN);
|
|
|
+ static CancelDelegationTokenOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (CancelDelegationTokenOp)cache.get(OP_CANCEL_DELEGATION_TOKEN);
|
|
|
}
|
|
|
|
|
|
CancelDelegationTokenOp setDelegationTokenIdentifier(
|
|
@@ -1996,9 +1992,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_UPDATE_MASTER_KEY);
|
|
|
}
|
|
|
|
|
|
- static UpdateMasterKeyOp getInstance() {
|
|
|
- return (UpdateMasterKeyOp)opInstances.get()
|
|
|
- .get(OP_UPDATE_MASTER_KEY);
|
|
|
+ static UpdateMasterKeyOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (UpdateMasterKeyOp)cache.get(OP_UPDATE_MASTER_KEY);
|
|
|
}
|
|
|
|
|
|
UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
|
|
@@ -2050,8 +2045,9 @@ public abstract class FSEditLogOp {
|
|
|
code == OP_END_LOG_SEGMENT : "Bad op: " + code;
|
|
|
}
|
|
|
|
|
|
- static LogSegmentOp getInstance(FSEditLogOpCodes code) {
|
|
|
- return (LogSegmentOp)opInstances.get().get(code);
|
|
|
+ static LogSegmentOp getInstance(OpInstanceCache cache,
|
|
|
+ FSEditLogOpCodes code) {
|
|
|
+ return (LogSegmentOp)cache.get(code);
|
|
|
}
|
|
|
|
|
|
public void readFields(DataInputStream in, int logVersion)
|
|
@@ -2091,8 +2087,8 @@ public abstract class FSEditLogOp {
|
|
|
super(OP_INVALID);
|
|
|
}
|
|
|
|
|
|
- static InvalidOp getInstance() {
|
|
|
- return (InvalidOp)opInstances.get().get(OP_INVALID);
|
|
|
+ static InvalidOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (InvalidOp)cache.get(OP_INVALID);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -2207,6 +2203,7 @@ public abstract class FSEditLogOp {
|
|
|
private final DataInputStream in;
|
|
|
private final int logVersion;
|
|
|
private final Checksum checksum;
|
|
|
+ private final OpInstanceCache cache;
|
|
|
|
|
|
/**
|
|
|
* Construct the reader
|
|
@@ -2228,6 +2225,7 @@ public abstract class FSEditLogOp {
|
|
|
} else {
|
|
|
this.in = in;
|
|
|
}
|
|
|
+ this.cache = new OpInstanceCache();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2236,16 +2234,42 @@ public abstract class FSEditLogOp {
|
|
|
* Note that the objects returned from this method may be re-used by future
|
|
|
* calls to the same method.
|
|
|
*
|
|
|
+ * @param skipBrokenEdits If true, attempt to skip over damaged parts of
|
|
|
+ * the input stream, rather than throwing an IOException
|
|
|
* @return the operation read from the stream, or null at the end of the file
|
|
|
* @throws IOException on error.
|
|
|
*/
|
|
|
- public FSEditLogOp readOp() throws IOException {
|
|
|
+ public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
|
|
|
+ FSEditLogOp op = null;
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ in.mark(in.available());
|
|
|
+ try {
|
|
|
+ op = decodeOp();
|
|
|
+ } finally {
|
|
|
+ // If we encountered an exception or an end-of-file condition,
|
|
|
+ // do not advance the input stream.
|
|
|
+ if (op == null) {
|
|
|
+ in.reset();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return op;
|
|
|
+ } catch (IOException e) {
|
|
|
+ if (!skipBrokenEdits) {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ if (in.skip(1) < 1) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private FSEditLogOp decodeOp() throws IOException {
|
|
|
if (checksum != null) {
|
|
|
checksum.reset();
|
|
|
}
|
|
|
|
|
|
- in.mark(1);
|
|
|
-
|
|
|
byte opCodeByte;
|
|
|
try {
|
|
|
opCodeByte = in.readByte();
|
|
@@ -2255,12 +2279,10 @@ public abstract class FSEditLogOp {
|
|
|
}
|
|
|
|
|
|
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
|
|
|
- if (opCode == OP_INVALID) {
|
|
|
- in.reset(); // reset back to end of file if somebody reads it again
|
|
|
+ if (opCode == OP_INVALID)
|
|
|
return null;
|
|
|
- }
|
|
|
|
|
|
- FSEditLogOp op = opInstances.get().get(opCode);
|
|
|
+ FSEditLogOp op = cache.get(opCode);
|
|
|
if (op == null) {
|
|
|
throw new IOException("Read invalid opcode " + opCode);
|
|
|
}
|
|
@@ -2268,6 +2290,8 @@ public abstract class FSEditLogOp {
|
|
|
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
|
|
|
// Read the txid
|
|
|
op.setTransactionId(in.readLong());
|
|
|
+ } else {
|
|
|
+ op.setTransactionId(HdfsConstants.INVALID_TXID);
|
|
|
}
|
|
|
|
|
|
op.readFields(in, logVersion);
|
|
@@ -2426,8 +2450,4 @@ public abstract class FSEditLogOp {
|
|
|
short mode = Short.valueOf(st.getValue("MODE"));
|
|
|
return new PermissionStatus(username, groupname, new FsPermission(mode));
|
|
|
}
|
|
|
-
|
|
|
- public static FSEditLogOp getOpInstance(FSEditLogOpCodes opCode) {
|
|
|
- return opInstances.get().get(opCode);
|
|
|
- }
|
|
|
-}
|
|
|
+ }
|