Retire stackforge/MRaaS

Monty Taylor 2015-10-17 15:59:22 -04:00
parent da37ddb022
commit a7839ba1c2
25 changed files with 7 additions and 5178 deletions

.gitignore

@ -1,10 +0,0 @@
*.swp
*.pyc
*.class
target/*
hadoop-swiftn-driver/target/*
.classpath
.settings
.project

.gitreview

@ -1,4 +0,0 @@
[gerrit]
host=review.openstack.org
port=29418
project=stackforge/MRaaS.git

README.md

@ -1,18 +0,0 @@
MapReduce as a Service
======================
There are three main parts:
1. Core service layer
2. Hadoop-Swift native driver
3. REST client interface
## Hadoop Swift Native Driver
hadoop-swiftn-driver contains the source for a native driver that streams data to and from the Swift object store. It has been tested with Hadoop 1.0.3. To use this driver (a configuration sketch follows the steps):
1. Put the driver source code into the Hadoop source base
2. Put hpswift.jar into the lib directory
3. Build Hadoop
4. Edit the conf files accordingly
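
A minimal configuration sketch, assuming Hadoop 1.x and the property names used by core-default.xml and SwiftCredentials.java later in this commit; the credential values, Keystone endpoint, and container name are placeholders:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SwiftnSetup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Map the swiftn:// scheme to the native driver.
        conf.set("fs.swiftn.impl",
                "org.apache.hadoop.fs.swiftnative.NativeSwiftFileSystem");
        // All four credential properties are required; SwiftCredentials
        // throws IllegalArgumentException if any one is missing.
        conf.set("fs.swiftn.openstackAccessKeyId", "ACCESS_KEY");   // placeholder
        conf.set("fs.swiftn.openstackSecretKey", "SECRET_KEY");     // placeholder
        conf.set("fs.swiftn.openStackTenantId", "TENANT_ID");       // placeholder
        conf.set("fs.swiftn.openstackauthUri",
                "https://keystone.example.com:35357/v2.0/");        // placeholder
        // Local staging directory used when writing files to Swift.
        conf.set("fs.swift.buffer.dir", "/tmp/swift-buffer");
        FileSystem fs = FileSystem.get(URI.create("swiftn://testStore/"), conf);
        System.out.println("working directory: " + fs.getWorkingDirectory());
    }
}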

README.rst

@ -0,0 +1,7 @@
This project is no longer maintained.
The contents of this repository are still available in the Git source code
management system. To see the contents of this repository before it reached
its end of life, please check out the previous commit with
"git checkout HEAD^1".

File diff suppressed because it is too large.

core-default.xml

@ -1,519 +0,0 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Do not modify this file directly. Instead, copy entries that you -->
<!-- wish to modify from this file into core-site.xml and change them -->
<!-- there. If core-site.xml does not already exist, create it. -->
<configuration>
<!--- global properties -->
<property>
<name>hadoop.tmp.dir</name>
<value>/tmp/hadoop-${user.name}</value>
<description>A base for other temporary directories.</description>
</property>
<property>
<name>hadoop.native.lib</name>
<value>true</value>
<description>Should native hadoop libraries, if present, be used.</description>
</property>
<property>
<name>hadoop.http.filter.initializers</name>
<value></value>
<description>A comma separated list of class names. Each class in the list
must extend org.apache.hadoop.http.FilterInitializer. The corresponding
Filter will be initialized. Then, the Filter will be applied to all user
facing jsp and servlet web pages. The ordering of the list defines the
ordering of the filters.</description>
</property>
<property>
<name>hadoop.security.group.mapping</name>
<value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
<description>Class for user to group mapping (get groups for a given user)
</description>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>false</value>
<description>Is service-level authorization enabled?</description>
</property>
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
<description>Possible values are simple (no authentication), and kerberos
</description>
</property>
<property>
<name>hadoop.security.token.service.use_ip</name>
<value>true</value>
<description>Controls whether tokens always use IP addresses. DNS changes
will not be detected if this option is enabled. Existing client connections
that break will always reconnect to the IP of the original host. New clients
will connect to the host's new IP but fail to locate a token. Disabling
this option will allow existing and new clients to detect an IP change and
continue to locate the new host's token.
</description>
</property>
<!--
<property>
<name>hadoop.security.service.user.name.key</name>
<value></value>
<description>Name of the kerberos principal of the user that owns
a given service daemon
</description>
</property>
-->
<!--- logging properties -->
<property>
<name>hadoop.logfile.size</name>
<value>10000000</value>
<description>The max size of each log file</description>
</property>
<property>
<name>hadoop.logfile.count</name>
<value>10</value>
<description>The max number of log files</description>
</property>
<!-- i/o properties -->
<property>
<name>io.file.buffer.size</name>
<value>4096</value>
<description>The size of buffer for use in sequence files.
The size of this buffer should probably be a multiple of hardware
page size (4096 on Intel x86), and it determines how much data is
buffered during read and write operations.</description>
</property>
<property>
<name>io.bytes.per.checksum</name>
<value>512</value>
<description>The number of bytes per checksum. Must not be larger than
io.file.buffer.size.</description>
</property>
<property>
<name>io.skip.checksum.errors</name>
<value>false</value>
<description>If true, when a checksum error is encountered while
reading a sequence file, entries are skipped, instead of throwing an
exception.</description>
</property>
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec</value>
<description>A list of the compression codec classes that can be used
for compression/decompression.</description>
</property>
<property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization</value>
<description>A list of serialization classes that can be used for
obtaining serializers and deserializers.</description>
</property>
<!-- file system properties -->
<property>
<name>fs.default.name</name>
<!--value>file:///</value-->
<value>swiftn://testStore/</value>
<description>The name of the default file system. A URI whose
scheme and authority determine the FileSystem implementation. The
uri's scheme determines the config property (fs.SCHEME.impl) naming
the FileSystem implementation class. The uri's authority is used to
determine the host, port, etc. for a filesystem.</description>
</property>
<property>
<name>fs.swiftn.openstackAccessKeyId</name>
<value></value>
</property>
<property>
<name>fs.swiftn.openstackSecretKey</name>
<value></value>
</property>
<property>
<name>fs.swiftn.openStackTenantId</name>
<value></value>
</property>
<property>
<name>fs.swiftn.openstackauthUri</name>
<value>https://region-a.geo-1.identity.hpcloudsvc.com:35357/v2.0/</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>0</value>
<description>Number of minutes between trash checkpoints.
If zero, the trash feature is disabled.
</description>
</property>
<property>
<name>fs.file.impl</name>
<value>org.apache.hadoop.fs.LocalFileSystem</value>
<description>The FileSystem for file: uris.</description>
</property>
<property>
<name>fs.hdfs.impl</name>
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
<description>The FileSystem for hdfs: uris.</description>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3.S3FileSystem</value>
<description>The FileSystem for s3: uris.</description>
</property>
<property>
<name>fs.s3n.impl</name>
<value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
<description>The FileSystem for s3n: (Native S3) uris.</description>
</property>
<property>
<name>fs.swiftn.impl</name>
<value>org.apache.hadoop.fs.swiftnative.NativeSwiftFileSystem</value>
<description>The FileSystem for swiftn: (Native Swift) uris.</description>
</property>
<property>
<name>fs.kfs.impl</name>
<value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value>
<description>The FileSystem for kfs: uris.</description>
</property>
<property>
<name>fs.hftp.impl</name>
<value>org.apache.hadoop.hdfs.HftpFileSystem</value>
</property>
<property>
<name>fs.hsftp.impl</name>
<value>org.apache.hadoop.hdfs.HsftpFileSystem</value>
</property>
<property>
<name>fs.webhdfs.impl</name>
<value>org.apache.hadoop.hdfs.web.WebHdfsFileSystem</value>
</property>
<property>
<name>fs.ftp.impl</name>
<value>org.apache.hadoop.fs.ftp.FTPFileSystem</value>
<description>The FileSystem for ftp: uris.</description>
</property>
<property>
<name>fs.ramfs.impl</name>
<value>org.apache.hadoop.fs.InMemoryFileSystem</value>
<description>The FileSystem for ramfs: uris.</description>
</property>
<property>
<name>fs.har.impl</name>
<value>org.apache.hadoop.fs.HarFileSystem</value>
<description>The filesystem for Hadoop archives. </description>
</property>
<property>
<name>fs.har.impl.disable.cache</name>
<value>true</value>
<description>Don't cache 'har' filesystem instances.</description>
</property>
<property>
<name>fs.checkpoint.dir</name>
<value>${hadoop.tmp.dir}/dfs/namesecondary</value>
<description>Determines where on the local filesystem the DFS secondary
name node should store the temporary images to merge.
If this is a comma-delimited list of directories then the image is
replicated in all of the directories for redundancy.
</description>
</property>
<property>
<name>fs.checkpoint.edits.dir</name>
<value>${fs.checkpoint.dir}</value>
<description>Determines where on the local filesystem the DFS secondary
name node should store the temporary edits to merge.
If this is a comma-delimited list of directories then the edits are
replicated in all of the directories for redundancy.
Default value is same as fs.checkpoint.dir
</description>
</property>
<property>
<name>fs.checkpoint.period</name>
<value>3600</value>
<description>The number of seconds between two periodic checkpoints.
</description>
</property>
<property>
<name>fs.checkpoint.size</name>
<value>67108864</value>
<description>The size of the current edit log (in bytes) that triggers
a periodic checkpoint even if the fs.checkpoint.period hasn't expired.
</description>
</property>
<property>
<name>fs.s3.block.size</name>
<value>67108864</value>
<description>Block size to use when writing files to S3.</description>
</property>
<property>
<name>fs.s3.buffer.dir</name>
<value>${hadoop.tmp.dir}/s3</value>
<description>Determines where on the local filesystem the S3 filesystem
should store files before sending them to S3
(or after retrieving them from S3).
</description>
</property>
<property>
<name>fs.s3.maxRetries</name>
<value>4</value>
<description>The maximum number of retries for reading or writing files to S3,
before we signal failure to the application.
</description>
</property>
<property>
<name>fs.s3.sleepTimeSeconds</name>
<value>10</value>
<description>The number of seconds to sleep between each S3 retry.
</description>
</property>
<property>
<name>local.cache.size</name>
<value>10737418240</value>
<description>The limit on the size of cache you want to keep, set by default
to 10GB. This will act as a soft limit on the cache directory for out of band data.
</description>
</property>
<property>
<name>io.seqfile.compress.blocksize</name>
<value>1000000</value>
<description>The minimum block size for compression in block compressed
SequenceFiles.
</description>
</property>
<property>
<name>io.seqfile.lazydecompress</name>
<value>true</value>
<description>Should values of block-compressed SequenceFiles be decompressed
only when necessary.
</description>
</property>
<property>
<name>io.seqfile.sorter.recordlimit</name>
<value>1000000</value>
<description>The limit on number of records to be kept in memory in a spill
in SequenceFiles.Sorter
</description>
</property>
<property>
<name>io.mapfile.bloom.size</name>
<value>1048576</value>
<description>The size of BloomFilter-s used in BloomMapFile. Each time this many
keys is appended the next BloomFilter will be created (inside a DynamicBloomFilter).
Larger values minimize the number of filters, which slightly increases the performance,
but may waste too much space if the total number of keys is usually much smaller
than this number.
</description>
</property>
<property>
<name>io.mapfile.bloom.error.rate</name>
<value>0.005</value>
<description>The rate of false positives in BloomFilter-s used in BloomMapFile.
As this value decreases, the size of BloomFilter-s increases exponentially. This
value is the probability of encountering false positives (default is 0.5%).
</description>
</property>
<property>
<name>hadoop.util.hash.type</name>
<value>murmur</value>
<description>The default implementation of Hash. Currently this can take one of the
two values: 'murmur' to select MurmurHash and 'jenkins' to select JenkinsHash.
</description>
</property>
<!-- ipc properties -->
<property>
<name>ipc.client.idlethreshold</name>
<value>4000</value>
<description>Defines the threshold number of connections after which
connections will be inspected for idleness.
</description>
</property>
<property>
<name>ipc.client.kill.max</name>
<value>10</value>
<description>Defines the maximum number of clients to disconnect in one go.
</description>
</property>
<property>
<name>ipc.client.connection.maxidletime</name>
<value>10000</value>
<description>The maximum time in msec after which a client will bring down the
connection to the server.
</description>
</property>
<property>
<name>ipc.client.connect.max.retries</name>
<value>10</value>
<description>Indicates the number of retries a client will make to establish
a server connection.
</description>
</property>
<property>
<name>ipc.server.listen.queue.size</name>
<value>128</value>
<description>Indicates the length of the listen queue for servers accepting
client connections.
</description>
</property>
<property>
<name>ipc.server.tcpnodelay</name>
<value>false</value>
<description>Turn on/off Nagle's algorithm for the TCP socket connection on
the server. Setting to true disables the algorithm and may decrease latency
with a cost of more/smaller packets.
</description>
</property>
<property>
<name>ipc.client.tcpnodelay</name>
<value>false</value>
<description>Turn on/off Nagle's algorithm for the TCP socket connection on
the client. Setting to true disables the algorithm and may decrease latency
with a cost of more/smaller packets.
</description>
</property>
<!-- Web Interface Configuration -->
<property>
<name>webinterface.private.actions</name>
<value>false</value>
<description> If set to true, the web interfaces of JT and NN may contain
actions, such as kill job, delete file, etc., that should
not be exposed to public. Enable this option if the interfaces
are only reachable by those who have the right authorization.
</description>
</property>
<!-- Proxy Configuration -->
<property>
<name>hadoop.rpc.socket.factory.class.default</name>
<value>org.apache.hadoop.net.StandardSocketFactory</value>
<description> Default SocketFactory to use. This parameter is expected to be
formatted as "package.FactoryClassName".
</description>
</property>
<property>
<name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
<value></value>
<description> SocketFactory to use to connect to a DFS. If null or empty, use
hadoop.rpc.socket.class.default. This socket factory is also used by
DFSClient to create sockets to DataNodes.
</description>
</property>
<property>
<name>hadoop.socks.server</name>
<value></value>
<description> Address (host:port) of the SOCKS server to be used by the
SocksSocketFactory.
</description>
</property>
<!-- Rack Configuration -->
<property>
<name>topology.node.switch.mapping.impl</name>
<value>org.apache.hadoop.net.ScriptBasedMapping</value>
<description> The default implementation of the DNSToSwitchMapping. It
invokes a script specified in topology.script.file.name to resolve
node names. If the value for topology.script.file.name is not set, the
default value of DEFAULT_RACK is returned for all node names.
</description>
</property>
<property>
<name>topology.script.file.name</name>
<value></value>
<description> The script name that should be invoked to resolve DNS names to
NetworkTopology names. Example: the script would take host.foo.bar as an
argument, and return /rack1 as the output.
</description>
</property>
<property>
<name>topology.script.number.args</name>
<value>100</value>
<description> The max number of args that the script configured with
topology.script.file.name should be run with. Each arg is an
IP address.
</description>
</property>
<property>
<name>hadoop.security.uid.cache.secs</name>
<value>14400</value>
<description> NativeIO maintains a cache from UID to UserName. This is
the timeout for an entry in that cache. </description>
</property>
</configuration>

core-site.xml

@ -1,41 +0,0 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>fs.default.name</name>
<!--value>swiftn:///</value-->
<!--value>file:///</value-->
<value>hdfs://localhost:9000</value>
</property>
<property>
<name>fs.swiftn.openstackAccessKeyId</name>
<value></value>
</property>
<property>
<name>fs.swiftn.openstackSecretKey</name>
<value></value>
</property>
<property>
<name>fs.swiftn.openStackTenantId</name>
<value></value>
</property>
<property>
<name>fs.swiftn.openstackauthUri</name>
<value>https://region-a.geo-1.identity.hpcloudsvc.com:35357/v2.0/</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/tmpdir</value>
</property>
<property>
<name>fs.swift.buffer.dir</name>
<value>/home/hadoop/tmp</value>
</property>
<property>
<name>hadoop.job.history.user.location</name>
<value>/home/hadoop</value>
</property>
</configuration>

hdfs-site.xml

@ -1,15 +0,0 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>fs.swift.buffer.dir</name>
<value>/home/hadoop/tmp</value>
</property>
</configuration>

mapred-site.xml

@ -1,15 +0,0 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
<property>
<name>fs.swift.buffer.dir</name>
<value>/home/hadoop/tmp</value>
</property>
</configuration>

Binary file not shown.

pom.xml

@ -1,30 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hadoop-swiftn-driver</groupId>
<artifactId>hadoop-swiftn-driver</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>hadoop swift native driver</name>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-test</artifactId>
<version>1.0.3</version>
</dependency>
</dependencies>
</project>

SwiftCredentials.java

@ -1,80 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swift;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.hpswift.service.Credentials;
/**
* <p>
* Extracts OpenStack credentials from the configuration.
* </p>
*/
public class SwiftCredentials {
private String secretKey;
private String accessKey;
private String tenantId;
private String authUri;
/**
* @throws IllegalArgumentException if credentials for Swift cannot be
* determined.
*/
public Credentials initialize(URI uri, Configuration conf) {
String scheme = uri.getScheme();
String accessKeyProperty = String.format("fs.%s.openstackAccessKeyId", scheme);
String secretKeyProperty = String.format("fs.%s.openstackSecretKey", scheme);
String tenantIdProperty = String.format("fs.%s.openStackTenantId", scheme);
String authUriProperty = String.format("fs.%s.openstackauthUri", scheme);
accessKey = conf.get(accessKeyProperty);
secretKey = conf.get(secretKeyProperty);
tenantId = conf.get(tenantIdProperty);
authUri = conf.get(authUriProperty);
if (accessKey == null || secretKey == null || tenantId == null || authUri == null) {
throw new IllegalArgumentException("OpenStack Swift must specify all of: secretKey, accessKey, tenantId, authUri");
}
}
Credentials hpCredentials = new Credentials(accessKey, secretKey, tenantId, authUri);
return hpCredentials;
}
public String getSecretKey(){
return secretKey;
}
public String getAccessKey(){
return accessKey;
}
public String getTenantId(){
return tenantId;
}
public String getAuthUri(){
return authUri;
}
}

SwiftException.java

@ -1,29 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swift;
/**
* Thrown if there is a problem communicating with OpenStack Swift.
*/
public class SwiftException extends RuntimeException {
public SwiftException(Throwable t) {
super(t);
}
}

FileMetadata.java

@ -1,53 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swiftnative;
/**
* <p>
* Holds basic metadata for a file stored in a {@link NativeFileSystemStore}.
* </p>
*/
class FileMetadata {
private final String key;
private final long length;
private final long lastModified;
public FileMetadata(String key, long length, long lastModified) {
this.key = key;
this.length = length;
this.lastModified = lastModified;
}
public String getKey() {
return key;
}
public long getLength() {
return length;
}
public long getLastModified() {
return lastModified;
}
@Override
public String toString() {
return "FileMetadata[" + key + ", " + length + ", " + lastModified + "]";
}
}

NativeFileSystemStore.java

@ -1,63 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swiftnative;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
/**
* <p>
* An abstraction for a key-based {@link File} store.
* </p>
*/
interface NativeFileSystemStore {
public void initialize(URI uri, Configuration conf) throws IOException;
public void storeFile(String key, File file, byte[] md5Hash) throws IOException;
public void storeEmptyFile(String key) throws IOException;
public FileMetadata retrieveMetadata(String key) throws IOException;
public InputStream retrieve(String key) throws IOException;
public InputStream retrieve(String key, long byteRangeStart) throws IOException;
public PartialListing list(String prefix, int maxListingLength) throws IOException;
public PartialListing list(String prefix, int maxListingLength, String priorLastKey) throws IOException;
public PartialListing listAll(String prefix, int maxListingLength, String priorLastKey) throws IOException;
public void delete(String key) throws IOException;
public void rename(String srcKey, String dstKey) throws IOException;
/**
* Delete all keys with the given prefix. Used for testing.
* @throws IOException
*/
public void purge(String prefix) throws IOException;
/**
* Diagnostic method to dump state to the console.
* @throws IOException
*/
public void dump() throws IOException;
}

NativeSwiftFileSystem.java

@ -1,583 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swiftnative;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.swift.SwiftException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;
/**
* <p>
* A {@link FileSystem} for reading and writing files stored on
* OpenStack Swift.
* Unlike {@link org.apache.hadoop.fs.SwiftFileSystem} this implementation
* stores files on Swift in their
* native form so they can be read by other Swift tools.
* </p>
* @see org.apache.hadoop.fs.SwiftFileSystem
*/
public class NativeSwiftFileSystem extends FileSystem {
public static final Log LOG = LogFactory.getLog(NativeSwiftFileSystem.class);
private static final String FOLDER_SUFFIX = "_$folder$";
private static final long MAX_SWIFT_FILE_SIZE = 5 * 1024 * 1024 * 1024L;
static final String PATH_DELIMITER = Path.SEPARATOR;
private static final int SWIFT_MAX_LISTING_LENGTH = 1000;
private class NativeSwiftFsInputStream extends FSInputStream {
private InputStream in;
private final String key;
private long pos = 0;
public NativeSwiftFsInputStream(InputStream in, String key) {
this.in = in;
this.key = key;
}
public synchronized int read() throws IOException {
int result = in.read();
if (result != -1) {
pos++;
}
return result;
}
public synchronized int read(byte[] b, int off, int len)
throws IOException {
int result = in.read(b, off, len);
if (result > 0) {
pos += result;
}
return result;
}
public void close() throws IOException {
in.close();
}
public synchronized void seek(long pos) throws IOException {
in.close();
in = store.retrieve(key, pos);
this.pos = pos;
}
public synchronized long getPos() throws IOException {
return pos;
}
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
}
private class NativeSwiftFsOutputStream extends OutputStream {
private Configuration conf;
private String key;
private File backupFile;
private OutputStream backupStream;
private MessageDigest digest;
private boolean closed;
public NativeSwiftFsOutputStream(Configuration conf,
NativeFileSystemStore store,
String key,
Progressable progress,
int bufferSize) throws IOException {
this.conf = conf;
this.key = key;
this.backupFile = newBackupFile();
try {
this.digest = MessageDigest.getInstance("MD5");
this.backupStream = new BufferedOutputStream(new DigestOutputStream(
new FileOutputStream(backupFile), this.digest));
} catch (NoSuchAlgorithmException e) {
LOG.warn("Cannot load MD5 digest algorithm, skipping message integrity check.", e);
this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
}
}
private File newBackupFile() throws IOException {
File dir = new File(conf.get("fs.swift.buffer.dir"));
if (!dir.mkdirs() && !dir.exists()) {
throw new IOException("Cannot create Swift buffer directory: " + dir);
}
File result = File.createTempFile("output-", ".tmp", dir);
result.deleteOnExit();
return result;
}
@Override
public void flush() throws IOException {
backupStream.flush();
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
backupStream.close();
try {
byte[] md5Hash = digest == null ? null : digest.digest();
store.storeFile(key, backupFile, md5Hash);
} finally {
if (!backupFile.delete()) {
LOG.warn("Could not delete temporary swiftn file: " + backupFile);
}
super.close();
closed = true;
}
}
@Override
public void write(int b) throws IOException {
backupStream.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
backupStream.write(b, off, len);
}
}
private URI uri;
private NativeFileSystemStore store;
private Path workingDir;
public NativeSwiftFileSystem() {
// set store in initialize()
}
public NativeSwiftFileSystem(NativeFileSystemStore store) {
this.store = store;
}
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
if (store == null) {
store = createDefaultStore(conf);
}
store.initialize(uri, conf);
setConf(conf);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this);
}
private static NativeFileSystemStore createDefaultStore(Configuration conf) {
NativeFileSystemStore store = new SwiftNativeFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.swift.maxRetries", 4),
conf.getLong("fs.swift.sleepTimeSeconds", 10), TimeUnit.SECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);
exceptionToPolicyMap.put(SwiftException.class, basePolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap =
new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("storeFile", methodPolicy);
return (NativeFileSystemStore) RetryProxy.create(NativeFileSystemStore.class,
store,
methodNameToPolicyMap);
}
private static String pathToKey(Path path) {
if (path.toUri().getScheme() != null && "".equals(path.toUri().getPath())) {
// allow uris without trailing slash after container to refer to root,
// like swiftn://mycontainer
return "";
}
if (!path.isAbsolute()) {
throw new IllegalArgumentException("Path must be absolute: " + path);
}
return path.toUri().getPath().substring(1); // remove initial slash
}
private static Path keyToPath(String key) {
return new Path("/" + key);
}
private Path makeAbsolute(Path path) {
if (path.isAbsolute()) {
return path;
}
return new Path(workingDir, path);
}
/** This optional operation is not yet supported. */
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new IOException("Not supported");
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
if (exists(f) && !overwrite) {
throw new IOException("File already exists: " + f);
}
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
return new FSDataOutputStream(new NativeSwiftFsOutputStream(getConf(), store,
key, progress, bufferSize), statistics);
}
@Override
@Deprecated
public boolean delete(Path path) throws IOException {
return delete(path, true);
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
FileStatus status;
try {
status = getFileStatus(f);
} catch (FileNotFoundException e) {
return false;
}
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
if (status.isDir()) {
FileStatus[] contents = listStatus(f);
if (!recursive && contents.length > 0) {
throw new IOException("Directory " + f.toString() + " is not empty.");
}
for (FileStatus p : contents) {
if (!delete(p.getPath(), recursive)) {
return false;
}
}
store.delete(key + FOLDER_SUFFIX);
} else {
store.delete(key);
}
return true;
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
if (key.length() == 0) { // root always exists
return newDirectory(absolutePath);
}
FileMetadata meta = store.retrieveMetadata(key);
if (meta != null) {
return newFile(meta, absolutePath);
}
if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
return newDirectory(absolutePath);
}
PartialListing listing = store.list(key, 1);
if (listing.getFiles().length > 0 || listing.getCommonPrefixes().length > 0) {
return newDirectory(absolutePath);
}
throw new FileNotFoundException(absolutePath + ": No such file or directory.");
}
@Override
public URI getUri() {
return uri;
}
/**
* <p>
* If <code>f</code> is a file, this method will make a single call to Swift.
* If <code>f</code> is a directory, this method will make a maximum of
* (<i>n</i> / 1000) + 2 calls to Swift, where <i>n</i> is the total number of
* files and directories contained directly in <code>f</code>.
* </p>
*/
@Override
public FileStatus[] listStatus(Path f) throws IOException {
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
if (key.length() > 0) {
FileMetadata meta = store.retrieveMetadata(key);
if (meta != null) {
return new FileStatus[] { newFile(meta, absolutePath) };
}
}
URI pathUri = absolutePath.toUri();
Set<FileStatus> status = new TreeSet<FileStatus>();
String priorLastKey = null;
do {
PartialListing listing = store.list(key, SWIFT_MAX_LISTING_LENGTH, priorLastKey);
for (FileMetadata fileMetadata : listing.getFiles()) {
Path subpath = keyToPath(fileMetadata.getKey());
String relativePath = pathUri.relativize(subpath.toUri()).getPath();
if (relativePath.endsWith(FOLDER_SUFFIX)) {
status.add(newDirectory(new Path(absolutePath,
relativePath.substring(0,relativePath.indexOf(FOLDER_SUFFIX)))));
} else {
status.add(newFile(fileMetadata, subpath));
}
}
for (String commonPrefix : listing.getCommonPrefixes()) {
Path subpath = keyToPath(commonPrefix);
String relativePath = pathUri.relativize(subpath.toUri()).getPath();
status.add(newDirectory(new Path(absolutePath, relativePath)));
}
priorLastKey = listing.getPriorLastKey();
} while (priorLastKey != null);
if (status.isEmpty() &&
store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
return null;
}
return status.toArray(new FileStatus[0]);
}
private FileStatus newFile(FileMetadata meta, Path path) {
return new FileStatus(meta.getLength(), false, 1, MAX_SWIFT_FILE_SIZE,
meta.getLastModified(), path.makeQualified(this));
}
private FileStatus newDirectory(Path path) {
return new FileStatus(0, true, 1, MAX_SWIFT_FILE_SIZE, 0,
path.makeQualified(this));
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
Path absolutePath = makeAbsolute(f);
List<Path> paths = new ArrayList<Path>();
do {
paths.add(0, absolutePath);
absolutePath = absolutePath.getParent();
} while (absolutePath != null);
boolean result = true;
for (Path path : paths) {
result &= mkdir(path);
}
return result;
}
private boolean mkdir(Path f) throws IOException {
try {
FileStatus fileStatus = getFileStatus(f);
if (!fileStatus.isDir()) {
throw new IOException(String.format(
"Can't make directory for path %s since it is a file.", f));
}
} catch (FileNotFoundException e) {
String key = pathToKey(f) + FOLDER_SUFFIX;
store.storeEmptyFile(key);
}
return true;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
if (!exists(f)) {
throw new FileNotFoundException(f.toString());
}
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
return new FSDataInputStream(new BufferedFSInputStream(
new NativeSwiftFsInputStream(store.retrieve(key), key), bufferSize));
}
// rename() and delete() use this method to ensure that the parent directory
// of the source does not vanish.
private void createParent(Path path) throws IOException {
Path parent = path.getParent();
if (parent != null) {
String key = pathToKey(makeAbsolute(parent));
if (key.length() > 0) {
store.storeEmptyFile(key + FOLDER_SUFFIX);
}
}
}
private boolean existsAndIsFile(Path f) throws IOException {
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
if (key.length() == 0) {
return false;
}
FileMetadata meta = store.retrieveMetadata(key);
if (meta != null) {
// Swift object with given key exists, so this is a file
return true;
}
if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
// Signifies empty directory
return false;
}
PartialListing listing = store.list(key, 1, null);
if (listing.getFiles().length > 0 ||
listing.getCommonPrefixes().length > 0) {
// Non-empty directory
return false;
}
throw new FileNotFoundException(absolutePath +
": No such file or directory");
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
String srcKey = pathToKey(makeAbsolute(src));
if (srcKey.length() == 0) {
// Cannot rename root of file system
return false;
}
// Figure out the final destination
String dstKey;
try {
boolean dstIsFile = existsAndIsFile(dst);
if (dstIsFile) {
// Attempting to overwrite a file using rename()
return false;
} else {
// Move to within the existent directory
dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
}
} catch (FileNotFoundException e) {
// dst doesn't exist, so we can proceed
dstKey = pathToKey(makeAbsolute(dst));
try {
if (!getFileStatus(dst.getParent()).isDir()) {
return false; // parent dst is a file
}
} catch (FileNotFoundException ex) {
return false; // parent dst does not exist
}
}
try {
boolean srcIsFile = existsAndIsFile(src);
if (srcIsFile) {
store.rename(srcKey, dstKey);
} else {
// Move the folder object
store.delete(srcKey + FOLDER_SUFFIX);
store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
// Move everything inside the folder
String priorLastKey = null;
do {
PartialListing listing = store.listAll(srcKey, SWIFT_MAX_LISTING_LENGTH,
priorLastKey);
for (FileMetadata file : listing.getFiles()) {
store.rename(file.getKey(), dstKey
+ file.getKey().substring(srcKey.length()));
}
priorLastKey = listing.getPriorLastKey();
} while (priorLastKey != null);
}
createParent(src);
return true;
} catch (FileNotFoundException e) {
// Source file does not exist;
return false;
}
}
/**
* Set the working directory to the given directory.
*/
@Override
public void setWorkingDirectory(Path newDir) {
workingDir = newDir;
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
}
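
For context, a short usage sketch against this filesystem, assuming the swiftn:// scheme and credentials are configured as in the core-default.xml above; the container and path are illustrative:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NativeSwiftExample {
    public static void main(String[] args) throws Exception {
        // Reads fs.swiftn.* settings from core-site.xml on the classpath.
        Configuration conf = new Configuration();
        // The URI authority names the Swift container (see initialize() above).
        FileSystem fs = FileSystem.get(URI.create("swiftn://testStore/"), conf);
        Path p = new Path("/user/demo/hello.txt");
        // create() buffers to a local temp file; close() uploads it to Swift.
        FSDataOutputStream out = fs.create(p);
        out.writeBytes("hello swift\n");
        out.close();
        System.out.println(fs.getFileStatus(p).getLen() + " bytes stored");
    }
}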

PartialListing.java

@ -1,58 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swiftnative;
/**
* <p>
* Holds information on a directory listing for a
* {@link NativeFileSystemStore}.
* This includes the {@link FileMetadata files} and directories
* (their names) contained in a directory.
* </p>
* <p>
* This listing may be returned in chunks, so a <code>priorLastKey</code>
* is provided so that the next chunk may be requested.
* </p>
* @see NativeFileSystemStore#list(String, int, String)
*/
class PartialListing {
private final String priorLastKey;
private final FileMetadata[] files;
private final String[] commonPrefixes;
public PartialListing(String priorLastKey, FileMetadata[] files, String[] commonPrefixes) {
this.priorLastKey = priorLastKey;
this.files = files;
this.commonPrefixes = commonPrefixes;
}
public FileMetadata[] getFiles() {
return files;
}
public String[] getCommonPrefixes() {
return commonPrefixes;
}
public String getPriorLastKey() {
return priorLastKey;
}
}
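
A sketch of the chunked-listing contract described above, mirroring the do/while loop in NativeSwiftFileSystem.listStatus; the printAllKeys helper is illustrative, and the 1000-entry page size matches SWIFT_MAX_LISTING_LENGTH:

// Illustrative helper: request pages until getPriorLastKey() returns null
// (the store returns null once a page comes back empty).
static void printAllKeys(NativeFileSystemStore store, String prefix)
        throws java.io.IOException {
    String priorLastKey = null;
    do {
        PartialListing page = store.list(prefix, 1000, priorLastKey);
        for (FileMetadata file : page.getFiles()) {
            System.out.println(file.getKey() + " (" + file.getLength() + " bytes)");
        }
        priorLastKey = page.getPriorLastKey();
    } while (priorLastKey != null);
}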

SwiftNativeFileSystemStore.java

@ -1,250 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swiftnative;
import static org.apache.hadoop.fs.swiftnative.NativeSwiftFileSystem.PATH_DELIMITER;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.swift.SwiftCredentials;
import org.apache.hadoop.fs.swift.SwiftException;
import org.codehaus.jackson.JsonNode;
import org.hpswift.service.Credentials;
import org.hpswift.service.Swift;
class SwiftNativeFileSystemStore implements NativeFileSystemStore {
private SwiftCredentials swiftCredentials;
private Swift swift;
private String container;
public void initialize(URI uri, Configuration conf) throws IOException {
try {
swiftCredentials = new SwiftCredentials();
Credentials hpswiftCredentials = swiftCredentials.initialize(uri, conf);
swift = new Swift(hpswiftCredentials);
} catch (Exception e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new SwiftException(e);
}
container = uri.getHost();
}
public void storeFile(String key, File file, byte[] md5Hash)
throws IOException {
BufferedInputStream in = null;
try {
in = new BufferedInputStream(new FileInputStream(file));
try {
swift.putStream(container, key, in, md5Hash);
} catch (Exception e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new SwiftException(e);
}
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
// ignore
}
}
}
}
public void storeEmptyFile(String key) throws IOException {
try {
swift.put(container, key, new byte[0]);
} catch (Exception e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new SwiftException(e);
}
}
public FileMetadata retrieveMetadata(String key) throws IOException {
try {
org.hpswift.service.SwiftMetadata metadata = swift.getMetaData(container, key);
if (metadata == null) return null;
return new FileMetadata(key,metadata.getLength(),metadata.getLastModified());
} catch (Exception e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new SwiftException(e);
}
}
public InputStream retrieve(String key) throws IOException {
try {
// returns null if not found
return swift.getS(container,key);
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException) e;
}
throw new SwiftException(e);
}
}
public InputStream retrieve(String key, long byteRangeStart)
throws IOException {
try {
// returns null if not found
return swift.getS(container,key,byteRangeStart);
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException) e;
}
throw new SwiftException(e);
}
}
public PartialListing list(String prefix, int maxListingLength)
throws IOException {
return list(prefix, maxListingLength, null);
}
public PartialListing list(String prefix, int maxListingLength,
String priorLastKey) throws IOException {
return list(prefix, PATH_DELIMITER, maxListingLength, priorLastKey);
}
public PartialListing listAll(String prefix, int maxListingLength,
String priorLastKey) throws IOException {
return list(prefix, null, maxListingLength, priorLastKey);
}
private PartialListing list(String prefix, String delimiter,
int maxListingLength, String priorLastKey) throws IOException {
try {
String lastKey = null;
if (prefix.length() > 0 && !prefix.endsWith(PATH_DELIMITER)) {
prefix += PATH_DELIMITER;
}
JsonNode list = swift.list(container, prefix, delimiter, maxListingLength, priorLastKey);
int size = list.size();
FileMetadata[] metadata = new FileMetadata[size];
String[] keys = new String[size];
for (int i =0; i < size; i++) {
JsonNode item = list.get(i);
String key = item.get("name").getTextValue();
long len = item.get("bytes").getLongValue();
String lm = item.get("last_modified").getTextValue();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
format.setTimeZone(TimeZone.getTimeZone("GMT"));
Date date = format.parse(lm);
long lastMod = date.getTime();
keys[i] = key;
metadata[i] = new FileMetadata(key,len,lastMod);
lastKey = key;
}
return new PartialListing(lastKey,metadata,keys);
} catch (Exception e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new SwiftException(e);
}
}
public void delete(String key) throws IOException {
try {
swift.delete(container,key);
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException) e;
}
throw new SwiftException(e);
}
}
public void rename(String srcKey, String dstKey) throws IOException {
try {
swift.copy(container,srcKey,dstKey);
swift.delete(container,srcKey);
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException) e;
}
throw new SwiftException(e);
}
}
public void purge(String prefix) throws IOException {
try {
JsonNode list = swift.list(container, prefix, null, 0, null);
int size = list.size();
for (int i = 0; i < size; i++) {
JsonNode item = list.get(i);
String key = item.get("name").getTextValue();
swift.delete(container, key);
}
} catch (Exception e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new SwiftException(e);
}
}
public void dump() throws IOException {
StringBuilder sb = new StringBuilder("Swift Native Filesystem, ");
sb.append(container).append("\n");
try {
JsonNode list = swift.list(container, null, null, 0, null);
int size = list.size();
for (int i = 0; i < size; i++) {
JsonNode item = list.get(i);
String key = item.get("name").getTextValue();
sb.append(key).append("\n");
}
} catch (Exception e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new SwiftException(e);
}
}
}

Credentials.java

@ -1,140 +0,0 @@
package org.hpswift.service;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.params.ConnRoutePNames;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;
public class Credentials {
private String accessKey;
private String secretKey;
private String tenantId;
private String authUri;
public HttpClient client = null;
private static JsonNodeFactory fact = JsonNodeFactory.instance;
private static ObjectMapper m = new ObjectMapper();
public Credentials(String aKey, String sKey, String tId, String aUri){
accessKey = aKey;
secretKey = sKey;
tenantId = tId;
authUri = aUri;
ThreadSafeClientConnManager cm = new ThreadSafeClientConnManager();
cm.setMaxTotal(500);
cm.setDefaultMaxPerRoute(500);
client = new DefaultHttpClient(cm);
// NOTE: the HTTP proxy is hardcoded to an HP-internal host.
HttpHost proxy = new HttpHost("proxy-ccy.houston.hp.com", 8080);
client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
}
public void checkCode(HttpResponse response,int code,String msg) throws Exception {
if (code >= 200 && code < 300) {
return;
}
int len = 0;
HttpEntity ent = response.getEntity();
if (ent != null) {
len = (int) ent.getContentLength();
}
if (len != 0) {
StringBuilder sb = new StringBuilder();
BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
String line = null;
while ((line = rd.readLine()) != null) {
sb.append(line + "\n");
}
rd.close();
String s = sb.toString();
// System.out.println(s);
}
throw new Exception(code + " - " + msg);
}
public JsonNode getAuth(String type) throws Exception {
// TODO return same token if not expired
ObjectNode info = fact.objectNode();
ObjectNode auth = fact.objectNode();
ObjectNode cred = fact.objectNode();
info.put("auth", auth);
auth.put("apiAccessKeyCredentials", cred);
cred.put("accessKey", accessKey);
cred.put("secretKey", secretKey);
auth.put("tenantId", tenantId);
StringWriter w = new StringWriter();
m.writeValue(w, info);
String infos = w.toString();
HttpPost post = new HttpPost(authUri + "tokens");
HttpEntity e = new StringEntity(infos);
post.setEntity(e);
post.setHeader("Accept", "application/json");
post.setHeader("Content-Type", "application/json");
HttpResponse response = client.execute(post);
int code = response.getStatusLine().getStatusCode();
checkCode(response, code, "AUTH");
InputStream r = response.getEntity().getContent();
ObjectNode n = m.readValue(r, ObjectNode.class);
String authToken = n.get("access").get("token").get("id").getTextValue();
String storageUrl = "";
JsonNode cat = n.get("access").get("serviceCatalog");
for (int i = 0; i < cat.size(); i++) {
JsonNode service = cat.get(i);
if (service.get("type").getTextValue().equals(type)) {
JsonNode ep = service.get("endpoints");
JsonNode ep0 = ep.get(0);
JsonNode puburl = ep0.get("publicURL");
storageUrl = puburl.getTextValue();
break;
}
}
r.close();
post.abort();
ObjectNode result = fact.objectNode();
result.put("token",authToken);
result.put("url",storageUrl);
return result;
}
public String getAccessKey(){
return accessKey;
}
public String getSecretKey(){
return secretKey;
}
public String getTenantId(){
return tenantId;
}
String getAuthUri(){
return authUri;
}
}

Swift.java

@ -1,281 +0,0 @@
package org.hpswift.service;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Formatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Arrays;
import java.util.TimeZone;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
public class Swift {
private Credentials credentials = null;
private HttpClient client = null;
private String authToken = "";
private String storageUrl = "";
private static JsonNodeFactory fact = JsonNodeFactory.instance;
private static ObjectMapper m = new ObjectMapper();
private void getAuth() throws Exception {
JsonNode auth = credentials.getAuth("object-store");
authToken = auth.get("token").getTextValue();
storageUrl = auth.get("url").getTextValue();
}
/*
public byte[] get(String container, String key) throws Exception {
getAuth();
HttpGet cmd = new HttpGet(storageUrl + "/" + container +"/" + key);
cmd.setHeader("X-Auth-Token", authToken);
HttpResponse response = client.execute(cmd);
int code = response.getStatusLine().getStatusCode();
credentials.checkCode(response,code,"GET:" + container +":" + key);
int len = (int) response.getEntity().getContentLength();
byte buff[] = new byte[len];
InputStream r = response.getEntity().getContent();
int off = 0;
while (true) {
int len1 = r.read(buff, off, len);
if (len1 <= 0) break;
off += len1;
len -= len1;
}
r.close();
//cmd.abort();
return buff;
}
*/
public InputStream getS(String container, String key) throws Exception {
getAuth();
HttpGet cmd = new HttpGet(storageUrl + "/" + container +"/" + key);
cmd.setHeader("X-Auth-Token", authToken);
HttpResponse response = client.execute(cmd);
int code = response.getStatusLine().getStatusCode();
if (code == 404) {
return null;
}
credentials.checkCode(response,code,"GET:" + container +":" + key);
InputStream r = response.getEntity().getContent();
return r;
}
public InputStream getS(String container, String key,long start) throws Exception {
getAuth();
HttpGet cmd = new HttpGet(storageUrl + "/" + container +"/" + key);
cmd.setHeader("X-Auth-Token", authToken);
cmd.setHeader("Range","bytes="+start+"-");
HttpResponse response = client.execute(cmd);
int code = response.getStatusLine().getStatusCode();
if (code == 404) {
return null;
}
credentials.checkCode(response,code,"GET:" + container +":" + key);
InputStream r = response.getEntity().getContent();
return r;
}
public SwiftMetadata getMetaData(String container, String key) throws Exception {
getAuth();
HttpHead cmd = new HttpHead(storageUrl + "/" + container +"/" + key);
cmd.setHeader("X-Auth-Token", authToken);
HttpResponse response = client.execute(cmd);
int code = response.getStatusLine().getStatusCode();
if (code == 404) {
cmd.abort();
return null; //not found
}
credentials.checkCode(response,code,"HEAD:" + container +":" + key);
String slen = response.getHeaders("Content-Length")[0].getValue();
String smod = response.getHeaders("Last-Modified")[0].getValue();
long len = Long.parseLong(slen); // Content-Length header value, in bytes
SimpleDateFormat format = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z");
format.setTimeZone(TimeZone.getTimeZone("GMT"));
Date date = format.parse(smod);
long mod = date.getTime();
cmd.abort();
return new SwiftMetadata(len, mod);
}
public void put(String container, String key, byte val[]) throws Exception {
getAuth();
HttpPut cmd = new HttpPut(storageUrl + "/" + container +"/" + key);
cmd.setHeader("X-Auth-Token", authToken);
HttpEntity e = new ByteArrayEntity(val);
cmd.setEntity(e);
HttpResponse response = client.execute(cmd);
int code = response.getStatusLine().getStatusCode();
credentials.checkCode(response,code,"PUT:" + container +":" + key);
cmd.abort();
}
public void copy(String container, String srcKey, String dstKey) throws Exception {
getAuth();
HttpPut cmd = new HttpPut(storageUrl + "/" + container +"/" + dstKey);
cmd.setHeader("X-Auth-Token", authToken);
cmd.setHeader("X-Copy-From", "/" + container +"/" + srcKey);
HttpEntity e = new StringEntity("");
cmd.setEntity(e);
HttpResponse response = client.execute(cmd);
int code = response.getStatusLine().getStatusCode();
credentials.checkCode(response,code,"COPY:" + container +":" + srcKey +":" + dstKey);
cmd.abort();
}
private static String bytesToHexString(byte[] bytes) {
StringBuilder sb = new StringBuilder(bytes.length * 2);
Formatter formatter = new Formatter(sb);
for (byte b : bytes) {
formatter.format("%02x", b);
}
formatter.close();
return sb.toString();
}
public void putStream(String container, String key, InputStream in,byte[] md5) throws Exception {
// Note: value size must be less than 5GB
getAuth();
HttpPut cmd = new HttpPut(storageUrl + "/" + container +"/" + key);
cmd.setHeader("X-Auth-Token", authToken);
cmd.setHeader("ETag", bytesToHexString(md5));
HttpEntity e = new InputStreamEntity(in,-1);
cmd.setEntity(e);
HttpResponse response = client.execute(cmd);
int code = response.getStatusLine().getStatusCode();
credentials.checkCode(response,code,"PUT:" + container +":" + key);
cmd.abort();
}
public void delete(String container, String key) throws Exception {
getAuth();
HttpDelete cmd = new HttpDelete(storageUrl + "/" + container +"/" + key);
cmd.setHeader("X-Auth-Token", authToken);
HttpResponse response = client.execute(cmd);
int code = response.getStatusLine().getStatusCode();
credentials.checkCode(response,code,"DELETE:" + container +":" + key);
cmd.abort();
}
public JsonNode list(String container,String prefix, String delimiter, int limit, String marker) throws Exception {
String q = "?format=json";
if (limit > 0) {
q = q + ("&limit=" + limit);
}
if (prefix != null) {
q = q + ("&prefix=" + URLEncoder.encode(prefix,"UTF-8"));
}
if (delimiter != null) {
q = q + ("&delimiter=" + URLEncoder.encode(delimiter,"UTF-8"));
}
if (marker != null) {
q = q + ("&marker=" + URLEncoder.encode(marker,"UTF-8"));
}
getAuth();
HttpGet cmd = new HttpGet(storageUrl + "/" + container + q);
cmd.setHeader("X-Auth-Token", authToken);
HttpResponse response = client.execute(cmd);
int len = 0;
HttpEntity ent = response.getEntity();
if (ent != null) {
len = (int) ent.getContentLength();
}
if (len == 0) {
return fact.arrayNode();
}
StringBuilder sb = new StringBuilder();
BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
String line = null;
while ((line = rd.readLine()) != null) {
sb.append(line + "\n");
}
rd.close();
String rs = sb.toString();
ArrayNode result = (ArrayNode) m.readValue(rs,JsonNode.class);
return result;
}
public List<String> allContainers() throws Exception {
getAuth();
HttpGet cmd = new HttpGet(storageUrl);
cmd.setHeader("X-Auth-Token", authToken);
HttpResponse response = client.execute(cmd);
int len = 0;
HttpEntity ent = response.getEntity();
if (ent != null) {
len = (int) ent.getContentLength();
}
if (len == 0) {
return new ArrayList<String>();
}
StringBuilder sb = new StringBuilder();
BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
String line = null;
while ((line = rd.readLine()) != null) {
sb.append(line + "\n");
}
rd.close();
cmd.abort();
String result = sb.toString();
ArrayList<String> al = new ArrayList<String>(Arrays.asList(result.split("\n")));
return al;
}
public Swift(Credentials credentials) throws Exception {
this.credentials = credentials;
client = credentials.client;
}
}
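
For orientation, here is a minimal, hypothetical round-trip through the client above; it is not part of the original tree. It assumes a Credentials instance has already been built elsewhere (its constructor is not shown in this file), and the container and object names are invented. Note that putStream requires the caller to supply the MD5 of the body, which Swift checks against the ETag header.

package org.hpswift.service;

import java.io.ByteArrayInputStream;
import java.security.MessageDigest;

// Hypothetical example class, not part of the original tree.
public class SwiftRoundTripExample {
    // 'creds' is assumed to be constructed elsewhere; its constructor
    // is not shown in this file.
    public static void roundTrip(Credentials creds) throws Exception {
        Swift swift = new Swift(creds);
        byte[] payload = "hello swift".getBytes("UTF-8");

        // putStream verifies the upload server-side via the ETag header,
        // so the caller supplies the MD5 of the body up front.
        byte[] md5 = MessageDigest.getInstance("MD5").digest(payload);
        swift.putStream("example-container", "hello.txt",
                new ByteArrayInputStream(payload), md5);

        // HEAD the object back: length in bytes and Last-Modified as epoch millis.
        SwiftMetadata meta = swift.getMetaData("example-container", "hello.txt");
        System.out.println(meta);

        swift.delete("example-container", "hello.txt");
    }
}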

View File

@ -1,25 +0,0 @@
package org.hpswift.service;
public class SwiftMetadata {
private final long length;
private final long lastModified;
public SwiftMetadata(long length, long lastModified) {
this.length = length;
this.lastModified = lastModified;
}
public long getLength() {
return length;
}
public long getLastModified() {
return lastModified;
}
@Override
public String toString() {
return "SwiftMetadata[" + length + ", " + lastModified + "]";
}
}

View File

@ -1,37 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swift;
import java.net.URI;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
public class TestSwiftNativeCredentials extends TestCase {
public void testInvalidHostnameWithUnderscores() throws Exception {
SwiftCredentials swiftCredentials = new SwiftCredentials();
try {
swiftCredentials.initialize(new URI("swiftn://a:b@c_d"), new Configuration());
fail("Should throw IllegalArgumentException");
} catch (IllegalArgumentException e) {
//assertEquals("Invalid hostname in URI swiftn://a:b@c_d", e.getMessage());
}
}
}
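
A small standalone illustration, not from the original tree, of the JDK behavior this test leans on: java.net.URI accepts an underscore in the authority but refuses to treat it as a hostname, so getHost() returns null and the driver's initialize() rejects the URI.

import java.net.URI;

// Hypothetical demo class showing why "c_d" never parses as a host.
public class UnderscoreHostDemo {
    public static void main(String[] args) {
        URI uri = URI.create("swiftn://a:b@c_d");
        System.out.println(uri.getHost());      // null: '_' is not legal in a hostname
        System.out.println(uri.getAuthority()); // "a:b@c_d", kept as a registry-based authority
    }
}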

View File

@ -1,58 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swiftnative;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
public abstract class NativeSwiftFileSystemContractBaseTest extends FileSystemContractBaseTest {
private NativeFileSystemStore store;
abstract NativeFileSystemStore getNativeFileSystemStore() throws IOException;
@Override
protected void setUp() throws Exception {
Configuration conf = new Configuration();
store = getNativeFileSystemStore();
fs = new NativeSwiftFileSystem(store);
fs.initialize(URI.create(conf.get("test.fs.swiftn.name")), conf);
}
@Override
protected void tearDown() throws Exception {
store.purge("test");
super.tearDown();
}
public void testListStatusForRoot() throws Exception {
Path testDir = path("/test");
assertTrue(fs.mkdirs(testDir));
FileStatus[] paths = fs.listStatus(path("/"));
assertEquals(1, paths.length);
assertEquals(path("/test"), paths[0].getPath());
}
}

View File

@ -1,29 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swiftnative;
import java.io.IOException;
public class SwiftNativeFileSystemContractTest extends NativeSwiftFileSystemContractBaseTest {
@Override
NativeFileSystemStore getNativeFileSystemStore() throws IOException {
return new SwiftNativeFileSystemStore();
}
}

View File

@ -1,16 +0,0 @@
# remove wordout if it exists
bin/hadoop fs -rmr swiftn://output/wordout
# input and output from the HP Cloud object store
bin/hadoop jar build/hadoop-examples-*.jar wordcount swiftn://input swiftn://output/wordout
bin/hadoop fs -cat swiftn://output/wordout/*
bin/hadoop fs -lsr swiftn://input
# input from Swift, output to the local file system
bin/hadoop jar build/hadoop-examples-*.jar wordcount swiftn://input file:///home/output
bin/hadoop fs -rmr /cloudout
# input from Swift, output to HDFS
bin/hadoop jar build/hadoop-examples-*.jar wordcount swiftn://input hdfs://localhost:9000/cloudout
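
The same inputs can be reached programmatically. Below is a minimal, hypothetical sketch of reading one object through the driver with the stock Hadoop 1.x FileSystem API; swiftn://input/words.txt is an invented path, and the configuration is assumed to already map the swiftn scheme to this driver and to carry the Swift credentials.

package org.apache.hadoop.fs.swiftnative;

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Hypothetical example, not part of the original tree.
public class SwiftNativeReadExample {
    public static void main(String[] args) throws Exception {
        // Assumes core-site.xml already registers the swiftn:// scheme
        // and carries the Swift credentials.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("swiftn://input"), conf);

        // 'words.txt' is an assumed object name inside the 'input' container.
        InputStream in = fs.open(new Path("swiftn://input/words.txt"));
        try {
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            in.close();
        }
    }
}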