adding taskflow support with initial workflow

- Adding taskflow support
- Added initial workflow for create cluster

Change-Id: I21ca51f50f8d983ab787c9435cfd0d576c3ed7a2
This commit is contained in:
Min Pae 2015-01-14 14:23:32 -08:00
parent 0887839f44
commit 912d1e0772
26 changed files with 677 additions and 1 deletions

7
.gitignore vendored
View File

@ -60,3 +60,10 @@ target/
# Vagrant
.vagrant
# Rope
.ropeproject
#Virtualenv
.virtualenv
.venv

45
cue/client.py Normal file
View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cinderclient.client as CinderClient
import neutronclient.neutron.client as NeutronClient
import novaclient.client as NovaClient
def nova_client():
return NovaClient.Client(2,
'admin',
'secrete',
'demo',
'http://192.168.41.183:5000/v2.0'
)
def cinder_client():
return CinderClient.Client('1',
'admin',
'secrete',
'demo',
'http://192.168.41.183:5000/v2.0'
)
def neutron_client():
return NeutronClient.Client('2.0',
username='admin',
password='secrete',
tenant_name='demo',
auth_url='http://192.168.41.183:5000/v2.0'
)

70
cue/cmd/monitor.py Normal file
View File

@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import time
import oslo.config.cfg as cfg
from oslo_log import log
import taskflow.jobs.backends as job_backends
import taskflow.persistence.backends as persistence_backends
import cue.common.service as cue_service
PERSISTENCE_BACKEND_CONF = {
"connection": "zookeeper",
}
JOB_BACKEND_CONF = {
"board": "zookeeper",
}
CONF = cfg.CONF
def main():
cue_service.prepare_service(sys.argv)
LOG = log.getLogger(__name__)
with persistence_backends.backend(
PERSISTENCE_BACKEND_CONF.copy()
) as persistence:
with job_backends.backend(
'tutorial_simple',
{"board": "zookeeper",
"path": "/taskflow/jobs/tutorial_simple"
},
persistence=persistence
) as board_simple:
with job_backends.backend(
'tutorial_conduct',
{"board": "zookeeper",
"path": "/taskflow/jobs/tutorial_conduct"
},
persistence=persistence
) as board_conduct:
while True:
job_count = board_simple.job_count
job_count += board_conduct.job_count
LOG.info("%d outstanding jobs" % (job_count))
time.sleep(1)
if __name__ == "__main__":
sys.exit(main())

52
cue/cmd/worker.py Normal file
View File

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from taskflow.conductors import single_threaded
from taskflow.jobs import backends as job_backends
from taskflow.persistence import backends as persistence_backends
PERSISTENCE_BACKEND_CONF = {
#"connection": "mysql+pymysql://taskflow:taskflow@localhost/taskflow",
"connection": "zookeeper",
}
JOB_BACKEND_CONF = {
"board": "zookeeper",
"path": "/taskflow/jobs/tutorial_conduct",
}
def main():
with persistence_backends.backend(
PERSISTENCE_BACKEND_CONF.copy()
) as persistence:
with job_backends.backend(
'tutorial_conduct',
JOB_BACKEND_CONF.copy(),
persistence=persistence
) as board:
conductor = single_threaded.SingleThreadedConductor(
"conductor name", board, persistence, engine='serial')
conductor.run()
if __name__ == "__main__":
sys.exit(main())

0
cue/taskflow/__init__.py Normal file
View File

View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from create_cluster import create_cluster # noqa

View File

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import taskflow.patterns.linear_flow as linear_flow
import taskflow.retry as retry
import cue.client as client
import cue.taskflow.task as cue_task
import os_tasklib.cinder as cinder_task
import os_tasklib.common as common_task
import os_tasklib.neutron as neutron_task
import os_tasklib.nova as nova_task
def create_cluster():
flow = linear_flow.Flow('creating vm').add(
neutron_task.CreatePort(os_client=client.neutron_client(),
provides='neutron_port_id'),
cinder_task.CreateVolume(os_client=client.cinder_client(),
provides='cinder_volume_id'),
nova_task.CreateVm(os_client=client.nova_client(),
provides='vm_id'),
linear_flow.Flow('wait for vm to become active',
retry=retry.Times(10)).add(
nova_task.GetVmStatus(os_client=client.nova_client(),
provides='vm_status'),
common_task.CheckFor(rebind={'check_var': 'vm_status'},
check_value='ACTIVE',
timeout_seconds=1),
),
cue_task.UpdateClusterStatus(cue_client="cue client"),
)
return flow

View File

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from base_task import BaseTask # noqa
from update_cluster_status import UpdateClusterStatus # noqa

View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import taskflow.task
class BaseTask(taskflow.task.Task):
def __init__(self, cue_client, name=None, **kwargs):
self.cue_client = cue_client
super(BaseTask, self).__init__(name=name, **kwargs)

