Introduced API to build DEB and RPM repository

Change-Id: I7abc2705f66228e81b36febe146d06988efb7b00
Implements: blueprint build-repository
This commit is contained in:
Bulat Gaifullin 2016-01-25 22:52:20 +03:00 committed by Bulat Gaifullin
parent 1ce69b4fef
commit aff85a919e
13 changed files with 476 additions and 75 deletions

View File

@ -112,6 +112,15 @@ class RepositoryApi(object):
context = config if isinstance(config, Context) else Context(config)
return cls(RepositoryController.load(context, repotype, repoarch))
def create_repository(self, repo_data, package_files):
"""Create new repository with specified packages.
:param repo_data: The description of repository
:param package_files: The list of URLs of packages
"""
self._validate_repo_data(repo_data)
return self.controller.create_repository(repo_data, package_files)
def get_packages(self, repos_data, requirements_data=None,
include_mandatory=False):
"""Gets the list of packages from repository(es).
@ -192,7 +201,8 @@ class RepositoryApi(object):
self.controller.load_packages(repo, consumer)
def _load_repositories(self, repos_data):
self._validate_repos_data(repos_data)
for repo_data in repos_data:
self._validate_repo_data(repo_data)
return self.controller.load_repositories(repos_data)
def _load_requirements(self, requirements_data):
@ -213,7 +223,7 @@ class RepositoryApi(object):
))
return result
def _validate_repos_data(self, repos_data):
def _validate_repo_data(self, repo_data):
# TODO(bgaifullin) implement me
pass

View File

@ -97,6 +97,7 @@ class RepositoryController(object):
destination,
repository.path or utils.get_path_from_url(repository.url, False)
)
logger.info("cloning repository '%s' to '%s'", repository, new_path)
return self.driver.fork_repository(
self.context.connection, repository, new_path, source, locale
)
@ -120,6 +121,22 @@ class RepositoryController(object):
self.context.connection, repository, packages
)
def create_repository(self, repository_data, package_files):
"""Creates new repository from specified packages.
:param repository_data: the description of repository
:param package_files: the list of paths of packages
:return : the new repository
"""
repo = self.driver.create_repository(repository_data, self.arch)
packages = set()
with self.context.async_section() as section:
for url in package_files:
section.execute(self._add_package, repo, url, packages.add)
self.assign_packages(repo, packages)
return repo
def _copy_packages(self, target, packages, observer):
with self.context.async_section() as section:
for package in packages:
@ -128,17 +145,33 @@ class RepositoryController(object):
)
def _copy_package(self, target, package, observer):
bytes_copied = 0
if target.url != package.repository.url:
dst_path = os.path.join(
utils.get_path_from_url(target.url), package.filename
if package.repository is None:
src_url = package.filename
dst_path = self.driver.get_relative_path(
target, utils.get_filename_from_uri(package.filename)
)
src_path = urljoin(package.repository.url, package.filename)
bytes_copied = self.context.connection.retrieve(
src_path, dst_path, size=package.filesize
)
if package.filesize < 0:
package.filesize = bytes_copied
elif target.url != package.repository.url:
src_url = urljoin(package.repository.url, package.filename)
dst_path = package.filename
else:
return
bytes_copied = self.context.connection.retrieve(
src_url,
utils.get_path_from_url(urljoin(target.url, dst_path)),
size=package.filesize
)
if package.filesize < 0:
package.filesize = bytes_copied
if observer:
observer(bytes_copied)
def _add_package(self, repository, src_url, consumer):
dst_path = self.driver.get_relative_path(
repository, utils.get_filename_from_uri(src_url)
)
self.context.connection.retrieve(
src_url,
utils.get_path_from_url(urljoin(repository.url, dst_path))
)
consumer(self.driver.load_package_from_file(repository, dst_path))

View File

@ -31,6 +31,7 @@ class RepositoryDriverBase(object):
- implement all abstract methods
- register implementation in 'packetary.drivers' namespace
"""
def __init__(self):
self.logger = logging.getLogger(__package__)
@ -75,6 +76,33 @@ class RepositoryDriverBase(object):
:param packages: the set of packages
"""
@abc.abstractmethod
def create_repository(self, repository_data, arch):
"""Create new repository.
:param repository_data: repository input data
:param arch: the repository`s architecture
:return: new repository object
"""
@abc.abstractmethod
def load_package_from_file(self, repository, filepath):
"""Create package object from file.
:param repository: the repository object
:param filepath: the package path
:return: new package object
"""
@abc.abstractmethod
def get_relative_path(self, repository, filename):
"""Gets the relative path from filename of package.
:param repository: the repository object
:param filename: the full package url
:return: relative path
"""
@abc.abstractmethod
def priority_sort(self, repo_data):
"""Key method to sort repositories data by priority.

View File

@ -191,14 +191,71 @@ class DebRepositoryDriver(RepositoryDriverBase):
# TODO(sources and locales)
new_repo = copy.copy(repository)
new_repo.url = utils.normalize_repository_url(destination)
self._create_repository_structure(new_repo)
return new_repo
def create_repository(self, repository_data, arch):
url = utils.normalize_repository_url(repository_data['url'])
suite = repository_data['suite']
component = repository_data.get('section')
path = repository_data.get('path')
name = repository_data.get('name')
origin = repository_data.get('origin')
if component is None:
raise ValueError("The flat format does not supported.")
if isinstance(component, list):
if len(component) != 1:
raise ValueError("The only single component is acceptable.")
component = component[0]
repository = Repository(
name=name,
url=url,
architecture=arch,
origin=origin,
section=(suite, component),
path=path
)
self._create_repository_structure(repository)
self.logger.info("Created: %d repository.", repository.name)
return repository
def load_package_from_file(self, repository, filename):
filepath = utils.get_path_from_url(repository.url + filename)
_, size, checksum = next(iter(utils.get_size_and_checksum_for_files(
[filepath], _checksum_collector)
))
with closing(debfile.DebFile(filepath)) as deb:
debcontrol = deb822.Packages(
deb.control.get_content(debfile.CONTROL_FILE)
)
return Package(
repository=repository,
name=debcontrol["package"],
version=Version(debcontrol['version']),
filesize=int(debcontrol.get('size', size)),
filename=filename,
checksum=FileChecksum(*checksum),
mandatory=self._is_mandatory(debcontrol),
requires=self._get_relations(
debcontrol, "depends", "pre-depends",
"recommends"
),
provides=self._get_relations(debcontrol, "provides"),
obsoletes=[]
)
def get_relative_path(self, repository, filename):
return "/".join(("pool", repository.section[1], filename[0], filename))
def _create_repository_structure(self, repository):
packages_file = utils.get_path_from_url(
self._get_url_of_metafile(new_repo, "Packages")
self._get_url_of_metafile(repository, "Packages")
)
release_file = utils.get_path_from_url(
self._get_url_of_metafile(new_repo, "Release")
)
self.logger.info(
"clone repository %s to %s", repository, new_repo.url
self._get_url_of_metafile(repository, "Release")
)
utils.ensure_dir_exist(os.path.dirname(release_file))
@ -213,7 +270,6 @@ class DebRepositoryDriver(RepositoryDriverBase):
open(packages_file, "ab").close()
gzip.open(packages_file + ".gz", "ab").close()
return new_repo
def _update_suite_index(self, repository):
"""Updates the Release file in the suite."""

View File

@ -25,7 +25,9 @@ import createrepo
import lxml.etree as etree
import six
from packetary.drivers.base import RepositoryDriverBase
from packetary.library.checksum import composite as checksum_composite
from packetary.library.streams import GzipDecompress
from packetary.library import utils
from packetary.objects import FileChecksum
@ -33,6 +35,7 @@ from packetary.objects import Package
from packetary.objects import PackageRelation
from packetary.objects import PackageVersion
from packetary.objects import Repository
from packetary.objects import VersionRange
urljoin = six.moves.urllib.parse.urljoin
@ -49,7 +52,10 @@ _NAMESPACES = {
"rpm": "http://linux.duke.edu/metadata/rpm"
}
_checksum_collector = checksum_composite('md5', 'sha1', 'sha256')
_OPERATORS_MAPPING = {
None: None,
'GT': '>',
'LT': '<',
'EQ': '=',
@ -57,12 +63,12 @@ _OPERATORS_MAPPING = {
'LE': '<=',
}
_DEFAULT_PRIORITY = 10
class CreaterepoCallBack(object):
"""Callback object for createrepo"""
def __init__(self, logger):
self.logger = logger
@ -152,7 +158,9 @@ class RpmRepositoryDriver(RepositoryDriverBase):
try:
md_config.workers = multiprocessing.cpu_count()
md_config.directory = str(basepath)
md_config.update = True
md_config.update = os.path.exists(
os.path.join(basepath, md_config.finaldir)
)
mdgen = createrepo.MetaDataGenerator(
config_obj=md_config, callback=CreaterepoCallBack(self.logger)
)
@ -180,13 +188,45 @@ class RpmRepositoryDriver(RepositoryDriverBase):
# TODO(sources and locales)
new_repo = copy.copy(repository)
new_repo.url = utils.normalize_repository_url(destination)
self.logger.info(
"clone repository %s to %s", repository, new_repo.url
)
utils.ensure_dir_exist(destination)
self.add_packages(connection, new_repo, set())
return new_repo
def create_repository(self, repository_data, arch):
repository = Repository(
name=repository_data['name'],
url=utils.normalize_repository_url(repository_data["url"]),
architecture=arch,
origin=repository_data.get('origin')
)
utils.ensure_dir_exist(utils.get_path_from_url(repository.url))
return repository
def load_package_from_file(self, repository, filepath):
fullpath = utils.get_path_from_url(repository.url + filepath)
_, size, checksum = next(iter(utils.get_size_and_checksum_for_files(
[fullpath], _checksum_collector)
))
pkg = createrepo.yumbased.YumLocalPackage(filename=fullpath)
hdr = pkg.returnLocalHeader()
return Package(
repository=repository,
name=hdr["name"],
version=PackageVersion(
hdr['epoch'], hdr['version'], hdr['release']
),
filesize=int(hdr['size']),
filename=filepath,
checksum=FileChecksum(*checksum),
mandatory=False,
requires=self._parse_package_relations(pkg.requires),
obsoletes=self._parse_package_relations(pkg.obsoletes),
provides=self._parse_package_relations(pkg.provides),
)
def get_relative_path(self, repository, filename):
return "packages/" + filename
def _load_db(self, connection, baseurl, repomd, *aliases):
"""Loads database.
@ -260,6 +300,23 @@ class RpmRepositoryDriver(RepositoryDriverBase):
return relations
@staticmethod
def _parse_package_relations(relations):
"""Parses yum package relations.
:param relations: list of tuples
(name, flags, (epoch, version, release))
:return: list of PackageRelation objects
"""
return [
PackageRelation(
x[0], VersionRange(
_OPERATORS_MAPPING[x[1]], x[1] and PackageVersion(*x[2])
)
)
for x in relations
]
def _get_checksum(self, pkg_tag):
"""Gets checksum from package tag."""
checksum = dict.fromkeys(("md5", "sha1", "sha256"), None)
@ -291,7 +348,7 @@ class RpmRepositoryDriver(RepositoryDriverBase):
"""
return PackageVersion(
int(attrs.get("epoch", 0)),
attrs.get("ver", "0.0").split("."),
attrs.get("rel", "0").split(".")
attrs.get("epoch", 0),
attrs.get("ver", "0.0"),
attrs.get("rel", "0")
)

View File

@ -104,6 +104,15 @@ def get_url_from_path(path):
return "file://" + path
def get_filename_from_uri(uri):
"""Get filename from the URI.
:param uri: the URI
:return: the filename
"""
return uri.rpartition("/")[-1]
def normalize_repository_url(url):
"""Convert URL of repository to normal form.

View File

@ -25,9 +25,9 @@ class PackageVersion(ComparableObject):
__slots__ = ["epoch", "version", "release"]
def __init__(self, epoch, version, release):
self.epoch = int(epoch)
self.version = tuple(version)
self.release = tuple(release)
self.epoch = int(epoch or 0)
self.version = tuple(version.split('.'))
self.release = tuple(release.split('.'))
@classmethod
def from_string(cls, text):
@ -41,7 +41,7 @@ class PackageVersion(ComparableObject):
components = components[1:]
else:
epoch = 0
return cls(epoch, components[0].split("."), components[1].split("."))
return cls(epoch, components[0], components[1])
def cmp(self, other):
if not isinstance(other, PackageVersion):

View File

@ -25,3 +25,9 @@ except ImportError:
class TestCase(unittest.TestCase):
"""Test case base class for all unit tests."""
def _check_cases(self, assertion, cases, method):
for exp, value in cases:
assertion(
exp, method(*value), "{0} != f({1})".format(exp, value)
)

View File

@ -265,3 +265,98 @@ class TestDebDriver(base.TestCase):
open.assert_any_call("/root/dists/trusty/Release", "a+b")
fcntl.flock.assert_any_call(files[0].fileno(), fcntl.LOCK_EX)
fcntl.flock.assert_any_call(files[0].fileno(), fcntl.LOCK_UN)
@mock.patch.multiple(
"packetary.drivers.deb_driver",
deb822=mock.DEFAULT,
gzip=mock.DEFAULT,
open=mock.DEFAULT,
os=mock.DEFAULT,
)
@mock.patch("packetary.drivers.deb_driver.utils.ensure_dir_exist")
def test_create_repository(self, mkdir_mock, deb822, gzip, open, os):
repository_data = {
"name": "Test", "url": "file:///repo", "suite": "trusty",
"section": "main", "type": "rpm", "priority": "100",
"origin": "Origin", "path": "/repo"
}
repo = self.driver.create_repository(repository_data, "x86_64")
self.assertEqual(repository_data["name"], repo.name)
self.assertEqual("x86_64", repo.architecture)
self.assertEqual(repository_data["url"] + "/", repo.url)
self.assertEqual(repository_data["origin"], repo.origin)
self.assertEqual(
(repository_data["suite"], repository_data["section"]),
repo.section
)
self.assertEqual(repository_data["path"], repo.path)
mkdir_mock.assert_called_once_with(os.path.dirname())
open.assert_any_call(
"/repo/dists/trusty/main/binary-amd64/Release", "wb"
)
open.assert_any_call(
"/repo/dists/trusty/main/binary-amd64/Packages", "ab"
)
gzip.open.assert_called_once_with(
"/repo/dists/trusty/main/binary-amd64/Packages.gz", "ab"
)
def test_createrepository_fails_if_invalid_data(self):
repository_data = {
"name": "Test", "url": "file:///repo", "suite": "trusty",
"type": "rpm", "priority": "100",
"origin": "Origin", "path": "/repo"
}
with self.assertRaisesRegexp(ValueError, "flat format"):
self.driver.create_repository(repository_data, "x86_64")
with self.assertRaisesRegexp(ValueError, "single component"):
repository_data["section"] = ["main", "universe"]
self.driver.create_repository(repository_data, "x86_64")
@mock.patch.multiple(
"packetary.drivers.deb_driver",
debfile=mock.DEFAULT,
open=mock.DEFAULT,
os=mock.DEFAULT,
utils=mock.DEFAULT,
)
def test_load_package_from_file(self, debfile, os, open, utils):
fake_repo = gen_repository(
name=("trusty", "main"), url="file:///repo"
)
file_info = ("/test.rpm", 2, [3, 4, 5])
utils.get_size_and_checksum_for_files.return_value = [file_info]
debfile.DebFile().control.get_content.return_value = {
"package": "Test",
"version": "2.7.9-1",
"depends": "test1 (>= 2.2.1)",
"replaces": "test2 (<< 2.2.1)",
"recommends": "test3 (>> 2.2.1)",
"provides": "test4 (>> 2.2.1)"
}
pkg = self.driver.load_package_from_file(
fake_repo, "pool/main/t/test.deb"
)
self.assertEqual("Test", pkg.name)
self.assertEqual("2.7.9-1", pkg.version)
self.assertEqual("pool/main/t/test.deb", pkg.filename)
self.assertEqual((3, 4, 5), pkg.checksum)
self.assertEqual(2, pkg.filesize)
self.assertItemsEqual(
['test1 (>= 2.2.1)', 'test3 (> 2.2.1)'],
(str(x) for x in pkg.requires)
)
self.assertItemsEqual(
['test4 (> 2.2.1)'],
(str(x) for x in pkg.provides)
)
self.assertEqual([], pkg.obsoletes)
self.assertFalse(pkg.mandatory)
def test_get_relative_path(self):
repo = gen_repository(
"test", "file://repo", section=("trusty", "main")
)
rel_path = self.driver.get_relative_path(repo, "test.pkg")
self.assertEqual("pool/main/t/test.pkg", rel_path)

View File

@ -23,22 +23,16 @@ from packetary.tests import base
class TestLibraryUtils(base.TestCase):
def test_append_token_to_string(self):
self.assertEqual(
"v1 v2 v3",
utils.append_token_to_string("v2 v3", "v1")
)
self.assertEqual(
"v1",
utils.append_token_to_string("", "v1")
)
self.assertEqual(
"v1 v2 v3 v4",
utils.append_token_to_string('v1\tv2 v3', "v4")
)
self.assertEqual(
"v1 v2 v3",
utils.append_token_to_string('v1 v2 v3', "v1")
cases = [
("v1 v2 v3", ("v2 v3", "v1")),
("v1", ("", "v1")),
("v1 v2 v3 v4", ("v1\tv2 v3", "v4")),
("v1 v2 v3", ("v1 v2 v3", "v1")),
]
self._check_cases(
self.assertEqual, cases, utils.append_token_to_string
)
def test_composite_writer(self):
@ -76,24 +70,15 @@ class TestLibraryUtils(base.TestCase):
)
def test_get_path_from_url(self):
self.assertEqual(
"/a/f.txt",
utils.get_path_from_url("/a/f.txt")
)
self.assertEqual(
"/a/f.txt",
utils.get_path_from_url("file:///a/f.txt?size=1")
)
cases = [
("/a/f.txt", ("/a/f.txt",)),
("/a/f.txt", ("file:///a/f.txt?size=1",)),
("/f.txt", ("http://host/f.txt", False)),
]
self._check_cases(self.assertEqual, cases, utils.get_path_from_url)
with self.assertRaises(ValueError):
utils.get_path_from_url("http:///a/f.txt")
self.assertEqual(
"/f.txt",
utils.get_path_from_url("http://host/f.txt", False)
)
@mock.patch("packetary.library.utils.os")
def test_normalize_repository_url(self, os_mock):
def abs_patch_mock(p):
@ -105,17 +90,14 @@ class TestLibraryUtils(base.TestCase):
os_mock.path.abspath.side_effect = abs_patch_mock
cases = [
("file:///repo/", "/repo"),
("file:///root/repo/", "./repo"),
("http://localhost/repo/", "http://localhost/repo"),
("http://localhost/repo/", "http://localhost/repo/"),
("file:///repo/", ("/repo",)),
("file:///root/repo/", ("./repo",)),
("http://localhost/repo/", ("http://localhost/repo",)),
("http://localhost/repo/", ("http://localhost/repo/",)),
]
for expected, url in cases:
self.assertEqual(
expected, utils.normalize_repository_url(url),
"URL: {0}".format(url)
)
self._check_cases(
self.assertEqual, cases, utils.normalize_repository_url
)
@mock.patch("packetary.library.utils.os")
def test_ensure_dir_exist(self, os):
@ -133,3 +115,12 @@ class TestLibraryUtils(base.TestCase):
utils.ensure_dir_exist("/private")
with self.assertRaises(ValueError):
utils.ensure_dir_exist(1)
def test_get_filename_from_uri(self):
cases = [
("test.pkg", ("test.pkg",)),
("test.pkg", ("/root/test.pkg",)),
("test.pkg", ("file:///root/test.pkg",)),
("", ("file:///root/",))
]
self._check_cases(self.assertEqual, cases, utils.get_filename_from_uri)

View File

@ -89,6 +89,13 @@ class TestRepositoryApi(base.TestCase):
context, "deb", "x86_64"
)
def test_create_repository(self):
file_urls = ["file://test1.pkg"]
self.api.create_repository(self.repo_data, file_urls)
self.controller.create_repository.assert_called_once_with(
self.repo_data, file_urls
)
def test_get_packages_as_is(self):
packages = self.api.get_packages([self.repo_data], None)
self.assertEqual(5, len(packages))
@ -189,7 +196,7 @@ class TestRepositoryApi(base.TestCase):
self.assertEqual(expected, actual)
self.assertIsNone(self.api._load_requirements(None))
def test_validate_repos_data(self):
def test_validate_repo_data(self):
# TODO(bgaifullin) implement me
pass

View File

@ -109,7 +109,7 @@ class TestRepositoryController(base.TestCase):
gen_package(name="test1", repository=repo, filesize=10),
gen_package(name="test2", repository=repo, filesize=-1)
]
target = gen_repository(url="/test/repo")
target = gen_repository(url="/test/repo/")
self.context.connection.retrieve.side_effect = [0, 10]
observer = mock.MagicMock()
self.ctrl._copy_packages(target, packages, observer)
@ -135,3 +135,44 @@ class TestRepositoryController(base.TestCase):
observer = mock.MagicMock()
self.ctrl._copy_packages(repo, packages, observer)
self.assertFalse(self.context.connection.retrieve.called)
def test_copy_free_package(self):
repo = gen_repository(url="file:///repo/")
package = gen_package(name="test1", filename="file:///root/test.pkg",
repository=None, filesize=10)
self.driver.get_relative_path.side_effect = ["pool/t/test1.pkg"]
self.ctrl._copy_package(repo, package, None)
self.context.connection.retrieve.assert_called_once_with(
"file:///root/test.pkg",
"/repo/pool/t/test1.pkg",
size=10
)
def test_create_repository(self):
repository_data = {
"name": "Test", "url": "file:///repo/",
"section": ("trusty", "main"), "origin": "Test"
}
repo = gen_repository(**repository_data)
packages_list = ['/tmp/test1.pkg']
packages = [gen_package(name="test2", repository=repo)]
self.driver.create_repository.return_value = repo
self.driver.load_package_from_file.side_effect = packages
self.driver.get_relative_path.side_effect = ["pool/t/test1.pkg"]
self.ctrl.create_repository(repository_data, packages_list)
self.driver.create_repository.assert_called_once_with(
repository_data, self.ctrl.arch
)
self.driver.get_relative_path.assert_called_once_with(
repo, "test1.pkg"
)
self.context.connection.retrieve.assert_any_call(
"/tmp/test1.pkg",
"/repo/pool/t/test1.pkg"
)
self.driver.load_package_from_file.assert_called_once_with(
repo, "pool/t/test1.pkg"
)
self.driver.add_packages.assert_called_once_with(
self.ctrl.context.connection, repo, set(packages)
)

View File

@ -165,8 +165,9 @@ class TestRpmDriver(base.TestCase):
package = packages[0]
self.assertTrue(package.mandatory)
@mock.patch("packetary.drivers.rpm_driver.os.path.exists")
@mock.patch("packetary.drivers.rpm_driver.shutil")
def test_add_packages(self, shutil):
def test_add_packages(self, shutil, path_exists):
self.createrepo.MDError = ValueError
self.createrepo.MetaDataGenerator().doFinalMove.side_effect = [
None, self.createrepo.MDError()
@ -174,9 +175,9 @@ class TestRpmDriver(base.TestCase):
repo = gen_repository("test", url="file:///repo/os/x86_64")
self.createrepo.MetaDataConfig().outputdir = "/repo/os/x86_64"
self.createrepo.MetaDataConfig().tempdir = "tmp"
self.createrepo.MetaDataConfig().finaldir = "repodata"
path_exists.side_effect = [True, False]
self.driver.add_packages(self.connection, repo, set())
self.assertEqual(
"/repo/os/x86_64",
self.createrepo.MetaDataConfig().directory
@ -191,6 +192,8 @@ class TestRpmDriver(base.TestCase):
with self.assertRaises(RuntimeError):
self.driver.add_packages(self.connection, repo, set())
self.assertFalse(self.createrepo.MetaDataConfig().update)
shutil.rmtree.assert_called_once_with(
"/repo/os/x86_64/tmp", ignore_errors=True
)
@ -199,6 +202,9 @@ class TestRpmDriver(base.TestCase):
def test_fork_repository(self, ensure_dir_exists_mock):
repo = gen_repository("os", url="http://localhost/os/x86_64/")
self.createrepo.MetaDataGenerator().doFinalMove.side_effect = [None]
self.createrepo.MetaDataConfig().outputdir = "/repo/os/x86_64"
self.createrepo.MetaDataConfig().tempdir = "tmp"
self.createrepo.MetaDataConfig().finaldir = "repodata"
new_repo = self.driver.fork_repository(
self.connection,
repo,
@ -210,3 +216,65 @@ class TestRpmDriver(base.TestCase):
self.assertEqual("file:///repo/os/x86_64/", new_repo.url)
self.createrepo.MetaDataGenerator()\
.doFinalMove.assert_called_once_with()
@mock.patch("packetary.drivers.rpm_driver.utils.ensure_dir_exist")
def test_create_repository(self, ensure_dir_exists_mock):
repository_data = {
"name": "Test", "url": "file:///repo/os/x86_64", "origin": "Test"
}
repo = self.driver.create_repository(repository_data, "x86_64")
ensure_dir_exists_mock.assert_called_once_with("/repo/os/x86_64/")
self.assertEqual(repository_data["name"], repo.name)
self.assertEqual("x86_64", repo.architecture)
self.assertEqual(repository_data["url"] + "/", repo.url)
self.assertEqual(repository_data["origin"], repo.origin)
@mock.patch("packetary.drivers.rpm_driver.utils")
def test_load_package_from_file(self, utils):
file_info = ("/test.rpm", 2, [3, 4, 5])
utils.get_size_and_checksum_for_files.return_value = [file_info]
utils.get_path_from_url.return_value = "/repo/x86_64/test.rpm"
rpm_mock = mock.MagicMock(
requires=[('test1', 'EQ', ('0', '1.2.3', '1.el5'))],
provides=[('test2', None, (None, None, None))],
obsoletes=[]
)
self.createrepo.yumbased.YumLocalPackage.return_value = rpm_mock
rpm_mock.returnLocalHeader.return_value = {
"name": "Test", "epoch": 1, "version": "1.2.3", "release": "1",
"size": "10"
}
repo = gen_repository("Test", url="file:///repo/os/x86_64/")
pkg = self.driver.load_package_from_file(repo, "test.rpm")
utils.get_path_from_url.assert_called_once_with(
"file:///repo/os/x86_64/test.rpm"
)
self.createrepo.yumbased.YumLocalPackage.assert_called_once_with(
filename="/repo/x86_64/test.rpm"
)
utils.get_size_and_checksum_for_files.assert_called_once_with(
["/repo/x86_64/test.rpm"], mock.ANY
)
self.assertEqual("Test", pkg.name)
self.assertEqual("1-1.2.3-1", str(pkg.version))
self.assertEqual("test.rpm", pkg.filename)
self.assertEqual((3, 4, 5), pkg.checksum)
self.assertEqual(10, pkg.filesize)
self.assertItemsEqual(
['test1 (= 0-1.2.3-1.el5)'],
(str(x) for x in pkg.requires)
)
self.assertItemsEqual(
['test2 (any)'],
(str(x) for x in pkg.provides)
)
self.assertEqual([], pkg.obsoletes)
self.assertEqual(pkg.mandatory, False)
def test_get_relative_path(self):
repo = gen_repository(
"test", "file://repo", section=("trusty", "main")
)
rel_path = self.driver.get_relative_path(repo, "test.pkg")
self.assertEqual("packages/test.pkg", rel_path)