/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
package org.apache.hadoop.fs.swift.snative;
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.fs.FSInputStream;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.swift.exceptions.SwiftConnectionClosedException;
|
|
import org.apache.hadoop.fs.swift.exceptions.SwiftException;
|
|
import org.apache.hadoop.fs.swift.http.HttpBodyContent;
|
|
import org.apache.hadoop.fs.swift.http.HttpInputStreamWithRelease;
|
|
import org.apache.hadoop.fs.swift.util.SwiftUtils;
|
|
|
|
import java.io.EOFException;
|
|
import java.io.IOException;
|
|
|
|
/**
|
|
* The input stream from remote Swift blobs.
|
|
* The class attempts to be buffer aware, and react to a forward seek operation
|
|
* by trying to scan ahead through the current block of data to find it.
|
|
* This accelerates some operations that do a lot of seek()/read() actions,
|
|
* including work (such as in the MR engine) that do a seek() immediately after
|
|
* an open().
|
|
*/
|
|
class SwiftNativeInputStream extends FSInputStream {
|
|
|
|
private static final Log LOG = LogFactory.getLog(SwiftNativeInputStream.class);
|
|
|
|
/**
|
|
* range requested off the server: {@value}
|
|
*/
|
|
private final long bufferSize;
|
|
|
|
/**
|
|
* File nativeStore instance
|
|
*/
|
|
private final SwiftNativeFileSystemStore nativeStore;
|
|
|
|
/**
|
|
* Hadoop statistics. Used to get info about number of reads, writes, etc.
|
|
*/
|
|
private final FileSystem.Statistics statistics;
|
|
|
|
/**
|
|
* Data input stream
|
|
*/
|
|
private HttpInputStreamWithRelease httpStream;
|
|
|
|
/**
|
|
* File path
|
|
*/
|
|
private final Path path;
|
|
|
|
/**
|
|
* Current position
|
|
*/
|
|
private long pos = 0;
|
|
|
|
/**
|
|
* Length of the file picked up at start time
|
|
*/
|
|
private long contentLength = -1;
|
|
|
|
/**
|
|
* Why the stream is closed
|
|
*/
|
|
private String reasonClosed = "unopened";
|
|
|
|
/**
|
|
* Offset in the range requested last
|
|
*/
|
|
private long rangeOffset = 0;
|
|
|
|
private long nextReadPosition = 0;
|
|
|
|
public SwiftNativeInputStream(SwiftNativeFileSystemStore storeNative,
|
|
FileSystem.Statistics statistics, Path path, long bufferSize)
|
|
throws IOException {
|
|
this.nativeStore = storeNative;
|
|
this.statistics = statistics;
|
|
this.path = path;
|
|
if (bufferSize <= 0) {
|
|
throw new IllegalArgumentException("Invalid buffer size");
|
|
}
|
|
this.bufferSize = bufferSize;
|
|
//initial buffer fill
|
|
this.httpStream = storeNative.getObject(path).getInputStream();
|
|
//fillBuffer(0);
|
|
}
|
|
|
|
/**
|
|
* Move to a new position within the file relative to where the pointer is now.
|
|
* Always call from a synchronized clause
|
|
* @param offset offset
|
|
*/
|
|
private synchronized void incPos(int offset) {
|
|
pos += offset;
|
|
rangeOffset += offset;
|
|
SwiftUtils.trace(LOG, "Inc: pos=%d bufferOffset=%d", pos, rangeOffset);
|
|
}
|
|
|
|
/**
|
|
* Update the start of the buffer; always call from a sync'd clause
|
|
* @param seekPos position sought.
|
|
* @param contentLength content length provided by response (may be -1)
|
|
*/
|
|
private synchronized void updateStartOfBufferPosition(long seekPos,
|
|
long contentLength) {
|
|
//reset the seek pointer
|
|
pos = seekPos;
|
|
//and put the buffer offset to 0
|
|
rangeOffset = 0;
|
|
this.contentLength = contentLength;
|
|
SwiftUtils.trace(LOG, "Move: pos=%d; bufferOffset=%d; contentLength=%d",
|
|
pos,
|
|
rangeOffset,
|
|
contentLength);
|
|
}
|
|
|
|
@Override
|
|
public synchronized int read() throws IOException {
|
|
verifyOpen();
|
|
int result = -1;
|
|
try {
|
|
seekStream();
|
|
result = httpStream.read();
|
|
} catch (IOException e) {
|
|
String msg = "IOException while reading " + path
|
|
+ ": ' +e, attempting to reopen.";
|
|
LOG.debug(msg, e);
|
|
if (reopenBuffer()) {
|
|
result = httpStream.read();
|
|
}
|
|
}
|
|
if (result != -1) {
|
|
incPos(1);
|
|
}
|
|
if (statistics != null && result != -1) {
|
|
statistics.incrementBytesRead(1);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
@Override
|
|
public synchronized int read(byte[] b, int off, int len) throws IOException {
|
|
SwiftUtils.debug(LOG, "read(buffer, %d, %d)", off, len);
|
|
SwiftUtils.validateReadArgs(b, off, len);
|
|
int result = -1;
|
|
try {
|
|
verifyOpen();
|
|
result = httpStream.read(b, off, len);
|
|
} catch (IOException e) {
|
|
//other IO problems are viewed as transient and re-attempted
|
|
LOG.info("Received IOException while reading '" + path +
|
|
"', attempting to reopen: " + e);
|
|
LOG.debug("IOE on read()" + e, e);
|
|
if (reopenBuffer()) {
|
|
result = httpStream.read(b, off, len);
|
|
}
|
|
}
|
|
if (result > 0) {
|
|
incPos(result);
|
|
if (statistics != null) {
|
|
statistics.incrementBytesRead(result);
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Re-open the buffer
|
|
* @return true iff more data could be added to the buffer
|
|
* @throws IOException if not
|
|
*/
|
|
private boolean reopenBuffer() throws IOException {
|
|
innerClose("reopening buffer to trigger refresh");
|
|
boolean success = false;
|
|
try {
|
|
fillBuffer(pos);
|
|
success = true;
|
|
} catch (EOFException eof) {
|
|
//the EOF has been reached
|
|
this.reasonClosed = "End of file";
|
|
}
|
|
return success;
|
|
}
|
|
|
|
/**
|
|
* close the stream. After this the stream is not usable -unless and until
|
|
* it is re-opened (which can happen on some of the buffer ops)
|
|
* This method is thread-safe and idempotent.
|
|
*
|
|
* @throws IOException on IO problems.
|
|
*/
|
|
@Override
|
|
public synchronized void close() throws IOException {
|
|
innerClose("closed");
|
|
}
|
|
|
|
private void innerClose(String reason) throws IOException {
|
|
try {
|
|
if (httpStream != null) {
|
|
reasonClosed = reason;
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Closing HTTP input stream : " + reason);
|
|
}
|
|
httpStream.close();
|
|
}
|
|
} finally {
|
|
httpStream = null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Assume that the connection is not closed: throws an exception if it is
|
|
* @throws SwiftConnectionClosedException
|
|
*/
|
|
private void verifyOpen() throws SwiftConnectionClosedException {
|
|
if (httpStream == null) {
|
|
throw new SwiftConnectionClosedException(reasonClosed);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public synchronized String toString() {
|
|
return "SwiftNativeInputStream" +
|
|
" position=" + pos
|
|
+ " buffer size = " + bufferSize
|
|
+ " "
|
|
+ (httpStream != null ? httpStream.toString()
|
|
: (" no input stream: " + reasonClosed));
|
|
}
|
|
|
|
/**
|
|
* Treats any finalize() call without the input stream being closed
|
|
* as a serious problem, logging at error level
|
|
* @throws Throwable n/a
|
|
*/
|
|
@Override
|
|
protected void finalize() throws Throwable {
|
|
if (httpStream != null) {
|
|
LOG.error(
|
|
"Input stream is leaking handles by not being closed() properly: "
|
|
+ httpStream.toString());
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Read through the specified number of bytes.
|
|
* The implementation iterates a byte a time, which may seem inefficient
|
|
* compared to the read(bytes[]) method offered by input streams.
|
|
* However, if you look at the code that implements that method, it comes
|
|
* down to read() one char at a time -only here the return value is discarded.
|
|
*
|
|
*<p/>
|
|
* This is a no-op if the stream is closed
|
|
* @param bytes number of bytes to read.
|
|
* @throws IOException IO problems
|
|
* @throws SwiftException if a read returned -1.
|
|
*/
|
|
private int chompBytes(long bytes) throws IOException {
|
|
int count = 0;
|
|
if (httpStream != null) {
|
|
int result;
|
|
for (long i = 0; i < bytes; i++) {
|
|
result = httpStream.read();
|
|
if (result < 0) {
|
|
throw new SwiftException("Received error code while chomping input");
|
|
}
|
|
count ++;
|
|
incPos(1);
|
|
}
|
|
}
|
|
return count;
|
|
}
|
|
|
|
/**
|
|
* Seek to an offset. If the data is already in the buffer, move to it
|
|
* @param targetPos target position
|
|
* @throws IOException on any problem
|
|
*/
|
|
@Override
|
|
public synchronized void seek(long targetPos) throws IOException {
|
|
if (targetPos < 0) {
|
|
throw new IOException("Negative Seek offset not supported");
|
|
}
|
|
nextReadPosition = targetPos;
|
|
}
|
|
|
|
public synchronized void realSeek(long targetPos) throws IOException {
|
|
if (targetPos < 0) {
|
|
throw new IOException("Negative Seek offset not supported");
|
|
}
|
|
//there's some special handling of near-local data
|
|
//as the seek can be omitted if it is in/adjacent
|
|
long offset = targetPos - pos;
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Seek to " + targetPos + "; current pos =" + pos
|
|
+ "; offset="+offset);
|
|
}
|
|
if (offset == 0) {
|
|
LOG.debug("seek is no-op");
|
|
return;
|
|
}
|
|
|
|
if (offset < 0) {
|
|
LOG.debug("seek is backwards");
|
|
} else if ((rangeOffset + offset < bufferSize)) {
|
|
//if the seek is in range of that requested, scan forwards
|
|
//instead of closing and re-opening a new HTTP connection
|
|
SwiftUtils.debug(LOG,
|
|
"seek is within current stream"
|
|
+ "; pos= %d ; targetPos=%d; "
|
|
+ "offset= %d ; bufferOffset=%d",
|
|
pos, targetPos, offset, rangeOffset);
|
|
try {
|
|
LOG.debug("chomping ");
|
|
chompBytes(offset);
|
|
} catch (IOException e) {
|
|
//this is assumed to be recoverable with a seek -or more likely to fail
|
|
LOG.debug("while chomping ",e);
|
|
}
|
|
if (targetPos - pos == 0) {
|
|
LOG.trace("chomping successful");
|
|
return;
|
|
}
|
|
LOG.trace("chomping failed");
|
|
} else {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Seek is beyond buffer size of " + bufferSize);
|
|
}
|
|
}
|
|
|
|
innerClose("seeking to " + targetPos);
|
|
fillBuffer(targetPos);
|
|
}
|
|
|
|
/**
|
|
* Lazy seek.
|
|
* @throws IOException
|
|
*/
|
|
private void seekStream() throws IOException {
|
|
if (httpStream != null && nextReadPosition == pos) {
|
|
// already at specified position
|
|
return;
|
|
}
|
|
realSeek(nextReadPosition);
|
|
}
|
|
|
|
/**
|
|
* Fill the buffer from the target position
|
|
* If the target position == current position, the
|
|
* read still goes ahead; this is a way of handling partial read failures
|
|
* @param targetPos target position
|
|
* @throws IOException IO problems on the read
|
|
*/
|
|
private void fillBuffer(long targetPos) throws IOException {
|
|
long length = targetPos + bufferSize;
|
|
SwiftUtils.debug(LOG, "Fetching %d bytes starting at %d", length, targetPos);
|
|
HttpBodyContent blob = nativeStore.getObject(path, targetPos, length);
|
|
httpStream = blob.getInputStream();
|
|
updateStartOfBufferPosition(targetPos, blob.getContentLength());
|
|
}
|
|
|
|
@Override
|
|
public synchronized long getPos() throws IOException {
|
|
return pos;
|
|
}
|
|
|
|
/**
|
|
* This FS doesn't explicitly support multiple data sources, so
|
|
* return false here.
|
|
* @param targetPos the desired target position
|
|
* @return true if a new source of the data has been set up
|
|
* as the source of future reads
|
|
* @throws IOException IO problems
|
|
*/
|
|
@Override
|
|
public boolean seekToNewSource(long targetPos) throws IOException {
|
|
return false;
|
|
}
|
|
}
|