Source code for renga.models.cwl.command_line_tool

# -*- coding: utf-8 -*-
#
# Copyright 2018 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent a ``CommandLineTool`` from the Common Workflow Language."""

import fnmatch
import re
import shlex
from contextlib import contextmanager

import attr

from renga._compat import Path

from ._ascwl import CWLClass, mapped
from .parameter import CommandInputParameter, CommandLineBinding, \
    CommandOutputParameter
from .process import Process
from .types import File


@attr.s
class CommandLineTool(Process, CWLClass):
    """Represent a command line tool.

    The tool is described by a base command, additional command line
    bindings (``arguments``), optional standard stream redirections and
    lists of input and output parameters.
    """

    # specialize inputs and outputs with Command{Input,Output}Parameter

    # NOTE(review): attrs validators report failure by raising; the boolean
    # returned by this lambda is ignored, so an empty command is accepted.
    # Left unchanged because the default value itself is empty.
    baseCommand = attr.ib(
        default='',
        validator=lambda self, attr, cmd: bool(cmd),
    )  # str or list(str) -> shlex.split()
    arguments = attr.ib(
        default=attr.Factory(list),
        converter=lambda cmd: list(cmd)
        if isinstance(cmd, (list, tuple)) else shlex.split(cmd),
    )  # list(string, Expression, CommandLineBinding)

    stdin = attr.ib(default=None)
    stdout = attr.ib(default=None)
    stderr = attr.ib(default=None)

    inputs = mapped(CommandInputParameter)
    outputs = mapped(CommandOutputParameter)

    successCodes = attr.ib(default=attr.Factory(list))  # list(int)
    temporaryFailCodes = attr.ib(default=attr.Factory(list))  # list(int)
    permanentFailCodes = attr.ib(default=attr.Factory(list))  # list(int)

    def get_output_id(self, path):
        """Return an id of the matching path from default values.

        Outputs are checked in order and the first match wins:

        * ``stdout``/``stderr`` outputs match when the corresponding stream
          attribute of the tool equals ``path``;
        * ``File`` outputs whose glob starts with ``$(inputs.`` match when
          the referenced input has ``path`` as its default value;
        * other ``File`` outputs match when ``path`` satisfies the glob
          pattern (:func:`fnmatch.fnmatch`).

        :param path: Path of the produced file.
        :returns: The id of the matching output or ``None``.
        """
        for output in self.outputs:
            if output.type in {'stdout', 'stderr'}:
                stream = getattr(self, output.type)
                if stream == path:
                    return output.id
            elif output.type == 'File':
                glob = output.outputBinding.glob
                # TODO better support for Expression
                if glob.startswith('$(inputs.'):
                    # strip the leading '$(inputs.' and the trailing ')'
                    input_id = glob[len('$(inputs.'):-1]
                    for input_ in self.inputs:
                        if input_.id == input_id and input_.default == path:
                            return output.id
                elif fnmatch.fnmatch(path, glob):
                    return output.id
        return None

    def to_argv(self, job=None):
        """Generate arguments for system call.

        The result starts with the base command followed by the rendered
        ``arguments`` and ``inputs`` ordered by their binding position.

        :param job: Currently unused.
        :returns: A list with the command line parts.
        """
        if isinstance(self.baseCommand, list):
            argv = list(self.baseCommand)
        else:
            argv = [self.baseCommand]

        args = [(a.position, a) for a in self.arguments]
        # Inputs without an input binding do not appear on the command line.
        args += [
            (i.inputBinding.position, i)
            for i in self.inputs
            if i.inputBinding is not None
        ]

        # Sort on the position only: the sort is stable and never falls
        # back to comparing the parameter objects when positions are equal.
        for _, value in sorted(args, key=lambda item: item[0]):
            argv.extend(value.to_argv())

        return argv
