FSDataInputStream extends DataInputStream
The core behavior of FSDataInputStream
is defined by java.io.DataInputStream
,
with extensions that add key assumptions to the system.
Seekable.seek()
to offsets within the array of bytes, with future
reads starting at this offset.Files are opened via FileSystem.open(p)
, which, if successful, returns:
result = FSDataInputStream(0, FS.Files[p])
The stream can be modeled as:
FSDIS = (pos, data[], isOpen)
with access functions:
pos(FSDIS)
data(FSDIS)
isOpen(FSDIS)
Implicit invariant: the size of the data stream equals the size of the
file as returned by FileSystem.getFileStatus(Path p)
forall p in dom(FS.Files[p]) :
len(data(FSDIS)) == FS.getFileStatus(p).length
Closeable.close()
The semantics of java.io.Closeable
are defined in the interface definition
within the JRE.
The operation MUST be idempotent; the following sequence is not an error:
FSDIS.close();
FSDIS.close();
Implementations SHOULD be robust against failure. If an inner stream
is closed, it should be checked for being null
first.
Implementations SHOULD NOT raise IOException
exceptions (or any other exception)
during this operation. Client applications often ignore these, or may fail
unexpectedly.
FSDIS' = ((undefined), (undefined), False)
Seekable.getPos()
Return the current position. The outcome when a stream is closed is undefined.
isOpen(FSDIS)
result = pos(FSDIS)
InputStream.read()
Return the data at the current position.
read()
may take to complete.isOpen(FSDIS)
if ( pos < len(data) ):
FSDIS' = (pos + 1, data, True)
result = data[pos]
else
result = -1
InputStream.read(buffer[], offset, length)
Read length
bytes of data into the destination buffer, starting at offset
offset
isOpen(FSDIS)
buffer != null else raise NullPointerException
length >= 0
offset < len(buffer)
length <= len(buffer) - offset
Exceptions that may be raised on precondition failure are
InvalidArgumentException
ArrayIndexOutOfBoundsException
RuntimeException
if length == 0 :
result = 0
elseif pos > len(data):
result -1
else
let l = min(length, len(data)-length) :
buffer' = buffer where forall i in [0..l-1]:
buffer'[o+i] = data[pos+i]
FSDIS' = (pos+l, data, true)
result = l
Seekable.seek(s)
Not all subclasses implement the Seek operation:
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
If the operation is supported, the file SHOULD be open:
isOpen(FSDIS)
Some filesystems do not perform this check, relying on the read()
contract
to reject reads on a closed stream (e.g. RawLocalFileSystem
).
A seek(0)
MUST always succeed, as the seek position must be
positive and less than the length of the Stream's:
s > 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException]
Some FileSystems do not raise an exception if this condition is not met. They
instead return -1 on any read()
operation where, at the time of the read,
len(data(FSDIS)) < pos(FSDIS)
.
FSDIS' = (s, data, True)
There is an implicit invariant: a seek to the current position is a no-op
seek(getPos())
Implementations may recognise this operation and bypass all other precondition checks, leaving the input stream unchanged.
Seekable.seekToNewSource(offset)
This operation instructs the source to retrieve data[]
from a different
source from the current source. This is only relevant if the filesystem supports
multiple replicas of a file and there is more than 1 replica of the
data at offset offset
.
Not all subclasses implement the operation operation, and instead
either raise an exception or return False
.
supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]
Examples: CompressionInputStream
, HttpFSFileSystem
If supported, the file must be open:
isOpen(FSDIS)
The majority of subclasses that do not implement this operation simply fail.
if not supported(FSDIS, Seekable.seekToNewSource(s)):
result = False
Examples: RawLocalFileSystem
, HttpFSFileSystem
If the operation is supported and there is a new location for the data:
FSDIS' = (pos, data', true)
result = True
The new data is the original data (or an updated version of it, as covered
in the Consistency section below), but the block containing the data at offset
sourced from a different replica.
If there is no other copy, FSDIS
is not updated; the response indicates this:
result = False
Outside of test methods, the primary use of this method is in the {{FSInputChecker}} class, which can react to a checksum error in a read by attempting to source the data elsewhere. It a new source can be found it attempts to reread and recheck that portion of the file.
PositionedReadable
The PositionedReadable
operations provide the ability to
read data into a buffer from a specific position in
the data stream.
Although the interface declares that it must be thread safe, some of the implementations do not follow this guarantee.
Not all FSDataInputStream
implementations support these operations. Those that do
not implement Seekable.seek()
do not implement the PositionedReadable
interface.
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
This could be considered obvious: if a stream is not Seekable, a client
cannot seek to a location. It is also a side effect of the
base class implementation, which uses Seekable.seek()
.
Implicit invariant: for all PositionedReadable
operations, the value
of pos
is unchanged at the end of the operation
pos(FSDIS') == pos(FSDIS)
There are no guarantees that this holds during the operation.
For any operations that fail, the contents of the destination
buffer
are undefined. Implementations may overwrite part
or all of the buffer before reporting a failure.
int PositionedReadable.read(position, buffer, offset, length)
position > 0 else raise [IllegalArgumentException, RuntimeException]
len(buffer) + offset < len(data) else raise [IndexOutOfBoundException, RuntimeException]
length >= 0
offset >= 0
The amount of data read is the less of the length or the amount of data available from the specified position:
let available = min(length, len(data)-position)
buffer'[offset..(offset+available-1)] = data[position..position+available -1]
result = available
void PositionedReadable.readFully(position, buffer, offset, length)
position > 0 else raise [IllegalArgumentException, RuntimeException]
length >= 0
offset >= 0
(position + length) <= len(data) else raise [EOFException, IOException]
len(buffer) + offset < len(data)
The amount of data read is the less of the length or the amount of data available from the specified position:
let available = min(length, len(data)-position)
buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]
PositionedReadable.readFully(position, buffer)
The semantics of this are exactly equivalent to
readFully(position, buffer, 0, len(buffer))
FileSystem.open(p)
are expected to receive access to the data of FS.Files[p]
at the time of opening.At time t0
FSDIS0 = FS'read(p) = (0, data0[])
At time t1
FS' = FS' where FS'.Files[p] = data1
From time t >= t1
, the value of FSDIS0
is undefined.
It may be unchanged
FSDIS0.data == data0
forall l in len(FSDIS0.data):
FSDIS0.read() == data0[l]
It may pick up the new data
FSDIS0.data == data1
forall l in len(FSDIS0.data):
FSDIS0.read() == data1[l]
It may be inconsistent, such that a read of an offset returns data from either of the datasets
forall l in len(FSDIS0.data):
(FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l]))
That is, every value read may be from the original or updated file.
It may also be inconsistent on repeated reads of same offset, that is
at time t2 > t1
:
r2 = FSDIS0.read(l)
While at time t3 > t2
:
r3 = FSDIS0.read(l)
It may be that r3 != r2
. (That is, some of the data my be cached or replicated,
and on a subsequent read, a different version of the file's contents are returned).
Similarly, if the data at the path p
, is deleted, this change MAY or MAY
not be visible during read operations performed on FSDIS0
.