Merge "Handling concurrent transactions in metadata_set method" into stable/liberty
This commit is contained in:
commit
75d5895bf5
|
@ -343,11 +343,28 @@ class Resource(object):
|
|||
self._rsrc_metadata = rs.rsrc_metadata
|
||||
return rs.rsrc_metadata
|
||||
|
||||
def metadata_set(self, metadata):
|
||||
@resource_objects.retry_on_conflict
|
||||
def metadata_set(self, metadata, merge_metadata=None):
|
||||
"""Write new metadata to the database.
|
||||
|
||||
The caller may optionally provide a merge_metadata() function, which
|
||||
takes two arguments - the metadata passed to metadata_set() and the
|
||||
current metadata of the resource - and returns the merged metadata to
|
||||
write. If merge_metadata is not provided, the metadata passed to
|
||||
metadata_set() is written verbatim, overwriting any existing metadata.
|
||||
|
||||
If a race condition is detected, the write will be retried with the new
|
||||
result of merge_metadata() (if it is supplied) or the verbatim data (if
|
||||
it is not).
|
||||
"""
|
||||
if self.id is None or self.action == self.INIT:
|
||||
raise exception.ResourceNotAvailable(resource_name=self.name)
|
||||
rs = resource_objects.Resource.get_obj(self.stack.context, self.id)
|
||||
rs.update_and_save({'rsrc_metadata': metadata})
|
||||
LOG.debug('Setting metadata for %s', six.text_type(self))
|
||||
db_res = resource_objects.Resource.get_obj(self.stack.context, self.id)
|
||||
if merge_metadata is not None:
|
||||
db_res = db_res.refresh(attrs=['rsrc_metadata'])
|
||||
metadata = merge_metadata(metadata, db_res.rsrc_metadata)
|
||||
db_res.update_metadata(metadata)
|
||||
self._rsrc_metadata = metadata
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -11,8 +11,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_db import api as oslo_db_api
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_service import service
|
||||
|
@ -26,6 +24,7 @@ from heat.common.i18n import _
|
|||
from heat.common.i18n import _LI
|
||||
from heat.db import api as db_api
|
||||
from heat.engine import api
|
||||
from heat.objects import resource as resource_objects
|
||||
from heat.objects import software_config as software_config_object
|
||||
from heat.objects import software_deployment as software_deployment_object
|
||||
from heat.rpc import api as rpc_api
|
||||
|
@ -84,7 +83,7 @@ class SoftwareConfigService(service.Service):
|
|||
result = [api.format_software_config(sd.config) for sd in all_sd_s]
|
||||
return result
|
||||
|
||||
@oslo_db_api.wrap_db_retry(max_retries=10, retry_on_request=True)
|
||||
@resource_objects.retry_on_conflict
|
||||
def _push_metadata_software_deployments(self, cnxt, server_id, sd):
|
||||
rs = db_api.resource_get_by_physical_resource_id(cnxt, server_id)
|
||||
if not rs:
|
||||
|
@ -95,9 +94,8 @@ class SoftwareConfigService(service.Service):
|
|||
rows_updated = db_api.resource_update(
|
||||
cnxt, rs.id, {'rsrc_metadata': md}, rs.atomic_key)
|
||||
if not rows_updated:
|
||||
action = "deployments of server %s" % server_id
|
||||
raise db_exc.RetryRequest(
|
||||
exception.ConcurrentTransaction(action=action))
|
||||
action = _('deployments of server %s') % server_id
|
||||
raise exception.ConcurrentTransaction(action=action)
|
||||
|
||||
metadata_put_url = None
|
||||
metadata_queue_id = None
|
||||
|
|
|
@ -22,9 +22,12 @@ from oslo_config import cfg
|
|||
from oslo_serialization import jsonutils
|
||||
from oslo_versionedobjects import base
|
||||
from oslo_versionedobjects import fields
|
||||
import retrying
|
||||
import six
|
||||
|
||||
from heat.common import crypt
|
||||
from heat.common import exception
|
||||
from heat.common.i18n import _
|
||||
from heat.db import api as db_api
|
||||
from heat.objects import fields as heat_fields
|
||||
from heat.objects import resource_data
|
||||
|
@ -33,6 +36,15 @@ from heat.objects import stack
|
|||
cfg.CONF.import_opt('encrypt_parameters_and_properties', 'heat.common.config')
|
||||
|
||||
|
||||
def retry_on_conflict(func):
|
||||
def is_conflict(ex):
|
||||
return isinstance(ex, exception.ConcurrentTransaction)
|
||||
wrapper = retrying.retry(stop_max_attempt_number=11,
|
||||
wait_random_min=0.0, wait_random_max=2.0,
|
||||
retry_on_exception=is_conflict)
|
||||
return wrapper(func)
|
||||
|
||||
|
||||
class Resource(
|
||||
base.VersionedObject,
|
||||
base.VersionedObjectDictCompat,
|
||||
|
@ -186,3 +198,11 @@ class Resource(
|
|||
result[prop_name] = encrypted_value
|
||||
return (True, result)
|
||||
return (False, data)
|
||||
|
||||
def update_metadata(self, metadata):
|
||||
if self.rsrc_metadata != metadata:
|
||||
rows_updated = self.select_and_update(
|
||||
{'rsrc_metadata': metadata}, self.engine_id, self.atomic_key)
|
||||
if not rows_updated:
|
||||
action = _('metadata setting for resource %s') % self.name
|
||||
raise exception.ConcurrentTransaction(action=action)
|
||||
|
|
Loading…
Reference in New Issue