Enable NaiveBayes to support a text format
Enable NaiveBayes to support a text format using tf-idf. This feature allows user to create following prediction models. - model detects whether it is a spam mail or not - model predicts whether it is a review of goodwill or not - model detects what language a document is written in implements blueprint : support-text-format Change-Id: Ib016c587ab821faa7518f6face76d57883dab5ab
This commit is contained in:
parent
f78676d89e
commit
892f96b98d
|
@ -47,10 +47,10 @@ from pyspark.mllib.classification import NaiveBayes
|
|||
from pyspark.mllib.classification import NaiveBayesModel
|
||||
from pyspark.mllib.clustering import KMeans
|
||||
from pyspark.mllib.clustering import KMeansModel
|
||||
from pyspark.mllib.evaluation import BinaryClassificationMetrics
|
||||
from pyspark.mllib.evaluation import MulticlassMetrics
|
||||
from pyspark.mllib.evaluation import RankingMetrics
|
||||
from pyspark.mllib.evaluation import RegressionMetrics
|
||||
from pyspark.mllib.feature import HashingTF
|
||||
from pyspark.mllib.feature import IDF
|
||||
from pyspark.mllib.feature import Word2Vec
|
||||
from pyspark.mllib.feature import Word2VecModel
|
||||
from pyspark.mllib.fpm import FPGrowth
|
||||
|
@ -89,10 +89,22 @@ class ModelController(object):
|
|||
"""Is called to create mode."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def create_model_text(self, data, params):
|
||||
"""Is called to create mode."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def evaluate_model(self, context, model, data):
|
||||
"""Is called to evaluate mode."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def evaluate_model_libsvm(self, context, model, data):
|
||||
"""Is called to evaluate mode."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def evaluate_model_text(self, context, model, data):
|
||||
"""Is called to evaluate mode."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def load_model(self, context, path):
|
||||
"""Is called to load mode."""
|
||||
raise NotImplementedError()
|
||||
|
@ -105,12 +117,50 @@ class ModelController(object):
|
|||
"""Is called to predict value."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def predict_text(self, context, params):
|
||||
"""Is called to predict value."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def parsePoint(self, line):
|
||||
values = [float(s) for s in line.split(',')]
|
||||
if values[0] == -1:
|
||||
values[0] = 0
|
||||
return LabeledPoint(values[0], values[1:])
|
||||
|
||||
def textToIndex(self, text):
|
||||
return HashingTF().transform(text.split(" "))
|
||||
|
||||
def parseTextRDDToIndex(self, data):
|
||||
|
||||
labels = data.map(lambda line: float(line.split(" ", 1)[0]))
|
||||
documents = data.map(lambda line: line.split(" ", 1)[1].split(" "))
|
||||
|
||||
tf = HashingTF().transform(documents)
|
||||
tf.cache()
|
||||
|
||||
idfIgnore = IDF(minDocFreq=2).fit(tf)
|
||||
index = idfIgnore.transform(tf)
|
||||
|
||||
return labels.zip(index).map(lambda line: LabeledPoint(line[0], line[1]))
|
||||
|
||||
def evaluateBinaryClassification(self, predictionAndLabels):
|
||||
|
||||
metrics = MulticlassMetrics(predictionAndLabels).confusionMatrix()
|
||||
|
||||
return "{}: {}".format("True Positive", metrics[0, 0]) + os.linesep\
|
||||
+ "{}: {}".format("False Positive", metrics[0, 1]) + os.linesep\
|
||||
+ "{}: {}".format("False Negative", metrics[1, 0]) + os.linesep\
|
||||
+ "{}: {}".format("True Negative", metrics[1, 1])
|
||||
|
||||
def evaluateRegression(self, scoreAndLabels):
|
||||
|
||||
metrics = RegressionMetrics(scoreAndLabels)
|
||||
|
||||
return "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\
|
||||
+ "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\
|
||||
+ "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\
|
||||
+ "{}: {}".format("R-squared", metrics.r2)
|
||||
|
||||
|
||||
class KMeansModelController(ModelController):
|
||||
|
||||
|
@ -171,14 +221,7 @@ class RecommendationController(ModelController):
|
|||
ratingsTuple = ratings.map(lambda r: ((r.user, r.product), r.rating))
|
||||
scoreAndLabels = predictions.join(ratingsTuple).map(lambda tup: tup[1])
|
||||
|
||||
metrics = RegressionMetrics(scoreAndLabels)
|
||||
|
||||
result = "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\
|
||||
+ "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\
|
||||
+ "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\
|
||||
+ "{}: {}".format("R-squared", metrics.r2)
|
||||
|
||||
return result
|
||||
return self.evaluateRegression(scoreAndLabels)
|
||||
|
||||
def load_model(self, context, path):
|
||||
return MatrixFactorizationModel.load(context, path)
|
||||
|
@ -229,14 +272,10 @@ class RegressionModelController(ModelController):
|
|||
points = data.map(self.parsePoint)
|
||||
scoreAndLabels = points.map(lambda p: (float(model.predict(p.features)), p.label))
|
||||
|
||||
metrics = RegressionMetrics(scoreAndLabels)
|
||||
return self.evaluateRegression(scoreAndLabels)
|
||||
|
||||
result = "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\
|
||||
+ "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\
|
||||
+ "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\
|
||||
+ "{}: {}".format("R-squared", metrics.r2)
|
||||
|
||||
return result
|
||||
def evaluate_model_libsvm(self, context, model, data):
|
||||
return self.evaluate_model(context, model, data)
|
||||
|
||||
def load_model(self, context, path):
|
||||
return getattr(self.model_class, 'load')(context, path)
|
||||
|
@ -289,12 +328,10 @@ class LogisticRegressionModelController(ModelController):
|
|||
predictionAndLabels = data.map(self.parsePoint)\
|
||||
.map(lambda lp: (float(model.predict(lp.features)), lp.label))
|
||||
|
||||
metrics = BinaryClassificationMetrics(predictionAndLabels)
|
||||
return self.evaluateBinaryClassification(predictionAndLabels)
|
||||
|
||||
result = "{}: {}".format("Area under PR", metrics.areaUnderPR) + os.linesep\
|
||||
+ "{}: {}".format("Area under ROC", metrics.areaUnderROC)
|
||||
|
||||
return result
|
||||
def evaluate_model_libsvm(self, context, model, data):
|
||||
return self.evaluate_model(context, model, data)
|
||||
|
||||
def load_model(self, context, path):
|
||||
return LogisticRegressionModel.load(context, path)
|
||||
|
@ -318,17 +355,30 @@ class NaiveBayesModelController(ModelController):
|
|||
points = data.map(self.parsePoint)
|
||||
return NaiveBayes.train(points, lambda_)
|
||||
|
||||
def create_model_text(self, data, params):
|
||||
|
||||
lambda_ = float(params.get('lambda', 1.0))
|
||||
|
||||
points = self.parseTextRDDToIndex(data)
|
||||
|
||||
return NaiveBayes.train(points, lambda_)
|
||||
|
||||
def evaluate_model(self, context, model, data):
|
||||
|
||||
predictionAndLabels = data.map(self.parsePoint)\
|
||||
.map(lambda lp: (float(model.predict(lp.features)), lp.label))
|
||||
|
||||
metrics = BinaryClassificationMetrics(predictionAndLabels)
|
||||
return self.evaluateBinaryClassification(predictionAndLabels)
|
||||
|
||||
result = "{}: {}".format("Area under PR", metrics.areaUnderPR) + os.linesep\
|
||||
+ "{}: {}".format("Area under ROC", metrics.areaUnderROC)
|
||||
def evaluate_model_libsvm(self, context, model, data):
|
||||
return self.evaluate_model(context, model, data)
|
||||
|
||||
return result
|
||||
def evaluate_model_text(self, context, model, data):
|
||||
|
||||
points = self.parseTextRDDToIndex(data)
|
||||
predictionAndLabels = points.map(lambda lp: (float(model.predict(lp.features)), lp.label))
|
||||
|
||||
return self.evaluateBinaryClassification(predictionAndLabels)
|
||||
|
||||
def load_model(self, context, path):
|
||||
return NaiveBayesModel.load(context, path)
|
||||
|
@ -336,6 +386,10 @@ class NaiveBayesModelController(ModelController):
|
|||
def predict(self, model, params):
|
||||
return model.predict(params.split(','))
|
||||
|
||||
def predict_text(self, model, params):
|
||||
index = self.textToIndex(params)
|
||||
return model.predict(index)
|
||||
|
||||
|
||||
class TreeModelController(ModelController):
|
||||
|
||||
|
@ -448,6 +502,9 @@ class TreeModelController(ModelController):
|
|||
|
||||
return result
|
||||
|
||||
def evaluate_model_libsvm(self, context, model, data):
|
||||
return self.evaluate_model(context, model, data)
|
||||
|
||||
def load_model(self, context, path):
|
||||
return getattr(self.model_class, 'load')(context, path)
|
||||
|
||||
|
@ -484,7 +541,7 @@ class Word2VecModelController(ModelController):
|
|||
def __init__(self):
|
||||
super(Word2VecModelController, self).__init__()
|
||||
|
||||
def create_model(self, data, params):
|
||||
def create_model_text(self, data, params):
|
||||
|
||||
learningRate = float(params.get('learningRate', 0.025))
|
||||
numIterations = int(params.get('numIterations', 10))
|
||||
|
@ -501,7 +558,7 @@ class Word2VecModelController(ModelController):
|
|||
def load_model(self, context, path):
|
||||
return Word2VecModel.load(context, path)
|
||||
|
||||
def predict(self, model, params):
|
||||
def predict_text(self, model, params):
|
||||
|
||||
dic_params = literal_eval(params)
|
||||
|
||||
|
@ -523,7 +580,7 @@ class FPGrowthModelController(ModelController):
|
|||
def __init__(self):
|
||||
super(FPGrowthModelController, self).__init__()
|
||||
|
||||
def create_model(self, data, params):
|
||||
def create_model_text(self, data, params):
|
||||
|
||||
minSupport = float(params.get('minSupport', 0.2))
|
||||
numPartitions = int(params.get('numPartitions', 10))
|
||||
|
@ -640,6 +697,9 @@ class MeteosSparkController(object):
|
|||
if dataset_format == 'libsvm':
|
||||
self.model = self.controller.create_model_libsvm(self.data,
|
||||
list_params)
|
||||
elif dataset_format == 'text':
|
||||
self.model = self.controller.create_model_text(self.data,
|
||||
list_params)
|
||||
else:
|
||||
self.model = self.controller.create_model(self.data, list_params)
|
||||
|
||||
|
@ -651,10 +711,20 @@ class MeteosSparkController(object):
|
|||
self.load_data()
|
||||
self.model = self.controller.load_model(self.context,
|
||||
self.modelpath)
|
||||
dataset_format = self.job_args.get('dataset_format')
|
||||
|
||||
output = self.controller.evaluate_model(self.context,
|
||||
self.model,
|
||||
self.data)
|
||||
if dataset_format == 'libsvm':
|
||||
output = self.controller.evaluate_model_libsvm(self.context,
|
||||
self.model,
|
||||
self.data)
|
||||
elif dataset_format == 'text':
|
||||
output = self.controller.evaluate_model_text(self.context,
|
||||
self.model,
|
||||
self.data)
|
||||
else:
|
||||
output = self.controller.evaluate_model(self.context,
|
||||
self.model,
|
||||
self.data)
|
||||
|
||||
if output is not None:
|
||||
print(output)
|
||||
|
@ -707,6 +777,8 @@ class MeteosSparkController(object):
|
|||
|
||||
if dataset_format == 'libsvm':
|
||||
return self.controller.predict_libsvm(self.model, params)
|
||||
elif dataset_format == 'text':
|
||||
return self.controller.predict_text(self.model, params)
|
||||
else:
|
||||
return self.controller.predict(self.model, params)
|
||||
|
||||
|
|
Loading…
Reference in New Issue