# This file is part of versuchung.
#
# versuchung is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# versuchung is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# versuchung. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
from versuchung.types import Type, InputParameter
from versuchung.files import Directory, Directory_op_with, File
from versuchung.execute import shell
import logging
import os
import sys
import gzip
import re
from subprocess import PIPE
try:
from StringIO import StringIO as BytesIO
except Exception:
from io import BytesIO
class TarArchive(Type, InputParameter, Directory_op_with):
    """Can be used as: **input parameter**

    The archive will be extracted to a temporary directory. It will be
    removed after the experiment is over.

    ``filename`` can either be a :class:`string` or any object that
    has a ``.path`` attribute (like
    e.g. :class:`~versuchung.files.File`). Of course the
    referenced file must be a single file.

    This parameter can be used as argument to the with keyword, to
    change to the temporary directory::

        with self.inputs.tar_archive as path:
            # Here we have path == os.path.abspath(os.curdir)
            # Do something in the extracted copy
            print(path)
    """

    def __init__(self, filename = None):
        """The filename is either a string naming a file, or an
        object with a path attribute (e.g. a :class:`~versuchung.files.File`)"""
        Type.__init__(self)
        InputParameter.__init__(self)
        Directory_op_with.__init__(self)
        self.__filename = filename
        # Lazily created Directory of the extracted archive (see ``value``)
        self.__value = None

    def inp_setup_cmdline_parser(self, parser):
        """Register the archive file name as command line option."""
        self.inp_parser_add(parser, None, self.__filename)

    def inp_extract_cmdline_parser(self, opts, args):
        """Take over the archive file name from the parsed command line."""
        self.__filename = self.inp_parser_extract(opts, None)

    def before_experiment_run(self, parameter_type):
        """Resolve the archive file name to an absolute path.

        If the file name is an object with a ``path`` attribute, it is
        registered as subobject first, so that it takes part in
        ``Type.before_experiment_run`` before its path is read.
        """
        # dir() is used on purpose instead of hasattr(): hasattr() would
        # already evaluate a ``path`` property at this point.
        if parameter_type == "input" and "path" in dir(self.__filename):
            self.subobjects["filename"] = self.__filename
            Type.before_experiment_run(self, parameter_type)
            self.__filename = self.__filename.path
        else:
            Type.before_experiment_run(self, parameter_type)

        self.__filename = os.path.abspath(self.__filename)

    def inp_metadata(self):
        """Return the metadata of this parameter: its name mapped to the archive path."""
        return {self.name: self.__filename}

    def __setup_value(self):
        """Extract the archive below the temporary directory.

        Returns a ``Directory`` for the extracted content. If every
        extracted entry lives below one common top-level directory, the
        returned ``Directory`` points to that directory, otherwise to
        the directory the archive was extracted into.

        Raises RuntimeError if ``tar`` exits with a non-zero status.
        """
        if "path" in dir(self.__filename):
            self.subobjects["filename"] = self.__filename
            self.__filename = self.__filename.path
        self.__filename = os.path.abspath(self.__filename)
        fn = self.__filename

        # Only the file name itself decides about the compression; the
        # directories leading to the archive must not influence it.
        basename = os.path.basename(fn)
        extract_mode = ""
        if "tar.gz" in basename or "tgz" in basename:
            extract_mode = "z"
        if "tar.bz2" in basename or "bzip2" in basename:
            extract_mode = "j"
        if "tar.xz" in basename or "txz" in basename:
            extract_mode = "J"

        with self.tmp_directory:
            try:
                os.mkdir(self.name)
            except OSError:
                # ignore errors if the directory should already exist for some reason
                pass
            with Directory(self.name):
                dirname = os.path.abspath(".")
                # The path is quoted, since it may contain whitespace
                (out, ret) = shell("tar %sxvf '%s'", extract_mode, fn)
                if ret != 0:
                    raise RuntimeError("Extracting of %s failed" % fn)

                # Search the shortest directory entry in the verbose
                # tar output: the candidate for a common top-level
                # directory.
                cd = None
                for line in out:
                    if (cd is None or len(line) < len(cd)) and line.endswith("/"):
                        cd = line
                if cd and all(x.startswith(cd) for x in out):
                    dirname = cd
                # Resolved while the extraction directory is still the
                # current directory, as ``cd`` is relative to it.
                return Directory(os.path.abspath(dirname))

    @property
    def value(self):
        """Return a :class:`versuchung.files.Directory` instance to the extracted
        tar archive. If it contains only one directory the instance
        will point there. Otherwise it will point to a directory
        containing the contents of the archive"""
        if not self.__value:
            self.__value = self.__setup_value()
        return self.__value

    @property
    def path(self):
        """Return the string to the extract directory (same as .value.path)"""
        return self.value.path
