
Data Warehouse Source Comparer library
Object DataSourceComparer
Value Members
Example
Configuration file
Configuration Attributes
logFileDir
compareScenarios
compareScenario.name
compareScenario.isActive
compareScenario.floatCompareThreshold
compareScenario.maxSameDifferences
compareScenario.compareResult
compareScenario.compareResult.file.dir
compareScenario.compareResult.file.csv
compareScenario.schema
colName
isUniqueKey
isExclude
compareScenario.leftSource and compareScenario.rightSource
compareScenario.{left|right}Source.moniker
compareScenario.{left|right}Source.file
compareScenario.{left|right}Source.file.path
compareScenario.{left|right}Source.file.parquet
compareScenario.{left|right}Source.file.csv
compareScenario.{left|right}Source.dbms.sqlServer
compareScenario.{left|right}Source.dbms.jdbcUrl
compareScenario.{left|right}Source.dbms.user
compareScenario.{left|right}Source.dbms.password
compareScenario.{left|right}Source.dbms.sourceTable
compareScenario.{left|right}Source.dbms.numberOfPartitions
compareScenario.{left|right}Source.schema
colName
colType
isPartitionColumn
compareScenario.{left|right}Source.query
compareScenario.{left|right}Source.query.sql
compareScenario.{left|right}Source.query.sourceNamePlaceholder
Sample configuration file

Data Warehouse Source Comparer library

 

Data Warehouse Source Comparer is a library for comparing tabular results from various sources. It can be used for reconciliation or to verify processing by comparing sources. A source can be located in a DBMS or in a file.

 

To compare two sources you define a compare scenario. The scenario is defined in a JSON configuration file.

 

The library’s package:

 

package com.dbtimes.dw.sourcecomparer

Object DataSourceComparer

package com.dbtimes.dw.sourcecomparer
 
object DataSourceComparer

Value Members

def compare(appConfig: Config): Unit

Used to compare two sources based on the configuration.

Example

 

import org.apache.spark.sql.SparkSession
import com.typesafe.config.{ConfigFactory, Config}
import com.dbtimes.dw.sourcecomparer.DataSourceComparer

 

def main(args: Array[String]): Unit = {
  val configFileName = args(0)                                  // name of the JSON configuration file
  val srcComparerConfig = ConfigFactory.load(configFileName)
  val spark = SparkSession.builder().getOrCreate()              // a SparkSession must be active before compare
  DataSourceComparer.compare(srcComparerConfig)
  spark.stop()
}

Configuration file

 

The configuration file describes all parameters required by the comparer. The section used by the comparer library must be called "sourceCompare". The configuration file can have additional attributes if needed by the calling application; such attributes can be added to any section of the configuration.
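For example, here is a minimal sketch of how a calling application might read its own attributes from the same file while passing the whole configuration to the library (the "sparkParams" names below are application-specific, and the resource name is hypothetical):

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession
import com.dbtimes.dw.sourcecomparer.DataSourceComparer

// "sparkParams" is an application-specific section ignored by the library;
// "sourceCompare" is the section the library reads from the same Config.
val config: Config = ConfigFactory.load("source-compare")          // hypothetical resource name
val appName: String = config.getString("sparkParams.sessionAppName")

val spark = SparkSession.builder().appName(appName).getOrCreate()
DataSourceComparer.compare(config)
spark.stop()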

 

Optional attributes fall into three groups:

·         attributes such as "isExclude" : "false" that are shown with their default values,

·         attributes such as "floatCompareThreshold" that are not required, and

·         attributes such as sparkParams that are additional, application-specific attributes not used by the library. These attributes can be organized in any way.

 

{
  "sparkParams":
  {
    "sessionAppName": "My App Data Source Comparer"
  },
  "sourceCompare":
  {
    "logFileDir": "/dw/data/logs/sourceComparer/",
    "compareScenarios":
    [
      {
        "name" : "Compare some file with the source",
        "isActive": "true",
        "floatCompareThreshold" : "0.01",
        "maxSameDifferences" : "10",
        "compareResult" :
        {
          "file" :
          {
            "dir": "/dw/data/compare-result/",
            "csv" :
            {
              "header" : "true"
            }
          }
        },

        "schema" :
        [
          { "colName" : "Id", "isUniqueKey" : "true" },
          { "colName" : "Dt", "isUniqueKey" : "true" },
          { "colName" : "AnotherValue", "isExclude" : "false" }
        ],

        "leftSource" :
        {
          "moniker" : "Name of the left source",

          "file" :
          {
            "path": "/dw/data/sources/parquet_file",
            "parquet" : {}
          },

          OR

          "file" :
          {
            "path": "/dw/data/sources/some_file.csv",
            "csv" :
            {
              "mode" : "PERMISSIVE",
              "header" : "true",
              "encoding" : "ISO-8859-1"
            }
          },

          OR

          "dbms" :
          {
            "sqlServer" : { },
            "jdbcUrl" : "jdbc:sqlserver://… ",
            "user": "…",
            "password": "…",
            "sourceTable" : "…",
            "numberOfPartitions" : "10"
          },

          "schema" :
          [
            { "colName" : "SomeName", "colType" : "String", "isPartitionColumn" : "true" },
            { "colName" : "SomeOtherName", "colType" : "String" }
          ],
          "query" :
          {
            "sql" : "SELECT Id, Dt, SomeValue, SomeOtherValue, AnotherValue, UpdSrc FROM <SOURCE>",
            "sourceNamePlaceholder" : "<SOURCE>"
          }
        },

        "rightSource" :
        {
          "moniker" : "Name of the right source",

          "file" :
          {
            "path": "/dw/data/sources/parquet_file",
            "parquet" : {}
          },

          OR

          "file" :
          {
            "path": "/dw/data/sources/some_file.csv",
            "csv" :
            {
              "mode" : "PERMISSIVE",
              "header" : "true",
              "encoding" : "ISO-8859-1"
            }
          },
          "schema" :
          [
            { "colName" : "SomeName", "colType" : "String" },
            { "colName" : "SomeOtherName", "colType" : "String" }
          ],

          OR

          "dbms" :
          {
            "sqlServer" : { },
            "jdbcUrl" : "jdbc:sqlserver://… ",
            "user": "…",
            "password": "…",
            "sourceTable" : "…",
            "numberOfPartitions" : "10"
          },
          "schema" :
          [
            { "colName" : "SomeName", "isPartitionColumn" : "true" }
          ],

          "query" :
          {
            "sql" : "SELECT Id, Dt, SomeValue, SomeOtherValue, AnotherValue, UpdSrc FROM @RIGHT_SOURCE",
            "sourceNamePlaceholder" : "@RIGHT_SOURCE"
          }
        }
      },
      {
        "name" : "Some other scenario",
        … elements for the other scenario …
      }
    ]
  }
}

 

Configuration Attributes

logFileDir

The full path of the directory for log files. Used primarily for development; it is not available on HDFS. Two log files are created: one with library messages and another with Spark messages.

compareScenarios

An array of specifications for comparing sources. A compare scenario is used to compare two sources. We will use compareScenario to refer to one element of the compareScenarios array.

compareScenario.name

The name of the compare scenario, used to distinguish between multiple compare scenarios.

compareScenario.isActive

If the value of isActive is false, the compare scenario will not be processed, nor will it be validated.

compareScenario.floatCompareThreshold

When comparing two floating point values, they are considered equal if the absolute value of their difference is less than floatCompareThreshold. The default value of this attribute is zero.
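As an illustration, the rule amounts to the following sketch (assuming that the default threshold of zero falls back to exact comparison; the library's internal implementation is not shown here):

// Sketch only: values are equal when their absolute difference is below the threshold.
// The zero-threshold branch (exact comparison) is an assumption about the default behavior.
def floatsEqual(left: Double, right: Double, floatCompareThreshold: Double): Boolean =
  if (floatCompareThreshold == 0.0) left == right
  else math.abs(left - right) < floatCompareThreshold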

compareScenario.maxSameDifferences

When defined, the number of reported differences for each column will not exceed the specified value. The parameter can be used to reduce the size of the output, since in many cases differences can be investigated using a small number of instances.

compareScenario.compareResult

Holds the specification of where to store the result of the comparison.

compareScenario.compareResult.file.dir

Directory where the compare result will be stored.

compareScenario.compareResult.file.csv

Defines the format of the compare result to be a csv file.

compareScenario.schema

Schema is an array identifying the unique key columns and excluded columns of the compare result.

 

The schema must contain the unique key columns and it may contain columns to exclude from the comparison. The names and types of all columns in the result set are derived from the result of compareScenario.leftSource.query.sql.

colName

Column name - required.

isUniqueKey

Designates the column as a unique key column. At least one column must have this property set to true, since the comparison algorithm is based on it.

Multiple columns can be designated as unique key columns to create a composite unique key.

isExclude

When set to true, the corresponding column is excluded from comparison. If a unique key column is excluded, it is also excluded from the composite unique key. It is an error to exclude the only unique key column.

compareScenario.leftSource and compareScenario.rightSource

These two sections describe the two data sets being compared. The sections are identical in structure, but they can have different types of sources: for example, one can be a DBMS and the other a file, or both can be DBMSs, or both files. The sets being compared must match compareScenario.schema.

compareScenario.{left|right}Source.moniker

The name of the source to be used in the compare result. If none is defined, the default is “left” or “right”.

compareScenario.{left|right}Source.file

Describes a file source. The file can be parquet or csv.

compareScenario.{left|right}Source.file.path

Full path of the file.

compareScenario.{left|right}Source.file.parquet

Defines the file type to be parquet.

compareScenario.{left|right}Source.file.csv

Defines the file type to be csv. The attribute can contain options that will be used to load the csv file. The names and values of the options are defined in the Spark documentation.
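These option names are the same ones accepted by Spark's DataFrameReader, so a csv block like the one in the sample configuration could be loaded roughly as follows (a sketch under that assumption; the path and schema parameters stand in for the configured values):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

// Sketch: load a csv source applying the options from the "csv" block.
def loadCsvSource(spark: SparkSession, path: String, schema: StructType): DataFrame =
  spark.read
    .schema(schema)                       // csv sources require a full schema (see ...Source.schema)
    .option("mode", "PERMISSIVE")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .csv(path)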

compareScenario.{left|right}Source.dbms.sqlServer

Defines SQL Server as the source DBMS.

compareScenario.{left|right}Source.dbms.jdbcUrl

The JDBC URL to connect to. Source-specific connection properties may be specified in the URL, e.g.,

jdbc:sqlserver://my-server\\SQLSERVER2019:1433;database=my-db-name;integratedSecurity=true;

compareScenario.{left|right}Source.dbms.user

Used for SQL Server authentication.

compareScenario.{left|right}Source.dbms.password

Used for SQL Server authentication.

compareScenario.{left|right}Source.dbms.sourceTable

A source table name or a query. If a query is specified, it must be in the format “( SELECT … FROM … [WHERE] … ) AS table-alias”, e.g.,

"[dbo].[ARMResetTest]"

or

"( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] ) as tbl"

compareScenario.{left|right}Source.dbms.numberOfPartitions

If defined together with a partitioning column (see the column attribute "isPartitionColumn"), a Spark partitioned read from the DBMS will be used. The lower and upper boundaries are determined by the loader by querying the source table.
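This corresponds to Spark's partitioned JDBC read. A rough sketch of the equivalent reader call (the exact options the library sets internally are an assumption):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch: read the source table over numberOfPartitions parallel connections.
// lowerBound/upperBound would come from querying MIN/MAX of the partition column.
def loadPartitioned(spark: SparkSession, jdbcUrl: String, sourceTable: String,
                    partitionColumn: String, lowerBound: Long, upperBound: Long,
                    numberOfPartitions: Int): DataFrame =
  spark.read
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", sourceTable)
    .option("partitionColumn", partitionColumn)
    .option("lowerBound", lowerBound.toString)
    .option("upperBound", upperBound.toString)
    .option("numPartitions", numberOfPartitions.toString)
    .load()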

compareScenario.{left|right}Source.schema

Defines a full or partial schema for the source (see the sketch after this list):

·         A full schema is required for a csv file.

·         No schema is needed for a parquet file.

·         A partial schema may be needed for a DBMS source to define the partition column, if Spark partitioning is used to read the data.
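As a rough sketch of what the csv case implies, the configured colName/colType pairs map naturally onto a Spark StructType (the type-name mapping below is an assumption covering only the types used in this document):

import org.apache.spark.sql.types._

// Sketch: build a Spark schema from configured (colName, colType) pairs.
def buildSchema(columns: Seq[(String, String)]): StructType =
  StructType(columns.map { case (colName, colType) =>
    val dataType = colType match {
      case "String"  => StringType
      case "Integer" => IntegerType
      case "Double"  => DoubleType
      case other     => throw new IllegalArgumentException(s"Unmapped colType: $other")
    }
    StructField(colName, dataType, nullable = true)
  })

// e.g. buildSchema(Seq("TeamName" -> "String", "SeasonYear" -> "Integer"))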

colName

Column name - required.

colType

Column data type – optional for DBMS source, required for csv file source.

isPartitionColumn

For a DBMS source, one column can be defined as the partition column to read data over multiple connections in parallel. isPartitionColumn must be defined if compareScenario.{left|right}Source.dbms.numberOfPartitions is defined.

compareScenario.{left|right}Source.query

The query defines a subset of columns/rows on top of the source to be used in the comparison. The query can also be used to adjust column types (using CAST) or column names so that the left and right sets match in structure.

compareScenario.{left|right}Source.query.sql

Any valid SQL statement to be executed on the data frame created from reading the source. The result set from the query must match compareScenario.schema in terms of columns and column names. The name of the source table is defined using a placeholder. The default table name placeholder is "<SOURCE>", but any placeholder name can be used, in which case query.sourceNamePlaceholder must be defined. The placeholder will be replaced with a view name when the sql is executed.
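Conceptually, the mechanism looks like this sketch (not the library's actual code; the view name is hypothetical):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch: register the loaded source as a temporary view, substitute the
// placeholder with the view name, and run the configured sql.
def runSourceQuery(spark: SparkSession, source: DataFrame,
                   sql: String, sourceNamePlaceholder: String): DataFrame = {
  val viewName = "source_view"                      // hypothetical view name
  source.createOrReplaceTempView(viewName)
  spark.sql(sql.replace(sourceNamePlaceholder, viewName))
}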

compareScenario.{left|right}Source.query.sourceNamePlaceholder

The placeholder name used for the source in sql.

Sample configuration file

{
  "sparkParams":
  {
    "sessionAppName": "Data Source Reconciliation",
    "sessionConfigMaster": "local",
    "sessionConfigDriverHost": "localhost"
  },
  "sourceCompare":
  {
    "logFileDir": "/dw/data/logs/sourceComparer",
    "compareScenarios":
    [
      {
        "name" : "Compare Team file",
        "isActive": "false",
        "isDebugDwLib" : "false",
        "floatCompareThreshold" : "0.01",
        "compareResult" :
        {
          "file" :
          {
            "dir": "/dw/data/compare-result/",
            "csv" :
            {
              "header" : "true"
            }
          }
        },
        "schema" :
        [
          { "colName" : "TeamName", "colType" : "String", "isUniqueKey" : "true" },
          { "colName" : "SeasonAbbreviation", "colType" : "String", "isUniqueKey" : "true" },
          { "colName" : "SeasonYear", "colType" : "Integer" }
        ],
        "leftSource" :
        {
          "moniker" : "Original",
          "file" :
          {
            "path": "/dw/data/nfl/team-info/nfl-teams.csv",
            "csv" :
            {
              "header" : "true"
            }
          },
          "schema" :
          [
            { "colName" : "TeamName", "colType" : "String" },
            { "colName" : "SeasonAbbreviation", "colType" : "String" },
            { "colName" : "SeasonYear", "colType" : "Integer" },
            { "colName" : "SeasonTeamName", "colType" : "String" },
            { "colName" : "ActiveSince", "colType" : "Integer" }
          ],
          "query" :
          {
            "sql": "SELECT TeamName, SeasonAbbreviation, SeasonYear FROM <SOURCE>",
            "sourceNamePlaceholder": "<SOURCE>"
          }
        },
        "rightSource" :
        {
          "moniker" : "SQL Server",
          "dbms" :
          {
            "sqlServer" : { },
            "jdbcUrl" : "jdbc:sqlserver://my-server\\SQLSERVER2019:1433;database=my-db-name;integratedSecurity=true;",
            "__jdbcUrl" : "jdbc:sqlserver://DESKTOP-T3QOCRG\\SQLSERVER2019:1433;database=embs;",
            "__user": "sa",
            "__password": "Spark!",
            "sourceTable" : "( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] ) as tbl",
            "numberOfPartitions" : "10"
          },
          "query" :
          {
            "sql" : "SELECT TeamName, SeasonAbbreviation, SeasonYear FROM <SOURCE>",
            "sourceNamePlaceholder" : "<SOURCE>"
          }
        }
      },
      {
        "name" : "Compare with SQL Server",
        "isActive": "true",
        "isDebugDwLib" : "false",
        "floatCompareThreshold" : "0.01",
        "compareResult" :
        {
          "file" :
          {
            "dir": "/dw/data/compare-result/",
            "csv" :
            {
              "header" : "true"
            }
          }
        },
        "schema" :
        [
          { "colName" : "SecId", "isUniqueKey" : "true" },
          { "colName" : "AdjDt", "isUniqueKey" : "true" },
          { "colName" : "RPB" },
          { "colName" : "LoanLifeFloorLo" },
          { "colName" : "NumLoans" },
          { "colName" : "EffDt", "isExclude" : "true" },
          { "colName" : "UpdSrc" }
        ],
        "leftSource" :
        {
          "moniker" : "SQL Server",
          "dbms" :
          {
            "sqlServer" : { },
            "jdbcUrl" : "jdbc:sqlserver://my-server\\SQLSERVER2019:1433;database=my-db-name;integratedSecurity=true;",
            "__jdbcUrl" : "jdbc:sqlserver://DESKTOP-T3QOCRG\\SQLSERVER2019:1433;database=embs;",
            "__user": "my_user_id",
            "__password": "my_password",
            "sourceTable" : "( SELECT *, CONVERT( VARCHAR( 200), CONVERT( BINARY(8), [RowVersion], 1 ), 2 ) AS RowVersionStr FROM [dbo].[ARMResetTest] ) as tbl",
            "numberOfPartitions" : "10"
          },
          "schema" :
          [
            { "colName" : "SecId", "isPartitionColumn" : "true" }
          ],
          "query" :
          {
            "sql" : "SELECT SecId,AdjDt,RPB,LoanLifeFloorLo,NumLoans,EffDt,UpdSrc FROM <SOURCE>",
            "sourceNamePlaceholder" : "<SOURCE>"
          }
        },
        "rightSource" :
        {
          "moniker" : "Parquet file",
          "file" :
          {
            "path" : "/dw/data/data/nfl/staging/armresettest.parquet",
            "parquet" : {}
          },
          "query" :
          {
            "sql": "SELECT SecId,AdjDt,RPB,LoanLifeFloorLo,NumLoans,EffDt,UpdSrc FROM @RIGHT_SOURCE WHERE _SrcDt_EffectiveDateEnd = '2099-01-01'",
            "sourceNamePlaceholder": "@RIGHT_SOURCE"
          }
        }
      },
      {
        "name" : "Compare to itself parquet",
        "isActive": "true",
        "isDebugDwLib" : "false",
        "floatCompareThreshold" : "0.01",
        "compareResult" :
        {
          "file" :
          {
            "dir": "/dw/data/compare-result/",
            "csv" :
            {
              "header" : "true"
            }
          }
        },
        "schema" :
        [
          { "colName" : "SecId", "isUniqueKey" : "true" },
          { "colName" : "AdjDt", "isUniqueKey" : "true" },
          { "colName" : "RPB" },
          { "colName" : "LoanLifeFloorLo" },
          { "colName" : "NumLoans" },
          { "colName" : "EffDt", "isExclude" : "true" },
          { "colName" : "UpdSrc" }
        ],
        "leftSource" :
        {
          "moniker" : "left Parquet file",
          "file" :
          {
            "path" : "/dw/data/data/nfl/staging/armresettest.parquet",
            "parquet" : {}
          },
          "query" :
          {
            "sql": "SELECT SecId,AdjDt,RPB,LoanLifeFloorLo,NumLoans,EffDt,UpdSrc FROM @RIGHT_SOURCE WHERE _SrcDt_EffectiveDateEnd = '2099-01-01'",
            "sourceNamePlaceholder": "@RIGHT_SOURCE"
          }
        },
        "rightSource" :
        {
          "moniker" : "right Parquet file",
          "file" :
          {
            "path" : "/dw/data/data/nfl/staging/armresettest.parquet",
            "parquet" : {}
          },
          "query" :
          {
            "sql": "SELECT SecId,AdjDt,IF( SecId = 9997, RPB - 1, RPB) AS RPB,LoanLifeFloorLo,IF( SecId = 1388360, NumLoans + 4, NumLoans ) AS NumLoans,EffDt,UpdSrc FROM @RIGHT_SOURCE WHERE SecId NOT IN ( 9998,9999)",
            "sourceNamePlaceholder": "@RIGHT_SOURCE"
          }
        }
      }
    ]
  }
}