Add 'state' column in 'services' table.

When manila services are stopped or restarted via stop(), the DB
entries are not deleted, they are destroyed only in kill() method. In
cluster deployments, where multiple instances of manila services are
deployed via PODs, unique hostname is derived from node name. However
if pods are deployed again and launched on new hosts/nodes, the old
entries of manila service remains as it is.
Fix it by adding 'state' column in 'services' table and introducing
per service cleanup function. On service stop, state is changed to
'stopped' and cleanup function will delete 'stopped' services unless
they are 'up' again before cleanup periodic interval.

Closes-bug: #1990839
Change-Id: I8b71c4c27ff8fcb25616a95a5ed8362a7f4ffc61
This commit is contained in:
Kiran Pawar 2022-09-26 14:24:29 +00:00
parent 793c5c362e
commit 98be6376b2
12 changed files with 193 additions and 15 deletions

View File

@ -19,7 +19,6 @@ import webob.exc
from manila.api.openstack import wsgi from manila.api.openstack import wsgi
from manila.api.views import services as services_views from manila.api.views import services as services_views
from manila import db from manila import db
from manila import utils
class ServiceMixin(object): class ServiceMixin(object):
@ -47,7 +46,7 @@ class ServiceMixin(object):
'host': service['host'], 'host': service['host'],
'zone': service['availability_zone']['name'], 'zone': service['availability_zone']['name'],
'status': 'disabled' if service['disabled'] else 'enabled', 'status': 'disabled' if service['disabled'] else 'enabled',
'state': 'up' if utils.service_is_up(service) else 'down', 'state': service['state'],
'updated_at': service['updated_at'], 'updated_at': service['updated_at'],
} }
services.append(service) services.append(service)

View File

@ -58,8 +58,10 @@ class DataManager(manager.Manager):
def __init__(self, service_name=None, *args, **kwargs): def __init__(self, service_name=None, *args, **kwargs):
super(DataManager, self).__init__(*args, **kwargs) super(DataManager, self).__init__(*args, **kwargs)
self.busy_tasks_shares = {} self.busy_tasks_shares = {}
self.service_id = None
def init_host(self): def init_host(self, service_id=None):
self.service_id = service_id
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
shares = self.db.share_get_all(ctxt) shares = self.db.share_get_all(ctxt)
for share in shares: for share in shares:

View File

@ -0,0 +1,47 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""add state column for service
Revision ID: aebe2a413e13
Revises: ac0620cbe74d
Create Date: 2023-01-10 11:43:24.741726
"""
# revision identifiers, used by Alembic.
revision = 'aebe2a413e13'
down_revision = 'ac0620cbe74d'
from alembic import op
from oslo_log import log
import sqlalchemy as sa
LOG = log.getLogger(__name__)
def upgrade():
try:
op.add_column(
'services',
sa.Column('state', sa.String(36), nullable=True))
except Exception:
LOG.error("services table column state not added")
raise
def downgrade():
try:
op.drop_column('services', 'state')
except Exception:
LOG.error("services table column state not dropped")
raise

View File

@ -63,6 +63,7 @@ class Service(BASE, ManilaBase):
host = Column(String(255)) # , ForeignKey('hosts.id')) host = Column(String(255)) # , ForeignKey('hosts.id'))
binary = Column(String(255)) binary = Column(String(255))
topic = Column(String(255)) topic = Column(String(255))
state = Column(String(36))
report_count = Column(Integer, nullable=False, default=0) report_count = Column(Integer, nullable=False, default=0)
disabled = Column(Boolean, default=False) disabled = Column(Boolean, default=False)
availability_zone_id = Column(String(36), availability_zone_id = Column(String(36),

View File

@ -100,23 +100,25 @@ class Manager(base.Base, PeriodicTasks):
"""Tasks to be run at a periodic interval.""" """Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error) return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self): def init_host(self, service_id=None):
"""Handle initialization if this is a standalone service. """Handle initialization if this is a standalone service.
A hook point for services to execute tasks before the services are made A hook point for services to execute tasks before the services are made
available (i.e. showing up on RPC and starting to accept RPC calls) to available (i.e. showing up on RPC and starting to accept RPC calls) to
other components. Child classes should override this method. other components. Child classes should override this method.
:param service_id: ID of the service where the manager is running.
""" """
pass pass
def init_host_with_rpc(self): def init_host_with_rpc(self, service_id=None):
"""A hook for service to do jobs after RPC is ready. """A hook for service to do jobs after RPC is ready.
Like init_host(), this method is a hook where services get a chance Like init_host(), this method is a hook where services get a chance
to execute tasks that *need* RPC. Child classes should override to execute tasks that *need* RPC. Child classes should override
this method. this method.
:param service_id: ID of the service where the manager is running.
""" """
pass pass

