Move fake gerrit and pagure into dedicated files

This change is merely a reorganization to move the fake gerrit and
gitlab classes into their own files to match github and gitlab.

The Fake*Connection classes for all 4 drivers are also moved into
their respective files.  This is accomplished by moving some symbols
from base.py into a new tests/util.py to resolve the import cycle
(which is likely why they were not there in the first place).

Change-Id: I274b9e5abf6086656f8ceb5a16dab2f8393deead
This commit is contained in:
James E. Blair 2024-03-26 16:53:27 -07:00
parent a94768c645
commit 629f48e291
9 changed files with 2882 additions and 2784 deletions

View File

@ -14,7 +14,7 @@ the environment being simulated in the test:
.. autoclass:: tests.base.ZuulTestCase
:members:
.. autoclass:: tests.base.FakeGerritConnection
.. autoclass:: tests.fakegerrit.FakeGerritConnection
:members:
:inherited-members:

File diff suppressed because it is too large Load Diff

1269
tests/fakegerrit.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -13,26 +13,454 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import urllib
from collections import defaultdict
import datetime
import github3.exceptions
import functools
import json
import logging
import os
import re
import time
import graphene
from requests import HTTPError
from requests.structures import CaseInsensitiveDict
import urllib
import uuid
import string
import random
from tests.fake_graphql import FakeGithubQuery
from zuul.driver.github.githubconnection import utc
import zuul.driver.github.githubconnection as githubconnection
from zuul.driver.github.githubconnection import utc, GithubClientManager
from tests.util import random_sha1
import git
import github3.exceptions
import graphene
import requests
from requests.structures import CaseInsensitiveDict
import requests_mock
FAKE_BASE_URL = 'https://example.com/api/v3/'
class GithubChangeReference(git.Reference):
_common_path_default = "refs/pull"
_points_to_commits_only = True
class FakeGithubPullRequest(object):
def __init__(self, github, number, project, branch,
subject, upstream_root, files=None, number_of_commits=1,
writers=[], body=None, body_text=None, draft=False,
mergeable=True, base_sha=None):
"""Creates a new PR with several commits.
Sends an event about opened PR.
If the `files` argument is provided it must be a dictionary of
file names OR FakeFile instances -> content.
"""
self.github = github
self.source = github
self.number = number
self.project = project
self.branch = branch
self.subject = subject
self.body = body
self.body_text = body_text
self.draft = draft
self.mergeable = mergeable
self.number_of_commits = 0
self.upstream_root = upstream_root
# Dictionary of FakeFile -> content
self.files = {}
self.comments = []
self.labels = []
self.statuses = {}
self.reviews = []
self.writers = []
self.admins = []
self.updated_at = None
self.head_sha = None
self.is_merged = False
self.merge_message = None
self.state = 'open'
self.url = 'https://%s/%s/pull/%s' % (github.server, project, number)
self.base_sha = base_sha
self.pr_ref = self._createPRRef(base_sha=base_sha)
self._addCommitToRepo(files=files)
self._updateTimeStamp()
def addCommit(self, files=None, delete_files=None):
"""Adds a commit on top of the actual PR head."""
self._addCommitToRepo(files=files, delete_files=delete_files)
self._updateTimeStamp()
def forcePush(self, files=None):
"""Clears actual commits and add a commit on top of the base."""
self._addCommitToRepo(files=files, reset=True)
self._updateTimeStamp()
def getPullRequestOpenedEvent(self):
return self._getPullRequestEvent('opened')
def getPullRequestSynchronizeEvent(self):
return self._getPullRequestEvent('synchronize')
def getPullRequestReopenedEvent(self):
return self._getPullRequestEvent('reopened')
def getPullRequestClosedEvent(self):
return self._getPullRequestEvent('closed')
def getPullRequestEditedEvent(self, old_body=None):
return self._getPullRequestEvent('edited', old_body)
def addComment(self, message):
self.comments.append(message)
self._updateTimeStamp()
def getIssueCommentAddedEvent(self, text):
name = 'issue_comment'
data = {
'action': 'created',
'issue': {
'number': self.number
},
'comment': {
'body': text
},
'repository': {
'full_name': self.project
},
'sender': {
'login': 'ghuser'
}
}
return (name, data)
def getCommentAddedEvent(self, text):
name, data = self.getIssueCommentAddedEvent(text)
# A PR comment has an additional 'pull_request' key in the issue data
data['issue']['pull_request'] = {
'url': 'http://%s/api/v3/repos/%s/pull/%s' % (
self.github.server, self.project, self.number)
}
return (name, data)
def getReviewAddedEvent(self, review):
name = 'pull_request_review'
data = {
'action': 'submitted',
'pull_request': {
'number': self.number,
'title': self.subject,
'updated_at': self.updated_at,
'base': {
'ref': self.branch,
'repo': {
'full_name': self.project
}
},
'head': {
'sha': self.head_sha
}
},
'review': {
'state': review
},
'repository': {
'full_name': self.project
},
'sender': {
'login': 'ghuser'
}
}
return (name, data)
def addLabel(self, name):
if name not in self.labels:
self.labels.append(name)
self._updateTimeStamp()
return self._getLabelEvent(name)
def removeLabel(self, name):
if name in self.labels:
self.labels.remove(name)
self._updateTimeStamp()
return self._getUnlabelEvent(name)
def _getLabelEvent(self, label):
name = 'pull_request'
data = {
'action': 'labeled',
'pull_request': {
'number': self.number,
'updated_at': self.updated_at,
'base': {
'ref': self.branch,
'repo': {
'full_name': self.project
}
},
'head': {
'sha': self.head_sha
}
},
'label': {
'name': label
},
'sender': {
'login': 'ghuser'
}
}
return (name, data)
def _getUnlabelEvent(self, label):
name = 'pull_request'
data = {
'action': 'unlabeled',
'pull_request': {
'number': self.number,
'title': self.subject,
'updated_at': self.updated_at,
'base': {
'ref': self.branch,
'repo': {
'full_name': self.project
}
},
'head': {
'sha': self.head_sha,
'repo': {
'full_name': self.project
}
}
},
'label': {
'name': label
},
'sender': {
'login': 'ghuser'
}
}
return (name, data)
def editBody(self, body):
old_body = self.body
self.body = body
self._updateTimeStamp()
return self.getPullRequestEditedEvent(old_body=old_body)
def _getRepo(self):
repo_path = os.path.join(self.upstream_root, self.project)
return git.Repo(repo_path)
def _createPRRef(self, base_sha=None):
base_sha = base_sha or 'refs/tags/init'
repo = self._getRepo()
return GithubChangeReference.create(
repo, self.getPRReference(), base_sha)
def _addCommitToRepo(self, files=None, delete_files=None, reset=False):
repo = self._getRepo()
ref = repo.references[self.getPRReference()]
if reset:
self.number_of_commits = 0
ref.set_object('refs/tags/init')
self.number_of_commits += 1
repo.head.reference = ref
repo.head.reset(working_tree=True)
repo.git.clean('-x', '-f', '-d')
if files:
# Normalize the dictionary of 'Union[str,FakeFile] -> content'
# to 'FakeFile -> content'.
normalized_files = {}
for fn, content in files.items():
if isinstance(fn, FakeFile):
normalized_files[fn] = content
else:
normalized_files[FakeFile(fn)] = content
self.files.update(normalized_files)
elif not delete_files:
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
content = f"test {self.branch} {self.number}\n"
self.files.update({FakeFile(fn): content})
msg = self.subject + '-' + str(self.number_of_commits)
for fake_file, content in self.files.items():
fn = os.path.join(repo.working_dir, fake_file.filename)
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])
if delete_files:
for fn in delete_files:
if fn in self.files:
del self.files[fn]
fn = os.path.join(repo.working_dir, fn)
repo.index.remove([fn])
self.head_sha = repo.index.commit(msg).hexsha
repo.create_head(self.getPRReference(), self.head_sha, force=True)
self.pr_ref.set_commit(self.head_sha)
# Create an empty set of statuses for the given sha,
# each sha on a PR may have a status set on it
self.statuses[self.head_sha] = []
repo.head.reference = 'master'
repo.head.reset(working_tree=True)
repo.git.clean('-x', '-f', '-d')
repo.heads['master'].checkout()
def _updateTimeStamp(self):
self.updated_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime())
def getPRHeadSha(self):
repo = self._getRepo()
return repo.references[self.getPRReference()].commit.hexsha
def addReview(self, user, state, granted_on=None):
gh_time_format = '%Y-%m-%dT%H:%M:%SZ'
# convert the timestamp to a str format that would be returned
# from github as 'submitted_at' in the API response
if granted_on:
granted_on = datetime.datetime.utcfromtimestamp(granted_on)
submitted_at = time.strftime(
gh_time_format, granted_on.timetuple())
else:
# github timestamps only down to the second, so we need to make
# sure reviews that tests add appear to be added over a period of
# time in the past and not all at once.
if not self.reviews:
# the first review happens 10 mins ago
offset = 600
else:
# subsequent reviews happen 1 minute closer to now
offset = 600 - (len(self.reviews) * 60)
granted_on = datetime.datetime.utcfromtimestamp(
time.time() - offset)
submitted_at = time.strftime(
gh_time_format, granted_on.timetuple())
self.reviews.append(FakeGHReview({
'state': state,
'user': {
'login': user,
'email': user + "@example.com",
},
'submitted_at': submitted_at,
}))
def getPRReference(self):
return '%s/head' % self.number
def _getPullRequestEvent(self, action, old_body=None):
name = 'pull_request'
data = {
'action': action,
'number': self.number,
'pull_request': {
'number': self.number,
'title': self.subject,
'updated_at': self.updated_at,
'base': {
'ref': self.branch,
'repo': {
'full_name': self.project
}
},
'head': {
'sha': self.head_sha,
'repo': {
'full_name': self.project
}
},
'body': self.body
},
'sender': {
'login': 'ghuser'
},
'repository': {
'full_name': self.project,
},
'installation': {
'id': 123,
},
'changes': {},
'labels': [{'name': l} for l in self.labels]
}
if old_body:
data['changes']['body'] = {'from': old_body}
return (name, data)
def getCommitStatusEvent(self, context, state='success', user='zuul'):
name = 'status'
data = {
'state': state,
'sha': self.head_sha,
'name': self.project,
'description': 'Test results for %s: %s' % (self.head_sha, state),
'target_url': 'http://zuul/%s' % self.head_sha,
'branches': [],
'context': context,
'sender': {
'login': user
}
}
return (name, data)
def getCheckRunRequestedEvent(self, cr_name, app="zuul"):
name = "check_run"
data = {
"action": "rerequested",
"check_run": {
"head_sha": self.head_sha,
"name": cr_name,
"app": {
"slug": app,
},
},
"repository": {
"full_name": self.project,
},
}
return (name, data)
def getCheckRunAbortEvent(self, check_run):
# A check run aborted event can only be created from a FakeCheckRun as
# we need some information like external_id which is "calculated"
# during the creation of the check run.
name = "check_run"
data = {
"action": "requested_action",
"requested_action": {
"identifier": "abort",
},
"check_run": {
"head_sha": self.head_sha,
"name": check_run["name"],
"app": {
"slug": check_run["app"]
},
"external_id": check_run["external_id"],
},
"repository": {
"full_name": self.project,
},
}
return (name, data)
def setMerged(self, commit_message):
self.is_merged = True
self.merge_message = commit_message
repo = self._getRepo()
repo.heads[self.branch].commit = repo.commit(self.head_sha)
class FakeUser(object):
def __init__(self, login):
self.login = login
@ -603,7 +1031,7 @@ class FakeResponse(object):
text = '{} {}'.format(self.status_code, self.data)
else:
text = '{} {}'.format(self.status_code, self.status_message)
raise HTTPError(text, response=self)
raise requests.HTTPError(text, response=self)
class FakeGithubSession(object):
@ -920,3 +1348,288 @@ class FakeGithubEnterpriseClient(FakeGithubClient):
'installed_version': self.version,
}
return data
class FakeGithubClientManager(GithubClientManager):
github_class = FakeGithubClient
github_enterprise_class = FakeGithubEnterpriseClient
log = logging.getLogger("zuul.test.FakeGithubClientManager")
def __init__(self, connection_config):
super().__init__(connection_config)
self.record_clients = False
self.recorded_clients = []
self.github_data = None
def getGithubClient(self,
project_name=None,
zuul_event_id=None):
client = super().getGithubClient(
project_name=project_name,
zuul_event_id=zuul_event_id)
# Some tests expect the installation id as part of the
if self.app_id:
inst_id = self.installation_map.get(project_name)
client.setInstId(inst_id)
# The super method creates a fake github client with empty data so
# add it here.
client.setData(self.github_data)
if self.record_clients:
self.recorded_clients.append(client)
return client
def _prime_installation_map(self):
# Only valid if installed as a github app
if not self.app_id:
return
# github_data.repos is a hash like
# { ('org', 'project1'): <dataobj>
# ('org', 'project2'): <dataobj>,
# ('org2', 'project1'): <dataobj>, ... }
#
# we don't care about the value. index by org, e.g.
#
# {
# 'org': ('project1', 'project2')
# 'org2': ('project1', 'project2')
# }
orgs = defaultdict(list)
project_id = 1
for org, project in self.github_data.repos:
# Each entry is in the format for "repositories" response
# of GET /installation/repositories
orgs[org].append({
'id': project_id,
'name': project,
'full_name': '%s/%s' % (org, project)
# note, lots of other stuff that's not relevant
})
project_id += 1
self.log.debug("GitHub installation mapped to: %s" % orgs)
# Mock response to GET /app/installations
app_json = []
app_projects = []
app_id = 1
# Ensure that we ignore suspended apps
app_json.append(
{
'id': app_id,
'suspended_at': '2021-09-23T01:43:44Z',
'suspended_by': {
'login': 'ianw',
'type': 'User',
'id': 12345
}
})
app_projects.append([])
app_id += 1
for org, projects in orgs.items():
# We respond as if each org is a different app instance
#
# Below we will be sent the app_id in a token to query
# what projects this app exports. Keep the projects in a
# sequential list so we can just look up "projects for app
# X" == app_projects[X]
app_projects.append(projects)
app_json.append(
{
'id': app_id,
# Acutally none of this matters, and there's lots
# more in a real response. Padded out just for
# example sake.
'account': {
'login': org,
'id': 1234,
'type': 'User',
},
'permissions': {
'checks': 'write',
'metadata': 'read',
'contents': 'read'
},
'events': ['push',
'pull_request'
],
'suspended_at': None,
'suspended_by': None,
}
)
app_id += 1
# TODO(ianw) : we could exercise the pagination paths ...
with requests_mock.Mocker() as m:
m.get('%s/app/installations' % self.base_url, json=app_json)
def repositories_callback(request, context):
# FakeGithubSession gives us an auth token "token
# token-X" where "X" corresponds to the app id we want
# the projects for. apps start at id "1", so the projects
# to return for this call are app_projects[token-1]
token = int(request.headers['Authorization'][12:])
projects = app_projects[token - 1]
return {
'total_count': len(projects),
'repositories': projects
}
m.get('%s/installation/repositories?per_page=100' % self.base_url,
json=repositories_callback)
# everything mocked now, call real implementation
super()._prime_installation_map()
class FakeGithubConnection(githubconnection.GithubConnection):
log = logging.getLogger("zuul.test.FakeGithubConnection")
client_manager_class = FakeGithubClientManager
def __init__(self, driver, connection_name, connection_config,
changes_db=None, upstream_root=None, git_url_with_auth=False):
super(FakeGithubConnection, self).__init__(driver, connection_name,
connection_config)
self.connection_name = connection_name
self.pr_number = 0
self.pull_requests = changes_db
self.statuses = {}
self.upstream_root = upstream_root
self.merge_failure = False
self.merge_not_allowed_count = 0
self.github_data = FakeGithubData(changes_db)
self._github_client_manager.github_data = self.github_data
self.git_url_with_auth = git_url_with_auth
def setZuulWebPort(self, port):
self.zuul_web_port = port
def openFakePullRequest(self, project, branch, subject, files=[],
body=None, body_text=None, draft=False,
mergeable=True, base_sha=None):
self.pr_number += 1
pull_request = FakeGithubPullRequest(
self, self.pr_number, project, branch, subject, self.upstream_root,
files=files, body=body, body_text=body_text, draft=draft,
mergeable=mergeable, base_sha=base_sha)
self.pull_requests[self.pr_number] = pull_request
return pull_request
def getPushEvent(self, project, ref, old_rev=None, new_rev=None,
added_files=None, removed_files=None,
modified_files=None):
if added_files is None:
added_files = []
if removed_files is None:
removed_files = []
if modified_files is None:
modified_files = []
if not old_rev:
old_rev = '0' * 40
if not new_rev:
new_rev = random_sha1()
name = 'push'
data = {
'ref': ref,
'before': old_rev,
'after': new_rev,
'repository': {
'full_name': project
},
'commits': [
{
'added': added_files,
'removed': removed_files,
'modified': modified_files
}
]
}
return (name, data)
def getBranchProtectionRuleEvent(self, project, action):
name = 'branch_protection_rule'
data = {
'action': action,
'rule': {},
'repository': {
'full_name': project,
}
}
return (name, data)
def getRepositoryEvent(self, repository, action, changes):
name = 'repository'
data = {
'action': action,
'changes': changes,
'repository': repository,
}
return (name, data)
def emitEvent(self, event, use_zuulweb=False):
"""Emulates sending the GitHub webhook event to the connection."""
name, data = event
payload = json.dumps(data).encode('utf8')
secret = self.connection_config['webhook_token']
signature = githubconnection._sign_request(payload, secret)
headers = {'x-github-event': name,
'x-hub-signature': signature,
'x-github-delivery': str(uuid.uuid4())}
if use_zuulweb:
return requests.post(
'http://127.0.0.1:%s/api/connection/%s/payload'
% (self.zuul_web_port, self.connection_name),
json=data, headers=headers)
else:
data = {'headers': headers, 'body': data}
self.event_queue.put(data)
return data
def addProject(self, project):
# use the original method here and additionally register it in the
# fake github
super(FakeGithubConnection, self).addProject(project)
self.getGithubClient(project.name).addProject(project)
def getGitUrl(self, project):
if self.git_url_with_auth:
auth_token = ''.join(
random.choice(string.ascii_lowercase) for x in range(8))
prefix = 'file://x-access-token:%s@' % auth_token
else:
prefix = ''
if self.repo_cache:
return prefix + os.path.join(self.repo_cache, str(project))
return prefix + os.path.join(self.upstream_root, str(project))
def real_getGitUrl(self, project):
return super(FakeGithubConnection, self).getGitUrl(project)
def setCommitStatus(self, project, sha, state, url='', description='',
context='default', user='zuul', zuul_event_id=None):
# record that this got reported and call original method
self.github_data.reports.append(
(project, sha, 'status', (user, context, state)))
super(FakeGithubConnection, self).setCommitStatus(
project, sha, state,
url=url, description=description, context=context)
def labelPull(self, project, pr_number, label, zuul_event_id=None):
# record that this got reported
self.github_data.reports.append((project, pr_number, 'label', label))
pull_request = self.pull_requests[int(pr_number)]
pull_request.addLabel(label)
def unlabelPull(self, project, pr_number, label, zuul_event_id=None):
# record that this got reported
self.github_data.reports.append((project, pr_number, 'unlabel', label))
pull_request = self.pull_requests[pr_number]
pull_request.removeLabel(label)

