Load

*Load stages write out Spark datasets to a database or file system.

*Load stages should meet this criteria:

  • Take in a single dataset.
  • Perform target specific validation that the dataset has been written correctly.

AvroLoad

Since: 1.0.0 - Supports Streaming: False

The AvroLoad writes an input DataFrame to a target Apache Avro file.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputURI URI true URI of the Avro file to write to.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.

Examples

Minimal

{
  "type": "AvroLoad",
  "name": "write customer avro extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.avro"
}

Complete

{
  "type": "AvroLoad",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "write customer avro extract",
  "description": "write customer avro extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.avro",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "saveMode": "Overwrite"
}

BigQueryLoad

Supports Streaming: False

Plugin

The BigQueryLoad is provided by the https://github.com/tripl-ai/arc-big-query-pipeline-plugin package.

The BigQueryLoad stage writes an input DataFrame to a target BigQuery table.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
table String true The BigQuery table in the format [[project:]dataset.]table.
temporaryGcsBucket String true The GCS bucket that temporarily holds the data before it is loaded to BigQuery.
allowFieldAddition String false Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob.

Default: false.
allowFieldRelaxation String false Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob.

Default: false.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
clusteredFields String false Comma separated list of non-repeated, top level columns. Clustering is only supported for partitioned tables.
createDisposition String false Specifies whether the job is allowed to create new tables. Either CREATE_IF_NEEDED or CREATE_NEVER.

Default: CREATE_IF_NEEDED.
dataset String false* The dataset containing the table. Required if omitted in table.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
parentProject String false The Google Cloud Project ID of the table to bill for the export. Defaults to the project of the Service Account being used.
partitionExpirationMs Integer false Number of milliseconds for which to keep the storage for partitions in the table. The storage in a partition will have an expiration time of its partition time plus this value.
partitionField String false If field is specified together with partitionType, the table is partitioned by this field. The field must be a top-level TIMESTAMP or DATE field.
project String false The Google Cloud Project ID of the table. Defaults to the project of the Service Account being used.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.

Examples

Minimal

{
  "type": "BigQueryLoad",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "table": "dataset.customer",
  "temporaryGcsBucket": "bucket-name"
}

Complete

{
  "type": "BigQueryLoad",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "table": "customer",
  "dataset": "dataset",
  "temporaryGcsBucket": "bucket-name",
  "allowFieldAddition": false,
  "allowFieldRelaxation": false,
  "clusteredFields": "field0,field1",
  "createDisposition": "CREATE_IF_NEEDED",
  "parentProject": "parent-project",
  "project": "project",
  "partitionExpirationMs": 525600000,
  "partitionField": "load_date",
  "saveMode": "Overwrite"
}

CassandraLoad

Since: 2.0.0 - Supports Streaming: False

Plugin

The CassandraLoad is provided by the https://github.com/tripl-ai/arc-cassandra-pipeline-plugin package.

The CassandraLoad writes an input DataFrame to a target Cassandra cluster.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
keyspace String true The name of the Cassandra keyspace to write to.
table String true The name of the Cassandra table to write to.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections.
params Map[String, String] false Map of configuration parameters.. Any parameters provided will be added to the Cassandra connection object.
partitionBy Array[String] false Columns to partition the data by.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.

Examples

Minimal

{
  "type": "CassandraLoad",
  "name": "write",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "keyspace": "default",
  "table": "customer"
}

Complete

{
  "type": "CassandraLoad",
  "name": "write",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "keyspace": "default",
  "table": "customer"
  "saveMode": "Overwrite",
  "params": {
    "confirm.truncate": "true",
    "spark.cassandra.connection.host": "cassandra"
  },
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "saveMode": "Overwrite"
}

ConsoleLoad

Since: 1.2.0 - Supports Streaming: True

The ConsoleLoad prints an input streaming DataFrame the console.

This stage has been included for testing Structured Streaming jobs as it can be very difficult to debug. Generally this stage would only be included when Arc is run in a test mode (i.e. the environment is set to test).

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
outputMode String false The output mode of the console writer. Allowed values Append, Complete, Update. See Output Modes for full details.

Default: Append

Examples

Minimal

{
  "type": "ConsoleLoad",
  "name": "write a streaming dataset to console",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer"
}

Complete

{
  "type": "ConsoleLoad",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "write a streaming dataset to console",
  "description": "write a streaming dataset to console",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputMode": "Append"
}

DeltaLakeLoad

Since: 2.0.0 - Supports Streaming: True

Plugin

