From c522bfa46077f0e030f03e0cd4585d5c100ed241 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Fri, 18 Feb 2022 20:23:23 -0800 Subject: [PATCH] Add pipeline timing metrics This adds several metrics for different phases of processing an item in a pipeline: * How long we wait for a response from mergers * How long it takes to get or compute a layout * How long it takes to freeze jobs * How long we wait for node requests to complete * How long we wait for an executor to start running a job after the request And finally, the total amount of time from the original event until the first job starts. We already report that at the tenant level, this duplicates that for a pipeline-specific metric. Several of these would also make sense as job metrics, but since they are mainly intended to diagnose Zuul system performance and not individual jobs, that would be a waste of storage space due to the extremely high cardinality. Additionally, two other timing metrics are added: the cumulative time spent reading and writing ZKObject data to ZK during pipeline processing. These can help determine whether more effort should be spent optimizing ZK data transfer. In preparing this change, I noticed that python statsd emits floating point values for timing. It's not clear whether this strictly matches the statsd spec, but since it does emit values with that precision, I have removed several int() casts in order to maintain the precision through to the statsd client. I also noticed a place where we were writing a monotonic timestamp value in a JSON serialized string to ZK. I do not believe this value is currently being used, therefore there is no further error to correct, however, we should not use time.monotonic() for values that are serialized since the reference clock will be different on different systems. Several new attributes are added to the QueueItem and Build classes, but are done so in a way that is backwards compatible, so no model api schema upgrade is needed. The code sites where they are used protect against the null values which will occur in a mixed-version cluster (the components will just not emit these stats in those cases). Change-Id: Iaacbef7fa2ed93bfc398a118c5e8cfbc0a67b846 --- doc/source/monitoring.rst | 73 ++++++++++++++++++- .../pipeline-timing-ea263e6e5939b1aa.yaml | 16 ++++ tests/base.py | 4 + .../zuul-connections-gerrit-and-github.conf | 5 ++ tests/fixtures/zuul-sql-driver-mysql.conf | 5 ++ tests/fixtures/zuul-sql-driver-postgres.conf | 5 ++ tests/unit/test_scheduler.py | 22 ++++-- tests/unit/test_v3.py | 12 +++ zuul/manager/__init__.py | 56 ++++++++++++-- zuul/model.py | 14 +++- zuul/nodepool.py | 2 +- zuul/scheduler.py | 9 ++- zuul/zk/event_queues.py | 2 +- zuul/zk/sharding.py | 20 ++++- zuul/zk/zkobject.py | 12 +++ 15 files changed, 235 insertions(+), 22 deletions(-) create mode 100644 releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst index 1caddf708c..3c0b2a1ff0 100644 --- a/doc/source/monitoring.rst +++ b/doc/source/monitoring.rst @@ -79,11 +79,66 @@ These metrics are emitted by the Zuul :ref:`scheduler`: Holds metrics specific to jobs. This hierarchy includes: - .. stat:: + .. stat:: A set of metrics for each pipeline named as defined in the Zuul config. + .. stat:: event_enqueue_time + :type: timer + + The time elapsed from when a trigger event was received from + the remote system to when the corresponding item is enqueued + in a pipeline. + + .. stat:: merge_request_time + :type: timer + + The amount of time spent waiting for the initial merge + operation(s). This will always include a request to a Zuul + merger to speculatively merge the change, but it may also + include a second request submitted in parallel to identify + the files altered by the change. + + .. stat:: layout_generation_time + :type: timer + + The amount of time spent generating a dynamic configuration layout. + + .. stat:: job_freeze_time + :type: timer + + The amount of time spent freezing the inheritance hierarchy + and parameters of a job. + + .. stat:: repo_state_time + :type: timer + + The amount of time waiting for a secondary Zuul merger + operation to collect additional information about the repo + state of required projects. + + .. stat:: node_request_time + :type: timer + + The amount of time spent waiting for each node request to be + fulfilled. + + .. stat:: job_wait_time + :type: timer + + How long a job waited for an executor to start running it + after the build was requested. + + .. stat:: event_job_time + :type: timer + + The total amount of time elapsed from when a trigger event + was received from the remote system until the item's first + job is run. This is only emitted once per queue item, even + if its buildset is reset due to a speculative execution + failure. + .. stat:: all_jobs :type: counter @@ -153,8 +208,8 @@ These metrics are emitted by the Zuul :ref:`scheduler`: .. stat:: wait_time :type: timer - How long each item spent in the pipeline before its first job - started. + How long the job waited for an executor to + start running it after the build was requested. .. stat:: current_changes :type: gauge @@ -174,6 +229,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`: The number of changes for this project processed by the pipeline since Zuul started. + .. stat:: read_time + :type: timer + + The time spent reading data from ZooKeeper during a single + pipeline processing run. + .. stat:: refresh :type: timer @@ -206,6 +267,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`: The size of the pipeline's management event queue. + .. stat:: write_time + :type: timer + + The time spent writing data to ZooKeeper during a single + pipeline processing run. + .. stat:: zuul.executor. Holds metrics emitted by individual executors. The ```` diff --git a/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml b/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml new file mode 100644 index 0000000000..cab32e1469 --- /dev/null +++ b/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml @@ -0,0 +1,16 @@ +--- +features: + - | + The following new statsd metrics are available in order to monitor + Zuul system performance: + + * :stat:`zuul.tenant..pipeline..event_enqueue_time` + * :stat:`zuul.tenant..pipeline..merge_request_time` + * :stat:`zuul.tenant..pipeline..layout_generation_time` + * :stat:`zuul.tenant..pipeline..job_freeze_time` + * :stat:`zuul.tenant..pipeline..repo_state_time` + * :stat:`zuul.tenant..pipeline..node_request_time` + * :stat:`zuul.tenant..pipeline..job_wait_time` + * :stat:`zuul.tenant..pipeline..event_job_time` + * :stat:`zuul.tenant..pipeline..read_time` + * :stat:`zuul.tenant..pipeline..write_time` diff --git a/tests/base.py b/tests/base.py index 64859c5b36..17d6e948f3 100644 --- a/tests/base.py +++ b/tests/base.py @@ -5544,6 +5544,10 @@ class ZuulTestCase(BaseTestCase): return s_value time.sleep(0.1) + stats = list(itertools.chain.from_iterable( + [s.decode('utf-8').split('\n') for s in self.statsd.stats])) + for stat in stats: + self.log.debug("Stat: %s", stat) raise StatException("Key %s not found in reported stats" % key) def assertUnReportedStat(self, key, value=None, kind=None): diff --git a/tests/fixtures/zuul-connections-gerrit-and-github.conf b/tests/fixtures/zuul-connections-gerrit-and-github.conf index 601de4b92f..d71d4f584e 100644 --- a/tests/fixtures/zuul-connections-gerrit-and-github.conf +++ b/tests/fixtures/zuul-connections-gerrit-and-github.conf @@ -1,3 +1,8 @@ +[statsd] +# note, use 127.0.0.1 rather than localhost to avoid getting ipv6 +# see: https://github.com/jsocol/pystatsd/issues/61 +server=127.0.0.1 + [scheduler] tenant_config=config/multi-driver/main.yaml diff --git a/tests/fixtures/zuul-sql-driver-mysql.conf b/tests/fixtures/zuul-sql-driver-mysql.conf index 094efdd63e..4d0ee8c615 100644 --- a/tests/fixtures/zuul-sql-driver-mysql.conf +++ b/tests/fixtures/zuul-sql-driver-mysql.conf @@ -1,3 +1,8 @@ +[statsd] +# note, use 127.0.0.1 rather than localhost to avoid getting ipv6 +# see: https://github.com/jsocol/pystatsd/issues/61 +server=127.0.0.1 + [scheduler] tenant_config=main.yaml diff --git a/tests/fixtures/zuul-sql-driver-postgres.conf b/tests/fixtures/zuul-sql-driver-postgres.conf index 436d16de48..0d21c9aed0 100644 --- a/tests/fixtures/zuul-sql-driver-postgres.conf +++ b/tests/fixtures/zuul-sql-driver-postgres.conf @@ -1,3 +1,8 @@ +[statsd] +# note, use 127.0.0.1 rather than localhost to avoid getting ipv6 +# see: https://github.com/jsocol/pystatsd/issues/61 +server=127.0.0.1 + [scheduler] tenant_config=main.yaml diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 703eece1fd..773900a41a 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -440,13 +440,21 @@ class TestScheduler(ZuulTestCase): value='0', kind='g') # Catch time / monotonic errors - val = self.assertReportedStat( - 'zuul.tenant.tenant-one.event_enqueue_processing_time', - kind='ms') - self.assertTrue(0.0 < float(val) < 60000.0) - val = self.assertReportedStat( - 'zuul.tenant.tenant-one.event_enqueue_time', kind='ms') - self.assertTrue(0.0 < float(val) < 60000.0) + for key in [ + 'zuul.tenant.tenant-one.event_enqueue_processing_time', + 'zuul.tenant.tenant-one.event_enqueue_time', + 'zuul.tenant.tenant-one.pipeline.gate.event_enqueue_time', + 'zuul.tenant.tenant-one.pipeline.gate.merge_request_time', + 'zuul.tenant.tenant-one.pipeline.gate.job_freeze_time', + 'zuul.tenant.tenant-one.pipeline.gate.node_request_time', + 'zuul.tenant.tenant-one.pipeline.gate.job_wait_time', + 'zuul.tenant.tenant-one.pipeline.gate.event_job_time', + 'zuul.tenant.tenant-one.pipeline.gate.resident_time', + 'zuul.tenant.tenant-one.pipeline.gate.read_time', + 'zuul.tenant.tenant-one.pipeline.gate.write_time', + ]: + val = self.assertReportedStat(key, kind='ms') + self.assertTrue(0.0 < float(val) < 60000.0) self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.' 'data_size_compressed', diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py index c905896e1f..0a3ad28186 100644 --- a/tests/unit/test_v3.py +++ b/tests/unit/test_v3.py @@ -1240,6 +1240,12 @@ class TestInRepoConfig(ZuulTestCase): dict(name='project-test3', result='SUCCESS', changes='2,1'), ], ordered=False) + # Catch time / monotonic errors + val = self.assertReportedStat('zuul.tenant.tenant-one.pipeline.' + 'tenant-one-gate.layout_generation_time', + kind='ms') + self.assertTrue(0.0 < float(val) < 60000.0) + def test_dynamic_template(self): # Tests that a project can't update a template in another # project. @@ -7196,6 +7202,12 @@ class TestProvidesRequiresMysql(ZuulTestCase): } }]) + # Catch time / monotonic errors + val = self.assertReportedStat('zuul.tenant.tenant-one.pipeline.' + 'gate.repo_state_time', + kind='ms') + self.assertTrue(0.0 < float(val) < 60000.0) + @simple_layout('layouts/provides-requires-unshared.yaml') def test_provides_requires_unshared_queue(self): self.executor_server.hold_jobs_in_build = True diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 070065ba1f..2e805e3001 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -1120,7 +1120,10 @@ class PipelineManager(metaclass=ABCMeta): return self.getFallbackLayout(item) log.debug("Preparing dynamic layout for: %s" % item.change) - return self._loadDynamicLayout(item) + start = time.time() + layout = self._loadDynamicLayout(item) + self.reportPipelineTiming('layout_generation_time', start) + return layout def _branchesForRepoState(self, projects, tenant, items=None): items = items or [] @@ -1253,6 +1256,7 @@ class PipelineManager(metaclass=ABCMeta): branches=branches) item.current_build_set.updateAttributes( self.current_context, + repo_state_request_time=time.time(), repo_state_state=item.current_build_set.PENDING) return True @@ -1341,10 +1345,12 @@ class PipelineManager(metaclass=ABCMeta): if not item.current_build_set.job_graph: try: log.debug("Freezing job graph for %s" % (item,)) + start = time.time() item.freezeJobGraph(self.getLayout(item), self.current_context, skip_file_matcher=False, redact_secrets_and_keys=False) + self.reportPipelineTiming('job_freeze_time', start) except Exception as e: # TODOv3(jeblair): nicify this exception as it will be reported log.exception("Error freezing job graph for %s" % (item,)) @@ -1564,6 +1570,17 @@ class PipelineManager(metaclass=ABCMeta): log = get_annotated_logger(self.log, build.zuul_event_id) log.debug("Build %s started", build) self.sql.reportBuildStart(build) + self.reportPipelineTiming('job_wait_time', + build.execute_time, build.start_time) + if not build.build_set.item.first_job_start_time: + # Only report this for the first job in a queue item so + # that we don't include gate resets. + build.build_set.item.updateAttributes( + self.current_context, + first_job_start_time=build.start_time) + self.reportPipelineTiming('event_job_time', + build.build_set.item.event.timestamp, + build.start_time) return True def onBuildPaused(self, build): @@ -1664,14 +1681,25 @@ class PipelineManager(metaclass=ABCMeta): source.setChangeAttributes(item.change, files=event.files) build_set.updateAttributes(self.current_context, files_state=build_set.COMPLETE) + if build_set.merge_state == build_set.COMPLETE: + # We're the second of the files/merger pair, report the stat + self.reportPipelineTiming('merge_request_time', + build_set.configured_time) def onMergeCompleted(self, event, build_set): if build_set.merge_state == build_set.COMPLETE: self._onGlobalRepoStateCompleted(event, build_set) + self.reportPipelineTiming('repo_state_time', + build_set.repo_state_request_time) else: self._onMergeCompleted(event, build_set) + if build_set.files_state == build_set.COMPLETE: + # We're the second of the files/merger pair, report the stat + self.reportPipelineTiming('merge_request_time', + build_set.configured_time) def _onMergeCompleted(self, event, build_set): + item = build_set.item source = self.sched.connections.getSource( item.change.project.connection_name) @@ -1702,12 +1730,14 @@ class PipelineManager(metaclass=ABCMeta): item.setUnableToMerge() def _onGlobalRepoStateCompleted(self, event, build_set): + item = build_set.item if not event.updated: - item = build_set.item self.log.info("Unable to get global repo state for change %s" % item.change) item.setUnableToMerge() else: + self.log.info("Received global repo state for change %s" + % item.change) with build_set.activeContext(self.current_context): build_set.setExtraRepoState(event.repo_state) build_set.repo_state_state = build_set.COMPLETE @@ -1716,6 +1746,7 @@ class PipelineManager(metaclass=ABCMeta): # TODOv3(jeblair): handle provisioning failure here log = get_annotated_logger(self.log, request.event_id) + self.reportPipelineTiming('node_request_time', request.created_time) if nodeset is not None: build_set.jobNodeRequestComplete(request.job_name, nodeset) if not request.fulfilled: @@ -1878,7 +1909,7 @@ class PipelineManager(metaclass=ABCMeta): # Update the gauge on enqueue and dequeue, but timers only # when dequeing. if item.dequeue_time: - dt = int((item.dequeue_time - item.enqueue_time) * 1000) + dt = (item.dequeue_time - item.enqueue_time) * 1000 else: dt = None items = len(self.pipeline.getAllItems()) @@ -1913,12 +1944,27 @@ class PipelineManager(metaclass=ABCMeta): if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'): now = time.time() arrived = item.event.arrived_at_scheduler_timestamp - processing = int((now - arrived) * 1000) - elapsed = int((now - item.event.timestamp) * 1000) + processing = (now - arrived) * 1000 + elapsed = (now - item.event.timestamp) * 1000 self.sched.statsd.timing( basekey + '.event_enqueue_processing_time', processing) self.sched.statsd.timing( basekey + '.event_enqueue_time', elapsed) + self.reportPipelineTiming('event_enqueue_time', + item.event.timestamp) except Exception: self.log.exception("Exception reporting pipeline stats") + + def reportPipelineTiming(self, key, start, end=None): + if not self.sched.statsd: + return + if not start: + return + if end is None: + end = time.time() + pipeline = self.pipeline + tenant = pipeline.tenant + stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}' + dt = (end - start) * 1000 + self.sched.statsd.timing(f'{stats_key}.{key}', dt) diff --git a/zuul/model.py b/zuul/model.py index 17d2c18148..c47175916e 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -1485,6 +1485,7 @@ class NodeRequest(object): if 'tenant_name' in data: self.tenant_name = data['tenant_name'] self.nodes = data.get('nodes', []) + self.created_time = data.get('created_time') @classmethod def fromDict(cls, data): @@ -3624,6 +3625,9 @@ class BuildSet(zkobject.ZKObject): files_state=self.NEW, repo_state_state=self.NEW, configured=False, + configured_time=None, # When setConfigured was called + start_time=None, # When the buildset reported start + repo_state_request_time=None, # When the refstate job was called fail_fast=False, job_graph=None, jobs={}, @@ -3727,6 +3731,9 @@ class BuildSet(zkobject.ZKObject): "fail_fast": self.fail_fast, "job_graph": (self.job_graph.toDict() if self.job_graph else None), + "configured_time": self.configured_time, + "start_time": self.start_time, + "repo_state_request_time": self.repo_state_request_time, # jobs (serialize as separate objects) } return json.dumps(data, sort_keys=True).encode("utf8") @@ -3831,8 +3838,8 @@ class BuildSet(zkobject.ZKObject): "builds": builds, "retry_builds": retry_builds, # These are local cache objects only valid for one pipeline run - '_old_job_graph': None, - '_old_jobs': {}, + "_old_job_graph": None, + "_old_jobs": {}, }) return data @@ -3868,6 +3875,7 @@ class BuildSet(zkobject.ZKObject): self.dependent_changes = [i.change.toDict() for i in items] self.merger_items = [i.makeMergerItem() for i in items] self.configured = True + self.configured_time = time.time() def getStateName(self, state_num): return self.states_map.get( @@ -4016,6 +4024,7 @@ class QueueItem(zkobject.ZKObject): enqueue_time=None, report_time=None, dequeue_time=None, + first_job_start_time=None, reported=False, reported_start=False, quiet=False, @@ -4088,6 +4097,7 @@ class QueueItem(zkobject.ZKObject): "dynamic_state": self.dynamic_state, "bundle": self.bundle and self.bundle.serialize(), "dequeued_bundle_failing": self.dequeued_bundle_failing, + "first_job_start_time": self.first_job_start_time, } return json.dumps(data, sort_keys=True).encode("utf8") diff --git a/zuul/nodepool.py b/zuul/nodepool.py index 8dd27bfe68..d30c846f7b 100644 --- a/zuul/nodepool.py +++ b/zuul/nodepool.py @@ -146,7 +146,7 @@ class Nodepool(object): if request.canceled: state = 'canceled' elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED): - dt = int((request.state_time - request.requested_time) * 1000) + dt = (request.state_time - request.requested_time) * 1000 key = 'zuul.nodepool.requests.%s' % state pipe.incr(key + ".total") diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 39193f1035..57b2127745 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -795,14 +795,14 @@ class Scheduler(threading.Thread): 'RETRY' if build.result is None else build.result ) if build.result in ['SUCCESS', 'FAILURE'] and build.start_time: - dt = int((build.end_time - build.start_time) * 1000) + dt = (build.end_time - build.start_time) * 1000 self.statsd.timing(key, dt) self.statsd.incr(key) # zuul.tenant..pipeline..project. # ...job..wait_time if build.start_time: key = '%s.wait_time' % jobkey - dt = int((build.start_time - build.execute_time) * 1000) + dt = (build.start_time - build.execute_time) * 1000 self.statsd.timing(key, dt) except Exception: self.log.exception("Exception reporting runtime stats") @@ -1891,6 +1891,11 @@ class Scheduler(threading.Thread): self._process_pipeline(tenant, pipeline) # Update pipeline summary for zuul-web pipeline.summary.update(ctx, self.globals) + if self.statsd: + self.statsd.timing(f'{stats_key}.read_time', + ctx.cumulative_read_time * 1000) + self.statsd.timing(f'{stats_key}.write_time', + ctx.cumulative_write_time * 1000) except LockException: self.log.debug("Skipping locked pipeline %s in tenant %s", pipeline.name, tenant.name) diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py index fdc99442f7..e35e05ecb2 100644 --- a/zuul/zk/event_queues.py +++ b/zuul/zk/event_queues.py @@ -590,7 +590,7 @@ class ManagementEventQueue(ZooKeeperEventQueue): return result_data = {"traceback": event.traceback, - "timestamp": time.monotonic()} + "timestamp": time.time()} try: self.kazoo_client.set( event.result_ref, diff --git a/zuul/zk/sharding.py b/zuul/zk/sharding.py index a1278a8a2c..5ca1158d5f 100644 --- a/zuul/zk/sharding.py +++ b/zuul/zk/sharding.py @@ -14,6 +14,7 @@ import io from contextlib import suppress +import time import zlib from kazoo.exceptions import NoNodeError @@ -30,6 +31,8 @@ class RawShardIO(io.RawIOBase): self.shard_base = path self.compressed_bytes_read = 0 self.compressed_bytes_written = 0 + self.cumulative_read_time = 0.0 + self.cumulative_write_time = 0.0 def readable(self): return True @@ -46,12 +49,17 @@ class RawShardIO(io.RawIOBase): @property def _shards(self): try: - return self.client.get_children(self.shard_base) + start = time.perf_counter() + ret = self.client.get_children(self.shard_base) + self.cumulative_read_time += time.perf_counter() - start + return ret except NoNodeError: return [] def _getData(self, path): + start = time.perf_counter() data, _ = self.client.get(path) + self.cumulative_read_time += time.perf_counter() - start self.compressed_bytes_read += len(data) return zlib.decompress(data) @@ -69,12 +77,14 @@ class RawShardIO(io.RawIOBase): shard_bytes = zlib.compress(shard_bytes) if not (len(shard_bytes) < NODE_BYTE_SIZE_LIMIT): raise RuntimeError("Shard too large") + start = time.perf_counter() self.client.create( "{}/".format(self.shard_base), shard_bytes, sequence=True, makepath=True, ) + self.cumulative_write_time += time.perf_counter() - start self.compressed_bytes_written += len(shard_bytes) return min(byte_count, NODE_BYTE_SIZE_LIMIT) @@ -88,6 +98,10 @@ class BufferedShardWriter(io.BufferedWriter): def compressed_bytes_written(self): return self.__raw.compressed_bytes_written + @property + def cumulative_write_time(self): + return self.__raw.cumulative_write_time + class BufferedShardReader(io.BufferedReader): def __init__(self, client, path): @@ -97,3 +111,7 @@ class BufferedShardReader(io.BufferedReader): @property def compressed_bytes_read(self): return self.__raw.compressed_bytes_read + + @property + def cumulative_read_time(self): + return self.__raw.cumulative_read_time diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index 45eb2b8e22..e837bf3ea6 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -31,6 +31,8 @@ class ZKContext: self.lock = lock self.stop_event = stop_event self.log = log + self.cumulative_read_time = 0.0 + self.cumulative_write_time = 0.0 def sessionIsValid(self): return ((not self.lock or self.lock.is_still_valid()) and @@ -237,7 +239,10 @@ class ZKObject: path = self.getPath() while context.sessionIsValid(): try: + start = time.perf_counter() compressed_data, zstat = context.client.get(path) + context.cumulative_read_time += time.perf_counter() - start + self._set(_zkobject_hash=None) try: data = zlib.decompress(compressed_data) @@ -278,6 +283,7 @@ class ZKObject: while context.sessionIsValid(): try: compressed_data = zlib.compress(data) + start = time.perf_counter() if create: real_path, zstat = context.client.create( path, compressed_data, makepath=True, @@ -285,6 +291,7 @@ class ZKObject: else: zstat = context.client.set(path, compressed_data, version=self._zstat.version) + context.cumulative_write_time += time.perf_counter() - start self._set(_zstat=zstat, _zkobject_hash=hash(data), _zkobject_compressed_size=len(compressed_data), @@ -336,6 +343,8 @@ class ShardedZKObject(ZKObject): context.client, path) as stream: data = stream.read() compressed_size = stream.compressed_bytes_read + context.cumulative_read_time += \ + stream.cumulative_read_time if not data and context.client.exists(path) is None: raise NoNodeError self._set(**self.deserialize(data, context)) @@ -382,6 +391,9 @@ class ShardedZKObject(ZKObject): stream.write(data) stream.flush() compressed_size = stream.compressed_bytes_written + context.cumulative_write_time += \ + stream.cumulative_write_time + self._set(_zkobject_hash=hash(data), _zkobject_compressed_size=compressed_size, _zkobject_uncompressed_size=len(data),