Implemented data locality for Swift
Change-Id: I13b15b9fabc6007d4218dd52cc123fa10c46dd45
This commit is contained in:
parent
f29a070505
commit
ad6bd75475
|
@ -96,7 +96,7 @@
|
|||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-core</artifactId>
|
||||
<version>1.1.2</version>
|
||||
<version>1.2.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
|
|
@ -145,6 +145,8 @@ public final class RestClientBindings {
|
|||
props.setProperty(SWIFT_SERVICE_PROPERTY, service);
|
||||
props.setProperty(SWIFT_CONTAINER_PROPERTY, container);
|
||||
copy(conf, prefix + DOT_AUTH_URL, props, SWIFT_AUTH_PROPERTY, true);
|
||||
copy(conf, prefix + DOT_AUTH_ENDPOINT_PREFIX, props,
|
||||
SWIFT_AUTH_ENDPOINT_PREFIX, true);
|
||||
copy(conf, prefix + DOT_USERNAME, props, SWIFT_USERNAME_PROPERTY, true);
|
||||
copy(conf, prefix + DOT_APIKEY, props, SWIFT_APIKEY_PROPERTY, false);
|
||||
copy(conf, prefix + DOT_PASSWORD, props, SWIFT_PASSWORD_PROPERTY,
|
||||
|
|
|
@ -78,12 +78,6 @@ public class SwiftProtocolConstants {
|
|||
*/
|
||||
public static final String SERVICE_CATALOG_OBJECT_STORE = "object-store";
|
||||
|
||||
/**
|
||||
* entry in the swift catalog defining the prefix used to talk to objects
|
||||
* {@value}
|
||||
*/
|
||||
public static final String SWIFT_OBJECT_AUTH_ENDPOINT =
|
||||
"/object_endpoint/";
|
||||
/**
|
||||
* Swift-specific header: object manifest used in the final upload
|
||||
* of a multipart operation: {@value}
|
||||
|
@ -211,6 +205,7 @@ public class SwiftProtocolConstants {
|
|||
public static final String DOT_CONTAINER = ".CONTAINER-NAME";
|
||||
|
||||
public static final String DOT_AUTH_URL = ".auth.url";
|
||||
public static final String DOT_AUTH_ENDPOINT_PREFIX = ".auth.endpoint.prefix";
|
||||
public static final String DOT_TENANT = ".tenant";
|
||||
public static final String DOT_USERNAME = ".username";
|
||||
public static final String DOT_PASSWORD = ".password";
|
||||
|
@ -232,6 +227,8 @@ public class SwiftProtocolConstants {
|
|||
public static final String SWIFT_CONTAINER_PROPERTY = FS_SWIFT + DOT_CONTAINER;
|
||||
|
||||
public static final String SWIFT_AUTH_PROPERTY = FS_SWIFT + DOT_AUTH_URL;
|
||||
public static final String SWIFT_AUTH_ENDPOINT_PREFIX =
|
||||
FS_SWIFT + DOT_AUTH_ENDPOINT_PREFIX;
|
||||
public static final String SWIFT_TENANT_PROPERTY = FS_SWIFT + DOT_TENANT;
|
||||
public static final String SWIFT_USERNAME_PROPERTY = FS_SWIFT + DOT_USERNAME;
|
||||
public static final String SWIFT_PASSWORD_PROPERTY = FS_SWIFT + DOT_PASSWORD;
|
||||
|
|
|
@ -172,6 +172,12 @@ public final class SwiftRestClient {
|
|||
*/
|
||||
private URI objectLocationURI;
|
||||
|
||||
/**
|
||||
* Entry in the swift catalog defining the prefix used to talk to objects
|
||||
*/
|
||||
private String authEndpointPrefix;
|
||||
|
||||
|
||||
private final URI filesystemURI;
|
||||
|
||||
/**
|
||||
|
@ -275,6 +281,14 @@ public final class SwiftRestClient {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Returns the auth endpoint prefix currently held by this client.
 * The value is prepended to the tenant id when the object location
 * path is built after authentication.
 *
 * @return the auth endpoint prefix; NOTE(review): may be null if the
 *         option was never configured - confirm against getOption()
 */
public String getAuthEndpointPrefix() {
|
||||
return authEndpointPrefix;
|
||||
}
|
||||
|
||||
/**
 * Replaces the auth endpoint prefix held by this client.
 *
 * @param authEndpointPrefix new prefix value, stored as-is without validation
 */
public void setAuthEndpointPrefix(String authEndpointPrefix) {
|
||||
this.authEndpointPrefix = authEndpointPrefix;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Base class for all Swift REST operations
|
||||
|
@ -464,6 +478,7 @@ public final class SwiftRestClient {
|
|||
container = props.getProperty(SWIFT_CONTAINER_PROPERTY);
|
||||
String isPubProp = props.getProperty(SWIFT_PUBLIC_PROPERTY, "false");
|
||||
usePublicURL = "true".equals(isPubProp);
|
||||
authEndpointPrefix = getOption(props, SWIFT_AUTH_ENDPOINT_PREFIX);
|
||||
|
||||
if (apiKey == null && password == null) {
|
||||
throw new SwiftConfigurationException(
|
||||
|
@ -591,7 +606,7 @@ public final class SwiftRestClient {
|
|||
/**
|
||||
* Make an HTTP GET request to Swift to get a range of data in the object.
|
||||
*
|
||||
* @param path path to object
|
||||
* @param url url to object
|
||||
* @param offset offset from file beginning
|
||||
* @param length file length
|
||||
* @return The input stream -which must be closed afterwards.
|
||||
|
@ -599,18 +614,18 @@ public final class SwiftRestClient {
|
|||
* @throws SwiftException swift specific error
|
||||
* @throws FileNotFoundException path is not there
|
||||
*/
|
||||
public HttpBodyContent getData(SwiftObjectPath path,
|
||||
public HttpBodyContent getData(URI url,
|
||||
long offset,
|
||||
long length) throws IOException {
|
||||
if (offset < 0) {
|
||||
throw new SwiftException("Invalid offset: " + offset
|
||||
+ " in getDataAsInputStream( path=" + path
|
||||
+ " in getDataAsInputStream( url=" + url
|
||||
+ ", offset=" + offset
|
||||
+ ", length =" + length + ")");
|
||||
}
|
||||
if (length <= 0) {
|
||||
throw new SwiftException("Invalid length: " + length
|
||||
+ " in getDataAsInputStream( path="+ path
|
||||
+ " in getDataAsInputStream( url="+ url
|
||||
+ ", offset=" + offset
|
||||
+ ", length ="+ length + ")");
|
||||
}
|
||||
|
@ -622,11 +637,29 @@ public final class SwiftRestClient {
|
|||
LOG.debug("getData:" + range);
|
||||
}
|
||||
|
||||
return getData(path,
|
||||
return getData(url,
|
||||
new Header(HEADER_RANGE, range),
|
||||
SwiftRestClient.NEWEST);
|
||||
}
|
||||
|
||||
/**
|
||||
* Make an HTTP GET request to Swift to get a range of data in the object.
|
||||
*
|
||||
* @param path path to object
|
||||
* @param offset offset from file beginning
|
||||
* @param length file length
|
||||
* @return The input stream -which must be closed afterwards.
|
||||
* @throws IOException Problems
|
||||
* @throws SwiftException swift specific error
|
||||
* @throws FileNotFoundException path is not there
|
||||
*/
|
||||
public HttpBodyContent getData(SwiftObjectPath path,
|
||||
long offset,
|
||||
long length) throws IOException {
|
||||
return getData(pathToURI(path), offset, length);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns object length
|
||||
*
|
||||
|
@ -675,11 +708,27 @@ public final class SwiftRestClient {
|
|||
public HttpBodyContent getData(SwiftObjectPath path,
|
||||
final Header... requestHeaders)
|
||||
throws IOException {
|
||||
preRemoteCommand("getData");
|
||||
return doGet(pathToURI(path),
|
||||
requestHeaders);
|
||||
return getData(pathToURI(path), requestHeaders);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the path contents as an input stream.
|
||||
* <b>Warning:</b> this input stream must be closed to avoid
|
||||
* keeping Http connections open.
|
||||
*
|
||||
* @param url path to file
|
||||
* @param requestHeaders http headers
|
||||
* @return byte[] file data or null if the object was not found
|
||||
* @throws IOException on IO Faults
|
||||
* @throws FileNotFoundException if there is nothing at the path
|
||||
*/
|
||||
public HttpBodyContent getData(URI url,
|
||||
final Header... requestHeaders)
|
||||
throws IOException {
|
||||
preRemoteCommand("getData");
|
||||
return doGet(url, requestHeaders);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns object location as byte[]
|
||||
*
|
||||
|
@ -1193,8 +1242,7 @@ public final class SwiftRestClient {
|
|||
|
||||
|
||||
accessToken = access.getToken();
|
||||
String path = SWIFT_OBJECT_AUTH_ENDPOINT
|
||||
+ swiftEndpoint.getTenantId();
|
||||
String path = getAuthEndpointPrefix() + accessToken.getTenant().getId();
|
||||
String host = endpointURI.getHost();
|
||||
try {
|
||||
objectLocation = new URI(endpointURI.getScheme(),
|
||||
|
|
|
@ -35,22 +35,19 @@ import org.apache.hadoop.fs.swift.util.DurationStats;
|
|||
import org.apache.hadoop.fs.swift.util.JSONUtil;
|
||||
import org.apache.hadoop.fs.swift.util.SwiftObjectPath;
|
||||
import org.apache.hadoop.fs.swift.util.SwiftUtils;
|
||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||
import org.apache.hadoop.net.ScriptBasedMapping;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.codehaus.jackson.map.type.CollectionType;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.*;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
@ -65,6 +62,7 @@ public class SwiftNativeFileSystemStore {
|
|||
LogFactory.getLog(SwiftNativeFileSystemStore.class);
|
||||
private URI uri;
|
||||
private SwiftRestClient swiftRestClient;
|
||||
private DNSToSwitchMapping dnsToSwitchMapping;
|
||||
|
||||
/**
|
||||
* Initialize the filesystem store - this creates the REST client binding.
|
||||
|
@ -76,6 +74,10 @@ public class SwiftNativeFileSystemStore {
|
|||
*/
|
||||
public void initialize(URI fsURI, Configuration configuration) throws IOException {
|
||||
this.uri = fsURI;
|
||||
dnsToSwitchMapping = ReflectionUtils.newInstance(
|
||||
configuration.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
|
||||
DNSToSwitchMapping.class), configuration);
|
||||
|
||||
this.swiftRestClient = SwiftRestClient.getInstance(fsURI, configuration);
|
||||
}
|
||||
|
||||
|
@ -271,8 +273,85 @@ public class SwiftNativeFileSystemStore {
|
|||
* @throws FileNotFoundException path doesn't resolve to an object
|
||||
*/
|
||||
public HttpBodyContent getObject(Path path) throws IOException {
|
||||
List<String> locations = getDataLocalEndpoints(path);
|
||||
|
||||
for (String url : locations) {
|
||||
try {
|
||||
return swiftRestClient.getData(new URI(url),
|
||||
SwiftRestClient.NEWEST);
|
||||
} catch (Exception e) {
|
||||
// Ignore
|
||||
// It is possible that endpoint doesn't contains needed data.
|
||||
}
|
||||
}
|
||||
|
||||
return swiftRestClient.getData(toObjectPath(path),
|
||||
SwiftRestClient.NEWEST);
|
||||
SwiftRestClient.NEWEST);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns list of endpoints for given swift path that are local for the
|
||||
* host. List is returned in order of preference.
|
||||
*/
|
||||
private List<String> getDataLocalEndpoints(Path path) throws IOException {
|
||||
final String hostRack = getHostRack();
|
||||
|
||||
List<URI> uriLocations = getObjectLocation(path);
|
||||
List<String> strLocations = new ArrayList<String>();
|
||||
final Map<String, Integer> similarityMap = new HashMap<String, Integer>();
|
||||
for (URI uri : uriLocations) {
|
||||
String url = uri.toString();
|
||||
int similarity = getSimilarity(getRack(uri.getHost()), hostRack);
|
||||
if (similarity > 0) {
|
||||
strLocations.add(url);
|
||||
similarityMap.put(url, similarity);
|
||||
}
|
||||
}
|
||||
|
||||
Collections.sort(strLocations, new Comparator<String>() {
|
||||
@Override
|
||||
public int compare(String o1, String o2) {
|
||||
Integer dst1 = similarityMap.get(o1);
|
||||
Integer dst2 = similarityMap.get(o2);
|
||||
return -dst1.compareTo(dst2);
|
||||
}
|
||||
});
|
||||
|
||||
return strLocations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns similarity index for two racks.
|
||||
* Bigger numbers correspond to closer location.
|
||||
* Zero corresponds to different racks.
|
||||
*
|
||||
* @param rack1 path to rack1
|
||||
* @param rack2 path to rack2
|
||||
* @return the similarity index
|
||||
*/
|
||||
private int getSimilarity(String rack1, String rack2) {
|
||||
String[] r1 = rack1.split("/");
|
||||
String[] r2 = rack2.split("/");
|
||||
int i = 1; //skip leading empty string
|
||||
while (i < r1.length && i < r2.length && r1[i].equals(r2[i])) {
|
||||
i++;
|
||||
}
|
||||
|
||||
return i - 1;
|
||||
}
|
||||
|
||||
private String getHostRack() throws SwiftException {
|
||||
String hostAddress;
|
||||
try {
|
||||
hostAddress = InetAddress.getLocalHost().getHostAddress();
|
||||
} catch (UnknownHostException e) {
|
||||
throw new SwiftException("Failed to get localhost address", e);
|
||||
}
|
||||
return getRack(hostAddress);
|
||||
}
|
||||
|
||||
private String getRack(String url) {
|
||||
return dnsToSwitchMapping.resolve(Arrays.asList(url)).get(0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -286,6 +365,16 @@ public class SwiftNativeFileSystemStore {
|
|||
*/
|
||||
public HttpBodyContent getObject(Path path, long byteRangeStart, long length)
|
||||
throws IOException {
|
||||
List<String> locations = getDataLocalEndpoints(path);
|
||||
|
||||
for (String url : locations) {
|
||||
try {
|
||||
return swiftRestClient.getData(new URI(url), byteRangeStart, length);
|
||||
} catch (Exception e) {
|
||||
//Ignore
|
||||
}
|
||||
}
|
||||
|
||||
return swiftRestClient.getData(
|
||||
toObjectPath(path), byteRangeStart, length);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue