Load
*Load stages write out Spark datasets to a database or file system.
*Load stages should meet these criteria:
- Take in a single dataset.
- Perform target-specific validation that the dataset has been written correctly.
AvroLoad
Since: 1.0.0 - Supports Streaming: False
The AvroLoad writes an input DataFrame to a target Apache Avro file.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputURI | URI | true | URI of the Avro file to write to. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
Examples
Minimal
{
"type": "AvroLoad",
"name": "write customer avro extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.avro"
}
Complete
{
"type": "AvroLoad",
"id": "00000000-0000-0000-0000-000000000000",
"name": "write customer avro extract",
"description": "write customer avro extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.avro",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"country"
],
"saveMode": "Overwrite"
}
BigQueryLoad
Supports Streaming: False
Plugin
The BigQueryLoad is provided by the https://github.com/tripl-ai/arc-big-query-pipeline-plugin package.
The BigQueryLoad stage writes an input DataFrame to a target BigQuery table.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
table | String | true | The BigQuery table in the format [[project:]dataset.]table. |
temporaryGcsBucket | String | true | The GCS bucket that temporarily holds the data before it is loaded to BigQuery. |
allowFieldAddition | String | false | Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Default: false. |
allowFieldRelaxation | String | false | Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Default: false. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
clusteredFields | String | false | Comma separated list of non-repeated, top level columns. Clustering is only supported for partitioned tables. |
createDisposition | String | false | Specifies whether the job is allowed to create new tables. Either CREATE_IF_NEEDED or CREATE_NEVER. Default: CREATE_IF_NEEDED. |
dataset | String | false* | The dataset containing the table. Required if omitted in table. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
parentProject | String | false | The Google Cloud Project ID of the table to bill for the export. Defaults to the project of the Service Account being used. |
partitionExpirationMs | Integer | false | Number of milliseconds for which to keep the storage for partitions in the table. The storage in a partition will have an expiration time of its partition time plus this value. |
partitionField | String | false | If field is specified together with partitionType , the table is partitioned by this field. The field must be a top-level TIMESTAMP or DATE field. |
project | String | false | The Google Cloud Project ID of the table. Defaults to the project of the Service Account being used. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
Examples
Minimal
{
"type": "BigQueryLoad",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputView": "customer",
"table": "dataset.customer",
"temporaryGcsBucket": "bucket-name"
}
Complete
{
"type": "BigQueryLoad",
"name": "load customers",
"environments": [
"production",
"test"
],
"inputView": "customer",
"table": "customer",
"dataset": "dataset",
"temporaryGcsBucket": "bucket-name",
"allowFieldAddition": false,
"allowFieldRelaxation": false,
"clusteredFields": "field0,field1",
"createDisposition": "CREATE_IF_NEEDED",
"parentProject": "parent-project",
"project": "project",
"partitionExpirationMs": 525600000,
"partitionField": "load_date",
"saveMode": "Overwrite"
}
CassandraLoad
Since: 2.0.0 - Supports Streaming: False
Plugin
The CassandraLoad is provided by the https://github.com/tripl-ai/arc-cassandra-pipeline-plugin package.
The CassandraLoad writes an input DataFrame to a target Cassandra cluster.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
keyspace | String | true | The name of the Cassandra keyspace to write to. |
table | String | true | The name of the Cassandra table to write to. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections. |
params | Map[String, String] | false | Map of configuration parameters. Any parameters provided will be added to the Cassandra connection object. |
partitionBy | Array[String] | false | Columns to partition the data by. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
Examples
Minimal
{
"type": "CassandraLoad",
"name": "write",
"environments": [
"production",
"test"
],
"inputView": "customer",
"keyspace": "default",
"table": "customer"
}
Complete
{
"type": "CassandraLoad",
"name": "write",
"environments": [
"production",
"test"
],
"inputView": "customer",
"keyspace": "default",
"table": "customer"
"saveMode": "Overwrite",
"params": {
"confirm.truncate": "true",
"spark.cassandra.connection.host": "cassandra"
},
"numPartitions": 10,
"partitionBy": [
"country"
],
"saveMode": "Overwrite"
}
ConsoleLoad
Since: 1.2.0 - Supports Streaming: True
The ConsoleLoad prints an input streaming DataFrame to the console.
This stage has been included for testing Structured Streaming jobs as they can be very difficult to debug. Generally this stage would only be included when Arc is run in a test mode (i.e. the environment is set to test).
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
outputMode | String | false | The output mode of the console writer. Allowed values: Append, Complete, Update. See Output Modes for full details. Default: Append. |
Examples
Minimal
{
"type": "ConsoleLoad",
"name": "write a streaming dataset to console",
"environments": [
"production",
"test"
],
"inputView": "customer"
}
Complete
{
"type": "ConsoleLoad",
"id": "00000000-0000-0000-0000-000000000000",
"name": "write a streaming dataset to console",
"description": "write a streaming dataset to console",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputMode": "Append"
}
DeltaLakeLoad
Since: 2.0.0 - Supports Streaming: True
Plugin
The DeltaLakeLoad is provided by the https://github.com/tripl-ai/arc-deltalake-pipeline-plugin package.
The DeltaLakeLoad writes an input DataFrame to a target DeltaLake file.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputURI | URI | true | URI of the Delta file to write to. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
options | Map[String, String] | false | A set of optional parameters like replaceWhere . |
partitionBy | Array[String] | false | Columns to partition the data by. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
generateSymlinkManifest | Boolean | false | Create a _symlink_format_manifest file so that the DeltaLakeLoad output can be read by other tools like a Presto database. Default: true. |
Examples
Minimal
{
"type": "DeltaLakeLoad",
"name": "write customer Delta extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "/delta/customers"
}
Complete
{
"type": "DeltaLakeLoad",
"name": "write customer Delta extract",
"description": "write customer Delta extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "/delta/customers",
"numPartitions": 10,
"partitionBy": [
"country"
],
"saveMode": "Overwrite",
"generateSymlinkManifest": true
}
DeltaLakeMergeLoad
Since: arc-deltalake-pipeline-plugin 1.7.0 - Supports Streaming: True
Plugin
The DeltaLakeMergeLoad is provided by the https://github.com/tripl-ai/arc-deltalake-pipeline-plugin package.
NOTE: This stage includes additional functionality that is not included in the main DeltaLake functionality. A pull request has been raised.
The DeltaLakeMergeLoad writes an input DataFrame to a target DeltaLake file using the MERGE functionality.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputURI | URI | true | URI of the Delta file to write to. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
condition | String | true | The join condition to perform the data comparison between the source (the inputView dataset) and target (the outputURI dataset). Note that the names source and target must be used. |
createTableIfNotExists | Boolean | false | Create an initial DeltaLake table if one does not already exist. Default: false. |
id | String | false | An optional unique identifier for this stage. |
generateSymlinkManifest | Boolean | false | Create a manifest file so that the DeltaLakeMergeLoad output can be read by a Presto database. Default: true |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. spark.databricks.delta.merge.repartitionBeforeWrite.enabled must be set for this to have effect. |
partitionBy | Array[String] | false | Columns to partition the data by. |
whenMatchedDeleteFirst | Boolean | false | If true the whenMatchedDelete operation will happen before whenMatchedUpdate. If false the whenMatchedUpdate operation will happen before whenMatchedDelete. Default: true. |
whenMatchedDelete | Map[String, String] | false | If specified, whenMatchedDelete will delete records where the record exists in both source and target based on the join condition. Optionally condition may be specified to restrict the records to delete and can only refer to fields in both source and target. |
whenMatchedUpdate | Map[String, Object] | false | If specified, whenMatchedUpdate will update records where the record exists in both source and target based on the join condition. Optionally condition may be specified to restrict the records to update and can only refer to fields in both source and target. Optionally values may be specified to define the update rules which can be used to update only selected columns. |
whenNotMatchedByTargetInsert | Map[String, Object] | false | If specified, whenNotMatchedByTargetInsert will insert records in source which do not exist in target based on the join condition. Optionally condition may be specified to restrict the records to insert but can only refer to fields in source. Optionally values may be specified to define the insert rules which can be used to insert only selected columns. |
whenNotMatchedBySourceDelete | Map[String, Object] | false | If specified, whenNotMatchedBySourceDelete will delete records in target which do not exist in source based on the join condition. Optionally condition may be specified to restrict the records to delete but can only refer to fields in target. |
Examples
Minimal
{
"type": "DeltaLakeMergeLoad",
"name": "merge with existing customer dataset",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "/delta/customers",
"condition": "source.customerId = target.customerId",
"whenNotMatchedByTargetInsert": {},
"whenNotMatchedBySourceDelete": {}
}
Complete
{
"type": "DeltaLakeMergeLoad",
"name": "merge with existing customer dataset",
"description": "merge with existing customer dataset",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "/delta/customers",
"createTableIfNotExists": true,
"condition": "source.customerId = target.customerId",
"whenMatchedDeleteFirst": true,
"whenMatchedDelete": {
"condition": "source.customerDeleteFlag = TRUE"
},
"whenMatchedUpdate": {
"condition": "source.customerUpdateFlag = TRUE",
"values": {
"customerId": "source.customerId",
"customerLastUpdated": "source.customerUpdateTimestamp"
}
},
"whenNotMatchedByTargetInsert": {
"condition": "source.customerId != 'DUMMY'",
"values": {
"customerId": "source.customerId",
"customerLastUpdated": "source.customerInsertTimestamp"
}
},
"whenNotMatchedBySourceDelete": {
"condition": "target.locked = FALSE"
}
}
DelimitedLoad
Since: 1.0.0 - Supports Streaming: True
The DelimitedLoad writes an input DataFrame to a target delimited file.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputURI | URI | true | URI of the Delimited file to write to. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
customDelimiter | String | true* | A custom string to use as delimiter. Required if delimiter is set to Custom . |
delimiter | String | false | The type of delimiter in the file. Supported values: Comma, Pipe, DefaultHive. DefaultHive is ASCII character 1, the default delimiter for Apache Hive extracts. Default: Comma. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
header | Boolean | false | Whether to write a header row. Default: false . |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
quote | String | false | The type of quoting in the file. Supported values: None, SingleQuote, DoubleQuote. Default: DoubleQuote. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
Examples
Minimal
{
"type": "DelimitedLoad",
"name": "write customer as csv",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.csv"
}
Complete
{
"type": "DelimitedLoad",
"id": "00000000-0000-0000-0000-000000000000",
"name": "write customer as csv",
"description": "write customer as csv",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.csv",
"authentication": {},
"delimiter": "Custom",
"customDelimiter": "#",
"header": true,
"numPartitions": 10,
"partitionBy": [
"country"
],
"quote": "DoubleQuote",
"saveMode": "Overwrite"
}
ElasticsearchLoad
Since: 1.9.0 - Supports Streaming: False
Plugin
The ElasticsearchLoad is provided by the https://github.com/tripl-ai/arc-elasticsearch-pipeline-plugin package.
The ElasticsearchLoad writes an input DataFrame to a target Elasticsearch cluster.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
output | String | true | The name of the target Elasticsearch index. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
params | Map[String, String] | false | Map of configuration parameters. Parameters for connecting to the Elasticsearch cluster are detailed here. |
partitionBy | Array[String] | false | Columns to partition the data by. |
Examples
Minimal
{
"type": "ElasticsearchLoad",
"name": "write customer",
"environments": [
"production",
"test"
],
"output": "customer",
"inputView": "customer",
"params": {
"es.nodes": "<my>.elasticsearch.com",
"es.port": "443",
"es.nodes.wan.only": "true",
"es.net.ssl": "true"
}
}
Complete
{
"type": "ElasticsearchLoad",
"name": "write customer",
"environments": [
"production",
"test"
],
"output": "customer",
"inputView": "customer",
"params": {
"es.nodes": "<my>.elasticsearch.com",
"es.port": "443",
"es.nodes.wan.only": "true",
"es.net.ssl": "true"
},
"numPartitions": 10,
"partitionBy": [
"country"
],
"saveMode": "Overwrite"
}
HTTPLoad
Since: 1.0.0 - Supports Streaming: True
The HTTPLoad takes an input DataFrame and executes a series of POST requests against a remote HTTP service. The input to this stage needs to be a single column dataset of signature value: string and is intended to be used after a JSONTransform stage which would prepare the data for sending to the external server.
In the future additional Transform stages (like ProtoBufTransform) could be added to prepare binary payloads instead of just JSON strings.
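For example, the required single value: string column could be produced by a preceding SQLTransform (a minimal sketch; the customer view and its columns are illustrative):
SELECT TO_JSON(STRUCT(customerId, customerName, email)) AS value
FROM customer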
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputURI | URI | true | URI of the HTTP server. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
headers | Map[String, String] | false | HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers. |
id | String | false | An optional unique identifier for this stage. |
validStatusCodes | Array[Integer] | false | A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202] . Note: all request response codes must be contained in this list for the stage to be successful. |
Examples
Minimal
{
"type": "HTTPLoad",
"name": "load customers to the customer api",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "http://internalserver/api/customer"
}
Complete
{
"type": "HTTPLoad",
"id": "00000000-0000-0000-0000-000000000000",
"name": "load customers to the customer api",
"description": "load customers to the customer api",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "http://internalserver/api/customer",
"headers": {
"Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
"custom-header": "payload"
},
"validStatusCodes": [
200,
201
]
}
JDBCLoad
Since: 1.0.0 - Supports Streaming: True
The JDBCLoad writes an input DataFrame to a target JDBC database. See Spark JDBC documentation.
Whilst it is possible to use JDBCLoad to create tables directly in the target database, Spark only has limited knowledge of the schema required in the destination database and so will translate types like StringType to a TEXT type in the target database (because Spark does not have fixed-length strings internally). The recommendation is to use a preceding JDBCExecute to execute a CREATE TABLE statement which creates the intended schema, then insert into that table with saveMode set to Append.
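For example, a preceding JDBCExecute stage could run a statement along the lines of the following before the JDBCLoad writes with saveMode set to Append (a sketch only; the table and column definitions are illustrative and should match the intended target schema):
CREATE TABLE mydatabase.myschema.customer (
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(64),
  comments VARCHAR(1024)
)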
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
jdbcURL | String | true | The JDBC URL to connect to. e.g., jdbc:mysql://localhost:3306 . |
tableName | String | true | The target JDBC table. Must be in database.schema.table format. |
params | Map[String, String] | false | Map of configuration parameters. Any parameters provided will be added to the JDBC connection object. These are not logged so it is safe to put passwords here. |
batchsize | Integer | false | The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. Default: 1000. |
bulkload | Boolean | false | Whether to enable a bulk copy. This is currently only available for sqlserver targets but more targets can be added as drivers become available. Default: false. |
createTableColumnTypes | String | false | The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: “name CHAR(64), comments VARCHAR(1024) ”). The specified types should be valid spark sql data types. |
createTableOptions | String | false | This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB ). |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
isolationLevel | String | false | The transaction isolation level, which applies to the current connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to the standard transaction isolation levels defined by JDBC's Connection object, with a default of READ_UNCOMMITTED. Please refer to the documentation in java.sql.Connection. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
tablock | Boolean | false | When in bulkload mode, whether to set TABLOCK on the driver. Default: true. |
truncate | Boolean | false | If using SaveMode equal to Overwrite , this additional option causes Spark to TRUNCATE TABLE of existing data instead of executing a DELETE FROM statement. |
Examples
Minimal
{
"type": "JDBCLoad",
"name": "write customer to postgres",
"environments": [
"production",
"test"
],
"inputView": "customer",
"jdbcURL": "jdbc:postgresql://localhost:5432/customer",
"tableName": "mydatabase.myschema.customer",
"params": {
"user": "mydbuser",
"password": "mydbpassword"
}
}
Complete
{
"type": "JDBCLoad",
"id": "00000000-0000-0000-0000-000000000000",
"name": "write customer to postgres",
"description": "write customer to postgres",
"environments": [
"production",
"test"
],
"inputView": "customer",
"jdbcURL": "jdbc:postgresql://localhost:5432/customer",
"tableName": "mydatabase.myschema.customer",
"batchsize": 10000,
"createTableColumnTypes": "name CHAR(64), comments VARCHAR(1024)",
"createTableOptions": "CREATE TABLE t (name string) ENGINE=InnoDB",
"isolationLevel": "READ_COMMITTED",
"numPartitions": 10,
"params": {
"user": "mydbuser",
"password": "mydbpassword"
},
"saveMode": "Append",
"tablock": false,
"truncate": false
}
JSONLoad
Since: 1.0.0 - Supports Streaming: True
The JSONLoad writes an input DataFrame to a target JSON file.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputURI | URI | true | URI of the JSON file to write to. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
Examples
Minimal
{
"type": "JSONLoad",
"name": "write customer json extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.json"
}
Complete
{
"type": "JSONLoad",
"id": "00000000-0000-0000-0000-000000000000",
"name": "write customer json extract",
"description": "write customer json extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.json",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"country"
],
"saveMode": "Overwrite"
}
KafkaLoad
Since: 1.0.8 - Supports Streaming: True
Plugin
The KafkaLoad is provided by the https://github.com/tripl-ai/arc-kafka-pipeline-plugin package.
The KafkaLoad writes an input DataFrame to a target Kafka topic. The input to this stage needs to be a single column dataset of signature value: string - intended to be used after a JSONTransform stage - or two columns of signature key: string, value: string which could be created by a SQLTransform stage.
In the future additional Transform stages (like ProtoBufTransform) may be added to prepare binary payloads instead of just JSON strings.
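For example, a SQLTransform could produce the two column key: string, value: string dataset (a minimal sketch; the customer view and its columns are illustrative):
SELECT
  CAST(customerId AS STRING) AS key,
  TO_JSON(STRUCT(customerId, customerName, email)) AS value
FROM customer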
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
bootstrapServers | String | true | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. e.g. host1:port1,host2:port2,... |
topic | String | true | The target Kafka topic. |
acks | Integer | false | The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 1: the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. 0: the job will not wait for any acknowledgment from the server at all. -1: the leader will wait for the full set of in-sync replicas to acknowledge the record (safest). Default: 1. |
batchSize | Integer | false | Number of records to send in a single request to reduce the number of requests to Kafka. Default: 16384. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
retries | Integer | false | How many times to try to resend any record whose send fails with a potentially transient error. Default: 0 . |
Examples
Minimal
{
"type": "KafkaLoad",
"name": "write customer to kafka",
"environments": [
"production",
"test"
],
"inputView": "customer",
"bootstrapServers": "kafka:29092",
"topic": "customers"
}
Complete
{
"type": "KafkaLoad",
"name": "write customer to kafka",
"description": "write customer to kafka",
"environments": [
"production",
"test"
],
"inputView": "customer",
"bootstrapServers": "kafka:29092",
"topic": "customers",
"acks": 1,
"batchSize": 16384,
"numPartitions": 10,
"retries": 0
}
MongoDBLoad
Since: 2.0.0 - Supports Streaming: False
Plugin
The MongoDBLoad is provided by the https://github.com/tripl-ai/arc-mongo-pipeline-plugin package.
The MongoDBLoad writes an input DataFrame to a target MongoDB collection.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
options | Map[String, String] | false | Map of configuration parameters. These parameters are used to provide database connection/collection details. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
Examples
Minimal
{
"type": "MongoDBLoad",
"name": "load customers",
"environments": [
"production",
"test"
],
"options": {
"uri": "mongodb://username:password@mongo:27017",
"database": "local",
"collection": "customer"
},
"inputView": "customers"
}
Complete
{
"type": "MongoDBLoad",
"name": "load customers",
"environments": [
"production",
"test"
],
"options": {
"uri": "mongodb://username:password@mongo:27017",
"database": "local",
"collection": "customer"
},
"inputView": "customers",
"numPartitions": 10,
"partitionBy": [
"country"
],
"saveMode": "Overwrite"
}
ORCLoad
Since: 1.0.0 - Supports Streaming: True
The ORCLoad writes an input DataFrame to a target Apache ORC file.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputURI | URI | true | URI of the ORC file to write to. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
Examples
Minimal
{
"type": "ORCLoad",
"name": "write customer ORC extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.orc"
}
Complete
{
"type": "ORCLoad",
"id": "00000000-0000-0000-0000-000000000000",
"name": "write customer ORC extract",
"description": "write customer ORC extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.orc",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"country"
],
"saveMode": "Overwrite"
}
ParquetLoad
Since: 1.0.0 - Supports Streaming: True
The ParquetLoad writes an input DataFrame to a target Apache Parquet file.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputURI | URI | true | URI of the Parquet file to write to. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
Examples
Minimal
{
"type": "ParquetLoad",
"name": "write customer Parquet extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.parquet"
}
Complete
{
"type": "ParquetLoad",
"id": "00000000-0000-0000-0000-000000000000",
"name": "write customer Parquet extract",
"description": "write customer Parquet extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.parquet",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"country"
],
"saveMode": "Overwrite"
}
TextLoad
Since: 1.9.0 - Supports Streaming: False
The TextLoad writes an input DataFrame to a target text file. It requires the inputView to be a single column of data, so a preprocessing step in something like a SQLTransform or JSONTransform is required.
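For example, a preceding SQLTransform could collapse each row into the required single column (a minimal sketch; assumes a customer view and uses JSON serialisation, though any string value works):
SELECT TO_JSON(STRUCT(*)) AS value
FROM customer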
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputURI | URI | true | URI of the text file to write to. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
singleFile | Boolean | false | Write in single file mode instead of a directory containing one or more partitions. Accepts datasets with either [value: string] , [value: string, filename: string] or [value: string, filename: string, index: integer] schema. If filename is supplied this component will write to one or more files. If index is supplied this component will order the records in each filename by index before writing. |
prefix | String | false | A string to append before the row data when in singleFile mode. |
separator | String | false | A separator string to append between the row data when in singleFile mode. Most common use will be \n which will insert newlines. |
suffix | String | false | A string to append after the row data when in singleFile mode. |
Examples
Minimal
{
"type": "TextLoad",
"name": "write customer Text extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.text"
}
Complete
{
"type": "TextLoad",
"id": "00000000-0000-0000-0000-000000000000",
"name": "write customer Text extract",
"description": "write customer text extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.text",
"authentication": {},
"numPartitions": 10,
"saveMode": "Overwrite",
"singleFile": true,
"prefix": "[",
"separator": ",\n",
"suffix": "]"
}
XMLLoad
Since: 1.0.0 - Supports Streaming: False
The XMLLoad writes an input DataFrame to a target XML file.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
outputURI | URI | true | URI of the XML file to write to. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
numPartitions | Integer | false | The number of partitions that will be used for controlling parallelism. |
partitionBy | Array[String] | false | Columns to partition the data by. |
saveMode | String | false | The mode for writing the output file to describe how errors are handled. Available options are: Append , ErrorIfExists , Ignore , Overwrite . Default is Overwrite if not specified. |
singleFile | Boolean | false | Write in single file mode instead of a directory containing one or more partitions. Accepts datasets with either [value: string] , [value: string, filename: string] or [value: string, filename: string, index: integer] schema. If filename is supplied this component will write to one or more files. If index is supplied this component will order the records in each filename by index before writing. |
prefix | String | false | A string to append before the row data when in singleFile mode. Useful for specifying the encoding <?xml version="1.0" encoding="UTF-8"?> . |
Examples
The XML writer uses reserved keywords to be able to set attributes as well as values. The _VALUE key is reserved to define element values when attributes, which are prefixed with an underscore (_), are present:
SELECT
NAMED_STRUCT(
'_VALUE', NAMED_STRUCT(
'child0', 0,
'child1', NAMED_STRUCT(
'nested0', 0,
'nested1', 'nestedvalue'
)
),
'_attributeName', 'attributeValue'
) AS Document
Results in:
<Document attributeName="attributeValue">
<child0>0</child0>
<child1>
<nested0>0</nested0>
<nested1>nestedvalue</nested1>
</child1>
</Document>
Minimal
{
"type": "XMLLoad",
"name": "write customer XML extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.xml"
}
Complete
{
"type": "XMLLoad",
"name": "write customer XML extract",
"description": "write customer XML extract",
"environments": [
"production",
"test"
],
"inputView": "customer",
"outputURI": "hdfs://output_data/customer/customer.xml",
"authentication": {},
"numPartitions": 10,
"partitionBy": [
"country"
],
"saveMode": "Overwrite",
"singleFile": true,
"prefix": "<?xml version="1.0" encoding="UTF-8"?>\n"
}