meteos/meteos/cluster/binary/meteos-script-1.6.0.py

877 lines
29 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright 2016 NEC Corpocation All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import os
import socket
import sys
import uuid
from ast import literal_eval
from math import sqrt
from numpy import array
from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionModel
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.classification import NaiveBayesModel
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.clustering import KMeansModel
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import Word2VecModel
from pyspark.mllib.fpm import FPGrowth
from pyspark.mllib.fpm import FPGrowthModel
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import MatrixFactorizationModel
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionModel
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.regression import RidgeRegressionModel
from pyspark.mllib.regression import RidgeRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.tree import DecisionTreeModel
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.tree import RandomForestModel
from pyspark.mllib.util import MLUtils
EXIT_CODE = '80577372-9349-463a-bbc3-1ca54f187cc9'
class ModelController(object):
"""Class defines interface of Model Controller."""
def __init__(self):
super(ModelController, self).__init__()
def create_model(self, data, params):
"""Is called to create mode."""
raise NotImplementedError()
def create_model_libsvm(self, data, params):
"""Is called to create mode."""
raise NotImplementedError()
def create_model_text(self, data, params):
"""Is called to create mode."""
raise NotImplementedError()
def evaluate_model(self, context, model, data):
"""Is called to evaluate mode."""
raise NotImplementedError()
def evaluate_model_libsvm(self, context, model, data):
"""Is called to evaluate mode."""
raise NotImplementedError()
def evaluate_model_text(self, context, model, data):
"""Is called to evaluate mode."""
raise NotImplementedError()
def load_model(self, context, path):
"""Is called to load mode."""
raise NotImplementedError()
def predict(self, context, params):
"""Is called to predict value."""
raise NotImplementedError()
def predict_libsvm(self, context, params):
"""Is called to predict value."""
raise NotImplementedError()
def predict_text(self, context, params):
"""Is called to predict value."""
raise NotImplementedError()
def parsePoint(self, line):
values = [float(s) for s in line.split(',')]
if values[0] == -1:
values[0] = 0
return LabeledPoint(values[0], values[1:])
def textToIndex(self, text):
return HashingTF().transform(text.split(" "))
def parseTextRDDToIndex(self, data, label=True):
if label:
labels = data.map(lambda line: float(line.split(" ", 1)[0]))
documents = data.map(lambda line: line.split(" ", 1)[1].split(" "))
else:
documents = data.map(lambda line: line.split(" "))
tf = HashingTF().transform(documents)
tf.cache()
idfIgnore = IDF(minDocFreq=2).fit(tf)
index = idfIgnore.transform(tf)
if label:
return labels.zip(index).map(lambda line: LabeledPoint(line[0], line[1]))
else:
return index
def evaluateBinaryClassification(self, predictionAndLabels):
metrics = MulticlassMetrics(predictionAndLabels).confusionMatrix()
return "{}: {}".format("True Positive", metrics[0, 0]) + os.linesep\
+ "{}: {}".format("False Positive", metrics[0, 1]) + os.linesep\
+ "{}: {}".format("False Negative", metrics[1, 0]) + os.linesep\
+ "{}: {}".format("True Negative", metrics[1, 1])
def evaluateRegression(self, scoreAndLabels):
metrics = RegressionMetrics(scoreAndLabels)
return "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\
+ "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\
+ "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\
+ "{}: {}".format("R-squared", metrics.r2)
class KMeansModelController(ModelController):
def __init__(self):
super(KMeansModelController, self).__init__()
self.model_params = {}
def _parse_model_params(self, params):
p = {}
p['numClasses'] = int(params.get('numClasses', 2))
p['maxIterations'] = int(params.get('numIterations', 10))
p['runs'] = int(params.get('runs', 10))
p['initializationMode'] = params.get('mode', 'random')
self.model_params = p
def create_model(self, data, params):
self._parse_model_params(params)
numClasses = self.model_params.pop('numClasses')
parsedData = data.map(
lambda line: array([float(x) for x in line.split(',')]))
return KMeans.train(parsedData,
numClasses,
**self.model_params)
def create_model_text(self, data, params):
self._parse_model_params(params)
numClasses = self.model_params.pop('numClasses')
parsedData = self.parseTextRDDToIndex(data, label=False)
return KMeans.train(parsedData,
numClasses,
**self.model_params)
def load_model(self, context, path):
return KMeansModel.load(context, path)
def predict(self, model, params):
return model.predict(params.split(','))
def predict_text(self, model, params):
index = self.textToIndex(params)
return model.predict(index)
class RecommendationController(ModelController):
def __init__(self):
super(RecommendationController, self).__init__()
def _create_ratings(self, data):
return data.map(lambda l: l.split(','))\
.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
def create_model(self, data, params):
# Build the recommendation model using Alternating Least Squares
rank = int(params.get('rank', 10))
numIterations = int(params.get('numIterations', 10))
ratings = self._create_ratings(data)
return ALS.train(ratings, rank, numIterations)
def evaluate_model(self, context, model, data):
ratings = self._create_ratings(data)
testData = ratings.map(lambda p: (p.user, p.product))
predictions = model.predictAll(testData)\
.map(lambda r: ((r.user, r.product), r.rating))
ratingsTuple = ratings.map(lambda r: ((r.user, r.product), r.rating))
scoreAndLabels = predictions.join(ratingsTuple).map(lambda tup: tup[1])
return self.evaluateRegression(scoreAndLabels)
def load_model(self, context, path):
return MatrixFactorizationModel.load(context, path)
def predict(self, model, params):
parsedData = params.split(',')
return model.predict(parsedData[0], parsedData[1])
class RegressionModelController(ModelController):
def __init__(self, train_name, model_name):
super(RegressionModelController, self).__init__()
self.train_class = eval(train_name)
self.model_class = eval(model_name)
self.model_params = {}
def _parse_model_params(self, params):
p = {}
p['iterations'] = int(params.get('numIterations', 100))
p['step'] = float(params.get('step', 0.00000001))
p['miniBatchFraction'] = float(params.get('miniBatchFraction', 1.0))
p['convergenceTol'] = float(params.get('convergenceTol', 0.001))
if self.__class__.__name__ == 'LinearRegressionModelController':
p['regParam'] = float(params.get('regParam', 0.0))
elif self.__class__.__name__ == 'RidgeRegressionModelController':
p['regParam'] = float(params.get('regParam', 0.01))
self.model_params = p
def create_model(self, data, params):
self._parse_model_params(params)
points = data.map(self.parsePoint)
return getattr(self.train_class, 'train')(points, **self.model_params)
def create_model_libsvm(self, data, params):
self._parse_model_params(params)
return getattr(self.train_class, 'train')(data, **self.model_params)
def evaluate_model(self, context, model, data):
points = data.map(self.parsePoint)
scoreAndLabels = points.map(lambda p: (float(model.predict(p.features)), p.label))
return self.evaluateRegression(scoreAndLabels)
def evaluate_model_libsvm(self, context, model, data):
return self.evaluate_model(context, model, data)
def load_model(self, context, path):
return getattr(self.model_class, 'load')(context, path)
def predict(self, model, params):
return model.predict(params.split(','))
def predict_libsvm(self, model, params):
return self.predict(model, params)
class LinearRegressionModelController(RegressionModelController):
def __init__(self):
train_name = 'LinearRegressionWithSGD'
model_name = 'LinearRegressionModel'
super(LinearRegressionModelController, self).__init__(train_name,
model_name)
class RidgeRegressionModelController(RegressionModelController):
def __init__(self):
train_name = 'RidgeRegressionWithSGD'
model_name = 'RidgeRegressionModel'
super(RidgeRegressionModelController, self).__init__(train_name,
model_name)
class LogisticRegressionModelController(ModelController):
def __init__(self):
super(LogisticRegressionModelController, self).__init__()
def create_model(self, data, params):
numIterations = int(params.get('numIterations', 10))
points = data.map(self.parsePoint)
return LogisticRegressionWithSGD.train(points, numIterations)
def create_model_libsvm(self, data, params):
numIterations = int(params.get('numIterations', 10))
return LogisticRegressionWithSGD.train(data, numIterations)
def evaluate_model(self, context, model, data):
predictionAndLabels = data.map(self.parsePoint)\
.map(lambda lp: (float(model.predict(lp.features)), lp.label))
return self.evaluateBinaryClassification(predictionAndLabels)
def evaluate_model_libsvm(self, context, model, data):
return self.evaluate_model(context, model, data)
def load_model(self, context, path):
return LogisticRegressionModel.load(context, path)
def predict(self, model, params):
return model.predict(params.split(','))
def predict_libsvm(self, model, params):
return self.predict(model, params)
class NaiveBayesModelController(ModelController):
def __init__(self):
super(NaiveBayesModelController, self).__init__()
def create_model(self, data, params):
lambda_ = float(params.get('lambda', 1.0))
points = data.map(self.parsePoint)
return NaiveBayes.train(points, lambda_)
def create_model_text(self, data, params):
lambda_ = float(params.get('lambda', 1.0))
points = self.parseTextRDDToIndex(data)
return NaiveBayes.train(points, lambda_)
def evaluate_model(self, context, model, data):
predictionAndLabels = data.map(self.parsePoint)\
.map(lambda lp: (float(model.predict(lp.features)), lp.label))
return self.evaluateBinaryClassification(predictionAndLabels)
def evaluate_model_libsvm(self, context, model, data):
return self.evaluate_model(context, model, data)
def evaluate_model_text(self, context, model, data):
points = self.parseTextRDDToIndex(data)
predictionAndLabels = points.map(lambda lp: (float(model.predict(lp.features)), lp.label))
return self.evaluateBinaryClassification(predictionAndLabels)
def load_model(self, context, path):
return NaiveBayesModel.load(context, path)
def predict(self, model, params):
return model.predict(params.split(','))
def predict_text(self, model, params):
index = self.textToIndex(params)
return model.predict(index)
class TreeModelController(ModelController):
def __init__(self, train_name, model_name, algorithm):
super(TreeModelController, self).__init__()
self.train_class = eval(train_name)
self.model_class = eval(model_name)
self.algorithm = algorithm
self.model_params = {}
def _parse_to_libsvm(self, param):
index_l = []
value_l = []
param_l = param.split(' ')
param_len = str(len(param_l) * 2)
for p in param_l:
index_l.append(str(int(p.split(':')[0]) - 1))
value_l.append(p.split(':')[1])
index = ','.join(index_l)
value = ','.join(value_l)
parsed_str = '(' + param_len + ', [' + index + '],[' + value + '])'
return SparseVector.parse(parsed_str)
def _parse_model_params(self, params):
p = {}
p['maxDepth'] = int(params.get('maxDepth', 5))
p['maxBins'] = int(params.get('maxBins', 32))
if self.algorithm == 'Classification':
p['numClasses'] = int(params.get('numClasses', 2))
if self.__class__.__name__ == 'RandomForestModelController':
p['numTrees'] = int(params.get('numTrees', 3))
self.model_params = p
def _create_model(self, data, params, format='csv'):
self._parse_model_params(params)
if format == 'csv':
points = data.map(self.parsePoint)
else:
points = data
if (self.__class__.__name__ == 'DecisionTreeModelController' and
self.algorithm == 'Regression'):
return getattr(self.train_class,
'trainRegressor')(points,
{},
**self.model_params)
elif (self.__class__.__name__ == 'DecisionTreeModelController' and
self.algorithm == 'Classification'):
numClasses = self.model_params.pop('numClasses')
return getattr(self.train_class,
'trainClassifier')(points,
numClasses,
{},
**self.model_params)
if (self.__class__.__name__ == 'RandomForestModelController' and
self.algorithm == 'Regression'):
numTrees = self.model_params.pop('numTrees')
return getattr(self.train_class,
'trainRegressor')(points,
{},
numTrees,
**self.model_params)
elif (self.__class__.__name__ == 'RandomForestModelController' and
self.algorithm == 'Classification'):
numClasses = self.model_params.pop('numClasses')
numTrees = self.model_params.pop('numTrees')
return getattr(self.train_class,
'trainClassifier')(points,
numClasses,
{},
numTrees,
**self.model_params)
def create_model(self, data, params):
return self._create_model(data, params)
def create_model_libsvm(self, data, params):
return self._create_model(data, params, format='libsvm')
def evaluate_model(self, context, model, data):
predictions = model.predict(data.map(lambda x: x.features))
predictionAndLabels = data.map(lambda lp: lp.label).zip(predictions)
metrics = MulticlassMetrics(predictionAndLabels)
result = "{}: {}".format("Precision", metrics.precision()) + os.linesep\
+ "{}: {}".format("Recall", metrics.recall()) + os.linesep\
+ "{}: {}".format("F1 Score", metrics.fMeasure())
return result
def evaluate_model_libsvm(self, context, model, data):
return self.evaluate_model(context, model, data)
def load_model(self, context, path):
return getattr(self.model_class, 'load')(context, path)
def predict(self, model, params):
return model.predict(params.split(','))
def predict_libsvm(self, model, params):
parsed_params = self._parse_to_libsvm(params)
return model.predict(parsed_params)
class DecisionTreeModelController(TreeModelController):
def __init__(self, algorithm):
train_name = 'DecisionTree'
model_name = 'DecisionTreeModel'
super(DecisionTreeModelController, self).__init__(train_name,
model_name,
algorithm)
class RandomForestModelController(TreeModelController):
def __init__(self, algorithm):
train_name = 'RandomForest'
model_name = 'RandomForestModel'
super(RandomForestModelController, self).__init__(train_name,
model_name,
algorithm)
class Word2VecModelController(ModelController):
def __init__(self):
super(Word2VecModelController, self).__init__()
def create_model_text(self, data, params):
learningRate = float(params.get('learningRate', 0.025))
numIterations = int(params.get('numIterations', 10))
minCount = int(params.get('minCount', 5))
word2vec = Word2Vec()
word2vec.setLearningRate(learningRate)
word2vec.setNumIterations(numIterations)
word2vec.setMinCount(minCount)
inp = data.map(lambda row: row.split(" "))
return word2vec.fit(inp)
def load_model(self, context, path):
return Word2VecModel.load(context, path)
def predict_text(self, model, params):
dic_params = literal_eval(params)
keyword = dic_params.get('word')
num = dic_params.get('num', 2)
synonyms = model.findSynonyms(keyword, num)
result = ""
for word, cosine_distance in synonyms:
result += "{}: {}".format(word, cosine_distance) + os.linesep
return result
class FPGrowthModelController(ModelController):
def __init__(self):
super(FPGrowthModelController, self).__init__()
def create_model_text(self, data, params):
minSupport = float(params.get('minSupport', 0.2))
numPartitions = int(params.get('numPartitions', 10))
limits = int(params.get('limits', 10))
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions,
minSupport=minSupport,
numPartitions=numPartitions)
result = model.freqItemsets().collect()
for index, fi in enumerate(result):
if index == limits:
break
print(str(fi.items) + ':' + str(fi.freq))
class MeteosSparkController(object):
def init_context(self):
self.base_hostname = socket.gethostname().split(".")[0]
master_node = 'spark://' + self.base_hostname + ':7077'
self.context = SparkContext(master_node, 'INFO')
def parse_args(self, args):
self.id = args[3]
decoded_args = base64.b64decode(args[4])
self.job_args = literal_eval(decoded_args)
self.datapath = 'data-' + self.id
self.modelpath = 'model-' + self.id
def init_model_controller(self):
model_type = self.job_args['model']['type']
if model_type == 'KMeans':
self.controller = KMeansModelController()
elif model_type == 'Recommendation':
self.controller = RecommendationController()
elif model_type == 'LogisticRegression':
self.controller = LogisticRegressionModelController()
elif model_type == 'LinearRegression':
self.controller = LinearRegressionModelController()
elif model_type == 'RidgeRegression':
self.controller = RidgeRegressionModelController()
elif model_type == 'DecisionTreeRegression':
self.controller = DecisionTreeModelController('Regression')
elif model_type == 'DecisionTreeClassification':
self.controller = DecisionTreeModelController('Classification')
elif model_type == 'RandomForestRegression':
self.controller = RandomForestModelController('Regression')
elif model_type == 'RandomForestClassification':
self.controller = RandomForestModelController('Classification')
elif model_type == 'Word2Vec':
self.controller = Word2VecModelController()
elif model_type == 'FPGrowth':
self.controller = FPGrowthModelController()
elif model_type == 'NaiveBayes':
self.controller = NaiveBayesModelController()
def save_data(self, collect=True):
if collect:
self.data.collect()
self.data.saveAsTextFile(self.datapath)
print(self.data.take(10))
def load_data(self):
source_dataset_url = self.job_args['source_dataset_url']
if source_dataset_url.count('swift'):
swift = self.job_args['swift']
tenant = swift['tenant']
username = swift['username']
password = swift['password']
container_name = source_dataset_url.split('/')[2]
object_name = source_dataset_url.split('/')[3]
prefix = 'fs.swift.service.sahara'
hconf = self.context._jsc.hadoopConfiguration()
hconf.set(prefix + '.tenant', tenant)
hconf.set(prefix + '.username', username)
hconf.set(prefix + '.password', password)
hconf.setInt(prefix + ".http.port", 8080)
self.data = self._load_data('swift://' + container_name + '.sahara/' + object_name)
else:
dataset_path = 'data-' + source_dataset_url.split('/')[2]
self.data = self._load_data(dataset_path)
def _load_data(self, path):
dataset_format = self.job_args.get('dataset_format')
if dataset_format == 'libsvm':
return MLUtils.loadLibSVMFile(self.context, path)
else:
return self.context.textFile(path).cache()
def create_and_save_model(self):
model_params = self.job_args['model']['params']
params = base64.b64decode(model_params)
list_params = literal_eval(params)
dataset_format = self.job_args.get('dataset_format')
if dataset_format == 'libsvm':
self.model = self.controller.create_model_libsvm(self.data,
list_params)
elif dataset_format == 'text':
self.model = self.controller.create_model_text(self.data,
list_params)
else:
self.model = self.controller.create_model(self.data, list_params)
if self.model:
self.model.save(self.context, self.modelpath)
def evaluate_model(self):
self.load_data()
self.model = self.controller.load_model(self.context,
self.modelpath)
dataset_format = self.job_args.get('dataset_format')
if dataset_format == 'libsvm':
output = self.controller.evaluate_model_libsvm(self.context,
self.model,
self.data)
elif dataset_format == 'text':
output = self.controller.evaluate_model_text(self.context,
self.model,
self.data)
else:
output = self.controller.evaluate_model(self.context,
self.model,
self.data)
if output is not None:
print(output)
def download_dataset(self):
self.load_data()
self.save_data()
def parse_dataset(self):
self.load_data()
dataset_param = self.job_args['dataset']['params']
params = base64.b64decode(dataset_param)
list_params = literal_eval(params)
cmd = ''
for param in list_params:
cmd = cmd + '.' + param['method'] + '(' + param['args'] + ')'
exec('self.data = self.data' + cmd)
self.save_data()
def split_dataset(self):
self.load_data()
percent_train = self.job_args['dataset']['percent_train']
percent_test = self.job_args['dataset']['percent_test']
# Split the data into training and test sets
(trainData, testData) = self.data.randomSplit([float(percent_train),
float(percent_test)])
self.data = trainData
self.save_data()
# save testData
testData.collect()
datapath = 'data-' + self.job_args['dataset']['test_dataset']['id']
testData.saveAsTextFile(datapath)
def create_model(self):
self.load_data()
self.create_and_save_model()
def _predict(self, dataset_format, params):
if dataset_format == 'libsvm':
return self.controller.predict_libsvm(self.model, params)
elif dataset_format == 'text':
return self.controller.predict_text(self.model, params)
else:
return self.controller.predict(self.model, params)
def online_predict(self):
host = 'localhost'
port = int(self.job_args['model']['port'])
buf = 8192
dataset_format = self.job_args.get('dataset_format')
self.model = self.controller.load_model(self.context, self.modelpath)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_addr = (host, port)
s.bind(s_addr)
s.listen(1)
while(True):
conn, c_addr = s.accept()
EXIT = False
try:
while(True):
input_value = conn.recv(buf)
params = base64.b64decode(input_value)
if params == EXIT_CODE:
EXIT = True
elif params:
output = self._predict(dataset_format, params)
conn.sendall(str(output))
else:
break
except Exception:
pass
finally:
conn.close()
if EXIT:
break
def predict(self):
predict_params = self.job_args['learning']['params']
params = base64.b64decode(predict_params)
self.model = self.controller.load_model(self.context, self.modelpath)
dataset_format = self.job_args.get('dataset_format')
output = self._predict(dataset_format, params)
if output is not None:
print(output)
if __name__ == '__main__':
meteos = MeteosSparkController()
meteos.parse_args(sys.argv)
meteos.init_model_controller()
meteos.init_context()
getattr(meteos, meteos.job_args['method'])()