Implement SCD-2 ETL Data Pipelines in Snowflake using Streams & Tasks (Part 2)

Debi Prasad Mishra
4 min read · Sep 30, 2021

Introduction: This is a continuation of the Part 1 blog. I would highly recommend reading that post first before moving on; it will give you an overview of the ETL process in Snowflake and of how to orchestrate the pipelines.

Now it’s time to execute each step in sequence. Please follow the SQL code shown in images 1 to 4. I have created a user-defined view to simplify the ETL logic and to flag each record with its operation: insert, update, or delete. Create the view per the code shown in images 5 to 7, and that’s all we need. Then let’s insert some records into the source and run the MERGE command; you will see the records populated in the history table per SCD-2 logic. A consolidated sketch of the setup follows below.
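
For reference, here is a minimal sketch of the setup from images 1 to 7. The table, stream, and view names (CUSTOMER, CUSTOMER_TABLE_CHANGES, CUSTOMER_HISTORY, CUSTOMER_CHANGE_DATA) come from this walkthrough; the column list and exact DDL are assumptions, not the exact code in the screenshots.

```sql
-- Images 1-4: source table, change-capture stream, and SCD-2 target table.
-- Column names are illustrative; adjust to match your own schema.
CREATE OR REPLACE TABLE CUSTOMER (
    CUSTOMER_ID      NUMBER,
    FIRST_NAME       VARCHAR,
    LAST_NAME        VARCHAR,
    EMAIL            VARCHAR,
    UPDATE_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Stream that records row-level changes (inserts, updates, deletes) on CUSTOMER
CREATE OR REPLACE STREAM CUSTOMER_TABLE_CHANGES ON TABLE CUSTOMER;

-- Target history table with SCD-2 housekeeping columns
CREATE OR REPLACE TABLE CUSTOMER_HISTORY (
    CUSTOMER_ID NUMBER,
    FIRST_NAME  VARCHAR,
    LAST_NAME   VARCHAR,
    EMAIL       VARCHAR,
    START_TIME  TIMESTAMP_NTZ,
    END_TIME    TIMESTAMP_NTZ,
    IS_CURRENT  BOOLEAN
);

-- Images 5-7: view that flags each captured change with its operation type.
-- Streams expose METADATA$ACTION and METADATA$ISUPDATE for exactly this purpose.
CREATE OR REPLACE VIEW CUSTOMER_CHANGE_DATA AS
-- New rows, including the new version of an updated row: open a current record
SELECT CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL,
       UPDATE_TIMESTAMP AS START_TIME,
       NULL::TIMESTAMP_NTZ AS END_TIME,
       TRUE  AS IS_CURRENT,
       'I'   AS DML_TYPE
FROM CUSTOMER_TABLE_CHANGES
WHERE METADATA$ACTION = 'INSERT'
UNION ALL
-- The old version of an updated row: close it out
SELECT CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL,
       UPDATE_TIMESTAMP AS START_TIME,
       CURRENT_TIMESTAMP() AS END_TIME,
       FALSE AS IS_CURRENT,
       'U'   AS DML_TYPE
FROM CUSTOMER_TABLE_CHANGES
WHERE METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE
UNION ALL
-- True deletes: close out the current record
SELECT CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL,
       UPDATE_TIMESTAMP AS START_TIME,
       CURRENT_TIMESTAMP() AS END_TIME,
       FALSE AS IS_CURRENT,
       'D'   AS DML_TYPE
FROM CUSTOMER_TABLE_CHANGES
WHERE METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = FALSE;

-- Seed rows so the stream has something to capture (values are made up;
-- the IDs 881-883 match the screenshots in this post)
INSERT INTO CUSTOMER (CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL)
VALUES (881, 'Ada',   'Lovelace', 'ada@example.com'),
       (882, 'Alan',  'Turing',   'alan@example.com'),
       (883, 'Grace', 'Hopper',   'grace@example.com');
```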

Data loaded into source i.e. CUSTOMER
Data loaded into Stream i.e. CUSTOMER_TABLE_CHANGES after Insert(s)
Data loaded into View i.e. CUSTOMER_CHANGE_DATA once stream has data
Run the Merge command. Data loaded into target i.e. CUSTOMER_HISTORY
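
The MERGE pictured above can be sketched roughly as follows, under the same assumed column names. Joining on CUSTOMER_ID plus START_TIME lets a single statement both close old versions and insert new ones, because freshly flagged 'I' rows never match an existing history row:

```sql
MERGE INTO CUSTOMER_HISTORY ch
USING CUSTOMER_CHANGE_DATA ccd
   ON ch.CUSTOMER_ID = ccd.CUSTOMER_ID
  AND ch.START_TIME  = ccd.START_TIME
-- Updates and deletes both close out the matching current version
WHEN MATCHED AND ccd.DML_TYPE IN ('U', 'D') THEN UPDATE
     SET ch.END_TIME   = ccd.END_TIME,
         ch.IS_CURRENT = FALSE
-- Brand-new rows and new versions of updated rows are inserted as current
WHEN NOT MATCHED AND ccd.DML_TYPE = 'I' THEN INSERT
     (CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL, START_TIME, END_TIME, IS_CURRENT)
     VALUES (ccd.CUSTOMER_ID, ccd.FIRST_NAME, ccd.LAST_NAME, ccd.EMAIL,
             ccd.START_TIME, ccd.END_TIME, ccd.IS_CURRENT);
```

Note that a DML statement reading from a stream consumes it, so CUSTOMER_TABLE_CHANGES shows as empty once the MERGE commits.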

Let’s update some records in the source table and re-run the MERGE command, and you will see the changes in the target table per SCD-2 logic. A sample update is sketched below.
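
For example, an update like the following (the new value is hypothetical) appears in the stream as a DELETE/INSERT pair with METADATA$ISUPDATE = TRUE:

```sql
-- The screenshot updates CUSTOMER_ID 881; the new email is made up
UPDATE CUSTOMER
   SET EMAIL = 'ada.lovelace@example.com',
       UPDATE_TIMESTAMP = CURRENT_TIMESTAMP()
 WHERE CUSTOMER_ID = 881;
```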

Update on CUSTOMER where CUSTOMER_ID: 881
Stream i.e. CUSTOMER_TABLE_CHANGES after Update(s)
Run the Merge command. Data loaded into target i.e. CUSTOMER_HISTORY

A similar approach works for the delete operation. Let’s proceed; a sample delete is sketched below.
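
On the source side a plain delete is all it takes; the stream and the MERGE handle the rest:

```sql
-- The screenshot deletes CUSTOMER_ID 883
DELETE FROM CUSTOMER WHERE CUSTOMER_ID = 883;
```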

Delete on CUSTOMER where CUSTOMER_ID: 883
Stream i.e. CUSTOMER_TABLE_CHANGES after Delete(s)
Run the Merge command. Data updated on target i.e. CUSTOMER_HISTORY

Creating a TASK: Using a task, you can schedule the MERGE statement to run on a recurring basis, and you can set a condition so that it executes only if there is data in the CUSTOMER_TABLE_CHANGES stream. Creating a task object reduces manual intervention. Please follow the instructions below for task creation and execution.

Set the role to ACCOUNTADMIN before creating and executing the task. Every task requires a warehouse to run. Finally, run the SQL below to create the task. This task will execute every minute, and only if the CUSTOMER_TABLE_CHANGES stream has data in it. Please follow the code to schedule the MERGE statement.

Task CUSTOMER_HISTORY_RUN to schedule a Merge Statement when Stream has Data
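
A sketch of that task definition follows. The warehouse name COMPUTE_WH is an assumption (any warehouse you can use will do); the schedule and stream condition are as described above:

```sql
USE ROLE ACCOUNTADMIN;

CREATE OR REPLACE TASK CUSTOMER_HISTORY_RUN
  WAREHOUSE = COMPUTE_WH                             -- assumed warehouse name
  SCHEDULE  = '1 MINUTE'                             -- run every minute
WHEN
  SYSTEM$STREAM_HAS_DATA('CUSTOMER_TABLE_CHANGES')   -- skip the run when the stream is empty
AS
  MERGE INTO CUSTOMER_HISTORY ch
  USING CUSTOMER_CHANGE_DATA ccd
     ON ch.CUSTOMER_ID = ccd.CUSTOMER_ID
    AND ch.START_TIME  = ccd.START_TIME
  WHEN MATCHED AND ccd.DML_TYPE IN ('U', 'D') THEN UPDATE
       SET ch.END_TIME   = ccd.END_TIME,
           ch.IS_CURRENT = FALSE
  WHEN NOT MATCHED AND ccd.DML_TYPE = 'I' THEN INSERT
       (CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL, START_TIME, END_TIME, IS_CURRENT)
       VALUES (ccd.CUSTOMER_ID, ccd.FIRST_NAME, ccd.LAST_NAME, ccd.EMAIL,
               ccd.START_TIME, ccd.END_TIME, ccd.IS_CURRENT);
```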

After creating the task, you can check its status with “SHOW TASKS;”.

Status is suspended when created

By default, a task is suspended when it is created. To enable it, execute “ALTER TASK CUSTOMER_HISTORY_RUN RESUME;”. SHOW TASKS will then report the status as started instead of suspended.
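
For reference, the resume and status check together:

```sql
ALTER TASK CUSTOMER_HISTORY_RUN RESUME;
SHOW TASKS;  -- the state column now reads 'started' instead of 'suspended'
```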

Status is started after Alter command execution

Now we are all set for a demonstration. Let’s insert one record into the source and wait a short while to see the changes in the target table. This time there is no need to execute the MERGE command explicitly, as we did before.

This is how Task operates internally
Insert CUSTOMER_ID: 884 into CUSTOMER
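
Something along these lines, with made-up values for the new record:

```sql
-- Hypothetical values; the screenshot inserts CUSTOMER_ID 884
INSERT INTO CUSTOMER (CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL)
VALUES (884, 'Edsger', 'Dijkstra', 'edsger@example.com');
```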

Once the insert succeeds on the source table, the stream is populated automatically. After the next scheduled run (at most a minute later), the target table is populated implicitly, per the SCD-2 logic we defined in the MERGE statement. You can perform the same test for the update and delete operations, and you will see the changes in the history table after some time. A quick verification query follows below.
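
Once the next scheduled run has fired, a quick check confirms the new version landed:

```sql
-- The task, not a manual MERGE, has populated the history row this time
SELECT *
FROM CUSTOMER_HISTORY
WHERE CUSTOMER_ID = 884
ORDER BY START_TIME;
```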

Data inserted into target i.e. CUSTOMER_HISTORY

Conclusion: Thank you for reading. I hope this blog helps you build a basic understanding of the Snowflake ETL process. I would also suggest going through my previous post on implementing SCD-2 pipelines using IICS and MERGE statements. If you have any more questions, you can reach out to me on my Twitter handle or LinkedIn, or leave a comment below. Good luck!

References: Snowflake online documentation

