DBTimes
Data Warehouse Source Comparer Library

Configuration attributes described in this reference:
compareScenario.floatCompareThreshold
compareScenario.maxSameDifferences
compareScenario.compareResult.file.dir
compareScenario.compareResult.file.csv
compareScenario.leftSource and compareScenario.rightSource
compareScenario.{left|right}Source.moniker
compareScenario.{left|right}Source.file
compareScenario.{left|right}Source.file.path
compareScenario.{left|right}Source.file.parquet
compareScenario.{left|right}Source.file.csv
compareScenario.{left|right}Source.dbms.sqlServer
compareScenario.{left|right}Source.dbms.jdbcUrl
compareScenario.{left|right}Source.dbms.user
compareScenario.{left|right}Source.dbms.password
compareScenario.{left|right}Source.dbms.sourceTable
compareScenario.{left|right}Source.dbms.numberOfPartitions
compareScenario.{left|right}Source.schema
compareScenario.{left|right}Source.query
compareScenario.{left|right}Source.query.sql
compareScenario.{left|right}Source.query.sourceNamePlaceholder
Data Warehouse Source Comparer is a library used to compare tabular results from various sources. It can be used for reconciliation or to verify processing by comparing sources. A source can be located in a DBMS or in a file.
To compare two sources you define a compare scenario. The scenario is defined in a JSON configuration file.
The library’s package:
package com.dbtimes.dw.sourcecomparer
object DataSourceComparer
def compare(appConfig: Config): Unit
Used to compare two sources based on the configuration.
import org.apache.spark.sql.SparkSession
import com.typesafe.config.{ConfigFactory, Config}
import com.dbtimes.dw.sourcecomparer.DataSourceComparer

def main(args: Array[String]): Unit = {
  // The name of the comparer configuration file is passed on the command line.
  val configFileName = args(0)
  val srcComparerConfig: Config = ConfigFactory.load(configFileName)

  // A Spark session must be active before the comparer runs.
  val spark = SparkSession.builder().getOrCreate()

  DataSourceComparer.compare(srcComparerConfig)
  spark.stop()
}
The configuration file describes all parameters required by the comparer. The section used by the comparer library must be called "sourceCompare". The configuration file can have additional attributes if needed by the calling application; additional attributes can be added to any section of the configuration.
Optional attributes fall into three groups:
· attributes shown with their default values, e.g., "isExclude" : "false";
· attributes that are not required, e.g., "floatCompareThreshold";
· additional application-specific attributes that are not used by the library, e.g., "sparkParams". These attributes can be organized in any way.
{
  "sparkParams":
  {
    "sessionAppName": "My App Data Source Comparer"
  },
  "sourceCompare":
  {
    "logFileDir": "/dw/data/logs/sourceComparer/",
    "compareScenarios":
    [
      {
        "name" : "Compare some file with the source",
        "isActive": "true",
        "floatCompareThreshold" : "0.01",
        "maxSameDifferences" : "10",
        "compareResult" :
        {
          "file" :
          {
            "dir": "/dw/data/compare-result/",
            "csv" :
            {
              "header" : "true"
            }
          }
        },
        "schema" :
        [
          { "colName" : "Id", "isUniqueKey" : "true" },
          { "colName" : "Dt", "isUniqueKey" : "true" },
          { "colName" : "AnotherValue", "isExclude" : "false" }
        ],
        "leftSource" :
        {
          "moniker" : "Name of the left source",

          "file" : { "path": "/dw/data/sources/parquet_file", "parquet" : {} },
              -- OR --
          "file" : { "path": "/dw/data/sources/some_file.csv", "csv" : { "mode" : "PERMISSIVE", … } },
              -- OR --
          "dbms" : { "sqlServer" : { }, "jdbcUrl" : "jdbc:sqlserver://… ", "user": "…", "password": "…", "sourceTable" : "…", "numberOfPartitions" : "10" },

          "schema" :
          [
            { "colName" : "SomeName", "colType" : "String", "isPartitionColumn" : "true" },
            { "colName" : "SomeOtherName", "colType" : "String" },
            …
          ],
          "query" :
          {
            "sql" : "SELECT Id, Dt, SomeValue, SomeOtherValue, AnotherValue, UpdSrc FROM <SOURCE>",
            "sourceNamePlaceholder" : "<SOURCE>"
          }
        },
        "rightSource" :
        {
          "moniker" : "Name of the right source",

          "file" : { "path": "/dw/data/sources/parquet_file", "parquet" : {} },
              -- OR --
          "file" : { "path": "/dw/data/sources/some_file.csv", "csv" : { "mode" : "PERMISSIVE", … } },
          "schema" : [ { "colName" : "SomeName", "colType" : "String" }, { "colName" : "SomeOtherName", "colType" : "String" }, … ],
              -- OR --
          "dbms" : { "sqlServer" : { }, "jdbcUrl" : "jdbc:sqlserver://… ", "user": "…", "password": "…", "sourceTable" : "…", "numberOfPartitions" : "10" },
          "schema" : [ { "colName" : "SomeName", "isPartitionColumn" : "true" } ],

          "query" :
          {
            "sql" : "SELECT Id, Dt, SomeValue, SomeOtherValue, AnotherValue, UpdSrc FROM @RIGHT_SOURCE",
            "sourceNamePlaceholder" : "@RIGHT_SOURCE"
          }
        }
      },
      {
        "name" : "Some other scenario",
        … elements for other scenario
      },
      …
    ]
  }
}
logFileDir
The full path of the directory for log files. Used primarily for development; it is not available on HDFS. Two log files are created: one with library messages and another with Spark messages.

compareScenarios
An array of specifications for comparing sources. A compare scenario is used to compare two sources. We will use compareScenario to refer to one element of the compareScenarios array.

compareScenario.name
A name for the compare scenario, used to differentiate multiple compare scenarios.

compareScenario.isActive
If the value of isActive is false, the compare scenario will not be processed. It also will not be validated.

compareScenario.floatCompareThreshold
When comparing two floating point values, they are considered equal if the absolute value of the difference is less than floatCompareThreshold. The default value for this attribute is zero.
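For illustration, the rule can be written as a small predicate. A minimal sketch in Scala, assuming values are compared pairwise per column (this is not the library's actual code):

def floatsEqual(left: Double, right: Double, floatCompareThreshold: Double = 0.0): Boolean =
  // With the default threshold of zero the comparison presumably falls back
  // to exact equality; otherwise the absolute difference is tested.
  if (floatCompareThreshold == 0.0) left == right
  else math.abs(left - right) < floatCompareThreshold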
compareScenario.maxSameDifferences
When defined, the number of reported differences for each column will not exceed the specified value. This parameter can be used to reduce the size of the output, since in many cases differences can be investigated from a small number of instances.

compareScenario.compareResult
Holds the specification of where to store the result of the comparison.

compareScenario.compareResult.file.dir
The directory where the compare result will be stored.

compareScenario.compareResult.file.csv
Defines the format of the compare result as a .csv file.

compareScenario.schema
An array of unique key and excluded columns in the compare result. The schema must contain the unique key columns and may contain columns to exclude from the comparison. The names and types of all columns in the result set are derived from the result set of compareScenario.leftSource.query.sql.

compareScenario.schema.colName
The column name. Required.

compareScenario.schema.isUniqueKey
Designates the column as a unique key. At least one column must have this property set to true since the comparison algorithm is based on it. Multiple columns can be designated as unique key columns to create a composite unique key.

compareScenario.schema.isExclude
When set to true, the corresponding column is excluded from the comparison. If a unique key column is excluded, it is also excluded from the composite unique key. It is an error to exclude the only unique key column.
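To see why at least one unique key column is required, here is a minimal sketch of a key-based comparison in Spark, assuming rows are matched on the unique key columns (the function and approach are illustrative, not the library's implementation):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Illustrative only: match rows on the unique key columns with a full outer
// join, then keep rows where one side is missing or the compared value differs.
def keyedDiff(left: DataFrame, right: DataFrame, keyCols: Seq[String], valueCol: String): DataFrame = {
  val l = left.select((keyCols :+ valueCol).map(col): _*).withColumnRenamed(valueCol, "leftVal")
  val r = right.select((keyCols :+ valueCol).map(col): _*).withColumnRenamed(valueCol, "rightVal")
  l.join(r, keyCols, "full_outer")
    .filter(col("leftVal").isNull || col("rightVal").isNull || col("leftVal") =!= col("rightVal"))
}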
compareScenario.leftSource and compareScenario.rightSource
These two sections describe the two data sets being compared. The sections are identical in structure, but they can have different types of sources: for example, one can be a DBMS and the other a file, or both can be DBMSs, or both files. The sets being compared must match compareScenario.schema.

compareScenario.{left|right}Source.moniker
The name for the source to be used in the compare result. If not defined, the default is "left" or "right".

compareScenario.{left|right}Source.file
Describes a file source. The file can be parquet or csv.

compareScenario.{left|right}Source.file.path
The full path of the file.

compareScenario.{left|right}Source.file.parquet
Defines the file type as parquet.

compareScenario.{left|right}Source.file.csv
Defines the file type as csv. The attribute can be followed by options that will be used to load the csv file. The names and values of the options are defined in the Spark documentation.
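For reference, these options map onto Spark's CSV reader. A config fragment such as { "header" : "true", "mode" : "PERMISSIVE" } presumably translates into a read like this (illustration only; the path and schema are taken from the examples in this document):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// "header" and "mode" are standard Spark CSV options. The schema must be
// supplied from compareScenario.{left|right}Source.schema because csv files
// do not carry column types.
val df = spark.read
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .schema("TeamName STRING, SeasonAbbreviation STRING, SeasonYear INT")
  .csv("/dw/data/sources/some_file.csv")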
compareScenario.{left|right}Source.dbms.sqlServer
Defines SQL Server as the source DBMS.

compareScenario.{left|right}Source.dbms.jdbcUrl
The JDBC URL to connect to. Source-specific connection properties may be specified in the URL, e.g.,
jdbc:sqlserver://my-server\\SQLSERVER2019:1433;database=my-db-name;integratedSecurity=true;

compareScenario.{left|right}Source.dbms.user
Used for SQL Server authentication.

compareScenario.{left|right}Source.dbms.password
Used for SQL Server authentication.

compareScenario.{left|right}Source.dbms.sourceTable
A source table name or a query. If a query is specified, it must be in the format "(SELECT … FROM … [WHERE] …) AS table-alias", e.g.,
"[dbo].[ARMResetTest]"
or
"( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] ) as tbl"

compareScenario.{left|right}Source.dbms.numberOfPartitions
If defined together with a partitioning column (see the column attribute "isPartitionColumn"), a Spark partitioned read from the DBMS will be used. The lower and upper boundaries are determined by the loader by querying the source table.
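These settings correspond to Spark's JDBC partitioned read. A sketch of the equivalent direct Spark call, with hypothetical bounds (the library determines the lower and upper bounds itself by querying the source table):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// partitionColumn comes from the column flagged with isPartitionColumn and
// numPartitions from numberOfPartitions; the bounds below are hypothetical.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:sqlserver://my-server\\SQLSERVER2019:1433;database=my-db-name;integratedSecurity=true;")
  .option("dbtable", "[dbo].[ARMResetTest]")
  .option("partitionColumn", "SecId")
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "10")
  .load()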
compareScenario.{left|right}Source.schema
Defines a full or partial schema for the source:
· A full schema is required for a csv file.
· A schema is not needed for a parquet file.
· A partial schema may be needed for a DBMS source to define the partition column, if Spark partitioning is used to read data.

compareScenario.{left|right}Source.schema.colName
The column name. Required.

compareScenario.{left|right}Source.schema.colType
The column data type. Optional for a DBMS source, required for a csv file source.

compareScenario.{left|right}Source.schema.isPartitionColumn
For a DBMS source, one column can be defined as the partition column to read data over multiple connections in parallel. isPartitionColumn must be defined if compareScenario.{left|right}Source.dbms.numberOfPartitions is defined.

compareScenario.{left|right}Source.query
The query is used to define a subset of columns/rows on top of the source to be used in the comparison. The query can also be used to adjust column types (by using CAST) or column names so that the left and right sets match in structure.

compareScenario.{left|right}Source.query.sql
Any valid SQL statement to be executed on the data frame created from reading the source. The result set from the query must match compareScenario.schema in columns and column names. The name of the source table is defined using a placeholder. The default placeholder is "<SOURCE>", but any placeholder name can be used, in which case query.sourceNamePlaceholder must be defined. The placeholder is replaced with a view name when the SQL is executed.

compareScenario.{left|right}Source.query.sourceNamePlaceholder
The name of the source in the SQL.
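A sketch of the assumed placeholder mechanics: the loaded source is registered as a temporary view, and the placeholder in query.sql is replaced with the view name before the SQL runs (the view name here is hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val sourceDf = spark.read.parquet("/dw/data/sources/parquet_file")

// Register the source under a view name, then execute query.sql with the
// sourceNamePlaceholder replaced by that name.
sourceDf.createOrReplaceTempView("src_view")
val sqlText = "SELECT Id, Dt, AnotherValue FROM <SOURCE>".replace("<SOURCE>", "src_view")
val result = spark.sql(sqlText)

A complete example configuration follows.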
{
  "sparkParams":
  {
    "sessionAppName": "Data Source Reconciliation",
    "sessionConfigMaster": "local",
    "sessionConfigDriverHost": "localhost"
  },
  "sourceCompare":
  {
    "logFileDir": "/dw/data/logs/sourceComparer",
    "compareScenarios":
    [
      {
        "name" : "Compare Team file",
        "isActive": "false",
        "isDebugDwLib" : "false",
        "floatCompareThreshold" : "0.01",
        "compareResult" :
        {
          "file" :
          {
            "dir": "/dw/data/compare-result/",
            "csv" :
            {
              "header" : "true"
            }
          }
        },
        "schema" :
        [
          { "colName" : "TeamName", "colType" : "String", "isUniqueKey" : "true" },
          { "colName" : "SeasonAbbreviation", "colType" : "String", "isUniqueKey" : "true" },
          { "colName" : "SeasonYear", "colType" : "Integer" }
        ],
        "leftSource" :
        {
          "moniker" : "Original",
          "file" :
          {
            "path": "/dw/data/nfl/team-info/nfl-teams.csv",
            "csv" :
            {
              "header" : "true"
            }
          },
          "schema" :
          [
            { "colName" : "TeamName", "colType" : "String" },
            { "colName" : "SeasonAbbreviation", "colType" : "String" },
            { "colName" : "SeasonYear", "colType" : "Integer" },
            { "colName" : "SeasonTeamName", "colType" : "String" },
            { "colName" : "ActiveSince", "colType" : "Integer" }
          ],
          "query" :
          {
            "sql": "SELECT TeamName, SeasonAbbreviation, SeasonYear FROM <SOURCE>",
            "sourceNamePlaceholder": "<SOURCE>"
          }
        },
        "rightSource" :
        {
          "moniker" : "SQL Server",
          "dbms" :
          {
            "sqlServer" : { },
            "jdbcUrl" : "jdbc:sqlserver://my-server\\SQLSERVER2019:1433;database=my-db-name;integratedSecurity=true;",
            "__jdbcUrl" : "jdbc:sqlserver://DESKTOP-T3QOCRG\\SQLSERVER2019:1433;database=embs;",
            "__user": "sa",
            "__password": "Spark!",
            "sourceTable" : "( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] ) as tbl",
            "numberOfPartitions" : "10"
          },
          "query" :
          {
            "sql" : "SELECT TeamName, SeasonAbbreviation, SeasonYear FROM <SOURCE>",
            "sourceNamePlaceholder" : "<SOURCE>"
          }
        }
      },
      {
        "name" : "Compare with SQL Server",
        "isActive": "true",
        "isDebugDwLib" : "false",
        "floatCompareThreshold" : "0.01",
        "compareResult" :
        {
          "file" :
          {
            "dir": "/dw/data/compare-result/",
            "csv" :
            {
              "header" : "true"
            }
          }
        },
        "schema" :
        [
          { "colName" : "SecId", "isUniqueKey" : "true" },
          { "colName" : "AdjDt", "isUniqueKey" : "true" },
          { "colName" : "RPB" },
          { "colName" : "LoanLifeFloorLo" },
          { "colName" : "NumLoans" },
          { "colName" : "EffDt", "isExclude" : "true" },
          { "colName" : "UpdSrc" }
        ],
        "leftSource" :
        {
          "moniker" : "SQL Server",
          "dbms" :
          {
            "sqlServer" : { },
            "jdbcUrl" : "jdbc:sqlserver://my-server\\SQLSERVER2019:1433;database=my-db-name;integratedSecurity=true;",
            "__jdbcUrl" : "jdbc:sqlserver://DESKTOP-T3QOCRG\\SQLSERVER2019:1433;database=embs;",
            "__user": "my_user_id",
            "__password": "my_password",
            "sourceTable" : "( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] ) as tbl",
            "numberOfPartitions" : "10"
          },
          "schema" :
          [
            { "colName" : "SecId", "isPartitionColumn" : "true" }
          ],
          "query" :
          {
            "sql" : "SELECT SecId,AdjDt,RPB,LoanLifeFloorLo,NumLoans,EffDt,UpdSrc FROM <SOURCE>",
            "sourceNamePlaceholder" : "<SOURCE>"
          }
        },
        "rightSource" :
        {
          "moniker" : "Parquet file",
          "file" :
          {
            "path" : "/dw/data/data/nfl/staging/armresettest.parquet",
            "parquet" : {}
          },
          "query" :
          {
            "sql": "SELECT SecId,AdjDt,RPB,LoanLifeFloorLo,NumLoans,EffDt,UpdSrc FROM @RIGHT_SOURCE WHERE _SrcDt_EffectiveDateEnd = '2099-01-01'",
            "sourceNamePlaceholder": "@RIGHT_SOURCE"
          }
        }
      },
      {
        "name" : "Compare to itself parquet",
        "isActive": "true",
        "isDebugDwLib" : "false",
        "floatCompareThreshold" : "0.01",
        "compareResult" :
        {
          "file" :
          {
            "dir": "/dw/data/compare-result/",
            "csv" :
            {
              "header" : "true"
            }
          }
        },
        "schema" :
        [
          { "colName" : "SecId", "isUniqueKey" : "true" },
          { "colName" : "AdjDt", "isUniqueKey" : "true" },
          { "colName" : "RPB" },
          { "colName" : "LoanLifeFloorLo" },
          { "colName" : "NumLoans" },
          { "colName" : "EffDt", "isExclude" : "true" },
          { "colName" : "UpdSrc" }
        ],
        "leftSource" :
        {
          "moniker" : "left Parquet file",
          "file" :
          {
            "path" : "/dw/data/data/nfl/staging/armresettest.parquet",
            "parquet" : {}
          },
          "query" :
          {
            "sql": "SELECT SecId,AdjDt,RPB,LoanLifeFloorLo,NumLoans,EffDt,UpdSrc FROM @RIGHT_SOURCE WHERE _SrcDt_EffectiveDateEnd = '2099-01-01'",
            "sourceNamePlaceholder": "@RIGHT_SOURCE"
          }
        },
        "rightSource" :
        {
          "moniker" : "right Parquet file",
          "file" :
          {
            "path" : "/dw/data/data/nfl/staging/armresettest.parquet",
            "parquet" : {}
          },
          "query" :
          {
            "sql": "SELECT SecId,AdjDt,IF( SecId = 9997, RPB - 1, RPB) AS RPB,LoanLifeFloorLo,IF( SecId = 1388360, NumLoans + 4, NumLoans ) AS NumLoans,EffDt,UpdSrc FROM @RIGHT_SOURCE WHERE SecId NOT IN ( 9998,9999)",
            "sourceNamePlaceholder": "@RIGHT_SOURCE"
          }
        }
      }
    ]
  }
}