Source code for versuchung.files

# This file is part of versuchung.
#
# versuchung is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# versuchung is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# versuchung.  If not, see <http://www.gnu.org/licenses/>.


from versuchung.types import InputParameter, OutputParameter, Type
import versuchung.archives
try:
    from cStringIO import StringIO
except ImportError:
    from io import StringIO
import shutil
import csv
import os, stat
import hashlib
import fnmatch

class FilesystemObject(InputParameter, OutputParameter, Type):
    def __init__(self, default_name=""):
        InputParameter.__init__(self)
        OutputParameter.__init__(self)
        Type.__init__(self)
        self.__object_name = default_name
        self.__enclosing_directory = os.path.abspath(os.curdir)
        self.__force_enclosing_directory = False

    def inp_setup_cmdline_parser(self, parser):
        self.inp_parser_add(parser, None, self.__object_name)

    def inp_extract_cmdline_parser(self, opts, args):
        self.__object_name = self.inp_parser_extract(opts, None)

    def inp_metadata(self):
        return {self.name: self.__object_name}

    @property
    def path(self):
        """:return: string -- path to the file/directory"""
        if not self.__force_enclosing_directory:
            if self.parameter_type == "input":
                if self.static_experiment == self.dynamic_experiment:
                    self.__enclosing_directory = self.dynamic_experiment.startup_directory
                else:
                    self.__enclosing_directory = self.static_experiment.base_directory
            elif self.parameter_type == "output":
                assert self.static_experiment == self.dynamic_experiment
                self.__enclosing_directory = self.dynamic_experiment.base_directory
            elif self.static_experiment is not None:
                self.__enclosing_directory = self.static_experiment.base_directory
            else:
                self.__enclosing_directory = os.path.abspath(os.curdir)

        if os.path.isabs(self.__object_name):
            return self.__object_name

        return os.path.join(self.__enclosing_directory, self.__object_name)

    @property
    def basename(self):
        return os.path.basename(self.path)

    @property
    def dirname(self):
        return os.path.dirname(self.path)

    def set_path(self, base_directory, object_name):
        assert base_directory[0] == "/"
        self.__force_enclosing_directory = True
        self.__enclosing_directory = base_directory
        self.__object_name         = object_name

