Fix the broken iptables example.

Change-Id: I7936322021a409b250204a5bce515e14a879b279
Co-Authored-By: Joan Varvenne <joan.varvenne@hpe.com>
Co-Authored-By: David Subiros Perez <david.perez5@hpe.com>
This commit is contained in:
Joan Varvenne 2016-09-14 11:29:12 +01:00
parent 5812bd8429
commit 2cdc21c7e0
3 changed files with 22 additions and 8 deletions

View File

@ -58,10 +58,20 @@ class IptablesIngestor(base.BaseIngestor):
return []
def map_dstream(self, dstream):
"""
Map the provided dstream into another dstream.
:type dstream: pyspark.streaming.DStream
:param dstream: DStream to transform.
:rtype: pyspark.streaming.DStream
:return: Returns a new dstream.
"""
features_list = list(self._features)
return dstream.map(fn.from_json)\
.map(lambda x: (x['ctime'], x))\
.groupByKey()\
.map(lambda rdd_entry: IptablesIngestor._process_data(
rdd_entry,
rdd_entry[1],
features_list))
@staticmethod
@ -79,10 +89,10 @@ class IptablesIngestor(base.BaseIngestor):
events = []
for event in rdd_entry:
events.append(event[RDD_EVENT])
return IptablesIngestor._vectorize_events(events, feature_list)
return IptablesIngestor.vectorize_events(events, feature_list)
@staticmethod
def _vectorize_events(events, feature_list):
def vectorize_events(events, feature_list):
"""Event vectorizing logic.
For each event, we get the message,
@ -91,6 +101,8 @@ class IptablesIngestor(base.BaseIngestor):
Finally, we increase the index of the vector corresponding to
that feature.
:type events: list[dict]
:param events: List of collected events.
:type feature_list: list[str]
:param feature_list: features to extract, in order
"""

View File

@ -56,10 +56,12 @@ class IptablesLDP(bt.BaseLDP):
"""
data = self._data
return dstream.map(fn.from_json)\
.flatMap(lambda r:
self._detect_anomalies(r, data))
.map(lambda x: (x['ctime'], x))\
.groupByKey()\
.flatMap(lambda r: IptablesLDP._detect_anomalies(r[1], data))
def _detect_anomalies(self, rdd_entry, data):
@staticmethod
def _detect_anomalies(rdd_entry, data):
"""Classifies and marks the RDD entry as anomalous or non-anomalous
:type rdd_entry: list[dict]
@ -79,7 +81,7 @@ class IptablesLDP(bt.BaseLDP):
if features is None or classifier is None:
return events
X = ip_ing.IptablesIngestor._vectorize_events(events, features)
X = ip_ing.IptablesIngestor.vectorize_events(events, features)
Y = classifier.predict(X)
for i in range(len(events)):
event = events[i]

View File

@ -71,7 +71,7 @@ class MarkovChainSource(base.BaseSource):
self._server.terminate = True
self._server.shutdown()
self._server.server_close()
self._server_thread.join(0.1)
self._server_thread.join()
def _start_thread(self, system):
self._server = SocketServer.ThreadingTCPServer(