Added basic Repository.branches implementation.

This commit is contained in:
Tamir Bahar 2017-04-13 01:43:50 +03:00
parent dd57c9b366
commit d14438725e
2 changed files with 191 additions and 9 deletions

View File

@ -32,6 +32,7 @@ from __future__ import absolute_import
from string import hexdigits
import sys, tarfile
from time import time
if sys.version_info[0] < 3:
from cStringIO import StringIO
else:
@ -44,6 +45,7 @@ from _pygit2 import Repository as _Repository, init_file_backend
from _pygit2 import Oid, GIT_OID_HEXSZ, GIT_OID_MINPREFIXLEN
from _pygit2 import GIT_CHECKOUT_SAFE, GIT_CHECKOUT_RECREATE_MISSING, GIT_DIFF_NORMAL
from _pygit2 import GIT_FILEMODE_LINK
from _pygit2 import GIT_BRANCH_LOCAL, GIT_BRANCH_REMOTE
from _pygit2 import Reference, Tree, Commit, Blob, Signature
from .config import Config
@ -57,7 +59,6 @@ from .submodule import Submodule
class BaseRepository(_Repository):
def __init__(self, backend, *args, **kwargs):
super(BaseRepository, self).__init__(backend, *args, **kwargs)
self._common_init()
@ -484,6 +485,7 @@ class BaseRepository(_Repository):
@staticmethod
def _merge_options(favor):
"""Return a 'git_merge_opts *'"""
def favor_to_enum(favor):
if favor == 'normal':
return C.GIT_MERGE_FILE_FAVOR_NORMAL
@ -530,13 +532,13 @@ class BaseRepository(_Repository):
theirs._to_c() if theirs is not None else (ffi.NULL, ffi.NULL))
err = C.git_merge_file_from_index(
cmergeresult, self._repo,
cancestor, cours, ctheirs,
ffi.NULL);
cmergeresult, self._repo,
cancestor, cours, ctheirs,
ffi.NULL);
check_error(err)
ret = ffi.string(cmergeresult.ptr,
cmergeresult.len).decode('utf-8')
cmergeresult.len).decode('utf-8')
C.git_merge_file_result_free(cmergeresult)
return ret
@ -735,11 +737,12 @@ class BaseRepository(_Repository):
C.git_buf_free(buf)
finally:
C.git_describe_result_free(result[0])
#
# Stash
#
def stash(self, stasher, message=None, keep_index=False,
include_untracked=False, include_ignored=False):
include_untracked=False, include_ignored=False):
"""Save changes to the working directory to the stash.
:param Signature stasher: The identity of the person doing the stashing.
@ -817,7 +820,6 @@ class BaseRepository(_Repository):
"""
check_error(C.git_stash_drop(self._repo, index))
def stash_pop(self, index=0, **kwargs):
"""Apply a stashed state and remove it from the stash list.
@ -886,11 +888,11 @@ class BaseRepository(_Repository):
info = tarfile.TarInfo(prefix + entry.path)
info.size = len(content)
info.mtime = timestamp
info.uname = info.gname = 'root' # just because git does this
info.uname = info.gname = 'root' # just because git does this
if entry.mode == GIT_FILEMODE_LINK:
info.type = tarfile.SYMTYPE
info.linkname = content.decode("utf-8")
info.mode = 0o777 # symlinks get placeholder
info.mode = 0o777 # symlinks get placeholder
info.size = 0
archive.addfile(info)
else:
@ -996,6 +998,54 @@ class BaseRepository(_Repository):
check_error(err)
class Branches(object):
def __init__(self, repository, flag=GIT_BRANCH_LOCAL | GIT_BRANCH_REMOTE):
self._repository = repository
self._flag = flag
if flag == GIT_BRANCH_LOCAL | GIT_BRANCH_REMOTE:
self.local = Branches(repository, flag=GIT_BRANCH_LOCAL)
self.remote = Branches(repository, flag=GIT_BRANCH_REMOTE)
def __getitem__(self, name):
branch = None
if self._flag & GIT_BRANCH_LOCAL:
branch = self._repository.lookup_branch(name, GIT_BRANCH_LOCAL)
if branch is None and self._flag & GIT_BRANCH_REMOTE:
branch = self._repository.lookup_branch(name, GIT_BRANCH_REMOTE)
if branch is None:
raise KeyError('Branch not found: {}'.format(name))
return branch
def get(self, key):
try:
return self[key]
except KeyError:
return None
def __iter__(self):
for branch_name in self._repository.listall_branches(self._flag):
yield branch_name
def create(self, name, commit, force=False):
return self._repository.create_branch(name, commit, force)
def delete(self, name):
self[name].delete()
def __contains__(self, name):
try:
# If the lookup succeeds, the name is present.
_ = self[name]
return True
except KeyError:
return False
class Repository(BaseRepository):
def __init__(self, path, *args, **kwargs):
if not isinstance(path, six.string_types):
@ -1004,6 +1054,8 @@ class Repository(BaseRepository):
path_backend = init_file_backend(path)
super(Repository, self).__init__(backend=path_backend, *args, **kwargs)
self.branches = Branches(self)
@classmethod
def _from_c(cls, ptr, owned):
cptr = ffi.new('git_repository **')

View File

@ -39,6 +39,135 @@ I18N_LAST_COMMIT = '5470a671a80ac3789f1a6a8cefbcf43ce7af0563'
ORIGIN_MASTER_COMMIT = '784855caf26449a1914d2cf62d12b9374d76ae78'
class BranchesObjectTestCase(utils.RepoTestCase):
def test_lookup_branch_local(self):
branch = self.repo.branches['master']
self.assertEqual(branch.target.hex, LAST_COMMIT)
branch = self.repo.branches.local['i18n']
self.assertEqual(branch.target.hex, I18N_LAST_COMMIT)
self.assertTrue(self.repo.branches.get('not-exists') is None)
self.assertRaises(KeyError, lambda: self.repo.branches['not-exists'])
def test_listall_branches(self):
branches = sorted(self.repo.branches)
self.assertEqual(branches, ['i18n', 'master'])
def test_create_branch(self):
commit = self.repo[LAST_COMMIT]
reference = self.repo.branches.create('version1', commit)
self.assertTrue('version1' in self.repo.branches)
reference = self.repo.branches['version1']
self.assertEqual(reference.target.hex, LAST_COMMIT)
# try to create existing reference
self.assertRaises(ValueError,
lambda: self.repo.branches.create('version1', commit))
# try to create existing reference with force
reference = self.repo.branches.create('version1', commit, True)
self.assertEqual(reference.target.hex, LAST_COMMIT)
def test_delete(self):
self.repo.branches.delete('i18n')
self.assertTrue(self.repo.branches.get('i18n') is None)
def test_cant_delete_master(self):
self.assertRaises(pygit2.GitError, lambda: self.repo.branches.delete('master'))
def test_branch_is_head_returns_true_if_branch_is_head(self):
branch = self.repo.branches.get('master')
self.assertTrue(branch.is_head())
def test_branch_is_head_returns_false_if_branch_is_not_head(self):
branch = self.repo.branches.get('i18n')
self.assertFalse(branch.is_head())
def test_branch_rename_succeeds(self):
new_branch = self.repo.branches['i18n'].rename('new-branch')
self.assertEqual(new_branch.target.hex, I18N_LAST_COMMIT)
new_branch_2 = self.repo.branches.get('new-branch')
self.assertEqual(new_branch_2.target.hex, I18N_LAST_COMMIT)
def test_branch_rename_fails_if_destination_already_exists(self):
original_branch = self.repo.branches.get('i18n')
self.assertRaises(ValueError, lambda: original_branch.rename('master'))
def test_branch_rename_not_fails_if_force_is_true(self):
original_branch = self.repo.branches.get('master')
new_branch = original_branch.rename('i18n', True)
self.assertEqual(new_branch.target.hex, LAST_COMMIT)
def test_branch_rename_fails_with_invalid_names(self):
original_branch = self.repo.branches.get('i18n')
self.assertRaises(ValueError,
lambda: original_branch.rename('abc@{123'))
def test_branch_name(self):
branch = self.repo.branches.get('master')
self.assertEqual(branch.branch_name, 'master')
self.assertEqual(branch.name, 'refs/heads/master')
branch = self.repo.branches.get('i18n')
self.assertEqual(branch.branch_name, 'i18n')
self.assertEqual(branch.name, 'refs/heads/i18n')
class BranchesObjectEmptyRepoTestCase(utils.EmptyRepoTestCase):
def setUp(self):
super(utils.EmptyRepoTestCase, self).setUp()
remote = self.repo.remotes[0]
remote.fetch()
def test_lookup_branch_remote(self):
branch = self.repo.branches.remote.get('origin/master')
self.assertEqual(branch.target.hex, ORIGIN_MASTER_COMMIT)
self.assertTrue(
self.repo.branches.remote.get('origin/not-exists') is None)
def test_listall_branches(self):
branches = sorted(self.repo.branches.remote)
self.assertEqual(branches, ['origin/master'])
def test_branch_remote_name(self):
self.repo.remotes[0].fetch()
branch = self.repo.branches.remote['origin/master']
self.assertEqual(branch.remote_name, 'origin')
def test_branch_upstream(self):
self.repo.remotes[0].fetch()
remote_master = self.repo.branches.remote['origin/master']
master = self.repo.branches.create('master',
self.repo[remote_master.target.hex])
self.assertTrue(master.upstream is None)
master.upstream = remote_master
self.assertEqual(master.upstream.branch_name, 'origin/master')
def set_bad_upstream():
master.upstream = 2.5
self.assertRaises(TypeError, set_bad_upstream)
master.upstream = None
self.assertTrue(master.upstream is None)
def test_branch_upstream_name(self):
self.repo.remotes[0].fetch()
remote_master = self.repo.branches.remote['origin/master']
master = self.repo.branches.create('master',
self.repo[remote_master.target.hex])
master.upstream = remote_master
self.assertEqual(master.upstream_name, 'refs/remotes/origin/master')
class BranchesTestCase(utils.RepoTestCase):
def test_lookup_branch_local(self):
branch = self.repo.lookup_branch('master')
@ -159,6 +288,7 @@ class BranchesEmptyRepoTestCase(utils.EmptyRepoTestCase):
def set_bad_upstream():
master.upstream = 2.5
self.assertRaises(TypeError, set_bad_upstream)
master.upstream = None