|
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.*;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
@@ -77,6 +78,7 @@ public class TestSpaceReservation {
|
|
|
private DFSClient client = null;
|
|
|
FsVolumeReference singletonVolumeRef = null;
|
|
|
FsVolumeImpl singletonVolume = null;
|
|
|
+ private DataNodeFaultInjector old = null;
|
|
|
|
|
|
private static Random rand = new Random();
|
|
|
|
|
@@ -146,6 +148,9 @@ public class TestSpaceReservation {
|
|
|
cluster.shutdown();
|
|
|
cluster = null;
|
|
|
}
|
|
|
+ if (old != null) {
|
|
|
+ DataNodeFaultInjector.set(old);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void createFileAndTestSpaceReservation(
|
|
@@ -613,6 +618,49 @@ public class TestSpaceReservation {
|
|
|
checkReservedSpace(expectedFile2Reserved);
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout = 30000)
|
|
|
+ public void testReservedSpaceForPipelineRecovery() throws Exception {
|
|
|
+ final short replication = 3;
|
|
|
+ startCluster(BLOCK_SIZE, replication, -1);
|
|
|
+
|
|
|
+ final String methodName = GenericTestUtils.getMethodName();
|
|
|
+ final Path file = new Path("/" + methodName + ".01.dat");
|
|
|
+
|
|
|
+ old = DataNodeFaultInjector.get();
|
|
|
+ // Fault injector to fail connection to mirror first time.
|
|
|
+ DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
|
|
+ private int tries = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void failMirrorConnection() throws IOException {
|
|
|
+ if (tries++ == 0) {
|
|
|
+ throw new IOException("Failing Mirror for space reservation");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ // Write 1 byte to the file and kill the writer.
|
|
|
+ FSDataOutputStream os = fs.create(file, replication);
|
|
|
+ os.write(new byte[1]);
|
|
|
+ os.close();
|
|
|
+ // Ensure all space reserved for the replica was released on each
|
|
|
+ // DataNode.
|
|
|
+ cluster.triggerBlockReports();
|
|
|
+ for (final DataNode dn : cluster.getDataNodes()) {
|
|
|
+ try (FsDatasetSpi.FsVolumeReferences volumes =
|
|
|
+ dn.getFSDataset().getFsVolumeReferences()) {
|
|
|
+ final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ LOG.info("dn " + dn.getDisplayName() + " space : "
|
|
|
+ + volume.getReservedForReplicas());
|
|
|
+ return (volume.getReservedForReplicas() == 0);
|
|
|
+ }
|
|
|
+ }, 100, Integer.MAX_VALUE); // Wait until the test times out.
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void checkReservedSpace(final long expectedReserved) throws TimeoutException,
|
|
|
InterruptedException, IOException {
|
|
|
for (final DataNode dn : cluster.getDataNodes()) {
|