|
@@ -0,0 +1,958 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.mapred.gridmix;
|
|
|
+
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
|
+import java.io.DataInputStream;
|
|
|
+import java.io.DataOutputStream;
|
|
|
+import java.io.File;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.CustomOutputCommitter;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.PositionedReadable;
|
|
|
+import org.apache.hadoop.fs.Seekable;
|
|
|
+import org.apache.hadoop.io.DataInputBuffer;
|
|
|
+import org.apache.hadoop.io.LongWritable;
|
|
|
+import org.apache.hadoop.io.NullWritable;
|
|
|
+import org.apache.hadoop.io.RawComparator;
|
|
|
+import org.apache.hadoop.io.WritableUtils;
|
|
|
+import org.apache.hadoop.mapred.JobConf;
|
|
|
+import org.apache.hadoop.mapred.JobContext;
|
|
|
+import org.apache.hadoop.mapred.RawKeyValueIterator;
|
|
|
+import org.apache.hadoop.mapred.gridmix.GridmixKey.Spec;
|
|
|
+import org.apache.hadoop.mapred.gridmix.SleepJob.SleepReducer;
|
|
|
+import org.apache.hadoop.mapred.gridmix.SleepJob.SleepSplit;
|
|
|
+import org.apache.hadoop.mapreduce.Counter;
|
|
|
+import org.apache.hadoop.mapreduce.InputSplit;
|
|
|
+import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
+import org.apache.hadoop.mapreduce.MapContext;
|
|
|
+import org.apache.hadoop.mapreduce.OutputCommitter;
|
|
|
+import org.apache.hadoop.mapreduce.RecordReader;
|
|
|
+import org.apache.hadoop.mapreduce.RecordWriter;
|
|
|
+import org.apache.hadoop.mapreduce.ReduceContext;
|
|
|
+import org.apache.hadoop.mapreduce.StatusReporter;
|
|
|
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
|
|
+import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
|
+import org.apache.hadoop.mapreduce.Mapper.Context;
|
|
|
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
|
|
|
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
|
|
|
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
|
|
|
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
|
|
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
|
|
|
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
|
|
|
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
|
|
|
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
|
|
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.tools.rumen.JobStory;
|
|
|
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
|
|
|
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
|
|
|
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
|
|
|
+import org.apache.hadoop.util.Progress;
|
|
|
+import org.junit.Assert;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+import static org.mockito.Mockito.*;
|
|
|
+
|
|
|
+import static org.junit.Assert.*;
|
|
|
+
|
|
|
+public class TestGridMixClasses {
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestGridMixClasses.class);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * simple test LoadSplit (getters,copy, write, read...)
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testLoadSplit() throws Exception {
|
|
|
+
|
|
|
+ LoadSplit test = getLoadSplit();
|
|
|
+
|
|
|
+ ByteArrayOutputStream data = new ByteArrayOutputStream();
|
|
|
+ DataOutputStream out = new DataOutputStream(data);
|
|
|
+ test.write(out);
|
|
|
+ LoadSplit copy = new LoadSplit();
|
|
|
+ copy.readFields(new DataInputStream(new ByteArrayInputStream(data
|
|
|
+ .toByteArray())));
|
|
|
+
|
|
|
+ // data should be the same
|
|
|
+ assertEquals(test.getId(), copy.getId());
|
|
|
+ assertEquals(test.getMapCount(), copy.getMapCount());
|
|
|
+ assertEquals(test.getInputRecords(), copy.getInputRecords());
|
|
|
+
|
|
|
+ assertEquals(test.getOutputBytes()[0], copy.getOutputBytes()[0]);
|
|
|
+ assertEquals(test.getOutputRecords()[0], copy.getOutputRecords()[0]);
|
|
|
+ assertEquals(test.getReduceBytes(0), copy.getReduceBytes(0));
|
|
|
+ assertEquals(test.getReduceRecords(0), copy.getReduceRecords(0));
|
|
|
+ assertEquals(test.getMapResourceUsageMetrics().getCumulativeCpuUsage(),
|
|
|
+ copy.getMapResourceUsageMetrics().getCumulativeCpuUsage());
|
|
|
+ assertEquals(test.getReduceResourceUsageMetrics(0).getCumulativeCpuUsage(),
|
|
|
+ copy.getReduceResourceUsageMetrics(0).getCumulativeCpuUsage());
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * simple test GridmixSplit (copy, getters, write, read..)
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testGridmixSplit() throws Exception {
|
|
|
+ Path[] files = {new Path("one"), new Path("two")};
|
|
|
+ long[] start = {1, 2};
|
|
|
+ long[] lengths = {100, 200};
|
|
|
+ String[] locations = {"locOne", "loctwo"};
|
|
|
+
|
|
|
+ CombineFileSplit cfsplit = new CombineFileSplit(files, start, lengths,
|
|
|
+ locations);
|
|
|
+ ResourceUsageMetrics metrics = new ResourceUsageMetrics();
|
|
|
+ metrics.setCumulativeCpuUsage(200);
|
|
|
+
|
|
|
+ double[] reduceBytes = {8.1d, 8.2d};
|
|
|
+ double[] reduceRecords = {9.1d, 9.2d};
|
|
|
+ long[] reduceOutputBytes = {101L, 102L};
|
|
|
+ long[] reduceOutputRecords = {111L, 112L};
|
|
|
+
|
|
|
+ GridmixSplit test = new GridmixSplit(cfsplit, 2, 3, 4L, 5L, 6L, 7L,
|
|
|
+ reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords);
|
|
|
+
|
|
|
+ ByteArrayOutputStream data = new ByteArrayOutputStream();
|
|
|
+ DataOutputStream out = new DataOutputStream(data);
|
|
|
+ test.write(out);
|
|
|
+ GridmixSplit copy = new GridmixSplit();
|
|
|
+ copy.readFields(new DataInputStream(new ByteArrayInputStream(data
|
|
|
+ .toByteArray())));
|
|
|
+
|
|
|
+ // data should be the same
|
|
|
+ assertEquals(test.getId(), copy.getId());
|
|
|
+ assertEquals(test.getMapCount(), copy.getMapCount());
|
|
|
+ assertEquals(test.getInputRecords(), copy.getInputRecords());
|
|
|
+
|
|
|
+ assertEquals(test.getOutputBytes()[0], copy.getOutputBytes()[0]);
|
|
|
+ assertEquals(test.getOutputRecords()[0], copy.getOutputRecords()[0]);
|
|
|
+ assertEquals(test.getReduceBytes(0), copy.getReduceBytes(0));
|
|
|
+ assertEquals(test.getReduceRecords(0), copy.getReduceRecords(0));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * test LoadMapper loadMapper should write to writer record for each reduce
|
|
|
+ */
|
|
|
+ @SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testLoadMapper() throws Exception {
|
|
|
+
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(JobContext.NUM_REDUCES, 2);
|
|
|
+
|
|
|
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
|
|
|
+ conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
|
|
|
+
|
|
|
+ TaskAttemptID taskid = new TaskAttemptID();
|
|
|
+ RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
|
|
|
+
|
|
|
+ LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
|
|
|
+
|
|
|
+ OutputCommitter committer = new CustomOutputCommitter();
|
|
|
+ StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
|
|
|
+ LoadSplit split = getLoadSplit();
|
|
|
+
|
|
|
+ MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapcontext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
|
|
|
+ conf, taskid, reader, writer, committer, reporter, split);
|
|
|
+ // context
|
|
|
+ Context ctxt = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
|
|
|
+ .getMapContext(mapcontext);
|
|
|
+
|
|
|
+ reader.initialize(split, ctxt);
|
|
|
+ ctxt.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
|
|
|
+ CompressionEmulationUtil.setCompressionEmulationEnabled(
|
|
|
+ ctxt.getConfiguration(), true);
|
|
|
+
|
|
|
+ LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
|
|
|
+ // setup, map, clean
|
|
|
+ mapper.run(ctxt);
|
|
|
+
|
|
|
+ Map<GridmixKey, GridmixRecord> data = writer.getData();
|
|
|
+ // check result
|
|
|
+ assertEquals(2, data.size());
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private LoadSplit getLoadSplit() throws Exception {
|
|
|
+
|
|
|
+ Path[] files = {new Path("one"), new Path("two")};
|
|
|
+ long[] start = {1, 2};
|
|
|
+ long[] lengths = {100, 200};
|
|
|
+ String[] locations = {"locOne", "loctwo"};
|
|
|
+
|
|
|
+ CombineFileSplit cfsplit = new CombineFileSplit(files, start, lengths,
|
|
|
+ locations);
|
|
|
+ ResourceUsageMetrics metrics = new ResourceUsageMetrics();
|
|
|
+ metrics.setCumulativeCpuUsage(200);
|
|
|
+ ResourceUsageMetrics[] rMetrics = {metrics};
|
|
|
+
|
|
|
+ double[] reduceBytes = {8.1d, 8.2d};
|
|
|
+ double[] reduceRecords = {9.1d, 9.2d};
|
|
|
+ long[] reduceOutputBytes = {101L, 102L};
|
|
|
+ long[] reduceOutputRecords = {111L, 112L};
|
|
|
+
|
|
|
+ return new LoadSplit(cfsplit, 2, 1, 4L, 5L, 6L, 7L,
|
|
|
+ reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords,
|
|
|
+ metrics, rMetrics);
|
|
|
+ }
|
|
|
+
|
|
|
+ private class FakeRecordLLReader extends
|
|
|
+ RecordReader<LongWritable, LongWritable> {
|
|
|
+
|
|
|
+ int counter = 10;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void initialize(InputSplit split, TaskAttemptContext context)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean nextKeyValue() throws IOException, InterruptedException {
|
|
|
+ counter--;
|
|
|
+ return counter > 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public LongWritable getCurrentKey() throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+
|
|
|
+ return new LongWritable(counter);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public LongWritable getCurrentValue() throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ return new LongWritable(counter * 10);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public float getProgress() throws IOException, InterruptedException {
|
|
|
+ return counter / 10.0f;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ // restore data
|
|
|
+ counter = 10;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class FakeRecordReader extends
|
|
|
+ RecordReader<NullWritable, GridmixRecord> {
|
|
|
+
|
|
|
+ int counter = 10;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void initialize(InputSplit split, TaskAttemptContext context)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean nextKeyValue() throws IOException, InterruptedException {
|
|
|
+ counter--;
|
|
|
+ return counter > 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public NullWritable getCurrentKey() throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+
|
|
|
+ return NullWritable.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public GridmixRecord getCurrentValue() throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ return new GridmixRecord(100, 100L);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public float getProgress() throws IOException, InterruptedException {
|
|
|
+ return counter / 10.0f;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ // restore data
|
|
|
+ counter = 10;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class LoadRecordGkGrWriter extends
|
|
|
+ RecordWriter<GridmixKey, GridmixRecord> {
|
|
|
+ private Map<GridmixKey, GridmixRecord> data = new HashMap<GridmixKey, GridmixRecord>();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(GridmixKey key, GridmixRecord value) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ data.put(key, value);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close(TaskAttemptContext context) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<GridmixKey, GridmixRecord> getData() {
|
|
|
+ return data;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private class LoadRecordGkNullWriter extends
|
|
|
+ RecordWriter<GridmixKey, NullWritable> {
|
|
|
+ private Map<GridmixKey, NullWritable> data = new HashMap<GridmixKey, NullWritable>();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(GridmixKey key, NullWritable value) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ data.put(key, value);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close(TaskAttemptContext context) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<GridmixKey, NullWritable> getData() {
|
|
|
+ return data;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private class LoadRecordWriter extends
|
|
|
+ RecordWriter<NullWritable, GridmixRecord> {
|
|
|
+ private Map<NullWritable, GridmixRecord> data = new HashMap<NullWritable, GridmixRecord>();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(NullWritable key, GridmixRecord value)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ data.put(key, value);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close(TaskAttemptContext context) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<NullWritable, GridmixRecord> getData() {
|
|
|
+ return data;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /*
|
|
|
+ * test SpecGroupingComparator
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testGridmixJobSpecGroupingComparator() throws Exception {
|
|
|
+ GridmixJob.SpecGroupingComparator test = new GridmixJob.SpecGroupingComparator();
|
|
|
+
|
|
|
+ ByteArrayOutputStream data = new ByteArrayOutputStream();
|
|
|
+ DataOutputStream dos = new DataOutputStream(data);
|
|
|
+ WritableUtils.writeVInt(dos, 2);
|
|
|
+ WritableUtils.writeVInt(dos, 1);
|
|
|
+ // 0: REDUCE SPEC
|
|
|
+ WritableUtils.writeVInt(dos, 0);
|
|
|
+ WritableUtils.writeVInt(dos, 7);
|
|
|
+ WritableUtils.writeVInt(dos, 4);
|
|
|
+
|
|
|
+ byte[] b1 = data.toByteArray();
|
|
|
+
|
|
|
+ byte[] b2 = data.toByteArray();
|
|
|
+
|
|
|
+ // the same object should be equals
|
|
|
+ assertEquals(0, test.compare(b1, 0, 1, b2, 0, 1));
|
|
|
+ b2[2] = 1;
|
|
|
+ // for Reduce
|
|
|
+ assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1));
|
|
|
+ // by Reduce spec
|
|
|
+ b2[2] = 1; // 1: DATA SPEC
|
|
|
+ assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1));
|
|
|
+ // compare GridmixKey the same objects should be equals
|
|
|
+ assertEquals(0, test.compare(new GridmixKey(GridmixKey.DATA, 100, 2),
|
|
|
+ new GridmixKey(GridmixKey.DATA, 100, 2)));
|
|
|
+ // REDUSE SPEC
|
|
|
+ assertEquals(-1, test.compare(
|
|
|
+ new GridmixKey(GridmixKey.REDUCE_SPEC, 100, 2), new GridmixKey(
|
|
|
+ GridmixKey.DATA, 100, 2)));
|
|
|
+ assertEquals(1, test.compare(new GridmixKey(GridmixKey.DATA, 100, 2),
|
|
|
+ new GridmixKey(GridmixKey.REDUCE_SPEC, 100, 2)));
|
|
|
+ // only DATA
|
|
|
+ assertEquals(2, test.compare(new GridmixKey(GridmixKey.DATA, 102, 2),
|
|
|
+ new GridmixKey(GridmixKey.DATA, 100, 2)));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * test CompareGridmixJob only equals and compare
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testCompareGridmixJob() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ Path outRoot = new Path("target");
|
|
|
+ JobStory jobdesc = mock(JobStory.class);
|
|
|
+ when(jobdesc.getName()).thenReturn("JobName");
|
|
|
+ when(jobdesc.getJobConf()).thenReturn(new JobConf(conf));
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ GridmixJob j1 = new LoadJob(conf, 1000L, jobdesc, outRoot, ugi, 0);
|
|
|
+ GridmixJob j2 = new LoadJob(conf, 1000L, jobdesc, outRoot, ugi, 0);
|
|
|
+ GridmixJob j3 = new LoadJob(conf, 1000L, jobdesc, outRoot, ugi, 1);
|
|
|
+ GridmixJob j4 = new LoadJob(conf, 1000L, jobdesc, outRoot, ugi, 1);
|
|
|
+
|
|
|
+ assertTrue(j1.equals(j2));
|
|
|
+ assertEquals(0, j1.compareTo(j2));
|
|
|
+ // Only one parameter matters
|
|
|
+ assertFalse(j1.equals(j3));
|
|
|
+ // compare id and submissionMillis
|
|
|
+ assertEquals(-1, j1.compareTo(j3));
|
|
|
+ assertEquals(-1, j1.compareTo(j4));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * test ReadRecordFactory. should read all data from inputstream
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testReadRecordFactory() throws Exception {
|
|
|
+
|
|
|
+ // RecordFactory factory, InputStream src, Configuration conf
|
|
|
+ RecordFactory rf = new FakeRecordFactory();
|
|
|
+ FakeInputStream input = new FakeInputStream();
|
|
|
+ ReadRecordFactory test = new ReadRecordFactory(rf, input,
|
|
|
+ new Configuration());
|
|
|
+ GridmixKey key = new GridmixKey(GridmixKey.DATA, 100, 2);
|
|
|
+ GridmixRecord val = new GridmixRecord(200, 2);
|
|
|
+ while (test.next(key, val)) {
|
|
|
+
|
|
|
+ }
|
|
|
+ // should be read 10* (GridmixKey.size +GridmixRecord.value)
|
|
|
+ assertEquals(3000, input.getCounter());
|
|
|
+ // should be -1 because all data readed;
|
|
|
+ assertEquals(-1, rf.getProgress(), 0.01);
|
|
|
+
|
|
|
+ test.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private class FakeRecordFactory extends RecordFactory {
|
|
|
+
|
|
|
+ private int counter = 10;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
|
|
|
+ counter--;
|
|
|
+ return counter >= 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public float getProgress() throws IOException {
|
|
|
+ return counter;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private class FakeInputStream extends InputStream implements Seekable,
|
|
|
+ PositionedReadable {
|
|
|
+ private long counter;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int read() throws IOException {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int read(byte[] b, int off, int len) throws IOException {
|
|
|
+ int realLen = len - off;
|
|
|
+ counter += realLen;
|
|
|
+ for (int i = 0; i < b.length; i++) {
|
|
|
+ b[i] = 0;
|
|
|
+ }
|
|
|
+ return realLen;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getCounter() {
|
|
|
+ return counter;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void seek(long pos) throws IOException {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public long getPos() throws IOException {
|
|
|
+ return counter;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean seekToNewSource(long targetPos) throws IOException {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int read(long position, byte[] buffer, int offset, int length)
|
|
|
+ throws IOException {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void readFully(long position, byte[] buffer, int offset, int length)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void readFully(long position, byte[] buffer) throws IOException {
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class FakeFSDataInputStream extends FSDataInputStream {
|
|
|
+
|
|
|
+ public FakeFSDataInputStream(InputStream in) throws IOException {
|
|
|
+ super(in);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * test LoadRecordReader. It class reads data from some files.
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testLoadJobLoadRecordReader() throws Exception {
|
|
|
+ LoadJob.LoadRecordReader test = new LoadJob.LoadRecordReader();
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+
|
|
|
+ FileSystem fs1 = mock(FileSystem.class);
|
|
|
+ when(fs1.open((Path) anyObject())).thenReturn(
|
|
|
+ new FakeFSDataInputStream(new FakeInputStream()));
|
|
|
+ Path p1 = mock(Path.class);
|
|
|
+ when(p1.getFileSystem((JobConf) anyObject())).thenReturn(fs1);
|
|
|
+
|
|
|
+ FileSystem fs2 = mock(FileSystem.class);
|
|
|
+ when(fs2.open((Path) anyObject())).thenReturn(
|
|
|
+ new FakeFSDataInputStream(new FakeInputStream()));
|
|
|
+ Path p2 = mock(Path.class);
|
|
|
+ when(p2.getFileSystem((JobConf) anyObject())).thenReturn(fs2);
|
|
|
+
|
|
|
+ Path[] paths = {p1, p2};
|
|
|
+
|
|
|
+ long[] start = {0, 0};
|
|
|
+ long[] lengths = {1000, 1000};
|
|
|
+ String[] locations = {"temp1", "temp2"};
|
|
|
+ CombineFileSplit cfsplit = new CombineFileSplit(paths, start, lengths,
|
|
|
+ locations);
|
|
|
+ double[] reduceBytes = {100, 100};
|
|
|
+ double[] reduceRecords = {2, 2};
|
|
|
+ long[] reduceOutputBytes = {500, 500};
|
|
|
+ long[] reduceOutputRecords = {2, 2};
|
|
|
+ ResourceUsageMetrics metrics = new ResourceUsageMetrics();
|
|
|
+ ResourceUsageMetrics[] rMetrics = {new ResourceUsageMetrics(),
|
|
|
+ new ResourceUsageMetrics()};
|
|
|
+ LoadSplit input = new LoadSplit(cfsplit, 2, 3, 1500L, 2L, 3000L, 2L,
|
|
|
+ reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords,
|
|
|
+ metrics, rMetrics);
|
|
|
+ TaskAttemptID taskId = new TaskAttemptID();
|
|
|
+ TaskAttemptContext ctxt = new TaskAttemptContextImpl(conf, taskId);
|
|
|
+ test.initialize(input, ctxt);
|
|
|
+ GridmixRecord gr = test.getCurrentValue();
|
|
|
+ int counter = 0;
|
|
|
+ while (test.nextKeyValue()) {
|
|
|
+ gr = test.getCurrentValue();
|
|
|
+ if (counter == 0) {
|
|
|
+ // read first file
|
|
|
+ assertEquals(0.5, test.getProgress(), 0.001);
|
|
|
+ } else if (counter == 1) {
|
|
|
+ // read second file
|
|
|
+ assertEquals(1.0, test.getProgress(), 0.001);
|
|
|
+ }
|
|
|
+ //
|
|
|
+ assertEquals(1000, gr.getSize());
|
|
|
+ counter++;
|
|
|
+ }
|
|
|
+ assertEquals(1000, gr.getSize());
|
|
|
+ // Two files have been read
|
|
|
+ assertEquals(2, counter);
|
|
|
+
|
|
|
+ test.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * test LoadReducer
|
|
|
+ */
|
|
|
+
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testLoadJobLoadReducer() throws Exception {
|
|
|
+ LoadJob.LoadReducer test = new LoadJob.LoadReducer();
|
|
|
+
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(JobContext.NUM_REDUCES, 2);
|
|
|
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
|
|
|
+ conf.setBoolean(FileOutputFormat.COMPRESS, true);
|
|
|
+
|
|
|
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
|
|
|
+ conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
|
|
|
+ TaskAttemptID taskid = new TaskAttemptID();
|
|
|
+
|
|
|
+ RawKeyValueIterator input = new FakeRawKeyValueIterator();
|
|
|
+
|
|
|
+ Counter counter = new GenericCounter();
|
|
|
+ Counter inputValueCounter = new GenericCounter();
|
|
|
+ LoadRecordWriter output = new LoadRecordWriter();
|
|
|
+
|
|
|
+ OutputCommitter committer = new CustomOutputCommitter();
|
|
|
+
|
|
|
+ StatusReporter reporter = new DummyReporter();
|
|
|
+ RawComparator<GridmixKey> comparator = new FakeRawComparator();
|
|
|
+
|
|
|
+ ReduceContext<GridmixKey, GridmixRecord, NullWritable, GridmixRecord> reducecontext = new ReduceContextImpl<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>(
|
|
|
+ conf, taskid, input, counter, inputValueCounter, output, committer,
|
|
|
+ reporter, comparator, GridmixKey.class, GridmixRecord.class);
|
|
|
+ // read for previous data
|
|
|
+ reducecontext.nextKeyValue();
|
|
|
+ org.apache.hadoop.mapreduce.Reducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>.Context context = new WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>()
|
|
|
+ .getReducerContext(reducecontext);
|
|
|
+
|
|
|
+ // test.setup(context);
|
|
|
+ test.run(context);
|
|
|
+ // have been readed 9 records (-1 for previous)
|
|
|
+ assertEquals(9, counter.getValue());
|
|
|
+ assertEquals(10, inputValueCounter.getValue());
|
|
|
+ assertEquals(1, output.getData().size());
|
|
|
+ GridmixRecord record = output.getData().values().iterator()
|
|
|
+ .next();
|
|
|
+
|
|
|
+ assertEquals(1593, record.getSize());
|
|
|
+ }
|
|
|
+
|
|
|
+ protected class FakeRawKeyValueIterator implements RawKeyValueIterator {
|
|
|
+
|
|
|
+ int counter = 10;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public DataInputBuffer getKey() throws IOException {
|
|
|
+ ByteArrayOutputStream dt = new ByteArrayOutputStream();
|
|
|
+ GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 10 * counter, 1L);
|
|
|
+ Spec spec = new Spec();
|
|
|
+ spec.rec_in = counter;
|
|
|
+ spec.rec_out = counter;
|
|
|
+ spec.bytes_out = counter * 100;
|
|
|
+
|
|
|
+ key.setSpec(spec);
|
|
|
+ key.write(new DataOutputStream(dt));
|
|
|
+ DataInputBuffer result = new DataInputBuffer();
|
|
|
+ byte[] b = dt.toByteArray();
|
|
|
+ result.reset(b, 0, b.length);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public DataInputBuffer getValue() throws IOException {
|
|
|
+ ByteArrayOutputStream dt = new ByteArrayOutputStream();
|
|
|
+ GridmixRecord key = new GridmixRecord(100, 1);
|
|
|
+ key.write(new DataOutputStream(dt));
|
|
|
+ DataInputBuffer result = new DataInputBuffer();
|
|
|
+ byte[] b = dt.toByteArray();
|
|
|
+ result.reset(b, 0, b.length);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean next() throws IOException {
|
|
|
+ counter--;
|
|
|
+ return counter >= 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Progress getProgress() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private class FakeRawComparator implements RawComparator<GridmixKey> {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int compare(GridmixKey o1, GridmixKey o2) {
|
|
|
+ return o1.compareTo(o2);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
|
|
|
+ if ((l1 - s1) != (l2 - s2)) {
|
|
|
+ return (l1 - s1) - (l2 - s2);
|
|
|
+ }
|
|
|
+ int len = l1 - s1;
|
|
|
+ for (int i = 0; i < len; i++) {
|
|
|
+ if (b1[s1 + i] != b2[s2 + i]) {
|
|
|
+ return b1[s1 + i] - b2[s2 + i];
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * test SerialJobFactory
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testSerialReaderThread() throws Exception {
|
|
|
+
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ File fin = new File("src" + File.separator + "test" + File.separator
|
|
|
+ + "resources" + File.separator + "data" + File.separator
|
|
|
+ + "wordcount2.json");
|
|
|
+ // read couple jobs from wordcount2.json
|
|
|
+ JobStoryProducer jobProducer = new ZombieJobProducer(new Path(
|
|
|
+ fin.getAbsolutePath()), null, conf);
|
|
|
+ CountDownLatch startFlag = new CountDownLatch(1);
|
|
|
+ UserResolver resolver = new SubmitterUserResolver();
|
|
|
+ FakeJobSubmitter submitter = new FakeJobSubmitter();
|
|
|
+ File ws = new File("target" + File.separator + this.getClass().getName());
|
|
|
+ if (!ws.exists()) {
|
|
|
+ Assert.assertTrue(ws.mkdirs());
|
|
|
+ }
|
|
|
+
|
|
|
+ SerialJobFactory jobFactory = new SerialJobFactory(submitter, jobProducer,
|
|
|
+ new Path(ws.getAbsolutePath()), conf, startFlag, resolver);
|
|
|
+
|
|
|
+ Path ioPath = new Path(ws.getAbsolutePath());
|
|
|
+ jobFactory.setDistCacheEmulator(new DistributedCacheEmulator(conf, ioPath));
|
|
|
+ Thread test = jobFactory.createReaderThread();
|
|
|
+ test.start();
|
|
|
+ Thread.sleep(1000);
|
|
|
+ // SerialReaderThread waits startFlag
|
|
|
+ assertEquals(0, submitter.getJobs().size());
|
|
|
+ // start!
|
|
|
+ startFlag.countDown();
|
|
|
+ while (test.isAlive()) {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ jobFactory.update(null);
|
|
|
+ }
|
|
|
+ // submitter was called twice
|
|
|
+ assertEquals(2, submitter.getJobs().size());
|
|
|
+ }
|
|
|
+
|
|
|
+ private class FakeJobSubmitter extends JobSubmitter {
|
|
|
+ // counter for submitted jobs
|
|
|
+ private List<GridmixJob> jobs = new ArrayList<GridmixJob>();
|
|
|
+
|
|
|
+ public FakeJobSubmitter() {
|
|
|
+ super(null, 1, 1, null, null);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void add(GridmixJob job) throws InterruptedException {
|
|
|
+ jobs.add(job);
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<GridmixJob> getJobs() {
|
|
|
+ return jobs;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * test SleepMapper
|
|
|
+ */
|
|
|
+ @SuppressWarnings({"unchecked", "rawtypes"})
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testSleepMapper() throws Exception {
|
|
|
+ SleepJob.SleepMapper test = new SleepJob.SleepMapper();
|
|
|
+
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(JobContext.NUM_REDUCES, 2);
|
|
|
+
|
|
|
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
|
|
|
+ conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
|
|
|
+ TaskAttemptID taskid = new TaskAttemptID();
|
|
|
+ FakeRecordLLReader reader = new FakeRecordLLReader();
|
|
|
+ LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
|
|
|
+ OutputCommitter committer = new CustomOutputCommitter();
|
|
|
+ StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
|
|
|
+ SleepSplit split = getSleepSplit();
|
|
|
+ MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
|
|
|
+ conf, taskid, reader, writer, committer, reporter, split);
|
|
|
+ Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
|
|
|
+ .getMapContext(mapcontext);
|
|
|
+
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+ LOG.info("start:" + start);
|
|
|
+ LongWritable key = new LongWritable(start + 2000);
|
|
|
+ LongWritable value = new LongWritable(start + 2000);
|
|
|
+ // should slip 2 sec
|
|
|
+ test.map(key, value, context);
|
|
|
+ LOG.info("finish:" + System.currentTimeMillis());
|
|
|
+ assertTrue(System.currentTimeMillis() >= (start + 2000));
|
|
|
+
|
|
|
+ test.cleanup(context);
|
|
|
+ assertEquals(1, writer.getData().size());
|
|
|
+ }
|
|
|
+
|
|
|
+ private SleepSplit getSleepSplit() throws Exception {
|
|
|
+
|
|
|
+ String[] locations = {"locOne", "loctwo"};
|
|
|
+
|
|
|
+ long[] reduceDurations = {101L, 102L};
|
|
|
+
|
|
|
+ return new SleepSplit(0, 2000L, reduceDurations, 2, locations);
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * test SleepReducer
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testSleepReducer() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(JobContext.NUM_REDUCES, 2);
|
|
|
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
|
|
|
+ conf.setBoolean(FileOutputFormat.COMPRESS, true);
|
|
|
+
|
|
|
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
|
|
|
+ conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
|
|
|
+ TaskAttemptID taskid = new TaskAttemptID();
|
|
|
+
|
|
|
+ RawKeyValueIterator input = new FakeRawKeyValueReducerIterator();
|
|
|
+
|
|
|
+ Counter counter = new GenericCounter();
|
|
|
+ Counter inputValueCounter = new GenericCounter();
|
|
|
+ RecordWriter<NullWritable, NullWritable> output = new LoadRecordReduceWriter();
|
|
|
+
|
|
|
+ OutputCommitter committer = new CustomOutputCommitter();
|
|
|
+
|
|
|
+ StatusReporter reporter = new DummyReporter();
|
|
|
+ RawComparator<GridmixKey> comparator = new FakeRawComparator();
|
|
|
+
|
|
|
+ ReduceContext<GridmixKey, NullWritable, NullWritable, NullWritable> reducecontext = new ReduceContextImpl<GridmixKey, NullWritable, NullWritable, NullWritable>(
|
|
|
+ conf, taskid, input, counter, inputValueCounter, output, committer,
|
|
|
+ reporter, comparator, GridmixKey.class, NullWritable.class);
|
|
|
+ org.apache.hadoop.mapreduce.Reducer<GridmixKey, NullWritable, NullWritable, NullWritable>.Context context = new WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>()
|
|
|
+ .getReducerContext(reducecontext);
|
|
|
+
|
|
|
+ SleepReducer test = new SleepReducer();
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+ test.setup(context);
|
|
|
+ long sleeped = context.getCurrentKey().getReduceOutputBytes();
|
|
|
+ // status has been changed
|
|
|
+ assertEquals("Sleeping... " + sleeped + " ms left", context.getStatus());
|
|
|
+ // should sleep 0.9 sec
|
|
|
+
|
|
|
+ assertTrue(System.currentTimeMillis() >= (start + sleeped));
|
|
|
+ test.cleanup(context);
|
|
|
+ // status has been changed again
|
|
|
+
|
|
|
+ assertEquals("Slept for " + sleeped, context.getStatus());
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private class LoadRecordReduceWriter extends
|
|
|
+ RecordWriter<NullWritable, NullWritable> {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(NullWritable key, NullWritable value) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close(TaskAttemptContext context) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ protected class FakeRawKeyValueReducerIterator implements RawKeyValueIterator {
|
|
|
+
|
|
|
+ int counter = 10;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public DataInputBuffer getKey() throws IOException {
|
|
|
+ ByteArrayOutputStream dt = new ByteArrayOutputStream();
|
|
|
+ GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 10 * counter, 1L);
|
|
|
+ Spec spec = new Spec();
|
|
|
+ spec.rec_in = counter;
|
|
|
+ spec.rec_out = counter;
|
|
|
+ spec.bytes_out = counter * 100;
|
|
|
+
|
|
|
+ key.setSpec(spec);
|
|
|
+ key.write(new DataOutputStream(dt));
|
|
|
+ DataInputBuffer result = new DataInputBuffer();
|
|
|
+ byte[] b = dt.toByteArray();
|
|
|
+ result.reset(b, 0, b.length);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public DataInputBuffer getValue() throws IOException {
|
|
|
+ ByteArrayOutputStream dt = new ByteArrayOutputStream();
|
|
|
+ NullWritable key = NullWritable.get();
|
|
|
+ key.write(new DataOutputStream(dt));
|
|
|
+ DataInputBuffer result = new DataInputBuffer();
|
|
|
+ byte[] b = dt.toByteArray();
|
|
|
+ result.reset(b, 0, b.length);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean next() throws IOException {
|
|
|
+ counter--;
|
|
|
+ return counter >= 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Progress getProgress() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|