Fix the Cloud example as it was broken.
Change-Id: If5222d7737e3efb77a519e877d471949985ca5fa
This commit is contained in:
parent
bd1352a68b
commit
981a5805e2
|
@ -14,7 +14,6 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
@ -47,30 +46,25 @@ class CloudIngestor(base.BaseIngestor):
|
||||||
def map_dstream(self, dstream):
|
def map_dstream(self, dstream):
|
||||||
features_list = list(self._features)
|
features_list = list(self._features)
|
||||||
return dstream.map(fn.from_json)\
|
return dstream.map(fn.from_json)\
|
||||||
.map(lambda rdd_entry: CloudIngestor._process_data(
|
.map(lambda x: (x['ctime'], x['event']))\
|
||||||
rdd_entry,
|
.groupByKey()\
|
||||||
|
.map(lambda rdd_entry: CloudIngestor._parse_and_vectorize(
|
||||||
|
rdd_entry[1],
|
||||||
features_list))
|
features_list))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_default_config():
|
def get_default_config():
|
||||||
return {"module": CloudIngestor.__name__}
|
return {"module": CloudIngestor.__name__}
|
||||||
|
|
||||||
# TODO(David): With the new model, this can now be method, and the lambda
|
|
||||||
# can be removed.
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _process_data(rdd_entry, feature_list):
|
def _parse_and_vectorize(iterable, feature_list):
|
||||||
json_value = json.loads(rdd_entry)
|
|
||||||
return CloudIngestor._parse_and_vectorize(json_value, feature_list)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_and_vectorize(json_value, feature_list):
|
|
||||||
values = {
|
values = {
|
||||||
"support_1": 0
|
"support_1": 0.0
|
||||||
}
|
}
|
||||||
for feature in feature_list:
|
for feature in feature_list:
|
||||||
values[feature] = 0
|
values[feature] = 0.0
|
||||||
for e in json_value["events"]:
|
for e in iterable:
|
||||||
if e["id"] in values:
|
if e["id"] in values:
|
||||||
values[e["id"]] += 1
|
values[e["id"]] += 1.0
|
||||||
res = [values[f] for f in feature_list]
|
res = [values[f] for f in feature_list]
|
||||||
return np.array(res)
|
return np.array(res)
|
||||||
|
|
|
@ -14,13 +14,13 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import voluptuous
|
import voluptuous
|
||||||
|
|
||||||
import monasca_analytics.ldp.base as bt
|
import monasca_analytics.ldp.base as bt
|
||||||
from monasca_analytics.util import validation_utils as vu
|
import monasca_analytics.util.spark_func as fn
|
||||||
|
import monasca_analytics.util.validation_utils as vu
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -51,33 +51,32 @@ class CloudCausalityLDP(bt.BaseLDP):
|
||||||
:param dstream: DStream created by the source.
|
:param dstream: DStream created by the source.
|
||||||
"""
|
"""
|
||||||
data = self._data
|
data = self._data
|
||||||
return dstream.flatMap(lambda r: self._aggregate(r, data))
|
return dstream.map(fn.from_json)\
|
||||||
|
.map(lambda x: (x['ctime'], x))\
|
||||||
|
.groupByKey()\
|
||||||
|
.flatMap(lambda r: CloudCausalityLDP._aggregate(r[1], data))
|
||||||
|
|
||||||
def _aggregate(self, rdd_entry, data):
|
@staticmethod
|
||||||
rdd_entry = json.loads(rdd_entry)
|
def _aggregate(rdd_entry, data):
|
||||||
new_entries = []
|
new_entries = []
|
||||||
events = rdd_entry["events"]
|
|
||||||
features = data["features"]
|
features = data["features"]
|
||||||
matrix = data["matrix"]
|
matrix = data["matrix"]
|
||||||
|
|
||||||
for event in events:
|
|
||||||
event["ctime"] = rdd_entry["ctime"]
|
|
||||||
|
|
||||||
if features is None or matrix is None:
|
if features is None or matrix is None:
|
||||||
return events
|
return rdd_entry
|
||||||
|
|
||||||
for event in events:
|
for event in rdd_entry:
|
||||||
causes = []
|
causes = []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
cause = features.index(event["id"])
|
cause = features.index(event["event"]["id"])
|
||||||
for other_event in events:
|
for other_event in rdd_entry:
|
||||||
if other_event["id"] != event["id"]:
|
if other_event["event"]["id"] != event["event"]["id"]:
|
||||||
try:
|
try:
|
||||||
caused = features.index(other_event["id"])
|
caused = features.index(other_event["event"]["id"])
|
||||||
|
|
||||||
if matrix[caused][cause]:
|
if matrix[caused][cause]:
|
||||||
causes.append(other_event["id"])
|
causes.append(other_event["event"]["id"])
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
|
Loading…
Reference in New Issue