Pipeline

class imagepypelines.Pipeline(tasks=None, name=None)[source]

Bases: object

processing algorithm manager for simple pipeline construction

uuid

hex uuid for this pipeline

Type

str

name

user-specified name for this pipeline, used to generate the unique id. Defaults to "Pipeline" or the name of your subclass

Type

str

logger

Logger object for this pipeline

Type

ImagepypelinesLogger

graph

Multi-Edge directed task graph which stores all data and algorithmic order

Type

networkx.MultiDiGraph

vars

dict used to track the block that generates each variable; keys are variable names, values are a subdictionary containing 'block_node_id' and 'block'

Type

dict

indexed_inputs

sorted list of indexed input variable names (the indexed arguments for the process function)

Type

list of str

keyword_inputs

alphabetically sorted list of unindexed input variable names (the keyword arguments for the process function)

Type

list of str

_inputs

dictionary internally used to access Input objects for queuing data into the pipeline

Type

dict

Pipeline Graph Information:
Nodes are dictionaries representing tasks. They contain:

  • 'block' : block object for this task

  • 'args' : names of the task inputs for this block

  • 'outputs' : names of the task outputs produced by this block

  • 'name' : string name of the node for visualization purposes

  • 'color' : color of the node for visualization purposes

  • 'shape' : shape of the node for visualization purposes

  • 'class_name' : name of the class, frequently identical to the name

  • 'validation_time' : time required to validate incoming data if applicable

  • 'processing_time' : time required to process the data using this node

  • 'avg_time_per_datum' : average processing time for each datum

  • 'num_in' : number of datums coming into this node

  • 'n_batches' : number of batches for this node

  • 'pid' : process id for this node

  • 'status' : processing status, one of ('not started', 'processing', 'done')

  • <plus other attributes defined by the user in Block.get_default_node_attrs()>

Pipeline edges are dictionaries containing the following:

  • 'var_name' : name of the variable in the task definition

  • 'out_index' : output index from the source node

  • 'in_index' : input index for the target node

  • 'name' : name of the target block's argument at the in_index

  • 'same_type_for_all_datums' : whether or not this data is a homogeneous container

  • 'data_stored_in' : the type of the container used to house the data (list, ndarray, etc.)

  • 'n_datums' : number of items of data in this edge

  • 'datum_type' : the type of data contained; this is only guaranteed to be accurate if same_type_for_all_datums is True

  • 'node_a' : source node uuid

  • 'node_b' : target node uuid

  • 'data' : data for this edge

(A sketch of inspecting these node and edge attributes follows the Example below.)

Example

>>> import imagepypelines as ip
>>>
>>> @ip.blockify( kwargs=dict(value=10) )
... def add_val(a, b, value):
...     return a+value, b+value
>>>
>>> @ip.blockify( kwargs=dict(value=5) )
... def minus_val(a, b, value):
...     return a-value, b-value
>>>
>>> tasks = {
...         # inputs
...         'zero' : ip.Input(0),
...         'one' : ip.Input(1),
...         # operations
...         ('ten','eleven') : (add_val, 'zero', 'one'),
...         ('twenty','eleven2') : (add_val, 'ten', 'one'),
...         ('fifteen', 'six') : (minus_val, 'twenty', 'eleven'),
...         ('twentyfive','twentyone') : (add_val, 'fifteen','eleven2'),
...         ('negativefour', 'negativefive') : (minus_val, 'one', 'zero'),
...         }
>>>
>>> # pipeline1 - raw construction
>>> pipeline1 = ip.Pipeline(tasks, 'Pipeline1')
>>> # pipeline1.draw(show=True)
>>>
>>> processed1 = pipeline1.process([0,0], [1,1])
>>> # print(processed1)
>>>
>>> # pipeline2 - construction from task dict
>>> static_constructor = pipeline1.get_tasks()
>>>
>>> pipeline2 = ip.Pipeline(static_constructor, name="Pipeline2")
>>> processed2 = pipeline2.process([0,0], one=[1,1])
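
Given pipeline1 from the Example above, the following is a minimal sketch of inspecting the node and edge attribute dictionaries described under Pipeline Graph Information, using the standard networkx accessors on the graph attribute (which attributes are populated may depend on whether the pipeline has been run):

>>> # task nodes and their attribute dictionaries
>>> for node_id, attrs in pipeline1.graph.nodes(data=True):
...     print(node_id, attrs.get('name'), attrs.get('status'))
>>>
>>> # data edges and their attribute dictionaries
>>> for node_a, node_b, attrs in pipeline1.graph.edges(data=True):
...     print(attrs.get('var_name'), attrs.get('n_datums'))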

Attributes Summary

args

arguments in the order they are expected

blocks

unordered set of pipeline blocks

containers

the containers compatible for input arguments

execution_order

topologically sorted edges of the pipeline

id

a unique id for this pipeline

n_args

number of arguments for the process function

noncomputable

unordered set of every variable that can’t be computed because it relies on void data

shapes

the shapes compatible for input arguments

types

the types compatible for input arguments

variables

all variables in the pipeline

void_vars

unordered set of variables which won’t have data assigned to them during runtime

Methods Summary

__call__(*pos_data[, fetch, skip_enforcement])

aliases self.process with the same signature for more natural use

asblock(*fetches)

generates a block that runs this pipeline internally

assign_input_index(var, index)

reassigns the index for this variable in the process argument list

clear()

clears all edges in the graph and unloads the input nodes.

copy([name])

returns a copy of the Pipeline, but not a copy of the blocks

debug_serialization()

helper function to debug what part of a block is not serializable

deepcopy([name])

returns a copy of the Pipeline including copies of all its blocks

draw([show, ax])

from_bytes(raw_bytes[, passwd, checksum, name])

loads the pipeline from the given bytes

get_containers_for(var)

fetches the enforced containers for this variable of the pipeline.

get_predecessors(var)

fetches the names of the variables which must be computed/loaded before the given variable can be computed.

get_shapes_for(var)

fetches the enforced shapes for the given variable

get_successors(var)

fetches the names of the variables which depend on this variable before they can be computed

get_tasks()

generates a dictionary task representation of the pipeline, which can be used to make other pipelines.

get_types_for(var)

fetches the enforced types for this variable of the pipeline.

load(filename[, passwd, checksum, name])

loads the pipeline from the given file

process(*pos_data[, fetch, skip_enforcement])

processes input data through the pipeline

process_and_grab(*pos_data, fetch[, …])

processes input data through the pipeline, but returns a tuple with the specified fetches

rename(name)

renames the Pipeline to the given name.

reset()

resets all edges in the graph, resets the inputs (and updates the dashboards)

save(filename[, passwd])

serializes and saves a copy of the pipeline to the given filename.

to_bytes([passwd])

serializes a copy of the pipeline and returns the raw bytes.

update([tasks, predict_compatibility])

updates the pipeline’s graph with a dict of tasks

Attributes Documentation

args

arguments in the order they are expected

Type

list of str

blocks

unordered set of pipeline blocks

Type

set of Block

containers

the containers compatible for input arguments. This is computed dynamically so it will automatically reflect changes to Blocks

Type

dict of str: container

execution_order

topologically sorted edges of the pipeline

Type

Generator

id

a unique id for this pipeline

This id is a combination of the pipeline's non-unique name and part of its uuid (last 6 characters by default). The entropy of this id can be increased by increasing ImagePypelines' UUID_ORDER variable

Type

str

n_args

number of arguments for the process function

Type

int

noncomputable

unordered set of every variable that can’t be computed because it relies on void data

Type

set

shapes

the shapes compatible for input arguments. This is computed dynamically so it will automatically reflect changes to Blocks

Type

dict of str: tuple

types

the types compatible for input arguments. This is computed dynamically so it will automatically reflect changes to Blocks

Type

dict of str: type

variables

all variables in the pipeline

Type

list of str

void_vars

unordered set of variables which won’t have data assigned to them during runtime

Type

set

Methods Documentation

__call__(*pos_data, fetch=None, skip_enforcement=False, **kwdata)[source]

aliases self.process with the same signature for more natural use
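
For example, continuing the Example above, calling the pipeline object directly is equivalent to calling process:

>>> processed = pipeline1([0,0], [1,1])   # same as pipeline1.process([0,0], [1,1])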

asblock(*fetches)[source]

generates a block that runs this pipeline internally

Parameters

*fetches – variables to fetch from the pipeline, in the order they should be output

Returns

Block which will run the pipeline internally and retrieve the data specified by fetches

Return type

Block
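
A minimal sketch of nesting one pipeline inside another via asblock, reusing pipeline1 and its variable names from the Example above (the outer task names 'a', 'b', 'big', and 'small' are purely illustrative):

>>> # block with two outputs: 'twentyfive' and 'six' from pipeline1
>>> nested = pipeline1.asblock('twentyfive', 'six')
>>>
>>> outer = ip.Pipeline({
...     'a' : ip.Input(0),
...     'b' : ip.Input(1),
...     ('big','small') : (nested, 'a', 'b'),
...     }, name="Outer")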

assign_input_index(var, index)[source]

reassigns the index for this variable in the process argument list

clear()[source]

clears all edges in the graph and unloads the input nodes. Does not update the remote dashboard

copy(name=None)[source]

returns a copy of the Pipeline, but not a copy of the blocks

debug_serialization()[source]

helper function to debug what part of a block is not serializable

deepcopy(name=None)[source]

returns a copy of the Pipeline including copies of all its blocks

draw(show=True, ax=None)[source]

static from_bytes(raw_bytes, passwd=None, checksum=None, name=None)[source]

loads the pipeline from the given bytes

Parameters
  • raw_bytes (bytes) – the encoded pipeline in bytes format

  • passwd (str) – password to decrypt the serialized pipeline with, defaults to None

  • checksum (str) – the sha256 checksum to check the bytes against

  • name (str) – new name for the pipeline. If left as None, then defaults to the old name of the pipeline

Returns

the loaded pipeline

Return type

Pipeline

Warning

Serialized data can be a security risk! For sensitive applications, use the checksum parameter. ImagePypelines can use this to ensure the data hasn’t been tampered with.

For more information about serialization security, see: https://docs.python.org/3.8/library/pickle.html
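
A sketch of the full round trip through to_bytes and from_bytes, using the returned checksum for verification (the password is illustrative; pipeline1 is from the Example above):

>>> raw_bytes, checksum = pipeline1.to_bytes(passwd='supersecret')
>>> restored = ip.Pipeline.from_bytes(raw_bytes, passwd='supersecret', checksum=checksum)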

get_containers_for(var)[source]

fetches the enforced containers for this variable of the pipeline.

More specifically, these are the containers that won’t throw an error within the block

Parameters

var (str) – the name of the variable

Returns

the containers enforced for the given variable

Return type

tuple of type

get_predecessors(var)[source]

fetches the names of the variables which must be computed/loaded before the given variable can be computed.

Parameters

var (str) – name of variable to find predecessors for

Returns

an unordered set of the variables that must be computed before the given variable can be calculated.

Return type

set
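
For instance, with the task graph from the Example above, the predecessors of 'twenty' are the variables upstream of it (the set shown in the comment is indicative, not verified output):

>>> pipeline1.get_predecessors('twenty')   # e.g. {'ten', 'zero', 'one'}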

get_shapes_for(var)[source]

fetches the enforced shapes for the given variable

More specifically, these are the shapes that won’t throw an error within the block

Parameters

var (str) – the name of the variable

Returns

the shapes enforced for the given variable

Return type

tuple of tuple

get_successors(var)[source]

fetches the names of the variables which depend on this variable before they can be computed

Parameters

var (str) – name of variable to find successors for

Returns

an unordered set of the variables that can only be computed once the given variable has been computed.

Return type

set

get_tasks()[source]

generates a dictionary task representation of the pipeline, which can be used to make other pipelines.

get_types_for(var)[source]

fetches the enforced types for this variable of the pipeline.

More specifically, these are the types that won’t throw an error within the block

Parameters

var (str) – the name of the variable

Returns

the types enforced for the given variable

Return type

tuple of type
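
A brief sketch of querying the enforcement information for a single variable (pipeline1 from the Example above; the returned tuples depend entirely on what the receiving blocks enforce and may be empty or permissive):

>>> pipeline1.get_types_for('zero')        # tuple of types accepted for 'zero'
>>> pipeline1.get_shapes_for('zero')       # tuple of shapes accepted for 'zero'
>>> pipeline1.get_containers_for('zero')   # tuple of containers accepted for 'zero'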

classmethod load(filename, passwd=None, checksum=None, name=None)[source]

loads the pipeline from the given file

Parameters
  • filename (str) – the filename to load the serialized pipeline from

  • passwd (str) – password to decrypt the serialized pipeline with, defaults to None

  • checksum (str) – the sha256 checksum to check the file against

  • name (str) – new name for the pipeline. If left as None, then defaults to the old name of the pipeline

Returns

the loaded pipeline

Return type

Pipeline

Warning

Serialized data can be a security risk! For sensitive applications, use the checksum parameter. ImagePypelines can use this to ensure the data hasn't been tampered with.

For more information about serialization security, see: https://docs.python.org/3.8/library/pickle.html

process(*pos_data, fetch=None, skip_enforcement=False, **kwdata)[source]

processes input data through the pipeline

process first resets this pipeline, before loading input data into the graph and processing it.

Note

The argument list for the Pipeline can be found with Pipeline.args

Parameters
  • *pos_data – positional input data, assigned to the pipeline's indexed input variables in order

  • fetch (list of str) – names of variables to fetch from the pipeline, defaults to None

  • skip_enforcement (bool) – whether to skip data enforcement checks, defaults to False

  • **kwdata – keyword input data, assigned to the pipeline's keyword input variables by name

process_and_grab(*pos_data, fetch, skip_enforcement=False, **kwdata)[source]

processes input data through the pipeline, but returns a tuple with the specified fetches

process first resets this pipeline, before loading input data into the graph and processing it.

Note

The argument list for the Pipeline can be found with Pipeline.args

Parameters
  • *pos_data – positional input data, assigned to the pipeline's indexed input variables in order

  • fetch (list of str) – names of variables to fetch and return, in the order they should be returned

  • skip_enforcement (bool) – whether to skip data enforcement checks, defaults to False

  • **kwdata – keyword input data, assigned to the pipeline's keyword input variables by name

Returns

tuple of the fetched data, in the order specified by fetch

Return type

tuple
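
A usage sketch continuing the Example above (whether fetch accepts a list or a tuple of names is an assumption here; per the summary, the return value is a tuple ordered by fetch):

>>> twentyfive, six = pipeline1.process_and_grab([0,0], [1,1], fetch=['twentyfive', 'six'])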

rename(name)[source]

renames the Pipeline to the given name. The id is reset in this process

reset()[source]

resets all edges in the graph, resets the inputs (and updates the dashboards)

save(filename, passwd=None)[source]

serializes and saves a copy of the pipeline to the given filename. Pipeline can be optionally encrypted

Parameters
  • filename (str) – the filename to save the serialized pipeline to

  • passwd (str) – password to encrypt the serialized pipeline with if desired, defaults to None

Returns

the sha256 checksum for the saved file

Return type

str
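
A sketch of the save / load round trip, verifying the file with the returned checksum (the filename and password are illustrative; pipeline1 is from the Example above):

>>> checksum = pipeline1.save('pipeline1.pipe', passwd='supersecret')
>>> restored = ip.Pipeline.load('pipeline1.pipe', passwd='supersecret', checksum=checksum)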

to_bytes(passwd=None)[source]

serializes a copy of the pipeline and returns the raw bytes. Can be optionally encrypted

Parameters

passwd (str) – password to encrypt the serialized pipeline with if desired, defaults to None

Returns

tuple containing:

  • bytes: the serialized and optionally encrypted pipeline

  • str: the sha256 checksum for the raw bytes

Return type

tuple

update(tasks=None, predict_compatibility=True)[source]

updates the pipeline’s graph with a dict of tasks

update will modify and change many instance variables of the pipeline. Despite its generic name, it can only add tasks to the pipeline, not remove them. update is called internally by the pipeline during instantiation.

Parameters

tasks (dict) – dictionary of tasks to define this pipeline’s graph
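
A sketch of adding tasks to an existing pipeline with update, reusing add_val and the variable names from the Example above (the new variable names 'thirty' and 'thirtyone' are illustrative):

>>> pipeline1.update( {('thirty','thirtyone') : (add_val, 'twenty', 'twentyone')} )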