Merge "Fix RPC Versioning"

This commit is contained in:
Jenkins 2016-11-16 20:09:43 +00:00 committed by Gerrit Code Review
commit 0d514081c9
14 changed files with 501 additions and 154 deletions

View File

@ -16,13 +16,13 @@ from oslo_concurrency import processutils
from oslo_service import service as openstack_service
from trove.cmd.common import with_initialize
from trove.conductor import api as conductor_api
@with_initialize
def main(conf):
from trove.common import notification
from trove.common.rpc import service as rpc_service
from trove.common.rpc import version as rpc_version
from trove.instance import models as inst_models
notification.DBaaSAPINotification.register_notify_callback(
@ -30,7 +30,7 @@ def main(conf):
topic = conf.conductor_queue
server = rpc_service.RpcService(
manager=conf.conductor_manager, topic=topic,
rpc_api_version=rpc_version.RPC_API_VERSION)
rpc_api_version=conductor_api.API.API_LATEST_VERSION)
workers = conf.trove_conductor_workers or processutils.get_worker_count()
launcher = openstack_service.launch(conf, server, workers=workers)
launcher.wait()

View File

@ -25,6 +25,7 @@ from oslo_service import service as openstack_service
from trove.common import cfg
from trove.common import debug_utils
from trove.common.i18n import _LE
from trove.guestagent import api as guest_api
CONF = cfg.CONF
# The guest_id opt definition must match the one in common/cfg.py
@ -57,11 +58,10 @@ def main():
rpc.init(CONF)
from trove.common.rpc import service as rpc_service
from trove.common.rpc import version as rpc_version
server = rpc_service.RpcService(
topic="guestagent.%s" % CONF.guest_id,
manager=manager, host=CONF.guest_id,
rpc_api_version=rpc_version.RPC_API_VERSION)
rpc_api_version=guest_api.API.API_LATEST_VERSION)
launcher = openstack_service.launch(CONF, server)
launcher.wait()

View File

@ -16,6 +16,7 @@ from oslo_config import cfg as openstack_cfg
from oslo_service import service as openstack_service
from trove.cmd.common import with_initialize
from trove.taskmanager import api as task_api
extra_opts = [openstack_cfg.StrOpt('taskmanager_manager')]
@ -24,14 +25,13 @@ extra_opts = [openstack_cfg.StrOpt('taskmanager_manager')]
def startup(conf, topic):
from trove.common import notification
from trove.common.rpc import service as rpc_service
from trove.common.rpc import version as rpc_version
from trove.instance import models as inst_models
notification.DBaaSAPINotification.register_notify_callback(
inst_models.persist_instance_fault)
server = rpc_service.RpcService(
manager=conf.taskmanager_manager, topic=topic,
rpc_api_version=rpc_version.RPC_API_VERSION)
rpc_api_version=task_api.API.API_LATEST_VERSION)
launcher = openstack_service.launch(conf, server)
launcher.wait()

View File

@ -1449,18 +1449,19 @@ mariadb_opts = [
upgrade_levels = cfg.OptGroup(
'upgrade_levels',
title='RPC upgrade levels group for handling versions',
help='Contains the support version caps for each RPC API')
help='Contains the support version caps (Openstack Release) for '
'each RPC API')
rpcapi_cap_opts = [
cfg.StrOpt(
'taskmanager', default="icehouse",
'taskmanager', default='latest',
help='Set a version cap for messages sent to taskmanager services'),
cfg.StrOpt(
'guestagent', default="icehouse",
'guestagent', default='latest',
help='Set a version cap for messages sent to guestagent services'),
cfg.StrOpt(
'conductor', default="icehouse",
help='Set a version cap for messages sent to conductor services'),
'conductor', default='latest',
help='Set Openstack Release compatibility for conductor services'),
]
CONF = cfg.CONF

View File

@ -30,65 +30,100 @@ class CassandraGuestAgentStrategy(base.BaseGuestAgentStrategy):
class CassandraGuestAgentAPI(guest_api.API):
"""Cluster Specific Datastore Guest API
**** VERSION CONTROLLED API ****
The methods in this class are subject to version control as
coordinated by guestagent/api.py. Whenever a change is made to
any API method in this class, add a version number and comment
to the top of guestagent/api.py and use the version number as
appropriate in this file
"""
def get_data_center(self):
LOG.debug("Retrieving the data center for node: %s" % self.id)
version = guest_api.API.API_BASE_VERSION
return self._call("get_data_center", guest_api.AGENT_LOW_TIMEOUT,
self.version_cap)
version=version)
def get_rack(self):
LOG.debug("Retrieving the rack for node: %s" % self.id)
version = guest_api.API.API_BASE_VERSION
return self._call("get_rack", guest_api.AGENT_LOW_TIMEOUT,
self.version_cap)
version=version)
def set_seeds(self, seeds):
LOG.debug("Configuring the gossip seeds for node: %s" % self.id)
version = guest_api.API.API_BASE_VERSION
return self._call("set_seeds", guest_api.AGENT_LOW_TIMEOUT,
self.version_cap, seeds=seeds)
version=version, seeds=seeds)
def get_seeds(self):
LOG.debug("Retrieving the gossip seeds for node: %s" % self.id)
version = guest_api.API.API_BASE_VERSION
return self._call("get_seeds", guest_api.AGENT_LOW_TIMEOUT,
self.version_cap)
version=version)
def set_auto_bootstrap(self, enabled):
LOG.debug("Setting the auto-bootstrap to '%s' for node: %s"
% (enabled, self.id))
version = guest_api.API.API_BASE_VERSION
return self._call("set_auto_bootstrap", guest_api.AGENT_LOW_TIMEOUT,
self.version_cap, enabled=enabled)
version=version, enabled=enabled)
def cluster_complete(self):
LOG.debug("Sending a setup completion notification for node: %s"
% self.id)
version = guest_api.API.API_BASE_VERSION
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap)
version=version)
def node_cleanup_begin(self):
LOG.debug("Signaling the node to prepare for cleanup: %s" % self.id)
version = guest_api.API.API_BASE_VERSION
return self._call("node_cleanup_begin", guest_api.AGENT_LOW_TIMEOUT,
self.version_cap)
version=version)
def node_cleanup(self):
LOG.debug("Running cleanup on node: %s" % self.id)
return self._cast('node_cleanup', self.version_cap)
version = guest_api.API.API_BASE_VERSION
return self._cast('node_cleanup', version=version)
def node_decommission(self):
LOG.debug("Decommission node: %s" % self.id)
return self._cast("node_decommission", self.version_cap)
version = guest_api.API.API_BASE_VERSION
return self._cast("node_decommission", version=version)
def cluster_secure(self, password):
LOG.debug("Securing the cluster via node: %s" % self.id)
version = guest_api.API.API_BASE_VERSION
return self._call(
"cluster_secure", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap, password=password)
version=version, password=password)
def get_admin_credentials(self):
LOG.debug("Retrieving the admin credentials from node: %s" % self.id)
version = guest_api.API.API_BASE_VERSION
return self._call("get_admin_credentials", guest_api.AGENT_LOW_TIMEOUT,
self.version_cap)
version=version)
def store_admin_credentials(self, admin_credentials):
LOG.debug("Storing the admin credentials on node: %s" % self.id)
version = guest_api.API.API_BASE_VERSION
return self._call("store_admin_credentials",
guest_api.AGENT_LOW_TIMEOUT, self.version_cap,
guest_api.AGENT_LOW_TIMEOUT,
version=version,
admin_credentials=admin_credentials)

