Redis Driver Rewrite
- Use 'cluster slots' instead of 'cluster nodes' [0] - Old driver used 'mget' on keys from different slots which would have resulted in error. Use pipelining in order to reduce RTT and get keys from different slots. - Use 'scan' instead of 'keys', as 'keys' shouldn't be used [1] - Ability to configure multiple 'base nodes' on which cluster operates. These nodes will be asked for cluster peers using 'cluster slots'. - Python 3 support (i.e unicode) - Non-cluster redis support [0] https://groups.google.com/d/msg/redis-db/XpFWvrzAmZ4/UxVJg7L_AwAJ [1] https://redis.io/commands/keys Co-Authored-By: Yuval Brik <yuval@brik.org.il> Change-Id: I4adbc675687811708b82b73f8c7e94586357ecc5
This commit is contained in:
parent
c7fcf45b99
commit
d122eed028
|
@ -23,6 +23,7 @@ from dragonflow.conf import df_l3
|
|||
from dragonflow.conf import df_loadbalancer
|
||||
from dragonflow.conf import df_metadata_service
|
||||
from dragonflow.conf import df_provider_networks
|
||||
from dragonflow.conf import df_redis
|
||||
from dragonflow.conf import df_ryu
|
||||
from dragonflow.conf import df_snat
|
||||
|
||||
|
@ -39,6 +40,7 @@ df_active_port_detection.register_opts()
|
|||
df_l2.register_opts()
|
||||
df_l3.register_opts()
|
||||
df_dnat.register_opts()
|
||||
df_redis.register_opts()
|
||||
df_ryu.register_opts()
|
||||
df_provider_networks.register_opts()
|
||||
df_snat.register_opts()
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
# Copyright (c) 2016 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from dragonflow._i18n import _
|
||||
|
||||
|
||||
df_redis_opts = [
|
||||
cfg.IntOpt(
|
||||
'retries',
|
||||
default=5,
|
||||
min=1,
|
||||
help=_('Amount of retries for each Redis operations. At least 2 in '
|
||||
'clustered Redis environments.'),
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'batch_amount',
|
||||
default=50,
|
||||
min=10,
|
||||
help=_('When performing batch operations using pipeline, send this '
|
||||
'amount of commands each round trip.'),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def register_opts():
|
||||
cfg.CONF.register_opts(df_redis_opts, group='df_redis')
|
||||
|
||||
|
||||
def list_opts():
|
||||
return {'df_redis': df_redis_opts}
|
|
@ -10,346 +10,390 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import re
|
||||
|
||||
import crc16
|
||||
from oslo_log import log
|
||||
import re
|
||||
from redis import client as redis_client
|
||||
from redis import exceptions
|
||||
import six
|
||||
|
||||
from dragonflow.common import exceptions as df_exceptions
|
||||
from dragonflow import conf as cfg
|
||||
from dragonflow.db import db_api
|
||||
from dragonflow.db import db_common
|
||||
from dragonflow.db.drivers import redis_mgt
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
REDIS_NSLOTS = 16384
|
||||
|
||||
|
||||
def key2slot(key):
|
||||
k = six.text_type(key)
|
||||
start = k.find('{')
|
||||
if start > -1:
|
||||
end = k.find('}', start + 1)
|
||||
if end > -1 and end != start + 1:
|
||||
k = k[start + 1:end]
|
||||
return crc16.crc16xmodem(k.encode('UTF-8')) % REDIS_NSLOTS
|
||||
|
||||
|
||||
class Node(object):
|
||||
def __init__(self, ip, port, node_id=None):
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
self.node_id = node_id
|
||||
self._client = None
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
if self._client is None:
|
||||
self._client = redis_client.StrictRedis(host=self.ip,
|
||||
port=self.port)
|
||||
return self._client
|
||||
|
||||
@property
|
||||
def key(self):
|
||||
return (self.ip, self.port)
|
||||
|
||||
|
||||
class Cluster(object):
|
||||
def __init__(self, nodes):
|
||||
self._is_cluster = True
|
||||
self._configured_nodes = (Node(*node) for node in nodes)
|
||||
self._nodes_by_host = {}
|
||||
self._nodes_by_slot = [None] * REDIS_NSLOTS
|
||||
self._covered = False
|
||||
|
||||
def get_node(self, key):
|
||||
if self._is_cluster:
|
||||
return self._nodes_by_slot[key2slot(key)]
|
||||
else:
|
||||
return self._nodes_by_host
|
||||
|
||||
def get_node_by_host(self, ip, port):
|
||||
if self._is_cluster:
|
||||
return self._nodes_by_host[(ip, port)]
|
||||
else:
|
||||
return self._nodes_by_host
|
||||
|
||||
def is_cluster_covered(self):
|
||||
try:
|
||||
self._nodes_by_slot.index(None)
|
||||
except ValueError:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def populate_cluster(self):
|
||||
for node in self._configured_nodes:
|
||||
client = node.client
|
||||
try:
|
||||
slots = client.execute_command('CLUSTER', 'SLOTS')
|
||||
except exceptions.ConnectionError:
|
||||
LOG.exception('Error connecting to cluster node %s:%s',
|
||||
node.ip, node.port)
|
||||
continue
|
||||
except exceptions.ResponseError as e:
|
||||
if str(e).find('cluster support disabled') != -1:
|
||||
LOG.info('Using a single non-cluster node %s:%s',
|
||||
node.ip, node.port)
|
||||
self._nodes_by_host = node
|
||||
self._is_cluster = False
|
||||
return
|
||||
LOG.exception('Response error from node %s:%s')
|
||||
continue
|
||||
self._is_cluster = True
|
||||
for slot_info in slots:
|
||||
(range_begin, range_end, master_info) = slot_info[0:3]
|
||||
master = Node(*master_info)
|
||||
self._nodes_by_host[master.key] = master
|
||||
for slot in range(int(range_begin), int(range_end) + 1):
|
||||
self._nodes_by_slot[slot] = master
|
||||
if self.is_cluster_covered():
|
||||
self._covered = True
|
||||
break
|
||||
if not self._covered:
|
||||
LOG.error('Redis cluster not covering slot space')
|
||||
for node in self._nodes_by_host.values():
|
||||
LOG.info('Cluster node: %s:%s', node.ip, node.port)
|
||||
|
||||
@property
|
||||
def nodes(self):
|
||||
if self._is_cluster:
|
||||
return self._nodes_by_host.values()
|
||||
else:
|
||||
return (self._nodes_by_host, )
|
||||
|
||||
|
||||
class RedisDbDriver(db_api.DbApi):
|
||||
|
||||
RequestRetryTimes = 5
|
||||
|
||||
def __init__(self):
|
||||
super(RedisDbDriver, self).__init__()
|
||||
self.clients = {}
|
||||
self.remote_server_lists = []
|
||||
self.redis_mgt = None
|
||||
self.is_neutron_server = False
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(RedisDbDriver, self).__init__(*args, **kwargs)
|
||||
self._table_strip_re = re.compile(b'^{.+}(.+)$')
|
||||
self.config = cfg.CONF.df_redis
|
||||
self.BATCH_KEY_AMOUNT = self.config.batch_amount
|
||||
self.RETRY_COUNT = self.config.retries
|
||||
|
||||
def initialize(self, db_ip, db_port, **args):
|
||||
# get remote ip port list
|
||||
self.redis_mgt = redis_mgt.RedisMgt.get_instance(db_ip, db_port)
|
||||
self._update_server_list()
|
||||
nodes = self._config_to_nodes(args['config'].remote_db_hosts)
|
||||
self._cluster = Cluster(nodes)
|
||||
self._cluster.populate_cluster()
|
||||
|
||||
def _update_server_list(self):
|
||||
if self.redis_mgt is not None:
|
||||
self.remote_server_lists = self.redis_mgt.get_master_list()
|
||||
self.clients = {}
|
||||
for remote in self.remote_server_lists:
|
||||
remote_ip_port = remote['ip_port']
|
||||
ip_port = remote_ip_port.split(':')
|
||||
self.clients[remote_ip_port] = \
|
||||
redis_client.StrictRedis(host=ip_port[0], port=ip_port[1])
|
||||
@staticmethod
|
||||
def _config_to_nodes(hosts_list):
|
||||
def host_to_node(host):
|
||||
(ip, port) = host.split(':')
|
||||
return (ip, int(port))
|
||||
|
||||
def create_table(self, table):
|
||||
# Not needed in redis
|
||||
pass
|
||||
return map(host_to_node, hosts_list)
|
||||
|
||||
def delete_table(self, table):
|
||||
local_key = self._uuid_to_key(table, '*', '*')
|
||||
for host, client in self.clients.items():
|
||||
local_keys = client.keys(local_key)
|
||||
if len(local_keys) > 0:
|
||||
for tmp_key in local_keys:
|
||||
try:
|
||||
self._execute_cmd("DEL", tmp_key)
|
||||
except Exception:
|
||||
LOG.exception("exception when delete_table: "
|
||||
"%(key)s ", {'key': local_key})
|
||||
@staticmethod
|
||||
def _key_name(table, topic, key):
|
||||
return '{%s.%s}%s' % (table, topic or '', key)
|
||||
|
||||
def _handle_db_conn_error(self, ip_port, local_key=None):
|
||||
self.redis_mgt.remove_node_from_master_list(ip_port)
|
||||
self._update_server_list()
|
||||
|
||||
if local_key is not None:
|
||||
LOG.exception("update server list, key: %(key)s",
|
||||
{'key': local_key})
|
||||
|
||||
def _sync_master_list(self):
|
||||
if self.is_neutron_server:
|
||||
result = self.redis_mgt.redis_get_master_list_from_syncstring(
|
||||
redis_mgt.RedisMgt.global_sharedlist.raw)
|
||||
if result:
|
||||
self._update_server_list()
|
||||
|
||||
def _gen_args(self, local_key, value):
|
||||
args = []
|
||||
args.append(local_key)
|
||||
if value is not None:
|
||||
args.append(value)
|
||||
|
||||
return args
|
||||
|
||||
def _is_oper_valid(self, oper):
|
||||
if oper == 'SET' or oper == 'GET' or oper == 'DEL':
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _update_client(self, local_key):
|
||||
self._sync_master_list()
|
||||
ip_port = self.redis_mgt.get_ip_by_key(local_key)
|
||||
client = self._get_client(local_key, ip_port)
|
||||
return client
|
||||
|
||||
def _execute_cmd(self, oper, local_key, value=None):
|
||||
if not self._is_oper_valid(oper):
|
||||
LOG.warning("invalid oper: %(oper)s",
|
||||
{'oper': oper})
|
||||
return None
|
||||
|
||||
ip_port = self.redis_mgt.get_ip_by_key(local_key)
|
||||
client = self._get_client(local_key, ip_port)
|
||||
if client is None:
|
||||
LOG.warning("Could not find client for key: %s ip_port: %s",
|
||||
local_key, ip_port)
|
||||
return None
|
||||
|
||||
arg = self._gen_args(local_key, value)
|
||||
|
||||
ttl = self.RequestRetryTimes
|
||||
asking = False
|
||||
alreadysync = False
|
||||
while ttl > 0:
|
||||
ttl -= 1
|
||||
def _key_command(self, command, key, *args):
|
||||
node = self._cluster.get_node(key)
|
||||
ask = False
|
||||
retry = 0
|
||||
command_pcs = [command, key]
|
||||
command_pcs.extend(args)
|
||||
while retry < self.RETRY_COUNT:
|
||||
LOG.debug('Executing command "%s" (retry %s)', command_pcs, retry)
|
||||
if node is None:
|
||||
LOG.error('Error finding node for key %s in cluster', key)
|
||||
self._cluster.populate_cluster()
|
||||
try:
|
||||
if asking:
|
||||
client.execute_command('ASKING')
|
||||
asking = False
|
||||
|
||||
return client.execute_command(oper, *arg)
|
||||
except exceptions.ConnectionError as e:
|
||||
if not alreadysync:
|
||||
client = self._update_client(local_key)
|
||||
alreadysync = True
|
||||
continue
|
||||
self._handle_db_conn_error(ip_port, local_key)
|
||||
LOG.exception("connection error while sending "
|
||||
"request to db: %(e)s", {'e': e})
|
||||
raise e
|
||||
if ask:
|
||||
node.client.execute_command('ASKING')
|
||||
ask = False
|
||||
return node.client.execute_command(*command_pcs)
|
||||
except exceptions.ResponseError as e:
|
||||
if not alreadysync:
|
||||
client = self._update_client(local_key)
|
||||
alreadysync = True
|
||||
continue
|
||||
resp = str(e).split(' ')
|
||||
if 'ASK' in resp[0]:
|
||||
# one-time flag to force a node to serve a query about an
|
||||
# IMPORTING slot
|
||||
asking = True
|
||||
|
||||
if 'ASK' in resp[0] or 'MOVE' in resp[0]:
|
||||
# MOVED/ASK XXX X.X.X.X:X
|
||||
# do redirection
|
||||
client = self._get_client(host=resp[2])
|
||||
if client is None:
|
||||
# maybe there is a fast failover
|
||||
self._handle_db_conn_error(ip_port, local_key)
|
||||
LOG.exception("no client available: "
|
||||
"%(ip_port)s, %(e)s",
|
||||
{'ip_port': resp[2], 'e': e})
|
||||
raise e
|
||||
else:
|
||||
LOG.exception("error not handled: %(e)s",
|
||||
{'e': e})
|
||||
raise e
|
||||
except Exception as e:
|
||||
if not alreadysync:
|
||||
client = self._update_client(local_key)
|
||||
alreadysync = True
|
||||
continue
|
||||
self._handle_db_conn_error(ip_port, local_key)
|
||||
LOG.exception("exception while sending request to "
|
||||
"db: %(e)s", {'e': e})
|
||||
raise e
|
||||
|
||||
def _find_key_without_topic(self, table, key):
|
||||
local_key = self._uuid_to_key(table, key, '*')
|
||||
self._sync_master_list()
|
||||
for client in self.clients.values():
|
||||
local_keys = client.keys(local_key)
|
||||
if len(local_keys) == 1:
|
||||
return local_keys[0]
|
||||
|
||||
def get_key(self, table, key, topic=None):
|
||||
if topic:
|
||||
local_key = self._uuid_to_key(table, key, topic)
|
||||
else:
|
||||
local_key = self._find_key_without_topic(table, key)
|
||||
if local_key is None:
|
||||
raise df_exceptions.DBKeyNotFound(key=key)
|
||||
|
||||
try:
|
||||
res = self._execute_cmd("GET", local_key)
|
||||
if res is not None:
|
||||
return res
|
||||
except Exception:
|
||||
LOG.exception("exception when get_key: %(key)s",
|
||||
{'key': local_key})
|
||||
(reason, slot, ip_port) = str(e).split(' ')
|
||||
(ip, port) = ip_port.split(':')
|
||||
if reason == 'MOVED':
|
||||
self._cluster.populate_cluster()
|
||||
node = self._cluster.get_node(key)
|
||||
if reason == 'ASK':
|
||||
node = self._cluster.get_node_by_host(ip, port)
|
||||
ask = True
|
||||
except exceptions.ConnectionError as e:
|
||||
LOG.exception('Connection to node %s:%s failed, refreshing',
|
||||
node.ip, node.port)
|
||||
self._cluster.populate_cluster()
|
||||
node = self._cluster.get_node(key)
|
||||
retry += 1
|
||||
|
||||
raise df_exceptions.DBKeyNotFound(key=key)
|
||||
|
||||
def create_table(self, table):
|
||||
pass
|
||||
|
||||
def delete_table(self, table):
|
||||
self._bulk_operation(table, None, 'DEL')
|
||||
|
||||
def _get_key_topic(self, table, key, topic):
|
||||
real_key = self._key_name(table, topic, key)
|
||||
value = self._key_command('GET', real_key)
|
||||
if value is None:
|
||||
raise df_exceptions.DBKeyNotFound(key=key)
|
||||
return value
|
||||
|
||||
def _get_key_notopic(self, table, key):
|
||||
result = []
|
||||
|
||||
def add_key(k, v):
|
||||
result.append(v)
|
||||
|
||||
self._bulk_operation(table, None, 'GET', key_pattern=key,
|
||||
entry_cb=add_key)
|
||||
n_keys = len(result)
|
||||
if n_keys != 1:
|
||||
LOG.error('Found %d entries with key "%s"', n_keys, key)
|
||||
raise df_exceptions.DBKeyNotFound(key=key)
|
||||
|
||||
return result[0]
|
||||
|
||||
def get_key(self, table, key, topic=None):
|
||||
if topic is None:
|
||||
return self._get_key_notopic(table, key)
|
||||
else:
|
||||
return self._get_key_topic(table, key, topic)
|
||||
|
||||
def set_key(self, table, key, value, topic=None):
|
||||
local_key = self._uuid_to_key(table, key, topic)
|
||||
|
||||
try:
|
||||
res = self._execute_cmd("SET", local_key, value)
|
||||
if res is None:
|
||||
res = 0
|
||||
|
||||
return res
|
||||
except Exception:
|
||||
LOG.exception("exception when set_key: %(key)s",
|
||||
{'key': local_key})
|
||||
if topic is None:
|
||||
real_key = self._key_name_infer_topic(table, key)
|
||||
else:
|
||||
real_key = self._key_name(table, topic, key)
|
||||
self._key_command('SET', real_key, value)
|
||||
|
||||
def create_key(self, table, key, value, topic=None):
|
||||
return self.set_key(table, key, value, topic)
|
||||
real_key = self._key_name(table, topic, key)
|
||||
self._key_command('SET', real_key, value)
|
||||
|
||||
def delete_key(self, table, key, topic=None):
|
||||
if topic:
|
||||
local_key = self._uuid_to_key(table, key, topic)
|
||||
if topic is None:
|
||||
real_key = self._key_name_infer_topic(table, key)
|
||||
else:
|
||||
local_key = self._find_key_without_topic(table, key)
|
||||
if local_key is None:
|
||||
raise df_exceptions.DBKeyNotFound(key=key)
|
||||
real_key = self._key_name(table, topic, key)
|
||||
self._key_command('DEL', real_key)
|
||||
|
||||
try:
|
||||
res = self._execute_cmd("DEL", local_key)
|
||||
if res is None:
|
||||
res = 0
|
||||
def _bulk_execute(self, node, keys, command, args=()):
|
||||
pipeline = node.client.pipeline(transaction=False)
|
||||
retry = 0
|
||||
command_pcs = [command, None]
|
||||
command_pcs.extend(args)
|
||||
while retry < self.RETRY_COUNT:
|
||||
for key in keys:
|
||||
command_pcs[1] = key
|
||||
pipeline.execute_command(*command_pcs)
|
||||
try:
|
||||
values = pipeline.execute(raise_on_error=False)
|
||||
return zip(keys, values)
|
||||
except exceptions.RedisError:
|
||||
LOG.exception('Error executing pipeline at retry %d', retry)
|
||||
retry += 1
|
||||
return False
|
||||
|
||||
return res
|
||||
except Exception:
|
||||
LOG.exception("exception when delete_key: %(key)s",
|
||||
{'key': local_key})
|
||||
def _bulk_operation(self, table, topic, command, args=(), key_pattern=None,
|
||||
entry_cb=None, stop_on_fail=False):
|
||||
def is_error(value):
|
||||
return isinstance(value, exceptions.RedisError)
|
||||
|
||||
(pattern, nodes) = self._query_info(table, topic, key_pattern)
|
||||
success = True
|
||||
batch_key_amount = self.BATCH_KEY_AMOUNT
|
||||
LOG.debug('Performing bulk operation "%s" on table %s topic %s',
|
||||
command, table, topic or 'None')
|
||||
for node in nodes:
|
||||
node_failed_keys = set()
|
||||
retry = 0
|
||||
while retry < self.RETRY_COUNT:
|
||||
try:
|
||||
node_keys = list(self._get_all_keys_from_node(node,
|
||||
pattern))
|
||||
break
|
||||
except exceptions.RedisError:
|
||||
LOG.exception('Error get keys from node %s:%s retry %d',
|
||||
node.ip, node.port, retry)
|
||||
retry += 1
|
||||
LOG.debug('Node %s:%s has %d keys for table %s topic %s',
|
||||
node.ip, node.port, len(node_keys), table,
|
||||
topic or 'None')
|
||||
if retry == self.RETRY_COUNT:
|
||||
raise df_exceptions.DBKeyNotFound('ALL KEYS')
|
||||
bulk_begin = 0
|
||||
bulk_end = batch_key_amount
|
||||
while bulk_begin < len(node_keys):
|
||||
LOG.debug('Working on chunk %d:%d', bulk_begin, bulk_end)
|
||||
result = self._bulk_execute(
|
||||
node, node_keys[bulk_begin:bulk_end], command, args)
|
||||
if result is False:
|
||||
LOG.error('Error executing bulk operation on node %s:%s',
|
||||
node.ip, node.port)
|
||||
if stop_on_fail:
|
||||
return False
|
||||
else:
|
||||
continue
|
||||
for (k, v) in result:
|
||||
if is_error(v):
|
||||
LOG.warning('Bulk operation error node %s:%s key "%s"',
|
||||
node.ip, node.port, k)
|
||||
if stop_on_fail:
|
||||
return False
|
||||
node_failed_keys.update(k)
|
||||
elif v is not None and callable(entry_cb):
|
||||
entry_cb(k, v)
|
||||
bulk_begin += batch_key_amount
|
||||
bulk_end += batch_key_amount
|
||||
|
||||
for key in node_failed_keys:
|
||||
try:
|
||||
value = self._key_command(command, key, args)
|
||||
except Exception:
|
||||
LOG.warning('Failed to process key "%s" from node %s:%s',
|
||||
key, node.ip, node.port)
|
||||
if stop_on_fail:
|
||||
return False
|
||||
success = False
|
||||
else:
|
||||
if callable(entry_cb):
|
||||
entry_cb(key, value)
|
||||
return success
|
||||
|
||||
def get_all_entries(self, table, topic=None):
|
||||
res = []
|
||||
ip_port = None
|
||||
self._sync_master_list()
|
||||
if not topic:
|
||||
local_key = self._uuid_to_key(table, '*', '*')
|
||||
try:
|
||||
for host, client in self.clients.items():
|
||||
local_keys = client.keys(local_key)
|
||||
if len(local_keys) > 0:
|
||||
for tmp_key in local_keys:
|
||||
res.append(self._execute_cmd("GET", tmp_key))
|
||||
return res
|
||||
except Exception:
|
||||
LOG.exception("exception when get_all_entries: %(key)s",
|
||||
{'key': local_key})
|
||||
def add_to_entries(key, value):
|
||||
entries[key] = value
|
||||
|
||||
entries = {}
|
||||
self._bulk_operation(table, topic, 'GET', entry_cb=add_to_entries)
|
||||
LOG.debug('found %d entries', len(entries))
|
||||
return list(entries.values())
|
||||
|
||||
def _get_all_keys_from_node(self, node, pattern):
|
||||
keys = set()
|
||||
cursor = 0
|
||||
while True:
|
||||
(cursor, partial_keys) = node.client.scan(cursor, match=pattern)
|
||||
keys.update(partial_keys)
|
||||
if cursor == 0:
|
||||
break
|
||||
return keys
|
||||
|
||||
def _query_info(self, table, topic, key=None):
|
||||
if topic is None:
|
||||
# ask all nodes
|
||||
pattern = self._key_name(table, '*', key or '*')
|
||||
nodes = self._cluster.nodes
|
||||
else:
|
||||
local_key = self._uuid_to_key(table, '*', topic)
|
||||
try:
|
||||
ip_port = self.redis_mgt.get_ip_by_key(local_key)
|
||||
client = self._get_client(local_key, ip_port)
|
||||
if client is None:
|
||||
return res
|
||||
# ask a specific node
|
||||
pattern = self._key_name(table, topic, key or '*')
|
||||
nodes = (self._cluster.get_node(pattern), )
|
||||
return (pattern, nodes)
|
||||
|
||||
local_keys = client.keys(local_key)
|
||||
if len(local_keys) > 0:
|
||||
res.extend(client.mget(local_keys))
|
||||
return res
|
||||
except Exception as e:
|
||||
self._handle_db_conn_error(ip_port, local_key)
|
||||
LOG.exception("exception when mget: %(key)s, %(e)s",
|
||||
{'key': local_key, 'e': e})
|
||||
def _scan(self, table, key=None, topic=None):
|
||||
(pattern, nodes) = self._query_info(table, topic, key)
|
||||
keys = set()
|
||||
|
||||
for node in nodes:
|
||||
retry = 0
|
||||
while retry < self.RETRY_COUNT:
|
||||
LOG.debug('Getting all keys with pattern %s retry %d',
|
||||
pattern, retry)
|
||||
try:
|
||||
node_keys = self._get_all_keys_from_node(node, pattern)
|
||||
keys.update(node_keys)
|
||||
break
|
||||
except exceptions.RedisError:
|
||||
LOG.exception('Error getting keys from node %s:%s',
|
||||
node.ip, node.port)
|
||||
retry += 1
|
||||
self._cluster.populate_cluster()
|
||||
if retry == self.RETRY_COUNT:
|
||||
raise df_exceptions.DBKeyNotFound('ALL KEYS')
|
||||
return keys
|
||||
|
||||
def _key_name_infer_topic(self, table, key):
|
||||
raw_keys = self._scan(table, key=key)
|
||||
if len(raw_keys) != 1:
|
||||
LOG.error('Found %d entries with key "%s" in table %s',
|
||||
len(raw_keys), key, table)
|
||||
raise df_exceptions.DBKeyNotFound(key=key)
|
||||
return raw_keys.pop()
|
||||
|
||||
def get_all_keys(self, table, topic=None):
|
||||
res = []
|
||||
ip_port = None
|
||||
self._sync_master_list()
|
||||
if not topic:
|
||||
local_key = self._uuid_to_key(table, '*', '*')
|
||||
try:
|
||||
for host, client in self.clients.items():
|
||||
ip_port = host
|
||||
res.extend(client.keys(local_key))
|
||||
return [self._strip_table_name_from_key(key) for key in res]
|
||||
except Exception as e:
|
||||
self._handle_db_conn_error(ip_port, local_key)
|
||||
LOG.exception("exception when get_all_keys: %(key)s, %(e)s",
|
||||
{'key': local_key, 'e': e})
|
||||
|
||||
else:
|
||||
local_key = self._uuid_to_key(table, '*', topic)
|
||||
try:
|
||||
ip_port = self.redis_mgt.get_ip_by_key(local_key)
|
||||
client = self._get_client(local_key, ip_port)
|
||||
if client is None:
|
||||
return res
|
||||
|
||||
res = client.keys(local_key)
|
||||
return [self._strip_table_name_from_key(key) for key in res]
|
||||
|
||||
except Exception as e:
|
||||
self._handle_db_conn_error(ip_port, local_key)
|
||||
LOG.exception("exception when get_all_keys: %(key)s, %(e)s",
|
||||
{'key': local_key, 'e': e})
|
||||
|
||||
def _strip_table_name_from_key(self, key):
|
||||
regex = '^{.*}\\.(.*)$'
|
||||
m = re.match(regex, key)
|
||||
return m.group(1)
|
||||
|
||||
def _allocate_unique_key(self, table):
|
||||
local_key = self._uuid_to_key(db_common.UNIQUE_KEY_TABLE, table, None)
|
||||
ip_port = None
|
||||
try:
|
||||
client = self._update_client(local_key)
|
||||
if client is None:
|
||||
return None
|
||||
return client.incr(local_key)
|
||||
except Exception as e:
|
||||
self._handle_db_conn_error(ip_port, local_key)
|
||||
LOG.exception("exception when incr: %(key)s, %(e)s",
|
||||
{'key': local_key, 'e': e})
|
||||
def _strip_table_topic(key):
|
||||
match = self._table_strip_re.match(key)
|
||||
return match.group(1) if match else key
|
||||
raw_keys = self._scan(table, topic=topic)
|
||||
keys = [_strip_table_topic(raw_key) for raw_key in raw_keys]
|
||||
LOG.debug('found %d keys', len(keys))
|
||||
return keys
|
||||
|
||||
def allocate_unique_key(self, table):
|
||||
try:
|
||||
return self._allocate_unique_key(table)
|
||||
except Exception as e:
|
||||
LOG.error("allocate_unique_key exception: %(e)s",
|
||||
{'e': e})
|
||||
return
|
||||
|
||||
def _uuid_to_key(self, table, key, topic):
|
||||
if not topic:
|
||||
local_key = ('{' + table + '.' + '}' + '.' + key)
|
||||
else:
|
||||
local_key = ('{' + table + '.' + topic + '}' + '.' + key)
|
||||
return local_key
|
||||
|
||||
def _get_client(self, key=None, host=None):
|
||||
if host is None:
|
||||
ip_port = self.redis_mgt.get_ip_by_key(key)
|
||||
if ip_port is None:
|
||||
return None
|
||||
else:
|
||||
ip_port = host
|
||||
|
||||
client = self.clients.get(ip_port, None)
|
||||
if client is not None:
|
||||
return self.clients[ip_port]
|
||||
else:
|
||||
return None
|
||||
real_key = self._key_name(db_common.UNIQUE_KEY_TABLE, None, table)
|
||||
return int(self._key_command('INCR', real_key))
|
||||
|
||||
def process_ha(self):
|
||||
if self.is_neutron_server:
|
||||
self._sync_master_list()
|
||||
else:
|
||||
self._update_server_list()
|
||||
pass
|
||||
|
||||
def set_neutron_server(self, is_neutron_server):
|
||||
self.is_neutron_server = is_neutron_server
|
||||
pass
|
||||
|
|
|
@ -13,7 +13,11 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
import math
|
||||
import mock
|
||||
import re
|
||||
import redis
|
||||
|
||||
from dragonflow.common import exceptions
|
||||
from dragonflow.db.drivers import redis_db_driver
|
||||
|
@ -21,118 +25,207 @@ from dragonflow.tests import base as tests_base
|
|||
|
||||
|
||||
class TestRedisDB(tests_base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRedisDB, self).setUp()
|
||||
self.RedisDbDriver = redis_db_driver.RedisDbDriver()
|
||||
|
||||
def test_set_key(self):
|
||||
client = mock.Mock()
|
||||
self.RedisDbDriver._get_client = mock.Mock(return_value=client)
|
||||
client.execute_command.return_value = 1
|
||||
redis_mgt = mock.Mock()
|
||||
redis_mgt.get_ip_by_key.return_value = '0.0.0.0:1000'
|
||||
self.RedisDbDriver.redis_mgt = redis_mgt
|
||||
result = self.RedisDbDriver.set_key('table', 'key', 'value', 'topic')
|
||||
self.assertEqual(1, result)
|
||||
self.RedisDbDriver._cluster = mock.Mock()
|
||||
node = mock.Mock()
|
||||
self.RedisDbDriver._cluster.get_node.return_value = node
|
||||
self.RedisDbDriver.set_key('table', 'key', 'value', 'topic')
|
||||
node.client.execute_command.assert_called_once_with(
|
||||
'SET', '{table.topic}key', 'value')
|
||||
|
||||
client.execute_command.return_value = None
|
||||
result = self.RedisDbDriver.set_key('table', 'key', 'value', 'topic')
|
||||
self.assertEqual(0, result)
|
||||
def test_get_key(self):
|
||||
self.RedisDbDriver._cluster = mock.Mock()
|
||||
node = mock.Mock()
|
||||
expected = 'value'
|
||||
node.client.execute_command.return_value = expected
|
||||
self.RedisDbDriver._cluster.get_node.return_value = node
|
||||
actual = self.RedisDbDriver.get_key('table', 'key', 'topic')
|
||||
node.client.execute_command.assert_called_once_with(
|
||||
'GET', '{table.topic}key')
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_get_method(self):
|
||||
client = mock.Mock()
|
||||
self.RedisDbDriver._get_client = mock.Mock(return_value=client)
|
||||
self.RedisDbDriver._sync_master_list = mock.Mock()
|
||||
self.RedisDbDriver.clients[0] = client
|
||||
client.keys.return_value = ['a']
|
||||
client.mget.return_value = ['value']
|
||||
client.execute_command.return_value = 'value'
|
||||
redis_mgt = mock.Mock()
|
||||
self.RedisDbDriver.redis_mgt = redis_mgt
|
||||
redis_mgt.get_ip_by_key.return_value = '0.0.0.0:1000'
|
||||
def test_get_non_existent_key(self):
|
||||
self.RedisDbDriver._cluster = mock.Mock()
|
||||
node = mock.Mock()
|
||||
node.client.execute_command.return_value = None
|
||||
self.RedisDbDriver._cluster.get_node.return_value = node
|
||||
self.assertRaisesRegex(
|
||||
exceptions.DBKeyNotFound,
|
||||
'key',
|
||||
self.RedisDbDriver.get_key,
|
||||
'table',
|
||||
'key',
|
||||
'topic',
|
||||
)
|
||||
node.client.execute_command.assert_called_once_with(
|
||||
'GET', '{table.topic}key')
|
||||
|
||||
# test get_key
|
||||
result = self.RedisDbDriver.get_key('table', 'key')
|
||||
self.assertEqual('value', result)
|
||||
redis_mgt.get_ip_by_key.assert_called_with('a')
|
||||
def _test_key_with_side_effect(self, node1, node2, side_effect):
|
||||
def _side_effect(*args, **kwargs):
|
||||
cluster.get_node_by_host.return_value = node2
|
||||
cluster.get_node.return_value = node2
|
||||
side_effect()
|
||||
|
||||
result = self.RedisDbDriver.get_key('table', 'key', '')
|
||||
self.assertEqual('value', result)
|
||||
redis_mgt.get_ip_by_key.assert_called_with('a')
|
||||
self.RedisDbDriver._cluster = mock.Mock()
|
||||
cluster = self.RedisDbDriver._cluster
|
||||
expected = 'value'
|
||||
node1.client.execute_command.side_effect = _side_effect
|
||||
node2.client.execute_command.return_value = expected
|
||||
cluster.get_node.return_value = node1
|
||||
actual = self.RedisDbDriver.get_key('table', 'key', 'topic')
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
result = self.RedisDbDriver.get_key('table', 'key', 'topic')
|
||||
self.assertEqual('value', result)
|
||||
local_key = '{table.topic}.key'
|
||||
redis_mgt.get_ip_by_key.assert_called_with(local_key)
|
||||
def test_moved_key(self):
|
||||
def fail(*args, **kwargs):
|
||||
raise redis.exceptions.ResponseError('MOVED 1 1.2.3.4:7000')
|
||||
|
||||
with mock.patch(
|
||||
'dragonflow.db.drivers.redis_db_driver.RedisDbDriver._execute_cmd',
|
||||
return_value=None,
|
||||
):
|
||||
self.assertRaises(
|
||||
exceptions.DBKeyNotFound,
|
||||
self.RedisDbDriver.get_key,
|
||||
'table', 'key', 'topic',
|
||||
)
|
||||
node1 = mock.Mock()
|
||||
node2 = mock.Mock()
|
||||
self._test_key_with_side_effect(node1, node2, fail)
|
||||
node1.client.execute_command.assert_called_once_with(
|
||||
'GET', '{table.topic}key')
|
||||
node2.client.execute_command.assert_called_once_with(
|
||||
'GET', '{table.topic}key')
|
||||
self.RedisDbDriver._cluster.populate_cluster.assert_called_once()
|
||||
|
||||
with mock.patch(
|
||||
'dragonflow.db.drivers.redis_db_driver.RedisDbDriver._execute_cmd',
|
||||
side_effect=RuntimeError,
|
||||
):
|
||||
self.assertRaises(
|
||||
exceptions.DBKeyNotFound,
|
||||
self.RedisDbDriver.get_key,
|
||||
'table', 'key', 'topic',
|
||||
)
|
||||
def test_migrating_key(self):
|
||||
def fail(*args, **kwargs):
|
||||
raise redis.exceptions.ResponseError('ASK 1 1.2.3.4:7000')
|
||||
|
||||
# test get_all_entries
|
||||
result = self.RedisDbDriver.get_all_entries('table')
|
||||
self.assertEqual(['value'], result)
|
||||
redis_mgt.get_ip_by_key.assert_called_with('a')
|
||||
node1 = mock.Mock()
|
||||
node2 = mock.Mock()
|
||||
self._test_key_with_side_effect(node1, node2, fail)
|
||||
node1.client.execute_command.assert_called_once_with(
|
||||
'GET', '{table.topic}key')
|
||||
node2.client.execute_command.assert_any_call(
|
||||
'ASKING')
|
||||
node2.client.execute_command.assert_any_call(
|
||||
'GET', '{table.topic}key')
|
||||
|
||||
result = self.RedisDbDriver.get_all_entries('table', '')
|
||||
self.assertEqual(['value'], result)
|
||||
redis_mgt.get_ip_by_key.assert_called_with('a')
|
||||
def test_connection_error(self):
|
||||
def fail(*args, **kwargs):
|
||||
raise redis.exceptions.ConnectionError('Error 111')
|
||||
|
||||
result = self.RedisDbDriver.get_all_entries('table', 'topic')
|
||||
self.assertEqual(['value'], result)
|
||||
local_key = '{table.topic}.*'
|
||||
redis_mgt.get_ip_by_key.assert_called_with(local_key)
|
||||
|
||||
# test get_all_key
|
||||
client.keys.return_value = ['{table.*}.key']
|
||||
result = self.RedisDbDriver.get_all_keys('table')
|
||||
self.assertEqual(['key'], result)
|
||||
local_key = '{table.*}.*'
|
||||
client.keys.assert_called_with(local_key)
|
||||
|
||||
result = self.RedisDbDriver.get_all_keys('table', '')
|
||||
self.assertEqual(['key'], result)
|
||||
client.keys.assert_called_with(local_key)
|
||||
|
||||
result = self.RedisDbDriver.get_all_keys('table', 'topic')
|
||||
self.assertEqual(['key'], result)
|
||||
local_key = '{table.topic}.*'
|
||||
redis_mgt.get_ip_by_key.assert_called_with(local_key)
|
||||
node1 = mock.Mock()
|
||||
node2 = mock.Mock()
|
||||
self._test_key_with_side_effect(node1, node2, fail)
|
||||
node1.client.execute_command.assert_called_with(
|
||||
'GET', '{table.topic}key')
|
||||
node2.client.execute_command.assert_called_with(
|
||||
'GET', '{table.topic}key')
|
||||
self.RedisDbDriver._cluster.populate_cluster.assert_called_once()
|
||||
|
||||
def test_delete_key(self):
|
||||
client = mock.Mock()
|
||||
self.RedisDbDriver._get_client = mock.Mock(return_value=client)
|
||||
client.execute_command.return_value = 1
|
||||
redis_mgt = mock.Mock()
|
||||
self.RedisDbDriver.redis_mgt = redis_mgt
|
||||
redis_mgt.get_ip_by_key.return_value = '0.0.0.0:1000'
|
||||
result = self.RedisDbDriver.delete_key('table', 'key', 'topic')
|
||||
self.assertEqual(1, result)
|
||||
self.RedisDbDriver._cluster = mock.Mock()
|
||||
node = mock.Mock()
|
||||
self.RedisDbDriver._cluster.get_node.return_value = node
|
||||
self.RedisDbDriver.delete_key('table', 'key', 'topic')
|
||||
node.client.execute_command.assert_called_once_with(
|
||||
'DEL', '{table.topic}key')
|
||||
|
||||
client.execute_command.return_value = None
|
||||
result = self.RedisDbDriver.delete_key('table', 'key', 'topic')
|
||||
self.assertEqual(0, result)
|
||||
def test_get_all_keys_topic(self):
|
||||
expected = [b'key1', b'key2', b'key3']
|
||||
keys_response = [b'{table.topic}' + key for key in expected]
|
||||
self.RedisDbDriver._cluster = mock.Mock()
|
||||
node = mock.Mock()
|
||||
self.RedisDbDriver._cluster.get_node.return_value = node
|
||||
node.client.scan.return_value = (0, keys_response)
|
||||
actual = self.RedisDbDriver.get_all_keys('table', 'topic')
|
||||
self.assertEqual(set(expected), set(actual))
|
||||
|
||||
def test_get_all_keys_notopic(self):
|
||||
nodes_keys = (
|
||||
[b'key1', b'key2', b'key3'],
|
||||
[b'key3', b'key4', b'key5'],
|
||||
)
|
||||
expected = set()
|
||||
nodes = []
|
||||
for node_keys in nodes_keys:
|
||||
expected.update(node_keys)
|
||||
keys_response = [b'{table.topic}' + key for key in node_keys]
|
||||
node = mock.Mock()
|
||||
node.client.scan.return_value = (0, keys_response)
|
||||
nodes.append(node)
|
||||
|
||||
self.RedisDbDriver._cluster = mock.Mock()
|
||||
self.RedisDbDriver._cluster.nodes = nodes
|
||||
actual = self.RedisDbDriver.get_all_keys('table')
|
||||
self.assertEqual(expected, set(actual))
|
||||
|
||||
def _test_batch(self, expected_command, method, *args, **kwargs):
|
||||
def strip(key):
|
||||
match = table_strip_re.match(key)
|
||||
return match.group(1) if match else key
|
||||
|
||||
def pipeline_execute_cmd(command, key, *args, **kwargs):
|
||||
pipeline = kwargs['pipeline']
|
||||
pipeline.calls.append(key)
|
||||
self.assertEqual(expected_command, command)
|
||||
|
||||
def pipeline_execute(pipeline, *args, **kwargs):
|
||||
return [key_values[strip(key)] for key in pipeline.calls]
|
||||
|
||||
def create_pipeline(*args, **kwargs):
|
||||
pipeline = mock.Mock()
|
||||
pipeline.calls = []
|
||||
pipeline.execute_command.side_effect = functools.partial(
|
||||
pipeline_execute_cmd, pipeline=pipeline)
|
||||
pipeline.execute.side_effect = functools.partial(
|
||||
pipeline_execute, pipeline=pipeline)
|
||||
pipelines.append(pipeline)
|
||||
return pipeline
|
||||
|
||||
check_retval = kwargs.get('check_retval', True)
|
||||
table_strip_re = re.compile(b'^{.+}(.+)$')
|
||||
pipelines = []
|
||||
key_values = {
|
||||
b'key1': b'value1',
|
||||
b'key2': b'value2',
|
||||
b'key3': b'value3',
|
||||
}
|
||||
keys_response = [b'{table.topic}' + key for key in key_values.keys()]
|
||||
batch_key_amount = 2
|
||||
self.RedisDbDriver.BATCH_KEY_AMOUNT = batch_key_amount
|
||||
self.RedisDbDriver._cluster = mock.Mock()
|
||||
node = mock.Mock()
|
||||
self.RedisDbDriver._cluster.get_node.return_value = node
|
||||
self.RedisDbDriver._cluster.nodes = (node, )
|
||||
node.client.scan.return_value = (0, keys_response)
|
||||
node.client.pipeline.side_effect = create_pipeline
|
||||
actual = getattr(self.RedisDbDriver, method)(*args)
|
||||
if check_retval:
|
||||
self.assertEqual(set(key_values.values()), set(actual))
|
||||
|
||||
number_of_batches = int(math.ceil(len(key_values) /
|
||||
float(batch_key_amount)))
|
||||
self.assertEqual(number_of_batches, len(pipelines))
|
||||
total_calls = set()
|
||||
for pipeline in pipelines:
|
||||
pipeline.execute.assert_called_once()
|
||||
total_calls.update(map(strip, pipeline.calls))
|
||||
|
||||
self.assertEqual(total_calls, set(key_values.keys()))
|
||||
|
||||
def test_get_all_entries_topic(self):
|
||||
self._test_batch('GET', 'get_all_entries', 'table', 'topic')
|
||||
|
||||
def test_get_all_entries_notopic(self):
|
||||
self._test_batch('GET', 'get_all_entries', 'table', None)
|
||||
|
||||
def test_delete_table(self):
|
||||
self._test_batch('DEL', 'delete_table', 'table', check_retval=False)
|
||||
|
||||
def test_allocate_unique_key(self):
|
||||
client = mock.Mock()
|
||||
self.RedisDbDriver._update_client = mock.Mock(return_value=client)
|
||||
client.incr.return_value = 1
|
||||
result = self.RedisDbDriver.allocate_unique_key('fake_table')
|
||||
self.assertEqual(1, result)
|
||||
self.RedisDbDriver._cluster = mock.Mock()
|
||||
node = mock.Mock()
|
||||
expected = 1
|
||||
node.client.execute_command.return_value = expected
|
||||
self.RedisDbDriver._cluster.get_node.return_value = node
|
||||
actual = self.RedisDbDriver.allocate_unique_key('table')
|
||||
node.client.execute_command.assert_called_once_with(
|
||||
'INCR', '{unique_key.}table')
|
||||
self.assertEqual(expected, actual)
|
||||
|
|
Loading…
Reference in New Issue