Replaces httplib with requests lib in Quobyte RPC layer

Simplifies the Manila Quobyte drivers rpc layer by using
requests library instead of httplib. This reduces overall
complexity and fixes randomly occurring rpc send issues.

Closes-Bug: #1560345

Change-Id: I4cc602722ece3761331bb3fa1c6bce2bbd35c78f
This commit is contained in:
Silvan Kaiser 2016-07-21 09:49:11 +02:00
parent cbb5a1306e
commit b75ab85582
3 changed files with 148 additions and 412 deletions

View File

@ -18,184 +18,88 @@
Control Quobyte over its JSON RPC API.
"""
import base64
import socket
import ssl
import requests
from requests import auth
from requests import codes
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import excutils
import six
from six.moves import http_client
import six.moves.urllib.parse as urlparse
from manila import exception
from manila.i18n import _, _LW
from manila.i18n import _LW
from manila import utils
LOG = log.getLogger(__name__)
ERROR_ENOENT = 2
CONNECTION_RETRIES = 3
class BasicAuthCredentials(object):
def __init__(self, username, password):
self._username = username
self._password = password
@property
def username(self):
return self._username
def get_authorization_header(self):
header = '%s:%s' % (self._username, self._password)
auth = base64.standard_b64encode(six.b(header))
return 'BASIC %s' % auth.decode()
class HTTPSConnectionWithCaVerification(http_client.HTTPConnection):
"""Verify server cert against a given CA certificate."""
default_port = http_client.HTTPS_PORT
def __init__(self, host, port=None, key_file=None, cert_file=None,
ca_file=None,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
http_client.HTTPConnection.__init__(self, host, port, timeout=timeout)
self.key_file = key_file
self.cert_file = cert_file
self.ca_file = ca_file
def connect(self):
"""Connect to a host on a given (SSL) port."""
sock = socket.create_connection((self.host, self.port), self.timeout)
if self._tunnel_host:
self.sock = sock
self._tunnel()
self.sock = ssl.wrap_socket(sock, keyfile=self.key_file,
certfile=self.cert_file,
ca_certs=self.ca_file,
cert_reqs=ssl.CERT_REQUIRED)
http_client.__all__.append("HTTPSConnectionWithCaVerification")
class JsonRpc(object):
def __init__(self, url, user_credentials, ca_file=None):
def __init__(self, url, user_credentials, ca_file=None, key_file=None,
cert_file=None):
parsedurl = urlparse.urlparse(url)
self._url = parsedurl.geturl()
self._netloc = parsedurl.netloc
self._ca_file = ca_file
if parsedurl.scheme == 'https':
if self._ca_file:
self._connection = HTTPSConnectionWithCaVerification(
self._netloc,
ca_file=self._ca_file.name)
else:
self._connection = http_client.HTTPSConnection(self._netloc)
self._url_scheme = parsedurl.scheme
if self._url_scheme == 'https':
if not self._ca_file:
self._ca_file = False
LOG.warning(_LW(
"Will not verify the server certificate of the API service"
" because the CA certificate is not available."))
else:
self._connection = http_client.HTTPConnection(self._netloc)
self._id = 0
self._fail_fast = True
self._credentials = BasicAuthCredentials(
self._credentials = auth.HTTPBasicAuth(
user_credentials[0], user_credentials[1])
self._require_cert_verify = self._ca_file is not None
self._disabled_cert_verification = False
self._key_file = key_file
self._cert_file = cert_file
@utils.synchronized('quobyte-request')
def call(self, method_name, user_parameters):
# prepare request
self._id += 1
parameters = {'retry': 'INFINITELY'} # Backend specific setting
if user_parameters:
parameters.update(user_parameters)
call_body = {'jsonrpc': '2.0',
'method': method_name,
'params': parameters,
'id': six.text_type(self._id)}
self.call_counter = 0
self._connection.connect() # prevents http_client timing issue
post_data = {
'jsonrpc': '2.0',
'method': method_name,
'params': parameters,
'id': six.text_type(self._id),
}
LOG.debug("Request payload to be send is: %s",
jsonutils.dumps(post_data))
while self.call_counter < CONNECTION_RETRIES:
self.call_counter += 1
try:
self._id += 1
call_body['id'] = six.text_type(self._id)
LOG.debug("Posting to Quobyte backend: %s",
jsonutils.dumps(call_body))
self._connection.request(
"POST", self._url + '/', jsonutils.dumps(call_body),
dict(Authorization=(self._credentials.
get_authorization_header())))
# send request
if self._url_scheme == 'https':
if self._cert_file:
result = requests.post(url=self._url,
json=post_data,
auth=self._credentials,
verify=self._ca_file,
cert=(self._cert_file, self._key_file))
else:
result = requests.post(url=self._url,
json=post_data,
auth=self._credentials,
verify=self._ca_file)
else:
result = requests.post(url=self._url,
json=post_data,
auth=self._credentials)
response = self._connection.getresponse()
self._throw_on_http_error(response)
result = jsonutils.loads(response.read())
LOG.debug("Retrieved data from Quobyte backend: %s", result)
return self._checked_for_application_error(result)
except ssl.SSLError as e:
# Generic catch because OpenSSL does not return
# meaningful errors.
if (not self._disabled_cert_verification
and not self._require_cert_verify):
LOG.warning(_LW(
"Could not verify server certificate of "
"API service against CA."))
self._connection.close()
# Core HTTPSConnection does no certificate verification.
self._connection = http_client.HTTPSConnection(
self._netloc)
self._disabled_cert_verification = True
else:
raise exception.QBException(_(
"Client SSL subsystem returned error: %s") % e)
except http_client.BadStatusLine as e:
raise exception.QBException(_(
"If SSL is enabled for the API service, the URL must"
" start with 'https://' for the URL. Failed to parse"
" status code from server response. Error was %s")
% e)
except socket.error as se:
error_code = se.errno
error_msg = se.strerror
composite_msg = _("Socket error No. %(code)s (%(msg)s) "
"connecting to API with") % {
'code': (six.text_type(error_code)),
'msg': error_msg}
if self._fail_fast:
raise exception.QBException(composite_msg)
else:
LOG.warning(composite_msg)
except http_client.HTTPException as e:
with excutils.save_and_reraise_exception() as ctxt:
if self._fail_fast:
ctxt.reraise = True
else:
LOG.warning(_LW("Encountered error, retrying: %s"),
six.text_type(e))
ctxt.reraise = False
# eval request response
if result.status_code == codes['OK']:
LOG.debug("Retrieved data from Quobyte backend: %s", result.text)
response = result.json()
return self._checked_for_application_error(response)
raise exception.QBException("Unable to connect to backend after "
"%s retries" %
six.text_type(CONNECTION_RETRIES))
def _throw_on_http_error(self, response):
if response.status == 401:
raise exception.QBException(
_("JSON RPC failed: unauthorized user %(status)s %(reason)s"
" Please check the Quobyte API service log for "
"more details.")
% {'status': six.text_type(response.status),
'reason': response.reason})
elif response.status >= 300:
raise exception.QBException(
_("JSON RPC failed: %(status)s %(reason)s"
" Please check the Quobyte API service log for "
"more details.")
% {'status': six.text_type(response.status),
'reason': response.reason})
# If things did not work out provide error info
LOG.debug("Backend request resulted in error: %s" % result.text)
result.raise_for_status()
def _checked_for_application_error(self, result):
if 'error' in result and result['error']:

View File

@ -78,9 +78,10 @@ class QuobyteShareDriver(driver.ExecuteMixin, driver.ShareDriver,):
1.2 - Adds update_access() implementation and related methods
1.2.1 - Improved capacity calculation
1.2.2 - Minor optimizations
1.2.3 - Updated RPC layer for improved stability
"""
DRIVER_VERSION = '1.2.2'
DRIVER_VERSION = '1.2.3'
def __init__(self, *args, **kwargs):
super(QuobyteShareDriver, self).__init__(False, *args, **kwargs)

