Transform

Transform stages apply a single transformation to one or more incoming datasets.

Transformers should meet these criteria:

DebeziumTransform

Since: 3.6.0 - Supports Streaming: True

Plugin

The DebeziumTransform is provided by the https://github.com/tripl-ai/arc-debezium-pipeline-plugin package.

Experimental

The DebeziumTransform is currently in experimental state whilst the requirements become clearer.

This means this API is likely to change and feedback is valued.

The DebeziumTransform stage decodes Debezium change-data-capture JSON formatted messages for MySQL, PostgreSQL and MongoDB databases and creates a DataFrame which represents an eventually consistent view of a source dataset at a point in time. It supports the Complete and Append Structured Streaming output modes.

It is intended to be used after a KafkaExtract stage.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
schema Array true* An inline Arc schema. Only one of schema, schemaURI, schemaView can be provided.
schemaURI URI true* URI of the input JSON file containing the Arc schema. Only one of schema, schemaURI, schemaView can be provided.
schemaView String true* Similar to schemaURI but allows the Arc schema to be passed in as another DataFrame. Only one of schema, schemaURI, schemaView can be provided.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
initialStateKey String false The key used to group the initialStateView so that it can be merged with the Debezium events.
initialStateView String false Used to inject a previous state against which the Debezium events are applied. Requires initialStateKey to be set.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
strict Boolean false If strict mode is enabled then every change per key is applied in strict sequence and the stage will fail if a prior state is not what is expected. When strict mode is disabled, DebeziumTransform employs a last-writer-wins conflict resolution strategy, which requires strict ordering from the source but will be quicker.

Not all sources support non-strict mode due to how they record change events (for example, MongoDB).

Default: true.
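
The difference between the two strict settings can be illustrated with a minimal Python sketch. It is not the plugin's implementation: the event shape (key, op, before, after) is a simplified, hypothetical stand-in for a real Debezium message, and apply_events is an illustrative helper only.

```python
# Hypothetical, simplified change events: (key, op, before, after).
# Real Debezium messages carry far more detail than this.

def apply_events(state, events, strict=True):
    """Apply change events to a dict of key -> row and return the new state."""
    state = dict(state)  # do not mutate the caller's initial state
    for key, op, before, after in events:
        if strict and state.get(key) != before:
            # strict mode: the prior state must match what the event expects
            raise ValueError(f"unexpected prior state for key {key!r}")
        if op == "d":
            state.pop(key, None)  # delete
        else:
            state[key] = after    # create ("c") or update ("u"): last writer wins
    return state

initial = {1: {"name": "alice"}}
events = [
    (1, "u", {"name": "alice"}, {"name": "alicia"}),
    (2, "c", None, {"name": "bob"}),
    (1, "d", {"name": "alicia"}, None),
]
result = apply_events(initial, events, strict=True)
# result == {2: {"name": "bob"}}
```

With strict=False the same function skips the prior-state check and simply keeps the last value written for each key, which is why it depends on the source delivering events in order.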

Examples

Minimal

{
  "type": "DebeziumTransform",
  "name": "DebeziumTransform",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer_change_data_capture",
  "outputView": "customer",
  "schemaURI": "hdfs://datalake/schema/customer.json"
}

Complete

{
  "type": "DebeziumTransform",
  "name": "DebeziumTransform",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer_change_data_capture",
  "outputView": "customer",
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "strict": true,
  "initialStateView": "previous_customer",
  "initialStateKey": "customer_id",
  "persist": true,
  "numPartitions": 10,
  "partitionBy": [
    "customer_segment"
  ]
}

DiffTransform

Since: 1.0.8 - Supports Streaming: False

The DiffTransform stage calculates the difference between two input datasets and produces three datasets:

  • A dataset of the intersection of the two datasets - or rows that exist and are the same in both datasets.
  • A dataset of the left dataset - or rows that only exist in the left input dataset (inputLeftView).
  • A dataset of the right dataset - or rows that only exist in the right input dataset (inputRightView).
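
The three outputs can be pictured with plain Python sets. This is only a sketch of the semantics when no keys are supplied (whole rows are compared); it ignores duplicate rows and the diff helper is hypothetical, not part of Arc.

```python
def diff(left_rows, right_rows):
    """Return (intersection, left_only, right_only), comparing whole rows."""
    left, right = set(left_rows), set(right_rows)
    return left & right, left - right, right - left

yesterday = [(1, "alice"), (2, "bob"), (3, "carol")]
today = [(1, "alice"), (3, "caroline"), (4, "dave")]

unchanged, removed, added = diff(yesterday, today)
# unchanged == {(1, "alice")}
# removed   == {(2, "bob"), (3, "carol")}
# added     == {(3, "caroline"), (4, "dave")}
```

Note that a changed row (customer 3 here) appears in both the left and the right output because the comparison is on the full row.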

Persistence

This stage performs the ‘diffing’ operation in a single pass, so if more than one of the output views is going to be used it is a good idea to set persist = true to avoid the cost of recomputing the difference multiple times.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputLeftView String true Name of first incoming Spark dataset.
inputRightView String true Name of second incoming Spark dataset.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
inputLeftKeys Array[String] false A list of columns to create the join key from the first incoming Spark dataset. If provided, the outputIntersectionView will contain left and right structs. If not provided all columns are included.
inputRightKeys Array[String] false A list of columns to create the join key from the second incoming Spark dataset. If provided, the outputIntersectionView will contain left and right structs. If not provided all columns are included.
outputIntersectionView String false Name of output intersection view.
outputLeftView String false Name of output left view.
outputRightView String false Name of output right view.
persist Boolean false Whether to persist dataset to Spark cache.

Examples

Minimal

{
  "type": "DiffTransform",
  "name": "calculate the difference between the yesterday and today datasets",
  "environments": [
    "production",
    "test"
  ],
  "inputLeftView": "customer_20180501",
  "inputRightView": "customer_20180502",
  "outputIntersectionView": "customer_unchanged"
}

Complete

{
  "type": "DiffTransform",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "calculate the difference between the yesterday and today datasets",
  "description": "calculate the difference between the yesterday and today datasets",
  "environments": [
    "production",
    "test"
  ],
  "inputLeftView": "customer_20180501",
  "inputLeftKeys": [
    "customerId"
  ],
  "inputRightView": "customer_20180502",
  "inputRightKeys": [
    "customerId"
  ],
  "outputIntersectionView": "customer_unchanged",
  "outputLeftView": "customer_removed",
  "outputRightView": "customer_added",
  "persist": true
}

HTTPTransform

Since: 1.0.9 - Supports Streaming: True

The HTTPTransform stage transforms the incoming dataset by POSTing the value in the incoming dataset with column name value (must be of type string or bytes) and appending the response body from an external API as body.

A good use case of the HTTPTransform stage is to call an external RESTful machine learning model service. To see an example of how to host a simple model as a service see:
https://github.com/tripl-ai/arc/tree/master/src/it/resources/flask_serving

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
uri URI true URI of the HTTP server.
batchSize Integer false The number of records to send in each HTTP request to reduce the cost of HTTP overhead.

Default: 1.
delimiter String false When using a batchSize greater than one this option allows the specification of a delimiter so that the receiving HTTP service can split the request body into records and Arc can split the response body back into records.

Default: \n (newline).
description String false An optional stage description to help document job files and print to job logs to assist debugging.
failMode String false Either permissive or failfast:

permissive will process all rows in the dataset and collect HTTP response values (statusCode, reasonPhrase, contentType, responseTime) into a response column. Rules can then be applied in a SQLValidate stage if required.

failfast will fail the Arc job on the first response with a statusCode not in the validStatusCodes array.

Default: failfast.
headers Map[String, String] false HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers.
id String false An optional unique identifier for this stage.
inputField String false The field to pass to the endpoint. JSON encoding can be used to pass multiple values (tuples).

Default: value.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
validStatusCodes Array[Integer] false A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202]. Note: all request response codes must be contained in this list for the stage to be successful if failMode is set to failfast.
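
How batchSize and delimiter interact can be sketched in Python. This is an illustration of the idea only, not Arc's implementation; make_batches and split_response are hypothetical helper names.

```python
def make_batches(records, batch_size=1, delimiter="\n"):
    """Group records into request bodies of at most batch_size records each."""
    return [
        delimiter.join(records[i:i + batch_size])
        for i in range(0, len(records), batch_size)
    ]

def split_response(body, delimiter="\n"):
    """Split a response body back into one value per record."""
    return body.split(delimiter)

records = ["a", "b", "c", "d", "e"]
bodies = make_batches(records, batch_size=2)
# bodies == ["a\nb", "c\nd", "e"]
```

The receiving service is expected to return the same number of delimited values as it received, otherwise the response cannot be matched back to the input rows. The delimiter must therefore not occur inside any record.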

Examples

Minimal

{
  "type": "HTTPTransform",
  "name": "look up customer retention score",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_enriched",
  "uri": "http://internalserver/api/customer_retention"
}

Complete

{
  "type": "HTTPTransform",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "look up customer retention score",
  "description": "look up customer retention score",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_enriched",
  "uri": "http://internalserver/api/customer_retention",
  "batchSize": 10,
  "delimiter": ",",
  "headers": {
    "Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
    "custom-header": "payload"
  },
  "inputField": "value",
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false,
  "validStatusCodes": [
    200,
    201
  ],
  "failMode": "failfast"
}

JSONTransform

Since: 1.0.0 - Supports Streaming: True

The JSONTransform stage transforms the incoming dataset to rows of json strings with the column name value. It is intended to be used before stages like HTTPLoad or HTTPTransform to prepare the data for sending externally.
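
As a rough picture of the output shape, the following Python sketch serialises each row to a JSON string held in a single column named value. The sample rows are hypothetical and the exact JSON formatting produced by Spark may differ.

```python
import json

rows = [
    {"customerId": 1, "name": "alice"},
    {"customerId": 2, "name": "bob"},
]

# each output row has a single column named `value` holding the JSON string
output = [{"value": json.dumps(row)} for row in rows]
```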

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "JSONTransform",
  "name": "convert customer to json for sending to external api",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_json"
}

Complete

{
  "type": "JSONTransform",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "convert customer to json for sending to external api",
  "description": "convert customer to json for sending to external api",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_json",
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false
}

MetadataFilterTransform

Since: 1.0.9 - Supports Streaming: True

The MetadataFilterTransform stage transforms the incoming dataset by filtering columns using the embedded column metadata.

Under the hood, Arc registers a table called metadata which contains the metadata of the inputView. This allows complex SQL statements to be executed that return the columns to retain from the inputView in the outputView. The available columns in the metadata table are:

Field Description
name The field name.
type The field type.
metadata The field metadata.

This can be used like:

-- only select columns which are not personally identifiable information
SELECT
    name
FROM metadata
WHERE metadata.pii = false

This will produce an outputView which only contains the columns in inputView where the inputView column metadata contains a key pii which has the value equal to false.

If the sqlParams contains a boolean parameter pii_authorized indicating whether the job is authorised to use personally identifiable information, it could be used like:

-- only select columns which job is authorised to access based on ${pii_authorized}
SELECT
    name
FROM metadata
WHERE metadata.pii = (
    CASE
        WHEN ${pii_authorized} = true
        THEN metadata.pii   -- this will allow both true and false metadata.pii values if pii_authorized = true
        ELSE false          -- else if pii_authorized = false only allow metadata.pii = false values
    END
)

The inputView and outputView can be set to the same name so that downstream stages have no way of accidentally accessing the pre-filtered data.
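
The selection logic of the pii_authorized query can be mirrored in a few lines of Python. This is a sketch under assumptions: the sample metadata rows are hypothetical and columns_to_retain is an illustrative helper, not an Arc function.

```python
# Hypothetical column metadata, shaped like the rows of the `metadata` table.
metadata = [
    {"name": "customer_id", "type": "integer", "metadata": {"pii": False}},
    {"name": "email", "type": "string", "metadata": {"pii": True}},
    {"name": "segment", "type": "string", "metadata": {"pii": False}},
]

def columns_to_retain(metadata, pii_authorized):
    """Return the column names the CASE expression above would keep."""
    retained = []
    for field in metadata:
        pii = field["metadata"].get("pii")
        if pii is None:
            # in SQL a comparison with NULL is never true, so the column is dropped
            continue
        if pii_authorized or pii is False:
            retained.append(field["name"])
    return retained

safe = columns_to_retain(metadata, pii_authorized=False)
# safe == ["customer_id", "segment"]
```

One consequence worth noting is that a column with no pii key at all is dropped in both cases, so every column that should survive needs the key set explicitly.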

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true* URI of the input file containing the SQL statement. Required if sql not provided.

This statement must be written to query against a table called metadata and must return at least the name column or an error will be raised.
sql String true* A SQL statement to execute. Required if inputURI not provided.

This statement must be written to query against a table called metadata and must return at least the name column or an error will be raised.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "MetadataFilterTransform",
  "name": "filter out Personally identifiable information (pii) fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/filter_pii.sql",
  "inputView": "customer",
  "outputView": "customer_safe"
}

Complete

{
  "type": "MetadataFilterTransform",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "filter out Personally identifiable information (pii) fields",
  "description": "filter out Personally identifiable information (pii) fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/filter_pii_dynamic.sql",
  "inputView": "customer",
  "outputView": "customer_safe",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false,
  "sqlParams": {
    "pii_authorized": "true"
  }
}

MetadataTransform

Since: 2.4.0 - Supports Streaming: True

The MetadataTransform stage attaches metadata to the input DataFrame and returns a new DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
schema Array true* An inline Arc schema. Only one of schema, schemaURI, schemaView can be provided.
schemaURI URI true* URI of the input JSON file containing the Arc schema. Only one of schema, schemaURI, schemaView can be provided.
schemaView String true* Similar to schemaURI but allows the Arc schema to be passed in as another DataFrame. Only one of schema, schemaURI, schemaView can be provided.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
failMode String false Either permissive or failfast:

permissive will attach metadata to any column of the input DataFrame which has the same name as in the incoming schema.

failfast will fail the Arc job if any of the columns in the input schema are not found in the input DataFrame.

Default: permissive.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
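
The two failMode behaviours can be sketched in Python as follows. This is a simplified illustration, assuming the schema is reduced to a mapping of column name to metadata; attach_metadata is a hypothetical helper.

```python
def attach_metadata(columns, schema, fail_mode="permissive"):
    """columns: list of column names; schema: dict of column name -> metadata."""
    missing = [name for name in schema if name not in columns]
    if fail_mode == "failfast" and missing:
        # failfast: every schema column must exist in the input
        raise ValueError(f"schema columns not found in input: {missing}")
    # permissive: attach metadata only where the names match
    return {name: schema.get(name, {}) for name in columns}

columns = ["customer_id", "email"]
schema = {"email": {"pii": True}, "phone": {"pii": True}}

attached = attach_metadata(columns, schema)
# attached == {"customer_id": {}, "email": {"pii": True}}
```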

Examples

Minimal

{
  "type": "MetadataTransform",
  "name": "set metadata for customer view",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer",
  "schemaURI": "hdfs://datalake/schema/customer.json"
}

Complete

{
  "type": "MetadataTransform",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "set metadata for customer view",
  "description": "set metadata for customer view",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer",
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "failMode": "failfast",
  "numPartitions": 1,
  "partitionBy": [
    "type"
  ],
  "persist": false
}

MLTransform

Since: 1.0.0 - Supports Streaming: True

The MLTransform stage transforms the incoming dataset with a pretrained Spark ML (Machine Learning) model. This will append one or more predicted columns to the incoming dataset. The incoming model must be a PipelineModel or CrossValidatorModel produced using Spark’s Scala, Java, PySpark or SparkR API.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI of the input PipelineModel or CrossValidatorModel.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false. MLTransform will also log percentiles of prediction probabilities for classification models if this option is enabled.

Examples

Minimal

{
  "type": "MLTransform",
  "name": "apply machine learning model",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/ml/machineLearningPipelineModel",
  "inputView": "customer",
  "outputView": "customer_scored"
}

Complete

{
  "type": "MLTransform",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "apply machine learning model",
  "description": "apply machine learning model",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/ml/machineLearningPipelineModel",
  "inputView": "customer",
  "outputView": "customer_scored",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false
}

SimilarityJoinTransform

Since: 2.1.0 - Supports Streaming: True

The SimilarityJoinTransform stage uses Approximate String Matching (a.k.a. Fuzzy Matching) to find similar records between two datasets. It is possible to pass the same dataset into both sides of the comparison (i.e. as both leftView and rightView) to find duplicates, in which case the threshold value should be high (close to 1.0) to avoid a potentially very large cross-product result set.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
leftView String true The view name of the left dataset. This should be the bigger of the two input sets.
rightView String true The view name of the right dataset.
leftFields Array[String] true Columns to include in the similarity join from the left dataset. These are order dependent.
rightFields Array[String] true Columns to include in the similarity join from the right dataset. These are order dependent.
outputView String true Name of outgoing Spark dataset after processing.
caseSensitive Boolean false Whether to use case sensitive comparison.

Default: false.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
numHashTables Integer false The number of hash tables which can be used to trade off execution time vs. false positive rate. Lower values should produce quicker execution but a higher false positive rate.

Default: 5.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
params Map[String, String] false Map of configuration parameters. Currently unused.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
shingleLength Integer false The length to split the input fields into. E.g. the string 1 Parliament Drive would be split into [1 P, Pa, Par, arl…] if shingleLength is set to 3. Longer or shorter shingleLength may help provide higher similarity depending on your dataset.

Default: 3.
threshold Double false The similarity threshold for evaluating the records as the same. The default, 0.8, means that 80% of the character sequences must be the same for the records to be considered equal for joining.

Default: 0.8.
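
To build intuition for shingleLength, caseSensitive and threshold, the following Python sketch splits two strings into character shingles and compares them with exact Jaccard similarity. This is an assumption-laden illustration: the numHashTables parameter suggests the stage itself uses a hashing-based approximation rather than this exact calculation, and shingles and jaccard are hypothetical helper names.

```python
def shingles(text, length=3, case_sensitive=False):
    """Return the set of overlapping character sequences of the given length."""
    if not case_sensitive:
        text = text.lower()
    return {text[i:i + length] for i in range(len(text) - length + 1)}

def jaccard(a, b):
    """Share of shingles the two sets have in common (0.0 to 1.0)."""
    union = a | b
    return len(a & b) / len(union) if union else 0.0

left = shingles("1 Parliament Drive")
right = shingles("1 Parliament Drv")
similarity = jaccard(left, right)

# records would be joined when similarity meets the threshold
is_match = similarity >= 0.8
```

Shorter shingles make small typos less costly but increase accidental overlap between unrelated strings, which is the trade-off the shingleLength description refers to.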

Examples

Minimal

{
  "type": "SimilarityJoinTransform",
  "name": "test",
  "environments": [
    "production",
    "test"
  ],
  "leftView": "official_postal_addresses",
  "leftFields": [
    "flat_number",
    "number_first",
    "street_name",
    "street_type",
    "locality_name",
    "postcode",
    "state"
  ],
  "rightView": "crm_addresses",
  "rightFields": [
    "street",
    "state_postcode_suburb"
  ],
  "outputView": "official_address_compare"
}

Complete

{
  "type": "SimilarityJoinTransform",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "test",
  "environments": [
    "production",
    "test"
  ],
  "leftView": "official_postal_addresses",
  "leftFields": [
    "flat_number",
    "number_first",
    "street_name",
    "street_type",
    "locality_name",
    "postcode",
    "state"
  ],
  "rightView": "crm_addresses",
  "rightFields": [
    "street",
    "state_postcode_suburb"
  ],
  "outputView": "official_address_compare",
  "threshold": 0.75,
  "shingleLength": 3,
  "numHashTables": 10,
  "caseSensitive": false,
  "persist": false,
  "partitionBy": [
    "state"
  ],
  "numPartitions": 10
}

SQLTransform

Since: 1.0.0 - Supports Streaming: True

The SQLTransform stage transforms the incoming dataset with a Spark SQL statement. This stage relies on previous stages to load and register the dataset views (outputView) and will execute arbitrary SQL statements against those datasets.

All the inbuilt Spark SQL functions are available and have been extended with some additional functions.

Please be aware that in streaming mode not all join operations are available. See: Support matrix for joins in streaming queries.

CAST vs TypingTransform

It is strongly recommended to use the TypingTransform for reproducible, repeatable results.

Whilst SQL is capable of converting data types using the CAST function (e.g. CAST(dateColumn AS DATE)), be very careful. ANSI SQL specifies that any failure to convert raises an exception condition (data exception-invalid character value for cast), whereas Spark SQL will return a null value and suppress any exceptions: try s.toString.toInt catch { case _: NumberFormatException => null }. If you used a CAST in a financial scenario, for example bill calculation, the silent NULLing of values could result in errors being suppressed and bills being incorrectly calculated.
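
The risk can be demonstrated with a small Python analogy. It does not run Spark; lenient_cast_int simply mimics the null-on-failure behaviour described above and strict_cast_int the raise-on-failure behaviour, and both names are hypothetical.

```python
def lenient_cast_int(value):
    """Mimic a cast that returns null (None) instead of failing."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

def strict_cast_int(value):
    """Mimic a cast that raises on any value it cannot convert."""
    return int(value)

# the last amount contains the letter O rather than a zero
amounts = ["100", "25", "1O0"]

# the bad value silently disappears from the bill
total = sum(v for v in map(lenient_cast_int, amounts) if v is not None)
# total == 125
```

Calling strict_cast_int on the same values raises a ValueError on the third amount, so the problem is surfaced instead of producing an understated total.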

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true* URI of the input file containing the SQL statement. Required if sql not provided.
sql String true* A SQL statement to execute. Required if inputURI not provided.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

For example if the sqlParams contains parameter current_timestamp of value 2018-11-24 14:48:56 then this statement would execute in a deterministic way: SELECT * FROM customer WHERE expiry > FROM_UNIXTIME(UNIX_TIMESTAMP('${current_timestamp}', 'uuuu-MM-dd HH:mm:ss')) (so would be testable).
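
The effect of parameter injection can be approximated in Python with string.Template, which happens to use the same ${} placeholder syntax. This is only a sketch of the idea; Arc's own replacement logic may behave differently, for example when a parameter is missing.

```python
from string import Template

sql = (
    "SELECT * FROM customer WHERE expiry > "
    "FROM_UNIXTIME(UNIX_TIMESTAMP('${current_timestamp}', 'uuuu-MM-dd HH:mm:ss'))"
)
sql_params = {"current_timestamp": "2018-11-24 14:48:56"}

# substitute() raises KeyError if a placeholder has no matching parameter
statement = Template(sql).substitute(sql_params)
```

Because the timestamp is supplied from outside rather than read from the clock, running the statement twice with the same parameters gives the same result, which is what makes it testable.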

The SQL statement is a plain Spark SQL statement, for example:

SELECT
    customer.customer_id
    ,customer.first_name
    ,customer.last_name
    ,account.account_id
    ,account.account_name
FROM customer
LEFT JOIN account ON account.customer_id = customer.customer_id

Magic

The %sql magic is available via arc-jupyter with these available parameters:

%sql name="sqltransform" description="description" environments=production,test outputView=example persist=true sqlParams=inputView=customer,inputField=id
SELECT
    ${inputField}
FROM ${inputView}

Examples

Minimal

{
  "type": "SQLTransform",
  "name": "standardise customer fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer.sql",
  "outputView": "customer"
}

Complete

{
  "type": "SQLTransform",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "standardise customer fields",
  "description": "standardise customer fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_dynamic.sql",
  "sql": "SELECT id, name FROM customer_raw",
  "outputView": "customer",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false,
  "sqlParams": {
    "current_date": "2018-11-24",
    "current_timestamp": "2018-11-24 14:48:56"
  }
}

The current_date and current_timestamp can easily be passed in as environment variables using $(date "+%Y-%m-%d") and $(date "+%Y-%m-%d %H:%M:%S") respectively.
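
If the job is launched from a Python wrapper rather than a shell, the same two values could be produced with strftime, which accepts the same format codes as the date command. The fixed datetime below is used only to keep the sketch deterministic; a real launcher would presumably use datetime.now().

```python
from datetime import datetime

now = datetime(2018, 11, 24, 14, 48, 56)  # stand-in for datetime.now()

sql_params = {
    "current_date": now.strftime("%Y-%m-%d"),
    "current_timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
}
# sql_params == {"current_date": "2018-11-24",
#                "current_timestamp": "2018-11-24 14:48:56"}
```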

TensorFlowServingTransform

Since: 1.0.0 - Supports Streaming: True

The TensorFlowServingTransform stage transforms the incoming dataset by calling a TensorFlow Serving service. Because each call is atomic the TensorFlow Serving instances could be behind a load balancer to increase throughput.

To see how to host a simple model in TensorFlow Serving see:
https://github.com/tripl-ai/arc/tree/master/src/it/resources/tensorflow_serving

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
uri String true The URI of the TensorFlow Serving REST end point.
batchSize Integer false The number of records to send to TensorFlow Serving in each call. A higher number will decrease the number of calls to TensorFlow Serving which may be more efficient.

Default: 100.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
inputField String false The field to pass to the model. JSON encoding can be used to pass multiple values (tuples).

Default: value.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
params Map[String, String] false Map of configuration parameters. Currently unused.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
responseType String false The type returned by the TensorFlow Serving API. Expected to be integer, double or object (which may present as a string depending on how the model has been built).

Default: object.
signatureName String false The name of the TensorFlow Serving signature.
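
The way batchSize and signatureName shape each call can be sketched in Python. The request body below follows the row ("instances") format of the TensorFlow Serving REST predict API; whether the stage uses exactly this format is an assumption, and build_requests is a hypothetical helper.

```python
import json

def build_requests(values, batch_size=100, signature_name=None):
    """Yield one JSON request body per batch of input values."""
    for i in range(0, len(values), batch_size):
        body = {"instances": values[i:i + batch_size]}
        if signature_name is not None:
            body["signature_name"] = signature_name
        yield json.dumps(body)

values = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
requests = list(build_requests(values, batch_size=2, signature_name="serving_default"))
# two request bodies: the first with two instances, the second with one
```

Each body is independent of the others, which is why the calls can be spread across several TensorFlow Serving instances behind a load balancer.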

Examples

Minimal

{
  "type": "TensorFlowServingTransform",
  "name": "call the customer segmentation model",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_segmented",
  "uri": "http://tfserving:9001/v1/models/customer_segmentation/versions/1:predict"
}

Complete

{
  "type": "TensorFlowServingTransform",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "call the customer segmentation model",
  "description": "call the customer segmentation model",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_segmented",
  "uri": "http://tfserving:9001/v1/models/customer_segmentation/versions/1:predict",
  "signatureName": "serving_default",
  "batchSize": 100,
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": true,
  "responseType": "integer"
}

TypingTransform

Since: 1.0.0 - Supports Streaming: True

The TypingTransform stage transforms the incoming dataset based on a schema defined in the schema format.

The logical process that is applied to perform the typing on a field-by-field basis is shown below.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
schema Array true* An inline Arc schema. Only one of schema, schemaURI, schemaView can be provided.
schemaURI URI true* URI of the input JSON file containing the Arc schema. Only one of schema, schemaURI, schemaView can be provided.
schemaView String true* Similar to schemaURI but allows the Arc schema to be passed in as another DataFrame. Only one of schema, schemaURI, schemaView can be provided.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
failMode String false Either permissive or failfast:

permissive will process all rows in the dataset and collect any errors for each row in the _errors column. Rules can then be applied in a SQLValidate stage if required.

failfast will fail the Arc job on the first row containing at least one error.

Default: permissive.
id String false An optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "TypingTransform",
  "name": "apply data types to customer records",
  "environments": [
    "production",
    "test"
  ],
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "inputView": "customer_untyped",
  "outputView": "customer"
}

Complete

{
  "type": "TypingTransform",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "apply data types to customer records",
  "description": "apply data types to customer records",
  "environments": [
    "production",
    "test"
  ],
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "inputView": "customer_untyped",
  "outputView": "customer",
  "authentication": {},
  "failMode": "failfast",
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false
}

The following demonstrates how the TypingTransform behaves. Assume the input was read by a stage such as DelimitedExtract, which reads every column as a string:

+-------------------------+---------------------+
|startTime                |endTime              |
+-------------------------+---------------------+
|2018-09-26 07:17:43      |2018-09-27 07:17:43  |
|2018-09-25 08:25:51      |2018-09-26 08:25:51  |
|2018-02-30 01:16:40      |2018-03-01 01:16:40  |
|30 February 2018 01:16:40|2018-03-2018 01:16:40|
+-------------------------+---------------------+

In this case the goal is to safely convert the values from strings like "2018-09-26 07:17:43" to proper timestamp objects so that we can ensure each timestamp is valid (for example, that it does not fall on a date that does not exist, such as the 30th day of February) and can easily perform date operations such as subtracting 1 week. To do so a schema file could be constructed to look like:

[
  {
    "id": "8e42c8f0-22a8-40db-9798-6dd533c1de36",
    "name": "startTime",
    "description": "The startTime field.",
    "type": "timestamp",
    "trim": true,
    "nullable": true,
    "nullableValues": [
        "",
        "null"
    ],
    "formatters": [
        "uuuu-MM-dd HH:mm:ss"
    ],
    "timezoneId": "UTC"
  },
  {
    "id": "2e7553cf-2748-49cd-a291-8918823e706a",
    "name": "endTime",
    "description": "The endTime field.",
    "type": "timestamp",
    "trim": true,
    "nullable": true,
    "nullableValues": [
        "",
        "null"
    ],
    "formatters": [
        "uuuu-MM-dd HH:mm:ss"
    ],
    "timezoneId": "UTC"
  }
]

Here is the output of the TypingTransform when applied to the input dataset.

+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|startTime          |endTime            |_errors                                                                                                                                                                                                                                                             |
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2018-09-26 17:17:43|2018-09-27 17:17:43|[]                                                                                                                                                                                                                                                                  |
|2018-09-25 18:25:51|2018-09-26 18:25:51|[]                                                                                                                                                                                                                                                                  |
|null               |2018-03-01 12:16:40|[[startTime, Unable to convert '2018-02-30 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC']]                                                                                                                                     |
|null               |null               |[[startTime, Unable to convert '30 February 2018 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC'], [endTime, Unable to convert '2018-03-2018 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC']]|
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  • Because the conversion happened successfully for both values on the first two rows there are no errors for those rows.
  • On the third row the value '2018-02-30 01:16:40' cannot be converted as the 30th day of February is not a valid date, so the value is set to null. If nullable in the schema for field startTime was set to false the job would fail as it would be unable to continue.
  • On the fourth row both values are invalid as neither matches the formatter nor represents a valid date.
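
The behaviour described above can be approximated outside Arc with a minimal Python sketch. It is not the Arc implementation: the strptime pattern PY_FORMAT is an assumed stand-in for the "uuuu-MM-dd HH:mm:ss" formatter, the helper names type_timestamp and type_row are hypothetical, and the error text is shortened. It covers only the trim, nullableValues and formatter steps for a nullable timestamp field.

```python
from datetime import datetime, timezone

# Assumed Python equivalent of the Arc formatter "uuuu-MM-dd HH:mm:ss".
PY_FORMAT = "%Y-%m-%d %H:%M:%S"
# Mirrors "nullableValues" in the schema above.
NULLABLE_VALUES = {"", "null"}


def type_timestamp(name, raw):
    """Return (value, error) for one string field; error is None on success."""
    value = raw.strip()  # mirrors "trim": true
    if value in NULLABLE_VALUES:
        return None, None
    try:
        # strptime rejects both unmatched patterns and impossible dates.
        parsed = datetime.strptime(value, PY_FORMAT).replace(tzinfo=timezone.utc)
        return parsed, None
    except ValueError:
        return None, (name, f"Unable to convert '{value}' to timestamp")


def type_row(row):
    """Type every field of a row and collect per-field errors into _errors."""
    typed, errors = {}, []
    for name, raw in row.items():
        value, error = type_timestamp(name, raw)
        typed[name] = value
        if error is not None:
            errors.append(error)
    typed["_errors"] = errors
    return typed


rows = [
    {"startTime": "2018-09-26 07:17:43", "endTime": "2018-09-27 07:17:43"},
    {"startTime": "2018-09-25 08:25:51", "endTime": "2018-09-26 08:25:51"},
    {"startTime": "2018-02-30 01:16:40", "endTime": "2018-03-01 01:16:40"},
    {"startTime": "30 February 2018 01:16:40", "endTime": "2018-03-2018 01:16:40"},
]
typed_rows = [type_row(row) for row in rows]
```

As in the permissive failMode, every row is processed and a failed conversion yields a null value plus an entry in _errors rather than stopping the run.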

The SQLValidate stage is a good way to use this data to enforce data quality constraints.
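
To make the idea of a data quality constraint concrete, here is a small Python sketch of a rule over the _errors column. It only illustrates the logic such a rule might express and is not SQLValidate syntax: the function validate, its max_error_rows threshold and the sample rows are all hypothetical.

```python
def validate(rows, max_error_rows=0):
    """Return (passed, summary), where passed is False if too many rows have errors."""
    error_rows = [row for row in rows if row["_errors"]]
    summary = {"count": len(rows), "errorRows": len(error_rows)}
    return len(error_rows) <= max_error_rows, summary


# Hypothetical typed output: two clean rows and one row with a typing error.
sample = [
    {"startTime": "2018-09-26 07:17:43", "_errors": []},
    {"startTime": "2018-09-25 08:25:51", "_errors": []},
    {"startTime": None, "_errors": [("startTime", "Unable to convert")]},
]

strict_result = validate(sample)
lenient_result = validate(sample, max_error_rows=1)
```

A strict rule like strict_result fails as soon as any row carries an error, while a threshold allows a job to tolerate a known level of bad input and still report the counts.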

Logical Flow

The sequence in which these fields are converted from string fields to typed fields follows this flow chart. Each value and its typing schema is passed into this logical process. For each row the values are returned as standard table columns and the returned error values are grouped into a field called _errors on a row-by-row basis. Patterns for consuming the _errors array are demonstrated in the SQLValidate stage.

Logical Flow for Data Typing