Add folder credential support

implement functions to manage credential in Jenkins Folder
job with rest api provided by :
https://wiki.jenkins.io/display/JENKINS/Credentials+Plugin

Change-Id: I9bcc3943e4ec705fe8705ad1d457b6fd4ad1024a
This commit is contained in:
joelee 2018-08-02 10:08:30 +03:00
parent 4150a83d45
commit 857235b7fa
2 changed files with 545 additions and 0 deletions

View File

@ -62,6 +62,7 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
from six.moves.http_client import BadStatusLine from six.moves.http_client import BadStatusLine
from six.moves.urllib.error import URLError from six.moves.urllib.error import URLError
from six.moves.urllib.parse import quote, urlencode, urljoin, urlparse from six.moves.urllib.parse import quote, urlencode, urljoin, urlparse
import xml.etree.ElementTree as ET
from jenkins import plugins from jenkins import plugins
@ -141,6 +142,14 @@ PROMOTION_INFO = '%(folder_url)sjob/%(short_name)s/promotion/api/json?depth=%(de
DELETE_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/process/%(name)s/doDelete' DELETE_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/process/%(name)s/doDelete'
CREATE_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/createProcess?name=%(name)s' CREATE_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/createProcess?name=%(name)s'
CONFIG_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/process/%(name)s/config.xml' CONFIG_PROMOTION = '%(folder_url)sjob/%(short_name)s/promotion/process/%(name)s/config.xml'
LIST_CREDENTIALS = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
'domain/%(domain_name)s/api/json?tree=credentials[id]'
CREATE_CREDENTIAL = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
'domain/%(domain_name)s/createCredentials'
CONFIG_CREDENTIAL = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
'domain/%(domain_name)s/credential/%(name)s/config.xml'
CREDENTIAL_INFO = '%(folder_url)sjob/%(short_name)s/credentials/store/folder/' \
'domain/%(domain_name)s/credential/%(name)s/api/json?depth=0'
QUIET_DOWN = 'quietDown' QUIET_DOWN = 'quietDown'
# for testing only # for testing only
@ -1944,6 +1953,189 @@ class Jenkins(object):
'GET', self._build_url(CONFIG_PROMOTION, locals())) 'GET', self._build_url(CONFIG_PROMOTION, locals()))
return self.jenkins_open(request) return self.jenkins_open(request)
def _get_tag_text(self, name, xml):
'''Get text of tag from xml
:param name: XML tag name, ``str``
:param xml: XML configuration, ``str``
:returns: Text of tag, ``str``
:throws: :class:`JenkinsException` whenever tag does not exist
or has invalidated text
'''
tag = ET.fromstring(xml).find(name)
try:
text = tag.text.strip()
if text:
return text
raise JenkinsException("tag[%s] is invalidated" % name)
except AttributeError:
raise JenkinsException("tag[%s] is invalidated" % name)
def assert_folder(self, name, exception_message='job[%s] is not a folder'):
'''Raise an exception if job is not Cloudbees Folder
:param name: Name of job, ``str``
:param exception_message: Message to use for the exception.
:throws: :class:`JenkinsException` whenever the job is
not Cloudbees Folder
'''
if not self.is_folder(name):
raise JenkinsException(exception_message % name)
def is_folder(self, name):
'''Check whether a job is Cloudbees Folder
:param name: Job name, ``str``
:returns: ``True`` if job is folder, ``False`` otherwise
'''
return 'com.cloudbees.hudson.plugins.folder.Folder' \
== self.get_job_info(name)['_class']
def assert_credential_exists(self, name, folder_name, domain_name='_',
exception_message='credential[%s] does not '
'exist in the domain[%s] of [%s]'):
'''Raise an exception if credential does not exist in domain of folder
:param name: Name of credential, ``str``
:param folder_name: Folder name, ``str``
:param domain_name: Domain name, default is '_', ``str``
:param exception_message: Message to use for the exception.
Formatted with ``name``, ``domain_name``,
and ``folder_name``
:throws: :class:`JenkinsException` whenever the credentail
does not exist in domain of folder
'''
if not self.credential_exists(name, folder_name, domain_name):
raise JenkinsException(exception_message
% (name, domain_name, folder_name))
def credential_exists(self, name, folder_name, domain_name='_'):
'''Check whether a credentail exists in domain of folder
:param name: Name of credentail, ``str``
:param folder_name: Folder name, ``str``
:param domain_name: Domain name, default is '_', ``str``
:returns: ``True`` if credentail exists, ``False`` otherwise
'''
try:
return self.get_credential_info(name, folder_name,
domain_name)['id'] == name
except JenkinsException:
return False
def get_credential_info(self, name, folder_name, domain_name='_'):
'''Get credential information dictionary in domain of folder
:param name: Name of credentail, ``str``
:param folder_name: folder_name, ``str``
:param domain_name: Domain name, default is '_', ``str``
:returns: Dictionary of credential info, ``dict``
'''
self.assert_folder(folder_name)
folder_url, short_name = self._get_job_folder(folder_name)
try:
response = self.jenkins_open(requests.Request(
'GET', self._build_url(CREDENTIAL_INFO, locals())
))
if response:
return json.loads(response)
else:
raise JenkinsException('credential[%s] does not exist '
'in the domain[%s] of [%s]'
% (name, domain_name, folder_name))
except (req_exc.HTTPError, NotFoundException):
raise JenkinsException('credential[%s] does not exist '
'in the domain[%s] of [%s]'
% (name, domain_name, folder_name))
except ValueError:
raise JenkinsException(
'Could not parse JSON info for credential[%s] '
'in the domain[%s] of [%s]'
% (name, domain_name, folder_name)
)
def get_credential_config(self, name, folder_name, domain_name='_'):
'''Get configuration of credential in domain of folder.
:param name: Name of credentail, ``str``
:param folder_name: Folder name, ``str``
:param domain_name: Domain name, default is '_', ``str``
:returns: Credential configuration (XML format)
'''
self.assert_folder(folder_name)
folder_url, short_name = self._get_job_folder(folder_name)
return self.jenkins_open(requests.Request(
'GET', self._build_url(CONFIG_CREDENTIAL, locals())
))
def create_credential(self, folder_name, config_xml,
domain_name='_'):
'''Create credentail in domain of folder
:param folder_name: Folder name, ``str``
:param config_xml: New XML configuration, ``str``
:param domain_name: Domain name, default is '_', ``str``
'''
folder_url, short_name = self._get_job_folder(folder_name)
name = self._get_tag_text('id', config_xml)
if self.credential_exists(name, folder_name, domain_name):
raise JenkinsException('credential[%s] already exists '
'in the domain[%s] of [%s]'
% (name, domain_name, folder_name))
self.jenkins_open(requests.Request(
'POST', self._build_url(CREATE_CREDENTIAL, locals()),
data=config_xml.encode('utf-8'),
headers=DEFAULT_HEADERS
))
self.assert_credential_exists(name, folder_name, domain_name,
'create[%s] failed in the '
'domain[%s] of [%s]')
def delete_credential(self, name, folder_name, domain_name='_'):
'''Delete credential from domain of folder
:param name: Name of credentail, ``str``
:param folder_name: Folder name, ``str``
:param domain_name: Domain name, default is '_', ``str``
'''
folder_url, short_name = self._get_job_folder(folder_name)
self.jenkins_open(requests.Request(
'DELETE', self._build_url(CONFIG_CREDENTIAL, locals())
))
if self.credential_exists(name, folder_name, domain_name):
raise JenkinsException('delete credential[%s] from '
'domain[%s] of [%s] failed'
% (name, domain_name, folder_name))
def reconfig_credential(self, folder_name, config_xml, domain_name='_'):
'''Reconfig credential with new config in domain of folder
:param folder_name: Folder name, ``str``
:param config_xml: New XML configuration, ``str``
:param domain_name: Domain name, default is '_', ``str``
'''
folder_url, short_name = self._get_job_folder(folder_name)
name = self._get_tag_text('id', config_xml)
self.assert_credential_exists(name, folder_name, domain_name)
self.jenkins_open(requests.Request(
'POST', self._build_url(CONFIG_CREDENTIAL, locals())
))
def list_credentials(self, folder_name, domain_name='_'):
'''List credentials in domain of folder
:param folder_name: Folder name, ``str``
:param domain_name: Domain name, default is '_', ``str``
:returns: Credentials list, ``list``
'''
self.assert_folder(folder_name)
folder_url, short_name = self._get_job_folder(folder_name)
response = self.jenkins_open(requests.Request(
'GET', self._build_url(LIST_CREDENTIALS, locals())
))
return json.loads(response)['credentials']
def quiet_down(self): def quiet_down(self):
'''Prepare Jenkins for shutdown. '''Prepare Jenkins for shutdown.

