Merge "Add 'state' column in 'services' table."

This commit is contained in:
Zuul 2023-02-24 23:52:59 +00:00 committed by Gerrit Code Review
commit 74ae712d58
12 changed files with 193 additions and 15 deletions

View File

@ -19,7 +19,6 @@ import webob.exc
from manila.api.openstack import wsgi
from manila.api.views import services as services_views
from manila import db
from manila import utils
class ServiceMixin(object):
@ -47,7 +46,7 @@ class ServiceMixin(object):
'host': service['host'],
'zone': service['availability_zone']['name'],
'status': 'disabled' if service['disabled'] else 'enabled',
'state': 'up' if utils.service_is_up(service) else 'down',
'state': service['state'],
'updated_at': service['updated_at'],
}
services.append(service)

View File

@ -58,8 +58,10 @@ class DataManager(manager.Manager):
def __init__(self, service_name=None, *args, **kwargs):
super(DataManager, self).__init__(*args, **kwargs)
self.busy_tasks_shares = {}
self.service_id = None
def init_host(self):
def init_host(self, service_id=None):
self.service_id = service_id
ctxt = context.get_admin_context()
shares = self.db.share_get_all(ctxt)
for share in shares:

View File

@ -0,0 +1,47 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""add state column for service
Revision ID: aebe2a413e13
Revises: ac0620cbe74d
Create Date: 2023-01-10 11:43:24.741726
"""
# revision identifiers, used by Alembic.
revision = 'aebe2a413e13'
down_revision = 'ac0620cbe74d'
from alembic import op
from oslo_log import log
import sqlalchemy as sa
LOG = log.getLogger(__name__)
def upgrade():
try:
op.add_column(
'services',
sa.Column('state', sa.String(36), nullable=True))
except Exception:
LOG.error("services table column state not added")
raise
def downgrade():
try:
op.drop_column('services', 'state')
except Exception:
LOG.error("services table column state not dropped")
raise

View File

@ -63,6 +63,7 @@ class Service(BASE, ManilaBase):
host = Column(String(255)) # , ForeignKey('hosts.id'))
binary = Column(String(255))
topic = Column(String(255))
state = Column(String(36))
report_count = Column(Integer, nullable=False, default=0)
disabled = Column(Boolean, default=False)
availability_zone_id = Column(String(36),

View File

@ -100,23 +100,25 @@ class Manager(base.Base, PeriodicTasks):
"""Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
def init_host(self, service_id=None):
"""Handle initialization if this is a standalone service.
A hook point for services to execute tasks before the services are made
available (i.e. showing up on RPC and starting to accept RPC calls) to
other components. Child classes should override this method.
:param service_id: ID of the service where the manager is running.
"""
pass
def init_host_with_rpc(self):
def init_host_with_rpc(self, service_id=None):
"""A hook for service to do jobs after RPC is ready.
Like init_host(), this method is a hook where services get a chance
to execute tasks that *need* RPC. Child classes should override
this method.
:param service_id: ID of the service where the manager is running.
"""
pass

View File

@ -87,8 +87,10 @@ class SchedulerManager(manager.Manager):
self.driver = importutils.import_object(scheduler_driver)
self.message_api = message_api.API()
super(SchedulerManager, self).__init__(*args, **kwargs)
self.service_id = None
def init_host_with_rpc(self):
def init_host_with_rpc(self, service_id=None):
self.service_id = service_id
ctxt = context.get_admin_context()
self.request_service_capabilities(ctxt)

View File

