Improve speed in data generation while preserving semantics.

This make the fake data generation much faster and configurable.
For the cloud example, the phase 2 is now triggered in less than 5
seconds instead of 6 minutes.

Change-Id: I9967c94d28a380fa19afe9275e769f1465f4f922
This commit is contained in:
Joan Varvenne 2016-09-19 18:10:32 +01:00
parent 981a5805e2
commit a2a315fb5e
6 changed files with 27 additions and 9 deletions

View File

@ -3,7 +3,7 @@
# (cloud-like data model)
#
src = CloudMarkovChainSource(sleep=0.01)
src = CloudMarkovChainSource(sleep=0.01, min_event_per_burst=500)
src.transitions.web_service = {
"run=>slow": {

View File

@ -13,6 +13,7 @@
"src1": {
"module": "CloudMarkovChainSource",
"sleep": 0.01,
"min_event_per_burst": 500,
"transitions": {
"web_service": {
"run=>slow": {

View File

@ -38,6 +38,7 @@ class CloudMarkovChainSource(base.MarkovChainSource):
source_schema = voluptuous.Schema({
"module": voluptuous.And(
basestring, vu.NoSpaceCharacter()),
"min_event_per_burst": voluptuous.Or(float, int),
"sleep": voluptuous.And(
float, voluptuous.Range(
min=0, max=1, min_included=False, max_included=False)),
@ -90,6 +91,7 @@ class CloudMarkovChainSource(base.MarkovChainSource):
return {
"module": CloudMarkovChainSource.__name__,
"sleep": 0.01,
"min_event_per_burst": 500,
"transitions": {
"web_service": {
"run=>slow": {
@ -144,6 +146,8 @@ class CloudMarkovChainSource(base.MarkovChainSource):
def get_params():
return [
params.ParamDescriptor('sleep', type_util.Number(), 0.01),
params.ParamDescriptor('min_event_per_burst', type_util.Number(),
500),
params.ParamDescriptor('transitions', type_util.Object({
'web_service': type_util.Object({
'run=>slow': type_util.Any(),

View File

@ -84,6 +84,11 @@ class MarkovChainSource(base.BaseSource):
self._server.terminate = False
self._server.system = system
self._server.sleep_in_seconds = self._config["sleep"]
if "min_event_per_burst" in self._config:
self._server.min_event_per_burst = \
int(self._config["min_event_per_burst"])
else:
self._server.min_event_per_burst = 500
self._server_thread = threading.Thread(target=self._serve_forever)
self._server_thread.start()
@ -233,9 +238,17 @@ class FMSTCPHandler(SocketServer.BaseRequestHandler):
fake_date = datetime.datetime.today()
hour_of_day = fake_date.hour
while not self.server.terminate:
self.server.system.next_state(hour_of_day)
request = RequestBuilder(self.request)
self.server.system.collect_events(hour_of_day, fake_date, request)
# Collect some events
while request.nb_events() < self.server.min_event_per_burst:
self.server.system.next_state(hour_of_day)
self.server.system.collect_events(hour_of_day, fake_date,
request)
hour_of_day += 1
fake_date += datetime.timedelta(hours=1)
if hour_of_day > 24:
hour_of_day = 0
try:
request.finalize()
@ -245,11 +258,6 @@ class FMSTCPHandler(SocketServer.BaseRequestHandler):
time.sleep(self.server.sleep_in_seconds)
hour_of_day += 1
fake_date += datetime.timedelta(hours=1)
if hour_of_day > 24:
hour_of_day = 0
class RequestBuilder(object):
@ -264,6 +272,9 @@ class RequestBuilder(object):
"""
self._collected_data.append(data)
def nb_events(self):
return len(self._collected_data)
def finalize(self):
for data in self._collected_data:
self._request.send("{0}\n".format(json.dumps(data,

View File

@ -6,4 +6,4 @@ b = a.transitions.web_service
a = CloudMarkovChainSource()
b = a."transitions"."web_service"
# TYPE_TABLE_EQ {Ident< a >: CloudMarkovChainSource(sleep=TypeNumber,transitions=TypeStruct < {host: TypeStruct < {off=>on: TypeNumber, on=>off: TypeNumber} >, switch: TypeStruct < {off=>on: TypeNumber, on=>off: TypeNumber} >, web_service: TypeStruct < {run=>slow: TypeAny, slow=>run: TypeAny, stop=>run: TypeAny} >} >,triggers=TypeStruct < {support: TypeStruct < {get_called: TypeAny} >} >,graph=TypeAny), Ident< b >: TypeStruct < {run=>slow: TypeAny, slow=>run: TypeAny, stop=>run: TypeAny} >}
# TYPE_TABLE_EQ {Ident< a >: CloudMarkovChainSource(sleep=TypeNumber,min_event_per_burst=TypeNumber,transitions=TypeStruct < {host: TypeStruct < {off=>on: TypeNumber, on=>off: TypeNumber} >, switch: TypeStruct < {off=>on: TypeNumber, on=>off: TypeNumber} >, web_service: TypeStruct < {run=>slow: TypeAny, slow=>run: TypeAny, stop=>run: TypeAny} >} >,triggers=TypeStruct < {support: TypeStruct < {get_called: TypeAny} >} >,graph=TypeAny), Ident< b >: TypeStruct < {run=>slow: TypeAny, slow=>run: TypeAny, stop=>run: TypeAny} >}

View File

@ -70,6 +70,7 @@ class MarkovChainSourceTest(MonanasTestCase):
}
},
"sleep": 0.1,
"min_event_per_burst": 500,
"graph": {
"h1:host": ["s1"],
"h2:host": ["s1"],
@ -87,6 +88,7 @@ class MarkovChainSourceTest(MonanasTestCase):
"module": 123,
"transitions": dict(self.valid_config["transitions"]),
"sleep": 0.1,
"min_event_per_burst": 500,
"graph": {}
}
self.mcs = cloud.CloudMarkovChainSource("fake_id", self.valid_config)