From c6d18486a5f307eec3293aeb99b487684f1fda79 Mon Sep 17 00:00:00 2001 From: Sergey Kraynev Date: Mon, 14 Mar 2016 03:57:15 -0400 Subject: [PATCH] Handling concurrent transactions in metadata_set method There are follow changes in this patch: - Using exception ConcurrentTransactions for processing concurrent transactions during writing metadata. - wrapper @retry_on_conflict was used for metadata_set method to allow retrying in the event of a race. The same wrapper was added for _push_metadata_software_deployments method. - added new parameter for metadata_set method - merge_metadata. When RetryRequest exception is raised, oslo_db_api.wrap_db_retry re-call metadata_set method and in this case we need to refresh old metadata. It's mostly need for signals without data and id. For example: A and B signals come in the same moment and both get number 1, because metadata was empty. Then during write in db RetryRequest exception was raised for signal B. Metadata of this signal stores old number - 1. So we should re-calculate this value using new length of metadata and set number - 2. Change-Id: I1ddbad7cde3036cfa9310c670609fcde607ffcac Co-Authored-By: Zane Bitter Partially-Bug: #1497274 (cherry picked from commit 82b744042e39529eaca6660704ad4923c2442be9) --- heat/engine/resource.py | 23 ++++++++++++++++++++--- heat/engine/service_software_config.py | 10 ++++------ heat/objects/resource.py | 20 ++++++++++++++++++++ 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/heat/engine/resource.py b/heat/engine/resource.py index ea032a72fa..7ece19e614 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -343,11 +343,28 @@ class Resource(object): self._rsrc_metadata = rs.rsrc_metadata return rs.rsrc_metadata - def metadata_set(self, metadata): + @resource_objects.retry_on_conflict + def metadata_set(self, metadata, merge_metadata=None): + """Write new metadata to the database. + + The caller may optionally provide a merge_metadata() function, which + takes two arguments - the metadata passed to metadata_set() and the + current metadata of the resource - and returns the merged metadata to + write. If merge_metadata is not provided, the metadata passed to + metadata_set() is written verbatim, overwriting any existing metadata. + + If a race condition is detected, the write will be retried with the new + result of merge_metadata() (if it is supplied) or the verbatim data (if + it is not). + """ if self.id is None or self.action == self.INIT: raise exception.ResourceNotAvailable(resource_name=self.name) - rs = resource_objects.Resource.get_obj(self.stack.context, self.id) - rs.update_and_save({'rsrc_metadata': metadata}) + LOG.debug('Setting metadata for %s', six.text_type(self)) + db_res = resource_objects.Resource.get_obj(self.stack.context, self.id) + if merge_metadata is not None: + db_res = db_res.refresh(attrs=['rsrc_metadata']) + metadata = merge_metadata(metadata, db_res.rsrc_metadata) + db_res.update_metadata(metadata) self._rsrc_metadata = metadata @classmethod diff --git a/heat/engine/service_software_config.py b/heat/engine/service_software_config.py index 36d612ddf0..235eda2069 100644 --- a/heat/engine/service_software_config.py +++ b/heat/engine/service_software_config.py @@ -11,8 +11,6 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_db import api as oslo_db_api -from oslo_db import exception as db_exc from oslo_log import log as logging from oslo_serialization import jsonutils from oslo_service import service @@ -26,6 +24,7 @@ from heat.common.i18n import _ from heat.common.i18n import _LI from heat.db import api as db_api from heat.engine import api +from heat.objects import resource as resource_objects from heat.objects import software_config as software_config_object from heat.objects import software_deployment as software_deployment_object from heat.rpc import api as rpc_api @@ -84,7 +83,7 @@ class SoftwareConfigService(service.Service): result = [api.format_software_config(sd.config) for sd in all_sd_s] return result - @oslo_db_api.wrap_db_retry(max_retries=10, retry_on_request=True) + @resource_objects.retry_on_conflict def _push_metadata_software_deployments(self, cnxt, server_id, sd): rs = db_api.resource_get_by_physical_resource_id(cnxt, server_id) if not rs: @@ -95,9 +94,8 @@ class SoftwareConfigService(service.Service): rows_updated = db_api.resource_update( cnxt, rs.id, {'rsrc_metadata': md}, rs.atomic_key) if not rows_updated: - action = "deployments of server %s" % server_id - raise db_exc.RetryRequest( - exception.ConcurrentTransaction(action=action)) + action = _('deployments of server %s') % server_id + raise exception.ConcurrentTransaction(action=action) metadata_put_url = None metadata_queue_id = None diff --git a/heat/objects/resource.py b/heat/objects/resource.py index a59c98cceb..dafc00c766 100644 --- a/heat/objects/resource.py +++ b/heat/objects/resource.py @@ -22,9 +22,12 @@ from oslo_config import cfg from oslo_serialization import jsonutils from oslo_versionedobjects import base from oslo_versionedobjects import fields +import retrying import six from heat.common import crypt +from heat.common import exception +from heat.common.i18n import _ from heat.db import api as db_api from heat.objects import fields as heat_fields from heat.objects import resource_data @@ -33,6 +36,15 @@ from heat.objects import stack cfg.CONF.import_opt('encrypt_parameters_and_properties', 'heat.common.config') +def retry_on_conflict(func): + def is_conflict(ex): + return isinstance(ex, exception.ConcurrentTransaction) + wrapper = retrying.retry(stop_max_attempt_number=11, + wait_random_min=0.0, wait_random_max=2.0, + retry_on_exception=is_conflict) + return wrapper(func) + + class Resource( base.VersionedObject, base.VersionedObjectDictCompat, @@ -186,3 +198,11 @@ class Resource( result[prop_name] = encrypted_value return (True, result) return (False, data) + + def update_metadata(self, metadata): + if self.rsrc_metadata != metadata: + rows_updated = self.select_and_update( + {'rsrc_metadata': metadata}, self.engine_id, self.atomic_key) + if not rows_updated: + action = _('metadata setting for resource %s') % self.name + raise exception.ConcurrentTransaction(action=action)