
HADOOP-1914. Introduce a new NamenodeProtocol to allow secondary
namenodes and rebalancing processes to communicate with a primary
namenode. (Hairong Kuang via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@581344 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 18 years ago
parent
commit
32232e1388

+ 4 - 0
CHANGES.txt

@@ -66,6 +66,10 @@ Trunk (unreleased changes)
     to Web User Interface. Users not using Firefox may install a plugin to 
     their browsers to see svg graphics. (enis)
 
+    HADOOP-1914. Introduce a new NamenodeProtocol to allow secondary 
+    namenodes and rebalancing processes to communicate with a primary 
+    namenode.  (Hairong Kuang via dhruba)
+
   OPTIMIZATIONS
 
     HADOOP-1910.  Reduce the number of RPCs that DistributedFileSystem.create()

+ 116 - 0
src/java/org/apache/hadoop/dfs/BlocksWithLocations.java

@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/** A class to implement an array of BlockWithLocations.
+ *  It provides efficient customized serialization/deserialization methods
+ *  instead of using the default array (de)serialization provided by RPC.
+ */
+class BlocksWithLocations implements Writable {
+
+  /**
+   * A class to keep track of a block and its locations
+   */
+  static class BlockWithLocations implements Writable {
+    Block block;
+    String datanodeIDs[];
+    
+    /** default constructor */
+    public BlockWithLocations() {
+      block = new Block();
+      datanodeIDs = null;
+    }
+    
+    /** constructor */
+    public BlockWithLocations(Block b, String[] datanodes) {
+      block = b;
+      datanodeIDs = datanodes;
+    }
+    
+    /** get the block */
+    Block getBlock() {
+      return block;
+    }
+    
+    /** get the block's locations */
+    String[] getDatanodes() {
+      return datanodeIDs;
+    }
+    
+    /** deserialization method */
+    public void readFields(DataInput in) throws IOException {
+      block.readFields(in);
+      int len = WritableUtils.readVInt(in); // variable length integer
+      datanodeIDs = new String[len];
+      for(int i=0; i<len; i++) {
+        datanodeIDs[i] = Text.readString(in);
+      }
+    }
+    
+    /** serialization method */
+    public void write(DataOutput out) throws IOException {
+      block.write(out);
+      WritableUtils.writeVInt(out, datanodeIDs.length); // variable length int
+      for(String id:datanodeIDs) {
+        Text.writeString(out, id);
+      }
+    }
+  }
+
+  private BlockWithLocations[] blocks;
+
+  /** default constructor */
+  BlocksWithLocations() {
+  }
+
+  /** constructor with a list of blocks */
+  BlocksWithLocations(BlockWithLocations[] blocks) {
+    this.blocks = blocks;
+  }
+
+  /** get the list of blocks */
+  BlockWithLocations[] getBlocks() {
+    return blocks;
+  }
+
+  /** serialization method */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, blocks.length);
+    for(int i=0; i<blocks.length; i++) {
+      blocks[i].write(out);
+    }
+  }
+
+  /** deserialization method */
+  public void readFields(DataInput in) throws IOException {
+    int len = WritableUtils.readVInt(in);
+    blocks = new BlockWithLocations[len];
+    for(int i=0; i<len; i++) {
+      blocks[i] = new BlockWithLocations();
+      blocks[i].readFields(in);
+    }
+  }
+}
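
For reference, a minimal round-trip sketch of the Writable contract implemented
above. This is an illustration rather than part of the commit: it assumes it
runs inside org.apache.hadoop.dfs (both classes are package-private) and that
Block has an (id, length) constructor; the storage IDs are hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    static void roundTrip() throws IOException {
      // serialize a one-element BlocksWithLocations into a buffer
      BlockWithLocations b = new BlockWithLocations(
          new Block(1L, 1024L), new String[] {"storage-1", "storage-2"});
      DataOutputBuffer out = new DataOutputBuffer();
      new BlocksWithLocations(new BlockWithLocations[] {b}).write(out);

      // deserialize it back; copy.getBlocks()[0].getDatanodes() now
      // holds {"storage-1", "storage-2"}
      DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      BlocksWithLocations copy = new BlocksWithLocations();
      copy.readFields(in);
    }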

+ 76 - 0
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.dfs;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.mapred.StatusHttpServer;
 import org.apache.hadoop.net.NetworkTopology;
@@ -550,6 +551,81 @@ class FSNamesystem implements FSConstants {
     return crcInfo;
   }
   
+  /////////////////////////////////////////////////////////
+  //
+  // These methods are called by secondary namenodes
+  //
+  /////////////////////////////////////////////////////////
+  /**
+   * return a list of blocks & their locations on <code>datanode</code>
+   * whose total size is at least <code>size</code>
+   * 
+   * @param datanode on which blocks are located
+   * @param size total size of blocks
+   */
+  synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
+      throws IOException {
+    DatanodeDescriptor node = getDatanode(datanode);
+    if (node == null) {
+      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+          + "Asking for blocks from an unrecorded node " + datanode.getName());
+      throw new IllegalArgumentException(
+          "Unexpected exception.  Got getBlocks message for datanode " + 
+          datanode.getName() + ", but there is no info for it");
+    }
+
+    int numBlocks = node.numBlocks();
+    if(numBlocks == 0) {
+      return new BlocksWithLocations(new BlockWithLocations[0]);
+    }
+    Iterator<Block> iter = node.getBlockIterator();
+    int startBlock = r.nextInt(numBlocks); // starting from a random block
+    // skip blocks
+    for(int i=0; i<startBlock; i++) {
+      iter.next();
+    }
+    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+    long totalSize = 0;
+    while(totalSize<size && iter.hasNext()) {
+      totalSize += addBlock(iter.next(), results);
+    }
+    if(totalSize<size) {
+      iter = node.getBlockIterator(); // start from the beginning
+      for(int i=0; i<startBlock&&totalSize<size; i++) {
+        totalSize += addBlock(iter.next(), results);
+      }
+    }
+    
+    return new BlocksWithLocations(
+        results.toArray(new BlockWithLocations[results.size()]));
+  }
+  
+  /* Get all valid locations of the block & add the block to results
+   * return the length of the added block; 0 if the block is not added
+   */
+  private long addBlock(Block block, List<BlockWithLocations> results) {
+    int numNodes = blocksMap.numNodes(block);
+    String[] machineSet = new String[numNodes];
+    if (numNodes > 0) {
+      numNodes = 0;
+      for(Iterator<DatanodeDescriptor> it = 
+          blocksMap.nodeIterator(block); it.hasNext();) {
+        String storageID = it.next().getStorageID();
+        // skip replicas that are scheduled to be invalidated
+        Collection<Block> blocks = recentInvalidateSets.get(storageID); 
+        if(blocks==null || !blocks.contains(block)) {
+          machineSet[numNodes++] = storageID;
+        }
+      }
+    }
+    if(numNodes == 0) {
+      return 0;
+    } else {
+      results.add(new BlockWithLocations(block, machineSet));
+      return block.getNumBytes();
+    }
+  }
+
   /////////////////////////////////////////////////////////
   //
   // These methods are called by HadoopFS clients
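
A note on the selection logic above: getBlocks() starts at a random block index
and wraps around once, so repeated calls spread the returned blocks across the
node rather than always returning the same prefix. A standalone sketch of that
pattern (plain Java with hypothetical names, independent of the HDFS types):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    // collect items starting at a random offset, wrapping around once,
    // until their total size reaches the requested amount
    static List<Long> pick(List<Long> sizes, long wanted, Random r) {
      List<Long> picked = new ArrayList<Long>();
      int start = r.nextInt(sizes.size());
      long total = 0;
      for (int i = 0; i < sizes.size() && total < wanted; i++) {
        long s = sizes.get((start + i) % sizes.size()); // wrap around
        picked.add(s);
        total += s;
      }
      return picked;
    }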

+ 27 - 1
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -67,14 +67,20 @@ import org.apache.hadoop.metrics.jvm.JvmMetrics;
  * methods are invoked repeatedly and automatically by all the
  * DataNodes in a DFS deployment.
  *
+ * NameNode also implements the NamenodeProtocol interface, used by
+ * secondary namenodes or rebalancing processes to get part of the
+ * namenode's state, for example a partial blocksMap.
  **********************************************************/
-public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
+public class NameNode implements ClientProtocol, DatanodeProtocol,
+                                 NamenodeProtocol, FSConstants {
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException { 
     if (protocol.equals(ClientProtocol.class.getName())) {
       return ClientProtocol.versionID; 
     } else if (protocol.equals(DatanodeProtocol.class.getName())){
       return DatanodeProtocol.versionID;
+    } else if (protocol.equals(NamenodeProtocol.class.getName())){
+      return NamenodeProtocol.versionID;
     } else {
       throw new IOException("Unknown protocol to name node: " + protocol);
     }
@@ -254,7 +260,27 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
       server.stop();
     }
   }
+  
+  /////////////////////////////////////////////////////
+  // NamenodeProtocol
+  /////////////////////////////////////////////////////
+  /**
+   * return a list of blocks & their locations on <code>datanode</code>
+   * whose total size is at least <code>size</code>
+   * 
+   * @param datanode on which blocks are located
+   * @param size total size of blocks
+   */
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+  throws IOException {
+    if(size <= 0) {
+      throw new IllegalArgumentException(
+        "Unexpected non-positive size: " + size);
+    }
 
+    return namesystem.getBlocks(datanode, size); 
+  }
+  
   /////////////////////////////////////////////////////
   // ClientProtocol
   /////////////////////////////////////////////////////

+ 43 - 0
src/java/org/apache/hadoop/dfs/NamenodeProtocol.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/*****************************************************************************
+ * Protocol that a secondary NameNode uses to communicate with the NameNode.
+ * It's used to get part of the name node state
+ *****************************************************************************/
+interface NamenodeProtocol extends VersionedProtocol {
+  public static final long versionID = 0L;
+
+  /** Get a list of blocks belonging to <code>datanode</code>
+   * whose total size is at least <code>size</code>
+   * @param datanode  a data node
+   * @param size      requested size
+   * @return          a list of blocks & their locations
+   * @throws RemoteException if size is less than or equal to 0 or
+   *                         datanode does not exist
+   */
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+  throws IOException;
+
+}
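
As a usage sketch, a client (for example, a rebalancer) would reach this
protocol over Hadoop RPC roughly as the new test below does. The address,
port, and requested size here are placeholders, not values from this commit:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    static BlocksWithLocations fetch(DatanodeInfo datanode) throws IOException {
      // placeholder namenode address; a real client would read it from the conf
      InetSocketAddress addr = new InetSocketAddress("localhost", 8020);
      NamenodeProtocol namenode = (NamenodeProtocol) RPC.getProxy(
          NamenodeProtocol.class, NamenodeProtocol.versionID, addr,
          new Configuration());
      // ask for roughly one megabyte worth of blocks on this datanode
      return namenode.getBlocks(datanode, 1024L * 1024L);
    }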

+ 146 - 0
src/test/org/apache/hadoop/dfs/TestGetBlocks.java

@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+
+import junit.framework.TestCase;
+/**
+ * This class tests if the getBlocks request to a namenode works correctly.
+ */
+public class TestGetBlocks extends TestCase {
+  /** test getBlocks */
+  public void testGetBlocks() throws IOException {
+    final Configuration CONF = new Configuration();
+
+    final short REPLICATION_FACTOR = (short)2;
+    final int DEFAULT_BLOCK_SIZE = 1024;
+    final Random r = new Random();
+    
+    CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    MiniDFSCluster cluster = new MiniDFSCluster(
+          CONF, REPLICATION_FACTOR, true, null );
+    try {
+      cluster.waitActive();
+      
+      // create a file with two blocks
+      FileSystem fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(new Path("/tmp.txt"),
+          REPLICATION_FACTOR);
+      byte [] data = new byte[1024];
+      long fileLen = 2*DEFAULT_BLOCK_SIZE;
+      long bytesToWrite = fileLen;
+      while( bytesToWrite > 0 ) {
+        r.nextBytes(data);
+        int bytesToWriteNext = (1024<bytesToWrite)?1024:(int)bytesToWrite;
+        out.write(data, 0, bytesToWriteNext);
+        bytesToWrite -= bytesToWriteNext;
+      }
+      out.close();
+
+      // get blocks & data nodes
+      List<LocatedBlock> locatedBlocks;
+      DatanodeInfo[] dataNodes=null;
+      boolean notWritten;
+      do {
+        locatedBlocks = cluster.getNameNode().
+          getBlockLocations("/tmp.txt", 0, fileLen).getLocatedBlocks();
+        assertEquals(2, locatedBlocks.size());
+        notWritten = false;
+        for(int i=0; i<2; i++) {
+          dataNodes = locatedBlocks.get(i).getLocations();
+          if(dataNodes.length != REPLICATION_FACTOR) {
+            notWritten = true;
+            try {
+              Thread.sleep(10);
+            } catch(InterruptedException e) {
+            }
+            break;
+          }
+        }
+      } while(notWritten);
+      
+      // get RPC client to namenode
+      InetSocketAddress addr = new InetSocketAddress("localhost",
+          cluster.getNameNodePort());
+      NamenodeProtocol namenode = (NamenodeProtocol) RPC.getProxy(
+          NamenodeProtocol.class, NamenodeProtocol.versionID, addr, CONF);
+
+      // get blocks of size fileLen from dataNodes[0]
+      BlockWithLocations[] locs;
+      locs = namenode.getBlocks(dataNodes[0], fileLen).getBlocks();
+      assertEquals(2, locs.length);
+      assertEquals(2, locs[0].getDatanodes().length);
+      assertEquals(2, locs[1].getDatanodes().length);
+
+      // get blocks of size BlockSize from dataNodes[0]
+      locs = namenode.getBlocks(dataNodes[0], DEFAULT_BLOCK_SIZE).getBlocks();
+      assertEquals(1, locs.length);
+      assertEquals(2, locs[0].getDatanodes().length);
+
+      // get blocks of size 1 from dataNodes[0]
+      locs = namenode.getBlocks(dataNodes[0], 1).getBlocks();
+      assertEquals(1, locs.length);
+      assertEquals(2, locs[0].getDatanodes().length);
+
+      // get blocks of size 0 from dataNodes[0]
+      getBlocksWithException(namenode, dataNodes[0], 0);     
+
+      // get blocks of size -1 from dataNodes[0]
+      getBlocksWithException(namenode, dataNodes[0], -1);
+
+      // get blocks of size 2 from a non-existent datanode
+      getBlocksWithException(namenode, new DatanodeInfo(), 2);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private void getBlocksWithException(NamenodeProtocol namenode,
+                                      DatanodeInfo datanode,
+                                      long size) throws IOException {
+    boolean getException = false;
+    try {
+      namenode.getBlocks(datanode, size);
+    } catch(RemoteException e) {
+      getException = true;
+      assertTrue(e.getMessage().contains("IllegalArgumentException"));
+    }
+    assertTrue(getException);
+  }
+ 
+
+  /**
+   * Run the test standalone
+   */
+  public static void main(String[] args) throws Exception {
+    (new TestGetBlocks()).testGetBlocks();
+  }
+
+}