View File

@ -31,39 +31,59 @@ class GaleraCommonGuestAgentStrategy(cluster_base.BaseGuestAgentStrategy):
class GaleraCommonGuestAgentAPI(guest_api.API):
"""Cluster Specific Datastore Guest API
**** VERSION CONTROLLED API ****
The methods in this class are subject to version control as
coordinated by guestagent/api.py. Whenever a change is made to
any API method in this class, add a version number and comment
to the top of guestagent/api.py and use the version number as
appropriate in this file
"""
def install_cluster(self, replication_user, cluster_configuration,
bootstrap):
"""Install the cluster."""
LOG.debug("Installing Galera cluster.")
version = guest_api.API.API_BASE_VERSION
self._call("install_cluster", CONF.cluster_usage_timeout,
self.version_cap,
version=version,
replication_user=replication_user,
cluster_configuration=cluster_configuration,
bootstrap=bootstrap)
def reset_admin_password(self, admin_password):
"""Store this password on the instance as the admin password."""
version = guest_api.API.API_BASE_VERSION
self._call("reset_admin_password", CONF.cluster_usage_timeout,
self.version_cap,
version=version,
admin_password=admin_password)
def cluster_complete(self):
"""Set the status that the cluster is build is complete."""
LOG.debug("Notifying cluster install completion.")
version = guest_api.API.API_BASE_VERSION
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap)
version=version)
def get_cluster_context(self):
"""Get the context of the cluster."""
LOG.debug("Getting the cluster context.")
version = guest_api.API.API_BASE_VERSION
return self._call("get_cluster_context", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap)
version=version)
def write_cluster_configuration_overrides(self, cluster_configuration):
"""Write an updated the cluster configuration."""
LOG.debug("Writing an updated the cluster configuration.")
version = guest_api.API.API_BASE_VERSION
self._call("write_cluster_configuration_overrides",
guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap,
version=version,
cluster_configuration=cluster_configuration)

View File

@ -33,6 +33,16 @@ class MongoDbGuestAgentStrategy(base.BaseGuestAgentStrategy):
class MongoDbGuestAgentAPI(guest_api.API):
"""Cluster Specific Datastore Guest API
**** VERSION CONTROLLED API ****
The methods in this class are subject to version control as
coordinated by guestagent/api.py. Whenever a change is made to
any API method in this class, add a version number and comment
to the top of guestagent/api.py and use the version number as
appropriate in this file
"""
def add_shard(self, replica_set_name, replica_set_member):
LOG.debug("Adding shard with replSet %(replica_set_name)s and member "
@ -40,66 +50,89 @@ class MongoDbGuestAgentAPI(guest_api.API):
"%(id)s" % {'replica_set_name': replica_set_name,
'replica_set_member': replica_set_member,
'id': self.id})
version = guest_api.API.API_BASE_VERSION
return self._call("add_shard", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap,
version=version,
replica_set_name=replica_set_name,
replica_set_member=replica_set_member)
def add_members(self, members):
LOG.debug("Adding members %(members)s on instance %(id)s" % {
'members': members, 'id': self.id})
version = guest_api.API.API_BASE_VERSION
return self._call("add_members", ADD_MEMBERS_TIMEOUT,
self.version_cap, members=members)
version=version, members=members)
def add_config_servers(self, config_servers):
LOG.debug("Adding config servers %(config_servers)s for instance "
"%(id)s" % {'config_servers': config_servers,
'id': self.id})
version = guest_api.API.API_BASE_VERSION
return self._call("add_config_servers", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap, config_servers=config_servers)
version=version,
config_servers=config_servers)
def cluster_complete(self):
LOG.debug("Notify regarding cluster install completion")
version = guest_api.API.API_BASE_VERSION
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap)
version=version)
def get_key(self):
LOG.debug("Requesting cluster key from guest")
version = guest_api.API.API_BASE_VERSION
return self._call("get_key", guest_api.AGENT_LOW_TIMEOUT,
self.version_cap)
version=version)
def prep_primary(self):
LOG.debug("Preparing member to be primary member.")
version = guest_api.API.API_BASE_VERSION
return self._call("prep_primary", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap)
version=version)
def create_admin_user(self, password):
LOG.debug("Creating admin user")
version = guest_api.API.API_BASE_VERSION
return self._call("create_admin_user", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap, password=password)
version=version, password=password)
def store_admin_password(self, password):
LOG.debug("Storing admin password")
version = guest_api.API.API_BASE_VERSION
return self._call("store_admin_password",
guest_api.AGENT_LOW_TIMEOUT,
self.version_cap,
version=version,
password=password)
def get_replica_set_name(self):
LOG.debug("Querying member for its replica set name")
version = guest_api.API.API_BASE_VERSION
return self._call("get_replica_set_name",
guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap)
version=version)
def get_admin_password(self):
LOG.debug("Querying instance for its admin password")
version = guest_api.API.API_BASE_VERSION
return self._call("get_admin_password",
guest_api.AGENT_LOW_TIMEOUT,
self.version_cap)
version=version)
def is_shard_active(self, replica_set_name):
LOG.debug("Checking if replica set %s is active" % replica_set_name)
version = guest_api.API.API_BASE_VERSION
return self._call("is_shard_active",
guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap,
version=version,
replica_set_name=replica_set_name)

