Prefect Local Dask Cluster - Example#

[1]:
import dask.dataframe as dd
import seaborn as sns

from dask.distributed import Client
from prefect.executors import DaskExecutor
from prefect import Flow, task

from dask.diagnostics import ProgressBar
import dask.array as da

import skimage.io
import glob
import os
import shutil

Make Graphviz available#

Prepend the bin directory of your conda environment to PATH so that Graphviz's dot executable can be found.

[2]:
os.environ['PATH'] = f"/home/user2135/.conda/envs/datascience/bin/:{os.environ['PATH']}"
[3]:
shutil.which('dot')
[3]:
'/home/jillian/.conda/envs/sphinx/bin/dot'
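Hard-coding a specific environment path (as above) is brittle. A minimal sketch of the same idea that derives the path from the running interpreter, assuming the kernel is running inside the conda environment that has Graphviz installed:

import os
import sys

# sys.prefix points at the active environment's root; its bin/ directory
# holds the executables installed into that environment (e.g. `dot`).
os.environ['PATH'] = f"{os.path.join(sys.prefix, 'bin')}:{os.environ['PATH']}"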
[4]:
pbar = ProgressBar()
pbar.register() # global registration
[5]:
# Start a local Dask cluster and connect a client to it
client = Client()
client
[5]:

Client: Client-fd2e409c-70b5-11ed-b60c-0e4ec89a5867
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

[6]:
client.dashboard_link
[6]:
'http://127.0.0.1:8787/status'

Access the Local Dask Cluster Dashboard#

From the Client widget, grab the dashboard port. On JupyterHub you'll need to proxy it as:

your-jhub-address/user/{YOUR_USER}/proxy/{DASHBOARD_PORT}/status
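As a hedged sketch, that proxy path can also be assembled programmatically. The JUPYTERHUB_USER variable and the /user/{user}/proxy/ route are assumptions that depend on your hub's configuration (jupyter-server-proxy must be installed for the route to exist):

import os
from urllib.parse import urlparse

# e.g. 'http://127.0.0.1:8787/status' -> 8787
port = urlparse(client.dashboard_link).port

# JupyterHub sets JUPYTERHUB_USER inside single-user servers (assumption)
user = os.environ.get('JUPYTERHUB_USER', 'YOUR_USER')
print(f"/user/{user}/proxy/{port}/status")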

[7]:
executor = DaskExecutor(address=client.scheduler.address)
executor
[7]:
<Executor: DaskExecutor>
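Here the executor simply attaches to the cluster we already started. For reference, a Prefect 1.x DaskExecutor constructed without an address will create (and tear down) its own temporary cluster; the n_workers value below is an arbitrary example:

# Sketch: let the executor manage a temporary LocalCluster itself
temp_executor = DaskExecutor(cluster_kwargs={"n_workers": 2})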
[8]:
# Load seaborn's built-in iris dataset
iris = sns.load_dataset("iris")
iris.head()
[8]:
sepal_length sepal_width petal_length petal_width species
0 5.1 3.5 1.4 0.2 setosa
1 4.9 3.0 1.4 0.2 setosa
2 4.7 3.2 1.3 0.2 setosa
3 4.6 3.1 1.5 0.2 setosa
4 5.0 3.6 1.4 0.2 setosa
[9]:
iris_dd = dd.from_pandas(iris, npartitions=1)
iris_dd
[9]:
Dask DataFrame Structure:
sepal_length sepal_width petal_length petal_width species
npartitions=1
0 float64 float64 float64 float64 object
149 ... ... ... ... ...
Dask Name: from_pandas, 1 graph layer
[10]:
#! wget https://cellprofiler-examples.s3.amazonaws.com/ExampleHuman.zip
#! unzip ExampleHuman.zip
[11]:
tifs = glob.glob('ExampleHuman/images/*.tif')
tifs
[11]:
['ExampleHuman/images/AS_09125_050116030001_D03f00d0.tif',
 'ExampleHuman/images/AS_09125_050116030001_D03f00d1.tif',
 'ExampleHuman/images/AS_09125_050116030001_D03f00d2.tif']
[12]:
image = skimage.io.imread(tifs[0])
image
[12]:
array([[ 8,  8,  8, ..., 63, 78, 75],
       [ 8,  8,  7, ..., 67, 71, 71],
       [ 9,  8,  8, ..., 53, 64, 66],
       ...,
       [ 8,  9,  8, ..., 17, 24, 59],
       [ 8,  8,  8, ..., 17, 22, 55],
       [ 8,  8,  8, ..., 16, 18, 38]], dtype=uint8)
[13]:
da.from_array(image)
[13]:
            Array        Chunk
Bytes       256.00 kiB   256.00 kiB
Shape       (512, 512)   (512, 512)
Dask graph: 1 chunks in 1 graph layer
Data type:  uint8 numpy.ndarray

Create a Prefect Flow#

Let's make a Prefect flow, run it, visualize it with Prefect, and then watch the tasks execute on the Dask dashboard.

There is no practical reason to run this particular flow; it's here purely for demonstration purposes.

[14]:
@task
def get_mean_per_row(image):
    # Mean of each row, materialized; keep only the first row's value
    return image.mean(axis=1).compute()[0]

@task
def get_max_per_row(image):
    # Max of each row, materialized; keep only the first row's value
    return image.max(axis=1).compute()[0]

@task
def get_min_per_row(image):
    # Min of each row, materialized; keep only the first row's value
    return image.min(axis=1).compute()[0]

@task
def convert_to_dask_array(image):
    # Wrap the in-memory NumPy image as a single-chunk Dask array
    return da.from_array(image)

@task
def read_images(image_file):
    # Read a single TIFF into a NumPy array
    return skimage.io.imread(image_file)

with Flow('read_images_flow') as flow:
    # .map() fans each task out over the list of inputs
    images = read_images.map(image_file=tifs)
    images_da = convert_to_dask_array.map(image=images)
    images_min = get_min_per_row.map(image=images_da)
    images_max = get_max_per_row.map(image=images_da)
    images_mean = get_mean_per_row.map(image=images_da)
[15]:
flow.visualize()
[15]:
../../_images/workflow-managers_prefect_prefect-local-dask-cluster_20_0.svg
[16]:
flow_state = flow.run(executor=executor)
flow_state
[2022-11-30 13:50:51+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'read_images_flow'
[2022-11-30 13:50:51+0000] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at tcp://127.0.0.1:36981
/home/jillian/.conda/envs/datascience/lib/python3.8/site-packages/distributed/scheduler.py:5543: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
  warnings.warn(
[2022-11-30 13:50:55+0000] INFO - prefect.TaskRunner | Task 'read_images': Starting task run...
[2022-11-30 13:50:55+0000] INFO - prefect.TaskRunner | Task 'read_images': Finished task run for task with final state: 'Mapped'
[2022-11-30 13:50:56+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array': Starting task run...
[2022-11-30 13:50:56+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array': Finished task run for task with final state: 'Mapped'
[2022-11-30 13:50:56+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row': Starting task run...
[2022-11-30 13:50:56+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row': Finished task run for task with final state: 'Mapped'
[2022-11-30 13:50:56+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row': Starting task run...
[2022-11-30 13:50:56+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row': Finished task run for task with final state: 'Mapped'
[2022-11-30 13:50:56+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row': Starting task run...
[2022-11-30 13:50:56+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row': Finished task run for task with final state: 'Mapped'
[2022-11-30 13:50:57+0000] INFO - prefect.TaskRunner | Task 'read_images[0]': Starting task run...
[2022-11-30 13:50:57+0000] INFO - prefect.TaskRunner | Task 'read_images[0]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:57+0000] INFO - prefect.TaskRunner | Task 'read_images[2]': Starting task run...
[2022-11-30 13:50:57+0000] INFO - prefect.TaskRunner | Task 'read_images[1]': Starting task run...
[2022-11-30 13:50:57+0000] INFO - prefect.TaskRunner | Task 'read_images[2]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:57+0000] INFO - prefect.TaskRunner | Task 'read_images[1]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[0]': Starting task run...
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[0]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[2]': Starting task run...
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[1]': Starting task run...
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[2]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[1]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[0]': Starting task run...
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[0]': Starting task run...
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[1]': Starting task run...
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[2]': Starting task run...
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[1]': Starting task run...
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[2]': Starting task run...
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[1]': Starting task run...
[2022-11-30 13:50:58+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[0]': Starting task run...
[2022-11-30 13:50:59+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[2]': Starting task run...
[2022-11-30 13:50:59+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[1]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:59+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[2]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:59+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[1]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:59+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[1]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:59+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[2]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:59+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[0]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:59+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[0]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:59+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[2]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:59+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[0]': Finished task run for task with final state: 'Success'
[2022-11-30 13:50:59+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
/home/jillian/.conda/envs/datascience/lib/python3.8/site-packages/prefect/executors/dask.py:313: RuntimeWarning: coroutine 'rpc.close_rpc' was never awaited
  scheduler_comm.close_rpc()
[16]:
<Success: "All reference tasks succeeded.">

Getting the Prefect Results#

If you run your functions in a Prefect flow, you need to interact with the flow_state object in order to get your results.

Interacting with Results

Getting the Prefect Results for objects in memory#

If you are calling Prefect from a notebook or script, it is likely that your objects are still in memory. You can reference the results using flow_state.result[my_object].

[17]:
images_da.result
[18]:
flow_state.result
[18]:
{<Task: read_images>: <Mapped: "Ready to proceed with mapping.">,
 <Task: Constant[list]>: <Success>,
 <Task: convert_to_dask_array>: <Mapped: "Ready to proceed with mapping.">,
 <Task: get_min_per_row>: <Mapped: "Ready to proceed with mapping.">,
 <Task: get_mean_per_row>: <Mapped: "Ready to proceed with mapping.">,
 <Task: get_max_per_row>: <Mapped: "Ready to proceed with mapping.">}
[19]:
flow_state.result[images_da]._result
[19]:
<Result: [dask.array<array, shape=(512, 512), dtype=uint8, chunksize=(512, 512), chunktype=numpy.ndarray>, dask.array<array, shape=(512, 512), dtype=uint8, chunksize=(512, 512), chunktype=numpy.ndarray>, dask.array<array, shape=(512, 512), dtype=uint8, chunksize=(512, 512), chunktype=numpy.ndarray>]>
[20]:
help(flow_state.result[images_da]._result)
Help on Result in module prefect.engine.result.base object:

class Result(builtins.object)
 |  Result(value: Any = None, location: str = None, serializer: prefect.engine.serializers.Serializer = None)
 |
 |  A representation of the result of a Prefect task; this class contains
 |  information about the value of a task's result, a result handler specifying
 |  how to serialize or store this value securely, and a `safe_value` attribute
 |  which holds information about the current "safe" representation of this
 |  result.
 |
 |  Args:
 |      - value (Any, optional): the value of the result
 |      - location (Union[str, Callable], optional): Possibly templated location
 |          to be used for saving the result to the destination. If a callable
 |          function is provided, it should have signature `callable(**kwargs) ->
 |          str` and at write time all formatting kwargs will be passed and a fully
 |          formatted location is expected as the return value.  Can be used for
 |          string formatting logic that `.format(**kwargs)` doesn't support
 |      - serializer (Serializer): a serializer that can transform Python
 |          objects to bytes and recover them. The serializer is used whenever the
 |          `Result` is writing to or reading from storage. Defaults to
 |          `PickleSerializer`.
 |
 |  Methods defined here:
 |
 |  __eq__(self, other: Any) -> bool
 |      Return self==value.
 |
 |  __init__(self, value: Any = None, location: str = None, serializer: prefect.engine.serializers.Serializer = None)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |
 |  __repr__(self) -> str
 |      Return repr(self).
 |
 |  copy(self) -> 'Result'
 |      Return a copy of the current result object.
 |
 |  exists(self, location: str, **kwargs: Any) -> bool
 |      Checks whether the target result exists.
 |
 |      Args:
 |          - location (str, optional): Location of the result in the specific result target.
 |              If provided, will check whether the provided location exists;
 |              otherwise, will use `self.location`
 |          - **kwargs (Any): string format arguments for `location`
 |
 |      Returns:
 |          - bool: whether or not the target result exists.
 |
 |  format(self, **kwargs: Any) -> 'Result'
 |      Takes a set of string format key-value pairs and renders the result.location to a final
 |      location string
 |
 |      Args:
 |          - **kwargs (Any): string format arguments for result.location
 |
 |      Returns:
 |          - Result: a new result instance with the appropriately formatted location
 |
 |  from_value(self, value: Any) -> 'Result'
 |      Create a new copy of the result object with the provided value.
 |
 |      Args:
 |          - value (Any): the value to use
 |
 |      Returns:
 |          - Result: a new Result instance with the given value
 |
 |  read(self, location: str) -> 'Result'
 |      Reads from the target result and returns a corresponding `Result` instance.
 |
 |      Args:
 |          - location (str): Location of the result in the specific result target.
 |
 |      Returns:
 |          - Any: The value saved to the result.
 |
 |  write(self, value_: Any, **kwargs: Any) -> 'Result'
 |      Serialize and write the result to the target location.
 |
 |      Args:
 |          - value_ (Any): the value to write; will then be stored as the `value` attribute
 |              of the returned `Result` instance
 |          - **kwargs (optional): if provided, will be used to format the location template
 |              to determine the location to write to
 |
 |      Returns:
 |          - Result: a new result object with the appropriately formatted location destination
 |
 |  ----------------------------------------------------------------------
 |  Readonly properties defined here:
 |
 |  default_location
 |
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |
 |  __dict__
 |      dictionary for instance variables (if defined)
 |
 |  __weakref__
 |      list of weak references to the object (if defined)
 |
 |  ----------------------------------------------------------------------
 |  Data and other attributes defined here:
 |
 |  __hash__ = None
 |
 |  __slotnames__ = []

[21]:
flow_state.result[images_da]._result.value
[21]:
[dask.array<array, shape=(512, 512), dtype=uint8, chunksize=(512, 512), chunktype=numpy.ndarray>,
 dask.array<array, shape=(512, 512), dtype=uint8, chunksize=(512, 512), chunktype=numpy.ndarray>,
 dask.array<array, shape=(512, 512), dtype=uint8, chunksize=(512, 512), chunktype=numpy.ndarray>]

Getting Results from Prefect Flow State#

If your objects are no longer in memory, you will need to do a bit more digging.

Note: You probably don't want to do this in practice; instead, persist your results to a database, Parquet file, etc.
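A minimal sketch of that idea using Prefect 1.x's LocalResult; the ./results directory is an arbitrary choice, and checkpointing has to be enabled (e.g. by setting PREFECT__FLOWS__CHECKPOINTING=true) for the result to actually be written:

from prefect import task
from prefect.engine.results import LocalResult

# Persist this task's return value to disk at run time so it can be
# read back later, instead of digging it out of flow_state in memory.
@task(result=LocalResult(dir="./results"), checkpoint=True)
def get_max_per_row(image):
    return image.max(axis=1).compute()[0]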

[22]:
flow_state.result.keys()
[22]:
dict_keys([<Task: read_images>, <Task: Constant[list]>, <Task: convert_to_dask_array>, <Task: get_min_per_row>, <Task: get_mean_per_row>, <Task: get_max_per_row>])
[23]:
results = {}
for key in flow_state.result.keys():
    print(key)
    results[str(key)] = flow_state.result[key]._result

results.keys()
<Task: read_images>
<Task: Constant[list]>
<Task: convert_to_dask_array>
<Task: get_min_per_row>
<Task: get_mean_per_row>
<Task: get_max_per_row>
[23]:
dict_keys(['<Task: read_images>', '<Task: Constant[list]>', '<Task: convert_to_dask_array>', '<Task: get_min_per_row>', '<Task: get_mean_per_row>', '<Task: get_max_per_row>'])
[24]:
results['<Task: get_max_per_row>']
[24]:
<Result: [82, 12, 178]>