The DeltaLakeLoad is provided by the https://github.com/tripl-ai/arc-deltalake-pipeline-plugin package.

The DeltaLakeLoad writes an input DataFrame to a target DeltaLake file.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputURI URI true URI of the Delta file to write to.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
options Map[String, String] false A set of optional parameters like replaceWhere.
partitionBy Array[String] false Columns to partition the data by.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.
generateSymlinkManifest Boolean false Create a _symlink_format_manifest file so that the DeltaLakeLoad output can be read by other tools like a Presto database.

Default: true

Examples

Minimal

{
  "type": "DeltaLakeLoad",
  "name": "write customer Delta extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "/delta/customers"
}

Complete

{
  "type": "DeltaLakeLoad",
  "name": "write customer Delta extract",
  "description": "write customer Delta extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "/delta/customers",
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "saveMode": "Overwrite",
  "generateSymlinkManifest": true
}

DeltaLakeMergeLoad

Since: arc-deltalake-pipeline-plugin 1.7.0 - Supports Streaming: True

Plugin

The DeltaLakeMergeLoad is provided by the https://github.com/tripl-ai/arc-deltalake-pipeline-plugin package.

NOTE: This stage includes additional functionality that is not included in the main DeltaLake functionality. A pull request has been raised.

The DeltaLakeMergeLoad writes an input DataFrame to a target DeltaLake file using the MERGE functionality.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputURI URI true URI of the Delta file to write to.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
condition String true The join condition to perform the data comparison between the source (the inputView dataset) and target (the outputURI dataset). Note that the names source and target must be used.
createTableIfNotExists Boolean false Create an initial DeltaLake table if one does not already exist.

Default: false
id String false A optional unique identifier for this stage.
generateSymlinkManifest Boolean false Create a manifest file so that the DeltaLakeMergeLoad output can be read by a Presto database.

Default: true
numPartitions Integer false The number of partitions that will be used for controlling parallelism. spark.databricks.delta.merge.repartitionBeforeWrite.enabled must be set for this to have effect.
partitionBy Array[String] false Columns to partition the data by.
whenMatchedDeleteFirst Boolean false If true the whenMatchedDelete operation will happen before whenMatchedUpdate.

If false the whenMatchedUpdate operation will happen before whenMatchedDelete.

Default: true.
whenMatchedDelete Map[String, String] false If specified, whenMatchedDelete will delete records where the record exists in both source and target based on the join condition.

Optionally condition may be specified to restrict the records to delete and can only refer to fields in both source and target.
whenMatchedUpdate Map[String, Object] false If specified, whenMatchedUpdate will update records where the record exists in both source and target based on the join condition.

Optionally condition may be specified to restrict the records to update and can only refer to fields in both source and target.

Optionally values may be specified to define the update rules which can be used to update only selected columns.
whenNotMatchedByTargetInsert Map[String, Object] false If specified, whenNotMatchedByTargetInsert will insert records in source which do not exist in target based on the join condition.

Optionally condition may be specified to restrict the records to insert but can only refer to fields in source.

Optionally values may be specified to define the insert rules which can be used to insert only selected columns.
whenNotMatchedBySourceDelete Map[String, Object] false If specified, whenNotMatchedBySourceDelete will delete records in target which do not exist in source based on the join condition.

Optionally condition may be specified to restrict the records to insert but can only refer to fields in source.

Examples

Minimal

{
  "type": "DeltaLakeMergeLoad",
  "name": "merge with existing customer dataset",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "/delta/customers",
  "condition": "source.customerId = target.customerId",
  "whenNotMatchedByTargetInsert": {},
  "whenNotMatchedBySourceDelete": {}
}

Complete

{
  "type": "DeltaLakeMergeLoad",
  "name": "merge with existing customer dataset",
  "description": "merge with existing customer dataset",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "/delta/customers",
  "createTableIfNotExists": true,
  "condition": "source.customerId = target.customerId",
  "whenMatchedDeleteFirst": true,
  "whenMatchedDelete": {
    "condition": "source.customerDeleteFlag = TRUE"
  },
  "whenMatchedUpdate": {
    "condition": "source.customerUpdateFlag = TRUE",
    "values": {
      "customerId": "source.customerId",
      "customerLastUpdated": "source.customerUpdateTimestamp"
    }
  },
  "whenNotMatchedByTargetInsert": {
    "condition": "source.customerId != 'DUMMY'",
    "values": {
      "customerId": "source.customerId",
      "customerLastUpdated": "source.customerInsertTimestamp"
    }
  },
  "whenNotMatchedBySourceDelete": {
    "condition": "target.locked = FALSE"
  }
}

