Merge "Reuse Cassandra connections" into stable/mitaka
This commit is contained in:
commit
29ec216479
|
@ -0,0 +1,4 @@
|
|||
---
|
||||
fixes:
|
||||
- Make guestagent reuse Cassandra connections to eliminate resource
|
||||
leaks. Bug 1566946.
|
|
@ -24,9 +24,6 @@ from trove.common import instance as trove_instance
|
|||
from trove.common.notification import EndNotification
|
||||
from trove.guestagent import backup
|
||||
from trove.guestagent.datastore.experimental.cassandra import service
|
||||
from trove.guestagent.datastore.experimental.cassandra.service import (
|
||||
CassandraAdmin
|
||||
)
|
||||
from trove.guestagent.datastore import manager
|
||||
from trove.guestagent import volume
|
||||
|
||||
|
@ -37,10 +34,10 @@ CONF = cfg.CONF
|
|||
|
||||
class Manager(manager.Manager):
|
||||
|
||||
def __init__(self):
|
||||
self._app = service.CassandraApp()
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
super(Manager, self).__init__('cassandra')
|
||||
def __init__(self, manager_name='cassandra'):
|
||||
super(Manager, self).__init__(manager_name)
|
||||
self._app = None
|
||||
self._admin = None
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
|
@ -48,11 +45,18 @@ class Manager(manager.Manager):
|
|||
|
||||
@property
|
||||
def app(self):
|
||||
if self._app is None:
|
||||
self._app = self.build_app()
|
||||
return self._app
|
||||
|
||||
def build_app(self):
|
||||
return service.CassandraApp()
|
||||
|
||||
@property
|
||||
def admin(self):
|
||||
return self.__admin
|
||||
if self._admin is None:
|
||||
self._admin = self.app.build_admin()
|
||||
return self._admin
|
||||
|
||||
@property
|
||||
def configuration_manager(self):
|
||||
|
@ -145,7 +149,7 @@ class Manager(manager.Manager):
|
|||
self.app.secure()
|
||||
self.app.restart()
|
||||
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
self._admin = self.app.build_admin()
|
||||
|
||||
if not cluster_config and self.is_root_enabled(context):
|
||||
self.status.report_root(context, self.app.default_superuser_name)
|
||||
|
@ -272,7 +276,7 @@ class Manager(manager.Manager):
|
|||
|
||||
def cluster_secure(self, context, password):
|
||||
os_admin = self.app.cluster_secure(password)
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
self._admin = self.app.build_admin()
|
||||
return os_admin
|
||||
|
||||
def get_admin_credentials(self, context):
|
||||
|
@ -280,4 +284,4 @@ class Manager(manager.Manager):
|
|||
|
||||
def store_admin_credentials(self, context, admin_credentials):
|
||||
self.app.store_admin_credentials(admin_credentials)
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
self._admin = self.app.build_admin()
|
||||
|
|
|
@ -21,6 +21,7 @@ from cassandra.auth import PlainTextAuthProvider
|
|||
from cassandra.cluster import Cluster
|
||||
from cassandra.cluster import NoHostAvailable
|
||||
from cassandra import OperationTimedOut
|
||||
from cassandra.policies import ConstantReconnectionPolicy
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import netutils
|
||||
|
||||
|
@ -45,7 +46,6 @@ from trove.guestagent import pkg
|
|||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'cassandra'
|
||||
|
||||
packager = pkg.Package()
|
||||
|
||||
|
@ -135,6 +135,9 @@ class CassandraApp(object):
|
|||
def cqlsh_conf_path(self):
|
||||
return "~/.cassandra/cqlshrc"
|
||||
|
||||
def build_admin(self):
|
||||
return CassandraAdmin(self.get_current_superuser())
|
||||
|
||||
def install_if_needed(self, packages):
|
||||
"""Prepare the guest machine with a Cassandra server installation."""
|
||||
LOG.info(_("Preparing Guest as a Cassandra Server"))
|
||||
|
@ -663,6 +666,12 @@ class CassandraApp(object):
|
|||
LOG.exception(_("The node failed to decommission itself."))
|
||||
self.status.set_status(rd_instance.ServiceStatuses.FAILED)
|
||||
return
|
||||
finally:
|
||||
# Cassandra connections have ability to automatically discover and
|
||||
# fallback to other cluster nodes whenever a node goes down.
|
||||
# Reset the status after decomissioning to ensure the heartbeat
|
||||
# connection talks to this node only.
|
||||
self.status = CassandraAppStatus(self.get_current_superuser())
|
||||
|
||||
try:
|
||||
self.stop_db(update_db=True, do_not_start_on_reboot=True)
|
||||
|
@ -689,7 +698,7 @@ class CassandraApp(object):
|
|||
superuser-level access to all keyspaces.
|
||||
"""
|
||||
cassandra = models.CassandraRootUser(password=root_password)
|
||||
admin = CassandraAdmin(self.get_current_superuser())
|
||||
admin = self.build_admin()
|
||||
if self.is_root_enabled():
|
||||
admin.alter_user_password(cassandra)
|
||||
else:
|
||||
|
@ -701,7 +710,7 @@ class CassandraApp(object):
|
|||
"""The Trove administrative user ('os_admin') should normally be the
|
||||
only superuser in the system.
|
||||
"""
|
||||
found = CassandraAdmin(self.get_current_superuser()).list_superusers()
|
||||
found = self.build_admin().list_superusers()
|
||||
return len([user for user in found
|
||||
if user.name != self._ADMIN_USER]) > 0
|
||||
|
||||
|
@ -716,11 +725,18 @@ class CassandraAppStatus(service.BaseDbStatus):
|
|||
"""
|
||||
super(CassandraAppStatus, self).__init__()
|
||||
self.__user = superuser
|
||||
self.__client = None
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
if self.__client is None:
|
||||
self.__client = CassandraLocalhostConnection(self.__user)
|
||||
return self.__client
|
||||
|
||||
def _get_actual_db_status(self):
|
||||
try:
|
||||
with CassandraLocalhostConnection(self.__user):
|
||||
return rd_instance.ServiceStatuses.RUNNING
|
||||
self.client.execute('SELECT now() FROM system.local;')
|
||||
return rd_instance.ServiceStatuses.RUNNING
|
||||
except NoHostAvailable:
|
||||
return rd_instance.ServiceStatuses.SHUTDOWN
|
||||
except Exception:
|
||||
|
@ -751,16 +767,22 @@ class CassandraAdmin(object):
|
|||
|
||||
def __init__(self, user):
|
||||
self.__admin_user = user
|
||||
self.__client = None
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
if self.__client is None:
|
||||
self.__client = CassandraLocalhostConnection(self.__admin_user)
|
||||
return self.__client
|
||||
|
||||
def create_user(self, context, users):
|
||||
"""
|
||||
Create new non-superuser accounts.
|
||||
New users are by default granted full access to all database resources.
|
||||
"""
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for item in users:
|
||||
self._create_user_and_grant(client,
|
||||
self._deserialize_user(item))
|
||||
for item in users:
|
||||
self._create_user_and_grant(self.client,
|
||||
self._deserialize_user(item))
|
||||
|
||||
def _create_user_and_grant(self, client, user):
|
||||
"""
|
||||
|
@ -783,27 +805,24 @@ class CassandraAdmin(object):
|
|||
access to all keyspaces.
|
||||
"""
|
||||
LOG.debug("Creating a new superuser '%s'." % user.name)
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
client.execute("CREATE USER '{}' WITH PASSWORD %s SUPERUSER;",
|
||||
(user.name,), (user.password,))
|
||||
client.execute("GRANT ALL PERMISSIONS ON ALL KEYSPACES TO '{}';",
|
||||
(user.name,))
|
||||
self.client.execute("CREATE USER '{}' WITH PASSWORD %s SUPERUSER;",
|
||||
(user.name,), (user.password,))
|
||||
self.client.execute(
|
||||
"GRANT ALL PERMISSIONS ON ALL KEYSPACES TO '{}';", (user.name,))
|
||||
|
||||
def delete_user(self, context, user):
|
||||
self.drop_user(self._deserialize_user(user))
|
||||
|
||||
def drop_user(self, user):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._drop_user(client, user)
|
||||
self._drop_user(self.client, user)
|
||||
|
||||
def _drop_user(self, client, user):
|
||||
LOG.debug("Deleting user '%s'." % user.name)
|
||||
client.execute("DROP USER '{}';", (user.name, ))
|
||||
|
||||
def get_user(self, context, username, hostname):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
user = self._find_user(client, username)
|
||||
return user.serialize() if user is not None else None
|
||||
user = self._find_user(self.client, username)
|
||||
return user.serialize() if user is not None else None
|
||||
|
||||
def _find_user(self, client, username):
|
||||
"""
|
||||
|
@ -820,11 +839,9 @@ class CassandraAdmin(object):
|
|||
List all non-superuser accounts. Omit names on the ignored list.
|
||||
Return an empty set if None.
|
||||
"""
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
users = [user.serialize() for user in
|
||||
self._get_listed_users(client)]
|
||||
return pagination.paginate_list(users, limit, marker,
|
||||
include_marker)
|
||||
users = [user.serialize() for user in
|
||||
self._get_listed_users(self.client)]
|
||||
return pagination.paginate_list(users, limit, marker, include_marker)
|
||||
|
||||
def _get_listed_users(self, client):
|
||||
"""
|
||||
|
@ -926,27 +943,24 @@ class CassandraAdmin(object):
|
|||
|
||||
def list_superusers(self):
|
||||
"""List all system users existing in the database."""
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
return self._get_users(client, lambda user: user.super)
|
||||
return self._get_users(self.client, lambda user: user.super)
|
||||
|
||||
def grant_access(self, context, username, hostname, databases):
|
||||
"""
|
||||
Grant full access on keyspaces to a given username.
|
||||
"""
|
||||
user = models.CassandraUser(username)
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for db in databases:
|
||||
self._grant_full_access_on_keyspace(
|
||||
client, models.CassandraSchema(db), user)
|
||||
for db in databases:
|
||||
self._grant_full_access_on_keyspace(
|
||||
self.client, models.CassandraSchema(db), user)
|
||||
|
||||
def revoke_access(self, context, username, hostname, database):
|
||||
"""
|
||||
Revoke all permissions on any database resources from a given username.
|
||||
"""
|
||||
user = models.CassandraUser(username)
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._revoke_all_access_on_keyspace(
|
||||
client, models.CassandraSchema(database), user)
|
||||
self._revoke_all_access_on_keyspace(
|
||||
self.client, models.CassandraSchema(database), user)
|
||||
|
||||
def _grant_full_access_on_keyspace(self, client, keyspace, user,
|
||||
check_reserved=True):
|
||||
|
@ -987,11 +1001,10 @@ class CassandraAdmin(object):
|
|||
(keyspace.name, user.name))
|
||||
|
||||
def update_attributes(self, context, username, hostname, user_attrs):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
user = self._load_user(client, username)
|
||||
new_name = user_attrs.get('name')
|
||||
new_password = user_attrs.get('password')
|
||||
self._update_user(client, user, new_name, new_password)
|
||||
user = self._load_user(self.client, username)
|
||||
new_name = user_attrs.get('name')
|
||||
new_password = user_attrs.get('password')
|
||||
self._update_user(self.client, user, new_name, new_password)
|
||||
|
||||
def _update_user(self, client, user, new_username, new_password):
|
||||
"""
|
||||
|
@ -1028,13 +1041,12 @@ class CassandraAdmin(object):
|
|||
self._drop_user(client, user)
|
||||
|
||||
def alter_user_password(self, user):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._alter_user_password(client, user)
|
||||
self._alter_user_password(self.client, user)
|
||||
|
||||
def change_passwords(self, context, users):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for user in users:
|
||||
self._alter_user_password(client, self._deserialize_user(user))
|
||||
for user in users:
|
||||
self._alter_user_password(self.client,
|
||||
self._deserialize_user(user))
|
||||
|
||||
def _alter_user_password(self, client, user):
|
||||
LOG.debug("Changing password of user '%s'." % user.name)
|
||||
|
@ -1042,10 +1054,9 @@ class CassandraAdmin(object):
|
|||
"WITH PASSWORD %s;", (user.name,), (user.password,))
|
||||
|
||||
def create_database(self, context, databases):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for item in databases:
|
||||
self._create_single_node_keyspace(
|
||||
client, self._deserialize_keyspace(item))
|
||||
for item in databases:
|
||||
self._create_single_node_keyspace(
|
||||
self.client, self._deserialize_keyspace(item))
|
||||
|
||||
def _create_single_node_keyspace(self, client, keyspace):
|
||||
"""
|
||||
|
@ -1072,8 +1083,8 @@ class CassandraAdmin(object):
|
|||
"'replication_factor' : 1 }};", (keyspace.name,))
|
||||
|
||||
def delete_database(self, context, database):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._drop_keyspace(client, self._deserialize_keyspace(database))
|
||||
self._drop_keyspace(self.client,
|
||||
self._deserialize_keyspace(database))
|
||||
|
||||
def _drop_keyspace(self, client, keyspace):
|
||||
LOG.debug("Dropping keyspace '%s'." % keyspace.name)
|
||||
|
@ -1081,11 +1092,10 @@ class CassandraAdmin(object):
|
|||
|
||||
def list_databases(self, context, limit=None, marker=None,
|
||||
include_marker=False):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
databases = [keyspace.serialize() for keyspace
|
||||
in self._get_available_keyspaces(client)]
|
||||
return pagination.paginate_list(databases, limit, marker,
|
||||
include_marker)
|
||||
databases = [keyspace.serialize() for keyspace
|
||||
in self._get_available_keyspaces(self.client)]
|
||||
return pagination.paginate_list(databases, limit, marker,
|
||||
include_marker)
|
||||
|
||||
def _get_available_keyspaces(self, client):
|
||||
"""
|
||||
|
@ -1098,10 +1108,9 @@ class CassandraAdmin(object):
|
|||
if db.keyspace_name not in self.ignore_dbs}
|
||||
|
||||
def list_access(self, context, username, hostname):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
user = self._find_user(client, username)
|
||||
if user:
|
||||
return user.databases
|
||||
user = self._find_user(self.client, username)
|
||||
if user:
|
||||
return user.databases
|
||||
|
||||
raise exception.UserNotFound(username)
|
||||
|
||||
|
@ -1135,11 +1144,11 @@ class CassandraAdmin(object):
|
|||
|
||||
@property
|
||||
def ignore_users(self):
|
||||
return cfg.get_ignored_users(manager=MANAGER)
|
||||
return cfg.get_ignored_users()
|
||||
|
||||
@property
|
||||
def ignore_dbs(self):
|
||||
return cfg.get_ignored_dbs(manager=MANAGER)
|
||||
return cfg.get_ignored_dbs()
|
||||
|
||||
|
||||
class CassandraConnection(object):
|
||||
|
@ -1147,6 +1156,8 @@ class CassandraConnection(object):
|
|||
|
||||
# Cassandra 2.1 only supports protocol versions 3 and lower.
|
||||
NATIVE_PROTOCOL_VERSION = 3
|
||||
CONNECTION_TIMEOUT_SEC = CONF.agent_call_high_timeout
|
||||
RECONNECT_DELAY_SEC = 3
|
||||
|
||||
def __init__(self, contact_points, user):
|
||||
self.__user = user
|
||||
|
@ -1154,18 +1165,25 @@ class CassandraConnection(object):
|
|||
# After the driver connects to one of the nodes it will automatically
|
||||
# discover the rest.
|
||||
# Will connect to '127.0.0.1' if None contact points are given.
|
||||
#
|
||||
# Set the 'reconnection_policy' so that dead connections recover fast.
|
||||
self._cluster = Cluster(
|
||||
contact_points=contact_points,
|
||||
auth_provider=PlainTextAuthProvider(user.name, user.password),
|
||||
protocol_version=self.NATIVE_PROTOCOL_VERSION)
|
||||
protocol_version=self.NATIVE_PROTOCOL_VERSION,
|
||||
connect_timeout=self.CONNECTION_TIMEOUT_SEC,
|
||||
control_connection_timeout=self.CONNECTION_TIMEOUT_SEC,
|
||||
reconnection_policy=ConstantReconnectionPolicy(
|
||||
self.RECONNECT_DELAY_SEC, max_attempts=None))
|
||||
self.__session = None
|
||||
|
||||
self._connect()
|
||||
|
||||
def __enter__(self):
|
||||
self.__connect()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.__disconnect()
|
||||
self._disconnect()
|
||||
|
||||
def execute(self, query, identifiers=None, data_values=None, timeout=None):
|
||||
"""
|
||||
|
@ -1181,7 +1199,7 @@ class CassandraConnection(object):
|
|||
There is no timeout if set to None.
|
||||
Return a set of rows or an empty list if None.
|
||||
"""
|
||||
if self.__is_active():
|
||||
if self.is_active():
|
||||
try:
|
||||
rows = self.__session.execute(self.__bind(query, identifiers),
|
||||
data_values, timeout)
|
||||
|
@ -1198,11 +1216,11 @@ class CassandraConnection(object):
|
|||
return query.format(*identifiers)
|
||||
return query
|
||||
|
||||
def __connect(self):
|
||||
def _connect(self):
|
||||
if not self._cluster.is_shutdown:
|
||||
LOG.debug("Connecting to a Cassandra cluster as '%s'."
|
||||
% self.__user.name)
|
||||
if not self.__is_active():
|
||||
if not self.is_active():
|
||||
self.__session = self._cluster.connect()
|
||||
else:
|
||||
LOG.debug("Connection already open.")
|
||||
|
@ -1215,19 +1233,23 @@ class CassandraConnection(object):
|
|||
LOG.debug("Cannot perform this operation on a terminated cluster.")
|
||||
raise exception.UnprocessableEntity()
|
||||
|
||||
def __disconnect(self):
|
||||
if self.__is_active():
|
||||
def _disconnect(self):
|
||||
if self.is_active():
|
||||
try:
|
||||
LOG.debug("Disconnecting from cluster: '%s'"
|
||||
% self._cluster.metadata.cluster_name)
|
||||
self._cluster.shutdown()
|
||||
self.__session.shutdown()
|
||||
except Exception:
|
||||
LOG.debug("Failed to disconnect from a Cassandra cluster.")
|
||||
|
||||
def __is_active(self):
|
||||
def is_active(self):
|
||||
return self.__session and not self.__session.is_shutdown
|
||||
|
||||
def __del__(self):
|
||||
# The connections would survive the parent object's GC.
|
||||
# We need to close it explicitly.
|
||||
self._disconnect()
|
||||
|
||||
|
||||
class CassandraLocalhostConnection(CassandraConnection):
|
||||
"""
|
||||
|
|
|
@ -75,6 +75,13 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
@patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
|
||||
def setUp(self, *args, **kwargs):
|
||||
super(GuestAgentCassandraDBManagerTest, self).setUp()
|
||||
|
||||
conn_patcher = patch.multiple(cass_service.CassandraConnection,
|
||||
_connect=DEFAULT,
|
||||
is_active=Mock(return_value=True))
|
||||
self.addCleanup(conn_patcher.stop)
|
||||
conn_patcher.start()
|
||||
|
||||
self.real_status = cass_service.CassandraAppStatus.set_status
|
||||
|
||||
class FakeInstanceServiceStatus(object):
|
||||
|
@ -87,9 +94,12 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
return_value=FakeInstanceServiceStatus())
|
||||
self.context = trove_testtools.TroveTestContext(self)
|
||||
self.manager = cass_manager.Manager()
|
||||
self.manager._Manager__admin = cass_service.CassandraAdmin(
|
||||
self.manager._app = cass_service.CassandraApp()
|
||||
self.manager._admin = cass_service.CassandraAdmin(
|
||||
models.CassandraUser('Test'))
|
||||
self.admin = self.manager._Manager__admin
|
||||
self.admin = self.manager._admin
|
||||
self.admin._CassandraAdmin__client = MagicMock()
|
||||
self.conn = self.admin._CassandraAdmin__client
|
||||
self.pkg = cass_service.packager
|
||||
self.origin_os_path_exists = os.path.exists
|
||||
self.origin_format = volume.VolumeDevice.format
|
||||
|
@ -327,64 +337,58 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
def _get_random_name(self, size, chars=string.letters + string.digits):
|
||||
return ''.join(random.choice(chars) for _ in range(size))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_create_database(self, conn):
|
||||
def test_create_database(self):
|
||||
db1 = models.CassandraSchema('db1')
|
||||
db2 = models.CassandraSchema('db2')
|
||||
db3 = models.CassandraSchema(self._get_random_name(32))
|
||||
|
||||
self.manager.create_database(self.context,
|
||||
self._serialize_collection(db1, db2, db3))
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__CREATE_DB_FORMAT, (db1.name,)),
|
||||
call(self.__CREATE_DB_FORMAT, (db2.name,)),
|
||||
call(self.__CREATE_DB_FORMAT, (db3.name,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_delete_database(self, conn):
|
||||
def test_delete_database(self):
|
||||
db = models.CassandraSchema(self._get_random_name(32))
|
||||
self.manager.delete_database(self.context, db.serialize())
|
||||
conn.return_value.execute.assert_called_once_with(
|
||||
self.conn.execute.assert_called_once_with(
|
||||
self.__DROP_DB_FORMAT, (db.name,))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_create_user(self, conn):
|
||||
def test_create_user(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2', '')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
||||
self.manager.create_user(self.context,
|
||||
self._serialize_collection(usr1, usr2, usr3))
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__CREATE_USR_FORMAT, (usr1.name,), (usr1.password,)),
|
||||
call(self.__CREATE_USR_FORMAT, (usr2.name,), (usr2.password,)),
|
||||
call(self.__CREATE_USR_FORMAT, (usr3.name,), (usr3.password,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_delete_user(self, conn):
|
||||
def test_delete_user(self):
|
||||
usr = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
self.manager.delete_user(self.context, usr.serialize())
|
||||
conn.return_value.execute.assert_called_once_with(
|
||||
self.conn.execute.assert_called_once_with(
|
||||
self.__DROP_USR_FORMAT, (usr.name,))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_change_passwords(self, conn):
|
||||
def test_change_passwords(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2', '')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
||||
self.manager.change_passwords(self.context, self._serialize_collection(
|
||||
usr1, usr2, usr3))
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__ALTER_USR_FORMAT, (usr1.name,), (usr1.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr2.name,), (usr2.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr3.name,), (usr3.password,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_alter_user_password(self, conn):
|
||||
def test_alter_user_password(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2', '')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
@ -392,14 +396,13 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
self.admin.alter_user_password(usr1)
|
||||
self.admin.alter_user_password(usr2)
|
||||
self.admin.alter_user_password(usr3)
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__ALTER_USR_FORMAT, (usr1.name,), (usr1.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr2.name,), (usr2.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr3.name,), (usr3.password,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_grant_access(self, conn):
|
||||
def test_grant_access(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr1', 'password')
|
||||
db1 = models.CassandraSchema('db1')
|
||||
|
@ -417,10 +420,11 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
expected.append(call(self.__GRANT_FORMAT,
|
||||
(modifier, db3.name, usr2.name)))
|
||||
|
||||
conn.return_value.execute.assert_has_calls(expected, any_order=True)
|
||||
self.conn.execute.assert_has_calls(
|
||||
expected,
|
||||
any_order=True)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_revoke_access(self, conn):
|
||||
def test_revoke_access(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr1', 'password')
|
||||
db1 = models.CassandraSchema('db1')
|
||||
|
@ -428,19 +432,17 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
|
||||
self.manager.revoke_access(self.context, usr1.name, None, db1.name)
|
||||
self.manager.revoke_access(self.context, usr2.name, None, db2.name)
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__REVOKE_FORMAT, (db1.name, usr1.name)),
|
||||
call(self.__REVOKE_FORMAT, (db2.name, usr2.name))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_get_available_keyspaces(self, conn):
|
||||
def test_get_available_keyspaces(self):
|
||||
self.manager.list_databases(self.context)
|
||||
conn.return_value.execute.assert_called_once_with(
|
||||
self.conn.execute.assert_called_once_with(
|
||||
self.__LIST_DB_FORMAT)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_list_databases(self, conn):
|
||||
def test_list_databases(self):
|
||||
db1 = models.CassandraSchema('db1')
|
||||
db2 = models.CassandraSchema('db2')
|
||||
db3 = models.CassandraSchema(self._get_random_name(32))
|
||||
|
@ -537,8 +539,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
|
||||
self.assertEqual({}, acl)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_get_listed_users(self, conn):
|
||||
def test_get_listed_users(self):
|
||||
usr1 = models.CassandraUser(self._get_random_name(1025))
|
||||
usr2 = models.CassandraUser(self._get_random_name(1025))
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025))
|
||||
|
@ -554,7 +555,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
rv_3 = NonCallableMagicMock()
|
||||
rv_3.configure_mock(name=usr3.name, super=True)
|
||||
|
||||
with patch.object(conn.return_value, 'execute', return_value=iter(
|
||||
with patch.object(self.conn, 'execute', return_value=iter(
|
||||
[rv_1, rv_2, rv_3])):
|
||||
with patch.object(self.admin, '_get_acl',
|
||||
return_value={usr1.name: {db1.name: {'SELECT'},
|
||||
|
@ -562,15 +563,14 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
usr3.name: {db2.name: {'SELECT'}}}
|
||||
):
|
||||
usrs = self.manager.list_users(self.context)
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__LIST_USR_FORMAT),
|
||||
], any_order=True)
|
||||
self.assertIn(usr1.serialize(), usrs[0])
|
||||
self.assertIn(usr2.serialize(), usrs[0])
|
||||
self.assertIn(usr3.serialize(), usrs[0])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_list_access(self, conn):
|
||||
def test_list_access(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
@ -593,8 +593,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
with ExpectedException(exception.UserNotFound):
|
||||
self.manager.list_access(self.context, usr3.name, None)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_list_users(self, conn):
|
||||
def test_list_users(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
@ -612,8 +611,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
with patch.object(self.admin, self.__N_GLU, return_value=set()):
|
||||
self.assertEqual(([], None), self.manager.list_users(self.context))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_get_user(self, conn):
|
||||
def test_get_user(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
@ -629,8 +627,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
|
||||
@patch.object(cass_service.CassandraAdmin, '_deserialize_keyspace',
|
||||
side_effect=lambda p1: p1)
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_rename_user(self, conn, ks_deserializer):
|
||||
def test_rename_user(self, ks_deserializer):
|
||||
usr = models.CassandraUser('usr')
|
||||
db1 = models.CassandraSchema('db1').serialize()
|
||||
db2 = models.CassandraSchema('db2').serialize()
|
||||
|
@ -652,8 +649,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
|||
call(ANY, db2, ANY)])
|
||||
drop.assert_called_once_with(ANY, usr)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_update_attributes(self, conn):
|
||||
def test_update_attributes(self):
|
||||
usr = models.CassandraUser('usr', 'pwd')
|
||||
|
||||
with patch.object(self.admin, self.__N_BU, return_value=usr):
|
||||
|
|
Loading…
Reference in New Issue