Improve speed in data generation while preserving semantics.
This make the fake data generation much faster and configurable. For the cloud example, the phase 2 is now triggered in less than 5 seconds instead of 6 minutes. Change-Id: I9967c94d28a380fa19afe9275e769f1465f4f922
This commit is contained in:
parent
981a5805e2
commit
a2a315fb5e
|
@ -3,7 +3,7 @@
|
|||
# (cloud-like data model)
|
||||
#
|
||||
|
||||
src = CloudMarkovChainSource(sleep=0.01)
|
||||
src = CloudMarkovChainSource(sleep=0.01, min_event_per_burst=500)
|
||||
|
||||
src.transitions.web_service = {
|
||||
"run=>slow": {
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
"src1": {
|
||||
"module": "CloudMarkovChainSource",
|
||||
"sleep": 0.01,
|
||||
"min_event_per_burst": 500,
|
||||
"transitions": {
|
||||
"web_service": {
|
||||
"run=>slow": {
|
||||
|
|
|
@ -38,6 +38,7 @@ class CloudMarkovChainSource(base.MarkovChainSource):
|
|||
source_schema = voluptuous.Schema({
|
||||
"module": voluptuous.And(
|
||||
basestring, vu.NoSpaceCharacter()),
|
||||
"min_event_per_burst": voluptuous.Or(float, int),
|
||||
"sleep": voluptuous.And(
|
||||
float, voluptuous.Range(
|
||||
min=0, max=1, min_included=False, max_included=False)),
|
||||
|
@ -90,6 +91,7 @@ class CloudMarkovChainSource(base.MarkovChainSource):
|
|||
return {
|
||||
"module": CloudMarkovChainSource.__name__,
|
||||
"sleep": 0.01,
|
||||
"min_event_per_burst": 500,
|
||||
"transitions": {
|
||||
"web_service": {
|
||||
"run=>slow": {
|
||||
|
@ -144,6 +146,8 @@ class CloudMarkovChainSource(base.MarkovChainSource):
|
|||
def get_params():
|
||||
return [
|
||||
params.ParamDescriptor('sleep', type_util.Number(), 0.01),
|
||||
params.ParamDescriptor('min_event_per_burst', type_util.Number(),
|
||||
500),
|
||||
params.ParamDescriptor('transitions', type_util.Object({
|
||||
'web_service': type_util.Object({
|
||||
'run=>slow': type_util.Any(),
|
||||
|
|
|
@ -84,6 +84,11 @@ class MarkovChainSource(base.BaseSource):
|
|||
self._server.terminate = False
|
||||
self._server.system = system
|
||||
self._server.sleep_in_seconds = self._config["sleep"]
|
||||
if "min_event_per_burst" in self._config:
|
||||
self._server.min_event_per_burst = \
|
||||
int(self._config["min_event_per_burst"])
|
||||
else:
|
||||
self._server.min_event_per_burst = 500
|
||||
|
||||
self._server_thread = threading.Thread(target=self._serve_forever)
|
||||
self._server_thread.start()
|
||||
|
@ -233,9 +238,17 @@ class FMSTCPHandler(SocketServer.BaseRequestHandler):
|
|||
fake_date = datetime.datetime.today()
|
||||
hour_of_day = fake_date.hour
|
||||
while not self.server.terminate:
|
||||
self.server.system.next_state(hour_of_day)
|
||||
request = RequestBuilder(self.request)
|
||||
self.server.system.collect_events(hour_of_day, fake_date, request)
|
||||
|
||||
# Collect some events
|
||||
while request.nb_events() < self.server.min_event_per_burst:
|
||||
self.server.system.next_state(hour_of_day)
|
||||
self.server.system.collect_events(hour_of_day, fake_date,
|
||||
request)
|
||||
hour_of_day += 1
|
||||
fake_date += datetime.timedelta(hours=1)
|
||||
if hour_of_day > 24:
|
||||
hour_of_day = 0
|
||||
|
||||
try:
|
||||
request.finalize()
|
||||
|
@ -245,11 +258,6 @@ class FMSTCPHandler(SocketServer.BaseRequestHandler):
|
|||
|
||||
time.sleep(self.server.sleep_in_seconds)
|
||||
|
||||
hour_of_day += 1
|
||||
fake_date += datetime.timedelta(hours=1)
|
||||
if hour_of_day > 24:
|
||||
hour_of_day = 0
|
||||
|
||||
|
||||
class RequestBuilder(object):
|
||||
|
||||
|
@ -264,6 +272,9 @@ class RequestBuilder(object):
|
|||
"""
|
||||
self._collected_data.append(data)
|
||||
|
||||
def nb_events(self):
|
||||
return len(self._collected_data)
|
||||
|
||||
def finalize(self):
|
||||
for data in self._collected_data:
|
||||
self._request.send("{0}\n".format(json.dumps(data,
|
||||
|
|
|
@ -6,4 +6,4 @@ b = a.transitions.web_service
|
|||
a = CloudMarkovChainSource()
|
||||
b = a."transitions"."web_service"
|
||||
|
||||
# TYPE_TABLE_EQ {Ident< a >: CloudMarkovChainSource(sleep=TypeNumber,transitions=TypeStruct < {host: TypeStruct < {off=>on: TypeNumber, on=>off: TypeNumber} >, switch: TypeStruct < {off=>on: TypeNumber, on=>off: TypeNumber} >, web_service: TypeStruct < {run=>slow: TypeAny, slow=>run: TypeAny, stop=>run: TypeAny} >} >,triggers=TypeStruct < {support: TypeStruct < {get_called: TypeAny} >} >,graph=TypeAny), Ident< b >: TypeStruct < {run=>slow: TypeAny, slow=>run: TypeAny, stop=>run: TypeAny} >}
|
||||
# TYPE_TABLE_EQ {Ident< a >: CloudMarkovChainSource(sleep=TypeNumber,min_event_per_burst=TypeNumber,transitions=TypeStruct < {host: TypeStruct < {off=>on: TypeNumber, on=>off: TypeNumber} >, switch: TypeStruct < {off=>on: TypeNumber, on=>off: TypeNumber} >, web_service: TypeStruct < {run=>slow: TypeAny, slow=>run: TypeAny, stop=>run: TypeAny} >} >,triggers=TypeStruct < {support: TypeStruct < {get_called: TypeAny} >} >,graph=TypeAny), Ident< b >: TypeStruct < {run=>slow: TypeAny, slow=>run: TypeAny, stop=>run: TypeAny} >}
|
|
@ -70,6 +70,7 @@ class MarkovChainSourceTest(MonanasTestCase):
|
|||
}
|
||||
},
|
||||
"sleep": 0.1,
|
||||
"min_event_per_burst": 500,
|
||||
"graph": {
|
||||
"h1:host": ["s1"],
|
||||
"h2:host": ["s1"],
|
||||
|
@ -87,6 +88,7 @@ class MarkovChainSourceTest(MonanasTestCase):
|
|||
"module": 123,
|
||||
"transitions": dict(self.valid_config["transitions"]),
|
||||
"sleep": 0.1,
|
||||
"min_event_per_burst": 500,
|
||||
"graph": {}
|
||||
}
|
||||
self.mcs = cloud.CloudMarkovChainSource("fake_id", self.valid_config)
|
||||
|
|
Loading…
Reference in New Issue