Explorar el Código

HADOOP-5302. Added check for record bigger than MAX_READ_SIZE.
(Contributed by Jerome Boulon via eyang)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@748358 13f79535-47bb-0310-9956-ffa450edef68

Eric Yang hace 16 años
padre
commit
cea7c55b1e

+ 34 - 20
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java

@@ -18,20 +18,20 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
-import org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
-import java.io.*;
-import java.util.Timer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 /**
  * An adaptor that repeatedly tails a specified file, sending the new bytes.
  * This class does not split out records, but just sends everything up to end of file.
@@ -200,9 +200,10 @@ public class FileTailingAdaptor implements Adaptor
 	    	if (len >= fileReadOffset) {
 	    		if(offsetOfFirstByte>fileReadOffset) {
 	    			// If the file rotated, the recorded offsetOfFirstByte is greater than file size,
-	    			// reset the first byte position to beginning of the file.
-	        		offsetOfFirstByte = 0L;    			
+	    			// reset the first byte position to beginning of the file.	
 	    			fileReadOffset=0;
+	    			offsetOfFirstByte = 0L;       
+	    			log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
 	    		}
 	    		
 	    		log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset );
@@ -241,13 +242,26 @@ public class FileTailingAdaptor implements Adaptor
 	    		
 	    		long curOffset = fileReadOffset;
 	    		
-	    		reader.read(buf);
+	    		int bufferRead = reader.read(buf);
 	    		assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
-	    				+ " pointer is "
-	    				+ reader.getFilePointer()
-	    				+ " but offset is " + fileReadOffset + bufSize;
-	    
+	    		  + " pointer is "
+	    		  + reader.getFilePointer()
+	    		  + " but offset is " + fileReadOffset + bufSize;
+
 	    		int bytesUsed = extractRecords(dest, fileReadOffset + offsetOfFirstByte, buf);
+
+	    		// ===   WARNING   ===
+	    		// If we couldn't find a complete record AND
+	    		// we cannot read more, i.e. bufferRead == MAX_READ_SIZE,
+	    		// it's because the record is too BIG.
+	    		// So log.warn, and drop the current buffer so we can keep moving
+	    		// instead of being stuck at that point forever.
+	    		if ( bytesUsed == 0 && bufferRead ==  MAX_READ_SIZE) {
+	    		  log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, droping current buffer: startOffset=" 
+	    		      + curOffset + ", MAX_READ_SIZE=" + MAX_READ_SIZE + ", for " + toWatch.getPath());
+	    		  bytesUsed = buf.length;
+	    		}
+
 	    		fileReadOffset = fileReadOffset + bytesUsed;
 	    		
 	    		
@@ -255,13 +269,13 @@ public class FileTailingAdaptor implements Adaptor
 	    		
 	    		
 	    	} else {
-	    		// file has rotated and no detection
-	    		reader.close();
-	    		reader=null;
-	    		fileReadOffset = 0L;
-	    		offsetOfFirstByte = 0L;
-	    		hasMoreData = true;
-				log.warn("Adaptor|" + adaptorID +"| file has rotated and no detection - reset counters to 0L");	    	
+	    	  // file has rotated and no detection
+	    	  reader.close();
+	    	  reader=null;
+	    	  fileReadOffset = 0L;
+	    	  offsetOfFirstByte = 0L;
+	    	  hasMoreData = true;
+	    	  log.warn("Adaptor|" + adaptorID +"| file: " + toWatch.getPath() +", has rotated and no detection - reset counters to 0L");	    	
 	    	}
 	    } catch (IOException e) {
 	    	log.warn("failure reading " + toWatch, e);

+ 125 - 115
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java

@@ -17,74 +17,82 @@
  */
 
 package org.apache.hadoop.chukwa.datacollection.agent;
-import java.net.*;
-import java.io.*;
 
