|
@@ -1,3 +1,20 @@
|
|
|
|
+/**
|
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
|
+ * distributed with this work for additional information
|
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
|
+ *
|
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
+ *
|
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
|
+ * limitations under the License.
|
|
|
|
+ */
|
|
package org.apache.hadoop.hdfs;
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
@@ -31,7 +48,8 @@ import java.util.List;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
public class TestLeaseRecoveryStriped {
|
|
public class TestLeaseRecoveryStriped {
|
|
- public static final Log LOG = LogFactory.getLog(TestLeaseRecoveryStriped.class);
|
|
|
|
|
|
+ public static final Log LOG = LogFactory
|
|
|
|
+ .getLog(TestLeaseRecoveryStriped.class);
|
|
|
|
|
|
private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
|
|
private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
|
|
private static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS;
|
|
private static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS;
|
|
@@ -83,39 +101,34 @@ public class TestLeaseRecoveryStriped {
|
|
}
|
|
}
|
|
|
|
|
|
public static final int[][][] BLOCK_LENGTHS_SUITE = {
|
|
public static final int[][][] BLOCK_LENGTHS_SUITE = {
|
|
- {{ 11 * CELL_SIZE,10 * CELL_SIZE, 9 * CELL_SIZE,
|
|
|
|
- 8 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE,
|
|
|
|
- 5 * CELL_SIZE, 4 * CELL_SIZE, 3 * CELL_SIZE},
|
|
|
|
- {36 * CELL_SIZE}},
|
|
|
|
|
|
+ { { 11 * CELL_SIZE, 10 * CELL_SIZE, 9 * CELL_SIZE, 8 * CELL_SIZE,
|
|
|
|
+ 7 * CELL_SIZE, 6 * CELL_SIZE, 5 * CELL_SIZE, 4 * CELL_SIZE,
|
|
|
|
+ 3 * CELL_SIZE }, { 36 * CELL_SIZE } },
|
|
|
|
|
|
- {{ 3 * CELL_SIZE, 4 * CELL_SIZE, 5 * CELL_SIZE,
|
|
|
|
- 6 * CELL_SIZE, 7 * CELL_SIZE, 8 * CELL_SIZE,
|
|
|
|
- 9 * CELL_SIZE,10 * CELL_SIZE,11 * CELL_SIZE},
|
|
|
|
- {36 * CELL_SIZE}},
|
|
|
|
|
|
+ { { 3 * CELL_SIZE, 4 * CELL_SIZE, 5 * CELL_SIZE, 6 * CELL_SIZE,
|
|
|
|
+ 7 * CELL_SIZE, 8 * CELL_SIZE, 9 * CELL_SIZE, 10 * CELL_SIZE,
|
|
|
|
+ 11 * CELL_SIZE }, { 36 * CELL_SIZE } },
|
|
|
|
|
|
- {{ 11 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE,
|
|
|
|
- 5 * CELL_SIZE, 4 * CELL_SIZE, 2 * CELL_SIZE,
|
|
|
|
- 9 * CELL_SIZE,10 * CELL_SIZE,11 * CELL_SIZE},
|
|
|
|
- {36 * CELL_SIZE}},
|
|
|
|
|
|
+ { { 11 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE, 5 * CELL_SIZE,
|
|
|
|
+ 4 * CELL_SIZE, 2 * CELL_SIZE, 9 * CELL_SIZE, 10 * CELL_SIZE,
|
|
|
|
+ 11 * CELL_SIZE }, { 36 * CELL_SIZE } },
|
|
|
|
|
|
- {{ 8 * CELL_SIZE + bytesPerChecksum,
|
|
|
|
|
|
+ { { 8 * CELL_SIZE + bytesPerChecksum,
|
|
7 * CELL_SIZE + bytesPerChecksum * 2,
|
|
7 * CELL_SIZE + bytesPerChecksum * 2,
|
|
6 * CELL_SIZE + bytesPerChecksum * 2,
|
|
6 * CELL_SIZE + bytesPerChecksum * 2,
|
|
5 * CELL_SIZE - bytesPerChecksum * 3,
|
|
5 * CELL_SIZE - bytesPerChecksum * 3,
|
|
4 * CELL_SIZE - bytesPerChecksum * 4,
|
|
4 * CELL_SIZE - bytesPerChecksum * 4,
|
|
- 3 * CELL_SIZE - bytesPerChecksum * 4,
|
|
|
|
- 9 * CELL_SIZE, 10 * CELL_SIZE, 11 * CELL_SIZE},
|
|
|
|
- {36 * CELL_SIZE}},
|
|
|
|
- };
|
|
|
|
|
|
+ 3 * CELL_SIZE - bytesPerChecksum * 4, 9 * CELL_SIZE, 10 * CELL_SIZE,
|
|
|
|
+ 11 * CELL_SIZE }, { 36 * CELL_SIZE } }, };
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testLeaseRecovery() throws Exception {
|
|
public void testLeaseRecovery() throws Exception {
|
|
- for(int i=0; i < BLOCK_LENGTHS_SUITE.length; i++){
|
|
|
|
|
|
+ for (int i = 0; i < BLOCK_LENGTHS_SUITE.length; i++) {
|
|
int[] blockLengths = BLOCK_LENGTHS_SUITE[i][0];
|
|
int[] blockLengths = BLOCK_LENGTHS_SUITE[i][0];
|
|
int safeLength = BLOCK_LENGTHS_SUITE[i][1][0];
|
|
int safeLength = BLOCK_LENGTHS_SUITE[i][1][0];
|
|
try {
|
|
try {
|
|
runTest(blockLengths, safeLength);
|
|
runTest(blockLengths, safeLength);
|
|
- } catch (Throwable e){
|
|
|
|
|
|
+ } catch (Throwable e) {
|
|
String msg = "failed testCase at i=" + i + ", blockLengths="
|
|
String msg = "failed testCase at i=" + i + ", blockLengths="
|
|
+ Arrays.toString(blockLengths) + "\n"
|
|
+ Arrays.toString(blockLengths) + "\n"
|
|
+ StringUtils.stringifyException(e);
|
|
+ StringUtils.stringifyException(e);
|
|
@@ -141,8 +154,8 @@ public class TestLeaseRecoveryStriped {
|
|
|
|
|
|
private void writePartialBlocks(int[] blockLengths) throws Exception {
|
|
private void writePartialBlocks(int[] blockLengths) throws Exception {
|
|
final FSDataOutputStream out = dfs.create(p);
|
|
final FSDataOutputStream out = dfs.create(p);
|
|
- final DFSStripedOutputStream stripedOut
|
|
|
|
- = (DFSStripedOutputStream) out.getWrappedStream();
|
|
|
|
|
|
+ final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
|
|
|
|
+ .getWrappedStream();
|
|
int length = (STRIPES_PER_BLOCK - 1) * STRIPE_SIZE;
|
|
int length = (STRIPES_PER_BLOCK - 1) * STRIPE_SIZE;
|
|
int[] posToKill = getPosToKill(blockLengths);
|
|
int[] posToKill = getPosToKill(blockLengths);
|
|
int checkingPos = nextCheckingPos(posToKill, 0);
|
|
int checkingPos = nextCheckingPos(posToKill, 0);
|
|
@@ -180,13 +193,14 @@ public class TestLeaseRecoveryStriped {
|
|
int[] posToKill = new int[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS];
|
|
int[] posToKill = new int[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS];
|
|
for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
|
|
for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
|
|
int numStripe = (blockLengths[i] - 1) / CELL_SIZE;
|
|
int numStripe = (blockLengths[i] - 1) / CELL_SIZE;
|
|
- posToKill[i] = numStripe * STRIPE_SIZE
|
|
|
|
- + i * CELL_SIZE + blockLengths[i] % CELL_SIZE;
|
|
|
|
|
|
+ posToKill[i] = numStripe * STRIPE_SIZE + i * CELL_SIZE
|
|
|
|
+ + blockLengths[i] % CELL_SIZE;
|
|
if (blockLengths[i] % CELL_SIZE == 0) {
|
|
if (blockLengths[i] % CELL_SIZE == 0) {
|
|
posToKill[i] += CELL_SIZE;
|
|
posToKill[i] += CELL_SIZE;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- for (int i = NUM_DATA_BLOCKS; i < NUM_DATA_BLOCKS+NUM_PARITY_BLOCKS; i++) {
|
|
|
|
|
|
+ for (int i = NUM_DATA_BLOCKS; i < NUM_DATA_BLOCKS
|
|
|
|
+ + NUM_PARITY_BLOCKS; i++) {
|
|
Preconditions.checkArgument(blockLengths[i] % CELL_SIZE == 0);
|
|
Preconditions.checkArgument(blockLengths[i] % CELL_SIZE == 0);
|
|
int numStripe = (blockLengths[i]) / CELL_SIZE;
|
|
int numStripe = (blockLengths[i]) / CELL_SIZE;
|
|
posToKill[i] = numStripe * STRIPE_SIZE;
|
|
posToKill[i] = numStripe * STRIPE_SIZE;
|
|
@@ -194,10 +208,10 @@ public class TestLeaseRecoveryStriped {
|
|
return posToKill;
|
|
return posToKill;
|
|
}
|
|
}
|
|
|
|
|
|
- private List<Integer> getIndexToStop(int[] posToKill, int pos){
|
|
|
|
- List<Integer> indices=new LinkedList<>();
|
|
|
|
- for(int i=0;i<posToKill.length;i++){
|
|
|
|
- if(pos==posToKill[i]){
|
|
|
|
|
|
+ private List<Integer> getIndexToStop(int[] posToKill, int pos) {
|
|
|
|
+ List<Integer> indices = new LinkedList<>();
|
|
|
|
+ for (int i = 0; i < posToKill.length; i++) {
|
|
|
|
+ if (pos == posToKill[i]) {
|
|
indices.add(i);
|
|
indices.add(i);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -214,8 +228,8 @@ public class TestLeaseRecoveryStriped {
|
|
}
|
|
}
|
|
}, 100, 3000);
|
|
}, 100, 3000);
|
|
} catch (TimeoutException e) {
|
|
} catch (TimeoutException e) {
|
|
- throw new IOException("Timeout waiting for streamer " + s +". Sent="
|
|
|
|
- + s.bytesSent + ", expected="+byteSent);
|
|
|
|
|
|
+ throw new IOException("Timeout waiting for streamer " + s + ". Sent="
|
|
|
|
+ + s.bytesSent + ", expected=" + byteSent);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -226,7 +240,8 @@ public class TestLeaseRecoveryStriped {
|
|
}
|
|
}
|
|
|
|
|
|
private void recoverLease() throws Exception {
|
|
private void recoverLease() throws Exception {
|
|
- final DistributedFileSystem dfs2 = (DistributedFileSystem) getFSAsAnotherUser(conf);
|
|
|
|
|
|
+ final DistributedFileSystem dfs2 = (DistributedFileSystem) getFSAsAnotherUser(
|
|
|
|
+ conf);
|
|
try {
|
|
try {
|
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
@Override
|
|
@Override
|
|
@@ -246,8 +261,9 @@ public class TestLeaseRecoveryStriped {
|
|
private FileSystem getFSAsAnotherUser(final Configuration c)
|
|
private FileSystem getFSAsAnotherUser(final Configuration c)
|
|
throws IOException, InterruptedException {
|
|
throws IOException, InterruptedException {
|
|
return FileSystem.get(FileSystem.getDefaultUri(c), c,
|
|
return FileSystem.get(FileSystem.getDefaultUri(c), c,
|
|
- UserGroupInformation.createUserForTesting(fakeUsername,
|
|
|
|
- new String[]{fakeGroup}).getUserName());
|
|
|
|
|
|
+ UserGroupInformation
|
|
|
|
+ .createUserForTesting(fakeUsername, new String[] { fakeGroup })
|
|
|
|
+ .getUserName());
|
|
}
|
|
}
|
|
|
|
|
|
public static void waitStreamerAllAcked(DataStreamer s) throws IOException {
|
|
public static void waitStreamerAllAcked(DataStreamer s) throws IOException {
|