Merge "Ranger-Agent: Update heat send logic"

This commit is contained in:
Zuul 2019-11-25 17:34:42 +00:00 committed by Gerrit Code Review
commit c1e791817d
21 changed files with 224 additions and 479 deletions

1
.coverage Normal file
View File

@ -0,0 +1 @@
!coverage.py: This is a private format, don't read it directly!{"arcs":{}}

4
.stestr.conf Normal file
View File

@ -0,0 +1,4 @@
[DEFAULT]
test_path=./ord/tests
top_dir=./

View File

@ -7,8 +7,7 @@ ENV LANG C.UTF-8
RUN apt -qq update && \
apt -y install git \
netcat \
apt -y install netcat \
netbase \
openssh-server \
python3-minimal \

View File

@ -13,12 +13,12 @@ resource_creation_timeout_min = 1200
resource_creation_timeout_max = 14400
#Log files location
log_dir = /var/log/ranger-agent
local_repo = ranger_repo
resource_status_check_wait = 15
api_paste_config = /etc/ranger-agent/api-paste.ini
transport_url = rabbit://stackrabbit:devstack@127.0.0.1:5672/
enable_rds_callback_check = True
enable_heat_health_check = True
retry_limits = 5
[api]
# Address to bind the API server to
@ -42,9 +42,5 @@ connection = mysql+pymysql://root:devstack@127.0.0.1:3306/ord
max_retries = -1
[orm]
#This will assume the required ssh-keys are all already populated
orm_template_repo_url = git@github.com:ranger-agent/templates.git
#This is fake service call will be replaced with rds url
rds_listener_endpoint = http://127.0.0.1:8777/v1/rds/status
repo_pull_check_wait = 2
retry_limits = 5

View File

@ -12,6 +12,7 @@ repo_connection_timeout = 120
resource_creation_timeout_max = 14400
resource_creation_timeout_min = 1200
resource_status_check_wait = 15
retry_limits = 5
transport_url = rabbit://ranger-agent:password@rabbitmq.openstack.svc.cluster.local:5672/ranger-agent
[api]
@ -36,7 +37,6 @@ username = heat
orm_template_repo_url = git@github.com:ranger-agent/templates.git
rds_listener_endpoint = http://internal.ranger.com:8777/v1/rds/status
repo_pull_check_wait = 2
retry_limits = 5
[oslo_messaging_notifications]
driver = messagingv2

View File

