Transform

Transform stages apply a single transformation to one or more incoming datasets.

Transformers should meet these criteria:

CypherTransform

Since: 2.0.0 - Supports Streaming: True

Plugin

The CypherTransform is provided by the https://github.com/tripl-ai/arc-graph-pipeline-plugin package.

The CypherTransform executes a Cypher graph query against a graph already created by a GraphTransform stage.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input Cypher query.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
cypherParams Map[String, String] false Parameters to inject into the Cypher statement before executing. The parameters use the ${} format.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist graph to Spark cache.

Examples

Minimal

{
  "type": "CypherTransform",
  "name": "create recommendations",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/cypher/recommendations.cypher",
  "outputView": "recommendations"
} 

Complete

{
  "type": "CypherTransform",
  "name": "create recommendations",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/cypher/recommendations.cypher",
  "outputView": "recommendations",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false,
  "cypherParams": {
    "current_date": "2018-11-24",
    "current_timestamp": "2018-11-24 14:48:56"
  }
}

DiffTransform

Since: 1.0.8 - Supports Streaming: False

The DiffTransform stage calculates the difference between two input datasets and produces three datasets:

  • A dataset of the intersection of the two datasets - or rows that exist and are the same in both datasets.
  • A dataset of the left dataset - or rows that only exist in the left input dataset (inputLeftView).
  • A dataset of the right dataset - or rows that only exist in the right input dataset (inputRightView).
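
The three outputs can be pictured with a minimal Python sketch. This is not Arc's implementation: the diff_rows helper is hypothetical, and it assumes each dataset can be treated as a set of hashable rows, so duplicate rows are ignored.

```python
def diff_rows(left, right):
    """Split two lists of hashable rows into (intersection, left_only, right_only).

    Hypothetical helper for illustration only; duplicate rows are not counted.
    """
    left_set, right_set = set(left), set(right)
    intersection = [row for row in left if row in right_set]
    left_only = [row for row in left if row not in right_set]
    right_only = [row for row in right if row not in left_set]
    return intersection, left_only, right_only


# Rows are (customer_id, name) tuples; the sample values are made up.
customer_20180501 = [(1, "alice"), (2, "bob"), (3, "carol")]
customer_20180502 = [(1, "alice"), (3, "caroline"), (4, "dave")]

unchanged, removed, added = diff_rows(customer_20180501, customer_20180502)
```

In this sketch a row whose values changed (customer 3) appears in both the left-only and the right-only output, because the comparison is on the whole row.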

Persistence

This stage performs the ‘diffing’ operation in a single pass, so if more than one of the output views is going to be used it is a good idea to set persist = true to avoid recomputing the difference for each view.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputLeftView String true Name of first incoming Spark dataset.
inputRightView String true Name of second incoming Spark dataset.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
outputIntersectionView String false Name of output intersection view.
outputLeftView String false Name of output left view.
outputRightView String false Name of output right view.
persist Boolean false Whether to persist dataset to Spark cache.

Examples

Minimal

{
  "type": "DiffTransform",
  "name": "calculate the difference between the yesterday and today datasets",
  "environments": [
    "production",
    "test"
  ],
  "inputLeftView": "customer_20180501",
  "inputRightView": "customer_20180502",
  "outputIntersectionView": "customer_unchanged"
}

Complete

{
  "type": "DiffTransform",
  "name": "calculate the difference between the yesterday and today datasets",
  "description": "calculate the difference between the yesterday and today datasets",
  "environments": [
    "production",
    "test"
  ],
  "inputLeftView": "customer_20180501",
  "inputRightView": "customer_20180502",
  "outputIntersectionView": "customer_unchanged",
  "outputLeftView": "customer_removed",
  "outputRightView": "customer_added",
  "persist": true
}

GraphTransform

Since: 2.0.0 - Supports Streaming: True

Plugin

The GraphTransform is provided by the https://github.com/tripl-ai/arc-graph-pipeline-plugin package.

The GraphTransform stage creates a new https://github.com/opencypher/morpheus graph, either from a list of views of graph nodes and views of graph relationships or from a Cypher query.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true* URI/Glob of the input Cypher query. Required if nodes and relationships not provided.
nodes Array[Object] true* List of node labels and node views to construct the graph from. Required if inputURI not provided. See example below.
relationships Array[Object] true* List of relationship types and relationship views to construct the graph from. Required if inputURI not provided. See example below.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
cypherParams Map[String, String] false Parameters to inject into the Cypher statement before executing. The parameters use the ${} format.
outputGraph String true Name of the constructed graph in the catalog. Graph will be accessible via ‘session.[outputGraph]’.
persist Boolean false Whether to persist graph to Spark cache.

Examples

Minimal

{
  "type": "GraphTransform",
  "name": "create friends graph",
  "environments": [
    "production",
    "test"
  ],
  "nodes": [
    {"label": "Person", "view": "persons"}
  ],
  "relationships": [
    {"type": "FRIEND", "view": "friends"}
  ],
  "outputGraph": "friends"
} 

Complete

{
  "type": "GraphTransform",
  "name": "create customer graph",
  "environments": [
    "production",
    "test"
  ],
  "nodes": [
    {"label": "Person", "view": "persons"},
    {"label": "Product", "view": "products"}
  ],
  "relationships": [
    {"type": "FRIEND", "view": "friends"},
    {"type": "BOUGHT", "view": "bought"}
  ],
  "outputGraph": "customer",
  "persist": true
} 

HTTPTransform

Since: 1.0.9 - Supports Streaming: True

The HTTPTransform stage transforms the incoming dataset by POSTing the contents of the incoming dataset column named value (which must be of type string or bytes) to an external API and appending the response body as a column named body.

A good use case of the HTTPTransform stage is to call an external RESTful machine learning model service. To see an example of how to host a simple model as a service see:
https://github.com/tripl-ai/arc/tree/master/src/it/resources/flask_serving

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
uri URI true URI of the HTTP server.
batchSize Integer false The number of records to send in each HTTP request to reduce the cost of HTTP overhead.

Default: 1.
delimiter String false When using a batchSize greater than one this option allows the specification of a delimiter so that the receiving HTTP service can split the request body into records and Arc can split the response body back into records.

Default: \n (newline).
description String false An optional stage description to help document job files and print to job logs to assist debugging.
failMode String false Either permissive or failfast:

permissive will process all rows in the dataset and collect HTTP response values (statusCode, reasonPhrase, contentType, responseTime) into a response column. Rules can then be applied in a SQLValidate stage if required.

failfast will fail the Arc job on the first response with a statusCode not in the validStatusCodes array.

Default: failfast.
headers Map[String, String] false HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers.
inputField String false The field to pass to the endpoint. JSON encoding can be used to pass multiple values (tuples).

Default: value.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
validStatusCodes Array[Integer] false A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202]. Note: all request response codes must be contained in this list for the stage to be successful if failMode is set to failfast.

Examples

Minimal

{
  "type": "HTTPTransform",
  "name": "look up customer retention score",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_enriched",
  "uri": "http://internalserver/api/customer_retention"
}

Complete

{
  "type": "HTTPTransform",
  "name": "look up customer retention score",
  "description": "look up customer retention score",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_enriched",
  "uri": "http://internalserver/api/customer_retention",
  "batchSize": 10,
  "delimiter": ",",
  "headers": {
    "Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
    "custom-header": "payload"
  },
  "inputField": "value",
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false,
  "validStatusCodes": [
    200,
    201
  ],
  "failMode": "failfast"
}
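
The interaction between batchSize and delimiter can be sketched in a few lines of Python. This is an illustration under stated assumptions, not Arc's code: batch_records, fake_service and http_transform are hypothetical names, and the remote API is replaced by a local function that upper-cases each record so the example runs without a network.

```python
def batch_records(records, batch_size):
    """Yield consecutive lists of at most batch_size records."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]


def fake_service(body, delimiter):
    """Stand-in for the remote API: upper-cases every record in the request body."""
    return delimiter.join(part.upper() for part in body.split(delimiter))


def http_transform(records, batch_size=1, delimiter="\n"):
    """Send records in batches and pair each input record with its response part."""
    results = []
    for batch in batch_records(records, batch_size):
        request_body = delimiter.join(batch)
        response_body = fake_service(request_body, delimiter)
        response_parts = response_body.split(delimiter)
        if len(response_parts) != len(batch):
            raise ValueError("service returned a different number of records than it was sent")
        results.extend(zip(batch, response_parts))
    return results


rows = http_transform(["a", "b", "c"], batch_size=2, delimiter=",")
```

The sketch only works if the delimiter never occurs inside a record or a response, which is worth checking when choosing a delimiter for a real service.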

JSONTransform

Since: 1.0.0 - Supports Streaming: True

The JSONTransform stage transforms the incoming dataset to rows of json strings with the column name value. It is intended to be used before stages like HTTPLoad or HTTPTransform to prepare the data for sending externally.
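
As a rough picture of the output shape, the Python sketch below serialises each row into a single string column called value. The to_json_rows helper and the sample rows are hypothetical, and the exact JSON formatting produced by Spark (for example whitespace) may differ from Python's json module.

```python
import json


def to_json_rows(rows):
    """Serialise each row (a dict of column name to value) into one 'value' column."""
    return [{"value": json.dumps(row)} for row in rows]


customers = [
    {"customerId": 1, "name": "alice"},
    {"customerId": 2, "name": "bob"},
]
customer_json = to_json_rows(customers)
```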

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "JSONTransform",
  "name": "convert customer to json for sending to external api",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_json"
}

Complete

{
  "type": "JSONTransform",
  "name": "convert customer to json for sending to external api",
  "description": "convert customer to json for sending to external api",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_json",
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false
}

MetadataFilterTransform

Since: 1.0.9 - Supports Streaming: True

Experimental

The MetadataFilterTransform is currently in experimental state whilst the requirements become clearer.

This means this API is likely to change.

The MetadataFilterTransform stage transforms the incoming dataset by filtering columns using the embedded column metadata.

Underneath Arc will register a table called metadata which contains the metadata of the inputView. This allows complex SQL statements to be executed which returns which columns to retain from the inputView in the outputView. The available columns in the metadata table are:

Field Description
name The field name.
type The field type.
metadata The field metadata.

This can be used like:

-- only select columns which are not personally identifiable information
SELECT
    name
FROM metadata
WHERE metadata.pii = false

Will produce an outputView which only contains the columns in inputView where the inputView column metadata contains a key pii which has the value equal to false.

If sqlParams contains a boolean parameter pii_authorized that indicates whether the job is authorised to use personally identifiable information, it could be used like:

-- only select columns which job is authorised to access based on ${pii_authorized}
SELECT
    name
FROM metadata
WHERE metadata.pii = (
    CASE
        WHEN ${pii_authorized} = true
        THEN metadata.pii   -- this will allow both true and false metadata.pii values if pii_authorized = true
        ELSE false          -- else if pii_authorized = false only allow metadata.pii = false values
    END
)

The inputView and outputView can be set to the same name so that downstream stages have no way of accidentally accessing the pre-filtered data.
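
The effect of the pii_authorized query above can be sketched in Python. This is an illustration only: is_retained and filter_columns are hypothetical helpers, column metadata is modelled as a plain dict, and a column with no pii key is dropped because a SQL comparison against NULL is never true.

```python
def is_retained(metadata, pii_authorized):
    """Mirror WHERE metadata.pii = CASE WHEN authorised THEN metadata.pii ELSE false END."""
    pii = metadata.get("pii")
    if pii is None:
        # comparing NULL with anything is not true in SQL, so the column is not selected
        return False
    if pii_authorized:
        return True
    return pii is False


def filter_columns(rows, column_metadata, pii_authorized):
    """Keep only the columns whose metadata passes the is_retained check."""
    allowed = [name for name, meta in column_metadata.items() if is_retained(meta, pii_authorized)]
    return [{name: row[name] for name in allowed} for row in rows]


# Made-up metadata and data for a customer view.
column_metadata = {
    "customerId": {"pii": False},
    "email": {"pii": True},
    "notes": {},
}
customer = [{"customerId": 1, "email": "a@example.com", "notes": "vip"}]

customer_safe = filter_columns(customer, column_metadata, pii_authorized=False)
customer_full = filter_columns(customer, column_metadata, pii_authorized=True)
```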

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI of the input file containing the SQL statement.

This statement must be written to query against a table called metadata and must return at least the name column or an error will be raised.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "MetadataFilterTransform",
  "name": "filter out Personally identifiable information (pii) fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/filter_pii.sql",
  "inputView": "customer",
  "outputView": "customer_safe"
}

Complete

{
  "type": "MetadataFilterTransform",
  "name": "filter out Personally identifiable information (pii) fields",
  "description": "filter out Personally identifiable information (pii) fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/filter_pii_dynamic.sql",
  "inputView": "customer",
  "outputView": "customer_safe",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false,
  "sqlParams": {
    "pii_authorized": "true"
  }
}

MetadataTransform

Since: 2.4.0 - Supports Streaming: True

The MetadataTransform stage attaches metadata to the input DataFrame and returns a new DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
schemaURI URI true* URI of the input JSON file containing the schema. Required if schemaView not provided.
schemaView String true* Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.
failMode String false Either permissive or failfast:

permissive will attach metadata to any column of the input DataFrame which has the same name as in the incoming schema.

failfast will fail the Arc job if any of the columns in the input schema are not found in the input DataFrame.

Default: permissive.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
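
The two failMode behaviours described above can be sketched as follows. This is a simplified model, not Arc's implementation: attach_metadata is a hypothetical helper, the input DataFrame is reduced to a list of column names, and each schema entry is assumed to carry its metadata under a metadata key.

```python
def attach_metadata(columns, schema, fail_mode="permissive"):
    """Return {column name: metadata}, matching schema entries to columns by name."""
    if fail_mode not in ("permissive", "failfast"):
        raise ValueError(f"unknown failMode: {fail_mode}")
    missing = [field["name"] for field in schema if field["name"] not in columns]
    if missing and fail_mode == "failfast":
        # failfast: any schema column absent from the input stops the job
        raise ValueError(f"schema columns not found in input: {missing}")
    by_name = {field["name"]: field.get("metadata", {}) for field in schema}
    # permissive: columns without a matching schema entry simply get no metadata
    return {column: by_name.get(column, {}) for column in columns}


columns = ["customerId", "email"]
schema = [
    {"name": "email", "metadata": {"pii": True}},
    {"name": "phone", "metadata": {"pii": True}},
]
permissive_result = attach_metadata(columns, schema)
```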

Examples

Minimal

{
  "type": "MetadataTransform",
  "name": "set metadata for customer view",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer",
  "schemaURI": "hdfs://datalake/schema/customer.json"
}

Complete

{
  "type": "MetadataTransform",
  "name": "set metadata for customer view",
  "description": "set metadata for customer view",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer",
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "failMode": "failfast",
  "numPartitions": 1,
  "partitionBy": [
    "type"
  ],
  "persist": false 
}

MLTransform

Since: 1.0.0 - Supports Streaming: True

The MLTransform stage transforms the incoming dataset with a pretrained Spark ML (Machine Learning) model. This will append one or more predicted columns to the incoming dataset. The incoming model must be a PipelineModel or CrossValidatorModel produced using Spark’s Scala, Java, PySpark or SparkR API.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI of the input PipelineModel or CrossValidatorModel.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false. MLTransform will also log percentiles of prediction probabilities for classification models if this option is enabled.
description String false An optional stage description to help document job files and print to job logs to assist debugging.

Examples

Minimal

{
  "type": "MLTransform",
  "name": "apply machine learning model",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/ml/machineLearningPipelineModel",
  "inputView": "customer",
  "outputView": "customer_scored"
}

Complete

{
  "type": "MLTransform",
  "name": "apply machine learning model",
  "description": "apply machine learning model",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/ml/machineLearningPipelineModel",
  "inputView": "customer",
  "outputView": "customer_scored",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false
}

SimilarityJoinTransform

Since: 2.1.0 - Supports Streaming: True

The SimilarityJoinTransform stage uses Approximate String Matching (a.k.a. Fuzzy Matching) to find similar records between two datasets. It is possible to pass the same dataset into both sides of the comparison (i.e. as both leftView and rightView) to find duplicates, in which case the threshold value should be high (close to 1.0) to avoid a potentially very large cross-product result set.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
leftView String true The view name of the left dataset. This should be the bigger of the two input sets.
rightView String true The view name of the right dataset.
leftFields Array[String] true Columns to include in the similarity join from the left dataset. These are order dependent.
rightFields Array[String] true Columns to include in the similarity join from the right dataset. These are order dependent.
outputView String true Name of outgoing Spark dataset after processing.
caseSensitive Boolean false Whether to use case sensitive comparison.

Default: false.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numHashTables Integer false The number of hash tables which can be used to trade off execution time vs. false positive rate. Lower values should produce quicker execution but a higher false positive rate.

Default: 5.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
params Map[String, String] false Map of configuration parameters. Currently unused.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
shingleLength Integer false The length to split the input fields into. E.g. the string 1 Parliament Drive would be split into ["1 P", " Pa", "Par", "arl", …] if shingleLength is set to 3. Longer or shorter shingleLength may help provide higher similarity depending on your dataset.

Default: 3.
threshold Double false The similarity threshold for evaluating the records as the same. The default, 0.8, means that 80% of the character sequences must be the same for the records to be considered equal for joining.

Default: 0.8.
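
To make shingleLength and threshold more concrete, here is a small Python sketch. It assumes similarity is measured as the Jaccard similarity of the two shingle sets, which the text above does not state explicitly, and it computes that value exactly rather than approximating it with hash tables as the stage does. The shingles and jaccard_similarity helpers are hypothetical.

```python
def shingles(text, shingle_length=3, case_sensitive=False):
    """Split text into overlapping character sequences of shingle_length."""
    if not case_sensitive:
        text = text.lower()
    return [text[i:i + shingle_length] for i in range(len(text) - shingle_length + 1)]


def jaccard_similarity(left, right, shingle_length=3, case_sensitive=False):
    """Share of distinct shingles that the two strings have in common (0.0 to 1.0)."""
    left_set = set(shingles(left, shingle_length, case_sensitive))
    right_set = set(shingles(right, shingle_length, case_sensitive))
    union = left_set | right_set
    if not union:
        # both strings are shorter than shingle_length, so there is nothing to compare
        return 0.0
    return len(left_set & right_set) / len(union)


first_shingles = shingles("1 Parliament Drive", 3, case_sensitive=True)[:4]
score = jaccard_similarity("1 Parliament Drive", "1 Parliament Dr")
is_match = score >= 0.8  # compare against the default threshold
```

With exact Jaccard similarity, abbreviations such as "Dr" for "Drive" lower the score noticeably, which is one reason a threshold below 1.0 is useful for address data.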

Examples

Minimal

{
  "type": "SimilarityJoinTransform",
  "name": "test",
  "environments": [
    "production",
    "test"
  ],
  "leftView": "official_postal_addresses",
  "leftFields": [
    "flat_number",
    "number_first",
    "street_name",
    "street_type",
    "locality_name",
    "postcode",
    "state"
  ],
  "rightView": "crm_addresses",
  "rightFields": [
    "street",
    "state_postcode_suburb"
  ],
  "outputView": "official_address_compare"
}

Complete

{
  "type": "SimilarityJoinTransform",
  "name": "test",
  "environments": [
    "production",
    "test"
  ],
  "leftView": "official_postal_addresses",
  "leftFields": [
    "flat_number",
    "number_first",
    "street_name",
    "street_type",
    "locality_name",
    "postcode",
    "state"
  ],
  "rightView": "crm_addresses",
  "rightFields": [
    "street",
    "state_postcode_suburb"
  ],
  "outputView": "official_address_compare",
  "threshold": 0.75,
  "shingleLength": 3,
  "numHashTables": 10,
  "caseSensitive": false,
  "persist": false,
  "partitionBy": [
    "state"
  ],
  "numPartitions": 10
}

SQLTransform

Since: 1.0.0 - Supports Streaming: True

The SQLTransform stage transforms the incoming dataset with a Spark SQL statement. This stage relies on previous stages to load and register the dataset views (outputView) and will execute arbitrary SQL statements against those datasets.

All the inbuilt Spark SQL functions are available and have been extended with some additional functions.

Please be aware that in streaming mode not all join operations are available. See: Support matrix for joins in streaming queries.

CAST vs TypingTransform

It is strongly recommended to use the TypingTransform for reproducible, repeatable results.

Whilst SQL is capable of converting data types using the CAST function (e.g. CAST(dateColumn AS DATE)), be very careful. ANSI SQL specifies that any failure to convert raises an exception condition (data exception-invalid character value for cast), whereas Spark SQL returns a null value and suppresses the exception: try s.toString.toInt catch { case _: NumberFormatException => null }. If you used a CAST in a financial scenario, for example bill calculation, the silent NULLing of values could result in errors being suppressed and bills being calculated incorrectly.
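
The risk can be illustrated with a short Python sketch that contrasts the two behaviours. It is an analogy rather than Spark code: lenient_cast_to_int and strict_cast_to_int are hypothetical helpers, and the usage values are made up (the last one contains the letter O instead of a zero).

```python
def lenient_cast_to_int(value):
    """Mimic the null-on-failure behaviour described above: return None if conversion fails."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def strict_cast_to_int(value):
    """Mimic the ANSI behaviour: a failed conversion raises an exception."""
    return int(value)


usage = ["120", "80", "9O"]

# Like a SQL SUM, skip the nulls: the bad record silently disappears from the bill.
lenient_values = [lenient_cast_to_int(v) for v in usage]
lenient_total = sum(v for v in lenient_values if v is not None)

try:
    strict_total = sum(strict_cast_to_int(v) for v in usage)
except ValueError:
    # the strict path surfaces the bad record instead of producing a total
    strict_total = None
```

The lenient path returns a plausible-looking total with no sign that a record was dropped, which is the failure mode the recommendation above is meant to avoid.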

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true* URI of the input file containing the SQL statement. Required if sql not provided.
sql String true* A SQL statement to execute. Required if inputURI not provided.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

For example if the sqlParams contains parameter current_timestamp of value 2018-11-24 14:48:56 then this statement would execute in a deterministic way: SELECT * FROM customer WHERE expiry > FROM_UNIXTIME(UNIX_TIMESTAMP('${current_timestamp}', 'uuuu-MM-dd HH:mm:ss')) (so would be testable).
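
The ${} injection can be pictured as plain text substitution performed before the statement is executed. The Python sketch below works on that assumption; inject_params is a hypothetical helper, and raising an error for a placeholder with no supplied value is a choice made for this sketch rather than documented Arc behaviour.

```python
import re

# Matches placeholders such as ${current_timestamp}
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def inject_params(statement, params):
    """Replace every ${name} in statement with the matching value from params."""
    def replace(match):
        name = match.group(1)
        if name not in params:
            raise KeyError(f"no value supplied for parameter {name}")
        return params[name]

    return PLACEHOLDER.sub(replace, statement)


statement = (
    "SELECT * FROM customer WHERE expiry > "
    "FROM_UNIXTIME(UNIX_TIMESTAMP('${current_timestamp}', 'uuuu-MM-dd HH:mm:ss'))"
)
rendered = inject_params(statement, {"current_timestamp": "2018-11-24 14:48:56"})
```

Because the value is fixed by the caller, the rendered statement is identical on every run, which is what makes the query testable.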

The SQL statement is a plain Spark SQL statement, for example:

SELECT
    customer.customer_id
    ,customer.first_name
    ,customer.last_name
    ,account.account_id
    ,account.account_name
FROM customer
LEFT JOIN account ON account.customer_id = customer.customer_id

Magic

The %sql magic is available via arc-jupyter with these available parameters:

%sql name="sqltransform" description="description" environments=production,test outputView=example persist=true sqlParams=inputView=customer,inputField=id
SELECT
    ${inputField}
FROM ${inputView}

Examples

Minimal

{
  "type": "SQLTransform",
  "name": "standardise customer fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer.sql",
  "outputView": "customer"
}

Complete

{
  "type": "SQLTransform",
  "name": "standardise customer fields",
  "description": "standardise customer fields",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_dynamic.sql",
  "sql": "SELECT id, name FROM customer_raw",
  "outputView": "customer",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false,
  "sqlParams": {
    "current_date": "2018-11-24",
    "current_timestamp": "2018-11-24 14:48:56"
  }
}

The current_date and current_timestamp can easily be passed in as environment variables using $(date "+%Y-%m-%d") and $(date "+%Y-%m-%d %H:%M:%S") respectively.

TensorFlowServingTransform

Since: 1.0.0 - Supports Streaming: True

Experimental

The TensorFlowServingTransform is currently in experimental state whilst the requirements become clearer.

This means this API is likely to change.

The TensorFlowServingTransform stage transforms the incoming dataset by calling a TensorFlow Serving service. Because each call is atomic the TensorFlow Serving instances could be behind a load balancer to increase throughput.

To see how to host a simple model in TensorFlow Serving see:
https://github.com/tripl-ai/arc/tree/master/src/it/resources/tensorflow_serving

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
uri String true The URI of the TensorFlow Serving REST end point.
batchSize Integer false The number of records to send to TensorFlow Serving in each call. A higher number will decrease the number of calls to TensorFlow Serving which may be more efficient.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
inputField String false The field to pass to the model. JSON encoding can be used to pass multiple values (tuples).

Default: value.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
params Map[String, String] false Map of configuration parameters. Currently unused.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
responseType String false The type returned by the TensorFlow Serving API. Expected to be integer, double or object (which may present as a string depending on how the model has been built).

Default: object.
signatureName String false The name of the TensorFlow Serving signature.
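
As a rough illustration of how batchSize and signatureName might map onto requests, the Python sketch below groups input values into TensorFlow Serving REST predict bodies using the row format ("instances" in the request, "predictions" in the response). How Arc itself builds the request is not described here, so build_predict_requests and parse_predictions are hypothetical helpers and no HTTP call is made.

```python
import json


def build_predict_requests(values, batch_size, signature_name=None):
    """Group input values into JSON bodies for the TensorFlow Serving predict endpoint."""
    bodies = []
    for start in range(0, len(values), batch_size):
        payload = {"instances": values[start:start + batch_size]}
        if signature_name is not None:
            payload["signature_name"] = signature_name
        bodies.append(json.dumps(payload))
    return bodies


def parse_predictions(response_body):
    """Extract the list of predictions from a predict response body."""
    return json.loads(response_body)["predictions"]


bodies = build_predict_requests([1, 2, 3], batch_size=2, signature_name="serving_default")
predictions = parse_predictions('{"predictions": [0, 1]}')
```

Three input values with a batch size of 2 produce two request bodies, so a larger batchSize means fewer calls for the same dataset.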

Examples

Minimal

{
  "type": "TensorFlowServingTransform",
  "name": "call the customer segmentation model",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_segmented",
  "uri": "http://tfserving:9001/v1/models/customer_segmentation/versions/1:predict"
}

Complete

{
  "type": "TensorFlowServingTransform",
  "name": "call the customer segmentation model",
  "description": "call the customer segmentation model",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_segmented",
  "uri": "http://tfserving:9001/v1/models/customer_segmentation/versions/1:predict",
  "signatureName": "serving_default",
  "batchSize": 100,
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": true,
  "responseType": "integer"
}

TypingTransform

Since: 1.0.0 - Supports Streaming: True

The TypingTransform stage transforms the incoming dataset based on a schema defined in the schema format.

The logical process that is applied to perform the typing on a field-by-field basis is shown below.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
schemaURI URI true* URI of the input JSON file containing the schema. Required if schemaView not provided.
schemaView String true* Similar to schemaURI but allows the schema to be passed in as another DataFrame. It takes precedence over schemaURI if provided.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
failMode String false Either permissive or failfast:

permissive will process all rows in the dataset and collect any errors for each row in the _errors column. Rules can then be applied in a SQLValidate stage if required.

failfast will fail the Arc job on the first row containing at least one error.

Default: permissive.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
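The difference between the two failMode values can be sketched in plain Python. This is an illustration only, not Arc's implementation: type_rows, to_int and the error text are hypothetical names chosen for this example.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, str]
Error = Tuple[str, str]  # (field name, message)


def to_int(row: Row) -> Tuple[Dict[str, Optional[int]], List[Error]]:
    """Hypothetical converter: try to type every string value as an integer."""
    typed: Dict[str, Optional[int]] = {}
    errors: List[Error] = []
    for name, value in row.items():
        try:
            typed[name] = int(value)
        except ValueError:
            typed[name] = None  # a value that fails to convert becomes null
            errors.append((name, f"Unable to convert '{value}' to integer"))
    return typed, errors


def type_rows(rows: List[Row], convert: Callable, fail_mode: str = "permissive") -> List[dict]:
    """Apply `convert` to each row, honouring the chosen fail mode."""
    out = []
    for row in rows:
        typed, errors = convert(row)
        if errors and fail_mode == "failfast":
            # failfast: stop at the first row containing at least one error
            raise ValueError(f"typing failed: {errors}")
        # permissive: keep the row and record its errors alongside it
        out.append({**typed, "_errors": errors})
    return out


rows = [{"age": "42"}, {"age": "forty-two"}]
# permissive (the default) returns both rows; the second carries an error
print(type_rows(rows, to_int))
```

Calling type_rows(rows, to_int, "failfast") on the same input raises on the second row instead of returning it.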

Examples

Minimal

{
  "type": "TypingTransform",
  "name": "apply data types to customer records",
  "environments": [
    "production",
    "test"
  ],
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "inputView": "customer_untyped",
  "outputView": "customer"
}

Complete

{
  "type": "TypingTransform",
  "name": "apply data types to customer records",
  "description": "apply data types to customer records",
  "environments": [
    "production",
    "test"
  ],
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "schemaView": "customer_schema",
  "inputView": "customer_untyped",
  "outputView": "customer",
  "authentication": {},
  "failMode": "failfast",
  "numPartitions": 10,
  "partitionBy": [
    "customerId"
  ],
  "persist": false
}

The following demonstrates how the TypingTransform behaves. Assume the input has been read by a stage such as DelimitedExtract, which reads every column as a string:

+-------------------------+---------------------+
|startTime                |endTime              |
+-------------------------+---------------------+
|2018-09-26 07:17:43      |2018-09-27 07:17:43  |
|2018-09-25 08:25:51      |2018-09-26 08:25:51  |
|2018-02-30 01:16:40      |2018-03-01 01:16:40  |
|30 February 2018 01:16:40|2018-03-2018 01:16:40|
+-------------------------+---------------------+

In this case the goal is to safely convert string values like "2018-09-26 07:17:43" to proper timestamp objects so that we can ensure each timestamp is valid (for example, not on a date that does not exist, such as the 30th day of February) and can easily perform date operations such as subtracting 1 week. To do so, a schema file could be constructed like this:

[
  {
    "id": "8e42c8f0-22a8-40db-9798-6dd533c1de36",
    "name": "startTime",
    "description": "The startTime field.",
    "type": "timestamp",
    "trim": true,
    "nullable": true,
    "nullableValues": [
        "",
        "null"
    ],
    "formatters": [
        "uuuu-MM-dd HH:mm:ss"
    ],
    "timezoneId": "UTC"
  },
  {
    "id": "2e7553cf-2748-49cd-a291-8918823e706a",
    "name": "endTime",
    "description": "The endTime field.",
    "type": "timestamp",
    "trim": true,
    "nullable": true,
    "nullableValues": [
        "",
        "null"
    ],
    "formatters": [
        "uuuu-MM-dd HH:mm:ss"
    ],
    "timezoneId": "UTC"
  }
]

Here is the output of the TypingTransform when applied to the input dataset.

+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|startTime          |endTime            |_errors                                                                                                                                                                                                                                                             |
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2018-09-26 17:17:43|2018-09-27 17:17:43|[]                                                                                                                                                                                                                                                                  |
|2018-09-25 18:25:51|2018-09-26 18:25:51|[]                                                                                                                                                                                                                                                                  |
|null               |2018-03-01 12:16:40|[[startTime, Unable to convert '2018-02-30 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC']]                                                                                                                                     |
|null               |null               |[[startTime, Unable to convert '30 February 2018 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC'], [endTime, Unable to convert '2018-03-2018 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC']]|
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  • Because the conversion succeeded for both values on the first two rows, there are no errors for those rows.
  • On the third row the value '2018-02-30 01:16:40' cannot be converted because the 30th day of February is not a valid date, so the value is set to null. If nullable in the schema for field startTime were set to false, the job would fail as it would be unable to continue.
  • On the fourth row both values are invalid: startTime does not match the formatter and endTime is not a well-formed date, so both are set to null.
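The behaviour in the example can be approximated with a short Python sketch. It assumes that Python's "%Y-%m-%d %H:%M:%S" is a reasonable stand-in for the schema's "uuuu-MM-dd HH:mm:ss" formatter (Arc itself uses Java date-time patterns, which are stricter about zero padding), and the helper names to_timestamp and type_row are hypothetical. The error text is shortened from the real message.

```python
from datetime import datetime, timezone

# Assumed Python analogue of the "uuuu-MM-dd HH:mm:ss" formatter in the schema.
FORMAT = "%Y-%m-%d %H:%M:%S"

# The four untyped input rows from the example above.
rows = [
    {"startTime": "2018-09-26 07:17:43", "endTime": "2018-09-27 07:17:43"},
    {"startTime": "2018-09-25 08:25:51", "endTime": "2018-09-26 08:25:51"},
    {"startTime": "2018-02-30 01:16:40", "endTime": "2018-03-01 01:16:40"},
    {"startTime": "30 February 2018 01:16:40", "endTime": "2018-03-2018 01:16:40"},
]


def to_timestamp(name, value):
    """Return (timestamp, None) on success or (None, error) on failure."""
    try:
        # strptime rejects both malformed strings and dates that do not exist
        parsed = datetime.strptime(value, FORMAT)
        return parsed.replace(tzinfo=timezone.utc), None
    except ValueError:
        return None, (name, f"Unable to convert '{value}' to timestamp")


def type_row(row):
    """Type every field in the row and group the failures into _errors."""
    typed, errors = {}, []
    for name, value in row.items():
        typed[name], error = to_timestamp(name, value)
        if error is not None:
            errors.append(error)
    typed["_errors"] = errors
    return typed


for row in rows:
    typed = type_row(row)
    # print the typed values and the names of the fields that failed
    print(typed["startTime"], typed["endTime"], [name for name, _ in typed["_errors"]])
```

The sketch prints timestamps in UTC, whereas the table above appears to render them in the local time zone of the Spark session, so the displayed hours differ.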

The SQLValidate stage is a good way to use this data to enforce data quality constraints.
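As a rough illustration of such a constraint, the sketch below derives a pass/fail flag and a summary message from the _errors column in plain Python. It is not the SQLValidate API: the validate function, the max_error_rows threshold and the message layout are assumptions made for this example.

```python
import json
from typing import List, Tuple


def validate(rows: List[dict], max_error_rows: int = 0) -> Tuple[bool, str]:
    """Hypothetical rule: pass only if at most `max_error_rows` rows have errors."""
    error_rows = sum(1 for row in rows if row["_errors"])
    message = json.dumps({"count": len(rows), "errors": error_rows})
    return error_rows <= max_error_rows, message


# Two typed rows: one clean, one with a startTime that failed conversion.
typed_rows = [
    {"startTime": "2018-09-26 07:17:43", "_errors": []},
    {"startTime": None, "_errors": [("startTime", "Unable to convert")]},
]

valid, message = validate(typed_rows)
print(valid, message)  # False {"count": 2, "errors": 1}
```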

Logical Flow

The order in which fields are converted from string fields to typed fields follows this flow chart. Each value and its typing schema is passed into this logical process. For each row the values are returned as standard table columns and the returned error values are grouped into a field called _errors on a row-by-row basis. Patterns for consuming the _errors array are demonstrated in the SQLValidate stage.

Logical Flow for Data Typing
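The flow chart is not reproduced here, so the following Python sketch only approximates the per-value process. It assumes an order of trim, null replacement via nullableValues, conversion, and finally the nullable check. The type_value function and the convert callable are hypothetical, and raising on a null in a non-nullable field is an assumption modelled on the job failure described in the example above.

```python
from typing import Any, Callable, Dict, Optional, Tuple

Error = Tuple[str, str]  # (field name, message)


def type_value(value: str, field: Dict[str, Any],
               convert: Callable[[str], Any]) -> Tuple[Any, Optional[Error]]:
    """Type one string value using one schema field definition."""
    name = field["name"]

    # Step 1 (assumed): optionally trim surrounding whitespace.
    if field.get("trim", False):
        value = value.strip()

    typed, error = None, None

    # Step 2 (assumed): values listed in nullableValues are treated as null.
    if value not in field.get("nullableValues", []):
        # Step 3 (assumed): attempt the conversion and capture any failure.
        try:
            typed = convert(value)
        except ValueError:
            error = (name, f"Unable to convert '{value}' to {field['type']}")

    # Step 4 (assumed): a null result is only acceptable for nullable fields.
    if typed is None and not field.get("nullable", True):
        raise ValueError(f"field '{name}' is not nullable")

    return typed, error


# Hypothetical integer field using the same keys as the schema example above.
age = {"name": "age", "type": "integer", "trim": True,
       "nullable": True, "nullableValues": ["", "null"]}

print(type_value(" 42 ", age, int))  # (42, None)
print(type_value("null", age, int))  # (None, None)
print(type_value("abc", age, int))   # (None, ('age', "Unable to convert 'abc' to integer"))
```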