Transform
*Transform stages apply a single transformation to one or more incoming datasets.
Transformers should meet these criteria:
- Be logically pure.
- Perform only a single function.
- Utilise Spark internal functionality where possible.
DebeziumTransform
Since: 3.6.0 - Supports Streaming: True
Plugin
The DebeziumTransform is provided by the https://github.com/tripl-ai/arc-debezium-pipeline-plugin package.
Experimental
The DebeziumTransform is currently in an experimental state whilst the requirements become clearer. This means this API is likely to change and feedback is valued.
The DebeziumTransform stage decodes Debezium change-data-capture JSON formatted messages for MySQL, PostgreSQL and MongoDB databases and creates a DataFrame which represents an eventually consistent view of a source dataset at a point in time. It supports the Complete and Append Structured Streaming output modes.
It is intended to be used after a KafkaExtract stage.
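A hedged sketch of that ordering within a job's stages array is shown below; the KafkaExtract parameter names (bootstrapServers, topic, groupID) are indicative only and should be confirmed against the KafkaExtract documentation.
{
  "stages": [
    {
      "type": "KafkaExtract",
      "name": "read customer change events",
      "environments": ["production", "test"],
      "bootstrapServers": "kafka:9092",
      "topic": "dbserver.inventory.customer",
      "groupID": "arc-customer-cdc",
      "outputView": "customer_change_data_capture"
    },
    {
      "type": "DebeziumTransform",
      "name": "DebeziumTransform",
      "environments": ["production", "test"],
      "inputView": "customer_change_data_capture",
      "outputView": "customer",
      "schemaURI": "hdfs://datalake/schema/customer.json"
    }
  ]
}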
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
schema | Array | true* | An inline Arc schema. Only one of schema , schemaURI , schemaView can be provided. |
schemaURI | URI | true* | URI of the input JSON file containing the Arc schema. Only one of schema , schemaURI , schemaView can be provided. |
schemaView | String | true* | Similar to schemaURI but allows the Arc schema to be passed in as another DataFrame . Only one of schema , schemaURI , schemaView can be provided. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
initialStateKey | String | false | Used to set the key to group the initialStateView to merge with the Debezium events. |
initialStateView | String | false | Used to inject a previous state into the connector the Debezium events are applied against. Requires initialStateKey to be set. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
strict | Boolean | false | If strict mode is enabled then every change per key is applied in strict sequence and will fail if a prior state is not what is expected. When strict mode is disabled then DebeziumTransform will employ a last-writer-wins conflict resolution strategy which requires strict ordering from the source but will be quicker. Not all sources support non-strict mode due to how they record change events, such as MongoDB . Default: true . |
Examples
Minimal
{
"type": "DebeziumTransform",
"name": "DebeziumTransform",
"environments": [
"production",
"test"
],
"inputView": "customer_change_data_capture",
"outputView": "customer",
"schemaURI": "hdfs://datalake/schema/customer.json"
}
Complete
{
"type": "DebeziumTransform",
"name": "DebeziumTransform",
"environments": [
"production",
"test"
],
"inputView": "customer_change_data_capture",
"outputView": "customer",
"schemaURI": "hdfs://datalake/schema/customer.json",
"strict": true,
"initialStateView": "previous_customer",
"initialStateKey": "customer_id",
"persist": true,
"numPartitions": 10,
"partitionBy": [
"customer_segment"
]
}
DiffTransform
Since: 1.0.8 - Supports Streaming: False
The DiffTransform stage calculates the difference between two input datasets and produces three datasets:
- A dataset of the intersection of the two datasets - or rows that exist and are the same in both datasets.
- A dataset of the left dataset - or rows that only exist in the left input dataset (inputLeftView ).
- A dataset of the right dataset - or rows that only exist in the right input dataset (inputRightView ).
Persistence
This stage performs the ‘diffing’ operation in a single pass so, if more than one of the output views is going to be used, it is a good idea to set persist = true to reduce the cost of recomputing the difference multiple times.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputLeftView | String | true | Name of first incoming Spark dataset. |
inputRightView | String | true | Name of second incoming Spark dataset. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
inputLeftKeys | Array[String] | false | A list of columns to create the join key from the first incoming Spark dataset. If provided, the outputIntersectionView will contain left and right structs. If not provided all columns are included. |
inputRightKeys | Array[String] | false | A list of columns to create the join key from the second incoming Spark dataset. If provided, the outputIntersectionView will contain left and right structs. If not provided all columns are included. |
outputIntersectionView | String | false | Name of output intersection view. |
outputLeftView | String | false | Name of output left view. |
outputRightView | String | false | Name of output right view. |
persist | Boolean | false | Whether to persist dataset to Spark cache. |
Examples
Minimal
{
"type": "DiffTransform",
"name": "calculate the difference between the yesterday and today datasets",
"environments": [
"production",
"test"
],
"inputLeftView": "customer_20180501",
"inputRightView": "customer_20180502",
"outputIntersectionView": "customer_unchanged"
}
Complete
{
"type": "DiffTransform",
"id": "00000000-0000-0000-0000-000000000000",
"name": "calculate the difference between the yesterday and today datasets",
"description": "calculate the difference between the yesterday and today datasets",
"environments": [
"production",
"test"
],
"inputLeftView": "customer_20180501",
"inputLeftKeys": [
"customerId"
],
"inputRightView": "customer_20180502",
"inputRightKeys": [
"customerId"
],
"outputIntersectionView": "customer_unchanged",
"outputLeftView": "customer_removed",
"outputRightView": "customer_added",
"persist": true
}
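When inputLeftKeys and inputRightKeys are supplied the outputIntersectionView contains left and right structs, which a later SQLTransform can compare field by field. A minimal sketch is below; the customerId and email columns are illustrative only.
-- illustrative only: list keys whose email differs between the two snapshots
SELECT
  intersection.`left`.customerId
  ,intersection.`left`.email AS email_20180501
  ,intersection.`right`.email AS email_20180502
FROM customer_unchanged intersection
WHERE intersection.`left`.email != intersection.`right`.email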
HTTPTransform
Since: 1.0.9 - Supports Streaming: True
The HTTPTransform stage transforms the incoming dataset by POSTing the value in the incoming dataset with column name value (which must be of type string or bytes ) and appending the response body from an external API as body .
A good use case of the HTTPTransform stage is to call an external RESTful machine learning model service. To see an example of how to host a simple model as a service see:
https://github.com/tripl-ai/arc/tree/master/src/it/resources/flask_serving
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
uri | URI | true | URI of the HTTP server. |
batchSize | Integer | false | The number of records to send in each HTTP request to reduce the cost of HTTP overhead. Default: 1 . |
delimiter | String | false | When using a batchSize greater than one this option allows the specification of a delimiter so that the receiving HTTP service can split the request body into records and Arc can split the response body back into records. Default: \n (newline). |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
failMode | String | false | Either permissive or failfast : permissive will process all rows in the dataset and collect HTTP response values (statusCode , reasonPhrase , contentType , responseTime ) into a response column. Rules can then be applied in a SQLValidate stage if required. failfast will fail the Arc job on the first response with a statusCode not in the validStatusCodes array. Default: failfast . |
headers | Map[String, String] | false | HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers. |
id | String | false | An optional unique identifier for this stage. |
inputField | String | false | The field to pass to the endpoint. JSON encoding can be used to pass multiple values (tuples). Default: value . |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
validStatusCodes | Array[Integer] | false | A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202] . Note: all request response codes must be contained in this list for the stage to be successful if failMode is set to failfast . |
Examples
Minimal
{
"type": "HTTPTransform",
"name": "look up customer retention score",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_enriched",
"uri": "http://internalserver/api/customer_retention"
}
Complete
{
"type": "HTTPTransform",
"id": "00000000-0000-0000-0000-000000000000",
"name": "look up customer retention score",
"description": "look up customer retention score",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_enriched",
"uri": "http://internalserver/api/customer_retention",
"batchSize": 10,
"delimiter": ",",
"headers": {
"Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
"custom-header": "payload"
},
"inputField": "value",
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false,
"validStatusCodes": [
200,
201
],
"failMode": "failfast"
}
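When failMode is set to permissive the collected response column can then be checked downstream, for example in a SQLValidate stage. The sketch below assumes the customer_enriched view from the examples above and the usual SQLValidate contract of returning a single row with a boolean valid and a string message .
-- fail the job if any request returned a status code outside the accepted list
SELECT
  SUM(CASE WHEN response.statusCode NOT IN (200, 201, 202) THEN 1 ELSE 0 END) = 0 AS valid
  ,TO_JSON(
    NAMED_STRUCT(
      'failed_requests', SUM(CASE WHEN response.statusCode NOT IN (200, 201, 202) THEN 1 ELSE 0 END)
      ,'total_requests', COUNT(*)
    )
  ) AS message
FROM customer_enriched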
JSONTransform
Since: 1.0.0 - Supports Streaming: True
The JSONTransform stage transforms the incoming dataset to rows of json strings with the column name value . It is intended to be used before stages like HTTPLoad or HTTPTransform to prepare the data for sending externally.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
Examples
Minimal
{
"type": "JSONTransform",
"name": "convert customer to json for sending to eternal api",
"environments": [
"production",
"test"
],
"inputView": "cutomer",
"outputView": "customer_json"
}
Complete
{
"type": "JSONTransform",
"id": "00000000-0000-0000-0000-000000000000",
"name": "convert customer to json for sending to eternal api",
"description": "convert customer to json for sending to eternal api",
"environments": [
"production",
"test"
],
"inputView": "cutomer",
"outputView": "customer_json",
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false
}
MetadataFilterTransform
Since: 1.0.9 - Supports Streaming: True
The MetadataFilterTransform stage transforms the incoming dataset by filtering columns using the embedded column metadata.
Underneath, Arc will register a table called metadata which contains the metadata of the inputView . This allows complex SQL statements to be executed which return which columns to retain from the inputView in the outputView . The available columns in the metadata table are:
Field | Description |
---|---|
name | The field name. |
type | The field type. |
metadata | The field metadata. |
This can be used like:
-- only select columns which are not personally identifiable information
SELECT
name
FROM metadata
WHERE metadata.pii = false
This will produce an outputView which only contains the columns in the inputView where the inputView column metadata contains a key pii with a value equal to false .
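The pii key in this example would come from the metadata attached to each field in the Arc schema used when typing the data. A minimal sketch of such a field definition is below; the exact attributes should be confirmed against the schema documentation.
{
  "name": "email",
  "type": "string",
  "trim": true,
  "nullable": true,
  "nullableValues": ["", "null"],
  "metadata": {
    "pii": true
  }
}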
If the sqlParams contains a boolean parameter pii_authorized , indicating whether the job is authorised to use Personally Identifiable Information or not, then it could be used like:
-- only select columns which job is authorised to access based on ${pii_authorized}
SELECT
name
FROM metadata
WHERE metadata.pii = (
CASE
WHEN ${pii_authorized} = true
THEN metadata.pii -- this will allow both true and false metadata.pii values if pii_authorized = true
ELSE false -- else if pii_authorized = false only allow metadata.pii = false values
END
)
The inputView and outputView can be set to the same name so that downstream stages have no way of accessing the pre-filtered data accidentally.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | *true | URI of the input file containing the SQL statement. Required if sql not provided.This statement must be written to query against a table called metadata and must return at least the name column or an error will be raised. |
sql | String | *true | A SQL statement to execute. Required if inputURI not provided.This statement must be written to query against a table called metadata and must return at least the name column or an error will be raised. |
inputView | String | true | Name of incoming Spark dataset. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. |
Examples
Minimal
{
"type": "MetadataFilterTransform",
"name": "filter out Personally identifiable information (pii) fields",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/filter_pii.sql",
"inputView": "customer",
"outputView": "customer_safe"
}
Complete
{
"type": "MetadataFilterTransform",
"id": "00000000-0000-0000-0000-000000000000",
"name": "filter out Personally identifiable information (pii) fields",
"description": "filter out Personally identifiable information (pii) fields",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/filter_pii_dynamic.sql",
"inputView": "customer",
"outputView": "customer_safe",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false,
"sqlParams": {
"pii_authorized": "true"
}
}
MetadataTransform
Since: 2.4.0 - Supports Streaming: True
The MetadataTransform stage attaches metadata to the input DataFrame and returns a new DataFrame .
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
schema | Array | true* | An inline Arc schema. Only one of schema , schemaURI , schemaView can be provided. |
schemaURI | URI | true* | URI of the input JSON file containing the Arc schema. Only one of schema , schemaURI , schemaView can be provided. |
schemaView | String | true* | Similar to schemaURI but allows the Arc schema to be passed in as another DataFrame . Only one of schema , schemaURI , schemaView can be provided. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
failMode | String | false | Either permissive or failfast : permissive will attach metadata to any column of the input DataFrame which has the same name as in the incoming schema. failfast will fail the Arc job if any of the columns in the input schema are not found in the input DataFrame . Default: permissive . |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
Examples
Minimal
{
"type": "MetadataTransform",
"name": "set metadata for customer view",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer",
"schemaURI": "hdfs://datalake/schema/customer.json"
}
Complete
{
"type": "MetadataTransform",
"id": "00000000-0000-0000-0000-000000000000",
"name": "set metadata for customer view",
"description": "set metadata for customer view",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer",
"schemaURI": "hdfs://datalake/schema/customer.json",
"failMode": "failfast"
"numPartitions": 1,
"partitionBy": [
"type"
],
"persist": false
}
MLTransform
Since: 1.0.0 - Supports Streaming: True
The MLTransform stage transforms the incoming dataset with a pretrained Spark ML (Machine Learning) model. This will append one or more predicted columns to the incoming dataset. The incoming model must be a PipelineModel or CrossValidatorModel produced using Spark’s Scala, Java, PySpark or SparkR API.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI of the input PipelineModel or CrossValidatorModel . |
inputView | String | true | Name of incoming Spark dataset. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . MLTransform will also log percentiles of prediction probabilities for classification models if this option is enabled. |
Examples
Minimal
{
"type": "MLTransform",
"name": "apply machine learning model",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/ml/machineLearningPipelineModel",
"inputView": "customer",
"outputView": "customer_scored"
}
Complete
{
"type": "MLTransform",
"id": "00000000-0000-0000-0000-000000000000",
"name": "apply machine learning model",
"description": "apply machine learning model",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/ml/machineLearningPipelineModel",
"inputView": "customer",
"outputView": "customer_scored",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false
}
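The appended column names depend entirely on how the model was built; for a typical Spark ML classification pipeline they are often prediction and probability . A sketch of consuming the scored view in a later SQLTransform, with illustrative column names:
SELECT
  customer_id
  ,prediction AS churn_predicted
FROM customer_scored
WHERE prediction = 1.0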
SimilarityJoinTransform
Since: 2.1.0 - Supports Streaming: True
The SimilarityJoinTransform stage uses Approximate String Matching (a.k.a. Fuzzy Matching) to find similar records between two datasets. It is possible to pass the same dataset into both sides of the comparison (i.e. both leftView and rightView ) to find duplicates, in which case the threshold value should be high (close to 1.0 ) to avoid a potentially very large cross-product resultset.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
leftView | String | true | The view name of the left dataset. This should be the bigger of the two input sets. |
rightView | String | true | The view name of the right dataset. |
leftFields | Array[String] | true | Columns to include in the similarity join from the left dataset. These are order dependent. |
rightFields | Array[String] | true | Columns to include in the similarity join from the right dataset. These are order dependent. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
caseSensitive | Boolean | false | Whether to use case sensitive comparison. Default: false . |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numHashTables | Integer | false | The number of hash tables which can be used to trade off execution time vs. false positive rate. Lower values should produce quicker execution but a higher false positive rate. Default: 5 . |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
params | Map[String, String] | false | Map of configuration parameters. Currently unused. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
shingleLength | Integer | false | The length to split the input fields into. E.g. the string '1 Parliament Drive' would be split into ['1 P', ' Pa', 'Par', 'arl', …] if shingleLength is set to 3 . A longer or shorter shingleLength may help provide higher similarity depending on your dataset. Default: 3 . |
threshold | Double | false | The similarity threshold for evaluating the records as the same. The default, 0.8 , means that 80% of the character sequences must be the same for the records to be considered equal for joining. Default: 0.8 . |
Examples
Minimal
{
"type": "SimilarityJoinTransform",
"name": "test",
"environments": [
"production",
"test"
],
"leftView": "official_postal_addresses",
"leftFields": [
"flat_number",
"number_first",
"street_name",
"street_type",
"locality_name",
"postcode",
"state"
],
"rightView": "crm_addresses",
"rightFields": [
"street",
"state_postcode_suburb"
],
"outputView": "official_address_compare"
}
Complete
{
"type": "SimilarityJoinTransform",
"id": "00000000-0000-0000-0000-000000000000",
"name": "test",
"environments": [
"production",
"test"
],
"leftView": "official_postal_addresses",
"leftFields": [
"flat_number",
"number_first",
"street_name",
"street_type",
"locality_name",
"postcode",
"state"
],
"rightView": "crm_addresses",
"rightFields": [
"street",
"state_postcode_suburb"
],
"outputView": "official_address_compare",
"threshold": 0.75,
"shingleLength": 3,
"numHashTables": 10,
"caseSensitive": false,
"persist": false,
"partitionBy": [
"state"
],
"numPartitions": 10
}
SQLTransform
Since: 1.0.0 - Supports Streaming: True
The SQLTransform stage transforms the incoming dataset with a Spark SQL statement. This stage relies on previous stages to load and register the dataset views (outputView ) and will execute arbitrary SQL statements against those datasets.
All the inbuilt Spark SQL functions are available and have been extended with some additional functions.
Please be aware that in streaming mode not all join operations are available. See: Support matrix for joins in streaming queries.
CAST vs TypingTransform
It is strongly recommended to use the TypingTransform for reproducible, repeatable results.
Whilst SQL is capable of converting data types using the CAST function (e.g. CAST(dateColumn AS DATE) ) be very careful. ANSI SQL specifies that a failure to convert raises an exception condition: data exception - invalid character value for cast, whereas Spark SQL will return a null value and suppress any exceptions: try s.toString.toInt catch { case _: NumberFormatException => null } . If you used a CAST in a financial scenario, for example bill calculation, the silent NULLing of values could result in errors being suppressed and bills being incorrectly calculated.
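A short illustration of this behaviour: in Spark SQL's default (non-ANSI) mode both of these expressions silently yield NULL rather than raising an error.
-- neither value can be parsed as an integer so Spark SQL returns NULL for each
SELECT
  CAST('abc' AS INT) AS bad_int
  ,CAST('1,000' AS INT) AS formatted_int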
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | *true | URI of the input file containing the SQL statement. Required if sql not provided. |
sql | String | *true | A SQL statement to execute. Required if inputURI not provided. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. For example if the sqlParams contains parameter current_timestamp of value 2018-11-24 14:48:56 then this statement would execute in a deterministic way: SELECT * FROM customer WHERE expiry > FROM_UNIXTIME(UNIX_TIMESTAMP('${current_timestamp}', 'uuuu-MM-dd HH:mm:ss')) (so would be testable). |
The SQL statement is a plain Spark SQL statement, for example:
SELECT
customer.customer_id
,customer.first_name
,customer.last_name
,account.account_id
,account.account_name
FROM customer
LEFT JOIN account ON account.customer_id = customer.customer_id
Magic
The %sql
magic is available via arc-jupyter with these available parameters:
%sql name="sqltransform" description="description" environments=production,test outputView=example persist=true sqlParams=inputView=customer,inputField=id
SELECT
${inputField}
FROM ${inputView}
Examples
Minimal
{
"type": "SQLTransform",
"name": "standardise customer fields",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/customer.sql",
"outputView": "customer"
}
Complete
{
"type": "SQLTransform",
"id": "00000000-0000-0000-0000-000000000000",
"name": "standardise customer fields",
"description": "standardise customer fields",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/customer_dynamic.sql",
"sql": "SELECT id, name FROM customer_raw",
"outputView": "customer",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false,
"sqlParams": {
"current_date": "2018-11-24",
"current_timestamp": "2018-11-24 14:48:56"
}
}
The current_date and current_timestamp values can easily be passed in as environment variables using $(date "+%Y-%m-%d") and $(date "+%Y-%m-%d %H:%M:%S") respectively.
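Because Arc job files are parsed as HOCON, one option is to substitute environment variables directly into the sqlParams block; the ETL_CONF_CURRENT_DATE and ETL_CONF_CURRENT_TIMESTAMP variable names below are illustrative only.
"sqlParams": {
  "current_date": ${ETL_CONF_CURRENT_DATE},
  "current_timestamp": ${ETL_CONF_CURRENT_TIMESTAMP}
}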
TensorFlowServingTransform
Since: 1.0.0 - Supports Streaming: True
The TensorFlowServingTransform stage transforms the incoming dataset by calling a TensorFlow Serving service. Because each call is atomic, the TensorFlow Serving instances could be behind a load balancer to increase throughput.
To see how to host a simple model in TensorFlow Serving see:
https://github.com/tripl-ai/arc/tree/master/src/it/resources/tensorflow_serving
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
uri | String | true | The URI of the TensorFlow Serving REST end point. |
batchSize | Integer | false | The number of records to send to TensorFlow Serving in each call. A higher number will decrease the number of calls to TensorFlow Serving which may be more efficient. Default: 100 . |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
inputField | String | false | The field to pass to the model. JSON encoding can be used to pass multiple values (tuples). Default: value . |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
params | Map[String, String] | false | Map of configuration parameters. Currently unused. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
responseType | String | false | The type returned by the TensorFlow Serving API. Expected to be integer , double or object (which may present as a string depending on how the model has been built). Default: object . |
signatureName | String | false | The name of the TensorFlow Serving signature. |
Examples
Minimal
{
"type": "TensorFlowServingTransform",
"name": "call the customer segmentation model",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_segmented",
"uri": "http://tfserving:9001/v1/models/customer_segmentation/versions/1:predict"
}
Complete
{
"type": "TensorFlowServingTransform",
"id": "00000000-0000-0000-0000-000000000000",
"name": "call the customer segmentation model",
"description": "call the customer segmentation model",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_segmented",
"uri": "http://tfserving:9001/v1/models/customer_segmentation/versions/1:predict",
"signatureName": "serving_default",
"batchSize": 100,
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": true,
"responseType": "integer"
}
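If the model expects multiple inputs, the inputField note above suggests JSON-encoding them into a single column before this stage. A sketch of doing that in a preceding SQLTransform, with illustrative column names:
SELECT
  customer_id
  ,TO_JSON(NAMED_STRUCT('age', age, 'tenure_months', tenure_months)) AS value
FROM customer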
TypingTransform
Since: 1.0.0 - Supports Streaming: True
The TypingTransform stage transforms the incoming dataset based on a schema defined in the schema format.
The logical process that is applied to perform the typing on a field-by-field basis is shown below.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
schema | Array | true* | An inline Arc schema. Only one of schema , schemaURI , schemaView can be provided. |
schemaURI | URI | true* | URI of the input JSON file containing the Arc schema. Only one of schema , schemaURI , schemaView can be provided. |
schemaView | String | true* | Similar to schemaURI but allows the Arc schema to be passed in as another DataFrame . Only one of schema , schemaURI , schemaView can be provided. |
inputView | String | true | Name of incoming Spark dataset. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
failMode | String | false | Either permissive or failfast : permissive will process all rows in the dataset and collect any errors for each row in the _errors column. Rules can then be applied in a SQLValidate stage if required. failfast will fail the Arc job on the first row containing at least one error. Default: permissive . |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
Examples
Minimal
{
"type": "TypingTransform",
"name": "apply data types to customer records",
"environments": [
"production",
"test"
],
"schemaURI": "hdfs://datalake/schema/customer.json",
"inputView": "customer_untyped",
"outputView": "customer"
}
Complete
{
"type": "TypingTransform",
"id": "00000000-0000-0000-0000-000000000000",
"name": "apply data types to customer records",
"description": "apply data types to customer records",
"environments": [
"production",
"test"
],
"schemaURI": "hdfs://datalake/schema/customer.json",
"inputView": "customer_untyped",
"outputView": "customer",
"authentication": {},
"failMode": "failfast",
"numPartitions": 10,
"partitionBy": [
"customerId"
],
"persist": false
}
The following demonstrates how the TypingTransform behaves. Assume the input has been read by a stage like DelimitedExtract, which reads a dataset where all the columns are strings:
+-------------------------+---------------------+
|startTime |endTime |
+-------------------------+---------------------+
|2018-09-26 07:17:43 |2018-09-27 07:17:43 |
|2018-09-25 08:25:51 |2018-09-26 08:25:51 |
|2018-02-30 01:16:40 |2018-03-01 01:16:40 |
|30 February 2018 01:16:40|2018-03-2018 01:16:40|
+-------------------------+---------------------+
In this case the goal is to safely convert the values from strings like "2018-09-26 07:17:43" to a proper timestamp object so that we can ensure the timestamp is valid (e.g. not on a date that does not exist, such as the 30th day of February) and can easily perform date operations such as subtracting 1 week. To do so a schema file could be constructed to look like:
[
{
"id": "8e42c8f0-22a8-40db-9798-6dd533c1de36",
"name": "startTime",
"description": "The startTime field.",
"type": "timestamp",
"trim": true,
"nullable": true,
"nullableValues": [
"",
"null"
],
"formatters": [
"uuuu-MM-dd HH:mm:ss"
],
"timezoneId": "UTC"
},
{
"id": "2e7553cf-2748-49cd-a291-8918823e706a",
"name": "endTime",
"description": "The endTime field.",
"type": "timestamp",
"trim": true,
"nullable": true,
"nullableValues": [
"",
"null"
],
"formatters": [
"uuuu-MM-dd HH:mm:ss"
],
"timezoneId": "UTC"
}
]
Here is the output of the TypingTransform when applied to the input dataset.
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|startTime |endTime |_errors |
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2018-09-26 17:17:43|2018-09-27 17:17:43|[] |
|2018-09-25 18:25:51|2018-09-26 18:25:51|[] |
|null |2018-03-01 12:16:40|[[startTime, Unable to convert '2018-02-30 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC']] |
|null |null |[[startTime, Unable to convert '30 February 2018 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC'], [endTime, Unable to convert '2018-03-2018 01:16:40' to timestamp using formatters ['uuuu-MM-dd HH:mm:ss'] and timezone 'UTC']]|
+-------------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
- Because the conversion happened successfully for both values on the first two rows there are no errors for those rows.
- On the third row the value '2018-02-30 01:16:40' cannot be converted as the 30th day of February is not a valid date and the value is set to null . If nullable in the schema for field startTime was set to false the job would fail as it would be unable to continue.
- On the fourth row both values are invalid as the formatter and date values are both wrong.
The SQLValidate stage is a good way to use this data to enforce data quality constraints.
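A minimal sketch of such a constraint, assuming the customer output view above and that SQLValidate expects a single row containing a boolean valid and a string message (see the SQLValidate documentation):
-- fail the job if more than 5% of rows produced at least one typing error
SELECT
  (SUM(CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END) / COUNT(*)) < 0.05 AS valid
  ,TO_JSON(
    NAMED_STRUCT(
      'rows_with_errors', SUM(CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END)
      ,'total_rows', COUNT(*)
    )
  ) AS message
FROM customer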
Logical Flow
The sequence in which these fields are converted from string fields to typed fields is per this flow chart. Each value and its typing schema is passed into this logical process. For each row the values are returned as standard table columns and the returned error values are grouped into a field called _errors on a row-by-row basis. Patterns for consuming the _errors array are demonstrated in the SQLValidate stage.