Implement version comparisons and dpkg queries.

This commit is contained in:
Robert Collins 2013-06-10 20:36:29 +12:00
parent 2c39aa2e56
commit 84eb38a96f
2 changed files with 311 additions and 7 deletions

View File

@ -18,7 +18,23 @@
from parsley import makeGrammar
import subprocess
grammar = """
debversion_grammar = """
epoch = <digit+>:d ':' -> d
trailingdeb = (upstream_segment* '-' debver)
upstream_segment = (letterOrDigit | '.' | '+' | '~' | ':')
upstreamver = digit (upstream_segment | ('-' ~~trailingdeb))*
upstreamver_no_hyphen = digit (letterOrDigit | '.' | '+' | '~' | ':')*
debver = (letterOrDigit | '.' | '+' | '~')+
upstream_no_hyphen = epoch?:e <upstreamver_no_hyphen>:u -> (e or '0', u, "")
upstream_hyphen = epoch?:e <upstreamver>:u '-' <debver>:d -> (e or '0', u, d)
debversion = upstream_hyphen | upstream_no_hyphen
"""
debversion_compiled = makeGrammar(debversion_grammar, {})
grammar = debversion_grammar + """
rules = rule*
rule = <name>:name selector?:selector version?:version '\n' -> (
name, selector or [], version or [])
@ -28,11 +44,6 @@ name = (lowercase|digit):start (lowercase|digit|'.'|'+'|'-')+:rest
ws = ' '+
profile = ('!'?:neg <(lowercase|digit|':')+>:name) -> (neg!='!', name)
selector = ws '[' profile:p1 (ws profile)*:p2 ']' -> [p1] + p2
epoch = digit+ ':'
upstreamver = digit (letterOrDigit | '.' | '+' | '-' | '~' | ':')*
upstreamver_no_hyphen = digit (letterOrDigit | '.' | '+' | '~' | ':')*
debver = digit (letterOrDigit | '.' | '+' | '~')*
debversion = epoch? (upstreamver_no_hyphen | (upstreamver debver?))
oneversion = <('<=' | '<' | '!=' | '==' | '>=' | '>')>:rel <debversion>:v -> (
rel, v)
version = ws oneversion:v1 (',' oneversion)*:v2 -> [v1] + v2
@ -86,7 +97,22 @@ class Depends(object):
:param rules: A list of rules, as returned by active_rules.
:return: A list of unsatisfied rules.
"""
return []
missing = set()
incompatible = []
for rule in rules:
installed = self.platform.get_pkg_version(rule[0])
if not installed:
missing.add(rule[0])
for operator, constraint in rule[2]:
if not _eval(installed, operator, constraint):
incompatible.append(
(rule[0], '%s%s' % (operator, constraint), installed))
result = []
if missing:
result.append(("missing", sorted(missing)))
if incompatible:
result.append(("badversion", incompatible))
return result
def profiles(self):
profiles = set()
@ -101,4 +127,154 @@ class Depends(object):
atoms = set([distro])
if distro in ["debian", "ubuntu"]:
atoms.add("dpkg")
self.platform = Dpkg()
return ["platform:%s" % (atom,) for atom in sorted(atoms)]
class Platform(object):
"""Interface for querying platform specific info."""
def get_pkg_version(self, pkg_name):
"""Find the installed version of pkg_name.
:return: None if pkg_name is not installed, or a version otherwise.
"""
raise NotImplementedError(self.get_pkg_version)
class Dpkg(Platform):
"""dpkg specific platform implementation.
This currently shells out to dpkg, it could in future use python-apt.
"""
def get_pkg_version(self, pkg_name):
try:
output = subprocess.check_output(
["dpkg-query", "-W", "-f",
"${binary:Package} ${Status} ${Version}\n",
pkg_name], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
if (e.returncode == 1 and
e.output.startswith('dpkg-query: no packages found')):
return None
raise
# output looks like
# name planned status install-status version
output = output.strip()
elements = output.split(' ')
if elements[3] != 'installed':
return None
return elements[4]
def _eval_diff(operator, diff):
"""Return the boolean result for operator given diff.
:param diff: An int with negative values meaning the right most parameter
to the _eval function was greater than the left most parameter.
:return: True if the operator was satisfied.
"""
if operator == "==":
return diff == 0
if operator == "!=":
return diff != 0
if operator == "<":
return diff < 0
if operator == "<=":
return diff <= 0
if operator == ">":
return diff > 0
if operator == ">=":
return diff >= 0
def _to_ord(character):
# Per http://www.debian.org/doc/debian-policy/ch-controlfields.html
# The lexical comparison is a comparison of ASCII values modified so that
# all the letters sort earlier than all the non-letters and so that a
# tilde sorts before anything, even the end of a part.
# ord(~) -> 126
# ord('A') -> 65
# ord('Z') -> 90
# ord('a') -> 97
# ord('z') -> 122
# ord('+') -> 43
# ord('-') -> 45
# ord('.') -> 46
# ord(':') -> 58
if not character or character.isdigit(): # end of a part
return 1
elif character == '~':
return 0
else:
ordinal = ord(character)
if ordinal < 65:
# Shift non-characters up beyond the highest character.
ordinal+= 100
return ordinal
def _cmp_nondigit(left, right):
l_ord = _to_ord(left)
r_ord = _to_ord(right)
return l_ord - r_ord
def _find_int(a_str, offset):
"""Find an int within a_str.
:return: The int and the offset of the first character after the int.
"""
if offset == len(a_str):
return 0, offset
initial_offset = offset
while offset < len(a_str):
offset += 1
try:
result = int(a_str[initial_offset:offset])
except ValueError:
# past the end of the decimal bit
offset -= 1
break
return int(a_str[initial_offset:offset]), offset
def _eval(installed, operator, constraint):
if operator == "==":
return installed == constraint
if operator == "!=":
return installed != constraint
constraint_parsed = debversion_compiled(constraint).debversion()
installed_parsed = debversion_compiled(installed).debversion()
diff = int(installed_parsed[0]) - int(constraint_parsed[0])
if diff:
return _eval_diff(operator, diff)
diff = _cmp_segment(installed_parsed[1], constraint_parsed[1])
if diff:
return _eval_diff(operator, diff)
diff = _cmp_segment(installed_parsed[2], constraint_parsed[2])
return _eval_diff(operator, diff)
def _cmp_segment(l_str, r_str):
r_offset = 0
l_offset = 0
while (r_offset < len(r_str)) or (l_offset < len(l_str)):
r_char = r_str[r_offset:r_offset+1]
l_char = l_str[l_offset:l_offset+1]
if ((not r_char or r_char.isdigit())
and (not l_char or l_char.isdigit())):
l_int, l_offset = _find_int(l_str, l_offset)
r_int, r_offset = _find_int(r_str, r_offset)
diff = l_int - r_int
if diff:
return diff
diff = _cmp_nondigit(l_char, r_char)
if diff:
return diff
if not l_char.isdigit() and l_offset < len(l_str):
l_offset += 1
if not r_char.isdigit() and r_offset < len(r_str):
r_offset += 1
return 0

View File

@ -25,6 +25,9 @@ from testtools.matchers import MatchesSetwise
from testtools import TestCase
from bindep.depends import Depends
from bindep.depends import Dpkg
from bindep.depends import _eval
from bindep.depends import Platform
class TestDepends(TestCase):
@ -58,6 +61,7 @@ class TestDepends(TestCase):
depends = Depends("")
self.assertThat(
depends.platform_profiles(), Contains("platform:dpkg"))
self.assertIsInstance(depends.platform, Dpkg)
def test_finds_profiles(self):
depends = Depends(dedent("""\
@ -104,3 +108,127 @@ class TestDepends(TestCase):
depends = Depends("foo [on]\n")
self.assertEqual([], depends.active_rules(["default"]))
def test_check_rule_missing(self):
depends = Depends("")
mocker = mox.Mox()
depends.platform = mocker.CreateMock(Platform)
depends.platform.get_pkg_version("foo").AndReturn(None)
mocker.ReplayAll()
self.addCleanup(mocker.VerifyAll)
self.assertEqual(
[('missing', ['foo'])], depends.check_rules([("foo", [], [])]))
def test_check_rule_present(self):
depends = Depends("")
mocker = mox.Mox()
depends.platform = mocker.CreateMock(Platform)
depends.platform.get_pkg_version("foo").AndReturn("123")
mocker.ReplayAll()
self.addCleanup(mocker.VerifyAll)
self.assertEqual([], depends.check_rules([("foo", [], [])]))
def test_check_rule_incompatible(self):
depends = Depends("")
mocker = mox.Mox()
depends.platform = mocker.CreateMock(Platform)
depends.platform.get_pkg_version("foo").AndReturn("123")
mocker.ReplayAll()
self.addCleanup(mocker.VerifyAll)
self.assertEqual(
[('badversion', [('foo', "!=123", "123")])],
depends.check_rules([("foo", [], [("!=", "123")])]))
class TestDpkg(TestCase):
def test_not_installed(self):
platform = Dpkg()
mocker = mox.Mox()
mocker.StubOutWithMock(subprocess, "check_output")
subprocess.check_output(
["dpkg-query", "-W", "-f",
"${binary:Package} ${Status} ${Version}\n", "foo"],
stderr=subprocess.STDOUT).AndReturn(
"foo deinstall ok config-files 4.0.0-0ubuntu1\n")
mocker.ReplayAll()
self.addCleanup(mocker.VerifyAll)
self.addCleanup(mocker.UnsetStubs)
self.assertEqual(None, platform.get_pkg_version("foo"))
def test_unknown_package(self):
platform = Dpkg()
mocker = mox.Mox()
mocker.StubOutWithMock(subprocess, "check_output")
subprocess.check_output(
["dpkg-query", "-W", "-f",
"${binary:Package} ${Status} ${Version}\n", "foo"],
stderr=subprocess.STDOUT).AndRaise(
subprocess.CalledProcessError(
1, [], "dpkg-query: no packages found matching foo\n"))
mocker.ReplayAll()
self.addCleanup(mocker.VerifyAll)
self.addCleanup(mocker.UnsetStubs)
self.assertEqual(None, platform.get_pkg_version("foo"))
def test_installed_version(self):
platform = Dpkg()
mocker = mox.Mox()
mocker.StubOutWithMock(subprocess, "check_output")
subprocess.check_output(
["dpkg-query", "-W", "-f",
"${binary:Package} ${Status} ${Version}\n", "foo"],
stderr=subprocess.STDOUT).AndReturn(
"foo install ok installed 4.0.0-0ubuntu1\n")
mocker.ReplayAll()
self.addCleanup(mocker.VerifyAll)
self.addCleanup(mocker.UnsetStubs)
self.assertEqual("4.0.0-0ubuntu1", platform.get_pkg_version("foo"))
class TestEval(TestCase):
def test_lt(self):
self.assertEqual(True, _eval("3.5-ubuntu", "<", "4"))
self.assertEqual(False, _eval("4", "<", "3.5-ubuntu"))
self.assertEqual(False, _eval("4", "<", "4"))
# Epoch comes first
self.assertEqual(True, _eval("1:2", "<", "2:1"))
# ~'s
self.assertEqual(True, _eval("1~~", "<", "1~~a"))
self.assertEqual(True, _eval("1~~a", "<", "1~"))
self.assertEqual(True, _eval("1~", "<", "1"))
self.assertEqual(True, _eval("1", "<", "1a"))
# debver's
self.assertEqual(True, _eval("1-a~~", "<", "1-a~~a"))
self.assertEqual(True, _eval("1-a~~a", "<", "1-a~"))
self.assertEqual(True, _eval("1-a~", "<", "1-a"))
self.assertEqual(True, _eval("1-a", "<", "1-aa"))
# end-of-segment
self.assertEqual(True, _eval("1a", "<", "1aa"))
self.assertEqual(True, _eval("1a-a", "<", "1a-aa"))
def test_lte(self):
self.assertEqual(True, _eval("3.5-ubuntu", "<=", "4"))
self.assertEqual(False, _eval("4", "<=", "3.5-ubuntu"))
self.assertEqual(True, _eval("4", "<=", "4"))
def test_eq(self):
self.assertEqual(True, _eval("3.5-ubuntu", "==", "3.5-ubuntu"))
self.assertEqual(False, _eval("4", "==", "3.5-ubuntu"))
self.assertEqual(False, _eval("3.5-ubuntu", "==", "4"))
def test_neq(self):
self.assertEqual(False, _eval("3.5-ubuntu", "!=", "3.5-ubuntu"))
self.assertEqual(True, _eval("4", "!=", "3.5-ubuntu"))
self.assertEqual(True, _eval("3.5-ubuntu", "!=", "4"))
def test_gt(self):
self.assertEqual(False, _eval("3.5-ubuntu", ">", "4"))
self.assertEqual(True, _eval("4", ">", "3.5-ubuntu"))
self.assertEqual(False, _eval("4", ">", "4"))
def test_gte(self):
self.assertEqual(False, _eval("3.5-ubuntu", ">=", "4"))
self.assertEqual(True, _eval("4", ">=", "3.5-ubuntu"))
self.assertEqual(True, _eval("4", ">=", "4"))