DelimitedLoad

Since: 1.0.0 - Supports Streaming: True

The DelimitedLoad writes an input DataFrame to a target delimited file.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputURI URI true URI of the Delimited file to write to.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
customDelimiter String true* A custom string to use as delimiter. Required if delimiter is set to Custom.
delimiter String false The type of delimiter in the file. Supported values: Comma, Pipe, DefaultHive. DefaultHive is ASCII character 1, the default delimiter for Apache Hive extracts.

Default: Comma.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
header Boolean false Whether to write a header row.

Default: false.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
quote String false The type of quoting in the file. Supported values: None, SingleQuote, DoubleQuote.

Default: DoubleQuote.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.

Examples

Minimal

{
  "type": "DelimitedLoad",
  "name": "write customer as csv",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.csv"
}

Complete

{
  "type": "DelimitedLoad",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "write customer as csv",
  "description": "write customer as csv",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.csv",
  "authentication": {},
  "delimiter": "Custom",
  "customDelimiter": "#",
  "header": true,
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "quote": "DoubleQuote",
  "saveMode": "Overwrite"
}

ElasticsearchLoad

Since: 1.9.0 - Supports Streaming: False

Plugin

The ElasticsearchLoad is provided by the https://github.com/tripl-ai/arc-elasticsearch-pipeline-plugin package.

The ElasticsearchLoad writes an input DataFrame to a target Elasticsearch cluster.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
output String true The name of the target Elasticsearch index.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
params Map[String, String] false Map of configuration parameters. Parameters for connecting to the Elasticsearch cluster are detailed here.
partitionBy Array[String] false Columns to partition the data by.

Examples

Minimal

{
  "type": "ElasticsearchLoad",
  "name": "write customer",
  "environments": [
    "production",
    "test"
  ],
  "output": "customer",
  "inputView": "customer",
  "params": {
    "es.nodes": "<my>.elasticsearch.com",
    "es.port": "443",
    "es.nodes.wan.only": "true",
    "es.net.ssl": "true"
  }
}

Complete

{
  "type": "ElasticsearchLoad",
  "name": "write customer",
  "environments": [
    "production",
    "test"
  ],
  "output": "customer",
  "inputView": "customer",
  "params": {
    "es.nodes": "<my>.elasticsearch.com",
    "es.port": "443",
    "es.nodes.wan.only": "true",
    "es.net.ssl": "true"
  },
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "saveMode": "Overwrite"
}

HTTPLoad

Since: 1.0.0 - Supports Streaming: True

The HTTPLoad takes an input DataFrame and executes a series of POST requests against a remote HTTP service. The input to this stage needs to be a single column dataset of signature value: string and is intended to be used after a JSONTransform stage which would prepare the data for sending to the external server.

In the future additional Transform stages (like ProtoBufTransform) could be added to prepare binary payloads instead of just json string.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputURI URI true URI of the HTTP server.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
headers Map[String, String] false HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers.
id String false A optional unique identifier for this stage.
validStatusCodes Array[Integer] false A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202]. Note: all request response codes must be contained in this list for the stage to be successful.

Examples

Minimal

{
  "type": "HTTPLoad",
  "name": "load customers to the customer api",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "http://internalserver/api/customer"
}

Complete

{
  "type": "HTTPLoad",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "load customers to the customer api",
  "description": "load customers to the customer api",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "http://internalserver/api/customer",
  "headers": {
    "Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
    "custom-header": "payload"
  },
  "validStatusCodes": [
    200,
    201
  ]
}

JDBCLoad

Since: 1.0.0 - Supports Streaming: True

The JDBCLoad writes an input DataFrame to a target JDBC Database. See Spark JDBC documentation.

Whilst it is possible to use JDBCLoad to create tables directly in the target database Spark only has a limited knowledge of the schema required in the destination database and so will translate things like StringType internally to a TEXT type in the target database (because internally Spark does not have limited length strings). The recommendation is to use a preceding JDBCExecute to execute a CREATE TABLE statement which creates the intended schema then inserting into that table with saveMode set to Append.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
jdbcURL String true The JDBC URL to connect to. e.g., jdbc:mysql://localhost:3306.
tableName String true The target JDBC table. Must be in database.schema.table format.
params Map[String, String] false Map of configuration parameters.. Any parameters provided will be added to the JDBC connection object. These are not logged so it is safe to put passwords here.
batchsize Integer false The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers.

