Fix the broken iptable example.
Change-Id: I7936322021a409b250204a5bce515e14a879b279 Co-Authored-By: Joan Varvenne <joan.varvenne@hpe.com> Co-Authored-By: David Subiros Perez <david.perez5@hpe.com>
This commit is contained in:
parent
5812bd8429
commit
2cdc21c7e0
|
@ -58,10 +58,20 @@ class IptablesIngestor(base.BaseIngestor):
|
|||
return []
|
||||
|
||||
def map_dstream(self, dstream):
|
||||
"""
|
||||
Map the provided dstream into another dstream.
|
||||
|
||||
:type dstream: pyspark.streaming.DStream
|
||||
:param dstream: DStream to transform.
|
||||
:rtype: pyspark.streaming.DStream
|
||||
:return: Returns a new dstream.
|
||||
"""
|
||||
features_list = list(self._features)
|
||||
return dstream.map(fn.from_json)\
|
||||
.map(lambda x: (x['ctime'], x))\
|
||||
.groupByKey()\
|
||||
.map(lambda rdd_entry: IptablesIngestor._process_data(
|
||||
rdd_entry,
|
||||
rdd_entry[1],
|
||||
features_list))
|
||||
|
||||
@staticmethod
|
||||
|
@ -79,10 +89,10 @@ class IptablesIngestor(base.BaseIngestor):
|
|||
events = []
|
||||
for event in rdd_entry:
|
||||
events.append(event[RDD_EVENT])
|
||||
return IptablesIngestor._vectorize_events(events, feature_list)
|
||||
return IptablesIngestor.vectorize_events(events, feature_list)
|
||||
|
||||
@staticmethod
|
||||
def _vectorize_events(events, feature_list):
|
||||
def vectorize_events(events, feature_list):
|
||||
"""Event vectorizing logic.
|
||||
|
||||
For each event, we get the message,
|
||||
|
@ -91,6 +101,8 @@ class IptablesIngestor(base.BaseIngestor):
|
|||
Finally, we increase the index of the vector corresponding to
|
||||
that feature.
|
||||
|
||||
:type events: list[dict]
|
||||
:param events: List of collected events.
|
||||
:type feature_list: list[str]
|
||||
:param feature_list: features to extract, in order
|
||||
"""
|
||||
|
|
|
@ -56,10 +56,12 @@ class IptablesLDP(bt.BaseLDP):
|
|||
"""
|
||||
data = self._data
|
||||
return dstream.map(fn.from_json)\
|
||||
.flatMap(lambda r:
|
||||
self._detect_anomalies(r, data))
|
||||
.map(lambda x: (x['ctime'], x))\
|
||||
.groupByKey()\
|
||||
.flatMap(lambda r: IptablesLDP._detect_anomalies(r[1], data))
|
||||
|
||||
def _detect_anomalies(self, rdd_entry, data):
|
||||
@staticmethod
|
||||
def _detect_anomalies(rdd_entry, data):
|
||||
"""Classifies and marks the RDD entry as anomalous or non-anomalous
|
||||
|
||||
:type rdd_entry: list[dict]
|
||||
|
@ -79,7 +81,7 @@ class IptablesLDP(bt.BaseLDP):
|
|||
if features is None or classifier is None:
|
||||
return events
|
||||
|
||||
X = ip_ing.IptablesIngestor._vectorize_events(events, features)
|
||||
X = ip_ing.IptablesIngestor.vectorize_events(events, features)
|
||||
Y = classifier.predict(X)
|
||||
for i in range(len(events)):
|
||||
event = events[i]
|
||||
|
|
|
@ -71,7 +71,7 @@ class MarkovChainSource(base.BaseSource):
|
|||
self._server.terminate = True
|
||||
self._server.shutdown()
|
||||
self._server.server_close()
|
||||
self._server_thread.join(0.1)
|
||||
self._server_thread.join()
|
||||
|
||||
def _start_thread(self, system):
|
||||
self._server = SocketServer.ThreadingTCPServer(
|
||||
|
|
Loading…
Reference in New Issue