Modify User Attributes - name, host and password
At present, when a Cloud Database user creates a username, password and hostname for a database user, he does not have the ability to modify these user attributes. This API enables the Cloud DB user to modify one or more of these user attributes (username,hostname,password). Implements: Blueprint modify-userattributes Change-Id: I253cacf998e93b3d6136905dbb4a3516272efb0e
This commit is contained in:
parent
a9eb50ec60
commit
366b492298
|
@ -61,6 +61,13 @@ host_string = {
|
|||
"pattern": "^[%]?[\w(-).]*[%]?$"
|
||||
}
|
||||
|
||||
name_string = {
|
||||
"type": "string",
|
||||
"minLength": 1,
|
||||
"maxLength": 16,
|
||||
"pattern": "^.*[0-9a-zA-Z]+.*$"
|
||||
}
|
||||
|
||||
uuid = {
|
||||
"type": "string",
|
||||
"minLength": 1,
|
||||
|
@ -131,6 +138,18 @@ databases_def = {
|
|||
}
|
||||
}
|
||||
|
||||
user_attributes = {
|
||||
"type": "object",
|
||||
"additionalProperties": False,
|
||||
"minProperties": 1,
|
||||
"properties": {
|
||||
"name": name_string,
|
||||
"password": non_empty_string,
|
||||
"host": host_string
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
users_list = {
|
||||
"type": "array",
|
||||
"minItems": 1,
|
||||
|
@ -139,7 +158,7 @@ users_list = {
|
|||
"required": ["name", "password"],
|
||||
"additionalProperties": False,
|
||||
"properties": {
|
||||
"name": non_empty_string,
|
||||
"name": name_string,
|
||||
"password": non_empty_string,
|
||||
"host": host_string,
|
||||
"databases": databases_ref_list
|
||||
|
@ -266,7 +285,7 @@ user = {
|
|||
"users": users_list
|
||||
}
|
||||
},
|
||||
"update": {
|
||||
"update_all": {
|
||||
"users": {
|
||||
"type": "object",
|
||||
"required": ["users"],
|
||||
|
@ -276,6 +295,14 @@ user = {
|
|||
}
|
||||
},
|
||||
"databases": databases_ref
|
||||
},
|
||||
"update": {
|
||||
"type": "object",
|
||||
"required": ["user"],
|
||||
"additionalProperties": False,
|
||||
"properties": {
|
||||
"user": user_attributes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -65,7 +65,8 @@ class Mysql(extensions.ExtensionsDescriptor):
|
|||
# deserializer=extensions.ExtensionsXMLSerializer()
|
||||
deserializer=wsgi.TroveRequestDeserializer(),
|
||||
serializer=serializer,
|
||||
collection_actions={'update': 'PUT'})
|
||||
member_actions={'update': 'PUT'},
|
||||
collection_actions={'update_all': 'PUT'})
|
||||
resources.append(resource)
|
||||
|
||||
collection_url = '{tenant_id}/instances/:instance_id/users'
|
||||
|
|
|
@ -139,6 +139,29 @@ class User(object):
|
|||
change_users.append(change_user)
|
||||
client.change_passwords(change_users)
|
||||
|
||||
@classmethod
|
||||
def update_attributes(cls, context, instance_id, username, hostname,
|
||||
user_attrs):
|
||||
load_and_verify(context, instance_id)
|
||||
client = create_guest_client(context, instance_id)
|
||||
user_name = user_attrs.get('name')
|
||||
host_name = user_attrs.get('host')
|
||||
user = user_name or username
|
||||
host = host_name or hostname
|
||||
userhost = "%s@%s" % (user, host)
|
||||
if user_name or host_name:
|
||||
existing_users, _nadda = Users.load_with_client(
|
||||
client,
|
||||
limit=1,
|
||||
marker=userhost,
|
||||
include_marker=True)
|
||||
if (len(existing_users) > 0 and
|
||||
existing_users[0].name == user and
|
||||
existing_users[0].host == host):
|
||||
raise exception.UserAlreadyExists(name=user,
|
||||
host=host)
|
||||
client.update_attributes(username, hostname, user_attrs)
|
||||
|
||||
|
||||
class UserAccess(object):
|
||||
_data_fields = ['databases']
|
||||
|
|
|
@ -63,7 +63,7 @@ class UserController(wsgi.Controller):
|
|||
@classmethod
|
||||
def get_schema(cls, action, body):
|
||||
action_schema = super(UserController, cls).get_schema(action, body)
|
||||
if 'update' == action:
|
||||
if 'update_all' == action:
|
||||
update_type = body.keys()[0]
|
||||
action_schema = action_schema.get(update_type, {})
|
||||
return action_schema
|
||||
|
@ -130,7 +130,25 @@ class UserController(wsgi.Controller):
|
|||
view = views.UserView(user)
|
||||
return wsgi.Result(view.data(), 200)
|
||||
|
||||
def update(self, req, body, tenant_id, instance_id):
|
||||
def update(self, req, body, tenant_id, instance_id, id):
|
||||
"""Change attributes for one user."""
|
||||
LOG.info(_("Updating user attributes for instance '%s'") % instance_id)
|
||||
LOG.info(_("req : '%s'\n\n") % req)
|
||||
context = req.environ[wsgi.CONTEXT_KEY]
|
||||
username, hostname = unquote_user_host(id)
|
||||
user = None
|
||||
user_attrs = body['user']
|
||||
try:
|
||||
user = models.User.load(context, instance_id, username, hostname)
|
||||
except (ValueError, AttributeError) as e:
|
||||
raise exception.BadRequest(msg=str(e))
|
||||
if not user:
|
||||
raise exception.UserNotFound(uuid=id)
|
||||
models.User.update_attributes(context, instance_id, username, hostname,
|
||||
user_attrs)
|
||||
return wsgi.Result(None, 202)
|
||||
|
||||
def update_all(self, req, body, tenant_id, instance_id):
|
||||
"""Change the password of one or more users."""
|
||||
LOG.info(_("Updating user passwords for instance '%s'") % instance_id)
|
||||
LOG.info(_("req : '%s'\n\n") % req)
|
||||
|
@ -164,7 +182,7 @@ class UserAccessController(wsgi.Controller):
|
|||
@classmethod
|
||||
def get_schema(cls, action, body):
|
||||
schema = {}
|
||||
if 'update' == action:
|
||||
if 'update_all' == action:
|
||||
schema = cls.schemas.get(action).get('databases')
|
||||
return schema
|
||||
|
||||
|
|
|
@ -115,6 +115,12 @@ class API(proxy.RpcProxy):
|
|||
LOG.debug(_("Changing passwords for users on Instance %s"), self.id)
|
||||
self._cast("change_passwords", users=users)
|
||||
|
||||
def update_attributes(self, username, hostname, user_attrs):
|
||||
"""Update user attributes."""
|
||||
LOG.debug(_("Changing user attributes on Instance %s"), self.id)
|
||||
self._cast("update_attributes", username=username, hostname=hostname,
|
||||
user_attrs=user_attrs)
|
||||
|
||||
def create_user(self, users):
|
||||
"""Make an asynchronous call to create a new database user"""
|
||||
LOG.debug(_("Creating Users for Instance %s"), self.id)
|
||||
|
|
|
@ -26,6 +26,9 @@ class Manager(periodic_task.PeriodicTasks):
|
|||
def change_passwords(self, context, users):
|
||||
return MySqlAdmin().change_passwords(users)
|
||||
|
||||
def update_attributes(self, context, username, hostname, user_attrs):
|
||||
return MySqlAdmin().update_attributes(username, hostname, user_attrs)
|
||||
|
||||
def reset_configuration(self, context, configuration):
|
||||
app = MySqlApp(MySqlAppStatus.get())
|
||||
app.reset_configuration(configuration)
|
||||
|
|
|
@ -314,6 +314,41 @@ class MySqlAdmin(object):
|
|||
t = text(str(uu))
|
||||
client.execute(t)
|
||||
|
||||
def update_attributes(self, username, hostname, user_attrs):
|
||||
"""Change the attributes of one existing user."""
|
||||
LOG.debug("Changing the user attributes")
|
||||
LOG.debug("User is %s" % username)
|
||||
user = self._get_user(username, hostname)
|
||||
db_access = set()
|
||||
grantee = set()
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
q = query.Query()
|
||||
q.columns = ["grantee", "table_schema"]
|
||||
q.tables = ["information_schema.SCHEMA_PRIVILEGES"]
|
||||
q.group = ["grantee", "table_schema"]
|
||||
q.where = ["privilege_type != 'USAGE'"]
|
||||
t = text(str(q))
|
||||
db_result = client.execute(t)
|
||||
for db in db_result:
|
||||
grantee.add(db['grantee'])
|
||||
if db['grantee'] == "'%s'@'%s'" % (user.name, user.host):
|
||||
db_name = db['table_schema']
|
||||
db_access.add(db_name)
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
uu = query.UpdateUser(user.name, host=user.host,
|
||||
clear=user_attrs.get('password'),
|
||||
new_user=user_attrs.get('name'),
|
||||
new_host=user_attrs.get('host'))
|
||||
t = text(str(uu))
|
||||
client.execute(t)
|
||||
if user_attrs.get('name') is not None:
|
||||
if user_attrs['name'] not in grantee:
|
||||
if user_attrs.get('host') is None:
|
||||
host = user.host
|
||||
else:
|
||||
host = user_attrs.get('host')
|
||||
self.grant_access(user_attrs['name'], host, db_access)
|
||||
|
||||
def create_database(self, databases):
|
||||
"""Create the list of specified databases"""
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
|
|
|
@ -350,17 +350,31 @@ class CreateUser(object):
|
|||
|
||||
class UpdateUser(object):
|
||||
|
||||
def __init__(self, user, host=None, clear=None):
|
||||
def __init__(self, user, host=None, clear=None, new_user=None,
|
||||
new_host=None):
|
||||
self.user = user
|
||||
self.host = host
|
||||
self.clear = clear
|
||||
self.new_user = new_user
|
||||
self.new_host = new_host
|
||||
|
||||
def __repr__(self):
|
||||
return str(self)
|
||||
|
||||
@property
|
||||
def _set_password(self):
|
||||
return "SET Password=PASSWORD('%s')" % self.clear
|
||||
if self.clear:
|
||||
return "Password=PASSWORD('%s')" % self.clear
|
||||
|
||||
@property
|
||||
def _set_user(self):
|
||||
if self.new_user:
|
||||
return "User='%s'" % self.new_user
|
||||
|
||||
@property
|
||||
def _set_host(self):
|
||||
if self.new_host:
|
||||
return "Host='%s'" % self.new_host
|
||||
|
||||
@property
|
||||
def _host(self):
|
||||
|
@ -368,6 +382,16 @@ class UpdateUser(object):
|
|||
return "%"
|
||||
return self.host
|
||||
|
||||
@property
|
||||
def _set_attrs(self):
|
||||
sets = [self._set_user,
|
||||
self._set_host,
|
||||
self._set_password,
|
||||
]
|
||||
sets = [s for s in sets if s]
|
||||
sets = ', '.join(sets)
|
||||
return 'SET %s' % sets
|
||||
|
||||
@property
|
||||
def _where(self):
|
||||
clauses = []
|
||||
|
@ -381,7 +405,7 @@ class UpdateUser(object):
|
|||
|
||||
def __str__(self):
|
||||
query = ["UPDATE mysql.user",
|
||||
self._set_password,
|
||||
self._set_attrs,
|
||||
self._where,
|
||||
]
|
||||
query = [q for q in query if q]
|
||||
|
|
|
@ -202,6 +202,80 @@ class TestUsers(object):
|
|||
self.dbaas.users.delete(instance_info.id, username,
|
||||
hostname=hostname)
|
||||
|
||||
@test()
|
||||
def test_updateduser_newname_host_unique(self):
|
||||
#The updated_username@hostname should not exist already
|
||||
users = []
|
||||
old_name = "testuser1"
|
||||
hostname = "192.168.0.1"
|
||||
users.append({"name": old_name, "password": "password",
|
||||
"host": hostname, "databases": []})
|
||||
users.append({"name": "testuser2", "password": "password",
|
||||
"host": hostname, "databases": []})
|
||||
self.dbaas.users.create(instance_info.id, users)
|
||||
user_new = {"name": "testuser2"}
|
||||
hostname = hostname.replace('.', '%2e')
|
||||
assert_raises(exceptions.BadRequest,
|
||||
self.dbaas.users.update_attributes, instance_info.id,
|
||||
old_name, user_new, hostname)
|
||||
assert_equal(400, self.dbaas.last_http_code)
|
||||
self.dbaas.users.delete(instance_info.id, old_name, hostname=hostname)
|
||||
self.dbaas.users.delete(instance_info.id, "testuser2",
|
||||
hostname=hostname)
|
||||
|
||||
@test()
|
||||
def test_updateduser_name_newhost_unique(self):
|
||||
# The username@updated_hostname should not exist already
|
||||
users = []
|
||||
username = "testuser"
|
||||
hostname1 = "192.168.0.1"
|
||||
hostname2 = "192.168.0.2"
|
||||
users.append({"name": username, "password": "password",
|
||||
"host": hostname1, "databases": []})
|
||||
users.append({"name": username, "password": "password",
|
||||
"host": hostname2, "databases": []})
|
||||
self.dbaas.users.create(instance_info.id, users)
|
||||
hostname1 = hostname1.replace('.', '%2e')
|
||||
hostname2 = hostname2.replace('.', '%2e')
|
||||
user_new = {"host": "192.168.0.2"}
|
||||
assert_raises(exceptions.BadRequest,
|
||||
self.dbaas.users.update_attributes, instance_info.id,
|
||||
username, user_new, hostname1)
|
||||
assert_equal(400, self.dbaas.last_http_code)
|
||||
self.dbaas.users.delete(instance_info.id, username, hostname=hostname1)
|
||||
self.dbaas.users.delete(instance_info.id, username, hostname=hostname2)
|
||||
|
||||
@test()
|
||||
def test_updateduser_newname_newhost_unique(self):
|
||||
# The updated_username@updated_hostname should not exist already
|
||||
users = []
|
||||
username = "testuser1"
|
||||
hostname1 = "192.168.0.1"
|
||||
hostname2 = "192.168.0.2"
|
||||
users.append({"name": username, "password": "password",
|
||||
"host": hostname1, "databases": []})
|
||||
users.append({"name": "testuser2", "password": "password",
|
||||
"host": hostname2, "databases": []})
|
||||
self.dbaas.users.create(instance_info.id, users)
|
||||
user_new = {"name": "testuser2", "host": "192.168.0.2"}
|
||||
hostname1 = hostname1.replace('.', '%2e')
|
||||
hostname2 = hostname2.replace('.', '%2e')
|
||||
assert_raises(exceptions.BadRequest,
|
||||
self.dbaas.users.update_attributes, instance_info.id,
|
||||
username, user_new, hostname1)
|
||||
assert_equal(400, self.dbaas.last_http_code)
|
||||
self.dbaas.users.delete(instance_info.id, username, hostname=hostname1)
|
||||
self.dbaas.users.delete(instance_info.id, "testuser2",
|
||||
hostname=hostname2)
|
||||
|
||||
@test()
|
||||
def test_cannot_change_rootpassword(self):
|
||||
# Cannot change password for a root user
|
||||
user_new = {"password": "12345"}
|
||||
assert_raises(exceptions.BadRequest,
|
||||
self.dbaas.users.update_attributes, instance_info.id,
|
||||
"root", user_new)
|
||||
|
||||
@test(depends_on=[test_create_users])
|
||||
def test_hostname_ipv4_restriction(self):
|
||||
# By default, user hostnames are required to be % or IPv4 addresses.
|
||||
|
|
|
@ -87,6 +87,33 @@ class FakeGuest(object):
|
|||
% (username, hostname))
|
||||
self.users[(username, hostname)]['password'] = password
|
||||
|
||||
def update_attributes(self, username, hostname, user_attrs):
|
||||
LOG.debug("Updating attributes")
|
||||
self._check_username(username)
|
||||
if (username, hostname) not in self.users:
|
||||
raise rd_exception.UserNotFound(
|
||||
"User %s@%s cannot be found on the instance."
|
||||
% (username, hostname))
|
||||
new_name = user_attrs.get('name')
|
||||
new_host = user_attrs.get('host')
|
||||
new_password = user_attrs.get('password')
|
||||
old_name = username
|
||||
old_host = hostname
|
||||
name = new_name or old_name
|
||||
host = new_host or old_host
|
||||
if new_name or new_host:
|
||||
old_grants = self.grants.get((old_name, old_host), set())
|
||||
self._create_user({
|
||||
"_name": name,
|
||||
"_host": host,
|
||||
"_password": self.users[(name, host)]['password'],
|
||||
"_databases": [],
|
||||
})
|
||||
self.grants[(name, host)] = old_grants
|
||||
del self.users[(old_name, old_host)]
|
||||
if new_password:
|
||||
self.users[(name, host)]['password'] = new_password
|
||||
|
||||
def create_database(self, databases):
|
||||
for db in databases:
|
||||
self.dbs[db['_name']] = db
|
||||
|
|
|
@ -16,7 +16,15 @@ import testtools
|
|||
from trove.guestagent import query
|
||||
|
||||
|
||||
class QueryTest(testtools.TestCase):
|
||||
class QueryTestBase(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(QueryTestBase, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(QueryTestBase, self).tearDown()
|
||||
|
||||
|
||||
class QueryTest(QueryTestBase):
|
||||
def setUp(self):
|
||||
super(QueryTest, self).setUp()
|
||||
|
||||
|
@ -73,6 +81,14 @@ class QueryTest(testtools.TestCase):
|
|||
myQuery = query.Query(limit=limit_count)
|
||||
self.assertEqual('LIMIT 20', myQuery._limit)
|
||||
|
||||
|
||||
class GrantTest(QueryTestBase):
|
||||
def setUp(self):
|
||||
super(GrantTest, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(GrantTest, self).tearDown()
|
||||
|
||||
def test_grant_no_arg_constr(self):
|
||||
grant = query.Grant()
|
||||
self.assertIsNotNone(grant)
|
||||
|
@ -237,3 +253,197 @@ class QueryTest(testtools.TestCase):
|
|||
"IDENTIFIED BY "
|
||||
"'password123';",
|
||||
str(grant))
|
||||
|
||||
|
||||
class RevokeTest(QueryTestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(RevokeTest, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(RevokeTest, self).tearDown()
|
||||
|
||||
def test_defaults(self):
|
||||
r = query.Revoke()
|
||||
# Technically, this isn't valid for MySQL.
|
||||
self.assertEqual(str(r), "REVOKE ALL ON *.* FROM ``@`%`;")
|
||||
|
||||
def test_permissions(self):
|
||||
r = query.Revoke()
|
||||
r.user = 'x'
|
||||
r.permissions = ['CREATE', 'DELETE', 'DROP']
|
||||
self.assertEqual(str(r),
|
||||
"REVOKE CREATE, DELETE, DROP ON *.* FROM `x`@`%`;")
|
||||
|
||||
def test_database(self):
|
||||
r = query.Revoke()
|
||||
r.user = 'x'
|
||||
r.database = 'foo'
|
||||
self.assertEqual(str(r), "REVOKE ALL ON `foo`.* FROM `x`@`%`;")
|
||||
|
||||
def test_table(self):
|
||||
r = query.Revoke()
|
||||
r.user = 'x'
|
||||
r.database = 'foo'
|
||||
r.table = 'bar'
|
||||
self.assertEqual(str(r), "REVOKE ALL ON `foo`.'bar' FROM `x`@`%`;")
|
||||
|
||||
def test_user(self):
|
||||
r = query.Revoke()
|
||||
r.user = 'x'
|
||||
self.assertEqual(str(r), "REVOKE ALL ON *.* FROM `x`@`%`;")
|
||||
|
||||
def test_user_host(self):
|
||||
r = query.Revoke()
|
||||
r.user = 'x'
|
||||
r.host = 'y'
|
||||
self.assertEqual(str(r), "REVOKE ALL ON *.* FROM `x`@`y`;")
|
||||
|
||||
|
||||
class CreateDatabaseTest(QueryTestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(CreateDatabaseTest, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(CreateDatabaseTest, self).tearDown()
|
||||
|
||||
def test_defaults(self):
|
||||
cd = query.CreateDatabase('foo')
|
||||
self.assertEqual(str(cd), "CREATE DATABASE IF NOT EXISTS `foo`;")
|
||||
|
||||
def test_charset(self):
|
||||
cd = query.CreateDatabase('foo')
|
||||
cd.charset = "foo"
|
||||
self.assertEqual(str(cd), ("CREATE DATABASE IF NOT EXISTS `foo` "
|
||||
"CHARACTER SET = 'foo';"))
|
||||
|
||||
def test_collate(self):
|
||||
cd = query.CreateDatabase('foo')
|
||||
cd.collate = "bar"
|
||||
self.assertEqual(str(cd), ("CREATE DATABASE IF NOT EXISTS `foo` "
|
||||
"COLLATE = 'bar';"))
|
||||
|
||||
|
||||
class DropDatabaseTest(QueryTestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(DropDatabaseTest, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(DropDatabaseTest, self).tearDown()
|
||||
|
||||
def test_defaults(self):
|
||||
dd = query.DropDatabase('foo')
|
||||
self.assertEqual(str(dd), "DROP DATABASE `foo`;")
|
||||
|
||||
|
||||
class CreateUserTest(QueryTestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(CreateUserTest, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(CreateUserTest, self).tearDown()
|
||||
|
||||
def test_defaults(self):
|
||||
username = 'root'
|
||||
hostname = 'localhost'
|
||||
password = 'password123'
|
||||
cu = query.CreateUser(user=username, host=hostname, clear=password)
|
||||
self.assertEqual(str(cu), "CREATE USER :user@:host "
|
||||
"IDENTIFIED BY 'password123';")
|
||||
|
||||
|
||||
class UpdateUserTest(QueryTestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(UpdateUserTest, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(UpdateUserTest, self).tearDown()
|
||||
|
||||
def test_rename_user(self):
|
||||
username = 'root'
|
||||
hostname = 'localhost'
|
||||
new_user = 'root123'
|
||||
uu = query.UpdateUser(user=username, host=hostname,
|
||||
new_user=new_user)
|
||||
self.assertEqual(str(uu), "UPDATE mysql.user SET User='root123' "
|
||||
"WHERE User = 'root' "
|
||||
"AND Host = 'localhost';")
|
||||
|
||||
def test_change_password(self):
|
||||
username = 'root'
|
||||
hostname = 'localhost'
|
||||
new_password = 'password123'
|
||||
uu = query.UpdateUser(user=username, host=hostname,
|
||||
clear=new_password)
|
||||
self.assertEqual(str(uu), "UPDATE mysql.user SET "
|
||||
"Password=PASSWORD('password123') "
|
||||
"WHERE User = 'root' "
|
||||
"AND Host = 'localhost';")
|
||||
|
||||
def test_change_host(self):
|
||||
username = 'root'
|
||||
hostname = 'localhost'
|
||||
new_host = '%'
|
||||
uu = query.UpdateUser(user=username, host=hostname,
|
||||
new_host=new_host)
|
||||
self.assertEqual(str(uu), "UPDATE mysql.user SET Host='%' "
|
||||
"WHERE User = 'root' "
|
||||
"AND Host = 'localhost';")
|
||||
|
||||
def test_change_password_and_username(self):
|
||||
username = 'root'
|
||||
hostname = 'localhost'
|
||||
new_user = 'root123'
|
||||
new_password = 'password123'
|
||||
uu = query.UpdateUser(user=username, host=hostname,
|
||||
clear=new_password, new_user=new_user)
|
||||
self.assertEqual(str(uu), "UPDATE mysql.user SET User='root123', "
|
||||
"Password=PASSWORD('password123') "
|
||||
"WHERE User = 'root' "
|
||||
"AND Host = 'localhost';")
|
||||
|
||||
def test_change_username_password_hostname(self):
|
||||
username = 'root'
|
||||
hostname = 'localhost'
|
||||
new_user = 'root123'
|
||||
new_password = 'password123'
|
||||
new_host = '%'
|
||||
uu = query.UpdateUser(user=username, host=hostname,
|
||||
clear=new_password, new_user=new_user,
|
||||
new_host=new_host)
|
||||
self.assertEqual(str(uu), "UPDATE mysql.user SET User='root123', "
|
||||
"Host='%', "
|
||||
"Password=PASSWORD('password123') "
|
||||
"WHERE User = 'root' "
|
||||
"AND Host = 'localhost';")
|
||||
|
||||
def test_change_username_and_hostname(self):
|
||||
username = 'root'
|
||||
hostname = 'localhost'
|
||||
new_user = 'root123'
|
||||
new_host = '%'
|
||||
uu = query.UpdateUser(user=username, host=hostname,
|
||||
new_host=new_host, new_user=new_user)
|
||||
self.assertEqual(str(uu), "UPDATE mysql.user SET User='root123', "
|
||||
"Host='%' "
|
||||
"WHERE User = 'root' "
|
||||
"AND Host = 'localhost';")
|
||||
|
||||
|
||||
class DropUserTest(QueryTestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(DropUserTest, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(DropUserTest, self).tearDown()
|
||||
|
||||
def test_defaults(self):
|
||||
username = 'root'
|
||||
hostname = 'localhost'
|
||||
du = query.DropUser(user=username, host=hostname)
|
||||
self.assertEqual(str(du), "DROP USER `root`@`localhost`;")
|
||||
|
|
|
@ -34,12 +34,12 @@ class TestUserController(TestCase):
|
|||
|
||||
def test_get_update_user_pw(self):
|
||||
body = {'users': [{'name': 'test', 'password': 'test'}]}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
schema = self.controller.get_schema('update_all', body)
|
||||
self.assertTrue('users' in schema['properties'])
|
||||
|
||||
def test_get_update_user_db(self):
|
||||
body = {'databases': [{'name': 'test'}, {'name': 'test'}]}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
schema = self.controller.get_schema('update_all', body)
|
||||
self.assertTrue('databases' in schema['properties'])
|
||||
|
||||
def test_validate_create_empty(self):
|
||||
|
@ -116,7 +116,7 @@ class TestUserController(TestCase):
|
|||
|
||||
def test_validate_update_empty(self):
|
||||
body = {"users": []}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
schema = self.controller.get_schema('update_all', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertFalse(validator.is_valid(body))
|
||||
errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
|
||||
|
@ -126,7 +126,7 @@ class TestUserController(TestCase):
|
|||
|
||||
def test_validate_update_short_password(self):
|
||||
body = {"users": [{"name": "joe", "password": ""}]}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
schema = self.controller.get_schema('update_all', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertFalse(validator.is_valid(body))
|
||||
errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
|
||||
|
@ -139,7 +139,7 @@ class TestUserController(TestCase):
|
|||
def test_validate_update_user_complete(self):
|
||||
body = {"users": [{"name": "joe", "password": "",
|
||||
"databases": [{"name": "testdb"}]}]}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
schema = self.controller.get_schema('update_all', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertFalse(validator.is_valid(body))
|
||||
errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
|
||||
|
@ -152,7 +152,7 @@ class TestUserController(TestCase):
|
|||
def test_validate_update_user_with_db_short_password(self):
|
||||
body = {"users": [{"name": "joe", "password": "",
|
||||
"databases": [{"name": "testdb"}]}]}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
schema = self.controller.get_schema('update_all', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertFalse(validator.is_valid(body))
|
||||
errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
|
||||
|
@ -162,7 +162,7 @@ class TestUserController(TestCase):
|
|||
|
||||
def test_validate_update_no_password(self):
|
||||
body = {"users": [{"name": "joe"}]}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
schema = self.controller.get_schema('update_all', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertFalse(validator.is_valid(body))
|
||||
errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
|
||||
|
@ -172,13 +172,13 @@ class TestUserController(TestCase):
|
|||
|
||||
def test_validate_update_database_complete(self):
|
||||
body = {"databases": [{"name": "test1"}, {"name": "test2"}]}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
schema = self.controller.get_schema('update_all', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertTrue(validator.is_valid(body))
|
||||
|
||||
def test_validate_update_database_empty(self):
|
||||
body = {"databases": []}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
schema = self.controller.get_schema('update_all', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertFalse(validator.is_valid(body))
|
||||
errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
|
||||
|
@ -187,7 +187,7 @@ class TestUserController(TestCase):
|
|||
|
||||
def test_validate_update_short_name(self):
|
||||
body = {"users": [{"name": ""}]}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
schema = self.controller.get_schema('update_all', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertFalse(validator.is_valid(body))
|
||||
errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
|
||||
|
@ -199,11 +199,28 @@ class TestUserController(TestCase):
|
|||
Equals("'' does not match '^.*[0-9a-zA-Z]+.*$'"))
|
||||
self.assertThat(errors[1].path.pop(), Equals("name"))
|
||||
|
||||
def test_get_update_user_attributes(self):
|
||||
body = {'user': {'name': 'test'}}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
self.assertTrue('user' in schema['properties'])
|
||||
|
||||
def test_validate_update_user_attributes(self):
|
||||
body = {'user': {'name': 'test', 'password': 'test', 'host': '%'}}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertTrue(validator.is_valid(body))
|
||||
|
||||
def test_validate_update_user_attributes_empty(self):
|
||||
body = {"user": {}}
|
||||
schema = self.controller.get_schema('update', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertFalse(validator.is_valid(body))
|
||||
|
||||
|
||||
class TestUserAccessController(TestCase):
|
||||
def test_validate_update_db(self):
|
||||
body = {"databases": []}
|
||||
schema = (UserAccessController()).get_schema('update', body)
|
||||
schema = (UserAccessController()).get_schema('update_all', body)
|
||||
validator = jsonschema.Draft4Validator(schema)
|
||||
self.assertFalse(validator.is_valid(body))
|
||||
errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
|
||||
|
|
Loading…
Reference in New Issue