class GitArchive(InputParameter, Type, Directory_op_with):
    """Can be used as: **input parameter**

    The git repository given in ``clone_url`` will be cloned to a
    temporary directory. It will be removed after the experiment is
    over. If ``shallow == True`` only the files and not the .git is
    copied (cloned). This is especially useful for large git
    repositories like the Linux kernel tree.

    ``clone_url`` can either be a :class:`string` or any object that
    has a ``.path`` attribute (like e.g. :class:`TarArchive`). Of
    course the referenced path must be a directory.

    This parameter can be used as argument to the with keyword, to
    change to the temporary directory::

        with self.inputs.git_archive as path:
            # Here we have path == os.path.abspath(os.curdir)
            # Do something in the extracted copy
            print(path)
    """

    def __init__(self, clone_url = None, ref = "refs/heads/master", shallow=False,
                 branches=None, tags=None):
        """clone_url: where to get the git archive from
              This might either be a string or an object with a path attribute
        ref: which git reference to checkout
        shallow: do a shallow copy (using git-archive).
        branches: Also fetch other branches. Use branches=True for all branches and branches=REGEX for a filtered view.
        tags: Also fetch other tags. Use tags=True for all tags and tags=REGEX for a filtered view.

        The git archive will be cloned to self.name (which is the
        key in the input parameters dict)

        Raises RuntimeError if ``shallow`` is combined with ``branches`` or ``tags``.
        """
        Type.__init__(self)
        InputParameter.__init__(self)
        Directory_op_with.__init__(self)
        self.__clone_url = clone_url
        self.__ref = ref
        self.__shallow = shallow
        self.__value = None
        self.__hash = None
        # Filled by inp_metadata(); starts empty so that checkout() and
        # the clone step can query it before the metadata was generated.
        self.__metadata = {}

        # Include branches and tags into the metadata-hash
        self.__filter_refs = {"branches": branches, "tags": tags}
        if (branches or tags) and shallow:
            raise RuntimeError("Shallow clone and branch/tag checkout is not supported.")

    def inp_setup_cmdline_parser(self, parser):
        """Register the ``clone-url`` and ``ref`` command line options."""
        self.inp_parser_add(parser, "clone-url", self.__clone_url)
        self.inp_parser_add(parser, "ref", self.__ref)

    def inp_extract_cmdline_parser(self, opts, args):
        """Take over ``clone-url`` and ``ref`` from the parsed command line."""
        self.__clone_url = self.inp_parser_extract(opts, "clone-url")
        self.__ref = self.inp_parser_extract(opts, "ref")

    def before_experiment_run(self, parameter_type):
        """Resolve a clone url object (with ``path`` attribute) to its path.

        The object is registered as subobject first, so that it takes
        part in ``Type.before_experiment_run`` before its path is read.
        """
        # dir() is used on purpose instead of hasattr(): hasattr() would
        # already evaluate a ``path`` property at this point.
        if parameter_type == "input" and "path" in dir(self.__clone_url):
            self.subobjects["clone-url"] = self.__clone_url
            Type.before_experiment_run(self, parameter_type)
            self.__clone_url = self.__clone_url.path
        else:
            Type.before_experiment_run(self, parameter_type)

    def __references(self, prefix_filter=None, regex_filter=None):
        """List the references of the remote repository (``git ls-remote``).

        prefix_filter: keep only references starting with this prefix;
            the prefix is removed from the returned names.
        regex_filter: keep only names that ``re.match`` this regex
            (applied after the prefix was removed).

        @returns a dictionary that maps reference names to commit hashes

        Raises RuntimeError if the references could not be listed.
        """
        (lines, ret) = shell("git ls-remote %s 'refs/*'", self.__clone_url, stderr=sys.stderr)
        if ret != 0 or not lines:
            print("\n".join(lines))
            raise RuntimeError("Could not list references in repository")

        refs = {}
        for line in lines:
            fields = line.strip().split("\t", 1)
            if len(fields) != 2:
                # Blank or malformed line (e.g. a repository without
                # any references): nothing to record.
                continue
            commit_hash, refname = fields
            # Apply regex and prefix filter
            if prefix_filter:
                if not refname.startswith(prefix_filter):
                    continue
                # I have no idea where these names come from
                if refname.startswith("refs/tags/") and refname.endswith("^{}"):
                    continue
                # Shorten the Prefix
                refname = refname[len(prefix_filter):]
            if regex_filter and not re.match(regex_filter, refname):
                continue
            refs[refname] = commit_hash
        return refs

    def references(self, regex_filter=None):
        """Inspect reference list of the repository. This reference list
        includes all branches, tags, and whatsoever. All reference names are
        fully qualified (refs/tags/*, refs/heads*).

        If the ``regex_filter`` is given, only reference names that
        match the regex are returned.

        @returns a dictionary that maps references to commit hashes
        """
        return self.__references(regex_filter=regex_filter)

    def branches(self):
        """Like references, but returns a list of branches/heads in the
        repository. The keys are transformed (refs/heads/$X -> $X).

        The GitArchive(branches=...) filter applies!

        @returns dictionary of branch name to commit hashes
        """
        regex_filter = None
        arg = self.__filter_refs["branches"]
        # branches=True selects all branches, anything else is a regex
        if arg and arg is not True:
            regex_filter = arg
        return self.__references(prefix_filter="refs/heads/", regex_filter=regex_filter)

    def tags(self):
        """Like references, but returns a list of tags in the
        repository. The keys are transformed (refs/tags/$X -> $X).

        The GitArchive(tags=...) filter applies!

        @returns dictionary of tag name to commit hashes
        """
        regex_filter = None
        arg = self.__filter_refs["tags"]
        # tags=True selects all tags, anything else is a regex
        if arg and arg is not True:
            regex_filter = arg
        return self.__references(prefix_filter="refs/tags/", regex_filter=regex_filter)

    def checkout(self, branch=None, tag=None):
        """Check out a branch or a tag in the cloned repository.

        Only branches/tags recorded in the metadata (see
        GitArchive(branches=..., tags=...)) can be selected. Without
        arguments the currently configured reference is checked out.

        @returns tuple of (reference, commit hash)

        Raises RuntimeError if the branch/tag is not visible, or if
        ``git checkout`` fails.
        """
        if branch:
            visible_branches = self.__metadata.get("branches", {})
            if branch not in visible_branches:
                raise RuntimeError("Branch `%s' is not visible, please parametrize GitArchive(branches=...) correctly" % branch)
            self.__ref = "refs/heads/" + branch
            self.__hash = visible_branches[branch]
        if tag:
            visible_tags = self.__metadata.get("tags", {})
            if tag not in visible_tags:
                raise RuntimeError("Tag `%s' is not visible, please parametrize GitArchive(tags=...) correctly" % tag)
            self.__ref = "refs/tags/" + tag
            self.__hash = visible_tags[tag]
        if not self.__ref:
            raise RuntimeError("GitArchive.checkout() requires branch or tag parameter")

        cmd = "cd '%s' && git checkout %s"
        (lines, ret) = shell(cmd, self.value.path, self.__ref, stderr=sys.stderr)
        if ret != 0:
            print("\n".join(lines))
            raise RuntimeError("GitArchive.checkout(%s) failed" % self.__ref)
        return (self.__ref, self.__hash)

    def checkout_hash(self):
        """Return the hash of the HEAD commit hash as string"""
        if not self.__hash:
            cmd = "git ls-remote %s %s" % (self.__clone_url,
                                           self.__ref)
            (lines, ret) = shell(cmd, stderr=sys.stderr)
            if ret != 0 or not lines:
                print("\n".join(lines))
                sys.exit(-1)

            self.__hash = lines[0].split("\t")[0]
            # The remote does not know the reference by name: use the
            # reference itself as hash.
            if self.__hash == "":
                self.__hash = self.__ref
        return self.__hash

    def checkout_ref(self):
        """Return git ref which was checked out"""
        return self.__ref

    def checkout_url(self):
        """Return git url which was checked out"""
        return self.__clone_url

    def inp_metadata(self):
        """Return the metadata of this parameter.

        Contains clone url, reference and commit hash, plus the
        visible branches/tags if the respective filter was given. All
        keys are prefixed with ``self.name + "-"``.
        """
        ret = {"clone-url": str(self.__clone_url),
               "ref": self.__ref,
               "hash": self.checkout_hash()}
        # If the user wants to checkout different branches, these have
        # to influence the metadata hash
        for ref, getter in [("branches", self.branches),
                            ("tags", self.tags)]:
            if self.__filter_refs[ref]:
                ret[ref] = getter()
        # Remembered for checkout() and for fetching during the clone
        self.__metadata = ret
        return {self.name + "-" + k: v for (k, v) in ret.items()}

    def __setup_value(self):
        """Clone (or export, if shallow) the repository below the
        temporary directory and return a ``Directory`` pointing to it.

        Exits the process (``sys.exit(-1)``) if a git command fails.
        """
        if "path" in dir(self.__clone_url):
            self.subobjects["clone-url"] = self.__clone_url
            self.__clone_url = self.__clone_url.path

        logging.info("copying git archive %s", self.__clone_url)
        with self.tmp_directory:
            os.mkdir(self.name)
            if self.__shallow:
                cmd = "cd '%s' && git archive --format=tar --remote=%s %s | tar x"
                args = (self.name,
                        self.__clone_url,
                        self.__ref)
            else:
                cmd = "git clone %s %s"
                args = (self.__clone_url, self.name)

            (lines, ret) = shell(cmd, *args, stderr=sys.stderr)
            if ret != 0:
                print("\n".join(lines))
                sys.exit(-1)

            if not self.__shallow:
                cmd = "cd %s && git gc && git fetch %s %s && git checkout FETCH_HEAD"
                args = (self.name, self.__clone_url, self.__ref)
                (lines, ret) = shell(cmd, *args, stderr=sys.stderr)
                if ret != 0:
                    print("\n".join(lines))
                    sys.exit(-1)

                # Fetch all visible branches and tags
                for branch in self.__metadata.get("branches", {}):
                    cmd = "cd %s && git fetch %s refs/heads/%s && git update-ref refs/heads/%s FETCH_HEAD"
                    shell(cmd, self.name, self.__clone_url, branch, branch, stderr=sys.stderr)
                for tag in self.__metadata.get("tags", {}):
                    cmd = "cd %s && git fetch %s refs/tags/%s && git update-ref refs/tags/%s FETCH_HEAD"
                    shell(cmd, self.name, self.__clone_url, tag, tag, stderr=sys.stderr)

            # Resolved while the temporary directory is still the
            # current directory, as self.name is relative to it.
            return Directory(os.path.abspath(self.name))

    @property
    def value(self):
        """Return a :class:`versuchung.files.Directory` instance to the cloned git directory"""
        if not self.__value:
            self.__value = self.__setup_value()
        return self.__value

    @property
    def path(self):
        """Return the string to the extract directory (same as .value.path)"""
        return self.value.path
