|
@@ -475,6 +475,7 @@ class DataStreamer extends Daemon {
|
|
|
private DataOutputStream blockStream;
|
|
|
private DataInputStream blockReplyStream;
|
|
|
private ResponseProcessor response = null;
|
|
|
+ private final Object nodesLock = new Object();
|
|
|
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
|
|
|
private volatile StorageType[] storageTypes = null;
|
|
|
private volatile String[] storageIDs = null;
|
|
@@ -613,7 +614,9 @@ class DataStreamer extends Daemon {
|
|
|
|
|
|
private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
|
|
|
String[] storageIDs) {
|
|
|
- this.nodes = nodes;
|
|
|
+ synchronized (nodesLock) {
|
|
|
+ this.nodes = nodes;
|
|
|
+ }
|
|
|
this.storageTypes = storageTypes;
|
|
|
this.storageIDs = storageIDs;
|
|
|
}
|
|
@@ -910,7 +913,10 @@ class DataStreamer extends Daemon {
|
|
|
try (TraceScope ignored = dfsClient.getTracer().
|
|
|
newScope("waitForAckedSeqno")) {
|
|
|
LOG.debug("{} waiting for ack for: {}", this, seqno);
|
|
|
- int dnodes = nodes != null ? nodes.length : 3;
|
|
|
+ int dnodes;
|
|
|
+ synchronized (nodesLock) {
|
|
|
+ dnodes = nodes != null ? nodes.length : 3;
|
|
|
+ }
|
|
|
int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes);
|
|
|
long begin = Time.monotonicNow();
|
|
|
try {
|