Enable NaiveBayes to support a text format

Enable NaiveBayes to support a text format using tf-idf.

This feature allows user to create following prediction models.
- model detects whether it is a spam mail or not
- model predicts whether it is a review of goodwill or not
- model detects what language a document is written in

implements blueprint : support-text-format
Change-Id: Ib016c587ab821faa7518f6face76d57883dab5ab
This commit is contained in:
Hiroyuki Eguchi 2017-03-15 10:55:32 +09:00
parent f78676d89e
commit 892f96b98d
1 changed files with 104 additions and 32 deletions

View File

@ -47,10 +47,10 @@ from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.classification import NaiveBayesModel
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.clustering import KMeansModel
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import Word2VecModel
from pyspark.mllib.fpm import FPGrowth
@ -89,10 +89,22 @@ class ModelController(object):
"""Is called to create mode."""
raise NotImplementedError()
def create_model_text(self, data, params):
"""Is called to create mode."""
raise NotImplementedError()
def evaluate_model(self, context, model, data):
"""Is called to evaluate mode."""
raise NotImplementedError()
def evaluate_model_libsvm(self, context, model, data):
"""Is called to evaluate mode."""
raise NotImplementedError()
def evaluate_model_text(self, context, model, data):
"""Is called to evaluate mode."""
raise NotImplementedError()
def load_model(self, context, path):
"""Is called to load mode."""
raise NotImplementedError()
@ -105,12 +117,50 @@ class ModelController(object):
"""Is called to predict value."""
raise NotImplementedError()
def predict_text(self, context, params):
"""Is called to predict value."""
raise NotImplementedError()
def parsePoint(self, line):
values = [float(s) for s in line.split(',')]
if values[0] == -1:
values[0] = 0
return LabeledPoint(values[0], values[1:])
def textToIndex(self, text):
return HashingTF().transform(text.split(" "))
def parseTextRDDToIndex(self, data):
labels = data.map(lambda line: float(line.split(" ", 1)[0]))
documents = data.map(lambda line: line.split(" ", 1)[1].split(" "))
tf = HashingTF().transform(documents)
tf.cache()
idfIgnore = IDF(minDocFreq=2).fit(tf)
index = idfIgnore.transform(tf)
return labels.zip(index).map(lambda line: LabeledPoint(line[0], line[1]))
def evaluateBinaryClassification(self, predictionAndLabels):
metrics = MulticlassMetrics(predictionAndLabels).confusionMatrix()
return "{}: {}".format("True Positive", metrics[0, 0]) + os.linesep\
+ "{}: {}".format("False Positive", metrics[0, 1]) + os.linesep\
+ "{}: {}".format("False Negative", metrics[1, 0]) + os.linesep\
+ "{}: {}".format("True Negative", metrics[1, 1])
def evaluateRegression(self, scoreAndLabels):
metrics = RegressionMetrics(scoreAndLabels)
return "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\
+ "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\
+ "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\
+ "{}: {}".format("R-squared", metrics.r2)
class KMeansModelController(ModelController):
@ -171,14 +221,7 @@ class RecommendationController(ModelController):
ratingsTuple = ratings.map(lambda r: ((r.user, r.product), r.rating))
scoreAndLabels = predictions.join(ratingsTuple).map(lambda tup: tup[1])
metrics = RegressionMetrics(scoreAndLabels)
result = "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\
+ "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\
+ "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\
+ "{}: {}".format("R-squared", metrics.r2)
return result
return self.evaluateRegression(scoreAndLabels)
def load_model(self, context, path):
return MatrixFactorizationModel.load(context, path)
@ -229,14 +272,10 @@ class RegressionModelController(ModelController):
points = data.map(self.parsePoint)
scoreAndLabels = points.map(lambda p: (float(model.predict(p.features)), p.label))
metrics = RegressionMetrics(scoreAndLabels)
return self.evaluateRegression(scoreAndLabels)
result = "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\
+ "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\
+ "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\
+ "{}: {}".format("R-squared", metrics.r2)
return result
def evaluate_model_libsvm(self, context, model, data):
return self.evaluate_model(context, model, data)
def load_model(self, context, path):
return getattr(self.model_class, 'load')(context, path)
@ -289,12 +328,10 @@ class LogisticRegressionModelController(ModelController):
predictionAndLabels = data.map(self.parsePoint)\
.map(lambda lp: (float(model.predict(lp.features)), lp.label))
metrics = BinaryClassificationMetrics(predictionAndLabels)
return self.evaluateBinaryClassification(predictionAndLabels)
result = "{}: {}".format("Area under PR", metrics.areaUnderPR) + os.linesep\
+ "{}: {}".format("Area under ROC", metrics.areaUnderROC)
return result
def evaluate_model_libsvm(self, context, model, data):
return self.evaluate_model(context, model, data)
def load_model(self, context, path):
return LogisticRegressionModel.load(context, path)
@ -318,17 +355,30 @@ class NaiveBayesModelController(ModelController):
points = data.map(self.parsePoint)
return NaiveBayes.train(points, lambda_)
def create_model_text(self, data, params):
lambda_ = float(params.get('lambda', 1.0))
points = self.parseTextRDDToIndex(data)
return NaiveBayes.train(points, lambda_)
def evaluate_model(self, context, model, data):
predictionAndLabels = data.map(self.parsePoint)\
.map(lambda lp: (float(model.predict(lp.features)), lp.label))
metrics = BinaryClassificationMetrics(predictionAndLabels)
return self.evaluateBinaryClassification(predictionAndLabels)
result = "{}: {}".format("Area under PR", metrics.areaUnderPR) + os.linesep\
+ "{}: {}".format("Area under ROC", metrics.areaUnderROC)
def evaluate_model_libsvm(self, context, model, data):
return self.evaluate_model(context, model, data)
return result
def evaluate_model_text(self, context, model, data):
points = self.parseTextRDDToIndex(data)
predictionAndLabels = points.map(lambda lp: (float(model.predict(lp.features)), lp.label))
return self.evaluateBinaryClassification(predictionAndLabels)
def load_model(self, context, path):
return NaiveBayesModel.load(context, path)
@ -336,6 +386,10 @@ class NaiveBayesModelController(ModelController):
def predict(self, model, params):
return model.predict(params.split(','))
def predict_text(self, model, params):
index = self.textToIndex(params)
return model.predict(index)
class TreeModelController(ModelController):
@ -448,6 +502,9 @@ class TreeModelController(ModelController):
return result
def evaluate_model_libsvm(self, context, model, data):
return self.evaluate_model(context, model, data)
def load_model(self, context, path):
return getattr(self.model_class, 'load')(context, path)
@ -484,7 +541,7 @@ class Word2VecModelController(ModelController):
def __init__(self):
super(Word2VecModelController, self).__init__()
def create_model(self, data, params):
def create_model_text(self, data, params):
learningRate = float(params.get('learningRate', 0.025))
numIterations = int(params.get('numIterations', 10))
@ -501,7 +558,7 @@ class Word2VecModelController(ModelController):
def load_model(self, context, path):
return Word2VecModel.load(context, path)
def predict(self, model, params):
def predict_text(self, model, params):
dic_params = literal_eval(params)
@ -523,7 +580,7 @@ class FPGrowthModelController(ModelController):
def __init__(self):
super(FPGrowthModelController, self).__init__()
def create_model(self, data, params):
def create_model_text(self, data, params):
minSupport = float(params.get('minSupport', 0.2))
numPartitions = int(params.get('numPartitions', 10))
@ -640,6 +697,9 @@ class MeteosSparkController(object):
if dataset_format == 'libsvm':
self.model = self.controller.create_model_libsvm(self.data,
list_params)
elif dataset_format == 'text':
self.model = self.controller.create_model_text(self.data,
list_params)
else:
self.model = self.controller.create_model(self.data, list_params)
@ -651,10 +711,20 @@ class MeteosSparkController(object):
self.load_data()
self.model = self.controller.load_model(self.context,
self.modelpath)
dataset_format = self.job_args.get('dataset_format')
output = self.controller.evaluate_model(self.context,
self.model,
self.data)
if dataset_format == 'libsvm':
output = self.controller.evaluate_model_libsvm(self.context,
self.model,
self.data)
elif dataset_format == 'text':
output = self.controller.evaluate_model_text(self.context,
self.model,
self.data)
else:
output = self.controller.evaluate_model(self.context,
self.model,
self.data)
if output is not None:
print(output)
@ -707,6 +777,8 @@ class MeteosSparkController(object):
if dataset_format == 'libsvm':
return self.controller.predict_libsvm(self.model, params)
elif dataset_format == 'text':
return self.controller.predict_text(self.model, params)
else:
return self.controller.predict(self.model, params)