Selaa lähdekoodia

AMBARI-11211. Make Kafka Metrics Reporter agnostic to the metrics names size (Emil Anca via rlevas)

Emil Anca 10 vuotta sitten
vanhempi
commit
955a668508

+ 1 - 1
ambari-metrics/ambari-metrics-kafka-sink/pom.xml

@@ -43,7 +43,7 @@ limitations under the License.
               <goal>copy-dependencies</goal>
             </goals>
             <configuration>
-              <includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,guava,jackson-core-asl,jackson-mapper-asl,jackson-xc</includeArtifactIds>
+              <includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,jackson-core-asl,jackson-mapper-asl,jackson-xc</includeArtifactIds>
               <outputDirectory>${project.build.directory}/lib</outputDirectory>
             </configuration>
           </execution>

+ 6 - 27
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java

@@ -34,6 +34,7 @@ import kafka.metrics.KafkaMetricsConfig;
 import kafka.metrics.KafkaMetricsReporter;
 import kafka.utils.VerifiableProperties;
 import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
@@ -260,17 +261,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
       String[] metricHNames = cacheKafkaSummarizable(currentTimeMillis, sanitizedName, histogram);
       String[] metricSNames = cacheKafkaSnapshot(currentTimeMillis, sanitizedName, snapshot);
 
-      String[] metricNames = new String[] {
-          metricHNames[0],
-          metricHNames[1],
-          metricSNames[0],
-          metricHNames[2],
-          metricSNames[1],
-          metricSNames[2],
-          metricSNames[3],
-          metricSNames[4],
-          metricSNames[5],
-          metricHNames[3] };
+      String[] metricNames = (String[]) ArrayUtils.addAll(metricHNames, metricSNames);
+
       populateMetricsList(context, metricNames);
     }
 
@@ -284,22 +276,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
       String[] metricTNames = cacheKafkaSummarizable(currentTimeMillis, sanitizedName, timer);
       String[] metricSNames = cacheKafkaSnapshot(currentTimeMillis, sanitizedName, snapshot);
 
-      String[] metricNames = new String[] {
-          metricMNames[0],
-          metricMNames[1],
-          metricMNames[2],
-          metricMNames[3],
-          metricMNames[4],
-          metricTNames[0],
-          metricTNames[1],
-          metricSNames[0],
-          metricTNames[2],
-          metricSNames[1],
-          metricSNames[2],
-          metricSNames[3],
-          metricSNames[4],
-          metricSNames[5],
-          metricTNames[3] };
+      String[] metricNames = (String[]) ArrayUtils.addAll(metricMNames, metricTNames);
+      metricNames = (String[]) ArrayUtils.addAll(metricNames, metricSNames);
+
       populateMetricsList(context, metricNames);
     }