View File

@ -13,17 +13,27 @@
# License for the specific language governing permissions and limitations
# under the License.
from collections import defaultdict
from collections import defaultdict, namedtuple
from contextlib import contextmanager
import datetime
import http.server
import json
import logging
import os
import re
import socketserver
import threading
import urllib.parse
import time
import urllib.parse
import zuul.driver.gitlab.gitlabconnection as gitlabconnection
from tests.util import random_sha1
import git
from git.util import IterableList
import requests
FakeGitlabBranch = namedtuple('Branch', ('name', 'protected'))
class GitlabWebServer(object):
@ -285,3 +295,379 @@ class GitlabWebServer(object):
self.httpd.shutdown()
self.thread.join()
self.httpd.server_close()
class FakeGitlabConnection(gitlabconnection.GitlabConnection):
log = logging.getLogger("zuul.test.FakeGitlabConnection")
def __init__(self, driver, connection_name, connection_config,
changes_db=None, upstream_root=None):
self.merge_requests = changes_db
self.upstream_root = upstream_root
self.mr_number = 0
self._test_web_server = GitlabWebServer(changes_db)
self._test_web_server.start()
self._test_baseurl = 'http://localhost:%s' % self._test_web_server.port
connection_config['baseurl'] = self._test_baseurl
super(FakeGitlabConnection, self).__init__(driver, connection_name,
connection_config)
def onStop(self):
super().onStop()
self._test_web_server.stop()
def addProject(self, project):
super(FakeGitlabConnection, self).addProject(project)
self.addProjectByName(project.name)
def addProjectByName(self, project_name):
owner, proj = project_name.split('/')
repo = self._test_web_server.fake_repos[(owner, proj)]
branch = FakeGitlabBranch('master', False)
if 'master' not in repo:
repo.append(branch)
def protectBranch(self, owner, project, branch, protected=True):
if branch in self._test_web_server.fake_repos[(owner, project)]:
del self._test_web_server.fake_repos[(owner, project)][branch]
fake_branch = FakeGitlabBranch(branch, protected=protected)
self._test_web_server.fake_repos[(owner, project)].append(fake_branch)
def deleteBranch(self, owner, project, branch):
if branch in self._test_web_server.fake_repos[(owner, project)]:
del self._test_web_server.fake_repos[(owner, project)][branch]
def getGitUrl(self, project):
return 'file://' + os.path.join(self.upstream_root, project.name)
def real_getGitUrl(self, project):
return super(FakeGitlabConnection, self).getGitUrl(project)
def openFakeMergeRequest(self, project,
branch, title, description='', files=[],
base_sha=None):
self.mr_number += 1
merge_request = FakeGitlabMergeRequest(
self, self.mr_number, project, branch, title, self.upstream_root,
files=files, description=description, base_sha=base_sha)
self.merge_requests.setdefault(
project, {})[str(self.mr_number)] = merge_request
return merge_request
def emitEvent(self, event, use_zuulweb=False, project=None):
name, payload = event
if use_zuulweb:
payload = json.dumps(payload).encode('utf-8')
headers = {'x-gitlab-token': self.webhook_token}
return requests.post(
'http://127.0.0.1:%s/api/connection/%s/payload'
% (self.zuul_web_port, self.connection_name),
data=payload, headers=headers)
else:
data = {'payload': payload}
self.event_queue.put(data)
return data
def setZuulWebPort(self, port):
self.zuul_web_port = port
def getPushEvent(
self, project, before=None, after=None,
branch='refs/heads/master',
added_files=None, removed_files=None,
modified_files=None):
if added_files is None:
added_files = []
if removed_files is None:
removed_files = []
if modified_files is None:
modified_files = []
name = 'gl_push'
if not after:
repo_path = os.path.join(self.upstream_root, project)
repo = git.Repo(repo_path)
after = repo.head.commit.hexsha
data = {
'object_kind': 'push',
'before': before or '1' * 40,
'after': after,
'ref': branch,
'project': {
'path_with_namespace': project
},
'commits': [
{
'added': added_files,
'removed': removed_files,
'modified': modified_files
}
],
'total_commits_count': 1,
}
return (name, data)
def getGitTagEvent(self, project, tag, sha):
name = 'gl_push'
data = {
'object_kind': 'tag_push',
'before': '0' * 40,
'after': sha,
'ref': 'refs/tags/%s' % tag,
'project': {
'path_with_namespace': project
},
}
return (name, data)
@contextmanager
def enable_community_edition(self):
self._test_web_server.options['community_edition'] = True
yield
self._test_web_server.options['community_edition'] = False
@contextmanager
def enable_delayed_complete_mr(self, complete_at):
self._test_web_server.options['delayed_complete_mr'] = complete_at
yield
self._test_web_server.options['delayed_complete_mr'] = 0
@contextmanager
def enable_uncomplete_mr(self):
self._test_web_server.options['uncomplete_mr'] = True
orig = self.gl_client.get_mr_wait_factor
self.gl_client.get_mr_wait_factor = 0.1
yield
self.gl_client.get_mr_wait_factor = orig
self._test_web_server.options['uncomplete_mr'] = False
class GitlabChangeReference(git.Reference):
_common_path_default = "refs/merge-requests"
_points_to_commits_only = True
class FakeGitlabMergeRequest(object):
log = logging.getLogger("zuul.test.FakeGitlabMergeRequest")
def __init__(self, gitlab, number, project, branch,
subject, upstream_root, files=[], description='',
base_sha=None):
self.gitlab = gitlab
self.source = gitlab
self.number = number
self.project = project
self.branch = branch
self.subject = subject
self.description = description
self.upstream_root = upstream_root
self.number_of_commits = 0
self.created_at = datetime.datetime.now(datetime.timezone.utc)
self.updated_at = self.created_at
self.merged_at = None
self.sha = None
self.state = 'opened'
self.is_merged = False
self.merge_status = 'can_be_merged'
self.squash_merge = None
self.labels = []
self.notes = []
self.url = "https://%s/%s/merge_requests/%s" % (
self.gitlab.server, self.project, self.number)
self.base_sha = base_sha
self.approved = False
self.blocking_discussions_resolved = True
self.mr_ref = self._createMRRef(base_sha=base_sha)
self._addCommitInMR(files=files)
def _getRepo(self):
repo_path = os.path.join(self.upstream_root, self.project)
return git.Repo(repo_path)
def _createMRRef(self, base_sha=None):
base_sha = base_sha or 'refs/tags/init'
repo = self._getRepo()
return GitlabChangeReference.create(
repo, self.getMRReference(), base_sha)
def getMRReference(self):
return '%s/head' % self.number
def addNote(self, body):
self.notes.append(
{
"body": body,
"created_at": datetime.datetime.now(datetime.timezone.utc),
}
)
def addCommit(self, files=[], delete_files=None):
self._addCommitInMR(files=files, delete_files=delete_files)
self._updateTimeStamp()
def closeMergeRequest(self):
self.state = 'closed'
self._updateTimeStamp()
def mergeMergeRequest(self, squash=None):
self.state = 'merged'
self.is_merged = True
self.squash_merge = squash
self._updateTimeStamp()
self.merged_at = self.updated_at
def reopenMergeRequest(self):
self.state = 'opened'
self._updateTimeStamp()
self.merged_at = None
def _addCommitInMR(self, files=[], delete_files=None, reset=False):
repo = self._getRepo()
ref = repo.references[self.getMRReference()]
if reset:
self.number_of_commits = 0
ref.set_object('refs/tags/init')
self.number_of_commits += 1
repo.head.reference = ref
repo.git.clean('-x', '-f', '-d')
if files:
self.files = files
elif not delete_files:
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
self.files = {fn: "test %s %s\n" % (self.branch, self.number)}
msg = self.subject + '-' + str(self.number_of_commits)
for fn, content in self.files.items():
fn = os.path.join(repo.working_dir, fn)
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])
if delete_files:
for fn in delete_files:
if fn in self.files:
del self.files[fn]
fn = os.path.join(repo.working_dir, fn)
repo.index.remove([fn])
self.sha = repo.index.commit(msg).hexsha
repo.create_head(self.getMRReference(), self.sha, force=True)
self.mr_ref.set_commit(self.sha)
repo.head.reference = 'master'
repo.git.clean('-x', '-f', '-d')
repo.heads['master'].checkout()
def _updateTimeStamp(self):
self.updated_at = datetime.datetime.now(datetime.timezone.utc)
def getMergeRequestEvent(self, action, code_change=False,
previous_labels=None,
reviewers_updated=False):
name = 'gl_merge_request'
data = {
'object_kind': 'merge_request',
'project': {
'path_with_namespace': self.project
},
'object_attributes': {
'title': self.subject,
'created_at': self.created_at.strftime(
'%Y-%m-%d %H:%M:%S.%f%z'),
'updated_at': self.updated_at.strftime(
'%Y-%m-%d %H:%M:%S UTC'),
'iid': self.number,
'target_branch': self.branch,
'last_commit': {'id': self.sha},
'action': action,
'blocking_discussions_resolved':
self.blocking_discussions_resolved
},
}
data['labels'] = [{'title': label} for label in self.labels]
if action == "update" and code_change:
data["object_attributes"]["oldrev"] = random_sha1()
data['changes'] = {}
if previous_labels is not None:
data['changes']['labels'] = {
'previous': [{'title': label} for label in previous_labels],
'current': data['labels']
}
if reviewers_updated:
data["changes"]["reviewers"] = {'current': [], 'previous': []}
return (name, data)
def getMergeRequestOpenedEvent(self):
return self.getMergeRequestEvent(action='open')
def getMergeRequestUpdatedEvent(self):
self.addCommit()
return self.getMergeRequestEvent(action='update',
code_change=True)
def getMergeRequestReviewersUpdatedEvent(self):
return self.getMergeRequestEvent(action='update',
reviewers_updated=True)
def getMergeRequestMergedEvent(self):
self.mergeMergeRequest()
return self.getMergeRequestEvent(action='merge')
def getMergeRequestMergedPushEvent(self, added_files=None,
removed_files=None,
modified_files=None):
return self.gitlab.getPushEvent(
project=self.project,
branch='refs/heads/%s' % self.branch,
before=random_sha1(),
after=self.sha,
added_files=added_files,
removed_files=removed_files,
modified_files=modified_files)
def getMergeRequestApprovedEvent(self):
self.approved = True
return self.getMergeRequestEvent(action='approved')
def getMergeRequestUnapprovedEvent(self):
self.approved = False
return self.getMergeRequestEvent(action='unapproved')
def getMergeRequestLabeledEvent(self, add_labels=[], remove_labels=[]):
previous_labels = self.labels
labels = set(previous_labels)
labels = labels - set(remove_labels)
labels = labels | set(add_labels)
self.labels = list(labels)
return self.getMergeRequestEvent(action='update',
previous_labels=previous_labels)
def getMergeRequestCommentedEvent(self, note):
self.addNote(note)
note_date = self.notes[-1]['created_at'].strftime(
'%Y-%m-%d %H:%M:%S UTC')
name = 'gl_merge_request'
data = {
'object_kind': 'note',
'project': {
'path_with_namespace': self.project
},
'merge_request': {
'title': self.subject,
'iid': self.number,
'target_branch': self.branch,
'last_commit': {'id': self.sha}
},
'object_attributes': {
'created_at': note_date,
'updated_at': note_date,
'note': self.notes[-1]['body'],
},
}
return (name, data)