Default: 1000.
bulkload Boolean false Whether to enable a bulk copy. This is currently only available for sqlserver targets but more targets can be added as drivers become available.

Default: false.
createTableColumnTypes String false The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: “name CHAR(64), comments VARCHAR(1024)”). The specified types should be valid spark sql data types.
createTableOptions String false This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB).
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
isolationLevel String false The transaction isolation level, which applies to current connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to standard transaction isolation levels defined by JDBC’s Connection object, with default of READ_UNCOMMITTED. Please refer the documentation in java.sql.Connection.
numPartitions Integer false The number of partitions that will be used for controlling parallelism. This also determines the maximum number of concurrent JDBC connections.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.
tablock Boolean false When in bulkload mode whether to set TABLOCK on the driver.

Default: true.
truncate Boolean false If using SaveMode equal to Overwrite, this additional option causes Spark to TRUNCATE TABLE of existing data instead of executing a DELETE FROM statement.

Examples

Minimal

{
  "type": "JDBCLoad",
  "name": "write customer to postgres",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer",
  "tableName": "mydatabase.myschema.customer",
  "params": {
    "user": "mydbuser",
    "password": "mydbpassword"
  }
}

Complete

{
  "type": "JDBCLoad",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "write customer to postgres",
  "description": "write customer to postgres",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer",
  "tableName": "mydatabase.myschema.customer",
  "batchsize": 10000,
  "createTableColumnTypes": "name CHAR(64), comments VARCHAR(1024)",
  "createTableOptions": "CREATE TABLE t (name string) ENGINE=InnoDB",
  "isolationLevel": "READ_COMMITTED",
  "numPartitions": 10,
  "params": {
    "user": "mydbuser",
    "password": "mydbpassword"
  },
  "saveMode": "Append",
  "tablock": false,
  "truncate": false
}

JSONLoad

Since: 1.0.0 - Supports Streaming: True

The JSONLoad writes an input DataFrame to a target JSON file.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputURI URI true URI of the Delimited file to write to.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.

Examples

Minimal

{
  "type": "JSONLoad",
  "name": "write customer json extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.json"
}

Complete

{
  "type": "JSONLoad",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "write customer json extract",
  "description": "write customer json extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.json",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "saveMode": "Overwrite"
}

KafkaLoad

Since: 1.0.8 - Supports Streaming: True

Plugin

The KafkaLoad is provided by the https://github.com/tripl-ai/arc-kafka-pipeline-plugin package.

The KafkaLoad writes an input DataFrame to a target Kafka topic. The input to this stage needs to be a single column dataset of signature value: string - intended to be used after a JSONTransform stage - or a two columns of signature key: string, value: string which could be created by a SQLTransform stage.

In the future additional Transform stages (like ProtoBufTransform) may be added to prepare binary payloads instead of just json string.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
bootstrapServers String true A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. e.g. host1:port1,host2:port2,...
topic String true The target Kafka topic.
acks Integer false The number of acknowledgments the producer requires the leader to have received before considering a request complete.

Alowed values:
1: the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers.
0: the job will not wait for any acknowledgment from the server at all.
-1: the leader will wait for the full set of in-sync replicas to acknowledge the record (safest).

Default: 1.
batchSize Integer false Number of records to send in single requet to reduce number of requests to Kafka.

Default: 16384.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
retries Integer false How many times to try to resend any record whose send fails with a potentially transient error.

Default: 0.

Examples

Minimal

{
  "type": "KafkaLoad",
  "name": "write customer to kafka",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "bootstrapServers": "kafka:29092",
  "topic": "customers"
}

Complete

{
  "type": "KafkaLoad",
  "name": "write customer to kafka",
  "description": "write customer to kafka",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "bootstrapServers": "kafka:29092",
  "topic": "customers",
  "acks": 1,
  "batchSize": 16384,
  "numPartitions": 10,
  "retries": 0
}

MongoDBLoad

Since: 2.0.0 - Supports Streaming: False

Plugin

The MongoDBLoad is provided by the https://github.com/tripl-ai/arc-mongo-pipeline-plugin package.

The MongoDBLoad writes an input DataFrame to a target MongoDB collection.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
options Map[String, String] false Map of configuration parameters. These parameters are used to provide database connection/collection details.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.

Examples

Minimal

{
  "type": "MongoDBLoad",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "options": {
    "uri": "mongodb://username:password@mongo:27017",
    "database": "local",
    "collection": "customer"
  },
  "inputView": "customers"
}

Complete

