@@ -25,23 +25,34 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
 import org.apache.htrace.Sampler;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -59,23 +70,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     private final List<BlockingQueue<T>> queues;

     MultipleBlockingQueue(int numQueue, int queueSize) {
-      queues = new ArrayList<>(numQueue);
+      List<BlockingQueue<T>> list = new ArrayList<>(numQueue);
       for (int i = 0; i < numQueue; i++) {
-        queues.add(new LinkedBlockingQueue<T>(queueSize));
+        list.add(new LinkedBlockingQueue<T>(queueSize));
       }
-    }
-
-    boolean isEmpty() {
-      for(int i = 0; i < queues.size(); i++) {
-        if (!queues.get(i).isEmpty()) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    int numQueues() {
-      return queues.size();
+      queues = Collections.synchronizedList(list);
     }

     void offer(int i, T object) {
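With the constructor change above, queues is built once and wrapped in a synchronized list, so the field stays final while isEmpty() and numQueues() go away. The one-bounded-queue-per-consumer idea can be sketched on its own (a minimal sketch with hypothetical names, not part of this patch):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // One bounded queue per consumer. With capacity 1 each slot holds at most
    // one outstanding item, so producer and consumer advance in lock-step.
    class PerConsumerQueues<T> {
      private final List<BlockingQueue<T>> queues;

      PerConsumerQueues(int numQueues, int queueSize) {
        List<BlockingQueue<T>> list = new ArrayList<>(numQueues);
        for (int i = 0; i < numQueues; i++) {
          list.add(new LinkedBlockingQueue<T>(queueSize));
        }
        queues = Collections.synchronizedList(list);
      }

      boolean offer(int i, T item) {
        return queues.get(i).offer(item); // false if slot i is still occupied
      }

      T take(int i) throws InterruptedException {
        return queues.get(i).take();      // blocks until slot i is filled
      }
    }

The Coordinator below constructs every queue with queueSize = 1, which is exactly this rendezvous behavior.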
@@ -92,6 +91,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       }
     }

+    T takeWithTimeout(int i) throws InterruptedIOException {
+      try {
+        return queues.get(i).poll(100, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, e);
+      }
+    }
+
     T poll(int i) {
       return queues.get(i).poll();
     }
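takeWithTimeout polls with a 100 ms bound instead of calling take(): each timeout returns control to the caller, who can re-check a liveness condition (waitEndBlocks below re-tests isHealthy()) rather than block forever on a streamer that has died. The resulting loop shape, as a standalone sketch (the alive predicate is an assumption for illustration):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    class PollLoop {
      // Poll with a short timeout and re-test a condition on every miss;
      // returns null once the producer is known to be gone.
      static <T> T takeWhileAlive(BlockingQueue<T> q, BooleanSupplier alive)
          throws InterruptedException {
        while (alive.getAsBoolean()) {
          T item = q.poll(100, TimeUnit.MILLISECONDS);
          if (item != null) {
            return item;
          }
        }
        return null;
      }
    }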
@@ -99,23 +106,44 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     T peek(int i) {
       return queues.get(i).peek();
     }
+
+    void clear() {
+      for (BlockingQueue<T> q : queues) {
+        q.clear();
+      }
+    }
   }

   /** Coordinate the communication between the streamers. */
-  class Coordinator {
+  static class Coordinator {
+    /**
+     * The next internal block to write to for each streamer. The
+     * DFSStripedOutputStream makes the {@link ClientProtocol#addBlock} RPC to
+     * get a new block group. The block group is split into internal blocks,
+     * which are then distributed into the queues for streamers to retrieve.
+     */
     private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
+    /**
+     * Used to sync among all the streamers before allocating a new block. The
+     * DFSStripedOutputStream uses this to make sure every streamer has finished
+     * writing the previous block.
+     */
     private final MultipleBlockingQueue<ExtendedBlock> endBlocks;

+    /**
+     * The following data structures are used for syncing while handling errors.
+     */
     private final MultipleBlockingQueue<LocatedBlock> newBlocks;
-    private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
+    private final Map<StripedDataStreamer, Boolean> updateStreamerMap;
+    private final MultipleBlockingQueue<Boolean> streamerUpdateResult;

-    Coordinator(final DfsClientConf conf, final int numDataBlocks,
-        final int numAllBlocks) {
+    Coordinator(final int numAllBlocks) {
       followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
-      endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1);
-
+      endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
       newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
-      updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+      updateStreamerMap = Collections.synchronizedMap(
+          new HashMap<StripedDataStreamer, Boolean>(numAllBlocks));
+      streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1);
     }

     MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
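Taken together, followingBlocks and endBlocks form a two-way handshake: the stream thread hands each streamer its next internal block, and the streamer hands back the block it just finished. A runnable toy model of that handshake with a single streamer (names are illustrative, not the HDFS API):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class HandshakeDemo {
      public static void main(String[] args) throws Exception {
        BlockingQueue<String> following = new LinkedBlockingQueue<>(1);
        BlockingQueue<String> ended = new LinkedBlockingQueue<>(1);

        Thread streamer = new Thread(() -> {
          try {
            String block = following.take(); // wait for the next internal block
            ended.put(block + ":written");   // report the finished block back
          } catch (InterruptedException ignored) {
          }
        });
        streamer.start();

        following.put("blk_1001");           // coordinator hands out a block
        System.out.println(ended.take());    // coordinator syncs on completion
        streamer.join();
      }
    }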
@@ -126,68 +154,28 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       return newBlocks;
     }

-    MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() {
-      return updateBlocks;
-    }
-
-    StripedDataStreamer getStripedDataStreamer(int i) {
-      return DFSStripedOutputStream.this.getStripedDataStreamer(i);
-    }
-
     void offerEndBlock(int i, ExtendedBlock block) {
       endBlocks.offer(i, block);
     }

-    ExtendedBlock takeEndBlock(int i) throws InterruptedIOException {
-      return endBlocks.take(i);
+    void offerStreamerUpdateResult(int i, boolean success) {
+      streamerUpdateResult.offer(i, success);
     }

-    boolean hasAllEndBlocks() {
-      for(int i = 0; i < endBlocks.numQueues(); i++) {
-        if (endBlocks.peek(i) == null) {
-          return false;
-        }
-      }
-      return true;
+    boolean takeStreamerUpdateResult(int i) throws InterruptedIOException {
+      return streamerUpdateResult.take(i);
     }

-    void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
-      ExtendedBlock b = endBlocks.peek(i);
-      if (b == null) {
-        // streamer just has failed, put end block and continue
-        b = block;
-        offerEndBlock(i, b);
-      }
-      b.setNumBytes(newBytes);
+    void updateStreamer(StripedDataStreamer streamer,
+        boolean success) {
+      assert !updateStreamerMap.containsKey(streamer);
+      updateStreamerMap.put(streamer, success);
     }

-    /** @return a block representing the entire block group. */
-    ExtendedBlock getBlockGroup() {
-      final StripedDataStreamer s0 = getStripedDataStreamer(0);
-      final ExtendedBlock b0 = s0.getBlock();
-      if (b0 == null) {
-        return null;
-      }
-
-      final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
-
-      final ExtendedBlock block = new ExtendedBlock(b0);
-      long numBytes = atBlockGroupBoundary? b0.getNumBytes(): s0.getBytesCurBlock();
-      for (int i = 1; i < numAllBlocks; i++) {
-        final StripedDataStreamer si = getStripedDataStreamer(i);
-        final ExtendedBlock bi = si.getBlock();
-        if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
-          block.setGenerationStamp(bi.getGenerationStamp());
-        }
-        if (i < numDataBlocks) {
-          numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
-        }
-      }
-      block.setNumBytes(numBytes);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes());
-      }
-      return block;
+    void clearFailureStates() {
+      newBlocks.clear();
+      updateStreamerMap.clear();
+      streamerUpdateResult.clear();
     }
   }

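The removed updateBlocks queue carried updated blocks one way; its replacements split the error-handling exchange into two directions: streamers report into updateStreamerMap, the main thread answers through streamerUpdateResult, and clearFailureStates() resets everything between rounds. A toy model of the same report-then-verdict exchange (using a CountDownLatch instead of per-streamer queues, purely for illustration):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    // Workers report their individual outcome into a map; the driver then
    // publishes one group-wide verdict that every worker waits for.
    class UpdateRound {
      final Map<Integer, Boolean> reports = new ConcurrentHashMap<>();
      final CountDownLatch verdictReady = new CountDownLatch(1);
      volatile boolean verdict;

      void report(int worker, boolean ok) {                // worker side
        reports.put(worker, ok);
      }

      boolean awaitVerdict() throws InterruptedException { // worker side
        verdictReady.await();
        return verdict;
      }

      void decide() {                                      // driver side
        verdict = !reports.containsValue(false);
        verdictReady.countDown();
      }
    }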
@@ -263,18 +251,16 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   private final int cellSize;
   private final int numAllBlocks;
   private final int numDataBlocks;
-
-  @Override
-  ExtendedBlock getBlock() {
-    return coordinator.getBlockGroup();
-  }
+  private ExtendedBlock currentBlockGroup;
+  private final String[] favoredNodes;
+  private final List<StripedDataStreamer> failedStreamers;

   /** Construct a new output stream for creating a file. */
   DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
                          EnumSet<CreateFlag> flag, Progressable progress,
                          DataChecksum checksum, String[] favoredNodes)
                          throws IOException {
-    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
+    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Creating DFSStripedOutputStream for " + src);
     }
@@ -284,12 +270,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     cellSize = ecPolicy.getCellSize();
     numDataBlocks = ecPolicy.getNumDataUnits();
     numAllBlocks = numDataBlocks + numParityBlocks;
+    this.favoredNodes = favoredNodes;
+    failedStreamers = new ArrayList<>();

     encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
         numDataBlocks, numParityBlocks);

-    coordinator = new Coordinator(dfsClient.getConf(),
-        numDataBlocks, numAllBlocks);
+    coordinator = new Coordinator(numAllBlocks);
     try {
       cellBuffers = new CellBuffers(numParityBlocks);
     } catch (InterruptedException ie) {
@@ -297,14 +284,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
           "Failed to create cell buffers", ie);
     }

-    List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
+    streamers = new ArrayList<>(numAllBlocks);
     for (short i = 0; i < numAllBlocks; i++) {
       StripedDataStreamer streamer = new StripedDataStreamer(stat,
           dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
           favoredNodes, i, coordinator);
-      s.add(streamer);
+      streamers.add(streamer);
     }
-    streamers = Collections.unmodifiableList(s);
     currentPackets = new DFSPacket[streamers.size()];
     setCurrentStreamer(0);
   }
@@ -318,17 +304,19 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   }

   private synchronized StripedDataStreamer getCurrentStreamer() {
-    return (StripedDataStreamer)streamer;
+    return (StripedDataStreamer) streamer;
   }

   private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
     // backup currentPacket for current streamer
-    int oldIdx = streamers.indexOf(streamer);
-    if (oldIdx >= 0) {
-      currentPackets[oldIdx] = currentPacket;
+    if (streamer != null) {
+      int oldIdx = streamers.indexOf(getCurrentStreamer());
+      if (oldIdx >= 0) {
+        currentPackets[oldIdx] = currentPacket;
+      }
     }

-    streamer = streamers.get(newIdx);
+    streamer = getStripedDataStreamer(newIdx);
     currentPacket = currentPackets[newIdx];
     adjustChunkBoundary();

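The added null check covers the very first switch, made from the constructor before any streamer is active; afterwards the outgoing streamer's currentPacket is backed up by list position before the cursor moves. The underlying save-restore cursor pattern, as a toy sketch (hypothetical types, not patch code):

    // Save the active slot's scratch state before moving the cursor, then
    // restore the target slot's state; cursor -1 means no slot is active yet.
    class CursorState<S> {
      private final Object[] saved;
      private int cursor = -1;
      private S current;

      CursorState(int n) {
        saved = new Object[n];
      }

      @SuppressWarnings("unchecked")
      S switchTo(int idx) {
        if (cursor >= 0) {
          saved[cursor] = current;  // back up the outgoing slot
        }
        current = (S) saved[idx];   // restore the incoming slot
        cursor = idx;
        return current;
      }
    }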
@@ -350,40 +338,127 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     encoder.encode(dataBuffers, parityBuffers);
   }

-
-  private void checkStreamers(boolean setExternalError) throws IOException {
-    int count = 0;
+  /**
+   * Check all the existing StripedDataStreamers and find newly failed ones.
+   * @return The newly failed streamers.
+   * @throws IOException if fewer than {@link #numDataBlocks} streamers are
+   * still healthy.
+   */
+  private Set<StripedDataStreamer> checkStreamers() throws IOException {
+    Set<StripedDataStreamer> newFailed = new HashSet<>();
     for(StripedDataStreamer s : streamers) {
-      if (!s.isFailed()) {
-        if (setExternalError && s.getBlock() != null) {
-          s.getErrorState().initExternalError();
-        }
-        count++;
+      if (!s.isHealthy() && !failedStreamers.contains(s)) {
+        newFailed.add(s);
       }
     }
+
+    final int failCount = failedStreamers.size() + newFailed.size();
     if (LOG.isDebugEnabled()) {
       LOG.debug("checkStreamers: " + streamers);
-      LOG.debug("count=" + count);
+      LOG.debug("healthy streamer count=" + (numAllBlocks - failCount));
+      LOG.debug("original failed streamers: " + failedStreamers);
+      LOG.debug("newly failed streamers: " + newFailed);
     }
-    if (count < numDataBlocks) {
-      throw new IOException("Failed: the number of remaining blocks = "
-          + count + " < the number of data blocks = " + numDataBlocks);
+    if (failCount > (numAllBlocks - numDataBlocks)) {
+      throw new IOException("Failed: the number of failed blocks = "
+          + failCount + " > the number of parity blocks = "
+          + (numAllBlocks - numDataBlocks));
     }
+    return newFailed;
   }

   private void handleStreamerFailure(String err, Exception e)
       throws IOException {
-    handleStreamerFailure(err, e, true);
-  }
-
-  private void handleStreamerFailure(String err, Exception e,
-      boolean setExternalError) throws IOException {
     LOG.warn("Failed: " + err + ", " + this, e);
-    getCurrentStreamer().setFailed(true);
-    checkStreamers(setExternalError);
+    getCurrentStreamer().getErrorState().setInternalError();
+    getCurrentStreamer().close(true);
+    checkStreamers();
     currentPacket = null;
   }

+  private void replaceFailedStreamers() {
+    assert streamers.size() == numAllBlocks;
+    for (short i = 0; i < numAllBlocks; i++) {
+      final StripedDataStreamer oldStreamer = getStripedDataStreamer(i);
+      if (!oldStreamer.isHealthy()) {
+        StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat,
+            dfsClient, src, oldStreamer.progress,
+            oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager,
+            favoredNodes, i, coordinator);
+        streamers.set(i, streamer);
+        currentPackets[i] = null;
+        if (i == 0) {
+          this.streamer = streamer;
+        }
+        streamer.start();
+      }
+    }
+  }
+
+  private void waitEndBlocks(int i) throws IOException {
+    while (getStripedDataStreamer(i).isHealthy()) {
+      final ExtendedBlock b = coordinator.endBlocks.takeWithTimeout(i);
+      if (b != null) {
+        StripedBlockUtil.checkBlocks(currentBlockGroup, i, b);
+        return;
+      }
+    }
+  }
+
+  private void allocateNewBlock() throws IOException {
+    if (currentBlockGroup != null) {
+      for (int i = 0; i < numAllBlocks; i++) {
+        // sync all the healthy streamers before writing to the new block
+        waitEndBlocks(i);
+      }
+    }
+    failedStreamers.clear();
+    // replace failed streamers
+    replaceFailedStreamers();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Allocating new block group. The previous block group: "
+          + currentBlockGroup);
+    }
+
+    // TODO collect excludedNodes from all the data streamers
+    final LocatedBlock lb = addBlock(null, dfsClient, src, currentBlockGroup,
+        fileId, favoredNodes);
+    assert lb.isStriped();
+    if (lb.getLocations().length < numDataBlocks) {
+      throw new IOException("Failed to get " + numDataBlocks
+          + " nodes from namenode: blockGroupSize= " + numAllBlocks
+          + ", blocks.length= " + lb.getLocations().length);
+    }
+    // assign the new block to the current block group
+    currentBlockGroup = lb.getBlock();
+
+    final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+        (LocatedStripedBlock) lb, cellSize, numDataBlocks,
+        numAllBlocks - numDataBlocks);
+    for (int i = 0; i < blocks.length; i++) {
+      StripedDataStreamer si = getStripedDataStreamer(i);
+      if (si.isHealthy()) { // skip the failed data streamer
+        if (blocks[i] == null) {
+          // Set the exception and close the streamer as there are no block
+          // locations found for the parity block.
+          LOG.warn("Failed to get block location for parity block, index=" + i);
+          si.getLastException().set(
+              new IOException("Failed to get following block, i=" + i));
+          si.getErrorState().setInternalError();
+          si.close(true);
+        } else {
+          coordinator.getFollowingBlocks().offer(i, blocks[i]);
+        }
+      }
+    }
+  }
+
+  private boolean shouldEndBlockGroup() {
+    return currentBlockGroup != null &&
+        currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
+  }
+
   @Override
   protected synchronized void writeChunk(byte[] bytes, int offset, int len,
       byte[] checksum, int ckoff, int cklen) throws IOException {
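allocateNewBlock leans on StripedBlockUtil.parseStripedBlockGroup to split one group-level LocatedBlock into numAllBlocks internal blocks. For a data-block index, the cell arithmetic behind such a split can be sketched as follows (an illustrative helper, not the StripedBlockUtil implementation; parity blocks need separate treatment):

    // Round-robin cell placement: every full stripe puts cellSize bytes on
    // each of the numDataBlocks internal blocks; the remainder then fills
    // cells one block at a time, in index order.
    final class CellMath {
      static long dataBlockLength(long groupBytes, int cellSize,
          int numDataBlocks, int blockIndex) {
        long stripeSize = (long) cellSize * numDataBlocks;
        long fullStripes = groupBytes / stripeSize;
        long remainder = groupBytes % stripeSize;
        long base = fullStripes * cellSize;
        long lastCell = remainder - (long) blockIndex * cellSize;
        if (lastCell <= 0) {
          return base;
        }
        return base + Math.min(lastCell, cellSize);
      }

      public static void main(String[] args) {
        // 6 data blocks, 64 KB cells, 500 KB of data in the group:
        // prints 131072, 118784, 65536, 65536, 65536, 65536
        for (int i = 0; i < 6; i++) {
          System.out.println(dataBlockLength(500 * 1024, 64 * 1024, 6, i));
        }
      }
    }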
@@ -392,8 +467,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     final int pos = cellBuffers.addTo(index, bytes, offset, len);
     final boolean cellFull = pos == cellSize;

-    final long oldBytes = current.getBytesCurBlock();
-    if (!current.isFailed()) {
+    if (currentBlockGroup == null || shouldEndBlockGroup()) {
+      // the incoming data should belong to a new block group; allocate one
+      allocateNewBlock();
+    }
+
+    currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);
+    if (current.isHealthy()) {
       try {
         super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
       } catch(Exception e) {
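shouldEndBlockGroup compares the group's logical length against blockSize * numDataBlocks: only data bytes count toward a group's capacity, parity is overhead on top of it. A small arithmetic check with assumed figures:

    class GroupCapacity {
      public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // 128 MB per internal block
        int numDataBlocks = 6;               // e.g. an RS(6, 3) policy
        // Only the data blocks hold user bytes, so a full block group
        // carries blockSize * numDataBlocks of logical data.
        System.out.println(blockSize * numDataBlocks); // 805306368 = 768 MB
      }
    }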
@@ -401,12 +481,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       }
     }

-    if (current.isFailed()) {
-      final long newBytes = oldBytes + len;
-      coordinator.setBytesEndBlock(index, newBytes, current.getBlock());
-      current.setBytesCurBlock(newBytes);
-    }
-
     // Two extra steps are needed when a striping cell is full:
     // 1. Forward the current index pointer
     // 2. Generate parity packets if a full stripe of data cells are present
@@ -419,11 +493,209 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         cellBuffers.flipDataBuffers();
         writeParityCells();
         next = 0;
+        // check failure state for all the streamers. Bump GS if necessary
+        checkStreamerFailures();
+
+        // if this is the end of the block group, end each internal block
+        if (shouldEndBlockGroup()) {
+          for (int i = 0; i < numAllBlocks; i++) {
+            final StripedDataStreamer s = setCurrentStreamer(i);
+            if (s.isHealthy()) {
+              try {
+                endBlock();
+              } catch (IOException ignored) {}
+            }
+          }
+        }
       }
       setCurrentStreamer(next);
     }
   }

+  @Override
+  void enqueueCurrentPacketFull() throws IOException {
+    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+        + " appendChunk={}, {}", currentPacket, src, getStreamer()
+        .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+        getStreamer());
+    enqueueCurrentPacket();
+    adjustChunkBoundary();
+    // no need to end block here
+  }
+
+  private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
+    Set<StripedDataStreamer> healthySet = new HashSet<>();
+    for (StripedDataStreamer streamer : streamers) {
+      if (streamer.isHealthy() &&
+          streamer.getStage() == BlockConstructionStage.DATA_STREAMING) {
+        streamer.setExternalError();
+        healthySet.add(streamer);
+      }
+    }
+    return healthySet;
+  }
+
+  /**
+   * Check and handle data streamer failures. This is called only when we have
+   * written a full stripe (i.e., enqueued all packets for a full stripe), or
+   * when we're closing the output stream.
+   */
+  private void checkStreamerFailures() throws IOException {
+    Set<StripedDataStreamer> newFailed = checkStreamers();
+    if (newFailed.size() > 0) {
+      // for healthy streamers, wait until all of them have fetched the new
+      // block and flushed out all the enqueued packets.
+      flushAllInternals();
+    }
+    // get all the current failed streamers after the flush
+    newFailed = checkStreamers();
+    while (newFailed.size() > 0) {
+      failedStreamers.addAll(newFailed);
+      coordinator.clearFailureStates();
+
+      // mark all the healthy streamers as external error
+      Set<StripedDataStreamer> healthySet = markExternalErrorOnStreamers();
+
+      // we have newly failed streamers; update the block for the pipeline
+      final ExtendedBlock newBG = updateBlockForPipeline(healthySet);
+
+      // wait for all the healthy streamers to
+      // 1) get the updated block info
+      // 2) create the new block output stream
+      newFailed = waitCreatingNewStreams(healthySet);
+      if (newFailed.size() + failedStreamers.size() >
+          numAllBlocks - numDataBlocks) {
+        throw new IOException(
+            "Data streamers failed while creating new block streams: "
+                + newFailed + ". There are not enough healthy streamers.");
+      }
+      for (StripedDataStreamer failedStreamer : newFailed) {
+        assert !failedStreamer.isHealthy();
+      }
+
+      // TODO we can also succeed if all the failed streamers have not taken
+      // the updated block
+      if (newFailed.size() == 0) {
+        // reset the external error state of all the streamers
+        for (StripedDataStreamer streamer : healthySet) {
+          assert streamer.isHealthy();
+          streamer.getErrorState().reset();
+        }
+        updatePipeline(newBG);
+      }
+      for (int i = 0; i < numAllBlocks; i++) {
+        coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
+      }
+    }
+  }
+
+  private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
+      Set<StripedDataStreamer> streamers) {
+    for (StripedDataStreamer streamer : streamers) {
+      if (!coordinator.updateStreamerMap.containsKey(streamer)) {
+        if (!streamer.isHealthy() &&
+            coordinator.getNewBlocks().peek(streamer.getIndex()) != null) {
+          // this streamer had an internal error before getting the updated
+          // block
+          failed.add(streamer);
+        }
+      }
+    }
+    return coordinator.updateStreamerMap.size() + failed.size();
+  }
+
+  private Set<StripedDataStreamer> waitCreatingNewStreams(
+      Set<StripedDataStreamer> healthyStreamers) throws IOException {
+    Set<StripedDataStreamer> failed = new HashSet<>();
+    final int expectedNum = healthyStreamers.size();
+    final long socketTimeout = dfsClient.getConf().getSocketTimeout();
+    // The total wait time should be less than the socket timeout, otherwise
+    // a slow streamer may cause the other streamers to time out. Here we wait
+    // for half of the socket timeout.
+    long remainingTime = socketTimeout > 0 ? socketTimeout/2 : Long.MAX_VALUE;
+    final long waitInterval = 1000;
+    synchronized (coordinator) {
+      while (checkStreamerUpdates(failed, healthyStreamers) < expectedNum
+          && remainingTime > 0) {
+        try {
+          long start = Time.monotonicNow();
+          coordinator.wait(waitInterval);
+          remainingTime -= Time.monotonicNow() - start;
+        } catch (InterruptedException e) {
+          throw DFSUtil.toInterruptedIOException("Interrupted when waiting" +
+              " for results of updating striped streamers", e);
+        }
+      }
+    }
+    synchronized (coordinator) {
+      for (StripedDataStreamer streamer : healthyStreamers) {
+        if (!coordinator.updateStreamerMap.containsKey(streamer)) {
+          // close the streamer if it is too slow to create a new connection
+          streamer.setStreamerAsClosed();
+          failed.add(streamer);
+        }
+      }
+    }
+    for (Map.Entry<StripedDataStreamer, Boolean> entry :
+        coordinator.updateStreamerMap.entrySet()) {
+      if (!entry.getValue()) {
+        failed.add(entry.getKey());
+      }
+    }
+    for (StripedDataStreamer failedStreamer : failed) {
+      healthyStreamers.remove(failedStreamer);
+    }
+    return failed;
+  }
+
+  /**
+   * Call {@link ClientProtocol#updateBlockForPipeline} and assign the updated
+   * block to the healthy streamers.
+   * @param healthyStreamers The healthy data streamers. These streamers join
+   *                         the failure handling.
+   */
+  private ExtendedBlock updateBlockForPipeline(
+      Set<StripedDataStreamer> healthyStreamers) throws IOException {
+    final LocatedBlock updated = dfsClient.namenode.updateBlockForPipeline(
+        currentBlockGroup, dfsClient.clientName);
+    final long newGS = updated.getBlock().getGenerationStamp();
+    ExtendedBlock newBlock = new ExtendedBlock(currentBlockGroup);
+    newBlock.setGenerationStamp(newGS);
+    final LocatedBlock[] updatedBlks = StripedBlockUtil.parseStripedBlockGroup(
+        (LocatedStripedBlock) updated, cellSize, numDataBlocks,
+        numAllBlocks - numDataBlocks);
+
+    for (int i = 0; i < numAllBlocks; i++) {
+      StripedDataStreamer si = getStripedDataStreamer(i);
+      if (healthyStreamers.contains(si)) {
+        final LocatedBlock lb = new LocatedBlock(new ExtendedBlock(newBlock),
+            null, null, null, -1, updated.isCorrupt(), null);
+        lb.setBlockToken(updatedBlks[i].getBlockToken());
+        coordinator.getNewBlocks().offer(i, lb);
+      }
+    }
+    return newBlock;
+  }
+
+  private void updatePipeline(ExtendedBlock newBG) throws IOException {
+    final DatanodeInfo[] newNodes = new DatanodeInfo[numAllBlocks];
+    final String[] newStorageIDs = new String[numAllBlocks];
+    for (int i = 0; i < numAllBlocks; i++) {
+      final StripedDataStreamer streamer = getStripedDataStreamer(i);
+      final DatanodeInfo[] nodes = streamer.getNodes();
+      final String[] storageIDs = streamer.getStorageIDs();
+      if (streamer.isHealthy() && nodes != null && storageIDs != null) {
+        newNodes[i] = nodes[0];
+        newStorageIDs[i] = storageIDs[0];
+      } else {
+        newNodes[i] = new DatanodeInfo(DatanodeID.EMPTY_DATANODE_ID);
+        newStorageIDs[i] = "";
+      }
+    }
+    dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup,
+        newBG, newNodes, newStorageIDs);
+    currentBlockGroup = newBG;
+  }
+
   private int stripeDataSize() {
     return numDataBlocks * cellSize;
   }
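checkStreamerFailures is the striped counterpart of single-block pipeline recovery: pause the healthy pipelines, obtain a bumped generation stamp from the NameNode, let every healthy streamer reopen its stream under the new GS, then commit. One recovery round in outline (step names follow the patch; this is a summary, not executable client code):

    // One recovery round, as driven by checkStreamerFailures():
    //
    //   failedStreamers.addAll(newFailed);              // remember who is out
    //   coordinator.clearFailureStates();               // drop stale round state
    //   healthySet = markExternalErrorOnStreamers();    // pause healthy pipelines
    //   newBG = updateBlockForPipeline(healthySet);     // NameNode bumps the GS
    //   newFailed = waitCreatingNewStreams(healthySet); // streams reopen, new GS
    //   if (newFailed.isEmpty()) {
    //     updatePipeline(newBG);                        // commit the bumped GS
    //   }
    //   coordinator.offerStreamerUpdateResult(i, ...);  // verdict per streamer
    //
    // The while loop repeats the round if new failures appeared during it.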
@@ -500,28 +772,16 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     }
   }

-  /**
-   * Simply add bytesCurBlock together. Note that this result is not accurately
-   * the size of the block group.
-   */
-  private long getCurrentSumBytes() {
-    long sum = 0;
-    for (int i = 0; i < numDataBlocks; i++) {
-      sum += streamers.get(i).getBytesCurBlock();
-    }
-    return sum;
-  }
-
   private boolean generateParityCellsForLastStripe() {
-    final long currentBlockGroupBytes = getCurrentSumBytes();
-    if (currentBlockGroupBytes % stripeDataSize() == 0) {
+    final long currentBlockGroupBytes = currentBlockGroup == null ?
+        0 : currentBlockGroup.getNumBytes();
+    final long lastStripeSize = currentBlockGroupBytes % stripeDataSize();
+    if (lastStripeSize == 0) {
       return false;
     }

-    final int firstCellSize =
-        (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
-    final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
-        firstCellSize : cellSize;
+    final long parityCellSize = lastStripeSize < cellSize?
+        lastStripeSize : cellSize;
     final ByteBuffer[] buffers = cellBuffers.getBuffers();

     for (int i = 0; i < numAllBlocks; i++) {
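The new parityCellSize computation keys off the block group length alone: the parity cells of a partial last stripe are as long as that stripe's largest data cell, i.e. min(lastStripeSize, cellSize), because data cells fill in index order. A quick worked example (assumed RS(6, 3) layout with 64 KB cells):

    class LastStripeParity {
      public static void main(String[] args) {
        final int cellSize = 64 * 1024;
        final int numDataBlocks = 6;
        final long groupBytes = 500 * 1024;  // logical data in the group
        final long lastStripeSize =
            groupBytes % ((long) cellSize * numDataBlocks);
        final long parityCellSize = lastStripeSize < cellSize ?
            lastStripeSize : cellSize;
        // 512000 % 393216 = 118784 data bytes in the last stripe, so the
        // parity cells are written at the full cell size of 65536 bytes.
        System.out.println(lastStripeSize + " " + parityCellSize);
      }
    }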
@@ -550,13 +810,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     cellBuffers.clear();
   }

-  void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
-      ) throws IOException {
+  void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf)
+      throws IOException {
     final StripedDataStreamer current = setCurrentStreamer(index);
     final int len = buffer.limit();

     final long oldBytes = current.getBytesCurBlock();
-    if (!current.isFailed()) {
+    if (current.isHealthy()) {
       try {
         DataChecksum sum = getDataChecksum();
         sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
@@ -570,18 +830,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
       }
     }
-
-    if (current.isFailed()) {
-      final long newBytes = oldBytes + len;
-      current.setBytesCurBlock(newBytes);
-    }
   }

   @Override
   void setClosed() {
     super.setClosed();
     for (int i = 0; i < numAllBlocks; i++) {
-      streamers.get(i).release();
+      getStripedDataStreamer(i).release();
     }
     cellBuffers.release();
   }
@@ -607,37 +862,40 @@ public class DFSStripedOutputStream extends DFSOutputStream {

     try {
       // flush from all upper layers
-      try {
-        flushBuffer();
-      } catch(Exception e) {
-        handleStreamerFailure("flushBuffer " + getCurrentStreamer(), e);
-      }
+      flushBuffer();
       // if the last stripe is incomplete, generate and write parity cells
       if (generateParityCellsForLastStripe()) {
         writeParityCells();
       }
       enqueueAllCurrentPackets();

+      // flush all the data packets
+      flushAllInternals();
+      // check failures
+      checkStreamerFailures();
+
       for (int i = 0; i < numAllBlocks; i++) {
         final StripedDataStreamer s = setCurrentStreamer(i);
-        if (!s.isFailed()) {
+        if (s.isHealthy()) {
           try {
             if (s.getBytesCurBlock() > 0) {
               setCurrentPacketToEmpty();
             }
-            // flush all data to Datanode
+            // flush the last "close" packet to Datanode
             flushInternal();
           } catch(Exception e) {
-            handleStreamerFailure("flushInternal " + s, e, false);
+            // TODO for both close and endBlock, we currently do not handle
+            // failures when sending the last packet. We actually do not need
+            // to bump GS for this kind of failure. Thus counting the total
+            // number of failures may be good enough.
           }
         }
       }

       closeThreads(false);
-      final ExtendedBlock lastBlock = coordinator.getBlockGroup();
       TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
       try {
-        completeFile(lastBlock);
+        completeFile(currentBlockGroup);
       } finally {
         scope.close();
       }
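With the group length tracked in currentBlockGroup, close no longer reassembles the block group from the individual streamers. The resulting close path, in order (an outline of the code above, not new logic):

    // The close sequence after this change:
    //   flushBuffer();                        // drain the upper layers
    //   generateParityCellsForLastStripe();   // pad and encode a partial stripe
    //   enqueueAllCurrentPackets();
    //   flushAllInternals();                  // all data packets on the wire
    //   checkStreamerFailures();              // last chance to bump the GS
    //   setCurrentPacketToEmpty(); flushInternal(); // per-streamer close packet
    //   closeThreads(false);
    //   completeFile(currentBlockGroup);      // group length from local state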
@@ -652,14 +910,45 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     int idx = streamers.indexOf(getCurrentStreamer());
     for(int i = 0; i < streamers.size(); i++) {
       final StripedDataStreamer si = setCurrentStreamer(i);
-      if (!si.isFailed() && currentPacket != null) {
+      if (si.isHealthy() && currentPacket != null) {
         try {
           enqueueCurrentPacket();
         } catch (IOException e) {
-          handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e, false);
+          handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e);
         }
       }
     }
     setCurrentStreamer(idx);
   }
+
+  void flushAllInternals() throws IOException {
+    int current = getCurrentIndex();
+
+    for (int i = 0; i < numAllBlocks; i++) {
+      final StripedDataStreamer s = setCurrentStreamer(i);
+      if (s.isHealthy()) {
+        try {
+          // flush all data to Datanode
+          flushInternal();
+        } catch(Exception e) {
+          handleStreamerFailure("flushInternal " + s, e);
+        }
+      }
+    }
+    setCurrentStreamer(current);
+  }
+
+  static void sleep(long ms, String op) throws InterruptedIOException {
+    try {
+      Thread.sleep(ms);
+    } catch(InterruptedException ie) {
+      throw DFSUtil.toInterruptedIOException(
+          "Sleep interrupted during " + op, ie);
+    }
+  }
+
+  @Override
+  ExtendedBlock getBlock() {
+    return currentBlockGroup;
+  }
 }