Introduced API to build DEB and RPM repository
Change-Id: I7abc2705f66228e81b36febe146d06988efb7b00 Implements: blueprint build-repository
This commit is contained in:
parent
1ce69b4fef
commit
aff85a919e
|
@ -112,6 +112,15 @@ class RepositoryApi(object):
|
|||
context = config if isinstance(config, Context) else Context(config)
|
||||
return cls(RepositoryController.load(context, repotype, repoarch))
|
||||
|
||||
def create_repository(self, repo_data, package_files):
|
||||
"""Create new repository with specified packages.
|
||||
|
||||
:param repo_data: The description of repository
|
||||
:param package_files: The list of URLs of packages
|
||||
"""
|
||||
self._validate_repo_data(repo_data)
|
||||
return self.controller.create_repository(repo_data, package_files)
|
||||
|
||||
def get_packages(self, repos_data, requirements_data=None,
|
||||
include_mandatory=False):
|
||||
"""Gets the list of packages from repository(es).
|
||||
|
@ -192,7 +201,8 @@ class RepositoryApi(object):
|
|||
self.controller.load_packages(repo, consumer)
|
||||
|
||||
def _load_repositories(self, repos_data):
|
||||
self._validate_repos_data(repos_data)
|
||||
for repo_data in repos_data:
|
||||
self._validate_repo_data(repo_data)
|
||||
return self.controller.load_repositories(repos_data)
|
||||
|
||||
def _load_requirements(self, requirements_data):
|
||||
|
@ -213,7 +223,7 @@ class RepositoryApi(object):
|
|||
))
|
||||
return result
|
||||
|
||||
def _validate_repos_data(self, repos_data):
|
||||
def _validate_repo_data(self, repo_data):
|
||||
# TODO(bgaifullin) implement me
|
||||
pass
|
||||
|
||||
|
|
|
@ -97,6 +97,7 @@ class RepositoryController(object):
|
|||
destination,
|
||||
repository.path or utils.get_path_from_url(repository.url, False)
|
||||
)
|
||||
logger.info("cloning repository '%s' to '%s'", repository, new_path)
|
||||
return self.driver.fork_repository(
|
||||
self.context.connection, repository, new_path, source, locale
|
||||
)
|
||||
|
@ -120,6 +121,22 @@ class RepositoryController(object):
|
|||
self.context.connection, repository, packages
|
||||
)
|
||||
|
||||
def create_repository(self, repository_data, package_files):
|
||||
"""Creates new repository from specified packages.
|
||||
|
||||
:param repository_data: the description of repository
|
||||
:param package_files: the list of paths of packages
|
||||
:return : the new repository
|
||||
"""
|
||||
repo = self.driver.create_repository(repository_data, self.arch)
|
||||
packages = set()
|
||||
with self.context.async_section() as section:
|
||||
for url in package_files:
|
||||
section.execute(self._add_package, repo, url, packages.add)
|
||||
|
||||
self.assign_packages(repo, packages)
|
||||
return repo
|
||||
|
||||
def _copy_packages(self, target, packages, observer):
|
||||
with self.context.async_section() as section:
|
||||
for package in packages:
|
||||
|
@ -128,17 +145,33 @@ class RepositoryController(object):
|
|||
)
|
||||
|
||||
def _copy_package(self, target, package, observer):
|
||||
bytes_copied = 0
|
||||
if target.url != package.repository.url:
|
||||
dst_path = os.path.join(
|
||||
utils.get_path_from_url(target.url), package.filename
|
||||
if package.repository is None:
|
||||
src_url = package.filename
|
||||
dst_path = self.driver.get_relative_path(
|
||||
target, utils.get_filename_from_uri(package.filename)
|
||||
)
|
||||
src_path = urljoin(package.repository.url, package.filename)
|
||||
bytes_copied = self.context.connection.retrieve(
|
||||
src_path, dst_path, size=package.filesize
|
||||
)
|
||||
if package.filesize < 0:
|
||||
package.filesize = bytes_copied
|
||||
elif target.url != package.repository.url:
|
||||
src_url = urljoin(package.repository.url, package.filename)
|
||||
dst_path = package.filename
|
||||
else:
|
||||
return
|
||||
|
||||
bytes_copied = self.context.connection.retrieve(
|
||||
src_url,
|
||||
utils.get_path_from_url(urljoin(target.url, dst_path)),
|
||||
size=package.filesize
|
||||
)
|
||||
if package.filesize < 0:
|
||||
package.filesize = bytes_copied
|
||||
if observer:
|
||||
observer(bytes_copied)
|
||||
|
||||
def _add_package(self, repository, src_url, consumer):
|
||||
dst_path = self.driver.get_relative_path(
|
||||
repository, utils.get_filename_from_uri(src_url)
|
||||
)
|
||||
self.context.connection.retrieve(
|
||||
src_url,
|
||||
utils.get_path_from_url(urljoin(repository.url, dst_path))
|
||||
)
|
||||
consumer(self.driver.load_package_from_file(repository, dst_path))
|
||||
|
|
|
@ -31,6 +31,7 @@ class RepositoryDriverBase(object):
|
|||
- implement all abstract methods
|
||||
- register implementation in 'packetary.drivers' namespace
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger(__package__)
|
||||
|
||||
|
@ -75,6 +76,33 @@ class RepositoryDriverBase(object):
|
|||
:param packages: the set of packages
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_repository(self, repository_data, arch):
|
||||
"""Create new repository.
|
||||
|
||||
:param repository_data: repository input data
|
||||
:param arch: the repository`s architecture
|
||||
:return: new repository object
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def load_package_from_file(self, repository, filepath):
|
||||
"""Create package object from file.
|
||||
|
||||
:param repository: the repository object
|
||||
:param filepath: the package path
|
||||
:return: new package object
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_relative_path(self, repository, filename):
|
||||
"""Gets the relative path from filename of package.
|
||||
|
||||
:param repository: the repository object
|
||||
:param filename: the full package url
|
||||
:return: relative path
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def priority_sort(self, repo_data):
|
||||
"""Key method to sort repositories data by priority.
|
||||
|
|
|
@ -191,14 +191,71 @@ class DebRepositoryDriver(RepositoryDriverBase):
|
|||
# TODO(sources and locales)
|
||||
new_repo = copy.copy(repository)
|
||||
new_repo.url = utils.normalize_repository_url(destination)
|
||||
self._create_repository_structure(new_repo)
|
||||
return new_repo
|
||||
|
||||
def create_repository(self, repository_data, arch):
|
||||
url = utils.normalize_repository_url(repository_data['url'])
|
||||
suite = repository_data['suite']
|
||||
component = repository_data.get('section')
|
||||
path = repository_data.get('path')
|
||||
name = repository_data.get('name')
|
||||
origin = repository_data.get('origin')
|
||||
|
||||
if component is None:
|
||||
raise ValueError("The flat format does not supported.")
|
||||
if isinstance(component, list):
|
||||
if len(component) != 1:
|
||||
raise ValueError("The only single component is acceptable.")
|
||||
component = component[0]
|
||||
|
||||
repository = Repository(
|
||||
name=name,
|
||||
url=url,
|
||||
architecture=arch,
|
||||
origin=origin,
|
||||
section=(suite, component),
|
||||
path=path
|
||||
)
|
||||
self._create_repository_structure(repository)
|
||||
self.logger.info("Created: %d repository.", repository.name)
|
||||
return repository
|
||||
|
||||
def load_package_from_file(self, repository, filename):
|
||||
filepath = utils.get_path_from_url(repository.url + filename)
|
||||
_, size, checksum = next(iter(utils.get_size_and_checksum_for_files(
|
||||
[filepath], _checksum_collector)
|
||||
))
|
||||
with closing(debfile.DebFile(filepath)) as deb:
|
||||
debcontrol = deb822.Packages(
|
||||
deb.control.get_content(debfile.CONTROL_FILE)
|
||||
)
|
||||
|
||||
return Package(
|
||||
repository=repository,
|
||||
name=debcontrol["package"],
|
||||
version=Version(debcontrol['version']),
|
||||
filesize=int(debcontrol.get('size', size)),
|
||||
filename=filename,
|
||||
checksum=FileChecksum(*checksum),
|
||||
mandatory=self._is_mandatory(debcontrol),
|
||||
requires=self._get_relations(
|
||||
debcontrol, "depends", "pre-depends",
|
||||
"recommends"
|
||||
),
|
||||
provides=self._get_relations(debcontrol, "provides"),
|
||||
obsoletes=[]
|
||||
)
|
||||
|
||||
def get_relative_path(self, repository, filename):
|
||||
return "/".join(("pool", repository.section[1], filename[0], filename))
|
||||
|
||||
def _create_repository_structure(self, repository):
|
||||
packages_file = utils.get_path_from_url(
|
||||
self._get_url_of_metafile(new_repo, "Packages")
|
||||
self._get_url_of_metafile(repository, "Packages")
|
||||
)
|
||||
release_file = utils.get_path_from_url(
|
||||
self._get_url_of_metafile(new_repo, "Release")
|
||||
)
|
||||
self.logger.info(
|
||||
"clone repository %s to %s", repository, new_repo.url
|
||||
self._get_url_of_metafile(repository, "Release")
|
||||
)
|
||||
utils.ensure_dir_exist(os.path.dirname(release_file))
|
||||
|
||||
|
@ -213,7 +270,6 @@ class DebRepositoryDriver(RepositoryDriverBase):
|
|||
|
||||
open(packages_file, "ab").close()
|
||||
gzip.open(packages_file + ".gz", "ab").close()
|
||||
return new_repo
|
||||
|
||||
def _update_suite_index(self, repository):
|
||||
"""Updates the Release file in the suite."""
|
||||
|
|
|
@ -25,7 +25,9 @@ import createrepo
|
|||
import lxml.etree as etree
|
||||
import six
|
||||
|
||||
|
||||
from packetary.drivers.base import RepositoryDriverBase
|
||||
from packetary.library.checksum import composite as checksum_composite
|
||||
from packetary.library.streams import GzipDecompress
|
||||
from packetary.library import utils
|
||||
from packetary.objects import FileChecksum
|
||||
|
@ -33,6 +35,7 @@ from packetary.objects import Package
|
|||
from packetary.objects import PackageRelation
|
||||
from packetary.objects import PackageVersion
|
||||
from packetary.objects import Repository
|
||||
from packetary.objects import VersionRange
|
||||
|
||||
|
||||
urljoin = six.moves.urllib.parse.urljoin
|
||||
|
@ -49,7 +52,10 @@ _NAMESPACES = {
|
|||
"rpm": "http://linux.duke.edu/metadata/rpm"
|
||||
}
|
||||
|
||||
_checksum_collector = checksum_composite('md5', 'sha1', 'sha256')
|
||||
|
||||
_OPERATORS_MAPPING = {
|
||||
None: None,
|
||||
'GT': '>',
|
||||
'LT': '<',
|
||||
'EQ': '=',
|
||||
|
@ -57,12 +63,12 @@ _OPERATORS_MAPPING = {
|
|||
'LE': '<=',
|
||||
}
|
||||
|
||||
|
||||
_DEFAULT_PRIORITY = 10
|
||||
|
||||
|
||||
class CreaterepoCallBack(object):
|
||||
"""Callback object for createrepo"""
|
||||
|
||||
def __init__(self, logger):
|
||||
self.logger = logger
|
||||
|
||||
|
@ -152,7 +158,9 @@ class RpmRepositoryDriver(RepositoryDriverBase):
|
|||
try:
|
||||
md_config.workers = multiprocessing.cpu_count()
|
||||
md_config.directory = str(basepath)
|
||||
md_config.update = True
|
||||
md_config.update = os.path.exists(
|
||||
os.path.join(basepath, md_config.finaldir)
|
||||
)
|
||||
mdgen = createrepo.MetaDataGenerator(
|
||||
config_obj=md_config, callback=CreaterepoCallBack(self.logger)
|
||||
)
|
||||
|
@ -180,13 +188,45 @@ class RpmRepositoryDriver(RepositoryDriverBase):
|
|||
# TODO(sources and locales)
|
||||
new_repo = copy.copy(repository)
|
||||
new_repo.url = utils.normalize_repository_url(destination)
|
||||
self.logger.info(
|
||||
"clone repository %s to %s", repository, new_repo.url
|
||||
)
|
||||
utils.ensure_dir_exist(destination)
|
||||
self.add_packages(connection, new_repo, set())
|
||||
return new_repo
|
||||
|
||||
def create_repository(self, repository_data, arch):
|
||||
repository = Repository(
|
||||
name=repository_data['name'],
|
||||
url=utils.normalize_repository_url(repository_data["url"]),
|
||||
architecture=arch,
|
||||
origin=repository_data.get('origin')
|
||||
)
|
||||
utils.ensure_dir_exist(utils.get_path_from_url(repository.url))
|
||||
return repository
|
||||
|
||||
def load_package_from_file(self, repository, filepath):
|
||||
fullpath = utils.get_path_from_url(repository.url + filepath)
|
||||
_, size, checksum = next(iter(utils.get_size_and_checksum_for_files(
|
||||
[fullpath], _checksum_collector)
|
||||
))
|
||||
pkg = createrepo.yumbased.YumLocalPackage(filename=fullpath)
|
||||
hdr = pkg.returnLocalHeader()
|
||||
return Package(
|
||||
repository=repository,
|
||||
name=hdr["name"],
|
||||
version=PackageVersion(
|
||||
hdr['epoch'], hdr['version'], hdr['release']
|
||||
),
|
||||
filesize=int(hdr['size']),
|
||||
filename=filepath,
|
||||
checksum=FileChecksum(*checksum),
|
||||
mandatory=False,
|
||||
requires=self._parse_package_relations(pkg.requires),
|
||||
obsoletes=self._parse_package_relations(pkg.obsoletes),
|
||||
provides=self._parse_package_relations(pkg.provides),
|
||||
)
|
||||
|
||||
def get_relative_path(self, repository, filename):
|
||||
return "packages/" + filename
|
||||
|
||||
def _load_db(self, connection, baseurl, repomd, *aliases):
|
||||
"""Loads database.
|
||||
|
||||
|
@ -260,6 +300,23 @@ class RpmRepositoryDriver(RepositoryDriverBase):
|
|||
|
||||
return relations
|
||||
|
||||
@staticmethod
|
||||
def _parse_package_relations(relations):
|
||||
"""Parses yum package relations.
|
||||
|
||||
:param relations: list of tuples
|
||||
(name, flags, (epoch, version, release))
|
||||
:return: list of PackageRelation objects
|
||||
"""
|
||||
return [
|
||||
PackageRelation(
|
||||
x[0], VersionRange(
|
||||
_OPERATORS_MAPPING[x[1]], x[1] and PackageVersion(*x[2])
|
||||
)
|
||||
)
|
||||
for x in relations
|
||||
]
|
||||
|
||||
def _get_checksum(self, pkg_tag):
|
||||
"""Gets checksum from package tag."""
|
||||
checksum = dict.fromkeys(("md5", "sha1", "sha256"), None)
|
||||
|
@ -291,7 +348,7 @@ class RpmRepositoryDriver(RepositoryDriverBase):
|
|||
"""
|
||||
|
||||
return PackageVersion(
|
||||
int(attrs.get("epoch", 0)),
|
||||
attrs.get("ver", "0.0").split("."),
|
||||
attrs.get("rel", "0").split(".")
|
||||
attrs.get("epoch", 0),
|
||||
attrs.get("ver", "0.0"),
|
||||
attrs.get("rel", "0")
|
||||
)
|
||||
|
|
|
@ -104,6 +104,15 @@ def get_url_from_path(path):
|
|||
return "file://" + path
|
||||
|
||||
|
||||
def get_filename_from_uri(uri):
|
||||
"""Get filename from the URI.
|
||||
|
||||
:param uri: the URI
|
||||
:return: the filename
|
||||
"""
|
||||
return uri.rpartition("/")[-1]
|
||||
|
||||
|
||||
def normalize_repository_url(url):
|
||||
"""Convert URL of repository to normal form.
|
||||
|
||||
|
|
|
@ -25,9 +25,9 @@ class PackageVersion(ComparableObject):
|
|||
__slots__ = ["epoch", "version", "release"]
|
||||
|
||||
def __init__(self, epoch, version, release):
|
||||
self.epoch = int(epoch)
|
||||
self.version = tuple(version)
|
||||
self.release = tuple(release)
|
||||
self.epoch = int(epoch or 0)
|
||||
self.version = tuple(version.split('.'))
|
||||
self.release = tuple(release.split('.'))
|
||||
|
||||
@classmethod
|
||||
def from_string(cls, text):
|
||||
|
@ -41,7 +41,7 @@ class PackageVersion(ComparableObject):
|
|||
components = components[1:]
|
||||
else:
|
||||
epoch = 0
|
||||
return cls(epoch, components[0].split("."), components[1].split("."))
|
||||
return cls(epoch, components[0], components[1])
|
||||
|
||||
def cmp(self, other):
|
||||
if not isinstance(other, PackageVersion):
|
||||
|
|
|
@ -25,3 +25,9 @@ except ImportError:
|
|||
class TestCase(unittest.TestCase):
|
||||
|
||||
"""Test case base class for all unit tests."""
|
||||
|
||||
def _check_cases(self, assertion, cases, method):
|
||||
for exp, value in cases:
|
||||
assertion(
|
||||
exp, method(*value), "{0} != f({1})".format(exp, value)
|
||||
)
|
||||
|
|
|
@ -265,3 +265,98 @@ class TestDebDriver(base.TestCase):
|
|||
open.assert_any_call("/root/dists/trusty/Release", "a+b")
|
||||
fcntl.flock.assert_any_call(files[0].fileno(), fcntl.LOCK_EX)
|
||||
fcntl.flock.assert_any_call(files[0].fileno(), fcntl.LOCK_UN)
|
||||
|
||||
@mock.patch.multiple(
|
||||
"packetary.drivers.deb_driver",
|
||||
deb822=mock.DEFAULT,
|
||||
gzip=mock.DEFAULT,
|
||||
open=mock.DEFAULT,
|
||||
os=mock.DEFAULT,
|
||||
)
|
||||
@mock.patch("packetary.drivers.deb_driver.utils.ensure_dir_exist")
|
||||
def test_create_repository(self, mkdir_mock, deb822, gzip, open, os):
|
||||
repository_data = {
|
||||
"name": "Test", "url": "file:///repo", "suite": "trusty",
|
||||
"section": "main", "type": "rpm", "priority": "100",
|
||||
"origin": "Origin", "path": "/repo"
|
||||
}
|
||||
repo = self.driver.create_repository(repository_data, "x86_64")
|
||||
self.assertEqual(repository_data["name"], repo.name)
|
||||
self.assertEqual("x86_64", repo.architecture)
|
||||
self.assertEqual(repository_data["url"] + "/", repo.url)
|
||||
self.assertEqual(repository_data["origin"], repo.origin)
|
||||
self.assertEqual(
|
||||
(repository_data["suite"], repository_data["section"]),
|
||||
repo.section
|
||||
)
|
||||
self.assertEqual(repository_data["path"], repo.path)
|
||||
mkdir_mock.assert_called_once_with(os.path.dirname())
|
||||
open.assert_any_call(
|
||||
"/repo/dists/trusty/main/binary-amd64/Release", "wb"
|
||||
)
|
||||
open.assert_any_call(
|
||||
"/repo/dists/trusty/main/binary-amd64/Packages", "ab"
|
||||
)
|
||||
gzip.open.assert_called_once_with(
|
||||
"/repo/dists/trusty/main/binary-amd64/Packages.gz", "ab"
|
||||
)
|
||||
|
||||
def test_createrepository_fails_if_invalid_data(self):
|
||||
repository_data = {
|
||||
"name": "Test", "url": "file:///repo", "suite": "trusty",
|
||||
"type": "rpm", "priority": "100",
|
||||
"origin": "Origin", "path": "/repo"
|
||||
}
|
||||
with self.assertRaisesRegexp(ValueError, "flat format"):
|
||||
self.driver.create_repository(repository_data, "x86_64")
|
||||
with self.assertRaisesRegexp(ValueError, "single component"):
|
||||
repository_data["section"] = ["main", "universe"]
|
||||
self.driver.create_repository(repository_data, "x86_64")
|
||||
|
||||
@mock.patch.multiple(
|
||||
"packetary.drivers.deb_driver",
|
||||
debfile=mock.DEFAULT,
|
||||
open=mock.DEFAULT,
|
||||
os=mock.DEFAULT,
|
||||
utils=mock.DEFAULT,
|
||||
)
|
||||
def test_load_package_from_file(self, debfile, os, open, utils):
|
||||
fake_repo = gen_repository(
|
||||
name=("trusty", "main"), url="file:///repo"
|
||||
)
|
||||
file_info = ("/test.rpm", 2, [3, 4, 5])
|
||||
utils.get_size_and_checksum_for_files.return_value = [file_info]
|
||||
debfile.DebFile().control.get_content.return_value = {
|
||||
"package": "Test",
|
||||
"version": "2.7.9-1",
|
||||
"depends": "test1 (>= 2.2.1)",
|
||||
"replaces": "test2 (<< 2.2.1)",
|
||||
"recommends": "test3 (>> 2.2.1)",
|
||||
"provides": "test4 (>> 2.2.1)"
|
||||
}
|
||||
pkg = self.driver.load_package_from_file(
|
||||
fake_repo, "pool/main/t/test.deb"
|
||||
)
|
||||
|
||||
self.assertEqual("Test", pkg.name)
|
||||
self.assertEqual("2.7.9-1", pkg.version)
|
||||
self.assertEqual("pool/main/t/test.deb", pkg.filename)
|
||||
self.assertEqual((3, 4, 5), pkg.checksum)
|
||||
self.assertEqual(2, pkg.filesize)
|
||||
self.assertItemsEqual(
|
||||
['test1 (>= 2.2.1)', 'test3 (> 2.2.1)'],
|
||||
(str(x) for x in pkg.requires)
|
||||
)
|
||||
self.assertItemsEqual(
|
||||
['test4 (> 2.2.1)'],
|
||||
(str(x) for x in pkg.provides)
|
||||
)
|
||||
self.assertEqual([], pkg.obsoletes)
|
||||
self.assertFalse(pkg.mandatory)
|
||||
|
||||
def test_get_relative_path(self):
|
||||
repo = gen_repository(
|
||||
"test", "file://repo", section=("trusty", "main")
|
||||
)
|
||||
rel_path = self.driver.get_relative_path(repo, "test.pkg")
|
||||
self.assertEqual("pool/main/t/test.pkg", rel_path)
|
||||
|
|
|
@ -23,22 +23,16 @@ from packetary.tests import base
|
|||
|
||||
|
||||
class TestLibraryUtils(base.TestCase):
|
||||
|
||||
def test_append_token_to_string(self):
|
||||
self.assertEqual(
|
||||
"v1 v2 v3",
|
||||
utils.append_token_to_string("v2 v3", "v1")
|
||||
)
|
||||
self.assertEqual(
|
||||
"v1",
|
||||
utils.append_token_to_string("", "v1")
|
||||
)
|
||||
self.assertEqual(
|
||||
"v1 v2 v3 v4",
|
||||
utils.append_token_to_string('v1\tv2 v3', "v4")
|
||||
)
|
||||
self.assertEqual(
|
||||
"v1 v2 v3",
|
||||
utils.append_token_to_string('v1 v2 v3', "v1")
|
||||
cases = [
|
||||
("v1 v2 v3", ("v2 v3", "v1")),
|
||||
("v1", ("", "v1")),
|
||||
("v1 v2 v3 v4", ("v1\tv2 v3", "v4")),
|
||||
("v1 v2 v3", ("v1 v2 v3", "v1")),
|
||||
]
|
||||
self._check_cases(
|
||||
self.assertEqual, cases, utils.append_token_to_string
|
||||
)
|
||||
|
||||
def test_composite_writer(self):
|
||||
|
@ -76,24 +70,15 @@ class TestLibraryUtils(base.TestCase):
|
|||
)
|
||||
|
||||
def test_get_path_from_url(self):
|
||||
self.assertEqual(
|
||||
"/a/f.txt",
|
||||
utils.get_path_from_url("/a/f.txt")
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
"/a/f.txt",
|
||||
utils.get_path_from_url("file:///a/f.txt?size=1")
|
||||
)
|
||||
|
||||
cases = [
|
||||
("/a/f.txt", ("/a/f.txt",)),
|
||||
("/a/f.txt", ("file:///a/f.txt?size=1",)),
|
||||
("/f.txt", ("http://host/f.txt", False)),
|
||||
]
|
||||
self._check_cases(self.assertEqual, cases, utils.get_path_from_url)
|
||||
with self.assertRaises(ValueError):
|
||||
utils.get_path_from_url("http:///a/f.txt")
|
||||
|
||||
self.assertEqual(
|
||||
"/f.txt",
|
||||
utils.get_path_from_url("http://host/f.txt", False)
|
||||
)
|
||||
|
||||
@mock.patch("packetary.library.utils.os")
|
||||
def test_normalize_repository_url(self, os_mock):
|
||||
def abs_patch_mock(p):
|
||||
|
@ -105,17 +90,14 @@ class TestLibraryUtils(base.TestCase):
|
|||
os_mock.path.abspath.side_effect = abs_patch_mock
|
||||
|
||||
cases = [
|
||||
("file:///repo/", "/repo"),
|
||||
("file:///root/repo/", "./repo"),
|
||||
("http://localhost/repo/", "http://localhost/repo"),
|
||||
("http://localhost/repo/", "http://localhost/repo/"),
|
||||
("file:///repo/", ("/repo",)),
|
||||
("file:///root/repo/", ("./repo",)),
|
||||
("http://localhost/repo/", ("http://localhost/repo",)),
|
||||
("http://localhost/repo/", ("http://localhost/repo/",)),
|
||||
]
|
||||
|
||||
for expected, url in cases:
|
||||
self.assertEqual(
|
||||
expected, utils.normalize_repository_url(url),
|
||||
"URL: {0}".format(url)
|
||||
)
|
||||
self._check_cases(
|
||||
self.assertEqual, cases, utils.normalize_repository_url
|
||||
)
|
||||
|
||||
@mock.patch("packetary.library.utils.os")
|
||||
def test_ensure_dir_exist(self, os):
|
||||
|
@ -133,3 +115,12 @@ class TestLibraryUtils(base.TestCase):
|
|||
utils.ensure_dir_exist("/private")
|
||||
with self.assertRaises(ValueError):
|
||||
utils.ensure_dir_exist(1)
|
||||
|
||||
def test_get_filename_from_uri(self):
|
||||
cases = [
|
||||
("test.pkg", ("test.pkg",)),
|
||||
("test.pkg", ("/root/test.pkg",)),
|
||||
("test.pkg", ("file:///root/test.pkg",)),
|
||||
("", ("file:///root/",))
|
||||
]
|
||||
self._check_cases(self.assertEqual, cases, utils.get_filename_from_uri)
|
||||
|
|
|
@ -89,6 +89,13 @@ class TestRepositoryApi(base.TestCase):
|
|||
context, "deb", "x86_64"
|
||||
)
|
||||
|
||||
def test_create_repository(self):
|
||||
file_urls = ["file://test1.pkg"]
|
||||
self.api.create_repository(self.repo_data, file_urls)
|
||||
self.controller.create_repository.assert_called_once_with(
|
||||
self.repo_data, file_urls
|
||||
)
|
||||
|
||||
def test_get_packages_as_is(self):
|
||||
packages = self.api.get_packages([self.repo_data], None)
|
||||
self.assertEqual(5, len(packages))
|
||||
|
@ -189,7 +196,7 @@ class TestRepositoryApi(base.TestCase):
|
|||
self.assertEqual(expected, actual)
|
||||
self.assertIsNone(self.api._load_requirements(None))
|
||||
|
||||
def test_validate_repos_data(self):
|
||||
def test_validate_repo_data(self):
|
||||
# TODO(bgaifullin) implement me
|
||||
pass
|
||||
|
||||
|
|
|
@ -109,7 +109,7 @@ class TestRepositoryController(base.TestCase):
|
|||
gen_package(name="test1", repository=repo, filesize=10),
|
||||
gen_package(name="test2", repository=repo, filesize=-1)
|
||||
]
|
||||
target = gen_repository(url="/test/repo")
|
||||
target = gen_repository(url="/test/repo/")
|
||||
self.context.connection.retrieve.side_effect = [0, 10]
|
||||
observer = mock.MagicMock()
|
||||
self.ctrl._copy_packages(target, packages, observer)
|
||||
|
@ -135,3 +135,44 @@ class TestRepositoryController(base.TestCase):
|
|||
observer = mock.MagicMock()
|
||||
self.ctrl._copy_packages(repo, packages, observer)
|
||||
self.assertFalse(self.context.connection.retrieve.called)
|
||||
|
||||
def test_copy_free_package(self):
|
||||
repo = gen_repository(url="file:///repo/")
|
||||
package = gen_package(name="test1", filename="file:///root/test.pkg",
|
||||
repository=None, filesize=10)
|
||||
self.driver.get_relative_path.side_effect = ["pool/t/test1.pkg"]
|
||||
self.ctrl._copy_package(repo, package, None)
|
||||
self.context.connection.retrieve.assert_called_once_with(
|
||||
"file:///root/test.pkg",
|
||||
"/repo/pool/t/test1.pkg",
|
||||
size=10
|
||||
)
|
||||
|
||||
def test_create_repository(self):
|
||||
repository_data = {
|
||||
"name": "Test", "url": "file:///repo/",
|
||||
"section": ("trusty", "main"), "origin": "Test"
|
||||
}
|
||||
repo = gen_repository(**repository_data)
|
||||
packages_list = ['/tmp/test1.pkg']
|
||||
packages = [gen_package(name="test2", repository=repo)]
|
||||
self.driver.create_repository.return_value = repo
|
||||
self.driver.load_package_from_file.side_effect = packages
|
||||
self.driver.get_relative_path.side_effect = ["pool/t/test1.pkg"]
|
||||
self.ctrl.create_repository(repository_data, packages_list)
|
||||
self.driver.create_repository.assert_called_once_with(
|
||||
repository_data, self.ctrl.arch
|
||||
)
|
||||
self.driver.get_relative_path.assert_called_once_with(
|
||||
repo, "test1.pkg"
|
||||
)
|
||||
self.context.connection.retrieve.assert_any_call(
|
||||
"/tmp/test1.pkg",
|
||||
"/repo/pool/t/test1.pkg"
|
||||
)
|
||||
self.driver.load_package_from_file.assert_called_once_with(
|
||||
repo, "pool/t/test1.pkg"
|
||||
)
|
||||
self.driver.add_packages.assert_called_once_with(
|
||||
self.ctrl.context.connection, repo, set(packages)
|
||||
)
|
||||
|
|
|
@ -165,8 +165,9 @@ class TestRpmDriver(base.TestCase):
|
|||
package = packages[0]
|
||||
self.assertTrue(package.mandatory)
|
||||
|
||||
@mock.patch("packetary.drivers.rpm_driver.os.path.exists")
|
||||
@mock.patch("packetary.drivers.rpm_driver.shutil")
|
||||
def test_add_packages(self, shutil):
|
||||
def test_add_packages(self, shutil, path_exists):
|
||||
self.createrepo.MDError = ValueError
|
||||
self.createrepo.MetaDataGenerator().doFinalMove.side_effect = [
|
||||
None, self.createrepo.MDError()
|
||||
|
@ -174,9 +175,9 @@ class TestRpmDriver(base.TestCase):
|
|||
repo = gen_repository("test", url="file:///repo/os/x86_64")
|
||||
self.createrepo.MetaDataConfig().outputdir = "/repo/os/x86_64"
|
||||
self.createrepo.MetaDataConfig().tempdir = "tmp"
|
||||
|
||||
self.createrepo.MetaDataConfig().finaldir = "repodata"
|
||||
path_exists.side_effect = [True, False]
|
||||
self.driver.add_packages(self.connection, repo, set())
|
||||
|
||||
self.assertEqual(
|
||||
"/repo/os/x86_64",
|
||||
self.createrepo.MetaDataConfig().directory
|
||||
|
@ -191,6 +192,8 @@ class TestRpmDriver(base.TestCase):
|
|||
|
||||
with self.assertRaises(RuntimeError):
|
||||
self.driver.add_packages(self.connection, repo, set())
|
||||
|
||||
self.assertFalse(self.createrepo.MetaDataConfig().update)
|
||||
shutil.rmtree.assert_called_once_with(
|
||||
"/repo/os/x86_64/tmp", ignore_errors=True
|
||||
)
|
||||
|
@ -199,6 +202,9 @@ class TestRpmDriver(base.TestCase):
|
|||
def test_fork_repository(self, ensure_dir_exists_mock):
|
||||
repo = gen_repository("os", url="http://localhost/os/x86_64/")
|
||||
self.createrepo.MetaDataGenerator().doFinalMove.side_effect = [None]
|
||||
self.createrepo.MetaDataConfig().outputdir = "/repo/os/x86_64"
|
||||
self.createrepo.MetaDataConfig().tempdir = "tmp"
|
||||
self.createrepo.MetaDataConfig().finaldir = "repodata"
|
||||
new_repo = self.driver.fork_repository(
|
||||
self.connection,
|
||||
repo,
|
||||
|
@ -210,3 +216,65 @@ class TestRpmDriver(base.TestCase):
|
|||
self.assertEqual("file:///repo/os/x86_64/", new_repo.url)
|
||||
self.createrepo.MetaDataGenerator()\
|
||||
.doFinalMove.assert_called_once_with()
|
||||
|
||||
@mock.patch("packetary.drivers.rpm_driver.utils.ensure_dir_exist")
|
||||
def test_create_repository(self, ensure_dir_exists_mock):
|
||||
repository_data = {
|
||||
"name": "Test", "url": "file:///repo/os/x86_64", "origin": "Test"
|
||||
}
|
||||
repo = self.driver.create_repository(repository_data, "x86_64")
|
||||
ensure_dir_exists_mock.assert_called_once_with("/repo/os/x86_64/")
|
||||
self.assertEqual(repository_data["name"], repo.name)
|
||||
self.assertEqual("x86_64", repo.architecture)
|
||||
self.assertEqual(repository_data["url"] + "/", repo.url)
|
||||
self.assertEqual(repository_data["origin"], repo.origin)
|
||||
|
||||
@mock.patch("packetary.drivers.rpm_driver.utils")
|
||||
def test_load_package_from_file(self, utils):
|
||||
file_info = ("/test.rpm", 2, [3, 4, 5])
|
||||
utils.get_size_and_checksum_for_files.return_value = [file_info]
|
||||
utils.get_path_from_url.return_value = "/repo/x86_64/test.rpm"
|
||||
rpm_mock = mock.MagicMock(
|
||||
requires=[('test1', 'EQ', ('0', '1.2.3', '1.el5'))],
|
||||
provides=[('test2', None, (None, None, None))],
|
||||
obsoletes=[]
|
||||
)
|
||||
self.createrepo.yumbased.YumLocalPackage.return_value = rpm_mock
|
||||
rpm_mock.returnLocalHeader.return_value = {
|
||||
"name": "Test", "epoch": 1, "version": "1.2.3", "release": "1",
|
||||
"size": "10"
|
||||
}
|
||||
repo = gen_repository("Test", url="file:///repo/os/x86_64/")
|
||||
pkg = self.driver.load_package_from_file(repo, "test.rpm")
|
||||
utils.get_path_from_url.assert_called_once_with(
|
||||
"file:///repo/os/x86_64/test.rpm"
|
||||
)
|
||||
self.createrepo.yumbased.YumLocalPackage.assert_called_once_with(
|
||||
filename="/repo/x86_64/test.rpm"
|
||||
)
|
||||
utils.get_size_and_checksum_for_files.assert_called_once_with(
|
||||
["/repo/x86_64/test.rpm"], mock.ANY
|
||||
)
|
||||
|
||||
self.assertEqual("Test", pkg.name)
|
||||
self.assertEqual("1-1.2.3-1", str(pkg.version))
|
||||
self.assertEqual("test.rpm", pkg.filename)
|
||||
self.assertEqual((3, 4, 5), pkg.checksum)
|
||||
self.assertEqual(10, pkg.filesize)
|
||||
self.assertItemsEqual(
|
||||
['test1 (= 0-1.2.3-1.el5)'],
|
||||
(str(x) for x in pkg.requires)
|
||||
)
|
||||
self.assertItemsEqual(
|
||||
['test2 (any)'],
|
||||
(str(x) for x in pkg.provides)
|
||||
)
|
||||
self.assertEqual([], pkg.obsoletes)
|
||||
self.assertEqual(pkg.mandatory, False)
|
||||
|
||||
def test_get_relative_path(self):
|
||||
repo = gen_repository(
|
||||
"test", "file://repo", section=("trusty", "main")
|
||||
)
|
||||
rel_path = self.driver.get_relative_path(repo, "test.pkg")
|
||||
self.assertEqual("packages/test.pkg", rel_path)
|
||||
|
|
Loading…
Reference in New Issue