DBTimes
Data Warehouse Source Loader library
loadAction.loadControl.loadControlParquetFileDir
loadAction.fileSource.isRemoveDuplicateRows
loadAction.fileSource.effectiveDate
loadAction.fileSource.sourceFileFolder
loadAction.fileSource.sourceFilesNamePattern
loadAction.fileSource.fileSourceCsv.hasHeader
loadAction.fileSource.processedFilesDir
loadAction.dbmsSource.isRemoveDuplicateRows
loadAction.dbmsSource.effectiveDate
loadAction.dbmsSource.sqlServer
loadAction.dbmsSource.password
loadAction.dbmsSource.sourceTable
loadAction.dbmsSource.numberOfPartitions
loadAction.dbmsSource.incrementalLoad.sourceTableChanges
Full load vs. Incremental load vs. Initial load
loadAction.dbmsSource.incrementalLoad.fileDestinationColumnMarkerPatterns.left
loadAction.dbmsSource.incrementalLoad.fileDestinationColumnMarkerPatterns.right
loadAction.fileDestination.isVersioned
loadAction.fileDestination.parquet
loadAction.fileDestination.path
loadAction.fileDestination.previousCopyPath
Data Warehouse Source Loader is a library used to maintain the Data Warehouse Staging area in HDFS. The entire load process is described in the configuration file. The implementation produces versioned parquet files from .csv or other delimited files, or from a SQL Server database. The library can be easily extended to support other sources.
The library’s package:
package com.dbtimes.dw.sourceloader
object DataSourceLoader
def loadData(appConfig: Config): Unit
Used to load data into destination (i.e., staging) files based on the configuration.
import org.apache.spark.sql.SparkSession
import com.typesafe.config.{ConfigFactory, Config}
import com.dbtimes.dw.sourceloader.DataSourceLoader

object SourceLoaderApp {  // example application object
  def main(args: Array[String]): Unit = {
    // The first program argument is the name of the configuration file
    val configFileName = args(0)
    val srcLoaderConfig: Config = ConfigFactory.load(configFileName)

    // Create the Spark session used by the loader
    val spark = SparkSession.builder().getOrCreate()

    DataSourceLoader.loadData(srcLoaderConfig)

    spark.stop()
  }
}
The configuration file describes all parameters required by the loader. The section used by the loader library must be called "sourceLoad". The configuration file can have additional attributes if needed by the calling application; additional attributes can be added to any section of the configuration.
Optional attributes fall into three groups:
· attributes such as "isRemoveDuplicateRows" : "false", which are shown with their default values;
· attributes such as "loadControl", which are not required;
· attributes such as "sparkParams", which are additional application-specific attributes that are not used by the library. These attributes can be organized in any way.
{
"sparkParams":
{
"sessionAppName": "My App Data Source Loader"
},
"sourceLoad":
{
"logFileDir": "/dw/data/logs/sourceLoad/",
"loadActions":
[
{
"name" : "Load some file",
"isActive": "true",
"loadControl" :
{
"loadControlParquetFileDir" : "/dw/data/control/"
},
"fileSource" : { "isRemoveDuplicateRows": "false", "effectiveDate" : "2020-03-24", "fileSystemLocation" : OR "wwwLocation" : { { "dir": "/dw/data/sources/…", "url": "https://…" "namePattern" : "some-file\\.csv", "user": "…", "password": "…", "isDisableSslVerification": "true" }, }, "csv" : {
"mode" : "PERMISSIVE", "dateFormat" : "MM/dd/yyyy" }, "processedFilesDir": "/dw/data/sources/processed/" },
|
OR SQL server source |
"dbmsSource" : { "isRemoveDuplicateRows": "false", "effectiveDate" : "2020-03-24", "dbmsSourceSqlServer" : { }, "jdbcUrl" : "jdbc:sqlserver://… ", "user": "…", "password": "…", "sourceTable" : "…", "numberOfPartitions" : "10", "incrementalLoad" : { "sourceTableChanges" : "( SELECT * FROM some_table WHERE [version_column] > 0x<@field-in-the-dest-file@> ) AS tbl", "fileDestinationColumnMarkerPatterns" : { "left" : "\\<\\@", "right" : "\\@\\>" } } },
|
OR Oracle source |
"dbmsSource" : { "isRemoveDuplicateRows": "false", "effectiveDate" : "2020-03-24", "dbmsSourceOracle" : { }, "jdbcUrl" : "jdbc:oracle://… ", "user": "…", "password": "…", "sourceTable" : "…", "numberOfPartitions" : "10", "incrementalLoad" : { "sourceTableChanges" : "( SELECT * FROM some_table WHERE [version_column] > 0x<@field-in-the-dest-file@> )", "fileDestinationColumnMarkerPatterns" : { "left" : "\\<\\@", "right" : "\\@\\>" } } },
|
"fileDestination" :
{
"isVersioned" : "true",
"parquet" : {},
"path" : "/dw/data/…/file-name",
"previousCopyPath" : "/dw/data/…/prev-copy-file-name"
},
"schema" :
[
{ "colName" : "SomeName", "colType" : "String", "isUniqueKey" : "true", "isPartitionColumn" : "true" }
{ "colName" : "SomeOtherName", "colType" : "String" },
…
]
},
{
"name" : "Load some other file",
… array-element for additional actions
},
…
]
}
}
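Attributes outside the "sourceLoad" section (such as "sparkParams" above) are not read by the loader and remain available to the calling application. A minimal sketch of reading one of them with Typesafe Config (the resource name below is hypothetical):

import com.typesafe.config.ConfigFactory

// Sketch only: read an application-specific attribute that the loader ignores.
val config = ConfigFactory.load("source-loader")   // hypothetical configuration resource name
val appName = config.getString("sparkParams.sessionAppName")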
The full path of the directory for log files. Used primarily for development; it is not available on HDFS. Two log files are created – one with library messages and another with Spark messages.
An array of specifications for loading files or database tables into staging files. A load action is used to load one database table or one file. A single load action can also be used to load multiple files if their names follow the same pattern with a date as the variable part (see attribute loadAction.fileSource.sourceFilesNamePattern). We will use loadAction to refer to one element of the loadActions array.
The name of the load action, used to differentiate between multiple load actions.
If the value of isActive is false, the load action will not be processed. It also will not be validated.
When this attribute is defined, the loader will create a file with summary info for each load. The info includes record counts, description of source and destination, and the action timing.
When isRemoveDuplicateRows is set to true, duplicate rows will be removed from the source.
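For illustration, removing duplicate rows corresponds to dropping rows that are identical across all columns of the source DataFrame. A minimal sketch, not the loader's internal code:

import org.apache.spark.sql.DataFrame

// Sketch only: when isRemoveDuplicateRows is true, the effect on the source
// is equivalent to removing rows that are identical across all columns.
def removeDuplicateRows(source: DataFrame): DataFrame = source.dropDuplicates()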
A specific effective date can be defined for a source. This can be used in development to change the source date for each run of a load process. In production the effective date is taken either from the file name, from one of the columns, or from the current calendar date.
The directory containing the source file or files.
A regular expression matching the names of the files to process. The file names can contain an effective date; in that case the files are processed in ascending date order.
The effective date must use capturing groups with specific names for the date parts: effectiveDate, year, month and day. Month and day are optional and each defaults to 1 if absent.
(?<effectiveDate>(?<year>20[0-9][0-9])(?<month>[0-1][0-9])(?<day>[0-3][0-9]))
An example of sourceFilesNamePattern with an effective date:
pbp-2015_(?<effectiveDate>(?<year>20[0-9][0-9])-(?<month>[0-1][0-9])-(?<day>[0-3][0-9])).*\.csv
An example of sourceFilesNamePattern without an effective date:
NFL-Teams\\.csv
All the files in the source directory that match the pattern will be processed and moved to processedFilesDir.
If, for example, the source directory has the files
"combine.csv"
"pbp-2013.csv"
"pbp-2013 - Copy.csv"
"pbp-2014.csv"
"pbp-2014 - Copy.csv"
"pbp-2015 - Copy.csv"
"pbp-2016 - Copy.csv"
the pattern "pbp-(?<effectiveDate>(?<year>20[0-9][0-9])).csv" will result in the processing of two files:
"pbp-2013.csv"
"pbp-2014.csv"
The effective date for the first file will be 2013-01-01, and for the second – 2014-01-01.
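To illustrate how the effective date is resolved from a file name, a minimal sketch using the named capturing groups described above (the function name effectiveDateOf is illustrative, not part of the library):

import java.time.LocalDate
import java.util.regex.Pattern

// Illustrative only: resolve the effective date from a file name using the
// named capturing group "year" from the pattern above.
val namePattern = Pattern.compile("pbp-(?<effectiveDate>(?<year>20[0-9][0-9])).csv")

def effectiveDateOf(fileName: String): Option[LocalDate] = {
  val m = namePattern.matcher(fileName)
  if (m.matches())
    // month and day are absent from this pattern, so they default to 1
    Some(LocalDate.of(m.group("year").toInt, 1, 1))
  else
    None
}

// effectiveDateOf("pbp-2013.csv") yields Some(2013-01-01)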
Specifies any valid Spark CSV read options for loading the csv file.
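Since these attributes are valid Spark CSV reader options, the "csv" block in the configuration template corresponds to a Spark read like the following sketch (the file path is hypothetical):

import org.apache.spark.sql.SparkSession

// Sketch only: the "csv" attributes become Spark CSV reader options.
val spark = SparkSession.builder().getOrCreate()
val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("dateFormat", "MM/dd/yyyy")
  .csv("/dw/data/sources/some-file.csv")   // hypothetical source file path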
The directory to which the source files are moved once processed successfully.
See loadAction.fileSource.isRemoveDuplicateRows.
See loadAction.fileSource.effectiveDate.
Defines SQL Server as the source DBMS.
The JDBC URL to connect to. The source-specific connection properties may be specified in the URL, e.g.,
jdbc:sqlserver://my-server\\SQLSERVER2019:1433;database=my-db-name;integratedSecurity=true;
Used for SQL Server authentication
Used for SQL Server authentication
A source table name or a query. If a query is specified, it must be in the format “(SELECT … FROM … [WHERE] …) AS table-alias”, e.g.,
"[dbo].[ARMResetTest]"
or
"( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] ) as tbl"
This attribute is used for full load. The attributes that follow describe incremental load.
If defined together with a partitioning column (see column attribute "isPartitionColumn"), Spark partitioned reads from the DBMS will be used. The lower and upper boundaries are determined by the loader by querying the source table.
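For illustration, a partitioned Spark JDBC read of this kind looks roughly as follows; the URL, table, partition column and bounds are example values, and the loader determines the bounds itself by querying the source table:

import org.apache.spark.sql.SparkSession

// Sketch only: partitioned JDBC read with example values.
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("jdbc")
  .option("url", "jdbc:sqlserver://my-server:1433;database=my-db-name")  // example URL
  .option("dbtable", "[dbo].[ARMResetTest]")
  .option("partitionColumn", "SecId")   // the column flagged with isPartitionColumn
  .option("lowerBound", "1")            // determined by the loader in practice
  .option("upperBound", "1000000")      // determined by the loader in practice
  .option("numPartitions", "10")        // loadAction.dbmsSource.numberOfPartitions
  .load()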
Defines table to be used for the incremental load. The table must be defined using the following format:
"( SELECT … FROM source-table WHERE where-clause-to-yield-changed-data ) as table-alias"
The where-clause-to-yield-changed-data would normally include a timestamp or SQL Server ROWVERSION column and the maximum value of that column from the previous load, e.g.,
WHERE [RowVersion] > 0x<@RowVersionStr@>
where [RowVersion] is the column of ROWVERSION type in the source table and RowVersionStr is the field in the destination file.
The Source Loader will calculate the maximum value of RowVersionStr from the destination file and replace the delimited field with that value, so the final version of the WHERE clause would look like this:
WHERE [RowVersion] > 0x0000000000016577
The name within the <@ and @> delimiters must be a valid field in the destination file. The WHERE clause can include multiple columns from the destination file.
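A minimal sketch of this substitution, using the example names above (the destination path and column names are illustrative; the loader's internal implementation may differ):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

// Sketch only: compute the maximum RowVersionStr already loaded and splice it
// into the <@RowVersionStr@> marker of the sourceTableChanges query.
val spark = SparkSession.builder().getOrCreate()
val maxVersion = spark.read.parquet("/dw/data/staging/ARMResetTest.parquet")  // example destination file
  .agg(max("RowVersionStr")).head().getString(0)

val sourceTableChanges =
  "( SELECT * FROM some_table WHERE [version_column] > 0x<@RowVersionStr@> ) AS tbl"
val resolvedQuery = sourceTableChanges.replaceAll("\\<\\@RowVersionStr\\@\\>", maxVersion)
// e.g. "( SELECT * FROM some_table WHERE [version_column] > 0x0000000000016577 ) AS tbl"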
There are two ways to load data from a DBMS source – full and incremental. A full load should not be confused with the initial load. Full load means that the entire source table is read. A full load is always used for the very first (initial) load. For subsequent loads either a full or an incremental load can be used. For an incremental load the configuration must have the incrementalLoad section.
                     Order of the load in time
                     Initial Load        Subsequent load
Full Load            ✓                   ✓
Incremental Load                         ✓

Table 1. Applicability of full/incremental load
The left delimiter of the field in the destination file.
The right delimiter of the field in the destination file.
The destination file can be created either versioned or non-versioned.
When the value of isVersioned is true, the loader will maintain a versioned destination file. A new version of the row is created, based on the unique key, when any of the fields' values change. The unique key can be a single column or consist of multiple columns (a compound key). The granularity of the versioning is a calendar day defined by the effective date. If a source has a date field that is designated as the effective date, then its value will be used to create versions. In the absence of such a field, the loader will use the current date.
When isVersioned is set to false, the loader will maintain a non-versioned destination file.
Defines parquet as the type of the destination file.
The full path of the destination file. This will become a directory where the resulting Spark files are placed.
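As a sketch of why the path becomes a directory: Spark writes parquet output as a directory of part files. The path below is taken from the Team example later in this document; the DataFrame is illustrative only.

import org.apache.spark.sql.SparkSession

// Sketch only: Spark writes parquet output as a directory of part files,
// which is why fileDestination.path becomes a directory.
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
Seq(("Some Team", 2015)).toDF("TeamName", "SeasonYear")
  .write.mode("overwrite").parquet("/dw/data/staging/team.parquet")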
If this optional attribute is defined, the Source Loader will save a copy of the current file under this name. This can be useful to go back to the previous version of the file if needed for whatever reason.
Schema is an array of the fields in the source file. The schema array can be empty (i.e., an array with no elements) for a SQL Server source, as Spark will retrieve column names and types from the server.
The schema must contain all source columns for a file source.
Column name - required.
Column data type – optional for DBMS source, required for file source.
Designates column as a unique key. This column attribute cannot be derived from the DBMS, so to designate a column as a unique key the column has to be included in schema.
Multiple columns can be designated as unique key to create a composite unique key.
A change in a column with this attribute set to true does not cause the creation of a new version when the value changes. For example, there may be columns that calculate a number of days based on the current date. The value of such a column will change daily, and this column should not be used to create a new version of the row.
For DBMS source one column can be defined as partition column to read data on multiple connections in parallel. isPartitionColumn must be defined if loadAction.dbmsSource.numberOfPartitions is defined.
{
"sparkParams":
{
"sessionAppName": "Spark Data Source Loader"
},
"sourceLoad":
{
"logFileDir": "/dw/data/logs/sourceLoad/",
"loadActions":
[
{
"name" : "Load Team file",
"isActive": "true",
"loadControl" :
{
"loadControlParquetFileDir" : "/dw/data/control/"
},
"fileSource" :
{
"isRemoveDuplicateRows": "false",
"sourceFileDir": "/dw/data/sources/teaminfo",
"sourceFilesNamePattern" : "NFL-Teams\\.csv",
"fileSourceCsv" :
{
"header" : "true"
},
"processedFilesDir": "/dw/data/sources/processed/"
},
"fileDestination" :
{
"isVersioned" : "false",
"fileDestinationParquet" : {},
"fileDestinationPath" : "/dw/data/staging/team.parquet",
"fileDestinationSavePreviousVersionAs" : "/dw/data/staging/team.prev.parquet"
},
"schema" :
[
{ "colName" : "TeamName","colType" : "String" },
{ "colName" : "SeasonAbbreviation", "colType" : "String" },
{ "colName" : "SeasonYear", "colType" : "Integer" },
{ "colName" : "SeasonTeamName", "colType" : "String" },
{ "colName" : "ActiveSince", "colType" : "Integer" }
]
},
{
"name" : "Load Players file",
"isActive": "true",
"loadControl" :
{
"loadControlParquetFileDir" : "/dw/data/control/"
},
"fileSource" :
{
"isRemoveDuplicateRows": "false",
"sourceFileDir": "/dw/data/sources/players",
"sourceFilesNamePattern" : "players_2013-12-12.csv",
"fileSourceCsv" :
{
"header" : "true"
},
"processedFilesDir": "/dw/data/sources/processed/"
},
"fileDestination" :
{
"isVersioned" : "false",
"fileDestinationParquet" : {},
"fileDestinationPath" : "/dw/data/staging/stgplayers.parquet",
"optional-value fileDestinationSavePreviousVersionAs" : "/dw/data/staging/stgpbp.prev.parquet"
},
"schema" :
[
{ "colName" : "name", "colType" : "String", "isUniqueKey" : "true" },
{ "colName" : "first_name", "colType" : "String" },
{ "colName" : "last_name", "colType" : "String" },
{ "colName" : "birth_city", "colType" : "String" },
{ "colName" : "birth_state", "colType" : "String" },
{ "colName" : "birth_country", "colType" : "String" },
{ "colName" : "birth_date", "colType" : "String", "isUniqueKey" : "true" },
{ "colName" : "college", "colType" : "String" },
{ "colName" : "draft_team", "colType" : "String" },
{ "colName" : "draft_round", "colType" : "String" },
{ "colName" : "draft_pick", "colType" : "String" },
{ "colName" : "draft_year", "colType" : "Integer" },
{ "colName" : "position", "colType" : "String" },
{ "colName" : "height", "colType" : "String" },
{ "colName" : "weight", "colType" : "Integer" },
{ "colName" : "death_date", "colType" : "String" },
{ "colName" : "death_city", "colType" : "String" },
{ "colName" : "death_state", "colType" : "String" },
{ "colName" : "death_country", "colType" : "String" },
{ "colName" : "year_start", "colType" : "Integer" },
{ "colName" : "year_end", "colType" : "Integer" }
]
},
{
"name" : "Load ARMReset table",
"isActive": "true",
"isDebug" : "false",
"loadControl" :
{
"loadControlParquetFileDir" : "C:\\srcload\\LoadControl.parquet\\"
},
"dbmsSource" :
{
"effectiveDate" : "2020-05-13",
"dbmsSourceSqlServer" : { },
"jdbcUrl" : "jdbc:sqlserver://DESKTOP\\SQLSERVER2019:1433;database=testdb;integratedSecurity=true;",
"sourceTable" : "( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] ) as tbl",
"numberOfPartitions" : "10",
"incrementalLoad" :
{
"sourceTableChanges" : "( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] WHERE [RowVersion] > 0x<@RowVersionStr@> ) AS tbl",
"fileDestinationColumnMarkerPatterns" :
{
"left" : "\\<\\@", "right" : "\\@\\>"
}
}
},
"fileDestination" :
{
"isVersioned" : "true",
"parquet" : {},
"path" : "C:\\staging\\ARMResetTest.parquet",
"previousCopyPath" : "C:\\staging\\ARMResetTest.prev.parquet"
},
"schema" :
[
{ "colName" : "SecId", "isUniqueKey" : "true", "isPartitionColumn" : "true" },
{ "colName" : "AdjDt", "isUniqueKey" : "true" }
]
}
]
}
}