Ver código fonte

HADOOP-2113. A new shell command "dfs -text" to view the contents of
a gziped or SequenceFile. (Chris Douglas via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@597211 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 18 anos atrás
pai
commit
3527c961ed

+ 3 - 0
CHANGES.txt

@@ -72,6 +72,9 @@ Trunk (unreleased changes)
     HADOOP-2127. Added a pipes sort example to benchmark trivial pipes
     application versus trivial java application. (omalley via acmurthy)
 
+    HADOOP-2113. A new shell command "dfs -text" to view the contents of
+    a gziped or SequenceFile. (Chris Douglas via dhruba)
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

+ 95 - 10
src/java/org/apache/hadoop/fs/FsShell.java

@@ -19,15 +19,23 @@ package org.apache.hadoop.fs;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.text.DecimalFormat;
 import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.zip.GZIPInputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -92,11 +100,7 @@ public class FsShell extends Configured implements Tool {
   /** 
    * Print from src to stdout.
    */
-  private void printToStdout(Path src) throws IOException {
-    if (fs.isDirectory(src)) {
-      throw new IOException("Source must be a file.");
-    }
-    FSDataInputStream in = fs.open(src);
+  private void printToStdout(InputStream in) throws IOException {
     IOUtils.copyBytes(in, System.out, getConf(), true);
   }
 
@@ -286,11 +290,81 @@ public class FsShell extends Configured implements Tool {
     new DelayedExceptionThrowing() {
       @Override
       void process(Path p) throws IOException {
-        printToStdout(p);
+        if (fs.isDirectory(p)) {
+          throw new IOException("Source must be a file.");
+        }
+        printToStdout(fs.open(p));
       }
     }.process(srcf);
   }
-    
+
+  private class TextRecordInputStream extends InputStream {
+    SequenceFile.Reader r;
+    WritableComparable key;
+    Writable val;
+
+    DataInputBuffer inbuf;
+    DataOutputBuffer outbuf;
+
+    public TextRecordInputStream(FileStatus f) throws IOException {
+      r = new SequenceFile.Reader(fs, f.getPath(), getConf());
+      key = (WritableComparable)ReflectionUtils.newInstance(
+          r.getKeyClass(), getConf());
+      val = (Writable)ReflectionUtils.newInstance(
+          r.getValueClass(), getConf());
+      inbuf = new DataInputBuffer();
+      outbuf = new DataOutputBuffer();
+    }
+
+    public int read() throws IOException {
+      int ret;
+      if (null == inbuf || -1 == (ret = inbuf.read())) {
+        if (!r.next(key, val)) {
+          return -1;
+        }
+        byte[] tmp = key.toString().getBytes();
+        outbuf.write(tmp, 0, tmp.length);
+        outbuf.write('\t');
+        tmp = val.toString().getBytes();
+        outbuf.write(tmp, 0, tmp.length);
+        outbuf.write('\n');
+        inbuf.reset(outbuf.getData(), outbuf.getLength());
+        outbuf.reset();
+        ret = inbuf.read();
+      }
+      return ret;
+    }
+  }
+
+  private InputStream forMagic(Path p) throws IOException {
+    FSDataInputStream i = fs.open(p);
+    switch(i.readShort()) {
+      case 0x1f8b: // RFC 1952
+        i.seek(0);
+        return new GZIPInputStream(i);
+      case 0x5345: // 'S' 'E'
+        if (i.readByte() == 'Q') {
+          i.close();
+          return new TextRecordInputStream(fs.getFileStatus(p));
+        }
+        break;
+    }
+    i.seek(0);
+    return i;
+  }
+
+  void text(String srcf) throws IOException {
+    new DelayedExceptionThrowing() {
+      @Override
+      void process(Path p) throws IOException {
+        if (fs.isDirectory(p)) {
+          throw new IOException("Source must be a file.");
+        }
+        printToStdout(forMagic(p));
+      }
+    }.process(srcf);
+  }
+
   /**
    * Parse the args of a command and check the format of args.
    */
@@ -961,7 +1035,7 @@ public class FsShell extends Configured implements Tool {
       "[-copyToLocal <src><localdst>] [-moveToLocal <src> <localdst>]\n\t" +
       "[-mkdir <path>] [-report] [" + SETREP_SHORT_USAGE + "]\n\t" +
       "[-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>]\n\t" +
-      "[-tail [-f] <path>]\n\t" +
+      "[-tail [-f] <path>] [-text <path>]\n\t" +
       "[-help [cmd]]\n";
 
     String conf ="-conf <configuration file>:  Specify an application configuration file.";
@@ -1039,6 +1113,10 @@ public class FsShell extends Configured implements Tool {
 
     String cat = "-cat <src>: \tFetch all files that match the file pattern <src> \n" +
       "\t\tand display their content on stdout.\n";
+
+    String text = "-text <path>: Attempt to decode contents if the first few bytes\n" +
+      "\t\tmatch a magic number associated with a known format\n" +
+      "\t\t(gzip, SequenceFile)\n";
         
     String copyToLocal = "-copyToLocal <src> <localdst>:  Identical to the -get command.\n";
 
@@ -1183,6 +1261,8 @@ public class FsShell extends Configured implements Tool {
           ls(argv[i], true);
         } else if ("-touchz".equals(cmd)) {
           touchz(argv[i]);
+        } else if ("-text".equals(cmd)) {
+          text(argv[i]);
         }
       } catch (RemoteException e) {
         //
@@ -1228,7 +1308,8 @@ public class FsShell extends Configured implements Tool {
     } else if ("-ls".equals(cmd) || "-lsr".equals(cmd) ||
                "-du".equals(cmd) || "-dus".equals(cmd) ||
                "-rm".equals(cmd) || "-rmr".equals(cmd) ||
-               "-touchz".equals(cmd) || "-mkdir".equals(cmd)) {
+               "-touchz".equals(cmd) || "-mkdir".equals(cmd) ||
+               "-text".equals(cmd)) {
       System.err.println("Usage: java FsShell" + 
                          " [" + cmd + " <path>]");
     } else if ("-mv".equals(cmd) || "-cp".equals(cmd)) {
@@ -1275,6 +1356,7 @@ public class FsShell extends Configured implements Tool {
       System.err.println("           [-get [-crc] <src> <localdst>]");
       System.err.println("           [-getmerge <src> <localdst> [addnl]]");
       System.err.println("           [-cat <src>]");
+      System.err.println("           [-text <src>]");
       System.err.println("           [-copyToLocal [-crc] <src> <localdst>]");
       System.err.println("           [-moveToLocal [-crc] <src> <localdst>]");
       System.err.println("           [-mkdir <path>]");
@@ -1325,7 +1407,8 @@ public class FsShell extends Configured implements Tool {
       }
     } else if ("-rm".equals(cmd) || "-rmr".equals(cmd) ||
                "-cat".equals(cmd) || "-mkdir".equals(cmd) ||
-               "-touchz".equals(cmd) || "-stat".equals(cmd)) {
+               "-touchz".equals(cmd) || "-stat".equals(cmd) ||
+               "-text".equals(cmd)) {
       if (argv.length < 2) {
         printUsage(cmd);
         return exitCode;
@@ -1359,6 +1442,8 @@ public class FsShell extends Configured implements Tool {
           copyMergeToLocal(argv[i++], new Path(argv[i++]));
       } else if ("-cat".equals(cmd)) {
         exitCode = doall(cmd, argv, getConf(), i);
+      } else if ("-text".equals(cmd)) {
+        exitCode = doall(cmd, argv, getConf(), i);
       } else if ("-moveToLocal".equals(cmd)) {
         moveToLocal(argv[i++], new Path(argv[i++]));
       } else if ("-setrep".equals(cmd)) {

+ 46 - 0
src/test/org/apache/hadoop/dfs/TestDFSShell.java

@@ -21,10 +21,12 @@ import junit.framework.TestCase;
 import java.io.*;
 import java.security.*;
 import java.util.*;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * This class tests commands from DFSShell.
@@ -172,6 +174,50 @@ public class TestDFSShell extends TestCase {
     }
   }
 
+  public void testText() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    PrintStream bak = null;
+    try {
+      cluster = new MiniDFSCluster(conf, 2, true, null);
+      FileSystem fs = cluster.getFileSystem();
+      Path root = new Path("/texttest");
+      fs.mkdirs(root);
+      OutputStream zout = new GZIPOutputStream(
+          fs.create(new Path(root, "file.gz")));
+      Random r = new Random();
+      ByteArrayOutputStream file = new ByteArrayOutputStream();
+      for (int i = 0; i < 1024; ++i) {
+        char c = Character.forDigit(r.nextInt(26) + 10, 36);
+        file.write(c);
+        zout.write(c);
+      }
+      zout.close();
+
+      bak = System.out;
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(out));
+
+      String[] argv = new String[2];
+      argv[0] = "-text";
+      argv[1] = new Path(root, "file.gz").toUri().getPath();
+      int ret = ToolRunner.run(new FsShell(), argv);
+      assertTrue("-text returned -1", 0 >= ret);
+      file.reset();
+      out.reset();
+      assertTrue("Output doesn't match input",
+          Arrays.equals(file.toByteArray(), out.toByteArray()));
+
+    } finally {
+      if (null != bak) {
+        System.setOut(bak);
+      }
+      if (null != cluster) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   public void testCopyToLocal() throws IOException {
     Configuration conf = new Configuration();
     /* This tests some properties of ChecksumFileSystem as well.