Source code for imagepypelines.core.Pipeline

# @Email: jmaggio14@gmail.com
# @Website: https://www.imagepypelines.org/
# @License: https://github.com/jmaggio14/imagepypelines/blob/master/LICENSE
# @github: https://github.com/jmaggio14/imagepypelines
#
# Copyright (c) 2018 - 2020 Jeff Maggio, Jai Mehra, Ryan Hartzell
from ..Logger import get_logger, MASTER_LOGGER
from .Block import Block
from .Data import Data
from .block_subclasses import Input, Leaf, PipelineBlock
from .constants import UUID_ORDER
from .Exceptions import PipelineError
from .io_tools import passgen
from .util import Timer
from .DashboardComm import DashboardComm

from cryptography.fernet import Fernet
import inspect
import numpy as np
from uuid import uuid4
import networkx as nx
from networkx.readwrite import json_graph
import hashlib
import copy
import itertools
import dill
import json

STATUS_NOT_STARTED = "not started"
"""status constant for nodes that haven't yet been started"""

STATUS_PROCESSING  = "processing"
"""status constant for nodes that are currently processing"""

STATUS_COMPLETE    = "done"
"""status constant for nodes that have completed their work"""

MSG_GRAPH  = "graph"
"""message type for graph messages"""

MSG_STATUS = "status"
"""message type for status messages"""

MSG_RESET  = "reset"
"""message type for reset messages"""

MSG_ERROR  = "block_error"
"""message type for error messages"""

MSG_DELETE  = "delete"
"""message type for delete messages"""

PIPELINE_SOURCE_TYPE = "pipeline"

ILLEGAL_VAR_NAMES = ['fetch','skip_enforcement']
"""illegal or reserved names for variables in the graph"""

