Source code for imagepypelines.core.Block

# @Email: jmaggio14@gmail.com
# @Website: https://www.imagepypelines.org/
# @License: https://github.com/jmaggio14/imagepypelines/blob/master/LICENSE
# @github: https://github.com/jmaggio14/imagepypelines
#
# Copyright (c) 2018 - 2020 Jeff Maggio, Jai Mehra, Ryan Hartzell
from ..Logger import get_logger
from ..Logger import ImagepypelinesLogger
from .constants import NUMPY_TYPES, UUID_ORDER
from .Exceptions import BlockError
from .arg_checking import DEFAULT_SHAPE_FUNCS, HOMOGENUS_CONTAINERS
from .Data import is_container
from .util import Timer

from uuid import uuid4
from abc import ABCMeta, abstractmethod
from itertools import chain
import inspect
import copy
import numpy as np
import os

class Block(metaclass=ABCMeta):
    """a contained algorithmic element used to construct pipelines.

    This class is designed to be inherited from, or used in the form of one
    of its child classes.

    Note:
        you must overload `Block.process()` if you intend to inherit from
        this class

    Attributes:
        uuid(str): hex uuid for this block
        name(str): user specified name for this block, used to generate the
            unique id. defaults to the name of your subclass
        batch_type(str): the size of the batch fed into your process
            function. Either "all" or "each"
        void(bool): By default all blocks return a value or values as
            output. However, if printing to screen, plotting, or saving
            data to a file, a block may not have a meaningful output that
            should be stored in a pipeline's output dictionary. In this
            case, void should be set to True so that the output of the
            block is ignored. The associated var key in the pipeline
            output will contain a value of :obj:`None`.
        logger(:obj:`ImagepypelinesLogger`): Logger object for this block.
            When run in a pipeline, this logger is temporarily replaced
            with a child of the Pipeline's logger
        tags(:obj:`set`): tags to describe this block. unused as of March
            2020
        _arg_spec(:obj:`namedtuple`, None): a named tuple describing the
            arguments for this block's process function. Only defined if
            the property `block.args` is accessed.
        skip_enforcement(bool): whether or not to enforce type and shape
            checking
        types(:obj:`dict`): Dictionary of input types. If an arg doesn't
            exist as a key, or if the value is None, then no checking is
            done
        shapes(:obj:`dict`): Dictionary of input shapes. If an arg doesn't
            exist as a key, or if the value is None, then no checking is
            done
        containers(:obj:`dict`): Dictionary of input containers. If an arg
            doesn't exist as a key, or if the value is None, then no
            checking is done.
            *if batch_type is "each", then the container is irrelevant and
            can be safely ignored!*
        shape_fns(:obj:`dict`): Dictionary of functions used to retrieve a
            datum's shape. If type(arg_datum) doesn't exist as a key, or
            if the value is None, then no checking is done.
    """
    def __init__(self, name=None, batch_type="all", types=None, shapes=None,
                 containers=None, void=False):
        """instantiates the block

        Args:
            name(str, None): the name of this block - how it will show up
                in the graph.
            batch_type(str): the type of batch processing for your process
                function. Either "all" or "each". "all" means that all
                argument data will be passed into your function at once,
                "each" means that each argument datum will be passed in
                individually
            types(:obj:`dict`, None): Dictionary of input types. If an arg
                doesn't exist as a key, or if the value is None, then no
                checking is done. If not provided, defaults to args as
                keys, None as values.
            shapes(:obj:`dict`, None): Dictionary of input shapes. If an
                arg doesn't exist as a key, or if the value is None, then
                no checking is done. If not provided, defaults to args as
                keys, None as values.
            containers(:obj:`dict`, None): Dictionary of input containers.
                If an arg doesn't exist as a key, or if the value is None,
                then no checking is done. If not provided, defaults to
                args as keys, None as values.
                *if batch_type is "each", then the container is irrelevant
                and can be safely ignored!*
            void(bool): By default all blocks return a value or values as
                output. However, if printing to screen, plotting, or
                saving data to a file, a block may not have a meaningful
                output that should be stored in a pipeline's output
                dictionary. In this case, void should be set to True so
                that the output of the block is ignored. The associated
                var key in the pipeline output will contain a value of
                :obj:`None`. Default is False
        """
        assert batch_type in ("all", "each"), \
            "batch_type must be 'each' or 'all'"
        # set up an absolutely unique id for this block
        self.uuid = uuid4().hex

        # ------ setting up instance variables
        if name is None:
            name = self.__class__.__name__
        self.name = name
        self.batch_type = batch_type
        self.void = void

        # this logger is temporarily replaced with a pipeline child logger
        # in _pair_logger
        self.logger = get_logger(self.id)

        # setup initial tags
        self.tags = set()

        # FullArgSpec for this block, defined in self.args for most blocks
        if not hasattr(self, '_arg_spec'):
            self._arg_spec = None

        # TYPE AND SHAPE CHECKING VARS
        # ----------------------------------------------------------------------
        self.skip_enforcement = False

        # types
        if types is None:
            self.types = {arg: None for arg in self.args}
        else:
            if not isinstance(types, dict):
                raise TypeError("'types' must be a dictionary or None")
            self.types = types

        # shapes
        if shapes is None:
            self.shapes = {arg: None for arg in self.args}
        else:
            if not isinstance(shapes, dict):
                raise TypeError("'shapes' must be a dictionary or None")
            self.shapes = shapes

        # containers
        if containers is None:
            self.containers = {arg: None for arg in self.args}
        else:
            if not isinstance(containers, dict):
                raise TypeError("'containers' must be a dictionary or None")
            self.containers = containers

        self.shape_fns = DEFAULT_SHAPE_FUNCS.copy()

        super(Block, self).__init__()  # for metaclass?

    ############################################################################
    #                               overloadable
    ############################################################################
    @abstractmethod
    def process(self, *data_batches):
        """(abstract) processes the given batches of data. Must be
        overloaded by all subclasses"""
        pass
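    # --------------------------------------------------------------------------
    # A minimal sketch (not part of this module) of overloading `process`.
    # The class name `AddConstant` and its behavior are hypothetical:
    #
    #     class AddConstant(Block):
    #         def __init__(self, const=1):
    #             # batch_type="each" - process receives one datum at a time
    #             super().__init__(batch_type="each")
    #             self.const = const
    #
    #         def process(self, datum):
    #             return datum + self.const
    # --------------------------------------------------------------------------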
    ############################################################################
    def check_setup(self, task_args):
        """briefly checks setup with the provided task inputs.

        This function can be overloaded to add additional functionality if
        desired. By default it simply checks whether too many or too few
        arguments were provided.

        Args:
            task_args(:obj:`tuple` of :obj:`str`): Arguments for this task

        Note:
            Be very careful making task-specific modifications to the
            block setup in this function. It's called once for every task
            this block is in. Changes made for one task may not apply to
            another task.
        """
        # too few args
        if len(task_args) < self.n_args:
            msg = "Not enough arguments provided"
            self.logger.error(msg)
            raise BlockError(msg + " (%s)" % self.name)
        # too many args
        elif len(task_args) > self.n_args:
            msg = "Too many arguments provided"
            self.logger.error(msg)
            raise BlockError(msg + " (%s)" % self.name)
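    # --------------------------------------------------------------------------
    # Sketch (hypothetical, not part of this module): a subclass could
    # extend check_setup with its own validation while keeping the default
    # arg-count checks. The variable name 'unsupported_var' is invented:
    #
    #     def check_setup(self, task_args):
    #         super().check_setup(task_args)  # keep the arg-count checks
    #         if "unsupported_var" in task_args:
    #             raise BlockError("this block cannot consume 'unsupported_var'")
    # --------------------------------------------------------------------------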
    ############################################################################
    def preprocess(self):
        """runs before all batches are processed"""
        pass
    ############################################################################
    def postprocess(self):
        """runs after all batches are processed"""
        pass
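    # --------------------------------------------------------------------------
    # Sketch (hypothetical): preprocess/postprocess bracket the processing
    # of all batches, so a subclass can use them for per-run setup and
    # teardown, e.g.:
    #
    #     def preprocess(self):
    #         self._results = []          # reset per-run state
    #
    #     def postprocess(self):
    #         self.logger.info("collected %d results" % len(self._results))
    # --------------------------------------------------------------------------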
    ############################################################################
    #                             primary frontend
    ############################################################################
    def rename(self, name):
        """renames the block to the given name. The id is reset in this
        process"""
        if not isinstance(name, str):
            raise BlockError("name must be a string")
        self.logger.warning("being renamed from '%s' to '%s'"
                            % (self.name, name))
        old_name = self.name
        self.name = name
        # reset the logger
        self._unpair_logger()
        # log the new name
        self.logger.warning("renamed from '%s' to '%s'"
                            % (old_name, self.name))
    ############################################################################
    def copy(self):
        """fetches a shallow copy of this block with the uuid updated"""
        # NOTE: make sure this results in the same behavior as unpickling
        # the uuid must be updated
        copied = copy.copy(self)
        copied.uuid = uuid4().hex
        return copied
    ############################################################################
    def deepcopy(self):
        """fetches a deep copy of this block with the uuid updated"""
        # NOTE: make sure this results in the same behavior as unpickling
        # the uuid must be updated
        deepcopied = copy.deepcopy(self)
        deepcopied.uuid = uuid4().hex
        return deepcopied
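    # --------------------------------------------------------------------------
    # Usage sketch: both copies get fresh uuids (and therefore fresh ids),
    # but copy() shares mutable attributes with the original while
    # deepcopy() does not:
    #
    #     b2 = block.copy()       # b2.types is block.types (same dict)
    #     b3 = block.deepcopy()   # b3.types is an independent dict
    # --------------------------------------------------------------------------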
    ############################################################################
    def enforce(self, arg, types=None, shapes=None, containers=None):
        """sets the block up to make sure the given arg is of the assigned
        types and shapes

        Args:
            arg(str): name of the process function argument you want to
                enforce checking on
            types(:obj:`tuple` of :obj:`type`): the types to restrict this
                argument to. If left as None, then no type checking will
                be done
            shapes(:obj:`tuple` of :obj:`tuple`): the shapes to restrict
                this argument to, where each shape is a tuple of axis
                lengths (None meaning any length). If left as None, then
                no shape checking will be done
            containers(:obj:`tuple` of :obj:`type`): the containers to
                restrict this argument to. If left as None, then no
                container checking will be done.
                *if batch_type is "each", then the container is irrelevant
                and can be safely ignored!*

        Returns:
            :obj:`Block`: self

        Note:
            This function must be called after the parent block is
            instantiated! That is, in your `__init__` function, you must
            call `super().__init__` before calling `self.enforce`
        """
        # make sure the parent block object is already instantiated before
        # we run this function (i.e. call super().__init__ before
        # enforcement)
        if not hasattr(self, 'uuid'):
            raise BlockError("Block __init__ must be called before enforcement")

        # NOTE: force values to be lists of lists/None (ie add error checking)
        self.types[arg] = types
        self.shapes[arg] = shapes
        self.containers[arg] = containers
        return self
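    # --------------------------------------------------------------------------
    # Usage sketch (hypothetical subclass, assuming the default shape
    # functions handle numpy arrays): enable checking on an 'image'
    # argument after super().__init__ has run. A shape of (None, None, 3)
    # matches any HxWx3 array because None axes accept any length:
    #
    #     class MyBlock(Block):
    #         def __init__(self):
    #             super().__init__(batch_type="each")
    #             self.enforce('image', types=(np.ndarray,),
    #                          shapes=((None, None), (None, None, 3)))
    #
    #         def process(self, image):
    #             return image
    # --------------------------------------------------------------------------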
    ############################################################################
    #                    called internally or by Pipeline
    ############################################################################
    def _pipeline_process(self, *data, logger, force_skip, analytics):
        """batches and processes data through the block's process function.
        This function is called by Pipeline, and not intended to be called
        by the user.

        Args:
            *data: Variable length list of data
            logger(:obj:`ImagepypelinesLogger`): parent pipeline logger,
                which will be used to create a new child block logger
            force_skip(bool): whether or not to force skipping of batch
                type and shape checking
            analytics(dict): dictionary to store processing analytics for
                this block

        Returns:
            (tuple): variable length tuple containing processed data
        """
        self._pair_logger(logger)
        # NOTE: add type checking here

        # check data parity (same n_datums for every input)
        # this still works even if len(data) is 0
        if not all(data[0].n_datums == d.n_datums for d in data):
            msg = "Invalid data lengths! all data must have the same " \
                  + "number of items. {}"
            # this adds a list of "input_name.n_datums=..." for every input
            msg = msg.format(
                ",".join("{}.n_datums={}".format(i, d.n_datums)
                         for i, d in zip(self.inputs, data)))
            self.logger.error(msg)
            raise RuntimeError(msg)

        # fetch the process id for this block
        analytics['pid'] = os.getpid()
        timer = Timer()

        # run preprocess
        self.preprocess()

        # root blocks don't need input data, and won't have any data
        # passed in to batch. We only call process once for these
        if self.n_args == 0:
            # this separate statement is necessary because we have to
            # ensure that process is only called once, not for every data
            # batch (only if there are no inputs, ie no batches, into this
            # block)
            ret = self.process()
            # put it in a tuple if it isn't already
            if not isinstance(ret, tuple):
                ret = (ret,)
        else:
            # --------- CHECKING ---------
            # check the batches before processing
            if not (force_skip or self.skip_enforcement):
                # reset the lap timer
                timer.lap_ms()
                # validate incoming data
                self._check_batches(*data)
                analytics['validation_time'] = timer.lap_ms()

            # --------- ACTUAL PROCESSING ---------
            # EACH - every batch is a single datum
            if self.batch_type == "each":
                # construct the batch generators
                def _process_batches(*data):
                    batches = (d.as_each() for d in data)
                    for datums in zip(*batches):
                        out = self.process(*datums)
                        # put it in a tuple if it isn't already
                        if not isinstance(out, tuple):
                            out = (out,)
                        yield out

                ret = tuple(zip(*_process_batches(*data)))
            # ALL - process everything at once
            else:  # if batch_type == "all"
                batches = (d.as_all() for d in data)
                ret = self.process(*batches)

            # if there is a return value for this block (most blocks)
            # we make sure we can write this data to a graph edge
            if not self.void:
                # put it in a tuple if it isn't already
                if not isinstance(ret, tuple):
                    ret = (ret,)

                # enforce container outputs on batch_type="all" blocks
                for i, out in enumerate(ret):
                    if not is_container(out):
                        msg = ("Blocks with batch_type='all' must return "
                               "containers of data (output %s is '%s', "
                               "which is not an iterable type) or have "
                               "their 'void' attribute set to 'True'.")
                        msg = msg % (i, type(out))
                        self.logger.error(msg)
                        raise BlockError(msg)

        analytics['processing_time'] = timer.time_ms()
        # track the number of incoming datums to this block
        if len(data) > 0:
            analytics["num_in"] = data[0].n_datums
            analytics["n_batches"] = data[0].n_batches_with(self.batch_type)
            analytics['avg_time_per_datum'] = \
                analytics['processing_time'] / analytics['num_in']

        return ret

    ############################################################################
    def _check_batches(self, *data):
        """checks argument batches to verify they are the correct types
        and shapes"""
        all_data = (d.as_all() for d in data)
        # FOR EVERY ARG AND BATCH
        # ======================================================================
        for arg_name, data_container in zip(self.args, all_data):
            # ---------- CONTAINER CHECK ----------
            # we have to check the container if datums aren't passed in
            # individually
            if self.batch_type == "all":
                okay_containers = self.containers.get(arg_name, None)
                if okay_containers is not None:
                    # check the container type is valid
                    if not isinstance(data_container, okay_containers):
                        msg = ("invalid container for '{}'. must be {}, "
                               "not {}. (you can disable this check with "
                               "the 'skip_enforcement' keyword)")
                        msg = msg.format(arg_name, okay_containers,
                                         type(data_container))
                        self.logger.error(msg)
                        raise BlockError(msg)

            # check if it's a homogeneous container
            # for example if it's a numpy array, we can speed things up
            # because we only have to check the first row
            if type(data_container) in HOMOGENUS_CONTAINERS:
                data_container = data_container[0]

            # FOR EVERY DATUM IN THE CONTAINER
            # ==================================================================
            for datum in data_container:
                # ---------------------------------------
                # TYPE CHECKING
                # ---------------------------------------
                # fetch the accepted types for this arg
                arg_types = self.types.get(arg_name, None)
                # if arg_types is None, then we will skip all type checking
                if arg_types is not None:
                    if not isinstance(datum, arg_types):
                        msg = ("invalid type for '{}'. must be {}, not "
                               "{}. (you can disable this check with the "
                               "'skip_enforcement' keyword)")
                        msg = msg.format(arg_name, arg_types, type(datum))
                        self.logger.error(msg)
                        raise BlockError(msg)

                # ---------------------------------------
                # SHAPE CHECKING
                # ---------------------------------------
                # fetch the accepted shapes for this arg
                arg_shapes = self.shapes.get(arg_name, None)
                # if arg_shapes is None, then we will skip all shape
                # checking
                if arg_shapes is not None:
                    # skip shape checking if we don't have a shape_fn
                    shape_fn = self.shape_fns.get(type(datum), None)
                    if shape_fn is None:
                        continue
                    # retrieve the datum shape
                    datum_shape = shape_fn(datum)

                    # FOR ARG SHAPE in all possible arg shapes
                    # ==========================================================
                    ndim_okay = False
                    axes_okay = True
                    for arg_shape in arg_shapes:
                        # reject automatically unless at least one shape
                        # has the right number of dimensions
                        if len(arg_shape) != len(datum_shape):
                            continue
                        ndim_okay = True

                        # otherwise check every axis
                        for arg_ax, d_ax in zip(arg_shape, datum_shape):
                            # no need to check if the arg_ax is None (any
                            # length)
                            if arg_ax is None:
                                continue
                            # compare every axis length
                            axes_okay = (axes_okay and (arg_ax == d_ax))

                    # raise a shape error
                    if not (axes_okay and ndim_okay):
                        msg = "invalid shape for '{}'. must be {}, not {}"
                        msg = msg.format(arg_name, arg_shapes, datum_shape)
                        self.logger.error(msg)
                        raise BlockError(msg + " (you can disable this "
                                               "check with the "
                                               "'skip_enforcement' "
                                               "keyword)")

    ############################################################################
    def _summary(self):
        """fetches a static summary of the block"""
        summary = {}
        # instance vars
        summary['name'] = self.name
        summary['id'] = self.id
        summary['uuid'] = self.uuid
        summary['args'] = self.args
        summary['types'] = {k: str(v) for k, v in self.types.items()}
        summary['shapes'] = self.shapes
        summary['skip_enforcement'] = self.skip_enforcement
        summary['batch_type'] = self.batch_type
        summary['tags'] = list(self.tags)
        # other data
        summary['class_name'] = self.__class__.__name__
        # method documentation
        summary['DOCS'] = {}
        summary['DOCS']['class'] = inspect.getdoc(self)
        summary['DOCS']['__init__'] = inspect.getdoc(self.__init__)
        summary['DOCS']['process'] = inspect.getdoc(self.process)

        return summary

    ############################################################################
    def get_default_node_attrs(self):
        """all values must be json serializable"""
        attrs = {'name': self.name,
                 'class_name': self.__class__.__name__,
                 'batch_type': self.batch_type,  # string: "all" or "each"
                 'display_as': 'block',
                 }
        return attrs
    ############################################################################
    def _pair_logger(self, pipeline_logger):
        """creates or fetches a new child logger of the pipeline for this
        block"""
        self.logger = pipeline_logger.getChild(self.id)

    ############################################################################
    def _unpair_logger(self):
        """restores the original block logger"""
        self.logger = get_logger(self.id)

    ############################################################################
    #                                 special
    ############################################################################
    def __call__(self, *args):
        """Allows any block to act as a callable, aliasing the process
        method"""
        return self.process(*args)
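    # --------------------------------------------------------------------------
    # Usage sketch: calling a block invokes `process` directly (no
    # batching, logging, or enforcement), which is handy for quick manual
    # testing:
    #
    #     block = AddConstant(const=5)   # hypothetical subclass sketched above
    #     block(10)                      # equivalent to block.process(10)
    # --------------------------------------------------------------------------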
    ############################################################################
    def __str__(self):
        return self.id

    ############################################################################
    def __repr__(self):
        return self.id

    ############################################################################
    def __getstate__(self):
        return self.__dict__

    ############################################################################
    def __setstate__(self, state):
        """resets the uuid in the event of a copy"""
        state['uuid'] = uuid4().hex
        self.__dict__.update(state)

    ############################################################################
    #                               properties
    ############################################################################
    @property
    def args(self):
        """:obj:`list` of :obj:`str`: arguments in the order they are
        expected"""
        # save the argspec in an instance variable if it hasn't already
        # been computed
        if self._arg_spec is None:
            self._arg_spec = inspect.getfullargspec(self.process)

        if self._arg_spec.args is None:
            return []
        else:
            # ignoring 'self'
            return self._arg_spec.args[1:]

    ############################################################################
    @property
    def n_args(self):
        """int: number of arguments for the process function"""
        return len(self.args)

    ############################################################################
    @property
    def id(self):
        """str: a unique id for this block

        This id is a combination of the block's non-unique name and part
        of its uuid (last 6 characters by default). The entropy of this id
        can be increased by increasing ImagePypelines' UUID_ORDER variable
        """
        return "{}#{}".format(self.name, self.uuid[-UUID_ORDER:])
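############################################################################
# Sketch of the derived properties above (values are illustrative; the
# hash suffix comes from the block's uuid):
#
#     block = AddConstant()       # hypothetical subclass sketched above
#     block.args                  # ['datum'] - from process's signature
#     block.n_args                # 1
#     block.id                    # e.g. 'AddConstant#a1b2c3'
############################################################################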
# END