Fix the Cloud example, as it was broken.

Change-Id: If5222d7737e3efb77a519e877d471949985ca5fa
This commit is contained in:
Joan Varvenne 2016-09-19 17:16:09 +01:00
parent bd1352a68b
commit 981a5805e2
2 changed files with 24 additions and 31 deletions

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import numpy as np
@ -47,30 +46,25 @@ class CloudIngestor(base.BaseIngestor):
def map_dstream(self, dstream):
    """Map the source DStream of JSON records to feature vectors.

    Each record is parsed from JSON, reduced to a ('ctime', event)
    pair, grouped by its 'ctime' key, and each group of events is then
    vectorized against this ingestor's configured feature list.

    :param dstream: DStream created by the source.
    :returns: DStream of numpy feature vectors, one per 'ctime' group.
    """
    # Capture the features in a local list so the lambdas below do not
    # close over `self` (Spark would otherwise have to serialize the
    # whole ingestor to ship the closure to workers).
    features_list = list(self._features)
    return dstream.map(fn.from_json)\
        .map(lambda x: (x['ctime'], x['event']))\
        .groupByKey()\
        .map(lambda rdd_entry: CloudIngestor._parse_and_vectorize(
            rdd_entry[1],
            features_list))
@staticmethod
def get_default_config():
    """Return the default configuration dict for this ingestor."""
    return dict(module=CloudIngestor.__name__)
# TODO(David): With the new model, this can now be method, and the lambda
# can be removed.
# NOTE(review): per this commit's diff, this helper is the one being
# removed — the pipeline now parses JSON up front (fn.from_json) before
# grouping, so this extra json.loads step is no longer needed.
@staticmethod
def _process_data(rdd_entry, feature_list):
    """Deserialize one raw JSON record and vectorize it.

    :param rdd_entry: raw JSON string for a single RDD entry.
    :param feature_list: ordered feature names, passed through to
        _parse_and_vectorize to define the output vector order.
    :returns: numpy feature vector for this record.
    """
    json_value = json.loads(rdd_entry)
    return CloudIngestor._parse_and_vectorize(json_value, feature_list)
@staticmethod
def _parse_and_vectorize(json_value, feature_list):
def _parse_and_vectorize(iterable, feature_list):
values = {
"support_1": 0
"support_1": 0.0
}
for feature in feature_list:
values[feature] = 0
for e in json_value["events"]:
values[feature] = 0.0
for e in iterable:
if e["id"] in values:
values[e["id"]] += 1
values[e["id"]] += 1.0
res = [values[f] for f in feature_list]
return np.array(res)

View File

@ -14,13 +14,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import voluptuous
import monasca_analytics.ldp.base as bt
from monasca_analytics.util import validation_utils as vu
import monasca_analytics.util.spark_func as fn
import monasca_analytics.util.validation_utils as vu
logger = logging.getLogger(__name__)
@ -51,33 +51,32 @@ class CloudCausalityLDP(bt.BaseLDP):
:param dstream: DStream created by the source.
"""
data = self._data
return dstream.flatMap(lambda r: self._aggregate(r, data))
return dstream.map(fn.from_json)\
.map(lambda x: (x['ctime'], x))\
.groupByKey()\
.flatMap(lambda r: CloudCausalityLDP._aggregate(r[1], data))
def _aggregate(self, rdd_entry, data):
rdd_entry = json.loads(rdd_entry)
@staticmethod
def _aggregate(rdd_entry, data):
new_entries = []
events = rdd_entry["events"]
features = data["features"]
matrix = data["matrix"]
for event in events:
event["ctime"] = rdd_entry["ctime"]
if features is None or matrix is None:
return events
return rdd_entry
for event in events:
for event in rdd_entry:
causes = []
try:
cause = features.index(event["id"])
for other_event in events:
if other_event["id"] != event["id"]:
cause = features.index(event["event"]["id"])
for other_event in rdd_entry:
if other_event["event"]["id"] != event["event"]["id"]:
try:
caused = features.index(other_event["id"])
caused = features.index(other_event["event"]["id"])
if matrix[caused][cause]:
causes.append(other_event["id"])
causes.append(other_event["event"]["id"])
except ValueError:
pass
except ValueError: