diff --git a/monasca_analytics/ingestor/iptables.py b/monasca_analytics/ingestor/iptables.py index 0265ea3..355743c 100644 --- a/monasca_analytics/ingestor/iptables.py +++ b/monasca_analytics/ingestor/iptables.py @@ -58,10 +58,20 @@ class IptablesIngestor(base.BaseIngestor): return [] def map_dstream(self, dstream): + """ + Map the provided dstream into another dstream. + + :type dstream: pyspark.streaming.DStream + :param dstream: DStream to transform. + :rtype: pyspark.streaming.DStream + :return: Returns a new dstream. + """ features_list = list(self._features) return dstream.map(fn.from_json)\ + .map(lambda x: (x['ctime'], x))\ + .groupByKey()\ .map(lambda rdd_entry: IptablesIngestor._process_data( - rdd_entry, + rdd_entry[1], features_list)) @staticmethod @@ -79,10 +89,10 @@ class IptablesIngestor(base.BaseIngestor): events = [] for event in rdd_entry: events.append(event[RDD_EVENT]) - return IptablesIngestor._vectorize_events(events, feature_list) + return IptablesIngestor.vectorize_events(events, feature_list) @staticmethod - def _vectorize_events(events, feature_list): + def vectorize_events(events, feature_list): """Event vectorizing logic. For each event, we get the message, @@ -91,6 +101,8 @@ class IptablesIngestor(base.BaseIngestor): Finally, we increase the index of the vector corresponding to that feature. + :type events: list[dict] + :param events: List of collected events. :type feature_list: list[str] :param feature_list: features to extract, in order """ diff --git a/monasca_analytics/ldp/iptables_ldp.py b/monasca_analytics/ldp/iptables_ldp.py index 8aa9dd1..207459c 100644 --- a/monasca_analytics/ldp/iptables_ldp.py +++ b/monasca_analytics/ldp/iptables_ldp.py @@ -56,10 +56,12 @@ class IptablesLDP(bt.BaseLDP): """ data = self._data return dstream.map(fn.from_json)\ - .flatMap(lambda r: - self._detect_anomalies(r, data)) + .map(lambda x: (x['ctime'], x))\ + .groupByKey()\ + .flatMap(lambda r: IptablesLDP._detect_anomalies(r[1], data)) - def _detect_anomalies(self, rdd_entry, data): + @staticmethod + def _detect_anomalies(rdd_entry, data): """Classifies and marks the RDD entry as anomalous or non-anomalous :type rdd_entry: list[dict] @@ -79,7 +81,7 @@ class IptablesLDP(bt.BaseLDP): if features is None or classifier is None: return events - X = ip_ing.IptablesIngestor._vectorize_events(events, features) + X = ip_ing.IptablesIngestor.vectorize_events(events, features) Y = classifier.predict(X) for i in range(len(events)): event = events[i] diff --git a/monasca_analytics/source/markov_chain/base.py b/monasca_analytics/source/markov_chain/base.py index d37d78f..74fe09f 100644 --- a/monasca_analytics/source/markov_chain/base.py +++ b/monasca_analytics/source/markov_chain/base.py @@ -71,7 +71,7 @@ class MarkovChainSource(base.BaseSource): self._server.terminate = True self._server.shutdown() self._server.server_close() - self._server_thread.join(0.1) + self._server_thread.join() def _start_thread(self, system): self._server = SocketServer.ThreadingTCPServer(