@attr.s
class CommandLineToolFactory(object):
    """Command Line Tool Factory.

    Build a ``CommandLineTool`` description from a concrete command line by
    guessing the base command, its arguments, inputs and outputs.
    """

    # a word made of letters with at most one dash inside, e.g. ``remote-add``
    _RE_SUBCOMMAND = re.compile(r'^[A-Za-z]+(-[A-Za-z]+)?$')

    command_line = attr.ib(
        converter=lambda cmd: list(cmd)
        if isinstance(cmd, (list, tuple)) else shlex.split(cmd),
    )

    directory = attr.ib(
        default='.',
        converter=Path,
    )

    stdin = attr.ib(default=None)  # null, str, Expression
    stderr = attr.ib(default=None)  # null, str, Expression
    stdout = attr.ib(default=None)  # null, str, Expression

    baseCommand = attr.ib(init=False)
    arguments = attr.ib(init=False)
    inputs = attr.ib(init=False)
    outputs = attr.ib(init=False)

    def __attrs_post_init__(self):
        """Derive basic information from the command line and streams."""
        self.baseCommand, detect = self.split_command_and_args()
        self.arguments = []
        self.inputs = []
        self.outputs = []

        if self.stdin:
            input_ = next(self.guess_inputs(self.stdin))
            # the standard input has to point to an existing file
            assert input_.type == 'File'
            input_.id = 'input_stdin'
            self.inputs.append(input_)
            self.stdin = '$(inputs.{0}.path)'.format(input_.id)

        for stream_name in ('stdout', 'stderr'):
            stream = getattr(self, stream_name)
            # only streams redirected to an already existing file are
            # registered as outputs here
            if stream and self.file_candidate(stream):
                self.outputs.append(
                    CommandOutputParameter(
                        id='output_{0}'.format(stream_name),
                        type=stream_name,
                    )
                )

        for input_ in self.guess_inputs(*detect):
            if isinstance(input_, CommandLineBinding):
                self.arguments.append(input_)
            else:
                self.inputs.append(input_)

    def generate_tool(self):
        """Return an instance of command line tool."""
        return CommandLineTool(
            stdin=self.stdin,
            stderr=self.stderr,
            stdout=self.stdout,
            baseCommand=self.baseCommand,
            arguments=self.arguments,
            inputs=self.inputs,
            outputs=self.outputs,
        )

    @contextmanager
    def watch(self, repo=None, no_output=False):
        """Watch a Renga repository for changes to detect outputs.

        The generated tool is yielded to the ``with`` body. Once the body
        finishes, untracked and modified files reported by ``repo.git`` are
        used to detect outputs and the tool is updated in place.

        :param repo: Object exposing ``git`` and ``track_paths_in_storage``.
            When it is ``None`` no output detection is performed.
        :param no_output: Skip the checks requiring that the redirected
            streams changed and that at least one output exists.
        :raises RuntimeError: If an expected output was not created or
            changed, or if no output was detected.
        """
        tool = self.generate_tool()
        # ``repo`` is optional; do not fail on the default value
        git = repo.git if repo is not None else None

        yield tool

        if git:
            candidates = set(git.untracked_files)
            candidates |= {item.a_path for item in git.index.diff(None)}

            inputs = {input_.id: input_ for input_ in self.inputs}
            outputs = list(tool.outputs)
            paths = []

            for output, input_, path in self.guess_outputs(candidates):
                outputs.append(output)
                paths.append(path)

                if input_ is not None:
                    if input_.id not in inputs:  # pragma: no cover
                        raise RuntimeError('Inconsistent input name.')

                    inputs[input_.id] = input_

            if not no_output:
                for stream_name in ('stdout', 'stderr'):
                    stream = getattr(self, stream_name)
                    if stream and stream not in candidates:
                        raise RuntimeError(
                            'Output file was not created or changed.'
                        )
                    elif stream:
                        paths.append(stream)

                if not outputs:
                    raise RuntimeError('No output was detected')

            tool.inputs = list(inputs.values())
            tool.outputs = outputs

            repo.track_paths_in_storage(*paths)

    @command_line.validator
    def validate_command_line(self, attribute, value):
        """Check the command line structure."""
        if not value:
            raise ValueError('Command line can not be empty.')

    @directory.validator
    def validate_path(self, attribute, value):
        """Check that the directory exists."""
        if not value.exists():
            raise ValueError('Directory must exist.')

    def file_candidate(self, candidate):
        """Return a path instance if it exists in current directory.

        Relative candidates are resolved against ``self.directory``.
        ``None`` is returned when the resulting path does not exist.
        """
        candidate = Path(candidate)

        if not candidate.is_absolute():
            candidate = self.directory / candidate

        if candidate.exists():
            return candidate
        return None

    def split_command_and_args(self):
        """Return tuple with command and args from command line arguments.

        Leading arguments that look like a subcommand and do not name an
        existing file are moved to the command part.
        """
        cmd = [self.command_line[0]]
        args = list(self.command_line[1:])

        if len(args) < 2:
            # only guess subcommand for more arguments
            return cmd, args

        while args and self._RE_SUBCOMMAND.match(args[0]) \
                and not self.file_candidate(args[0]):
            cmd.append(args.pop(0))

        return cmd, args

    def guess_type(self, value):
        """Return new value and CWL parameter type.

        :returns: A ``(value, type, itemSeparator)`` tuple where ``type`` is
            one of ``'int'``, ``'File'``, ``'string[]'`` or ``'string'``.
        """
        try:
            value = int(value)
            return value, 'int', None
        except ValueError:
            pass

        candidate = self.file_candidate(value)
        if candidate:
            try:
                return File(
                    path=candidate.relative_to(self.directory)
                ), 'File', None
            except ValueError:
                # The candidate points to a file outside the working
                # directory
                # TODO suggest that the file should be imported to the repo
                pass

        if len(value) > 1 and ',' in value:
            return value.split(','), 'string[]', ','

        return value, 'string', None

    def guess_inputs(self, *arguments):
        """Yield command input parameters and command line bindings.

        Options without a value are yielded as ``CommandLineBinding``; every
        other argument becomes a ``CommandInputParameter`` whose default and
        type come from :meth:`guess_type`. Positions are assigned in the
        order in which the items are yielded.
        """
        position = 0
        prefix = None  # option waiting for a possible value

        for argument in arguments:
            itemSeparator = None

            if prefix:
                if argument.startswith('-'):
                    # the pending option is followed by another option,
                    # hence it is a flag without a value
                    position += 1
                    yield CommandLineBinding(
                        position=position,
                        prefix=prefix,
                    )
                    prefix = None

            if argument.startswith('--'):
                if '=' in argument:
                    prefix, default = argument.split('=', 1)
                    prefix += '='
                    default, type_, itemSeparator = self.guess_type(default)
                    # TODO can be output

                    position += 1
                    yield CommandInputParameter(
                        id='input_{0}'.format(position),
                        type=type_,
                        default=default,
                        inputBinding=dict(
                            position=position,
                            itemSeparator=itemSeparator,
                            prefix=prefix,
                            separate=False,
                        )
                    )
                    prefix = None
                else:
                    prefix = argument

            elif argument.startswith('-'):
                if len(argument) > 2:
                    if '=' in argument:
                        prefix, default = argument.split('=', 1)
                        prefix += '='
                        default, type_, itemSeparator = self.guess_type(
                            default
                        )
                    else:
                        # possibly a flag with value
                        prefix = argument[0:2]
                        default, type_, itemSeparator = self.guess_type(
                            argument[2:]
                        )

                    position += 1
                    yield CommandInputParameter(
                        id='input_{0}'.format(position),
                        type=type_,
                        default=default,
                        inputBinding=dict(
                            position=position,
                            itemSeparator=itemSeparator,
                            prefix=prefix,
                            separate=not bool(argument[2:]),
                        )
                    )
                    prefix = None
                else:
                    prefix = argument

            else:
                default, type_, itemSeparator = self.guess_type(argument)
                # TODO can be output

                # TODO there might be an array
                position += 1
                yield CommandInputParameter(
                    id='input_{0}'.format(position),
                    type=type_,
                    default=default,
                    inputBinding=dict(
                        position=position,
                        itemSeparator=itemSeparator,
                        prefix=prefix,
                    )
                )
                prefix = None

        if prefix:
            # trailing option without a value
            position += 1
            yield CommandLineBinding(
                position=position,
                prefix=prefix,
            )

    def guess_outputs(self, paths):
        """Yield detected output and changed command input parameter.

        :param paths: Iterable of candidate paths of produced files.
        :returns: A generator of ``(output, input, path)`` tuples; the
            ``input`` item is currently always ``None``.
        :raises ValueError: If a path does not exist or if it is already
            used as a ``File`` input.
        """
        input_candidates = {
            str(input_.default): input_
            for input_ in self.inputs if input_.type != 'File'
        }  # inputs that need to be changed if an output is detected

        conflicting_paths = {
            str(input_.default)
            for input_ in self.inputs if input_.type == 'File'
        }  # names that can not be outputs because they are already inputs

        streams = {
            path
            for path in (getattr(self, name) for name in ('stdout', 'stderr'))
            if path is not None
        }

        # TODO group by a common prefix

        for position, path in enumerate(paths):
            candidate = self.file_candidate(path)

            if candidate is None:
                raise ValueError('Path "{0}" does not exist.'.format(path))

            glob = str(candidate.relative_to(self.directory))

            if glob in streams:
                continue

            if glob in conflicting_paths:
                raise ValueError('Output already exists in inputs.')

            if glob in input_candidates:
                input_ = input_candidates[glob]

                if input_.type == 'File':
                    # it means that it is rewriting a file
                    # (defensive: File inputs are excluded from
                    # ``input_candidates`` above)
                    raise NotImplementedError(
                        'Rewriting an input file is not supported.'
                    )

                yield (
                    CommandOutputParameter(
                        id='output_{0}'.format(position),
                        type='File',
                        outputBinding=dict(
                            glob='$(inputs.{0})'.format(input_.id),
                        ),
                    ), None, path
                )
            else:
                yield (
                    CommandOutputParameter(
                        id='output_{0}'.format(position),
                        type='File',
                        outputBinding=dict(glob=glob, ),
                    ), None, path
                )