
[partial-ns] Implement FlatINodeFileFeature.

Haohui Mai 10 years ago
parent commit c9335b2c63

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Encoding.java

@@ -51,6 +51,14 @@ class Encoding {
     }
   }
 
+  static int computeRawVarint32Size(final int value) {
+    if ((value & (0xffffffff <<  7)) == 0) return 1;
+    if ((value & (0xffffffff << 14)) == 0) return 2;
+    if ((value & (0xffffffff << 21)) == 0) return 3;
+    if ((value & (0xffffffff << 28)) == 0) return 4;
+    return 5;
+  }
+
   private static int computeRawVarint64Size(final long value) {
     if ((value & (0xffffffffffffffffL <<  7)) == 0) return 1;
     if ((value & (0xffffffffffffffffL << 14)) == 0) return 2;
@@ -63,4 +71,27 @@ class Encoding {
     if ((value & (0xffffffffffffffffL << 63)) == 0) return 9;
     return 10;
   }
+
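+  /** Reads a varint32-prefixed byte array starting at the buffer's current
+   *  position and returns its contents as a String. */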
+  static String readString(ByteBuffer buf) {
+    int size = readRawVarint32(buf, buf.position());
+    byte[] r  = new byte[size];
+    ByteBuffer b = ((ByteBuffer) buf.slice().position(computeRawVarint32Size
+      (size)));
+    b.get(r);
+    return new String(r);
+  }
+
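+  /** Reads up to five bytes starting at {@code off}, accumulating seven bits
+   *  per byte until a byte without the continuation bit is seen. */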
+  static int readRawVarint32(ByteBuffer buf, int off) {
+    int r = 0;
+    byte b = (byte) 0x80;
+    for (int i = 0; i < 5 && (b & 0x80) != 0; ++i) {
+      b = buf.get(off + i);
+      r = (r << 7) | (b & 0x7f);
+    }
+    return r;
+  }
+
+  static int computeArraySize(int length) {
+    return computeRawVarint32Size(length) + length;
+  }
 }

+ 103 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FlatINode.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -25,11 +26,17 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.util.LongBitFormat;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
 
 /**
  * In-memory representation of an INode.
  */
 public final class FlatINode extends FlatObject {
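+  // Serialized layout: five fixed64 fields, a fixed32 feature count, then a
+  // (fixed32 type id, fixed32 length, payload) entry per feature.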
+  public static final int FEATURE_OFFSET = Encoding.SIZEOF_LONG * 5 +
+    Encoding.SIZEOF_INT;
+
   private FlatINode(ByteString data) {
     super(data);
   }
@@ -132,18 +139,77 @@ public final class FlatINode extends FlatObject {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder("INode[");
-    sb.append(isFile() ? "file" : "dir")
-        .append(", id=" + id())
-        .append("]");
+    sb.append(isFile() ? "file" : "dir").append(", id=" + id()).append("]");
     return sb.toString();
   }
 
+  abstract static class Feature extends FlatObject {
+    protected Feature(ByteString data) {
+      super(data);
+    }
+
+    protected Feature(ByteBuffer data) {
+      super(data);
+    }
+  }
+
+  private int numOfFeatures() {
+    return data.getInt(Encoding.SIZEOF_LONG * 5);
+  }
+
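+  /** Scans the serialized feature table for the entry whose type id matches
+   *  {@code clazz}; returns {@code null} if the inode has no such feature. */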
+  public <T extends Feature> T feature(Class<? extends Feature> clazz) {
+    int off = FEATURE_OFFSET;
+    final FlatINodeFeatureId fid = FlatINodeFeatureId.valueOf(clazz);
+    for (int i = 0, e = numOfFeatures(); i < e; ++i) {
+      int typeId = data.getInt(off);
+      int length = data.getInt(off + Encoding.SIZEOF_INT);
+      off += Encoding.SIZEOF_INT * 2;
+      if (typeId == fid.id()) {
+        ByteBuffer b = (ByteBuffer)(data.duplicate().position(off));
+        @SuppressWarnings("unchecked")
+        T ret = (T) fid.wrap((ByteBuffer) b.slice().limit(length));
+        return ret;
+      }
+    }
+    return null;
+  }
+
+  public Iterable<Feature> features() {
+    final int e = numOfFeatures();
+    return new Iterable<Feature>() {
+      @Override
+      public Iterator<Feature> iterator() {
+        return new Iterator<Feature>() {
+          private int i;
+          private int off = FEATURE_OFFSET;
+          @Override
+          public boolean hasNext() {
+            return i < e;
+          }
+
+          @Override
+          public Feature next() {
+            int typeId = data.getInt(off);
+            int length = data.getInt(off + Encoding.SIZEOF_INT);
+            off += Encoding.SIZEOF_INT * 2;
+            ++i;
+            FlatINodeFeatureId fid = FlatINodeFeatureId.valueOf(typeId);
+            ByteBuffer b = (ByteBuffer)(data.duplicate().position(off));
+            return fid.wrap((ByteBuffer) b.slice().limit(length));
+          }
+        };
+      }
+    };
+  }
+
   public static class Builder {
     private long header;
     private long id;
     private long parentId;
     private long atime;
     private long mtime;
+    private final HashMap<Class<? extends Feature>, Feature> features = Maps
+      .newHashMap();
 
     Builder type(Type type) {
       this.header = Header.set(Header.TYPE, header, type.value());
@@ -193,15 +259,40 @@ public final class FlatINode extends FlatObject {
       return this;
     }
 
+    Builder addFeature(Feature f) {
+      Feature old = features.put(f.getClass(), f);
+      assert old == null;
+      return this;
+    }
+
+    Builder replaceFeature(Feature f) {
+      features.put(f.getClass(), f);
+      return this;
+    }
+
+    Builder removeFeature(Class<? extends Feature> clazz) {
+      features.remove(clazz);
+      return this;
+    }
+
     Builder mergeFrom(FlatINode o) {
       header = o.header();
       id(o.id()).parentId(o.parentId()).mtime(o.mtime()).atime(o.atime());
+      features.clear();
+      for (Feature f : o.features()) {
+        addFeature(f);
+      }
       return this;
     }
 
     ByteString build() {
       Preconditions.checkState(id != 0);
-      byte[] res = new byte[4 * Encoding.SIZEOF_LONG];
+      int size = 5 * Encoding.SIZEOF_LONG + Encoding.SIZEOF_INT;
+      for (Feature f : features.values()) {
+        size += Encoding.SIZEOF_INT * 2 + f.asReadOnlyByteBuffer().remaining();
+      }
+
+      byte[] res = new byte[size];
       CodedOutputStream o = CodedOutputStream.newInstance(res);
       try {
         o.writeFixed64NoTag(header);
@@ -209,6 +300,14 @@ public final class FlatINode extends FlatObject {
         o.writeFixed64NoTag(parentId);
         o.writeFixed64NoTag(atime);
         o.writeFixed64NoTag(mtime);
+        o.writeFixed32NoTag(features.size());
+        for (Feature f : features.values()) {
+          FlatINodeFeatureId fid = FlatINodeFeatureId.valueOf(f.getClass());
+          o.writeFixed32NoTag(fid.id());
+          o.writeFixed32NoTag(f.data.remaining());
+          ByteString b = ByteString.copyFrom(f.data.asReadOnlyBuffer());
+          o.writeRawBytes(b);
+        }
         o.flush();
       } catch (IOException ignored) {
       }

+ 72 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FlatINodeFeatureId.java

@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
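+/**
+ * Maps each {@link FlatINode.Feature} implementation to the numeric id stored
+ * in the serialized feature table of a {@link FlatINode}.
+ */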
+enum FlatINodeFeatureId {
+  INODE_FILE(1, FlatINodeFileFeature.class, new Wrapper<FlatINodeFileFeature>() {
+    @Override
+    public FlatINodeFileFeature wrap(ByteBuffer data) {
+      return FlatINodeFileFeature.wrap(data);
+    }
+  });
+
+  private interface Wrapper<T> {
+    T wrap(ByteBuffer data);
+  }
+
+  private final int id;
+  private final Class<? extends FlatINode.Feature> clazz;
+  private final Wrapper<? extends FlatINode.Feature> constructor;
+
+  private static final Map<Class<? extends FlatINode.Feature>,
+      FlatINodeFeatureId> map;
+  private static final FlatINodeFeatureId[] VALUES = values();
+
+  static {
+    ImmutableMap.Builder<Class<? extends FlatINode.Feature>, FlatINodeFeatureId>
+        m = ImmutableMap.builder();
+    for (FlatINodeFeatureId f : values()) {
+      m.put(f.clazz, f);
+    }
+    map = m.build();
+  }
+
+  FlatINodeFeatureId(
+      int id, Class<? extends FlatINode.Feature> clazz,
+      Wrapper<? extends FlatINode.Feature> wrap) {
+    this.id = id;
+    this.clazz = clazz;
+    this.constructor = wrap;
+  }
+
+  static FlatINodeFeatureId valueOf(Class<? extends FlatINode.Feature> clazz) {
+    return map.get(clazz);
+  }
+
+  static FlatINodeFeatureId valueOf(int id) {
+    return VALUES[id - 1];
+  }
+
+  int id() { return id; }
+  FlatINode.Feature wrap(ByteBuffer data) { return constructor.wrap(data); }
+}

+ 301 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FlatINodeFileFeature.java

@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.util.LongBitFormat;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+
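+/**
+ * Flat, serialized representation of the file-specific metadata of an INode:
+ * a bit-packed header, the block list and, for files under construction, the
+ * client name and client machine.
+ */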
+public class FlatINodeFileFeature extends FlatINode.Feature {
+  private FlatINodeFileFeature(ByteBuffer data) {
+    super(data);
+  }
+  private FlatINodeFileFeature(ByteString data) {
+    super(data);
+  }
+
+  private static final int SIZEOF_BLOCK = 24;
+
+  public static FlatINodeFileFeature wrap(ByteString v) {
+    return new FlatINodeFileFeature(v);
+  }
+
+  public static FlatINodeFileFeature wrap(ByteBuffer v) {
+    return new FlatINodeFileFeature(v);
+  }
+
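+  /** Bit-packed fields of the leading long: preferred block size, replication,
+   *  storage policy id and the under-construction flag. */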
+  private enum Header {
+    PREFERRED_BLOCK_SIZE(null, 47, 1),
+    REPLICATION(PREFERRED_BLOCK_SIZE, 12, 1),
+    STORAGE_POLICY_ID(REPLICATION, BlockStoragePolicySuite.ID_BIT_LENGTH,
+      0),
+    IN_CONSTRUCTION(STORAGE_POLICY_ID, 1, 0);
+
+    private final LongBitFormat BITS;
+    Header(Header prev, int length, long min) {
+      BITS = new LongBitFormat(name(), prev == null ? null : prev.BITS, length,
+        min);
+    }
+
+    static long get(Header h, long bits) {
+      return h.BITS.retrieve(bits);
+    }
+
+    static long build(long blockSize, int replication,
+                      byte storagePolicyId, boolean inConstruction) {
+      long v = 0;
+      v = PREFERRED_BLOCK_SIZE.BITS.combine(blockSize, v);
+      v = REPLICATION.BITS.combine(replication, v);
+      v = STORAGE_POLICY_ID.BITS.combine(storagePolicyId, v);
+      v = IN_CONSTRUCTION.BITS.combine(inConstruction ? 1 : 0, v);
+      return v;
+    }
+  }
+
+  private long header() {
+    return data.getLong(0);
+  }
+
+  private boolean isInConstruction() {
+    return Header.IN_CONSTRUCTION.BITS.retrieve(header()) != 0;
+  }
+
+  public int numBlocks() {
+    return data.getInt(Encoding.SIZEOF_LONG);
+  }
+
+  public long blockSize() {
+    return Header.get(Header.PREFERRED_BLOCK_SIZE, header());
+  }
+
+  public short replication() {
+    return (short) Header.get(Header.REPLICATION, header());
+  }
+
+  public byte storagePolicyId() {
+    return (byte) Header.get(Header.STORAGE_POLICY_ID, header());
+  }
+
+  public boolean inConstruction() {
+    return Header.get(Header.IN_CONSTRUCTION, header()) != 0;
+  }
+
+  public Iterable<Block> blocks() {
+    final int numBlocks = numBlocks();
+    return new Iterable<Block>() {
+      @Override
+      public Iterator<Block> iterator() {
+        return new Iterator<Block>() {
+          private int i;
+          @Override
+          public boolean hasNext() {
+            return i < numBlocks;
+          }
+
+          @Override
+          public Block next() {
+            int off = offsetOfBlock(i);
+            Block bi = readBlock(off);
+            ++i;
+            return bi;
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
+  long fileSize() {
+    long size = 0;
+    for (Block b : blocks()) {
+      size += b.getNumBytes();
+    }
+    return size;
+  }
+
+  public Block lastBlock() {
+    return numBlocks() == 0 ? null : block(numBlocks() - 1);
+  }
+
+  public Block penultimateBlock() {
+    return numBlocks() >= 2 ? block(numBlocks() - 2) : null;
+  }
+
+  public Block block(int index) {
+    Preconditions.checkArgument(0 <= index && index < numBlocks());
+    int off = offsetOfBlock(index);
+    return readBlock(off);
+  }
+
+  private Block readBlock(int off) {
+    return new Block(data.getLong(off), data.getLong(off + Encoding
+      .SIZEOF_LONG), data.getLong(off + 2 * Encoding.SIZEOF_LONG));
+  }
+
+  private static int offsetOfBlock(int index) {
+    return 2 * Encoding.SIZEOF_LONG + index * SIZEOF_BLOCK;
+  }
+
+  private int offsetOfClientName() {
+    return offsetOfBlock(numBlocks());
+  }
+
+  private int lengthOfClientName() {
+    assert isInConstruction();
+    return Encoding.readRawVarint32(data, offsetOfClientName());
+  }
+
+  public String clientName() {
+    if (!isInConstruction()) {
+      return null;
+    }
+    int off = offsetOfClientName();
+    return Encoding.readString((ByteBuffer) data.slice().position(off));
+  }
+
+  public String clientMachine() {
+    if (!isInConstruction()) {
+      return null;
+    }
+
+    int off = offsetOfClientName() + Encoding.computeArraySize
+      (lengthOfClientName());
+    return Encoding.readString((ByteBuffer) data.slice().position(off));
+  }
+
+  public static class Builder {
+    private long blockSize;
+    private int replication;
+    private boolean inConstruction;
+    private byte storagePolicyId;
+    private ArrayList<Block> blocks = new ArrayList<>();
+    private ByteString clientName;
+    private ByteString clientMachine;
+
+    public Builder blockSize(long blockSize) {
+      this.blockSize = blockSize;
+      return this;
+    }
+
+    public Builder replication(int replication) {
+      this.replication = replication;
+      return this;
+    }
+
+    public Builder storagePolicyId(byte storagePolicyId) {
+      this.storagePolicyId = storagePolicyId;
+      return this;
+    }
+
+    public Builder inConstruction(boolean inConstruction) {
+      this.inConstruction = inConstruction;
+      return this;
+    }
+
+    public boolean inConstruction() {
+      return inConstruction;
+    }
+
+    public Builder addBlock(Block block) {
+      this.blocks.add(block);
+      return this;
+    }
+
+    public Builder block(int index, Block b) {
+      blocks.set(index, b);
+      return this;
+    }
+
+    public ArrayList<Block> blocks() {
+      return blocks;
+    }
+
+    public Builder clearBlock() {
+      blocks.clear();
+      return this;
+    }
+
+    public Builder clientName(String clientName) {
+      this.clientName = clientName == null
+          ? null : ByteString.copyFromUtf8(clientName);
+      return this;
+    }
+
+    public Builder clientMachine(String clientMachine) {
+      this.clientMachine = clientMachine == null
+          ? null : ByteString.copyFromUtf8(clientMachine);
+      return this;
+    }
+
+    public ByteString build() {
+      Preconditions.checkState(!inConstruction || (clientName != null &&
+        clientMachine != null));
+      int size = Encoding.SIZEOF_LONG * 2 + 3 * Encoding.SIZEOF_LONG * blocks
+        .size();
+      if (inConstruction) {
+        size += Encoding.computeArraySize(clientName.size());
+        size += Encoding.computeArraySize(clientMachine.size());
+      }
+      byte[] res = new byte[size];
+      CodedOutputStream o = CodedOutputStream.newInstance(res);
+      try {
+        o.writeFixed64NoTag(Header.build(blockSize, replication,
+          storagePolicyId, inConstruction));
+        o.writeFixed64NoTag(blocks.size());
+        for (Block b : blocks) {
+          o.writeFixed64NoTag(b.getBlockId());
+          o.writeFixed64NoTag(b.getNumBytes());
+          o.writeFixed64NoTag(b.getGenerationStamp());
+        }
+        if (inConstruction) {
+          o.writeBytesNoTag(clientName);
+          o.writeBytesNoTag(clientMachine);
+        }
+        o.flush();
+      } catch (IOException ignored) {
+      }
+      return ByteString.copyFrom(res);
+    }
+
+    Builder mergeFrom(FlatINodeFileFeature o) {
+      blockSize(o.blockSize()).replication(o.replication())
+        .storagePolicyId(o.storagePolicyId())
+        .inConstruction(o.inConstruction());
+      blocks = Lists.newArrayList(o.blocks());
+      if (o.isInConstruction()) {
+        clientName(o.clientName()).clientMachine(o.clientMachine());
+      }
+      return this;
+    }
+  }
+}