Set a concurrency limit on ResourceChain

This adds a limit to the number of resources that ResourceChain creates
in parallel. It reduces memory usage drastically when ResourceChain
contains lots of resources. We use the number of active services, which
seems to give a reasonable balance between performance and memory
consumption.

Story: #2003975
Task: #26917
Change-Id: Id20fb89813b32927298b3a7c0abe23da710b04d1
This commit is contained in:
Thomas Herve 2018-10-04 17:01:07 +02:00
parent 82e30159b5
commit bad27d3886
3 changed files with 46 additions and 1 deletions

View File

@ -25,6 +25,7 @@ from heat.engine import properties
from heat.engine.resources import stack_resource
from heat.engine import rsrc_defn
from heat.engine import support
from heat.objects import service as service_objects
from heat.scaling import template as scl_template
LOG = logging.getLogger(__name__)
@ -118,12 +119,19 @@ class ResourceChain(stack_resource.StackResource):
resource_types = self.properties[self.RESOURCES]
resource_names = self._resource_names(resource_types)
name_def_tuples = []
# Impose a concurrency limit if concurrent is set. This minimizes the
# memory usage when the chain contains lots of resources, but it keeps
# performance to a reasonable level.
concurrency_limit = service_objects.Service.active_service_count(
self.context) or 1
for index, rt in enumerate(resource_types):
name = resource_names[index]
depends_on = None
if index > 0 and not self.properties[self.CONCURRENT]:
depends_on = [resource_names[index - 1]]
elif index >= concurrency_limit:
depends_on = [resource_names[index - concurrency_limit]]
t = (name, self._build_resource_definition(name, rt,
depends_on=depends_on))

View File

@ -17,6 +17,7 @@
from oslo_versionedobjects import base
from oslo_versionedobjects import fields
from heat.common import service_utils
from heat.db.sqlalchemy import api as db_api
from heat.objects import base as heat_base
@ -89,3 +90,10 @@ class Service(
host,
binary,
hostname))
@classmethod
def active_service_count(cls, context):
"""Return the number of services reportedly active."""
return len([
svc for svc in cls.get_all(context)
if service_utils.format_service(svc)['status'] == 'up'])

View File

@ -19,6 +19,7 @@ from heat.common import exception
from heat.engine import node_data
from heat.engine.resources.openstack.heat import resource_chain
from heat.engine import rsrc_defn
from heat.objects import service as service_objects
from heat.tests import common
from heat.tests import utils
@ -69,12 +70,14 @@ class ResourceChainTest(common.HeatTestCase):
self.assertEqual(RESOURCE_PROPERTIES, resource['properties'])
self.assertEqual(['0'], resource['depends_on'])
def test_child_template_with_concurrent(self):
@mock.patch.object(service_objects.Service, 'active_service_count')
def test_child_template_with_concurrent(self, mock_count):
# Setup
tmpl_def = copy.deepcopy(TEMPLATE)
tmpl_def['resources']['test-chain']['properties']['concurrent'] = True
chain = self._create_chain(tmpl_def)
mock_count.return_value = 5
# Test
child_template = chain.child_template()
@ -89,6 +92,32 @@ class ResourceChainTest(common.HeatTestCase):
resource = tmpl['resources']['1']
self.assertNotIn('depends_on', resource)
@mock.patch.object(service_objects.Service, 'active_service_count')
def test_child_template_with_concurrent_limit(self, mock_count):
tmpl_def = copy.deepcopy(TEMPLATE)
tmpl_def['resources']['test-chain']['properties']['concurrent'] = True
tmpl_def['resources']['test-chain']['properties']['resources'] = [
'OS::Heat::SoftwareConfig', 'OS::Heat::StructuredConfig',
'OS::Heat::SoftwareConfig', 'OS::Heat::StructuredConfig']
chain = self._create_chain(tmpl_def)
mock_count.return_value = 2
child_template = chain.child_template()
tmpl = child_template.t
resource = tmpl['resources']['0']
self.assertNotIn('depends_on', resource)
resource = tmpl['resources']['1']
self.assertNotIn('depends_on', resource)
resource = tmpl['resources']['2']
self.assertEqual(['0'], resource['depends_on'])
resource = tmpl['resources']['3']
self.assertEqual(['1'], resource['depends_on'])
def test_child_template_default_concurrent(self):
# Setup
tmpl_def = copy.deepcopy(TEMPLATE)