Merge "Concurrent Execution Control"

This commit is contained in:
Jenkins 2016-07-28 22:45:44 +00:00 committed by Gerrit Code Review
commit db0e6abb83
5 changed files with 304 additions and 10 deletions

View File

@ -0,0 +1,27 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
Namespaces:
=: io.murano.metadata.engine
Name: Synchronize
Usage: Meta
Inherited: true
Cardinality: One
Applies: [Method]
Properties:
onThis:
Contract: $.bool().notNull()
Default: true
onArgs:
Contract:
- $.string().notNull()

View File

@ -78,6 +78,8 @@ Classes:
io.murano.metadata.forms.Hidden: metadata/forms/Hidden.yaml
io.murano.metadata.forms.Section: metadata/forms/Section.yaml
io.murano.metadata.engine.Serialize: metadata/engine/Serialize.yaml
io.murano.metadata.engine.Synchronize: metadata/engine/Synchronize.yaml
io.murano.test.TestFixture: test/TestFixture.yaml
io.murano.test.TestFixtureWithEnvironment: test/TestFixture.yaml

View File

@ -112,7 +112,25 @@ class MuranoDslExecutor(object):
args, kwargs = self._canonize_parameters(
method.arguments_scheme, args, kwargs, method.name, this)
with self._acquire_method_lock(method, this):
this_lock = this
arg_values_for_lock = {}
method_meta = [m for m in method.get_meta(context)
if m.type.name == ('io.murano.metadata.'
'engine.Synchronize')]
if method_meta:
method_meta = method_meta[0]
if method_meta:
if not method_meta.get_property('onThis', context):
this_lock = None
for arg_name in method_meta.get_property('onArgs', context):
arg_val = kwargs.get(arg_name)
if arg_val is not None:
arg_values_for_lock[arg_name] = arg_val
arg_values_for_lock = utils.filter_parameters_dict(arg_values_for_lock)
with self._acquire_method_lock(method, this_lock, arg_values_for_lock):
for i, arg in enumerate(args, 2):
context[str(i)] = arg
for key, value in six.iteritems(kwargs):
@ -143,31 +161,70 @@ class MuranoDslExecutor(object):
return call()
@contextlib.contextmanager
def _acquire_method_lock(self, method, this):
method_id = id(method)
if method.is_static:
this_id = id(method.declaring_type)
def _acquire_method_lock(self, method, this, arg_val_dict):
if this is None:
if not arg_val_dict:
# if neither "this" nor argument values are set then no
# locking is needed
key = None
else:
# if only the argument values are passed then find the lock
# list only by the method
key = (None, id(method))
else:
this_id = this.object_id
if method.is_static:
# find the lock list by the type and method
key = (id(method.declaring_type), id(method))
else:
# find the lock list by the object and method
key = (this.object_id, id(method))
thread_id = helpers.get_current_thread_id()
while True:
event, event_owner = self._locks.get(
(method_id, this_id), (None, None))
event, event_owner = None, None
if key is None: # no locking needed
break
lock_list = self._locks.setdefault(key, [])
# lock list contains a list of tuples:
# first item of each tuple is a dict with the values of locking
# arguments (it is used for argument values comparison),
# second item is an event to wait on,
# third one is the owner thread id
# If this lock list is empty it means no locks on this object and
# method at all.
for arg_vals, l_event, l_event_owner in lock_list:
if arg_vals == arg_val_dict:
event = l_event
event_owner = l_event_owner
break
if event:
if event_owner == thread_id:
# this means a re-entrant lock: the tuple with the same
# value of the first element exists in the list, but it was
# acquired by the same green thread. We may proceed with
# the call in this case
event = None
break
else:
event.wait()
else:
# this means either the lock list was empty or didn't contain a
# tuple with the first element equal to arg_val_dict.
# Then let's acquire a lock, i.e. create a new tuple and place
# it into the list
event = eventlet.event.Event()
self._locks[(method_id, this_id)] = (event, thread_id)
event_owner = thread_id
lock_list.append((arg_val_dict, event, event_owner))
break
try:
yield
finally:
if event is not None:
del self._locks[(method_id, this_id)]
lock_list.remove((arg_val_dict, event, event_owner))
if len(lock_list) == 0:
del self._locks[key]
event.send()
@contextlib.contextmanager

View File

