Restore the DSE opt group
As discussed in IRC team meeting a while back, we now have enough DSE options to restore the group. Likely more to come. Change-Id: Ib08b26d0e1b8035a43e0093a2594b0869294f02a
This commit is contained in:
parent
d7729f3836
commit
17c82c4c5d
|
@ -27,7 +27,7 @@ class APIModel(object):
|
|||
|
||||
def __init__(self, name, bus=None):
|
||||
self.name = name
|
||||
self.dse_long_timeout = cfg.CONF.dse_long_timeout
|
||||
self.dse_long_timeout = cfg.CONF.dse.long_timeout
|
||||
self.bus = bus
|
||||
|
||||
# Note(thread-safety): blocking function
|
||||
|
|
|
@ -72,12 +72,6 @@ core_opts = [
|
|||
cfg.BoolOpt('enable_execute_action', default=True,
|
||||
help='Set the flag to False if you don\'t want Congress '
|
||||
'to execute actions.'),
|
||||
cfg.BoolOpt('execute_action_retry', default=False,
|
||||
help='Set the flag to True to make Congress retry execute '
|
||||
'actions; may cause duplicate executions.'),
|
||||
cfg.IntOpt('execute_action_retry_timeout', default=600,
|
||||
help='The number of seconds to retry execute action before '
|
||||
'giving up. Zero or negative value means never give up.'),
|
||||
cfg.BoolOpt('distributed_architecture', default=True,
|
||||
help="The flag to use congress new distributed architecture."
|
||||
"Don't set it to True in L release since the new "
|
||||
|
|
|
@ -482,7 +482,7 @@ class DataService(object):
|
|||
table in self.oldest_queue_times[publisher] and
|
||||
self.oldest_queue_times[publisher][table] is not None and
|
||||
(time.time() - self.oldest_queue_times[publisher][table]
|
||||
> cfg.CONF.dse_time_to_resub)):
|
||||
> cfg.CONF.dse.time_to_resub)):
|
||||
self.unsubscribe(publisher, table)
|
||||
self.subscribe(publisher, table)
|
||||
return True
|
||||
|
|
|
@ -45,17 +45,23 @@ LOG = logging.getLogger(__name__)
|
|||
_dse_opts = [
|
||||
cfg.StrOpt('bus_id', default='bus',
|
||||
help='Unique ID of this DSE bus'),
|
||||
cfg.IntOpt('dse_ping_timeout', default=5,
|
||||
cfg.IntOpt('ping_timeout', default=5,
|
||||
help='RPC short timeout in seconds; used to ping destination'),
|
||||
cfg.IntOpt('dse_long_timeout', default=120,
|
||||
cfg.IntOpt('long_timeout', default=120,
|
||||
help='RPC long timeout in seconds; used on potentially long '
|
||||
'running requests such as datasource action and PE row '
|
||||
'query'),
|
||||
cfg.IntOpt('dse_time_to_resub', default=10,
|
||||
cfg.IntOpt('time_to_resub', default=10,
|
||||
help='Time in seconds which a subscriber will wait for missing '
|
||||
'update before attempting to resubscribe from publisher'),
|
||||
cfg.BoolOpt('execute_action_retry', default=False,
|
||||
help='Set the flag to True to make Congress retry execute '
|
||||
'actions; may cause duplicate executions.'),
|
||||
cfg.IntOpt('execute_action_retry_timeout', default=600,
|
||||
help='The number of seconds to retry execute action before '
|
||||
'giving up. Zero or negative value means never give up.'),
|
||||
]
|
||||
cfg.CONF.register_opts(_dse_opts)
|
||||
cfg.CONF.register_opts(_dse_opts, group='dse')
|
||||
|
||||
|
||||
class DseNode(object):
|
||||
|
@ -112,7 +118,7 @@ class DseNode(object):
|
|||
self.node_id = node_id
|
||||
self.node_rpc_endpoints = node_rpc_endpoints
|
||||
# unique identifier shared by all nodes that can communicate
|
||||
self.partition_id = partition_id or cfg.CONF.bus_id or "bus"
|
||||
self.partition_id = partition_id or cfg.CONF.dse.bus_id or "bus"
|
||||
self.node_rpc_endpoints.append(DseNodeEndpoints(self))
|
||||
self._running = False
|
||||
self._services = []
|
||||
|
@ -341,7 +347,7 @@ class DseNode(object):
|
|||
# First ping the destination to fail fast if unresponsive
|
||||
LOG.trace("<%s> Checking responsiveness before invoking RPC "
|
||||
"'%s' on %s", self.node_id, method, target)
|
||||
client.prepare(timeout=cfg.CONF.dse_ping_timeout).call(
|
||||
client.prepare(timeout=cfg.CONF.dse.ping_timeout).call(
|
||||
self.context, 'ping')
|
||||
except (messaging_exceptions.MessagingTimeout,
|
||||
messaging_exceptions.MessageDeliveryFailure):
|
||||
|
@ -661,7 +667,7 @@ class DseNode(object):
|
|||
# return (db_config['name'] == active_config.service_id and
|
||||
# db_config['config'] == active_config.service_info['args'])
|
||||
|
||||
@periodics.periodic(spacing=cfg.CONF.dse_time_to_resub)
|
||||
@periodics.periodic(spacing=cfg.CONF.dse.time_to_resub)
|
||||
def _check_resub_all(self):
|
||||
for s in self._services:
|
||||
s.check_resub_all()
|
||||
|
|
|
@ -30,8 +30,8 @@ def list_opts():
|
|||
('DEFAULT',
|
||||
itertools.chain(
|
||||
congress.common.config.core_opts,
|
||||
congress.dse2.dse_node._dse_opts,
|
||||
congress.utils.utils_opts,
|
||||
congress.exception.exc_log_opts,
|
||||
)),
|
||||
('dse', congress.dse2.dse_node._dse_opts)
|
||||
]
|
||||
|
|
|
@ -2147,17 +2147,17 @@ class Dse2Runtime(DseRuntime):
|
|||
|
||||
def execute_once():
|
||||
return self.rpc(service_name, 'request_execute', args,
|
||||
timeout=cfg.CONF.dse_long_timeout, retry=0)
|
||||
timeout=cfg.CONF.dse.long_timeout, retry=0)
|
||||
|
||||
def execute_retry():
|
||||
timeout = cfg.CONF.execute_action_retry_timeout
|
||||
timeout = cfg.CONF.dse.execute_action_retry_timeout
|
||||
start_time = time.time()
|
||||
end_time = start_time + timeout
|
||||
while timeout <= 0 or time.time() < end_time:
|
||||
try:
|
||||
return self.rpc(
|
||||
service_name, 'request_execute', args,
|
||||
timeout=cfg.CONF.dse_long_timeout, retry=0)
|
||||
timeout=cfg.CONF.dse.long_timeout, retry=0)
|
||||
except (messaging_exceptions.MessagingTimeout,
|
||||
messaging_exceptions.MessageDeliveryFailure):
|
||||
LOG.warning('DSE failure executing action %s with '
|
||||
|
@ -2167,7 +2167,7 @@ class Dse2Runtime(DseRuntime):
|
|||
action, args['action_args'])
|
||||
|
||||
# long timeout for action execution because actions can take a while
|
||||
if not cfg.CONF.execute_action_retry:
|
||||
if not cfg.CONF.dse.execute_action_retry:
|
||||
# Note(thread-safety): blocking call
|
||||
# Only when thread pool at capacity
|
||||
eventlet.spawn_n(execute_once)
|
||||
|
|
|
@ -90,7 +90,7 @@ def create_api_server(conf_path, node_id, host, port, workers, policy_engine,
|
|||
policy_engine=policy_engine,
|
||||
api=True,
|
||||
datasources=datasources,
|
||||
bus_id=cfg.CONF.bus_id)
|
||||
bus_id=cfg.CONF.dse.bus_id)
|
||||
# TODO(thinrichs): there's some sort of magic happening for the api
|
||||
# server. We call eventlet_server, which on start() calls
|
||||
# service.congress_app_factory, which uses harness to create the
|
||||
|
@ -101,7 +101,8 @@ def create_api_server(conf_path, node_id, host, port, workers, policy_engine,
|
|||
|
||||
|
||||
def create_nonapi_server(node_id, policy_engine, datasources, workers):
|
||||
congress_server = eventlet_server.Server(node_id, bus_id=cfg.CONF.bus_id)
|
||||
congress_server = eventlet_server.Server(
|
||||
node_id, bus_id=cfg.CONF.dse.bus_id)
|
||||
harness.create2(node=congress_server.node, api=False,
|
||||
policy_engine=policy_engine,
|
||||
datasources=datasources)
|
||||
|
|
|
@ -181,7 +181,7 @@ class TestDatasourceModel(base.SqlTestCase):
|
|||
{}, context, bad_request)
|
||||
|
||||
# Positive test with retry: no body args
|
||||
cfg.CONF.execute_action_retry = True
|
||||
cfg.CONF.dse.execute_action_retry = True
|
||||
context = {'ds_id': 'nova'}
|
||||
body = {'name': 'disconnect_all'}
|
||||
request = helper.FakeRequest(body)
|
||||
|
|
|
@ -78,10 +78,10 @@ class TestHA(manager_congress.ScenarioPolicyBase):
|
|||
conf = (conf[:index] +
|
||||
'bind_port = %d\n' % port_num +
|
||||
'datasource_sync_period = 5\n' +
|
||||
'bus_id = replica-node\n' +
|
||||
conf[index:])
|
||||
sindex = conf.find('signing_dir')
|
||||
conf = conf[:sindex] + '#' + conf[sindex:]
|
||||
conf = conf + '\n[dse]\nbus_id = replica-node\n'
|
||||
LOG.debug("Configuration file for replica: %s\n", conf)
|
||||
f.write(conf)
|
||||
f.close()
|
||||
|
|
Loading…
Reference in New Issue