View File

@ -87,8 +87,10 @@ class SchedulerManager(manager.Manager):
self.driver = importutils.import_object(scheduler_driver) self.driver = importutils.import_object(scheduler_driver)
self.message_api = message_api.API() self.message_api = message_api.API()
super(SchedulerManager, self).__init__(*args, **kwargs) super(SchedulerManager, self).__init__(*args, **kwargs)
self.service_id = None
def init_host_with_rpc(self): def init_host_with_rpc(self, service_id=None):
self.service_id = service_id
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
self.request_service_capabilities(ctxt) self.request_service_capabilities(ctxt)

View File

@ -33,6 +33,7 @@ from manila import coordination
from manila import db from manila import db
from manila import exception from manila import exception
from manila import rpc from manila import rpc
from manila import utils
from manila import version from manila import version
osprofiler_initializer = importutils.try_import('osprofiler.initializer') osprofiler_initializer = importutils.try_import('osprofiler.initializer')
@ -45,6 +46,10 @@ service_opts = [
cfg.IntOpt('report_interval', cfg.IntOpt('report_interval',
default=10, default=10,
help='Seconds between nodes reporting state to datastore.'), help='Seconds between nodes reporting state to datastore.'),
cfg.IntOpt('cleanup_interval',
min=300,
default=1800,
help='Seconds between cleaning up the stopped nodes.'),
cfg.IntOpt('periodic_interval', cfg.IntOpt('periodic_interval',
default=60, default=60,
help='Seconds between running periodic tasks.'), help='Seconds between running periodic tasks.'),
@ -121,6 +126,7 @@ class Service(service.Service):
*args, **kwargs) *args, **kwargs)
self.availability_zone = self.manager.availability_zone self.availability_zone = self.manager.availability_zone
self.report_interval = report_interval self.report_interval = report_interval
self.cleanup_interval = CONF.cleanup_interval
self.periodic_interval = periodic_interval self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.saved_args, self.saved_kwargs = args, kwargs self.saved_args, self.saved_kwargs = args, kwargs
@ -134,20 +140,22 @@ class Service(service.Service):
LOG.info('Starting %(topic)s node (version %(version_string)s)', LOG.info('Starting %(topic)s node (version %(version_string)s)',
{'topic': self.topic, 'version_string': version_string}) {'topic': self.topic, 'version_string': version_string})
self.model_disconnected = False self.model_disconnected = False
self.manager.init_host()
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
if self.coordinator:
coordination.LOCK_COORDINATOR.start()
try: try:
service_ref = db.service_get_by_args(ctxt, service_ref = db.service_get_by_args(ctxt,
self.host, self.host,
self.binary) self.binary)
self.service_id = service_ref['id'] self.service_id = service_ref['id']
db.service_update(ctxt, self.service_id, {'state': 'down'})
except exception.NotFound: except exception.NotFound:
self._create_service_ref(ctxt) self._create_service_ref(ctxt)
self.manager.init_host(service_id=self.service_id)
if self.coordinator:
coordination.LOCK_COORDINATOR.start()
LOG.debug("Creating RPC server for service %s.", self.topic) LOG.debug("Creating RPC server for service %s.", self.topic)
target = messaging.Target(topic=self.topic, server=self.host) target = messaging.Target(topic=self.topic, server=self.host)
@ -162,6 +170,9 @@ class Service(service.Service):
self.tg.add_timer(self.report_interval, self.report_state, self.tg.add_timer(self.report_interval, self.report_state,
initial_delay=self.report_interval) initial_delay=self.report_interval)
self.tg.add_timer(self.cleanup_interval, self.cleanup_services,
initial_delay=self.cleanup_interval)
if self.periodic_interval: if self.periodic_interval:
if self.periodic_fuzzy_delay: if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay) initial_delay = random.randint(0, self.periodic_fuzzy_delay)
@ -176,6 +187,7 @@ class Service(service.Service):
'host': self.host, 'host': self.host,
'binary': self.binary, 'binary': self.binary,
'topic': self.topic, 'topic': self.topic,
'state': 'up',
'report_count': 0, 'report_count': 0,
'availability_zone': self.availability_zone 'availability_zone': self.availability_zone
} }
@ -242,6 +254,8 @@ class Service(service.Service):
except Exception: except Exception:
pass pass
db.service_update(context.get_admin_context(),
self.service_id, {'state': 'stopped'})
if self.coordinator: if self.coordinator:
try: try:
coordination.LOCK_COORDINATOR.stop() coordination.LOCK_COORDINATOR.stop()
@ -287,8 +301,12 @@ class Service(service.Service):
service_ref['availability_zone']['name']): service_ref['availability_zone']['name']):
state_catalog['availability_zone'] = self.availability_zone state_catalog['availability_zone'] = self.availability_zone
db.service_update(ctxt, if utils.service_is_up(service_ref):
self.service_id, state_catalog) state_catalog['state'] = 'up'
else:
if service_ref['state'] != 'stopped':
state_catalog['state'] = 'down'
db.service_update(ctxt, self.service_id, state_catalog)
# TODO(termie): make this pattern be more elegant. # TODO(termie): make this pattern be more elegant.
if getattr(self, 'model_disconnected', False): if getattr(self, 'model_disconnected', False):
@ -301,6 +319,22 @@ class Service(service.Service):
self.model_disconnected = True self.model_disconnected = True
LOG.exception('model server went away') LOG.exception('model server went away')
def cleanup_services(self):
"""Remove the stopped services of same topic from the datastore."""
ctxt = context.get_admin_context()
try:
services = db.service_get_all(ctxt, self.topic)
except exception.NotFound:
LOG.debug('The service database object disappeared,'
'Exiting from cleanup.')
return
for svc in services:
if (svc['topic'] == self.topic and
svc['state'] == 'stopped' and
not utils.service_is_up(svc)):
db.service_destroy(ctxt, svc['id'])
class WSGIService(service.ServiceBase): class WSGIService(service.ServiceBase):
"""Provides ability to launch API from a 'paste' configuration.""" """Provides ability to launch API from a 'paste' configuration."""

