Merge "Move updating provision progress to conductor"

This commit is contained in:
Jenkins 2015-03-19 11:15:21 +00:00 committed by Gerrit Code Review
commit 9f709c912a
15 changed files with 172 additions and 197 deletions

View File

@ -442,31 +442,24 @@ class LocalApi(object):
# Events ops
def cluster_provision_step_add(self, context, cluster_id, values):
"""Create a cluster assigned ProvisionStep
from the values dictionary
"""
"""Create a provisioning step assigned to cluster from values dict."""
return self._manager.cluster_provision_step_add(
context, cluster_id, values)
def cluster_provision_step_update(self, context, provision_step, values):
"""Update the ProvisionStep from the values dictionary."""
self._manager.cluster_provision_step_update(
context, provision_step, values)
def cluster_provision_step_get_events(self, context, provision_step):
"""Return all events from the specified ProvisionStep."""
return self._manager.cluster_provision_step_get_events(
def cluster_provision_step_update(self, context, provision_step):
"""Update the cluster provisioning step."""
return self._manager.cluster_provision_step_update(
context, provision_step)
def cluster_provision_step_remove_events(self, context, provision_step):
"""Delete all event from the specified ProvisionStep."""
self._manager.cluster_provision_step_remove_events(
context, provision_step)
def cluster_provision_progress_update(self, context, cluster_id):
"""Return cluster with provision progress updated field."""
return self._manager.cluster_provision_progress_update(
context, cluster_id)
def cluster_event_add(self, context, provision_step, values):
"""Assign new event to the specified ProvisionStep."""
self._manager.cluster_event_add(context, provision_step, values)
"""Assign new event to the specified provision step."""
return self._manager.cluster_event_add(
context, provision_step, values)
class RemoteApi(LocalApi):

View File

@ -473,25 +473,17 @@ class ConductorManager(db_base.Base):
# Events ops
def cluster_provision_step_add(self, context, cluster_id, values):
"""Create a cluster assigned ProvisionStep
from the values dictionary
"""
"""Create a provisioning step assigned to cluster from values dict."""
return self.db.cluster_provision_step_add(context, cluster_id, values)
def cluster_provision_step_update(self, context, provision_step, values):
"""Update the ProvisionStep from the values dictionary."""
self.db.cluster_provision_step_update(context, provision_step, values)
def cluster_provision_step_update(self, context, provision_step):
"""Update the cluster provisioning step."""
return self.db.cluster_provision_step_update(context, provision_step)
def cluster_provision_step_get_events(self, context, provision_step):
"""Return all events from the specified ProvisionStep."""
return self.db.cluster_provision_step_get_events(
context, provision_step)
def cluster_provision_step_remove_events(self, context, provision_step):
"""Delete all event from the specified ProvisionStep."""
self.db.cluster_provision_step_remove_events(context, provision_step)
def cluster_provision_progress_update(self, context, cluster_id):
"""Return cluster with provision progress updated field."""
return self.db.cluster_provision_progress_update(context, cluster_id)
def cluster_event_add(self, context, provision_step, values):
"""Assign new event to the specified ProvisionStep."""
self.db.cluster_event_add(context, provision_step, values)
"""Assign new event to the specified provision step."""
return self.db.cluster_event_add(context, provision_step, values)

View File

@ -293,11 +293,8 @@ class ClusterProvisionStep(object):
tenant_id
step_name
step_type
completed
total
successful
started_at
completed_at
events - list of Events objects assigned to the cluster
"""

View File

@ -274,12 +274,13 @@ def sleep(seconds=0):
class InstanceInfo(object):
def __init__(self, cluster_id=None, instance_id=None, instance_name=None,
node_group_id=None, step_type=None):
node_group_id=None, step_type=None, step_id=None):
self.cluster_id = cluster_id
self.instance_id = instance_id
self.instance_name = instance_name
self.node_group_id = node_group_id
self.step_type = step_type
self.step_id = step_id
def set_step_type(step_type):
@ -291,6 +292,8 @@ class InstanceInfoManager(object):
self.prev_instance_info = current().current_instance_info
if not instance_info.step_type:
instance_info.step_type = self.prev_instance_info.step_type
if not instance_info.step_id:
instance_info.step_id = self.prev_instance_info.step_id
current().current_instance_info = instance_info
def __enter__(self):

View File

@ -110,7 +110,10 @@ def to_dict(func):
def cluster_get(context, cluster, show_progress=False):
"""Return the cluster or None if it does not exist."""
cluster = IMPL.cluster_get(context, cluster)
if show_progress:
cluster = IMPL.cluster_provision_progress_update(context, cluster)
else:
cluster = IMPL.cluster_get(context, cluster)
if cluster:
return cluster.to_dict(show_progress)
return None
@ -446,27 +449,24 @@ def job_binary_internal_get_raw_data(context, job_binary_internal_id):
return IMPL.job_binary_internal_get_raw_data(context,
job_binary_internal_id)
# Events ops
def cluster_provision_step_add(context, cluster_id, values):
"""Create a cluster assigned ProvisionStep from the values dictionary."""
return IMPL.cluster_provision_step_add(context, cluster_id, values)
def cluster_provision_step_update(context, provision_step, values):
"""Update the ProvisionStep from the values dictionary."""
IMPL.cluster_provision_step_update(context, provision_step, values)
def cluster_provision_step_update(context, step_id):
"""Updates provision step."""
return IMPL.cluster_provision_step_update(context, step_id)
def cluster_provision_step_get_events(context, provision_step):
"""Return all events from the specified ProvisionStep."""
return IMPL.cluster_provision_step_get_events(context, provision_step)
def cluster_provision_step_remove_events(context, provision_step):
"""Delete all event from the specified ProvisionStep."""
IMPL.cluster_provision_step_remove_events(context, provision_step)
def cluster_provision_progress_update(context, cluster_id):
"""Return cluster with provision progress updated field."""
return IMPL.cluster_provision_progress_update(context, cluster_id)
def cluster_event_add(context, provision_step, values):
"""Assign new event to the specified ProvisionStep."""
IMPL.cluster_event_add(context, provision_step, values)
"""Assign new event to the specified provision step."""
return IMPL.cluster_event_add(context, provision_step, values)

View File

@ -0,0 +1,44 @@
# Copyright 2015 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""remove redandunt progress ops
Revision ID: 020
Revises: 019
Create Date: 2015-02-26 15:01:41.015076
"""
# revision identifiers, used by Alembic.
revision = '020'
down_revision = '019'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.drop_column('cluster_provision_steps', 'completed_at')
op.drop_column('cluster_provision_steps', 'completed')
op.drop_column('cluster_provision_steps', 'started_at')
def downgrade():
op.add_column('cluster_provision_steps',
sa.Column('completed', sa.Integer(), nullable=True))
op.add_column('cluster_provision_steps',
sa.Column('started_at', sa.DateTime(), nullable=True))
op.add_column('cluster_provision_steps',
sa.Column('completed_at', sa.DateTime(), nullable=True))

View File

@ -1114,6 +1114,22 @@ def _cluster_provision_step_get(context, session, provision_step_id):
return query.filter_by(id=provision_step_id).first()
def _cluster_provision_step_update(context, session, step_id):
step = _cluster_provision_step_get(context, session, step_id)
if step is None:
raise ex.NotFoundException(
step_id,
_("Cluster Provision Step id '%s' not found!"))
if step.successful is not None:
return
if len(step.events) == step.total:
for event in step.events:
session.delete(event)
step.update({'successful': True})
def cluster_provision_step_add(context, cluster_id, values):
session = get_session()
@ -1132,64 +1148,29 @@ def cluster_provision_step_add(context, cluster_id, values):
return provision_step.id
def cluster_provision_step_update(context, provision_step_id, values):
session = get_session()
with session.begin():
provision_step = _cluster_provision_step_get(
context, session, provision_step_id)
if not provision_step:
raise ex.NotFoundException(
provision_step_id,
_("Cluster Provision Step id '%s' not found!"))
provision_step.update(values)
def cluster_provision_step_get_events(context, provision_step_id):
def cluster_provision_step_update(context, step_id):
if CONF.disable_event_log:
return
session = get_session()
with session.begin():
provision_step = _cluster_provision_step_get(
context, session, provision_step_id)
if not provision_step:
raise ex.NotFoundException(
provision_step_id,
_("Cluster Provision Step id '%s' not found!"))
return provision_step.events
_cluster_provision_step_update(context, session, step_id)
def cluster_provision_step_remove_events(context, provision_step_id):
def cluster_provision_progress_update(context, cluster_id):
if CONF.disable_event_log:
return _cluster_get(context, get_session(), cluster_id)
session = get_session()
with session.begin():
provision_step = _cluster_provision_step_get(
context, session, provision_step_id)
cluster = _cluster_get(context, session, cluster_id)
if not provision_step:
raise ex.NotFoundException(
provision_step_id,
_("Cluster Provision Step id '%s' not found!"))
for event in provision_step.events:
session.delete(event)
def cluster_provision_step_remove(context, provision_step_id):
session = get_session()
cluster_provision_step_remove_events(context, provision_step_id)
with session.begin():
provision_step = _cluster_provision_step_get(
context, session, provision_step_id)
if not provision_step:
raise ex.NotFoundException(
provision_step_id,
_("Cluster Provision Step id '%s' not found!"))
session.delete(provision_step)
if cluster is None:
raise ex.NotFoundException(cluster_id,
_("Cluster id '%s' not found!"))
for step in cluster.provision_progress:
if step.successful is None:
_cluster_provision_step_update(context, session, step.id)
result_cluster = _cluster_get(context, session, cluster_id)
return result_cluster
def cluster_event_add(context, step_id, values):
@ -1206,6 +1187,8 @@ def cluster_event_add(context, step_id, values):
event = m.ClusterEvent()
values['step_id'] = step_id
if not values['successful']:
provision_step.update({'successful': False})
event.update(values)
session.add(event)

View File

@ -416,11 +416,8 @@ class ClusterProvisionStep(mb.SaharaBase):
tenant_id = sa.Column(sa.String(36))
step_name = sa.Column(sa.String(80))
step_type = sa.Column(sa.String(36))
completed = sa.Column(sa.Integer)
total = sa.Column(sa.Integer)
successful = sa.Column(sa.Boolean, nullable=True)
started_at = sa.Column(sa.DateTime())
completed_at = sa.Column(sa.DateTime())
events = relationship('ClusterEvent', cascade="all,delete",
backref='ClusterProvisionStep',
lazy='joined')

View File

@ -147,6 +147,8 @@ def _make_periodic_tasks():
continue
terminate_cluster(ctx, cluster, description='transient')
# Add event log info cleanup
context.ctx().current_instance_info = context.InstanceInfo()
context.set_ctx(None)
@periodic_task.periodic_task(spacing=zombie_task_spacing)
@ -185,7 +187,8 @@ def _make_periodic_tasks():
continue
terminate_cluster(ctx, cluster, description='incomplete')
# Add event log info cleanup
context.ctx().current_instance_info = context.InstanceInfo()
context.set_ctx(None)
return SaharaPeriodicTasks()

View File

@ -13,8 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
from sahara import conductor
from sahara import context
from sahara import exceptions
from sahara.tests.unit import base
from sahara.utils import general as gu
@ -116,6 +120,19 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase):
ng = gu.get_by_id(cluster.node_groups, ng_id)
self.assertEqual(ng.instances[0].instance_name, 'tst123')
def _get_events(self, ctx, cluster_id, step_id=None):
cluster = self.api.cluster_get(ctx, cluster_id, show_progress=True)
events = []
for step in cluster.provision_progress:
if step_id == step['id']:
return step['events']
else:
events += step['events']
if step_id:
return events
else:
return []
def test_events_ops(self):
ctx, cluster = self._make_sample()
@ -138,20 +155,6 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase):
self.assertEqual(st_type, provision_step['step_type'])
self.assertEqual(cluster.id, provision_step['cluster_id'])
# test provision step updating
self.api.cluster_provision_step_update(ctx, step_id, {
'total': 100,
'completed': 59,
})
ncluster = self.api.cluster_get(ctx, cluster.id)
self.assertEqual(1, len(ncluster['provision_progress']))
provision_step = ncluster['provision_progress'][0]
self.assertEqual(100, provision_step['total'])
self.assertEqual(59, provision_step['completed'])
# test adding event to step and getting events from step
self.api.cluster_event_add(ctx, step_id, {
@ -162,14 +165,13 @@ class TestConductorClusterApi(base.SaharaWithDbTestCase):
'successful': True
})
events = self.api.cluster_provision_step_get_events(ctx, step_id)
events = self._get_events(ctx, cluster.id, step_id)
self.assertEqual(1, len(events))
self.assertEqual(st_name, events[0].instance_name)
self.assertEqual(True, events[0].successful)
self.assertEqual(st_info, events[0].event_info)
# test removing events from step
self.api.cluster_destroy(ctx, cluster.id)
self.api.cluster_provision_step_remove_events(ctx, step_id)
events = self.api.cluster_provision_step_get_events(ctx, step_id)
self.assertEqual(0, len(events))
with testtools.ExpectedException(exceptions.NotFoundException):
self._get_events(ctx, cluster.id, step_id)