-import org.apache.hadoop.chukwa.datacollection.adaptor.*;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
-import org.apache.log4j.Logger;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
 import java.util.Map;
 
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.log4j.Logger;
+
 /**
- * Class to handle the agent control protocol.
- * This is a simple line-oriented ASCII protocol, that is designed
- * to be easy to work with both programmatically and via telnet.
- *
- *  The port to bind to can be specified by setting option
- *     chukwaAgent.agent.control.port
+ * Class to handle the agent control protocol. This is a simple line-oriented
+ * ASCII protocol, that is designed to be easy to work with both
+ * programmatically and via telnet.
+ * 
+ * The port to bind to can be specified by setting option
+ * chukwaAgent.agent.control.port.
+ * A port of 0 creates a socket on any free port.
  */
 public class AgentControlSocketListener extends Thread {
 
+  static Logger log = Logger.getLogger(AgentControlSocketListener.class);
 
-  static Logger log= Logger.getLogger(AgentControlSocketListener.class);
-  
-  ChukwaAgent agent;
-  int portno;
-  ServerSocket s= null;
+  protected ChukwaAgent agent;
+  protected int portno;
+  protected ServerSocket s = null;
   volatile boolean closing = false;
-  
-  private class ListenThread extends Thread
-  {
+
+  private class ListenThread extends Thread {
     Socket connection;
-    ListenThread(Socket conn)  {
+
+    ListenThread(Socket conn) {
       connection = conn;
       this.setName("listen thread for " + connection.getRemoteSocketAddress());
     }
-    
-    public void run()  {
+
+    public void run() {
       try {
-      InputStream in = connection.getInputStream();
-      BufferedReader br = new BufferedReader(new InputStreamReader(in));
-      PrintStream out = new PrintStream(new BufferedOutputStream(connection.getOutputStream()));
-      //out.println("You are connected to the chukwa agent on "+ InetAddress.getLocalHost().getCanonicalHostName());
-      //out.flush();
-      String cmd = null;
-      while((cmd = br.readLine()) != null)  {
-        processCommand(cmd, out);
-      }
-      if (log.isDebugEnabled())
-  		{ log.debug("control connection closed");}
-      }
-      catch(SocketException e ) {
-        if(e.getMessage().equals("Socket Closed"))
+        InputStream in = connection.getInputStream();
+        BufferedReader br = new BufferedReader(new InputStreamReader(in));
+        PrintStream out = new PrintStream(new BufferedOutputStream(connection.getOutputStream()));
+        String cmd = null;
+        while ((cmd = br.readLine()) != null) {
+          processCommand(cmd, out);
+        }
+        if (log.isDebugEnabled()) {
+          log.debug("control connection closed");
+        }
+      } catch (SocketException e) {
+        if (e.getMessage().equals("Socket Closed"))
           log.info("control socket closed");
-      } catch(IOException e)  {
+      } catch (IOException e) {
         log.warn("a control connection broke", e);
       }
     }
-    
+
     /**
      * process a protocol command
+     * 
      * @param cmd the command given by the user
-     * @param out  a PrintStream writing to the socket
+     * @param out a PrintStream writing to the socket
      * @throws IOException
      */
-    public void processCommand(String cmd, PrintStream out) throws IOException  {
+    public void processCommand(String cmd, PrintStream out) throws IOException {
       String[] words = cmd.split("\\s+");
-      if (log.isDebugEnabled())
-  		{ log.debug("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);}
-      
-      if(words[0].equalsIgnoreCase("help"))  {
+      if (log.isDebugEnabled()) {
+        log.debug("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);
+      }
+
+      if (words[0].equalsIgnoreCase("help")) {
         out.println("you're talking to the Chukwa agent.  Commands available: ");
         out.println("add [adaptorname] [args] [offset] -- start an adaptor");
         out.println("shutdown [adaptornumber]  -- graceful stop");
@@ -95,149 +103,151 @@ public class AgentControlSocketListener extends Thread {
         out.println("reloadCollectors -- reload the list of collectors");
         out.println("help -- print this message");
         out.println("\t Command names are case-blind.");
-      }
-      else if(words[0].equalsIgnoreCase("close"))  {
+      } else if (words[0].equalsIgnoreCase("close")) {
         connection.close();
-      }
-      else if(words[0].equalsIgnoreCase("add"))   {
+      } else if (words[0].equalsIgnoreCase("add")) {
         long newID = agent.processCommand(cmd);
-        if(newID != -1)
-          out.println("OK add completed; new ID is " +newID);
+        if (newID != -1)
+          out.println("OK add completed; new ID is " + newID);
         else
           out.println("failed to start adaptor...check logs for details");
-      }
-      else if(words[0].equalsIgnoreCase("shutdown"))  {
-        if(words.length < 2) {
+      } else if (words[0].equalsIgnoreCase("shutdown")) {
+        if (words.length < 2) {
           out.println("need to specify an adaptor to shut down, by number");
-        }
-        else {
+        } else {
           long num = Long.parseLong(words[1]);
           long offset = agent.stopAdaptor(num, true);
-          if(offset != -1)
-            out.println("OK adaptor "+ num+ " stopping gracefully at " + offset);
+          if (offset != -1)
+            out.println("OK adaptor " + num + " stopping gracefully at "
+                + offset);
           else
             out.println("FAIL: perhaps adaptor " + num + " does not exist");
         }
-      }     
-      else if(words[0].equalsIgnoreCase("stop"))  {
-        if(words.length < 2) {
+      } else if (words[0].equalsIgnoreCase("stop")) {
+        if (words.length < 2) {
           out.println("need to specify an adaptor to shut down, by number");
         } else {
           long num = Long.parseLong(words[1]);
           agent.stopAdaptor(num, false);
-          out.println("OK adaptor "+ num+ " stopped");
+          out.println("OK adaptor " + num + " stopped");
         }
-      }
-      else if(words[0].equalsIgnoreCase("reloadCollectors"))  {
-            agent.getConnector().reloadConfiguration();
-            out.println("OK reloadCollectors done");
-        }else if(words[0].equalsIgnoreCase("list") )  {
+      } else if (words[0].equalsIgnoreCase("reloadCollectors")) {
+        agent.getConnector().reloadConfiguration();
+        out.println("OK reloadCollectors done");
+      } else if (words[0].equalsIgnoreCase("list")) {
         Map<Long, Adaptor> adaptorsByNumber = agent.getAdaptorList();
-        
-        if (log.isDebugEnabled())
-    		{ log.debug("number of adaptors: " + adaptorsByNumber.size());}
-        
-        synchronized(adaptorsByNumber)   {
-          for(Map.Entry<Long, Adaptor> a: adaptorsByNumber.entrySet())  {
-            try{
+
+        if (log.isDebugEnabled()) {
+          log.debug("number of adaptors: " + adaptorsByNumber.size());
+        }
+
+        synchronized (adaptorsByNumber) {
+          for (Map.Entry<Long, Adaptor> a : adaptorsByNumber.entrySet()) {
+            try {
               out.print(a.getKey());
               out.print(") ");
               out.print(" ");
               out.println(formatAdaptorStatus(a.getValue()));
-            }  catch(AdaptorException e)  {
+            } catch (AdaptorException e) {
               log.error(e);
             }
           }
           out.println("");
         }
-      } else if(words[0].equalsIgnoreCase("stopagent")) {
+      } else if (words[0].equalsIgnoreCase("stopagent")) {
         out.println("stopping agent process.");
         connection.close();
         agent.shutdown(true);
-      }
-      else  {
+      } else {
         log.warn("unknown command " + words[0]);
         out.println("unknown command" + words[0]);
         out.println("say 'help' for a list of legal commands");
       }
       out.flush();
     }
-    
+
   }
+
   /**
    * Initializes listener, but does not bind to socket.
+   * 
    * @param a the agent to control
    */
-  public AgentControlSocketListener(ChukwaAgent a)
-  {
-    ChukwaConfiguration conf = new ChukwaConfiguration();
-    this.setDaemon(false); //to keep the local agent alive
-    agent = a;
-    portno = conf.getInt("chukwaAgent.agent.control.port", 9093);
-    log.info("AgentControlSocketListerner port set to " + portno);
+  public AgentControlSocketListener(ChukwaAgent agent) {
+
+    this.setDaemon(false); // to keep the local agent alive
+    this.agent = agent;
+    this.portno = agent.getConfiguration().getInt("chukwaAgent.agent.control.port", 9093);
+    log.info("AgentControlSocketListerner ask for port: " + portno);
     this.setName("control socket listener");
   }
-  
-  public String formatAdaptorStatus(Adaptor a)  throws AdaptorException  {
+
+  public String formatAdaptorStatus(Adaptor a) throws AdaptorException {
     return a.getClass().getCanonicalName() + " " + a.getCurrentStatus();
   }
 
   /**
    * Binds to socket, starts looping listening for commands
    */
-  public void run()  {
+  public void run() {
     try {
-      if(!isBound()) 
+      if (!isBound())
         tryToBind();
-    } catch(IOException e) {
+    } catch (IOException e) {
       return;
     }
-    
-    while(!closing)
-    {
+
+    while (!closing) {
       try {
         Socket connection = s.accept();
-        if (log.isDebugEnabled())
-        	{ log.debug("new connection from " + connection.getInetAddress());}
+        if (log.isDebugEnabled()) {
+          log.debug("new connection from " + connection.getInetAddress());
+        }
         ListenThread l = new ListenThread(connection);
         l.setDaemon(true);
         l.start();
-      } catch(IOException e)  {
-        if(!closing)
-          log.warn("control socket error: ",e );
+      } catch (IOException e) {
+        if (!closing)
+          log.warn("control socket error: ", e);
         else {
-          log.info("shutting down listen thread due to shutdown() call");
+          log.warn("shutting down listen thread due to shutdown() call");
           break;
         }
       }
-    }//end while
+    }// end while
   }
+
   /**
    * Close the control socket, and exit. Triggers graceful thread shutdown.
    */
-  public void shutdown()  {
+  public void shutdown() {
     closing = true;
-    try{
-      if(s != null)
+    try {
+      if (s != null)
         s.close();
       s = null;
-    }
-    catch(IOException e)
-    {}  //ignore exception on close
+    } catch (IOException e) {
+    } // ignore exception on close
   }
 
   public boolean isBound() {
-    return s!= null &&  s.isBound();
+    return s != null && s.isBound();
   }
 
-  public void tryToBind() throws IOException
-  {
-    s= new ServerSocket(portno);
-    if(s.isBound())
-      log.debug("socket bound to " + portno);
+  public void tryToBind() throws IOException {
+    s = new ServerSocket(portno);
+    portno = s.getLocalPort();
+    if (s.isBound())
+      log.info("socket bound to " + s.getLocalPort());
     else
-      log.debug("socket isn't bound");
-     
+      log.info("socket isn't bound");
+  }
+
+  public int getPort() {
+    if (!s.isBound()) {
+      return -1;
+    } else {
+      return portno;
+    }
   }
-  
 }

+ 4 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java

@@ -102,6 +102,10 @@ public class ChukwaAgent
 
   private final AgentControlSocketListener controlSock;
 
+  public int getControllerPort() {
+    return controlSock.getPort();
+  }
+  
   /**
    * @param args
    * @throws AdaptorException

+ 103 - 0
src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java

@@ -0,0 +1,103 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestFileTailingAdaptorBigRecord extends TestCase {
+
+  ChunkCatcherConnector chunks;
+
+  public void testBigRecord() {
+    File f = null;
+    try {
+      File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+      if (!tempDir.exists()) {
+        tempDir.mkdirs();
+      }
+      String logFile = tempDir.getPath() + "/Chukwa-bigRecord.txt";
+      f = makeTestFile(logFile);
+
+      chunks = new ChunkCatcherConnector();
+      chunks.start();
+
+      // Remove any adaptor left over from previous run
+      ChukwaConfiguration cc = new ChukwaConfiguration();
+      cc.set("chukwaAgent.agent.control.port", "0");
+      cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 55);
+      ChukwaAgent agent = new ChukwaAgent(cc);
+      int portno = agent.getControllerPort();
+      while (portno == -1) {
+        Thread.sleep(1000);
+        portno = agent.getControllerPort();
+      }
+
+      // System.out.println("Port number:" + portno);
+      ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+      cli.removeAll();
+      // sleep for some time to make sure we don't get chunks from existing
+      // streams
+      Thread.sleep(5000);
+      long adaptorId = agent
+          .processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped"
+              + " BigRecord " + logFile + " 0");
+      assertTrue(adaptorId != -1);
+
+      boolean record8Found = false;
+      Chunk c = null;
+      // Keep reading until record8
+      // If the adaptor is stopped then JUnit will fail with a timeout
+      while (!record8Found) {
+        c = chunks.waitForAChunk();
+        String data = new String(c.getData());
+        if (c.getDataType().equals("BigRecord")
+            && data.indexOf("8 abcdefghijklmnopqrstuvwxyz") >= 0) {
+          record8Found = true;
+        }
+      }
+      agent.getAdaptorList().get(adaptorId).shutdown();
+      agent.shutdown();
+    } catch (Exception e) {
+      Assert.fail("Exception in testBigRecord" + e.getMessage());
+    } finally {
+      if (f != null) {
+        f.delete();
+      }
+    }
+  }
+
+  private File makeTestFile(String name) throws IOException {
+    File tmpOutput = new File(name);
+    FileOutputStream fos = new FileOutputStream(tmpOutput);
+
+    PrintWriter pw = new PrintWriter(fos);
+    for (int i = 0; i < 5; ++i) {
+      pw.print(i + " ");
+      pw.println("abcdefghijklmnopqrstuvwxyz");
+    }
+    pw.print("6 ");
+    for (int i = 0; i < 10; ++i) {
+      pw.print("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz");
+    }
+    pw.print("\n");
+    pw.print("7 ");
+    pw.println("abcdefghijklmnopqrstuvwxyz");
+    pw.print("8 ");
+    pw.println("abcdefghijklmnopqrstuvwxyz");
+
+    pw.flush();
+    pw.close();
+    return tmpOutput;
+  }
+
+}