diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst
index 1caddf708c..3c0b2a1ff0 100644
--- a/doc/source/monitoring.rst
+++ b/doc/source/monitoring.rst
@@ -79,11 +79,66 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
 
    Holds metrics specific to jobs. This hierarchy includes:
 
-   .. stat::
+   .. stat::
 
       A set of metrics for each pipeline named as defined in the
       Zuul config.
 
+      .. stat:: event_enqueue_time
+         :type: timer
+
+         The time elapsed from when a trigger event was received from
+         the remote system to when the corresponding item is enqueued
+         in a pipeline.
+
+      .. stat:: merge_request_time
+         :type: timer
+
+         The amount of time spent waiting for the initial merge
+         operation(s). This will always include a request to a Zuul
+         merger to speculatively merge the change, but it may also
+         include a second request submitted in parallel to identify
+         the files altered by the change.
+
+      .. stat:: layout_generation_time
+         :type: timer
+
+         The amount of time spent generating a dynamic configuration layout.
+
+      .. stat:: job_freeze_time
+         :type: timer
+
+         The amount of time spent freezing the inheritance hierarchy
+         and parameters of a job.
+
+      .. stat:: repo_state_time
+         :type: timer
+
+         The amount of time waiting for a secondary Zuul merger
+         operation to collect additional information about the repo
+         state of required projects.
+
+      .. stat:: node_request_time
+         :type: timer
+
+         The amount of time spent waiting for each node request to be
+         fulfilled.
+
+      .. stat:: job_wait_time
+         :type: timer
+
+         How long a job waited for an executor to start running it
+         after the build was requested.
+
+      .. stat:: event_job_time
+         :type: timer
+
+         The total amount of time elapsed from when a trigger event
+         was received from the remote system until the item's first
+         job is run. This is only emitted once per queue item, even
+         if its buildset is reset due to a speculative execution
+         failure.
+
       .. stat:: all_jobs
          :type: counter
 
@@ -153,8 +208,8 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
       .. stat:: wait_time
          :type: timer
 
-         How long each item spent in the pipeline before its first job
-         started.
+         How long the job waited for an executor to
+         start running it after the build was requested.
 
       .. stat:: current_changes
         :type: gauge
 
@@ -174,6 +229,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
          The number of changes for this project processed by the
          pipeline since Zuul started.
 
+      .. stat:: read_time
+         :type: timer
+
+         The time spent reading data from ZooKeeper during a single
+         pipeline processing run.
+
       .. stat:: refresh
          :type: timer
 
@@ -206,6 +267,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
 
          The size of the pipeline's management event queue.
 
+      .. stat:: write_time
+         :type: timer
+
+         The time spent writing data to ZooKeeper during a single
+         pipeline processing run.
+
 .. stat:: zuul.executor.<executor>
 
    Holds metrics emitted by individual executors.
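The per-pipeline timers documented above all follow the same reporting pattern: record a wall-clock start timestamp, compute the elapsed time when the operation finishes, and emit it in milliseconds as a statsd timer. The sketch below shows that pattern in isolation; it assumes the third-party "statsd" Python package, and the class, method, and key names are illustrative rather than Zuul's implementation (Zuul's version is the reportPipelineTiming helper added in zuul/manager/__init__.py later in this patch).

# Minimal sketch of the timer-reporting pattern behind the metrics above.
# Assumes the "statsd" package is installed; names are illustrative.
import time

import statsd


class PipelineTimingReporter:
    def __init__(self, host='127.0.0.1', port=8125):
        # StatsClient sends metrics over UDP; timing() takes milliseconds.
        self.client = statsd.StatsClient(host, port)

    def report_timing(self, tenant, pipeline, key, start, end=None):
        """Report the elapsed time between two wall-clock timestamps.

        start/end come from time.time(); if end is omitted, "now" is used.
        """
        if start is None:
            return
        if end is None:
            end = time.time()
        stats_key = f'zuul.tenant.{tenant}.pipeline.{pipeline}.{key}'
        self.client.timing(stats_key, (end - start) * 1000)


# Example usage: time a hypothetical merge request round trip.
reporter = PipelineTimingReporter()
request_submitted = time.time()
# ... wait for the merger to respond ...
reporter.report_timing('example-tenant', 'gate',
                       'merge_request_time', request_submitted)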
diff --git a/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml b/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml
new file mode 100644
index 0000000000..cab32e1469
--- /dev/null
+++ b/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml
@@ -0,0 +1,16 @@
+---
+features:
+  - |
+    The following new statsd metrics are available in order to monitor
+    Zuul system performance:
+
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.event_enqueue_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.merge_request_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.layout_generation_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.job_freeze_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.repo_state_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.node_request_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.job_wait_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.event_job_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_time`
diff --git a/tests/base.py b/tests/base.py
index ae465b20f6..d58c15b452 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -5545,6 +5545,10 @@ class ZuulTestCase(BaseTestCase):
                         return s_value
             time.sleep(0.1)
 
+        stats = list(itertools.chain.from_iterable(
+            [s.decode('utf-8').split('\n') for s in self.statsd.stats]))
+        for stat in stats:
+            self.log.debug("Stat: %s", stat)
         raise StatException("Key %s not found in reported stats" % key)
 
     def assertUnReportedStat(self, key, value=None, kind=None):
diff --git a/tests/fixtures/zuul-connections-gerrit-and-github.conf b/tests/fixtures/zuul-connections-gerrit-and-github.conf
index 601de4b92f..d71d4f584e 100644
--- a/tests/fixtures/zuul-connections-gerrit-and-github.conf
+++ b/tests/fixtures/zuul-connections-gerrit-and-github.conf
@@ -1,3 +1,8 @@
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
 [scheduler]
 tenant_config=config/multi-driver/main.yaml
 
diff --git a/tests/fixtures/zuul-sql-driver-mysql.conf b/tests/fixtures/zuul-sql-driver-mysql.conf
index 094efdd63e..4d0ee8c615 100644
--- a/tests/fixtures/zuul-sql-driver-mysql.conf
+++ b/tests/fixtures/zuul-sql-driver-mysql.conf
@@ -1,3 +1,8 @@
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
 [scheduler]
 tenant_config=main.yaml
 
diff --git a/tests/fixtures/zuul-sql-driver-postgres.conf b/tests/fixtures/zuul-sql-driver-postgres.conf
index 436d16de48..0d21c9aed0 100644
--- a/tests/fixtures/zuul-sql-driver-postgres.conf
+++ b/tests/fixtures/zuul-sql-driver-postgres.conf
@@ -1,3 +1,8 @@
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
 [scheduler]
 tenant_config=main.yaml
 
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 21490da2b1..307589267b 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -440,13 +440,21 @@ class TestScheduler(ZuulTestCase):
                                 value='0', kind='g')
 
         # Catch time / monotonic errors
-        val = self.assertReportedStat(
-            'zuul.tenant.tenant-one.event_enqueue_processing_time',
-            kind='ms')
-        self.assertTrue(0.0 < float(val) < 60000.0)
-        val = self.assertReportedStat(
-            'zuul.tenant.tenant-one.event_enqueue_time', kind='ms')
-        self.assertTrue(0.0 < float(val) < 60000.0)
+        for key in [
+            'zuul.tenant.tenant-one.event_enqueue_processing_time',
+            'zuul.tenant.tenant-one.event_enqueue_time',
+            'zuul.tenant.tenant-one.pipeline.gate.event_enqueue_time',
+            'zuul.tenant.tenant-one.pipeline.gate.merge_request_time',
+            'zuul.tenant.tenant-one.pipeline.gate.job_freeze_time',
+            'zuul.tenant.tenant-one.pipeline.gate.node_request_time',
+            'zuul.tenant.tenant-one.pipeline.gate.job_wait_time',
+            'zuul.tenant.tenant-one.pipeline.gate.event_job_time',
+            'zuul.tenant.tenant-one.pipeline.gate.resident_time',
+            'zuul.tenant.tenant-one.pipeline.gate.read_time',
+            'zuul.tenant.tenant-one.pipeline.gate.write_time',
+        ]:
+            val = self.assertReportedStat(key, kind='ms')
+            self.assertTrue(0.0 < float(val) < 60000.0)
 
         self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'
                                 'data_size_compressed',
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index c905896e1f..0a3ad28186 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -1240,6 +1240,12 @@ class TestInRepoConfig(ZuulTestCase):
             dict(name='project-test3', result='SUCCESS', changes='2,1'),
         ], ordered=False)
 
+        # Catch time / monotonic errors
+        val = self.assertReportedStat('zuul.tenant.tenant-one.pipeline.'
+                                      'tenant-one-gate.layout_generation_time',
+                                      kind='ms')
+        self.assertTrue(0.0 < float(val) < 60000.0)
+
     def test_dynamic_template(self):
         # Tests that a project can't update a template in another
         # project.
@@ -7196,6 +7202,12 @@ class TestProvidesRequiresMysql(ZuulTestCase):
                 }
             }])
 
+        # Catch time / monotonic errors
+        val = self.assertReportedStat('zuul.tenant.tenant-one.pipeline.'
+                                      'gate.repo_state_time',
+                                      kind='ms')
+        self.assertTrue(0.0 < float(val) < 60000.0)
+
     @simple_layout('layouts/provides-requires-unshared.yaml')
     def test_provides_requires_unshared_queue(self):
         self.executor_server.hold_jobs_in_build = True
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 1c8f0d4dac..f1e3483990 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -1123,7 +1123,10 @@ class PipelineManager(metaclass=ABCMeta):
             return self.getFallbackLayout(item)
 
         log.debug("Preparing dynamic layout for: %s" % item.change)
-        return self._loadDynamicLayout(item)
+        start = time.time()
+        layout = self._loadDynamicLayout(item)
+        self.reportPipelineTiming('layout_generation_time', start)
+        return layout
 
     def _branchesForRepoState(self, projects, tenant, items=None):
         items = items or []
@@ -1256,6 +1259,7 @@ class PipelineManager(metaclass=ABCMeta):
             branches=branches)
         item.current_build_set.updateAttributes(
             self.current_context,
+            repo_state_request_time=time.time(),
             repo_state_state=item.current_build_set.PENDING)
         return True
 
@@ -1344,10 +1348,12 @@ class PipelineManager(metaclass=ABCMeta):
         if not item.current_build_set.job_graph:
             try:
                 log.debug("Freezing job graph for %s" % (item,))
+                start = time.time()
                 item.freezeJobGraph(self.getLayout(item),
                                     self.current_context,
                                     skip_file_matcher=False,
                                     redact_secrets_and_keys=False)
+                self.reportPipelineTiming('job_freeze_time', start)
             except Exception as e:
                 # TODOv3(jeblair): nicify this exception as it will be reported
                 log.exception("Error freezing job graph for %s" % (item,))
@@ -1567,6 +1573,17 @@ class PipelineManager(metaclass=ABCMeta):
         log = get_annotated_logger(self.log, build.zuul_event_id)
         log.debug("Build %s started", build)
         self.sql.reportBuildStart(build)
+        self.reportPipelineTiming('job_wait_time',
+                                  build.execute_time, build.start_time)
+        if not build.build_set.item.first_job_start_time:
+            # Only report this for the first job in a queue item so
+            # that we don't include gate resets.
+            build.build_set.item.updateAttributes(
+                self.current_context,
+                first_job_start_time=build.start_time)
+            self.reportPipelineTiming('event_job_time',
+                                      build.build_set.item.event.timestamp,
+                                      build.start_time)
         return True
 
     def onBuildPaused(self, build):
@@ -1667,14 +1684,25 @@ class PipelineManager(metaclass=ABCMeta):
             source.setChangeAttributes(item.change, files=event.files)
         build_set.updateAttributes(self.current_context,
                                    files_state=build_set.COMPLETE)
+        if build_set.merge_state == build_set.COMPLETE:
+            # We're the second of the files/merger pair, report the stat
+            self.reportPipelineTiming('merge_request_time',
+                                      build_set.configured_time)
 
     def onMergeCompleted(self, event, build_set):
         if build_set.merge_state == build_set.COMPLETE:
             self._onGlobalRepoStateCompleted(event, build_set)
+            self.reportPipelineTiming('repo_state_time',
+                                      build_set.repo_state_request_time)
         else:
             self._onMergeCompleted(event, build_set)
+            if build_set.files_state == build_set.COMPLETE:
+                # We're the second of the files/merger pair, report the stat
+                self.reportPipelineTiming('merge_request_time',
+                                          build_set.configured_time)
 
     def _onMergeCompleted(self, event, build_set):
+        item = build_set.item
         source = self.sched.connections.getSource(
             item.change.project.connection_name)
@@ -1705,12 +1733,14 @@ class PipelineManager(metaclass=ABCMeta):
             item.setUnableToMerge()
 
     def _onGlobalRepoStateCompleted(self, event, build_set):
+        item = build_set.item
         if not event.updated:
-            item = build_set.item
             self.log.info("Unable to get global repo state for change %s"
                           % item.change)
             item.setUnableToMerge()
         else:
+            self.log.info("Received global repo state for change %s"
+                          % item.change)
             with build_set.activeContext(self.current_context):
                 build_set.setExtraRepoState(event.repo_state)
                 build_set.repo_state_state = build_set.COMPLETE
@@ -1719,6 +1749,7 @@
         # TODOv3(jeblair): handle provisioning failure here
         log = get_annotated_logger(self.log, request.event_id)
 
+        self.reportPipelineTiming('node_request_time', request.created_time)
         if nodeset is not None:
             build_set.jobNodeRequestComplete(request.job_name, nodeset)
         if not request.fulfilled:
@@ -1887,7 +1918,7 @@ class PipelineManager(metaclass=ABCMeta):
         # Update the gauge on enqueue and dequeue, but timers only
         # when dequeing.
         if item.dequeue_time:
-            dt = int((item.dequeue_time - item.enqueue_time) * 1000)
+            dt = (item.dequeue_time - item.enqueue_time) * 1000
         else:
             dt = None
         items = len(self.pipeline.getAllItems())
@@ -1922,12 +1953,27 @@ class PipelineManager(metaclass=ABCMeta):
             if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
                 now = time.time()
                 arrived = item.event.arrived_at_scheduler_timestamp
-                processing = int((now - arrived) * 1000)
-                elapsed = int((now - item.event.timestamp) * 1000)
+                processing = (now - arrived) * 1000
+                elapsed = (now - item.event.timestamp) * 1000
                 self.sched.statsd.timing(
                     basekey + '.event_enqueue_processing_time',
                     processing)
                 self.sched.statsd.timing(
                     basekey + '.event_enqueue_time', elapsed)
+                self.reportPipelineTiming('event_enqueue_time',
+                                          item.event.timestamp)
         except Exception:
             self.log.exception("Exception reporting pipeline stats")
+
+    def reportPipelineTiming(self, key, start, end=None):
+        if not self.sched.statsd:
+            return
+        if not start:
+            return
+        if end is None:
+            end = time.time()
+        pipeline = self.pipeline
+        tenant = pipeline.tenant
+        stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
+        dt = (end - start) * 1000
+        self.sched.statsd.timing(f'{stats_key}.{key}', dt)
diff --git a/zuul/model.py b/zuul/model.py
index bffd3521d7..079ae3d508 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1485,6 +1485,7 @@ class NodeRequest(object):
         if 'tenant_name' in data:
             self.tenant_name = data['tenant_name']
         self.nodes = data.get('nodes', [])
+        self.created_time = data.get('created_time')
 
     @classmethod
     def fromDict(cls, data):
@@ -3624,6 +3625,9 @@ class BuildSet(zkobject.ZKObject):
             files_state=self.NEW,
             repo_state_state=self.NEW,
             configured=False,
+            configured_time=None,  # When setConfigured was called
+            start_time=None,  # When the buildset reported start
+            repo_state_request_time=None,  # When the refstate job was called
             fail_fast=False,
             job_graph=None,
             jobs={},
@@ -3727,6 +3731,9 @@ class BuildSet(zkobject.ZKObject):
             "fail_fast": self.fail_fast,
             "job_graph": (self.job_graph.toDict()
                           if self.job_graph else None),
+            "configured_time": self.configured_time,
+            "start_time": self.start_time,
+            "repo_state_request_time": self.repo_state_request_time,
             # jobs (serialize as separate objects)
         }
         return json.dumps(data, sort_keys=True).encode("utf8")
@@ -3831,8 +3838,8 @@ class BuildSet(zkobject.ZKObject):
             "builds": builds,
             "retry_builds": retry_builds,
             # These are local cache objects only valid for one pipeline run
-            '_old_job_graph': None,
-            '_old_jobs': {},
+            "_old_job_graph": None,
+            "_old_jobs": {},
         })
         return data
 
@@ -3868,6 +3875,7 @@ class BuildSet(zkobject.ZKObject):
         self.dependent_changes = [i.change.toDict() for i in items]
         self.merger_items = [i.makeMergerItem() for i in items]
         self.configured = True
+        self.configured_time = time.time()
 
     def getStateName(self, state_num):
         return self.states_map.get(
@@ -4016,6 +4024,7 @@ class QueueItem(zkobject.ZKObject):
             enqueue_time=None,
             report_time=None,
             dequeue_time=None,
+            first_job_start_time=None,
             reported=False,
             reported_start=False,
             quiet=False,
@@ -4088,6 +4097,7 @@ class QueueItem(zkobject.ZKObject):
             "dynamic_state": self.dynamic_state,
             "bundle": self.bundle and self.bundle.serialize(),
             "dequeued_bundle_failing": self.dequeued_bundle_failing,
+            "first_job_start_time": self.first_job_start_time,
         }
         return json.dumps(data, sort_keys=True).encode("utf8")
 
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 440d9f10b2..57a885ea9b 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -146,7 +146,7 @@ class Nodepool(object):
         if request.canceled:
             state = 'canceled'
         elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
-            dt = int((request.state_time - request.requested_time) * 1000)
+            dt = (request.state_time - request.requested_time) * 1000
 
         key = 'zuul.nodepool.requests.%s' % state
         pipe.incr(key + ".total")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 025c6e09bf..297652ed9c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -797,14 +797,14 @@ class Scheduler(threading.Thread):
                     'RETRY' if build.result is None else build.result
                 )
                 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
-                    dt = int((build.end_time - build.start_time) * 1000)
+                    dt = (build.end_time - build.start_time) * 1000
                     self.statsd.timing(key, dt)
                 self.statsd.incr(key)
                 # zuul.tenant..pipeline..project.
                 # ...job..wait_time
                 if build.start_time:
                     key = '%s.wait_time' % jobkey
-                    dt = int((build.start_time - build.execute_time) * 1000)
+                    dt = (build.start_time - build.execute_time) * 1000
                     self.statsd.timing(key, dt)
             except Exception:
                 self.log.exception("Exception reporting runtime stats")
@@ -1929,6 +1929,11 @@ class Scheduler(threading.Thread):
                         self._process_pipeline(tenant, pipeline)
                     # Update pipeline summary for zuul-web
                     pipeline.summary.update(ctx, self.globals)
+                    if self.statsd:
+                        self.statsd.timing(f'{stats_key}.read_time',
+                                           ctx.cumulative_read_time * 1000)
+                        self.statsd.timing(f'{stats_key}.write_time',
+                                           ctx.cumulative_write_time * 1000)
                 except LockException:
                     self.log.debug("Skipping locked pipeline %s in tenant %s",
                                    pipeline.name, tenant.name)
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index fdc99442f7..e35e05ecb2 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -590,7 +590,7 @@ class ManagementEventQueue(ZooKeeperEventQueue):
             return
 
         result_data = {"traceback": event.traceback,
-                       "timestamp": time.monotonic()}
+                       "timestamp": time.time()}
         try:
             self.kazoo_client.set(
                 event.result_ref,
diff --git a/zuul/zk/sharding.py b/zuul/zk/sharding.py
index a1278a8a2c..5ca1158d5f 100644
--- a/zuul/zk/sharding.py
+++ b/zuul/zk/sharding.py
@@ -14,6 +14,7 @@
 
 import io
 from contextlib import suppress
+import time
 import zlib
 
 from kazoo.exceptions import NoNodeError
@@ -30,6 +31,8 @@ class RawShardIO(io.RawIOBase):
         self.shard_base = path
         self.compressed_bytes_read = 0
         self.compressed_bytes_written = 0
+        self.cumulative_read_time = 0.0
+        self.cumulative_write_time = 0.0
 
     def readable(self):
         return True
@@ -46,12 +49,17 @@ class RawShardIO(io.RawIOBase):
     @property
     def _shards(self):
         try:
-            return self.client.get_children(self.shard_base)
+            start = time.perf_counter()
+            ret = self.client.get_children(self.shard_base)
+            self.cumulative_read_time += time.perf_counter() - start
+            return ret
         except NoNodeError:
             return []
 
     def _getData(self, path):
+        start = time.perf_counter()
         data, _ = self.client.get(path)
+        self.cumulative_read_time += time.perf_counter() - start
         self.compressed_bytes_read += len(data)
         return zlib.decompress(data)
 
@@ -69,12 +77,14 @@ class RawShardIO(io.RawIOBase):
         shard_bytes = zlib.compress(shard_bytes)
         if not (len(shard_bytes) < NODE_BYTE_SIZE_LIMIT):
             raise RuntimeError("Shard too large")
+        start = time.perf_counter()
         self.client.create(
             "{}/".format(self.shard_base),
             shard_bytes,
             sequence=True,
             makepath=True,
         )
+        self.cumulative_write_time += time.perf_counter() - start
         self.compressed_bytes_written += len(shard_bytes)
         return min(byte_count, NODE_BYTE_SIZE_LIMIT)
 
@@ -88,6 +98,10 @@ class BufferedShardWriter(io.BufferedWriter):
     def compressed_bytes_written(self):
         return self.__raw.compressed_bytes_written
 
+    @property
+    def cumulative_write_time(self):
+        return self.__raw.cumulative_write_time
+
 
 class BufferedShardReader(io.BufferedReader):
     def __init__(self, client, path):
@@ -97,3 +111,7 @@ class BufferedShardReader(io.BufferedReader):
     @property
     def compressed_bytes_read(self):
         return self.__raw.compressed_bytes_read
+
+    @property
+    def cumulative_read_time(self):
+        return self.__raw.cumulative_read_time
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index 45eb2b8e22..e837bf3ea6 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -31,6 +31,8 @@ class ZKContext:
         self.lock = lock
         self.stop_event = stop_event
         self.log = log
+        self.cumulative_read_time = 0.0
+        self.cumulative_write_time = 0.0
 
     def sessionIsValid(self):
         return ((not self.lock or self.lock.is_still_valid()) and
@@ -237,7 +239,10 @@ class ZKObject:
         path = self.getPath()
         while context.sessionIsValid():
             try:
+                start = time.perf_counter()
                 compressed_data, zstat = context.client.get(path)
+                context.cumulative_read_time += time.perf_counter() - start
+
                 self._set(_zkobject_hash=None)
                 try:
                     data = zlib.decompress(compressed_data)
@@ -278,6 +283,7 @@ class ZKObject:
         while context.sessionIsValid():
             try:
                 compressed_data = zlib.compress(data)
+                start = time.perf_counter()
                 if create:
                     real_path, zstat = context.client.create(
                         path, compressed_data, makepath=True,
@@ -285,6 +291,7 @@ class ZKObject:
                 else:
                     zstat = context.client.set(path, compressed_data,
                                                version=self._zstat.version)
+                context.cumulative_write_time += time.perf_counter() - start
                 self._set(_zstat=zstat,
                           _zkobject_hash=hash(data),
                           _zkobject_compressed_size=len(compressed_data),
@@ -336,6 +343,8 @@ class ShardedZKObject(ZKObject):
                         context.client, path) as stream:
                     data = stream.read()
                     compressed_size = stream.compressed_bytes_read
+                    context.cumulative_read_time += \
+                        stream.cumulative_read_time
                     if not data and context.client.exists(path) is None:
                         raise NoNodeError
                     self._set(**self.deserialize(data, context))
@@ -382,6 +391,9 @@ class ShardedZKObject(ZKObject):
                     stream.write(data)
                     stream.flush()
                     compressed_size = stream.compressed_bytes_written
+                    context.cumulative_write_time += \
+                        stream.cumulative_write_time
+
                 self._set(_zkobject_hash=hash(data),
                           _zkobject_compressed_size=compressed_size,
                           _zkobject_uncompressed_size=len(data),
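The read_time and write_time metrics are fed by the cumulative counters introduced above: each ZooKeeper get/set/create is wrapped in time.perf_counter() calls and the delta is accumulated on the active context, which the scheduler then converts to milliseconds once per pipeline processing run. Below is a minimal, self-contained sketch of that accumulation pattern using an in-memory stand-in for the ZooKeeper client; the class names are illustrative, not Zuul's.

# Sketch of the timing-accumulation pattern: wrap each storage call with
# time.perf_counter() and add the delta to per-context counters, then
# report the totals once at the end of a processing run.
import time


class Context:
    def __init__(self):
        self.cumulative_read_time = 0.0
        self.cumulative_write_time = 0.0


class TimedStore:
    def __init__(self, context):
        self.context = context
        self.data = {}

    def read(self, key):
        start = time.perf_counter()
        try:
            return self.data.get(key)
        finally:
            # perf_counter() is monotonic, so the delta stays valid even
            # if the wall clock is adjusted during the call.
            self.context.cumulative_read_time += time.perf_counter() - start

    def write(self, key, value):
        start = time.perf_counter()
        try:
            self.data[key] = value
        finally:
            self.context.cumulative_write_time += time.perf_counter() - start


ctx = Context()
store = TimedStore(ctx)
store.write('item', b'payload')
store.read('item')
# In this patch the scheduler converts the per-run totals to milliseconds
# and emits them as the pipeline's read_time / write_time timers.
print(ctx.cumulative_read_time * 1000, ctx.cumulative_write_time * 1000)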