View File

@ -379,7 +379,8 @@ class MongoDbTaskManagerAPI(task_api.API):
def mongodb_add_shard_cluster(self, cluster_id, shard_id,
replica_set_name):
LOG.debug("Making async call to add shard cluster %s " % cluster_id)
cctxt = self.client.prepare(version=self.version_cap)
version = task_api.API.API_BASE_VERSION
cctxt = self.client.prepare(version=version)
cctxt.cast(self.context,
"add_shard_cluster",
cluster_id=cluster_id,

View File

@ -28,34 +28,59 @@ class RedisGuestAgentStrategy(base.BaseGuestAgentStrategy):
class RedisGuestAgentAPI(guest_api.API):
"""Cluster Specific Datastore Guest API
**** VERSION CONTROLLED API ****
The methods in this class are subject to version control as
coordinated by guestagent/api.py. Whenever a change is made to
any API method in this class, add a version number and comment
to the top of guestagent/api.py and use the version number as
appropriate in this file
"""
def get_node_ip(self):
LOG.debug("Retrieve ip info from node.")
version = guest_api.API.API_BASE_VERSION
return self._call("get_node_ip",
guest_api.AGENT_HIGH_TIMEOUT, self.version_cap)
guest_api.AGENT_HIGH_TIMEOUT,
version=version)
def get_node_id_for_removal(self):
LOG.debug("Validating cluster node removal.")
version = guest_api.API.API_BASE_VERSION
return self._call("get_node_id_for_removal",
guest_api.AGENT_HIGH_TIMEOUT, self.version_cap)
guest_api.AGENT_HIGH_TIMEOUT,
version=version)
def remove_nodes(self, node_ids):
LOG.debug("Removing nodes from cluster.")
version = guest_api.API.API_BASE_VERSION
return self._call("remove_nodes", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap, node_ids=node_ids)
version=version, node_ids=node_ids)
def cluster_meet(self, ip, port):
LOG.debug("Joining node to cluster.")
version = guest_api.API.API_BASE_VERSION
return self._call("cluster_meet", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap, ip=ip, port=port)
version=version, ip=ip, port=port)
def cluster_addslots(self, first_slot, last_slot):
LOG.debug("Adding slots %s-%s to cluster.", first_slot, last_slot)
version = guest_api.API.API_BASE_VERSION
return self._call("cluster_addslots",
guest_api.AGENT_HIGH_TIMEOUT, self.version_cap,
guest_api.AGENT_HIGH_TIMEOUT,
version=version,
first_slot=first_slot, last_slot=last_slot)
def cluster_complete(self):
LOG.debug("Notifying cluster install completion.")
version = guest_api.API.API_BASE_VERSION
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap)
version=version)

View File

@ -30,39 +30,64 @@ class VerticaGuestAgentStrategy(base.BaseGuestAgentStrategy):
class VerticaGuestAgentAPI(guest_api.API):
"""Cluster Specific Datastore Guest API
**** VERSION CONTROLLED API ****
The methods in this class are subject to version control as
coordinated by guestagent/api.py. Whenever a change is made to
any API method in this class, add a version number and comment
to the top of guestagent/api.py and use the version number as
appropriate in this file
"""
def get_public_keys(self, user):
LOG.debug("Getting public keys for user: %s." % user)
version = guest_api.API.API_BASE_VERSION
return self._call("get_public_keys", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap, user=user)
version=version, user=user)
def authorize_public_keys(self, user, public_keys):
LOG.debug("Authorizing public keys for user: %s." % user)
version = guest_api.API.API_BASE_VERSION
return self._call("authorize_public_keys",
guest_api.AGENT_HIGH_TIMEOUT, self.version_cap,
guest_api.AGENT_HIGH_TIMEOUT,
version=version,
user=user, public_keys=public_keys)
def install_cluster(self, members):
LOG.debug("Installing Vertica cluster on members: %s." % members)
version = guest_api.API.API_BASE_VERSION
return self._call("install_cluster", CONF.cluster_usage_timeout,
self.version_cap, members=members)
version=version, members=members)
def grow_cluster(self, members):
LOG.debug("Growing Vertica cluster with members: %s." % members)
version = guest_api.API.API_BASE_VERSION
return self._call("grow_cluster", CONF.cluster_usage_timeout,
self.version_cap, members=members)
version=version, members=members)
def shrink_cluster(self, members):
LOG.debug("Shrinking Vertica cluster with members: %s." % members)
version = guest_api.API.API_BASE_VERSION
return self._call("shrink_cluster", CONF.cluster_usage_timeout,
self.version_cap, members=members)
version=version, members=members)
def mark_design_ksafe(self, k):
LOG.debug("Setting vertica k-safety level to : %s." % k)
version = guest_api.API.API_BASE_VERSION
return self._call("mark_design_ksafe", CONF.cluster_usage_timeout,
self.version_cap, k=k)
version=version, k=k)
def cluster_complete(self):
LOG.debug("Notifying cluster install completion.")
version = guest_api.API.API_BASE_VERSION
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap)
version=version)

View File

