Automate downstream assets
A common pattern in data science and ML pipelines is building feature aggregations or summary tables in Snowflake — customer lifetime value scores, daily revenue rollups — and then running downstream Python pipelines that consume them. With Snowflake Dynamic Tables as virtual assets, those aggregates exist in the lineage graph but Dagster never executes them. That creates a problem: how does your downstream pipeline know to re-run when source data changes?
AutomationCondition.eager().resolve_through_virtual() solves this. It makes automation conditions treat virtual assets as transparent, tracing the dependency chain through the Dynamic Tables all the way back to the real source data.
The downstream asset
executive_dashboard_report is a Python asset that reads from both Dynamic Tables and produces a summary. It could just as easily be a feature store update, a model retraining job, or an ML pipeline trigger — anything that should respond to changes in Snowflake-computed aggregates:
@dg.asset(
group_name="analytics",
deps=["customer_lifetime_value", "daily_revenue_rollup"],
automation_condition=dg.AutomationCondition.eager().resolve_through_virtual(),
kinds={"snowflake", "python"},
)
def executive_dashboard_report(
context: dg.AssetExecutionContext,
snowflake: SnowflakeResource,
) -> dg.MaterializeResult:
with snowflake.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT
(SELECT COUNT(DISTINCT customer_id)
FROM ECOMMERCE.ANALYTICS.CUSTOMER_LIFETIME_VALUE) AS total_customers,
(SELECT SUM(revenue)
FROM ECOMMERCE.ANALYTICS.DAILY_REVENUE_ROLLUP
WHERE order_date >= DATEADD('day', -30, CURRENT_DATE())) AS total_revenue_30d
""")
row = cursor.fetchone()
total_customers = int(row[0]) if row and row[0] else 0
total_revenue = float(row[1]) if row and row[1] else 0.0
return dg.MaterializeResult(
metadata={
"total_customers": dg.MetadataValue.int(total_customers),
"total_revenue_30d": dg.MetadataValue.float(total_revenue),
}
)
deps=["customer_lifetime_value", "daily_revenue_rollup"] expresses the correct logical dependency — the asset reads from the Dynamic Tables at runtime. resolve_through_virtual() on the automation condition is what makes the trigger chain work end-to-end:
raw_ordersis materialized (or reports a materialization via the external asset event API)- Dagster evaluates automation conditions for all downstream assets
resolve_through_virtual()traverses throughcustomer_lifetime_valueanddaily_revenue_rollup(both virtual) to findraw_ordersas the real upstreamexecutive_dashboard_reportis requested
Next steps
Continue this example by adding dynamic table freshness monitoring.