make auto scale and heal effective

This patch makes auto-scale/auto-heal effective.
Previously auto-scale/auto-heal method was called by RPC
call from the conductor itself but is called directly now.

This patch refactors common code which is related to
both scale/heal and auto-scale/auto-heal. This patch
also refactors prometheus plugin driver which calls
auto-scale/auto-heal.

Change-Id: I0c032fd082d518abc49296f737058176d30d2966
This commit is contained in:
Itsuro Oda 2024-02-21 01:35:51 +00:00
parent 8051116321
commit 7e939c0282
10 changed files with 216 additions and 306 deletions

View File

@ -612,3 +612,21 @@ def check_lcmocc_in_progress(context, inst_id):
def update_lcmocc_status(lcmocc, op_state):
lcmocc.operationState = op_state
lcmocc.stateEnteredTime = datetime.utcnow()
def new_lcmocc(inst_id, operation, req_body,
op_state=fields.LcmOperationStateType.STARTING,
auto_invocation=False):
now = datetime.utcnow()
lcmocc = objects.VnfLcmOpOccV2(
id=uuidutils.generate_uuid(),
operationState=op_state,
stateEnteredTime=now,
startTime=now,
vnfInstanceId=inst_id,
operation=operation,
isAutomaticInvocation=auto_invocation,
isCancelPending=False,
operationParams=req_body)
return lcmocc

View File

@ -112,3 +112,47 @@ def check_metadata_format(metadata):
raise sol_ex.SolValidationError(
detail="Duplicated vnfcInfo ids found in "
"metadata['VDU_VNFc_mapping'].")
def _get_current_scale_level(inst, aspect_id):
if (inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set('scaleStatus')):
for scale_info in inst.instantiatedVnfInfo.scaleStatus:
if scale_info.aspectId == aspect_id:
return scale_info.scaleLevel
def _get_max_scale_level(inst, aspect_id):
if (inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set('maxScaleLevels')):
for scale_info in inst.instantiatedVnfInfo.maxScaleLevels:
if scale_info.aspectId == aspect_id:
return scale_info.scaleLevel
def check_scale_level(inst, aspect_id, scale_type, num_steps):
orig_num_steps = num_steps
scale_level = _get_current_scale_level(inst, aspect_id)
max_scale_level = _get_max_scale_level(inst, aspect_id)
if scale_level is None or max_scale_level is None:
raise sol_ex.InvalidScaleAspectId(aspect_id=aspect_id)
if scale_type == 'SCALE_IN':
num_steps *= -1
scale_level += num_steps
if scale_level < 0 or scale_level > max_scale_level:
raise sol_ex.InvalidScaleNumberOfSteps(
num_steps=orig_num_steps)
def check_vnfc_ids(inst, vnfc_ids):
inst_vnfc_ids = []
if (inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set('vnfcInfo')):
inst_vnfc_ids = [vnfc.id for vnfc in inst.instantiatedVnfInfo.vnfcInfo]
for req_vnfc_id in vnfc_ids:
if req_vnfc_id not in inst_vnfc_ids:
raise sol_ex.SolValidationError(
detail="vnfcInstanceId(%s) does not exist." % req_vnfc_id)

View File

