Source code for imagepypelines.core.block_subclasses
# @Email: jmaggio14@gmail.com
# @Website: https://www.imagepypelines.org/
# @License: https://github.com/jmaggio14/imagepypelines/blob/master/LICENSE
# @github: https://github.com/jmaggio14/imagepypelines
#
# Copyright (c) 2018 - 2020 Jeff Maggio, Jai Mehra, Ryan Hartzell
#
import sys
import inspect
from abc import abstractmethod
import copy
from types import FunctionType
from functools import wraps
from .Block import Block
################################################################################
[docs]class PipelineBlock(Block):
"""Block which runs a pipeline internally (used for nesting pipelines
within pipelines)
Attributes:
pipeline(:obj:`Pipeline`): pipeline to process with
fetch(:obj:`tuple` of :obj:`str`): variables to fetch from the pipeline
in the order to retrieve them in
Batch Size:
"each"
"""
def __init__(self, pipeline, fetch):
"""instantiates the PipelineBlock
Args:
pipeline(:obj:`Pipeline`): pipeline to process with
fetch(:obj:`tuple` of :obj:`str`): variables to fetch from the
pipeline in the order to retrieve them in
"""
# check to make sure all the fetch vars are actually in the pipeline
if not all((fet in pipeline.vars) for fet in fetch):
msg = "Invalid Pipeline fetch, must be one of %s" % pipeline.vars.keys()
raise BlockError(msg)
# instantiate block args
self.fetch = fetch
self.pipeline = pipeline
# NOTE: do something here to support arg checking for the pipeline!!!
super().__init__(name=pipeline.name, batch_type="all")
############################################################################
[docs] def process(self, *args):
"""Runs the pipeline and fetches the desired variables"""
# NOTE: we might want to make the pipeline's logger a child of this
# block's logger in here?
processed = self.pipeline.process(*args, fetch=self.fetch)
# turn processed dict into a tuple of fetches
return tuple(processed[fet] for fet in self.fetch)
############################################################################
@property
def args(self):
""":obj:`list` of :obj:`str`: arg names for the pipeline"""
return self.pipeline.args
[docs] def get_default_node_attrs(self):
attrs = super().get_default_node_attrs()
attrs['display_as'] = 'sub_pipeline'
return attrs
################################################################################
[docs]class FuncBlock(Block):
"""Block that will run any function you give it, either unfettered through
the __call__ function, or with optional hardcoded parameters for use in a
pipeline. Typically the FuncBlock is only used in the :obj:`blockify`
decorator method.
Attributes:
func(function): the function to call internally
preset_kwargs(dict): preset keyword arguments, typically used for
arguments that are not data to process
"""
def __new__(cls, func=None, preset_kwargs=None, **block_kwargs):
"""Generates the new function block
Args:
func (function): the function you desire to turn into a block
preset_kwargs (dict): preset keyword arguments, typically used for
arguments that are not data to process
**block_kwargs: keyword arguments for :obj:`Block` instantiation
"""
if preset_kwargs is None:
preset_kwargs = {}
obj = super().__new__(cls)
# copy the method to avoid and any possible edge-case weirdness
func = copy.copy(func)
obj.func = func
return obj
def __init__(self, func, preset_kwargs=None, **block_kwargs):
"""instantiates the function block
Args:
func (function): the function you desire to turn into a block
preset_kwargs (dict): preset keyword arguments, typically used for
arguments that are not data to process
**block_kwargs: keyword arguments for :obj:`Block` instantiation
"""
if preset_kwargs is None:
preset_kwargs = {}
self._arg_spec = inspect.getfullargspec(func)
# we can't allow varargs at all because a block must have a known
# number of inputs
if (self._arg_spec.varargs or self._arg_spec.varkw):
raise TypeError("function cannot accept a variable number of args")
self.preset_kwargs = preset_kwargs
if self._arg_spec.defaults is not None:
tmp_kwargs = {k:v for k,v in zip(self._arg_spec.args[-len(self._arg_spec.defaults):], self._arg_spec.defaults)}
tmp_kwargs.update(self.preset_kwargs)
self.preset_kwargs = tmp_kwargs
super().__init__(self.func.__name__, **block_kwargs)
[docs] def process(self, *args):
return self.func(*args, **self.preset_kwargs)
[docs] def __call__(self, *args, **kwargs):
"""returns the exact output of the user defined function without any
interference or interaction with the class
"""
return self.func(*args,**kwargs)
[docs] def tweak(self, **kwargs):
"""Returns a mutated copy of the block instance with the new kwargs"""
if kwargs:
new_inst = self.deepcopy()
new_inst.preset_kwargs.update(**kwargs)
return new_inst
else:
raise TypeError("kwargs must take only key word arguments which satisfy the process function of the block")
@property
def kwargs(self):
""":obj:`dict` of key word arguments for bound processing function"""
return self.preset_kwargs
@property
def args(self):
""":obj:`list` of :obj:`str`: arguments in the order they are expected"""
# save the argspec in an instance variable if it hasn't been computed
pos_args = ([] if (self._arg_spec.args is None) else self._arg_spec.args).copy()
for idx,preset in enumerate(self.preset_kwargs.keys()):
pos_args.remove(preset)
return pos_args
################################################################################
################################################################################
[docs]class Leaf(Block):
"""a block to act as a leaf node in the Pipeline Graph. Used to complete
final outgoing edges from processing blocks
Attributes:
var_name(str): the name of the variable this leaf represents
"""
def __init__(self,var_name):
"""instantiates the leaf
Args:
var_name(str): the name of the variable this leaf represents
"""
self.var_name = var_name
super().__init__(self.var_name, batch_type="all")
############################################################################
[docs] def process(self,*data):
"""does nothing in a leaf"""
pass
# return data
############################################################################
def _pipeline_process(self,*data, **kwargs):
return data[0]
############################################################################
[docs] def get_default_node_attrs(self):
"""retrieves default node attributes for the Leaf"""
attrs = super().get_default_node_attrs()
attrs['display_as'] = 'leaf'
return attrs
############################################################################
# properties
############################################################################
@property
def args(self):
""":obj:`list` of :obj:`str`: returns a 1-element list for this leaf's input"""
return [self.var_name]
# END