View File

@ -297,6 +297,7 @@ class ShareManager(manager.SchedulerDependentManager):
self.driver = profiler.trace_cls("driver")(self.driver) self.driver = profiler.trace_cls("driver")(self.driver)
self.hooks = [] self.hooks = []
self._init_hook_drivers() self._init_hook_drivers()
self.service_id = None
def _init_hook_drivers(self): def _init_hook_drivers(self):
# Try to initialize hook driver(s). # Try to initialize hook driver(s).
@ -334,9 +335,10 @@ class ShareManager(manager.SchedulerDependentManager):
return pool return pool
@add_hooks @add_hooks
def init_host(self): def init_host(self, service_id=None):
"""Initialization for a standalone service.""" """Initialization for a standalone service."""
self.service_id = service_id
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
driver_host_pair = "{}@{}".format( driver_host_pair = "{}@{}".format(
self.driver.__class__.__name__, self.driver.__class__.__name__,

View File

@ -37,6 +37,7 @@ fake_services_list = [
'availability_zone': {'name': 'manila1'}, 'availability_zone': {'name': 'manila1'},
'id': 1, 'id': 1,
'disabled': True, 'disabled': True,
'state': 'up',
'updated_at': datetime.datetime(2012, 10, 29, 13, 42, 2), 'updated_at': datetime.datetime(2012, 10, 29, 13, 42, 2),
'created_at': datetime.datetime(2012, 9, 18, 2, 46, 27), 'created_at': datetime.datetime(2012, 9, 18, 2, 46, 27),
}, },
@ -46,6 +47,7 @@ fake_services_list = [
'availability_zone': {'name': 'manila1'}, 'availability_zone': {'name': 'manila1'},
'id': 2, 'id': 2,
'disabled': True, 'disabled': True,
'state': 'up',
'updated_at': datetime.datetime(2012, 10, 29, 13, 42, 5), 'updated_at': datetime.datetime(2012, 10, 29, 13, 42, 5),
'created_at': datetime.datetime(2012, 9, 18, 2, 46, 27)}, 'created_at': datetime.datetime(2012, 9, 18, 2, 46, 27)},
{ {
@ -54,6 +56,7 @@ fake_services_list = [
'availability_zone': {'name': 'manila2'}, 'availability_zone': {'name': 'manila2'},
'id': 3, 'id': 3,
'disabled': False, 'disabled': False,
'state': 'down',
'updated_at': datetime.datetime(2012, 9, 19, 6, 55, 34), 'updated_at': datetime.datetime(2012, 9, 19, 6, 55, 34),
'created_at': datetime.datetime(2012, 9, 18, 2, 46, 28)}, 'created_at': datetime.datetime(2012, 9, 18, 2, 46, 28)},
{ {
@ -62,6 +65,7 @@ fake_services_list = [
'availability_zone': {'name': 'manila2'}, 'availability_zone': {'name': 'manila2'},
'id': 4, 'id': 4,
'disabled': True, 'disabled': True,
'state': 'down',
'updated_at': datetime.datetime(2012, 9, 18, 8, 3, 38), 'updated_at': datetime.datetime(2012, 9, 18, 8, 3, 38),
'created_at': datetime.datetime(2012, 9, 18, 2, 46, 28), 'created_at': datetime.datetime(2012, 9, 18, 2, 46, 28),
}, },
@ -74,7 +78,8 @@ fake_response_service_list = {'services': [
'binary': 'manila-scheduler', 'binary': 'manila-scheduler',
'host': 'host1', 'host': 'host1',
'zone': 'manila1', 'zone': 'manila1',
'status': 'disabled', 'state': 'up', 'status': 'disabled',
'state': 'up',
'updated_at': datetime.datetime(2012, 10, 29, 13, 42, 2), 'updated_at': datetime.datetime(2012, 10, 29, 13, 42, 2),
}, },
{ {

View File

@ -3308,3 +3308,36 @@ class AddSubnetMetadata(BaseMigrationChecks):
self.test_case.assertRaises(sa_exc.NoSuchTableError, self.test_case.assertRaises(sa_exc.NoSuchTableError,
utils.load_table, utils.load_table,
self.new_table_name, engine) self.new_table_name, engine)
@map_to_migration('aebe2a413e13')
class AddServiceState(BaseMigrationChecks):
def _get_service_data(self, options):
base_dict = {
'binary': 'manila-share',
'topic': 'share',
'disabled': False,
'report_count': '100',
}
base_dict.update(options)
return base_dict
def setup_upgrade_data(self, engine):
service_fixture = [
self._get_service_data({'host': 'fake1'}),
self._get_service_data({'host': 'fake2'}),
]
services_table = utils.load_table('services', engine)
for fixture in service_fixture:
engine.execute(services_table.insert(fixture))
def check_upgrade(self, engine, data):
s_table = utils.load_table('services', engine)
for s in engine.execute(s_table.select()):
self.test_case.assertTrue(hasattr(s, 'state'))
def check_downgrade(self, engine):
s_table = utils.load_table('services', engine)
for s in engine.execute(s_table.select()):
self.test_case.assertFalse(hasattr(s, 'state'))

View File

@ -21,6 +21,8 @@
Unit Tests for remote procedure calls using queue Unit Tests for remote procedure calls using queue
""" """
from datetime import datetime
from datetime import timedelta
from unittest import mock from unittest import mock
import ddt import ddt
@ -120,6 +122,7 @@ service_create = {
'host': host, 'host': host,
'binary': binary, 'binary': binary,
'topic': topic, 'topic': topic,
'state': 'up',
'report_count': 0, 'report_count': 0,
'availability_zone': 'nova', 'availability_zone': 'nova',
} }
@ -127,6 +130,7 @@ service_create_other_az = {
'host': host, 'host': host,
'binary': binary, 'binary': binary,
'topic': topic, 'topic': topic,
'state': 'up',
'report_count': 0, 'report_count': 0,
'availability_zone': 'other-zone', 'availability_zone': 'other-zone',
} }
@ -134,6 +138,16 @@ service_ref = {
'host': host, 'host': host,
'binary': binary, 'binary': binary,
'topic': topic, 'topic': topic,
'state': 'up',
'report_count': 0,
'availability_zone': {'name': 'nova'},
'id': 1,
}
service_ref_stopped = {
'host': host,
'binary': binary,
'topic': topic,
'state': 'stopped',
'report_count': 0, 'report_count': 0,
'availability_zone': {'name': 'nova'}, 'availability_zone': {'name': 'nova'},
'id': 1, 'id': 1,
@ -192,6 +206,8 @@ class ServiceTestCase(test.TestCase):
@mock.patch.object(service.db, 'service_update', @mock.patch.object(service.db, 'service_update',
mock.Mock(return_value=service_ref. mock.Mock(return_value=service_ref.
update({'report_count': 1}))) update({'report_count': 1})))
@mock.patch.object(utils, 'service_is_up',
mock.Mock(return_value=True))
def test_report_state_newly_connected(self): def test_report_state_newly_connected(self):
serv = service.Service(host, binary, topic, CONF.fake_manager) serv = service.Service(host, binary, topic, CONF.fake_manager)
serv.start() serv.start()
@ -216,6 +232,8 @@ class ServiceTestCase(test.TestCase):
@mock.patch.object(service.db, 'service_update', @mock.patch.object(service.db, 'service_update',
mock.Mock(return_value=service_ref. mock.Mock(return_value=service_ref.
update({'report_count': 1}))) update({'report_count': 1})))
@mock.patch.object(utils, 'service_is_up',
mock.Mock(return_value=True))
def test_report_state_newly_connected_different_az(self): def test_report_state_newly_connected_different_az(self):
serv = service.Service(host, binary, topic, CONF.fake_manager) serv = service.Service(host, binary, topic, CONF.fake_manager)
serv.availability_zone = 'other-zone' serv.availability_zone = 'other-zone'
@ -242,6 +260,8 @@ class ServiceTestCase(test.TestCase):
@mock.patch.object(service.db, 'service_update', @mock.patch.object(service.db, 'service_update',
mock.Mock(return_value=service_ref. mock.Mock(return_value=service_ref.
update({'report_count': 1}))) update({'report_count': 1})))
@mock.patch.object(utils, 'service_is_up',
mock.Mock(return_value=True))
def test_report_state_newly_connected_not_found(self): def test_report_state_newly_connected_not_found(self):
serv = service.Service(host, binary, topic, CONF.fake_manager) serv = service.Service(host, binary, topic, CONF.fake_manager)
serv.start() serv.start()
@ -268,7 +288,27 @@ class ServiceTestCase(test.TestCase):
serv.report_state() serv.report_state()
serv.manager.is_service_ready.assert_called_once() serv.manager.is_service_ready.assert_called_once()
mock_db.service_update.assert_not_called()
@ddt.data(True, False)
def test_cleanup_services(self, cleanup_interval_done):
with mock.patch.object(service, 'db') as mock_db:
mock_db.service_get_all.return_value = [service_ref]
serv = service.Service(host, binary, topic, CONF.fake_manager)
serv.start()
serv.cleanup_services()
mock_db.service_destroy.assert_not_called()
if cleanup_interval_done:
service_ref_stopped['updated_at'] = (
datetime.utcnow() - timedelta(minutes=10))
else:
service_ref_stopped['updated_at'] = datetime.utcnow()
mock_db.service_get_all.return_value = [service_ref_stopped]
serv.stop()
serv.cleanup_services()
if cleanup_interval_done:
mock_db.service_destroy.assert_called_once_with(
mock.ANY, service_ref_stopped['id'])
class TestWSGIService(test.TestCase): class TestWSGIService(test.TestCase):

View File

@ -0,0 +1,11 @@
---
fixes:
- |
In cluster deployments, where multiple instances of manila services are
deployed via PODs, unique hostname is derived from node name. However if
pods are deployed again and launched on new hosts/nodes, the old entries
of manila service remains as it is. Fixed it by adding per service cleanup
function and also introducing 'state' column in 'services' table. The
service will be in either of 'up', 'down' or 'stopped' state. Cleanup will
delete DB entries of 'stopeed' services. For more details please refer,
`Launchpad bug 1990839 <https://bugs.launchpad.net/manila/+bug/1990839>`_