Merge "Add pipeline timing metrics"
This commit is contained in:
commit
88ea050f68
|
@ -79,11 +79,66 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
|
|||
|
||||
Holds metrics specific to jobs. This hierarchy includes:
|
||||
|
||||
.. stat:: <pipeline name>
|
||||
.. stat:: <pipeline>
|
||||
|
||||
A set of metrics for each pipeline named as defined in the Zuul
|
||||
config.
|
||||
|
||||
.. stat:: event_enqueue_time
|
||||
:type: timer
|
||||
|
||||
The time elapsed from when a trigger event was received from
|
||||
the remote system to when the corresponding item is enqueued
|
||||
in a pipeline.
|
||||
|
||||
.. stat:: merge_request_time
|
||||
:type: timer
|
||||
|
||||
The amount of time spent waiting for the initial merge
|
||||
operation(s). This will always include a request to a Zuul
|
||||
merger to speculatively merge the change, but it may also
|
||||
include a second request submitted in parallel to identify
|
||||
the files altered by the change.
|
||||
|
||||
.. stat:: layout_generation_time
|
||||
:type: timer
|
||||
|
||||
The amount of time spent generating a dynamic configuration layout.
|
||||
|
||||
.. stat:: job_freeze_time
|
||||
:type: timer
|
||||
|
||||
The amount of time spent freezing the inheritance hierarchy
|
||||
and parameters of a job.
|
||||
|
||||
.. stat:: repo_state_time
|
||||
:type: timer
|
||||
|
||||
The amount of time waiting for a secondary Zuul merger
|
||||
operation to collect additional information about the repo
|
||||
state of required projects.
|
||||
|
||||
.. stat:: node_request_time
|
||||
:type: timer
|
||||
|
||||
The amount of time spent waiting for each node request to be
|
||||
fulfilled.
|
||||
|
||||
.. stat:: job_wait_time
|
||||
:type: timer
|
||||
|
||||
How long a job waited for an executor to start running it
|
||||
after the build was requested.
|
||||
|
||||
.. stat:: event_job_time
|
||||
:type: timer
|
||||
|
||||
The total amount of time elapsed from when a trigger event
|
||||
was received from the remote system until the item's first
|
||||
job is run. This is only emitted once per queue item, even
|
||||
if its buildset is reset due to a speculative execution
|
||||
failure.
|
||||
|
||||
.. stat:: all_jobs
|
||||
:type: counter
|
||||
|
||||
|
@ -153,8 +208,8 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
|
|||
.. stat:: wait_time
|
||||
:type: timer
|
||||
|
||||
How long each item spent in the pipeline before its first job
|
||||
started.
|
||||
How long the job waited for an executor to
|
||||
start running it after the build was requested.
|
||||
|
||||
.. stat:: current_changes
|
||||
:type: gauge
|
||||
|
@ -174,6 +229,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
|
|||
The number of changes for this project processed by the
|
||||
pipeline since Zuul started.
|
||||
|
||||
.. stat:: read_time
|
||||
:type: timer
|
||||
|
||||
The time spent reading data from ZooKeeper during a single
|
||||
pipeline processing run.
|
||||
|
||||
.. stat:: refresh
|
||||
:type: timer
|
||||
|
||||
|
@ -206,6 +267,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
|
|||
|
||||
The size of the pipeline's management event queue.
|
||||
|
||||
.. stat:: write_time
|
||||
:type: timer
|
||||
|
||||
The time spent writing data to ZooKeeper during a single
|
||||
pipeline processing run.
|
||||
|
||||
.. stat:: zuul.executor.<executor>
|
||||
|
||||
Holds metrics emitted by individual executors. The ``<executor>``
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
The following new statsd metrics are available in order to monitor
|
||||
Zuul system performance:
|
||||
|
||||
* :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.event_enqueue_time`
|
||||
* :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.merge_request_time`
|
||||
* :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.layout_generation_time`
|
||||
* :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.job_freeze_time`
|
||||
* :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.repo_state_time`
|
||||
* :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.node_request_time`
|
||||
* :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.job_wait_time`
|
||||
* :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.event_job_time`
|
||||
* :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_time`
|
||||
* :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_time`
|
|
@ -5545,6 +5545,10 @@ class ZuulTestCase(BaseTestCase):
|
|||
return s_value
|
||||
time.sleep(0.1)
|
||||
|
||||
stats = list(itertools.chain.from_iterable(
|
||||
[s.decode('utf-8').split('\n') for s in self.statsd.stats]))
|
||||
for stat in stats:
|
||||
self.log.debug("Stat: %s", stat)
|
||||
raise StatException("Key %s not found in reported stats" % key)
|
||||
|
||||
def assertUnReportedStat(self, key, value=None, kind=None):
|
||||
|
|
|
@ -1,3 +1,8 @@
|
|||
[statsd]
|
||||
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
|
||||
# see: https://github.com/jsocol/pystatsd/issues/61
|
||||
server=127.0.0.1
|
||||
|
||||
[scheduler]
|
||||
tenant_config=config/multi-driver/main.yaml
|
||||
|
||||
|
|
|
@ -1,3 +1,8 @@
|
|||
[statsd]
|
||||
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
|
||||
# see: https://github.com/jsocol/pystatsd/issues/61
|
||||
server=127.0.0.1
|
||||
|
||||
[scheduler]
|
||||
tenant_config=main.yaml
|
||||
|
||||
|
|
|
@ -1,3 +1,8 @@
|
|||
[statsd]
|
||||
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
|
||||
# see: https://github.com/jsocol/pystatsd/issues/61
|
||||
server=127.0.0.1
|
||||
|
||||
[scheduler]
|
||||
tenant_config=main.yaml
|
||||
|
||||
|
|
|
@ -440,13 +440,21 @@ class TestScheduler(ZuulTestCase):
|
|||
value='0', kind='g')
|
||||
|
||||
# Catch time / monotonic errors
|
||||
val = self.assertReportedStat(
|
||||
'zuul.tenant.tenant-one.event_enqueue_processing_time',
|
||||
kind='ms')
|
||||
self.assertTrue(0.0 < float(val) < 60000.0)
|
||||
val = self.assertReportedStat(
|
||||
'zuul.tenant.tenant-one.event_enqueue_time', kind='ms')
|
||||
self.assertTrue(0.0 < float(val) < 60000.0)
|
||||
for key in [
|
||||
'zuul.tenant.tenant-one.event_enqueue_processing_time',
|
||||
'zuul.tenant.tenant-one.event_enqueue_time',
|
||||
'zuul.tenant.tenant-one.pipeline.gate.event_enqueue_time',
|
||||
'zuul.tenant.tenant-one.pipeline.gate.merge_request_time',
|
||||
'zuul.tenant.tenant-one.pipeline.gate.job_freeze_time',
|
||||
'zuul.tenant.tenant-one.pipeline.gate.node_request_time',
|
||||
'zuul.tenant.tenant-one.pipeline.gate.job_wait_time',
|
||||
'zuul.tenant.tenant-one.pipeline.gate.event_job_time',
|
||||
'zuul.tenant.tenant-one.pipeline.gate.resident_time',
|
||||
'zuul.tenant.tenant-one.pipeline.gate.read_time',
|
||||
'zuul.tenant.tenant-one.pipeline.gate.write_time',
|
||||
]:
|
||||
val = self.assertReportedStat(key, kind='ms')
|
||||
self.assertTrue(0.0 < float(val) < 60000.0)
|
||||
|
||||
self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'
|
||||
'data_size_compressed',
|
||||
|
|
|
@ -1240,6 +1240,12 @@ class TestInRepoConfig(ZuulTestCase):
|
|||
dict(name='project-test3', result='SUCCESS', changes='2,1'),
|
||||
], ordered=False)
|
||||
|
||||
# Catch time / monotonic errors
|
||||
val = self.assertReportedStat('zuul.tenant.tenant-one.pipeline.'
|
||||
'tenant-one-gate.layout_generation_time',
|
||||
kind='ms')
|
||||
self.assertTrue(0.0 < float(val) < 60000.0)
|
||||
|
||||
def test_dynamic_template(self):
|
||||
# Tests that a project can't update a template in another
|
||||
# project.
|
||||
|
@ -7196,6 +7202,12 @@ class TestProvidesRequiresMysql(ZuulTestCase):
|
|||
}
|
||||
}])
|
||||
|
||||
# Catch time / monotonic errors
|
||||
val = self.assertReportedStat('zuul.tenant.tenant-one.pipeline.'
|
||||
'gate.repo_state_time',
|
||||
kind='ms')
|
||||
self.assertTrue(0.0 < float(val) < 60000.0)
|
||||
|
||||
@simple_layout('layouts/provides-requires-unshared.yaml')
|
||||
def test_provides_requires_unshared_queue(self):
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
|
|
@ -1123,7 +1123,10 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
return self.getFallbackLayout(item)
|
||||
|
||||
log.debug("Preparing dynamic layout for: %s" % item.change)
|
||||
return self._loadDynamicLayout(item)
|
||||
start = time.time()
|
||||
layout = self._loadDynamicLayout(item)
|
||||
self.reportPipelineTiming('layout_generation_time', start)
|
||||
return layout
|
||||
|
||||
def _branchesForRepoState(self, projects, tenant, items=None):
|
||||
items = items or []
|
||||
|
@ -1256,6 +1259,7 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
branches=branches)
|
||||
item.current_build_set.updateAttributes(
|
||||
self.current_context,
|
||||
repo_state_request_time=time.time(),
|
||||
repo_state_state=item.current_build_set.PENDING)
|
||||
return True
|
||||
|
||||
|
@ -1344,10 +1348,12 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
if not item.current_build_set.job_graph:
|
||||
try:
|
||||
log.debug("Freezing job graph for %s" % (item,))
|
||||
start = time.time()
|
||||
item.freezeJobGraph(self.getLayout(item),
|
||||
self.current_context,
|
||||
skip_file_matcher=False,
|
||||
redact_secrets_and_keys=False)
|
||||
self.reportPipelineTiming('job_freeze_time', start)
|
||||
except Exception as e:
|
||||
# TODOv3(jeblair): nicify this exception as it will be reported
|
||||
log.exception("Error freezing job graph for %s" % (item,))
|
||||
|
@ -1567,6 +1573,17 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
log = get_annotated_logger(self.log, build.zuul_event_id)
|
||||
log.debug("Build %s started", build)
|
||||
self.sql.reportBuildStart(build)
|
||||
self.reportPipelineTiming('job_wait_time',
|
||||
build.execute_time, build.start_time)
|
||||
if not build.build_set.item.first_job_start_time:
|
||||
# Only report this for the first job in a queue item so
|
||||
# that we don't include gate resets.
|
||||
build.build_set.item.updateAttributes(
|
||||
self.current_context,
|
||||
first_job_start_time=build.start_time)
|
||||
self.reportPipelineTiming('event_job_time',
|
||||
build.build_set.item.event.timestamp,
|
||||
build.start_time)
|
||||
return True
|
||||
|
||||
def onBuildPaused(self, build):
|
||||
|
@ -1667,14 +1684,25 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
source.setChangeAttributes(item.change, files=event.files)
|
||||
build_set.updateAttributes(self.current_context,
|
||||
files_state=build_set.COMPLETE)
|
||||
if build_set.merge_state == build_set.COMPLETE:
|
||||
# We're the second of the files/merger pair, report the stat
|
||||
self.reportPipelineTiming('merge_request_time',
|
||||
build_set.configured_time)
|
||||
|
||||
def onMergeCompleted(self, event, build_set):
|
||||
if build_set.merge_state == build_set.COMPLETE:
|
||||
self._onGlobalRepoStateCompleted(event, build_set)
|
||||
self.reportPipelineTiming('repo_state_time',
|
||||
build_set.repo_state_request_time)
|
||||
else:
|
||||
self._onMergeCompleted(event, build_set)
|
||||
if build_set.files_state == build_set.COMPLETE:
|
||||
# We're the second of the files/merger pair, report the stat
|
||||
self.reportPipelineTiming('merge_request_time',
|
||||
build_set.configured_time)
|
||||
|
||||
def _onMergeCompleted(self, event, build_set):
|
||||
|
||||
item = build_set.item
|
||||
source = self.sched.connections.getSource(
|
||||
item.change.project.connection_name)
|
||||
|
@ -1705,12 +1733,14 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
item.setUnableToMerge()
|
||||
|
||||
def _onGlobalRepoStateCompleted(self, event, build_set):
|
||||
item = build_set.item
|
||||
if not event.updated:
|
||||
item = build_set.item
|
||||
self.log.info("Unable to get global repo state for change %s"
|
||||
% item.change)
|
||||
item.setUnableToMerge()
|
||||
else:
|
||||
self.log.info("Received global repo state for change %s"
|
||||
% item.change)
|
||||
with build_set.activeContext(self.current_context):
|
||||
build_set.setExtraRepoState(event.repo_state)
|
||||
build_set.repo_state_state = build_set.COMPLETE
|
||||
|
@ -1719,6 +1749,7 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
# TODOv3(jeblair): handle provisioning failure here
|
||||
log = get_annotated_logger(self.log, request.event_id)
|
||||
|
||||
self.reportPipelineTiming('node_request_time', request.created_time)
|
||||
if nodeset is not None:
|
||||
build_set.jobNodeRequestComplete(request.job_name, nodeset)
|
||||
if not request.fulfilled:
|
||||
|
@ -1887,7 +1918,7 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
# Update the gauge on enqueue and dequeue, but timers only
|
||||
# when dequeing.
|
||||
if item.dequeue_time:
|
||||
dt = int((item.dequeue_time - item.enqueue_time) * 1000)
|
||||
dt = (item.dequeue_time - item.enqueue_time) * 1000
|
||||
else:
|
||||
dt = None
|
||||
items = len(self.pipeline.getAllItems())
|
||||
|
@ -1922,12 +1953,27 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
|
||||
now = time.time()
|
||||
arrived = item.event.arrived_at_scheduler_timestamp
|
||||
processing = int((now - arrived) * 1000)
|
||||
elapsed = int((now - item.event.timestamp) * 1000)
|
||||
processing = (now - arrived) * 1000
|
||||
elapsed = (now - item.event.timestamp) * 1000
|
||||
self.sched.statsd.timing(
|
||||
basekey + '.event_enqueue_processing_time',
|
||||
processing)
|
||||
self.sched.statsd.timing(
|
||||
basekey + '.event_enqueue_time', elapsed)
|
||||
self.reportPipelineTiming('event_enqueue_time',
|
||||
item.event.timestamp)
|
||||
except Exception:
|
||||
self.log.exception("Exception reporting pipeline stats")
|
||||
|
||||
def reportPipelineTiming(self, key, start, end=None):
|
||||
if not self.sched.statsd:
|
||||
return
|
||||
if not start:
|
||||
return
|
||||
if end is None:
|
||||
end = time.time()
|
||||
pipeline = self.pipeline
|
||||
tenant = pipeline.tenant
|
||||
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
|
||||
dt = (end - start) * 1000
|
||||
self.sched.statsd.timing(f'{stats_key}.{key}', dt)
|
||||
|
|
|
@ -1485,6 +1485,7 @@ class NodeRequest(object):
|
|||
if 'tenant_name' in data:
|
||||
self.tenant_name = data['tenant_name']
|
||||
self.nodes = data.get('nodes', [])
|
||||
self.created_time = data.get('created_time')
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data):
|
||||
|
@ -3624,6 +3625,9 @@ class BuildSet(zkobject.ZKObject):
|
|||
files_state=self.NEW,
|
||||
repo_state_state=self.NEW,
|
||||
configured=False,
|
||||
configured_time=None, # When setConfigured was called
|
||||
start_time=None, # When the buildset reported start
|
||||
repo_state_request_time=None, # When the refstate job was called
|
||||
fail_fast=False,
|
||||
job_graph=None,
|
||||
jobs={},
|
||||
|
@ -3727,6 +3731,9 @@ class BuildSet(zkobject.ZKObject):
|
|||
"fail_fast": self.fail_fast,
|
||||
"job_graph": (self.job_graph.toDict()
|
||||
if self.job_graph else None),
|
||||
"configured_time": self.configured_time,
|
||||
"start_time": self.start_time,
|
||||
"repo_state_request_time": self.repo_state_request_time,
|
||||
# jobs (serialize as separate objects)
|
||||
}
|
||||
return json.dumps(data, sort_keys=True).encode("utf8")
|
||||
|
@ -3831,8 +3838,8 @@ class BuildSet(zkobject.ZKObject):
|
|||
"builds": builds,
|
||||
"retry_builds": retry_builds,
|
||||
# These are local cache objects only valid for one pipeline run
|
||||
'_old_job_graph': None,
|
||||
'_old_jobs': {},
|
||||
"_old_job_graph": None,
|
||||
"_old_jobs": {},
|
||||
})
|
||||
return data
|
||||
|
||||
|
@ -3868,6 +3875,7 @@ class BuildSet(zkobject.ZKObject):
|
|||
self.dependent_changes = [i.change.toDict() for i in items]
|
||||
self.merger_items = [i.makeMergerItem() for i in items]
|
||||
self.configured = True
|
||||
self.configured_time = time.time()
|
||||
|
||||
def getStateName(self, state_num):
|
||||
return self.states_map.get(
|
||||
|
@ -4016,6 +4024,7 @@ class QueueItem(zkobject.ZKObject):
|
|||
enqueue_time=None,
|
||||
report_time=None,
|
||||
dequeue_time=None,
|
||||
first_job_start_time=None,
|
||||
reported=False,
|
||||
reported_start=False,
|
||||
quiet=False,
|
||||
|
@ -4088,6 +4097,7 @@ class QueueItem(zkobject.ZKObject):
|
|||
"dynamic_state": self.dynamic_state,
|
||||
"bundle": self.bundle and self.bundle.serialize(),
|
||||
"dequeued_bundle_failing": self.dequeued_bundle_failing,
|
||||
"first_job_start_time": self.first_job_start_time,
|
||||
}
|
||||
return json.dumps(data, sort_keys=True).encode("utf8")
|
||||
|
||||
|
|
|
@ -146,7 +146,7 @@ class Nodepool(object):
|
|||
if request.canceled:
|
||||
state = 'canceled'
|
||||
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
|
||||
dt = int((request.state_time - request.requested_time) * 1000)
|
||||
dt = (request.state_time - request.requested_time) * 1000
|
||||
|
||||
key = 'zuul.nodepool.requests.%s' % state
|
||||
pipe.incr(key + ".total")
|
||||
|
|
|
@ -797,14 +797,14 @@ class Scheduler(threading.Thread):
|
|||
'RETRY' if build.result is None else build.result
|
||||
)
|
||||
if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
|
||||
dt = int((build.end_time - build.start_time) * 1000)
|
||||
dt = (build.end_time - build.start_time) * 1000
|
||||
self.statsd.timing(key, dt)
|
||||
self.statsd.incr(key)
|
||||
# zuul.tenant.<tenant>.pipeline.<pipeline>.project.
|
||||
# <host>.<project>.<branch>.job.<job>.wait_time
|
||||
if build.start_time:
|
||||
key = '%s.wait_time' % jobkey
|
||||
dt = int((build.start_time - build.execute_time) * 1000)
|
||||
dt = (build.start_time - build.execute_time) * 1000
|
||||
self.statsd.timing(key, dt)
|
||||
except Exception:
|
||||
self.log.exception("Exception reporting runtime stats")
|
||||
|
@ -1929,6 +1929,11 @@ class Scheduler(threading.Thread):
|
|||
self._process_pipeline(tenant, pipeline)
|
||||
# Update pipeline summary for zuul-web
|
||||
pipeline.summary.update(ctx, self.globals)
|
||||
if self.statsd:
|
||||
self.statsd.timing(f'{stats_key}.read_time',
|
||||
ctx.cumulative_read_time * 1000)
|
||||
self.statsd.timing(f'{stats_key}.write_time',
|
||||
ctx.cumulative_write_time * 1000)
|
||||
except LockException:
|
||||
self.log.debug("Skipping locked pipeline %s in tenant %s",
|
||||
pipeline.name, tenant.name)
|
||||
|
|
|
@ -590,7 +590,7 @@ class ManagementEventQueue(ZooKeeperEventQueue):
|
|||
return
|
||||
|
||||
result_data = {"traceback": event.traceback,
|
||||
"timestamp": time.monotonic()}
|
||||
"timestamp": time.time()}
|
||||
try:
|
||||
self.kazoo_client.set(
|
||||
event.result_ref,
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
import io
|
||||
from contextlib import suppress
|
||||
import time
|
||||
import zlib
|
||||
|
||||
from kazoo.exceptions import NoNodeError
|
||||
|
@ -30,6 +31,8 @@ class RawShardIO(io.RawIOBase):
|
|||
self.shard_base = path
|
||||
self.compressed_bytes_read = 0
|
||||
self.compressed_bytes_written = 0
|
||||
self.cumulative_read_time = 0.0
|
||||
self.cumulative_write_time = 0.0
|
||||
|
||||
def readable(self):
|
||||
return True
|
||||
|
@ -46,12 +49,17 @@ class RawShardIO(io.RawIOBase):
|
|||
@property
|
||||
def _shards(self):
|
||||
try:
|
||||
return self.client.get_children(self.shard_base)
|
||||
start = time.perf_counter()
|
||||
ret = self.client.get_children(self.shard_base)
|
||||
self.cumulative_read_time += time.perf_counter() - start
|
||||
return ret
|
||||
except NoNodeError:
|
||||
return []
|
||||
|
||||
def _getData(self, path):
|
||||
start = time.perf_counter()
|
||||
data, _ = self.client.get(path)
|
||||
self.cumulative_read_time += time.perf_counter() - start
|
||||
self.compressed_bytes_read += len(data)
|
||||
return zlib.decompress(data)
|
||||
|
||||
|
@ -69,12 +77,14 @@ class RawShardIO(io.RawIOBase):
|
|||
shard_bytes = zlib.compress(shard_bytes)
|
||||
if not (len(shard_bytes) < NODE_BYTE_SIZE_LIMIT):
|
||||
raise RuntimeError("Shard too large")
|
||||
start = time.perf_counter()
|
||||
self.client.create(
|
||||
"{}/".format(self.shard_base),
|
||||
shard_bytes,
|
||||
sequence=True,
|
||||
makepath=True,
|
||||
)
|
||||
self.cumulative_write_time += time.perf_counter() - start
|
||||
self.compressed_bytes_written += len(shard_bytes)
|
||||
return min(byte_count, NODE_BYTE_SIZE_LIMIT)
|
||||
|
||||
|
@ -88,6 +98,10 @@ class BufferedShardWriter(io.BufferedWriter):
|
|||
def compressed_bytes_written(self):
|
||||
return self.__raw.compressed_bytes_written
|
||||
|
||||
@property
|
||||
def cumulative_write_time(self):
|
||||
return self.__raw.cumulative_write_time
|
||||
|
||||
|
||||
class BufferedShardReader(io.BufferedReader):
|
||||
def __init__(self, client, path):
|
||||
|
@ -97,3 +111,7 @@ class BufferedShardReader(io.BufferedReader):
|
|||
@property
|
||||
def compressed_bytes_read(self):
|
||||
return self.__raw.compressed_bytes_read
|
||||
|
||||
@property
|
||||
def cumulative_read_time(self):
|
||||
return self.__raw.cumulative_read_time
|
||||
|
|
|
@ -31,6 +31,8 @@ class ZKContext:
|
|||
self.lock = lock
|
||||
self.stop_event = stop_event
|
||||
self.log = log
|
||||
self.cumulative_read_time = 0.0
|
||||
self.cumulative_write_time = 0.0
|
||||
|
||||
def sessionIsValid(self):
|
||||
return ((not self.lock or self.lock.is_still_valid()) and
|
||||
|
@ -237,7 +239,10 @@ class ZKObject:
|
|||
path = self.getPath()
|
||||
while context.sessionIsValid():
|
||||
try:
|
||||
start = time.perf_counter()
|
||||
compressed_data, zstat = context.client.get(path)
|
||||
context.cumulative_read_time += time.perf_counter() - start
|
||||
|
||||
self._set(_zkobject_hash=None)
|
||||
try:
|
||||
data = zlib.decompress(compressed_data)
|
||||
|
@ -278,6 +283,7 @@ class ZKObject:
|
|||
while context.sessionIsValid():
|
||||
try:
|
||||
compressed_data = zlib.compress(data)
|
||||
start = time.perf_counter()
|
||||
if create:
|
||||
real_path, zstat = context.client.create(
|
||||
path, compressed_data, makepath=True,
|
||||
|
@ -285,6 +291,7 @@ class ZKObject:
|
|||
else:
|
||||
zstat = context.client.set(path, compressed_data,
|
||||
version=self._zstat.version)
|
||||
context.cumulative_write_time += time.perf_counter() - start
|
||||
self._set(_zstat=zstat,
|
||||
_zkobject_hash=hash(data),
|
||||
_zkobject_compressed_size=len(compressed_data),
|
||||
|
@ -336,6 +343,8 @@ class ShardedZKObject(ZKObject):
|
|||
context.client, path) as stream:
|
||||
data = stream.read()
|
||||
compressed_size = stream.compressed_bytes_read
|
||||
context.cumulative_read_time += \
|
||||
stream.cumulative_read_time
|
||||
if not data and context.client.exists(path) is None:
|
||||
raise NoNodeError
|
||||
self._set(**self.deserialize(data, context))
|
||||
|
@ -382,6 +391,9 @@ class ShardedZKObject(ZKObject):
|
|||
stream.write(data)
|
||||
stream.flush()
|
||||
compressed_size = stream.compressed_bytes_written
|
||||
context.cumulative_write_time += \
|
||||
stream.cumulative_write_time
|
||||
|
||||
self._set(_zkobject_hash=hash(data),
|
||||
_zkobject_compressed_size=compressed_size,
|
||||
_zkobject_uncompressed_size=len(data),
|
||||
|
|
Loading…
Reference in New Issue