@ -1,147 +0,0 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from oslo_utils import uuidutils
from tacker.sol_refactored.common import coordinate
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import lcm_op_occ_utils as lcmocc_utils
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
from tacker.sol_refactored.conductor import conductor_rpc_v2
from tacker.sol_refactored import objects
from tacker.sol_refactored.objects.v2 import fields as v2fields
# TODO(fengyi): The code of this function is all copied from the
# controller, which is not friendly to future development and
# maintenance, and may be refactored in the future. After
# refactoring, only the validation of the req body is left,
# and the creation of lcmocc and the call to start_lcm_op are
# all executed by the controller, notification driver, etc.
@coordinate.lock_vnf_instance('{vnf_instance_id}')
def heal(context, vnf_instance_id, body, inst=None, auto_invocation=False):
if not inst:
inst = inst_utils.get_inst(context, vnf_instance_id)
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=vnf_instance_id)
lcmocc_utils.check_lcmocc_in_progress(context, vnf_instance_id)
# check parameter for later use
is_all = body.get('additionalParams', {}).get('all', False)
if not isinstance(is_all, bool):
raise sol_ex.SolValidationError(
detail="additionalParams['all'] must be bool.")
if 'vnfcInstanceId' in body:
inst_info = inst.instantiatedVnfInfo
vnfc_id = []
if inst_info.obj_attr_is_set('vnfcInfo'):
vnfc_id = [vnfc.id for vnfc in inst_info.vnfcInfo]
for req_vnfc_id in body['vnfcInstanceId']:
if req_vnfc_id not in vnfc_id:
raise sol_ex.SolValidationError(
detail="vnfcInstanceId(%s) does not exist."
% req_vnfc_id)
lcmocc = new_lcmocc(vnf_instance_id, v2fields.LcmOperationType.HEAL, body,
auto_invocation=auto_invocation)
lcmocc.create(context)
rpc = conductor_rpc_v2.VnfLcmRpcApiV2()
rpc.start_lcm_op(context, lcmocc.id)
return lcmocc
# TODO(fengyi): The code of this function is all copied from the
# controller, which is not friendly to future development and
# maintenance, and may be refactored in the future. After
# refactoring, only the validation of the req body is left,
# and the creation of lcmocc and the call to start_lcm_op are
# all executed by the controller, notification driver, etc.
@coordinate.lock_vnf_instance('{vnf_instance_id}')
def scale(context, vnf_instance_id, body, inst=None, auto_invocation=False):
if not inst:
inst = inst_utils.get_inst(context, vnf_instance_id)
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=vnf_instance_id)
lcmocc_utils.check_lcmocc_in_progress(context, vnf_instance_id)
# check parameters
aspect_id = body['aspectId']
if 'numberOfSteps' not in body:
# set default value (1) defined by SOL specification for
# the convenience of the following methods.
body['numberOfSteps'] = 1
scale_level = _get_current_scale_level(inst, aspect_id)
max_scale_level = _get_max_scale_level(inst, aspect_id)
if scale_level is None or max_scale_level is None:
raise sol_ex.InvalidScaleAspectId(aspect_id=aspect_id)
num_steps = body['numberOfSteps']
if body['type'] == 'SCALE_IN':
num_steps *= -1
scale_level += num_steps
if scale_level < 0 or scale_level > max_scale_level:
raise sol_ex.InvalidScaleNumberOfSteps(
num_steps=body['numberOfSteps'])
lcmocc = new_lcmocc(vnf_instance_id, v2fields.LcmOperationType.SCALE, body,
auto_invocation=auto_invocation)
lcmocc.create(context)
rpc = conductor_rpc_v2.VnfLcmRpcApiV2()
rpc.start_lcm_op(context, lcmocc.id)
return lcmocc
def _get_current_scale_level(inst, aspect_id):
if (inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set('scaleStatus')):
for scale_info in inst.instantiatedVnfInfo.scaleStatus:
if scale_info.aspectId == aspect_id:
return scale_info.scaleLevel
def _get_max_scale_level(inst, aspect_id):
if (inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set('maxScaleLevels')):
for scale_info in inst.instantiatedVnfInfo.maxScaleLevels:
if scale_info.aspectId == aspect_id:
return scale_info.scaleLevel
def new_lcmocc(inst_id, operation, req_body,
op_state=v2fields.LcmOperationStateType.STARTING,
auto_invocation=False):
now = datetime.utcnow()
lcmocc = objects.VnfLcmOpOccV2(
id=uuidutils.generate_uuid(),
operationState=op_state,
stateEnteredTime=now,
startTime=now,
vnfInstanceId=inst_id,
operation=operation,
isAutomaticInvocation=auto_invocation,
isCancelPending=False,
operationParams=req_body)
return lcmocc

