From acb2c56fe0035193fdcf53cdb08fee0d1bb3cc73 Mon Sep 17 00:00:00 2001 From: Steve Baker Date: Tue, 28 Jul 2015 11:19:50 +1200 Subject: [PATCH] Use atomic_key for deployment metadata updates This change switches _push_metadata_software_deployments over to using db_api.resource_update for deployment resource metadata updates. Using the atomic_key filter of resource_update allows this code to detect if another transaction has updated the metadata, and the wrap_db_retry decorator is used to perform retry attempts. No actual resource locking is required since we just need to detect if another transaction performed an update, so the engine_id is not specified. This change uses db_api directly rather than the Resource object since this change will need to be backported to Kilo (db_api.resource_update was created for convergence, and is in Kilo, but Resource object using it is not). This change can switch back to using the Resource object once the backport is complete. This rows_updated/RetryRequest/wrap_db_retry technique is used by other OpenStack projects for concurrent transaction detection (such as nova, for floating IP allocation). Change-Id: Ibf9dd58a66a77d9ae9d4b519b0f11567977f416c Closes-Bug: #1477329 --- heat/common/exception.py | 4 ++ heat/engine/service_software_config.py | 15 +++-- heat/tests/engine/test_software_config.py | 72 ++++++++++++++++++----- 3 files changed, 71 insertions(+), 20 deletions(-) diff --git a/heat/common/exception.py b/heat/common/exception.py index 9b65cd9b61..849028eb33 100644 --- a/heat/common/exception.py +++ b/heat/common/exception.py @@ -464,6 +464,10 @@ class ReadOnlyFieldError(HeatException): msg_fmt = _('Cannot modify readonly field %(field)s') +class DeploymentConcurrentTransaction(HeatException): + msg_fmt = _('Concurrent transaction for deployments of server %(server)s') + + class ObjectFieldInvalid(HeatException): msg_fmt = _('Field %(field)s of %(objname)s is not an instance of Field') diff --git a/heat/engine/service_software_config.py b/heat/engine/service_software_config.py index c31a196ab2..3c526db666 100644 --- a/heat/engine/service_software_config.py +++ b/heat/engine/service_software_config.py @@ -11,6 +11,8 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_db import api as oslo_db_api +from oslo_db import exception as db_exc from oslo_log import log as logging from oslo_serialization import jsonutils from oslo_service import service @@ -19,10 +21,11 @@ import requests import six from six.moves.urllib import parse as urlparse +from heat.common import exception from heat.common.i18n import _ from heat.common.i18n import _LI +from heat.db import api as db_api from heat.engine import api -from heat.objects import resource as resource_object from heat.objects import software_config as software_config_object from heat.objects import software_deployment as software_deployment_object from heat.rpc import api as rpc_api @@ -81,15 +84,19 @@ class SoftwareConfigService(service.Service): result = [api.format_software_config(sd.config) for sd in all_sd_s] return result + @oslo_db_api.wrap_db_retry(max_retries=10, retry_on_request=True) def _push_metadata_software_deployments(self, cnxt, server_id, sd): - rs = (resource_object.Resource. - get_by_physical_resource_id(cnxt, server_id)) + rs = db_api.resource_get_by_physical_resource_id(cnxt, server_id) if not rs: return deployments = self.metadata_software_deployments(cnxt, server_id) md = rs.rsrc_metadata or {} md['deployments'] = deployments - rs.update_and_save({'rsrc_metadata': md}) + rows_updated = db_api.resource_update( + cnxt, rs.id, {'rsrc_metadata': md}, rs.atomic_key) + if not rows_updated: + raise db_exc.RetryRequest( + exception.DeploymentConcurrentTransaction(server=server_id)) metadata_put_url = None metadata_queue_id = None diff --git a/heat/tests/engine/test_software_config.py b/heat/tests/engine/test_software_config.py index bfbded57c5..807f317665 100644 --- a/heat/tests/engine/test_software_config.py +++ b/heat/tests/engine/test_software_config.py @@ -22,6 +22,7 @@ import six from heat.common import exception from heat.common import template_format +from heat.db import api as db_api from heat.engine.clients.os import swift from heat.engine.clients.os import zaqar from heat.engine import service @@ -559,14 +560,18 @@ class SoftwareConfigServiceTest(common.HeatTestCase): @mock.patch.object(service_software_config.SoftwareConfigService, 'metadata_software_deployments') - @mock.patch.object(service_software_config.resource_object.Resource, - 'get_by_physical_resource_id') + @mock.patch.object(db_api, 'resource_update') + @mock.patch.object(db_api, 'resource_get_by_physical_resource_id') @mock.patch.object(service_software_config.requests, 'put') - def test_push_metadata_software_deployments(self, put, res_get, md_sd): + def test_push_metadata_software_deployments( + self, put, res_get, res_upd, md_sd): rs = mock.Mock() rs.rsrc_metadata = {'original': 'metadata'} + rs.id = '1234' + rs.atomic_key = 1 rs.data = [] res_get.return_value = rs + res_upd.return_value = 1 deployments = {'deploy': 'this'} md_sd.return_value = deployments @@ -578,24 +583,56 @@ class SoftwareConfigServiceTest(common.HeatTestCase): self.engine.software_config._push_metadata_software_deployments( self.ctx, '1234', None) - rs.update_and_save.assert_called_once_with( - {'rsrc_metadata': result_metadata}) + res_upd.assert_called_once_with( + self.ctx, '1234', {'rsrc_metadata': result_metadata}, 1) put.side_effect = Exception('Unexpected requests.put') @mock.patch.object(service_software_config.SoftwareConfigService, 'metadata_software_deployments') - @mock.patch.object(service_software_config.resource_object.Resource, - 'get_by_physical_resource_id') + @mock.patch.object(db_api, 'resource_update') + @mock.patch.object(db_api, 'resource_get_by_physical_resource_id') @mock.patch.object(service_software_config.requests, 'put') - def test_push_metadata_software_deployments_temp_url( - self, put, res_get, md_sd): + def test_push_metadata_software_deployments_retry( + self, put, res_get, res_upd, md_sd): rs = mock.Mock() rs.rsrc_metadata = {'original': 'metadata'} + rs.id = '1234' + rs.atomic_key = 1 + rs.data = [] + res_get.return_value = rs + # zero update means another transaction updated + res_upd.return_value = 0 + + deployments = {'deploy': 'this'} + md_sd.return_value = deployments + + self.assertRaises( + exception.DeploymentConcurrentTransaction, + self.engine.software_config._push_metadata_software_deployments, + self.ctx, + '1234', + None) + # retry ten times then the final failure + self.assertEqual(11, res_upd.call_count) + put.assert_not_called() + + @mock.patch.object(service_software_config.SoftwareConfigService, + 'metadata_software_deployments') + @mock.patch.object(db_api, 'resource_update') + @mock.patch.object(db_api, 'resource_get_by_physical_resource_id') + @mock.patch.object(service_software_config.requests, 'put') + def test_push_metadata_software_deployments_temp_url( + self, put, res_get, res_upd, md_sd): + rs = mock.Mock() + rs.rsrc_metadata = {'original': 'metadata'} + rs.id = '1234' + rs.atomic_key = 1 rd = mock.Mock() rd.key = 'metadata_put_url' rd.value = 'http://192.168.2.2/foo/bar' rs.data = [rd] res_get.return_value = rs + res_upd.return_value = 1 deployments = {'deploy': 'this'} md_sd.return_value = deployments @@ -607,26 +644,29 @@ class SoftwareConfigServiceTest(common.HeatTestCase): self.engine.software_config._push_metadata_software_deployments( self.ctx, '1234', None) - rs.update_and_save.assert_called_once_with( - {'rsrc_metadata': result_metadata}) + res_upd.assert_called_once_with( + self.ctx, '1234', {'rsrc_metadata': result_metadata}, 1) put.assert_called_once_with( 'http://192.168.2.2/foo/bar', json.dumps(result_metadata)) @mock.patch.object(service_software_config.SoftwareConfigService, 'metadata_software_deployments') - @mock.patch.object(service_software_config.resource_object.Resource, - 'get_by_physical_resource_id') + @mock.patch.object(db_api, 'resource_update') + @mock.patch.object(db_api, 'resource_get_by_physical_resource_id') @mock.patch.object(zaqar.ZaqarClientPlugin, 'create_for_tenant') def test_push_metadata_software_deployments_queue( - self, plugin, res_get, md_sd): + self, plugin, res_get, res_upd, md_sd): rs = mock.Mock() rs.rsrc_metadata = {'original': 'metadata'} + rs.id = '1234' + rs.atomic_key = 1 rd = mock.Mock() rd.key = 'metadata_queue_id' rd.value = '6789' rs.data = [rd] res_get.return_value = rs + res_upd.return_value = 1 sd = mock.Mock() sd.stack_user_project_id = 'project1' queue = mock.Mock() @@ -644,8 +684,8 @@ class SoftwareConfigServiceTest(common.HeatTestCase): self.engine.software_config._push_metadata_software_deployments( self.ctx, '1234', sd) - rs.update_and_save.assert_called_once_with( - {'rsrc_metadata': result_metadata}) + res_upd.assert_called_once_with( + self.ctx, '1234', {'rsrc_metadata': result_metadata}, 1) plugin.assert_called_once_with('project1') zaqar_client.queue.assert_called_once_with('6789')