At Linear Squared we use complex machine learning models to build analytical products. Maintaining and cleaning the data that feeds into these models is vital to building accurate models. There are many proprietary ETL (extract, transform and load) products, such as SSIS by Microsoft, DataStage by IBM and BODS by SAP. Rebuilding models for incremental data sets is a common task in a production environment.

We are a hands-on company and believe the best way to understand something technical is to actually try it out! In this post I will discuss the design and implementation of a very simple ETL pipeline that updates existing records and inserts new ones. For demonstration purposes I have chosen SSIS.

Note: production-level ETL pipelines are far more complex!

The Scenario: Suppose we want to build an ETL pipeline to manage students’ exam data. We create two tables:

(1) StudentInitial: the initial data table with the existing students’ details and exam marks

(2) StudentIncremental: the incremental data with updated marks for some students and details of new students for the most recent month


The Task: Update the StudentInitial table using the incremental data from StudentIncremental.

Creating Tables: Execute the queries below to create and populate the two tables, StudentIncremental and StudentInitial.


CREATE TABLE [dbo].[studentincremental] (
    [id]    [INT] NULL,
    [name]  [VARCHAR](50) NULL,
    [marks] [BIGINT] NULL
)
ON [PRIMARY]

CREATE TABLE [dbo].[studentinitial] (
    [id]    [INT] NULL,
    [name]  [VARCHAR](50) NULL,
    [marks] [BIGINT] NULL
)
ON [PRIMARY]

GO

INSERT [dbo].[studentincremental] ([id], [name], [marks]) VALUES (2, N'BBB', 70)

INSERT [dbo].[studentincremental] ([id], [name], [marks]) VALUES (4, N'DDD', 60)

INSERT [dbo].[studentinitial] ([id], [name], [marks]) VALUES (1, N'AAA', 20)

INSERT [dbo].[studentinitial] ([id], [name], [marks]) VALUES (2, N'BBB', 50)

INSERT [dbo].[studentinitial] ([id], [name], [marks]) VALUES (3, N'CCC', 85)


The Solution:

Do note that this is just one way of doing it; there are many other possibilities, but the approach shown here is probably one of the fastest to implement and modify on demand.

Step 1

First of all, you need to add a Data Flow task. The Data Flow task contains the data flow engine that moves data between sources and destinations, and it provides the functionality for transforming, summarizing, cleaning and modifying data as it is moved.

Step 2

Create two OLE DB Sources in the Data Flow panel, one for the initial table and one for the incremental table.

Step 3

Add a Sort transformation after each source, and sort by the ID column.

The Sort transformation is required because the Merge Join transformation needs sorted inputs. Alternatively, you can use an ORDER BY clause in the source’s SELECT statement, as shown below.
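
If you take the ORDER BY route, note that SSIS does not detect the ordering automatically; you must also declare it on the source output. A minimal sketch for the initial table’s source query:

SELECT [id], [name], [marks]
FROM   [dbo].[studentinitial]
ORDER  BY [id]

Then, in the OLE DB Source’s Advanced Editor, set IsSorted = True on the output and SortKeyPosition = 1 on the id column, so that the Merge Join accepts the input as sorted.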


Step 4

Drag and drop a Merge Join transformation and connect the two sorted OLE DB Source outputs to it. Set the initial table as the left input and the incremental table as the right input of this transformation.

Step 5

Open the Merge Join transformation editor; ID will be used as the joining column (selected based on the sort properties of the previous components). Set the join type to a full outer join so that unmatched rows from both tables flow through. Note that if the inputs of the Merge Join transformation are not sorted, you cannot open the editor and you will get an error about the sorting of the inputs.

Select all columns from both inputs in the Merge Join transformation.

Step 6

Add a Conditional Split transformation and write the two expressions below to pick out new records and updated records. Also rename the default outputs to Insert and Update, as the screenshot below shows.

The expressions used in this sample are very simple and merely detect record changes. For example, the expression below:

ISNULL( [ID] ) && !ISNULL( [ID (1)] )

is used to find new records. It matches rows that have an [ID (1)] value but a NULL [ID]: because we used a full outer join, the output contains all rows from both the left (StudentInitial) and right (StudentIncremental) tables, and a brand-new student has no matching row on the left, so [ID] is NULL.

And this expression is used to find records to update:

!ISNULL( [ID] ) && !ISNULL( [ID (1)] )
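
For intuition only (this query is not part of the package), the Merge Join followed by the Conditional Split is logically equivalent to the T-SQL below; rows where [ID] is NULL feed the Insert output, and rows where both IDs are present feed the Update output:

SELECT i.[id]    AS [ID],
       s.[id]    AS [ID (1)],
       s.[name]  AS [name (1)],
       s.[marks] AS [marks (1)]
FROM   [dbo].[studentinitial] AS i
       FULL OUTER JOIN [dbo].[studentincremental] AS s
                    ON i.[id] = s.[id]

(Rows where only [ID] is present are existing students with no incremental data; they match neither expression and are left untouched.)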

Step 7

Add an OLE DB Command and connect the Update output to it. Create a connection to the destination database (the one holding the initial table) and select it on the component’s Connection Managers tab.

Then write the statement below in the SqlCommand property on the Component Properties tab to update records by the input ID:

UPDATE [dbo].[studentinitial]
SET    [name]  = ?,
       [marks] = ?
WHERE  [id]    = ?

Then, on the Column Mappings tab, you need to map the input columns to the query parameters.
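
In case the screenshot is missing: the OLE DB Command exposes one parameter per ? placeholder, in order (Param_0, Param_1, Param_2). The updated values should come from the incremental side; assuming the default “(1)” suffix that SSIS adds to duplicate column names (as seen in the expressions above), the mapping is:

name (1)  -> Param_0   (SET [name] = ?)
marks (1) -> Param_1   (SET [marks] = ?)
ID        -> Param_2   (WHERE [id] = ?)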


Step 8

Add another OLE DB Command and use the Insert output as its input data stream. Connect it to the destination database, and write the following statement in the Component Properties tab’s SqlCommand property.

INSERT INTO [dbo].[studentinitial] ([id], [name], [marks])
VALUES      (?, ?, ?)

Map the input columns to the parameters as the screenshot below shows.
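
Again, in case the screenshot is missing, a sketch of the mapping. New rows exist only on the incremental side of the full outer join (their initial-side columns are NULL), so the incremental columns are the ones to map, assuming the same “(1)” naming:

ID (1)    -> Param_0   ([id])
name (1)  -> Param_1   ([name])
marks (1) -> Param_2   ([marks])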

Testing the solution:

Here are the data rows from the source tables.

After running the package, you will see the records below.
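
If the screenshots are not available, you can verify the outcome with a simple query. Based on the sample data above, StudentInitial should now contain:

SELECT [id], [name], [marks]
FROM   [dbo].[studentinitial]
ORDER  BY [id]

id | name | marks
1  | AAA  | 20
2  | BBB  | 70   (updated from 50)
3  | CCC  | 85
4  | DDD  | 60   (inserted)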

That’s it! You’re done.

In the screenshot, green records are new records and yellow records are updated records.

In this blog post I created an SSIS package that performs the synchronization task with an upsert (insert/update) using the SSIS Merge Join transformation.