@ -16,28 +16,50 @@ from oslo_log import log as logging
import oslo_messaging as messaging
from trove.common import cfg
from trove.common.rpc import version as rpc_version
from trove.common.serializable_notification import SerializableNotification
from trove import rpc
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class API(object):
"""API for interacting with trove conductor."""
"""API for interacting with trove conductor.
API version history:
* 1.0 - Initial version.
When updating this API, also update API_LATEST_VERSION
"""
# API_LATEST_VERSION should bump the minor number each time
# a method signature is added or changed
API_LATEST_VERSION = '1.0'
# API_BASE_VERSION should only change on major version upgrade
API_BASE_VERSION = '1.0'
VERSION_ALIASES = {
'icehouse': '1.0',
'juno': '1.0',
'kilo': '1.0',
'liberty': '1.0',
'mitaka': '1.0',
'newton': '1.0',
'latest': API_LATEST_VERSION
}
def __init__(self, context):
self.context = context
super(API, self).__init__()
version_cap = self.VERSION_ALIASES.get(
CONF.upgrade_levels.conductor, CONF.upgrade_levels.conductor)
target = messaging.Target(topic=CONF.conductor_queue,
version=rpc_version.RPC_API_VERSION)
version=version_cap)
self.version_cap = rpc_version.VERSION_ALIASES.get(
CONF.upgrade_levels.conductor)
self.client = self.get_client(target, self.version_cap)
self.client = self.get_client(target, version_cap)
def get_client(self, target, version_cap, serializer=None):
return rpc.get_client(target,
@ -47,8 +69,9 @@ class API(object):
def heartbeat(self, instance_id, payload, sent=None):
LOG.debug("Making async call to cast heartbeat for instance: %s"
% instance_id)
version = self.API_BASE_VERSION
cctxt = self.client.prepare(version=self.version_cap)
cctxt = self.client.prepare(version=version)
cctxt.cast(self.context, "heartbeat",
instance_id=instance_id,
sent=sent,
@ -58,8 +81,9 @@ class API(object):
**backup_fields):
LOG.debug("Making async call to cast update_backup for instance: %s"
% instance_id)
version = self.API_BASE_VERSION
cctxt = self.client.prepare(version=self.version_cap)
cctxt = self.client.prepare(version=version)
cctxt.cast(self.context, "update_backup",
instance_id=instance_id,
backup_id=backup_id,
@ -69,14 +93,16 @@ class API(object):
def report_root(self, instance_id, user):
LOG.debug("Making async call to cast report_root for instance: %s"
% instance_id)
cctxt = self.client.prepare(version=self.version_cap)
version = self.API_BASE_VERSION
cctxt = self.client.prepare(version=version)
cctxt.cast(self.context, "report_root",
instance_id=instance_id,
user=user)
def notify_end(self, **notification_args):
LOG.debug("Making async call to cast end notification")
cctxt = self.client.prepare(version=self.version_cap)
version = self.API_BASE_VERSION
cctxt = self.client.prepare(version=version)
context = self.context
serialized = SerializableNotification.serialize(context,
context.notification)
@ -86,7 +112,8 @@ class API(object):
def notify_exc_info(self, message, exception):
LOG.debug("Making async call to cast error notification")
cctxt = self.client.prepare(version=self.version_cap)
version = self.API_BASE_VERSION
cctxt = self.client.prepare(version=version)
context = self.context
serialized = SerializableNotification.serialize(context,
context.notification)

View File

@ -26,7 +26,6 @@ from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common.notification import NotificationCastWrapper
import trove.common.rpc.version as rpc_version
from trove import rpc
CONF = cfg.CONF
@ -37,19 +36,43 @@ AGENT_SNAPSHOT_TIMEOUT = CONF.agent_replication_snapshot_timeout
class API(object):
"""API for interacting with the guest manager."""
"""API for interacting with the guest manager.
API version history:
* 1.0 - Initial version.
When updating this API, also update API_LATEST_VERSION
"""
# API_LATEST_VERSION should bump the minor number each time
# a method signature is added or changed
API_LATEST_VERSION = '1.0'
# API_BASE_VERSION should only change on major version upgrade
API_BASE_VERSION = '1.0'
VERSION_ALIASES = {
'icehouse': '1.0',
'juno': '1.0',
'kilo': '1.0',
'liberty': '1.0',
'mitaka': '1.0',
'newton': '1.0',
'latest': API_LATEST_VERSION
}
def __init__(self, context, id):
self.context = context
self.id = id
super(API, self).__init__()
version_cap = self.VERSION_ALIASES.get(
CONF.upgrade_levels.guestagent, CONF.upgrade_levels.guestagent)
target = messaging.Target(topic=self._get_routing_key(),
version=rpc_version.RPC_API_VERSION)
version=version_cap)
self.version_cap = rpc_version.VERSION_ALIASES.get(
CONF.upgrade_levels.guestagent)
self.client = self.get_client(target, self.version_cap)
self.client = self.get_client(target, version_cap)
def get_client(self, target, version_cap, serializer=None):
return rpc.get_client(target,
@ -95,31 +118,44 @@ class API(object):
users.
"""
LOG.debug("Changing passwords for users on instance %s.", self.id)
self._cast("change_passwords", self.version_cap, users=users)
version = self.API_BASE_VERSION
self._cast("change_passwords", version=version, users=users)
def update_attributes(self, username, hostname, user_attrs):
"""Update user attributes."""
LOG.debug("Changing user attributes on instance %s.", self.id)
self._cast("update_attributes", self.version_cap, username=username,
version = self.API_BASE_VERSION
self._cast("update_attributes",
version=version, username=username,
hostname=hostname, user_attrs=user_attrs)
def create_user(self, users):
"""Make an asynchronous call to create a new database user"""
LOG.debug("Creating Users for instance %s.", self.id)
self._cast("create_user", self.version_cap, users=users)
version = self.API_BASE_VERSION
self._cast("create_user", version=version, users=users)
def get_user(self, username, hostname):
"""Make an asynchronous call to get a single database user."""
LOG.debug("Getting a user %(username)s on instance %(id)s.",
{'username': username, 'id': self.id})
return self._call("get_user", AGENT_LOW_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
return self._call("get_user",
AGENT_LOW_TIMEOUT, version=version,
username=username, hostname=hostname)
def list_access(self, username, hostname):
"""Show all the databases to which a user has more than USAGE."""
LOG.debug("Showing user %(username)s grants on instance %(id)s.",
{'username': username, 'id': self.id})
return self._call("list_access", AGENT_LOW_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
return self._call("list_access",
AGENT_LOW_TIMEOUT, version=version,
username=username, hostname=hostname)
def grant_access(self, username, hostname, databases):
@ -128,7 +164,10 @@ class API(object):
"%(username)s on instance %(id)s.", {'username': username,
'databases': databases,
'id': self.id})
return self._call("grant_access", AGENT_LOW_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
return self._call("grant_access",
AGENT_LOW_TIMEOUT, version=version,
username=username, hostname=hostname,
databases=databases)
@ -138,14 +177,20 @@ class API(object):
"%(username)s on instance %(id)s.", {'username': username,
'database': database,
'id': self.id})
return self._call("revoke_access", AGENT_LOW_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
return self._call("revoke_access",
AGENT_LOW_TIMEOUT, version=version,
username=username, hostname=hostname,
database=database)
def list_users(self, limit=None, marker=None, include_marker=False):
"""Make an asynchronous call to list database users."""
LOG.debug("Listing Users for instance %s.", self.id)
return self._call("list_users", AGENT_HIGH_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
return self._call("list_users", AGENT_HIGH_TIMEOUT,
version=version,
limit=limit, marker=marker,
include_marker=include_marker)
@ -153,20 +198,27 @@ class API(object):
"""Make an asynchronous call to delete an existing database user."""
LOG.debug("Deleting user %(user)s for instance %(instance_id)s." %
{'user': user, 'instance_id': self.id})
self._cast("delete_user", self.version_cap, user=user)
version = self.API_BASE_VERSION
self._cast("delete_user", version=version, user=user)
def create_database(self, databases):
"""Make an asynchronous call to create a new database
within the specified container
"""
LOG.debug("Creating databases for instance %s.", self.id)
self._cast("create_database", self.version_cap, databases=databases)
version = self.API_BASE_VERSION
self._cast("create_database", version=version,
databases=databases)
def list_databases(self, limit=None, marker=None, include_marker=False):
"""Make an asynchronous call to list databases."""
LOG.debug("Listing databases for instance %s.", self.id)
version = self.API_BASE_VERSION
return self._call("list_databases", AGENT_LOW_TIMEOUT,
self.version_cap, limit=limit, marker=marker,
version=version, limit=limit, marker=marker,
include_marker=include_marker)
def delete_database(self, database):
@ -176,53 +228,72 @@ class API(object):
LOG.debug("Deleting database %(database)s for "
"instance %(instance_id)s." % {'database': database,
'instance_id': self.id})
self._cast("delete_database", self.version_cap, database=database)
version = self.API_BASE_VERSION
self._cast("delete_database", version=version, database=database)
def enable_root(self):
"""Make a synchronous call to enable the root user for
access from anywhere
"""
LOG.debug("Enable root user for instance %s.", self.id)
return self._call("enable_root", AGENT_HIGH_TIMEOUT, self.version_cap)
version = self.API_BASE_VERSION
return self._call("enable_root", AGENT_HIGH_TIMEOUT,
version=version)
def enable_root_with_password(self, root_password=None):
"""Make a synchronous call to enable the root user for
access from anywhere
"""
LOG.debug("Enable root user for instance %s.", self.id)
version = self.API_BASE_VERSION
return self._call("enable_root_with_password", AGENT_HIGH_TIMEOUT,
self.version_cap, root_password=root_password)
version=version, root_password=root_password)
def disable_root(self):
"""Make a synchronous call to disable the root user for
access from anywhere
"""
LOG.debug("Disable root user for instance %s.", self.id)
return self._call("disable_root", AGENT_LOW_TIMEOUT, self.version_cap)
version = self.API_BASE_VERSION
return self._call("disable_root", AGENT_LOW_TIMEOUT,
version=version)
def is_root_enabled(self):
"""Make a synchronous call to check if root access is
available for the container
"""
LOG.debug("Check root access for instance %s.", self.id)
version = self.API_BASE_VERSION
return self._call("is_root_enabled", AGENT_LOW_TIMEOUT,
self.version_cap)
version=version)
def get_hwinfo(self):
"""Make a synchronous call to get hardware info for the container"""
LOG.debug("Check hwinfo on instance %s.", self.id)
return self._call("get_hwinfo", AGENT_LOW_TIMEOUT, self.version_cap)
version = self.API_BASE_VERSION
return self._call("get_hwinfo", AGENT_LOW_TIMEOUT,
version=version)
def get_diagnostics(self):
"""Make a synchronous call to get diagnostics for the container"""
LOG.debug("Check diagnostics on instance %s.", self.id)
version = self.API_BASE_VERSION
return self._call("get_diagnostics", AGENT_LOW_TIMEOUT,
self.version_cap)
version=version)
def rpc_ping(self):
"""Make a synchronous RPC call to check if we can ping the instance."""
LOG.debug("Check RPC ping on instance %s.", self.id)
return self._call("rpc_ping", AGENT_LOW_TIMEOUT, self.version_cap)
version = self.API_BASE_VERSION
return self._call("rpc_ping", AGENT_LOW_TIMEOUT, version=version)
def prepare(self, memory_mb, packages, databases, users,
device_path='/dev/vdb', mount_point='/mnt/volume',
@ -234,6 +305,8 @@ class API(object):
"""
LOG.debug("Sending the call to prepare the Guest.")
version = self.API_BASE_VERSION
# Taskmanager is a publisher, guestagent is a consumer. Usually
# consumer creates a queue, but in this case we have to make sure
# "prepare" doesn't get lost if for some reason guest was delayed and
@ -242,7 +315,7 @@ class API(object):
packages = packages.split()
self._cast(
"prepare", self.version_cap, packages=packages,
"prepare", version=version, packages=packages,
databases=databases, memory_mb=memory_mb, users=users,
device_path=device_path, mount_point=mount_point,
backup_info=backup_info, config_contents=config_contents,
@ -258,7 +331,7 @@ class API(object):
server = None
target = messaging.Target(topic=self._get_routing_key(),
server=self.id,
version=rpc_version.RPC_API_VERSION)
version=self.API_BASE_VERSION)
try:
server = rpc.get_server(target, [])
server.start()
@ -270,26 +343,35 @@ class API(object):
def pre_upgrade(self):
"""Prepare the guest for upgrade."""
LOG.debug("Sending the call to prepare the guest for upgrade.")
return self._call("pre_upgrade", AGENT_HIGH_TIMEOUT, self.version_cap)
version = self.API_BASE_VERSION
return self._call("pre_upgrade", AGENT_HIGH_TIMEOUT,
version=version)
def post_upgrade(self, upgrade_info):
"""Recover the guest after upgrading the guest's image."""
LOG.debug("Recover the guest after upgrading the guest's image.")
self._call("post_upgrade", AGENT_HIGH_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
self._call("post_upgrade", AGENT_HIGH_TIMEOUT, version=version,
upgrade_info=upgrade_info)
def restart(self):
"""Restart the database server."""
LOG.debug("Sending the call to restart the database process "
"on the Guest.")
self._call("restart", AGENT_HIGH_TIMEOUT, self.version_cap)
version = self.API_BASE_VERSION
self._call("restart", AGENT_HIGH_TIMEOUT, version=version)
def start_db_with_conf_changes(self, config_contents):
"""Start the database server."""
LOG.debug("Sending the call to start the database process on "
"the Guest with a timeout of %s." % AGENT_HIGH_TIMEOUT)
version = self.API_BASE_VERSION
self._call("start_db_with_conf_changes", AGENT_HIGH_TIMEOUT,
self.version_cap, config_contents=config_contents)
version=version, config_contents=config_contents)
def reset_configuration(self, configuration):
"""Ignore running state of the database server; just change
@ -297,20 +379,26 @@ class API(object):
"""
LOG.debug("Sending the call to change the database conf file on the "
"Guest with a timeout of %s." % AGENT_HIGH_TIMEOUT)
version = self.API_BASE_VERSION
self._call("reset_configuration", AGENT_HIGH_TIMEOUT,
self.version_cap, configuration=configuration)
version=version, configuration=configuration)
def stop_db(self, do_not_start_on_reboot=False):
"""Stop the database server."""
LOG.debug("Sending the call to stop the database process "
"on the Guest.")
self._call("stop_db", AGENT_HIGH_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
self._call("stop_db", AGENT_HIGH_TIMEOUT, version=version,
do_not_start_on_reboot=do_not_start_on_reboot)
def upgrade(self, instance_version, location, metadata=None):
"""Make an asynchronous call to self upgrade the guest agent."""
LOG.debug("Sending an upgrade call to nova-guest.")
self._cast("upgrade", self.version_cap,
version = self.API_BASE_VERSION
self._cast("upgrade", version=version,
instance_version=instance_version,
location=location,
metadata=metadata)
@ -318,158 +406,214 @@ class API(object):
def get_volume_info(self):
"""Make a synchronous call to get volume info for the container."""
LOG.debug("Check Volume Info on instance %s.", self.id)
version = self.API_BASE_VERSION
return self._call("get_filesystem_stats", AGENT_LOW_TIMEOUT,
self.version_cap, fs_path=None)
version=version, fs_path=None)
def update_guest(self):
"""Make a synchronous call to update the guest agent."""
LOG.debug("Updating guest agent on instance %s.", self.id)
self._call("update_guest", AGENT_HIGH_TIMEOUT, self.version_cap)
version = self.API_BASE_VERSION
self._call("update_guest", AGENT_HIGH_TIMEOUT, version=version)
def create_backup(self, backup_info):
"""Make async call to create a full backup of this instance."""
LOG.debug("Create Backup %(backup_id)s "
"for instance %(instance_id)s." %
{'backup_id': backup_info['id'], 'instance_id': self.id})
self._cast("create_backup", self.version_cap, backup_info=backup_info)
version = self.API_BASE_VERSION
self._cast("create_backup", version=version,
backup_info=backup_info)
def mount_volume(self, device_path=None, mount_point=None):
"""Mount the volume."""
LOG.debug("Mount volume %(mount)s on instance %(id)s." % {
'mount': mount_point, 'id': self.id})
self._call("mount_volume", AGENT_LOW_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
self._call("mount_volume", AGENT_LOW_TIMEOUT, version=version,
device_path=device_path, mount_point=mount_point)
def unmount_volume(self, device_path=None, mount_point=None):
"""Unmount the volume."""
LOG.debug("Unmount volume %(device)s on instance %(id)s." % {
'device': device_path, 'id': self.id})
self._call("unmount_volume", AGENT_LOW_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
self._call("unmount_volume", AGENT_LOW_TIMEOUT, version=version,
device_path=device_path, mount_point=mount_point)
def resize_fs(self, device_path=None, mount_point=None):
"""Resize the filesystem."""
LOG.debug("Resize device %(device)s on instance %(id)s." % {
'device': device_path, 'id': self.id})
self._call("resize_fs", AGENT_HIGH_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
self._call("resize_fs", AGENT_HIGH_TIMEOUT, version=version,
device_path=device_path, mount_point=mount_point)
def update_overrides(self, overrides, remove=False):
"""Update the overrides."""
LOG.debug("Updating overrides values %(overrides)s on instance "
"%(id)s.", {'overrides': overrides, 'id': self.id})
version = self.API_BASE_VERSION
self._call("update_overrides", AGENT_HIGH_TIMEOUT,
self.version_cap, overrides=overrides, remove=remove)
version=version, overrides=overrides, remove=remove)
def apply_overrides(self, overrides):
LOG.debug("Applying overrides values %(overrides)s on instance "
"%(id)s.", {'overrides': overrides, 'id': self.id})
self._call("apply_overrides", AGENT_HIGH_TIMEOUT, self.version_cap,
overrides=overrides)
version = self.API_BASE_VERSION
self._call("apply_overrides", AGENT_HIGH_TIMEOUT,
version=version, overrides=overrides)
def backup_required_for_replication(self):
LOG.debug("Checking backup requirement for replication")
version = self.API_BASE_VERSION
return self._call("backup_required_for_replication",
AGENT_LOW_TIMEOUT,
self.version_cap)
version=version)
def get_replication_snapshot(self, snapshot_info=None,
replica_source_config=None):
LOG.debug("Retrieving replication snapshot from instance %s.", self.id)
version = self.API_BASE_VERSION
return self._call("get_replication_snapshot", AGENT_SNAPSHOT_TIMEOUT,
self.version_cap, snapshot_info=snapshot_info,
version=version, snapshot_info=snapshot_info,
replica_source_config=replica_source_config)
def attach_replication_slave(self, snapshot, replica_config=None):
LOG.debug("Configuring instance %s to replicate from %s.",
self.id, snapshot.get('master').get('id'))
self._cast("attach_replication_slave", self.version_cap,
version = self.API_BASE_VERSION
self._cast("attach_replication_slave", version=version,
snapshot=snapshot, slave_config=replica_config)
def detach_replica(self, for_failover=False):
LOG.debug("Detaching replica %s from its replication source.", self.id)
version = self.API_BASE_VERSION
return self._call("detach_replica", AGENT_HIGH_TIMEOUT,
self.version_cap, for_failover=for_failover)
version=version, for_failover=for_failover)
def get_replica_context(self):
LOG.debug("Getting replica context.")
version = self.API_BASE_VERSION
return self._call("get_replica_context",
AGENT_HIGH_TIMEOUT, self.version_cap)
AGENT_HIGH_TIMEOUT, version=version)
def attach_replica(self, replica_info, slave_config):
LOG.debug("Attaching replica %s." % replica_info)
self._call("attach_replica", AGENT_HIGH_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
self._call("attach_replica", AGENT_HIGH_TIMEOUT, version=version,
replica_info=replica_info, slave_config=slave_config)
def make_read_only(self, read_only):
LOG.debug("Executing make_read_only(%s)" % read_only)
self._call("make_read_only", AGENT_HIGH_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
self._call("make_read_only", AGENT_HIGH_TIMEOUT, version=version,
read_only=read_only)
def enable_as_master(self, replica_source_config):
LOG.debug("Executing enable_as_master")
self._call("enable_as_master", AGENT_HIGH_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
self._call("enable_as_master", AGENT_HIGH_TIMEOUT,
version=version,
replica_source_config=replica_source_config)
# DEPRECATED: Maintain for API Compatibility
def get_txn_count(self):
LOG.debug("Executing get_txn_count.")
version = self.API_BASE_VERSION
return self._call("get_txn_count",
AGENT_HIGH_TIMEOUT, self.version_cap)
AGENT_HIGH_TIMEOUT, version=version)
def get_last_txn(self):
LOG.debug("Executing get_last_txn.")
version = self.API_BASE_VERSION
return self._call("get_last_txn",
AGENT_HIGH_TIMEOUT, self.version_cap)
AGENT_HIGH_TIMEOUT, version=version)
def get_latest_txn_id(self):
LOG.debug("Executing get_latest_txn_id.")
version = self.API_BASE_VERSION
return self._call("get_latest_txn_id",
AGENT_HIGH_TIMEOUT, self.version_cap)
AGENT_HIGH_TIMEOUT, version=version)
def wait_for_txn(self, txn):
LOG.debug("Executing wait_for_txn.")
self._call("wait_for_txn", AGENT_HIGH_TIMEOUT, self.version_cap,
version = self.API_BASE_VERSION
self._call("wait_for_txn", AGENT_HIGH_TIMEOUT, version=version,
txn=txn)
def cleanup_source_on_replica_detach(self, replica_info):
LOG.debug("Cleaning up master %s on detach of replica.", self.id)
version = self.API_BASE_VERSION
self._call("cleanup_source_on_replica_detach", AGENT_HIGH_TIMEOUT,
self.version_cap, replica_info=replica_info)
version=version, replica_info=replica_info)
def demote_replication_master(self):
LOG.debug("Demoting instance %s to non-master.", self.id)
version = self.API_BASE_VERSION
self._call("demote_replication_master", AGENT_HIGH_TIMEOUT,
self.version_cap)
version=version)
def guest_log_list(self):
LOG.debug("Retrieving guest log list for %s.", self.id)
version = self.API_BASE_VERSION
result = self._call("guest_log_list", AGENT_HIGH_TIMEOUT,
self.version_cap)
version=version)
LOG.debug("guest_log_list returns %s", result)
return result
def guest_log_action(self, log_name, enable, disable, publish, discard):
LOG.debug("Processing guest log '%s' for %s.", log_name, self.id)
version = self.API_BASE_VERSION
return self._call("guest_log_action", AGENT_HIGH_TIMEOUT,
self.version_cap, log_name=log_name,
version=version, log_name=log_name,
enable=enable, disable=disable,
publish=publish, discard=discard)
def module_list(self, include_contents):
LOG.debug("Querying modules on %s (contents: %s).",
self.id, include_contents)
version = self.API_BASE_VERSION
result = self._call("module_list", AGENT_HIGH_TIMEOUT,
self.version_cap,
version=version,
include_contents=include_contents)
return result
def module_apply(self, modules):
LOG.debug("Applying modules to %s.", self.id)
version = self.API_BASE_VERSION
return self._call("module_apply", AGENT_HIGH_TIMEOUT,
self.version_cap, modules=modules)
version=version, modules=modules)
def module_remove(self, module):
LOG.debug("Removing modules from %s.", self.id)
version = self.API_BASE_VERSION
return self._call("module_remove", AGENT_HIGH_TIMEOUT,
self.version_cap, module=module)
version=version, module=module)

