{ "cells": [ { "cell_type": "markdown", "id": "8de55989-b274-450b-b467-c848ebe47bee", "metadata": {}, "source": [ "## Prefect Local Dask Cluster - Example" ] }, { "cell_type": "code", "execution_count": 1, "id": "0b9e9f40-ac32-4a46-92cf-63b562bd1777", "metadata": {}, "outputs": [], "source": [ "import dask.dataframe as dd\n", "import pandas as pd\n", "import tempfile\n", "from sklearn.datasets import load_iris\n", "import seaborn as sns\n", "\n", "import dask\n", "from dask.distributed import Client\n", "from dask.distributed import get_client\n", "from prefect.executors import DaskExecutor\n", "from prefect import Flow, task\n", "\n", "from dask.diagnostics import ProgressBar\n", "import dask.array as da\n", "import os\n", "\n", "import skimage\n", "from skimage import data, io, filters\n", "import glob\n", "import shutil\n", "import sys\n", "from devtools import debug" ] }, { "cell_type": "markdown", "id": "04910a31-1950-4715-b132-d13d225d34a5", "metadata": {}, "source": [ "### Make Graphviz available\n", "\n", "Get the path of your current environment" ] }, { "cell_type": "code", "execution_count": 2, "id": "14dacc8e-3f8a-4588-aded-e5f6e244bf69", "metadata": {}, "outputs": [], "source": [ "os.environ['PATH'] = f\"/home/user2135/.conda/envs/datascience/bin/:{os.environ['PATH']}\"" ] }, { "cell_type": "code", "execution_count": 3, "id": "bc2cd7f6-a95c-4509-859e-acf9d1c56929", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'/home/user2135/.conda/envs/datascience/bin/dot'" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "shutil.which('dot')" ] }, { "cell_type": "code", "execution_count": 4, "id": "f7856156-2635-4a1c-8524-c6ff67409393", "metadata": {}, "outputs": [], "source": [ "pbar = ProgressBar() \n", "pbar.register() # global registration" ] }, { "cell_type": "code", "execution_count": null, "id": "d3270bc1-4c76-4914-a68f-5832f8b0cf26", "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", 
"execution_count": 5, "id": "23bb9ed7-f53e-4494-8021-d79e7e3f406b", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-fb530e1c-f34d-11ec-ba63-02cda6d9e347

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", " Dashboard: http://127.0.0.1:8787/status\n", "
\n", "\n", " \n", "
\n", "

Cluster Info

\n", "
\n", "
\n", "
\n", "
\n", "

LocalCluster

\n", "

fc533d34

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", " \n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Workers: 1\n", "
\n", " Total threads: 1\n", " \n", " Total memory: 7.71 GiB\n", "
Status: runningUsing processes: True
\n", "\n", "
\n", " \n", "

Scheduler Info

\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-b9f2c099-6335-44e6-87c4-28088ead4c93

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://127.0.0.1:39365\n", " \n", " Workers: 1\n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Total threads: 1\n", "
\n", " Started: Just now\n", " \n", " Total memory: 7.71 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 0

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:34241\n", " \n", " Total threads: 1\n", "
\n", " Dashboard: http://127.0.0.1:33073/status\n", " \n", " Memory: 7.71 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:37611\n", "
\n", " Local directory: /fastscratch/dask-worker-space/worker-g5afbpis\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client\n", "client = Client()\n", "client" ] }, { "cell_type": "code", "execution_count": 6, "id": "15ff36e6-574c-4261-b41a-469610542e78", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'http://127.0.0.1:8787/status'" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client.dashboard_link" ] }, { "cell_type": "markdown", "id": "9c3189fb-c442-47e3-b15e-de08a019a3df", "metadata": {}, "source": [ "### Access the Local Dask Cluster Dashboard\n", "\n", "From the Client widget grab the dashboard port. You'll need to proxy it as:\n", "\n", "**your-jhub-address/user/{YOUR_USER}/proxy/{DASHBOARD_PORT}/status**" ] }, { "cell_type": "code", "execution_count": null, "id": "c60c50fb-17c4-4c9d-90ec-a71ec590c513", "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": 7, "id": "26f580c9-7c52-4e2e-a157-fe7f978cac28", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "executor = DaskExecutor(address=client.scheduler.address)\n", "executor" ] }, { "cell_type": "code", "execution_count": 8, "id": "87151e20-278d-4347-b573-5f5201f2dde3", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
sepal_lengthsepal_widthpetal_lengthpetal_widthspecies
05.13.51.40.2setosa
14.93.01.40.2setosa
24.73.21.30.2setosa
34.63.11.50.2setosa
45.03.61.40.2setosa
\n", "
" ], "text/plain": [ " sepal_length sepal_width petal_length petal_width species\n", "0 5.1 3.5 1.4 0.2 setosa\n", "1 4.9 3.0 1.4 0.2 setosa\n", "2 4.7 3.2 1.3 0.2 setosa\n", "3 4.6 3.1 1.5 0.2 setosa\n", "4 5.0 3.6 1.4 0.2 setosa" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Loading built-in Datasets:\n", "iris = sns.load_dataset(\"iris\")\n", "iris.head()" ] }, { "cell_type": "code", "execution_count": 9, "id": "5f6907f9-5149-4502-ab69-69eb530ca0b8", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
sepal_lengthsepal_widthpetal_lengthpetal_widthspecies
npartitions=1
0float64float64float64float64object
149...............
\n", "
\n", "
Dask Name: from_pandas, 1 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " sepal_length sepal_width petal_length petal_width species\n", "npartitions=1 \n", "0 float64 float64 float64 float64 object\n", "149 ... ... ... ... ...\n", "Dask Name: from_pandas, 1 tasks" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "iris_dd = dd.from_pandas(iris, npartitions=1)\n", "iris_dd" ] }, { "cell_type": "code", "execution_count": 10, "id": "882947e1-e47a-40d4-94bb-5dcf43df0d77", "metadata": {}, "outputs": [], "source": [ "#! wget https://cellprofiler-examples.s3.amazonaws.com/ExampleHuman.zip\n", "#! unzip ExampleHuman.zip" ] }, { "cell_type": "code", "execution_count": 11, "id": "7c2438ba-7481-4753-b557-98bde9c112c1", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['ExampleHuman/images/AS_09125_050116030001_D03f00d1.tif',\n", " 'ExampleHuman/images/AS_09125_050116030001_D03f00d0.tif',\n", " 'ExampleHuman/images/AS_09125_050116030001_D03f00d2.tif']" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "tifs = glob.glob('ExampleHuman/images/*.tif')\n", "tifs" ] }, { "cell_type": "code", "execution_count": 12, "id": "a5eb3eb1-1f99-4443-890c-00860cfecaf4", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([[10, 10, 10, ..., 11, 9, 9],\n", " [11, 11, 9, ..., 10, 9, 10],\n", " [10, 9, 11, ..., 10, 9, 10],\n", " ...,\n", " [ 9, 9, 9, ..., 9, 9, 9],\n", " [ 9, 9, 10, ..., 10, 9, 9],\n", " [10, 10, 9, ..., 8, 9, 10]], dtype=uint8)" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "image = skimage.io.imread(tifs[0])\n", "image" ] }, { "cell_type": "code", "execution_count": 13, "id": "365d3401-7239-4548-98d0-4661a9b0d52b", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Array Chunk
Bytes 256.00 kiB 256.00 kiB
Shape (512, 512) (512, 512)
Count 1 Tasks 1 Chunks
Type uint8 numpy.ndarray
\n", "
\n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " 512\n", " 512\n", "\n", "
" ], "text/plain": [ "dask.array" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "da.from_array(image)" ] }, { "cell_type": "markdown", "id": "3e0dd366-3555-421c-a6a2-1e6ab0d5ebe8", "metadata": {}, "source": [ "### Create a Prefect Flow\n", "\n", "Let's make a prefect flow, run it, visualize it in prefect, and then also visualize it in the dask status board.\n", "\n", "*There is absolutely no reason why you would run this particular flow. It's here for demonstration purposes.*" ] }, { "cell_type": "code", "execution_count": 14, "id": "85ac5dc8-70d9-424b-b415-d908ce2ef016", "metadata": {}, "outputs": [], "source": [ "@task\n", "def get_mean_per_row(image):\n", " return image.mean(axis=1).compute()[0]\n", "\n", "@task\n", "def get_max_per_row(image):\n", " return image.max(axis=1).compute()[0]\n", " \n", "@task\n", "def get_min_per_row(image):\n", " return image.min(axis=1).compute()[0]\n", "\n", "@task\n", "def convert_to_dask_array(image):\n", " return da.from_array(image)\n", "\n", "@task\n", "def read_images(image_file):\n", " return skimage.io.imread(image_file)\n", "\n", "with Flow('read_images_flow') as flow:\n", " images = read_images.map(image_file=tifs)\n", " images_da = convert_to_dask_array.map(image=images)\n", " images_min = get_min_per_row.map(image=images_da)\n", " images_max = get_max_per_row.map(image=images_da)\n", " images_mean = get_mean_per_row.map(image=images_da)" ] }, { "cell_type": "code", "execution_count": 15, "id": "734fabe6-31db-4d39-860b-f88b48dfdbbe", "metadata": {}, "outputs": [ { "data": { "image/svg+xml": [ "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "22559310596224\n", "\n", "Constant[list]\n", "\n", "\n", "\n", "22559310597328\n", "\n", "read_images <map>\n", "\n", "\n", "\n", "22559310596224->22559310597328\n", "\n", "\n", "image_file\n", "\n", "\n", "\n", "22559310596032\n", "\n", "get_min_per_row <map>\n", "\n", "\n", "\n", "22559310695680\n", "\n", "get_mean_per_row 
<map>\n", "\n", "\n", "\n", "22559310597040\n", "\n", "convert_to_dask_array <map>\n", "\n", "\n", "\n", "22559310597328->22559310597040\n", "\n", "\n", "image\n", "\n", "\n", "\n", "22559310695248\n", "\n", "get_max_per_row <map>\n", "\n", "\n", "\n", "22559310597040->22559310596032\n", "\n", "\n", "image\n", "\n", "\n", "\n", "22559310597040->22559310695680\n", "\n", "\n", "image\n", "\n", "\n", "\n", "22559310597040->22559310695248\n", "\n", "\n", "image\n", "\n", "\n", "\n" ], "text/plain": [ "" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "flow.visualize()" ] }, { "cell_type": "code", "execution_count": 16, "id": "152da439-a2ec-4f37-a2b1-648218e03ff6", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[2022-06-23 23:41:23+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'read_images_flow'\n", "[2022-06-23 23:41:23+0000] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at tcp://127.0.0.1:39365\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/home/user2135/.conda/envs/datascience/lib/python3.8/site-packages/distributed/scheduler.py:4823: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.\n", " warnings.warn(\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "[2022-06-23 23:41:28+0000] INFO - prefect.TaskRunner | Task 'read_images': Starting task run...\n", "[2022-06-23 23:41:28+0000] INFO - prefect.TaskRunner | Task 'read_images': Finished task run for task with final state: 'Mapped'\n", "[2022-06-23 23:41:28+0000] INFO - prefect.TaskRunner | Task 'read_images[0]': Starting task run...\n", "[2022-06-23 23:41:28+0000] INFO - prefect.TaskRunner | Task 'read_images[0]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:28+0000] INFO - prefect.TaskRunner | Task 'read_images[1]': Starting task run...\n", "[2022-06-23 23:41:28+0000] INFO - prefect.TaskRunner | Task 
'read_images[1]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:28+0000] INFO - prefect.TaskRunner | Task 'read_images[2]': Starting task run...\n", "[2022-06-23 23:41:28+0000] INFO - prefect.TaskRunner | Task 'read_images[2]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array': Starting task run...\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array': Finished task run for task with final state: 'Mapped'\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[0]': Starting task run...\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[0]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[1]': Starting task run...\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[1]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[2]': Starting task run...\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'convert_to_dask_array[2]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row': Starting task run...\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row': Finished task run for task with final state: 'Mapped'\n", "[2022-06-23 23:41:29+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[0]': Starting task run...\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[1]': Starting task run...\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row': Starting task run...\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 
'get_mean_per_row': Finished task run for task with final state: 'Mapped'\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[2]': Starting task run...\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[1]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[1]': Starting task run...\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[0]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[2]': Starting task run...\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[0]': Starting task run...\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_min_per_row[2]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[1]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row': Starting task run...\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row': Finished task run for task with final state: 'Mapped'\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[2]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_mean_per_row[0]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[0]': Starting task run...\n", "[2022-06-23 23:41:30+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[1]': Starting task run...\n", "[2022-06-23 23:41:31+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[2]': Starting task run...\n", "[2022-06-23 23:41:31+0000] INFO - prefect.TaskRunner | Task 
'get_max_per_row[1]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:31+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[2]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:31+0000] INFO - prefect.TaskRunner | Task 'get_max_per_row[0]': Finished task run for task with final state: 'Success'\n", "[2022-06-23 23:41:31+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/home/user2135/.conda/envs/datascience/lib/python3.8/site-packages/prefect/executors/dask.py:313: RuntimeWarning: coroutine 'rpc.close_rpc' was never awaited\n", " scheduler_comm.close_rpc()\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "flow_state = flow.run(executor=executor)\n", "flow_state" ] }, { "cell_type": "markdown", "id": "97632bca-0553-4020-9eee-67b46ffeb881", "metadata": {}, "source": [ "### Getting the Prefect Results\n", "\n", "If you run your functions in a prefect flow you need to interact with the `flow_state` object in order to get your results.\n", "\n", "[Interacting with Results](https://docs.prefect.io/core/concepts/results.html#interacting-with-task-result-objects)" ] }, { "cell_type": "markdown", "id": "24962993-7c25-4fc5-a932-4f1bd4a4644d", "metadata": {}, "source": [ "### Getting the Prefect Results for objects in memory\n", "\n", "If you are calling prefect from a notebook or script it is likely that your objects are still in memory. You can reference the results by using `flow_state.result[my_object]`." 
] }, { "cell_type": "code", "execution_count": 17, "id": "0826773d-16df-4ffa-8a11-4b2da1b94aa9", "metadata": {}, "outputs": [], "source": [ "images_da.result" ] }, { "cell_type": "code", "execution_count": 18, "id": "8533fc60-6c10-4cdb-bafc-f0439089bb6d", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{: ,\n", " : ,\n", " : ,\n", " : ,\n", " : ,\n", " : }" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "flow_state.result" ] }, { "cell_type": "code", "execution_count": 19, "id": "548ec550-699d-4766-94d5-cd576a798272", "metadata": {}, "outputs": [ { "data": { "text/plain": [ ", dask.array, dask.array]>" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "flow_state.result[images_da]._result" ] }, { "cell_type": "code", "execution_count": 20, "id": "3434da06-0486-4afe-91df-00a08466237c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Help on Result in module prefect.engine.result.base object:\n", "\n", "class Result(builtins.object)\n", " | Result(value: Any = None, location: str = None, serializer: prefect.engine.serializers.Serializer = None)\n", " | \n", " | A representation of the result of a Prefect task; this class contains\n", " | information about the value of a task's result, a result handler specifying\n", " | how to serialize or store this value securely, and a `safe_value` attribute\n", " | which holds information about the current \"safe\" representation of this\n", " | result.\n", " | \n", " | Args:\n", " | - value (Any, optional): the value of the result\n", " | - location (Union[str, Callable], optional): Possibly templated location\n", " | to be used for saving the result to the destination. 
If a callable\n", " | function is provided, it should have signature `callable(**kwargs) ->\n", " | str` and at write time all formatting kwargs will be passed and a fully\n", " | formatted location is expected as the return value. Can be used for\n", " | string formatting logic that `.format(**kwargs)` doesn't support\n", " | - serializer (Serializer): a serializer that can transform Python\n", " | objects to bytes and recover them. The serializer is used whenever the\n", " | `Result` is writing to or reading from storage. Defaults to\n", " | `PickleSerializer`.\n", " | \n", " | Methods defined here:\n", " | \n", " | __eq__(self, other: Any) -> bool\n", " | Return self==value.\n", " | \n", " | __init__(self, value: Any = None, location: str = None, serializer: prefect.engine.serializers.Serializer = None)\n", " | Initialize self. See help(type(self)) for accurate signature.\n", " | \n", " | __repr__(self) -> str\n", " | Return repr(self).\n", " | \n", " | copy(self) -> 'Result'\n", " | Return a copy of the current result object.\n", " | \n", " | exists(self, location: str, **kwargs: Any) -> bool\n", " | Checks whether the target result exists.\n", " | \n", " | Args:\n", " | - location (str, optional): Location of the result in the specific result target.\n", " | If provided, will check whether the provided location exists;\n", " | otherwise, will use `self.location`\n", " | - **kwargs (Any): string format arguments for `location`\n", " | \n", " | Returns:\n", " | - bool: whether or not the target result exists.\n", " | \n", " | format(self, **kwargs: Any) -> 'Result'\n", " | Takes a set of string format key-value pairs and renders the result.location to a final\n", " | location string\n", " | \n", " | Args:\n", " | - **kwargs (Any): string format arguments for result.location\n", " | \n", " | Returns:\n", " | - Result: a new result instance with the appropriately formatted location\n", " | \n", " | from_value(self, value: Any) -> 'Result'\n", " | Create a new copy 
of the result object with the provided value.\n", " | \n", " | Args:\n", " | - value (Any): the value to use\n", " | \n", " | Returns:\n", " | - Result: a new Result instance with the given value\n", " | \n", " | read(self, location: str) -> 'Result'\n", " | Reads from the target result and returns a corresponding `Result` instance.\n", " | \n", " | Args:\n", " | - location (str): Location of the result in the specific result target.\n", " | \n", " | Returns:\n", " | - Any: The value saved to the result.\n", " | \n", " | write(self, value_: Any, **kwargs: Any) -> 'Result'\n", " | Serialize and write the result to the target location.\n", " | \n", " | Args:\n", " | - value_ (Any): the value to write; will then be stored as the `value` attribute\n", " | of the returned `Result` instance\n", " | - **kwargs (optional): if provided, will be used to format the location template\n", " | to determine the location to write to\n", " | \n", " | Returns:\n", " | - Result: a new result object with the appropriately formatted location destination\n", " | \n", " | ----------------------------------------------------------------------\n", " | Readonly properties defined here:\n", " | \n", " | default_location\n", " | \n", " | ----------------------------------------------------------------------\n", " | Data descriptors defined here:\n", " | \n", " | __dict__\n", " | dictionary for instance variables (if defined)\n", " | \n", " | __weakref__\n", " | list of weak references to the object (if defined)\n", " | \n", " | ----------------------------------------------------------------------\n", " | Data and other attributes defined here:\n", " | \n", " | __hash__ = None\n", " | \n", " | __slotnames__ = []\n", "\n" ] } ], "source": [ "help(flow_state.result[images_da]._result)" ] }, { "cell_type": "code", "execution_count": 21, "id": "7ea46dfc-2488-47d2-af96-0df555fe2978", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[dask.array,\n", " dask.array,\n", " dask.array]" ] }, 
"execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "flow_state.result[images_da]._result.value" ] }, { "cell_type": "markdown", "id": "eea74c70-f75e-4770-b3b2-60684f9264fa", "metadata": {}, "source": [ "### Getting Results from Prefect Flow State\n", "\n", "If your objects are no longer in memory you will need to do a bit more digging.\n", "\n", "*Note* You probably don't really want to be doing this, but instead persisting your results to a database/parquet file/etc." ] }, { "cell_type": "code", "execution_count": 22, "id": "8bea24ba-0a0c-4e5e-beca-fd8b0a2cdc31", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "dict_keys([, , , , , ])" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "flow_state.result.keys()" ] }, { "cell_type": "code", "execution_count": 23, "id": "d1883b28-f9dd-414e-b74c-a25aab50c5c0", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "\n", "\n", "\n", "\n", "\n" ] }, { "data": { "text/plain": [ "dict_keys(['', '', '', '', '', ''])" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "results = {}\n", "for key in flow_state.result.keys():\n", " print(key)\n", " results[str(key)] = flow_state.result[key]._result\n", " \n", "results.keys()" ] }, { "cell_type": "code", "execution_count": 24, "id": "34c89e71-c0c6-4356-90e7-cdabe82b5b4c", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "results['']" ] }, { "cell_type": "code", "execution_count": null, "id": "506d3248-4512-4d05-a4d7-1b675d2f7012", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python (datascience)", "language": "python", "name": "datascience" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": 
"text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" } }, "nbformat": 4, "nbformat_minor": 5 }