
HDFS-5774. Serialize CachePool directives in protobuf. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5698@1559874 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 11 years ago
parent
commit
93c8a05f12

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt

@@ -14,3 +14,5 @@ HDFS-5698 subtasks
     jing9)
 
     HDFS-5743. Use protobuf to serialize snapshot information. (jing9)
+
+    HDFS-5774. Serialize CachePool directives in protobuf. (Haohui Mai via jing9)

+ 122 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

@@ -50,8 +50,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -62,11 +64,15 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -81,6 +87,7 @@ import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 
 /**
  * The Cache Manager handles caching on DataNodes.
@@ -167,6 +174,19 @@ public final class CacheManager {
    */
   private CacheReplicationMonitor monitor;
 
+  public static final class PersistState {
+    public final CacheManagerSection section;
+    public final List<CachePoolInfoProto> pools;
+    public final List<CacheDirectiveInfoProto> directives;
+
+    public PersistState(CacheManagerSection section,
+        List<CachePoolInfoProto> pools, List<CacheDirectiveInfoProto> directives) {
+      this.section = section;
+      this.pools = pools;
+      this.directives = directives;
+    }
+  }
+
   CacheManager(FSNamesystem namesystem, Configuration conf,
       BlockManager blockManager) {
     this.namesystem = namesystem;
@@ -933,6 +953,57 @@ public final class CacheManager {
     serializerCompat.save(out, sdPath);
   }
 
+  public PersistState saveState() throws IOException {
+    ArrayList<CachePoolInfoProto> pools = Lists
+        .newArrayListWithCapacity(cachePools.size());
+    ArrayList<CacheDirectiveInfoProto> directives = Lists
+        .newArrayListWithCapacity(directivesById.size());
+
+    for (CachePool pool : cachePools.values()) {
+      CachePoolInfo p = pool.getInfo(true);
+      CachePoolInfoProto.Builder b = CachePoolInfoProto.newBuilder();
+
+      if (p.getOwnerName() != null)
+        b.setOwnerName(p.getOwnerName());
+
+      if (p.getGroupName() != null)
+        b.setGroupName(p.getGroupName());
+
+      if (p.getMode() != null)
+        b.setMode(p.getMode().toShort());
+
+      if (p.getLimit() != null)
+        b.setLimit(p.getLimit());
+
+      pools.add(b.build());
+    }
+
+    for (CacheDirective directive : directivesById.values()) {
+      CacheDirectiveInfo info = directive.toInfo();
+      CacheDirectiveInfoProto.Builder b = CacheDirectiveInfoProto.newBuilder();
+
+      if (info.getPath() != null)
+        b.setPath(info.getPath().toUri().getPath());
+
+      if (info.getReplication() != null)
+        b.setReplication(info.getReplication());
+
+      if (info.getPool() != null)
+        b.setPool(info.getPool());
+
+      if (info.getExpiration() != null)
+        b.setExpiration(CacheDirectiveInfoExpirationProto.newBuilder()
+            .setMillis(info.getExpiration().getMillis()));
+
+      directives.add(b.build());
+    }
+    CacheManagerSection s = CacheManagerSection.newBuilder()
+        .setNextDirectiveId(nextDirectiveId).setNumPools(pools.size())
+        .setNumDirectives(directives.size()).build();
+
+    return new PersistState(s, pools, directives);
+  }
+
   /**
    * Reloads CacheManager state from the passed DataInput. Used during namenode
    * startup to restore CacheManager state from an FSImage.
@@ -943,6 +1014,56 @@ public final class CacheManager {
     serializerCompat.load(in);
   }
 
+  public void loadState(PersistState s) throws IOException {
+    nextDirectiveId = s.section.getNextDirectiveId();
+    for (CachePoolInfoProto p : s.pools) {
+      CachePoolInfo info = new CachePoolInfo(p.getPoolName());
+      if (p.hasOwnerName())
+        info.setOwnerName(p.getOwnerName());
+
+      if (p.hasGroupName())
+        info.setGroupName(p.getGroupName());
+
+      if (p.hasMode())
+        info.setMode(new FsPermission((short) p.getMode()));
+
+      if (p.hasLimit())
+        info.setLimit(p.getLimit());
+
+      addCachePool(info);
+    }
+
+    for (CacheDirectiveInfoProto p : s.directives) {
+      // Get pool reference by looking it up in the map
+      final String poolName = p.getPool();
+      CacheDirective directive = new CacheDirective(p.getId(), new Path(
+          p.getPath()).toUri().getPath(), (short) p.getReplication(), p
+          .getExpiration().getMillis());
+      addCacheDirective(poolName, directive);
+    }
+  }
+
+  private void addCacheDirective(final String poolName,
+      final CacheDirective directive) throws IOException {
+    CachePool pool = cachePools.get(poolName);
+    if (pool == null) {
+      throw new IOException("Directive refers to pool " + poolName
+          + ", which does not exist.");
+    }
+    boolean addedDirective = pool.getDirectiveList().add(directive);
+    assert addedDirective;
+    if (directivesById.put(directive.getId(), directive) != null) {
+      throw new IOException("A directive with ID " + directive.getId()
+          + " already exists");
+    }
+    List<CacheDirective> directives = directivesByPath.get(directive.getPath());
+    if (directives == null) {
+      directives = new LinkedList<CacheDirective>();
+      directivesByPath.put(directive.getPath(), directives);
+    }
+    directives.add(directive);
+  }
+
   private final class SerializerCompat {
     private void save(DataOutputStream out, String sdPath) throws IOException {
       out.writeLong(nextDirectiveId);
@@ -1025,27 +1146,10 @@ public final class CacheManager {
         CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
         // Get pool reference by looking it up in the map
         final String poolName = info.getPool();
-        CachePool pool = cachePools.get(poolName);
-        if (pool == null) {
-          throw new IOException("Directive refers to pool " + poolName +
-              ", which does not exist.");
-        }
         CacheDirective directive =
             new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
                 info.getReplication(), info.getExpiration().getAbsoluteMillis());
-        boolean addedDirective = pool.getDirectiveList().add(directive);
-        assert addedDirective;
-        if (directivesById.put(directive.getId(), directive) != null) {
-          throw new IOException("A directive with ID " + directive.getId() +
-              " already exists");
-        }
-        List<CacheDirective> directives =
-            directivesByPath.get(directive.getPath());
-        if (directives == null) {
-          directives = new LinkedList<CacheDirective>();
-          directivesByPath.put(directive.getPath(), directives);
-        }
-        directives.add(directive);
+        addCacheDirective(poolName, directive);
         counter.increment();
       }
       prog.endStep(Phase.LOADING_FSIMAGE, step);
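
For reference, a minimal sketch of the length-delimited round trip that the new saveState()/loadState() pair relies on. It assumes the generated protobuf classes from ClientNamenodeProtocolProtos are on the classpath and uses an in-memory stream as a stand-in for the fsimage section stream; it is an illustration, not code from this patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;

public class CachePoolRoundTrip {
  public static void main(String[] args) throws IOException {
    // Build a pool entry the way saveState() does: every field is optional,
    // so only the fields that are actually present get set.
    CachePoolInfoProto pool = CachePoolInfoProto.newBuilder()
        .setPoolName("pool1")
        .setOwnerName("hdfs")
        .setMode(0755)
        .build();

    // writeDelimitedTo() prefixes the message with its varint length, which
    // is what lets the loader later read back exactly numPools messages.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    pool.writeDelimitedTo(out);

    CachePoolInfoProto loaded = CachePoolInfoProto.parseDelimitedFrom(
        new ByteArrayInputStream(out.toByteArray()));
    System.out.println(loaded.getPoolName()); // pool1
  }
}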

+ 38 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

@@ -46,6 +46,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
@@ -220,6 +224,9 @@ public final class FSImageFormatProtobuf {
         case SNAPSHOT_DIFF:
           snapshotLoader.loadSnapshotDiffSection(in);
           break;
+        case CACHE_MANAGER:
+          loadCacheManagerSection(in);
+          break;
         default:
           LOG.warn("Unregconized section " + n);
           break;
@@ -246,6 +253,21 @@ public final class FSImageFormatProtobuf {
         stringTable[e.getId()] = e.getStr();
       }
     }
+
+    private void loadCacheManagerSection(InputStream in) throws IOException {
+      CacheManagerSection s = CacheManagerSection.parseDelimitedFrom(in);
+      ArrayList<CachePoolInfoProto> pools = Lists.newArrayListWithCapacity(s
+          .getNumPools());
+      ArrayList<CacheDirectiveInfoProto> directives = Lists
+          .newArrayListWithCapacity(s.getNumDirectives());
+      for (int i = 0; i < s.getNumPools(); ++i)
+        pools.add(CachePoolInfoProto.parseDelimitedFrom(in));
+      for (int i = 0; i < s.getNumDirectives(); ++i)
+        directives.add(CacheDirectiveInfoProto.parseDelimitedFrom(in));
+      fsn.getCacheManager().loadState(
+          new CacheManager.PersistState(s, pools, directives));
+    }
+
   }
 
   public static final class Saver {
@@ -352,6 +374,8 @@ public final class FSImageFormatProtobuf {
       saveSnapshots(b);
       saveStringTableSection(b);
 
+      saveCacheManagerSection(b);
+
       // Flush the buffered data into the file before appending the header
       flushSectionOutputStream();
 
@@ -361,6 +385,20 @@ public final class FSImageFormatProtobuf {
       savedDigest = new MD5Hash(digester.digest());
     }
 
+    private void saveCacheManagerSection(FileSummary.Builder summary) throws IOException {
+      final FSNamesystem fsn = context.getSourceNamesystem();
+      CacheManager.PersistState state = fsn.getCacheManager().saveState();
+      state.section.writeDelimitedTo(sectionOutputStream);
+
+      for (CachePoolInfoProto p : state.pools)
+        p.writeDelimitedTo(sectionOutputStream);
+
+      for (CacheDirectiveInfoProto p : state.directives)
+        p.writeDelimitedTo(sectionOutputStream);
+
+      commitSection(summary, SectionName.CACHE_MANAGER);
+    }
+
     private void saveNameSystemSection(
         FileSummary.Builder summary) throws IOException {
       final FSNamesystem fsn = context.getSourceNamesystem();
@@ -443,7 +481,6 @@ public final class FSImageFormatProtobuf {
     INODE_DIR("INODE_DIR"),
     FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
     SNAPSHOT_DIFF("SNAPSHOT_DIFF"),
-    SECRET_MANAGER("SECRET_MANAGER"),
     CACHE_MANAGER("CACHE_MANAGER");
 
     private static final SectionName[] values = SectionName.values();
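
The byte layout of the new CACHE_MANAGER section, as produced by saveCacheManagerSection() and consumed by loadCacheManagerSection(), can be sketched in isolation: one delimited CacheManagerSection header, followed by exactly numPools delimited pool messages and then numDirectives delimited directive messages. The sketch below is hypothetical scaffolding over in-memory streams, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;

public class CacheSectionLayout {
  // Writes the section in the same order as saveCacheManagerSection():
  // header first, then pools, then directives.
  static byte[] write(long nextDirectiveId, List<CachePoolInfoProto> pools,
      List<CacheDirectiveInfoProto> directives) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    CacheManagerSection.newBuilder()
        .setNextDirectiveId(nextDirectiveId)
        .setNumPools(pools.size())
        .setNumDirectives(directives.size())
        .build().writeDelimitedTo(out);
    for (CachePoolInfoProto p : pools)
      p.writeDelimitedTo(out);
    for (CacheDirectiveInfoProto d : directives)
      d.writeDelimitedTo(out);
    return out.toByteArray();
  }

  // Reads it back the way loadCacheManagerSection() does: the counts in
  // the header drive the two parse loops.
  static void read(byte[] bytes) throws IOException {
    InputStream in = new ByteArrayInputStream(bytes);
    CacheManagerSection s = CacheManagerSection.parseDelimitedFrom(in);
    for (int i = 0; i < s.getNumPools(); ++i)
      CachePoolInfoProto.parseDelimitedFrom(in);
    for (int i = 0; i < s.getNumDirectives(); ++i)
      CacheDirectiveInfoProto.parseDelimitedFrom(in);
  }
}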

+ 12 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto

@@ -73,7 +73,7 @@ message NameSystemSection {
 }
 
 /**
- * Permission is serialized as a 64-bit long. [0:24):[25:48):[48:64) (in Big Endian). 
+ * Permission is serialized as a 64-bit long. [0:24):[25:48):[48:64) (in Big Endian).
  * The first and the second parts are the string ids of the user and
  * group name, and the last 16 bits are the permission bits.
  *
@@ -113,7 +113,7 @@ message INodeSection {
   }
 
   message INodeReference {
-    // id of the referred inode 
+    // id of the referred inode
     optional uint64 referredId = 1;
     // local name recorded in WithName
     optional bytes name = 2;
@@ -132,7 +132,7 @@ message INodeSection {
     required Type type = 1;
     required uint64 id = 2;
     optional bytes name = 3;
-    
+
     optional INodeFile file = 4;
     optional INodeDirectory directory = 5;
     optional INodeSymlink symlink = 6;
@@ -205,7 +205,7 @@ message SnapshotDiffSection {
     optional INodeSection.INodeDirectory snapshotCopy = 5;
     optional uint32 clistSize = 6;
     optional uint32 dlistSize = 7;
-    repeated uint64 deletedINode = 8; // id of deleted inode 
+    repeated uint64 deletedINode = 8; // id of deleted inode
     // repeated CreatedListEntry
     // repeated INodeReference (number of ref: dlistSize - dlist.size)
   }
@@ -238,3 +238,11 @@ message StringTableSection {
   optional uint32 numEntry = 1;
   // repeated Entry
 }
+
+message CacheManagerSection {
+  required uint64 nextDirectiveId = 1;
+  required uint32 numPools        = 2;
+  required uint32 numDirectives   = 3;
+  // repeated CachePoolInfoProto pools
+  // repeated CacheDirectiveInfoProto directives
+}