[docs]class Pipeline(object): """processing algorithm manager for simple pipeline construction < RYAN I NEED YOU HERE > Attributes: uuid(str): hex uuid for this pipeline name(str): user specified name for this pipeline, used to generate the unique id. defaults to "Pipeline" or the name of your subclass logger(:obj:`ImagepypelinesLogger`): Logger object for this pipeline graph(:obj:`networkx.MultiDiGraph`): Multi-Edge directed task graph which stores all data and algorithmic order vars(dict): dict to track the block that generates the each variable, keys are variable names, values are a subdictionary containing 'block_node_id' and 'block' indexed_inputs(:obj:`list` of :obj:'str'): sorted list of indexed input variable names. (the indexed arguments for the process function) keyword_inputs(:obj:`list` of :obj:'str'): alphabetically sorted list of unindexed input variable names (the keyword arguments for the process function) _inputs(dict): dictionary internally used to access Input objects for queuing data into the pipeline Pipeline Graph Information: Nodes are dictionaries representing tasks. They contain: 'block' : block object for this task, 'args' : names of the task inputs for this block, 'outputs' : names of the task outputs produced by this block, 'name' : string name of the node for visualization purposes, 'color' : color of the node for visualization purposes, 'shape' : shape of the node for visualization purposes, 'class_name' : name of the class, frequently identical to the name 'validation_time': time required to validate incoming data if applicable 'processing_time' : time required to process the data using this node 'avg_time_per_datum': average processing time for each datum 'num_in' : number of datums coming into this node 'n_batches' : number of batches for this node 'pid' : process id for this node 'status': processing status. one of: ('not started', 'processing', 'done') <plus other attributes defined by the user in Block.get_default_node_attrs()> Pipeline edges are dictionaries containing the following: 'var_name' : name of the variable in task definition 'out_index' : output index from the source node, 'in_index' : input index for the target node, 'name' : name target block's argument at the in_index 'same_type_for_all_datums' : whether or not this data is a homogenus container 'data_stored_in' : the type of the container used to house the data (list, ndarray, etc) 'n_datums' : number of items of data in this edge 'datum_type' : the type of data contained, this is only guarenteed to be accurate is same_type_for_all_datums is True 'node_a' : source node uuid 'node_b' : target node uuid 'data' : data for this edge Example: >>> import imagepypelines as ip >>> >>> @ip.blockify( kwargs=dict(value=10) ) >>> def add_val(a,b,value): >>> return a+value, b+value >>> >>> @ip.blockify( kwargs=dict(value=5) ) >>> def minus_val(a,b,value): >>> return a-value, b-value >>> >>> tasks = { >>> # inputs >>> 'zero' : ip.Input(0), >>> 'one' : ip.Input(1), >>> # operations >>> ('ten','eleven') : (add_val, 'zero', 'one'), >>> ('twenty','eleven2') : (add_val, 'ten', 'one'), >>> ('fifteen', 'six') : (minus_val, 'twenty', 'eleven'), >>> ('twentyfive','twentyone') : (add_val, 'fifteen','eleven2'), >>> ('negativefour', 'negativefive') : (minus_val, 'one', 'zero'), >>> } >>> >>> # pipeline1 - raw construction >>> pipeline1 = ip.Pipeline(tasks, 'Pipeline1') >>> # pipeline1.draw(show=True) >>> >>> processed1 = pipeline1.process([0,0], [1,1]) >>> # print(processed1) >>> >>> >>> # pipeline2 - construction from task dict >>> static_constructor = pipeline1.get_tasks() >>> >>> pipeline2 = ip.Pipeline(static_constructor, name="Pipeline2") >>> processed2 = pipeline2.process([0,0], one=[1,1]) """ def __init__(self, tasks=None, name=None): """initializes the pipeline with a user-provided graph (tasks) Args: tasks (dict,:obj:`Pipeline`): dictionary of tasks to define this pipeline's graph or another pipeline to replicate. name (str): name used to generate the logger name """ self.uuid = uuid4().hex # unique univeral hex ID for this pipeline if name is None: name = self.__class__.__name__ self.name = name # string name - used to generate the id # build the logger for this pipeline self.logger = get_logger(self.id, pipeline=self) # logging object # GRAPHING self.graph = nx.MultiDiGraph() # networkx graph keeping track of tasks self.vars = {} # dict of var_names and the nodes that create them # PROCESS / internal tracking self.indexed_inputs = [] # sorted list of indexed input variable names self.keyword_inputs = [] # alphabetically sorted list of unindexed inputs self._inputs = {} # dict of input_name: Input_object # If a pipeline is passed in, then retrieve tasks and replicate our # pipeline if isinstance(tasks, Pipeline): tasks = tasks.get_tasks() elif isinstance(tasks, dict): tasks = tasks elif tasks is None: tasks = {} else: raise TypeError("'tasks' not a valid Pipeline, task graph dictionary, nor 'None'.") # build the dashboard comm object self.dashcomm = DashboardComm() # update the tasks self.update(tasks) ############################################################################ # primary frontend functions ############################################################################
[docs] def update(self, tasks=None, predict_compatibility=True): """updates the pipeline's graph with a dict of tasks `update` will modify and change many instance variables of the pipeline. Despite its generic name, it can only add tasks to the pipeline, not remove them. `update` is called internally to the pipeline during instantiation. Args: tasks(dict): dictionary of tasks to define this pipeline's graph """ ######################################################################## # HELPER FUNCTIONS ######################################################################## def _add_to_vars(var): # make sure variable name is a string if not isinstance(var,str): msg = f"graph vars must be a string, not {type(var)}" self.logger.error(msg) raise TypeError(msg) # check if variable name already exists if var in self.vars.keys(): msg = f"variable \"{var}\" cannot be defined more than once" self.logger.error(msg) raise ValueError(msg) # check if variable name is illegal if var in ILLEGAL_VAR_NAMES: msg = f"{var} cannot be named one of {ILLEGAL_VAR_NAMES}" self.logger.error(msg) raise PipelineError(msg) self.vars[var] = {'block_node_id':None, # will always be defined 'block':None # will always be defined } def _add_input(inpt, outputs, node_uuid): # track what inputs are required so we can populate # them with arguments in self.process if len(outputs) != 1: msg = "Inputs must define exactly one output" self.logger.error(msg) raise PipelineError(msg) # add the input block to a tracking dictionary self._inputs[outputs[0]] = inpt # ====================================================================== # GRAPH CONSTRUCTION # ====================================================================== if tasks is None: tasks = {} ######################################################################## # Define the variables we'll be using for these tasks ######################################################################## # add all variables defined in the graph to a dictionary for var in tasks.keys(): # for tuple defined dict keys like ('x','y') : (block, 'a', 'b') if isinstance(var,(tuple,list)): for v in var: _add_to_vars(v) # for str defined dict keys like 'x' : (block, 'a', 'b') else: _add_to_vars(var) ######################################################################## # Add all the task nodes to the graph ######################################################################## # reiterate through the graph definition to define inputs and outputs for outputs,task in tasks.items(): # make a single value into a list to simplify code if not isinstance(outputs, (tuple,list)): outputs = (outputs,) if not isinstance(task, (tuple,list)): task = (task,) # e.g. - 'z': (block, 'x', 'y'), if isinstance(task, (tuple,list)): block = task[0] args = task[1:] node_uuid = block.name + uuid4().hex + '-node' # if we have a tuple input, then the first value MUST be a block if not isinstance(block, Block): raise TypeError("first value in any graph definition tuple must be a Block") # check if this block is an "Input" Block - this is a special case # e.g. - 'x': (ip.Input(),) if isinstance(block, Input): _add_input(block, outputs, node_uuid) if len(args) != 0: raise PipelineError("Input blocks cannot take any arguments") for output in outputs: self.vars[output]['block_node_id'] = node_uuid self.vars[output]['block'] = block # check this task's setup using the block.check_setup function block.check_setup(args) # add the task to the graph self.graph.add_node(node_uuid, block=block, args=args, outputs=outputs, validation_time=None, processing_time=None, avg_time_per_datum=None, num_in=None, n_batches=None, pid=None, status=STATUS_NOT_STARTED, **block.get_default_node_attrs(), ) else: # something other than a block or of tuple (block, var1, var2,...) raise PipelineError("invalid task definition, must be block or tuple: (block, 'var1', 'var2',...)") ######################################################################## # Draw any new edges required for all block nodes ######################################################################## # THIRD FOR LOOP - drawing edges for node_b,node_b_attrs in self.graph.nodes(data=True): # draw an edge for every input into this node for in_index, arg_name in enumerate(node_b_attrs['args']): # first we identify an upstream node by looking up what task # created them node_a = self.vars[arg_name]['block_node_id'] node_a_attrs = self.graph.nodes[ node_a ] # draw the edge FOR THIS INPUT from node_a to node_b block_arg_name = node_b_attrs['block'].args[in_index] out_index = node_a_attrs['outputs'].index(arg_name) # edge key is {var_name}:{out_index}-->{in_index} edge_key = "{}:{}-->{}".format(arg_name, out_index, in_index) # draw the edge if it doesn't already exist if not self.graph.has_edge(node_a,node_b,edge_key): self.graph.add_edge(node_a, node_b, # key key=edge_key, # attributes var_name = arg_name, # name assigned in graph definition out_index = out_index, in_index = in_index, name = block_arg_name, # name of node_b's process argument at the index same_type_for_all_datums="unknown", n_datums=0, datum_type=None, data_stored_in="unknown", node_a = node_a, node_b = node_b, data = None, # none is a placeholder value. it will be populated ) ######################################################################## # Draw 'leaves' on tasks with no output edges # this is required so they will still be computable ######################################################################## # this is required so we can store data on end edges - otherwise the final # nodes of our pipeline won't have output edges, so we can't store data # on those edges # make a list of nodes without outgoing edges end_nodes = [] for node_id,attrs in self.graph.nodes(data=True): # if the node already has outputs, we don't need a leaf out of it if self.graph.out_degree(node_id) == len(attrs['outputs']): continue # if the end node is a Leaf already, then we don't need another leaf elif isinstance(attrs['block'],Leaf): continue else: bad_edges = [] drawn_edges = tuple(self.graph.out_edges(node_id, data='var_name')) for var_name in attrs['outputs']: if var_name not in drawn_edges: bad_edges.append(var_name) end_nodes.append( (node_id,attrs,bad_edges) ) for node_id,node_attrs,bad_edges in end_nodes: # this is a final node of the pipeline, so we need to draw a # leaf for each of its output edges for i,end_name in enumerate(bad_edges): # add the leaf leaf = Leaf(end_name) leaf_uuid = leaf.name + uuid4().hex + '-node' self.graph.add_node(leaf_uuid, block=leaf, args=(end_name,), outputs=(end_name,), validation_time=None, processing_time=None, avg_time_per_datum=None, num_in=None, n_batches=None, pid=None, status=STATUS_NOT_STARTED, **leaf.get_default_node_attrs() ) # edge key is {var_name}:{out_index}-->{in_index} edge_key = "{}:{}-->{}".format(end_name, i, 0) # draw the edge to the leaf # no need to check if it exists, because we just created the Leaf self.graph.add_edge(node_id, leaf_uuid, key=edge_key, var_name=end_name, # name assigned in graph definition out_index=i, in_index=0, name=end_name, # name of node_b's process argument at the index same_type_for_all_datums="unknown", n_datums=0, datum_type=None, data_stored_in="unknown", node_a = node_id, node_b = leaf_uuid, data=None) # CHECK to make sure the graph isn't cyclic (this is impossible to compute) if not nx.is_directed_acyclic_graph(self.graph): msg = "Cyclic variable dependency detected! Make sure" \ " two variables don't depend on each other - these" \ " aren't possible to compute" self.logger.error(msg) raise PipelineError(msg) ######################################################################## # create input list & requirements ######################################################################## # reset old index tracking lists self.indexed_inputs = [] self.keyword_inputs = [] # sort the inputs into keyword and indexed for inpt_name, inpt in self._inputs.items(): # check if the input index is defined if isinstance(inpt.index,int): self.indexed_inputs.append(inpt_name) else: self.keyword_inputs.append(inpt_name) # sort the positonal inputs by index self.indexed_inputs.sort(key=lambda x: self._inputs[x].index) # sort keyword only inputs alphabetically self.keyword_inputs.sort() # check to make sure an input index isn't defined twice indices_used = [self._inputs[x].index for x in self.indexed_inputs] if len(set(indices_used)) != len(indices_used): # Note: add more verbose error message msg = "Input indices cannot be reused" self.logger.error(msg) raise PipelineError(msg) # check to make sure input indexes are consecutive (don't skip) if len(indices_used) > 0: if max(indices_used) + 1 != len(indices_used): # Note: add more verbose error message msg = "Input indices must be consecutive" self.logger.error(msg) raise PipelineError(msg) # log the current pipeline status msg = "{} tasks set up; process arguments are ({},)".format(len(tasks), ', '.join(self.args)) self.logger.info(msg) ######################################################################## # Running Checks ######################################################################## if predict_compatibility: # check to make sure there are compatible types for var in self.vars.keys(): # check if there are compatible types if self.get_types_for(var) == tuple(): msg = "PREDICTED INCOMPATIBILITY : no compatible types for '%s'" % var self.logger.warning(msg) # check if there are compatible shapes if self.get_shapes_for(var) == tuple(): msg = "PREDICTED INCOMPATIBILITY : no compatible shapes for '%s'" % var self.logger.warning(msg) # iterate through edges and check if any blocks are expecting void data for node_a_id, node_b_id, var_name in self.graph.edges(data="var_name"): block_a = self.graph.nodes[node_a_id]['block'] block_b = self.graph.nodes[node_b_id]['block'] # if block a is void, then we make sure block_b isn't a block # expecting data that won't exist (make sure block b isn't a leaf) if block_a.void: if not isinstance(block_b, Leaf): msg = "INCOMPATIBILE EDGE: {block_b} is expecting '{var_name}' from {block_a}, but {block_a} is void (doesn't return data)" msg.format(block_a=block_a, block_b=block_b, var_name=var_name) self.logger.warning(msg) # print warning for vars that won't be computed noncomputable = self.noncomputable if len(noncomputable) > 0: msg = f"{noncomputable} won't be computed because they rely on data from void blocks (blocks that don't return data)" self.logger.warning(msg) # send along the new graph in a message to the Dashboards self.__send_graph_msg_to_dash()
############################################################################
[docs] def process(self, *pos_data, fetch=None, skip_enforcement=False, **kwdata): """processes input data through the pipeline process first resets this pipeline, before loading input data into the graph and processing it. Note: The argument list for the Pipeline can be found with `Pipeline.args` Warning: MUST ADD FETCHES DOCUMENTATIONS """ # setup fetches if fetch is None: fetch = self.vars.keys() # clear any data already in the graph, and send a message to the dashboard(s) self.reset() # -------------------------------------------------------------- # STORING INPUTS - inside the input nodes # -------------------------------------------------------------- all_inputs = self.args # store positonal arguments fed in ## NOTE: need error checking here (number of inputs, etc) for i,data in enumerate(pos_data): inpt = self._inputs[ all_inputs[i] ] # check if the data has already been loaded if inpt.loaded: msg = "'%s' has already been loaded" % self.indexed_inputs[i] self.logger.error(msg) raise PipelineError(msg) inpt.load(data) # store keyword arguments fed in ## NOTE: need error checking here (number of inputs, etc) for key, val in kwdata.items(): inpt = self._inputs[key] # check if the data has already been loaded if inpt.loaded: msg = "'%s' has already been loaded" % key self.logger.error(msg) raise PipelineError(msg) inpt.load(val) # check to make sure all inputs are loaded data_loaded = True for key,inpt in self._inputs.items(): if not inpt.loaded: msg = "data for \"%s\" must be provided" % key self.logger.error(msg) data_loaded = False if not data_loaded: raise PipelineError("insufficient input data provided") # -------------------------------------------------------------- # PROCESS # -------------------------------------------------------------- self.__compute(skip_enforcement) # populate the output dictionary fetch_dict = {} for _,_,edge in self.graph.edges(data=True): if edge['var_name'] in fetch: if edge['data'] is None: fetch_dict[ edge['var_name'] ] = None #Could eventually be ip.Void or ip.NULL type else: fetch_dict[ edge['var_name'] ] = edge['data'].grab() # clear the graph of data to reduce memory footprint self.clear() return fetch_dict
############################################################################
[docs] def process_and_grab(self, *pos_data, fetch, skip_enforcement=False, **kwdata): """processes input data through the pipeline, but returns a tuple with the specified fetches process first resets this pipeline, before loading input data into the graph and processing it. Note: The argument list for the Pipeline can be found with `Pipeline.args` Warning: MUST ADD FETCHES DOCUMENTATIONS """ fetch_dict = self.process(*pos_data, fetch=fetch, skip_enforcement=skip_enforcement, **kwdata) return tuple( fetch_dict[k] for k in fetch )
############################################################################
[docs] def asblock(self, *fetches): """generates a block that runs this pipeline internally Args: *fetches: variables to fetch from the pipeline in the order they should be outputed as Returns: :obj:`Block`: Block which will run the pipeline internally and retrieve the data specified by fetches """ return PipelineBlock(self, fetch=fetches)
############################################################################
[docs] def clear(self): """clears all edges in the graph and unloads the input nodes. Does not update the remote dashboard""" # unload all edge data for _,_,edge in self.graph.edges(data=True): edge['data'] = None # unload all input data for inpt in self._inputs.values(): inpt.unload()
############################################################################
[docs] def reset(self): """resets all edges in the graph, resets the inputs (and updates the dashboards)""" self.clear() # send a message to the dashboards saying the pipeline has been reset self.__send_reset_msg_to_dash()
############################################################################
[docs] def draw(self, show=True, ax=None): # visualize(self, show, ax) pass
# saving/loading ############################################################################
[docs] def save(self, filename, passwd=None): """serializes and saves a copy of the pipeline to the given filename. Pipeline can be optionally encrypted Args: filename(str): the filename to save the serialized pipeline to passwd(str): password to encrypt the serialized pipeline with if desired, defaults to None Returns: str: the sha256 checksum for the saved file """ encoded, checksum = self.to_bytes(passwd) # write the file contents with open(filename, 'wb') as f: f.write(encoded) return checksum
############################################################################
[docs] @classmethod def load(cls, filename, passwd=None, checksum=None, name=None): """loads the pipeline from the given file Args: filename(str): the filename to load the serialized pipeline from passwd(str): password to decrypt the serialized pipeline with, defaults to None checksum(str): the sha256 checksum to check the file against name(str): new name for the pipeline. If left as None, then defaults to the old name of the pipeline Returns: :obj:`Pipeline`: the loaded pipeline Warning: Serilized data can be a security risk! For sensitive applications, use the `checksum` parameter. ImagePypelines can use this to ensure the data hasn't been tampered with. for more information about serialization security, see: https://docs.python.org/3.8/library/pickle.html """ # fetch the raw file contents with open(filename,'rb') as f: raw_bytes = f.read() return cls.from_bytes(raw_bytes, passwd, checksum, name)
############################################################################
[docs] def to_bytes(self, passwd=None): """serialized a copy of the pipeline, and returns the raw bytes. Can be optionally encrypted Args: passwd(str): password to encrypt the serialized pipeline with if desired, defaults to None Returns: (tuple): tuple containing: bytes: the serialized and optionally encrypted pipeline str: the sha256 checksum for the raw bytes """ raw_bytes = dill.dumps(self.copy()) # encrypt the pipeline if passwd is provided if passwd: fernet = Fernet( passgen(passwd) ) encoded = fernet.encrypt(raw_bytes) else: encoded = raw_bytes return encoded, hashlib.sha256(encoded).hexdigest()
############################################################################
[docs] @staticmethod def from_bytes(raw_bytes, passwd=None, checksum=None, name=None): """loads the pipeline from the given bytes Args: raw_bytes(bytes): the encoded pipeline in bytes format passwd(str): password to decrypt the serialized pipeline with, defaults to None checksum(str): the sha256 checksum to check the bytes against name(str): new name for the pipeline. If left as None, then defaults to the old name of the pipeline Returns: :obj:`Pipeline`: the loaded pipeline Warning: Serialized data can be a security risk! For sensitive applications, use the `checksum` parameter. ImagePypelines can use this to ensure the data hasn't been tampered with. for more information about serialization security, see: https://docs.python.org/3.8/library/pickle.html """ # check the file checksum if provided if checksum: fchecksum = hashlib.sha256(raw_bytes).hexdigest() if fchecksum != checksum: msg = "Invalid Checksum!" MASTER_LOGGER.error(msg) PipelineError(msg) # decrypt the file contents if passwd is provided if passwd: fernet = Fernet( passgen(passwd) ) decoded = fernet.decrypt(raw_bytes) else: decoded = raw_bytes # load the pipeline pipeline = dill.loads(decoded) # rename it if desired if name is not None: pipeline.rename(name) return pipeline
############################################################################
[docs] def copy(self, name=None): """returns a copy of the Pipeline, but not a copy of the blocks""" if name is None: name = self.name return Pipeline(self, name=name)
############################################################################
[docs] def deepcopy(self, name=None): """returns a copy of the Pipeline including copies of all its blocks""" if name is None: name = self.name # iterate through all pipeline tasks already_copied = {} new_tasks = {} # for ('out1', 'out2'), (block, 'in1', 'in2') for task_outs, task_ins in self.get_tasks().items(): old_block = task_ins[0] args = task_ins[1:] # if block is already copied, fetch the copied block for the new tasks if old_block.uuid in already_copied: new_block = already_copied[old_block.uuid] # otherwise copy the block if we haven't already copied it else: new_block = old_block.deepcopy() already_copied[old_block.uuid] = new_block new_tasks[task_outs] = (new_block,) + args return Pipeline(new_tasks, name=name)
############################################################################ # internal ############################################################################ def __format_edge_data_for_dash_msg(self, edge_data): """formats data for an edge to a sendable form, Relies on a copy of the edge being passed in so permanent changes to the graph aren't made """ # delete the data from edge - we don't need it del edge_data['data'] # convert classes to their strings names edge_data['datum_type'] = edge_data['datum_type'].__class__.__name__ edge_data['data_stored_in'] = edge_data['data_stored_in'].__class__.__name__ return edge_data def __send_graph_msg_to_dash(self): """sends a full description of the graph to the dashboard This includes the graph structure and documentation for all blocks """ payload = {} payload['args'] = self.args # commented out as redundant on 07/12/20 - JM # # variables and the node id that creates them # vars = {key : val['block_node_id'] for key,val in self.vars.items()} # payload["vars"] = vars # create a dictionary containing block summaries payload['block_docs'] = {} for block in self.blocks: payload['block_docs'][block.id] = block._summary() # copy the graph so we can modify it safely graph_copy = self.graph.copy() # populate the nodes metadata payload['nodes'] = {} for node_id,node_info in graph_copy.nodes(data=True): # delete the block from this copy - we don't need it del node_info['block'] payload['nodes'][node_id] = node_info # populate the edge metadata payload['edges'] = {} for node_a,node_b,key,e_data in graph_copy.edges(keys=True, data=True): e_data = self.__format_edge_data_for_dash_msg(e_data) payload['edges']['|'.join((node_a,node_b,key))] = e_data # jsonify the graph in node-link format. see: # https://networkx.github.io/documentation/stable/reference/readwrite/json_graph.html payload['node-link'] = json_graph.node_link_data(graph_copy) msg = {'type' : MSG_GRAPH, 'name' : self.name, 'id' : self.id, 'uuid' : self.uuid, 'source_type' : PIPELINE_SOURCE_TYPE, 'payload' : payload, } self.dashcomm.write_graph(self.id, json.dumps(msg)) ############################################################################ def __read_msg_from_dash(self): """Receives any messages sent from the dashboards""" msg = self.dashcomm.read() if msg: print(msg) ############################################################################ def __send_status_msg_to_dash(self, node_id): """builds and sends a status message to the dashboards""" payload = {} # fetch pertinent metadata from the node node_info = self.graph.nodes[node_id].copy() del node_info['block'] # delete the block from this copy payload['nodes'] = {node_id : node_info} # fetch metadata for the incoming edges payload['edges'] = {} for node_a,node_b,key,e_data in self.graph.in_edges(node_id, keys=True, data=True): # delete the data object from this copy formatted_edge_data = self.__format_edge_data_for_dash_msg(e_data.copy()) # update the edges dict payload['edges']['|'.join((node_a,node_b,key))] = formatted_edge_data # encode the message as json and send it msg = {'type' : MSG_STATUS, 'name' : self.name, 'id' : self.id, 'uuid' : self.uuid, 'source_type' : PIPELINE_SOURCE_TYPE, 'payload' : payload, } self.dashcomm.write_status( json.dumps(msg) ) ############################################################################ def __send_reset_msg_to_dash(self): """builds and sends a reset message to the dashboards""" msg = {'type' : MSG_RESET, 'name' : self.name, 'id' : self.id, 'uuid' : self.uuid, 'source_type' : PIPELINE_SOURCE_TYPE, 'payload' : {}, } self.dashcomm.write_reset( json.dumps(msg) ) ############################################################################ def __send_block_error_msg_to_dash(self, node_id, error): """builds and sends an error message to the dashboards""" block = self.graph.nodes[node_id]['block'] msg = {'type' : MSG_ERROR, 'name' : self.name, 'id' : self.id, 'uuid' : self.uuid, 'source_type' : PIPELINE_SOURCE_TYPE, 'payload' : { 'node_id' : node_id, 'block_name' : block.name, 'block_id' : block.id, 'block_uuid' : block.uuid, 'error_type' : error.__class__.__name__, 'error_msg' : str(error), }, } self.dashcomm.write_error( json.dumps(msg) ) ############################################################################ def __send_delete_msg_to_dash(self): """builds and sends a delete message to the dashboards""" msg = {'type' : MSG_DELETE, 'name' : self.name, 'id' : self.id, 'uuid' : self.uuid, 'source_type' : PIPELINE_SOURCE_TYPE, 'payload' : {}, } self.dashcomm.write_delete( json.dumps(msg) ) ############################################################################ def __compute_block(self, node_id, skip_enforcement=False): """ WARNING: this function should only be called if all incoming edges are populated with data. Ideally it should not be used outside the __compute function """ try: self.graph.nodes[node_id]['status'] = STATUS_PROCESSING # UPDATE THE DASHBOARD # ----------------------------------- self.__read_msg_from_dash() self.__send_status_msg_to_dash(node_id) # fetch the block for this node block = self.graph.nodes[node_id]['block'] # ORGANIZE INPUT DATA FOR COMPUTATION # ----------------------------------- # fetch input data for this node in_edges = [e[2] for e in self.graph.in_edges(node_id, data=True)] arg_data_dict = {e['in_index'] : e['data'] for e in in_edges} args = [arg_data_dict[k] for k in sorted( arg_data_dict.keys() )] # assign the task outputs to their appropriate edge analytics = {} # COMPUTE DATA IN THE BLOCK # ----------------------------------- # (args will be empty for root blocks) outputs = block._pipeline_process(*args, logger=self.logger, force_skip=skip_enforcement, analytics=analytics) # POPULATE OUTPUT EDGES FOR THIS NODE # ----------------------------------- # update the graph with analytics for this block self.graph.nodes[node_id].update(analytics) # populate upstream edges with the data we need # get the output edges out_edges = [e[2] for e in self.graph.out_edges(node_id, data=True)] # NEED ERROR CHECKING HERE # (psuedo) if n_out == n_expected_out if block.void: # there's no output for this block, so we write None to the edges for out_edge in out_edges: out_edge['data'] = None else: for out_edge in out_edges: data = Data( outputs[out_edge['out_index']] ) out_edge['data'] = data out_edge['same_type_for_all_datums'] = data.is_homogenus_container() out_edge['data_stored_in'] = data.container_type out_edge['n_datums'] = data.n_datums out_edge['datum_type'] = data.datum_type # update node status self.graph.nodes[node_id]['status'] = STATUS_COMPLETE # update the dashboard self.__send_status_msg_to_dash(node_id) except Exception as error: self.__send_block_error_msg_to_dash(node_id, error) raise ############################################################################ def __compute(self, skip_enforcement=False): """executes the graph tasks. Relies on Input data being preloaded""" ## NOTE: # add warning that for edges that are non-computable ### for node_a, node_b, edge_idx in self.execution_order: # check if node_a is a root node (no incoming edges) # these nodes can be computed and the edge populated # immmediately because they have no predecessors # NOTE: this will currently break if a root has more than one # output - JM # ROOT BLOCKS if self.graph.in_degree(node_a) == 0: self.__compute_block(node_a, skip_enforcement) # NON-ROOT BLOCKS # compute this node if all the data is queued in_edges = self.graph.in_edges(node_b, data=True) data_is_queued = all((e[2]['data'] is not None) for e in in_edges) if all((e[2]['data'] is not None) for e in in_edges): self.__compute_block(node_b, skip_enforcement) ############################################################################ # util ############################################################################
[docs] def get_tasks(self): """generates a dictionary task represenation of the pipeline, which can be used to make other pipelines. """ static = {} for _,attrs in self.graph.nodes(data=True): arg_vars = tuple(attrs['args']) out_vars = tuple(attrs['outputs']) block = attrs['block'] # ignore leaf blocks if isinstance(block, Leaf): continue static[out_vars] = (block,) + arg_vars return static
############################################################################
[docs] def get_predecessors(self, var): """fetches the names of the variables which must be computed/loaded before the given variable can be computed. Args: var(str): name of variable to find predecessors for Returns: set: an unordered set of the variables that must be computed before the given variable can be calculated. """ # NOTE: we could possibly speed this function up by using a depth # finding algorithm instead? # define a recursive function to get edges from all predecessor nodes preds = set() nodes_checked = set() def _get_priors(node): for node_a,node_b,var_name in self.graph.in_edges(node,'var_name'): preds.add(var_name) # recursively add edges from the source node if node_a not in nodes_checked: _get_priors(node_a) nodes_checked.add(node) _get_priors(self.vars[var]['block_node_id']) return preds
############################################################################
[docs] def get_successors(self, var): """fetches the names of the variables which depend on this variable before they can be computed Args: var(str): name of variable to find successors for Returns: set: an unordered set of the variables that can only be computed once the given variable has been """ # NOTE: we could possibly speed this function up by using a depth # finding algorithm instead? # define a recursive function to get edges from all successor nodes succs = set() nodes_checked = set() def _get_latters(node): for node_a,node_b,var_name in self.graph.out_edges(node,'var_name'): succs.add(var_name) # recursively add edges from the source node if node_b not in nodes_checked: _get_latters(node_b) nodes_checked.add(node) # start with every node node after this one - (so we don't include companion outputs) for _,next_node in self.graph.out_edges(self.vars[var]['block_node_id']): _get_latters(next_node) return succs
############################################################################
[docs] def get_types_for(self, var): """fetches the enforced types for this variable of the pipeline. More specifically, these are the types that won't throw an error within the block Args: var(str): the name of the variable Returns: (:obj:`tuple` of :obj:`type`): the types enforced for the given variable """ # INTERNAL HELPER FUNCTION def _dominant_type(types1, types2): """determines which set of types are dominant between two tuples of types """ # make types a tuple if they aren't already to simply the code if (types1 is not None) and not isinstance(types1,(list,tuple,set)): types1 = (types1,) if (types2 is not None) and not isinstance(types2,(list,tuple,set)): types2 = (types2,) # if either type is None, then the other automatically supercedes if types1 is None: return types2 elif types2 is None: return types1 # both must lists/tuples of types - we want the intersection else: okay_types = set(types1).intersection( set(types2) ) return tuple( okay_types ) # END INTERNAL HELPER FUNC # Iterate through pipeline args and compute the dominant type dom_types = None # fetch the node that produced the variable source_node = self.vars[var]['block_node_id'] # iterate through all nodes it's connected to and fetch their types for _,node_b,edge in self.graph.out_edges(source_node, data=True): # only if this out edge is for the given var if (edge['var_name'] == var): # fetch target block object target = self.graph.nodes[node_b]['block'] # fetch the actual name of the argument in the target's process function target_arg = target.args[ edge['in_index'] ] # skip updating this target if its enforcement is disabled if target.skip_enforcement: continue # compute and update the dominant type dom_types = _dominant_type( target.types.get(target_arg, None), dom_types ) return dom_types
############################################################################
[docs] def get_shapes_for(self, var): """fetches the enforced shapes for the given variable More specifically, these are the shapes that won't throw an error within the block Args: var(str): the name of the variable Returns: (:obj:`tuple` of :obj:`tuple`): the shapes enforced for the given variable """ # INTERNAL HELPER FUNCTION def _dominant_shape(shape1, shape2): """determines which shape is dominant between shapes, or calculates the new compatible shape if it's required.""" # if either shape is None, then the other automatically supercedes if shape1 is None: return shape2 elif shape2 is None: return shape1 # both must lists or tuples else: # if the ndim aren't the same, then there is no compatible shape if len(shape1) != len(shape2): return tuple() new_shape = [] # other we have to iterate through and find the dominant axes for ax1,ax2 in zip(shape1,shape2): # if one axis is None, then the other is dominant if ax1 is None: new_shape.append(ax2) elif ax2 is None: new_shape.append(ax1) # both must be integers else: if ax1 == ax2: # if both axial lengths are identical, then we # can append either to the new shape new_shape.append(ax1) else: # if the axial lengths aren't identical, there is # no compatible axis and thus no compatible shape # so we return an empty shape return tuple() return new_shape # END INTERNAL HELPER FUNC dom_shapes = (None,) # fetch the node that produced the variable source_node = self.vars[var]['block_node_id'] # iterate through all nodes it's connected to and fetch their types for _,node_b,edge in self.graph.out_edges(source_node, data=True): # only if this out edge is for the given var if (edge['var_name'] == var): # fetch target block object target = self.graph.nodes[node_b]['block'] # fetch the actual name of the argument in the target's process function target_arg = target.args[ edge['in_index'] ] # skip updating this target if its enforcement is disabled if target.skip_enforcement: continue # fetch target shapes target_shapes = target.shapes.get(target_arg, None) # make target shapes an 1 elem iterable if it's None target_shapes = (None,) if target_shapes is None else target_shapes # calculate all possible shape permutations all_possible = itertools.product( set(target_shapes), set(dom_shapes) ) # check every possible combination all_workable = [] for shape1,shape2 in all_possible: all_workable.append( _dominant_shape(shape1, shape2) ) # remove empty tuples (i.e. incompatible shapes) dom_shapes = tuple(s for s in all_workable if s != tuple()) #update the dominant type return dom_shapes
############################################################################
[docs] def get_containers_for(self, var): """fetches the enforced containers for this variable of the pipeline. More specifically, these are the containers that won't throw an error within the block Args: var(str): the name of the variable Returns: (:obj:`tuple` of :obj:`type`): the containers enforced for the given variable """ # INTERNAL HELPER FUNCTION def _dominant_container(containers1, containers2): """determines which set of containers are dominant between two tuples of containers """ # make containers a tuple if they aren't already to simply the code if (containers1 is not None) and not isinstance(containers1,(list,tuple,set)): containers1 = (containers1,) if (containers2 is not None) and not isinstance(containers2,(list,tuple,set)): containers2 = (containers2,) # if either container is None, then the other automatically supercedes if containers1 is None: return containers2 elif containers2 is None: return containers1 # both must lists/tuples of containers - we want the intersection else: okay_containers = set(containers1).intersection( set(containers2) ) return tuple( okay_containers ) # END INTERNAL HELPER FUNC # Iterate through pipeline args and compute the dominant container dom_containers = None # fetch the node that produced the variable source_node = self.vars[var]['block_node_id'] # iterate through all nodes it's connected to and fetch their containers for _,node_b,edge in self.graph.out_edges(source_node, data=True): # only if this out edge is for the given var if (edge['var_name'] == var): # fetch target block object target = self.graph.nodes[node_b]['block'] # fetch the actual name of the argument in the target's process function target_arg = target.args[ edge['in_index'] ] # skip updating this target if its enforcement is disabled if target.skip_enforcement: continue # compute and update the dominant container dom_containers = _dominant_container( target.containers.get(target_arg, None), dom_containers ) return dom_containers
############################################################################
[docs] def assign_input_index(self, var, index): """reassigns the index for this variable in the process argument list""" # reset the input index to a new one self._inputs[var].set_index(index) self.update()
############################################################################
[docs] def debug_serialization(self): """helper function to debug what part of a block is not serializable""" # NOTE: needs to be updated to use dill's builtin debugging tools error = False # fetch the static graph represenation static = self.get_tasks() # iterate through all tasks and check if their components are serializable raise_error = False for task in static.values(): block = task[0] # iterate through every value in the block's __dict__ for key,val in block.__dict__.items(): try: dill.dumps(val) except Exception as e: self.logger.error("error serializing {}.{}: {}".format(block,key,e)) error = True if not error: self.logger.info("no pickling issues detected")
############################################################################
[docs] def rename(self, name): """renames the Pipeline to the given name. The id is reset in this process""" if not isinstance(name,str): raise PipelineError("name must be string") self.logger.warning("being renamed from '%s' to '%s'" % (self.name, name)) old_name = self.name self.name = name # reset the logger with the new id self.logger = get_logger(self.id, pipeline=self) # log the new name self.logger.warning("renamed from '%s' to '%s'" % (old_name, self.name)) return self
############################################################################ # magic ############################################################################
[docs] def __call__(self, *pos_data, fetch=None, skip_enforcement=False, **kwdata): """aliases self.process with same signature for more natural use""" return self.process(*pos_data, fetch=None, skip_enforcement=False, **kwdata)
############################################################################ # COPYING & PICKLING def __getstate__(self): return self.__dict__ ############################################################################ def __setstate__(self, state): """resets the uuid in the event of a copy""" state['uuid'] = uuid4().hex self.__dict__.update(state) # updates the logger for the new state self.logger = get_logger(self.id, pipeline=self) ############################################################################ def __del__(self): """deletes pipeline and sends a delete message to the dashboard""" # self.__send_delete_msg_to_dash() ############################################################################ # properties ############################################################################ @property def id(self): """str: an unique id for this pipeline This id is a combination of the pipeline's non-unique name and part of it's uuid (last 6 characters by default). The entropy of this id can be increased by increasing ImagePypelines UUID_ORDER variable """ return "{}#{}".format(self.name,self.uuid[-UUID_ORDER:]) ############################################################################ @property def execution_order(self): """:obj:`Generator`: topologically sorted edges of the pipeline""" return nx.topological_sort( nx.line_graph(self.graph) ) ############################################################################ @property def args(self): """:obj:`list` of :obj:`str`: arguments in the order they are expected""" return self.indexed_inputs + self.keyword_inputs ############################################################################ @property def n_args(self): """int: number of arguments for the process function""" return len(self.args) ############################################################################ @property def blocks(self): """:obj:`set` of :obj:`Block`: unordered set of pipeline blocks""" blocks = set() for _,block in self.graph.nodes(data='block'): # ignore leaves if not isinstance(block, Leaf): blocks.add(block) return blocks ############################################################################ @property def variables(self): """:obj:`list` of :obj:`str`: all variables in the pipeline""" return list( self.vars.keys() ) ############################################################################ @property def types(self): """(obj:`dict` of str : type): the types compatible for input arguments. This is computed dynamically so it will automatically reflect changes to Blocks""" # Iterate through pipeline args and compute the dominant type types = {} for pype_arg in self.args: arg_types = self.get_types_for(pype_arg) types[pype_arg] = arg_types # check if there is at least 1 valid type if not (arg_types is None): if len(arg_types) == 0: # log it, but don't throw an error msg = "no valid types found for {}".format(pype_arg) self.logger.error(msg) return types ############################################################################ @property def shapes(self): """(obj:`dict` of str : tuple): the shapes compatible for input arguments. This is computed dynamically so it will automatically reflect changes to Blocks""" # Iterate through pipeline args and compute the dominant shapes shapes = {} for pype_arg in self.args: arg_shapes = self.get_shapes_for(pype_arg) shapes[pype_arg] = arg_shapes # check if there is at least 1 valid type if not (arg_shapes is None): if len(arg_shapes) == 0: # log it, but don't throw an error msg = "no valid shapes found for {}".format(pype_arg) self.logger.error(msg) return shapes ############################################################################ @property def containers(self): """(obj:`dict` of str : container): the containers compatible for input arguments. This is computed dynamically so it will automatically reflect changes to Blocks""" # Iterate through pipeline args and compute the dominant container containers = {} for pype_arg in self.args: arg_containers = self.get_containers_for(pype_arg) containers[pype_arg] = arg_containers # check if there is at least 1 valid container if not (arg_containers is None): if len(arg_containers) == 0: # log it, but don't throw an error msg = "no valid containers found for {}".format(pype_arg) self.logger.error(msg) return containers ############################################################################ @property def void_vars(self): """set: unordered set of variables which won't have data assigned to them during runtime""" void_vars = set() for node,attrs in self.graph.nodes(data=True): # if the block is void (has no outputs), then we add the variable names to a set if attrs['block'].void: void_vars.update(attrs['outputs']) return void_vars ############################################################################ @property def noncomputable(self): """set: unordered set of every variable that can't be computed because it relies on void data""" noncomputable = set() for var in self.void_vars: noncomputable.update( self.get_successors(var) ) return noncomputable