My first official venture into the world of data pipelines and ML Ops
I have recently been setting up my first ML Ops infrastructure. The space is a bit of a minefield, with so many components including data pipeline tools, feature stores, model registries, model serving tools, data monitoring and more.
🤯
You have to start somewhere though, and after a bit of thinking and planning I decided that a solid data pipeline tool would be a good place to start. The space is a bit scattered right now, and I am sure it will look very different in a couple of years' time, but one of the more modern tools that seems to be gaining traction is Dagster.
Here are some gotchas that I had to deal with when setting up Dagster for the first time. I think I am mostly writing them down here so I can refer back to them if I ever need to set up Dagster again!
Disclaimer: I am no infrastructure or data engineer expert. So there is a very good chance that (1) my points below are super obvious to you, and (2) there are much easier ways of achieving the same thing. If so, drop me a line and let me know! 🤗
Bloody loving Dagster. Highly recommended. The documentation is a bit lacking as they are moving pretty quickly and the open source deployment guide could do with a bit of TLC, but I am impressed so far.
Unsurprisingly, the Dagster team are putting a fair bit of focus on encouraging you to use their cloud offering. However, I didn't really have any budget for that, so I ended up going with the open source version, deployed using Helm to our Kubernetes cluster.
Although the Dagster documentation is OK, the product is moving super quickly, so the documentation is always running behind the actual product. Moreover, the "deployment" section of the documentation lags even further behind, given that it is completely independent of the Cloud offering.
It probably doesn't help that I am no Helm expert either 😆.
To specify the location of all your Dagster definitions, you need to set dagsterApiGrpcArgs in your values.yaml file. Using Python modules seems to be the new done thing, but most of the guides' examples still reference loading definitions from a Python file. Anyway, to use module-based deployments, the values.yaml for your chart deployment should look like this:
dagsterApiGrpcArgs:
- "-m"
- "dagster_user_code"
where your Dockerfile will probably end with something like this
COPY dagster_user_code ./dagster_user_code
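For what it's worth, the module that -m points at just needs to expose a top-level Definitions object. Something along these lines works (the asset here is purely a placeholder to show the shape):

# dagster_user_code/__init__.py
from dagster import Definitions, asset


@asset
def my_first_asset() -> int:
    # Placeholder asset, just to show the shape of a module-based code location.
    return 42


defs = Definitions(assets=[my_first_asset])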
It is important to note that the arguments to dagsterApiGrpcArgs are different from the command line arguments for dagster, so watch out.
If you are like me, you are using some CI/CD tool to build and tag your user code Docker image and then deploy the Helm chart.
However, annoyingly, the Helm chart Dagster provides does not give you a variable that you can use to set the image tag to look for, so instead you have to set it manually at the command line when you deploy your chart.
The bit I was struggling with was that the deployments attribute in the values.yaml is a list, so you have to specify the index, e.g. your helm upgrade/install command will look like this:
helm upgrade .... --set deployments[0].image.tag=$IMAGE_TAG_FROM_EARLIER_CI_CD_STEP dagster/dagster-user-deployments -f values.yaml
I spent way too much time figuring this out when I was initially working on this 🙈.
I didn't see the value of spinning up a new AWS RDS instance for Dagster specifically, so I decided to use our existing RDS instance. However, if you are not careful, Dagster will just add its tables to your public schema (this definitely didn't happen to me 🙈).
Instead you have to specify the search path in your values.yaml file, e.g.
postgresql:
  enabled: false # <----- Disable the default postgresql deployment. However, you still need to set the settings below
  ...
  postgresqlParams:
    options: "-c search_path=dagster" # <----- only look into the dagster schema
Of course, don't forget to set the host, etc. and to add the credentials to the dagster-postgres-credentials secret (which you might want to create manually).
I wish this alternative was the default.
From my understanding, at the bare minimum Dagster needs three components: the Dagster daemon, the Dagit webserver, and your user code deployment(s).
By default the Dagster values.yaml has support for dagster-user-deployments. If you use that, every time you want to deploy your user code you have to redeploy the Dagster daemon and Dagit as well. Sorry, what? Why? I wish it was just removed and was not even an option.
So what you need to do is use a separate chart to deploy your Dagster user code (the chart is called dagster/dagster-user-deployments), and then disable dagster-user-deployments in the dagster/dagster chart. But you disable it like this:
dagster-user-deployments:
  enabled: true # <----- Set this to true, even though you are disabling it
  enableSubchart: false # <----- this is what actually disables the actual user deployment.
I don't know if I missed a memo here, but I found this super confusing.
Dagster has had some major API changes over its lifetime: out are solids and repositories, and in are assets and Definitions. E.g.
In 1.1.6, we introduced Definitions, which replaces repositories. While repositories will continue to work, we recommend migrating to Definitions. Refer to the Code locations documentation for more info.
This means you cannot rely on a lot of the old tutorials and code examples. As an added bonus, since ChatGPT was trained on data from back in 2021, you certainly cannot use it for any helpful Dagster tips 😆.
So good luck with that.
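If it helps, the change looks roughly like this. This is a sketch from memory, so double check against the current docs:

from dagster import Definitions, asset


@asset
def my_asset() -> int:
    return 1


# Old style, which most pre-1.1.6 tutorials still show:
#
#   @repository
#   def my_repository():
#       return [my_asset]
#
# New style: a single Definitions object per code location.
defs = Definitions(assets=[my_asset])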
IO Managers are components of Dagster that manage materialisation of assets. Something like S3 is a sensible storage space for your assets.
But remember I am using Dagster as part of my ML Ops - so one of my assets corresponds to parameters that are used by my model and I don't want to load the whole asset if I am interested in a subset of those parameters.
This is where a database-backed IO manager comes in: I can write thousands of rows to the database, but during online serving I can load just the particular rows I am interested in.
Data pipeline .... -> S3-backed asset -> Postgres-backed asset
                             |                     |
                      off-line serving       online serving
But guess what, there is no Postgres IO manager in Dagster. There is a DBIOManager, but that's marked as private, so I decided to roll my own. There are a couple of gotchas though, which I discuss below.
I wish this was officially supported.
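To give a flavour, here is a stripped-down sketch of the kind of thing I mean, assuming pandas and SQLAlchemy, a con_string config value, and my own convention of putting the target table name in the asset metadata. Treat it as an illustration rather than my production code:

import pandas as pd
from dagster import InputContext, IOManager, OutputContext, io_manager
from sqlalchemy import create_engine


class PostgresIOManager(IOManager):
    """Write each asset output to a Postgres table and read it back with SQL."""

    def __init__(self, con_string: str) -> None:
        self._engine = create_engine(con_string)

    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        # "table" is my own metadata convention, set on the asset definition.
        table = context.metadata["table"]
        obj.to_sql(table, self._engine, if_exists="append", index=False)

    def load_input(self, context: InputContext) -> pd.DataFrame:
        table = context.upstream_output.metadata["table"]
        # In reality you would add a WHERE clause here for the rows/partition you want.
        return pd.read_sql(f'SELECT * FROM "{table}"', self._engine)


@io_manager(config_schema={"con_string": str})
def postgres_io_manager(init_context):
    return PostgresIOManager(init_context.resource_config["con_string"])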
There are three types of context that I have had to deal with in Dagster (there may be more). I wish they had more similar APIs, but sadly they don't:
OutputContext: the context during writing an asset in, say, an IO manager
InputContext: the context during loading an asset in, say, an IO manager
AssetExecutionContext: the context during asset materialisation
Watch out for which one you have!
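For example, the execution context is the one you see inside the asset body itself. This sketch assumes a daily-partitioned asset just so partition_key has something to show:

from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"))
def my_partitioned_asset(context: AssetExecutionContext) -> int:
    # AssetExecutionContext: available inside the asset body during a materialisation.
    # InputContext/OutputContext, by contrast, only show up inside IO managers.
    context.log.info(f"Materialising partition {context.partition_key}")
    return 1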
asset_partitions_time_window
If you are partitioning your assets by a time window, there is a handy property that gives you the time window. However, be aware that if you don't have a time window partition, it raises an exception.
I have not found a consistent way of checking whether I have a time partition, so I have had to manually check the instance of my partitions definition:
from dagster import (
    MultiPartitionsDefinition,
    PartitionsDefinition,
    TimeWindowPartitionsDefinition,
)


def has_timebased_partitions(partition_definition: PartitionsDefinition) -> bool:
    if isinstance(partition_definition, TimeWindowPartitionsDefinition):
        return True
    if isinstance(partition_definition, MultiPartitionsDefinition):
        for partition in partition_definition.partitions_defs:
            if isinstance(partition.partitions_def, TimeWindowPartitionsDefinition):
                return True
    return False
Notice that if you have a MultiPartitionsDefinition, you have to check each of the underlying definitions explicitly.
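For example, with a two-dimensional partitioning (the dimension names here are just illustrative), the helper above still finds the time dimension:

from dagster import (
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
)

deliveries_partitions = MultiPartitionsDefinition(
    {
        "date": DailyPartitionsDefinition(start_date="2023-01-01"),
        "region": StaticPartitionsDefinition(["eu", "us"]),
    }
)

assert has_timebased_partitions(deliveries_partitions)
assert not has_timebased_partitions(StaticPartitionsDefinition(["eu", "us"]))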
Moreover, if you have a MultiPartitionsDefinition and you have access to an OutputContext or an AssetExecutionContext, you can simply ask for context.asset_partitions_time_window, but for an InputContext you have to get the time window from the partitions definition itself:
if has_timebased_partitions(context.asset_partitions_def):
    # for an output context, dagster lets me get the time window from the context
    # in the multi-dimension case, but for an input context it does not - thanks Dagster.
    if isinstance(context.asset_partitions_def, MultiPartitionsDefinition):
        partition_time_window = context.asset_partitions_def.time_window_for_partition_key(
            context.partition_key
        )
    else:
        partition_time_window = context.asset_partitions_time_window
else:
    partition_time_window = None
Not sure if this is an oversight or a requirement that I am missing, but there it is.
Another quirk to note is that if you have a single-dimension partition, there isn't an explicit name for it, whereas for multi partitions each dimension has a name. This makes it a bit awkward because you have to write code that handles both cases. For the single-dimension case I have ended up setting a dimension name in the metadata:
@asset(
    io_manager_key="postgres_io_manager",
    partitions_def=MonthlyPartitionsDefinition(start_date=START_OF_TIME, end_offset=1),
    metadata={"table": "....", "dimension": "created_at"},
)
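That metadata then lets my IO manager build a filter on the right column when loading rows. Something along these lines, purely hypothetical and skipping proper SQL parameterisation:

def build_time_filter(metadata: dict, time_window) -> str:
    # "dimension" is my own metadata convention for naming the time column,
    # e.g. "created_at" in the asset above. time_window is the TimeWindow
    # you get back from context.asset_partitions_time_window.
    column = metadata["dimension"]
    return (
        f"WHERE {column} >= '{time_window.start.isoformat()}' "
        f"AND {column} < '{time_window.end.isoformat()}'"
    )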
Partition mappings are pretty handy. For me, I am interested in doing a rolling average of some of my metrics, and with these I can load a few extra partitions to manipulate in my operation, so I have this asset definition:
@asset(
    io_manager_key="io_manager",
    partitions_def=MonthlyPartitionsDefinition(start_date=START_OF_TIME, end_offset=1),
    ins={
        "deliveries": AssetIn(
            metadata={"allow_missing_partitions": True},
            partition_mapping=TimeWindowPartitionMapping(start_offset=-6),
        )
    },
)
However, note that the typing of the input asset will suddenly be different - e.g. if each partition of your input asset is a pandas.DataFrame, then the input to this new asset will be a dictionary of pandas.DataFrames. And if you have allow_missing_partitions=True, you could get just a pandas.DataFrame if there is only one partition, so you end up with code like this:
if isinstance(deliveries, dict):
    deliveries_df = pd.concat(deliveries.values())
else:
    deliveries_df = cast(pd.DataFrame, deliveries)
end_offset=1
Ooh, this one drove me crazy for a good few minutes. By default, the MonthlyPartitionsDefinition will create partitions excluding the current month. If you want data from the current month to be included, you have to set end_offset=1.
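To see the difference, assuming "today" is somewhere in March 2023:

from dagster import MonthlyPartitionsDefinition

excluding_current = MonthlyPartitionsDefinition(start_date="2023-01-01")
including_current = MonthlyPartitionsDefinition(start_date="2023-01-01", end_offset=1)

print(excluding_current.get_partition_keys())
# ['2023-01-01', '2023-02-01']              <- March missing
print(including_current.get_partition_keys())
# ['2023-01-01', '2023-02-01', '2023-03-01']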
It's early days in my Dagster journey, so I am not sure how it will go. I am particularly concerned about major changes being introduced in the API that will make my code obsolete. As I mentioned, they have already had some major changes from solids to assets and from repositories to Definitions.
Other users have also raised concerns about the open source vs cloud offering, and the Dagster team has not committed to keeping all the components free, but hopefully all the core features will stay free (or at a reduced cost) for self-hosted versions.
Anyway, I am committed now, so let's see how it goes.
Good luck to the Dagster team!
p.s. if you do have any comments about my issues above, please please do reach out!