[docs]class File(FilesystemObject): """Can be used as: **input parameter** and **output parameter** The File type represents the content of a single file. Its contents can be read and written most easily with the :attr:`value` property. Alternatively, the method :meth:`write` appends new content if the parameter `append` is set to `True`. NB: The content of the file is flushed only after the experiment finishes. Use :meth:`flush` to force writing the buffered data to disk before the experiment finishes. """ def __init__(self, default_filename="", binary=False): FilesystemObject.__init__(self, default_filename) self.__value = None self.__binary = binary if binary: self.__binary_mode = "b" else: self.__binary_mode = "" @property def value(self): """This attribute can be read and written and represent the content of the specified file""" if not self.__value: try: with open(self.original_path, "r" + self.__binary_mode) as fd: self.__value = self.after_read(fd.read()) except IOError: # File couldn't be read self.__value = self.after_read("") return self.__value @value.setter def value(self, value): self.__value = value @property def original_path(self): return File.path.fget(self)
[docs] def write(self, content, append = False): """Similar to the :attr:`value` property. If the parameter `append` is `False`, then the property :attr:`value` is reset (i.e., overwritten), otherwise the content is appendend""" if append: self.value += content else: self.value = content
def after_experiment_run(self, parameter_type): FilesystemObject.after_experiment_run(self, parameter_type) assert parameter_type in ["input", "output"] if parameter_type == "output": self.flush()
[docs] def flush(self): """Flush the cached content of the file to disk""" if self.__value == None: return with open(self.original_path, "w" + self.__binary_mode + "+") as fd: v = self.before_write(self.value) if v is None: v = "" fd.write(v)
[docs] def copy_contents(self, filename): """Read the given file and replace the current .value with the files content. Flushes automatically afterwards.""" with open(filename) as fd: self.value = self.after_read(fd.read()) self.flush()
[docs] def make_executable(self): """makes a file exectuable (chmod +x $file)""" st = os.stat(self.original_path) os.chmod(self.original_path, st.st_mode | stat.S_IEXEC)
def after_read(self, value): """To provide filtering of file contents in subclasses, overrwrite this method. It is gets the file content as a string and returns the value()""" return value def before_write(self, value): """To provide filtering of file contents in subclasses, overrwrite this method. This method gets the value() and returns a string, when the file is written to disk""" return value
[docs]class Executable(File): """Can be used as: **input parameter** An executable is a :class:`versuchung.files.File` that only references an executable. It checksums the executable and puts the checksum into the metadata. The executable is never changed. """ def __init__(self, default_filename): File.__init__(self, default_filename) @property def value(self): raise NotImplemented @value.setter def value(self, value): raise NotImplementedError def write(self, content, append = False): raise NotImplementedError def after_experiment_run(self, parameter_type): pass def flush(self): raise NotImplementedError def copy_contents(self, filename): raise NotImplementedError def make_executable(self): raise NotImplementedError def inp_metadata(self): return {self.name + "-md5": hashlib.md5(open(self.path, "rb").read()).hexdigest()}
[docs] def execute(self, cmdline, *args): """Does start the executable with meth:`versuchung.execute.shell` and args, which is of type list, as arguments.""" from versuchung.execute import shell return shell(self.path + " " + cmdline, *args)
class Directory_op_with: def __init__(self): self.__olddir = [] def __enter__(self): self.__olddir.append(os.path.abspath(os.curdir)) os.chdir(self.path) return self.path def __exit__(self, *excinfo): path = self.__olddir[-1] del self.__olddir[-1] os.chdir(path)
[docs]class Directory(FilesystemObject, Directory_op_with): """Can be used as: **input parameter** and **output parameter** Represents the contents of directory. The filename_filter is a glob/fnmatch expression to filter the directories content and to ensure that no file is generated that does not fit this pattern. An useful example of this is an output Directory that matches only *.log files and is directly located in the result directory: outputs = { "logs": Directory(".", filename_filter="*.log") } It can also be used with the **with**-keyword to change the current working directory temporarily to this directory:: with directory as dir: # Do something with adjusted current working directory print os.curdir """ def __init__(self, default_filename="", filename_filter="*"): FilesystemObject.__init__(self, default_filename) Directory_op_with.__init__(self) self.filename_filter = filename_filter self.__value = None self.__new_files = [] def __ensure_dir_exists(self): if not os.path.exists(self.path): os.mkdir(self.path) @property def value(self): """:return: list -- directories and files in given directory""" if not self.__value: self.__value = os.listdir(self.path) self.__value = [x for x in self.__value if fnmatch.fnmatch(x, self.filename_filter)] return self.__value def __iter__(self): for name in self.value: p = os.path.join(self.path, name) if name in self.subobjects: yield self.subobjects[name] continue if os.path.isdir(p): d = Directory(name) d.set_path(self.path, p) self.subobjects[name] = d yield d else: if p.endswith(".gz"): f = versuchung.archives.GzipFile(name) else: f = File(name) f.set_path(self.path, p) self.subobjects[name] = f yield f def before_experiment_run(self, parameter_type): FilesystemObject.before_experiment_run(self, parameter_type) if parameter_type == "output": self.__ensure_dir_exists()
[docs] def new_file(self, name, compressed=False): """Generate a new :class:`~versuchung.files.File` in the directory. It will be flushed automatically if the experiment is over.""" if not fnmatch.fnmatch(name, self.filename_filter): raise RuntimeError("Filename {} does not match filter {}".\ format(name, self.filename_filter)) self.__ensure_dir_exists() if compressed: f = versuchung.archives.GzipFile(name) else: f = File(name) f.set_path(self.path, name) f.value = "" self.subobjects[name] = f return f
[docs] def new_directory(self, name): """Generate a new :class:`~versuchung.files.Directory` in the directory. The directory <name> must not be present before""" if not fnmatch.fnmatch(name, self.filename_filter): raise RuntimeError("Filename {} does not match filter {}".\ format(name, self.filename_filter)) self.__ensure_dir_exists() f = Directory(name) f.set_path(self.path, name) os.mkdir(f.path) self.subobjects[name] = f return f
[docs] def mirror_directory(self, path, include_closure = None): """Copies the contents of the given directory to this directory. The include closure is a function, which checks for every (absolute) path in the origin directory, if it is mirrored. If it is None, all files are included.""" self.__ensure_dir_exists() if not include_closure: include_closure = lambda arg: True if not os.path.exists(path) and os.path.isdir(path): raise RuntimeError("Argument is no directory") path = os.path.abspath(path) for root, dirs, files in os.walk(path): root = root[len(path)+1:] for d in dirs: src = os.path.join(path, root, d) if not include_closure(src): continue dst = os.path.join(self.path, root, d) if not os.path.isdir(dst): os.mkdir(dst) for f in files: src = os.path.join(path, root, f) if not include_closure(src): continue dst = os.path.join(self.path, root, f) shutil.copyfile(src,dst) self.__value = None
[docs]class CSV_File(File): """Can be used as: **input parameter** and **output parameter** It is a normal :class:`~versuchung.files.File` but the content of the file is interpreted as a csv file. It is parsed before the value is exposed to the user. And formatted before the content is written to disk. Internally the :mod:`csv` is used, so all arguments to ``csv.reader`` and ``csv.writer`` can be given in *csv_args*.""" value = File.value """Other than a normal CSV_File the value of a CSV_File is a list of lists, which represents the structure of the csv file. This value can be manipulated by the user. >>> CSV_File("csv_output").value [["1", "2", "3"]]""" def __init__(self, default_filename = "", **csv_args): File.__init__(self, default_filename) self.csv_args = csv_args def after_read(self, value): fd = StringIO(value) reader = csv.reader(fd, self.csv_args) return list(reader) def before_write(self, value): fd = StringIO() writer = csv.writer(fd, self.csv_args) writer.writerows(value) return fd.getvalue() def write(self): raise NotImplemented
[docs] def append(self, row): """Append a row to the csv file It is just a shorthand for >>> csv_file.value.append([1,2,3]) :param row: row to append :type row: list.""" if type(row) != list: raise TypeError("list of values required") self.value.append(row)