governance/tools/validate_atcs.py

131 lines
3.8 KiB
Python
Executable File

#!/usr/bin/env python3
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import json
import logging
import requests
import yaml
LOG = logging.getLogger()
# The OpenStack foundation member directory lookup API endpoint
MEMBER_LOOKUP_URL = 'https://openstackid-resources.openstack.org/'
def lookup_member(email):
"A requests wrapper to querying the OSF member directory API"
# URL pattern for querying foundation members by E-mail address
LOG.debug('looking up %s', email)
raw = requester(
MEMBER_LOOKUP_URL + '/api/public/v1/members',
params={
'filter[]': [
'group_slug==foundation-members',
'email==' + email,
],
'expand': 'all_affiliations',
},
headers={'Accept': 'application/json'},
)
decoded = decode_json(raw)
try:
return decoded['data'][0]
except (KeyError, IndexError):
return None
def requester(url, params={}, headers={}):
"""A requests wrapper to consistently retry HTTPS queries
:param url: The URL to get.
:type url: str
:param params: Additional parameters to provide.
:type params: dict(str, str)
:param headers: Additional headers to set.
:type params: dict(str, str)
"""
# Try up to 3 times
retry = requests.Session()
retry.mount("https://", requests.adapters.HTTPAdapter(max_retries=3))
return retry.get(url=url, params=params, headers=headers)
def decode_json(raw):
"""Trap JSON decoding failures and provide more detailed errors
Remove ')]}' XSS prefix from data if it is present, then decode it
as JSON and return the results.
:param raw: Response text from API
:type raw: str
"""
# Gerrit's REST API prepends a JSON-breaker to avoid XSS vulnerabilities
if raw.text.startswith(")]}'"):
trimmed = raw.text[4:]
else:
trimmed = raw.text
# Try to decode and bail with much detail if it fails
try:
decoded = json.loads(trimmed)
except Exception:
LOG.error(
'\nrequest returned %s error to query:\n\n %s\n'
'\nwith detail:\n\n %s\n',
raw, raw.url, trimmed)
raise
return decoded
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
'-p', '--projects',
default='./reference/projects.yaml',
help='projects.yaml file path (%(default)s)',
)
args = parser.parse_args()
# Quiet the urllib3 module output coming out of requests.
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.basicConfig()
with open(args.projects, 'r', encoding='utf-8') as f:
projects = yaml.safe_load(f.read())
for project_name, project_data in sorted(projects.items()):
atcs = project_data.get('extra-atcs', [])
for atc in atcs:
try:
member = lookup_member(atc['email'])
except Exception as e:
print('ERROR: {}: Could not validate ATC {}: {}'.format(
project_name, atc, e))
else:
if not member:
print(('ERROR: {}: Did not find {} associated '
'with a member for {}').format(
project_name, atc['email'], atc))
if __name__ == '__main__':
import sys
sys.exit(main())