|
@@ -276,11 +276,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
return getCurrentStreamer().getIndex();
|
|
|
}
|
|
|
|
|
|
- StripedDataStreamer getCurrentStreamer() {
|
|
|
+ private synchronized StripedDataStreamer getCurrentStreamer() {
|
|
|
return (StripedDataStreamer)streamer;
|
|
|
}
|
|
|
|
|
|
- private StripedDataStreamer setCurrentStreamer(int i) {
|
|
|
+ private synchronized StripedDataStreamer setCurrentStreamer(int i) {
|
|
|
streamer = streamers.get(i);
|
|
|
return getCurrentStreamer();
|
|
|
}
|
|
@@ -344,8 +344,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
int ckOff = 0;
|
|
|
while (byteBuffer.remaining() > 0) {
|
|
|
DFSPacket p = createPacket(packetSize, chunksPerPacket,
|
|
|
- streamer.getBytesCurBlock(),
|
|
|
- streamer.getAndIncCurrentSeqno(), false);
|
|
|
+ getCurrentStreamer().getBytesCurBlock(),
|
|
|
+ getCurrentStreamer().getAndIncCurrentSeqno(), false);
|
|
|
int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
|
|
|
int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
|
|
|
maxBytesToPacket: byteBuffer.remaining();
|
|
@@ -353,7 +353,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
p.writeChecksum(checksumBuf, ckOff, ckLen);
|
|
|
ckOff += ckLen;
|
|
|
p.writeData(byteBuffer, toWrite);
|
|
|
- streamer.incBytesCurBlock(toWrite);
|
|
|
+ getCurrentStreamer().incBytesCurBlock(toWrite);
|
|
|
packets.add(p);
|
|
|
}
|
|
|
return packets;
|
|
@@ -529,7 +529,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
if (!current.isFailed()) {
|
|
|
try {
|
|
|
for (DFSPacket p : generatePackets(buffer, checksumBuf)) {
|
|
|
- streamer.waitAndQueuePacket(p);
|
|
|
+ getCurrentStreamer().waitAndQueuePacket(p);
|
|
|
}
|
|
|
endBlock();
|
|
|
} catch(Exception e) {
|