459
tests/fakepagure.py Normal file
View File

@ -0,0 +1,459 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2016 Red Hat, Inc.
# Copyright 2021-2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import os
import re
import time
import uuid
import zuul.driver.pagure.pagureconnection as pagureconnection
import git
import requests
class PagureChangeReference(git.Reference):
_common_path_default = "refs/pull"
_points_to_commits_only = True
class FakePagurePullRequest(object):
log = logging.getLogger("zuul.test.FakePagurePullRequest")
def __init__(self, pagure, number, project, branch,
subject, upstream_root, files={}, number_of_commits=1,
initial_comment=None):
self.pagure = pagure
self.source = pagure
self.number = number
self.project = project
self.branch = branch
self.subject = subject
self.upstream_root = upstream_root
self.number_of_commits = 0
self.status = 'Open'
self.initial_comment = initial_comment
self.uuid = uuid.uuid4().hex
self.comments = []
self.flags = []
self.files = {}
self.tags = []
self.cached_merge_status = ''
self.threshold_reached = False
self.commit_stop = None
self.commit_start = None
self.threshold_reached = False
self.upstream_root = upstream_root
self.cached_merge_status = 'MERGE'
self.url = "https://%s/%s/pull-request/%s" % (
self.pagure.server, self.project, self.number)
self.is_merged = False
self.pr_ref = self._createPRRef()
self._addCommitInPR(files=files)
self._updateTimeStamp()
def _getPullRequestEvent(self, action, pull_data_field='pullrequest'):
name = 'pg_pull_request'
data = {
'msg': {
pull_data_field: {
'branch': self.branch,
'comments': self.comments,
'commit_start': self.commit_start,
'commit_stop': self.commit_stop,
'date_created': '0',
'tags': self.tags,
'initial_comment': self.initial_comment,
'id': self.number,
'project': {
'fullname': self.project,
},
'status': self.status,
'subject': self.subject,
'uid': self.uuid,
}
},
'msg_id': str(uuid.uuid4()),
'timestamp': 1427459070,
'topic': action
}
if action == 'pull-request.flag.added':
data['msg']['flag'] = self.flags[0]
if action == 'pull-request.tag.added':
data['msg']['tags'] = self.tags
return (name, data)
def getPullRequestOpenedEvent(self):
return self._getPullRequestEvent('pull-request.new')
def getPullRequestClosedEvent(self, merged=True):
if merged:
self.is_merged = True
self.status = 'Merged'
else:
self.is_merged = False
self.status = 'Closed'
return self._getPullRequestEvent('pull-request.closed')
def getPullRequestUpdatedEvent(self):
self._addCommitInPR()
self.addComment(
"**1 new commit added**\n\n * ``Bump``\n",
True)
return self._getPullRequestEvent('pull-request.comment.added')
def getPullRequestCommentedEvent(self, message):
self.addComment(message)
return self._getPullRequestEvent('pull-request.comment.added')
def getPullRequestInitialCommentEvent(self, message):
self.initial_comment = message
self._updateTimeStamp()
return self._getPullRequestEvent('pull-request.initial_comment.edited')
def getPullRequestTagAddedEvent(self, tags, reset=True):
if reset:
self.tags = []
_tags = set(self.tags)
_tags.update(set(tags))
self.tags = list(_tags)
self.addComment(
"**Metadata Update from @pingou**:\n- " +
"Pull-request tagged with: %s" % ', '.join(tags),
True)
self._updateTimeStamp()
return self._getPullRequestEvent(
'pull-request.tag.added', pull_data_field='pull_request')
def getPullRequestStatusSetEvent(self, status, username="zuul"):
self.addFlag(
status, "https://url", "Build %s" % status, username)
return self._getPullRequestEvent('pull-request.flag.added')
def insertFlag(self, flag):
to_pop = None
for i, _flag in enumerate(self.flags):
if _flag['uid'] == flag['uid']:
to_pop = i
if to_pop is not None:
self.flags.pop(to_pop)
self.flags.insert(0, flag)
def addFlag(self, status, url, comment, username="zuul"):
flag_uid = "%s-%s-%s" % (username, self.number, self.project)
flag = {
"username": "Zuul CI",
"user": {
"name": username
},
"uid": flag_uid[:32],
"comment": comment,
"status": status,
"url": url
}
self.insertFlag(flag)
self._updateTimeStamp()
def editInitialComment(self, initial_comment):
self.initial_comment = initial_comment
self._updateTimeStamp()
def addComment(self, message, notification=False, fullname=None):
self.comments.append({
'comment': message,
'notification': notification,
'date_created': str(int(time.time())),
'user': {
'fullname': fullname or 'Pingou'
}}
)
self._updateTimeStamp()
def getPRReference(self):
return '%s/head' % self.number
def _getRepo(self):
repo_path = os.path.join(self.upstream_root, self.project)
return git.Repo(repo_path)
def _createPRRef(self):
repo = self._getRepo()
return PagureChangeReference.create(
repo, self.getPRReference(), 'refs/tags/init')
def addCommit(self, files={}, delete_files=None):
"""Adds a commit on top of the actual PR head."""
self._addCommitInPR(files=files, delete_files=delete_files)
self._updateTimeStamp()
def forcePush(self, files={}):
"""Clears actual commits and add a commit on top of the base."""
self._addCommitInPR(files=files, reset=True)
self._updateTimeStamp()
def _addCommitInPR(self, files={}, delete_files=None, reset=False):
repo = self._getRepo()
ref = repo.references[self.getPRReference()]
if reset:
self.number_of_commits = 0
ref.set_object('refs/tags/init')
self.number_of_commits += 1
repo.head.reference = ref
repo.git.clean('-x', '-f', '-d')
if files:
self.files = files
elif not delete_files:
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
self.files = {fn: "test %s %s\n" % (self.branch, self.number)}
msg = self.subject + '-' + str(self.number_of_commits)
for fn, content in self.files.items():
fn = os.path.join(repo.working_dir, fn)
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])
if delete_files:
for fn in delete_files:
if fn in self.files:
del self.files[fn]
fn = os.path.join(repo.working_dir, fn)
repo.index.remove([fn])
self.commit_stop = repo.index.commit(msg).hexsha
if not self.commit_start:
self.commit_start = self.commit_stop
repo.create_head(self.getPRReference(), self.commit_stop, force=True)
self.pr_ref.set_commit(self.commit_stop)
repo.head.reference = 'master'
repo.git.clean('-x', '-f', '-d')
repo.heads['master'].checkout()
def _updateTimeStamp(self):
self.last_updated = str(int(time.time()))
class FakePagureAPIClient(pagureconnection.PagureAPIClient):
log = logging.getLogger("zuul.test.FakePagureAPIClient")
def __init__(self, baseurl, api_token, project,
pull_requests_db={}):
super(FakePagureAPIClient, self).__init__(
baseurl, api_token, project)
self.session = None
self.pull_requests = pull_requests_db
self.return_post_error = None
def gen_error(self, verb, custom_only=False):
if verb == 'POST' and self.return_post_error:
return {
'error': self.return_post_error['error'],
'error_code': self.return_post_error['error_code']
}, 401, "", 'POST'
self.return_post_error = None
if not custom_only:
return {
'error': 'some error',
'error_code': 'some error code'
}, 503, "", verb
def _get_pr(self, match):
project, number = match.groups()
pr = self.pull_requests.get(project, {}).get(number)
if not pr:
return self.gen_error("GET")
return pr
def get(self, url):
self.log.debug("Getting resource %s ..." % url)
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)$', url)
if match:
pr = self._get_pr(match)
return {
'branch': pr.branch,
'subject': pr.subject,
'status': pr.status,
'initial_comment': pr.initial_comment,
'last_updated': pr.last_updated,
'comments': pr.comments,
'commit_stop': pr.commit_stop,
'threshold_reached': pr.threshold_reached,
'cached_merge_status': pr.cached_merge_status,
'tags': pr.tags,
}, 200, "", "GET"
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/flag$', url)
if match:
pr = self._get_pr(match)
return {'flags': pr.flags}, 200, "", "GET"
match = re.match('.+/api/0/(.+)/git/branches$', url)
if match:
# project = match.groups()[0]
return {'branches': ['master']}, 200, "", "GET"
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/diffstats$', url)
if match:
pr = self._get_pr(match)
return pr.files, 200, "", "GET"
def post(self, url, params=None):
self.log.info(
"Posting on resource %s, params (%s) ..." % (url, params))
# Will only match if return_post_error is set
err = self.gen_error("POST", custom_only=True)
if err:
return err
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/merge$', url)
if match:
pr = self._get_pr(match)
pr.status = 'Merged'
pr.is_merged = True
return {}, 200, "", "POST"
match = re.match(r'.+/api/0/-/whoami$', url)
if match:
return {"username": "zuul"}, 200, "", "POST"
if not params:
return self.gen_error("POST")
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/flag$', url)
if match:
pr = self._get_pr(match)
params['user'] = {"name": "zuul"}
pr.insertFlag(params)
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/comment$', url)
if match:
pr = self._get_pr(match)
pr.addComment(params['comment'])
return {}, 200, "", "POST"
class FakePagureConnection(pagureconnection.PagureConnection):
log = logging.getLogger("zuul.test.FakePagureConnection")
def __init__(self, driver, connection_name, connection_config,
changes_db=None, upstream_root=None):
super(FakePagureConnection, self).__init__(driver, connection_name,
connection_config)
self.connection_name = connection_name
self.pr_number = 0
self.pull_requests = changes_db
self.statuses = {}
self.upstream_root = upstream_root
self.reports = []
self.cloneurl = self.upstream_root
def get_project_api_client(self, project):
client = FakePagureAPIClient(
self.baseurl, None, project,
pull_requests_db=self.pull_requests)
if not self.username:
self.set_my_username(client)
return client
def get_project_webhook_token(self, project):
return 'fake_webhook_token-%s' % project
def emitEvent(self, event, use_zuulweb=False, project=None,
wrong_token=False):
name, payload = event
if use_zuulweb:
if not wrong_token:
secret = 'fake_webhook_token-%s' % project
else:
secret = ''
payload = json.dumps(payload).encode('utf-8')
signature, _ = pagureconnection._sign_request(payload, secret)
headers = {'x-pagure-signature': signature,
'x-pagure-project': project}
return requests.post(
'http://127.0.0.1:%s/api/connection/%s/payload'
% (self.zuul_web_port, self.connection_name),
data=payload, headers=headers)
else:
data = {'payload': payload}
self.event_queue.put(data)
return data
def openFakePullRequest(self, project, branch, subject, files=[],
initial_comment=None):
self.pr_number += 1
pull_request = FakePagurePullRequest(
self, self.pr_number, project, branch, subject, self.upstream_root,
files=files, initial_comment=initial_comment)
self.pull_requests.setdefault(
project, {})[str(self.pr_number)] = pull_request
return pull_request
def getGitReceiveEvent(self, project):
name = 'pg_push'
repo_path = os.path.join(self.upstream_root, project)
repo = git.Repo(repo_path)
headsha = repo.head.commit.hexsha
data = {
'msg': {
'project_fullname': project,
'branch': 'master',
'end_commit': headsha,
'old_commit': '1' * 40,
},
'msg_id': str(uuid.uuid4()),
'timestamp': 1427459070,
'topic': 'git.receive',
}
return (name, data)
def getGitTagCreatedEvent(self, project, tag, rev):
name = 'pg_push'
data = {
'msg': {
'project_fullname': project,
'tag': tag,
'rev': rev
},
'msg_id': str(uuid.uuid4()),
'timestamp': 1427459070,
'topic': 'git.tag.creation',
}
return (name, data)
def getGitBranchEvent(self, project, branch, type, rev):
name = 'pg_push'
data = {
'msg': {
'project_fullname': project,
'branch': branch,
'rev': rev,
},
'msg_id': str(uuid.uuid4()),
'timestamp': 1427459070,
'topic': 'git.branch.%s' % type,
}
return (name, data)
def setZuulWebPort(self, port):
self.zuul_web_port = port

View File

@ -36,9 +36,10 @@ from zuul.merger.merger import Repo
from zuul.model import MergeRequest, EnqueueEvent, DequeueEvent
from zuul.zk.change_cache import ChangeKey
from tests.util import random_sha1
from tests.base import (AnsibleZuulTestCase, BaseTestCase,
ZuulGithubAppTestCase, ZuulTestCase,
simple_layout, random_sha1, iterate_timeout)
simple_layout, iterate_timeout)
from tests.base import ZuulWebFixture
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)

View File

@ -23,8 +23,13 @@ import time
from zuul.lib import strings
from zuul.zk.layout import LayoutState
from tests.base import random_sha1, simple_layout, skipIfMultiScheduler
from tests.base import ZuulTestCase, ZuulWebFixture
from tests.base import (
ZuulTestCase,
ZuulWebFixture,
simple_layout,
skipIfMultiScheduler,
)
from tests.util import random_sha1
from testtools.matchers import MatchesRegex

25
tests/util.py Normal file
View File

@ -0,0 +1,25 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2016 Red Hat, Inc.
# Copyright 2021-2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
import hashlib
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures')
def random_sha1():
return hashlib.sha1(str(random.random()).encode('ascii')).hexdigest()