@ -14,7 +14,9 @@
# under the License.
from ord.api.healthcheck import HealthCheck
from ord.client.client import Clients
from ord.client import rpcapi
from ord.common import exceptions as exc
from ord.common import utils
from ord.common.utils import ErrorCode
@ -23,8 +25,10 @@ from ord.i18n import _
from ord.openstack.common import log
from oslo_config import cfg
from pecan import expose
from pecan import request as pecan_req
from urllib.error import HTTPError
import base64
import datetime
import json
import oslo_messaging as messaging
@ -46,13 +50,17 @@ orm_opts = [
opts = [
cfg.StrOpt('region',
help='Region')
help='Region'),
cfg.StrOpt('auth_enabled', default='True',
help='check if authentication turned on')
]
CONF.register_opts(opts)
opt_group = cfg.OptGroup(name='orm',
title='Options for the orm service')
CONF.register_group(opt_group)
CONF.register_opts(orm_opts, opt_group)
@ -122,9 +130,19 @@ class ListenerQueueHandler(object):
class NotifierController(object):
kc = None
def __init__(self):
super(NotifierController, self).__init__()
self._rpcapi = rpcapi.RpcAPI()
self._set_keystone_client()
def _set_keystone_client(cls):
try:
if NotifierController.kc is None:
NotifierController.kc = Clients().keystone()
except exc.KeystoneInitializationException as e:
raise webob.exc.HTTPUnauthorized(explanation=str(e))
def _prepare_response_message(self, kwargs, target_data,
status, error_msg=None, error_code=None):
@ -189,6 +207,11 @@ class NotifierController(object):
msg = _('%s contains white spaces') % key
raise webob.exc.HTTPBadRequest(explanation=msg)
def _validate_token(self):
self._set_keystone_client()
token = pecan_req.headers['X-Auth-Token']
self.kc.tokens.validate(token)
@expose(generic=True)
def ord_notifier(self, **args):
raise webob.exc.HTTPNotFound
@ -217,42 +240,46 @@ class NotifierController(object):
@ord_notifier.when(method='POST', template='json')
def ord_notifier_POST(self, **vals):
vals = vals['ord-notifier']
request_id = vals.get('request-id')
if CONF.auth_enabled:
self._validate_token()
file_info = vals['file']
vals_dict = json.loads(vals['json'])
values = vals_dict['ord-notifier']
request_id = values.get('request-id')
if request_id is None:
msg = _("A valid request_id parameter is required")
raise webob.exc.HTTPBadRequest(explanation=msg)
# FIXME we don't process this field. So why for it here?
resource_type = vals.get('resource-type')
resource_type = values.get('resource-type')
if resource_type is None:
msg = _("A valid resource_type parameter is required")
raise webob.exc.HTTPBadRequest(explanation=msg)
# FIXME we support specific set of operation. We must check
# that received operation is in support list.
resource_operation = vals.get('operation')
resource_operation = values.get('operation')
if resource_operation is None:
msg = _("A valid resource_operation parameter is required")
raise webob.exc.HTTPBadRequest(explanation=msg)
resource_name = vals.get('resource-template-name')
resource_name = values.get('resource-template-name')
if resource_name is None:
msg = _("A valid resource-template-name parameter is required")
raise webob.exc.HTTPBadRequest(explanation=msg)
# FIXME: why is this needed?
template_version = vals.get('resource-template-version')
template_version = values.get('resource-template-version')
# FIXME: we can handle only 'hot' or 'ansible' values here
# Everything else must be rejected here.
template_type = vals.get('resource-template-type')
template_type = values.get('resource-template-type')
if template_type is None:
template_type = utils.TEMPLATE_TYPE_HEAT
status_id = str(uuid.uuid4())
region = vals.get('region')
region = values.get('region')
if region is None:
msg = _("A valid region is required")
raise webob.exc.HTTPBadRequest(explanation=msg)
@ -261,8 +288,8 @@ class NotifierController(object):
raise webob.exc.HTTPBadRequest(explanation=msg)
resource_id = ''
if 'resource-id' in vals:
resource_id = vals.get('resource-id')
if 'resource-id' in values:
resource_id = values.get('resource-id')
kwargs = {
'request_id': str(request_id),
@ -281,11 +308,11 @@ class NotifierController(object):
LOG.debug('Payload to DB call %r ' % kwargs)
db_response = self._persist_notification_record(kwargs=kwargs)
response = {}
vals['status'] = db_response['status']
values['status'] = db_response['status']
if 'error_code' in db_response:
vals['error-code'] = db_response['error_code']
vals['error-msg'] = db_response['error_msg']
response['ord-notifier-response'] = vals
values['error-code'] = db_response['error_code']
values['error-msg'] = db_response['error_msg']
response['ord-notifier-response'] = values
if 'error_code' not in db_response:
LOG.debug("----- message to Engine -----")
LOG.debug(" message: %s \nstatus_id: %s" %
@ -294,7 +321,9 @@ class NotifierController(object):
payload = str(kwargs)
try:
ctxt = {'request_id': kwargs.get('request_id')}
self._rpcapi.invoke_notifier_rpc(ctxt, payload)
heat_template = base64.b64decode(file_info.file.read())
self._rpcapi.invoke_notifier_rpc(ctxt, payload, heat_template)
except messaging.MessageDeliveryFailure:
LOG.error("Fail to deliver message")
else:

View File

@ -1,216 +0,0 @@
# Copyright 2016 ATT
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ord.common.exceptions as excp
import os
import shlex
import subprocess
from ord.openstack.common import log as logging
from oslo_config import cfg
from oslo_utils import fileutils
CONF = cfg.CONF
ORM_OPTS = [
cfg.StrOpt('orm_template_repo_url',
default='',
help='Remote repo location'),
cfg.Opt('repo_pull_check_wait',
default='1',
help='Wait Time'),
cfg.IntOpt('resource_status_check_wait', default=15,
help='delay in seconds between two retry call'),
cfg.IntOpt('retry_limits',
default=5,
help='number of retry'),
]
cfg.CONF.register_opts(ORM_OPTS, group='orm')
LOG = logging.getLogger(__name__)
subprocess._has_poll = False
class TemplateRepoClient(object):
"""Implementation to download template from repo.
Requires minimal installation (git) and minimal upkeep.
"""
def __init__(self, local_repo):
"""Clone git repo."""
self.git_repo_status = False
self.git_init_repo(local_repo)
def git_init_repo(self, local_repo):
# Check if local git repo already exists
repopath = os.path.join(os.environ['HOME'], local_repo)
repo = cfg.CONF.orm.orm_template_repo_url
LOG.info(
"%s Setting up repo initiated ...", os.path.basename(repo))
# create the git repo directory if not exists
fileutils.ensure_tree(repopath)
try:
# initialize repo directory as a git repo
cmd = 'git init {0}'.format(repopath)
self.run_git('GitRepoInit', cmd, workdir=repopath)
try:
# set remote origin
cmd = 'git -C {0} remote add origin {1}'.format(
repopath, repo)
self.run_git('GitRepoInit', cmd, workdir=repopath)
except Exception as repoexp:
LOG.warning(repoexp)
# fetch origin
rem_lock_file = '{0}/.git/refs/remotes/origin/master.lock'\
.format(repopath)
if os.path.exists(rem_lock_file):
os.remove(rem_lock_file)
cmd = 'git -C {0} fetch origin'.format(
repopath)
self.run_git('GitRepoInit', cmd, workdir=repopath)
except Exception as repoexp:
self.git_repo_status = False
LOG.critical("Failed to initialize Repo %s " % repoexp)
LOG.info(
"%s Setting up repo status (completed = %s)",
os.path.basename(repo), self.git_repo_status)
def pull_template(self, local_repo, pathtotemplate):
"""Get template from repo.
:param local_repo: local repo name
:param pathtotemplate: path to template
"""
if not self.git_repo_status:
self.git_init_repo(local_repo)
LOG.debug("Template pull initiated ...")
workdir = os.path.join(os.environ['HOME'], local_repo)
# normalize the path before checking if file exists
templatepath = os.path.normpath(
os.path.join(workdir, pathtotemplate))
# delete previous version
if os.path.isfile(templatepath):
os.remove(templatepath)
cmd = 'git -C {0} fetch origin'.format(workdir)
self.run_git('PullTemplate', cmd, workdir=workdir)
cmd = 'git -C {0} checkout FETCH_HEAD -- {1}'.format(
workdir, pathtotemplate)
self.run_git('PullTemplate', cmd, workdir=workdir, is_timeout=True)
LOG.debug("Template pull completed ...")
return templatepath
def run_git(self, label, cmd, workdir=None, is_timeout=False):
LOG.info("Running cmd: '%s'", cmd)
timed_out = False
retry_left = CONF.orm.retry_limits
if is_timeout:
timeout_sec = cfg.CONF.resource_status_check_wait
cmd = 'timeout -k {0}s {1}s {2}'.format(timeout_sec + 5,
timeout_sec, cmd)
LOG.info('Setting cmd timeout to: %s seconds', timeout_sec)
while(retry_left > 0):
try:
process = subprocess.Popen(
shlex.split(cmd), stdout=subprocess.PIPE,
shell=False, stderr=subprocess.PIPE)
[stdout, stderr] = process.communicate()
# 124 is the return code in the shell if timeout occurred
if process.returncode == 124:
timed_out = True
LOG.critical(
"Run command '%s' exceeded the alloted"
"time of %s seconds, process was killed.",
cmd, timeout_sec)
except Exception as exception:
LOG.critical("Unexpected error running '%s'"
"exception: %s",
cmd, exception.args)
[stdout, stderr] = process.communicate()
finally:
proc_result = {}
proc_result["returncode"] = process.returncode
proc_result["stdout"] = stdout.decode("UTF-8")
proc_result["stderr"] = stderr.decode("UTF-8")
proc_result["timed_out"] = timed_out
if proc_result["returncode"] == 0:
retry_left = 0
process.returncode = 0
self.git_repo_status = True
else:
if 'remote origin already exists' in proc_result["stderr"]:
retry_left = 0
else:
retry_left -= 1
LOG.warning("stderr: %s", proc_result)
LOG.warning("Retrying cmd '%s'. Retries left: %s",
cmd, retry_left)
if workdir is not None:
try:
rem_lock_file = '{0}/.git/refs/remotes/origin/master.lock'\
.format(workdir)
if os.path.exists(rem_lock_file):
os.remove(rem_lock_file)
fetch = 'git -C {0} fetch origin'.format(workdir)
fetch_process = subprocess.Popen(
shlex.split(fetch), stdout=subprocess.PIPE,
shell=False, stderr=subprocess.PIPE)
[stdout, stderr] = fetch_process.communicate()
LOG.info("Run command '%s' to syncup"
" repo after error", fetch)
except Exception as exp:
LOG.warning(exp)
if process.returncode != 0:
self.check_git_errors(label, proc_result)
def check_git_errors(self, label, result):
stderr = result['stderr'].lower()
if result['timed_out']:
raise excp.RepoTimeoutException(label=label)
elif 'service not known' in stderr:
raise excp.RepoIncorrectURL(label=label)
elif 'does not exist' in stderr:
raise excp.RepoNotExist(label=label)
elif ('permission denied' in stderr) or ('No such remote' in stderr):
raise excp.RepoNoPermission(label=label)
elif 'did not match any file(s) known to git' in stderr:
raise excp.FileNotInRepo(label=label)
elif 'remote origin already exists' in stderr:
pass
else:
# general unknown exception in case none of the above
# are the cause of the problem
raise excp.RepoUnknownException(label=label, unknown=stderr)

View File

@ -63,8 +63,6 @@ class HeatClient(object):
return stack
def create_stack(self, name, template):
template = utils.load_file(template)
client, self._kc = Clients().heat(self._kc)
try:
response = client.stacks.create(
@ -74,7 +72,6 @@ class HeatClient(object):
return response
def update_stack(self, stack_id, template):
template = utils.load_file(template)
client, self._kc = Clients().heat(self._kc)
try:

View File

@ -30,12 +30,13 @@ class RpcAPI(object):
self.transport = messaging.get_rpc_transport(cfg.CONF)
self._client = messaging.RPCClient(self.transport, self.target)
def invoke_notifier_rpc(self, ctxt, payload):
def invoke_notifier_rpc(self, ctxt, payload, heat_template):
try:
cctxt = self._client.prepare(version='1.0')
cctxt.cast(ctxt=ctxt,
method='invoke_notifier_rpc',
payload=payload)
payload=payload,
heat_template=heat_template)
except messaging.MessageDeliveryFailure:
LOG.error("Fail to deliver message")

View File

@ -61,7 +61,8 @@ def _create_facade(conf_group):
autocommit=True,
expire_on_commit=False,
mysql_sql_mode=conf_group.mysql_sql_mode,
idle_timeout=conf_group.idle_timeout,
# replace deprecated "idle_timeout" with "connection_recycle_time"
connection_recycle_time=conf_group.connection_recycle_time,
connection_debug=conf_group.connection_debug,
connection_trace=conf_group.connection_trace,
max_retries=conf_group.max_retries)

View File

@ -14,7 +14,6 @@
import ast
from multiprocessing import Process
import os
from ord.client import rpcengine
from ord.engine.healthcheck import HealthCheck
@ -32,7 +31,7 @@ class QueueHandler(object):
self._rpcengine = rpcengine.RpcEngine()
self.factory = WorkerFactory()
def invoke_notifier_rpc(self, ctxt, payload):
def invoke_notifier_rpc(self, ctxt, payload, heat_template):
LOG.debug("\n----- message from API -----")
LOG.debug("\n Payload: %s \nctxt: %s "
@ -45,12 +44,11 @@ class QueueHandler(object):
operation = d["resource_operation"]
template_status_id = d["template_status_id"]
region = d["region"]
stack_name = resource_name[:resource_name.index(".")]
path_to_tempate = os.path.join(region, template_type,
resource_type, resource_name)
worker = self.factory.getWorker(operation, path_to_tempate,
stack_name = resource_name
worker = self.factory.getWorker(operation,
stack_name, template_status_id,
resource_type, template_type)
resource_type, template_type,
heat_template)
self.factory.execute(worker, operation)
def invoke_health_probe_rpc(self, ctxt):

View File

@ -57,7 +57,7 @@ class HealthCheck(object):
if CONF.enable_heat_health_check:
try:
LOG.debug("Health Heat test starting")
heat.HeatClient().delete_stack('none_existence_id')
heat.HeatClient().delete_stack('non_existant_id')
except exc.HEATStackDeleteError as heatex:
if 'MessagingTimeout' in str(heatex):
LOG.error('Health Heat Test Exp in %s: %r',

View File

@ -13,7 +13,6 @@
# under the License.
import json
import os
from oslo_config import cfg
from random import SystemRandom
import six
@ -21,7 +20,6 @@ import sys
import threading
import time
from ord.client import getrepo
from ord.client import heat
from ord.client import rpcengine
from ord.common import exceptions as exc
@ -33,9 +31,6 @@ from ord.openstack.common import log as logging
CONF = cfg.CONF
CONF.register_opts([
cfg.StrOpt('local_repo', default='aic-orm-resources-labs',
help='local repo from where the'
'template yaml can be accessed from'),
cfg.IntOpt('heat_poll_interval', default=5,
help='delay in seconds between two consecutive call to '
'heat.stacks.status'),
@ -70,7 +65,6 @@ class Singleton(type):
@six.add_metaclass(Singleton)
class WorkerFactory(object):
_instance = None
_temp_repo_client = None
_heat_client = None
_glance_client = None
_db_client = None
@ -85,10 +79,6 @@ class WorkerFactory(object):
str(WorkerFactory._client_initialize))
WorkerThread._init_error = None
try:
WorkerThread._temp_repo_client = \
getrepo.TemplateRepoClient(CONF.local_repo)
WorkerThread._heat_client = heat.HeatClient()
try:
@ -122,19 +112,19 @@ class WorkerFactory(object):
WorkerFactory._client_init()
WorkerThread._client_initialize = True
def getWorker(self, operation, path_to_tempate, stack_name,
def getWorker(self, operation, stack_name,
template_status_id, resource_type,
template_type):
template_type, heat_template):
template_type = template_type.lower()
# FIXME: this code have a none zero to fail in very unexpected
# FIXME: this code has a none zero to fail in very unexpected
# way
randCrypt = SystemRandom()
threadID = randCrypt.randint(1, 99999999)
if template_type == "hot":
miniWorker = WorkerThread(threadID, operation,
path_to_tempate, stack_name,
miniWorker = WorkerThread(threadID, operation, stack_name,
template_status_id, resource_type,
heat_template,
WorkerThread._init_error)
WorkerThread._threadPool.update({threadID: miniWorker})
elif template_type == "ansible":
@ -154,16 +144,17 @@ class WorkerFactory(object):
class WorkerThread(threading.Thread):
def __init__(self, threadID, operation, path_to_tempate, stack_name,
template_status_id, resource_type, client_error=None):
LOG.info("initializing Thread._init_")
def __init__(self, threadID, operation, stack_name,
template_status_id, resource_type, heat_template,
client_error=None):
LOG.info("initializing Thread")
threading.Thread.__init__(self)
self.threadID = threadID
self.operation = operation
self.template_path = path_to_tempate
self.stack_name = stack_name
self.template_status_id = template_status_id
self.resource_type = resource_type
self.heat_template = heat_template
self.client_error = client_error
def extract_resource_extra_metadata(self, rds_payload, rds_status):
@ -194,15 +185,13 @@ class WorkerThread(threading.Thread):
def run(self):
LOG.debug("Thread Starting :: %s", self.threadID)
LOG.debug("operation=%s, stack_name=%s, path_to_tempate=%s",
self.operation, self.stack_name, self.template_path)
template_absolute_path = self.template_path
LOG.debug("operation %s for stack_name %s ",
self.operation, self.stack_name)
try:
if self._is_engine_initialized():
LOG.debug('Client initialization complete')
try:
template_absolute_path = self._fetch_template()
self._execute_operation(template_absolute_path)
self._execute_operation(self.heat_template)
except exc.ORDException as e:
LOG.error('%s', e.message)
self._update_permanent_storage(e)
@ -224,7 +213,6 @@ class WorkerThread(threading.Thread):
LOG.critical('Unhandled exception into %s', type(self).__name__,
exc_info=True)
finally:
self._cleanup_template(template_absolute_path)
LOG.info("Thread Exiting :: %s", self.threadID)
WorkerFactory.removeWorker(self.threadID)
@ -271,15 +259,6 @@ class WorkerThread(threading.Thread):
raise
def _cleanup_template(self, template_absolute_path):
LOG.info("Removing template File :: %s", template_absolute_path)
try:
if os.path.isfile(template_absolute_path):
os.remove(template_absolute_path)
LOG.info("Template File Removed")
except Exception as ex:
LOG.error("Error on cleanup of template File :: %s", ex)
def _update_permanent_storage(self, error=None):
args = {}
if isinstance(error, exc.StackOperationError):
@ -328,7 +307,7 @@ class WorkerThread(threading.Thread):
LOG.error("Unexpected error collecting extra \
Image Parameter %s", exception)
max_range = int(CONF.orm.retry_limits)
max_range = int(CONF.retry_limits)
self._rpcengine. \
invoke_listener_rpc(res_ctxt, json.dumps(rds_payload))
@ -358,24 +337,9 @@ class WorkerThread(threading.Thread):
else:
break
def _fetch_template(self):
"""Fetch template from document storage
Template fetching will be skipped if current operation does not require
template.
"""
if self.operation not in (
utils.OPERATION_CREATE,
utils.OPERATION_MODIFY):
return
LOG.debug("template path: %r", self.template_path)
return self._temp_repo_client.pull_template(
CONF.local_repo, self.template_path)
def _create_stack(self, template):
LOG.debug("Creating stack name %s by template %s",
self.stack_name, self.template_path)
LOG.debug("Creating template for stack %s ", self.stack_name)
# This call return raw response(dict), but all other calls to heat
# client return "models" build from raw responses. Look like this a
# BUG into heatclient. This behavior is not fixed until now (1.2.0).
@ -385,7 +349,7 @@ class WorkerThread(threading.Thread):
def _update_stack(self, template):
LOG.debug("Updating stack id %s by template %s",
self.stack_name, self.template_path)
self.stack_name, )
stack = self._heat_client.get_stack_by_name(self.stack_name)
self._heat_client.update_stack(stack.id, template)

View File

@ -16,6 +16,9 @@
"""
Unit Tests for ord.api.test_api
"""
import base64
from cgi import FieldStorage
import mock
from mox3.mox import stubout
from ord.api.controllers.v1 import api
@ -26,13 +29,12 @@ import requests
from urllib import request
import webob
CONF = cfg.CONF
class OrdApiTestCase(base.BaseTestCase):
PATH_PREFIX = ''
def setUp(self):
super(OrdApiTestCase, self).setUp()
self.stubs = stubout.StubOutForTesting()
@ -40,24 +42,30 @@ class OrdApiTestCase(base.BaseTestCase):
self.addCleanup(self.stubs.SmartUnsetAll)
def test_api_notifier(self):
ord_notifier = api.NotifierController()
kwargs = {
'request_id': '1',
'resource_id': 'qwe1234',
'resource-type': 'image'
}
payload = str(kwargs)
mock_file = FieldStorage('heat_template', headers={})
params = {
"ord-notifier": {
"request-id": "2",
"resource-id": "1",
"resource-type": "image",
"resource-template-version": "1",
"resource-template-name": "image1",
"resource-template-type": "hot",
"operation": "create",
"region": "local"}
"file": mock_file,
"json":
'{"ord-notifier": {\
"request-id": "2",\
"resource-id": "1",\
"resource-type": "image",\
"resource-template-version": "1",\
"resource-template-name": "image1",\
"resource-template-type": "hot",\
"operation": "create",\
"region": "local"\
}\
}'
}
db_response = {'template_type': 'hot',
@ -72,17 +80,34 @@ class OrdApiTestCase(base.BaseTestCase):
CONF.set_default('region', 'local')
def fake_keystone_client(*args, **kwds):
return
self.stubs.Set(api.NotifierController, '_set_keystone_client',
fake_keystone_client)
ord_notifier = api.NotifierController()
def fake_validate_token(*args):
return
def fake_persist_notification_record(*args, **kwds):
return db_response
def fake_b64decode(*args, **kwds):
return "heat_template"
def fake_invoke_notifier_rpc(*args, **kwds):
return payload
self.stubs.Set(ord_notifier, "_validate_token", fake_validate_token)
self.stubs.Set(ord_notifier, "_persist_notification_record",
fake_persist_notification_record)
self.stubs.Set(base64, "b64decode", fake_b64decode)
self.stubs.Set(ord_notifier._rpcapi, "invoke_notifier_rpc",
fake_invoke_notifier_rpc)
response = ord_notifier.ord_notifier_POST(**params)
expect_response = response['ord-notifier-response']['status']
self.assertEqual(expect_response, 'Submitted')
@ -182,62 +207,108 @@ class OrdApiTestCase(base.BaseTestCase):
self.assertEqual(output_status, db_template_target['status'])
def test_api_notifier_for_blank_region(self):
ord_notifier = api.NotifierController()
mock_file = FieldStorage('heat_template', headers={})
params = {
"ord-notifier": {
"request-id": "2",
"resource-id": "1",
"resource-type": "image",
"resource-template-version": "1",
"resource-template-name": "image1",
"resource-template-type": "hot",
"operation": "create"}
"file": mock_file,
"json":
'{"ord-notifier": {\
"request-id": "2",\
"resource-id": "1",\
"resource-type": "image",\
"resource-template-version": "1",\
"resource-template-name": "image1",\
"resource-template-type": "hot",\
"operation": "create"\
}\
}'
}
def fake_keystone_client(*args, **kwds):
return
self.stubs.Set(api.NotifierController, '_set_keystone_client',
fake_keystone_client)
ord_notifier = api.NotifierController()
def fake_validate_token(*args):
return
self.stubs.Set(ord_notifier, "_validate_token", fake_validate_token)
self.assertRaises(webob.exc.HTTPBadRequest,
ord_notifier.ord_notifier_POST,
**params)
def test_api_notifier_for_invalid_region(self):
ord_notifier = api.NotifierController()
mock_file = FieldStorage('heat_template', headers={})
params = {
"ord-notifier": {
"request-id": "2",
"resource-id": "1",
"resource-type": "image",
"resource-template-version": "1",
"resource-template-name": "image1",
"resource-template-type": "hot",
"operation": "create",
"region": "dev"}
"file": mock_file,
"json":
'{"ord-notifier": {\
"request-id": "2",\
"resource-id": "1",\
"resource-type": "image",\
"resource-template-version": "1",\
"resource-template-name": "image1",\
"resource-template-type": "hot",\
"operation": "create",\
"region": "dev"\
}\
}'
}
CONF.set_default('region', 'local')
def fake_keystone_client(*args, **kwds):
return
self.stubs.Set(api.NotifierController, '_set_keystone_client',
fake_keystone_client)
ord_notifier = api.NotifierController()
def fake_validate_token(*args):
return
self.stubs.Set(ord_notifier, "_validate_token", fake_validate_token)
self.assertRaises(webob.exc.HTTPBadRequest,
ord_notifier.ord_notifier_POST,
**params)
def test_api_notifier_for_invalid_payload(self):
ord_notifier = api.NotifierController()
mock_file = FieldStorage('heat_template', headers={})
params = {
"ord-notifier": {
"request-id": "2",
"resource-id": "1",
"resource-type": "imag e",
"resource-template-version": "1",
"resource-template-name": "ima ge1",
"resource-template-type": "hot",
"operation": "create",
"region": "local"}
"file": mock_file,
"json":
'{"ord-notifier": {\
"request-id": "2",\
"resource-id": "1",\
"resource-type": "image",\
"resource-template-version": "1",\
"resource-template-name": "ima ge1",\
"resource-template-type": "hot",\
"operation": "create",\
"region": "local"\
}\
}'
}
CONF.set_default('region', 'local')
def fake_keystone_client(*args, **kwds):
return
self.stubs.Set(api.NotifierController, '_set_keystone_client',
fake_keystone_client)
ord_notifier = api.NotifierController()
def fake_validate_token(*args):
return
self.stubs.Set(ord_notifier, "_validate_token", fake_validate_token)
self.assertRaises(webob.exc.HTTPBadRequest,
ord_notifier.ord_notifier_POST,
**params)
def test_api_ord_notifier_status(self):
ord_notifier = api.NotifierController()
request_id = {"Id": "2"}
db_template = {'resource_operation': 'create',
'resource_id': '1',
@ -274,6 +345,13 @@ class OrdApiTestCase(base.BaseTestCase):
'error-msg': 'stack fail'}
}
def fake_keystone_client(*args, **kwds):
return
self.stubs.Set(api.NotifierController, '_set_keystone_client',
fake_keystone_client)
ord_notifier = api.NotifierController()
def fake_retrieve_template(*args, **kwds):
return db_template

View File

@ -1,59 +0,0 @@
# Copyright (c) 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ord.client import getrepo
from ord.common.exceptions import ORDException
from ord.tests import base
import os
from oslo_config import cfg
from unittest import mock
from unittest.mock import patch
CONF = cfg.CONF
class GetRepoTestCase(base.BaseTestCase):
def setUp(self):
super(GetRepoTestCase, self).setUp()
self.git_inst = None
self.local_repo = 'ord_test'
with patch.object(getrepo.TemplateRepoClient, 'git_init_repo'):
self.git_inst = getrepo.TemplateRepoClient(self.local_repo)
def test_pullrepo_template(self):
path = os.path.abspath('')
testfile = 'ord/dummy.py'
expected = path + "/" + testfile
with patch.object(self.git_inst, 'run_git'):
result = self.git_inst.pull_template(path, testfile)
self.assertEqual(expected, result)
def test_fail_pull_template(self):
path = os.path.abspath('test')
testfile = 'tests/files/stack0.yaml'
self.assertRaises(ORDException, self.git_inst.pull_template,
path, testfile)
def test_git_init_repo(self):
self.subprocess = mock.Mock()
with patch.object(self.git_inst, 'run_git') as mock_method:
self.git_inst.git_init_repo(self.local_repo)
mock_method.assert_called()

View File

@ -83,7 +83,7 @@ class TestHeatClient(base.BaseTestCase):
self.heat.create_stack(stack_name, template.name)
self.heat_client.stacks.create.assert_called_once_with(
stack_name=stack_name, template=self.test_template)
stack_name=stack_name, template=template.name)
def test_update_stack(self):
stack_idnr = "1"
@ -92,7 +92,7 @@ class TestHeatClient(base.BaseTestCase):
self.heat.update_stack(stack_idnr, template.name)
self.heat_client.stacks.update.assert_called_once_with(
stack_idnr, template=self.test_template)
stack_idnr, template=template.name)
def test_delete_stack(self):
stack_idnr = "1"

View File

@ -83,8 +83,9 @@ class RpcAPITestCase(base.BaseTestCase):
'resource-type': 'image'
}
payload = str(kwargs)
heat_template = 'test'
self._test_api('invoke_notifier_rpc',
rpc_method='cast',
payload=payload,
version='1.0')
version='1.0',
heat_template=heat_template)

