Pipeline¶
class imagepypelines.Pipeline(tasks=None, name=None)[source]¶
Bases: object
Processing algorithm manager for simple pipeline construction.
name¶
user-specified name for this pipeline, used to generate the unique id. Defaults to "Pipeline" or the name of your subclass.
- Type
str
logger¶
Logger object for this pipeline.
graph¶
Multi-edge directed task graph which stores all data and algorithmic order.
- Type
networkx.MultiDiGraph
vars¶
dict to track the block that generates each variable; keys are variable names, values are a subdictionary containing 'block_node_id' and 'block'.
- Type
dict
indexed_inputs¶
sorted list of indexed input variable names (the indexed arguments for the process function).
- Type
list of str
keyword_inputs¶
alphabetically sorted list of unindexed input variable names (the keyword arguments for the process function).
- Type
list of str
_inputs¶
dictionary used internally to access Input objects for queuing data into the pipeline.
- Type
dict
Pipeline Graph Information:
Nodes are dictionaries representing tasks. They contain:
- 'block' : block object for this task
- 'args' : names of the task inputs for this block
- 'outputs' : names of the task outputs produced by this block
- 'name' : string name of the node for visualization purposes
- 'color' : color of the node for visualization purposes
- 'shape' : shape of the node for visualization purposes
- 'class_name' : name of the class, frequently identical to the name
- 'validation_time' : time required to validate incoming data, if applicable
- 'processing_time' : time required to process the data using this node
- 'avg_time_per_datum' : average processing time for each datum
- 'num_in' : number of datums coming into this node
- 'n_batches' : number of batches for this node
- 'pid' : process id for this node
- 'status' : processing status, one of ('not started', 'processing', 'done')
- <plus other attributes defined by the user in Block.get_default_node_attrs()>
Pipeline edges are dictionaries containing the following:
- 'var_name' : name of the variable in the task definition
- 'out_index' : output index from the source node
- 'in_index' : input index for the target node
- 'name' : name of the target block's argument at the in_index
- 'same_type_for_all_datums' : whether or not this data is a homogeneous container
- 'data_stored_in' : the type of the container used to house the data (list, ndarray, etc.)
- 'n_datums' : number of items of data in this edge
- 'datum_type' : the type of data contained; this is only guaranteed to be accurate if same_type_for_all_datums is True
- 'node_a' : source node uuid
- 'node_b' : target node uuid
- 'data' : data for this edge
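As an illustration of the attribute layout described above (plain dictionaries with hypothetical values, not the library's actual internals), a single task node and edge record might look like this:

```python
# Illustrative sketch only: plain dicts mirroring the node and edge
# attribute layout described above. Values are hypothetical.
node = {
    'block': None,                 # would be the Block object for this task
    'args': ['zero', 'one'],       # names of the task inputs
    'outputs': ['ten', 'eleven'],  # names of the task outputs
    'name': 'add_val',             # display name for visualization
    'color': 'orange',             # visualization attribute
    'shape': 'square',             # visualization attribute
    'class_name': 'add_val',       # frequently identical to the name
    'status': 'not started',       # ('not started', 'processing', 'done')
}

edge = {
    'var_name': 'ten',             # variable name in the task definition
    'out_index': 0,                # output index from the source node
    'in_index': 0,                 # input index for the target node
    'same_type_for_all_datums': True,
    'data_stored_in': list,        # container type housing the data
    'n_datums': 2,                 # number of items of data in this edge
    'datum_type': int,             # only reliable if same_type_for_all_datums
    'data': [10, 10],              # the data itself
}
```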
Example
>>> import imagepypelines as ip
>>>
>>> @ip.blockify( kwargs=dict(value=10) )
>>> def add_val(a,b,value):
>>>     return a+value, b+value
>>>
>>> @ip.blockify( kwargs=dict(value=5) )
>>> def minus_val(a,b,value):
>>>     return a-value, b-value
>>>
>>> tasks = {
>>>     # inputs
>>>     'zero' : ip.Input(0),
>>>     'one' : ip.Input(1),
>>>     # operations
>>>     ('ten','eleven') : (add_val, 'zero', 'one'),
>>>     ('twenty','eleven2') : (add_val, 'ten', 'one'),
>>>     ('fifteen', 'six') : (minus_val, 'twenty', 'eleven'),
>>>     ('twentyfive','twentyone') : (add_val, 'fifteen','eleven2'),
>>>     ('negativefour', 'negativefive') : (minus_val, 'one', 'zero'),
>>>     }
>>>
>>> # pipeline1 - raw construction
>>> pipeline1 = ip.Pipeline(tasks, 'Pipeline1')
>>> # pipeline1.draw(show=True)
>>>
>>> processed1 = pipeline1.process([0,0], [1,1])
>>> # print(processed1)
>>>
>>> # pipeline2 - construction from task dict
>>> static_constructor = pipeline1.get_tasks()
>>>
>>> pipeline2 = ip.Pipeline(static_constructor, name="Pipeline2")
>>> processed2 = pipeline2.process([0,0], one=[1,1])
Attributes Summary
- arguments in the order they are expected
- unordered set of pipeline blocks
- the containers compatible for input arguments
- topologically sorted edges of the pipeline
- a unique id for this pipeline
- number of arguments for the process function
- unordered set of every variable that can't be computed because it relies on void data
- the shapes compatible for input arguments
- the types compatible for input arguments
- all variables in the pipeline
- unordered set of variables which won't have data assigned to them during runtime
Methods Summary
__call__(*pos_data[, fetch, skip_enforcement]) : aliases self.process with the same signature for more natural use
asblock(*fetches) : generates a block that runs this pipeline internally
assign_input_index(var, index) : reassigns the index for this variable in the process argument list
clear() : clears all edges in the graph and unloads the input nodes
copy([name]) : returns a copy of the Pipeline, but not a copy of the blocks
helper function to debug what part of a block is not serializable
deepcopy([name]) : returns a copy of the Pipeline including copies of all its blocks
draw([show, ax])
from_bytes(raw_bytes[, passwd, checksum, name]) : loads the pipeline from the given bytes
get_containers_for(var) : fetches the enforced containers for this variable of the pipeline
get_predecessors(var) : fetches the names of the variables which must be computed/loaded before the given variable can be computed
get_shapes_for(var) : fetches the enforced shapes for the given variable
get_successors(var) : fetches the names of the variables which depend on this variable before they can be computed
get_tasks() : generates a dictionary task representation of the pipeline, which can be used to make other pipelines
get_types_for(var) : fetches the enforced types for this variable of the pipeline
load(filename[, passwd, checksum, name]) : loads the pipeline from the given file
process(*pos_data[, fetch, skip_enforcement]) : processes input data through the pipeline
process_and_grab(*pos_data, fetch[, …]) : processes input data through the pipeline, but returns a tuple with the specified fetches
rename(name) : renames the Pipeline to the given name
reset() : resets all edges in the graph, resets the inputs (and updates the dashboards)
save(filename[, passwd]) : serializes and saves a copy of the pipeline to the given filename
to_bytes([passwd]) : serializes a copy of the pipeline and returns the raw bytes
update([tasks, predict_compatibility]) : updates the pipeline's graph with a dict of tasks
Attributes Documentation
containers¶
the containers compatible for input arguments. This is computed dynamically, so it will automatically reflect changes to Blocks.
- Type
dict of str to container
execution_order¶
topologically sorted edges of the pipeline.
- Type
Generator
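For intuition, topological ordering of a small dependency graph can be sketched with Python's stdlib graphlib. This is a simplification; the real execution_order is derived from the pipeline's networkx.MultiDiGraph edges, and the variable names below come from the tasks example above:

```python
from graphlib import TopologicalSorter

# Dependency map: each variable maps to the set of variables that must
# be computed before it (a simplification of the pipeline's real
# edge-based ordering).
deps = {
    'ten':    {'zero', 'one'},
    'eleven': {'zero', 'one'},
    'twenty': {'ten', 'one'},
}

# static_order() yields each node only after all of its dependencies.
order = list(TopologicalSorter(deps).static_order())
```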
id¶
a unique id for this pipeline.
This id is a combination of the pipeline's non-unique name and part of its uuid (the last 6 characters by default). The entropy of this id can be increased by increasing the ImagePypelines UUID_ORDER variable.
- Type
str
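The id scheme described above can be sketched as follows. This is a hypothetical reconstruction, not the library's exact code; make_id, the "-" separator, and the UUID_ORDER constant here are stand-ins for illustration:

```python
import uuid

UUID_ORDER = 6  # stand-in for the ImagePypelines UUID_ORDER setting

def make_id(name):
    # Combine the non-unique name with the last UUID_ORDER hex characters
    # of a fresh uuid, as described above. Raising UUID_ORDER keeps more
    # of the uuid and so increases the id's entropy.
    return "{}-{}".format(name, uuid.uuid4().hex[-UUID_ORDER:])

pipeline_id = make_id("Pipeline1")
```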
noncomputable¶
unordered set of every variable that can't be computed because it relies on void data.
- Type
set
shapes¶
the shapes compatible for input arguments. This is computed dynamically, so it will automatically reflect changes to Blocks.
- Type
dict of str to tuple
types¶
the types compatible for input arguments. This is computed dynamically, so it will automatically reflect changes to Blocks.
- Type
dict of str to type
void_vars¶
unordered set of variables which won't have data assigned to them during runtime.
- Type
set
Methods Documentation
__call__(*pos_data, fetch=None, skip_enforcement=False, **kwdata)[source]¶
aliases self.process with the same signature for more natural use.
asblock(*fetches)[source]¶
generates a block that runs this pipeline internally.
- Parameters
*fetches – variables to fetch from the pipeline, in the order they should be output
- Returns
Block which will run the pipeline internally and retrieve the data specified by fetches
- Return type
Block
assign_input_index(var, index)[source]¶
reassigns the index for this variable in the process argument list.
clear()[source]¶
clears all edges in the graph and unloads the input nodes. Does not update the remote dashboard.
static from_bytes(raw_bytes, passwd=None, checksum=None, name=None)[source]¶
loads the pipeline from the given bytes.
- Parameters
raw_bytes (bytes) – the encoded pipeline in bytes format
passwd (str) – password to decrypt the serialized pipeline with, defaults to None
checksum (str) – the sha256 checksum to check the bytes against
name (str) – new name for the pipeline. If left as None, then defaults to the old name of the pipeline
- Returns
the loaded pipeline
- Return type
Pipeline
Warning
Serialized data can be a security risk! For sensitive applications, use the checksum parameter. ImagePypelines can use this to ensure the data hasn’t been tampered with.
for more information about serialization security, see: https://docs.python.org/3.8/library/pickle.html
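The checksum protection that the warning recommends can be sketched with the stdlib. This is a minimal illustration of the idea, not ImagePypelines' internal code; raw_bytes and verify are hypothetical names:

```python
import hashlib

# Pretend these bytes are a serialized pipeline.
raw_bytes = b"serialized-pipeline-bytes"

# The sender publishes the sha256 checksum alongside the bytes...
checksum = hashlib.sha256(raw_bytes).hexdigest()

# ...and the receiver recomputes it BEFORE deserializing anything,
# refusing to load data whose digest does not match.
def verify(data, expected_checksum):
    return hashlib.sha256(data).hexdigest() == expected_checksum
```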
get_containers_for(var)[source]¶
fetches the enforced containers for this variable of the pipeline.
More specifically, these are the containers that won’t throw an error within the block
- Parameters
var (str) – the name of the variable
- Returns
the containers enforced for the given variable
get_predecessors(var)[source]¶
fetches the names of the variables which must be computed/loaded before the given variable can be computed.
get_shapes_for(var)[source]¶
fetches the enforced shapes for the given variable.
More specifically, these are the shapes that won’t throw an error within the block
- Parameters
var (str) – the name of the variable
- Returns
the shapes enforced for the given variable
get_successors(var)[source]¶
fetches the names of the variables which depend on this variable before they can be computed.
get_tasks()[source]¶
generates a dictionary task representation of the pipeline, which can be used to make other pipelines.
get_types_for(var)[source]¶
fetches the enforced types for this variable of the pipeline.
More specifically, these are the types that won’t throw an error within the block
- Parameters
var (str) – the name of the variable
- Returns
the types enforced for the given variable
classmethod load(filename, passwd=None, checksum=None, name=None)[source]¶
loads the pipeline from the given file.
- Parameters
filename (str) – the filename to load the serialized pipeline from
passwd (str) – password to decrypt the serialized pipeline with, defaults to None
checksum (str) – the sha256 checksum to check the file against
name (str) – new name for the pipeline. If left as None, then defaults to the old name of the pipeline
- Returns
the loaded pipeline
- Return type
Pipeline
Warning
Serialized data can be a security risk! For sensitive applications, use the checksum parameter. ImagePypelines can use this to ensure the data hasn't been tampered with.
for more information about serialization security, see: https://docs.python.org/3.8/library/pickle.html
process(*pos_data, fetch=None, skip_enforcement=False, **kwdata)[source]¶
processes input data through the pipeline.
process first resets this pipeline, before loading input data into the graph and processing it.
Note
The argument list for the Pipeline can be found with Pipeline.args
process_and_grab(*pos_data, fetch, skip_enforcement=False, **kwdata)[source]¶
processes input data through the pipeline, but returns a tuple with the specified fetches.
process first resets this pipeline, before loading input data into the graph and processing it.
Note
The argument list for the Pipeline can be found with Pipeline.args
save(filename, passwd=None)[source]¶
serializes and saves a copy of the pipeline to the given filename. The pipeline can be optionally encrypted.
to_bytes(passwd=None)[source]¶
serializes a copy of the pipeline and returns the raw bytes. Can be optionally encrypted.
update(tasks=None, predict_compatibility=True)[source]¶
updates the pipeline's graph with a dict of tasks.
update will modify many instance variables of the pipeline. Despite its generic name, it can only add tasks to the pipeline, not remove them. update is called internally by the pipeline during instantiation.
- Parameters
tasks (dict) – dictionary of tasks to define this pipeline’s graph