Merge "Add a new column and indexes to agent_heartbeats"

This commit is contained in:
Jenkins 2014-06-11 02:07:35 +00:00 committed by Gerrit Code Review
commit fc8932bb63
5 changed files with 330 additions and 1 deletions

View File

@ -0,0 +1,81 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy.exc import OperationalError
from sqlalchemy.schema import Column
from sqlalchemy.schema import MetaData
from trove.db.sqlalchemy.migrate_repo.schema import Boolean
from trove.db.sqlalchemy.migrate_repo.schema import create_tables
from trove.db.sqlalchemy.migrate_repo.schema import DateTime
from trove.db.sqlalchemy.migrate_repo.schema import drop_tables
from trove.db.sqlalchemy.migrate_repo.schema import String
from trove.db.sqlalchemy.migrate_repo.schema import Table
from trove.openstack.common import log as logging
logger = logging.getLogger('trove.db.sqlalchemy.migrate_repo.schema')
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# new table with desired columns, indexes, and constraints
new_agent_heartbeats = Table(
'agent_heartbeats', meta,
Column('id', String(36), primary_key=True, nullable=False),
Column('instance_id', String(36),
nullable=False, unique=True, index=True),
Column('guest_agent_version', String(255), index=True),
Column('deleted', Boolean(), index=True),
Column('deleted_at', DateTime()),
Column('updated_at', DateTime(), nullable=False))
# original table from migration 005_heartbeat.py
previous_agent_heartbeats = Table('agent_heartbeats', meta, autoload=True)
try:
drop_tables([previous_agent_heartbeats])
except OperationalError as e:
logger.warn("This table may have been dropped by some other means.")
logger.warn(e)
create_tables([new_agent_heartbeats])
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# new table with desired columns, indexes, and constraints
new_agent_heartbeats = Table('agent_heartbeats', meta, autoload=True)
try:
drop_tables([new_agent_heartbeats])
except OperationalError as e:
logger.warn("This table may have been dropped by some other means.")
logger.warn(e)
# reset the migrate_engine
meta = MetaData()
meta.bind = migrate_engine
# original table from migration 005_heartbeat.py
previous_agent_heartbeats = Table(
'agent_heartbeats', meta, Column('id', String(36), primary_key=True,
nullable=False),
Column('instance_id', String(36), nullable=False),
Column('updated_at', DateTime()), extend_existing=True)
create_tables([previous_agent_heartbeats])

View File

@ -36,7 +36,8 @@ def persisted_models():
class AgentHeartBeat(dbmodels.DatabaseModelBase):
"""Defines the state of a Guest Agent."""
_data_fields = ['instance_id', 'updated_at']
_data_fields = ['instance_id', 'updated_at', 'guest_agent_version',
'deleted', 'deleted_at']
_table_name = 'agent_heartbeats'
def __init__(self, **kwargs):
@ -58,6 +59,32 @@ class AgentHeartBeat(dbmodels.DatabaseModelBase):
{'name': self.__class__.__name__, 'dict': self.__dict__})
return get_db_api().save(self)
@classmethod
def find_all_by_version(cls, guest_agent_version, deleted=0):
if guest_agent_version is None:
raise exception.ModelNotFoundError()
heartbeats = cls.find_all(guest_agent_version=guest_agent_version,
deleted=deleted)
if heartbeats is None or heartbeats.count() == 0:
raise exception.ModelNotFoundError(
guest_agent_version=guest_agent_version)
return heartbeats
@classmethod
def find_by_instance_id(cls, instance_id):
if instance_id is None:
raise exception.ModelNotFoundError(instance_id=instance_id)
try:
return cls.find_by(instance_id=instance_id)
except exception.NotFound as e:
LOG.error(e.message)
raise exception.ModelNotFoundError(instance_id=instance_id)
@staticmethod
def is_active(agent):
return (datetime.now() - agent.updated_at <

View File

@ -514,6 +514,7 @@ class BaseInstance(SimpleInstance):
_delete_resources)
def _delete_resources(self, deleted_at):
"""Implemented in subclass."""
pass
def delete_async(self):

View File

