"""Class for warehouses with Git as VCS."""
from __future__ import annotations
from os import makedirs, remove
from os.path import abspath, join, isabs, normpath, exists, relpath, isdir
from os.path import dirname, getmtime
from shutil import rmtree, copy
from collections import namedtuple
from time import time
from re import compile as re_compile
from git import Repo
from git.exc import GitCommandError
from chrysalio.lib.utils import copy_content, full_url
from .utils import INFOS_DIR, LOCAL_DIR, IGNORED_PREFIX, IGNORED_SUFFIX
from .utils import file_check_move, thumbnail_remove, thumbnail_move
from .vcs_none import VcsNone
from .i18n import _
GIT_LOCK_FILE = 'index.lock'
GIT_LOCK_TTL = 3600
GIT_IGNORED_PATTERN = \
f'\\{IGNORED_PREFIX}[^\\{IGNORED_SUFFIX}]+\\{IGNORED_SUFFIX}'
# =============================================================================
[docs]
class VcsGit():
"""Class to manage warehouses with Git as VCS.
:param str root:
Absolute path to the warehouse directory.
:param str url: (optional)
URL of the remote repository.
:param str user_id: (optional)
User ID for clone/pull access.
:param str password: (optional)
Clear password for clone/pull access.
:param int lock_ttl: (default=3600)
Clear password for clone/pull access.
"""
engine = 'git'
# -------------------------------------------------------------------------
def __init__(
self,
root: str,
url: str | None = None,
user_id: str | None = None,
password: str | None = None,
lock_ttl: int = GIT_LOCK_TTL):
"""Constructor method."""
self.root = root
self._url = url
self._user = user_id
self._password = password
self._lock_ttl = lock_ttl
if self._url is not None and not self._url.startswith('http'):
self._url = abspath(self._url)
self._ignore = re_compile(GIT_IGNORED_PATTERN)
# -------------------------------------------------------------------------
[docs]
def init(self):
"""Initialize a local Git repository.
:rtype: :class:`pyramid.i18n.TranslationString` or ``None``
:return:
Error message or ``None`` if it succeeds.
"""
try:
Repo.init(self.root)
except (IOError, OSError) as error:
return str(error)
except GitCommandError as error:
return ' '.join(error.stderr.split('\n')).strip()
self._exclude_files()
return None
# -------------------------------------------------------------------------
[docs]
def is_dirty(self) -> bool:
"""Return ``True`` if the repository data has been modified.
:rtype: bool
"""
try:
repo = Repo.init(self.root)
return repo.is_dirty() or bool(repo.untracked_files)
except (IOError, OSError, GitCommandError):
return False
# -------------------------------------------------------------------------
[docs]
def clone(self) -> str | None:
"""Create a copy of an existing repository.
:rtype: :class:`pyramid.i18n.TranslationString` or ``None``
:return:
Error message or ``None`` if it succeeds.
"""
if self._url is None:
return self.init()
fullurl = full_url(self._url, self._user, self._password)
try:
repo = Repo.clone_from(fullurl, self.root)
except (IOError, OSError) as error:
return str(error).replace(fullurl, self._url)
except GitCommandError as error:
return ' '.join(error.stderr.split('\n')).strip().replace(
fullurl, self._url)
repo.remotes.origin.set_url(self._url)
self._exclude_files()
return None
# -------------------------------------------------------------------------
[docs]
def pull(self) -> str | None:
"""Pull from a remote repository or do nothing.
:rtype: :class:`pyramid.i18n.TranslationString` or ``None``
:return:
Error message or ``None`` if it succeeds.
"""
self._fix_lock()
if self._url is None:
return None
fullurl = full_url(self._url, self._user, self._password)
try:
repo = Repo.init(self.root)
repo.remotes.origin.set_url(fullurl)
repo.remotes.origin.pull()
except (IOError, OSError) as error:
return str(error).replace(fullurl, self._url)
except GitCommandError as error:
return ' '.join(error.stderr.split('\n')).strip().replace(
fullurl, self._url)
finally:
repo.remotes.origin.set_url(self._url)
return None
# -------------------------------------------------------------------------
[docs]
def add(self, path: str | None = None) -> str | None:
"""Add new files in path ``path``.
:param str path: (optional)
Relative or absolute path to add.
:rtype: :class:`pyramid.i18n.TranslationString` or ``None``
:return:
Error message or ``None`` if it succeeds.
"""
if path is None:
path = self.root
if not isabs(path):
path = normpath(join(self.root, path))
if not path.startswith(self.root) or not exists(path):
return _('git add: incorrect path')
path = relpath(path, self.root)
if path == '.git':
return _('git add: you cannot add .git')
if self._ignore.search(path) is not None:
return None
repo = Repo.init(self.root)
try:
if path == '.':
repo.git.add('--all')
else:
repo.index.add((path, ))
except (IOError, OSError) as error: # pragma: nocover
return str(error)
except GitCommandError as error: # pragma: nocover
return ' '.join(error.stderr.split('\n')).strip()
return None
# -------------------------------------------------------------------------
[docs]
def remove(self, directory: str, filename: str) -> str | None:
"""Remove a file or a directory in path ``directory``.
:param str directory:
Relative or absolute path to the directory containing the file.
:param str filename:
Name of the file to remove.
:rtype: :class:`pyramid.i18n.TranslationString` or ``None``
:return:
Error message or ``None`` if it succeeds.
"""
# pylint: disable = too-many-return-statements, too-many-branches
if isabs(directory):
directory = relpath(directory, self.root)
abs_file = normpath(join(self.root, directory, filename))
if not abs_file.startswith(self.root) or abs_file == self.root:
return _('git remove: incorrect path')
# Remove information file
repo = Repo.init(self.root)
abs_path = join(self.root, INFOS_DIR, directory, f'{filename}.xml')
is_dir = isdir(abs_file)
if exists(abs_path):
try:
repo.index.remove(( # yapf: disable
normpath(join(INFOS_DIR, directory, f'{filename}.xml')), ),
working_tree=True)
except (IOError, OSError) as error:
return str(error)
except GitCommandError:
return VcsNone(self.root).remove(directory, filename)
abs_path = join(self.root, INFOS_DIR, directory, filename)
if is_dir and isdir(abs_path):
try:
repo.index.remove(
(normpath(join(INFOS_DIR, directory, filename)), ),
working_tree=True,
r=True)
except (IOError, OSError) as error: # pragma: nocover
return str(error)
except GitCommandError as error: # pragma: nocover
return ' '.join(error.stderr.split('\n')).strip()
# Remove file
if exists(abs_file):
try:
repo.index.remove(
(normpath(join(directory, filename)), ),
working_tree=True,
r=True)
except (IOError, OSError) as error:
return str(error)
except GitCommandError:
return VcsNone(self.root).remove(directory, filename)
# Remove thumbnails
thumbnail_remove(self.root, directory, filename)
return None
# -------------------------------------------------------------------------
[docs]
def move(
self,
file1: tuple[str, str],
file2: tuple[str, str],
copy_only: bool = False) -> str | None:
"""Move or copy a file.
:param tuple file1:
A tuple such as ``(old_directory, old_name)``.
:param tuple file2:
A tuple such as ``(new_directory, new_name)``.
:param bool copy_only: (default=False)
If ``True``, copy instead of moving the file.
:rtype: :class:`pyramid.i18n.TranslationString` or ``None``
:return:
Error message or ``None`` if it succeeds.
"""
# pylint: disable = too-many-return-statements, too-many-branches
# Copy
if copy_only:
return self.copy(file1, file2)
# Prepare files
abs_path1, abs_path2, error = file_check_move(self, file1, self, file2)
if abs_path1 is None:
return error
# Rename file
repo = Repo.init(self.root)
is_dir = isdir(abs_path1)
try:
repo.index.move(( # yapf: disable
normpath(join(file1[0], file1[1])),
normpath(join(file2[0], file2[1]))))
except (IOError, OSError, GitCommandError) as error:
if is_dir:
makedirs(abs_path2, exist_ok=True)
rmtree(abs_path1)
elif hasattr(error, 'strerr'):
return ' '.join(
error.stderr.split('\n')).strip() # type: ignore
elif 'not under version control' in str(error):
return VcsNone(self.root).move(file1, file2, copy_only)
else:
return str(error)
# Rename information file
abs_path1 = join(
self.root, INFOS_DIR, file1[0], '{0}.xml'.format(file1[1]))
if exists(abs_path1):
abs_path2 = join(
self.root, INFOS_DIR, file2[0], '{0}.xml'.format(file2[1]))
makedirs(dirname(abs_path2), exist_ok=True)
try:
repo.index.move(( # yapf: disable
normpath(join(INFOS_DIR, file1[0], f'{file1[1]}.xml')),
normpath(join(INFOS_DIR, file2[0], f'{file2[1]}.xml'))))
except (IOError, OSError) as error: # pragma: nocover
return str(error)
except GitCommandError as error: # pragma: nocover
return ' '.join(error.stderr.split('\n')).strip()
abs_path1 = join(self.root, INFOS_DIR, file1[0], file1[1])
if is_dir and isdir(abs_path1):
abs_path2 = join(self.root, INFOS_DIR, file2[0], file2[1])
if exists(abs_path2):
rmtree(abs_path2)
try:
repo.index.move(
(
normpath(join(INFOS_DIR, file1[0], file1[1])),
normpath(join(INFOS_DIR, file2[0], file2[1]))))
except (IOError, OSError) as error: # pragma: nocover
return str(error)
except GitCommandError as error: # pragma: nocover
return ' '.join(error.stderr.split('\n')).strip()
# Rename thumbnails
thumbnail_move(self.root, file1, self.root, file2, copy_only)
return None
# -------------------------------------------------------------------------
[docs]
def copy(self, file1, file2):
"""Copy a file.
:param tuple file1:
A tuple such as ``(old_directory, old_name)``.
:param tuple file2:
A tuple such as ``(new_directory, new_name)``.
:rtype: :class:`pyramid.i18n.TranslationString` or ``None``
:return:
Error message or ``None`` if it succeeds.
"""
# Prepare files
abs_path1, abs_path2, error = file_check_move(self, file1, self, file2)
if abs_path1 is None:
return error
# Copy file
makedirs(dirname(abs_path2), exist_ok=True)
is_dir = isdir(abs_path1)
if is_dir:
copy_content(abs_path1, abs_path2)
else:
copy(abs_path1, abs_path2)
self.add(abs_path2)
# Copy information file
abs_path1 = join(
self.root, INFOS_DIR, file1[0], '{0}.xml'.format(file1[1]))
if exists(abs_path1):
abs_path2 = join(
self.root, INFOS_DIR, file2[0], '{0}.xml'.format(file2[1]))
makedirs(dirname(abs_path2), exist_ok=True)
copy(abs_path1, abs_path2)
self.add(abs_path2)
abs_path1 = join(self.root, INFOS_DIR, file1[0], file1[1])
if is_dir and isdir(abs_path1):
abs_path2 = join(self.root, INFOS_DIR, file2[0], file2[1])
if exists(abs_path2):
rmtree(abs_path2)
copy_content(abs_path1, abs_path2)
self.add(abs_path2)
# Copy thumbnails
thumbnail_move(self.root, file1, self.root, file2, True)
return None
# -------------------------------------------------------------------------
[docs]
def commit(self, message, name, email=''):
"""Commit changes.
:param str message:
Message for mommit.
:param str name:
Name of the author.
:param str email: (optional)
Emial of the author.
:rtype: :class:`pyramid.i18n.TranslationString` or ``None``
:return:
Error message or ``None`` if it succeeds.
"""
if not message:
return _('git commit: empty message')
author = namedtuple('author', 'name email')(name=name, email=email)
try:
repo = Repo.init(self.root)
repo.index.commit(message, author=author)
except (IOError, OSError,
UnicodeEncodeError) as error: # pragma: nocover
return str(error)
except GitCommandError as error: # pragma: nocover
return ' '.join(error.stderr.split('\n')).strip()
return None
# -------------------------------------------------------------------------
[docs]
def log(self, directory, filename, limit=20):
"""Show revision history.
:param str directory:
Relative or absolute path to the directory containing the file.
:param str filename:
Name of the file to log.
:param init limit: (default=20)
Maximum number of commits.
:rtype: list
:return:
A list of tuples such as
``(commit_id, datetime, commiter_name, message)``.
"""
if isabs(directory):
directory = relpath(directory, self.root)
repo = Repo.init(self.root)
log = []
for commit in repo.iter_commits('--all', max_count=limit,
paths=join(directory, filename)):
log.append(
(
commit.hexsha,
commit.committed_datetime.replace(tzinfo=None),
commit.author.name, commit.message))
return log
# -------------------------------------------------------------------------
def _fix_lock(self):
"""Remove long locks.
:rtype: bool
"""
lock_file = join(self.root, '.git', GIT_LOCK_FILE)
if exists(lock_file) and getmtime(lock_file) + self._lock_ttl < time():
try:
remove(lock_file)
except OSError: # pragma: nocover
pass
return True
return False
# -------------------------------------------------------------------------
def _exclude_files(self):
"""Locally exclude `.thumbnails` and `.index` directories."""
exclude_file = join(self.root, '.git', 'info', 'exclude')
with open(exclude_file, 'r', encoding='utf8') as hdl:
content = hdl.read()
modified = False
if LOCAL_DIR not in content:
content += f'\n{IGNORED_PREFIX}*{IGNORED_SUFFIX}\n'\
f'{LOCAL_DIR}'
modified = True
if modified:
with open(exclude_file, 'w', encoding='utf8') as hdl:
hdl.write(content)