|
@@ -27,6 +27,7 @@ import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStream;
|
|
|
|
|
|
|
|
+import org.apache.commons.lang.exception.ExceptionUtils;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.StorageType;
|
|
import org.apache.hadoop.fs.StorageType;
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
|
+import org.junit.Assert;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
@@ -338,4 +340,49 @@ public class TestSimulatedFSDataset {
|
|
fsdataset.addBlockPool(bpid, conf);
|
|
fsdataset.addBlockPool(bpid, conf);
|
|
return fsdataset;
|
|
return fsdataset;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testConcurrentAddBlockPool() throws InterruptedException,
|
|
|
|
+ IOException {
|
|
|
|
+ final String[] bpids = {"BP-TEST1-", "BP-TEST2-"};
|
|
|
|
+ final SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, conf);
|
|
|
|
+ class AddBlockPoolThread extends Thread {
|
|
|
|
+ private int id;
|
|
|
|
+ private IOException ioe;
|
|
|
|
+ public AddBlockPoolThread(int id) {
|
|
|
|
+ super();
|
|
|
|
+ this.id = id;
|
|
|
|
+ }
|
|
|
|
+ public void test() throws InterruptedException, IOException {
|
|
|
|
+ this.join();
|
|
|
|
+ if (ioe != null) {
|
|
|
|
+ throw ioe;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ public void run() {
|
|
|
|
+ for (int i=0; i < 10000; i++) {
|
|
|
|
+ // add different block pools concurrently
|
|
|
|
+ String newbpid = bpids[id] + i;
|
|
|
|
+ fsdataset.addBlockPool(newbpid, conf);
|
|
|
|
+ // and then add a block into the pool
|
|
|
|
+ ExtendedBlock block = new ExtendedBlock(newbpid,1);
|
|
|
|
+ try {
|
|
|
|
+ // it will throw an exception if the block pool is not found
|
|
|
|
+ fsdataset.createTemporary(StorageType.DEFAULT, block);
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ // JUnit does not capture exception in non-main thread,
|
|
|
|
+ // so cache it and then let main thread throw later.
|
|
|
|
+ this.ioe = ioe;
|
|
|
|
+ }
|
|
|
|
+ assert(fsdataset.getReplicaString(newbpid,1) != "null");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ AddBlockPoolThread t1 = new AddBlockPoolThread(0);
|
|
|
|
+ AddBlockPoolThread t2 = new AddBlockPoolThread(1);
|
|
|
|
+ t1.start();
|
|
|
|
+ t2.start();
|
|
|
|
+ t1.test();
|
|
|
|
+ t2.test();
|
|
|
|
+ }
|
|
}
|
|
}
|