|
@@ -18,6 +18,7 @@
|
|
|
package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
|
|
|
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_ERASURE_CODING_POLICY;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_APPEND;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_BLOCK;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
|
|
@@ -31,7 +32,9 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT;
|
|
|
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISABLE_ERASURE_CODING_POLICY;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT;
|
|
|
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ENABLE_ERASURE_CODING_POLICY;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
|
|
@@ -41,6 +44,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
|
|
|
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_ERASURE_CODING_POLICY;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_XATTR;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
|
|
@@ -75,7 +79,9 @@ import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.EnumMap;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.zip.CheckedInputStream;
|
|
|
import java.util.zip.Checksum;
|
|
|
|
|
@@ -100,6 +106,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
|
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
|
@@ -119,6 +126,7 @@ import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableFactories;
|
|
|
import org.apache.hadoop.io.WritableFactory;
|
|
|
+import org.apache.hadoop.io.erasurecode.ECSchema;
|
|
|
import org.apache.hadoop.ipc.ClientId;
|
|
|
import org.apache.hadoop.ipc.RpcConstants;
|
|
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
|
@@ -4339,6 +4347,323 @@ public abstract class FSEditLogOp {
|
|
|
this.len = in.readLong();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Operation corresponding to add an erasure coding policy.
|
|
|
+ */
|
|
|
+ static class AddErasureCodingPolicyOp extends FSEditLogOp {
|
|
|
+ private ErasureCodingPolicy ecPolicy;
|
|
|
+
|
|
|
+ AddErasureCodingPolicyOp() {
|
|
|
+ super(OP_ADD_ERASURE_CODING_POLICY);
|
|
|
+ }
|
|
|
+
|
|
|
+ static AddErasureCodingPolicyOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (AddErasureCodingPolicyOp) cache
|
|
|
+ .get(OP_ADD_ERASURE_CODING_POLICY);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void resetSubFields() {
|
|
|
+ this.ecPolicy = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public ErasureCodingPolicy getEcPolicy() {
|
|
|
+ return this.ecPolicy;
|
|
|
+ }
|
|
|
+
|
|
|
+ public AddErasureCodingPolicyOp setErasureCodingPolicy(
|
|
|
+ ErasureCodingPolicy policy) {
|
|
|
+ Preconditions.checkNotNull(policy.getName());
|
|
|
+ Preconditions.checkNotNull(policy.getSchema());
|
|
|
+ Preconditions.checkArgument(policy.getCellSize() > 0);
|
|
|
+ this.ecPolicy = policy;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void readFields(DataInputStream in, int logVersion) throws IOException {
|
|
|
+ this.ecPolicy = FSImageSerialization.readErasureCodingPolicy(in);
|
|
|
+ readRpcIds(in, logVersion);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void writeFields(DataOutputStream out) throws IOException {
|
|
|
+ Preconditions.checkNotNull(ecPolicy);
|
|
|
+ FSImageSerialization.writeErasureCodingPolicy(out, ecPolicy);
|
|
|
+ writeRpcIds(rpcClientId, rpcCallId, out);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void toXml(ContentHandler contentHandler) throws SAXException {
|
|
|
+ Preconditions.checkNotNull(ecPolicy);
|
|
|
+ XMLUtils.addSaxString(contentHandler, "CODEC", ecPolicy.getCodecName());
|
|
|
+ XMLUtils.addSaxString(contentHandler, "DATAUNITS",
|
|
|
+ Integer.toString(ecPolicy.getNumDataUnits()));
|
|
|
+ XMLUtils.addSaxString(contentHandler, "PARITYUNITS",
|
|
|
+ Integer.toString(ecPolicy.getNumParityUnits()));
|
|
|
+ XMLUtils.addSaxString(contentHandler, "CELLSIZE",
|
|
|
+ Integer.toString(ecPolicy.getCellSize()));
|
|
|
+
|
|
|
+ Map<String, String> extraOptions = ecPolicy.getSchema().getExtraOptions();
|
|
|
+ if (extraOptions == null || extraOptions.isEmpty()) {
|
|
|
+ XMLUtils.addSaxString(contentHandler, "EXTRAOPTIONS",
|
|
|
+ Integer.toString(0));
|
|
|
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ XMLUtils.addSaxString(contentHandler, "EXTRAOPTIONS",
|
|
|
+ Integer.toString(extraOptions.size()));
|
|
|
+
|
|
|
+ for (Map.Entry<String, String> entry : extraOptions.entrySet()) {
|
|
|
+ contentHandler.startElement("", "", "EXTRAOPTION",
|
|
|
+ new AttributesImpl());
|
|
|
+ XMLUtils.addSaxString(contentHandler, "KEY", entry.getKey());
|
|
|
+ XMLUtils.addSaxString(contentHandler, "VALUE", entry.getValue());
|
|
|
+ contentHandler.endElement("", "", "EXTRAOPTION");
|
|
|
+ }
|
|
|
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void fromXml(Stanza st) throws InvalidXmlException {
|
|
|
+ final String codecName = st.getValue("CODEC");
|
|
|
+ final int dataUnits = Integer.parseInt(st.getValue("DATAUNITS"));
|
|
|
+ final int parityUnits = Integer.parseInt(st.getValue("PARITYUNITS"));
|
|
|
+ final int cellSize = Integer.parseInt(st.getValue("CELLSIZE"));
|
|
|
+ final int extraOptionNum = Integer.parseInt(st.getValue("EXTRAOPTIONS"));
|
|
|
+
|
|
|
+ ECSchema schema;
|
|
|
+ if (extraOptionNum == 0) {
|
|
|
+ schema = new ECSchema(codecName, dataUnits, parityUnits, null);
|
|
|
+ } else {
|
|
|
+ Map<String, String> extraOptions = new HashMap<String, String>();
|
|
|
+ List<Stanza> stanzas = st.getChildren("EXTRAOPTION");
|
|
|
+ for (Stanza a: stanzas) {
|
|
|
+ extraOptions.put(a.getValue("KEY"), a.getValue("VALUE"));
|
|
|
+ }
|
|
|
+ schema = new ECSchema(codecName, dataUnits, parityUnits, extraOptions);
|
|
|
+ }
|
|
|
+ this.ecPolicy = new ErasureCodingPolicy(schema, cellSize);
|
|
|
+ readRpcIdsFromXml(st);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ StringBuilder builder = new StringBuilder();
|
|
|
+ builder.append("AddErasureCodingPolicy [");
|
|
|
+ builder.append(ecPolicy.toString());
|
|
|
+
|
|
|
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
|
|
|
+ builder.append("]");
|
|
|
+ return builder.toString();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Operation corresponding to enable an erasure coding policy.
|
|
|
+ */
|
|
|
+ static class EnableErasureCodingPolicyOp extends FSEditLogOp {
|
|
|
+ private String ecPolicyName;
|
|
|
+
|
|
|
+ EnableErasureCodingPolicyOp() {
|
|
|
+ super(OP_ENABLE_ERASURE_CODING_POLICY);
|
|
|
+ }
|
|
|
+
|
|
|
+ static EnableErasureCodingPolicyOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (EnableErasureCodingPolicyOp) cache
|
|
|
+ .get(OP_ENABLE_ERASURE_CODING_POLICY);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void resetSubFields() {
|
|
|
+ this.ecPolicyName = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getEcPolicy() {
|
|
|
+ return this.ecPolicyName;
|
|
|
+ }
|
|
|
+
|
|
|
+ public EnableErasureCodingPolicyOp setErasureCodingPolicy(
|
|
|
+ String policyName) {
|
|
|
+ Preconditions.checkNotNull(policyName);
|
|
|
+ this.ecPolicyName = policyName;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void readFields(DataInputStream in, int logVersion) throws IOException {
|
|
|
+ this.ecPolicyName = FSImageSerialization.readString(in);
|
|
|
+ readRpcIds(in, logVersion);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void writeFields(DataOutputStream out) throws IOException {
|
|
|
+ Preconditions.checkNotNull(ecPolicyName);
|
|
|
+ FSImageSerialization.writeString(ecPolicyName, out);
|
|
|
+ writeRpcIds(rpcClientId, rpcCallId, out);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void toXml(ContentHandler contentHandler) throws SAXException {
|
|
|
+ Preconditions.checkNotNull(ecPolicyName);
|
|
|
+ XMLUtils.addSaxString(contentHandler, "POLICYNAME", this.ecPolicyName);
|
|
|
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void fromXml(Stanza st) throws InvalidXmlException {
|
|
|
+ this.ecPolicyName = st.getValue("POLICYNAME");
|
|
|
+ readRpcIdsFromXml(st);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ StringBuilder builder = new StringBuilder();
|
|
|
+ builder.append("EnableErasureCodingPolicy [");
|
|
|
+ builder.append(ecPolicyName);
|
|
|
+
|
|
|
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
|
|
|
+ builder.append("]");
|
|
|
+ return builder.toString();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Operation corresponding to disable an erasure coding policy.
|
|
|
+ */
|
|
|
+ static class DisableErasureCodingPolicyOp extends FSEditLogOp {
|
|
|
+ private String ecPolicyName;
|
|
|
+
|
|
|
+ DisableErasureCodingPolicyOp() {
|
|
|
+ super(OP_DISABLE_ERASURE_CODING_POLICY);
|
|
|
+ }
|
|
|
+
|
|
|
+ static DisableErasureCodingPolicyOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (DisableErasureCodingPolicyOp) cache
|
|
|
+ .get(OP_DISABLE_ERASURE_CODING_POLICY);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void resetSubFields() {
|
|
|
+ this.ecPolicyName = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getEcPolicy() {
|
|
|
+ return this.ecPolicyName;
|
|
|
+ }
|
|
|
+
|
|
|
+ public DisableErasureCodingPolicyOp setErasureCodingPolicy(
|
|
|
+ String policyName) {
|
|
|
+ Preconditions.checkNotNull(policyName);
|
|
|
+ this.ecPolicyName = policyName;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void readFields(DataInputStream in, int logVersion) throws IOException {
|
|
|
+ this.ecPolicyName = FSImageSerialization.readString(in);
|
|
|
+ readRpcIds(in, logVersion);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void writeFields(DataOutputStream out) throws IOException {
|
|
|
+ FSImageSerialization.writeString(ecPolicyName, out);
|
|
|
+ writeRpcIds(rpcClientId, rpcCallId, out);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void toXml(ContentHandler contentHandler) throws SAXException {
|
|
|
+ XMLUtils.addSaxString(contentHandler, "POLICYNAME", this.ecPolicyName);
|
|
|
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void fromXml(Stanza st) throws InvalidXmlException {
|
|
|
+ this.ecPolicyName = st.getValue("POLICYNAME");
|
|
|
+ readRpcIdsFromXml(st);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ StringBuilder builder = new StringBuilder();
|
|
|
+ builder.append("DisableErasureCodingPolicy [");
|
|
|
+ builder.append(ecPolicyName);
|
|
|
+
|
|
|
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
|
|
|
+ builder.append("]");
|
|
|
+ return builder.toString();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Operation corresponding to remove an erasure coding policy.
|
|
|
+ */
|
|
|
+ static class RemoveErasureCodingPolicyOp extends FSEditLogOp {
|
|
|
+ private String ecPolicyName;
|
|
|
+
|
|
|
+ RemoveErasureCodingPolicyOp() {
|
|
|
+ super(OP_REMOVE_ERASURE_CODING_POLICY);
|
|
|
+ }
|
|
|
+
|
|
|
+ static RemoveErasureCodingPolicyOp getInstance(OpInstanceCache cache) {
|
|
|
+ return (RemoveErasureCodingPolicyOp) cache
|
|
|
+ .get(OP_REMOVE_ERASURE_CODING_POLICY);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void resetSubFields() {
|
|
|
+ this.ecPolicyName = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getEcPolicy() {
|
|
|
+ return this.ecPolicyName;
|
|
|
+ }
|
|
|
+
|
|
|
+ public RemoveErasureCodingPolicyOp setErasureCodingPolicy(
|
|
|
+ String policyName) {
|
|
|
+ Preconditions.checkNotNull(policyName);
|
|
|
+ this.ecPolicyName = policyName;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void readFields(DataInputStream in, int logVersion) throws IOException {
|
|
|
+ this.ecPolicyName = FSImageSerialization.readString(in);
|
|
|
+ readRpcIds(in, logVersion);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void writeFields(DataOutputStream out) throws IOException {
|
|
|
+ FSImageSerialization.writeString(ecPolicyName, out);
|
|
|
+ writeRpcIds(rpcClientId, rpcCallId, out);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void toXml(ContentHandler contentHandler) throws SAXException {
|
|
|
+ XMLUtils.addSaxString(contentHandler, "POLICYNAME", this.ecPolicyName);
|
|
|
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void fromXml(Stanza st) throws InvalidXmlException {
|
|
|
+ this.ecPolicyName = st.getValue("POLICYNAME");
|
|
|
+ readRpcIdsFromXml(st);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ StringBuilder builder = new StringBuilder();
|
|
|
+ builder.append("RemoveErasureCodingPolicy [");
|
|
|
+ builder.append(ecPolicyName);
|
|
|
+
|
|
|
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
|
|
|
+ builder.append("]");
|
|
|
+ return builder.toString();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Operation corresponding to upgrade
|
|
|
*/
|