A common problem in Data Engineering is trying to time the ingestion of data as soon as it is ready. Whether you are waiting for an upstream system to finish or for a file to arrive in a storage account, it can be difficult to get this right.
Trying to apply a best guess from past timings only leads to frustration: jobs either run before the data is ready or sit idle waiting for it.
One of the best ways to solve this problem is to work with event-based triggers. Databricks has a File Arrival Trigger which we can use to run a Databricks job when a new file arrives in an external location. This blog will walk through an example of how to configure file arrival triggers to work with an AWS S3 bucket. We will see how the end-to-end process works, from a file arriving to a job being automatically run.
The first thing we need to do is to create a bucket in S3. This will be the external location that Databricks uses to check for new files that will trigger our Databricks job. You can find detailed instructions to do this here, or you can use an existing bucket.
I will be using a bucket called databricksfiletriggerdemo:
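If you prefer to script this rather than click through the AWS console, the bucket can be created with a couple of lines of boto3. This is a sketch under assumptions: the region is my choice for the demo, so change it to match your account.

```python
import boto3

# Region is an assumption for this demo - change it to suit your account.
s3 = boto3.client("s3", region_name="eu-west-2")

# Create the demo bucket (LocationConstraint is required outside us-east-1).
s3.create_bucket(
    Bucket="databricksfiletriggerdemo",
    CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
)
```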
The next step is to set up permissions between Databricks and the S3 bucket. I won’t cover each and every step here as the documentation does a great job already; instead I will cover the key concepts: what we are giving permission to, and why.
To finish this step you will need CREATE STORAGE CREDENTIAL privileges on the Unity Catalog metastore attached to the Databricks workspace you are using and, in AWS, the ability (or someone with the ability) to create IAM roles. You can find detailed documentation to follow here.
The key tasks for configuring permissions for our use case are: creating an IAM role in AWS that Databricks can assume, attaching a policy to that role granting access to the S3 bucket, and creating a storage credential in Unity Catalog that references the role.
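To give a flavour of the AWS side, here is a rough, hedged sketch of the inline policy you might attach to the IAM role to allow read-only access to the demo bucket. The role and policy names are placeholders, and the official documentation has the authoritative policy.

```python
import json
import boto3

iam = boto3.client("iam")

# Read-only access to the demo bucket. Role and policy names are placeholders.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion",
                "s3:ListBucket",
                "s3:GetBucketLocation",
            ],
            "Resource": [
                "arn:aws:s3:::databricksfiletriggerdemo",
                "arn:aws:s3:::databricksfiletriggerdemo/*",
            ],
        }
    ],
}

iam.put_role_policy(
    RoleName="databricks-file-trigger-role",      # placeholder - your Unity Catalog role
    PolicyName="databricksfiletriggerdemo-read",  # placeholder
    PolicyDocument=json.dumps(policy),
)
```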
Once the above steps have been completed, you should have a successful test connection. I created a credential called aimee-aws-s3.
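If you prefer code over the UI, the storage credential can also be created with the Databricks SDK for Python. This is a sketch under assumptions: the role ARN is a placeholder and the exact request class name can vary slightly between SDK versions.

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import catalog

# Authenticates via your usual Databricks config or environment variables.
w = WorkspaceClient()

# The role ARN is a placeholder - use the ARN of the IAM role created above.
credential = w.storage_credentials.create(
    name="aimee-aws-s3",
    aws_iam_role=catalog.AwsIamRoleRequest(
        role_arn="arn:aws:iam::<account-id>:role/databricks-file-trigger-role"
    ),
    comment="Credential for the file arrival trigger demo",
)
print(credential.name)
```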
Now that the permissions have been configured we can create an external location. This uses the storage credential to connect to and access the data held in the S3 bucket, and it is the location that the trigger will check every 60 seconds for new files. It can be configured in the Databricks workspace by navigating to:
Catalog -> External Data -> Create External Location
You will need to fill out the following options:
External location name: <Pick a name>
Storage type: S3 (Read-only)
URL: s3://<bucket-path>
Storage credential: Select the credential created in the previous step
Below is an example of my external location and a successful test. You can see I've used the credential from the previous step and that I am pointing to the S3 bucket in my AWS account. Databricks will automatically set the usage to 'read-only', which takes precedence over any 'write' permissions set via the AWS policy.
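For completeness, here is a sketch of creating the same external location with the Databricks SDK for Python instead of the UI. The location name is arbitrary; the bucket and credential names are the ones from this demo.

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Point the external location at the demo bucket, reusing the credential created earlier.
location = w.external_locations.create(
    name="file-trigger-demo-location",   # pick any name
    url="s3://databricksfiletriggerdemo",
    credential_name="aimee-aws-s3",
    read_only=True,                      # matches the read-only usage set in the UI
)
print(location.url)
```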
Now that we have configured our permissions and external location, let's create a Databricks job that will be run every time a file lands in the S3 bucket.
I've created a notebook which, when run, will create a log table if it doesn't already exist and append a new entry to it.
You can find the source code for the notebook here.
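The notebook itself isn't reproduced in this post, but as a rough sketch, an 'update log' notebook could look something like the following. The table name is a placeholder, so change it to a catalog and schema you own.

```python
from pyspark.sql import SparkSession

# In a Databricks notebook `spark` already exists; getOrCreate() keeps this runnable elsewhere.
spark = SparkSession.builder.getOrCreate()

# Placeholder table name - change to a catalog/schema you own.
LOG_TABLE = "main.demo.file_trigger_log"

# Create the log table on first run.
spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {LOG_TABLE} (
        triggered_at TIMESTAMP,
        message      STRING
    )
    """
)

# Append one row per run so we can see each time the trigger fires.
spark.sql(
    f"""
    INSERT INTO {LOG_TABLE}
    VALUES (current_timestamp(), 'Job triggered by file arrival')
    """
)
```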
We will need a job that runs the notebook when it is triggered. In your Databricks workspace go to:
Workflows -> Create -> Job
The job is very basic and contains a single task. When it is triggered it will go to the path of my update log notebook and run it:
The final step is to create a file arrival trigger which can be found on the right pane under ‘Schedules & Triggers’.
Select a trigger type of ‘File arrival’ and then enter the S3 bucket path covered by the external location. The trigger needs to be 'Active', otherwise it will not fire.
In the YAML for the job you can see the file arrival trigger and the S3 bucket we configured. We are running a notebook task which points to the update log notebook and runs when the job is triggered.
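If you would rather define the job and trigger in code, a hedged sketch using the Databricks SDK for Python looks roughly like this. The notebook path, cluster ID and bucket URL are placeholders, and class names may differ slightly between SDK versions.

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

# Notebook path, cluster id and bucket URL below are placeholders for this demo.
job = w.jobs.create(
    name="file-arrival-trigger-demo",
    tasks=[
        jobs.Task(
            task_key="update_log",
            notebook_task=jobs.NotebookTask(
                notebook_path="/Workspace/Users/me@example.com/update_log"
            ),
            existing_cluster_id="<cluster-id>",
        )
    ],
    trigger=jobs.TriggerSettings(
        file_arrival=jobs.FileArrivalTriggerConfiguration(
            url="s3://databricksfiletriggerdemo/"
        ),
        pause_status=jobs.PauseStatus.UNPAUSED,  # 'Active' in the UI
    ),
)
print(job.job_id)
```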
To test that the trigger works, upload a file to the S3 bucket. File arrival triggers will check for a new file every minute. You will see the following whilst you are waiting for it to evaluate for the first time.
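Uploading the test file can be done from the console or with a couple of lines of boto3; the file and key names here are arbitrary.

```python
import boto3

s3 = boto3.client("s3")

# Upload any small file - the key name is arbitrary for this test.
s3.upload_file(
    Filename="test_file.csv",
    Bucket="databricksfiletriggerdemo",
    Key="test_file.csv",
)
```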
When a file has been discovered, the task should begin to run.
Once the task has succeeded, go to the log table created in the previous step and check whether it has been updated. As you can see below, I have a new entry, which means the end-to-end process has worked.
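If you used a table name like the one in the earlier notebook sketch, a quick way to check is to query it from a notebook.

```python
# `spark` is available in a Databricks notebook; the table name matches the earlier sketch.
spark.sql(
    "SELECT * FROM main.demo.file_trigger_log ORDER BY triggered_at DESC"
).show(truncate=False)
```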
Finally, a note on cost.
This example was very simplistic, but it can be used as a foundation for event-driven ingestion of data with unpredictable readiness times.
If you want to build on this further and productionise your code, check out some of our Databricks blogs on Databricks Asset Bundles here.