Validate

*Validate stages are used to perform validation and basic workflow controls.

EqualityValidate

Since: 1.0.0 - Supports Streaming: False

The EqualityValidate takes two input DataFrames and will succeed if they are identical or fail if not. This stage is useful in automated testing as it can validate that a derived dataset equals a known ‘good’ dataset.

This stage will validate:

  • Same number of columns.
  • Same data type of columns.
  • Same number of rows.
  • Same values in those rows.
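The checks above can be sketched in plain Python (illustrative only, not Arc's implementation), treating each dataset as a schema plus a collection of rows:

```python
# Illustrative sketch of the four equality checks (not Arc's implementation).
# Schemas are lists of (column name, data type); rows are lists of tuples.
def datasets_equal(left_schema, left_rows, right_schema, right_rows):
    if left_schema != right_schema:         # same columns and same data types
        return False
    if len(left_rows) != len(right_rows):   # same number of rows
        return False
    # same values in those rows (order-insensitive comparison)
    return sorted(left_rows) == sorted(right_rows)

schema = [("id", "integer"), ("total", "double")]
assert datasets_equal(schema, [(1, 14.23)], schema, [(1, 14.23)])
assert not datasets_equal(schema, [(1, 14.23)], schema, [(1, 18.20)])
```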

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
leftView String true Name of first incoming Spark dataset.
rightView String true Name of second incoming Spark dataset.
description String false An optional stage description to help document job files and print to job logs to assist debugging.

Examples

Minimal

{
  "type": "EqualityValidate",
  "name": "verify data equality",
  "environments": [
    "production",
    "test"
  ],
  "leftView": "customers_calculated",
  "rightView": "customers_known_correct"
}

MetadataValidate

Since: 2.4.0 - Supports Streaming: True

Similar to SQLValidate, the MetadataValidate stage takes an input SQL statement which is executed against the metadata attached to the input DataFrame. The statement must return [Boolean, Option[String]]; the stage succeeds if the first return value is true and fails if not.

Arc will register a table called metadata which contains the metadata of the inputView, allowing complex SQL statements to be executed against the column-level metadata. The available columns in the metadata table are:

Field Description
name The field name.
type The field type.
metadata The field metadata.

This can be used like:

-- ensure that no pii columns are present in the inputView
SELECT 
    SUM(pii_column) = 0
    ,TO_JSON(
        NAMED_STRUCT(
            'columns', COUNT(*), 
            'pii_columns', SUM(pii_column)
        )
    ) 
FROM (
    SELECT 
        CASE WHEN metadata.pii THEN 1 ELSE 0 END AS pii_column
    FROM detail
) valid

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
inputURI URI true URI of the input file containing the SQL statement.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "MetadataValidate",
  "name": "apply data integrity rules against metadata",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "inputURI": "hdfs://datalake/sql/assert_no_pii_columns.sql"
}

Complete

{
  "type": "MetadataValidate",
  "name": "apply integrity rules against metadata",
  "description": "ensure no pii fields are in dataset",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "inputURI": "hdfs://datalake/sql/assert_no_pii_columns_dynamic.sql",
  "authentication": {},
  "sqlParams": {
    "pii_metadata_field_name": "pii"
  }
}

SQLValidate

Since: 1.0.0 - Supports Streaming: False

The SQLValidate takes an input SQL statement which must return [Boolean, Option[String]] and will succeed if the first return value is true or fail if not. The second return value is logged on both success and failure, which can be useful for understanding the reason for a job's outcome. This stage is extremely powerful as arbitrary job validation rules, expressed as SQL statements, can be executed to allow or prevent the job from succeeding.

For example, it can be used to perform automated extract validation against file formats which may have a header/footer layout, or against datasets where a certain level of data conversion errors is acceptable.

SQLValidate will try to parse the message as JSON; constructing the message as a JSON string in the SQL statement makes the log output easier to parse with log aggregation tools.
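To illustrate why a JSON message is preferred, here is a minimal Python sketch (not Arc code) of how a log pipeline might treat the second return value, falling back to the raw string when it is not valid JSON:

```python
import json

def parse_validation_message(message):
    """Try to parse the message as JSON so structured fields reach the logs;
    fall back to the raw string otherwise (sketch only, not Arc's code)."""
    try:
        return json.loads(message)
    except (json.JSONDecodeError, TypeError):
        return message

# A JSON message produced by TO_JSON(NAMED_STRUCT(...)) parses into fields:
print(parse_validation_message('{"error": 0.02, "threshold": 0.05}'))
# {'error': 0.02, 'threshold': 0.05}

# A plain-text message is logged as-is:
print(parse_validation_message('number of errors below threshold. success.'))
```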

See patterns for more examples.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI of the input file containing the SQL statement.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.
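The ${} placeholder syntax behaves like simple template substitution. As a hedged sketch (Python's string.Template happens to use the same ${} format, though Arc performs its own substitution):

```python
from string import Template

# Hypothetical SQL statement using a ${} placeholder, as in the examples below.
sql = "SELECT (SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage} FROM valid"
sqlParams = {"record_error_tolerance_percentage": "0.05"}

# Substitute parameters before execution (sketch only, not Arc's implementation).
resolved = Template(sql).substitute(sqlParams)
print(resolved)
# SELECT (SUM(errors) / COUNT(errors)) < 0.05 FROM valid
```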

Examples

Minimal

{
  "type": "SQLValidate",
  "name": "apply data integrity rules",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_error_threshold.sql"
}

Complete

{
  "type": "SQLValidate",
  "name": "apply data integrity rules",
  "description": "apply data integrity rules",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_error_threshold_dynamic.sql",
  "authentication": {},
  "sqlParams": {
    "record_error_tolerance_percentage": "0.05"
  }
}

For example after performing a TypingTransform it would be possible to execute a query which tests that a certain percentage of records are not errored:

_type date description total _errors
detail 2016-12-19 daily total 14.23 [false]
detail 2016-12-20 daily total null [true]
detail 2016-12-21 daily total 18.20 [false]
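With the rows above, one record of three errored during typing, so the error rate is 1/3 ≈ 0.33, which exceeds a 0.05 tolerance: the first return value is false and the stage fails. The arithmetic, sketched in plain Python (illustrative only; the SQL below performs the same calculation):

```python
# Error flags for the three example rows: one row errored during typing.
errors = [0, 1, 0]
tolerance = 0.05  # e.g. the record_error_tolerance_percentage sqlParam

error_rate = sum(errors) / len(errors)
passed = error_rate < tolerance
print(round(error_rate, 2), passed)
# 0.33 False
```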

With a JSON message (preferred):

SELECT 
    (SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage}
    ,TO_JSON(NAMED_STRUCT('error', SUM(errors)/ COUNT(errors), 'threshold', ${record_error_tolerance_percentage})) 
FROM (
    SELECT 
        CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS errors
    FROM detail
) valid

The FILTER function can be used to select errors only from a certain field:

SIZE(FILTER(_errors, _error -> _error.field == 'fieldname'))

With a text message:

SELECT 
    (SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage}
    ,CASE 
        WHEN (SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage} THEN 'number of errors below threshold. success.'
        ELSE CONCAT('error records ', ROUND((SUM(errors) / COUNT(errors)) * 100, 2), '%. required < ', ${record_error_tolerance_percentage} * 100,'%') 
    END    
FROM (
    SELECT 
        CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS errors
    FROM detail
) valid