View File

@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base_task as base_task
class UpdateClusterStatus(base_task.BaseTask):
def execute(self, vm_status, **kwargs):
print("Update Cluster Status to %s" % vm_status)

23
os_tasklib/__init__.py Normal file
View File

@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import taskflow.task as task
class BaseTask(task.Task):
def __init__(self, os_client, name=None, **kwargs):
self.os_client = os_client
super(BaseTask, self).__init__(name=name, **kwargs)

View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from create_volume import CreateVolume # noqa

View File

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'sputnik13'
import os_tasklib
class CreateVolume(os_tasklib.BaseTask):
def execute(self, **kwargs):
print("Create Cinder Volume")
return "RANDOM_CINDER_VOLUME_ID"
def revert(self, **kwargs):
print("Delete Cinder Volume %s" % kwargs['result'])

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from assert_task import Assert # noqa
from check_for import CheckFor # noqa

View File

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import taskflow.task as task
class Assert(task.Task):
def __init__(self,
predicate,
arglist=None,
timeout_seconds=None,
timeout_ms=None,
name=None,
**kwargs):
self.predicate = predicate
self.sleep_time = 0
if timeout_seconds:
self.sleep_time = timeout_seconds
if timeout_ms:
self.sleep_time += timeout_ms / 1000.0
super(Assert, self).__init__(name=name, **kwargs)
def execute(self, *args, **kwargs):
print("execute")
assert(self.predicate(*args, **kwargs))
return (args, kwargs)
def revert(self, *args, **kwargs):
print("revert")
print("%s %s" % (args, kwargs))
if self.sleep_time != 0:
time.sleep(self.sleep_time)

View File

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'sputnik13'
import time
import taskflow.task as task
class CheckFor(task.Task):
def __init__(self,
check_value,
timeout_seconds=None,
timeout_ms=None,
name=None,
**kwargs):
self.check_value = check_value
self.sleep_time = 0
if timeout_seconds:
self.sleep_time = timeout_seconds
if timeout_ms:
self.sleep_time += timeout_ms / 1000.0
super(CheckFor, self).__init__(name=name, **kwargs)
def execute(self, check_var, **kwargs):
if check_var == self.check_value:
return self.check_value
else:
raise AssertionError("expected %s, got %s" %
(self.check_value, check_var))
def revert(self, check_var, *args, **kwargs):
print("Check failed, expected %s, got %s" %
(self.check_value, check_var))
if self.sleep_time != 0:
time.sleep(self.sleep_time)

View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
pass

View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
pass

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from create_port import CreatePort # noqa

View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'sputnik13'
import os_tasklib
class CreatePort(os_tasklib.BaseTask):
default_provides = 'neutron_port_id'
def execute(self, **kwargs):
print("Create Neutron Port")
return "RANDOM_NEUTRON_PORT_ID"
def revert(self, **kwargs):
print("Delete Neutron Port %s" % kwargs['result'])

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from create_vm import CreateVm # noqa
from get_vm_status import GetVmStatus # noqa

View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'sputnik13'
import os_tasklib
class CreateVm(os_tasklib.BaseTask):
def execute(self, neutron_port_id, cinder_volume_id, **kwargs):
#print self.os_client.servers.list()
print("Create VM and attach %s port and %s volume" %
(neutron_port_id, cinder_volume_id))
return "RANDOM_VM_ID"
def revert(self, **kwargs):
print("Delete VM %s" % kwargs['result'])

View File

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'sputnik13'
import random
import os_tasklib
class GetVmStatus(os_tasklib.BaseTask):
def execute(self, vm_id, **kwargs):
#print(self.nova_client.servers.list())
print("Get VM Status for %s" % vm_id)
vm_status = random.choice(['BUILDING',
'ACTIVE',
'DELETED',
'SUSPENDED',
'PAUSED',
'ERROR',
'STOPPED'])
return vm_status
def revert(self, **kwargs):
print(kwargs)

View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
pass

View File

@ -24,4 +24,11 @@ WSME>=0.6
keystonemiddleware>=1.0.0
paramiko>=1.13.0
posix_ipc
taskflow>=0.4
taskflow>=0.6.1
-e git+https://github.com/python-zk/kazoo.git#egg=kazoo
#PyMySQL
python-keystoneclient
python-novaclient
python-cinderclient
python-neutronclient

View File

@ -21,12 +21,15 @@ classifier =
[files]
packages =
cue
os_tasklib
[entry_points]
console_scripts =
cue-api = cue.cmd.api:main
cue-dbsync = cue.cmd.dbsync:main
cue-manage = cue.cmd.manage:main
cue-monitor = cue.cmd.monitor:main
cue-worker = cue.cmd.worker:main
cue.database.migration_backend =
sqlalchemy = cue.db.sqlalchemy.migration