|
@@ -25,15 +25,23 @@ import java.io.File;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.RandomAccessFile;
|
|
import java.io.RandomAccessFile;
|
|
|
|
|
|
|
|
+import org.mockito.invocation.InvocationOnMock;
|
|
|
|
+import static org.mockito.Mockito.spy;
|
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
|
+import org.mockito.stubbing.Answer;
|
|
|
|
+
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.impl.Log4JLogger;
|
|
import org.apache.commons.logging.impl.Log4JLogger;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
|
|
+import org.apache.hadoop.hdfs.DFSClient;
|
|
|
|
+import org.apache.hadoop.hdfs.DFSClientAdapter;
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
@@ -361,4 +369,79 @@ public class TestFileAppend3 {
|
|
AppendTestUtil.checkFullFile(fs, p, fileLen,
|
|
AppendTestUtil.checkFullFile(fs, p, fileLen,
|
|
fileContents, "Failed to append to a partial chunk");
|
|
fileContents, "Failed to append to a partial chunk");
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // Do small appends.
|
|
|
|
+ void doSmallAppends(Path file, DistributedFileSystem fs, int iterations)
|
|
|
|
+ throws IOException {
|
|
|
|
+ for (int i = 0; i < iterations; i++) {
|
|
|
|
+ FSDataOutputStream stm;
|
|
|
|
+ try {
|
|
|
|
+ stm = fs.append(file);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ // If another thread is already appending, skip this time.
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ // Failure in write or close will be terminal.
|
|
|
|
+ AppendTestUtil.write(stm, 0, 123);
|
|
|
|
+ stm.close();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testSmallAppendRace() throws Exception {
|
|
|
|
+ final Path file = new Path("/testSmallAppendRace");
|
|
|
|
+ final String fName = file.toUri().getPath();
|
|
|
|
+
|
|
|
|
+ // Create the file and write a small amount of data.
|
|
|
|
+ FSDataOutputStream stm = fs.create(file);
|
|
|
|
+ AppendTestUtil.write(stm, 0, 123);
|
|
|
|
+ stm.close();
|
|
|
|
+
|
|
|
|
+ // Introduce a delay between getFileInfo and calling append() against NN.
|
|
|
|
+ final DFSClient client = DFSClientAdapter.getDFSClient(fs);
|
|
|
|
+ DFSClient spyClient = spy(client);
|
|
|
|
+ when(spyClient.getFileInfo(fName)).thenAnswer(new Answer<HdfsFileStatus>() {
|
|
|
|
+ @Override
|
|
|
|
+ public HdfsFileStatus answer(InvocationOnMock invocation){
|
|
|
|
+ try {
|
|
|
|
+ HdfsFileStatus stat = client.getFileInfo(fName);
|
|
|
|
+ Thread.sleep(100);
|
|
|
|
+ return stat;
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ DFSClientAdapter.setDFSClient(fs, spyClient);
|
|
|
|
+
|
|
|
|
+ // Create two threads for doing appends to the same file.
|
|
|
|
+ Thread worker1 = new Thread() {
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ doSmallAppends(file, fs, 20);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ Thread worker2 = new Thread() {
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ doSmallAppends(file, fs, 20);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ worker1.start();
|
|
|
|
+ worker2.start();
|
|
|
|
+
|
|
|
|
+ // append will fail when the file size crosses the checksum chunk boundary,
|
|
|
|
+ // if append was called with a stale file stat.
|
|
|
|
+ doSmallAppends(file, fs, 20);
|
|
|
|
+ }
|
|
}
|
|
}
|