View File

@ -456,6 +456,14 @@ class SaharaMigrationsCheckers(object):
self.assertColumnExists(engine, 'node_group_templates', 'is_default')
self.assertColumnExists(engine, 'cluster_templates', 'is_default')
def _check_020(self, engine, data):
self.assertColumnNotExists(engine, 'cluster_provision_steps',
'completed')
self.assertColumnNotExists(engine, 'cluster_provision_steps',
'completed_at')
self.assertColumnNotExists(engine, 'cluster_provision_steps',
'started_at')
class TestMigrationsMySQL(SaharaMigrationsCheckers,
base.BaseWalkMigrationTestCase,

View File

@ -83,15 +83,13 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
@mock.patch('sahara.service.volumes._await_attach_volumes')
@mock.patch('sahara.service.volumes._create_attach_volume')
@mock.patch('sahara.utils.cluster_progress_ops.add_successful_event')
@mock.patch('sahara.utils.cluster_progress_ops.update_provisioning_steps')
@mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step')
def test_attach(self, add_step, update_step, add_event,
def test_attach(self, add_step, add_event,
p_create_attach_vol, p_await, p_mount):
p_create_attach_vol.side_effect = ['/dev/vdb', '/dev/vdc'] * 2
p_await.return_value = None
p_mount.return_value = None
add_event.return_value = None
update_step.return_value = None
add_step.return_value = None
instance1 = {'id': '1',

View File

@ -54,7 +54,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
"successful": True
})
cpo.update_provisioning_steps(cluster.id)
self.api.cluster_provision_progress_update(ctx, cluster.id)
# check that we have correct provision step
@ -62,7 +62,6 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
result_step = result_cluster.provision_progress[0]
self.assertEqual(None, result_step.successful)
self.assertEqual(1, result_step.completed)
# check updating in case of successful provision step
@ -71,13 +70,12 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
"successful": True
})
cpo.update_provisioning_steps(cluster.id)
self.api.cluster_provision_progress_update(ctx, cluster.id)
result_cluster = self.api.cluster_get(ctx, cluster.id)
result_step = result_cluster.provision_progress[0]
self.assertEqual(True, result_step.successful)
self.assertEqual(2, result_step.completed)
# check updating in case of failed provision step
@ -91,7 +89,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
"successful": False,
})
cpo.update_provisioning_steps(cluster.id)
self.api.cluster_provision_progress_update(ctx, cluster.id)
result_cluster = self.api.cluster_get(ctx, cluster.id)
@ -100,11 +98,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
self.assertEqual(False, step.successful)
# check that it's possible to add provision step after failed step
step_id3 = self.api.cluster_provision_step_add(ctx, cluster.id, {
"step_name": "some_name",
"total": 2,
})
step_id3 = cpo.add_provisioning_step(cluster.id, "some_name", 2)
self.assertEqual(
step_id3, cpo.get_current_provisioning_step(cluster.id))
@ -114,9 +108,11 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
step_id1 = self.api.cluster_provision_step_add(ctx, cluster.id, {
'step_name': "some_name1",
'total': 3,
})
step_id2 = self.api.cluster_provision_step_add(ctx, cluster.id, {
'step_name': "some_name",
'total': 2,
})
self.api.cluster_event_add(ctx, step_id1, {

View File

@ -39,6 +39,9 @@ CONF.register_opts(event_log_opts)
def add_successful_event(instance):
if CONF.disable_event_log:
return
cluster_id = instance.cluster_id
step_id = get_current_provisioning_step(cluster_id)
if step_id:
@ -49,10 +52,12 @@ def add_successful_event(instance):
'instance_name': instance.instance_name,
'event_info': None,
})
update_provisioning_steps(cluster_id)
def add_fail_event(instance, exception):
if CONF.disable_event_log:
return
cluster_id = instance.cluster_id
step_id = get_current_provisioning_step(cluster_id)
event_info = six.text_type(exception)
@ -65,16 +70,18 @@ def add_fail_event(instance, exception):
'instance_name': instance.instance_name,
'event_info': event_info,
})
update_provisioning_steps(cluster_id)
def add_provisioning_step(cluster_id, step_name, total):
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
return
update_provisioning_steps(cluster_id)
prev_step = get_current_provisioning_step(cluster_id)
if prev_step:
conductor.cluster_provision_step_update(context.ctx(), prev_step)
step_type = context.ctx().current_instance_info.step_type
return conductor.cluster_provision_step_add(
new_step = conductor.cluster_provision_step_add(
context.ctx(), cluster_id, {
'step_name': step_name,
'step_type': step_type,
@ -82,64 +89,15 @@ def add_provisioning_step(cluster_id, step_name, total):
'total': total,
'started_at': timeutils.utcnow(),
})
context.current().current_instance_info.step_id = new_step
return new_step
def get_current_provisioning_step(cluster_id):
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
return None
update_provisioning_steps(cluster_id)
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, cluster_id)
for step in cluster.provision_progress:
if step.successful is not None:
continue
return step.id
return None
def update_provisioning_steps(cluster_id):
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
return
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, cluster_id)
for step in cluster.provision_progress:
if step.successful is not None:
continue
has_failed = False
successful_events_count = 0
events = conductor.cluster_provision_step_get_events(
ctx, step.id)
for event in events:
if event.successful:
successful_events_count += 1
else:
has_failed = True
successful = None
if has_failed:
successful = False
elif successful_events_count == step.total:
successful = True
completed_at = None
if successful and not step.completed_at:
completed_at = timeutils.utcnow()
conductor.cluster_provision_step_update(ctx, step.id, {
'completed': successful_events_count,
'successful': successful,
'completed_at': completed_at,
})
if successful:
conductor.cluster_provision_step_remove_events(
ctx, step.id)
current_instance_info = context.ctx().current_instance_info
return current_instance_info.step_id
def event_wrapper(mark_successful_on_exit, **spec):

View File

@ -102,6 +102,7 @@ def change_cluster_status(cluster, status, status_description=None):
update_dict["status_description"] = status_description
cluster = conductor.cluster_update(ctx, cluster, update_dict)
conductor.cluster_provision_progress_update(ctx, cluster.id)
LOG.info(_LI("Cluster status has been changed: id={id}, New status="
"{status}").format(id=cluster.id, status=cluster.status))