Post Jobs - Integrate API with Backend (TaskFlow)
* Connected create cluster and delete cluster functionality * Updated existing API tests to mock-out taskflow client * Added tests for job posting from API to jobboard (TaskFlow) Change-Id: I59a61b250db84d2782e21393ea3a856c426541fc
This commit is contained in:
parent
1f2ccd8882
commit
bf069a9485
|
@ -18,17 +18,27 @@
|
|||
|
||||
"""Version 1 of the Cue API
|
||||
"""
|
||||
import uuid
|
||||
|
||||
from cue.api.controllers import base
|
||||
from cue.common import exception
|
||||
from cue.common.i18n import _ # noqa
|
||||
from cue.common.i18n import _LI # noqa
|
||||
from cue import objects
|
||||
from cue.openstack.common import log as logging
|
||||
from cue.taskflow import client as task_flow_client
|
||||
from cue.taskflow.flow import create_cluster
|
||||
from cue.taskflow.flow import delete_cluster
|
||||
|
||||
from oslo.utils import uuidutils
|
||||
import pecan
|
||||
from pecan import rest
|
||||
import wsme
|
||||
from wsme import types as wtypes
|
||||
import wsmeext.pecan as wsme_pecan
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EndPoint(base.APIBase):
|
||||
"""Representation of an End point."""
|
||||
|
@ -151,8 +161,23 @@ class ClusterController(rest.RestController):
|
|||
def delete(self):
|
||||
"""Delete this Cluster."""
|
||||
context = pecan.request.context
|
||||
|
||||
# update cluster to deleting
|
||||
objects.Cluster.update_cluster_deleting(context, self.id)
|
||||
|
||||
# prepare and post cluster delete job to backend
|
||||
job_args = {
|
||||
"cluster_id": self.id,
|
||||
"cluster_status": "deleting",
|
||||
}
|
||||
job_client = task_flow_client.get_client_instance()
|
||||
job_uuid = uuidutils.generate_uuid()
|
||||
job_client.post(delete_cluster, job_args, tx_uuid=job_uuid)
|
||||
|
||||
LOG.info(_LI('Delete Cluster Request Cluster ID %(cluster_id)s Job ID '
|
||||
'%(job_id)s') % ({"cluster_id": self.id,
|
||||
"job_id": job_uuid}))
|
||||
|
||||
|
||||
class ClustersController(rest.RestController):
|
||||
"""Manages operations on Clusters of nodes."""
|
||||
|
@ -186,10 +211,31 @@ class ClustersController(rest.RestController):
|
|||
# create new cluster with node related data from user
|
||||
new_cluster.create(context)
|
||||
|
||||
# retrieve cluster data
|
||||
cluster = Cluster()
|
||||
|
||||
cluster.cluster = get_complete_cluster(context, new_cluster.id)
|
||||
|
||||
# prepare and post cluster create job to backend
|
||||
job_args = {
|
||||
"size": cluster.cluster.size,
|
||||
"flavor": cluster.cluster.flavor,
|
||||
"volume_size": cluster.cluster.volume_size,
|
||||
"network_id": cluster.cluster.network_id,
|
||||
"cluster_status": "BUILDING",
|
||||
"node_id": "node_id",
|
||||
"port_name": "port_" + str(uuid.uuid4()),
|
||||
}
|
||||
job_client = task_flow_client.get_client_instance()
|
||||
job_uuid = uuidutils.generate_uuid()
|
||||
job_client.post(create_cluster, job_args, tx_uuid=job_uuid)
|
||||
|
||||
LOG.info(_LI('Create Cluster Request Cluster ID %(cluster_id)s Cluster'
|
||||
' size %(size)s network ID %(network_id)s Job ID '
|
||||
'%(job_id)s') % ({"cluster_id": cluster.cluster.id,
|
||||
"size": cluster.cluster.size,
|
||||
"network_id": cluster.cluster.network_id,
|
||||
"job_id": job_uuid}))
|
||||
|
||||
return cluster
|
||||
|
||||
@pecan.expose()
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import contextlib
|
||||
import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
|
@ -54,6 +55,95 @@ def _make_conf(backend_uri):
|
|||
}
|
||||
return conf
|
||||
|
||||
_task_flow_client = None
|
||||
|
||||
|
||||
def get_client_instance(client_name=None, persistence=None, jobboard=None):
|
||||
"""Create and access a single instance of TaskFlow client
|
||||
|
||||
:param client_name: Name of the client interacting with the jobboard
|
||||
:param persistence: A persistence backend instance to be used in lieu
|
||||
of auto-creating a backend instance based on
|
||||
configuration parameters
|
||||
:param jobboard: A jobboard backend instance to be used in lieu of
|
||||
auto-creating a backend instance based on
|
||||
configuration parameters
|
||||
:return: A :class:`.Client` instance.
|
||||
"""
|
||||
global _task_flow_client
|
||||
|
||||
if _task_flow_client is None:
|
||||
if persistence is None:
|
||||
persistence = Client.create_persistence()
|
||||
if jobboard is None:
|
||||
jobboard = Client.create_jobboard(persistence=persistence)
|
||||
if client_name is None:
|
||||
client_name = "cue_job_client"
|
||||
|
||||
_task_flow_client = Client(client_name,
|
||||
persistence=persistence,
|
||||
jobboard=jobboard)
|
||||
|
||||
return _task_flow_client
|
||||
|
||||
|
||||
def create_persistence(conf=None, **kwargs):
|
||||
"""Factory method for creating a persistence backend instance
|
||||
|
||||
:param conf: Configuration parameters for the persistence backend. If
|
||||
no conf is provided, zookeeper configuration parameters
|
||||
for the job backend will be used to configure the
|
||||
persistence backend.
|
||||
:param kwargs: Keyword arguments to be passed forward to the
|
||||
persistence backend constructor
|
||||
:return: A persistence backend instance.
|
||||
"""
|
||||
if conf is None:
|
||||
connection = cfg.CONF.taskflow.persistence_connection
|
||||
if connection is None:
|
||||
connection = ("zookeeper://%s/%s"
|
||||
% (
|
||||
cfg.CONF.taskflow.zk_hosts,
|
||||
cfg.CONF.taskflow.zk_path,
|
||||
))
|
||||
conf = _make_conf(connection)
|
||||
be = persistence_backends.fetch(conf=conf, **kwargs)
|
||||
with contextlib.closing(be.get_connection()) as conn:
|
||||
conn.upgrade()
|
||||
return be
|
||||
|
||||
|
||||
def create_jobboard(board_name=None, conf=None, persistence=None, **kwargs):
|
||||
"""Factory method for creating a jobboard backend instance
|
||||
|
||||
:param board_name: Name of the jobboard
|
||||
:param conf: Configuration parameters for the jobboard backend.
|
||||
:param persistence: A persistence backend instance to be used with the
|
||||
jobboard.
|
||||
:param kwargs: Keyword arguments to be passed forward to the
|
||||
persistence backend constructor
|
||||
:return: A persistence backend instance.
|
||||
"""
|
||||
if board_name is None:
|
||||
board_name = cfg.CONF.taskflow.jobboard_name
|
||||
|
||||
if conf is None:
|
||||
conf = {'board': 'zookeeper'}
|
||||
|
||||
conf.update({
|
||||
"path": "%s/jobs" % (cfg.CONF.taskflow.zk_path),
|
||||
"hosts": cfg.CONF.taskflow.zk_hosts,
|
||||
"timeout": cfg.CONF.taskflow.zk_timeout
|
||||
})
|
||||
|
||||
jb = job_backends.fetch(
|
||||
name=board_name,
|
||||
conf=conf,
|
||||
persistence=persistence,
|
||||
**kwargs)
|
||||
jb.connect()
|
||||
return jb
|
||||
|
||||
|
||||
class Client(object):
|
||||
"""An abstraction for interacting with Taskflow
|
||||
|
@ -61,6 +151,9 @@ class Client(object):
|
|||
This class provides an abstraction for Taskflow to expose a simpler
|
||||
interface for posting jobs to Taskflow Jobboards than what is provided
|
||||
out of the box with Taskflow.
|
||||
|
||||
:ivar persistence: persistence backend instance
|
||||
:ivar jobboard: jobboard backend instance
|
||||
"""
|
||||
|
||||
def __init__(self, client_name, board_name=None, persistence=None,
|
||||
|
@ -86,19 +179,19 @@ class Client(object):
|
|||
|
||||
self._client_name = client_name
|
||||
|
||||
self._persistence = persistence or Client.persistence(**kwargs)
|
||||
self.persistence = persistence or create_persistence(**kwargs)
|
||||
|
||||
self._jobboard = jobboard or Client.jobboard(board_name,
|
||||
None,
|
||||
self._persistence,
|
||||
**kwargs)
|
||||
self.jobboard = jobboard or create_jobboard(board_name,
|
||||
None,
|
||||
self.persistence,
|
||||
**kwargs)
|
||||
|
||||
def __del__(self):
|
||||
"""Destructor for Client class."""
|
||||
if self._jobboard is not None:
|
||||
self._jobboard.close()
|
||||
if self._persistence is not None:
|
||||
self._persistence.close()
|
||||
if self.jobboard is not None:
|
||||
self.jobboard.close()
|
||||
if self.persistence is not None:
|
||||
self.persistence.close()
|
||||
|
||||
@classmethod
|
||||
def create(cls, client_name, board_name=None, persistence=None,
|
||||
|
@ -120,62 +213,6 @@ class Client(object):
|
|||
return cls(client_name, board_name=board_name, persistence=persistence,
|
||||
jobboard=jobboard, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def persistence(conf=None, **kwargs):
|
||||
"""Factory method for creating a persistence backend instance
|
||||
|
||||
:param conf: Configuration parameters for the persistence backend. If
|
||||
no conf is provided, zookeeper configuration parameters
|
||||
for the job backend will be used to configure the
|
||||
persistence backend.
|
||||
:param kwargs: Keyword arguments to be passed forward to the
|
||||
persistence backend constructor
|
||||
:return: A persistence backend instance.
|
||||
"""
|
||||
if conf is None:
|
||||
connection = cfg.CONF.taskflow.persistence_connection
|
||||
if connection is None:
|
||||
connection = ("zookeeper://%s/%s"
|
||||
% (
|
||||
cfg.CONF.taskflow.zk_hosts,
|
||||
cfg.CONF.taskflow.zk_path,
|
||||
))
|
||||
conf = _make_conf(connection)
|
||||
be = persistence_backends.fetch(conf=conf, **kwargs)
|
||||
return be
|
||||
|
||||
@staticmethod
|
||||
def jobboard(board_name, conf=None, persistence=None, **kwargs):
|
||||
"""Factory method for creating a jobboard backend instance
|
||||
|
||||
:param board_name: Name of the jobboard
|
||||
:param conf: Configuration parameters for the jobboard backend.
|
||||
:param persistence: A persistence backend instance to be used with the
|
||||
jobboard.
|
||||
:param kwargs: Keyword arguments to be passed forward to the
|
||||
persistence backend constructor
|
||||
:return: A persistence backend instance.
|
||||
"""
|
||||
if board_name is None:
|
||||
board_name = cfg.CONF.taskflow.jobboard_name
|
||||
|
||||
if conf is None:
|
||||
conf = {'board': 'zookeeper'}
|
||||
|
||||
conf.update({
|
||||
"path": "%s/jobs" % (cfg.CONF.taskflow.zk_path),
|
||||
"hosts": cfg.CONF.taskflow.zk_hosts,
|
||||
"timeout": cfg.CONF.taskflow.zk_timeout
|
||||
})
|
||||
|
||||
jb = job_backends.fetch(
|
||||
name=board_name,
|
||||
conf=conf,
|
||||
persistence=persistence,
|
||||
**kwargs)
|
||||
jb.connect()
|
||||
return jb
|
||||
|
||||
def post(self, flow_factory, job_args=None,
|
||||
flow_args=None, flow_kwargs=None, tx_uuid=None):
|
||||
"""Method for posting a new job to the jobboard
|
||||
|
@ -214,13 +251,13 @@ class Client(object):
|
|||
})
|
||||
job_details['flow_uuid'] = flow_detail.uuid
|
||||
|
||||
self._persistence.get_connection().save_logbook(book)
|
||||
self.persistence.get_connection().save_logbook(book)
|
||||
|
||||
engines.save_factory_details(
|
||||
flow_detail, flow_factory, flow_args, flow_kwargs,
|
||||
self._persistence)
|
||||
self.persistence)
|
||||
|
||||
job = self._jobboard.post(job_name, book, details=job_details)
|
||||
job = self.jobboard.post(job_name, book, details=job_details)
|
||||
return job
|
||||
|
||||
def joblist(self, only_unclaimed=False, ensure_fresh=False):
|
||||
|
@ -231,7 +268,7 @@ class Client(object):
|
|||
Behavior of this parameter is backend specific.
|
||||
:return: A list of jobs in the jobboard
|
||||
"""
|
||||
return list(self._jobboard.iterjobs(only_unclaimed=only_unclaimed,
|
||||
return list(self.jobboard.iterjobs(only_unclaimed=only_unclaimed,
|
||||
ensure_fresh=ensure_fresh))
|
||||
|
||||
def delete(self, job=None, job_id=None):
|
||||
|
@ -254,5 +291,5 @@ class Client(object):
|
|||
if j.uuid == job_id:
|
||||
job = j
|
||||
|
||||
self._jobboard.claim(job, self._client_name)
|
||||
self._jobboard.consume(job, self._client_name)
|
||||
self.jobboard.claim(job, self._client_name)
|
||||
self.jobboard.consume(job, self._client_name)
|
|
@ -14,3 +14,4 @@
|
|||
# under the License.
|
||||
|
||||
from create_cluster import create_cluster # noqa
|
||||
from delete_cluster import delete_cluster # noqa
|
||||
|
|
|
@ -31,7 +31,7 @@ def create_cluster():
|
|||
cinder_task.CreateVolume(os_client=client.cinder_client(),
|
||||
provides='cinder_volume_id'),
|
||||
nova_task.CreateVm(os_client=client.nova_client(),
|
||||
provides='vm_id'),
|
||||
provides='nova_vm_id'),
|
||||
linear_flow.Flow('wait for vm to become active',
|
||||
retry=retry.Times(10)).add(
|
||||
nova_task.GetVmStatus(os_client=client.nova_client(),
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import taskflow.patterns.linear_flow as linear_flow
|
||||
|
||||
import cue.taskflow.task as cue_task
|
||||
|
||||
|
||||
def delete_cluster():
|
||||
flow = linear_flow.Flow('deleting cluster').add(
|
||||
cue_task.UpdateClusterStatus(cue_client="cue client")
|
||||
)
|
||||
return flow
|
|
@ -128,10 +128,10 @@ class ConductorService(object):
|
|||
version_string, self._jobboard_name)
|
||||
|
||||
with contextlib.closing(
|
||||
tf_client.Client.persistence(conf=self._persistence_conf)
|
||||
tf_client.create_persistence(conf=self._persistence_conf)
|
||||
) as persistence:
|
||||
with contextlib.closing(
|
||||
tf_client.Client.jobboard(
|
||||
tf_client.create_jobboard(
|
||||
board_name=self._jobboard_name,
|
||||
conf=self._jobboard_conf,
|
||||
persistence=persistence,
|
||||
|
|
|
@ -18,5 +18,5 @@ import base_task as base_task
|
|||
|
||||
|
||||
class UpdateClusterStatus(base_task.BaseTask):
|
||||
def execute(self, vm_status, **kwargs):
|
||||
print("Update Cluster Status to %s" % vm_status)
|
||||
def execute(self, cluster_status, **kwargs):
|
||||
print("Update Cluster Status to %s" % cluster_status)
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Authors: Davide Agnello <davide.agnello@hp.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# Copyright [2014] Hewlett-Packard Development Company, L.P.
|
||||
# limitations under the License.
|
||||
|
||||
import zake.fake_client as fake_client
|
||||
|
||||
import cue.taskflow.client as tf_client
|
||||
|
||||
_zk_client = fake_client.FakeClient()
|
||||
persistence = tf_client.create_persistence(client=_zk_client)
|
||||
jobboard = tf_client.create_jobboard("test_board",
|
||||
persistence=persistence,
|
||||
client=_zk_client)
|
||||
|
||||
tf_client = tf_client.get_client_instance(persistence=persistence,
|
||||
jobboard=jobboard)
|
|
@ -86,6 +86,7 @@ class TestGetCluster(api.FunctionalTest,
|
|||
|
||||
class TestDeleteCluster(api.FunctionalTest,
|
||||
api_utils.ClusterValidationMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeleteCluster, self).setUp()
|
||||
|
||||
|
|
|
@ -71,6 +71,7 @@ class TestListClusters(api.FunctionalTest,
|
|||
|
||||
class TestCreateCluster(api.FunctionalTest,
|
||||
api_utils.ClusterValidationMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateCluster, self).setUp()
|
||||
|
||||
|
|
|
@ -36,8 +36,8 @@ class CreatePortTests(base.TestCase):
|
|||
]
|
||||
|
||||
task_store = {
|
||||
'network_id': "0",
|
||||
'port_name': "port_0",
|
||||
"network_id": "0",
|
||||
"port_name": "port_0",
|
||||
}
|
||||
|
||||
def test_create_port_invalid_network(self):
|
||||
|
@ -51,26 +51,14 @@ class CreatePortTests(base.TestCase):
|
|||
# generate a new UUID for an 'invalid' network_id
|
||||
CreatePortTests.task_store['network_id'] = str(uuid.uuid4())
|
||||
|
||||
try:
|
||||
engines.run(flow, store=CreatePortTests.task_store)
|
||||
except exceptions.NetworkNotFoundClient:
|
||||
"""this exception is expected for this test"""
|
||||
except Exception as e:
|
||||
self.fail("Unexpected exception was thrown: " + e.message)
|
||||
else:
|
||||
self.fail("NetworkNotFoundClient exception not thrown as expected")
|
||||
self.assertRaises(exceptions.NetworkNotFoundClient, engines.run, flow,
|
||||
store=CreatePortTests.task_store)
|
||||
|
||||
def test_create_port(self):
|
||||
# retrieve neutron client API class
|
||||
neutron_client = client.neutron_client()
|
||||
|
||||
# retrieve networks
|
||||
#network_list = neutron_client.list_networks()
|
||||
#self.assertNotEqual(len(network_list['networks']), 0,
|
||||
# "No networks found")
|
||||
|
||||
# set a network_id and unique name to use
|
||||
#network_id = network_list['networks'][0]['id']
|
||||
network = neutron_client.create_network()
|
||||
network_id = network['network']['id']
|
||||
CreatePortTests.task_store['network_id'] = network_id
|
||||
|
|
|
@ -12,15 +12,15 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import contextlib
|
||||
|
||||
from oslo.utils import uuidutils
|
||||
import taskflow.patterns.linear_flow as linear_flow
|
||||
import taskflow.task
|
||||
import zake.fake_client as fake_client
|
||||
|
||||
import cue.taskflow.client as tf_client
|
||||
from cue.tests import api
|
||||
import cue.tests.base as base
|
||||
from cue.tests import utils as test_utils
|
||||
|
||||
|
||||
class TimesTwo(taskflow.task.Task):
|
||||
|
@ -37,16 +37,7 @@ def create_flow():
|
|||
class TaskflowClientTest(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TaskflowClientTest, self).setUp()
|
||||
self._zk_client = fake_client.FakeClient()
|
||||
self.persistence = tf_client.Client.persistence(client=self._zk_client)
|
||||
with contextlib.closing(self.persistence.get_connection()) as conn:
|
||||
conn.upgrade()
|
||||
self.jobboard = tf_client.Client.jobboard("test_board",
|
||||
persistence=self.persistence,
|
||||
client=self._zk_client)
|
||||
self.tf_client = tf_client.Client("test_client",
|
||||
persistence=self.persistence,
|
||||
jobboard=self.jobboard)
|
||||
self.tf_client = tf_client.get_client_instance()
|
||||
|
||||
def tearDown(self):
|
||||
super(TaskflowClientTest, self).tearDown()
|
||||
|
@ -57,9 +48,9 @@ class TaskflowClientTest(base.TestCase):
|
|||
}
|
||||
tx_uuid = uuidutils.generate_uuid()
|
||||
|
||||
pre_count = self.jobboard.job_count
|
||||
pre_count = self.tf_client.jobboard.job_count
|
||||
job = self.tf_client.post(create_flow, job_args, tx_uuid=tx_uuid)
|
||||
post_count = self.jobboard.job_count
|
||||
post_count = self.tf_client.jobboard.job_count
|
||||
expected = pre_count + 1
|
||||
|
||||
self.assertEqual(expected, post_count,
|
||||
|
@ -79,11 +70,44 @@ class TaskflowClientTest(base.TestCase):
|
|||
"Job in jobboard differs from job returned by "
|
||||
"Client.post method")
|
||||
|
||||
pre_count = self.jobboard.job_count
|
||||
pre_count = self.tf_client.jobboard.job_count
|
||||
self.tf_client.delete(job=job)
|
||||
post_count = self.jobboard.job_count
|
||||
post_count = self.tf_client.jobboard.job_count
|
||||
expected = pre_count - 1
|
||||
|
||||
self.assertEqual(expected, post_count,
|
||||
"expected %d jobs in the jobboard after a claim, "
|
||||
"got %d" % (expected, post_count))
|
||||
|
||||
|
||||
class ApiTaskFlowClientTest(api.FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(ApiTaskFlowClientTest, self).setUp()
|
||||
self.tf_client = tf_client.get_client_instance()
|
||||
|
||||
def test_create_cluster_api(self):
|
||||
"""This test verifies create cluster job is posted from REST API."""
|
||||
api_cluster = test_utils.create_api_test_cluster(size=1)
|
||||
pre_count = self.tf_client.jobboard.job_count
|
||||
self.post_json('/clusters', params=api_cluster.as_dict(),
|
||||
headers=self.auth_headers, status=202)
|
||||
post_count = self.tf_client.jobboard.job_count
|
||||
expected = pre_count + 1
|
||||
|
||||
self.assertEqual(expected, post_count,
|
||||
"expected %d jobs in the jobboard after a post, "
|
||||
"got %d" % (expected, post_count))
|
||||
|
||||
def test_delete_cluster_api(self):
|
||||
"""This test verifies delete cluster job is posted from REST API."""
|
||||
cluster = test_utils.create_db_test_cluster_from_objects_api(
|
||||
self.context, name="test_cluster")
|
||||
pre_count = self.tf_client.jobboard.job_count
|
||||
self.delete('/clusters/' + cluster.id, headers=self.auth_headers)
|
||||
post_count = self.tf_client.jobboard.job_count
|
||||
expected = pre_count + 1
|
||||
|
||||
self.assertEqual(expected, post_count,
|
||||
"expected %d jobs in the jobboard after a post, "
|
||||
"got %d" % (expected, post_count))
|
|
@ -0,0 +1,93 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import cue.tests.test_fixtures.base as base
|
||||
|
||||
|
||||
class TaskFlowClient(base.BaseFixture):
|
||||
"""A test fixture to simulate a TaskFlow client
|
||||
|
||||
This class is used in test cases to simulate a TaskFlow client connection
|
||||
in the absence of a working TaskFlow setup with zookeeper.
|
||||
"""
|
||||
|
||||
def __del__(self):
|
||||
return
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixture and apply all method overrides."""
|
||||
super(TaskFlowClient, self).setUp()
|
||||
|
||||
taskflow_client = self.mock('cue.taskflow.client.Client')
|
||||
taskflow_client.create_persistence = self.persistence
|
||||
taskflow_client.create_jobboard = self.jobboard
|
||||
taskflow_client.post = self.post
|
||||
|
||||
def persistence(self, conf=None, **kwargs):
|
||||
"""Mock'd version of TaskFlow Client ...persistence().
|
||||
|
||||
Creating a persistence backend.
|
||||
|
||||
:param conf: Configuration parameters for the persistence backend.
|
||||
:param kwargs: Keyword arguments to be passed forward to the
|
||||
persistence backend constructor
|
||||
:return: A persistence backend instance (mocked).
|
||||
"""
|
||||
|
||||
backend = 'mocked_backend'
|
||||
|
||||
return backend
|
||||
|
||||
def jobboard(self, board_name, conf=None, persistence=None, **kwargs):
|
||||
"""Mock'd version of TaskFlow Client ...jobboard().
|
||||
|
||||
Factory method for creating a jobboard backend instance
|
||||
|
||||
:param board_name: Name of the jobboard
|
||||
:param conf: Configuration parameters for the jobboard backend.
|
||||
:param persistence: A persistence backend instance to be used with the
|
||||
jobboard.
|
||||
:param kwargs: Keyword arguments to be passed forward to the
|
||||
persistence backend constructor
|
||||
:return: A persistence backend instance (mocked).
|
||||
"""
|
||||
if board_name is None:
|
||||
board_name = 'sample_board_name'
|
||||
|
||||
jobboard = 'mocked_jobboard'
|
||||
|
||||
return jobboard
|
||||
|
||||
def post(self, flow_factory, job_args=None, flow_args=None,
|
||||
flow_kwargs=None, tx_uuid=None):
|
||||
"""Mock'd version of TaskFlow Client ...post().
|
||||
|
||||
Method for posting a new job to the jobboard
|
||||
|
||||
:param flow_factory: Flow factory function for creating a flow instance
|
||||
that will be executed as part of the job.
|
||||
:param job_args: 'store' arguments to be supplied to the engine
|
||||
executing the flow for the job
|
||||
:param flow_args: Positional arguments to be passed to the flow factory
|
||||
function
|
||||
:param flow_kwargs: Keyword arguments to be passed to the flow factory
|
||||
function
|
||||
:param tx_uuid: Transaction UUID which will be injected as 'tx_uuid' in
|
||||
job_args. A tx_uuid will be generated if one is not
|
||||
provided as an argument.
|
||||
:return: A taskflow.job.Job instance that represents the job that was
|
||||
posted.
|
||||
"""
|
||||
return
|
|
@ -20,6 +20,8 @@ import os_tasklib
|
|||
|
||||
|
||||
class CreateVolume(os_tasklib.BaseTask):
|
||||
default_provides = 'cinder_volume_id'
|
||||
|
||||
def execute(self, **kwargs):
|
||||
print("Create Cinder Volume")
|
||||
return "RANDOM_CINDER_VOLUME_ID"
|
||||
|
|
|
@ -20,6 +20,8 @@ import os_tasklib
|
|||
|
||||
|
||||
class CreateVm(os_tasklib.BaseTask):
|
||||
default_provides = 'nova_vm_id'
|
||||
|
||||
def execute(self, neutron_port_id, cinder_volume_id, **kwargs):
|
||||
#print self.os_client.servers.list()
|
||||
print("Create VM and attach %s port and %s volume" %
|
||||
|
|
|
@ -22,9 +22,9 @@ import os_tasklib
|
|||
|
||||
|
||||
class GetVmStatus(os_tasklib.BaseTask):
|
||||
def execute(self, vm_id, **kwargs):
|
||||
def execute(self, nova_vm_id, **kwargs):
|
||||
#print(self.nova_client.servers.list())
|
||||
print("Get VM Status for %s" % vm_id)
|
||||
print("Get VM Status for %s" % nova_vm_id)
|
||||
vm_status = random.choice(['BUILDING',
|
||||
'ACTIVE',
|
||||
'DELETED',
|
||||
|
|
Loading…
Reference in New Issue