353
tests/test_credential.py Normal file
View File

@ -0,0 +1,353 @@
import json
from mock import patch
import jenkins
from tests.base import JenkinsTestBase
class JenkinsCredentialTestBase(JenkinsTestBase):
config_xml = """<com.cloudbees.plugins.credentials.impl.UsernamePasswordCredentialsImpl>
<scope>GLOBAL</scope>
<id>Test Credential</id>
<username>Test-User</username>
<password>secret123</password>
</com.cloudbees.plugins.credentials.impl.UsernamePasswordCredentialsImpl>"""
class JenkinsGetTagTextTest(JenkinsCredentialTestBase):
def test_simple(self):
name_to_return = self.j._get_tag_text('id', self.config_xml)
self.assertEqual('Test Credential', name_to_return)
def test_failed(self):
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j._get_tag_text('id', '<xml></xml>')
self.assertEqual(str(context_manager.exception),
'tag[id] is invalidated')
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j._get_tag_text('id', '<xml><id></id></xml>')
self.assertEqual(str(context_manager.exception),
'tag[id] is invalidated')
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j._get_tag_text('id', '<xml><id> </id></xml>')
self.assertEqual(str(context_manager.exception),
'tag[id] is invalidated')
class JenkinsIsFolderTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_is_folder(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
]
self.assertTrue(self.j.is_folder('Test Folder'))
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_is_not_folder(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'org.jenkinsci.plugins.workflow.job.WorkflowJob'}),
]
self.assertFalse(self.j.is_folder('Test Job'))
class JenkinsAssertFolderTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_is_folder(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
]
self.j.assert_folder('Test Folder')
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_is_not_folder(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'org.jenkinsci.plugins.workflow.job.WorkflowJob'}),
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.assert_folder('Test Job')
self.assertEqual(str(context_manager.exception),
'job[Test Job] is not a folder')
class JenkinsAssertCredentialTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_credential_missing(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
jenkins.NotFoundException()
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.assert_credential_exists('NonExistent', 'TestFoler')
self.assertEqual(
str(context_manager.exception),
'credential[NonExistent] does not exist'
' in the domain[_] of [TestFoler]')
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_credential_exists(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'ExistingCredential'})
]
self.j.assert_credential_exists('ExistingCredential', 'TestFoler')
self._check_requests(jenkins_mock.call_args_list)
class JenkinsCredentialExistsTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_credential_missing(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
jenkins.NotFoundException()
]
self.assertEqual(self.j.credential_exists('NonExistent', 'TestFolder'),
False)
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_credential_exists(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'ExistingCredential'})
]
self.assertEqual(self.j.credential_exists('ExistingCredential',
'TestFolder'),
True)
self._check_requests(jenkins_mock.call_args_list)
class JenkinsGetCredentialInfoTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_simple(self, jenkins_mock):
credential_info_to_return = {'id': 'ExistingCredential'}
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps(credential_info_to_return)
]
credential_info = self.j.get_credential_info('ExistingCredential', 'TestFolder')
self.assertEqual(credential_info, credential_info_to_return)
self.assertEqual(
jenkins_mock.call_args[0][0].url,
self.make_url('job/TestFolder/credentials/store/folder/'
'domain/_/credential/ExistingCredential/api/json?depth=0'))
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_nonexistent(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
None,
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.get_credential_info('NonExistent', 'TestFolder')
self.assertEqual(
str(context_manager.exception),
'credential[NonExistent] does not exist '
'in the domain[_] of [TestFolder]')
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_invalid_json(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
'{invalid_json}'
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.get_credential_info('NonExistent', 'TestFolder')
self.assertEqual(
str(context_manager.exception),
'Could not parse JSON info for credential[NonExistent]'
' in the domain[_] of [TestFolder]')
class JenkinsGetCredentialConfigTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_encodes_credential_name(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
None,
]
self.j.get_credential_config(u'Test Credential', u'Test Folder')
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
'_/credential/Test%20Credential/config.xml'))
self._check_requests(jenkins_mock.call_args_list)
class JenkinsCreateCredentialTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_simple(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
jenkins.NotFoundException(),
None,
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'Test Credential'}),
]
self.j.create_credential('Test Folder', self.config_xml)
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/credential/Test%20Credential/api/json?depth=0'))
self.assertEqual(
jenkins_mock.call_args_list[2][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/createCredentials'))
self.assertEqual(
jenkins_mock.call_args_list[4][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/credential/Test%20Credential/api/json?depth=0'))
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_already_exists(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'Test Credential'}),
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.create_credential('Test Folder', self.config_xml)
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/credential/Test%20Credential/api/json?depth=0'))
self.assertEqual(
str(context_manager.exception),
'credential[Test Credential] already exists'
' in the domain[_] of [Test Folder]')
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_failed(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
jenkins.NotFoundException(),
None,
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
None,
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.create_credential('Test Folder', self.config_xml)
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/credential/Test%20Credential/api/json?depth=0'))
self.assertEqual(
jenkins_mock.call_args_list[2][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/'
'folder/domain/_/createCredentials'))
self.assertEqual(
jenkins_mock.call_args_list[4][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/'
'domain/_/credential/Test%20Credential/api/json?depth=0'))
self.assertEqual(
str(context_manager.exception),
'create[Test Credential] failed in the domain[_] of [Test Folder]')
self._check_requests(jenkins_mock.call_args_list)
class JenkinsDeleteCredentialTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_simple(self, jenkins_mock):
jenkins_mock.side_effect = [
True,
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
jenkins.NotFoundException(),
]
self.j.delete_credential(u'Test Credential', 'TestFolder')
self.assertEqual(
jenkins_mock.call_args_list[0][0][0].url,
self.make_url('job/TestFolder/credentials/store/folder/domain/'
'_/credential/Test%20Credential/config.xml'))
self._check_requests(jenkins_mock.call_args_list)
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_failed(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'id': 'ExistingCredential'}),
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'ExistingCredential'})
]
with self.assertRaises(jenkins.JenkinsException) as context_manager:
self.j.delete_credential(u'ExistingCredential', 'TestFolder')
self.assertEqual(
jenkins_mock.call_args_list[0][0][0].url,
self.make_url('job/TestFolder/credentials/store/folder/'
'domain/_/credential/ExistingCredential/config.xml'))
self.assertEqual(
str(context_manager.exception),
'delete credential[ExistingCredential] from '
'domain[_] of [TestFolder] failed')
self._check_requests(jenkins_mock.call_args_list)
class JenkinsReconfigCredentialTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_simple(self, jenkins_mock):
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'id': 'Test Credential'}),
None
]
self.j.reconfig_credential(u'Test Folder', self.config_xml)
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
'_/credential/Test%20Credential/api/json?depth=0'))
self.assertEqual(
jenkins_mock.call_args_list[2][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
'_/credential/Test%20Credential/config.xml'))
self._check_requests(jenkins_mock.call_args_list)
class JenkinsListCredentialConfigTest(JenkinsCredentialTestBase):
@patch.object(jenkins.Jenkins, 'jenkins_open')
def test_simple(self, jenkins_mock):
credentials_to_return = [{'id': 'Test Credential'}]
jenkins_mock.side_effect = [
json.dumps({'_class': 'com.cloudbees.hudson.plugins.folder.Folder'}),
json.dumps({'credentials': [{'id': 'Test Credential'}]}),
]
credentials = self.j.list_credentials(u'Test Folder')
self.assertEqual(credentials, credentials_to_return)
self.assertEqual(
jenkins_mock.call_args_list[1][0][0].url,
self.make_url('job/Test%20Folder/credentials/store/folder/domain/'
'_/api/json?tree=credentials[id]'))
self._check_requests(jenkins_mock.call_args_list)