Switch etcd base driver to use etcd3gw

python-etcd does not support V3 of etcd's API.
Switching to an updated library with the etcd3 API
will allow us to implement a Pub/Sub mechanism
using the 'watch' option.

Change-Id: I4b88b3f259a25509fb452d13d0a71bf1bd441abf
This commit is contained in:
Omer Anson 2017-07-13 14:21:49 +00:00
parent b9492f79a3
commit 2c181ec382
3 changed files with 33 additions and 100 deletions

View File

@ -13,7 +13,7 @@
from contextlib import contextmanager
from socket import timeout as SocketTimeout
import etcd
import etcd3gw as etcd
from oslo_log import log
import urllib3
from urllib3 import connection
@ -86,21 +86,6 @@ def _error_catcher(self):
urllib3.HTTPResponse._error_catcher = _error_catcher
def _check_valid_host(host_str):
return ':' in host_str and host_str[-1] != ':'
def _parse_hosts(hosts):
host_ports = []
for host_str in hosts:
if _check_valid_host(host_str):
host_port = host_str.strip().split(':')
host_ports.append((host_port[0], int(host_port[1])))
else:
LOG.error("The host string %s is invalid.", host_str)
return tuple(host_ports)
class EtcdDbDriver(db_api.DbApi):
def __init__(self):
@ -110,11 +95,7 @@ class EtcdDbDriver(db_api.DbApi):
self.notify_callback = None
def initialize(self, db_ip, db_port, **args):
hosts = _parse_hosts(args['config'].remote_db_hosts)
if hosts:
self.client = etcd.Client(host=hosts, allow_reconnect=True)
else:
self.client = etcd.Client(host=db_ip, port=db_port)
self.client = etcd.client(host=db_ip, port=db_port)
def create_table(self, table):
# Not needed in etcd
@ -124,56 +105,60 @@ class EtcdDbDriver(db_api.DbApi):
# Not needed in etcd
pass
@staticmethod
def _make_key(table, obj_id=None):
if obj_id:
key = '/{}/{}'.format(table, obj_id)
else:
key = '/{}/'.format(table)
return key
def get_key(self, table, key, topic=None):
try:
return self.client.read('/' + table + '/' + key).value
except etcd.EtcdKeyNotFound:
raise df_exceptions.DBKeyNotFound(key=key)
value = self.client.get(self._make_key(table, key))
if len(value) > 0:
return value.pop()
raise df_exceptions.DBKeyNotFound(key=key)
def set_key(self, table, key, value, topic=None):
self.client.write('/' + table + '/' + key, value)
self.client.put(self._make_key(table, key), value)
def create_key(self, table, key, value, topic=None):
self.client.write('/' + table + '/' + key, value)
self.client.put(self._make_key(table, key), value)
def delete_key(self, table, key, topic=None):
try:
self.client.delete('/' + table + '/' + key)
except etcd.EtcdKeyNotFound:
deleted = self.client.delete(self._make_key(table, key))
if not deleted:
raise df_exceptions.DBKeyNotFound(key=key)
def get_all_entries(self, table, topic=None):
res = []
try:
directory = self.client.get("/" + table)
except etcd.EtcdKeyNotFound:
return res
for entry in directory.children:
if entry.value:
res.append(entry.value)
directory = self.client.get_prefix(self._make_key(table))
for entry in directory:
value = entry[0]
if value:
res.append(value)
return res
def get_all_keys(self, table, topic=None):
res = []
try:
directory = self.client.get("/" + table)
except etcd.EtcdKeyNotFound:
raise df_exceptions.DBKeyNotFound(key=table)
for entry in directory.children:
directory = self.client.get_prefix(self._make_key(table))
for entry in directory:
table_name_size = len(table) + 2
res.append(entry.key[table_name_size:])
key = entry[1]["key"]
res.append(key[table_name_size:])
return res
def _allocate_unique_key(self, table):
key = '/unique_key/%s' % table
prev_value = 0
try:
prev_value = int(self.client.read(key).value)
self.client.test_and_set(key, str(prev_value + 1), str(prev_value))
prev_value = int(self.get_key('unique_key', table))
# FIXME(lihi): race-condition?
self.client.replace(key, str(prev_value), str(prev_value + 1))
return prev_value + 1
except Exception:
except df_exceptions.DBKeyNotFound:
if prev_value == 0:
self.client.write(key, "1", prevExist=False)
self.client.put(key, "1")
return 1
raise

View File

@ -1,52 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from dragonflow.db.drivers import etcd_db_driver
from dragonflow.tests import base as tests_base
class TestEtcdDB(tests_base.BaseTestCase):
def test_parse_none(self):
fake_host = []
expected = ()
output = etcd_db_driver._parse_hosts(fake_host)
self.assertEqual(expected, output)
def test_parse_empty(self):
fake_host = [""]
expected = ()
output = etcd_db_driver._parse_hosts(fake_host)
self.assertEqual(expected, output)
def test_parse_one_host(self):
fake_host = ['127.0.0.1:80']
expected = (('127.0.0.1', 80),)
output = etcd_db_driver._parse_hosts(fake_host)
self.assertEqual(expected, output)
def test_parse_multiple_hosts(self):
fake_host = ['127.0.0.1:80', '192.168.0.1:8080']
expected = (('127.0.0.1', 80), ('192.168.0.1', 8080))
output = etcd_db_driver._parse_hosts(fake_host)
self.assertEqual(expected, output)
def test_parse_multiple_hosts_invalid(self):
fake_host = ['127.0.0.1:80', '192.168.0.1']
expected = (('127.0.0.1', 80),)
with mock.patch.object(etcd_db_driver.LOG, 'error') as log_err:
output = etcd_db_driver._parse_hosts(fake_host)
self.assertEqual(expected, output)
log_err.assert_called_once_with(
u'The host string %s is invalid.', '192.168.0.1')

View File

@ -4,7 +4,7 @@
pbr!=2.1.0,>=2.0.0 # Apache-2.0
Babel!=2.4.0,>=2.3.4 # BSD
python-etcd>=0.4.3 # MIT License
etcd3gw>=0.1.0 # Apache-2.0
cassandra-driver!=3.6.0,>=2.1.4 # Apache-2.0
kazoo>=2.2 # Apache-2.0
ovs>=2.7.0 # Apache-2.0