diff --git a/meteos/cluster/binary/meteos-script-1.6.0.py b/meteos/cluster/binary/meteos-script-1.6.0.py index 4b865ff..fcbace8 100644 --- a/meteos/cluster/binary/meteos-script-1.6.0.py +++ b/meteos/cluster/binary/meteos-script-1.6.0.py @@ -47,10 +47,10 @@ from pyspark.mllib.classification import NaiveBayes from pyspark.mllib.classification import NaiveBayesModel from pyspark.mllib.clustering import KMeans from pyspark.mllib.clustering import KMeansModel -from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.mllib.evaluation import MulticlassMetrics -from pyspark.mllib.evaluation import RankingMetrics from pyspark.mllib.evaluation import RegressionMetrics +from pyspark.mllib.feature import HashingTF +from pyspark.mllib.feature import IDF from pyspark.mllib.feature import Word2Vec from pyspark.mllib.feature import Word2VecModel from pyspark.mllib.fpm import FPGrowth @@ -89,10 +89,22 @@ class ModelController(object): """Is called to create mode.""" raise NotImplementedError() + def create_model_text(self, data, params): + """Is called to create mode.""" + raise NotImplementedError() + def evaluate_model(self, context, model, data): """Is called to evaluate mode.""" raise NotImplementedError() + def evaluate_model_libsvm(self, context, model, data): + """Is called to evaluate mode.""" + raise NotImplementedError() + + def evaluate_model_text(self, context, model, data): + """Is called to evaluate mode.""" + raise NotImplementedError() + def load_model(self, context, path): """Is called to load mode.""" raise NotImplementedError() @@ -105,12 +117,50 @@ class ModelController(object): """Is called to predict value.""" raise NotImplementedError() + def predict_text(self, context, params): + """Is called to predict value.""" + raise NotImplementedError() + def parsePoint(self, line): values = [float(s) for s in line.split(',')] if values[0] == -1: values[0] = 0 return LabeledPoint(values[0], values[1:]) + def textToIndex(self, text): + return HashingTF().transform(text.split(" ")) + + def parseTextRDDToIndex(self, data): + + labels = data.map(lambda line: float(line.split(" ", 1)[0])) + documents = data.map(lambda line: line.split(" ", 1)[1].split(" ")) + + tf = HashingTF().transform(documents) + tf.cache() + + idfIgnore = IDF(minDocFreq=2).fit(tf) + index = idfIgnore.transform(tf) + + return labels.zip(index).map(lambda line: LabeledPoint(line[0], line[1])) + + def evaluateBinaryClassification(self, predictionAndLabels): + + metrics = MulticlassMetrics(predictionAndLabels).confusionMatrix() + + return "{}: {}".format("True Positive", metrics[0, 0]) + os.linesep\ + + "{}: {}".format("False Positive", metrics[0, 1]) + os.linesep\ + + "{}: {}".format("False Negative", metrics[1, 0]) + os.linesep\ + + "{}: {}".format("True Negative", metrics[1, 1]) + + def evaluateRegression(self, scoreAndLabels): + + metrics = RegressionMetrics(scoreAndLabels) + + return "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\ + + "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\ + + "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\ + + "{}: {}".format("R-squared", metrics.r2) + class KMeansModelController(ModelController): @@ -171,14 +221,7 @@ class RecommendationController(ModelController): ratingsTuple = ratings.map(lambda r: ((r.user, r.product), r.rating)) scoreAndLabels = predictions.join(ratingsTuple).map(lambda tup: tup[1]) - metrics = RegressionMetrics(scoreAndLabels) - - result = "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\ - + "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\ - + "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\ - + "{}: {}".format("R-squared", metrics.r2) - - return result + return self.evaluateRegression(scoreAndLabels) def load_model(self, context, path): return MatrixFactorizationModel.load(context, path) @@ -229,14 +272,10 @@ class RegressionModelController(ModelController): points = data.map(self.parsePoint) scoreAndLabels = points.map(lambda p: (float(model.predict(p.features)), p.label)) - metrics = RegressionMetrics(scoreAndLabels) + return self.evaluateRegression(scoreAndLabels) - result = "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\ - + "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\ - + "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\ - + "{}: {}".format("R-squared", metrics.r2) - - return result + def evaluate_model_libsvm(self, context, model, data): + return self.evaluate_model(context, model, data) def load_model(self, context, path): return getattr(self.model_class, 'load')(context, path) @@ -289,12 +328,10 @@ class LogisticRegressionModelController(ModelController): predictionAndLabels = data.map(self.parsePoint)\ .map(lambda lp: (float(model.predict(lp.features)), lp.label)) - metrics = BinaryClassificationMetrics(predictionAndLabels) + return self.evaluateBinaryClassification(predictionAndLabels) - result = "{}: {}".format("Area under PR", metrics.areaUnderPR) + os.linesep\ - + "{}: {}".format("Area under ROC", metrics.areaUnderROC) - - return result + def evaluate_model_libsvm(self, context, model, data): + return self.evaluate_model(context, model, data) def load_model(self, context, path): return LogisticRegressionModel.load(context, path) @@ -318,17 +355,30 @@ class NaiveBayesModelController(ModelController): points = data.map(self.parsePoint) return NaiveBayes.train(points, lambda_) + def create_model_text(self, data, params): + + lambda_ = float(params.get('lambda', 1.0)) + + points = self.parseTextRDDToIndex(data) + + return NaiveBayes.train(points, lambda_) + def evaluate_model(self, context, model, data): predictionAndLabels = data.map(self.parsePoint)\ .map(lambda lp: (float(model.predict(lp.features)), lp.label)) - metrics = BinaryClassificationMetrics(predictionAndLabels) + return self.evaluateBinaryClassification(predictionAndLabels) - result = "{}: {}".format("Area under PR", metrics.areaUnderPR) + os.linesep\ - + "{}: {}".format("Area under ROC", metrics.areaUnderROC) + def evaluate_model_libsvm(self, context, model, data): + return self.evaluate_model(context, model, data) - return result + def evaluate_model_text(self, context, model, data): + + points = self.parseTextRDDToIndex(data) + predictionAndLabels = points.map(lambda lp: (float(model.predict(lp.features)), lp.label)) + + return self.evaluateBinaryClassification(predictionAndLabels) def load_model(self, context, path): return NaiveBayesModel.load(context, path) @@ -336,6 +386,10 @@ class NaiveBayesModelController(ModelController): def predict(self, model, params): return model.predict(params.split(',')) + def predict_text(self, model, params): + index = self.textToIndex(params) + return model.predict(index) + class TreeModelController(ModelController): @@ -448,6 +502,9 @@ class TreeModelController(ModelController): return result + def evaluate_model_libsvm(self, context, model, data): + return self.evaluate_model(context, model, data) + def load_model(self, context, path): return getattr(self.model_class, 'load')(context, path) @@ -484,7 +541,7 @@ class Word2VecModelController(ModelController): def __init__(self): super(Word2VecModelController, self).__init__() - def create_model(self, data, params): + def create_model_text(self, data, params): learningRate = float(params.get('learningRate', 0.025)) numIterations = int(params.get('numIterations', 10)) @@ -501,7 +558,7 @@ class Word2VecModelController(ModelController): def load_model(self, context, path): return Word2VecModel.load(context, path) - def predict(self, model, params): + def predict_text(self, model, params): dic_params = literal_eval(params) @@ -523,7 +580,7 @@ class FPGrowthModelController(ModelController): def __init__(self): super(FPGrowthModelController, self).__init__() - def create_model(self, data, params): + def create_model_text(self, data, params): minSupport = float(params.get('minSupport', 0.2)) numPartitions = int(params.get('numPartitions', 10)) @@ -640,6 +697,9 @@ class MeteosSparkController(object): if dataset_format == 'libsvm': self.model = self.controller.create_model_libsvm(self.data, list_params) + elif dataset_format == 'text': + self.model = self.controller.create_model_text(self.data, + list_params) else: self.model = self.controller.create_model(self.data, list_params) @@ -651,10 +711,20 @@ class MeteosSparkController(object): self.load_data() self.model = self.controller.load_model(self.context, self.modelpath) + dataset_format = self.job_args.get('dataset_format') - output = self.controller.evaluate_model(self.context, - self.model, - self.data) + if dataset_format == 'libsvm': + output = self.controller.evaluate_model_libsvm(self.context, + self.model, + self.data) + elif dataset_format == 'text': + output = self.controller.evaluate_model_text(self.context, + self.model, + self.data) + else: + output = self.controller.evaluate_model(self.context, + self.model, + self.data) if output is not None: print(output) @@ -707,6 +777,8 @@ class MeteosSparkController(object): if dataset_format == 'libsvm': return self.controller.predict_libsvm(self.model, params) + elif dataset_format == 'text': + return self.controller.predict_text(self.model, params) else: return self.controller.predict(self.model, params)