DBTimes


Data Warehouse Source Loader library

Object DataSourceLoader

Value Members

Example

Configuration file

Configuration Attributes

logFileDir

loadActions

loadAction.name

loadAction.isActive

loadAction.loadControl.loadControlParquetFileDir

loadAction.fileSource.isRemoveDuplicateRows

loadAction.fileSource.effectiveDate

loadAction.fileSource.sourceFileDir

loadAction.fileSource.sourceFilesNamePattern

loadAction.fileSource.csv

loadAction.fileSource.processedFilesDir

loadAction.dbmsSource.isRemoveDuplicateRows

loadAction.dbmsSource.effectiveDate

loadAction.dbmsSource.sqlServer

loadAction.dbmsSource.jdbcUrl

loadAction.dbmsSource.user

loadAction.dbmsSource.password

loadAction.dbmsSource.sourceTable

loadAction.dbmsSource.numberOfPartitions

loadAction.dbmsSource.incrementalLoad.sourceTableChanges

Full load vs. Incremental load vs. Initial load

loadAction.dbmsSource.incrementalLoad.fileDestinationColumnMarkerPatterns.left

loadAction.dbmsSource.incrementalLoad.fileDestinationColumnMarkerPatterns.right

loadAction.fileDestination.isVersioned

Versioned files

Non-Versioned files

loadAction.fileDestination.parquet

loadAction.fileDestination.path

loadAction.fileDestination.previousCopyPath

loadAction.schema

colName

colType

isUniqueKey

isExcludeFromVersioning

isPartitionColumn

Sample configuration file

Data Warehouse Source Loader library

 

Data Warehouse Source Loader is a library used to maintain the Data Warehouse Staging area in the HDFS.

 

The entire load process is described in the configuration file. The implementation produces versioned parquet files from .csv or other delimited files, or from SQL Server tables. The library can easily be extended to support other sources.

 

The library’s package:

 

package com.dbtimes.dw.sourceloader

Object DataSourceLoader

package com.dbtimes.dw.sourceloader
 
object DataSourceLoader

Value Members

def loadData(appConfig: Config): Unit

Used to load data into destination (i.e., staging) files based on the configuration.

Example

 

import org.apache.spark.sql.SparkSession
import com.typesafe.config.{ConfigFactory, Config}
import com.dbtimes.dw.sourceloader.DataSourceLoader

 

def main(args: Array[String]): Unit = {
  val configFileName = args(0)
  val srcLoaderConfig = ConfigFactory.load(configFileName)
  val spark = SparkSession.builder()
    .appName(srcLoaderConfig.getString("sparkParams.sessionAppName"))
    .getOrCreate()
  DataSourceLoader.loadData(srcLoaderConfig)
  spark.stop()
}

Configuration file

 

The configuration file describes all parameters required by the loader. The section used by the loader library must be called "sourceLoad". The configuration file can contain additional attributes if the calling application needs them; such attributes can be added to any section of the configuration.

 

Optional attributes fall into three groups:

·         attributes with default values, such as "isRemoveDuplicateRows" : "false", shown below with their defaults;

·         attributes that are not required at all, such as "loadControl";

·         application-specific attributes, such as sparkParams, which are not used by the library and can be organized in any way.

 

{
  "sparkParams":
  {
    "sessionAppName": "My App Data Source Loader"
  },
  "sourceLoad":
  {
    "logFileDir": "/dw/data/logs/sourceLoad/",
    "loadActions":
    [
      {
        "name" : "Load some file",
        "isActive": "true",
        "loadControl" :
        {
          "loadControlParquetFileDir" : "/dw/data/control/"
        },
        "fileSource" :                                                              
        {
          "isRemoveDuplicateRows": "false",
          "effectiveDate" : "2020-03-24",
          "fileSystemLocation" :
          {
            "dir": "/dw/data/sources/…",
            "namePattern" : "some-file\\.csv"
          },

OR web location

          "wwwLocation" :
          {
            "url": "https://…",
            "user": "…",
            "password": "…",
            "isDisableSslVerification": "true"
          },
          "csv" :
          {

            "mode" : "PERMISSIVE",
            "header" : "true",
            "encoding" : "ISO-8859-1",

            "dateFormat" : "MM/dd/yyyy"

          },
          "processedFilesDir": "/dw/data/sources/processed/"
        },
 
OR SQL server source
"dbmsSource" :
{
  "isRemoveDuplicateRows": "false",
  "effectiveDate" : "2020-03-24",
  "dbmsSourceSqlServer" : { },
  "jdbcUrl" : "jdbc:sqlserver://… ",
  "user": "…",
  "password": "…",
  "sourceTable" : "…",
  "numberOfPartitions" : "10",
  "incrementalLoad" :
  {
    "sourceTableChanges" : "( SELECT * FROM some_table WHERE [version_column] > 0x<@field-in-the-dest-file@> ) AS tbl",
    "fileDestinationColumnMarkerPatterns" :
    {
      "left" : "\\<\\@", "right" :  "\\@\\>"
    }
  }
},
 
OR Oracle source
"dbmsSource" :
{
  "isRemoveDuplicateRows": "false",
  "effectiveDate" : "2020-03-24",
  "dbmsSourceOracle" : { },
  "jdbcUrl" : "jdbc:oracle://… ",
  "user": "…",
  "password": "…",
  "sourceTable" : "…",
  "numberOfPartitions" : "10",
  "incrementalLoad" :
  {
    "sourceTableChanges" : "( SELECT * FROM some_table WHERE [version_column] > 0x<@field-in-the-dest-file@> )",
    "fileDestinationColumnMarkerPatterns" :
    {
      "left" : "\\<\\@", "right" :  "\\@\\>"
    }
  }
},
 
        "fileDestination" :
        {
          "isVersioned" : "true",
          "parquet" : {},
          "path" : "/dw/data/…/file-name",
          "previousCopyPath" : "/dw/data/…/prev-copy-file-name"
        },
        "schema" :
        [
          { "colName" : "SomeName", "colType" : "String", "isUniqueKey" : "true", "isPartitionColumn" : "true" },
          { "colName" : "SomeOtherName", "colType" : "String" }
          
        ]
      },
      {
        "name" : "Load some other file",
       … array-element for additional actions 
      },
      
    ]
  }
}

 

Configuration Attributes

logFileDir

A full path of the directory for log files. Used primarily during development; this logging is not available on HDFS. Two log files are created – one with library messages and another with Spark messages.

loadActions

An array of specifications for loading files or database tables into staging files. A load action loads one database table or one file. A single load action can also load multiple files whose names follow the same pattern with a date as the variable part (see attribute loadAction.fileSource.sourceFilesNamePattern). We use loadAction to refer to one element of the loadActions array.

loadAction.name

A name of the load action to differentiate multiple load actions.

loadAction.isActive

If the value of isActive is false, the load action will be neither validated nor processed.

loadAction.loadControl.loadControlParquetFileDir

When this attribute is defined, the loader will create a file with summary info for each load. The info includes record counts, description of source and destination, and the action timing.

loadAction.fileSource.isRemoveDuplicateRows

When isRemoveDuplicateRows is set to true, duplicate rows will be removed from the source.

loadAction.fileSource.effectiveDate

A specific effective date can be defined for a source. This is useful in development to change the source date for each run of a load process. In production the effective date is taken from the file name, from one of the columns, or from the current calendar date.

loadAction.fileSource.sourceFileDir

The directory containing the source file or files.

loadAction.fileSource.sourceFilesNamePattern

A regular expression that selects the file names to process. The files can have the effective date as part of the name; in that case the files are processed in date-ascending order.

The effective date must use capturing groups with specific names for the date parts: effectiveDate, year, month and day. Month and day are optional and each defaults to 1 if absent.

(?<effectiveDate>(?<year>20[0-9][0-9])(?<month>[0-1][0-9])(?<day>[0-3][0-9]))

 

An example of sourceFilesNamePattern with an effective date:

pbp-2015_(?<effectiveDate>(?<year>20[0-9][0-9])-(?<month>[0-1][0-9])-(?<day>[0-3][0-9])).*\.csv

 

An example of sourceFilesNamePattern without an effective date:

NFL-Teams\\.csv

 

All the files in the source directory that match the pattern will be processed and moved to processedFilesDir.

 

If, for example, the source directory contains the files

 

"combine.csv"

"pbp-2013.csv"

"pbp-2013 - Copy.csv"

"pbp-2014.csv"

"pbp-2014 - Copy.csv"

"pbp-2015 - Copy.csv"

"pbp-2016 - Copy.csv"

 

The following pattern "pbp-(?<effectiveDate>(?<year>20[0-9][0-9]))\\.csv" will result in the processing of two files

"pbp-2013.csv"

"pbp-2014.csv"

 

The effective date for the first file will be 2013-01-01, and for the second – 2014-01-01.
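The file selection and ordering described above can be reproduced outside the library. The following Python sketch is purely illustrative (the library itself is Scala, and Python spells named capturing groups (?P<name>) rather than Java's (?<name>)):

```python
import re
from datetime import date

# Illustrative re-implementation of the file-selection step described above.
pattern = re.compile(r"pbp-(?P<effectiveDate>(?P<year>20[0-9][0-9]))\.csv")

files = ["combine.csv", "pbp-2013.csv", "pbp-2013 - Copy.csv",
         "pbp-2014.csv", "pbp-2014 - Copy.csv",
         "pbp-2015 - Copy.csv", "pbp-2016 - Copy.csv"]

def effective_date(m):
    # month and day capturing groups are absent in this pattern,
    # so both default to 1
    groups = m.groupdict()
    return date(int(groups["year"]),
                int(groups.get("month") or 1),
                int(groups.get("day") or 1))

matched = [(f, effective_date(m)) for f in files if (m := pattern.fullmatch(f))]
matched.sort(key=lambda pair: pair[1])  # processed in date-ascending order
print(matched)
```

Only pbp-2013.csv and pbp-2014.csv survive the full match; the "- Copy" files do not match the pattern at all.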

loadAction.fileSource.csv

Specifies any valid Spark csv read options used when loading the .csv file.

loadAction.fileSource.processedFilesDir

The dir where the source files are moved once processed successfully.

loadAction.dbmsSource.isRemoveDuplicateRows

See loadAction.fileSource.isRemoveDuplicateRows.

loadAction.dbmsSource.effectiveDate

See loadAction.fileSource.effectiveDate

loadAction.dbmsSource.sqlServer

Defines SQL Server as the source DBMS.

loadAction.dbmsSource.jdbcUrl

The JDBC URL to connect to. Source-specific connection properties may be specified in the URL, e.g.,

jdbc:sqlserver://my-server\\SQLSERVER2019:1433;database=my-db-name;integratedSecurity=true;

loadAction.dbmsSource.user

Used for SQL Server authentication

loadAction.dbmsSource.password

Used for SQL Server authentication

loadAction.dbmsSource.sourceTable

A source table name or a query. If a query is specified, it must be in the format “( SELECT … FROM … [WHERE] … ) AS table-alias”, e.g.,

"[dbo].[ARMResetTest]"

or

"( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] ) as tbl"

 

This attribute is used for full load. The attributes that follow describe incremental load.

loadAction.dbmsSource.numberOfPartitions

If defined together with a partitioning column (see the column attribute "isPartitionColumn"), Spark will read from the DBMS in parallel using a partitioned read. The lower and upper boundaries are determined by the loader by querying the source table.
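Spark's JDBC partitioned read splits the range between the discovered lower and upper boundary into numberOfPartitions per-connection predicates. The sketch below is a hypothetical illustration of the idea only – the actual splitting happens inside Spark and differs in edge-case details:

```python
# Illustrative sketch: split [lower, upper] of the partition column into
# stride-sized WHERE predicates, one per parallel JDBC connection.
def partition_predicates(column, lower, upper, num_partitions):
    stride = (upper - lower) // num_partitions
    preds = []
    bound = lower
    for i in range(num_partitions):
        lo, hi = bound, bound + stride
        if i == 0:
            # first partition also picks up NULLs and anything below lower
            preds.append(f"{column} < {hi} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # last partition is open-ended upward
            preds.append(f"{column} >= {lo}")
        else:
            preds.append(f"{column} >= {lo} AND {column} < {hi}")
        bound += stride
    return preds

preds = partition_predicates("SecId", 1, 1000, 4)
print(preds)
```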

loadAction.dbmsSource.incrementalLoad.sourceTableChanges

Defines the table to be used for the incremental load. The table must be defined using the following format:

"( SELECT … FROM source-table WHERE where-clause-to-yield-changed-data ) as table-alias"

 

The where-clause-to-yield-changed-data would normally compare a timestamp or SQL Server ROWVERSION column against the maximum value of that column from the previous load, e.g.,

WHERE [RowVersion] > 0x<@RowVersionStr@>

where [RowVersion] is the column of ROWVERSION type in the source table and RowVersionStr is the field in the destination file.

 

The Source Loader will calculate the maximum value of RowVersionStr from the destination file and replace the delimited field with that value, so the final version of the WHERE clause would look like this

WHERE [RowVersion] > 0x0000000000016577

 

The name within the <@ and @> delimiters must be a valid field in the destination file. The WHERE clause can reference multiple columns from the destination file.
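The substitution can be sketched as follows. This hypothetical Python fragment (the library itself is Scala; resolve_markers and the in-memory row list standing in for the destination parquet file are assumptions) applies the configured marker patterns:

```python
import re

# Marker patterns matching the configuration values
# "left" : "\\<\\@" and "right" : "\\@\\>".
LEFT, RIGHT = r"\<\@", r"\@\>"
marker = re.compile(LEFT + r"(\w+)" + RIGHT)

def resolve_markers(sql, destination_rows):
    # Replace each <@field@> with the maximum value of that field
    # across the destination rows.
    def max_of(match):
        field = match.group(1)
        return max(row[field] for row in destination_rows)
    return marker.sub(max_of, sql)

rows = [{"RowVersionStr": "0000000000016570"},
        {"RowVersionStr": "0000000000016577"}]
sql = "SELECT * FROM t WHERE [RowVersion] > 0x<@RowVersionStr@>"
resolved = resolve_markers(sql, rows)
print(resolved)
```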

Full load vs. Incremental load vs. Initial load

There are two ways to load data from a DBMS source – full and incremental. Full load should not be confused with initial load: a full load means the entire source table is read. A full load is always used for the very first (initial) load; subsequent loads can be either full or incremental. An incremental load requires the incremental load section in the configuration.

 

 

 

                    Order of the load in time
                    Initial Load      Subsequent load
Full Load               yes                yes
Incremental Load                           yes

Table 1. Applicability of full/incremental load

loadAction.dbmsSource.incrementalLoad.fileDestinationColumnMarkerPatterns.left

The left delimiter of the field in the destination file.

loadAction.dbmsSource.incrementalLoad.fileDestinationColumnMarkerPatterns.right

The right delimiter of the field in the destination file.

loadAction.fileDestination.isVersioned

The destination file can be created either versioned or non-versioned.

Versioned files

When the value of isVersioned is true, the loader will maintain a versioned destination file. A new version of a row, identified by its unique key, is created when any of the row’s field values change. The unique key can be a single column or a compound key of multiple columns. The granularity of versioning is a calendar day, defined by the effective date. If a source has a date field designated as the effective date, its value is used to create versions; in the absence of such a field, the loader uses the current date.
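The versioning rule just described can be sketched in a few lines. This hypothetical Python fragment is an illustration only (the library operates on parquet files with Spark; the function name, the EffectiveDate field name, and the dict-based rows are assumptions):

```python
from datetime import date

# Illustrative sketch of day-granularity versioning: a new version of a row,
# identified by its unique key, is appended only when a tracked field changes.
def apply_versioning(existing, incoming, key_cols, effective_date):
    latest = {}
    for row in existing:  # assumes existing versions are ordered oldest-first
        latest[tuple(row[c] for c in key_cols)] = row
    result = list(existing)
    for row in incoming:
        key = tuple(row[c] for c in key_cols)
        prev = latest.get(key)
        changed = prev is None or any(
            prev.get(col) != val
            for col, val in row.items() if col not in key_cols)
        if changed:
            result.append({**row, "EffectiveDate": effective_date})
    return result

existing = [{"SecId": 1, "Rate": 2.5, "EffectiveDate": date(2020, 3, 23)}]
# Rate changed, so a new version is appended with the new effective date
versions = apply_versioning(existing, [{"SecId": 1, "Rate": 2.7}],
                            ["SecId"], date(2020, 3, 24))
print(versions)
```

An incoming row whose tracked fields are unchanged would produce no new version.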

Non-Versioned files

When the data source has isVersioned set to false, the loader will maintain a non-versioned destination file.

loadAction.fileDestination.parquet

Defines parquet as the type of the destination file.

loadAction.fileDestination.path

Full path of the destination file. This becomes a directory where Spark places the resulting files.

loadAction.fileDestination.previousCopyPath

If this optional attribute is defined, the Source Loader saves a copy of the current file under this name. This makes it possible to roll back to the previous version of the file if needed.

loadAction.schema

Schema is an array of the fields in the source file. The schema array can be empty (i.e., an array with no elements) for a SQL Server source, as Spark will retrieve column names and types from the server.

 

Schema must contain all source columns for a file source.

If fileDestination.isVersioned is set to true (the default), the schema must have one or more columns with the isUniqueKey attribute set to true. For a SQL Server source, the schema needs to contain only the columns that define the unique key (i.e., isUniqueKey set to true). The other columns are not required – they will be determined by the loader.

colName

Column name - required.

colType

Column data type – optional for DBMS source, required for file source.

isUniqueKey

Designates the column as a unique key. This column attribute cannot be derived from the DBMS, so to designate a column as a unique key the column must be included in the schema.

 

Multiple columns can be designated as unique key to create a composite unique key.

isExcludeFromVersioning

A change in a column with this attribute set to true does not cause creation of a new version. For example, a column that calculates a number of days based on the current date changes daily and should not trigger a new version of the row.
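This attribute does not appear in the sample configurations in this document; a hypothetical schema entry using it might look like this (the column name is illustrative):

```json
{ "colName" : "DaysSinceLastReset", "colType" : "Integer", "isExcludeFromVersioning" : "true" }
```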

isPartitionColumn

For a DBMS source, one column can be defined as the partition column to read data over multiple connections in parallel. isPartitionColumn must be defined if loadAction.dbmsSource.numberOfPartitions is defined.

Sample configuration file

{
  "sparkParams":
  {
    "sessionAppName": "Spark Data Source Loader"
  },
  "sourceLoad":
  {
    "logFileDir": "/dw/data/logs/sourceLoad/",
    "loadActions":
    [
      {
        "name" : "Load Team file",
        "isActive": "true",
        "loadControl" :
        {
          "loadControlParquetFileDir" : "/dw/data/control/"
        },
        "fileSource" :
        {
          "isRemoveDuplicateRows": "false",
          "sourceFileDir": "/dw/data/sources/teaminfo",
          "sourceFilesNamePattern" : "NFL-Teams\\.csv",
          "csv" :
          {
            "header" : "true"
          },
          "processedFilesDir": "/dw/data/sources/processed/"
        },
        "fileDestination" :
        {
          "isVersioned" : "false",
          "parquet" : {},
          "path" : "/dw/data/staging/team.parquet",
          "previousCopyPath" : "/dw/data/staging/team.prev.parquet"
        },
        "schema" :
        [
          { "colName" : "TeamName","colType" : "String" },
          { "colName" : "SeasonAbbreviation", "colType" : "String" },
          { "colName" : "SeasonYear", "colType" : "Integer" },
          { "colName" : "SeasonTeamName", "colType" : "String" },
          { "colName" : "ActiveSince", "colType" : "Integer" }
        ]
      },
      {
        "name" : "Load Players file",
        "isActive": "true",
        "loadControl" :
        {
          "loadControlParquetFileDir" : "/dw/data/control/"
        },
        "fileSource" :
        {
          "isRemoveDuplicateRows": "false",
          "sourceFileDir": "/dw/data/sources/players",
          "sourceFilesNamePattern" : "players_2013-12-12.csv",
          "csv" :
          {
            "header" : "true"
          },
          "processedFilesDir": "/dw/data/sources/processed/"
        },
        "fileDestination" :
        {
          "isVersioned" : "false",
          "parquet" : {},
          "path" : "/dw/data/staging/stgplayers.parquet",
          "previousCopyPath" : "/dw/data/staging/stgplayers.prev.parquet"
        },
        "schema" :
        [
          { "colName" : "name", "colType" : "String", "isUniqueKey" : "true" },
          { "colName" : "first_name", "colType" : "String" },
          { "colName" : "last_name", "colType" : "String" },
          { "colName" : "birth_city", "colType" : "String" },
          { "colName" : "birth_state", "colType" : "String" },
          { "colName" : "birth_country", "colType" : "String" },
          { "colName" : "birth_date", "colType" : "String", "isUniqueKey" : "true" },
          { "colName" : "college", "colType" : "String" },
          { "colName" : "draft_team", "colType" : "String" },
          { "colName" : "draft_round", "colType" : "String" },
          { "colName" : "draft_pick", "colType" : "String" },
          { "colName" : "draft_year", "colType" : "Integer" },
          { "colName" : "position", "colType" : "String" },
          { "colName" : "height", "colType" : "String" },
          { "colName" : "weight", "colType" : "Integer" },
          { "colName" : "death_date", "colType" : "String" },
          { "colName" : "death_city", "colType" : "String" },
          { "colName" : "death_state", "colType" : "String" },
          { "colName" : "death_country", "colType" : "String" },
          { "colName" : "year_start", "colType" : "Integer" },
          { "colName" : "year_end", "colType" : "Integer" }
        ]
      },
      {
        "name" : "Load ARMReset table",
        "isActive": "true",
        "isDebug" : "false",
        "loadControl" :
        {
          "loadControlParquetFileDir" : "C:\\srcload\\LoadControl.parquet\\"
        },
        "dbmsSource" :
        {
          "effectiveDate" : "2020-05-13",
          "dbmsSourceSqlServer" : { },
          "jdbcUrl" : "jdbc:sqlserver://DESKTOP\\SQLSERVER2019:1433;database=testdb;integratedSecurity=true;",
          "sourceTable" : "( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] ) as tbl",
          "numberOfPartitions" : "10",
          "incrementalLoad" :
          {
            "sourceTableChanges" : "( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] WHERE [RowVersion] > 0x<@RowVersionStr@> ) AS tbl",
            "fileDestinationColumnMarkerPatterns" :
            {
              "left" : "\\<\\@", "right" :  "\\@\\>"
            }
          }
        },
        "fileDestination" :
        {
          "isVersioned" : "true",
          "parquet" : {},
          "path" : "C:\\staging\\ARMResetTest.parquet",
          "previousCopyPath" : "C:\\staging\\ARMResetTest.prev.parquet"
        },
        "schema" :
        [
          { "colName" : "SecId", "isUniqueKey" : "true", "isPartitionColumn" : "true" },
          { "colName" : "AdjDt", "isUniqueKey" : "true" }
        ]
      }
    ]
  }
}