Merge "Redis Driver Rewrite"

Zuul 2018-03-08 13:18:31 +00:00 committed by Gerrit Code Review
commit e2dc45a798
4 changed files with 578 additions and 394 deletions

dragonflow/conf/__init__.py

@@ -23,6 +23,7 @@ from dragonflow.conf import df_l3
from dragonflow.conf import df_loadbalancer
from dragonflow.conf import df_metadata_service
from dragonflow.conf import df_provider_networks
from dragonflow.conf import df_redis
from dragonflow.conf import df_ryu
from dragonflow.conf import df_snat
@@ -39,6 +40,7 @@ df_active_port_detection.register_opts()
df_l2.register_opts()
df_l3.register_opts()
df_dnat.register_opts()
df_redis.register_opts()
df_ryu.register_opts()
df_provider_networks.register_opts()
df_snat.register_opts()

dragonflow/conf/df_redis.py

@@ -0,0 +1,45 @@
# Copyright (c) 2016 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from dragonflow._i18n import _
df_redis_opts = [
cfg.IntOpt(
'retries',
default=5,
min=1,
help=_('Number of retries for each Redis operation. Use at least 2 '
'in clustered Redis environments.'),
),
cfg.IntOpt(
'batch_amount',
default=50,
min=10,
help=_('When performing batch operations through a pipeline, send '
'this number of commands in each round trip.'),
),
]
def register_opts():
cfg.CONF.register_opts(df_redis_opts, group='df_redis')
def list_opts():
return {'df_redis': df_redis_opts}
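Since these options are registered under the df_redis group, a deployment would tune them from the [df_redis] section of the Dragonflow configuration file. A minimal sketch (the values shown are the defaults, for illustration only):

[df_redis]
# Number of retries for each Redis operation; use at least 2 when
# running against a Redis cluster (default: 5, minimum: 1).
retries = 5
# Commands sent per pipeline round trip during batch operations
# (default: 50, minimum: 10).
batch_amount = 50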

dragonflow/db/drivers/redis_db_driver.py

@@ -10,346 +10,390 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
import crc16
from oslo_log import log
from redis import client as redis_client
from redis import exceptions
import six
from dragonflow.common import exceptions as df_exceptions
from dragonflow import conf as cfg
from dragonflow.db import db_api
from dragonflow.db import db_common
from dragonflow.db.drivers import redis_mgt
LOG = log.getLogger(__name__)
REDIS_NSLOTS = 16384
def key2slot(key):
k = six.text_type(key)
start = k.find('{')
if start > -1:
end = k.find('}', start + 1)
if end > -1 and end != start + 1:
k = k[start + 1:end]
return crc16.crc16xmodem(k.encode('UTF-8')) % REDIS_NSLOTS
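key2slot implements the Redis Cluster hashing rule: CRC16-XMODEM of the key modulo 16384 slots, honoring hash tags so that only the text between the first '{' and the following '}' is hashed. A quick sketch of the resulting behavior (key names are illustrative):

# Keys sharing a hash tag map to the same slot, and therefore to the
# same master node, regardless of the trailing UUID.
assert key2slot('{lport.topic1}uuid-a') == key2slot('{lport.topic1}uuid-b')
# Every computed slot falls inside the fixed 16384-slot space.
assert 0 <= key2slot('{lport.topic2}uuid-c') < REDIS_NSLOTS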
class Node(object):
def __init__(self, ip, port, node_id=None):
self.ip = ip
self.port = port
self.node_id = node_id
self._client = None
@property
def client(self):
if self._client is None:
self._client = redis_client.StrictRedis(host=self.ip,
port=self.port)
return self._client
@property
def key(self):
return (self.ip, self.port)
class Cluster(object):
def __init__(self, nodes):
self._is_cluster = True
# materialize eagerly: populate_cluster() may run more than once
self._configured_nodes = [Node(*node) for node in nodes]
self._nodes_by_host = {}
self._nodes_by_slot = [None] * REDIS_NSLOTS
self._covered = False
def get_node(self, key):
if self._is_cluster:
return self._nodes_by_slot[key2slot(key)]
else:
return self._nodes_by_host
def get_node_by_host(self, ip, port):
if self._is_cluster:
return self._nodes_by_host[(ip, port)]
else:
return self._nodes_by_host
def is_cluster_covered(self):
try:
self._nodes_by_slot.index(None)
except ValueError:
return True
else:
return False
def populate_cluster(self):
for node in self._configured_nodes:
client = node.client
try:
slots = client.execute_command('CLUSTER', 'SLOTS')
except exceptions.ConnectionError:
LOG.exception('Error connecting to cluster node %s:%s',
node.ip, node.port)
continue
except exceptions.ResponseError as e:
if str(e).find('cluster support disabled') != -1:
LOG.info('Using a single non-cluster node %s:%s',
node.ip, node.port)
self._nodes_by_host = node
self._is_cluster = False
return
LOG.exception('Response error from node %s:%s',
node.ip, node.port)
continue
self._is_cluster = True
for slot_info in slots:
(range_begin, range_end, master_info) = slot_info[0:3]
master = Node(*master_info)
self._nodes_by_host[master.key] = master
for slot in range(int(range_begin), int(range_end) + 1):
self._nodes_by_slot[slot] = master
if self.is_cluster_covered():
self._covered = True
break
if not self._covered:
LOG.error('Redis cluster not covering slot space')
for node in self._nodes_by_host.values():
LOG.info('Cluster node: %s:%s', node.ip, node.port)
@property
def nodes(self):
if self._is_cluster:
return self._nodes_by_host.values()
else:
return (self._nodes_by_host, )
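populate_cluster relies on the reply layout of the CLUSTER SLOTS command: each entry is [range_begin, range_end, master, *replicas], where a node is encoded as [ip, port, node_id]. A sketch of a reply for a three-master cluster (addresses made up):

slots = [
    [0, 5460, ['10.0.0.1', 7000, 'id-1']],
    [5461, 10922, ['10.0.0.2', 7000, 'id-2']],
    [10923, 16383, ['10.0.0.3', 7000, 'id-3']],
]
# For each entry, Node(*master_info) unpacks (ip, port, node_id), and every
# slot in the inclusive range is pointed at that master in _nodes_by_slot.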
class RedisDbDriver(db_api.DbApi):
RequestRetryTimes = 5
def __init__(self):
super(RedisDbDriver, self).__init__()
self.clients = {}
self.remote_server_lists = []
self.redis_mgt = None
self.is_neutron_server = False
def __init__(self, *args, **kwargs):
super(RedisDbDriver, self).__init__(*args, **kwargs)
self._table_strip_re = re.compile(b'^{.+}(.+)$')
self.config = cfg.CONF.df_redis
self.BATCH_KEY_AMOUNT = self.config.batch_amount
self.RETRY_COUNT = self.config.retries
def initialize(self, db_ip, db_port, **args):
# get remote ip port list
self.redis_mgt = redis_mgt.RedisMgt.get_instance(db_ip, db_port)
self._update_server_list()
nodes = self._config_to_nodes(args['config'].remote_db_hosts)
self._cluster = Cluster(nodes)
self._cluster.populate_cluster()
def _update_server_list(self):
if self.redis_mgt is not None:
self.remote_server_lists = self.redis_mgt.get_master_list()
self.clients = {}
for remote in self.remote_server_lists:
remote_ip_port = remote['ip_port']
ip_port = remote_ip_port.split(':')
self.clients[remote_ip_port] = \
redis_client.StrictRedis(host=ip_port[0], port=ip_port[1])
@staticmethod
def _config_to_nodes(hosts_list):
def host_to_node(host):
(ip, port) = host.split(':')
return (ip, int(port))
return map(host_to_node, hosts_list)
def create_table(self, table):
# Not needed in redis
pass
def delete_table(self, table):
local_key = self._uuid_to_key(table, '*', '*')
for host, client in self.clients.items():
local_keys = client.keys(local_key)
if len(local_keys) > 0:
for tmp_key in local_keys:
try:
self._execute_cmd("DEL", tmp_key)
except Exception:
LOG.exception("exception when delete_table: "
"%(key)s ", {'key': local_key})
@staticmethod
def _key_name(table, topic, key):
return '{%s.%s}%s' % (table, topic or '', key)
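Note how _key_name pairs with key2slot above: table and topic form the hash tag, so all keys of one table/topic pair land on a single cluster node and can be scanned there. For example (illustrative values):

_key_name('lport', 'topic1', 'uuid-a')  # -> '{lport.topic1}uuid-a'
_key_name('lport', None, 'uuid-a')      # -> '{lport.}uuid-a'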
def _handle_db_conn_error(self, ip_port, local_key=None):
self.redis_mgt.remove_node_from_master_list(ip_port)
self._update_server_list()
if local_key is not None:
LOG.exception("update server list, key: %(key)s",
{'key': local_key})
def _sync_master_list(self):
if self.is_neutron_server:
result = self.redis_mgt.redis_get_master_list_from_syncstring(
redis_mgt.RedisMgt.global_sharedlist.raw)
if result:
self._update_server_list()
def _gen_args(self, local_key, value):
args = []
args.append(local_key)
if value is not None:
args.append(value)
return args
def _is_oper_valid(self, oper):
if oper == 'SET' or oper == 'GET' or oper == 'DEL':
return True
return False
def _update_client(self, local_key):
self._sync_master_list()
ip_port = self.redis_mgt.get_ip_by_key(local_key)
client = self._get_client(local_key, ip_port)
return client
def _execute_cmd(self, oper, local_key, value=None):
if not self._is_oper_valid(oper):
LOG.warning("invalid oper: %(oper)s",
{'oper': oper})
return None
ip_port = self.redis_mgt.get_ip_by_key(local_key)
client = self._get_client(local_key, ip_port)
if client is None:
LOG.warning("Could not find client for key: %s ip_port: %s",
local_key, ip_port)
return None
arg = self._gen_args(local_key, value)
ttl = self.RequestRetryTimes
asking = False
alreadysync = False
while ttl > 0:
ttl -= 1
try:
if asking:
client.execute_command('ASKING')
asking = False
return client.execute_command(oper, *arg)
except exceptions.ConnectionError as e:
if not alreadysync:
client = self._update_client(local_key)
alreadysync = True
continue
self._handle_db_conn_error(ip_port, local_key)
LOG.exception("connection error while sending "
"request to db: %(e)s", {'e': e})
raise e
except exceptions.ResponseError as e:
if not alreadysync:
client = self._update_client(local_key)
alreadysync = True
continue
resp = str(e).split(' ')
if 'ASK' in resp[0]:
# one-time flag to force a node to serve a query about an
# IMPORTING slot
asking = True
if 'ASK' in resp[0] or 'MOVE' in resp[0]:
# MOVED/ASK XXX X.X.X.X:X
# do redirection
client = self._get_client(host=resp[2])
if client is None:
# maybe there is a fast failover
self._handle_db_conn_error(ip_port, local_key)
LOG.exception("no client available: "
"%(ip_port)s, %(e)s",
{'ip_port': resp[2], 'e': e})
raise e
else:
LOG.exception("error not handled: %(e)s",
{'e': e})
raise e
except Exception as e:
if not alreadysync:
client = self._update_client(local_key)
alreadysync = True
continue
self._handle_db_conn_error(ip_port, local_key)
LOG.exception("exception while sending request to "
"db: %(e)s", {'e': e})
raise e
def _find_key_without_topic(self, table, key):
local_key = self._uuid_to_key(table, key, '*')
self._sync_master_list()
for client in self.clients.values():
local_keys = client.keys(local_key)
if len(local_keys) == 1:
return local_keys[0]
def get_key(self, table, key, topic=None):
if topic:
local_key = self._uuid_to_key(table, key, topic)
else:
local_key = self._find_key_without_topic(table, key)
if local_key is None:
raise df_exceptions.DBKeyNotFound(key=key)
try:
res = self._execute_cmd("GET", local_key)
if res is not None:
return res
except Exception:
LOG.exception("exception when get_key: %(key)s",
{'key': local_key})
raise df_exceptions.DBKeyNotFound(key=key)
def _key_command(self, command, key, *args):
node = self._cluster.get_node(key)
ask = False
retry = 0
command_pcs = [command, key]
command_pcs.extend(args)
while retry < self.RETRY_COUNT:
LOG.debug('Executing command "%s" (retry %s)', command_pcs, retry)
if node is None:
LOG.error('Error finding node for key %s in cluster', key)
self._cluster.populate_cluster()
# re-resolve the node after refreshing the slot map
node = self._cluster.get_node(key)
try:
if ask:
node.client.execute_command('ASKING')
ask = False
return node.client.execute_command(*command_pcs)
except exceptions.ResponseError as e:
(reason, slot, ip_port) = str(e).split(' ')
(ip, port) = ip_port.split(':')
if reason == 'MOVED':
self._cluster.populate_cluster()
node = self._cluster.get_node(key)
if reason == 'ASK':
node = self._cluster.get_node_by_host(ip, port)
ask = True
except exceptions.ConnectionError as e:
LOG.exception('Connection to node %s:%s failed, refreshing',
node.ip, node.port)
self._cluster.populate_cluster()
node = self._cluster.get_node(key)
retry += 1
raise df_exceptions.DBKeyNotFound(key=key)
def create_table(self, table):
pass
def delete_table(self, table):
self._bulk_operation(table, None, 'DEL')
def _get_key_topic(self, table, key, topic):
real_key = self._key_name(table, topic, key)
value = self._key_command('GET', real_key)
if value is None:
raise df_exceptions.DBKeyNotFound(key=key)
return value
def _get_key_notopic(self, table, key):
result = []
def add_key(k, v):
result.append(v)
self._bulk_operation(table, None, 'GET', key_pattern=key,
entry_cb=add_key)
n_keys = len(result)
if n_keys != 1:
LOG.error('Found %d entries with key "%s"', n_keys, key)
raise df_exceptions.DBKeyNotFound(key=key)
return result[0]
def get_key(self, table, key, topic=None):
if topic is None:
return self._get_key_notopic(table, key)
else:
return self._get_key_topic(table, key, topic)
def set_key(self, table, key, value, topic=None):
local_key = self._uuid_to_key(table, key, topic)
try:
res = self._execute_cmd("SET", local_key, value)
if res is None:
res = 0
return res
except Exception:
LOG.exception("exception when set_key: %(key)s",
{'key': local_key})
if topic is None:
real_key = self._key_name_infer_topic(table, key)
else:
real_key = self._key_name(table, topic, key)
self._key_command('SET', real_key, value)
def create_key(self, table, key, value, topic=None):
return self.set_key(table, key, value, topic)
real_key = self._key_name(table, topic, key)
self._key_command('SET', real_key, value)
def delete_key(self, table, key, topic=None):
if topic:
local_key = self._uuid_to_key(table, key, topic)
else:
local_key = self._find_key_without_topic(table, key)
if local_key is None:
raise df_exceptions.DBKeyNotFound(key=key)
try:
res = self._execute_cmd("DEL", local_key)
if res is None:
res = 0
return res
except Exception:
LOG.exception("exception when delete_key: %(key)s",
{'key': local_key})
if topic is None:
real_key = self._key_name_infer_topic(table, key)
else:
real_key = self._key_name(table, topic, key)
self._key_command('DEL', real_key)
def _bulk_execute(self, node, keys, command, args=()):
pipeline = node.client.pipeline(transaction=False)
retry = 0
command_pcs = [command, None]
command_pcs.extend(args)
while retry < self.RETRY_COUNT:
for key in keys:
command_pcs[1] = key
pipeline.execute_command(*command_pcs)
try:
values = pipeline.execute(raise_on_error=False)
return zip(keys, values)
except exceptions.RedisError:
LOG.exception('Error executing pipeline at retry %d', retry)
retry += 1
return False
def _bulk_operation(self, table, topic, command, args=(), key_pattern=None,
entry_cb=None, stop_on_fail=False):
def is_error(value):
return isinstance(value, exceptions.RedisError)
(pattern, nodes) = self._query_info(table, topic, key_pattern)
success = True
batch_key_amount = self.BATCH_KEY_AMOUNT
LOG.debug('Performing bulk operation "%s" on table %s topic %s',
command, table, topic or 'None')
for node in nodes:
node_failed_keys = set()
retry = 0
while retry < self.RETRY_COUNT:
try:
node_keys = list(self._get_all_keys_from_node(node,
pattern))
break
except exceptions.RedisError:
LOG.exception('Error get keys from node %s:%s retry %d',
node.ip, node.port, retry)
retry += 1
if retry == self.RETRY_COUNT:
raise df_exceptions.DBKeyNotFound('ALL KEYS')
LOG.debug('Node %s:%s has %d keys for table %s topic %s',
node.ip, node.port, len(node_keys), table,
topic or 'None')
bulk_begin = 0
bulk_end = batch_key_amount
while bulk_begin < len(node_keys):
LOG.debug('Working on chunk %d:%d', bulk_begin, bulk_end)
result = self._bulk_execute(
node, node_keys[bulk_begin:bulk_end], command, args)
if result is False:
LOG.error('Error executing bulk operation on node %s:%s',
node.ip, node.port)
if stop_on_fail:
return False
else:
continue
for (k, v) in result:
if is_error(v):
LOG.warning('Bulk operation error node %s:%s key "%s"',
node.ip, node.port, k)
if stop_on_fail:
return False
node_failed_keys.add(k)
elif v is not None and callable(entry_cb):
entry_cb(k, v)
bulk_begin += batch_key_amount
bulk_end += batch_key_amount
for key in node_failed_keys:
try:
value = self._key_command(command, key, *args)
except Exception:
LOG.warning('Failed to process key "%s" from node %s:%s',
key, node.ip, node.port)
if stop_on_fail:
return False
success = False
else:
if callable(entry_cb):
entry_cb(key, value)
return success
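_bulk_operation is the workhorse behind the table-wide calls below: it resolves the nodes and key pattern via _query_info, SCANs each node for matching keys, pipelines the command in BATCH_KEY_AMOUNT-sized chunks through _bulk_execute, and retries failed keys one by one with _key_command. A minimal usage sketch, as seen from inside the driver (table and topic names are illustrative):

values = []

def collect(key, value):
    values.append(value)

# GET every entry of table 'lport' under topic 'topic1', accumulating
# each returned value through the entry_cb callback.
self._bulk_operation('lport', 'topic1', 'GET', entry_cb=collect)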
def get_all_entries(self, table, topic=None):
res = []
ip_port = None
self._sync_master_list()
if not topic:
local_key = self._uuid_to_key(table, '*', '*')
try:
for host, client in self.clients.items():
local_keys = client.keys(local_key)
if len(local_keys) > 0:
for tmp_key in local_keys:
res.append(self._execute_cmd("GET", tmp_key))
return res
except Exception:
LOG.exception("exception when get_all_entries: %(key)s",
{'key': local_key})
else:
local_key = self._uuid_to_key(table, '*', topic)
try:
ip_port = self.redis_mgt.get_ip_by_key(local_key)
client = self._get_client(local_key, ip_port)
if client is None:
return res
local_keys = client.keys(local_key)
if len(local_keys) > 0:
res.extend(client.mget(local_keys))
return res
except Exception as e:
self._handle_db_conn_error(ip_port, local_key)
LOG.exception("exception when mget: %(key)s, %(e)s",
{'key': local_key, 'e': e})
def get_all_entries(self, table, topic=None):
def add_to_entries(key, value):
entries[key] = value
entries = {}
self._bulk_operation(table, topic, 'GET', entry_cb=add_to_entries)
LOG.debug('found %d entries', len(entries))
return list(entries.values())
def _get_all_keys_from_node(self, node, pattern):
keys = set()
cursor = 0
while True:
(cursor, partial_keys) = node.client.scan(cursor, match=pattern)
keys.update(partial_keys)
if cursor == 0:
break
return keys
def _query_info(self, table, topic, key=None):
if topic is None:
# ask all nodes
pattern = self._key_name(table, '*', key or '*')
nodes = self._cluster.nodes
else:
# ask a specific node
pattern = self._key_name(table, topic, key or '*')
nodes = (self._cluster.get_node(pattern), )
return (pattern, nodes)
def _scan(self, table, key=None, topic=None):
(pattern, nodes) = self._query_info(table, topic, key)
keys = set()
for node in nodes:
retry = 0
while retry < self.RETRY_COUNT:
LOG.debug('Getting all keys with pattern %s retry %d',
pattern, retry)
try:
node_keys = self._get_all_keys_from_node(node, pattern)
keys.update(node_keys)
break
except exceptions.RedisError:
LOG.exception('Error getting keys from node %s:%s',
node.ip, node.port)
retry += 1
self._cluster.populate_cluster()
if retry == self.RETRY_COUNT:
raise df_exceptions.DBKeyNotFound('ALL KEYS')
return keys
def _key_name_infer_topic(self, table, key):
raw_keys = self._scan(table, key=key)
if len(raw_keys) != 1:
LOG.error('Found %d entries with key "%s" in table %s',
len(raw_keys), key, table)
raise df_exceptions.DBKeyNotFound(key=key)
return raw_keys.pop()
def get_all_keys(self, table, topic=None):
res = []
ip_port = None
self._sync_master_list()
if not topic:
local_key = self._uuid_to_key(table, '*', '*')
try:
for host, client in self.clients.items():
ip_port = host
res.extend(client.keys(local_key))
return [self._strip_table_name_from_key(key) for key in res]
except Exception as e:
self._handle_db_conn_error(ip_port, local_key)
LOG.exception("exception when get_all_keys: %(key)s, %(e)s",
{'key': local_key, 'e': e})
else:
local_key = self._uuid_to_key(table, '*', topic)
try:
ip_port = self.redis_mgt.get_ip_by_key(local_key)
client = self._get_client(local_key, ip_port)
if client is None:
return res
res = client.keys(local_key)
return [self._strip_table_name_from_key(key) for key in res]
except Exception as e:
self._handle_db_conn_error(ip_port, local_key)
LOG.exception("exception when get_all_keys: %(key)s, %(e)s",
{'key': local_key, 'e': e})
def get_all_keys(self, table, topic=None):
def _strip_table_topic(key):
match = self._table_strip_re.match(key)
return match.group(1) if match else key
raw_keys = self._scan(table, topic=topic)
keys = [_strip_table_topic(raw_key) for raw_key in raw_keys]
LOG.debug('found %d keys', len(keys))
return keys
def _strip_table_name_from_key(self, key):
regex = '^{.*}\\.(.*)$'
m = re.match(regex, key)
return m.group(1)
def _uuid_to_key(self, table, key, topic):
if not topic:
local_key = ('{' + table + '.' + '}' + '.' + key)
else:
local_key = ('{' + table + '.' + topic + '}' + '.' + key)
return local_key
def _get_client(self, key=None, host=None):
if host is None:
ip_port = self.redis_mgt.get_ip_by_key(key)
if ip_port is None:
return None
else:
ip_port = host
client = self.clients.get(ip_port, None)
if client is not None:
return self.clients[ip_port]
else:
return None
def _allocate_unique_key(self, table):
local_key = self._uuid_to_key(db_common.UNIQUE_KEY_TABLE, table, None)
ip_port = None
try:
client = self._update_client(local_key)
if client is None:
return None
return client.incr(local_key)
except Exception as e:
self._handle_db_conn_error(ip_port, local_key)
LOG.exception("exception when incr: %(key)s, %(e)s",
{'key': local_key, 'e': e})
def allocate_unique_key(self, table):
try:
return self._allocate_unique_key(table)
except Exception as e:
LOG.error("allocate_unique_key exception: %(e)s",
{'e': e})
return
real_key = self._key_name(db_common.UNIQUE_KEY_TABLE, None, table)
return int(self._key_command('INCR', real_key))
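With the rewrite, unique ids become plain Redis counters: one INCR per table, keyed under the unique-key table with an empty topic (the unit test below asserts the key '{unique_key.}table', which suggests db_common.UNIQUE_KEY_TABLE == 'unique_key'). A usage sketch from inside the driver, with an illustrative table name:

# Atomically reserves the next id for table 'lport'; the counter lives
# at a key like '{unique_key.}lport', so concurrent controllers never
# hand out the same value.
unique_id = self.allocate_unique_key('lport')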
def process_ha(self):
if self.is_neutron_server:
self._sync_master_list()
else:
self._update_server_list()
pass
def set_neutron_server(self, is_neutron_server):
self.is_neutron_server = is_neutron_server
pass

dragonflow/tests/unit/test_redis_db.py

@@ -13,7 +13,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import math
import mock
import re
import redis
from dragonflow.common import exceptions
from dragonflow.db.drivers import redis_db_driver
@@ -21,118 +25,207 @@ from dragonflow.tests import base as tests_base
class TestRedisDB(tests_base.BaseTestCase):
def setUp(self):
super(TestRedisDB, self).setUp()
self.RedisDbDriver = redis_db_driver.RedisDbDriver()
def test_set_key(self):
client = mock.Mock()
self.RedisDbDriver._get_client = mock.Mock(return_value=client)
client.execute_command.return_value = 1
redis_mgt = mock.Mock()
redis_mgt.get_ip_by_key.return_value = '0.0.0.0:1000'
self.RedisDbDriver.redis_mgt = redis_mgt
result = self.RedisDbDriver.set_key('table', 'key', 'value', 'topic')
self.assertEqual(1, result)
client.execute_command.return_value = None
result = self.RedisDbDriver.set_key('table', 'key', 'value', 'topic')
self.assertEqual(0, result)
self.RedisDbDriver._cluster = mock.Mock()
node = mock.Mock()
self.RedisDbDriver._cluster.get_node.return_value = node
self.RedisDbDriver.set_key('table', 'key', 'value', 'topic')
node.client.execute_command.assert_called_once_with(
'SET', '{table.topic}key', 'value')
def test_get_key(self):
self.RedisDbDriver._cluster = mock.Mock()
node = mock.Mock()
expected = 'value'
node.client.execute_command.return_value = expected
self.RedisDbDriver._cluster.get_node.return_value = node
actual = self.RedisDbDriver.get_key('table', 'key', 'topic')
node.client.execute_command.assert_called_once_with(
'GET', '{table.topic}key')
self.assertEqual(expected, actual)
def test_get_method(self):
client = mock.Mock()
self.RedisDbDriver._get_client = mock.Mock(return_value=client)
self.RedisDbDriver._sync_master_list = mock.Mock()
self.RedisDbDriver.clients[0] = client
client.keys.return_value = ['a']
client.mget.return_value = ['value']
client.execute_command.return_value = 'value'
redis_mgt = mock.Mock()
self.RedisDbDriver.redis_mgt = redis_mgt
redis_mgt.get_ip_by_key.return_value = '0.0.0.0:1000'
# test get_key
result = self.RedisDbDriver.get_key('table', 'key')
self.assertEqual('value', result)
redis_mgt.get_ip_by_key.assert_called_with('a')
result = self.RedisDbDriver.get_key('table', 'key', '')
self.assertEqual('value', result)
redis_mgt.get_ip_by_key.assert_called_with('a')
result = self.RedisDbDriver.get_key('table', 'key', 'topic')
self.assertEqual('value', result)
local_key = '{table.topic}.key'
redis_mgt.get_ip_by_key.assert_called_with(local_key)
with mock.patch(
'dragonflow.db.drivers.redis_db_driver.RedisDbDriver._execute_cmd',
return_value=None,
):
self.assertRaises(
exceptions.DBKeyNotFound,
self.RedisDbDriver.get_key,
'table', 'key', 'topic',
)
with mock.patch(
'dragonflow.db.drivers.redis_db_driver.RedisDbDriver._execute_cmd',
side_effect=RuntimeError,
):
self.assertRaises(
exceptions.DBKeyNotFound,
self.RedisDbDriver.get_key,
'table', 'key', 'topic',
)
# test get_all_entries
result = self.RedisDbDriver.get_all_entries('table')
self.assertEqual(['value'], result)
redis_mgt.get_ip_by_key.assert_called_with('a')
result = self.RedisDbDriver.get_all_entries('table', '')
self.assertEqual(['value'], result)
redis_mgt.get_ip_by_key.assert_called_with('a')
result = self.RedisDbDriver.get_all_entries('table', 'topic')
self.assertEqual(['value'], result)
local_key = '{table.topic}.*'
redis_mgt.get_ip_by_key.assert_called_with(local_key)
# test get_all_key
client.keys.return_value = ['{table.*}.key']
result = self.RedisDbDriver.get_all_keys('table')
self.assertEqual(['key'], result)
local_key = '{table.*}.*'
client.keys.assert_called_with(local_key)
result = self.RedisDbDriver.get_all_keys('table', '')
self.assertEqual(['key'], result)
client.keys.assert_called_with(local_key)
result = self.RedisDbDriver.get_all_keys('table', 'topic')
self.assertEqual(['key'], result)
local_key = '{table.topic}.*'
redis_mgt.get_ip_by_key.assert_called_with(local_key)
def test_get_non_existent_key(self):
self.RedisDbDriver._cluster = mock.Mock()
node = mock.Mock()
node.client.execute_command.return_value = None
self.RedisDbDriver._cluster.get_node.return_value = node
self.assertRaisesRegex(
exceptions.DBKeyNotFound,
'key',
self.RedisDbDriver.get_key,
'table',
'key',
'topic',
)
node.client.execute_command.assert_called_once_with(
'GET', '{table.topic}key')
def _test_key_with_side_effect(self, node1, node2, side_effect):
def _side_effect(*args, **kwargs):
cluster.get_node_by_host.return_value = node2
cluster.get_node.return_value = node2
side_effect()
self.RedisDbDriver._cluster = mock.Mock()
cluster = self.RedisDbDriver._cluster
expected = 'value'
node1.client.execute_command.side_effect = _side_effect
node2.client.execute_command.return_value = expected
cluster.get_node.return_value = node1
actual = self.RedisDbDriver.get_key('table', 'key', 'topic')
self.assertEqual(expected, actual)
def test_moved_key(self):
def fail(*args, **kwargs):
raise redis.exceptions.ResponseError('MOVED 1 1.2.3.4:7000')
node1 = mock.Mock()
node2 = mock.Mock()
self._test_key_with_side_effect(node1, node2, fail)
node1.client.execute_command.assert_called_once_with(
'GET', '{table.topic}key')
node2.client.execute_command.assert_called_once_with(
'GET', '{table.topic}key')
self.RedisDbDriver._cluster.populate_cluster.assert_called_once()
def test_migrating_key(self):
def fail(*args, **kwargs):
raise redis.exceptions.ResponseError('ASK 1 1.2.3.4:7000')
node1 = mock.Mock()
node2 = mock.Mock()
self._test_key_with_side_effect(node1, node2, fail)
node1.client.execute_command.assert_called_once_with(
'GET', '{table.topic}key')
node2.client.execute_command.assert_any_call(
'ASKING')
node2.client.execute_command.assert_any_call(
'GET', '{table.topic}key')
def test_connection_error(self):
def fail(*args, **kwargs):
raise redis.exceptions.ConnectionError('Error 111')
node1 = mock.Mock()
node2 = mock.Mock()
self._test_key_with_side_effect(node1, node2, fail)
node1.client.execute_command.assert_called_with(
'GET', '{table.topic}key')
node2.client.execute_command.assert_called_with(
'GET', '{table.topic}key')
self.RedisDbDriver._cluster.populate_cluster.assert_called_once()
def test_delete_key(self):
client = mock.Mock()
self.RedisDbDriver._get_client = mock.Mock(return_value=client)
client.execute_command.return_value = 1
redis_mgt = mock.Mock()
self.RedisDbDriver.redis_mgt = redis_mgt
redis_mgt.get_ip_by_key.return_value = '0.0.0.0:1000'
result = self.RedisDbDriver.delete_key('table', 'key', 'topic')
self.assertEqual(1, result)
client.execute_command.return_value = None
result = self.RedisDbDriver.delete_key('table', 'key', 'topic')
self.assertEqual(0, result)
self.RedisDbDriver._cluster = mock.Mock()
node = mock.Mock()
self.RedisDbDriver._cluster.get_node.return_value = node
self.RedisDbDriver.delete_key('table', 'key', 'topic')
node.client.execute_command.assert_called_once_with(
'DEL', '{table.topic}key')
def test_get_all_keys_topic(self):
expected = [b'key1', b'key2', b'key3']
keys_response = [b'{table.topic}' + key for key in expected]
self.RedisDbDriver._cluster = mock.Mock()
node = mock.Mock()
self.RedisDbDriver._cluster.get_node.return_value = node
node.client.scan.return_value = (0, keys_response)
actual = self.RedisDbDriver.get_all_keys('table', 'topic')
self.assertEqual(set(expected), set(actual))
def test_get_all_keys_notopic(self):
nodes_keys = (
[b'key1', b'key2', b'key3'],
[b'key3', b'key4', b'key5'],
)
expected = set()
nodes = []
for node_keys in nodes_keys:
expected.update(node_keys)
keys_response = [b'{table.topic}' + key for key in node_keys]
node = mock.Mock()
node.client.scan.return_value = (0, keys_response)
nodes.append(node)
self.RedisDbDriver._cluster = mock.Mock()
self.RedisDbDriver._cluster.nodes = nodes
actual = self.RedisDbDriver.get_all_keys('table')
self.assertEqual(expected, set(actual))
def _test_batch(self, expected_command, method, *args, **kwargs):
def strip(key):
match = table_strip_re.match(key)
return match.group(1) if match else key
def pipeline_execute_cmd(command, key, *args, **kwargs):
pipeline = kwargs['pipeline']
pipeline.calls.append(key)
self.assertEqual(expected_command, command)
def pipeline_execute(pipeline, *args, **kwargs):
return [key_values[strip(key)] for key in pipeline.calls]
def create_pipeline(*args, **kwargs):
pipeline = mock.Mock()
pipeline.calls = []
pipeline.execute_command.side_effect = functools.partial(
pipeline_execute_cmd, pipeline=pipeline)
pipeline.execute.side_effect = functools.partial(
pipeline_execute, pipeline=pipeline)
pipelines.append(pipeline)
return pipeline
check_retval = kwargs.get('check_retval', True)
table_strip_re = re.compile(b'^{.+}(.+)$')
pipelines = []
key_values = {
b'key1': b'value1',
b'key2': b'value2',
b'key3': b'value3',
}
keys_response = [b'{table.topic}' + key for key in key_values.keys()]
batch_key_amount = 2
self.RedisDbDriver.BATCH_KEY_AMOUNT = batch_key_amount
self.RedisDbDriver._cluster = mock.Mock()
node = mock.Mock()
self.RedisDbDriver._cluster.get_node.return_value = node
self.RedisDbDriver._cluster.nodes = (node, )
node.client.scan.return_value = (0, keys_response)
node.client.pipeline.side_effect = create_pipeline
actual = getattr(self.RedisDbDriver, method)(*args)
if check_retval:
self.assertEqual(set(key_values.values()), set(actual))
number_of_batches = int(math.ceil(len(key_values) /
float(batch_key_amount)))
self.assertEqual(number_of_batches, len(pipelines))
total_calls = set()
for pipeline in pipelines:
pipeline.execute.assert_called_once()
total_calls.update(map(strip, pipeline.calls))
self.assertEqual(total_calls, set(key_values.keys()))
def test_get_all_entries_topic(self):
self._test_batch('GET', 'get_all_entries', 'table', 'topic')
def test_get_all_entries_notopic(self):
self._test_batch('GET', 'get_all_entries', 'table', None)
def test_delete_table(self):
self._test_batch('DEL', 'delete_table', 'table', check_retval=False)
def test_allocate_unique_key(self):
client = mock.Mock()
self.RedisDbDriver._update_client = mock.Mock(return_value=client)
client.incr.return_value = 1
result = self.RedisDbDriver.allocate_unique_key('fake_table')
self.assertEqual(1, result)
self.RedisDbDriver._cluster = mock.Mock()
node = mock.Mock()
expected = 1
node.client.execute_command.return_value = expected
self.RedisDbDriver._cluster.get_node.return_value = node
actual = self.RedisDbDriver.allocate_unique_key('table')
node.client.execute_command.assert_called_once_with(
'INCR', '{unique_key.}table')
self.assertEqual(expected, actual)