|
@@ -1,7 +1,31 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
-import java.nio.ByteBuffer;
|
|
|
+import java.io.IOException;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.net.Socket;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -17,7 +41,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
|
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
|
-
|
|
|
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
|
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
@@ -30,13 +53,8 @@ import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
-import java.net.Socket;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-
|
|
|
public class TestDFSStripedOutputStream {
|
|
|
+ public static final Log LOG = LogFactory.getLog(TestDFSStripedOutputStream.class);
|
|
|
private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
|
|
|
private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
|
|
|
|
|
@@ -46,7 +64,6 @@ public class TestDFSStripedOutputStream {
|
|
|
private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
|
|
|
private final int stripesPerBlock = 4;
|
|
|
private final int blockSize = cellSize * stripesPerBlock;
|
|
|
- private final RawErasureEncoder encoder = new RSRawEncoder();
|
|
|
|
|
|
@Before
|
|
|
public void setup() throws IOException {
|
|
@@ -56,7 +73,6 @@ public class TestDFSStripedOutputStream {
|
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
|
|
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
|
|
|
fs = cluster.getFileSystem();
|
|
|
- encoder.initialize(dataBlocks, parityBlocks, cellSize);
|
|
|
}
|
|
|
|
|
|
@After
|
|
@@ -67,78 +83,74 @@ public class TestDFSStripedOutputStream {
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileEmpty() throws IOException {
|
|
|
+ public void testFileEmpty() throws IOException {
|
|
|
testOneFile("/EmptyFile", 0);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileSmallerThanOneCell1() throws IOException {
|
|
|
+ public void testFileSmallerThanOneCell1() throws IOException {
|
|
|
testOneFile("/SmallerThanOneCell", 1);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileSmallerThanOneCell2() throws IOException {
|
|
|
+ public void testFileSmallerThanOneCell2() throws IOException {
|
|
|
testOneFile("/SmallerThanOneCell", cellSize - 1);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileEqualsWithOneCell() throws IOException {
|
|
|
+ public void testFileEqualsWithOneCell() throws IOException {
|
|
|
testOneFile("/EqualsWithOneCell", cellSize);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileSmallerThanOneStripe1() throws IOException {
|
|
|
+ public void testFileSmallerThanOneStripe1() throws IOException {
|
|
|
testOneFile("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileSmallerThanOneStripe2() throws IOException {
|
|
|
+ public void testFileSmallerThanOneStripe2() throws IOException {
|
|
|
testOneFile("/SmallerThanOneStripe", cellSize + 123);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileEqualsWithOneStripe() throws IOException {
|
|
|
+ public void testFileEqualsWithOneStripe() throws IOException {
|
|
|
testOneFile("/EqualsWithOneStripe", cellSize * dataBlocks);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileMoreThanOneStripe1() throws IOException {
|
|
|
+ public void testFileMoreThanOneStripe1() throws IOException {
|
|
|
testOneFile("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileMoreThanOneStripe2() throws IOException {
|
|
|
+ public void testFileMoreThanOneStripe2() throws IOException {
|
|
|
testOneFile("/MoreThanOneStripe2", cellSize * dataBlocks
|
|
|
+ cellSize * dataBlocks + 123);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileFullBlockGroup() throws IOException {
|
|
|
+ public void testFileFullBlockGroup() throws IOException {
|
|
|
testOneFile("/FullBlockGroup", blockSize * dataBlocks);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileMoreThanABlockGroup1() throws IOException {
|
|
|
+ public void testFileMoreThanABlockGroup1() throws IOException {
|
|
|
testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileMoreThanABlockGroup2() throws IOException {
|
|
|
+ public void testFileMoreThanABlockGroup2() throws IOException {
|
|
|
testOneFile("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize+ 123);
|
|
|
}
|
|
|
|
|
|
|
|
|
@Test
|
|
|
- public void TestFileMoreThanABlockGroup3() throws IOException {
|
|
|
+ public void testFileMoreThanABlockGroup3() throws IOException {
|
|
|
testOneFile("/MoreThanABlockGroup3",
|
|
|
blockSize * dataBlocks * 3 + cellSize * dataBlocks
|
|
|
+ cellSize + 123);
|
|
|
}
|
|
|
|
|
|
- private int stripeDataSize() {
|
|
|
- return cellSize * dataBlocks;
|
|
|
- }
|
|
|
-
|
|
|
private byte[] generateBytes(int cnt) {
|
|
|
byte[] bytes = new byte[cnt];
|
|
|
for (int i = 0; i < cnt; i++) {
|
|
@@ -152,8 +164,7 @@ public class TestDFSStripedOutputStream {
|
|
|
return (byte) (pos % mod + 1);
|
|
|
}
|
|
|
|
|
|
- private void testOneFile(String src, int writeBytes)
|
|
|
- throws IOException {
|
|
|
+ private void testOneFile(String src, int writeBytes) throws IOException {
|
|
|
Path testPath = new Path(src);
|
|
|
|
|
|
byte[] bytes = generateBytes(writeBytes);
|
|
@@ -161,8 +172,7 @@ public class TestDFSStripedOutputStream {
|
|
|
|
|
|
// check file length
|
|
|
FileStatus status = fs.getFileStatus(testPath);
|
|
|
- long fileLength = status.getLen();
|
|
|
- Assert.assertEquals(writeBytes, fileLength);
|
|
|
+ Assert.assertEquals(writeBytes, status.getLen());
|
|
|
|
|
|
List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
|
|
|
LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
|
|
@@ -251,16 +261,12 @@ public class TestDFSStripedOutputStream {
|
|
|
continue;
|
|
|
}
|
|
|
for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
|
|
|
- byte expected;
|
|
|
// calculate the position of this byte in the file
|
|
|
long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
|
|
|
dataBlocks, posInBlk, blkIdxInGroup) +
|
|
|
group * blockSize * dataBlocks;
|
|
|
- if (posInFile >= writeBytes) {
|
|
|
- expected = 0;
|
|
|
- } else {
|
|
|
- expected = getByte(posInFile);
|
|
|
- }
|
|
|
+ Assert.assertTrue(posInFile < writeBytes);
|
|
|
+ final byte expected = getByte(posInFile);
|
|
|
|
|
|
String s = "Unexpected byte " + actualBlkBytes[posInBlk]
|
|
|
+ ", expect " + expected
|
|
@@ -272,84 +278,34 @@ public class TestDFSStripedOutputStream {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // verify the parity blocks
|
|
|
- final ByteBuffer[] parityBuffers = new ByteBuffer[parityBlocks];
|
|
|
- final long groupSize = lbs.getLocatedBlocks().get(group).getBlockSize();
|
|
|
- int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(groupSize,
|
|
|
- cellSize, dataBlocks, dataBlocks);
|
|
|
- for (int i = 0; i < parityBlocks; i++) {
|
|
|
- parityBuffers[i] = ByteBuffer.allocate(parityBlkSize);
|
|
|
- }
|
|
|
- final int numStripes = (int) (groupSize - 1) / stripeDataSize() + 1;
|
|
|
- for (int i = 0; i < numStripes; i++) {
|
|
|
- final int parityCellSize = i < numStripes - 1 || parityBlkSize % cellSize == 0
|
|
|
- ? cellSize : parityBlkSize % cellSize;
|
|
|
- ByteBuffer[] stripeBuf = new ByteBuffer[dataBlocks];
|
|
|
- for (int k = 0; k < stripeBuf.length; k++) {
|
|
|
- stripeBuf[k] = ByteBuffer.allocate(cellSize);
|
|
|
- }
|
|
|
- for (int j = 0; j < dataBlocks; j++) {
|
|
|
- if (dataBlockBytes[j] != null) {
|
|
|
- int length = Math.min(cellSize,
|
|
|
- dataBlockBytes[j].length - cellSize * i);
|
|
|
- if (length > 0) {
|
|
|
- stripeBuf[j].put(dataBlockBytes[j], cellSize * i, length);
|
|
|
- }
|
|
|
- }
|
|
|
- final long pos = stripeBuf[j].position();
|
|
|
- for (int k = 0; k < parityCellSize - pos; k++) {
|
|
|
- stripeBuf[j].put((byte) 0);
|
|
|
- }
|
|
|
- stripeBuf[j].flip();
|
|
|
- }
|
|
|
- ByteBuffer[] parityBuf = new ByteBuffer[parityBlocks];
|
|
|
- for (int j = 0; j < parityBlocks; j++) {
|
|
|
- parityBuf[j] = ByteBuffer.allocate(cellSize);
|
|
|
- for (int k = 0; k < parityCellSize; k++) {
|
|
|
- parityBuf[j].put((byte) 0);
|
|
|
- }
|
|
|
- parityBuf[j].flip();
|
|
|
- }
|
|
|
-
|
|
|
- encoder.encode(stripeBuf, parityBuf);
|
|
|
- for (int j = 0; j < parityBlocks; j++) {
|
|
|
- parityBuffers[j].put(parityBuf[j]);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- for (int i = 0; i < parityBlocks; i++) {
|
|
|
- Assert.assertArrayEquals(parityBuffers[i].array(), parityBlockBytes[i]);
|
|
|
- }
|
|
|
+ verifyParity(lbs.getLocatedBlocks().get(group).getBlockSize(),
|
|
|
+ cellSize, dataBlockBytes, parityBlockBytes);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- private void testReadWriteOneFile(String src, int writeBytes)
|
|
|
- throws IOException {
|
|
|
- Path TestPath = new Path(src);
|
|
|
- byte[] bytes = generateBytes(writeBytes);
|
|
|
- DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
|
|
|
-
|
|
|
- //check file length
|
|
|
- FileStatus status = fs.getFileStatus(TestPath);
|
|
|
- long fileLength = status.getLen();
|
|
|
- if (fileLength != writeBytes) {
|
|
|
- Assert.fail("File Length error: expect=" + writeBytes
|
|
|
- + ", actual=" + fileLength);
|
|
|
+
|
|
|
+ static void verifyParity(final long size, final int cellSize,
|
|
|
+ byte[][] dataBytes, byte[][] parityBytes) {
|
|
|
+ // verify the parity blocks
|
|
|
+ int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
|
|
|
+ size, cellSize, dataBytes.length, dataBytes.length);
|
|
|
+ final byte[][] expectedParityBytes = new byte[parityBytes.length][];
|
|
|
+ for (int i = 0; i < parityBytes.length; i++) {
|
|
|
+ expectedParityBytes[i] = new byte[parityBlkSize];
|
|
|
}
|
|
|
-
|
|
|
- DFSStripedInputStream dis = new DFSStripedInputStream(
|
|
|
- fs.getClient(), src, true);
|
|
|
- byte[] buf = new byte[writeBytes + 100];
|
|
|
- int readLen = dis.read(0, buf, 0, buf.length);
|
|
|
- readLen = readLen >= 0 ? readLen : 0;
|
|
|
- if (readLen != writeBytes) {
|
|
|
- Assert.fail("The length of file is not correct.");
|
|
|
- }
|
|
|
-
|
|
|
- for (int i = 0; i < writeBytes; i++) {
|
|
|
- if (getByte(i) != buf[i]) {
|
|
|
- Assert.fail("Byte at i = " + i + " is wrongly written.");
|
|
|
+ for (int i = 0; i < dataBytes.length; i++) {
|
|
|
+ if (dataBytes[i] == null) {
|
|
|
+ dataBytes[i] = new byte[dataBytes[0].length];
|
|
|
+ } else if (dataBytes[i].length < dataBytes[0].length) {
|
|
|
+ final byte[] tmp = dataBytes[i];
|
|
|
+ dataBytes[i] = new byte[dataBytes[0].length];
|
|
|
+ System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
|
|
|
}
|
|
|
}
|
|
|
+ final RawErasureEncoder encoder = new RSRawEncoder();
|
|
|
+ encoder.initialize(dataBytes.length, parityBytes.length, cellSize);
|
|
|
+ encoder.encode(dataBytes, expectedParityBytes);
|
|
|
+ for (int i = 0; i < parityBytes.length; i++) {
|
|
|
+ Assert.assertArrayEquals(expectedParityBytes[i], parityBytes[i]);
|
|
|
+ }
|
|
|
}
|
|
|
}
|