Source code for ciowarehouse.lib.vcs_git

"""Class for warehouses with Git as VCS."""

from __future__ import annotations
from os import makedirs, remove
from os.path import abspath, join, isabs, normpath, exists, relpath, isdir
from os.path import dirname, getmtime
from shutil import rmtree, copy
from collections import namedtuple
from time import time
from re import compile as re_compile

from git import Repo
from git.exc import GitCommandError

from chrysalio.lib.utils import copy_content, full_url
from .utils import INFOS_DIR, LOCAL_DIR, IGNORED_PREFIX, IGNORED_SUFFIX
from .utils import file_check_move, thumbnail_remove, thumbnail_move
from .vcs_none import VcsNone
from .i18n import _

# Name of the lock file looked for inside the ``.git`` directory.
GIT_LOCK_FILE = 'index.lock'
# Default age, in seconds, beyond which an existing lock file is removed.
GIT_LOCK_TTL = 3600
# Regular expression source matching a name wrapped between the ignored
# prefix and suffix markers; ``VcsGit.add`` skips paths that match it.
GIT_IGNORED_PATTERN = \
    f'\\{IGNORED_PREFIX}[^\\{IGNORED_SUFFIX}]+\\{IGNORED_SUFFIX}'


# =============================================================================
class VcsGit:
    """Class to manage warehouses with Git as VCS.

    :param str root:
        Absolute path to the warehouse directory.
    :param str url: (optional)
        URL of the remote repository.
    :param str user_id: (optional)
        User ID for clone/pull access.
    :param str password: (optional)
        Clear password for clone/pull access.
    :param int lock_ttl: (default=3600)
        Age in seconds beyond which an existing Git lock file is
        considered stale and removed before pulling.
    """

    engine = 'git'

    # -------------------------------------------------------------------------
    def __init__(
            self,
            root: str,
            url: str | None = None,
            user_id: str | None = None,
            password: str | None = None,
            lock_ttl: int = GIT_LOCK_TTL):
        """Constructor method."""
        self.root = root
        self._url = url
        self._user = user_id
        self._password = password
        self._lock_ttl = lock_ttl
        # Anything that is not an HTTP(S) URL is treated as a local path.
        if self._url is not None and not self._url.startswith('http'):
            self._url = abspath(self._url)
        self._ignore = re_compile(GIT_IGNORED_PATTERN)

    # -------------------------------------------------------------------------
    def init(self) -> str | None:
        """Initialize a local Git repository.

        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        try:
            Repo.init(self.root)
        except (IOError, OSError) as error:
            return str(error)
        except GitCommandError as error:
            return self._stderr_message(error)
        self._exclude_files()
        return None

    # -------------------------------------------------------------------------
    def is_dirty(self) -> bool:
        """Return ``True`` if the repository data has been modified.

        Untracked files count as a modification. Any I/O or Git error is
        reported as "not dirty".

        :rtype: bool
        """
        try:
            repo = Repo.init(self.root)
            return repo.is_dirty() or bool(repo.untracked_files)
        except (IOError, OSError, GitCommandError):
            return False

    # -------------------------------------------------------------------------
    def clone(self) -> str | None:
        """Create a copy of an existing repository.

        Without a remote URL, a fresh local repository is initialized
        instead.

        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        if self._url is None:
            return self.init()

        fullurl = full_url(self._url, self._user, self._password)
        try:
            repo = Repo.clone_from(fullurl, self.root)
        except (IOError, OSError) as error:
            # Never leak the credentials embedded in the full URL.
            return str(error).replace(fullurl, self._url)
        except GitCommandError as error:
            return self._stderr_message(error).replace(fullurl, self._url)

        # Do not keep the credentials in the repository configuration.
        repo.remotes.origin.set_url(self._url)
        self._exclude_files()
        return None

    # -------------------------------------------------------------------------
    def pull(self) -> str | None:
        """Pull from a remote repository or do nothing.

        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        self._fix_lock()
        if self._url is None:
            return None

        fullurl = full_url(self._url, self._user, self._password)
        repo = None
        try:
            repo = Repo.init(self.root)
            repo.remotes.origin.set_url(fullurl)
            repo.remotes.origin.pull()
        except (IOError, OSError) as error:
            return str(error).replace(fullurl, self._url)
        except GitCommandError as error:
            return self._stderr_message(error).replace(fullurl, self._url)
        finally:
            # ``repo`` stays ``None`` when ``Repo.init`` itself failed; in
            # that case there is no remote URL to restore.
            if repo is not None:
                repo.remotes.origin.set_url(self._url)
        return None

    # -------------------------------------------------------------------------
    def add(self, path: str | None = None) -> str | None:
        """Add new files in path ``path``.

        :param str path: (optional)
            Relative or absolute path to add. Defaults to the whole
            warehouse.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        if path is None:
            path = self.root
        # Normalize in every case so that ``..`` segments cannot escape the
        # warehouse.
        path = normpath(path if isabs(path) else join(self.root, path))
        if not self._is_inside_root(path) or not exists(path):
            return _('git add: incorrect path')
        path = relpath(path, self.root)
        if path == '.git':
            return _('git add: you cannot add .git')
        if self._ignore.search(path) is not None:
            return None

        repo = Repo.init(self.root)
        try:
            if path == '.':
                repo.git.add('--all')
            else:
                repo.index.add((path, ))
        except (IOError, OSError) as error:  # pragma: nocover
            return str(error)
        except GitCommandError as error:  # pragma: nocover
            return self._stderr_message(error)
        return None

    # -------------------------------------------------------------------------
    def remove(self, directory: str, filename: str) -> str | None:
        """Remove a file or a directory in path ``directory``.

        The matching information file, information directory and
        thumbnails are removed too.

        :param str directory:
            Relative or absolute path to the directory containing the file.
        :param str filename:
            Name of the file to remove.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        # pylint: disable = too-many-return-statements, too-many-branches
        if isabs(directory):
            directory = relpath(directory, self.root)
        abs_file = normpath(join(self.root, directory, filename))
        if not self._is_inside_root(abs_file) \
                or abs_file == normpath(self.root):
            return _('git remove: incorrect path')

        # Remove information file
        repo = Repo.init(self.root)
        abs_path = join(self.root, INFOS_DIR, directory, f'{filename}.xml')
        is_dir = isdir(abs_file)
        if exists(abs_path):
            try:
                repo.index.remove(
                    (normpath(join(INFOS_DIR, directory,
                                   f'{filename}.xml')), ),
                    working_tree=True)
            except (IOError, OSError) as error:
                return str(error)
            except GitCommandError:
                # Git refuses the removal: remove it without VCS.
                return VcsNone(self.root).remove(directory, filename)

        # Remove information directory
        abs_path = join(self.root, INFOS_DIR, directory, filename)
        if is_dir and isdir(abs_path):
            try:
                repo.index.remove(
                    (normpath(join(INFOS_DIR, directory, filename)), ),
                    working_tree=True,
                    r=True)
            except (IOError, OSError) as error:  # pragma: nocover
                return str(error)
            except GitCommandError as error:  # pragma: nocover
                return self._stderr_message(error)

        # Remove file
        if exists(abs_file):
            try:
                repo.index.remove(
                    (normpath(join(directory, filename)), ),
                    working_tree=True,
                    r=True)
            except (IOError, OSError) as error:
                return str(error)
            except GitCommandError:
                return VcsNone(self.root).remove(directory, filename)

        # Remove thumbnails
        thumbnail_remove(self.root, directory, filename)
        return None

    # -------------------------------------------------------------------------
    def move(
            self,
            file1: tuple[str, str],
            file2: tuple[str, str],
            copy_only: bool = False) -> str | None:
        """Move or copy a file.

        :param tuple file1:
            A tuple such as ``(old_directory, old_name)``.
        :param tuple file2:
            A tuple such as ``(new_directory, new_name)``.
        :param bool copy_only: (default=False)
            If ``True``, copy instead of moving the file.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        # pylint: disable = too-many-return-statements, too-many-branches
        # Copy
        if copy_only:
            return self.copy(file1, file2)

        # Prepare files
        abs_path1, abs_path2, error = file_check_move(self, file1, self, file2)
        if abs_path1 is None:
            return error

        # Rename file
        repo = Repo.init(self.root)
        is_dir = isdir(abs_path1)
        try:
            repo.index.move((
                normpath(join(file1[0], file1[1])),
                normpath(join(file2[0], file2[1]))))
        except (IOError, OSError, GitCommandError) as error:
            if is_dir:
                # NOTE(review): the source directory is deleted without
                # copying its content to the destination -- confirm this
                # is only reached for directories with nothing to keep.
                makedirs(abs_path2, exist_ok=True)
                rmtree(abs_path1)
            elif 'not under version control' in str(error):
                # Tested before the generic Git error so that untracked
                # files are still moved without VCS.
                return VcsNone(self.root).move(file1, file2, copy_only)
            elif hasattr(error, 'stderr'):
                return self._stderr_message(error)
            else:
                return str(error)

        # Rename information file
        abs_path1 = join(self.root, INFOS_DIR, file1[0], f'{file1[1]}.xml')
        if exists(abs_path1):
            abs_path2 = join(
                self.root, INFOS_DIR, file2[0], f'{file2[1]}.xml')
            makedirs(dirname(abs_path2), exist_ok=True)
            try:
                repo.index.move((
                    normpath(join(INFOS_DIR, file1[0], f'{file1[1]}.xml')),
                    normpath(join(INFOS_DIR, file2[0], f'{file2[1]}.xml'))))
            except (IOError, OSError) as error:  # pragma: nocover
                return str(error)
            except GitCommandError as error:  # pragma: nocover
                return self._stderr_message(error)

        # Rename information directory
        abs_path1 = join(self.root, INFOS_DIR, file1[0], file1[1])
        if is_dir and isdir(abs_path1):
            abs_path2 = join(self.root, INFOS_DIR, file2[0], file2[1])
            if exists(abs_path2):
                rmtree(abs_path2)
            try:
                repo.index.move((
                    normpath(join(INFOS_DIR, file1[0], file1[1])),
                    normpath(join(INFOS_DIR, file2[0], file2[1]))))
            except (IOError, OSError) as error:  # pragma: nocover
                return str(error)
            except GitCommandError as error:  # pragma: nocover
                return self._stderr_message(error)

        # Rename thumbnails
        thumbnail_move(self.root, file1, self.root, file2, copy_only)
        return None

    # -------------------------------------------------------------------------
    def copy(
            self,
            file1: tuple[str, str],
            file2: tuple[str, str]) -> str | None:
        """Copy a file.

        :param tuple file1:
            A tuple such as ``(old_directory, old_name)``.
        :param tuple file2:
            A tuple such as ``(new_directory, new_name)``.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        # Prepare files
        abs_path1, abs_path2, error = file_check_move(self, file1, self, file2)
        if abs_path1 is None:
            return error

        # Copy file
        makedirs(dirname(abs_path2), exist_ok=True)
        is_dir = isdir(abs_path1)
        if is_dir:
            copy_content(abs_path1, abs_path2)
        else:
            copy(abs_path1, abs_path2)
        self.add(abs_path2)

        # Copy information file
        abs_path1 = join(self.root, INFOS_DIR, file1[0], f'{file1[1]}.xml')
        if exists(abs_path1):
            abs_path2 = join(
                self.root, INFOS_DIR, file2[0], f'{file2[1]}.xml')
            makedirs(dirname(abs_path2), exist_ok=True)
            copy(abs_path1, abs_path2)
            self.add(abs_path2)

        # Copy information directory
        abs_path1 = join(self.root, INFOS_DIR, file1[0], file1[1])
        if is_dir and isdir(abs_path1):
            abs_path2 = join(self.root, INFOS_DIR, file2[0], file2[1])
            if exists(abs_path2):
                rmtree(abs_path2)
            copy_content(abs_path1, abs_path2)
            self.add(abs_path2)

        # Copy thumbnails
        thumbnail_move(self.root, file1, self.root, file2, True)
        return None

    # -------------------------------------------------------------------------
    def commit(self, message: str, name: str, email: str = '') -> str | None:
        """Commit changes.

        :param str message:
            Message for commit.
        :param str name:
            Name of the author.
        :param str email: (optional)
            Email of the author.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        if not message:
            return _('git commit: empty message')

        # Minimal object exposing the ``name`` and ``email`` attributes.
        author = namedtuple('author', 'name email')(name=name, email=email)
        try:
            repo = Repo.init(self.root)
            repo.index.commit(message, author=author)
        except (IOError, OSError,
                UnicodeEncodeError) as error:  # pragma: nocover
            return str(error)
        except GitCommandError as error:  # pragma: nocover
            return self._stderr_message(error)
        return None

    # -------------------------------------------------------------------------
    def log(self, directory: str, filename: str, limit: int = 20) -> list:
        """Show revision history.

        :param str directory:
            Relative or absolute path to the directory containing the file.
        :param str filename:
            Name of the file to log.
        :param int limit: (default=20)
            Maximum number of commits.
        :rtype: list
        :return:
            A list of tuples such as ``(commit_id, datetime,
            commiter_name, message)``.
        """
        if isabs(directory):
            directory = relpath(directory, self.root)
        repo = Repo.init(self.root)
        commits = repo.iter_commits(
            '--all', max_count=limit, paths=join(directory, filename))
        return [(
            commit.hexsha,
            commit.committed_datetime.replace(tzinfo=None),
            commit.author.name,
            commit.message) for commit in commits]

    # -------------------------------------------------------------------------
    @staticmethod
    def _stderr_message(error) -> str:
        """Return the standard error of a Git command on a single line.

        :param error:
            Exception exposing a ``stderr`` string attribute.
        :rtype: str
        """
        return ' '.join(error.stderr.split('\n')).strip()

    # -------------------------------------------------------------------------
    def _is_inside_root(self, path: str) -> bool:
        """Tell whether a normalized path is the warehouse root or below it.

        :param str path:
            Path already processed by :func:`os.path.normpath`.
        :rtype: bool
        """
        root = normpath(self.root)
        # ``join(root, '')`` ends with exactly one separator, so a sibling
        # such as ``<root>2`` is not mistaken for a child of the root.
        return path == root or path.startswith(join(root, ''))

    # -------------------------------------------------------------------------
    def _fix_lock(self) -> bool:
        """Remove long locks.

        :rtype: bool
        :return:
            ``True`` if a lock older than the TTL was found.
        """
        lock_file = join(self.root, '.git', GIT_LOCK_FILE)
        if exists(lock_file) and getmtime(lock_file) + self._lock_ttl < time():
            try:
                remove(lock_file)
            except OSError:  # pragma: nocover
                # Best effort: a failure to delete is deliberately ignored.
                pass
            return True
        return False

    # -------------------------------------------------------------------------
    def _exclude_files(self):
        """Locally exclude ignored files and the local directory.

        The patterns are appended to ``.git/info/exclude`` unless the local
        directory is already listed there.
        """
        exclude_file = join(self.root, '.git', 'info', 'exclude')
        content = ''
        # The exclude file may be missing: start from an empty one.
        if exists(exclude_file):
            with open(exclude_file, 'r', encoding='utf8') as hdl:
                content = hdl.read()
        if LOCAL_DIR in content:
            return

        content += f'\n{IGNORED_PREFIX}*{IGNORED_SUFFIX}\n'\
            f'{LOCAL_DIR}'
        makedirs(dirname(exclude_file), exist_ok=True)
        with open(exclude_file, 'w', encoding='utf8') as hdl:
            hdl.write(content)