Validate
*Validate stages are used to perform validation and basic workflow controls. Programmers should think of these stages as assertions: the job should terminate with an error if a certain condition is not met.
EqualityValidate
Since: 1.0.0 - Supports Streaming: False
The EqualityValidate stage takes two input DataFrames and will succeed if they are identical or fail if not. This stage is useful in automated testing as it can be used to validate that a derived dataset equals a known 'good' dataset.
This stage will validate:
- Same number of columns.
- Same data type of columns.
- Same number of rows.
- Same values in those rows.
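Conceptually, a similar (though less strict) row-level comparison could also be expressed as a SQLValidate statement using symmetric EXCEPT queries. The sketch below assumes two registered views named customers_calculated and customers_known_correct and does not replace the column count and type checks performed by EqualityValidate:
-- fails if either view contains rows not present in the other
SELECT
  (SELECT COUNT(*) FROM (SELECT * FROM customers_calculated EXCEPT SELECT * FROM customers_known_correct) l) = 0
  AND (SELECT COUNT(*) FROM (SELECT * FROM customers_known_correct EXCEPT SELECT * FROM customers_calculated) r) = 0
  ,'row-level equality check'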
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
leftView | String | true | Name of first incoming Spark dataset. |
rightView | String | true | Name of second incoming Spark dataset. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
Examples
Minimal
{
"type": "EqualityValidate",
"name": "verify data equality",
"environments": [
"production",
"test"
],
"leftView": "customers_caculated",
"rightView": "customers_known_correct"
}
MetadataValidate
Since: 2.4.0 - Supports Streaming: True
Similar to SQLValidate, the MetadataValidate stage takes an input SQL statement which is executed against the metadata attached to the input DataFrame. The statement must return [Boolean, Option[String]] and the stage will succeed if the first return value is true or fail if not.
Arc will register a table called metadata which contains the metadata of the inputView, allowing complex validation rules to be expressed as SQL statements against the field-level metadata. The available columns in the metadata table are:
Field | Description |
---|---|
name | The field name. |
type | The field type. |
metadata | The field metadata. |
This can be used like:
-- ensure that no pii columns are present in the inputView
SELECT
SUM(pii_column) = 0
,TO_JSON(
NAMED_STRUCT(
'columns', COUNT(*),
'pii_columns', SUM(pii_column)
)
)
FROM (
SELECT
CASE WHEN metadata.pii THEN 1 ELSE 0 END AS pii_column
FROM metadata
) valid
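Because the metadata table exposes the name and type of each field, it can also be used to assert structural expectations against the inputView. The following is a sketch only, assuming the dataset is expected to contain a column named customer_id (a hypothetical field name):
-- ensure exactly one customer_id column is present in the inputView
SELECT
  SUM(CASE WHEN name = 'customer_id' THEN 1 ELSE 0 END) = 1
  ,TO_JSON(
    NAMED_STRUCT(
      'columns', COUNT(*)
      ,'customer_id_columns', SUM(CASE WHEN name = 'customer_id' THEN 1 ELSE 0 END)
    )
  )
FROM metadata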
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
inputURI | URI | *true | URI of the input file containing the SQL statement. Required if sql not provided. |
sql | String | *true | A SQL statement to execute. Required if inputURI not provided. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. |
Examples
Minimal
{
"type": "MetadataValidate",
"name": "apply data integrity rules against metadata",
"environments": [
"production",
"test"
],
"inputView": "customer",
"inputURI": "hdfs://datalake/sql/assert_no_pii_columns.sql"
}
Complete
{
"type": "MetadataValidate",
"id": "00000000-0000-0000-0000-000000000000",
"name": "apply integrity rules against metadata",
"description": "ensure no pii fields are in dataset",
"environments": [
"production",
"test"
],
"inputView": "customer",
"inputURI": "hdfs://datalake/sql/assert_no_pii_columns_dynamic.sql"
"authentication": {},
"sqlParams": {
"pii_metadata_field_name": "pii"
}
}
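The contents of assert_no_pii_columns_dynamic.sql are not shown here; the following is only a sketch of how the pii_metadata_field_name parameter could be injected via the ${} format, assuming the metadata attribute is a boolean flag:
-- ${pii_metadata_field_name} is replaced with 'pii' before execution
SELECT
  SUM(CASE WHEN metadata.${pii_metadata_field_name} THEN 1 ELSE 0 END) = 0
  ,TO_JSON(
    NAMED_STRUCT(
      'columns', COUNT(*)
      ,'pii_columns', SUM(CASE WHEN metadata.${pii_metadata_field_name} THEN 1 ELSE 0 END)
    )
  )
FROM metadata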
SQLValidate
Since: 1.0.0 - Supports Streaming: False
The SQLValidate stage takes an input SQL statement which must return [Boolean, Option[String]] and will succeed if the first return value is true or fail if not. The second return value is logged on both success and failure, which is useful for understanding the reason for a job's outcome. This stage is extremely powerful as arbitrary job validation rules, expressed as SQL statements, can be executed to allow or prevent the job from succeeding.
For example, it can be used to perform automated extract validation against file formats which may have a header/footer layout, or against datasets where a certain level of data conversion errors is acceptable.
SQLValidate will try to parse the message from a JSON string manually created in the SQL statement so that the logs are easier to parse by log aggregation tools.
See patterns for more examples.
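As a concrete illustration of the [Boolean, Option[String]] contract, the following minimal sketch succeeds only when the view is non-empty (the view name customer is illustrative; substitute a view registered by an earlier stage in the job):
-- the first column must be a boolean; the second becomes the logged message
SELECT
  COUNT(*) > 0
  ,TO_JSON(NAMED_STRUCT('count', COUNT(*)))
FROM customer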
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | *true | URI of the input file containing the SQL statement. Required if sql not provided. |
sql | String | *true | A SQL statement to execute. Required if inputURI not provided. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. |
Magic
The %sqlvalidate magic is available via arc-jupyter with these available parameters:
%sqlvalidate name="name" description="description" environments=production,test sqlParams=inputView=customer,inputField=id
SELECT
${inputField} = 1 AS valid,
"" AS message
FROM ${inputView}
Examples
Minimal
{
"type": "SQLValidate",
"name": "apply data integrity rules",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/customer_error_threshold.sql"
}
Complete
{
"type": "SQLValidate",
"id": "00000000-0000-0000-0000-000000000000",
"name": "apply data integrity rules",
"description": "apply data integrity rules",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/customer_error_threshold_dynamic.sql",
"sql": "SELECT TRUE AS valid, 'this message will appear in logs' AS message",
"authentication": {},
"sqlParams": {
"record_error_tolerance_percentage": "0.05"
}
}
For example, after performing a TypingTransform it would be possible to execute a query which tests that a certain percentage of records are not errored:
_type | date | description | total | _errors |
---|---|---|---|---|
detail | 2016-12-19 | daily total | 14.23 | [false] |
detail | 2016-12-20 | daily total | null | [true] |
detail | 2016-12-21 | daily total | 18.20 | [false] |
With a JSON message (preferred):
SELECT
(SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage}
,TO_JSON(
NAMED_STRUCT(
'error', SUM(errors)/ COUNT(errors)
,'threshold', ${record_error_tolerance_percentage}
)
)
FROM (
SELECT
CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS errors
FROM detail
) valid
The FILTER function can be used to select errors only from a certain field:
SIZE(FILTER(_errors, _error -> _error.field == 'fieldname'))
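For example, a sketch restricting the threshold check to conversion errors on the total field only (the field name is taken from the example table above; adjust as required):
SELECT
  (SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage}
  ,TO_JSON(
    NAMED_STRUCT(
      'error', SUM(errors) / COUNT(errors)
      ,'threshold', ${record_error_tolerance_percentage}
    )
  )
FROM (
  SELECT
    CASE WHEN SIZE(FILTER(_errors, _error -> _error.field == 'total')) > 0 THEN 1 ELSE 0 END AS errors
  FROM detail
) valid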
With a text message:
SELECT
(SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage}
,CASE
WHEN (SUM(errors) / COUNT(errors)) < ${record_error_tolerance_percentage} THEN 'number of errors below threshold. success.'
ELSE CONCAT('error records ', ROUND((SUM(errors) / COUNT(errors)) * 100, 2), '%. required < ', ${record_error_tolerance_percentage} * 100,'%')
END
FROM (
SELECT
CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS errors
FROM detail
) valid