View File

@ -62,8 +62,8 @@ class ConductorV2(object):
self.vnfpm_driver = vnfpm_driver_v2.VnfPmDriverV2()
self.endpoint = CONF.v2_vnfm.endpoint
self.nfvo_client = nfvo_client.NfvoClient()
self.prom_driver = pp_drv.PrometheusPluginDriver.instance()
self.sn_driver = sdrv.ServerNotificationDriver.instance()
self.prom_driver = pp_drv.PrometheusPluginDriver(self)
self.sn_driver = sdrv.ServerNotificationDriver(self)
self._change_lcm_op_state()
self._periodic_call()
@ -119,10 +119,10 @@ class ConductorV2(object):
def start_lcm_op(self, context, lcmocc_id):
lcmocc = lcmocc_utils.get_lcmocc(context, lcmocc_id)
self._start_lcm_op(context, lcmocc)
self.start_lcm_op_internal(context, lcmocc)
@coordinate.lock_vnf_instance('{lcmocc.vnfInstanceId}', delay=True)
def _start_lcm_op(self, context, lcmocc):
def start_lcm_op_internal(self, context, lcmocc):
# just consistency check
if lcmocc.operationState != fields.LcmOperationStateType.STARTING:
LOG.error("VnfLcmOpOcc unexpected operationState.")

View File

@ -17,7 +17,7 @@ import threading
from oslo_log import log as logging
from tacker.sol_refactored.common import config as cfg
from tacker.sol_refactored.common import vnflcm_utils
from tacker.sol_refactored.conductor import vnflcm_auto
from tacker.sol_refactored import objects
LOG = logging.getLogger(__name__)
@ -65,39 +65,11 @@ class VnfmAutoHealTimer():
self.cancel()
class PrometheusPluginDriverStub():
def trigger_scale(self, context, vnf_instance_id, scale_req):
pass
def enqueue_heal(self, context, vnf_instance_id, vnfc_info_id):
pass
def dequeue_heal(self, vnf_instance_id):
pass
class PrometheusPluginDriver():
_instance = None
@staticmethod
def instance():
if PrometheusPluginDriver._instance is None:
if (CONF.prometheus_plugin.auto_scaling or
CONF.prometheus_plugin.auto_healing or
CONF.prometheus_plugin.fault_management or
CONF.prometheus_plugin.performance_management):
PrometheusPluginDriver()
else:
stub = PrometheusPluginDriverStub()
PrometheusPluginDriver._instance = stub
return PrometheusPluginDriver._instance
def __init__(self):
if PrometheusPluginDriver._instance:
raise SystemError("Not constructor but instance() should be used.")
PrometheusPluginDriver._instance = self
def __init__(self, conductor):
self.timer_map = {}
self.expiration_time = CONF.prometheus_plugin.timer_interval
self.conductor = conductor
def enqueue_heal(self, context, vnf_instance_id, vnfc_info_id):
if vnf_instance_id not in self.timer_map:
@ -112,11 +84,11 @@ class PrometheusPluginDriver():
del self.timer_map[vnf_instance_id]
def _trigger_heal(self, context, vnf_instance_id, vnfc_info_ids):
heal_req = objects.HealVnfRequest(vnfcInstanceId=vnfc_info_ids)
body = heal_req.to_dict()
LOG.info(f"VNFM AutoHealing is triggered. vnf: {vnf_instance_id}, "
f"vnfcInstanceId: {vnfc_info_ids}")
vnflcm_utils.heal(context, vnf_instance_id, body, auto_invocation=True)
heal_req = objects.HealVnfRequest(vnfcInstanceId=vnfc_info_ids)
vnflcm_auto.auto_heal(context, vnf_instance_id, heal_req.to_dict(),
self.conductor)
def _timer_expired(self, context, vnf_instance_id, vnfc_info_ids):
self.dequeue_heal(vnf_instance_id)
@ -126,5 +98,5 @@ class PrometheusPluginDriver():
LOG.info(f"VNFM AutoScaling is triggered. vnf: {vnf_instance_id}, "
f"type: {scale_req['type']}, aspectId: "
f"{scale_req['aspectId']}")
vnflcm_utils.scale(context, vnf_instance_id, scale_req,
auto_invocation=True)
vnflcm_auto.auto_scale(context, vnf_instance_id, scale_req,
self.conductor)

