Pārlūkot izejas kodu

HADOOP-19270 Use stable sort in commandQueue (#7038)

geatrigger 2 mēneši atpakaļ
vecāks
revīzija
103b054654

+ 3 - 2
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditCommandParser.java

@@ -32,7 +32,7 @@ public interface AuditCommandParser {
 
   /**
    * Initialize this parser with the given configuration. Guaranteed to be
-   * called prior to any calls to {@link #parse(Text, Function)}.
+   * called prior to any calls to {@link #parse(Long, Text, Function)}.
    *
    * @param conf The Configuration to be used to set up this parser.
    * @throws IOException if error on initializing a parser.
@@ -46,6 +46,7 @@ public interface AuditCommandParser {
    * between the start of the audit log and this command) into absolute
    * timestamps.
    *
+   * @param sequence Sequence order of input line.
    * @param inputLine Single input line to convert.
    * @param relativeToAbsolute Function converting relative timestamps
    *                           (in milliseconds) to absolute timestamps
@@ -53,7 +54,7 @@ public interface AuditCommandParser {
    * @return A command representing the input line.
    * @throws IOException if error on parsing.
    */
-  AuditReplayCommand parse(Text inputLine,
+  AuditReplayCommand parse(Long sequence, Text inputLine,
       Function<Long, Long> relativeToAbsolute) throws IOException;
 
 }

+ 3 - 2
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java

@@ -112,7 +112,7 @@ public class AuditLogDirectParser implements AuditCommandParser {
   }
 
   @Override
-  public AuditReplayCommand parse(Text inputLine,
+  public AuditReplayCommand parse(Long sequence, Text inputLine,
       Function<Long, Long> relativeToAbsolute) throws IOException {
     Matcher m = logLineParseRegex.matcher(inputLine.toString());
     if (!m.find()) {
@@ -147,7 +147,8 @@ public class AuditLogDirectParser implements AuditCommandParser {
       }
     }
 
-    return new AuditReplayCommand(relativeToAbsolute.apply(relativeTimestamp),
+    return new AuditReplayCommand(sequence,
+        relativeToAbsolute.apply(relativeTimestamp),
         // Split the UGI on space to remove the auth and proxy portions of it
         SPACE_SPLITTER.split(parameterMap.get("ugi")).iterator().next(),
         parameterMap.get("cmd").replace("(options:", "(options="),

+ 2 - 2
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java

@@ -60,12 +60,12 @@ public class AuditLogHiveTableParser implements AuditCommandParser {
   }
 
   @Override
-  public AuditReplayCommand parse(Text inputLine,
+  public AuditReplayCommand parse(Long sequence, Text inputLine,
       Function<Long, Long> relativeToAbsolute) throws IOException {
     String[] fields = inputLine.toString().split(FIELD_SEPARATOR);
     long absoluteTimestamp = relativeToAbsolute
         .apply(Long.parseLong(fields[0]));
-    return new AuditReplayCommand(absoluteTimestamp, fields[1], fields[2],
+    return new AuditReplayCommand(sequence, absoluteTimestamp, fields[1], fields[2],
         fields[3], fields[4], fields[5]);
   }
 

+ 20 - 10
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayCommand.java

@@ -43,6 +43,7 @@ class AuditReplayCommand implements Delayed {
   private static final Pattern SIMPLE_UGI_PATTERN = Pattern
       .compile("([^/@ ]*).*?");
 
+  private Long sequence;
   private long absoluteTimestamp;
   private String ugi;
   private String command;
@@ -50,8 +51,9 @@ class AuditReplayCommand implements Delayed {
   private String dest;
   private String sourceIP;
 
-  AuditReplayCommand(long absoluteTimestamp, String ugi, String command,
+  AuditReplayCommand(Long sequence, long absoluteTimestamp, String ugi, String command,
       String src, String dest, String sourceIP) {
+    this.sequence = sequence;
     this.absoluteTimestamp = absoluteTimestamp;
     this.ugi = ugi;
     this.command = command;
@@ -60,6 +62,9 @@ class AuditReplayCommand implements Delayed {
     this.sourceIP = sourceIP;
   }
 
+  Long getSequence() {
+    return sequence;
+  }
   long getAbsoluteTimestamp() {
     return absoluteTimestamp;
   }
@@ -103,8 +108,12 @@ class AuditReplayCommand implements Delayed {
 
   @Override
   public int compareTo(Delayed o) {
-    return Long.compare(absoluteTimestamp,
-        ((AuditReplayCommand) o).absoluteTimestamp);
+    int result = Long.compare(absoluteTimestamp,
+            ((AuditReplayCommand) o).absoluteTimestamp);
+    if (result != 0) {
+      return result;
+    }
+    return Long.compare(sequence, ((AuditReplayCommand) o).sequence);
   }
 
   /**
@@ -122,9 +131,10 @@ class AuditReplayCommand implements Delayed {
    * information besides a timestamp; other getter methods will return null.
    */
   private static final class PoisonPillCommand extends AuditReplayCommand {
+    private static final Long DEFAULT_SEQUENCE = -1L;
 
     private PoisonPillCommand(long absoluteTimestamp) {
-      super(absoluteTimestamp, null, null, null, null, null);
+      super(DEFAULT_SEQUENCE, absoluteTimestamp, null, null, null, null, null);
     }
 
     @Override
@@ -144,9 +154,9 @@ class AuditReplayCommand implements Delayed {
       return false;
     }
     AuditReplayCommand o = (AuditReplayCommand) other;
-    return absoluteTimestamp == o.absoluteTimestamp && ugi.equals(o.ugi)
-        && command.equals(o.command) && src.equals(o.src) && dest.equals(o.dest)
-        && sourceIP.equals(o.sourceIP);
+    return sequence.equals(o.sequence) && absoluteTimestamp == o.absoluteTimestamp
+        && ugi.equals(o.ugi) && command.equals(o.command) && src.equals(o.src)
+        && dest.equals(o.dest) && sourceIP.equals(o.sourceIP);
   }
 
   @Override
@@ -156,8 +166,8 @@ class AuditReplayCommand implements Delayed {
 
   @Override
   public String toString() {
-    return String.format("AuditReplayCommand(absoluteTimestamp=%d, ugi=%s, "
-            + "command=%s, src=%s, dest=%s, sourceIP=%s",
-        absoluteTimestamp, ugi, command, src, dest, sourceIP);
+    return String.format("AuditReplayCommand(sequence=%d, absoluteTimestamp=%d, "
+        + "ugi=%s, command=%s, src=%s, dest=%s, sourceIP=%s",
+        sequence, absoluteTimestamp, ugi, command, src, dest, sourceIP);
   }
 }

+ 3 - 1
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java

@@ -169,6 +169,7 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text,
   private int numThreads;
   private double rateFactor;
   private long highestTimestamp;
+  private Long highestSequence;
   private List<AuditReplayThread> threads;
   private DelayQueue<AuditReplayCommand> commandQueue;
   private Function<Long, Long> relativeToAbsoluteTimestamp;
@@ -246,7 +247,7 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text,
   @Override
   public void map(LongWritable lineNum, Text inputLine, Mapper.Context context)
       throws IOException, InterruptedException {
-    AuditReplayCommand cmd = commandParser.parse(inputLine,
+    AuditReplayCommand cmd = commandParser.parse(lineNum.get(), inputLine,
         relativeToAbsoluteTimestamp);
     long delay = cmd.getDelay(TimeUnit.MILLISECONDS);
     // Prevent from loading too many elements into memory all at once
@@ -255,6 +256,7 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text,
     }
     commandQueue.put(cmd);
     highestTimestamp = cmd.getAbsoluteTimestamp();
+    highestSequence = cmd.getSequence();
   }
 
   @Override

+ 20 - 19
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java

@@ -30,6 +30,7 @@ public class TestAuditLogDirectParser {
 
   private static final long START_TIMESTAMP = 10000;
   private AuditLogDirectParser parser;
+  private Long sequence = 1L;
 
   @Before
   public void setup() throws Exception {
@@ -53,55 +54,55 @@ public class TestAuditLogDirectParser {
   public void testSimpleInput() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser",
         "listStatus", "sourcePath", "null");
-    AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser",
         "listStatus", "sourcePath", "null", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
   public void testInputWithEquals() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser",
             "listStatus", "day=1970", "null");
-    AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser",
             "listStatus", "day=1970", "null", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
   public void testInputWithRenameOptions() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser",
         "rename (options=[TO_TRASH])", "sourcePath", "destPath");
-    AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser",
         "rename (options=[TO_TRASH])", "sourcePath", "destPath", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
   public void testInputWithTokenAuth() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser (auth:TOKEN)",
         "create", "sourcePath", "null");
-    AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser",
         "create", "sourcePath", "null", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
   public void testInputWithProxyUser() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000",
         "proxyUser (auth:TOKEN) via fakeUser", "create", "sourcePath", "null");
-    AuditReplayCommand expected = new AuditReplayCommand(1000, "proxyUser",
-        "create", "sourcePath", "null", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000,
+            "proxyUser", "create", "sourcePath", "null", "0.0.0.0");
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
   public void testParseDefaultDateFormat() throws Exception {
     Text in = getAuditString("1970-01-01 13:00:00,000",
         "ignored", "ignored", "ignored", "ignored");
-    AuditReplayCommand expected = new AuditReplayCommand(
+    AuditReplayCommand expected = new AuditReplayCommand(sequence,
         13 * 60 * 60 * 1000 - START_TIMESTAMP,
         "ignored", "ignored", "ignored", "ignored", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
@@ -114,9 +115,9 @@ public class TestAuditLogDirectParser {
     parser.initialize(conf);
     Text in = getAuditString("1970-01-01 01:00:00,000 PM",
         "ignored", "ignored", "ignored", "ignored");
-    AuditReplayCommand expected = new AuditReplayCommand(13 * 60 * 60 * 1000,
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 13 * 60 * 60 * 1000,
         "ignored", "ignored", "ignored", "ignored", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
@@ -128,9 +129,9 @@ public class TestAuditLogDirectParser {
     parser.initialize(conf);
     Text in = getAuditString("1970-01-01 01:00:00,000",
         "ignored", "ignored", "ignored", "ignored");
-    AuditReplayCommand expected = new AuditReplayCommand(0,
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 0,
         "ignored", "ignored", "ignored", "ignored", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
@@ -144,9 +145,9 @@ public class TestAuditLogDirectParser {
     conf.set(AuditLogDirectParser.AUDIT_LOG_PARSE_REGEX_KEY,
         "CUSTOM FORMAT \\((?<timestamp>.+?)\\) (?<message>.+)");
     parser.initialize(conf);
-    AuditReplayCommand expected = new AuditReplayCommand(0,
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 0,
         "fakeUser", "fakeCommand", "src", "null", "0.0.0.0");
-    assertEquals(expected, parser.parse(auditLine, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, auditLine, Function.identity()));
   }
 
 }