Using Dagster Pipes, part 1: Define a Dagster asset that invokes a subprocess
This is part one of the Using Dagster Pipes tutorial. If you are looking for how to modify your existing code that is already being orchestrated by Dagster, you can jump to Part 2: Modify external code.
In this part of the tutorial, you'll create a Dagster asset that, in its execution function, opens a Dagster Pipes session and invokes a subprocess that executes some external code.
Before getting started, make sure you have fulfilled all the prerequisites for the tutorial. You should have a standalone Python script named external_code.py which looks like the following:
import pandas as pd

def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    print(f"processing total {total_orders} orders")

if __name__ == "__main__":
    main()
Step 1.1: Define the asset
First, define the asset in dagster_code.py. In the asset definition, we:
Provided AssetExecutionContext as the context argument to the asset. This object provides system information such as resources, config, and logging. We’ll come back to this a bit later in this section.
Specified a resource for the asset to use, PipesSubprocessClient. We’ll also come back to this in a little bit.
Declared a command list cmd to run the external script. In the list:
First, found the path to the Python executable on the system using shutil.which("python").
Then, provided the file path to the file that we want to execute. In this case, it’s the external_code.py file that you created earlier.
Step 1.2: Invoke the external code from the asset
Then, invoke a subprocess that executes the external code from the asset using the pipes_subprocess_client resource:
The PipesSubprocessClient resource used by the asset exposes a run method.
When the asset is executed, this method synchronously executes the subprocess in a Pipes session and returns a PipesClientCompletedInvocation object.
This object contains a get_materialize_result method, which you can use to access the MaterializeResult event reported by the subprocess. We'll talk about how to report events from the subprocess in the next section.
To make the asset and subprocess resource loadable and accessible by Dagster's tools, such as the CLI, UI, and Dagster+, you’ll create a Definitions object that contains them.
Copy and paste the following to the bottom of dagster_code.py:
from dagster import Definitions

defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
At this point, dagster_code.py should look like the following:
Click Materialize, located in the top right, to run your code.
Navigate to the Run details page, where you should see the logs for the run.
In external_code.py, we have a print statement that writes to stdout. Dagster displays this output in the UI's raw compute log view. To see the stdout log, toggle the log section to stdout.
At this point, you've created a Dagster asset that invokes an external Python script, launched the code in a subprocess, and viewed the result in the Dagster UI. Next, you'll learn how to modify your external code to work with Dagster Pipes so that it can send information back to Dagster.