View File

@ -20,8 +20,7 @@ from oslo_utils import encodeutils
from tacker import context as tacker_context
from tacker.sol_refactored.common import config as cfg
from tacker.sol_refactored.common import vnflcm_utils
from tacker.sol_refactored.conductor import conductor_rpc_v2 as rpc
from tacker.sol_refactored.conductor import vnflcm_auto
from tacker.sol_refactored import objects
LOG = logging.getLogger(__name__)
@ -67,29 +66,10 @@ class ServerNotificationTimer():
class ServerNotificationDriver():
_instance = None
@staticmethod
def instance():
if not ServerNotificationDriver._instance:
ServerNotificationDriver._instance = (
ServerNotificationDriverMain()
if CONF.server_notification.server_notification
else ServerNotificationDriver())
return ServerNotificationDriver._instance
def notify(self, vnf_instance_id, vnfc_instance_ids):
pass
def remove_timer(self, vnf_instance_id):
pass
class ServerNotificationDriverMain(ServerNotificationDriver):
def __init__(self):
def __init__(self, conductor):
self.timer_map = {}
self.expiration_time = CONF.server_notification.timer_interval
self.rpc = rpc.VnfLcmRpcApiV2()
self.conductor = conductor
def notify(self, vnf_instance_id, vnfc_instance_ids):
if vnf_instance_id not in self.timer_map:
@ -103,14 +83,14 @@ class ServerNotificationDriverMain(ServerNotificationDriver):
del self.timer_map[vnf_instance_id]
def request_heal(self, vnf_instance_id, vnfc_instance_ids):
heal_req = objects.HealVnfRequest(vnfcInstanceId=vnfc_instance_ids)
body = heal_req.to_dict()
LOG.info("server_notification auto healing is processed: %s.",
vnf_instance_id)
context = tacker_context.get_admin_context()
heal_req = objects.HealVnfRequest(vnfcInstanceId=vnfc_instance_ids)
body = heal_req.to_dict()
try:
vnflcm_utils.heal(context, vnf_instance_id, body,
auto_invocation=True)
vnflcm_auto.auto_heal(context, vnf_instance_id, body,
self.conductor)
except Exception as exp:
LOG.error(str(body))
LOG.error("server_notification auto healing is failed: %s.",

View File

@ -0,0 +1,76 @@
# Copyright (C) 2024 Nippon Telegraph and Telephone Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.common import coordinate
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import lcm_op_occ_utils as lcmocc_utils
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
from tacker.sol_refactored.objects.v2 import fields as v2fields
@coordinate.lock_vnf_instance('{vnf_instance_id}')
def _auto_heal_pre(context, vnf_instance_id, heal_req):
# NOTE: validation check of heal_req is not necessary since it is
# made by conductor.
inst = inst_utils.get_inst(context, vnf_instance_id)
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=vnf_instance_id)
lcmocc_utils.check_lcmocc_in_progress(context, vnf_instance_id)
if 'vnfcInstanceId' in heal_req:
inst_utils.check_vnfc_ids(inst, heal_req['vnfcInstanceId'])
lcmocc = lcmocc_utils.new_lcmocc(
vnf_instance_id, v2fields.LcmOperationType.HEAL, heal_req,
auto_invocation=True)
lcmocc.create(context)
return lcmocc
def auto_heal(context, vnf_instance_id, heal_req, conductor):
lcmocc = _auto_heal_pre(context, vnf_instance_id, heal_req)
conductor.start_lcm_op_internal(context, lcmocc)
@coordinate.lock_vnf_instance('{vnf_instance_id}')
def _auto_scale_pre(context, vnf_instance_id, scale_req):
# NOTE: validation check of scale_req is not necessary since it is
# made by conductor.
inst = inst_utils.get_inst(context, vnf_instance_id)
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=vnf_instance_id)
lcmocc_utils.check_lcmocc_in_progress(context, vnf_instance_id)
if 'numberOfSteps' not in scale_req:
scale_req['numberOfSteps'] = 1
inst_utils.check_scale_level(inst, scale_req['aspectId'],
scale_req['type'], scale_req['numberOfSteps'])
lcmocc = lcmocc_utils.new_lcmocc(
vnf_instance_id, v2fields.LcmOperationType.SCALE, scale_req,
auto_invocation=True)
lcmocc.create(context)
return lcmocc
def auto_scale(context, vnf_instance_id, scale_req, conductor):
lcmocc = _auto_scale_pre(context, vnf_instance_id, scale_req)
conductor.start_lcm_op_internal(context, lcmocc)

