|
|
@@ -19,6 +19,7 @@
|
|
|
|
|
|
package org.apache.ambari.infra.job.archive;
|
|
|
|
|
|
+import org.apache.ambari.infra.job.JobContextRepository;
|
|
|
import org.easymock.EasyMockRunner;
|
|
|
import org.easymock.EasyMockSupport;
|
|
|
import org.easymock.Mock;
|
|
|
@@ -26,12 +27,14 @@ import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
import org.junit.runner.RunWith;
|
|
|
+import org.springframework.batch.core.BatchStatus;
|
|
|
import org.springframework.batch.core.JobExecution;
|
|
|
import org.springframework.batch.core.StepExecution;
|
|
|
import org.springframework.batch.core.scope.context.ChunkContext;
|
|
|
import org.springframework.batch.core.scope.context.StepContext;
|
|
|
import org.springframework.batch.item.ExecutionContext;
|
|
|
import org.springframework.batch.item.ItemStreamReader;
|
|
|
+import org.springframework.batch.repeat.RepeatStatus;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.io.UncheckedIOException;
|
|
|
@@ -39,10 +42,20 @@ import java.util.HashMap;
|
|
|
|
|
|
import static org.easymock.EasyMock.expect;
|
|
|
import static org.easymock.EasyMock.expectLastCall;
|
|
|
+import static org.hamcrest.MatcherAssert.assertThat;
|
|
|
+import static org.hamcrest.core.Is.is;
|
|
|
|
|
|
@RunWith(EasyMockRunner.class)
|
|
|
public class DocumentExporterTest extends EasyMockSupport {
|
|
|
|
|
|
+ private static final long JOB_EXECUTION_ID = 1L;
|
|
|
+ private static final long STEP_EXECUTION_ID = 1L;
|
|
|
+ private static final Document DOCUMENT_2 = new Document(new HashMap<String, String>() {{
|
|
|
+ put("id", "2");
|
|
|
+ }});
|
|
|
+ private static final Document DOCUMENT_3 = new Document(new HashMap<String, String>() {{
|
|
|
+ put("id", "3");
|
|
|
+ }});
|
|
|
private DocumentExporter documentExporter;
|
|
|
@Mock
|
|
|
private ItemStreamReader<Document> reader;
|
|
|
@@ -52,17 +65,30 @@ public class DocumentExporterTest extends EasyMockSupport {
|
|
|
private DocumentItemWriter documentItemWriter;
|
|
|
@Mock
|
|
|
private DocumentItemWriter documentItemWriter2;
|
|
|
+ @Mock
|
|
|
+ private DocumentItemWriter documentItemWriter3;
|
|
|
+ @Mock
|
|
|
+ private JobContextRepository jobContextRepository;
|
|
|
|
|
|
- private ExecutionContext executionContext;
|
|
|
+// private ExecutionContext executionContext;
|
|
|
private ChunkContext chunkContext;
|
|
|
private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }});
|
|
|
|
|
|
@Before
|
|
|
public void setUp() throws Exception {
|
|
|
- StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(1L));
|
|
|
- chunkContext = new ChunkContext(new StepContext(stepExecution));
|
|
|
- executionContext = stepExecution.getExecutionContext();
|
|
|
- documentExporter = new DocumentExporter(reader, documentDestination, 2);
|
|
|
+ chunkContext = chunkContext(BatchStatus.STARTED);
|
|
|
+ documentExporter = documentExporter(2);
|
|
|
+ }
|
|
|
+
|
|
|
+ private DocumentExporter documentExporter(int writeBlockSize) {
|
|
|
+ return new DocumentExporter(reader, documentDestination, writeBlockSize, jobContextRepository);
|
|
|
+ }
|
|
|
+
|
|
|
+ private ChunkContext chunkContext(BatchStatus batchStatus) {
|
|
|
+ StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(JOB_EXECUTION_ID));
|
|
|
+ stepExecution.setId(STEP_EXECUTION_ID);
|
|
|
+ stepExecution.getJobExecution().setStatus(batchStatus);
|
|
|
+ return new ChunkContext(new StepContext(stepExecution));
|
|
|
}
|
|
|
|
|
|
@After
|
|
|
@@ -72,7 +98,7 @@ public class DocumentExporterTest extends EasyMockSupport {
|
|
|
|
|
|
@Test
|
|
|
public void testNothingToRead() throws Exception {
|
|
|
- reader.open(executionContext); expectLastCall();
|
|
|
+ reader.open(executionContext(chunkContext)); expectLastCall();
|
|
|
expect(reader.read()).andReturn(null);
|
|
|
reader.close(); expectLastCall();
|
|
|
replayAll();
|
|
|
@@ -80,9 +106,13 @@ public class DocumentExporterTest extends EasyMockSupport {
|
|
|
documentExporter.execute(null, chunkContext);
|
|
|
}
|
|
|
|
|
|
+ private ExecutionContext executionContext(ChunkContext chunkContext) {
|
|
|
+ return chunkContext.getStepContext().getStepExecution().getExecutionContext();
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testWriteLessDocumentsThanWriteBlockSize() throws Exception {
|
|
|
- reader.open(executionContext); expectLastCall();
|
|
|
+ reader.open(executionContext(chunkContext)); expectLastCall();
|
|
|
expect(reader.read()).andReturn(DOCUMENT);
|
|
|
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
|
|
|
documentItemWriter.write(DOCUMENT); expectLastCall();
|
|
|
@@ -91,36 +121,35 @@ public class DocumentExporterTest extends EasyMockSupport {
|
|
|
documentItemWriter.close(); expectLastCall();
|
|
|
replayAll();
|
|
|
|
|
|
- documentExporter.execute(null, chunkContext);
|
|
|
+ assertThat(documentExporter.execute(null, chunkContext), is(RepeatStatus.FINISHED));
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testWriteMoreDocumentsThanWriteBlockSize() throws Exception {
|
|
|
- Document document2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }});
|
|
|
- Document document3 = new Document(new HashMap<String, String>() {{ put("id", "3"); }});
|
|
|
-
|
|
|
- reader.open(executionContext); expectLastCall();
|
|
|
+ reader.open(executionContext(chunkContext)); expectLastCall();
|
|
|
expect(reader.read()).andReturn(DOCUMENT);
|
|
|
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
|
|
|
documentItemWriter.write(DOCUMENT); expectLastCall();
|
|
|
- expect(reader.read()).andReturn(document2);
|
|
|
- documentItemWriter.write(document2); expectLastCall();
|
|
|
- expect(reader.read()).andReturn(document3);
|
|
|
+ expect(reader.read()).andReturn(DOCUMENT_2);
|
|
|
+ documentItemWriter.write(DOCUMENT_2); expectLastCall();
|
|
|
+ expect(reader.read()).andReturn(DOCUMENT_3);
|
|
|
documentItemWriter.close(); expectLastCall();
|
|
|
- expect(documentDestination.open(document3)).andReturn(documentItemWriter2);
|
|
|
- documentItemWriter2.write(document3); expectLastCall();
|
|
|
+ jobContextRepository.updateExecutionContext(chunkContext.getStepContext().getStepExecution());
|
|
|
+ expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(chunkContext.getStepContext().getStepExecution());
|
|
|
+ expect(documentDestination.open(DOCUMENT_3)).andReturn(documentItemWriter2);
|
|
|
+ documentItemWriter2.write(DOCUMENT_3); expectLastCall();
|
|
|
expect(reader.read()).andReturn(null);
|
|
|
- reader.update(executionContext);
|
|
|
+ reader.update(executionContext(chunkContext));
|
|
|
reader.close(); expectLastCall();
|
|
|
documentItemWriter2.close(); expectLastCall();
|
|
|
replayAll();
|
|
|
|
|
|
- documentExporter.execute(null, chunkContext);
|
|
|
+ assertThat(documentExporter.execute(null, chunkContext), is(RepeatStatus.FINISHED));
|
|
|
}
|
|
|
|
|
|
@Test(expected = IOException.class)
|
|
|
public void testReadError() throws Exception {
|
|
|
- reader.open(executionContext); expectLastCall();
|
|
|
+ reader.open(executionContext(chunkContext)); expectLastCall();
|
|
|
expect(reader.read()).andReturn(DOCUMENT);
|
|
|
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
|
|
|
documentItemWriter.write(DOCUMENT); expectLastCall();
|
|
|
@@ -134,7 +163,7 @@ public class DocumentExporterTest extends EasyMockSupport {
|
|
|
|
|
|
@Test(expected = UncheckedIOException.class)
|
|
|
public void testWriteError() throws Exception {
|
|
|
- reader.open(executionContext); expectLastCall();
|
|
|
+ reader.open(executionContext(chunkContext)); expectLastCall();
|
|
|
expect(reader.read()).andReturn(DOCUMENT);
|
|
|
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
|
|
|
documentItemWriter.write(DOCUMENT); expectLastCall().andThrow(new UncheckedIOException(new IOException("TEST")));
|
|
|
@@ -144,4 +173,43 @@ public class DocumentExporterTest extends EasyMockSupport {
|
|
|
|
|
|
documentExporter.execute(null, chunkContext);
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testStopAndRestartExportsAllDocuments() throws Exception {
|
|
|
+ ChunkContext stoppingChunkContext = chunkContext(BatchStatus.STOPPING);
|
|
|
+ DocumentExporter documentExporter = documentExporter(1);
|
|
|
+
|
|
|
+ reader.open(executionContext(chunkContext)); expectLastCall();
|
|
|
+ expect(reader.read()).andReturn(DOCUMENT);
|
|
|
+
|
|
|
+ expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
|
|
|
+ documentItemWriter.write(DOCUMENT); expectLastCall();
|
|
|
+ expect(reader.read()).andReturn(DOCUMENT_2);
|
|
|
+ expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(chunkContext.getStepContext().getStepExecution());
|
|
|
+ documentItemWriter.close(); expectLastCall();
|
|
|
+ reader.update(executionContext(this.chunkContext));
|
|
|
+ jobContextRepository.updateExecutionContext(this.chunkContext.getStepContext().getStepExecution());
|
|
|
+
|
|
|
+ expect(documentDestination.open(DOCUMENT_2)).andReturn(documentItemWriter2);
|
|
|
+ documentItemWriter2.write(DOCUMENT_2); expectLastCall();
|
|
|
+ expect(reader.read()).andReturn(DOCUMENT_3);
|
|
|
+ expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(stoppingChunkContext.getStepContext().getStepExecution());
|
|
|
+ documentItemWriter2.revert(); expectLastCall();
|
|
|
+ reader.close(); expectLastCall();
|
|
|
+
|
|
|
+ reader.open(executionContext(chunkContext));
|
|
|
+ expect(reader.read()).andReturn(DOCUMENT_3);
|
|
|
+ expect(documentDestination.open(DOCUMENT_3)).andReturn(documentItemWriter3);
|
|
|
+ documentItemWriter3.write(DOCUMENT_3); expectLastCall();
|
|
|
+ documentItemWriter3.close(); expectLastCall();
|
|
|
+
|
|
|
+ expect(reader.read()).andReturn(null);
|
|
|
+ reader.close(); expectLastCall();
|
|
|
+ replayAll();
|
|
|
+
|
|
|
+ RepeatStatus repeatStatus = documentExporter.execute(null, this.chunkContext);
|
|
|
+ assertThat(repeatStatus, is(RepeatStatus.CONTINUABLE));
|
|
|
+ repeatStatus = documentExporter.execute(null, this.chunkContext);
|
|
|
+ assertThat(repeatStatus, is(RepeatStatus.FINISHED));
|
|
|
+ }
|
|
|
}
|