Implement SCD-2 ETL Data Pipelines in Snowflake Using Streams & Tasks - Part 2
Introduction: This post is a continuation of the Part 1 blog. I highly recommend reading that post first before moving on; it will give you an overview of the ETL process in Snowflake and of how the pipelines are orchestrated.
Now it's time to execute each step in sequence. Please follow the SQL code shown in images 1 to 4. I have created a user-defined view to simplify the ETL logic and to flag each record with its operation: insert, update, or delete. Create the view as shown in images 5 to 7, and that's all we need. Then let's insert some records into the source and run the MERGE command. You will see the records populated in the history table per the SCD-2 logic.
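Since the images are not reproduced here, the following is a rough sketch of what steps 1 to 7 might look like. All object and column names (CUSTOMER, CUSTOMER_HISTORY, CUSTOMER_CHANGE_DATA, and the customer attributes) are illustrative assumptions, not the exact code from the images; only the stream name CUSTOMER_TABLE_CHANGES comes from later in this post.

```sql
-- Hypothetical sketch; object and column names are assumptions.

-- 1. Source table
CREATE OR REPLACE TABLE CUSTOMER (
    ID    NUMBER,
    NAME  VARCHAR,
    EMAIL VARCHAR
);

-- 2. Stream to capture change data (CDC) on the source table
CREATE OR REPLACE STREAM CUSTOMER_TABLE_CHANGES ON TABLE CUSTOMER;

-- 3. SCD-2 history (target) table with a validity interval and current-row flag
CREATE OR REPLACE TABLE CUSTOMER_HISTORY (
    ID           NUMBER,
    NAME         VARCHAR,
    EMAIL        VARCHAR,
    START_TIME   TIMESTAMP_NTZ,
    END_TIME     TIMESTAMP_NTZ,
    CURRENT_FLAG BOOLEAN
);

-- 4-7. View that flags each stream record with its DML operation, using the
-- stream metadata columns METADATA$ACTION and METADATA$ISUPDATE. A source
-- UPDATE yields two rows: an 'I' row (new values) and a 'U' row (to end-date
-- the current version).
CREATE OR REPLACE VIEW CUSTOMER_CHANGE_DATA AS
SELECT ID, NAME, EMAIL, CURRENT_TIMESTAMP() AS START_TIME,
       NULL AS END_TIME, TRUE AS CURRENT_FLAG, 'I' AS DML_TYPE
  FROM CUSTOMER_TABLE_CHANGES
 WHERE METADATA$ACTION = 'INSERT'          -- brand-new rows and new versions
UNION ALL
SELECT ID, NAME, EMAIL, NULL, CURRENT_TIMESTAMP(), FALSE, 'U'
  FROM CUSTOMER_TABLE_CHANGES
 WHERE METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE
UNION ALL
SELECT ID, NAME, EMAIL, NULL, CURRENT_TIMESTAMP(), FALSE, 'D'
  FROM CUSTOMER_TABLE_CHANGES
 WHERE METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = FALSE;
```

With that view in place, a single MERGE can apply all three operations to the history table; again, this is a sketch under the naming assumptions above:

```sql
MERGE INTO CUSTOMER_HISTORY CH
USING CUSTOMER_CHANGE_DATA CCD
   ON CH.ID = CCD.ID
  AND CH.CURRENT_FLAG = TRUE
  AND CCD.DML_TYPE IN ('U', 'D')
-- End-date the current version on update or delete
WHEN MATCHED THEN
  UPDATE SET CH.END_TIME = CCD.END_TIME, CH.CURRENT_FLAG = FALSE
-- Add a new current version for inserts and the new side of updates
WHEN NOT MATCHED AND CCD.DML_TYPE = 'I' THEN
  INSERT (ID, NAME, EMAIL, START_TIME, END_TIME, CURRENT_FLAG)
  VALUES (CCD.ID, CCD.NAME, CCD.EMAIL, CCD.START_TIME, CCD.END_TIME, CCD.CURRENT_FLAG);
```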
Next, let's update some records in the source table and re-run the MERGE command. You will see the changes reflected in the target table per the SCD-2 logic.
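For example, an update might look like this (the table, column, and value here are hypothetical, matching nothing specific from the images):

```sql
-- Change an existing customer's email in the source table
UPDATE CUSTOMER
   SET EMAIL = 'new.address@example.com'
 WHERE ID = 1;

-- Re-running the MERGE end-dates the old version in the history table
-- (CURRENT_FLAG = FALSE) and inserts a new current version.
```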
The delete operation follows a similar approach. Let's proceed.
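A delete, sketched with the same hypothetical names, would be:

```sql
-- Remove the record from the source table
DELETE FROM CUSTOMER WHERE ID = 1;

-- Re-running the MERGE end-dates the current version in the history table;
-- no new version is inserted, so the full change history is preserved.
```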
Creating a TASK: Using a task, you can schedule the MERGE statement to run on a recurring basis, and you can set a condition so that it executes only when the customer table changes stream contains data. Creating a task object reduces manual intervention. Please follow the instructions below for task creation and execution.
Set the role to ACCOUNTADMIN before creating the task. Every task requires a warehouse to run on. Finally, run the SQL below to create the task. This task will fire every minute, but it will run only if the CUSTOMER_TABLE_CHANGES stream has data in it.
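The task creation would look roughly like this; the task name and stream name are the ones used in this post, while the warehouse name COMPUTE_WH is an assumption:

```sql
USE ROLE ACCOUNTADMIN;

CREATE OR REPLACE TASK CUSTOMER_HISTORY_RUN
  WAREHOUSE = COMPUTE_WH          -- any running warehouse will do
  SCHEDULE  = '1 MINUTE'          -- fire every minute
WHEN
  -- Skip the run entirely when the stream is empty
  SYSTEM$STREAM_HAS_DATA('CUSTOMER_TABLE_CHANGES')
AS
  MERGE INTO CUSTOMER_HISTORY ... ;  -- the same MERGE statement used earlier
```

The SYSTEM$STREAM_HAS_DATA condition is what makes the schedule cheap: when the stream has no new change records, the warehouse is not resumed and no credits are consumed for that run.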
After the task is created, you can check its status with "SHOW TASKS;".
By default, a task is suspended when it is created. To enable it, execute "ALTER TASK CUSTOMER_HISTORY_RUN RESUME;". SHOW TASKS will then report the state as started rather than suspended.
Now we are all set for a demonstration. Let's insert one record into the source table and wait a short while for the changes to appear in the target table. There is no need to execute the MERGE command explicitly as we did last time.
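Sticking with the hypothetical names from the earlier sketches, the demonstration might look like:

```sql
-- Insert a new record into the source table (values are illustrative)
INSERT INTO CUSTOMER (ID, NAME, EMAIL)
VALUES (2, 'Jane Doe', 'jane.doe@example.com');

-- After the next scheduled task run (up to a minute later),
-- verify that the history table picked up the new record:
SELECT * FROM CUSTOMER_HISTORY WHERE ID = 2;
```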
Once the insert succeeds on the source table, the stream is populated automatically. Within about a minute, the target table is updated implicitly per the SCD-2 logic we defined in the MERGE statement. We can run the same test for the update and delete operations as well; you will see the changes in the history table after some time.
Conclusion: Thank you for reading. I hope this blog helps you build a basic understanding of the Snowflake ETL process. I would also suggest going through my previous post on implementing SCD-2 pipelines using IICS and MERGE statements. If you have more questions, you can reach me on my Twitter handle or LinkedIn, or leave a comment below. Good luck!
References: Link to Snowflake online documentation