@ -33,6 +33,7 @@ from manila import coordination
from manila import db
from manila import exception
from manila import rpc
from manila import utils
from manila import version
osprofiler_initializer = importutils.try_import('osprofiler.initializer')
@ -45,6 +46,10 @@ service_opts = [
cfg.IntOpt('report_interval',
default=10,
help='Seconds between nodes reporting state to datastore.'),
cfg.IntOpt('cleanup_interval',
min=300,
default=1800,
help='Seconds between cleaning up the stopped nodes.'),
cfg.IntOpt('periodic_interval',
default=60,
help='Seconds between running periodic tasks.'),
@ -121,6 +126,7 @@ class Service(service.Service):
*args, **kwargs)
self.availability_zone = self.manager.availability_zone
self.report_interval = report_interval
self.cleanup_interval = CONF.cleanup_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.saved_args, self.saved_kwargs = args, kwargs
@ -134,20 +140,22 @@ class Service(service.Service):
LOG.info('Starting %(topic)s node (version %(version_string)s)',
{'topic': self.topic, 'version_string': version_string})
self.model_disconnected = False
self.manager.init_host()
ctxt = context.get_admin_context()
if self.coordinator:
coordination.LOCK_COORDINATOR.start()
try:
service_ref = db.service_get_by_args(ctxt,
self.host,
self.binary)
self.service_id = service_ref['id']
db.service_update(ctxt, self.service_id, {'state': 'down'})
except exception.NotFound:
self._create_service_ref(ctxt)
self.manager.init_host(service_id=self.service_id)
if self.coordinator:
coordination.LOCK_COORDINATOR.start()
LOG.debug("Creating RPC server for service %s.", self.topic)
target = messaging.Target(topic=self.topic, server=self.host)
@ -162,6 +170,9 @@ class Service(service.Service):
self.tg.add_timer(self.report_interval, self.report_state,
initial_delay=self.report_interval)
self.tg.add_timer(self.cleanup_interval, self.cleanup_services,
initial_delay=self.cleanup_interval)
if self.periodic_interval:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
@ -176,6 +187,7 @@ class Service(service.Service):
'host': self.host,
'binary': self.binary,
'topic': self.topic,
'state': 'up',
'report_count': 0,
'availability_zone': self.availability_zone
}
@ -242,6 +254,8 @@ class Service(service.Service):
except Exception:
pass
db.service_update(context.get_admin_context(),
self.service_id, {'state': 'stopped'})
if self.coordinator:
try:
coordination.LOCK_COORDINATOR.stop()
@ -287,8 +301,12 @@ class Service(service.Service):
service_ref['availability_zone']['name']):
state_catalog['availability_zone'] = self.availability_zone
db.service_update(ctxt,
self.service_id, state_catalog)
if utils.service_is_up(service_ref):
state_catalog['state'] = 'up'
else:
if service_ref['state'] != 'stopped':
state_catalog['state'] = 'down'
db.service_update(ctxt, self.service_id, state_catalog)
# TODO(termie): make this pattern be more elegant.
if getattr(self, 'model_disconnected', False):
@ -301,6 +319,22 @@ class Service(service.Service):
self.model_disconnected = True
LOG.exception('model server went away')
def cleanup_services(self):
"""Remove the stopped services of same topic from the datastore."""
ctxt = context.get_admin_context()
try:
services = db.service_get_all(ctxt, self.topic)
except exception.NotFound:
LOG.debug('The service database object disappeared,'
'Exiting from cleanup.')
return
for svc in services:
if (svc['topic'] == self.topic and
svc['state'] == 'stopped' and
not utils.service_is_up(svc)):
db.service_destroy(ctxt, svc['id'])
class WSGIService(service.ServiceBase):
"""Provides ability to launch API from a 'paste' configuration."""

View File

