|
@@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit;
|
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
|
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
@@ -61,52 +60,72 @@ import com.google.common.base.Preconditions;
|
|
|
|
|
|
@InterfaceAudience.Private
|
|
|
public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
- /** Coordinate the communication between the streamers. */
|
|
|
- static class Coordinator {
|
|
|
- private final DfsClientConf conf;
|
|
|
- private final List<BlockingQueue<ExtendedBlock>> endBlocks;
|
|
|
- private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
|
|
|
- private volatile boolean shouldLocateFollowingBlock = false;
|
|
|
-
|
|
|
- Coordinator(final DfsClientConf conf, final int numDataBlocks,
|
|
|
- final int numAllBlocks) {
|
|
|
- this.conf = conf;
|
|
|
- endBlocks = new ArrayList<>(numDataBlocks);
|
|
|
- for (int i = 0; i < numDataBlocks; i++) {
|
|
|
- endBlocks.add(new LinkedBlockingQueue<ExtendedBlock>(1));
|
|
|
+ static class MultipleBlockingQueue<T> {
|
|
|
+ private final int pullTimeout;
|
|
|
+ private final List<BlockingQueue<T>> queues;
|
|
|
+
|
|
|
+ MultipleBlockingQueue(int numQueue, int queueSize, int pullTimeout) {
|
|
|
+ queues = new ArrayList<>(numQueue);
|
|
|
+ for (int i = 0; i < numQueue; i++) {
|
|
|
+ queues.add(new LinkedBlockingQueue<T>(queueSize));
|
|
|
}
|
|
|
|
|
|
- stripedBlocks = new ArrayList<>(numAllBlocks);
|
|
|
- for (int i = 0; i < numAllBlocks; i++) {
|
|
|
- stripedBlocks.add(new LinkedBlockingQueue<LocatedBlock>(1));
|
|
|
+ this.pullTimeout = pullTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ void offer(int i, T object) {
|
|
|
+ final boolean b = queues.get(i).offer(object);
|
|
|
+ Preconditions.checkState(b, "Failed to offer " + object
|
|
|
+ + " to queue, i=" + i);
|
|
|
+ }
|
|
|
+
|
|
|
+ T poll(int i) throws InterruptedIOException {
|
|
|
+ try {
|
|
|
+ return queues.get(i).poll(pullTimeout, TimeUnit.SECONDS);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw DFSUtil.toInterruptedIOException("poll interrupted, i=" + i, e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- boolean shouldLocateFollowingBlock() {
|
|
|
- return shouldLocateFollowingBlock;
|
|
|
+ T peek(int i) {
|
|
|
+ return queues.get(i).peek();
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- void putEndBlock(int i, ExtendedBlock block) {
|
|
|
- shouldLocateFollowingBlock = true;
|
|
|
+ /** Coordinate the communication between the streamers. */
|
|
|
+ static class Coordinator {
|
|
|
+ private final MultipleBlockingQueue<LocatedBlock> stripedBlocks;
|
|
|
+ private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
|
|
|
+ private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
|
|
|
|
|
|
- final boolean b = endBlocks.get(i).offer(block);
|
|
|
- Preconditions.checkState(b, "Failed to add " + block
|
|
|
- + " to endBlocks queue, i=" + i);
|
|
|
+ Coordinator(final DfsClientConf conf, final int numDataBlocks,
|
|
|
+ final int numAllBlocks) {
|
|
|
+ stripedBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1,
|
|
|
+ conf.getStripedWriteMaxSecondsGetStripedBlock());
|
|
|
+ endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1,
|
|
|
+ conf.getStripedWriteMaxSecondsGetEndedBlock());
|
|
|
+ updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1,
|
|
|
+ conf.getStripedWriteMaxSecondsGetStripedBlock());
|
|
|
+ }
|
|
|
+
|
|
|
+ void putEndBlock(int i, ExtendedBlock block) {
|
|
|
+ endBlocks.offer(i, block);
|
|
|
}
|
|
|
|
|
|
ExtendedBlock getEndBlock(int i) throws InterruptedIOException {
|
|
|
- try {
|
|
|
- return endBlocks.get(i).poll(
|
|
|
- conf.getStripedWriteMaxSecondsGetEndedBlock(),
|
|
|
- TimeUnit.SECONDS);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw DFSUtil.toInterruptedIOException(
|
|
|
- "getEndBlock interrupted, i=" + i, e);
|
|
|
- }
|
|
|
+ return endBlocks.poll(i);
|
|
|
+ }
|
|
|
+
|
|
|
+ void putUpdateBlock(int i, ExtendedBlock block) {
|
|
|
+ updateBlocks.offer(i, block);
|
|
|
+ }
|
|
|
+
|
|
|
+ ExtendedBlock getUpdateBlock(int i) throws InterruptedIOException {
|
|
|
+ return updateBlocks.poll(i);
|
|
|
}
|
|
|
|
|
|
void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
|
|
|
- ExtendedBlock b = endBlocks.get(i).peek();
|
|
|
+ ExtendedBlock b = endBlocks.peek(i);
|
|
|
if (b == null) {
|
|
|
// streamer just has failed, put end block and continue
|
|
|
b = block;
|
|
@@ -119,22 +138,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("putStripedBlock " + block + ", i=" + i);
|
|
|
}
|
|
|
- final boolean b = stripedBlocks.get(i).offer(block);
|
|
|
- if (!b) {
|
|
|
- throw new IOException("Failed: " + block + ", i=" + i);
|
|
|
- }
|
|
|
+ stripedBlocks.offer(i, block);
|
|
|
}
|
|
|
|
|
|
LocatedBlock getStripedBlock(int i) throws IOException {
|
|
|
- final LocatedBlock lb;
|
|
|
- try {
|
|
|
- lb = stripedBlocks.get(i).poll(
|
|
|
- conf.getStripedWriteMaxSecondsGetStripedBlock(),
|
|
|
- TimeUnit.SECONDS);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw DFSUtil.toInterruptedIOException("getStripedBlock interrupted", e);
|
|
|
- }
|
|
|
-
|
|
|
+ final LocatedBlock lb = stripedBlocks.poll(i);
|
|
|
if (lb == null) {
|
|
|
throw new IOException("Failed: i=" + i);
|
|
|
}
|
|
@@ -218,6 +226,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
return streamers.get(0);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ ExtendedBlock getBlock() {
|
|
|
+ return getLeadingStreamer().getBlock();
|
|
|
+ }
|
|
|
+
|
|
|
/** Construct a new output stream for creating a file. */
|
|
|
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
|
|
EnumSet<CreateFlag> flag, Progressable progress,
|
|
@@ -292,6 +305,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
int count = 0;
|
|
|
for(StripedDataStreamer s : streamers) {
|
|
|
if (!s.isFailed()) {
|
|
|
+ s.getErrorState().initExtenalError();
|
|
|
count++;
|
|
|
}
|
|
|
}
|