HDFS-695. RaidNode should read in configuration from hdfs-site.xml.
(dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@827928 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur, 15 years ago
parent commit ccf130c3d2
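The core of the change is the static block added to RaidNode below: on its own, Hadoop's Configuration loads only core-default.xml and core-site.xml, so properties set in hdfs-site.xml or mapred-site.xml were invisible to the RaidNode. Registering those files as default resources makes every Configuration created afterwards pick them up. A minimal sketch of the mechanism, assuming the XML files are on the classpath (the class name and queried key are illustrative):

import org.apache.hadoop.conf.Configuration;

public class RaidConfDemo {
  static {
    // Same idiom as the RaidNode static block: registered resources are
    // merged into every Configuration instantiated from here on.
    Configuration.addDefaultResource("hdfs-default.xml");
    Configuration.addDefaultResource("hdfs-site.xml");
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A key defined in hdfs-site.xml is now visible without an explicit
    // conf.addResource(...) call.
    System.out.println(conf.get("dfs.replication"));
  }
}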

+ 3 - 0
CHANGES.txt

@@ -22,6 +22,9 @@ Trunk (unreleased changes)
     HDFS-646. Fix test-patch failure by adding test-contrib ant target.
     (gkesavan)
 
+    HDFS-695. RaidNode should read in configuration from hdfs-site.xml.
+    (dhruba)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 7 - 0
src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java

@@ -39,6 +39,8 @@ public class DistRaid {
   static final String JOB_DIR_LABEL = NAME + ".job.dir";
   static final String OP_LIST_LABEL = NAME + ".op.list";
   static final String OP_COUNT_LABEL = NAME + ".op.count";
+  static final int   OP_LIST_BLOCK_SIZE = 32 * 1024 * 1024; // block size of control file
+  static final short OP_LIST_REPLICATION = 10; // replication factor of control file
 
   private static final long OP_PER_MAP = 100;
   private static final int MAX_MAPS_PER_NODE = 20;
@@ -287,6 +289,10 @@ public class DistRaid {
     jobconf.set(JOB_DIR_LABEL, jobdir.toString());
     Path log = new Path(jobdir, "_logs");
 
+    // The control file should have small size blocks. This helps
+    // in spreading out the load from mappers that will be spawned.
+    jobconf.setInt("dfs.block.size", OP_LIST_BLOCK_SIZE);
+
     FileOutputFormat.setOutputPath(jobconf, log);
     LOG.info("log=" + log);
 
@@ -314,6 +320,7 @@ public class DistRaid {
       if (opWriter != null) {
         opWriter.close();
       }
+      fs.setReplication(opList, OP_LIST_REPLICATION); // increase replication for control file
     }
     raidPolicyPathPairList.clear();
     

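The two DistRaid additions exploit an HDFS asymmetry: a file's block size is fixed at create time, while its replication factor can be raised later. That is why the block size goes into the job configuration before the control file is written, and fs.setReplication() runs only after the writer is closed. A hedged sketch of the same pattern, passing the block size straight to FileSystem.create() instead of through the configuration (the path and payload are invented):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ControlFileDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path opList = new Path("/tmp/raid/op.list");  // illustrative path

    // Small blocks spread the control file over more datanodes, so the
    // mappers reading it do not all converge on the same few blocks.
    FSDataOutputStream out = fs.create(opList, true,
        conf.getInt("io.file.buffer.size", 4096),
        (short) 3, 32L * 1024 * 1024 /* 32 MB, as in DistRaid */);
    out.writeBytes("one operation per record\n");  // placeholder payload
    out.close();

    // Replication is mutable after creation; a high factor keeps the
    // control file readable by many concurrent mappers.
    fs.setReplication(opList, (short) 10);
  }
}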
+ 25 - 6
src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.raid;
 
 import java.io.IOException;
+import java.io.FileNotFoundException;
 import java.util.Collection;
 import java.util.List;
 import java.util.LinkedList;
@@ -56,6 +57,14 @@ import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
  * A {@link RaidNode} that implements {@link RaidProtocol}.
  */
 public class RaidNode implements RaidProtocol {
+
+  static{
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+    Configuration.addDefaultResource("mapred-default.xml");
+    Configuration.addDefaultResource("mapred-site.xml");
+  }
+
   public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.raid.RaidNode");
   public static final long SLEEP_TIME = 10000L; // 10 seconds
   public static final int DEFAULT_PORT = 60000;
@@ -712,7 +721,7 @@ public class RaidNode implements RaidProtocol {
           ins[i].seek(blockSize * (startBlock + i));
         }
 
-        generateParity(ins,out,blockSize,bufs,xor);
+        generateParity(ins,out,blockSize,bufs,xor, reporter);
         
         // close input file handles
         for (int i = 0; i < ins.length; i++) {
@@ -769,7 +778,7 @@ public class RaidNode implements RaidProtocol {
   }
   
   private static void generateParity(FSDataInputStream[] ins, FSDataOutputStream fout, 
-      long parityBlockSize, byte[] bufs, byte[] xor) throws IOException {
+      long parityBlockSize, byte[] bufs, byte[] xor, Reporter reporter) throws IOException {
     
     int bufSize;
     if ((bufs == null) || (bufs.length == 0)){
@@ -796,6 +805,11 @@ public class RaidNode implements RaidProtocol {
 
       // read all remaining blocks and xor them into the buffer
       for (int i = 1; i < ins.length; i++) {
+
+        // report progress to Map-reduce framework
+        if (reporter != null) {
+          reporter.progress();
+        }
         
         int actualRead = readInputUntilEnd(ins[i], bufs, toRead);
         
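Threading the Reporter down into generateParity keeps long XOR loops alive: a map task that neither emits output nor reports status within the task timeout (mapred.task.timeout) is presumed hung and killed by the framework. A sketch of the idiom for any long-running loop inside a map body (the names and stubbed work are hypothetical):

import org.apache.hadoop.mapred.Reporter;

public class ProgressIdiom {
  static void xorAll(byte[][] blocks, Reporter reporter) {
    for (byte[] block : blocks) {
      // Heartbeat to the framework; resets the task-timeout clock.
      if (reporter != null) {   // null when called outside MapReduce
        reporter.progress();
      }
      // ... xor 'block' into an accumulator, as generateParity does ...
    }
  }
}

The null check matters because generateParity now has two callers: the DistRaid mapper passes a live Reporter, while the local block-recovery path (see the next hunk) passes null.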
@@ -911,7 +925,7 @@ public class RaidNode implements RaidProtocol {
     byte[] bufs = new byte[bufSize];
     byte[] xor = new byte[bufSize];
    
-    generateParity(ins,fout,corruptBlockSize,bufs,xor);
+    generateParity(ins,fout,corruptBlockSize,bufs,xor,null);
     
     // close all files
     fout.close();
@@ -1055,12 +1069,17 @@ public class RaidNode implements RaidProtocol {
                        info.getName() + " has already been processed.");
                 continue;
               }
-              LOG.info("Purging obsolete parity files for policy " + 
-                        info.getName() + " " + destp);
 
               FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
-              FileStatus stat = destFs.getFileStatus(destp);
+              FileStatus stat = null;
+              try {
+                stat = destFs.getFileStatus(destp);
+              } catch (FileNotFoundException e) {
+                // do nothing, leave stat = null;
+              }
               if (stat != null) {
+                LOG.info("Purging obsolete parity files for policy " + 
+                          info.getName() + " " + destp);
                 recursePurge(srcFs, destFs, destinationPrefix, stat);
               }
 

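FileSystem.getFileStatus() throws FileNotFoundException for a missing path rather than returning null, so the purge loop used to die whenever a policy had no parity directory yet. The change above swallows the exception and skips the entry, and it also logs the purge message only once there is actually something to purge. A hedged helper capturing the same pattern (the class and method names are invented):

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StatOrNull {
  // Return null for a missing path instead of propagating the exception.
  static FileStatus statOrNull(FileSystem fs, Path p) throws IOException {
    try {
      return fs.getFileStatus(p);
    } catch (FileNotFoundException e) {
      return null;  // path does not exist; caller skips it
    }
  }
}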
+ 0 - 1
src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java

@@ -60,7 +60,6 @@ public class TestRaidPurge extends TestCase {
       "test-raid.xml").getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
   final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidNode");
-  final Random rand = new Random();
 
   {
     ((Log4JLogger)RaidNode.LOG).getLogger().setLevel(Level.ALL);