View File

@ -30,13 +30,12 @@ class TestWorkerFactory(base.BaseTestCase):
self.resource_type = 'image'
self.template_type = 'hot'
self.threadId = 123
self.heat_template = mock.Mock()
super(TestWorkerFactory, self).setUp()
self.clients = mock.Mock()
self.patch('ord.client.getrepo.TemplateRepoClient')\
.return_value = self.clients
self.patch('ord.client.heat.HeatClient').return_value = self.clients
self.patch('ord.client.rpcengine.RpcEngine')\
.return_value = self.clients
@ -45,11 +44,11 @@ class TestWorkerFactory(base.BaseTestCase):
def test_getWorker(self):
threadId = self.worker.getWorker(self.operation,
self.path_to_tempate,
self.stack_name,
self.template_status_id,
self.resource_type,
self.template_type)
self.template_type,
self.heat_template)
assert (threadId > 0)
def test_negetive_removeWorker(self):
@ -58,11 +57,11 @@ class TestWorkerFactory(base.BaseTestCase):
def test_removeWorker(self):
localThreadId = self.worker.getWorker(self.operation,
self.path_to_tempate,
self.stack_name,
self.template_status_id,
self.resource_type,
self.template_type)
self.template_type,
self.heat_template)
try:
self.worker.removeWorker(localThreadId)
except Exception:

