Op hooks
If you are just getting started with Dagster, we strongly recommend you use assets rather than ops to build your data pipelines. The ops documentation is for Dagster users who need to manage existing ops, or who have complex use cases.
Op hooks let you define success and failure handling policies on ops.
Relevant APIs
| Name | Description |
|---|---|
@dg.failure_hook | The decorator to define a callback on op failure. |
@dg.success_hook | The decorator to define a callback on op success. |
HookContext | The context object available to a hook function. |
build_hook_context | A function for building a HookContext outside of execution, intended to be used when testing a hook. |
Overview
A @dg.success_hook or @dg.failure_hook decorated function is called an op hook. Op hooks are designed for generic purposes — it can be anything you would like to do at a per op level.
Defining an op hook
import dagster as dg
@dg.success_hook(required_resource_keys={"slack"})
def slack_message_on_success(context: dg.HookContext):
message = f"Op {context.op.name} finished successfully"
context.resources.slack.chat_postMessage(channel="#foo", text=message)
@dg.failure_hook(required_resource_keys={"slack"})
def slack_message_on_failure(context: dg.HookContext):
message = f"Op {context.op.name} failed"
context.resources.slack.chat_postMessage(channel="#foo", text=message)
Hook context
As you may have noticed, the hook function takes one argument, which is an instance of HookContext. The available properties on this context are:
context.job_name: the name of the job where the hook is triggered.context.log: loggerscontext.hook_def: the hook that the context object belongs to.context.op: the op associated with the hook.context.op_config: The config specific to the associated op.context.op_exception: The thrown exception in the associated failed op.context.op_output_values: The computed output values of the associated op.context.step_key: the key for the step where the hook is triggered.context.resources: the resources the hook can use.context.required_resource_keys: the resources required by this hook.
Using hooks
Dagster provides different ways to trigger op hooks.
Applying a hook on every op in a job
For example, you want to send a slack message to a channel when any op fails in a job. In this case, we will be applying a hook on a job, which will apply the hook on every op instance within in that job.
The @dg.job decorator accepts hooks as a parameter. Likewise, when creating a job from a graph, hooks are also accepted as a parameter in the GraphDefinition.to_job function. In the below example, we can pass the slack_message_on_failure hook above in a set as a parameter to @dg.job. Then, slack messages will be sent when any op in the job fails.
@dg.job(resource_defs={"slack": slack_resource}, hooks={slack_message_on_failure})
def notif_all():
# the hook "slack_message_on_failure" is applied on every dg.op instance within this dg.graph
a()
b()
When you run this job, you can provide configuration to the slack resource in the run config:
resources:
slack:
config:
token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token
or by using the configured API:
@dg.job(
resource_defs={
"slack": slack_resource.configured(
{"token": "xoxp-1234123412341234-12341234-1234"}
)
},
hooks={slack_message_on_failure},
)
def notif_all_configured():
# the hook "slack_message_on_failure" is applied on every dg.op instance within this dg.graph
a()
b()
Applying a hook on an op
Sometimes a job is a shared responsibility or you only want to be alerted on high-priority op executions. So we also provide a way to set up hooks on op instances which enables you to apply policies on a per-op basis.
@dg.job(resource_defs={"slack": slack_resource})
def selective_notif():
# only dg.op "a" triggers hooks: a slack message will be sent when it fails or succeeds
a.with_hooks({slack_message_on_failure, slack_message_on_success})()
# dg.op "b" won't trigger any hooks
b()
In this case, op "b" won't trigger any hooks, while when op "a" fails or succeeds it will send a slack message.
Testing hooks
You can test the functionality of a hook by invoking the hook definition. This will run the underlying decorated function. You can construct a context to provide to the invocation using the build_hook_context function.
import dagster as dg
@dg.success_hook(required_resource_keys={"my_conn"})
def my_success_hook(context):
context.resources.my_conn.send("foo")
def test_my_success_hook():
my_conn = mock.MagicMock()
# construct dg.HookContext with mocked ``my_conn`` resource.
context = dg.build_hook_context(resources={"my_conn": my_conn})
my_success_hook(context)
assert my_conn.send.call_count == 1