Handling concurrent transactions in metadata_set method

There are follow changes in this patch:
 - Using exception ConcurrentTransactions for processing
   concurrent transactions during writing metadata.
 - wrapper @retry_on_conflict was used for metadata_set method to
   allow retrying in the event of a race. The same wrapper was added for
   _push_metadata_software_deployments method.
 - added new parameter for metadata_set method - merge_metadata.
   When RetryRequest exception is raised, oslo_db_api.wrap_db_retry
   re-call metadata_set method and in this case we need to refresh
   old metadata. It's mostly need for signals without data and id.
   For example:
     A and B signals come in the same moment and both get number 1,
   because metadata was empty. Then during write in db RetryRequest
   exception was raised for signal B. Metadata of this signal stores old
   number - 1. So we should re-calculate this value using new length
   of metadata and set number - 2.

Change-Id: I1ddbad7cde3036cfa9310c670609fcde607ffcac
Co-Authored-By: Zane Bitter <zbitter@redhat.com>
Partially-Bug: #1497274
(cherry picked from commit 82b744042e)
This commit is contained in:
Sergey Kraynev 2016-03-14 03:57:15 -04:00 committed by rabi
parent 12caf71624
commit c6d18486a5
3 changed files with 44 additions and 9 deletions

View File

@ -343,11 +343,28 @@ class Resource(object):
self._rsrc_metadata = rs.rsrc_metadata
return rs.rsrc_metadata
def metadata_set(self, metadata):
@resource_objects.retry_on_conflict
def metadata_set(self, metadata, merge_metadata=None):
"""Write new metadata to the database.
The caller may optionally provide a merge_metadata() function, which
takes two arguments - the metadata passed to metadata_set() and the
current metadata of the resource - and returns the merged metadata to
write. If merge_metadata is not provided, the metadata passed to
metadata_set() is written verbatim, overwriting any existing metadata.
If a race condition is detected, the write will be retried with the new
result of merge_metadata() (if it is supplied) or the verbatim data (if
it is not).
"""
if self.id is None or self.action == self.INIT:
raise exception.ResourceNotAvailable(resource_name=self.name)
rs = resource_objects.Resource.get_obj(self.stack.context, self.id)
rs.update_and_save({'rsrc_metadata': metadata})
LOG.debug('Setting metadata for %s', six.text_type(self))
db_res = resource_objects.Resource.get_obj(self.stack.context, self.id)
if merge_metadata is not None:
db_res = db_res.refresh(attrs=['rsrc_metadata'])
metadata = merge_metadata(metadata, db_res.rsrc_metadata)
db_res.update_metadata(metadata)
self._rsrc_metadata = metadata
@classmethod

View File

@ -11,8 +11,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_service import service
@ -26,6 +24,7 @@ from heat.common.i18n import _
from heat.common.i18n import _LI
from heat.db import api as db_api
from heat.engine import api
from heat.objects import resource as resource_objects
from heat.objects import software_config as software_config_object
from heat.objects import software_deployment as software_deployment_object
from heat.rpc import api as rpc_api
@ -84,7 +83,7 @@ class SoftwareConfigService(service.Service):
result = [api.format_software_config(sd.config) for sd in all_sd_s]
return result
@oslo_db_api.wrap_db_retry(max_retries=10, retry_on_request=True)
@resource_objects.retry_on_conflict
def _push_metadata_software_deployments(self, cnxt, server_id, sd):
rs = db_api.resource_get_by_physical_resource_id(cnxt, server_id)
if not rs:
@ -95,9 +94,8 @@ class SoftwareConfigService(service.Service):
rows_updated = db_api.resource_update(
cnxt, rs.id, {'rsrc_metadata': md}, rs.atomic_key)
if not rows_updated:
action = "deployments of server %s" % server_id
raise db_exc.RetryRequest(
exception.ConcurrentTransaction(action=action))
action = _('deployments of server %s') % server_id
raise exception.ConcurrentTransaction(action=action)
metadata_put_url = None
metadata_queue_id = None

View File

@ -22,9 +22,12 @@ from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_versionedobjects import base
from oslo_versionedobjects import fields
import retrying
import six
from heat.common import crypt
from heat.common import exception
from heat.common.i18n import _
from heat.db import api as db_api
from heat.objects import fields as heat_fields
from heat.objects import resource_data
@ -33,6 +36,15 @@ from heat.objects import stack
cfg.CONF.import_opt('encrypt_parameters_and_properties', 'heat.common.config')
def retry_on_conflict(func):
def is_conflict(ex):
return isinstance(ex, exception.ConcurrentTransaction)
wrapper = retrying.retry(stop_max_attempt_number=11,
wait_random_min=0.0, wait_random_max=2.0,
retry_on_exception=is_conflict)
return wrapper(func)
class Resource(
base.VersionedObject,
base.VersionedObjectDictCompat,
@ -186,3 +198,11 @@ class Resource(
result[prop_name] = encrypted_value
return (True, result)
return (False, data)
def update_metadata(self, metadata):
if self.rsrc_metadata != metadata:
rows_updated = self.select_and_update(
{'rsrc_metadata': metadata}, self.engine_id, self.atomic_key)
if not rows_updated:
action = _('metadata setting for resource %s') % self.name
raise exception.ConcurrentTransaction(action=action)