# Copyright (c) 2011 Justin Santa Barbara # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from urllib import parse from oslo_log import log as logging from oslo_serialization import jsonutils import requests LOG = logging.getLogger(__name__) class APIResponse(object): """Decoded API Response This provides a decoded version of the Requests response which include a json decoded body, far more convenient for testing that returned structures are correct, or using parts of returned structures in tests. This class is a simple wrapper around dictionaries for API responses in tests. It includes extra attributes so that they can be inspected in addition to the attributes. All json responses from Nova APIs are dictionary compatible, or blank, so other possible base classes are not needed. """ status = 200 """The HTTP status code as an int""" content = "" """The Raw HTTP response body as a string""" body = {} """The decoded json body as a dictionary""" headers = {} """Response headers as a dictionary""" def __init__(self, response): """Construct an API response from a Requests response :param response: a ``requests`` library response """ super(APIResponse, self).__init__() self.status = response.status_code self.content = response.content if self.content: # The Compute API and Placement API handle error responses a bit # differently so we need to check the content-type header to # figure out what to do. content_type = response.headers.get('content-type') if 'application/json' in content_type: self.body = response.json() elif 'text/html' in content_type: self.body = response.text else: raise ValueError('Unexpected response content-type: %s' % content_type) self.headers = response.headers def __str__(self): # because __str__ falls back to __repr__ we can still use repr # on self but add in the other attributes. return "" % (self.body, self.status) class OpenStackApiException(Exception): def __init__(self, message=None, response=None): self.response = response if not message: message = 'Unspecified error' if response: _status = response.status_code _body = response.content message = ('%(message)s\nStatus Code: %(_status)s\n' 'Body: %(_body)s' % {'message': message, '_status': _status, '_body': _body}) super(OpenStackApiException, self).__init__(message) class OpenStackApiAuthenticationException(OpenStackApiException): def __init__(self, response=None, message=None): if not message: message = "Authentication error" super(OpenStackApiAuthenticationException, self).__init__(message, response) class OpenStackApiAuthorizationException(OpenStackApiException): def __init__(self, response=None, message=None): if not message: message = "Authorization error" super(OpenStackApiAuthorizationException, self).__init__(message, response) class OpenStackApiNotFoundException(OpenStackApiException): def __init__(self, response=None, message=None): if not message: message = "Item not found" super(OpenStackApiNotFoundException, self).__init__(message, response) class TestOpenStackClient(object): """Simple OpenStack API Client. This is a really basic OpenStack API client that is under our control, so we can make changes / insert hooks for testing By default, no roles are implied and must be passed like roles=['reader', 'member'] in order for the user to have privileges on the project, just like in a real deployment. """ def __init__(self, auth_user, base_url, project_id=None, roles=None): super(TestOpenStackClient, self).__init__() self.auth_user = auth_user self.base_url = base_url if project_id is None: self.project_id = "6f70656e737461636b20342065766572" else: self.project_id = project_id self.microversion = None self.roles = roles or [] def request(self, url, method='GET', body=None, headers=None): _headers = {'Content-Type': 'application/json'} _headers.update(headers or {}) response = requests.request(method, url, data=body, headers=_headers) return response def api_request(self, relative_uri, check_response_status=None, strip_version=False, **kwargs): base_uri = self.base_url if strip_version: # The base_uri is either http://%(host)s:%(port)s/%(api_version)s # or http://%(host)s:%(port)s/%(api_version)s/%(project_id)s # NOTE(efried): Using urlparse was not easier :) chunked = base_uri.split('/') base_uri = '/'.join(chunked[:3]) # Restore the project ID if present if len(chunked) == 5: base_uri += '/' + chunked[-1] full_uri = '%s/%s' % (base_uri, relative_uri) headers = kwargs.setdefault('headers', {}) if ('X-OpenStack-Nova-API-Version' in headers or 'OpenStack-API-Version' in headers): raise Exception('Microversion should be set via ' 'microversion attribute in API client.') elif self.microversion: headers['X-OpenStack-Nova-API-Version'] = self.microversion headers['OpenStack-API-Version'] = 'compute %s' % self.microversion headers.setdefault('X-Auth-User', self.auth_user) headers.setdefault('X-User-Id', self.auth_user) headers.setdefault('X-Auth-Project-Id', self.project_id) headers.setdefault('X-Roles', ','.join(self.roles)) response = self.request(full_uri, **kwargs) http_status = response.status_code LOG.debug("%(relative_uri)s => code %(http_status)s", {'relative_uri': relative_uri, 'http_status': http_status}) if check_response_status: if http_status not in check_response_status: if http_status == 404: raise OpenStackApiNotFoundException(response=response) elif http_status == 401: raise OpenStackApiAuthorizationException(response=response) else: raise OpenStackApiException( message="Unexpected status code: %s" % response.text, response=response) return response def _decode_json(self, response): resp = APIResponse(status=response.status_code) if response.content: resp.body = jsonutils.loads(response.content) return resp def api_get(self, relative_uri, **kwargs): kwargs.setdefault('check_response_status', [200]) return APIResponse(self.api_request(relative_uri, **kwargs)) def api_post(self, relative_uri, body, **kwargs): kwargs['method'] = 'POST' if body: headers = kwargs.setdefault('headers', {}) headers['Content-Type'] = 'application/json' kwargs['body'] = jsonutils.dumps(body) kwargs.setdefault('check_response_status', [200, 201, 202, 204]) return APIResponse(self.api_request(relative_uri, **kwargs)) def api_put(self, relative_uri, body, **kwargs): kwargs['method'] = 'PUT' if body: headers = kwargs.setdefault('headers', {}) headers['Content-Type'] = 'application/json' kwargs['body'] = jsonutils.dumps(body) kwargs.setdefault('check_response_status', [200, 202, 204]) return APIResponse(self.api_request(relative_uri, **kwargs)) def api_delete(self, relative_uri, **kwargs): kwargs['method'] = 'DELETE' kwargs.setdefault('check_response_status', [200, 202, 204]) return APIResponse(self.api_request(relative_uri, **kwargs)) ##################################### # # Convenience methods # # The following are a set of convenience methods to get well known # resources, they can be helpful in setting up resources in # tests. All of these convenience methods throw exceptions if they # get a non 20x status code, so will appropriately abort tests if # they fail. # # They all return the most relevant part of their response body as # decoded data structure. # ##################################### def get_server(self, server_id): return self.api_get('/servers/%s' % server_id).body['server'] def get_servers(self, detail=True, search_opts=None): rel_url = '/servers/detail' if detail else '/servers' if search_opts is not None: qparams = {} for opt, val in search_opts.items(): qparams[opt] = val if qparams: query_string = "?%s" % parse.urlencode(qparams) rel_url += query_string return self.api_get(rel_url).body['servers'] def post_server(self, server): response = self.api_post('/servers', server).body if 'reservation_id' in response: return response else: return response['server'] def put_server(self, server_id, server): return self.api_put('/servers/%s' % server_id, server).body def post_server_action(self, server_id, data, **kwargs): return self.api_post( '/servers/%s/action' % server_id, data, **kwargs).body def delete_server(self, server_id): return self.api_delete('/servers/%s' % server_id) def force_down_service(self, host, binary, forced_down): req = { "host": host, "binary": binary, "forced_down": forced_down } return self.api_put('/os-services/force-down', req).body['service'] def get_image(self, image_id): return self.api_get('/images/%s' % image_id).body['image'] def get_images(self, detail=True): rel_url = '/images/detail' if detail else '/images' return self.api_get(rel_url).body['images'] def post_image(self, image): return self.api_post('/images', image).body['image'] def delete_image(self, image_id): return self.api_delete('/images/%s' % image_id) def put_image_meta_key(self, image_id, key, value): """Creates or updates a given image metadata key/value pair.""" req_body = { 'meta': { key: value } } return self.api_put('/images/%s/metadata/%s' % (image_id, key), req_body) def get_flavor(self, flavor_id): return self.api_get('/flavors/%s' % flavor_id).body['flavor'] def get_flavors(self, detail=True): rel_url = '/flavors/detail' if detail else '/flavors' return self.api_get(rel_url).body['flavors'] def post_flavor(self, flavor): return self.api_post('/flavors', flavor).body['flavor'] def delete_flavor(self, flavor_id): return self.api_delete('/flavors/%s' % flavor_id) def get_extra_specs(self, flavor_id): return self.api_get( '/flavors/%s/os-extra_specs' % flavor_id ).body['extra_specs'] def get_extra_spec(self, flavor_id, spec_id): return self.api_get( '/flavors/%s/os-extra_specs/%s' % (flavor_id, spec_id), ).body def post_extra_spec(self, flavor_id, body, **_params): url = '/flavors/%s/os-extra_specs' % flavor_id if _params: query_string = '?%s' % parse.urlencode(list(_params.items())) url += query_string return self.api_post(url, body) def put_extra_spec(self, flavor_id, spec_id, body, **_params): url = '/flavors/%s/os-extra_specs/%s' % (flavor_id, spec_id) if _params: query_string = '?%s' % parse.urlencode(list(_params.items())) url += query_string return self.api_put(url, body) def get_volume(self, volume_id): return self.api_get('/os-volumes/%s' % volume_id).body['volume'] def get_volumes(self, detail=True): rel_url = '/os-volumes/detail' if detail else '/os-volumes' return self.api_get(rel_url).body['volumes'] def post_volume(self, volume): return self.api_post('/os-volumes', volume).body['volume'] def delete_volume(self, volume_id): return self.api_delete('/os-volumes/%s' % volume_id) def get_snapshot(self, snap_id): return self.api_get('/os-snapshots/%s' % snap_id).body['snapshot'] def get_snapshots(self, detail=True): rel_url = '/os-snapshots/detail' if detail else '/os-snapshots' return self.api_get(rel_url).body['snapshots'] def post_snapshot(self, snapshot): return self.api_post('/os-snapshots', snapshot).body['snapshot'] def delete_snapshot(self, snap_id): return self.api_delete('/os-snapshots/%s' % snap_id) def get_server_volume(self, server_id, volume_id): return self.api_get('/servers/%s/os-volume_attachments/%s' % (server_id, volume_id) ).body['volumeAttachment'] def get_server_volumes(self, server_id): return self.api_get('/servers/%s/os-volume_attachments' % (server_id)).body['volumeAttachments'] def post_server_volume(self, server_id, volume_attachment): return self.api_post('/servers/%s/os-volume_attachments' % (server_id), volume_attachment ).body['volumeAttachment'] def put_server_volume(self, server_id, original_volume_id, volume_id): return self.api_put('/servers/%s/os-volume_attachments/%s' % (server_id, original_volume_id), {"volumeAttachment": {"volumeId": volume_id}}) def delete_server_volume(self, server_id, volume_id): return self.api_delete('/servers/%s/os-volume_attachments/%s' % (server_id, volume_id)) def post_server_metadata(self, server_id, metadata): post_body = {'metadata': {}} post_body['metadata'].update(metadata) return self.api_post('/servers/%s/metadata' % server_id, post_body).body['metadata'] def delete_server_metadata(self, server_id, key): return self.api_delete('/servers/%s/metadata/%s' % (server_id, key)) def get_server_groups(self, all_projects=None): if all_projects: return self.api_get( '/os-server-groups?all_projects').body['server_groups'] else: return self.api_get('/os-server-groups').body['server_groups'] def get_server_group(self, group_id): return self.api_get('/os-server-groups/%s' % group_id).body['server_group'] def post_server_groups(self, group): response = self.api_post('/os-server-groups', {"server_group": group}) return response.body['server_group'] def delete_server_group(self, group_id): self.api_delete('/os-server-groups/%s' % group_id) def create_server_external_events(self, events): body = {'events': events} return self.api_post('/os-server-external-events', body).body['events'] def get_instance_actions(self, server_id): return self.api_get('/servers/%s/os-instance-actions' % (server_id)).body['instanceActions'] def get_instance_action_details(self, server_id, request_id): return self.api_get('/servers/%s/os-instance-actions/%s' % (server_id, request_id)).body['instanceAction'] def post_aggregate(self, aggregate): return self.api_post('/os-aggregates', aggregate).body['aggregate'] def delete_aggregate(self, aggregate_id): self.api_delete('/os-aggregates/%s' % aggregate_id) def add_host_to_aggregate(self, aggregate_id, host): return self.api_post('/os-aggregates/%s/action' % aggregate_id, {'add_host': {'host': host}}) def remove_host_from_aggregate(self, aggregate_id, host): return self.api_post('/os-aggregates/%s/action' % aggregate_id, {'remove_host': {'host': host}}) def get_limits(self): return self.api_get('/limits').body['limits'] def get_server_tags(self, server_id): """Get the tags on the given server. :param server_id: The server uuid :return: The list of tags from the response """ return self.api_get('/servers/%s/tags' % server_id).body['tags'] def put_server_tags(self, server_id, tags): """Put (or replace) a list of tags on the given server. Returns the list of tags from the response. """ return self.api_put('/servers/%s/tags' % server_id, {'tags': tags}).body['tags'] def get_port_interfaces(self, server_id): return self.api_get('/servers/%s/os-interface' % (server_id)).body['interfaceAttachments'] def attach_interface(self, server_id, post): return self.api_post('/servers/%s/os-interface' % server_id, post) def detach_interface(self, server_id, port_id): return self.api_delete('/servers/%s/os-interface/%s' % (server_id, port_id)) def get_services(self, binary=None, host=None): url = '/os-services?' if binary: url += 'binary=%s&' % binary if host: url += 'host=%s&' % host return self.api_get(url).body['services'] def put_service(self, service_id, req): return self.api_put( '/os-services/%s' % service_id, req).body['service'] def post_keypair(self, keypair): return self.api_post('/os-keypairs', keypair).body['keypair'] def delete_keypair(self, keypair_name): self.api_delete('/os-keypairs/%s' % keypair_name) def post_aggregate_action(self, aggregate_id, body): return self.api_post( '/os-aggregates/%s/action' % aggregate_id, body).body['aggregate'] def get_active_migrations(self, server_id): return self.api_get('/servers/%s/migrations' % server_id).body['migrations'] def get_migrations(self, user_id=None, project_id=None): url = '/os-migrations?' if user_id: url += 'user_id=%s&' % user_id if project_id: url += 'project_id=%s&' % project_id return self.api_get(url).body['migrations'] def force_complete_migration(self, server_id, migration_id): return self.api_post( '/servers/%s/migrations/%s/action' % (server_id, migration_id), {'force_complete': None}) def delete_migration(self, server_id, migration_id): return self.api_delete( '/servers/%s/migrations/%s' % (server_id, migration_id)) def put_aggregate(self, aggregate_id, body): return self.api_put( '/os-aggregates/%s' % aggregate_id, body).body['aggregate'] def get_hypervisor_stats(self): return self.api_get( '/os-hypervisors/statistics').body['hypervisor_statistics'] def get_service_id(self, binary_name): for service in self.get_services(): if service['binary'] == binary_name: return service['id'] raise OpenStackApiNotFoundException('Service cannot be found.') def put_service_force_down(self, service_id, forced_down): req = { 'forced_down': forced_down } return self.api_put('os-services/%s' % service_id, req).body['service'] def get_server_diagnostics(self, server_id): return self.api_get('/servers/%s/diagnostics' % server_id).body def get_quota_detail(self, project_id=None, user_id=None): if not project_id: project_id = self.project_id url = '/os-quota-sets/%s/detail' if user_id: url += '?user_id=%s' % user_id return self.api_get(url % project_id).body['quota_set'] def update_quota(self, quotas, project_id=None, user_id=None): if not project_id: project_id = self.project_id url = '/os-quota-sets/%s' if user_id: url += '?user_id=%s' % user_id body = {'quota_set': {}} body['quota_set'].update(quotas) return self.api_put(url % project_id, body).body['quota_set']