View File

@ -29,7 +29,6 @@ from tacker.sol_refactored.common import lcm_op_occ_utils as lcmocc_utils
from tacker.sol_refactored.common import subscription_utils as subsc_utils
from tacker.sol_refactored.common import vim_utils
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
from tacker.sol_refactored.common import vnflcm_utils
from tacker.sol_refactored.conductor import conductor_rpc_v2
from tacker.sol_refactored.controller import vnflcm_view
from tacker.sol_refactored.nfvo import nfvo_client
@ -205,7 +204,7 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
detail="vnfcInstanceId(%s) does not exist."
% vnfc_mod['id'])
lcmocc = vnflcm_utils.new_lcmocc(
lcmocc = lcmocc_utils.new_lcmocc(
id, v2fields.LcmOperationType.MODIFY_INFO,
body, v2fields.LcmOperationStateType.PROCESSING)
lcmocc.create(context)
@ -249,21 +248,24 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
return target
def _check_policy(self, context, inst, action):
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format(action),
target=self._get_policy_target(inst))
@validator.schema(schema.InstantiateVnfRequest_V200, '2.0.0')
@coordinate.lock_vnf_instance('{id}')
def instantiate(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('instantiate'),
target=self._get_policy_target(inst))
self._check_policy(context, inst, 'instantiate')
if inst.instantiationState != 'NOT_INSTANTIATED':
raise sol_ex.VnfInstanceIsInstantiated(inst_id=id)
lcmocc_utils.check_lcmocc_in_progress(context, id)
lcmocc = vnflcm_utils.new_lcmocc(
lcmocc = lcmocc_utils.new_lcmocc(
id, v2fields.LcmOperationType.INSTANTIATE, body)
req_param = lcmocc.operationParams
@ -291,16 +293,14 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def terminate(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('terminate'),
target=self._get_policy_target(inst))
self._check_policy(context, inst, 'terminate')
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
lcmocc_utils.check_lcmocc_in_progress(context, id)
lcmocc = vnflcm_utils.new_lcmocc(
lcmocc = lcmocc_utils.new_lcmocc(
id, v2fields.LcmOperationType.TERMINATE, body)
lcmocc.create(context)
@ -314,11 +314,26 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def scale(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('scale'),
target=self._get_policy_target(inst))
self._check_policy(context, inst, 'scale')
lcmocc = vnflcm_utils.scale(context, id, body, inst=inst)
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
lcmocc_utils.check_lcmocc_in_progress(context, id)
# check parameters
if 'numberOfSteps' not in body:
# set default value (1) defined by SOL specification for
# the convenience of the following methods.
body['numberOfSteps'] = 1
inst_utils.check_scale_level(inst, body['aspectId'],
body['type'], body['numberOfSteps'])
lcmocc = lcmocc_utils.new_lcmocc(
id, v2fields.LcmOperationType.SCALE, body)
lcmocc.create(context)
self.conductor_rpc.start_lcm_op(context, lcmocc.id)
location = lcmocc_utils.lcmocc_href(lcmocc.id, self.endpoint)
@ -328,11 +343,27 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def heal(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('heal'),
target=self._get_policy_target(inst))
self._check_policy(context, inst, 'heal')
lcmocc = vnflcm_utils.heal(context, id, body, inst=inst)
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
lcmocc_utils.check_lcmocc_in_progress(context, id)
# check parameter for later use
is_all = body.get('additionalParams', {}).get('all', False)
if not isinstance(is_all, bool):
raise sol_ex.SolValidationError(
detail="additionalParams['all'] must be bool.")
if 'vnfcInstanceId' in body:
inst_utils.check_vnfc_ids(inst, body['vnfcInstanceId'])
lcmocc = lcmocc_utils.new_lcmocc(
id, v2fields.LcmOperationType.HEAL, body)
lcmocc.create(context)
self.conductor_rpc.start_lcm_op(context, lcmocc.id)
location = lcmocc_utils.lcmocc_href(lcmocc.id, self.endpoint)
@ -343,16 +374,14 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def change_ext_conn(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('change_ext_conn'),
target=self._get_policy_target(inst))
self._check_policy(context, inst, 'change_ext_conn')
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
lcmocc_utils.check_lcmocc_in_progress(context, id)
lcmocc = vnflcm_utils.new_lcmocc(
lcmocc = lcmocc_utils.new_lcmocc(
id, v2fields.LcmOperationType.CHANGE_EXT_CONN, body)
lcmocc.create(context)
@ -404,16 +433,14 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def change_vnfpkg(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('change_vnfpkg'),
target=self._get_policy_target(inst))
vnfd_id = body['vnfdId']
self._check_policy(context, inst, 'change_vnfpkg')
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
lcmocc_utils.check_lcmocc_in_progress(context, id)
vnfd_id = body['vnfdId']
pkg_info = self.nfvo_client.get_vnf_package_info_vnfd(
context, vnfd_id)
if pkg_info.operationalState != "ENABLED":
@ -440,7 +467,7 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
raise sol_ex.SolValidationError(
detail="'lcm-kubernetes-def-files' must be specified")
lcmocc = vnflcm_utils.new_lcmocc(
lcmocc = lcmocc_utils.new_lcmocc(
id, v2fields.LcmOperationType.CHANGE_VNFPKG, body)
lcmocc.create(context)

View File

@ -16,9 +16,9 @@
import time
from tacker import context
from tacker.sol_refactored.common import vnflcm_utils
from tacker.sol_refactored.conductor import conductor_v2
from tacker.sol_refactored.conductor import prometheus_plugin_driver as pp_drv
from tacker.sol_refactored.conductor import vnflcm_auto
from tacker.sol_refactored import objects
from tacker.tests.unit.db import base as db_base
@ -123,14 +123,11 @@ class TestPrometheusPlugin(db_base.SqlTestCase):
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
self.conductor = conductor_v2.ConductorV2()
pp_drv.PrometheusPluginDriver._instance = None
def tearDown(self):
super(TestPrometheusPlugin, self).tearDown()
# delete singleton object
pp_drv.PrometheusPluginDriver._instance = None
@mock.patch.object(vnflcm_utils, 'scale')
@mock.patch.object(vnflcm_auto, 'auto_scale')
def test_trigger_scale(self, mock_do_scale):
scale_req = {
'type': 'SCALE_OUT',
@ -139,33 +136,7 @@ class TestPrometheusPlugin(db_base.SqlTestCase):
self.conductor.trigger_scale(
self.context, 'vnf_instance_id', scale_req)
def test_constructor(self):
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
def test_driver_stub(self):
self.config_fixture.config(
group='prometheus_plugin', performance_management=False)
pp_drv.PrometheusPluginDriver._instance = None
drv = pp_drv.PrometheusPluginDriver.instance()
drv.trigger_scale(None, None, None)
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
drv = pp_drv.PrometheusPluginDriver.instance()
def test_driver_constructor(self):
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
pp_drv.PrometheusPluginDriver.instance()
self.assertRaises(
SystemError,
pp_drv.PrometheusPluginDriver)
def test_conductor_vnfm_auto_heal_queue(self):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=True)
pp_drv.PrometheusPluginDriver._instance = None
self.conductor.prom_driver = pp_drv.PrometheusPluginDriver.instance()
self.config_fixture.config(
group='prometheus_plugin', timer_interval=1)
# queueing test
@ -187,12 +158,8 @@ class TestPrometheusPlugin(db_base.SqlTestCase):
self.conductor.dequeue_auto_heal_instance(
self.context, 'invalid_id')
@mock.patch.object(vnflcm_utils, 'heal')
@mock.patch.object(vnflcm_auto, 'auto_heal')
def test_conductor_timer_expired(self, mock_do_heal):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=True)
pp_drv.PrometheusPluginDriver._instance = None
self.conductor.prom_driver = pp_drv.PrometheusPluginDriver.instance()
self.conductor.prom_driver._timer_expired(
self.context, 'test_id', ['id'])

View File

@ -20,9 +20,9 @@ from oslo_log import log as logging
from tacker import context
from tacker.sol_refactored.common import http_client
from tacker.sol_refactored.common import vnflcm_utils
from tacker.sol_refactored.conductor import conductor_v2
from tacker.sol_refactored.conductor import server_notification_driver as snd
from tacker.sol_refactored.conductor import vnflcm_auto
from tacker.sol_refactored import objects
from tacker.tests.unit.db import base as db_base
from unittest import mock
@ -39,18 +39,12 @@ class TestServerNotification(db_base.SqlTestCase):
self.config_fixture.config(
group='server_notification', server_notification=True)
self.conductor = conductor_v2.ConductorV2()
snd.ServerNotificationDriver._instance = None
def tearDown(self):
super(TestServerNotification, self).tearDown()
snd.ServerNotificationDriver._instance = None
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_conductor_notify_server_notification(self, mock_do_request):
self.config_fixture.config(
group='server_notification', server_notification=True)
snd.ServerNotificationDriver._instance = None
self.conductor.sn_driver = snd.ServerNotificationDriver.instance()
self.config_fixture.config(
group='server_notification', timer_interval=1)
resp = webob.Response()
@ -72,20 +66,11 @@ class TestServerNotification(db_base.SqlTestCase):
self.conductor.server_notification_remove_timer(
self.context, 'invalid_id')
@mock.patch.object(vnflcm_utils, 'heal')
@mock.patch.object(vnflcm_auto, 'auto_heal')
def test_conductor_timer_expired(self, mock_heal):
self.config_fixture.config(
group='server_notification', server_notification=True)
snd.ServerNotificationDriver._instance = None
self.conductor.sn_driver = snd.ServerNotificationDriver.instance()
self.conductor.sn_driver.timer_expired('test_id', ['id'])
def test_conductor_timer_expired_error(self):
self.config_fixture.config(
group='server_notification', server_notification=True)
snd.ServerNotificationDriver._instance = None
self.conductor.sn_driver = snd.ServerNotificationDriver.instance()
log_name = "tacker.sol_refactored.conductor.server_notification_driver"
with self.assertLogs(logger=log_name, level=logging.ERROR) as cm:
self.conductor.sn_driver.timer_expired('test_id', ['id'])
@ -125,15 +110,3 @@ class TestServerNotification(db_base.SqlTestCase):
timer.append(['4'])
timer.cancel()
timer.__del__()
def test_driver_stub(self):
self.config_fixture.config(
group='server_notification', server_notification=False)
drv = snd.ServerNotificationDriver.instance()
drv = snd.ServerNotificationDriver.instance()
drv.notify('id', ['id'])
drv.remove_timer('id')
self.config_fixture.config(
group='server_notification', server_notification=True)
snd.ServerNotificationDriver._instance = None
drv = snd.ServerNotificationDriver.instance()