Merge "Post Jobs - Integrate API with Backend (TaskFlow)"

Jenkins 2015-02-13 22:02:59 +00:00 committed by Gerrit Code Review
commit 48369e0c92
16 changed files with 361 additions and 111 deletions

View File

@@ -18,17 +18,27 @@
"""Version 1 of the Cue API
"""
import uuid
from cue.api.controllers import base
from cue.common import exception
from cue.common.i18n import _ # noqa
from cue.common.i18n import _LI # noqa
from cue import objects
from cue.openstack.common import log as logging
from cue.taskflow import client as task_flow_client
from cue.taskflow.flow import create_cluster
from cue.taskflow.flow import delete_cluster
from oslo.utils import uuidutils
import pecan
from pecan import rest
import wsme
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
LOG = logging.getLogger(__name__)
class EndPoint(base.APIBase):
"""Representation of an End point."""
@@ -151,8 +161,23 @@ class ClusterController(rest.RestController):
def delete(self):
"""Delete this Cluster."""
context = pecan.request.context
# update cluster to deleting
objects.Cluster.update_cluster_deleting(context, self.id)
# prepare and post cluster delete job to backend
job_args = {
"cluster_id": self.id,
"cluster_status": "deleting",
}
job_client = task_flow_client.get_client_instance()
job_uuid = uuidutils.generate_uuid()
job_client.post(delete_cluster, job_args, tx_uuid=job_uuid)
LOG.info(_LI('Delete Cluster Request Cluster ID %(cluster_id)s Job ID '
'%(job_id)s') % ({"cluster_id": self.id,
"job_id": job_uuid}))
class ClustersController(rest.RestController):
"""Manages operations on Clusters of nodes."""
@@ -186,10 +211,31 @@ class ClustersController(rest.RestController):
# create new cluster with node related data from user
new_cluster.create(context)
# retrieve cluster data
cluster = Cluster()
cluster.cluster = get_complete_cluster(context, new_cluster.id)
# prepare and post cluster create job to backend
job_args = {
"size": cluster.cluster.size,
"flavor": cluster.cluster.flavor,
"volume_size": cluster.cluster.volume_size,
"network_id": cluster.cluster.network_id,
"cluster_status": "BUILDING",
"node_id": "node_id",
"port_name": "port_" + str(uuid.uuid4()),
}
job_client = task_flow_client.get_client_instance()
job_uuid = uuidutils.generate_uuid()
job_client.post(create_cluster, job_args, tx_uuid=job_uuid)
LOG.info(_LI('Create Cluster Request Cluster ID %(cluster_id)s Cluster'
' size %(size)s network ID %(network_id)s Job ID '
'%(job_id)s') % ({"cluster_id": cluster.cluster.id,
"size": cluster.cluster.size,
"network_id": cluster.cluster.network_id,
"job_id": job_uuid}))
return cluster
@pecan.expose()
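Both handlers now follow the same hand-off pattern: update the cluster record, build a dict of 'store' arguments, and post a flow factory to the jobboard under a fresh transaction UUID. A condensed sketch of that pattern, assuming the shared TaskFlow client has already been wired against a real or fake ZooKeeper backend (the helper name and argument values below are illustrative, not part of this change):

import uuid

from oslo.utils import uuidutils

import cue.taskflow.client as task_flow_client
from cue.taskflow.flow import create_cluster


def post_create_cluster_job(size, flavor, volume_size, network_id):
    # 'store' arguments consumed by the tasks in the create_cluster flow.
    job_args = {
        "size": size,
        "flavor": flavor,
        "volume_size": volume_size,
        "network_id": network_id,
        "cluster_status": "BUILDING",
        "node_id": "node_id",
        "port_name": "port_" + str(uuid.uuid4()),
    }
    job_client = task_flow_client.get_client_instance()
    job_uuid = uuidutils.generate_uuid()
    job_client.post(create_cluster, job_args, tx_uuid=job_uuid)
    return job_uuid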

View File

@@ -12,6 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import uuid
from oslo.config import cfg
@@ -54,6 +55,95 @@ def _make_conf(backend_uri):
}
return conf
_task_flow_client = None
def get_client_instance(client_name=None, persistence=None, jobboard=None):
"""Create and access a single instance of TaskFlow client
:param client_name: Name of the client interacting with the jobboard
:param persistence: A persistence backend instance to be used in lieu
of auto-creating a backend instance based on
configuration parameters
:param jobboard: A jobboard backend instance to be used in lieu of
auto-creating a backend instance based on
configuration parameters
:return: A :class:`.Client` instance.
"""
global _task_flow_client
if _task_flow_client is None:
if persistence is None:
persistence = Client.create_persistence()
if jobboard is None:
jobboard = Client.create_jobboard(persistence=persistence)
if client_name is None:
client_name = "cue_job_client"
_task_flow_client = Client(client_name,
persistence=persistence,
jobboard=jobboard)
return _task_flow_client
def create_persistence(conf=None, **kwargs):
"""Factory method for creating a persistence backend instance
:param conf: Configuration parameters for the persistence backend. If
no conf is provided, zookeeper configuration parameters
for the job backend will be used to configure the
persistence backend.
:param kwargs: Keyword arguments to be passed forward to the
persistence backend constructor
:return: A persistence backend instance.
"""
if conf is None:
connection = cfg.CONF.taskflow.persistence_connection
if connection is None:
connection = ("zookeeper://%s/%s"
% (
cfg.CONF.taskflow.zk_hosts,
cfg.CONF.taskflow.zk_path,
))
conf = _make_conf(connection)
be = persistence_backends.fetch(conf=conf, **kwargs)
with contextlib.closing(be.get_connection()) as conn:
conn.upgrade()
return be
def create_jobboard(board_name=None, conf=None, persistence=None, **kwargs):
"""Factory method for creating a jobboard backend instance
:param board_name: Name of the jobboard
:param conf: Configuration parameters for the jobboard backend.
:param persistence: A persistence backend instance to be used with the
jobboard.
:param kwargs: Keyword arguments to be passed forward to the
jobboard backend constructor
:return: A jobboard backend instance.
"""
if board_name is None:
board_name = cfg.CONF.taskflow.jobboard_name
if conf is None:
conf = {'board': 'zookeeper'}
conf.update({
"path": "%s/jobs" % (cfg.CONF.taskflow.zk_path),
"hosts": cfg.CONF.taskflow.zk_hosts,
"timeout": cfg.CONF.taskflow.zk_timeout
})
jb = job_backends.fetch(
name=board_name,
conf=conf,
persistence=persistence,
**kwargs)
jb.connect()
return jb
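Together these three module-level factories replace the old static methods on Client. A minimal sketch of wiring them up in a test, using zake's in-memory ZooKeeper stand-in exactly as the new test modules in this change do (the board name is illustrative):

import zake.fake_client as fake_client

import cue.taskflow.client as tf_client

# Back persistence and jobboard with zake so no real ZooKeeper is needed.
zk_client = fake_client.FakeClient()
persistence = tf_client.create_persistence(client=zk_client)
jobboard = tf_client.create_jobboard("example_board",
                                     persistence=persistence,
                                     client=zk_client)

# get_client_instance() caches the Client it builds, so later callers
# (for example the API controllers above) receive the same instance.
client_a = tf_client.get_client_instance(persistence=persistence,
                                         jobboard=jobboard)
client_b = tf_client.get_client_instance()
assert client_a is client_b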
class Client(object):
"""An abstraction for interacting with Taskflow
@@ -61,6 +151,9 @@ class Client(object):
This class provides an abstraction for Taskflow to expose a simpler
interface for posting jobs to Taskflow Jobboards than what is provided
out of the box with Taskflow.
:ivar persistence: persistence backend instance
:ivar jobboard: jobboard backend instance
"""
def __init__(self, client_name, board_name=None, persistence=None,
@@ -86,19 +179,19 @@
self._client_name = client_name
self._persistence = persistence or Client.persistence(**kwargs)
self.persistence = persistence or create_persistence(**kwargs)
self._jobboard = jobboard or Client.jobboard(board_name,
None,
self._persistence,
**kwargs)
self.jobboard = jobboard or create_jobboard(board_name,
None,
self.persistence,
**kwargs)
def __del__(self):
"""Destructor for Client class."""
if self._jobboard is not None:
self._jobboard.close()
if self._persistence is not None:
self._persistence.close()
if self.jobboard is not None:
self.jobboard.close()
if self.persistence is not None:
self.persistence.close()
@classmethod
def create(cls, client_name, board_name=None, persistence=None,
@@ -120,62 +213,6 @@ class Client(object):
return cls(client_name, board_name=board_name, persistence=persistence,
jobboard=jobboard, **kwargs)
@staticmethod
def persistence(conf=None, **kwargs):
"""Factory method for creating a persistence backend instance
:param conf: Configuration parameters for the persistence backend. If
no conf is provided, zookeeper configuration parameters
for the job backend will be used to configure the
persistence backend.
:param kwargs: Keyword arguments to be passed forward to the
persistence backend constructor
:return: A persistence backend instance.
"""
if conf is None:
connection = cfg.CONF.taskflow.persistence_connection
if connection is None:
connection = ("zookeeper://%s/%s"
% (
cfg.CONF.taskflow.zk_hosts,
cfg.CONF.taskflow.zk_path,
))
conf = _make_conf(connection)
be = persistence_backends.fetch(conf=conf, **kwargs)
return be
@staticmethod
def jobboard(board_name, conf=None, persistence=None, **kwargs):
"""Factory method for creating a jobboard backend instance
:param board_name: Name of the jobboard
:param conf: Configuration parameters for the jobboard backend.
:param persistence: A persistence backend instance to be used with the
jobboard.
:param kwargs: Keyword arguments to be passed forward to the
persistence backend constructor
:return: A persistence backend instance.
"""
if board_name is None:
board_name = cfg.CONF.taskflow.jobboard_name
if conf is None:
conf = {'board': 'zookeeper'}
conf.update({
"path": "%s/jobs" % (cfg.CONF.taskflow.zk_path),
"hosts": cfg.CONF.taskflow.zk_hosts,
"timeout": cfg.CONF.taskflow.zk_timeout
})
jb = job_backends.fetch(
name=board_name,
conf=conf,
persistence=persistence,
**kwargs)
jb.connect()
return jb
def post(self, flow_factory, job_args=None,
flow_args=None, flow_kwargs=None, tx_uuid=None):
"""Method for posting a new job to the jobboard
@@ -214,13 +251,13 @@ class Client(object):
})
job_details['flow_uuid'] = flow_detail.uuid
self._persistence.get_connection().save_logbook(book)
self.persistence.get_connection().save_logbook(book)
engines.save_factory_details(
flow_detail, flow_factory, flow_args, flow_kwargs,
self._persistence)
self.persistence)
job = self._jobboard.post(job_name, book, details=job_details)
job = self.jobboard.post(job_name, book, details=job_details)
return job
def joblist(self, only_unclaimed=False, ensure_fresh=False):
@@ -231,7 +268,7 @@ class Client(object):
Behavior of this parameter is backend specific.
:return: A list of jobs in the jobboard
"""
return list(self._jobboard.iterjobs(only_unclaimed=only_unclaimed,
return list(self.jobboard.iterjobs(only_unclaimed=only_unclaimed,
ensure_fresh=ensure_fresh))
def delete(self, job=None, job_id=None):
@@ -254,5 +291,5 @@ class Client(object):
if j.uuid == job_id:
job = j
self._jobboard.claim(job, self._client_name)
self._jobboard.consume(job, self._client_name)
self.jobboard.claim(job, self._client_name)
self.jobboard.consume(job, self._client_name)
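With the persistence and jobboard attributes now public, callers can reach the jobboard directly (for example for job_count), as the updated tests below do. A small end-to-end sketch of the post / joblist / delete cycle against a zake-backed client; the flow factory and its argument are illustrative, not the flows shipped in this change:

import taskflow.patterns.linear_flow as linear_flow
import taskflow.task
import zake.fake_client as fake_client
from oslo.utils import uuidutils

import cue.taskflow.client as tf_client


class TimesTwo(taskflow.task.Task):
    def execute(self, num):
        return num * 2


def create_flow():
    return linear_flow.Flow('example flow').add(TimesTwo())


zk = fake_client.FakeClient()
persistence = tf_client.create_persistence(client=zk)
jobboard = tf_client.create_jobboard("example_board",
                                     persistence=persistence,
                                     client=zk)
client = tf_client.Client("example_client",
                          persistence=persistence,
                          jobboard=jobboard)

job = client.post(create_flow, {"num": 21},
                  tx_uuid=uuidutils.generate_uuid())
assert client.jobboard.job_count == 1
assert any(j.uuid == job.uuid for j in client.joblist())

client.delete(job=job)  # claims and consumes the job
assert client.jobboard.job_count == 0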

View File

@@ -14,3 +14,4 @@
# under the License.
from create_cluster import create_cluster # noqa
from delete_cluster import delete_cluster # noqa

View File

@@ -31,7 +31,7 @@ def create_cluster():
cinder_task.CreateVolume(os_client=client.cinder_client(),
provides='cinder_volume_id'),
nova_task.CreateVm(os_client=client.nova_client(),
provides='vm_id'),
provides='nova_vm_id'),
linear_flow.Flow('wait for vm to become active',
retry=retry.Times(10)).add(
nova_task.GetVmStatus(os_client=client.nova_client(),

View File

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import taskflow.patterns.linear_flow as linear_flow
import cue.taskflow.task as cue_task
def delete_cluster():
flow = linear_flow.Flow('deleting cluster').add(
cue_task.UpdateClusterStatus(cue_client="cue client")
)
return flow
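For reference, the flow this factory returns can be exercised directly with a TaskFlow engine; its only required 'store' input is the cluster_status consumed by UpdateClusterStatus. A sketch, with an illustrative status value:

from taskflow import engines

from cue.taskflow.flow import delete_cluster

flow = delete_cluster()
engines.run(flow, store={"cluster_status": "DELETED"})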

View File

@@ -128,10 +128,10 @@ class ConductorService(object):
version_string, self._jobboard_name)
with contextlib.closing(
tf_client.Client.persistence(conf=self._persistence_conf)
tf_client.create_persistence(conf=self._persistence_conf)
) as persistence:
with contextlib.closing(
tf_client.Client.jobboard(
tf_client.create_jobboard(
board_name=self._jobboard_name,
conf=self._jobboard_conf,
persistence=persistence,

View File

@@ -18,5 +18,5 @@ import base_task as base_task
class UpdateClusterStatus(base_task.BaseTask):
def execute(self, vm_status, **kwargs):
print("Update Cluster Status to %s" % vm_status)
def execute(self, cluster_status, **kwargs):
print("Update Cluster Status to %s" % cluster_status)

View File

@@ -0,0 +1,30 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Authors: Davide Agnello <davide.agnello@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright [2014] Hewlett-Packard Development Company, L.P.
# limitations under the License.
import zake.fake_client as fake_client
import cue.taskflow.client as tf_client
_zk_client = fake_client.FakeClient()
persistence = tf_client.create_persistence(client=_zk_client)
jobboard = tf_client.create_jobboard("test_board",
persistence=persistence,
client=_zk_client)
tf_client = tf_client.get_client_instance(persistence=persistence,
jobboard=jobboard)

View File

@@ -86,6 +86,7 @@ class TestGetCluster(api.FunctionalTest,
class TestDeleteCluster(api.FunctionalTest,
api_utils.ClusterValidationMixin):
def setUp(self):
super(TestDeleteCluster, self).setUp()

View File

@@ -71,6 +71,7 @@ class TestListClusters(api.FunctionalTest,
class TestCreateCluster(api.FunctionalTest,
api_utils.ClusterValidationMixin):
def setUp(self):
super(TestCreateCluster, self).setUp()

View File

@@ -36,8 +36,8 @@ class CreatePortTests(base.TestCase):
]
task_store = {
'network_id': "0",
'port_name': "port_0",
"network_id": "0",
"port_name": "port_0",
}
def test_create_port_invalid_network(self):
@@ -51,26 +51,14 @@ class CreatePortTests(base.TestCase):
# generate a new UUID for an 'invalid' network_id
CreatePortTests.task_store['network_id'] = str(uuid.uuid4())
try:
engines.run(flow, store=CreatePortTests.task_store)
except exceptions.NetworkNotFoundClient:
"""this exception is expected for this test"""
except Exception as e:
self.fail("Unexpected exception was thrown: " + e.message)
else:
self.fail("NetworkNotFoundClient exception not thrown as expected")
self.assertRaises(exceptions.NetworkNotFoundClient, engines.run, flow,
store=CreatePortTests.task_store)
def test_create_port(self):
# retrieve neutron client API class
neutron_client = client.neutron_client()
# retrieve networks
#network_list = neutron_client.list_networks()
#self.assertNotEqual(len(network_list['networks']), 0,
# "No networks found")
# set a network_id and unique name to use
#network_id = network_list['networks'][0]['id']
network = neutron_client.create_network()
network_id = network['network']['id']
CreatePortTests.task_store['network_id'] = network_id

View File

@@ -12,15 +12,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo.utils import uuidutils
import taskflow.patterns.linear_flow as linear_flow
import taskflow.task
import zake.fake_client as fake_client
import cue.taskflow.client as tf_client
from cue.tests import api
import cue.tests.base as base
from cue.tests import utils as test_utils
class TimesTwo(taskflow.task.Task):
@@ -37,16 +37,7 @@ def create_flow():
class TaskflowClientTest(base.TestCase):
def setUp(self):
super(TaskflowClientTest, self).setUp()
self._zk_client = fake_client.FakeClient()
self.persistence = tf_client.Client.persistence(client=self._zk_client)
with contextlib.closing(self.persistence.get_connection()) as conn:
conn.upgrade()
self.jobboard = tf_client.Client.jobboard("test_board",
persistence=self.persistence,
client=self._zk_client)
self.tf_client = tf_client.Client("test_client",
persistence=self.persistence,
jobboard=self.jobboard)
self.tf_client = tf_client.get_client_instance()
def tearDown(self):
super(TaskflowClientTest, self).tearDown()
@@ -57,9 +48,9 @@ class TaskflowClientTest(base.TestCase):
}
tx_uuid = uuidutils.generate_uuid()
pre_count = self.jobboard.job_count
pre_count = self.tf_client.jobboard.job_count
job = self.tf_client.post(create_flow, job_args, tx_uuid=tx_uuid)
post_count = self.jobboard.job_count
post_count = self.tf_client.jobboard.job_count
expected = pre_count + 1
self.assertEqual(expected, post_count,
@@ -79,11 +70,44 @@ class TaskflowClientTest(base.TestCase):
"Job in jobboard differs from job returned by "
"Client.post method")
pre_count = self.jobboard.job_count
pre_count = self.tf_client.jobboard.job_count
self.tf_client.delete(job=job)
post_count = self.jobboard.job_count
post_count = self.tf_client.jobboard.job_count
expected = pre_count - 1
self.assertEqual(expected, post_count,
"expected %d jobs in the jobboard after a claim, "
"got %d" % (expected, post_count))
class ApiTaskFlowClientTest(api.FunctionalTest):
def setUp(self):
super(ApiTaskFlowClientTest, self).setUp()
self.tf_client = tf_client.get_client_instance()
def test_create_cluster_api(self):
"""This test verifies create cluster job is posted from REST API."""
api_cluster = test_utils.create_api_test_cluster(size=1)
pre_count = self.tf_client.jobboard.job_count
self.post_json('/clusters', params=api_cluster.as_dict(),
headers=self.auth_headers, status=202)
post_count = self.tf_client.jobboard.job_count
expected = pre_count + 1
self.assertEqual(expected, post_count,
"expected %d jobs in the jobboard after a post, "
"got %d" % (expected, post_count))
def test_delete_cluster_api(self):
"""This test verifies delete cluster job is posted from REST API."""
cluster = test_utils.create_db_test_cluster_from_objects_api(
self.context, name="test_cluster")
pre_count = self.tf_client.jobboard.job_count
self.delete('/clusters/' + cluster.id, headers=self.auth_headers)
post_count = self.tf_client.jobboard.job_count
expected = pre_count + 1
self.assertEqual(expected, post_count,
"expected %d jobs in the jobboard after a post, "
"got %d" % (expected, post_count))

View File

@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cue.tests.test_fixtures.base as base
class TaskFlowClient(base.BaseFixture):
"""A test fixture to simulate a TaskFlow client
This class is used in test cases to simulate a TaskFlow client connection
in the absence of a working TaskFlow setup with zookeeper.
"""
def __del__(self):
return
def setUp(self):
"""Set up test fixture and apply all method overrides."""
super(TaskFlowClient, self).setUp()
taskflow_client = self.mock('cue.taskflow.client.Client')
taskflow_client.create_persistence = self.persistence
taskflow_client.create_jobboard = self.jobboard
taskflow_client.post = self.post
def persistence(self, conf=None, **kwargs):
"""Mock'd version of TaskFlow Client ...persistence().
Creating a persistence backend.
:param conf: Configuration parameters for the persistence backend.
:param kwargs: Keyword arguments to be passed forward to the
persistence backend constructor
:return: A persistence backend instance (mocked).
"""
backend = 'mocked_backend'
return backend
def jobboard(self, board_name, conf=None, persistence=None, **kwargs):
"""Mock'd version of TaskFlow Client ...jobboard().
Factory method for creating a jobboard backend instance
:param board_name: Name of the jobboard
:param conf: Configuration parameters for the jobboard backend.
:param persistence: A persistence backend instance to be used with the
jobboard.
:param kwargs: Keyword arguments to be passed forward to the
jobboard backend constructor
:return: A jobboard backend instance (mocked).
"""
if board_name is None:
board_name = 'sample_board_name'
jobboard = 'mocked_jobboard'
return jobboard
def post(self, flow_factory, job_args=None, flow_args=None,
flow_kwargs=None, tx_uuid=None):
"""Mock'd version of TaskFlow Client ...post().
Method for posting a new job to the jobboard
:param flow_factory: Flow factory function for creating a flow instance
that will be executed as part of the job.
:param job_args: 'store' arguments to be supplied to the engine
executing the flow for the job
:param flow_args: Positional arguments to be passed to the flow factory
function
:param flow_kwargs: Keyword arguments to be passed to the flow factory
function
:param tx_uuid: Transaction UUID which will be injected as 'tx_uuid' in
job_args. A tx_uuid will be generated if one is not
provided as an argument.
:return: A taskflow.job.Job instance that represents the job that was
posted.
"""
return
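A sketch of how this fixture would typically be applied in a test, assuming base.BaseFixture follows the standard fixtures.Fixture protocol so it can be installed with useFixture(); the module path and test class below are illustrative, not part of this change:

import cue.tests.base as base
# Hypothetical import path for the fixture module added by this change.
import cue.tests.test_fixtures.taskflow as taskflow_fixtures


class ClusterApiJobTest(base.TestCase):
    def setUp(self):
        super(ClusterApiJobTest, self).setUp()
        # Swap the real TaskFlow client for the mock so the test needs no
        # ZooKeeper connection.
        self.useFixture(taskflow_fixtures.TaskFlowClient())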

View File

@@ -20,6 +20,8 @@ import os_tasklib
class CreateVolume(os_tasklib.BaseTask):
default_provides = 'cinder_volume_id'
def execute(self, **kwargs):
print("Create Cinder Volume")
return "RANDOM_CINDER_VOLUME_ID"

View File

@@ -20,6 +20,8 @@ import os_tasklib
class CreateVm(os_tasklib.BaseTask):
default_provides = 'nova_vm_id'
def execute(self, neutron_port_id, cinder_volume_id, **kwargs):
#print self.os_client.servers.list()
print("Create VM and attach %s port and %s volume" %

View File

@@ -22,9 +22,9 @@ import os_tasklib
class GetVmStatus(os_tasklib.BaseTask):
def execute(self, vm_id, **kwargs):
def execute(self, nova_vm_id, **kwargs):
#print(self.nova_client.servers.list())
print("Get VM Status for %s" % vm_id)
print("Get VM Status for %s" % nova_vm_id)
vm_status = random.choice(['BUILDING',
'ACTIVE',
'DELETED',