View File

@ -13,7 +13,6 @@
# under the License.
import itertools
import os
import mock
from oslo_config import cfg
@ -26,26 +25,18 @@ from ord.tests import base
CONF = cfg.CONF
# FIXME: pep8 compatible - camelcase attributes
class TestWorkerThread(base.BaseTestCase):
def setUp(self):
super(TestWorkerThread, self).setUp()
self.operation = utils.OPERATION_CREATE
self.path_to_tempate = 'test_path'
self.stack_name = 'test_stack'
self.template_status_id = '1'
self.resource_type = 'image'
self.template_type = 'hot'
self.threadId = 123
self.local_repo = 'aic-orm-resources-labs'
self._temp_repo_client = mock.Mock()
self._temp_repo_client.pull_template.return_value = self.pull_client\
= mock.Mock()
self.patch('ord.engine.workerfactory.getrepo').return_value\
= self._temp_repo_client
self.heat_template = mock.Mock()
self.db_api = mock.Mock()
self.db_api.update_target_data.return_value = self.db_client\
@ -54,17 +45,16 @@ class TestWorkerThread(base.BaseTestCase):
= self.db_api
self.WorkerFactory = mock.Mock()
self.WorkerFactory.removeWorker.return_value = self.remove_clinet\
self.WorkerFactory.removeWorker.return_value = self.remove_client\
= mock.Mock()
self.patch('ord.engine.workerfactory.WorkerFactory').return_value\
= self.WorkerFactory
self.workerThread = workerfactory.WorkerThread(
self.threadId, self.operation, self.path_to_tempate,
self.threadId, self.operation,
self.stack_name, self.template_status_id,
self.resource_type)
self.heat_template, self.resource_type)
self.workerThread._heat_client = self.heat_client = mock.Mock()
self.workerThread._temp_repo_client = self._temp_repo_client
self.workerThread.db_api = self.db_api
def test_extract_resource_extra_metadata(self):
@ -88,6 +78,7 @@ class TestWorkerThread(base.BaseTestCase):
'size': '10',
'virtual_size': '12'}}}
self.workerThread.resource_type = 'image'
self.heat_client.get_stack_by_name.return_value = stack
self.heat_client.get_image_data_by_stackid.return_value = image_data
self.workerThread.extract_resource_extra_metadata(
@ -99,34 +90,25 @@ class TestWorkerThread(base.BaseTestCase):
get_image_data_by_stackid.assert_called_once_with(stack.id)
self.assertEqual(output_payload, input_payload)
def test_fetch_template(self):
self.workerThread._fetch_template()
self._temp_repo_client.pull_template\
.assert_called_with(self.local_repo, self.path_to_tempate)
def test_create_stack(self):
self.heat_client.create_stack.return_value = {'stack': {'id': 1}}
template = os.path.join(
os.path.expanduser('~'), self.local_repo, self.path_to_tempate)
self.workerThread._create_stack(template)
self.workerThread._create_stack(self.heat_template)
self.heat_client.create_stack.assert_called_once_with(
self.stack_name, template)
self.stack_name, self.heat_template)
def test_update_stack(self):
stack = base.Dummy(id='1', stack_name=self.stack_name)
template = os.path.join(
os.path.expanduser('~'), self.local_repo, self.path_to_tempate)
self.heat_client.get_stack_by_name.return_value = stack
self.workerThread._update_stack(template)
self.workerThread._update_stack(self.heat_template)
self.heat_client.get_stack_by_name.assert_called_once_with(
self.stack_name)
self.heat_client.update_stack.\
assert_called_with(stack.id, template)
assert_called_with(stack.id, self.heat_template)
def test_delete_stack(self):
stack = base.Dummy(id='1', stack_name=self.stack_name)
@ -208,10 +190,6 @@ class TestWorkerThread(base.BaseTestCase):
self.assertEqual('UPDATE_COMPLETE', status_transition.transitions[-1])
def test_run(self):
self.workerThread._fetch_template = fetch_template = mock.Mock()
template_absolute_path = os.path.join(
os.path.expanduser('~'), self.local_repo, self.path_to_tempate)
fetch_template.return_value = template_absolute_path
self.workerThread._execute_operation = execute = mock.Mock()
execute.return_value = 'OPERATION_STATUS'
self.workerThread._update_permanent_storage = \
@ -220,16 +198,10 @@ class TestWorkerThread(base.BaseTestCase):
self.workerThread._send_operation_results = send_results = mock.Mock()
self.workerThread.run()
fetch_template.assert_called_once_with()
execute.assert_called_with(template_absolute_path)
save_results.assert_called_once_with()
send_results.assert_called_once_with()
def test_run_fail(self):
self.workerThread._fetch_template = fetch_template = mock.Mock()
template_absolute_path = os.path.join(
os.path.expanduser('~'), self.local_repo, self.path_to_tempate)
fetch_template.return_value = template_absolute_path
error = exc.StackOperationError(operation='unittest', stack='dummy')
self.workerThread._execute_operation = execute = mock.Mock(
@ -240,29 +212,9 @@ class TestWorkerThread(base.BaseTestCase):
self.workerThread.run()
fetch_template.assert_called_once_with()
execute.assert_called_with(template_absolute_path)
save_status.assert_called_once_with(error)
send_results.assert_called_once_with()
def test_run_fail_uncontrolled(self):
error = ZeroDivisionError()
self.workerThread._fetch_template = fetch_template = mock.Mock()
template_absolute_path = os.path.join(
os.path.expanduser('~'), self.local_repo, self.path_to_tempate)
fetch_template.return_value = template_absolute_path
self.workerThread._execute_operation = execute = mock.Mock(
side_effect=error)
self.workerThread._update_permanent_storage = save_status = mock.Mock()
self.workerThread._send_operation_results = send_results = mock.Mock()
self.workerThread._cleanup_template = mock.Mock()
self.workerThread.run()
fetch_template.assert_called_once_with()
execute.assert_called_with(template_absolute_path)
def test_update_permanent_storage(self):
db_api = self.patch('ord.engine.workerfactory.db_api')

View File

@ -2,6 +2,7 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
amqp>=2.5.2
hacking>=0.12.0,!=0.13.0,<0.14 # Apache-2.0
bandit>=1.5.1
coverage>=4.0

View File

@ -7,12 +7,11 @@ skipsdist = True
[testenv]
basepython = python3.6
usedevelop = True
install_command = pip install -c{env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt} {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
PYTHONWARNINGS=default::DeprecationWarning
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
deps = -r {toxinidir}/requirements.txt
-r {toxinidir}/test-requirements.txt
commands = python setup.py test --slowest --testr-args='{posargs}'
[testenv:bandit]