|
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
|
|
+import java.util.EnumSet;
|
|
import java.util.LinkedList;
|
|
import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
@@ -48,16 +49,22 @@ import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.fs.CreateFlag;
|
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.StorageType;
|
|
import org.apache.hadoop.fs.StorageType;
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
|
+import org.apache.hadoop.hdfs.DFSOutputStream;
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|
@@ -70,8 +77,11 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
|
|
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
|
|
|
+import org.apache.hadoop.io.EnumSetWritable;
|
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
@@ -397,6 +407,54 @@ public class TestBlockManager {
|
|
assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block)));
|
|
assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block)));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test(timeout = 60000)
|
|
|
|
+ public void testNeededReplicationWhileAppending() throws IOException {
|
|
|
|
+ Configuration conf = new HdfsConfiguration();
|
|
|
|
+ String src = "/test-file";
|
|
|
|
+ Path file = new Path(src);
|
|
|
|
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
|
|
|
+ cluster.waitActive();
|
|
|
|
+ try {
|
|
|
|
+ BlockManager bm = cluster.getNamesystem().getBlockManager();
|
|
|
|
+ FileSystem fs = cluster.getFileSystem();
|
|
|
|
+ NamenodeProtocols namenode = cluster.getNameNodeRpc();
|
|
|
|
+ DFSOutputStream out = null;
|
|
|
|
+ try {
|
|
|
|
+ out = (DFSOutputStream) (fs.create(file).
|
|
|
|
+ getWrappedStream());
|
|
|
|
+ out.write(1);
|
|
|
|
+ out.hflush();
|
|
|
|
+ out.close();
|
|
|
|
+ FSDataInputStream in = null;
|
|
|
|
+ ExtendedBlock oldBlock = null;
|
|
|
|
+ try {
|
|
|
|
+ in = fs.open(file);
|
|
|
|
+ oldBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
|
|
|
|
+ } finally {
|
|
|
|
+ IOUtils.closeStream(in);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String clientName =
|
|
|
|
+ ((DistributedFileSystem) fs).getClient().getClientName();
|
|
|
|
+ namenode.append(src, clientName, new EnumSetWritable<>(
|
|
|
|
+ EnumSet.of(CreateFlag.APPEND)));
|
|
|
|
+ LocatedBlock newLocatedBlock =
|
|
|
|
+ namenode.updateBlockForPipeline(oldBlock, clientName);
|
|
|
|
+ ExtendedBlock newBlock =
|
|
|
|
+ new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(),
|
|
|
|
+ oldBlock.getNumBytes(),
|
|
|
|
+ newLocatedBlock.getBlock().getGenerationStamp());
|
|
|
|
+ namenode.updatePipeline(clientName, oldBlock, newBlock,
|
|
|
|
+ newLocatedBlock.getLocations(), newLocatedBlock.getStorageIDs());
|
|
|
|
+ BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock());
|
|
|
|
+ assertFalse(bm.isNeededReplication(bi, bm.countLiveNodes(bi)));
|
|
|
|
+ } finally {
|
|
|
|
+ IOUtils.closeStream(out);
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Tell the block manager that replication is completed for the given
|
|
* Tell the block manager that replication is completed for the given
|