metricd: retry slowly coordination connection failure
If the coordinator fails to start (e.g. ToozConnectionError), the metricd
subprocess started by Cotyledon fails to start and raises an error. That means
Cotyledon will retry to spawn the process with no delay at all, spamming the
CPU with forking request.
This patches fatorize the coordination retrieval and connection code into one
function in gnocchi.utils, which makes sure both metricd and the Carbonara
based drivers leverage the same code to retry to connection to the coordinator
with some delay.
Change-Id: I83157c5fdb0a3e488a9b788d48d974de80219dbb
(cherry picked from commit 04917de585
)
This commit is contained in:
parent
f3c9a9a8ea
commit
9af1d88420
|
@ -16,7 +16,6 @@
|
|||
import multiprocessing
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import cotyledon
|
||||
from futurist import periodics
|
||||
|
@ -143,16 +142,11 @@ class MetricScheduler(MetricProcessBase):
|
|||
TASKS_PER_WORKER = 16
|
||||
BLOCK_SIZE = 4
|
||||
|
||||
def _enable_coordination(self, conf):
|
||||
self._coord = coordination.get_coordinator(
|
||||
conf.storage.coordination_url, self._my_id)
|
||||
self._coord.start(start_heart=True)
|
||||
|
||||
def __init__(self, worker_id, conf, queue):
|
||||
super(MetricScheduler, self).__init__(
|
||||
worker_id, conf, conf.storage.metric_processing_delay)
|
||||
self._my_id = str(uuid.uuid4())
|
||||
self._enable_coordination(conf)
|
||||
self._coord, self._my_id = utils.get_coordinator_and_start(
|
||||
conf.storage.coordination_url)
|
||||
self.queue = queue
|
||||
self.previously_scheduled_metrics = set()
|
||||
self.workers = conf.metricd.workers
|
||||
|
|
|
@ -65,24 +65,14 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
|
||||
def __init__(self, conf):
|
||||
super(CarbonaraBasedStorage, self).__init__(conf)
|
||||
self.coord = coordination.get_coordinator(
|
||||
conf.coordination_url,
|
||||
str(uuid.uuid4()).encode('ascii'))
|
||||
self.aggregation_workers_number = conf.aggregation_workers_number
|
||||
if self.aggregation_workers_number == 1:
|
||||
# NOTE(jd) Avoid using futures at all if we don't want any threads.
|
||||
self._map_in_thread = self._map_no_thread
|
||||
else:
|
||||
self._map_in_thread = self._map_in_futures_threads
|
||||
self.start()
|
||||
|
||||
@utils.retry
|
||||
def start(self):
|
||||
try:
|
||||
self.coord.start(start_heart=True)
|
||||
except Exception as e:
|
||||
LOG.error("Unable to start coordinator: %s" % e)
|
||||
raise utils.Retry(e)
|
||||
self.coord, my_id = utils.get_coordinator_and_start(
|
||||
conf.coordination_url)
|
||||
|
||||
def stop(self):
|
||||
self.coord.stop()
|
||||
|
|
|
@ -14,13 +14,19 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
import iso8601
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
from pytimeparse import timeparse
|
||||
import retrying
|
||||
import six
|
||||
import uuid
|
||||
from tooz import coordination
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
# uuid5 namespace for id transformation.
|
||||
# NOTE(chdent): This UUID must stay the same, forever, across all
|
||||
|
@ -64,6 +70,23 @@ retry = retrying.retry(wait_exponential_multiplier=500,
|
|||
retry_on_exception=retry_if_retry_is_raised)
|
||||
|
||||
|
||||
# TODO(jd) Move this to tooz?
|
||||
@retry
|
||||
def _enable_coordination(coord):
|
||||
try:
|
||||
coord.start(start_heart=True)
|
||||
except Exception as e:
|
||||
LOG.error("Unable to start coordinator: %s", e)
|
||||
raise Retry(e)
|
||||
|
||||
|
||||
def get_coordinator_and_start(url):
|
||||
my_id = str(uuid.uuid4())
|
||||
coord = coordination.get_coordinator(url, my_id)
|
||||
_enable_coordination(coord)
|
||||
return coord, my_id
|
||||
|
||||
|
||||
def to_timestamp(v):
|
||||
if isinstance(v, datetime.datetime):
|
||||
return v
|
||||
|
|
Loading…
Reference in New Issue