View File

@ -24,7 +24,6 @@ import oslo_messaging as messaging
from trove.common import cfg
from trove.common import exception
from trove.common.notification import NotificationCastWrapper
import trove.common.rpc.version as rpc_version
from trove.common.strategies.cluster import strategy
from trove.guestagent import models as agent_models
from trove import rpc
@ -34,18 +33,42 @@ LOG = logging.getLogger(__name__)
class API(object):
"""API for interacting with the task manager."""
"""API for interacting with the task manager.
API version history:
* 1.0 - Initial version.
When updating this API, also update API_LATEST_VERSION
"""
# API_LATEST_VERSION should bump the minor number each time
# a method signature is added or changed
API_LATEST_VERSION = '1.0'
# API_BASE_VERSION should only change on major version upgrade
API_BASE_VERSION = '1.0'
VERSION_ALIASES = {
'icehouse': '1.0',
'juno': '1.0',
'kilo': '1.0',
'liberty': '1.0',
'mitaka': '1.0',
'newton': '1.0',
'latest': API_LATEST_VERSION
}
def __init__(self, context):
self.context = context
super(API, self).__init__()
version_cap = self.VERSION_ALIASES.get(
CONF.upgrade_levels.taskmanager, CONF.upgrade_levels.taskmanager)
target = messaging.Target(topic=CONF.taskmanager_queue,
version=rpc_version.RPC_API_VERSION)
version=version_cap)
self.version_cap = rpc_version.VERSION_ALIASES.get(
CONF.upgrade_levels.taskmanager)
self.client = self.get_client(target, self.version_cap)
self.client = self.get_client(target, version_cap)
def _cast(self, method_name, version, **kwargs):
LOG.debug("Casting %s" % method_name)
@ -79,72 +102,83 @@ class API(object):
def resize_volume(self, new_size, instance_id):
LOG.debug("Making async call to resize volume for instance: %s"
% instance_id)
version = self.API_BASE_VERSION
self._cast("resize_volume", self.version_cap,
self._cast("resize_volume", version=version,
new_size=new_size,
instance_id=instance_id)
def resize_flavor(self, instance_id, old_flavor, new_flavor):
LOG.debug("Making async call to resize flavor for instance: %s" %
instance_id)
version = self.API_BASE_VERSION
self._cast("resize_flavor", self.version_cap,
self._cast("resize_flavor", version=version,
instance_id=instance_id,
old_flavor=self._transform_obj(old_flavor),
new_flavor=self._transform_obj(new_flavor))
def reboot(self, instance_id):
LOG.debug("Making async call to reboot instance: %s" % instance_id)
version = self.API_BASE_VERSION
self._cast("reboot", self.version_cap, instance_id=instance_id)
self._cast("reboot", version=version, instance_id=instance_id)
def restart(self, instance_id):
LOG.debug("Making async call to restart instance: %s" % instance_id)
version = self.API_BASE_VERSION
self._cast("restart", self.version_cap, instance_id=instance_id)
self._cast("restart", version=version, instance_id=instance_id)
def detach_replica(self, instance_id):
LOG.debug("Making async call to detach replica: %s" % instance_id)
version = self.API_BASE_VERSION
self._cast("detach_replica", self.version_cap,
self._cast("detach_replica", version=version,
instance_id=instance_id)
def promote_to_replica_source(self, instance_id):
LOG.debug("Making async call to promote replica to source: %s" %
instance_id)
self._cast("promote_to_replica_source", self.version_cap,
version = self.API_BASE_VERSION
self._cast("promote_to_replica_source", version=version,
instance_id=instance_id)
def eject_replica_source(self, instance_id):
LOG.debug("Making async call to eject replica source: %s" %
instance_id)
self._cast("eject_replica_source", self.version_cap,
version = self.API_BASE_VERSION
self._cast("eject_replica_source", version=version,
instance_id=instance_id)
def migrate(self, instance_id, host):
LOG.debug("Making async call to migrate instance: %s" % instance_id)
version = self.API_BASE_VERSION
self._cast("migrate", self.version_cap,
self._cast("migrate", version=version,
instance_id=instance_id, host=host)
def delete_instance(self, instance_id):
LOG.debug("Making async call to delete instance: %s" % instance_id)
version = self.API_BASE_VERSION
self._cast("delete_instance", self.version_cap,
self._cast("delete_instance", version=version,
instance_id=instance_id)
def create_backup(self, backup_info, instance_id):
LOG.debug("Making async call to create a backup for instance: %s" %
instance_id)
version = self.API_BASE_VERSION
self._cast("create_backup", self.version_cap,
self._cast("create_backup", version=version,
backup_info=backup_info,
instance_id=instance_id)
def delete_backup(self, backup_id):
LOG.debug("Making async call to delete backup: %s" % backup_id)
version = self.API_BASE_VERSION
self._cast("delete_backup", self.version_cap, backup_id=backup_id)
self._cast("delete_backup", version=version, backup_id=backup_id)
def create_instance(self, instance_id, name, flavor,
image_id, databases, users, datastore_manager,
@ -155,7 +189,8 @@ class API(object):
modules=None, locality=None):
LOG.debug("Making async call to create instance %s " % instance_id)
self._cast("create_instance", self.version_cap,
version = self.API_BASE_VERSION
self._cast("create_instance", version=version,
instance_id=instance_id, name=name,
flavor=self._transform_obj(flavor),
image_id=image_id,
@ -176,33 +211,38 @@ class API(object):
def create_cluster(self, cluster_id):
LOG.debug("Making async call to create cluster %s " % cluster_id)
version = self.API_BASE_VERSION
self._cast("create_cluster", self.version_cap, cluster_id=cluster_id)
self._cast("create_cluster", version=version, cluster_id=cluster_id)
def grow_cluster(self, cluster_id, new_instance_ids):
LOG.debug("Making async call to grow cluster %s " % cluster_id)
version = self.API_BASE_VERSION
cctxt = self.client.prepare(version=self.version_cap)
cctxt = self.client.prepare(version=version)
cctxt.cast(self.context, "grow_cluster",
cluster_id=cluster_id, new_instance_ids=new_instance_ids)
def shrink_cluster(self, cluster_id, instance_ids):
LOG.debug("Making async call to shrink cluster %s " % cluster_id)
version = self.API_BASE_VERSION
cctxt = self.client.prepare(version=self.version_cap)
cctxt = self.client.prepare(version=version)
cctxt.cast(self.context, "shrink_cluster",
cluster_id=cluster_id, instance_ids=instance_ids)
def delete_cluster(self, cluster_id):
LOG.debug("Making async call to delete cluster %s " % cluster_id)
version = self.API_BASE_VERSION
self._cast("delete_cluster", self.version_cap, cluster_id=cluster_id)
self._cast("delete_cluster", version=version, cluster_id=cluster_id)
def upgrade(self, instance_id, datastore_version_id):
LOG.debug("Making async call to upgrade guest to datastore "
"version %s " % datastore_version_id)
version = self.API_BASE_VERSION
cctxt = self.client.prepare(version=self.version_cap)
cctxt = self.client.prepare(version=version)
cctxt.cast(self.context, "upgrade", instance_id=instance_id,
datastore_version_id=datastore_version_id)

View File

@ -14,7 +14,6 @@
# under the License.
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import periodic_task
from oslo_utils import importutils
@ -27,7 +26,6 @@ from trove.common.exception import TroveError
from trove.common.i18n import _
from trove.common.notification import DBaaSQuotas, EndNotification
from trove.common import remote
import trove.common.rpc.version as rpc_version
from trove.common import server_group as srv_grp
from trove.common.strategies.cluster import strategy
from trove.datastore.models import DatastoreVersion
@ -43,8 +41,6 @@ CONF = cfg.CONF
class Manager(periodic_task.PeriodicTasks):
target = messaging.Target(version=rpc_version.RPC_API_VERSION)
def __init__(self):
super(Manager, self).__init__(CONF)
self.admin_context = TroveContext(