@ -0,0 +1,124 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
Namespaces:
std: io.murano
m: io.murano.metadata.engine
Name: TestConcurrency
Methods:
isolated:
Arguments:
- call:
Contract: $.string()
Body:
- trace(format('call-{0}-before', $call))
- yield()
- trace(format('call-{0}-after', $call))
isolatedWithDefault:
Meta:
- m:Synchronize:
Arguments:
- call:
Contract: $.string()
Body:
- trace(format('call-{0}-before', $call))
- yield()
- trace(format('call-{0}-after', $call))
concurrentExplicit:
Meta:
- m:Synchronize:
onThis: false
Arguments:
- call:
Contract: $.string()
Body:
- trace(format('call-{0}-before', $call))
- yield()
- trace(format('call-{0}-after', $call))
isolatedExplicit:
Meta:
- m:Synchronize:
onThis: true
Arguments:
- call:
Contract: $.string()
Body:
- trace(format('call-{0}-before', $call))
- yield()
- trace(format('call-{0}-after', $call))
argbasedPrimitive:
Meta:
- m:Synchronize:
onArgs: flag
Arguments:
- call:
Contract: $.string()
- flag:
Contract: $.string()
Body:
- trace(format('call-{0}-before', $call))
- yield()
- trace(format('call-{0}-after', $call))
argBasedWithObject:
Meta:
- m:Synchronize:
onArgs: flag
Arguments:
- call:
Contract: $.string()
- flag:
Contract: $.class(std:Object)
Body:
- trace(format('call-{0}-before', $call))
- yield()
- trace(format('call-{0}-after', $call))
testCallIsolated:
Body:
- list(1, 2, 3).pselect($this.isolated($))
testCallIsolatedWithDefault:
Body:
- list(1, 2, 3).pselect($this.isolatedWithDefault($))
testCallConcurrentExplicit:
Body:
- list(1, 2, 3).pselect($this.concurrentExplicit($))
testCallIsolatedExplicit:
Body:
- list(1, 2, 3).pselect($this.isolatedExplicit($))
testCallArgbasedPrimitiveIsolated:
Body:
- list(1, 2, 3).pselect($this.argbasedPrimitive($, same))
testCallArgbasedPrimitiveConcurrent:
Body:
- list(1, 2, 3).pselect($this.argbasedPrimitive($, $))
testCallArgbasedWithObjectIsolated:
Body:
- $o: new(std:Object)
- list(1, 2, 3).pselect($this.argBasedWithObject($, $o))
testCallArgbasedWithObjectConcurrent:
Body:
- list(1, 2, 3).pselect($this.argBasedWithObject($, new(std:Object)))

View File

@ -0,0 +1,84 @@
# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from murano.tests.unit.dsl.foundation import object_model as om
from murano.tests.unit.dsl.foundation import test_case
class TestConcurrency(test_case.DslTestCase):
def setUp(self):
super(TestConcurrency, self).setUp()
def yield_():
self.traces.append('yield')
eventlet.sleep(0)
self.register_function(yield_, 'yield')
self._runner = self.new_runner(om.Object('TestConcurrency'))
def check_isolated_traces(self):
for i in range(0, len(self.traces), 3):
before = self.traces[i]
switch = self.traces[i+1]
after = self.traces[i+2]
self.assertEqual('yield', switch)
self.assertEqual(before[0:6], after[0:6])
self.assertTrue(before.endswith('-before'))
self.assertTrue(after.endswith('-after'))
def check_concurrent_traces(self):
self.assertTrue(self.traces[0].endswith('-before'))
self.assertEqual('yield', self.traces[1])
self.assertTrue(self.traces[2].endswith('-before'))
self.assertEqual('yield', self.traces[3])
self.assertTrue(self.traces[4].endswith('-before'))
self.assertEqual('yield', self.traces[5])
self.assertTrue(self.traces[6].endswith('-after'))
self.assertTrue(self.traces[7].endswith('-after'))
self.assertTrue(self.traces[8].endswith('-after'))
def test_isolated(self):
self._runner.testCallIsolated()
self.check_isolated_traces()
def test_isolated_default(self):
self._runner.testCallIsolatedWithDefault()
self.check_isolated_traces()
def test_concurrent_explicit(self):
self._runner.testCallConcurrentExplicit()
self.check_concurrent_traces()
def test_isolated_explicit(self):
self._runner.testCallIsolatedExplicit()
self.check_isolated_traces()
def test_argbased_primitive_isolated(self):
self._runner.testCallArgbasedPrimitiveIsolated()
self.check_isolated_traces()
def test_argbased_primitive_concurrent(self):
self._runner.testCallArgbasedPrimitiveConcurrent()
self.check_concurrent_traces()
def test_argbased_object_isolated(self):
self._runner.testCallArgbasedWithObjectIsolated()
self.check_isolated_traces()
def test_argbased_object_concurrent(self):
self._runner.testCallArgbasedWithObjectConcurrent()
self.check_concurrent_traces()