From b2b54ff05c1276f6d2070feec8d7466afe727e6c Mon Sep 17 00:00:00 2001 From: zhanggang Date: Mon, 23 Oct 2017 05:59:35 -0400 Subject: [PATCH] Avoid load deleted instances that belong to a cluster. Cluster node may be set deleted by "shrink" operation. In this case, when use DBInstance.find_all(cluster_id=).all(), it will return all instances include deleted which may raise errors. Change-Id: I088f3a99e0185ae33df4ce84c080adb7d813f17c --- .../cluster/experimental/cassandra/taskmanager.py | 3 ++- trove/common/strategies/cluster/experimental/mongodb/api.py | 1 + .../strategies/cluster/experimental/mongodb/taskmanager.py | 1 + .../strategies/cluster/experimental/redis/taskmanager.py | 3 ++- trove/common/strategies/cluster/experimental/vertica/api.py | 3 ++- trove/extensions/common/service.py | 4 +++- trove/extensions/mongodb/service.py | 2 +- trove/taskmanager/models.py | 6 ++++-- 8 files changed, 16 insertions(+), 7 deletions(-) diff --git a/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py b/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py index 0d1b3a1250..ecb00e8211 100644 --- a/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py +++ b/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py @@ -117,7 +117,8 @@ class CassandraClusterTasks(task_models.ClusterTasks): @classmethod def find_cluster_node_ids(cls, cluster_id): - db_instances = DBInstance.find_all(cluster_id=cluster_id).all() + db_instances = DBInstance.find_all(cluster_id=cluster_id, + deleted=False).all() return [db_instance.id for db_instance in db_instances] @classmethod diff --git a/trove/common/strategies/cluster/experimental/mongodb/api.py b/trove/common/strategies/cluster/experimental/mongodb/api.py index 642ee59c0f..9b988cc297 100644 --- a/trove/common/strategies/cluster/experimental/mongodb/api.py +++ b/trove/common/strategies/cluster/experimental/mongodb/api.py @@ -248,6 +248,7 @@ class MongoDbCluster(models.Cluster): raise exception.UnprocessableEntity(msg) db_insts = inst_models.DBInstance.find_all(cluster_id=self.id, + deleted=False, type='member').all() num_unique_shards = len(set([db_inst.shard_id for db_inst in db_insts])) diff --git a/trove/common/strategies/cluster/experimental/mongodb/taskmanager.py b/trove/common/strategies/cluster/experimental/mongodb/taskmanager.py index e41e30ef50..837502d4ca 100644 --- a/trove/common/strategies/cluster/experimental/mongodb/taskmanager.py +++ b/trove/common/strategies/cluster/experimental/mongodb/taskmanager.py @@ -138,6 +138,7 @@ class MongoDbClusterTasks(task_models.ClusterTasks): def _add_shard_cluster(): db_instances = DBInstance.find_all(cluster_id=cluster_id, + deleted=False, shard_id=shard_id).all() instance_ids = [db_instance.id for db_instance in db_instances] LOG.debug("instances in shard %(shard_id)s: %(instance_ids)s", diff --git a/trove/common/strategies/cluster/experimental/redis/taskmanager.py b/trove/common/strategies/cluster/experimental/redis/taskmanager.py index 6d4c576524..3517b272b7 100644 --- a/trove/common/strategies/cluster/experimental/redis/taskmanager.py +++ b/trove/common/strategies/cluster/experimental/redis/taskmanager.py @@ -107,7 +107,8 @@ class RedisClusterTasks(task_models.ClusterTasks): def _grow_cluster(): - db_instances = DBInstance.find_all(cluster_id=cluster_id).all() + db_instances = DBInstance.find_all(cluster_id=cluster_id, + deleted=False).all() cluster_head = next(Instance.load(context, db_inst.id) for db_inst in db_instances if db_inst.id not in new_instance_ids) diff --git a/trove/common/strategies/cluster/experimental/vertica/api.py b/trove/common/strategies/cluster/experimental/vertica/api.py index 26e98b2297..ca5c458730 100644 --- a/trove/common/strategies/cluster/experimental/vertica/api.py +++ b/trove/common/strategies/cluster/experimental/vertica/api.py @@ -74,7 +74,8 @@ class VerticaCluster(models.Cluster): vertica_conf = CONF.get(datastore_version.manager) num_instances = len(instances) - existing = inst_models.DBInstance.find_all(cluster_id=db_info.id).all() + existing = inst_models.DBInstance.find_all(cluster_id=db_info.id, + deleted=False).all() num_existing = len(existing) # Matching number of instances with configured cluster_member_count diff --git a/trove/extensions/common/service.py b/trove/extensions/common/service.py index b49282a9ea..97cf7bdeb0 100644 --- a/trove/extensions/common/service.py +++ b/trove/extensions/common/service.py @@ -190,7 +190,9 @@ class ClusterRootController(DefaultRootController): cluster_instances) def _find_cluster_node_ids(self, tenant_id, cluster_id): - args = {'tenant_id': tenant_id, 'cluster_id': cluster_id} + args = {'tenant_id': tenant_id, + 'cluster_id': cluster_id, + 'deleted': False} cluster_instances = DBInstance.find_all(**args).all() return [db_instance.id for db_instance in cluster_instances] diff --git a/trove/extensions/mongodb/service.py b/trove/extensions/mongodb/service.py index 0025d337ba..076e71c2b1 100644 --- a/trove/extensions/mongodb/service.py +++ b/trove/extensions/mongodb/service.py @@ -33,7 +33,7 @@ class MongoDBRootController(ClusterRootController): def _find_query_router_ids(self, tenant_id, cluster_id): args = {'tenant_id': tenant_id, 'cluster_id': cluster_id, - 'type': 'query_router'} + 'deleted': False, 'type': 'query_router'} query_router_instances = DBInstance.find_all(**args).all() return [db_instance.id for db_instance in query_router_instances] diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py index 892c833e03..c89cb2f82e 100755 --- a/trove/taskmanager/models.py +++ b/trove/taskmanager/models.py @@ -336,7 +336,8 @@ class ClusterTasks(Cluster): cluster_notification = context.notification request_info = cluster_notification.serialize(context) try: - node_db_inst = DBInstance.find_all(cluster_id=cluster_id).all() + node_db_inst = DBInstance.find_all(cluster_id=cluster_id, + deleted=False).all() for index, db_inst in enumerate(node_db_inst): if index > 0: LOG.debug( @@ -380,7 +381,8 @@ class ClusterTasks(Cluster): cluster_notification = context.notification request_info = cluster_notification.serialize(context) try: - for db_inst in DBInstance.find_all(cluster_id=cluster_id).all(): + for db_inst in DBInstance.find_all(cluster_id=cluster_id, + deleted=False).all(): instance = BuiltInstanceTasks.load( context, db_inst.id) _upgrade_cluster_instance(instance)