Extract
*Extract stages read in data from a database or file system.
*Extract stages should meet these criteria:
- Read data from local or remote filesystems and return a DataFrame.
- Do not transform/mutate the data.
- Allow for Predicate Pushdown depending on the data source.
File based *Extract stages can accept glob patterns as input filenames, which is very useful for loading just a subset of data, for example delta processing (see the example after the table below):
Pattern | Description |
---|---|
* | Matches zero or more characters. |
? | Matches any single character. |
[abc] | Matches a single character in the set {a, b, c}. |
[a-b] | Matches a single character from the character range {a...b}. |
[^a-b] | Matches a single character that is not from the character set or range {a...b}. |
{a,b} | Matches either expression a or b. |
\c | Removes (escapes) any special meaning of character c. |
{ab,c{de, fg}} | Matches a string from the string set {ab, cde, cfg}. |
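For instance, assuming a hypothetical layout where each day's files land in a dated directory, a delta load could combine the {a,b} and * patterns to read only the most recent days:
{
"type": "ParquetExtract",
"name": "load customer delta",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/2020-01-{01,02}/*.parquet",
"outputView": "customer_delta"
}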
Spark will automatically match file extensions of .bz2, .deflate and .gz and perform decompression.
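As a hypothetical illustration, a DelimitedExtract pointed at gzipped CSV files therefore needs no extra configuration; a glob like the following (path illustrative) is decompressed on read:
"inputURI": "hdfs://datalake/customer/*.csv.gz"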
AvroExtract
Since: 1.0.0 - Supports Streaming: False
The AvroExtract stage reads one or more Apache Avro files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true* | URI/Glob of the input Avro files. If not present inputView is required. |
inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index .Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
schemaURI | URI | false | Used for multiple purposes including supplying a schema (data types and metadata) for the extracted dataset. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
inputField | String | false | If using inputView this option allows you to specify the name of the field which contains the Avro binary data. |
avroSchemaURI | URI | false* | If using inputView this option allows you to specify the Avro schema URI. Has been tested to work with the Kafka Schema Registry with a URI like http://kafka-schema-registry:8081/schemas/ids/1 as well as standalone *.avsc files. |
Examples
Minimal
{
"type": "AvroExtract",
"name": "load customer avro extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.avro",
"outputView": "customer"
}
Complete
{
"type": "AvroExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load customer avro extract",
"description": "load customer avro extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.avro",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://datalake/customer/*.avro",
"inputField": "value",
"avroSchemaURI": "hdfs://datalake/schema/user.avsc"
}
BigQueryExtract
Supports Streaming: False
Plugin
The BigQueryExtract is provided by the https://github.com/tripl-ai/arc-big-query-pipeline-plugin package.
The BigQueryExtract stage reads directly from a BigQuery table and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
table | String | true | The BigQuery table in the format [[project:]dataset.]table. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
dataset | String | false* | The dataset containing the table. Required if omitted in table. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
maxParallelism | Integer | false | The maximal number of partitions to split the data into. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
optimizedEmptyProjection | Boolean | false | The connector uses an optimized empty projection (select without any columns) logic, used for count() execution.Default: true . |
parentProject | String | false | The Google Cloud Project ID of the table to bill for the export. Defaults to the project of the Service Account being used. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
project | String | false | The Google Cloud Project ID of the table. Defaults to the project of the Service Account being used. |
schemaURI | URI | false | Used for multiple purposes including supplying a schema (data types and metadata) for the extracted dataset. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
viewMaterializationDataset | String | false | The dataset where the materialized view is going to be created. Defaults to view’s dataset. |
viewMaterializationProject | String | false | The Google Cloud Project ID where the materialized view is going to be created. Defaults to view’s project id. |
viewsEnabled | Boolean | false | Enables the connector to read from views and not only tables. BigQuery views are not materialized by default, which means that the connector needs to materialize them before it can read them. viewMaterializationProject and viewMaterializationDataset can be used to provide view materialization options.Default: false . |
Examples
Minimal
{
"type": "BigQueryExtract",
"name": "extract customers",
"environments": [
"production",
"test"
],
"table": "dataset.customer",
"outputView": "customer"
}
Complete
{
"type": "BigQueryExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"table": "customer",
"dataset": "dataset",
"outputView": "customer",
"maxParallelism": 10,
"numPartitions": 10,
"partitionBy": [
"country"
],
"optimizedEmptyProjection": true,
"parentProject": "parent-project",
"project": "project",
"persist": true,
"viewMaterializationDataset": "dataset",
"viewMaterializationProject": "project",
"viewsEnabled": true
}
BytesExtract
Since: 1.0.9 - Supports Streaming: False
The BytesExtract stage reads one or more binary files and returns a DataFrame containing an Array[Byte] of the file content (named value) and the file path (named _filename).
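Downstream stages can then reference those two columns directly. A minimal sketch with a SQLTransform, using the customer_vehicles_photos view from the examples below (length() on binary data returns the byte count):
SELECT _filename, LENGTH(value) AS size_bytes
FROM customer_vehicles_photos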
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true* | Name of the incoming Spark dataset containing a list of URI/Globs to extract from. If not present inputURI is required. |
inputURI | URI | true* | URI/Glob of the input binary files. If not present inputView is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index .Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
failMode | String | false | Either permissive or failfast. permissive will create an empty dataframe of [value, _filename] in case of no files. failfast will fail the Arc job if no files are found. Default: failfast. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
Examples
Minimal
{
"type": "BytesExtract",
"name": "load images from the customer vehicle photos directory",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/vehicles/*.jpg",
"outputView": "customer_vehicles_photos"
}
Complete
{
"type": "BytesExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load images from the customer vehicle photos directory",
"description": "load images from the customer vehicle photos directory",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/vehicles/*.jpg",
"outputView": "customer_vehicles_photos",
"persist": false,
"numPartitions": 10,
"contiguousIndex": false,
"authentication": {},
"failMode": "permissive"
}
CassandraExtract
Since: 2.0.0 - Supports Streaming: False
Plugin
The CassandraExtract is provided by the https://github.com/tripl-ai/arc-cassandra-pipeline-plugin package.
The CassandraExtract reads directly from a Cassandra cluster and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
keyspace | String | true | The name of the Cassandra keyspace to extract from. |
table | String | true | The name of the Cassandra table to extract from. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections. |
params | Map[String, String] | false | Map of configuration parameters. Any parameters provided will be added to the Cassandra connection object. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
Examples
Minimal
{
"type": "CassandraExtract",
"name": "read",
"environments": [
"production",
"test"
],
"keyspace": "default",
"table": "customer",
"outputView": "customer"
}
Complete
{
"type": "CassandraExtract",
"name": "read",
"environments": [
"production",
"test"
],
"keyspace": "default",
"table": "customer",
"outputView": "customer",
"params": {
"spark.cassandra.connection.host": "cassandra"
},
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": true
}
DeltaLakeExtract
Since: 2.0.0 - Supports Streaming: True
Plugin
The DeltaLakeExtract is provided by the https://github.com/tripl-ai/arc-deltalake-pipeline-plugin package.
The DeltaLakeExtract stage reads one or more DeltaLake files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI/Glob of the input Databricks Delta files. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
options | Map[String, String] | false | Time travel options to allow loading previous versions of the data. These values are limited to: versionAsOf allows travelling to a specific version; timestampAsOf allows travelling to the state before a specified timestamp; relativeVersion allows travelling relative to the current version where the current version is 0 and -1 is the previous version. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
schemaURI | URI | false | Used for multiple purposes including supplying a schema (data types and metadata) for the extracted dataset. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
Examples
Minimal
{
"type": "DeltaLakeExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "/delta/customers",
"outputView": "customer"
}
Complete
{
"type": "DeltaLakeExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "/delta/customers",
"outputView": "customer",
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"options": {
"versionAsOf": 1,
"timestampAsOf": "2019-01-01'T'00:00:00.000Z",
"relativeVersion": -1
}
}
DelimitedExtract
Since: 1.0.0 - Supports Streaming: True
The DelimitedExtract stage reads either one or more delimited text files or an input Dataset[String] and returns a DataFrame. DelimitedExtract will always set the underlying Spark configuration option inferSchema to false to ensure consistent results.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
inputURI | URI | true* | URI/Glob of the input delimited text files. If not present inputView is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index .Default: true. |
delimiter | String | false | The type of delimiter in the file. Supported values: Comma, Pipe, DefaultHive, Custom. DefaultHive is ASCII character 1, the default delimiter for Apache Hive extracts. Default: Comma. |
customDelimiter | String | true* | A custom string to use as delimiter. Required if delimiter is set to Custom . |
escape | String | false | A single character used for escaping quotes inside an already quoted value. Default: \ . |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
header | Boolean | false | Whether or not the dataset contains a header row. If available the output dataset will have named columns otherwise columns will be named _col1 , _col2 … _colN .Default: false . |
id | String | false | An optional unique identifier for this stage. |
multiLine | Boolean | false | Whether to load multiple lines as a single record or as individual records split by newline. Default: false . |
inputField | String | false | If using inputView this option allows you to specify the name of the field which contains the delimited data. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
quote | String | false | The type of quoting in the file. Supported values: None , SingleQuote , DoubleQuote .Default: DoubleQuote . |
schemaURI | URI | false | Used for multiple purposes including supplying a schema (data types and metadata) for the extracted dataset. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
watermark | Object | false | A structured streaming watermark object. Requires eventTime and delayThreshold attributes. |
Examples
Minimal
{
"type": "DelimitedExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.csv",
"outputView": "customer"
}
Complete
{
"type": "DelimitedExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load customer csv extract",
"description": "load customer csv extract",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.csv",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"delimiter": "Custom",
"customDelimiter": "#",
"escape": "\"",
"header": false,
"multiLine": false,
"inputField": "csvdata",
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"quote": "DoubleQuote",
"schemaURI": "hdfs://datalake/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://datalake/customer/",
"watermark": {
"eventTime": "timestamp",
"delayThreshold": "10 minutes"
}
}
ElasticsearchExtract
Since: 1.9.0 - Supports Streaming: False
Plugin
The ElasticsearchExtract is provided by the https://github.com/tripl-ai/arc-elasticsearch-pipeline-plugin package.
The ElasticsearchExtract stage reads from an Elasticsearch cluster and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
input | String | true | The name of the source Elasticsearch index. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
params | Map[String, String] | false | Map of configuration parameters. Parameters for connecting to the Elasticsearch cluster are detailed here. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
Examples
Minimal
{
"type": "ElasticsearchExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"input": "customer",
"outputView": "customer",
"params": {
"es.nodes": "<my>.elasticsearch.com",
"es.port": "443",
"es.nodes.wan.only": "true",
"es.net.ssl": "true"
}
}
Complete
{
"type": "ElasticsearchExtract",
"name": "load customer extract",
"environments": [
"production",
"test"
],
"input": "customer",
"outputView": "customer",
"params": {
"es.nodes": "<my>.elasticsearch.com",
"es.port": "443",
"es.nodes.wan.only": "true",
"es.net.ssl": "true"
},
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false
}
HTTPExtract
Since: 1.0.0 - Supports Streaming: False
The HTTPExtract stage executes either a GET or POST request against a remote HTTP service and returns a DataFrame which will have a single row and single column holding the value of the HTTP response body.
This stage would typically be used with a JSONExtract stage by specifying inputView instead of inputURI (setting multiLine=true allows processing of JSON array responses).
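A minimal sketch of that pattern, with illustrative endpoint and view names, as the two stages would appear inside the job's stages array (the HTTP response is captured into customer_response and then parsed by JSONExtract):
{
"type": "HTTPExtract",
"name": "get customers from api",
"environments": [
"production",
"test"
],
"inputURI": "https://endpoint:9000/customers",
"outputView": "customer_response"
},
{
"type": "JSONExtract",
"name": "parse customer api response",
"environments": [
"production",
"test"
],
"inputView": "customer_response",
"multiLine": true,
"outputView": "customer"
}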
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true* | Name of the incoming Spark dataset containing the list of URIs in a value field. If not present inputURI is required. |
inputURI | URI | true* | URI of the HTTP server. If not present inputView is required. |
uriField | String | false | The name of a field containing the URI to send the request to. Only used if inputView specified. Takes precedence over inputURI if specified. |
bodyField | String | false | The name of a field containing the request body/entity that is sent with a POST request. Only used if inputView specified. Takes precedence over body if specified. |
body | String | false | The request body/entity that is sent with a POST request. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
headers | Map[String, String] | false | HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers. |
id | String | false | An optional unique identifier for this stage. |
method | String | false | The request type with valid values GET or POST .Default: GET . |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
validStatusCodes | Array[Integer] | false | A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202] . |
Examples
Minimal
{
"type": "HTTPExtract",
"name": "load customer extract from api",
"environments": [
"production",
"test"
],
"inputURI": "https://endpoint:9000/customers",
"outputView": "customer"
}
Complete
{
"type": "HTTPExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load customer extract from api",
"description": "load customer extract from api",
"environments": [
"production",
"test"
],
"inputView": "customers",
"uriField": "uri",
"bodyField": "body",
"inputURI": "https://endpoint:9000/customers",
"outputView": "customer",
"body": "",
"headers": {
"Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
"custom-header": "payload"
},
"method": "GET",
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"validStatusCodes": [
200
]
}
ImageExtract
Since: 1.4.1 - Supports Streaming: True
The ImageExtract stage reads one or more image files and returns a DataFrame which has one column: image, containing image data (jpeg, png, gif, bmp, wbmp) stored with the schema:
Field | Type | Description |
---|---|---|
origin | String | The file path of the image. |
height | Integer | The height of the image. |
width | Integer | The width of the image. |
nChannels | Integer | The number of image channels. |
mode | Integer | OpenCV-compatible type. |
data | Binary | Image bytes in OpenCV-compatible order: row-wise BGR in most cases. |
This means the image data can be accessed like:
SELECT image.height FROM dataset
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI/Glob of the input images. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
dropInvalid | Boolean | false | Whether to drop any invalid image files. Default: true. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
watermark | Object | false | A structured streaming watermark object. Requires eventTime and delayThreshold attributes. |
Examples
Minimal
{
"type": "ImageExtract",
"name": "load customer images",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.jpg",
"outputView": "customer"
}
Complete
{
"type": "ImageExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load customer images",
"description": "load customer images",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.jpg",
"outputView": "customer",
"authentication": {},
"dropInvalid": true,
"numPartitions": 10,
"partitionBy": [
"image.width"
],
"persist": false,
"basePath": "hdfs://datalake/customer/",
"watermark": {
"eventTime": "timestamp",
"delayThreshold": "10 minutes"
}
}
JDBCExtract
Since: 1.0.0 - Supports Streaming: False
The JDBCExtract reads directly from a JDBC Database and returns a DataFrame. See the Spark JDBC documentation.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
jdbcURL | String | true | The JDBC URL to connect to. e.g., jdbc:mysql://localhost:3306 . |
tableName | String | true | The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used, e.g. (SELECT * FROM sourcetable WHERE key=value) sourcetable or just sourcetable . |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index .Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
fetchsize | Integer | false | The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections. |
params | Map[String, String] | false | Map of configuration parameters. Any parameters provided will be added to the JDBC connection object. These are not logged so it is safe to put passwords here. |
partitionBy | Array[String] | false | Columns to partition the data by. |
partitionColumn | String | false | The name of a numeric column from the table in question which defines how to partition the table when reading in parallel from multiple workers. If set numPartitions must also be set. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
predicates | Array[String] | false | A list of expressions suitable for inclusion in WHERE clauses; each one defines one partition of the DataFrame to allow explicit parallel reads. e.g. ['id=1', 'id=2', 'id=3', 'id=4'] would create 4 parallel readers. |
schemaURI | URI | false | Used for multiple purposes including supplying a schema (data types and metadata) for the extracted dataset. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
Examples
Minimal
{
"type": "JDBCExtract",
"name": "load active customers from postgres",
"environments": [
"production",
"test"
],
"jdbcURL": "jdbc:postgresql://localhost:5432/customer",
"tableName": "(SELECT * FROM customer WHERE active=TRUE) customer",
"outputView": "customer"
}
Complete
{
"type": "JDBCExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load active customers from postgresql",
"description": "load active customers from postgresql",
"environments": [
"production",
"test"
],
"jdbcURL": "jdbc:postgresql://localhost:5432/customer",
"tableName": "(SELECT * FROM customer WHERE active=TRUE) customer",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"fetchsize": 1000,
"numPartitions": 10,
"params": {
"user": "mydbuser",
"password": "mydbpassword"
},
"partitionBy": [
"country"
],
"partitionColumn": "id",
"persist": true,
"predicates": [
"id=1",
"id=2",
"id=3",
"id=4"
],
"schemaURI": "hdfs://datalake/schema/customer.json",
"schemaView": "customer_schema"
}
JSONExtract
Since: 1.0.0 - Supports Streaming: True
The JSONExtract stage reads either one or more JSON files or an input Dataset[String] and returns a DataFrame.
If trying to run against an inputView in streaming mode this stage will not work. Instead try using the from_json SQL Function with a SQLTransform.
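A minimal sketch of that from_json approach, assuming the streaming view is named customer_stream, exposes the raw JSON in a value column, and has the illustrative fields id and name:
SELECT from_json(CAST(value AS STRING), 'id INT, name STRING') AS customer
FROM customer_stream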
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
inputURI | URI | true* | URI/Glob of the input json files. If not present inputView is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index .Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
inputField | String | false | If using inputView this option allows you to specify the name of the field which contains the delimited data. |
multiLine | Boolean | false | Whether the input directory contains a single JSON object per file or multiple JSON records in a single file, one per line (see JSONLines). Default: true. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
schemaURI | URI | false | Used for multiple purposes including supplying a schema (data types and metadata) for the extracted dataset. Additionally, by specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
watermark | Object | false | A structured streaming watermark object. Requires eventTime and delayThreshold attributes. |
Examples
Minimal
{
"type": "JSONExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.json",
"outputView": "customer"
}
Complete
{
"type": "JSONExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.json",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"inputField": "jsondata",
"multiLine": false,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://datalake/customer/",
"watermark": {
"eventTime": "timestamp",
"delayThreshold": "10 minutes"
}
}
KafkaExtract
Since: 1.0.8 - Supports Streaming: True
Plugin
The KafkaExtract is provided by the https://github.com/tripl-ai/arc-kafka-pipeline-plugin package.
The KafkaExtract stage reads records from a Kafka topic and returns a DataFrame. It requires a unique groupID to be set which on first run will consume from the earliest offset available in Kafka. Each subsequent run will use the offset as recorded against that groupID. This means that if a job fails before properly processing the data then data may need to be restarted from the earliest offset by creating a new groupID.
The returned DataFrame has the schema:
Field | Type | Description |
---|---|---|
topic | String | The Kafka Topic. |
partition | Integer | The partition ID. |
offset | Long | The record offset. |
timestamp | Long | The record timestamp. |
timestampType | Int | The record timestamp type. |
key | Binary | The record key as a byte array. |
value | Binary | The record value as a byte array. |
Can be used in conjunction with KafkaCommitExecute to allow quasi-transactional behaviour (with autoCommit set to false) - in that the offset commit can be deferred until certain dependent stages are successfully executed.
To convert the key or value from a Binary/byte array to a string it is possible to use the decode SQL Function, or a simple cast, with a SQLTransform like:
SELECT
CAST(key AS STRING) AS stringKey,
CAST(value AS STRING) AS stringValue,
...
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
bootstrapServers | String | true | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. e.g. host1:port1,host2:port2,... |
topic | String | true | The target Kafka topic. |
groupID | String | true | A string that uniquely identifies the group of consumer processes to which this consumer belongs. This will retain the offset of the job between executions. |
autoCommit | Boolean | false | Whether to update the offsets in Kafka automatically. To be used in conjunction with KafkaCommitExecute to allow quasi-transactional behaviour. If autoCommit is set to false this stage will force persist equal to true so that Spark will not execute the Kafka extract process twice with a potentially different result (e.g. new messages added between extracts). Default: false. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
maxPollRecords | Int | false | The maximum number of records returned in a single call to Kafka. Arc will then continue to poll until all records have been read. Default: 500 . |
maxRecords | Int | false | The maximum number of records returned in a single execution of this stage when executed in batch mode. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
strict | Boolean | false | Whether to perform record count validation. Will not work with compacted topics. Default: true . |
timeout | Long | false | The time, in milliseconds, spent waiting in poll if data is not available in Kafka. Default: 10000 . |
Examples
Minimal
{
"type": "KafkaExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"outputView": "customer",
"bootstrapServers": "kafka:29092",
"topic": "customers",
"groupID": "spark-customer-extract-job"
}
Complete
{
"type": "KafkaExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"outputView": "customer",
"bootstrapServers": "kafka:29092",
"topic": "customers",
"groupID": "spark-customer-extract-job",
"autoCommit": false,
"maxPollRecords": 500,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"timeout": 10000,
"strict": true
}
MetadataExtract
Since: 2.4.0 - Supports Streaming: True
The MetadataExtract stage extracts the metadata attached to an input DataFrame and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
Examples
Minimal
{
"type": "MetadataExtract",
"name": "extract metadata from customer view",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_metadata"
}
Complete
{
"type": "MetadataExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "extract metadata from customer view",
"description": "extract metadata from customer view",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_metadata",
"numPartitions": 1,
"partitionBy": [
"type"
],
"persist": false
}
MongoDBExtract
Since: 2.0.0 - Supports Streaming: False
Plugin
The MongoDBExtract is provided by the https://github.com/tripl-ai/arc-mongo-pipeline-plugin package.
The MongoDBExtract stage reads a collection from MongoDB and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
options | Map[String, String] | false | Map of configuration parameters. These parameters are used to provide database connection/collection details. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
schemaURI | URI | false | Used for multiple purposes including supplying a schema (data types and metadata) for the extracted dataset. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
Examples
Minimal
{
"type": "MongoDBExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"options": {
"uri": "mongodb://username:password@mongo:27017",
"database": "local",
"collection": "customer"
},
"outputView": "customers"
}
Complete
{
"type": "MongoDBExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"options": {
"uri": "mongodb://username:password@mongo:27017",
"database": "local",
"collection": "customer"
},
"outputView": "customers",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/metadata/customer.json",
"schemaView": "customer_schema"
}
ORCExtract
Since: 1.0.0 - Supports Streaming: True
The ORCExtract stage reads one or more Apache ORC files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI/Glob of the input ORC files. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index .Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
schemaURI | URI | false | Used for multiple purposes including supplying a schema (data types and metadata) for the extracted dataset. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
watermark | Object | false | A structured streaming watermark object. Requires eventTime and delayThreshold attributes. |
Examples
Minimal
{
"type": "ORCExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.orc",
"outputView": "customer"
}
Complete
{
"type": "ORCExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.orc",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://datalake/customer/",
"watermark": {
"eventTime": "timestamp",
"delayThreshold": "10 minutes"
}
}
ParquetExtract
Since: 1.0.0 - Supports Streaming: True
The ParquetExtract stage reads one or more Apache Parquet files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI/Glob of the input Parquet files. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index .Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
schemaURI | URI | false | Used for multiple purposes including supplying a schema (data types and metadata) for the extracted dataset. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
watermark | Object | false | A structured streaming watermark object. Requires eventTime and delayThreshold attributes. |
Examples
Minimal
{
"type": "ParquetExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.parquet",
"outputView": "customer"
}
Complete
{
"type": "ParquetExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.parquet",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://datalake/customer/",
"watermark": {
"eventTime": "timestamp",
"delayThreshold": "10 minutes"
}
}
RateExtract
Since: 1.2.0 - Supports Streaming: True
The RateExtract stage creates a streaming datasource which generates rows into a streaming DataFrame with the signature [timestamp: timestamp, value: long].
This stage has been included for testing Structured Streaming jobs as it can be very difficult to generate test data. Generally this stage would only be included when Arc is run in a test mode (i.e. the environment is set to test).
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
rampUpTime | Integer | false | How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds. Default: 0. |
rowsPerSecond | Integer | false | How many rows should be generated per second. Default: 1. |
Examples
Minimal
{
"type": "RateExtract",
"name": "create a streaming source",
"environments": [
"production",
"test"
],
"outputView": "stream"
}
Complete
{
"type": "RateExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "create a streaming source",
"description": "create a streaming source",
"environments": [
"production",
"test"
],
"outputView": "stream",
"rowsPerSecond": 2,
"rampUpTime": 0,
"numPartitions": 10
}
SASExtract
Since: 2.4.0 - Supports Streaming: True
Plugin
The SASExtract is provided by the https://github.com/tripl-ai/arc-sas-pipeline-plugin package.
The SASExtract stage reads data from SAS sas7bdat binary files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI/Glob of the input sas7bdat files. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
options | Map[String, String] | false | Options for reading the sas7bdat file. These values are limited to: inferDecimal (infer numeric columns with format width > 0 and format precision > 0 as Decimal(Width, Precision)); inferDecimalScale (scale of inferred decimals); inferFloat (infer numeric columns with <= 4 bytes as Float); inferInt (infer numeric columns with <= 4 bytes, format width > 0 and format precision = 0, as Int); inferLong (infer numeric columns with <= 8 bytes, format width > 0 and format precision = 0, as Long); inferShort (infer numeric columns with <= 2 bytes, format width > 0 and format precision = 0, as Short); maxSplitSize (maximum byte length of input splits, which can be decreased to force higher parallelism). |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
schemaURI | URI | false | Used for multiple purposes including supplying a schema (data types and metadata) for the extracted dataset. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
Examples
Minimal
{
"type": "SASExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.sas7bdat",
"outputView": "customer"
}
Complete
{
"type": "SASExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.sas7bdat",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/metadata/customer.json",
"schemaView": "customer_schema",
"options": {
"maxSplitSize": 100000000
}
}
StatisticsExtract
Since: 3.5.0 - Supports Streaming: True
The StatisticsExtract stage extracts the column statistics from an input DataFrame and returns a DataFrame.
It differs from the Spark inbuilt summary by:
- operates on all data types.
- returns row-based data rather than column-based (i.e. each input column is one row in the output).
- includes the additional count_distinct and null_count metrics.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
approximate | Boolean | false | Whether to calculate approximate statistics or full population based statistics. Calculating population based statistics is a computationally and memory intensive operation and may result in very long runtime or exceed memory limits. Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
histogram | Boolean | false | Whether to calculate distribution statistics (25% , 50% , 75% ).Default: false . |
id | String | false | An optional unique identifier for this stage. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
hllRelativeSD | Double | false | The maximum relative standard deviation for the distinct_count output variable. Smaller values will provide greater precision at the expense of runtime.Default: 0.05 . |
Examples
Minimal
{
"type": "StatisticsExtract",
"name": "extract column statistics from customer view",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_statistics"
}
Complete
{
"type": "StatisticsExtract",
"name": "extract column statistics from customer view",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputView": "customer_statistics",
"approximate": true,
"persist": true
}
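The example above does not exercise the histogram or hllRelativeSD parameters. A sketch of a configuration that also requests quartile statistics and a tighter approximate distinct count, with illustrative values only:
{
  "type": "StatisticsExtract",
  "name": "extract detailed column statistics from customer view",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputView": "customer_statistics",
  "approximate": true,
  "histogram": true,
  "hllRelativeSD": 0.01,
  "persist": true
}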
TextExtract
Since: 1.2.0 - Supports Streaming: True
The TextExtract stage reads one or more text files and returns a DataFrame.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true* | Name of the incoming Spark dataset containing a list of URI/Globs to extract from. If not present inputURI is required. A sketch of building such a view is shown after this table. |
inputURI | URI | true* | URI/Glob of the input text files. If not present inputView is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
basePath | URI | false | The base path that partition discovery should start with. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index .Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
multiLine | Boolean | false | Whether to load the file as a single record or as individual records split by newline. Default: false . |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
schemaURI | URI | false | Used for multiple purposes, such as applying a predefined schema to the extracted dataset. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
watermark | Object | false | A structured streaming watermark object. Requires eventTime and delayThreshold attributes. |
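When driving TextExtract from inputView, the incoming dataset supplies the URI/Globs to read. A minimal sketch of a query that could produce such a view follows; the single string column is named value purely for illustration, as the expected column name is not restated in this document:
-- hypothetical: build a one-column view listing the URI/Globs to extract
-- ('value' is an assumed column name)
SELECT 'hdfs://datalake/customer/*.txt' AS value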
Examples
Minimal
{
"type": "TextExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.txt",
"outputView": "customer"
}
Complete
{
"type": "TextExtract",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.txt",
"inputView": "list_of_text_files_to_extract",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"multiLine": true,
"numPartitions": 10,
"persist": false,
"schemaURI": "hdfs://datalake/schema/customer.json",
"schemaView": "customer_schema",
"basePath": "hdfs://datalake/customer/",
"watermark": {
"eventTime": "timestamp",
"delayThreshold": "10 minutes"
}
}
XMLExtract
Since: 1.0.0 - Supports Streaming: False
The XMLExtract stage reads one or more XML files or an input Dataset[String] and returns a DataFrame.
This extract works slightly differently from the spark-xml package. To access the data you can use a SQLTransform query like the following, which will create a new row for each element of the bk:books array:
SELECT EXPLODE(`bk:books`).*
FROM books_xml
The backtick character (`) can be used to address fields with non-alphanumeric names.
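Building on that pattern, nested fields of the exploded records can also be addressed with backticks. The field names below (bk:title, bk:price) are hypothetical and only illustrate the quoting syntax:
-- hypothetical nested fields, shown only to illustrate backtick quoting
SELECT book.`bk:title`, book.`bk:price`
FROM (
  SELECT EXPLODE(`bk:books`) AS book
  FROM books_xml
) exploded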
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true* | URI/Glob of the input XML files. If not present inputView is required. |
inputView | String | true* | Name of the incoming Spark dataset. If not present inputURI is required. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
contiguousIndex | Boolean | false | When loading a file two additional metadata fields are added to each record: _filename and _index (row number in the file). These fields are automatically included as they are very useful when trying to understand where certain data came from when consuming the data downstream.The computational cost of adding the _index column in a distributed execution engine like Spark means that sometimes it is not worth the time/expense of precisely resolving the row number. By setting contiguousIndex equal to false Spark will include a different field _monotonically_increasing_id which is a non-sequential/non-contiguous identifier from which _index can be derived later but will not incur the same cost penalty of resolving _index .Default: true. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
inputField | String | false | If using inputView this option allows you to specify the name of the field which contains the XML data. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
schemaURI | URI | false | Used for multiple purposes, such as applying a predefined schema to the extracted dataset. Additionally, by specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading. |
schemaView | String | false | Similar to schemaURI but allows the schema to be passed in as another DataFrame . |
xsdURI | URI | false | URI of an XML Schema Definition (XSD) file used to validate input XML. |
Examples
Minimal
{
"type": "XMLExtract",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/xml/*.xml",
"outputView": "customer"
}
Complete
{
"type": "XMLExtract",
"name": "load customers",
"description": "load customers",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/customer/*.xml",
"xsdURI": "hdfs://datalake/xml/customer.xsd",
"outputView": "customer",
"authentication": {},
"contiguousIndex": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"persist": false,
"schemaURI": "hdfs://datalake/schema/customer.json",
"schemaView": "customer_schema",
"inputView": "customer_xml",
"inputField": "xml"
}