Extract

*Extract stages read in data from a database or file system.

*Extract stages should meet this criteria:

File based *Extract stages can accept glob patterns as input filenames which can be very useful to load just a subset of data. For example delta processing:

Pattern Description
* Matches zero or more characters.
? Matches any single character.
[abc] Matches a single character in the set {a, b, c}.
[a-b] Matches a single character from the character range {a...b}.
[^a-b] Matches a single character that is not from character set or range {a...b}.
{a,b} Matches either expression a or b.
\c Removes (escapes) any special meaning of character c.
{ab,c{de, fg}} Matches a string from the string set {ab, cde, cfg}.

Spark will automatically match file extensions of .bz2, .deflate and .gz and perform decompression automatically.

AvroExtract

Since: 1.0.0 - Supports Streaming: False

The AvroExtract stage reads one or more Apache Avro files and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true* URI/Glob of the input delimited Avro files. If not present inputView is requred.
inputView String true* Name of the incoming Spark dataset. If not present inputURI is requred.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.
inputField String false If using inputView this option allows you to specify the name of the field which contains the Avro binary data.
avroSchemaView URI false* If using inputView this option allows you to specify the Avro schema URI. Has been tested to work with the Kafka Schema Registry with URI like http://kafka-schema-registry:8081/schemas/ids/1 as well as standalone *.avsc files.

Examples

Minimal

{
  "type": "AvroExtract",
  "name": "load customer avro extract",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.avro",
  "outputView": "customer"
}

Complete

{
  "type": "AvroExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load customer avro extract",
  "description": "load customer avro extract",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.avro",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://datalake/customer/*.avro",
  "inputField": "value",
  "avroSchemaURI": "hdfs://datalake/schema/user.avsc"
}

BigQueryExtract

Supports Streaming: False

Plugin

The BigQueryExtract is provided by the https://github.com/tripl-ai/arc-big-query-pipeline-plugin package.

The BigQueryExtract stage reads directly from a BigQuery table and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
outputView String true Name of outgoing Spark dataset after processing.
table String true The BigQuery table in the format [[project:]dataset.]table.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
dataset String false* The dataset containing the table. Required if omitted in table.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
maxParallelism Integer false The maximal number of partitions to split the data into.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
optimizedEmptyProjection Boolean false The connector uses an optimized empty projection (select without any columns) logic, used for count() execution.

Default: true.
parentProject String false The Google Cloud Project ID of the table to bill for the export. Defaults to the project of the Service Account being used.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
project String false The Google Cloud Project ID of the table. Defaults to the project of the Service Account being used.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.
viewMaterializationDataset String false The dataset where the materialized view is going to be created. Defaults to view’s dataset.
viewMaterializationProject String false The Google Cloud Project ID where the materialized view is going to be created. Defaults to view’s project id.
viewsEnabled Boolean false Enables the connector to read from views and not only tables.

BigQuery views are not materialized by default, which means that the connector needs to materialize them before it can read them. viewMaterializationProject and viewMaterializationDataset can be used to provide view materialization options.

Default: false.

Examples

Minimal

{
  "type": "BigQueryExtract",
  "name": "extract customers",
  "environments": [
    "production",
    "test"
  ],
  "table": "dataset.customer",
  "outputView": "customer"
}

Complete

{
  "type": "BigQueryExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "table": "customer",
  "dataset": "dataset",
  "outputView": "customer",
  "maxParallelism": 10,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "optimizedEmptyProjection": true,
  "parentProject": "parent-project",
  "project": "project",
  "persist": true,
  "viewMaterializationDataset": "dataset",
  "viewMaterializationProject": "project",
  "viewsEnabled": true
}

BytesExtract

Since: 1.0.9 - Supports Streaming: False

The BytesExtract stage reads one or more binary files and returns a DataFrame containing a Array[Byte] of the file content (named value) and the file path (named _filename).

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true* Name of the incoming Spark dataset containing a list of URI/Globs to extract from. If not present inputURI is requred.
inputURI URI true* URI/Glob of the input binaryfiles. If not present inputView is requred.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
failMode String false Either permissive or failfast:

permissive will create an empty dataframe of [value, _filename] in case of no files.

failfast will fail the Arc job if no files are found.

Default: failfast.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "BytesExtract",
  "name": "load images from the customer vehicle photos directory",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/vehicles/*.jpg",
  "outputView": "customer_vehicles_photos"
}

Complete

{
  "type": "BytesExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load images from the customer vehicle photos directory",
  "description": "load images from the customer vehicle photos directory",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/vehicles/*.jpg",
  "outputView": "customer_vehicles_photos",
  "persist": false,
  "numPartitions": 10,
  "contiguousIndex": false,
  "authentication": {},
  "failMode": "permissive"
}

CassandraExtract

Since: 2.0.0 - Supports Streaming: False

Plugin

The CassandraExtract is provided by the https://github.com/tripl-ai/arc-cassandra-pipeline-plugin package.

The CassandraExtract reads directly from a Cassandra cluster and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
keyspace String true The name of the Cassandra keyspace to extract from.
table String true The name of the Cassandra table to extract from.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections.
params Map[String, String] false Map of configuration parameters.. Any parameters provided will be added to the Cassandra connection object.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "CassandraExtract",
  "name": "read",
  "environments": [
    "production",
    "test"
  ],
  "keyspace": "default",
  "table": "customer",
  "outputView": "customer"
}

Complete

{
  "type": "CassandraExtract",
  "name": "read",
  "environments": [
    "production",
    "test"
  ],
  "keyspace": "default",
  "table": "customer",
  "outputView": "customer",
  "params": {
    "spark.cassandra.connection.host": "cassandra"
  },
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": true
}

DeltaLakeExtract

Since: 2.0.0 - Supports Streaming: True

Plugin

The DeltaLakeExtract is provided by the https://github.com/tripl-ai/arc-deltalake-pipeline-plugin package.

The DeltaLakeExtract stage reads one or more DeltaLake files and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input Databricks Delta files.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
options Map[String, String] false Time travel options to allow loading previous versions of the data. These values are limited to:

versionAsOf allows travelling to a specific version.

timestampAsOf allows travelling to the state before a specified timestamp.

relativeVersion allows travelling relative to the current version where the current version is 0 and -1 is the previous version.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.

Examples

Minimal

{
  "type": "DeltaLakeExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "/delta/customers",
  "outputView": "customer"
}

Complete

{
  "type": "DeltaLakeExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "/delta/customers",
  "outputView": "customer",
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "options": {
    "versionAsOf": 1,
    "timestampAsOf": "2019-01-01'T'00:00:00.000Z",
    "relativeVersion": -1
  }
}

DelimitedExtract

Since: 1.0.0 - Supports Streaming: True

The DelimitedExtract stage reads either one or more delimited text files or an input Dataset[String] and returns a DataFrame. DelimitedExtract will always set the underlying Spark configuration option of inferSchema to false to ensure consistent results.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true* Name of the incoming Spark dataset. If not present inputURI is requred.
inputURI URI true* URI/Glob of the input delimited text files. If not present inputView is requred.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
delimiter String false The type of delimiter in the file. Supported values: Comma, Pipe, DefaultHive. DefaultHive is ASCII character 1, the default delimiter for Apache Hive extracts.

Default: Comma.
customDelimiter String true* A custom string to use as delimiter. Required if delimiter is set to Custom.
escape String false A single character used for escaping quotes inside an already quoted value. Default: \.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
header Boolean false Whether or not the dataset contains a header row. If available the output dataset will have named columns otherwise columns will be named _col1, _col2_colN.

Default: false.
id String false A optional unique identifier for this stage.
multiLine Boolean false Whether to load multiple lines as a single record or as individual records split by newline.

Default: false.
inputField String false If using inputView this option allows you to specify the name of the field which contains the delimited data.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
quote String false The type of quoting in the file. Supported values: None, SingleQuote, DoubleQuote.

Default: DoubleQuote.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.
watermark Object false A structured streaming watermark object.

Requires eventTime and delayThreshold attributes.

Examples

Minimal

{
  "type": "DelimitedExtract",
  "name": "load customer extract",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.csv",
  "outputView": "customer"
}

Complete

{
  "type": "DelimitedExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load customer csv extract",
  "description": "load customer csv extract",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.csv",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "delimiter": "Custom",
  "customDelimiter": "#",
  "escape": "\"",
  "header": false,
  "multiLine": false,
  "inputField": "csvdata",
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "quote": "DoubleQuote",
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://datalake/customer/",
  "watermark": {
    "eventTime": "timestamp",
    "delayThreshold": "10 minutes"
  }
}

ElasticsearchExtract

Since: 1.9.0 - Supports Streaming: False

Plugin

The ElasticsearchExtract is provided by the https://github.com/tripl-ai/arc-elasticsearch-pipeline-plugin package.

The ElasticsearchExtract stage reads from an Elasticsearch cluster and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
input String true The name of the source Elasticsearch index.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
params Map[String, String] false Map of configuration parameters. Parameters for connecting to the Elasticsearch cluster are detailed here.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "ElasticsearchExtract",
  "name": "load customer extract",
  "environments": [
    "production",
    "test"
  ],
  "input": "customer",
  "outputView": "customer",
  "params": {
    "es.nodes": "<my>.elasticsearch.com",
    "es.port": "443",
    "es.nodes.wan.only": "true",
    "es.net.ssl": "true"
  }
}

Complete

{
  "type": "ElasticsearchExtract",
  "name": "load customer extract",
  "environments": [
    "production",
    "test"
  ],
  "input": "customer",
  "outputView": "customer",
  "params": {
    "es.nodes": "<my>.elasticsearch.com",
    "es.port": "443",
    "es.nodes.wan.only": "true",
    "es.net.ssl": "true"
  },
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false
}

HTTPExtract

Since: 1.0.0 - Supports Streaming: False

The HTTPExtract executes either a GET or POST request against a remote HTTP service and returns a DataFrame which will have a single row and single column holding the value of the HTTP response body.

This stage would typically be used with a JSONExtract stage by specifying inputView instead of inputURI (setting multiLine=true allows processing of JSON array responses).

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true* Name of the incoming Spark dataset containing the list of URIs in value field. If not present inputURI is requred.
inputURI URI true* URI of the HTTP server. If not present inputView is requred.
uriField String false The name of a field containing the URI to send the request to. Only used if inputView specified. Takes precedence over inputURI if specified.
bodyField String false The name of a field containing the request body/entity that is sent with a POST request. Only used if inputView specified. Takes precedence over body if specified.
body String false The request body/entity that is sent with a POST request.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
headers Map[String, String] false HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers.
id String false A optional unique identifier for this stage.
method String false The request type with valid values GET or POST.

Default: GET.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
outputView String true Name of outgoing Spark dataset after processing.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
validStatusCodes Array[Integer] false A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202].

Examples

Minimal

{
  "type": "HTTPExtract",
  "name": "load customer extract from api",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "https://endpoint:9000/customers",
  "outputView": "customer"
}

Complete

{
  "type": "HTTPExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load customer extract from api",
  "description": "load customer extract from api",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customers",
  "uriField": "uri",
  "bodyField": "body",
  "inputURI": "https://endpoint:9000/customers",
  "outputView": "customer",
  "body": "",
  "headers": {
    "Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
    "custom-header": "payload"
  },
  "method": "GET",
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "validStatusCodes": [
    200
  ]
}

ImageExtract

Since: 1.4.1 - Supports Streaming: True

The ImageExtract stage reads one or more image files and returns a DataFrame which has one column: image, containing image data (jpeg, png, gif, bmp, wbmp) stored with the schema:

Field Type Description
origin String The file path of the image.
height Integer The height of the image.
width Integer The width of the image.
nChannels Integer The number of image channels.
mode Integer OpenCV-compatible type.
data Binary Image bytes in OpenCV-compatible order: row-wise BGR in most cases.

This means the image data can be accessed like:

SELECT image.height FROM dataset

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input images.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
dropInvalid Boolean false Whether to drop any invalid image files.

Default: true.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
watermark Object false A structured streaming watermark object.

Requires eventTime and delayThreshold attributes.

Examples

Minimal

{
  "type": "ImageExtract",
  "name": "load customer images",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.jpg",
  "outputView": "customer"
}

Complete

{
  "type": "ImageExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load customer images",
  "description": "load customer images",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.jpg",
  "outputView": "customer",
  "authentication": {},
  "dropInvalid": true,
  "numPartitions": 10,
  "partitionBy": [
    "image.width"
  ],
  "persist": false,
  "basePath": "hdfs://datalake/customer/",
  "watermark": {
    "eventTime": "timestamp",
    "delayThreshold": "10 minutes"
  }
}

JDBCExtract

Since: 1.0.0 - Supports Streaming: False

The JDBCExtract reads directly from a JDBC Database and returns a DataFrame. See Spark JDBC documentation.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
jdbcURL String true The JDBC URL to connect to. e.g., jdbc:mysql://localhost:3306.
tableName String true The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used, e.g. (SELECT * FROM sourcetable WHERE key=value) sourcetable or just sourcetable.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
fetchsize Integer false The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows).
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections.
params Map[String, String] false Map of configuration parameters.. Any parameters provided will be added to the JDBC connection object. These are not logged so it is safe to put passwords here.
partitionBy Array[String] false Columns to partition the data by.
partitionColumn String false The name of a numeric column from the table in question which defines how to partition the table when reading in parallel from multiple workers. If set numPartitions must also be set.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
predicates Array[String] false A list expressions suitable for inclusion in WHERE clauses; each one defines one partition of the DataFrame to allow explicit parallel reads.

e.g. ['id=1', 'id=2', 'id=3', 'id=4'] would create 4 parallel readers.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.

Examples

Minimal

{
  "type": "JDBCExtract",
  "name": "load active customers from postgres",
  "environments": [
    "production",
    "test"
  ],
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer",
  "tableName": "(SELECT * FROM customer WHERE active=TRUE) customer",
  "outputView": "customer"
}

Complete

{
  "type": "JDBCExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load active customers from postgresql",
  "description": "load active customers from postgresql",
  "environments": [
    "production",
    "test"
  ],
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer",
  "tableName": "(SELECT * FROM customer WHERE active=TRUE) customer",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "fetchsize": 1000,
  "numPartitions": 10,
  "params": {
    "user": "mydbuser",
    "password": "mydbpassword"
  },
  "partitionBy": [
    "country"
  ],
  "partitionColumn": "id",
  "persist": true,
  "predicates": [
    "id=1",
    "id=2",
    "id=3",
    "id=4"
  ],
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "schemaView": "customer_schema"
}

JSONExtract

Since: 1.0.0 - Supports Streaming: True

The JSONExtract stage reads either one or more JSON files or an input Dataset[String] and returns a DataFrame.

If trying to run against an inputView in streaming mode this stage will not work. Instead try using the from_json SQL Function with a SQLTransform.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true* Name of the incoming Spark dataset. If not present inputURI is requred.
inputURI URI true* URI/Glob of the input json files. If not present inputView is requred.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
inputField String false If using inputView this option allows you to specify the name of the field which contains the delimited data.
multiLine Boolean false Whether the input directory contains a single JSON object per file or multiple JSON records in a single file, one per line (see JSONLines.

Default: true.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.



Additionally, by specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading.
schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.
watermark Object false A structured streaming watermark object.

Requires eventTime and delayThreshold attributes.

Examples

Minimal

{
  "type": "JSONExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.json",
  "outputView": "customer"
}

Complete

{
  "type": "JSONExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.json",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "inputField": "jsondata",
  "multiLine": false,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://datalake/customer/",
  "watermark": {
    "eventTime": "timestamp",
    "delayThreshold": "10 minutes"
  }
}

KafkaExtract

Since: 1.0.8 - Supports Streaming: True

Plugin

The KafkaExtract is provided by the https://github.com/tripl-ai/arc-kafka-pipeline-plugin package.

The KafkaExtract stage reads records from a Kafka topic and returns a DataFrame. It requires a unique groupID to be set which on first run will consume from the earliest offset available in Kafka. Each subsequent run will use the offset as recorded against that groupID. This means that if a job fails before properly processing the data then data may need to be restarted from the earliest offset by creating a new groupID.

The returned DataFrame has the schema:

Field Type Description
topic String The Kafka Topic.
partition Integer The partition ID.
offset Long The record offset.
timestamp Long The record timestamp.
timestampType Int The record timestamp type.
key Binary The record key as a byte array.
value Binary The record value as a byte array.

Can be used in conjuction with KafkaCommitExecute to allow quasi-transactional behaviour (with autoCommit set to false) - in that the offset commit can be deferred until certain dependent stages are sucessfully executed.

To convert the key or value from a Binary/byte array to a string it is possible to use the decode SQL Function with a SQLTransform like:

SELECT
  CAST(key AS STRING) AS stringKey,
  CAST(value AS STRING) AS stringValue,
  ...

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
outputView String true Name of outgoing Spark dataset after processing.
bootstrapServers String true A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. e.g. host1:port1,host2:port2,...
topic String true The target Kafka topic.
groupID String true A string that uniquely identifies the group of consumer processes to which this consumer belongs. This will retain the offset of the job between executions.
autoCommit Boolean false Whether to update the offsets in Kafka automatically. To be used in conjuction with KafkaCommitExecute to allow quasi-transactional behaviour.

If autoCommit is set to false this stage will force persist equal to true so that Spark will not execute the Kafka extract process twice with a potentially different result (e.g. new messages added between extracts).

Default: false.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
maxPollRecords Int false The maximum number of records returned in a single call to Kafka. Arc will then continue to poll until all records have been read.

Default: 500.
maxRecords Int false The maximum number of records returned in a single execution of this stage when executed in batch mode.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
strict Boolean false Whether to perform record count validation. Will not work with compacted topics. Default: true.
timeout Long false The time, in milliseconds, spent waiting in poll if data is not available in Kafka. Default: 10000.

Examples

Minimal

{
  "type": "KafkaExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "customer",
  "bootstrapServers": "kafka:29092",
  "topic": "customers",
  "groupID": "spark-customer-extract-job"
}

Complete

{
  "type": "KafkaExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "customer",
  "bootstrapServers": "kafka:29092",
  "topic": "customers",
  "groupID": "spark-customer-extract-job",
  "autoCommit": false,
  "maxPollRecords": 500,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "timeout": 10000,
  "strict":  true
}

MetadataExtract

Since: 2.4.0 - Supports Streaming: True

The MetadataExtract stage extracts the metadata attached to an input Dataframe and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.

Examples

Minimal

{
  "type": "MetadataExtract",
  "name": "extract metadata from customer view",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_metadata"
}

Complete

{
  "type": "MetadataExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "extract metadata from customer view",
  "description": "extract metadata from customer view",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_metadata",
  "numPartitions": 1,
  "partitionBy": [
    "type"
  ],
  "persist": false
}

MongoDBExtract

Since: 2.0.0 - Supports Streaming: False

Plugin

The MongoDBExtract is provided by the https://github.com/tripl-ai/arc-mongo-pipeline-plugin package.

The MongoDBExtract stage reads a collection from MongoDB and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
options Map[String, String] false Map of configuration parameters. These parameters are used to provide database connection/collection details.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.

Examples

Minimal

{
  "type": "MongoDBExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "options": {
    "uri": "mongodb://username:password@mongo:27017",
    "database": "local",
    "collection": "customer"
  },
  "outputView": "customers"
}

Complete

{
  "type": "MongoDBExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "options": {
    "uri": "mongodb://username:password@mongo:27017",
    "database": "local",
    "collection": "customer"
  },
  "outputView": "customers",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://datalake/metadata/customer.json",
  "schemaView": "customer_schema"
}

ORCExtract

Since: 1.0.0 - Supports Streaming: True

The ORCExtract stage reads one or more Apache ORC files and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input ORC files.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.
watermark Object false A structured streaming watermark object.

Requires eventTime and delayThreshold attributes.

Examples

Minimal

{
  "type": "ORCExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.orc",
  "outputView": "customer"
}

Complete

{
  "type": "ORCExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.orc",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://datalake/customer/",
  "watermark": {
    "eventTime": "timestamp",
    "delayThreshold": "10 minutes"
  }
}

ParquetExtract

Since: 1.0.0 - Supports Streaming: True

The ParquetExtract stage reads one or more Apache Parquet files and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input Parquet files.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.
watermark Object false A structured streaming watermark object.

Requires eventTime and delayThreshold attributes.

Examples

Minimal

{
  "type": "ParquetExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.parquet",
  "outputView": "customer"
}

Complete

{
  "type": "ParquetExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.parquet",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://datalake/customer/",
  "watermark": {
    "eventTime": "timestamp",
    "delayThreshold": "10 minutes"
  }
}

RateExtract

Since: 1.2.0 - Supports Streaming: True

The RateExtract stage creates a streaming datasource which creates rows into a streaming DataFrame with the signature [timestamp: timestamp, value: long].

This stage has been included for testing Structured Streaming jobs as it can be very difficult to generate test data. Generally this stage would only be included when Arc is run in a test mode (i.e. the environment is set to test).

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
outputView String true Name of outgoing Spark dataset after processing.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
rampUpTime Integer false How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds.

Default: 0.
rowsPerSecond Integer false How many rows should be generated per second.

Default: 1.

Examples

Minimal

{
  "type": "RateExtract",
  "name": "create a streaming source",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "stream"
}

Complete

{
  "type": "RateExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "create a streaming source",
  "description": "create a streaming source",
  "environments": [
    "production",
    "test"
  ],
  "outputView": "stream",
  "rowsPerSecond": 2,
  "rampUpTime": 0,
  "numPartitions": 10
}

SASExtract

Since: 2.4.0 - Supports Streaming: True

Plugin

The SASExtract is provided by the https://github.com/tripl-ai/arc-sas-pipeline-plugin package.

The SASExtract stage reads a collection from SAS sas7bdat binary file and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI/Glob of the input sas7bdat files.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
options Map[String, String] false Options for reading the sas7bdat file. These values are limited to:

inferDecimal: infer numeric columns with format width > 0 and format precision > 0, as Decimal(Width, Precision).

inferDecimalScale: scale of inferred decimals.

inferFloat: infer numeric columns with <= 4 bytes, as Float.

inferInt: infer numeric columns with <= 4 bytes, format width > 0 and format precision =0, as Int.

inferLong: infer numeric columns with <= 8 bytes, format width > 0 and format precision = 0, as Long.

inferShort: infer numeric columns with <= 2 bytes, format width > 0 and format precision = 0, as Short.

maxSplitSize: maximum byte length of input splits which can be decreased to force higher parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.

Examples

Minimal

{
  "type": "SASExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.sas7bdat",
  "outputView": "customer"
}

Complete

{
  "type": "SASExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.sas7bdat",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://datalake/metadata/customer.json",
  "schemaView": "customer_schema",
  "options": {
    "maxSplitSize": 100000000
  }
}

StatisticsExtract

Since: 3.5.0 - Supports Streaming: True

The StatisticsExtract stage extracts the column statistics from to an input Dataframe and returns a DataFrame.

It differs from the Spark inbuilt summary by:

  • operates on all data types.
  • returns row-based data rather than column-based (i.e. each input column is one row in the output)
  • count_distinct and null_count additional metrics

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputView String true Name of outgoing Spark dataset after processing.
approximate Boolean false Whether to calculate approximate statistics or full population based statistics. Calculating population based statistics is a computationally and memory intenstive operation and may result in very long runtime or exceed memory limits.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
histogram Boolean false Whether to calculate distribution statistics (25%, 50%, 75%).

Default: false.
id String false A optional unique identifier for this stage.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
hllRelativeSD Double false The maximum relative standard deviation for the distinct_count output variable. Smaller values will provide greater precision at the expense of runtime.

Default: 0.05.

Examples

Minimal

{
  "type": "StatisticsExtract",
  "name": "extract column statistics from customer view",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_statistics"
}

Complete

{
  "type": "StatisticsExtract",
  "name": "extract column statistics from customer view",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_statistics",
  "approximate": true,
  "persist": true
}

TextExtract

Since: 1.2.0 - Supports Streaming: True

The TextExtract stage reads either one or more text files and returns a DataFrame.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true* Name of the incoming Spark dataset containing a list of URI/Globs to extract from. If not present inputURI is requred.
inputURI URI true* URI/Glob of the input text files. If not present inputView is requred.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
basePath URI false The base path that partition discovery should start with.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
multiLine Boolean false Whether to load the file as a single record or as individual records split by newline.

Default: false.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.

schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.
watermark Object false A structured streaming watermark object.

Requires eventTime and delayThreshold attributes.

Examples

Minimal

{
  "type": "TextExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.txt",
  "outputView": "customer"
}

Complete

{
  "type": "TextExtract",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.txt",
  "inputView": "list_of_text_files_to_extract",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "multiLine": true,
  "numPartitions": 10,
  "persist": false,
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "schemaView": "customer_schema",
  "basePath": "hdfs://datalake/customer/",
  "watermark": {
    "eventTime": "timestamp",
    "delayThreshold": "10 minutes"
  }
}

XMLExtract

Since: 1.0.0 - Supports Streaming: False

The XMLExtract stage reads one or more XML files or an input Dataset[String] and returns a DataFrame.

This extract works slightly different to the spark-xml package. To access the data you can use a SQLTransform query like this which will create a new value for each row of the bk:books array:

SELECT EXPLODE(`bk:books`).*
FROM books_xml

The backtick character (`) can be used to address fields with non-alphanumeric names.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true* URI/Glob of the input delimited XML files. If not present inputView is requred.
inputView String true* Name of the incoming Spark dataset. If not present inputURI is requred.
outputView String true Name of outgoing Spark dataset after processing.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
contiguousIndex Boolean false When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.

The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index.

Default: true.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
inputField String false If using inputView this option allows you to specify the name of the field which contains the XML data.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count.

Default: false.
schemaURI URI false

Used for multiple purposes:

  • Can be used to set metadata on a the extracted DataFrame. Note this will overwrite the existing metadata if it exists.

  • Can be used to specify a schema in case of no input files. This stage will create an empty DataFrame with this schema so any downstream logic that depends on the columns in this dataset, e.g. SQLTransform, is still able to run. This feature can be used to allow deployment of business logic that depends on a dataset which has not been enabled by an upstream sending system.



Additionally, by specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading.
schemaView String false Similar to schemaURI but allows the schema to be passed in as another DataFrame.
xsdURI URI false URI of an XML Schema Definition (XSD) file used to validate input XML.

Examples

Minimal

{
  "type": "XMLExtract",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/xml/*.xml",
  "outputView": "customer"
}

Complete

{
  "type": "XMLExtract",
  "name": "load customers",
  "description": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/customer/*.xml",
  "xsdURI": "hdfs://datalake/xml/customer.xsd",
  "outputView": "customer",
  "authentication": {},
  "contiguousIndex": true,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "persist": false,
  "schemaURI": "hdfs://datalake/schema/customer.json",
  "schemaView": "customer_schema",
  "inputView": "customer_xml",
  "inputField": "xml"
}