Fixes test in response to optional hostname fix

Several tests have had their behavior changed
with the client appending the default host.
With that new behavior reversed in the client,
some tests use a username that contains an @,
causing an ambiguous case for the API to parse.
Deliberately including the host in the tests
fixes this.

Fixes Bug #1171559

Change-Id: I967eb4b21d8ca3e2c2721db6f362b8dac2551346
This commit is contained in:
Ed Cranford 2013-04-11 14:41:16 -05:00
parent df4405ce54
commit f224dd5dec
7 changed files with 87 additions and 43 deletions

View File

@ -84,7 +84,7 @@ CUSTOM_SERIALIZER_METADATA = {
'flavor': {'id': '', 'ram': '', 'name': ''},
'link': {'href': '', 'rel': ''},
'database': {'name': ''},
'user': {'name': '', 'password': ''},
'user': {'name': '', 'password': '', 'host': ''},
'account': {'id': ''},
'security_group': {'id': '', 'name': '', 'description': '', 'user': '',
'tenant_id': ''},

View File

@ -37,21 +37,18 @@ def populate_databases(dbs):
def populate_users(users):
"""Create a serializable request containing users"""
try:
users_data = []
for user in users:
u = guest_models.MySQLUser()
u.name = user.get('name', '')
u.host = user.get('host', '%')
u.password = user.get('password', '')
dbs = user.get('databases', '')
if dbs:
for db in dbs:
u.databases = db.get('name', '')
users_data.append(u.serialize())
return users_data
except ValueError as ve:
raise exception.BadRequest(str(ve))
users_data = []
for user in users:
u = guest_models.MySQLUser()
u.name = user.get('name', '')
u.host = user.get('host')
u.password = user.get('password', '')
dbs = user.get('databases', '')
if dbs:
for db in dbs:
u.databases = db.get('name', '')
users_data.append(u.serialize())
return users_data
def unquote_user_host(user_hostname):

View File

@ -60,6 +60,9 @@ class User(object):
@classmethod
def load(cls, context, instance_id, username, hostname):
load_and_verify(context, instance_id)
validate = guest_models.MySQLUser()
validate.name = username
validate.host = hostname
client = create_guest_client(context, instance_id)
found_user = client.get_user(username=username, hostname=hostname)
if not found_user:
@ -79,6 +82,8 @@ class User(object):
for user in users:
user_name = user['_name']
host_name = user['_host']
if host_name is None:
host_name = '%'
userhost = "%s@%s" % (user_name, host_name)
existing_users, _nadda = Users.load_with_client(
client,

View File

@ -91,8 +91,11 @@ class UserController(wsgi.Controller):
context = req.environ[wsgi.CONTEXT_KEY]
self.validate(body)
users = body['users']
model_users = populate_users(users)
models.User.create(context, instance_id, model_users)
try:
model_users = populate_users(users)
models.User.create(context, instance_id, model_users)
except (ValueError, AttributeError) as e:
raise exception.BadRequest(msg=e)
return wsgi.Result(None, 202)
def delete(self, req, tenant_id, instance_id, id):
@ -100,18 +103,17 @@ class UserController(wsgi.Controller):
LOG.info(_("req : '%s'\n\n") % req)
context = req.environ[wsgi.CONTEXT_KEY]
username, host = unquote_user_host(id)
user = None
try:
user = guest_models.MySQLUser()
user.name = username
user.host = host
except ValueError as ve:
raise exception.BadRequest(str(ve))
found_user = models.User.load(context, instance_id, username,
host)
except (ValueError, AttributeError) as e:
raise exception.BadRequest(msg=e)
if not user:
raise exception.UserNotFound(uuid=id)
models.User.delete(context, instance_id, user.serialize())
return wsgi.Result(None, 202)
@ -121,7 +123,11 @@ class UserController(wsgi.Controller):
LOG.info(_("req : '%s'\n\n") % req)
context = req.environ[wsgi.CONTEXT_KEY]
username, host = unquote_user_host(id)
user = models.User.load(context, instance_id, username, host)
user = None
try:
user = models.User.load(context, instance_id, username, host)
except (ValueError, AttributeError) as e:
raise exception.BadRequest(msg=e)
if not user:
raise exception.UserNotFound(uuid=id)
view = views.UserView(user)
@ -136,11 +142,14 @@ class UserController(wsgi.Controller):
users = body['users']
model_users = []
for user in users:
mu = guest_models.MySQLUser()
mu.name = user['name']
mu.host = user.get('host')
mu.password = user['password']
model_users.append(mu)
try:
mu = guest_models.MySQLUser()
mu.name = user['name']
mu.host = user.get('host')
mu.password = user['password']
model_users.append(mu)
except (ValueError, AttributeError) as e:
raise exception.BadRequest(msg=e)
models.User.change_password(context, instance_id, model_users)
return wsgi.Result(None, 202)
@ -153,8 +162,10 @@ class UserAccessController(wsgi.Controller):
"""Validate that the request has all the required parameters"""
if not body:
raise exception.BadRequest("The request contains an empty body")
if not body.get('databases', ''):
if not body.get('databases', []):
raise exception.MissingKey(key='databases')
if type(body['databases']) is not list:
raise exception.BadRequest("Databases must be provided as a list.")
for database in body.get('databases'):
if not database.get('name', ''):
raise exception.MissingKey(key='name')
@ -163,8 +174,8 @@ class UserAccessController(wsgi.Controller):
username, hostname = unquote_user_host(user_id)
try:
user = models.User.load(context, instance_id, username, hostname)
except ValueError as ve:
raise exception.BadRequest(str(ve))
except (ValueError, AttributeError) as e:
raise exception.BadRequest(msg=e)
if not user:
raise exception.UserNotFound(uuid=user_id)
return user
@ -183,6 +194,8 @@ class UserAccessController(wsgi.Controller):
def update(self, req, body, tenant_id, instance_id, user_id):
"""Grant access for a user to one or more databases."""
LOG.info(_("Granting user access for instance '%s'") % instance_id)
LOG.info(_("req : '%s'\n\n") % req)
context = req.environ[wsgi.CONTEXT_KEY]
self.validate(body)
user = self._get_user(context, instance_id, user_id)
@ -193,6 +206,8 @@ class UserAccessController(wsgi.Controller):
def delete(self, req, tenant_id, instance_id, user_id, id):
"""Revoke access for a user."""
LOG.info(_("Revoking user access for instance '%s'") % instance_id)
LOG.info(_("req : '%s'\n\n") % req)
context = req.environ[wsgi.CONTEXT_KEY]
user = self._get_user(context, instance_id, user_id)
username, hostname = unquote_user_host(user_id)
@ -249,8 +264,8 @@ class SchemaController(wsgi.Controller):
schema = guest_models.MySQLDatabase()
schema.name = id
models.Schema.delete(context, instance_id, schema.serialize())
except ValueError as ve:
raise exception.BadRequest(str(ve))
except (ValueError, AttributeError) as e:
raise exception.BadRequest(msg=e)
return wsgi.Result(None, 202)
def show(self, req, tenant_id, instance_id, id):

View File

@ -181,7 +181,11 @@ class InstanceController(wsgi.Controller):
flavor_ref = body['instance']['flavorRef']
flavor_id = utils.get_id_from_href(flavor_ref)
databases = populate_databases(body['instance'].get('databases', []))
users = populate_users(body['instance'].get('users', []))
users = None
try:
users = populate_users(body['instance'].get('users', []))
except ValueError as ve:
raise exception.BadRequest(msg=ve)
if body['instance'].get('volume', None) is not None:
try:
volume_size = int(body['instance']['volume']['size'])

View File

@ -432,11 +432,35 @@ class TestUserAccessNegative(UserAccessBase):
self._negative_user_test("test.user", self.databases)
@test
def test_user_empty(self):
def test_user_empty_no_host(self):
# This creates a request to .../<instance-id>/users//databases,
# which is parsed to mean "show me user 'databases', which in this
# case is a valid username, but not one of an extant user.
self._negative_user_test("", self.databases, 400, 400, 400, 400)
self._negative_user_test("", self.databases, 400, 500, 404, 404)
@test
def test_user_empty_with_host(self):
#self._negative_user_test("", self.databases, 400, 400, 400, 400)
# Try and fail to create the user.
empty_user = {"name": "", "host": "%",
"password": "password", "databases": []}
assert_raises(exceptions.BadRequest,
self.dbaas.users.create,
instance_info.id,
[empty_user])
assert_equal(400, self.dbaas.last_http_code)
assert_raises(exceptions.BadRequest, self.dbaas.users.grant,
instance_info.id, "", [], "%")
assert_equal(400, self.dbaas.last_http_code)
assert_raises(exceptions.BadRequest, self.dbaas.users.list_access,
instance_info.id, "", "%")
assert_equal(400, self.dbaas.last_http_code)
assert_raises(exceptions.BadRequest, self.dbaas.users.revoke,
instance_info.id, "", "db", "%")
assert_equal(400, self.dbaas.last_http_code)
@test
def test_user_nametoolong(self):

View File

@ -53,10 +53,8 @@ class TestUsers(object):
"""
username = "tes!@#tuser"
username_urlencoded = "tes%21%40%23tuser"
password = "testpa$^%ssword"
username1 = "anous*&^er"
username1_urlendcoded = "anous%2A%26%5Eer"
password1 = "anopas*?.sword"
db1 = "usersfirstdb"
db2 = "usersseconddb"
@ -136,7 +134,8 @@ class TestUsers(object):
@test(depends_on=[test_create_users_list])
def test_get_one_user(self):
user = self.dbaas.users.get(instance_info.id, username=self.username)
user = self.dbaas.users.get(instance_info.id, username=self.username,
hostname='%')
assert_equal(200, self.dbaas.last_http_code)
assert_equal(user.name, self.username)
assert_equal(1, len(user.databases))
@ -159,9 +158,9 @@ class TestUsers(object):
@test(depends_on=[test_create_users_list],
runs_after=[test_fails_when_creating_user_twice])
def test_delete_users(self):
self.dbaas.users.delete(instance_info.id, self.username_urlencoded)
self.dbaas.users.delete(instance_info.id, self.username, hostname='%')
assert_equal(202, self.dbaas.last_http_code)
self.dbaas.users.delete(instance_info.id, self.username1_urlendcoded)
self.dbaas.users.delete(instance_info.id, self.username1, hostname='%')
assert_equal(202, self.dbaas.last_http_code)
if not FAKE:
time.sleep(5)
@ -239,7 +238,7 @@ class TestUsers(object):
fail("User %s not added to collection." % user)
# Confirm via API get.
result = self.dbaas.users.get(instance_info.id, user)
result = self.dbaas.users.get(instance_info.id, user, '%')
assert_equal(200, self.dbaas.last_http_code)
if result.name != user:
fail("User %s not found via get." % user)