{
  "type": "MongoDBLoad",
  "name": "load customers",
  "environments": [
    "production",
    "test"
  ],
  "options": {
    "uri": "mongodb://username:password@mongo:27017",
    "database": "local",
    "collection": "customer"
  },
  "inputView": "customers",
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "saveMode": "Overwrite"
}

ORCLoad

Since: 1.0.0 - Supports Streaming: True

The ORCLoad writes an input DataFrame to a target Apache ORC file.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputURI URI true URI of the ORC file to write to.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.

Examples

Minimal

{
  "type": "ORCLoad",
  "name": "write customer ORC extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.orc"
}

Complete

{
  "type": "ORCLoad",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "write customer ORC extract",
  "description": "write customer ORC extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.orc",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "saveMode": "Overwrite"
}

ParquetLoad

Since: 1.0.0 - Supports Streaming: True

The ParquetLoad writes an input DataFrame to a target Apache Parquet file.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputURI URI true URI of the Parquet file to write to.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.

Examples

Minimal

{
  "type": "ParquetLoad",
  "name": "write customer Parquet extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.parquet"
}

Complete

{
  "type": "ParquetLoad",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "write customer Parquet extract",
  "description": "write customer Parquet extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.parquet",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "saveMode": "Overwrite"
}

TextLoad

Since: 1.9.0 - Supports Streaming: False

The TextLoad writes an input DataFrame to a target text file. It requires the inputView to be a single column of data so a preprocessing step in something like a SQLTransform or JSONTransform is required.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputURI URI true URI of the Parquet file to write to.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.
singleFile Boolean false Write in single file mode instead of a directory containing one or more partitions. Accepts datasets with either [value: string], [value: string, filename: string] or [value: string, filename: string, index: integer] schema. If filename is supplied this component will write to one or more files. If index is supplied this component will order the records in each filename by index before writing.
prefix String false A string to append before the row data when in singleFile mode.
separator String false A separator string to append between the row data when in singleFile mode. Most common use will be \n which will insert newlines.
suffix String false A string to append after the row data when in singleFile mode.

Examples

Minimal

{
  "type": "TextLoad",
  "name": "write customer Text extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.text"
}

Complete

{
  "type": "TextLoad",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "write customer Text extract",
  "description": "write customer text extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.text",
  "authentication": {},
  "numPartitions": 10,
  "saveMode": "Overwrite",
  "singleFile": true,
  "prefix": "[",
  "separator": ",\n",
  "suffix": "]"
}

XMLLoad

Since: 1.0.0 - Supports Streaming: False

The XMLLoad writes an input DataFrame to a target XML file.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
outputURI URI true URI of the XML file to write to.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false A optional unique identifier for this stage.
numPartitions Integer false The number of partitions that will be used for controlling parallelism.
partitionBy Array[String] false Columns to partition the data by.
saveMode String false The mode for writing the output file to describe how errors are handled. Available options are: Append, ErrorIfExists, Ignore, Overwrite. Default is Overwrite if not specified.
singleFile Boolean false Write in single file mode instead of a directory containing one or more partitions. Accepts datasets with either [value: string], [value: string, filename: string] or [value: string, filename: string, index: integer] schema. If filename is supplied this component will write to one or more files. If index is supplied this component will order the records in each filename by index before writing.
prefix String false A string to append before the row data when in singleFile mode. Useful for specifying the encoding <?xml version="1.0" encoding="UTF-8"?>.

Examples

The XML writer uses reserved keywords to be able to set attributes as well as values. The _VALUE value is reserved to define values when attributes which are prefixed with an underscore (_) exist:

SELECT
  NAMED_STRUCT(
    '_VALUE', NAMED_STRUCT(
      'child0', 0,
      'child1', NAMED_STRUCT(
        'nested0', 0,
        'nested1', 'nestedvalue'
      )
    ),
    '_attributeName', 'attributeValue'
  ) AS Document

Results in:

<Document attributeName="attributeValue">
  <child0>1</child0>
  <child1>
    <nested0>1</nested0>
    <nested1>nestedvalue</nested1>
  </child1>
</Document>

Minimal

{
  "type": "XMLLoad",
  "name": "write customer XML extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.xml"
}

Complete

{
  "type": "XMLLoad",
  "name": "write customer XML extract",
  "description": "write customer XML extract",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "outputURI": "hdfs://output_data/customer/customer.xml",
  "authentication": {},
  "numPartitions": 10,
  "partitionBy": [
    "country"
  ],
  "saveMode": "Overwrite",
  "singleFile": true,
  "prefix": "<?xml version="1.0" encoding="UTF-8"?>\n"
}