View File

@ -13,15 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import socket
import ssl
import requests
from requests import auth
from requests import exceptions
import tempfile
import time
import mock
from oslo_serialization import jsonutils
import six
from six.moves import http_client
from manila import exception
from manila.share.drivers.quobyte import jsonrpc
@ -30,78 +29,13 @@ from manila import test
class FakeResponse(object):
def __init__(self, status, body):
self.status = status
self.status_code = status
self.reason = "HTTP reason"
self._body = body
self.body = body
self.text = six.text_type(body)
def read(self):
return self._body
class QuobyteBasicAuthCredentialsTestCase(test.TestCase):
def test_get_authorization_header(self):
creds = jsonrpc.BasicAuthCredentials('fakeuser', 'fakepwd')
self.assertEqual('BASIC ZmFrZXVzZXI6ZmFrZXB3ZA==',
creds.get_authorization_header())
class QuobyteHttpsConnectionWithCaVerificationTestCase(test.TestCase):
@mock.patch.object(socket, "create_connection",
return_value="fake_socket")
@mock.patch.object(ssl, "wrap_socket")
def test_https_with_ca_connect(self, mock_ssl, mock_cc):
key_file = tempfile.TemporaryFile()
cert_file = tempfile.gettempdir()
ca_file = tempfile.gettempdir()
mycon = (jsonrpc.
HTTPSConnectionWithCaVerification(host="localhost",
key_file=key_file,
cert_file=cert_file,
ca_file=ca_file,
port=1234,
timeout=999))
mycon.connect()
mock_cc.assert_called_once_with(("localhost", 1234), 999)
mock_ssl.assert_called_once_with("fake_socket",
keyfile=key_file,
certfile=cert_file,
ca_certs=ca_file,
cert_reqs=mock.ANY)
@mock.patch.object(http_client.HTTPConnection, "_tunnel")
@mock.patch.object(socket, "create_connection",
return_value="fake_socket")
@mock.patch.object(ssl, "wrap_socket")
def test_https_with_ca_connect_tunnel(self,
mock_ssl,
mock_cc,
mock_tunnel):
key_file = tempfile.TemporaryFile()
cert_file = tempfile.gettempdir()
ca_file = tempfile.gettempdir()
mycon = (jsonrpc.
HTTPSConnectionWithCaVerification(host="localhost",
key_file=key_file,
cert_file=cert_file,
ca_file=ca_file,
port=1234,
timeout=999))
mycon._tunnel_host = "fake_tunnel_host"
mycon.connect()
mock_tunnel.assert_called_once_with()
mock_cc.assert_called_once_with(("localhost", 1234), 999)
mock_ssl.assert_called_once_with("fake_socket",
keyfile=key_file,
certfile=cert_file,
ca_certs=ca_file,
cert_reqs=mock.ANY)
def json(self):
return self.body
class QuobyteJsonRpcTestCase(test.TestCase):
@ -110,38 +44,35 @@ class QuobyteJsonRpcTestCase(test.TestCase):
super(QuobyteJsonRpcTestCase, self).setUp()
self.rpc = jsonrpc.JsonRpc(url="http://test",
user_credentials=("me", "team"))
self.mock_object(self.rpc, '_connection')
self.mock_object(time, 'sleep')
def test_request_generation_and_basic_auth(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(200, '{"result":"yes"}')))
@mock.patch.object(requests, 'post',
return_value=FakeResponse(200, {"result": "yes"}))
def test_request_generation_and_basic_auth(self, req_get_mock):
self.rpc.call('method', {'param': 'value'})
self.rpc._connection.request.assert_called_once_with(
'POST', 'http://test/',
jsonutils.dumps({'jsonrpc': '2.0',
'method': 'method',
'params': {'retry': 'INFINITELY',
'param': 'value'},
'id': '1'}),
dict(Authorization=jsonrpc.BasicAuthCredentials("me", "team")
.get_authorization_header()))
req_get_mock.assert_called_once_with(
url='http://test',
auth=auth.HTTPBasicAuth("me", "team"),
json=mock.ANY)
@mock.patch.object(jsonrpc.HTTPSConnectionWithCaVerification,
'__init__',
return_value=None)
def test_jsonrpc_init_with_ca(self, mock_init):
def test_jsonrpc_init_with_ca(self):
foofile = tempfile.TemporaryFile()
self.rpc = jsonrpc.JsonRpc("https://foo.bar/",
('fakeuser', 'fakepwd'),
foofile)
fake_url = "https://foo.bar/"
fake_credentials = ('fakeuser', 'fakepwd')
fake_cert_file = tempfile.TemporaryFile()
fake_key_file = tempfile.TemporaryFile()
self.rpc = jsonrpc.JsonRpc(url=fake_url,
user_credentials=fake_credentials,
ca_file=foofile,
key_file=fake_key_file,
cert_file=fake_cert_file)
mock_init.assert_called_once_with("foo.bar",
ca_file=foofile.name)
self.assertEqual("https", self.rpc._url_scheme)
self.assertEqual(fake_url, self.rpc._url)
self.assertEqual(foofile, self.rpc._ca_file)
self.assertEqual(fake_cert_file, self.rpc._cert_file)
self.assertEqual(fake_key_file, self.rpc._key_file)
@mock.patch.object(jsonrpc.LOG, "warning")
def test_jsonrpc_init_without_ca(self, mock_warning):
@ -153,190 +84,90 @@ class QuobyteJsonRpcTestCase(test.TestCase):
"Will not verify the server certificate of the API service"
" because the CA certificate is not available.")
@mock.patch.object(http_client.HTTPConnection,
'__init__',
return_value=None)
def test_jsonrpc_init_no_ssl(self, mock_init):
def test_jsonrpc_init_no_ssl(self):
self.rpc = jsonrpc.JsonRpc("http://foo.bar/",
('fakeuser', 'fakepwd'))
mock_init.assert_called_once_with("foo.bar")
self.assertEqual("http", self.rpc._url_scheme)
def test_successful_call(self):
self.mock_object(
self.rpc._connection, 'getresponse',
mock.Mock(return_value=FakeResponse(
200, '{"result":"Sweet gorilla of Manila"}')))
@mock.patch.object(requests, "post",
return_value=FakeResponse(
200, {"result": "Sweet gorilla of Manila"}))
def test_successful_call(self, mock_req_get):
result = self.rpc.call('method', {'param': 'value'})
mock_req_get.assert_called_once_with(
url=self.rpc._url,
json=mock.ANY, # not checking here as of undefined order in dict
auth=self.rpc._credentials)
self.assertEqual("Sweet gorilla of Manila", result)
@mock.patch.object(requests, "post",
return_value=FakeResponse(
200, {"result": "Sweet gorilla of Manila"}))
def test_https_call_with_cert(self, mock_req_get):
fake_cert_file = tempfile.TemporaryFile()
fake_key_file = tempfile.TemporaryFile()
self.rpc = jsonrpc.JsonRpc(url="https://test",
user_credentials=("me", "team"),
cert_file=fake_cert_file,
key_file=fake_key_file)
result = self.rpc.call('method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
mock_req_get.assert_called_once_with(
url=self.rpc._url,
json=mock.ANY, # not checking here as of undefined order in dict
auth=self.rpc._credentials,
verify=False,
cert=(fake_cert_file, fake_key_file))
self.assertEqual("Sweet gorilla of Manila", result)
@mock.patch('six.moves.http_client.HTTPSConnection')
def test_jsonrpc_call_ssl_disable(self, mock_connection):
mock_connection.return_value = self.rpc._connection
self.mock_object(
self.rpc._connection,
'request',
mock.Mock(side_effect=ssl.SSLError))
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(
403, '{"error":{"code":28,"message":"text"}}')))
self.mock_object(jsonrpc.LOG, 'warning')
@mock.patch.object(requests, "post",
return_value=FakeResponse(
200, {"result": "Sweet gorilla of Manila"}))
def test_https_call_verify(self, mock_req_get):
fake_ca_file = tempfile.TemporaryFile()
self.rpc = jsonrpc.JsonRpc(url="https://test",
user_credentials=("me", "team"),
ca_file=fake_ca_file)
self.assertRaises(exception.QBException,
result = self.rpc.call('method', {'param': 'value'})
mock_req_get.assert_called_once_with(
url=self.rpc._url,
json=mock.ANY, # not checking here as of undefined order in dict
auth=self.rpc._credentials,
verify=fake_ca_file)
self.assertEqual("Sweet gorilla of Manila", result)
@mock.patch.object(requests, "post", side_effect=exceptions.HTTPError)
def test_jsonrpc_call_http_exception(self, req_get_mock):
self.assertRaises(exceptions.HTTPError,
self.rpc.call,
'method', {'param': 'value'})
req_get_mock.assert_called_once_with(
url=self.rpc._url,
json=mock.ANY, # not checking here as of undefined order in dict
auth=self.rpc._credentials)
self.assertTrue(self.rpc._disabled_cert_verification)
jsonrpc.LOG.warning.assert_called_once_with(
"Could not verify server certificate of "
"API service against CA.")
def test_jsonrpc_call_ssl_error(self):
"""This test succeeds if a specific exception is thrown.
Throwing a different exception or none at all
is a failure in this specific test case.
"""
self.mock_object(
self.rpc._connection,
'request',
mock.Mock(side_effect=ssl.SSLError))
self.rpc._disabled_cert_verification = True
try:
self.rpc.call('method', {'param': 'value'})
except exception.QBException as me:
self.rpc._connection.connect.assert_called_once_with()
(self.assertTrue(six.text_type(me).startswith
('Client SSL subsystem returned error:')))
except Exception as e:
self.fail('Unexpected exception thrown: %s' % e)
else:
self.fail('Expected exception not thrown')
def test_jsonrpc_call_bad_status_line(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(side_effect=http_client.BadStatusLine("fake_line")))
self.assertRaises(exception.QBException,
self.rpc.call,
'method', {'param': 'value'})
def test_jsonrpc_call_http_exception(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(side_effect=http_client.HTTPException))
self.mock_object(jsonrpc.LOG, 'warning')
self.assertRaises(http_client.HTTPException,
self.rpc.call,
'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
jsonrpc.LOG.warning.assert_has_calls([])
def test_jsonrpc_call_socket_error(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(side_effect=socket.error(23, "Test")))
self.mock_object(jsonrpc.LOG, 'warning')
self.assertRaises(exception.QBException,
self.rpc.call,
'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
jsonrpc.LOG.warning.assert_has_calls([])
def test_jsonrpc_call_http_exception_retry(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(side_effect=http_client.HTTPException))
self.mock_object(jsonrpc.LOG, 'warning')
self.rpc._fail_fast = False
self.assertRaises(exception.QBException,
self.rpc.call,
'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
jsonrpc.LOG.warning.assert_called_with(
"Encountered error, retrying: %s", "")
def test_jsonrpc_call_no_connect(self):
orig_retries = jsonrpc.CONNECTION_RETRIES
jsonrpc.CONNECTION_RETRIES = 0
try:
self.rpc.call('method', {'param': 'value'})
except exception.QBException as me:
self.rpc._connection.connect.assert_called_once_with()
self.assertEqual("Unable to connect to backend after 0 retries",
six.text_type(me))
else:
self.fail('Expected exception not thrown')
finally:
jsonrpc.CONNECTION_RETRIES = orig_retries
def test_http_error_401(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(401, '')))
self.assertRaises(exception.QBException,
self.rpc.call, 'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
def test_http_error_other(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(300, '')))
self.assertRaises(exception.QBException,
self.rpc.call, 'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
self.assertTrue(self.rpc._connection.getresponse.called)
def test_application_error(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(
200, '{"error":{"code":28,"message":"text"}}')))
@mock.patch.object(requests, "post",
return_value=FakeResponse(
200,
{"error": {"code": 28, "message": "text"}}))
def test_application_error(self, req_get_mock):
self.assertRaises(exception.QBRpcException,
self.rpc.call, 'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
self.assertTrue(self.rpc._connection.getresponse.called)
def test_broken_application_error(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(
200, '{"error":{"code":28,"message":"text"}}')))
self.assertRaises(exception.QBRpcException,
self.rpc.call, 'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
self.assertTrue(self.rpc._connection.getresponse.called)
req_get_mock.assert_called_once_with(
url=self.rpc._url,
json=mock.ANY, # not checking here as of undefined order in dict
auth=self.rpc._credentials)
def test_checked_for_application_error(self):
resultdict = {"result": "Sweet gorilla of Manila"}
self.assertEqual("Sweet gorilla of Manila",
(self.rpc.
_checked_for_application_error(result=resultdict))
)
(self.rpc._checked_for_application_error(
result=resultdict)))
def test_checked_for_application_error_no_entry(self):
resultdict = {"result": "Sweet gorilla of Manila",