@ -297,6 +297,7 @@ class ShareManager(manager.SchedulerDependentManager):
self.driver = profiler.trace_cls("driver")(self.driver)
self.hooks = []
self._init_hook_drivers()
self.service_id = None
def _init_hook_drivers(self):
# Try to initialize hook driver(s).
@ -334,9 +335,10 @@ class ShareManager(manager.SchedulerDependentManager):
return pool
@add_hooks
def init_host(self):
def init_host(self, service_id=None):
"""Initialization for a standalone service."""
self.service_id = service_id
ctxt = context.get_admin_context()
driver_host_pair = "{}@{}".format(
self.driver.__class__.__name__,

View File

@ -37,6 +37,7 @@ fake_services_list = [
'availability_zone': {'name': 'manila1'},
'id': 1,
'disabled': True,
'state': 'up',
'updated_at': datetime.datetime(2012, 10, 29, 13, 42, 2),
'created_at': datetime.datetime(2012, 9, 18, 2, 46, 27),
},
@ -46,6 +47,7 @@ fake_services_list = [
'availability_zone': {'name': 'manila1'},
'id': 2,
'disabled': True,
'state': 'up',
'updated_at': datetime.datetime(2012, 10, 29, 13, 42, 5),
'created_at': datetime.datetime(2012, 9, 18, 2, 46, 27)},
{
@ -54,6 +56,7 @@ fake_services_list = [
'availability_zone': {'name': 'manila2'},
'id': 3,
'disabled': False,
'state': 'down',
'updated_at': datetime.datetime(2012, 9, 19, 6, 55, 34),
'created_at': datetime.datetime(2012, 9, 18, 2, 46, 28)},
{
@ -62,6 +65,7 @@ fake_services_list = [
'availability_zone': {'name': 'manila2'},
'id': 4,
'disabled': True,
'state': 'down',
'updated_at': datetime.datetime(2012, 9, 18, 8, 3, 38),
'created_at': datetime.datetime(2012, 9, 18, 2, 46, 28),
},
@ -74,7 +78,8 @@ fake_response_service_list = {'services': [
'binary': 'manila-scheduler',
'host': 'host1',
'zone': 'manila1',
'status': 'disabled', 'state': 'up',
'status': 'disabled',
'state': 'up',
'updated_at': datetime.datetime(2012, 10, 29, 13, 42, 2),
},
{

View File

@ -3308,3 +3308,36 @@ class AddSubnetMetadata(BaseMigrationChecks):
self.test_case.assertRaises(sa_exc.NoSuchTableError,
utils.load_table,
self.new_table_name, engine)
@map_to_migration('aebe2a413e13')
class AddServiceState(BaseMigrationChecks):
def _get_service_data(self, options):
base_dict = {
'binary': 'manila-share',
'topic': 'share',
'disabled': False,
'report_count': '100',
}
base_dict.update(options)
return base_dict
def setup_upgrade_data(self, engine):
service_fixture = [
self._get_service_data({'host': 'fake1'}),
self._get_service_data({'host': 'fake2'}),
]
services_table = utils.load_table('services', engine)
for fixture in service_fixture:
engine.execute(services_table.insert(fixture))
def check_upgrade(self, engine, data):
s_table = utils.load_table('services', engine)
for s in engine.execute(s_table.select()):
self.test_case.assertTrue(hasattr(s, 'state'))
def check_downgrade(self, engine):
s_table = utils.load_table('services', engine)
for s in engine.execute(s_table.select()):
self.test_case.assertFalse(hasattr(s, 'state'))

View File

@ -21,6 +21,8 @@
Unit Tests for remote procedure calls using queue
"""
from datetime import datetime
from datetime import timedelta
from unittest import mock
import ddt
@ -120,6 +122,7 @@ service_create = {
'host': host,
'binary': binary,
'topic': topic,
'state': 'up',
'report_count': 0,
'availability_zone': 'nova',
}
@ -127,6 +130,7 @@ service_create_other_az = {
'host': host,
'binary': binary,
'topic': topic,
'state': 'up',
'report_count': 0,
'availability_zone': 'other-zone',
}
@ -134,6 +138,16 @@ service_ref = {
'host': host,
'binary': binary,
'topic': topic,
'state': 'up',
'report_count': 0,
'availability_zone': {'name': 'nova'},
'id': 1,
}
service_ref_stopped = {
'host': host,
'binary': binary,
'topic': topic,
'state': 'stopped',
'report_count': 0,
'availability_zone': {'name': 'nova'},
'id': 1,
@ -192,6 +206,8 @@ class ServiceTestCase(test.TestCase):
@mock.patch.object(service.db, 'service_update',
mock.Mock(return_value=service_ref.
update({'report_count': 1})))
@mock.patch.object(utils, 'service_is_up',
mock.Mock(return_value=True))
def test_report_state_newly_connected(self):
serv = service.Service(host, binary, topic, CONF.fake_manager)
serv.start()
@ -216,6 +232,8 @@ class ServiceTestCase(test.TestCase):
@mock.patch.object(service.db, 'service_update',
mock.Mock(return_value=service_ref.
update({'report_count': 1})))
@mock.patch.object(utils, 'service_is_up',
mock.Mock(return_value=True))
def test_report_state_newly_connected_different_az(self):
serv = service.Service(host, binary, topic, CONF.fake_manager)
serv.availability_zone = 'other-zone'
@ -242,6 +260,8 @@ class ServiceTestCase(test.TestCase):
@mock.patch.object(service.db, 'service_update',
mock.Mock(return_value=service_ref.
update({'report_count': 1})))
@mock.patch.object(utils, 'service_is_up',
mock.Mock(return_value=True))
def test_report_state_newly_connected_not_found(self):
serv = service.Service(host, binary, topic, CONF.fake_manager)
serv.start()
@ -268,7 +288,27 @@ class ServiceTestCase(test.TestCase):
serv.report_state()
serv.manager.is_service_ready.assert_called_once()
mock_db.service_update.assert_not_called()
@ddt.data(True, False)
def test_cleanup_services(self, cleanup_interval_done):
with mock.patch.object(service, 'db') as mock_db:
mock_db.service_get_all.return_value = [service_ref]
serv = service.Service(host, binary, topic, CONF.fake_manager)
serv.start()
serv.cleanup_services()
mock_db.service_destroy.assert_not_called()
if cleanup_interval_done:
service_ref_stopped['updated_at'] = (
datetime.utcnow() - timedelta(minutes=10))
else:
service_ref_stopped['updated_at'] = datetime.utcnow()
mock_db.service_get_all.return_value = [service_ref_stopped]
serv.stop()
serv.cleanup_services()
if cleanup_interval_done:
mock_db.service_destroy.assert_called_once_with(
mock.ANY, service_ref_stopped['id'])
class TestWSGIService(test.TestCase):

View File

@ -0,0 +1,11 @@
---
fixes:
- |
In cluster deployments, where multiple instances of manila services are
deployed via PODs, unique hostname is derived from node name. However if
pods are deployed again and launched on new hosts/nodes, the old entries
of manila service remains as it is. Fixed it by adding per service cleanup
function and also introducing 'state' column in 'services' table. The
service will be in either of 'up', 'down' or 'stopped' state. Cleanup will
delete DB entries of 'stopeed' services. For more details please refer,
`Launchpad bug 1990839 <https://bugs.launchpad.net/manila/+bug/1990839>`_