Validate

Validate stages perform validation and basic workflow control. Think of these stages as assertions: the job should terminate with an error if a certain condition is not met.

EqualityValidate

Since: 1.0.0 - Supports Streaming: False

The EqualityValidate stage takes two input DataFrames and succeeds if they are identical or fails if they are not. It is useful in automated testing, where it can validate that a derived dataset equals a known ‘good’ dataset.

This stage will validate:

  • Same number of columns.
  • Same data type of columns.
  • Same number of rows.
  • Same values in those rows.
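
The four checks above can be sketched in plain Python. This is only an illustration of the comparison logic, not Arc's implementation: the `Table` type and the `equality_validate` function are hypothetical, and comparing rows without regard to their order is an assumption that the text above does not confirm.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class Table:
    """Hypothetical stand-in for a DataFrame: a schema plus rows."""
    columns: list  # list of (name, type) tuples
    rows: list     # list of tuples, one value per column


def equality_validate(left: Table, right: Table) -> None:
    """Raise ValueError on the first check that fails; return None on success."""
    if len(left.columns) != len(right.columns):
        raise ValueError("different number of columns")
    left_types = [col_type for _, col_type in left.columns]
    right_types = [col_type for _, col_type in right.columns]
    if left_types != right_types:
        raise ValueError("different column data types")
    if len(left.rows) != len(right.rows):
        raise ValueError("different number of rows")
    # Assumption: row order is ignored, so rows are compared as multisets.
    if Counter(left.rows) != Counter(right.rows):
        raise ValueError("different row values")
```

Raising on the first failed check mirrors the assertion-like behaviour described at the top of this page: a mismatch stops the job rather than returning a result to inspect.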

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
leftView | String | true | Name of first incoming Spark dataset.
rightView | String | true | Name of second incoming Spark dataset.
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.
id | String | false | An optional unique identifier for this stage.

Examples

Minimal

{
  "type": "EqualityValidate",
  "name": "verify data equality",
  "environments": [
    "production",
    "test"
  ],
  "leftView": "customers_calculated",
  "rightView": "customers_known_correct"
}

MetadataValidate

Since: 2.4.0 - Supports Streaming: True

Similar to SQLValidate, the MetadataValidate stage takes an input SQL statement and executes it against the metadata attached to the input DataFrame. The statement must return [Boolean, Option[String]]; the stage succeeds if the first return value is true and fails otherwise.

Arc will register a table called metadata which contains the metadata of the inputView. This allows complex SQL statements to be executed against the column metadata of the inputView. The available columns in the metadata table are:

Field | Description
name | The field name.
type | The field type.
metadata | The field metadata.

This can be used like:

-- ensure that no pii columns are present in the inputView
SELECT
  SUM(pii_column) = 0
  ,TO_JSON(
    NAMED_STRUCT(
      'columns', COUNT(*),
      'pii_columns', SUM(pii_column)
    )
  )
FROM (
  SELECT
    CASE WHEN metadata.pii THEN 1 ELSE 0 END AS pii_column
  FROM metadata
) valid
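
To make the logic of this query concrete, the same check is sketched below in plain Python over a hand-written list of field records. The sample fields and the `validate_no_pii` helper are hypothetical; only the name, type and metadata columns come from the table above, and the code is not how Arc evaluates the statement.

```python
import json

# Hypothetical contents of the `metadata` table for a small inputView.
metadata_rows = [
    {"name": "customer_id", "type": "integer", "metadata": {"pii": False}},
    {"name": "email", "type": "string", "metadata": {"pii": True}},
    {"name": "signup_date", "type": "date", "metadata": {"pii": False}},
]


def validate_no_pii(rows):
    """Return (valid, message), mirroring the two values the SQL returns."""
    pii_columns = sum(1 for row in rows if row["metadata"].get("pii"))
    message = json.dumps({"columns": len(rows), "pii_columns": pii_columns})
    return pii_columns == 0, message


valid, message = validate_no_pii(metadata_rows)
print(valid, message)  # False {"columns": 3, "pii_columns": 1}
```

Because one of the three sample fields is flagged as pii, the first value is false and the stage would fail, with the JSON message explaining why.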

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
inputView | String | true | Name of incoming Spark dataset.
inputURI | URI | *true | URI of the input file containing the SQL statement. Required if sql not provided.
sql | String | *true | A SQL statement to execute. Required if inputURI not provided.
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation.
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.
id | String | false | An optional unique identifier for this stage.
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "MetadataValidate",
  "name": "apply data integrity rules against metadata",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "inputURI": "hdfs://datalake/sql/assert_no_pii_columns.sql"
}

Complete

{
  "type": "MetadataValidate",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "apply integrity rules against metadata",
  "description": "ensure no pii fields are in dataset",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "inputURI": "hdfs://datalake/sql/assert_no_pii_columns_dynamic.sql",
  "authentication": {},
  "sqlParams": {
    "pii_metadata_field_name": "pii"
  }
}

SQLValidate

Since: 1.0.0 - Supports Streaming: False

The SQLValidate stage takes an input SQL statement which must return [Boolean, Option[String]]. The stage succeeds if the first return value is true and fails otherwise. The second return value is logged in both cases, which can be useful for understanding the reason for job success or failure. This stage is extremely powerful because arbitrary job validation rules, expressed as SQL statements, can be executed to allow or prevent the job from succeeding.

For example, it can be used to perform automated extract validation against file formats which may have a header/footer layout, or against datasets where a certain level of data conversion errors is acceptable.

If the message is a JSON string constructed in the SQL statement, SQLValidate will try to convert it so that the logged message is easier to parse by log aggregation tools.
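
The contract described above (a boolean plus an optional message, with a best-effort JSON conversion of the message) can be sketched as follows. This is a minimal illustration under stated assumptions, not Arc's code: the `interpret_result` function and the choice of `RuntimeError` are hypothetical.

```python
import json


def interpret_result(row):
    """Take a (valid, message) pair; return the message or raise on failure."""
    valid, message = row
    parsed = message
    if message is not None:
        try:
            # A JSON message becomes structured data for log tooling.
            parsed = json.loads(message)
        except json.JSONDecodeError:
            # Not JSON: keep the plain text message unchanged.
            parsed = message
    if not valid:
        raise RuntimeError(f"validation failed: {parsed}")
    return parsed


print(interpret_result((True, '{"error": 0.01, "threshold": 0.05}')))
```

Note that the message is processed in both the success and failure paths, matching the statement that the second return value is logged either way.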

See patterns for more examples.

Parameters

Attribute | Type | Required | Description
name | String | true | Name of the stage for logging.
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation.
inputURI | URI | *true | URI of the input file containing the SQL statement. Required if sql not provided.
sql | String | *true | A SQL statement to execute. Required if inputURI not provided.
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation.
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging.
id | String | false | An optional unique identifier for this stage.
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Magic

The %sqlvalidate magic is available via arc-jupyter with these available parameters:

%sqlvalidate name="name" description="description" environments=production,test sqlParams=inputView=customer,inputField=id
SELECT
  ${inputField} = 1 AS valid,
  "" AS message
FROM ${inputView}
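
The effect of sqlParams on the statement above can be pictured as textual substitution of each ${} placeholder. The sketch below uses Python's `string.Template`, which happens to share the ${} syntax; it is an analogy only, and Arc's own substitution may behave differently, for example when a parameter is missing.

```python
from string import Template

# SQL body and parameters taken from the %sqlvalidate example above.
sql = Template(
    'SELECT\n'
    '  ${inputField} = 1 AS valid,\n'
    '  "" AS message\n'
    'FROM ${inputView}'
)
sql_params = {"inputView": "customer", "inputField": "id"}

# substitute() raises KeyError if a placeholder has no matching parameter.
rendered = sql.substitute(sql_params)
print(rendered)
```

The rendered statement selects `id = 1 AS valid` from `customer`, which is the query that would then be executed.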

Examples

Minimal

{
  "type": "SQLValidate",
  "name": "apply data integrity rules",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_error_threshold.sql"
}

Complete

{
  "type": "SQLValidate",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "apply data integrity rules",
  "description": "apply data integrity rules",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_error_threshold_dynamic.sql",
  "sql": "SELECT TRUE AS valid, 'this message will appear in logs' AS message",
  "authentication": {},
  "sqlParams": {
    "record_error_tolerance_percentage": "0.05"
  }
}

For example, after performing a TypingTransform it is possible to execute a query which tests that the proportion of errored records stays below a tolerance:

_type | date | description | total | _errors
detail | 2016-12-19 | daily total | 14.23 | [false]
detail | 2016-12-20 | daily total | null | [true]
detail | 2016-12-21 | daily total | 18.20 | [false]

With a JSON message (preferred):

SELECT
  (SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage}
  ,TO_JSON(
    NAMED_STRUCT(
      'error', SUM(errors)/ COUNT(errors)
      ,'threshold', ${record_error_tolerance_percentage}
    )
  )
FROM (
  SELECT
    CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS errors
  FROM detail
) valid
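
As a worked example of the arithmetic in this query, the sketch below applies it to the three sample rows shown earlier, treating the second row as the only errored one, and uses the 0.05 tolerance from the Complete example. The variable names are illustrative only.

```python
import json

# The three sample rows: True means the row has at least one error.
row_has_error = [False, True, False]
threshold = 0.05  # record_error_tolerance_percentage from the Complete example

errors = sum(1 for has_error in row_has_error if has_error)
error_rate = errors / len(row_has_error)  # 1 / 3
valid = error_rate < threshold            # 0.333... is not below 0.05
message = json.dumps({"error": error_rate, "threshold": threshold})
print(valid)  # False
```

With one errored row out of three, the error rate is roughly 33%, well above the 5% tolerance, so the stage would fail on this sample.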

The FILTER function can be used to select errors only from a certain field:

SIZE(FILTER(_errors, _error -> _error.field == 'fieldname'))
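
For readers less familiar with higher-order SQL functions, the expression above is comparable to filtering a list and taking its length. In this sketch the sample `row_errors` value and the `count_errors_for` helper are hypothetical; only the `field` attribute appears in the expression above, and the `message` attribute is an assumption.

```python
# Hypothetical _errors value for one row: one record per failed field.
row_errors = [
    {"field": "total", "message": "unable to convert value"},
    {"field": "date", "message": "unable to convert value"},
]


def count_errors_for(errors, field_name):
    """Count the error records that belong to a single field."""
    return len([error for error in errors if error["field"] == field_name])


print(count_errors_for(row_errors, "total"))  # 1
```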

With a text message:

SELECT
  (SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage}
  ,CASE
    WHEN (SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage} THEN 'number of errors below threshold. success.'
    ELSE CONCAT('error records ', ROUND((SUM(errors) / COUNT(errors)) * 100, 2), '%. required < ', ${record_error_tolerance_percentage} * 100,'%')
  END
FROM (
  SELECT
    CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS errors
  FROM detail
) valid