class GzipFile(File):
    """A binary :class:`File` whose content is gzip compressed.

    ``after_read`` decompresses and decodes the raw content,
    ``before_write`` encodes and compresses it; presumably both are
    invoked by :class:`File` around its read/write operations. ``path``
    refers to a decompressed copy in the temporary directory.
    """

    def __init__(self, default_filename=""):
        """default_filename: name of the gzip compressed file"""
        File.__init__(self, default_filename, binary=True)

    @property
    def path(self):
        """Decompress file into the temporary directory and return path to this location"""
        assert self.tmp_directory is not None, \
            "Can gunzip file only as part of an active experiment"
        path = File.path.fget(self)
        # Remove exactly one ".gz" suffix. str.rstrip(".gz") must not be
        # used here: it strips any trailing '.', 'g' and 'z' characters
        # and would turn "data.log.gz" into "data.lo".
        base = os.path.basename(path)
        if base.endswith(".gz"):
            base = base[:-len(".gz")]
        filename = os.path.join(self.tmp_directory.path,
                                self.name + "_" + base)
        # Decompress only once; later accesses reuse the copy
        if not os.path.exists(filename):
            # The paths are quoted, since they may contain whitespace
            shell("gunzip < '%s' > '%s'", path, filename)
        return filename

    def after_read(self, value):
        """Decompress the gzip compressed bytes ``value`` and return them decoded as string"""
        x = BytesIO(value)
        with gzip.GzipFile(fileobj=x) as fd:
            return fd.read().decode()

    def before_write(self, value):
        """Encode the string ``value`` and return it as gzip compressed bytes"""
        x = BytesIO()
        # The gzip stream has to be closed before the buffer is read,
        # otherwise the gzip trailer is missing.
        with gzip.GzipFile(fileobj=x, mode="w") as fd:
            fd.write(value.encode())
        return x.getvalue()