@ -20,6 +20,8 @@ Routes all the requests to the task manager.
from trove.common import cfg
from trove.common import exception
from trove.guestagent import models as agent_models
from trove.openstack.common.rpc import proxy
from trove.openstack.common import log as logging
@ -51,6 +53,14 @@ class API(proxy.RpcProxy):
"""Create the routing key for the taskmanager"""
return CONF.taskmanager_queue
def _delete_heartbeat(self, instance_id):
agent_heart_beat = agent_models.AgentHeartBeat()
try:
heartbeat = agent_heart_beat.find_by_instance_id(instance_id)
heartbeat.delete()
except exception.ModelNotFoundError as e:
LOG.error(e.message)
def resize_volume(self, new_size, instance_id):
LOG.debug("Making async call to resize volume for instance: %s"
% instance_id)
@ -86,6 +96,7 @@ class API(proxy.RpcProxy):
LOG.debug("Making async call to delete instance: %s" % instance_id)
self.cast(self.context,
self.make_msg("delete_instance", instance_id=instance_id))
self._delete_heartbeat(instance_id)
def create_backup(self, backup_info, instance_id):
LOG.debug("Making async call to create a backup for instance: %s" %

View File

@ -0,0 +1,209 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
import uuid
from trove.common import exception
from trove.guestagent.models import AgentHeartBeat
from trove.tests.unittests.util import util
class AgentHeartBeatTest(testtools.TestCase):
def setUp(self):
super(AgentHeartBeatTest, self).setUp()
util.init_db()
def tearDown(self):
super(AgentHeartBeatTest, self).tearDown()
def test_create(self):
"""
Test the creation of a new agent heartbeat record
"""
instance_id = str(uuid.uuid4())
heartbeat = AgentHeartBeat.create(
instance_id=instance_id)
self.assertIsNotNone(heartbeat)
self.assertIsNotNone(heartbeat.id)
self.assertIsNotNone(heartbeat.instance_id)
self.assertEqual(instance_id,
heartbeat.instance_id)
self.assertIsNotNone(heartbeat.updated_at)
self.assertIsNone(heartbeat.guest_agent_version)
def test_create_with_version(self):
"""
Test the creation of a new agent heartbeat record w/ guest version
"""
instance_id = str(uuid.uuid4())
heartbeat = AgentHeartBeat.create(
instance_id=instance_id,
guest_agent_version="1.2.3")
self.assertIsNotNone(heartbeat)
self.assertIsNotNone(heartbeat.id)
self.assertIsNotNone(heartbeat.instance_id)
self.assertEqual(instance_id,
heartbeat.instance_id)
self.assertIsNotNone(heartbeat.updated_at)
self.assertIsNotNone(heartbeat.guest_agent_version)
self.assertEqual("1.2.3", heartbeat.guest_agent_version)
def test_find_by_instance_id(self):
"""
Test to retrieve a guest agents by it's id
"""
# create a unique record
instance_id = str(uuid.uuid4())
heartbeat = AgentHeartBeat.create(
instance_id=instance_id, guest_agent_version="1.2.3")
self.assertIsNotNone(heartbeat)
self.assertIsNotNone(heartbeat.id)
self.assertIsNotNone(heartbeat.instance_id)
self.assertEqual(instance_id, heartbeat.instance_id)
self.assertIsNotNone(heartbeat.updated_at)
self.assertIsNotNone(heartbeat.guest_agent_version)
self.assertEqual("1.2.3", heartbeat.guest_agent_version)
# retrieve the record
heartbeat_found = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
self.assertIsNotNone(heartbeat_found)
self.assertEqual(heartbeat_found.id, heartbeat.id)
self.assertEqual(heartbeat_found.instance_id, heartbeat.instance_id)
self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at)
self.assertEqual(
heartbeat_found.guest_agent_version, heartbeat.guest_agent_version)
def test_find_by_instance_id_none(self):
"""
Test to retrieve a guest agents when id is None
"""
heartbeat_found = None
exception_raised = False
try:
heartbeat_found = AgentHeartBeat.find_by_instance_id(
instance_id=None)
except exception.ModelNotFoundError:
exception_raised = True
self.assertIsNone(heartbeat_found)
self.assertTrue(exception_raised)
def test_find_by_instance_id_not_found(self):
"""
Test to retrieve a guest agents when id is not found
"""
instance_id = str(uuid.uuid4())
heartbeat_found = None
exception_raised = False
try:
heartbeat_found = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
except exception.ModelNotFoundError:
exception_raised = True
self.assertIsNone(heartbeat_found)
self.assertTrue(exception_raised)
def test_find_all_by_version(self):
"""
Test to retrieve all guest agents with a particular version
"""
# create some unique records with the same version
version = str(uuid.uuid4())
for x in xrange(5):
instance_id = str(uuid.uuid4())
heartbeat = AgentHeartBeat.create(
instance_id=instance_id,
guest_agent_version=version,
deleted=0)
self.assertIsNotNone(heartbeat)
# get all guests by version
heartbeats = AgentHeartBeat.find_all_by_version(version)
self.assertIsNotNone(heartbeats)
self.assertEqual(5, heartbeats.count())
def test_find_all_by_version_none(self):
"""
Test to retrieve all guest agents with a None version
"""
heartbeats = None
exception_raised = False
try:
heartbeats = AgentHeartBeat.find_all_by_version(None)
except exception.ModelNotFoundError:
exception_raised = True
self.assertIsNone(heartbeats)
self.assertTrue(exception_raised)
def test_find_all_by_version_not_found(self):
"""
Test to retrieve all guest agents with a non-existing version
"""
version = str(uuid.uuid4())
exception_raised = False
heartbeats = None
try:
heartbeats = AgentHeartBeat.find_all_by_version(version)
except exception.ModelNotFoundError:
exception_raised = True
self.assertIsNone(heartbeats)
self.assertTrue(exception_raised)
def test_update_heartbeat(self):
"""
Test to show the upgrade scenario that will be used by conductor
"""
# create a unique record
instance_id = str(uuid.uuid4())
heartbeat = AgentHeartBeat.create(
instance_id=instance_id, guest_agent_version="1.2.3")
self.assertIsNotNone(heartbeat)
self.assertIsNotNone(heartbeat.id)
self.assertIsNotNone(heartbeat.instance_id)
self.assertEqual(instance_id, heartbeat.instance_id)
self.assertIsNotNone(heartbeat.updated_at)
self.assertIsNotNone(heartbeat.guest_agent_version)
self.assertEqual("1.2.3", heartbeat.guest_agent_version)
# retrieve the record
heartbeat_found = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
self.assertIsNotNone(heartbeat_found)
self.assertEqual(heartbeat_found.id, heartbeat.id)
self.assertEqual(heartbeat_found.instance_id, heartbeat.instance_id)
self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at)
self.assertEqual(
heartbeat_found.guest_agent_version, heartbeat.guest_agent_version)
# update
AgentHeartBeat().update(id=heartbeat_found.id,
instance_id=instance_id,
guest_agent_version="1.2.3")
# retrieve the record
updated_heartbeat = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
self.assertIsNotNone(updated_heartbeat)
self.assertEqual(updated_heartbeat.id, heartbeat.id)
self.assertEqual(updated_heartbeat.instance_id, heartbeat.instance_id)
self.assertEqual(
heartbeat_found.guest_agent_version, heartbeat.guest_agent_version)
self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at)