Execute
*Execute stages are used to execute arbitrary commands against external systems such as databases and APIs.
BigQueryExecute
Supports Streaming: False
Plugin
The BigQueryExecute is provided by the https://github.com/tripl-ai/arc-big-query-pipeline-plugin package.
The BigQueryExecute executes a SQL statement against BigQuery.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | *true | URI of the input file containing the SQL statement. Required if sql not provided. |
sql | String | *true | A SQL statement to execute. Required if inputURI not provided. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
jobName | String | false | BigQuery Job name useful for identifying events in log messages. |
location | String | false | Location in which to invoke the BigQuery job. |
projectId | String | false | The Google Cloud Project ID of the table. Defaults to the project of the Service Account being used. |
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. |
Examples
Minimal
{
"type": "BigQueryExecute",
"name": "execute on bigquery",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://customer.sql",
}
Complete
{
"type": "BigQueryExecute",
"name": "execute the load date table",
"description": "execute the load date table",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/update_customer_load_date.sql",
"authentication": {},
"sqlParams": {
"current_timestamp": "2018-11-24 14:48:56"
}
}
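The Complete example above injects a value for current_timestamp into the SQL statement via sqlParams. A minimal sketch of what the referenced update_customer_load_date.sql might contain is shown below; the table and column names are assumptions for illustration, not part of the source:
-- hypothetical contents of update_customer_load_date.sql
-- ${current_timestamp} is replaced with the value supplied in sqlParams before execution
UPDATE customer.customer_load_date
SET load_date = TIMESTAMP '${current_timestamp}'
WHERE load_date IS NULL;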
CassandraExecute
Since: 2.0.0 - Supports Streaming: False
Plugin
The CassandraExecute is provided by the https://github.com/tripl-ai/arc-cassandra-pipeline-plugin package.
The CassandraExecute executes a CQL statement against an external Cassandra cluster.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI of the input file containing the CQL statement. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
params | Map[String, String] | false | Map of configuration parameters. Any parameters provided will be added to the Cassandra connection object. |
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. |
Examples
Minimal
{
"type": "CassandraExecute",
"name": "create table",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/create_table.cql",
"params": {
"spark.cassandra.connection.host": "cassandra"
},
"sqlParams": {
"keyspace": ${ETL_CONF_ENVIRONMENT}
}
}
Complete
{
"type": "CassandraExecute",
"name": "create table",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/create_table.cql",
"params": {
"spark.cassandra.connection.host": "cassandra"
},
"sqlParams": {
"keyspace": ${ETL_CONF_ENVIRONMENT}
}
}
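As a sketch of what the referenced create_table.cql might contain, the ${keyspace} placeholder is replaced with the value supplied in sqlParams before execution; the table and column definitions below are assumptions for illustration, not part of the source:
-- hypothetical contents of create_table.cql
CREATE TABLE IF NOT EXISTS ${keyspace}.customer (
  customer_id text PRIMARY KEY,
  load_date timestamp
);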
ConfigExecute
Since: 3.4.0 - Supports Streaming: True
The ConfigExecute takes an input SQL statement which must return a string containing a JSON-formatted object, allowing runtime creation of job configuration substitution values. ConfigExecute is intended to be used with Dynamic Variables to allow the creation of variables that rely on runtime data.
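For example, a statement like the following (mirroring the sql value in the Complete example below) returns a single JSON-formatted string; each key in the returned object becomes a substitution value available to later stages:
-- returns the string {"ETL_CONF_DYNAMIC":"SELECT MAKE_DATE(2016,12,18) AS date"}
SELECT
  TO_JSON(
    NAMED_STRUCT(
      'ETL_CONF_DYNAMIC', 'SELECT MAKE_DATE(2016,12,18) AS date'
    )
  )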
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | *true | URI of the input file containing the SQL statement. Required if sql not provided. |
sql | String | *true | A SQL statement to execute. Required if inputURI not provided. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
outputView | String | true | Name of outgoing Spark dataset after processing. |
persist | Boolean | false | Whether to persist dataset to Spark cache. Will also log row count. Default: false . |
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. |
Examples
Minimal
{
"type": "ConfigExecute",
"name": "calculate dynamic variable",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/calculate_dynamic_variable.sql"
}
Complete
{
"type": "ConfigExecute",
"id": "00000000-0000-0000-0000-000000000000",
"name": "calculate dynamic variable",
"description": "calculate dynamic variable",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/calculate_dynamic_variable.sql",
"sql": "SELECT TO_JSON(NAMED_STRUCT('ETL_CONF_DYNAMIC', 'SELECT MAKE_DATE(2016,12,18) AS date'))"
"authentication": {},
"sqlParams": {
}
}
ControlFlowExecute
Since: 3.6.0 - Supports Streaming: True
The ControlFlowExecute provides a way to conditionally exit a job partway through execution and return success. This functionality is intended to allow work avoidance, i.e. not executing certain stages if the work has already been done or is not required. This stage must return [Boolean, Option[String]] and will not run the subsequent job stages if the first return value is false, or will continue as normal if it is true.
It requires the ControlFlow plugin to be explicitly enabled to have any effect.
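As a minimal sketch of a suitable statement (the view name and condition are assumptions for illustration, not part of the source), the query below returns false once today's data has already been loaded so that the remaining stages are skipped:
SELECT
  COUNT(*) = 0 AS valid,                      -- false means: do not run the remaining stages
  'data already loaded for today' AS message  -- optional message written to the logs
FROM customer
WHERE load_date = CURRENT_DATE()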
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | *true | URI of the input file containing the SQL statement. Required if sql not provided. |
sql | String | *true | A SQL statement to execute. Required if inputURI not provided. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
key | String | false | The name of the key that carries the result of this execution to the ControlFlow plugin. |
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. |
Examples
Minimal
{
"type": "ControlFlowExecute",
"name": "determine whether to execute remainder of job",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/control_flow.sql"
}
Complete
{
"type": "ControlFlowExecute",
"name": "determine whether to execute remainder of job",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/control_flow.sql"
"sql": "SELECT TRUE AS valid, 'this message will appear in logs' AS message",
"authentication": {},
"sqlParams": {
}
}
HTTPExecute
Since: 1.0.0 - Supports Streaming: False
The HTTPExecute takes an input Map[String, String] from the configuration and executes a POST request against a remote HTTP service. This could be used to initialise another process that depends on the output of the data pipeline.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
uri | URI | true | URI of the HTTP server. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
headers | Map[String, String] | false | HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers. |
id | String | false | An optional unique identifier for this stage. |
payloads | Map[String, String] | false | A set of key/value pairs that will be encoded as JSON and sent to the HTTP server. |
validStatusCodes | Array[Integer] | false | A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202] . |
Examples
Minimal
{
"type": "HTTPExecute",
"name": "notify the customer api of job completion",
"environments": [
"production",
"test"
],
"uri": "http://internalserver/api/customer"
}
Complete
{
"type": "HTTPExecute",
"id": "00000000-0000-0000-0000-000000000000",
"name": "notify the customer api of job completion",
"description": "notify the customer api of job completion",
"environments": [
"production",
"test"
],
"uri": "http://internalserver/api/customer",
"headers": {
"Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
"custom-header": "payload"
},
"payloads": {
"jobName": "customer",
"jobStatus": "complete"
},
"validStatusCodes": [
200
]
}
JDBCExecute
Since: 1.0.0 - Supports Streaming: False
The JDBCExecute executes a SQL statement against an external JDBC connection.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | true | URI of the input file containing the SQL statement. |
jdbcURL | String | true | The JDBC URL to connect to. e.g., jdbc:mysql://localhost:3306 . You may be required to set allowMultiQueries=true in the connection string to execute multiple statements. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
params | Map[String, String] | false | Map of configuration parameters. Any parameters provided will be added to the JDBC connection object. These are not logged so it is safe to put passwords here. |
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. |
Examples
Minimal
{
"type": "JDBCExecute",
"name": "execute the load date table",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/update_customer_load_date.sql",
"jdbcURL": "jdbc:postgresql://localhost:5432/customer"
}
Complete
{
"type": "JDBCExecute",
"id": "00000000-0000-0000-0000-000000000000",
"name": "execute the load date table",
"description": "execute the load date table",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/update_customer_load_date.sql",
"jdbcURL": "jdbc:postgresql://localhost:5432/customer?user=fred&password=secret&ssl=true",
"authentication": {},
"params": {},
"sqlParams": {
"current_timestamp": "2018-11-24 14:48:56"
}
}
KafkaCommitExecute
Since: 1.0.8 - Supports Streaming: False
Plugin
The KafkaCommitExecute is provided by the https://github.com/tripl-ai/arc-kafka-pipeline-plugin package.
The KafkaCommitExecute takes the resulting DataFrame from a KafkaExtract stage and commits the offsets back to Kafka. This is used so that a user is able to perform a quasi-transaction by specifying a series of stages that must be successfully executed prior to committing the offset back to Kafka. To use this stage ensure that the autoCommit option on the KafkaExtract stage is set to false.
For example, if a job reads from a Kafka topic and writes the results to parquet then it would be good to ensure the ParquetLoad stage had completed successfully before updating the offset in Kafka.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputView | String | true | Name of incoming Spark dataset. |
bootstrapServers | String | true | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. e.g. host1:port1,host2:port2,... |
groupID | String | true | A string that uniquely identifies the group of consumer processes to which this consumer belongs. This will retain the offset of the job between executions. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
Examples
Minimal
{
"type": "KafkaCommitExecute",
"name": "update the offsets in kafka after processing data",
"environments": [
"production",
"test"
],
"inputView": "customer",
"bootstrapServers": "kafka:29092",
"groupID": "spark-customer-extract-job"
}
LogExecute
Since: 2.12.0 - Supports Streaming: True
The LogExecute takes an input SQL statement which must return a string and writes the output to the Arc logs. LogExecute will try to parse the message as JSON if the SQL statement manually constructs a JSON string, so that the log output is easier to parse by log aggregation tools.
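As a sketch of such a statement (the view and field names are assumptions for illustration, not part of the source), constructing the message as a JSON string produces structured log output:
SELECT
  TO_JSON(
    NAMED_STRUCT(
      'event', 'customer_load',   -- assumed field names
      'count', COUNT(*)
    )
  ) AS message
FROM customer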
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
inputURI | URI | *true | URI of the input file containing the SQL statement. Required if sql not provided. |
sql | String | *true | A SQL statement to execute. Required if inputURI not provided. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
sqlParams | Map[String, String] | false | Parameters to inject into the SQL statement before executing. The parameters use the ${} format. |
Magic
The %log magic is available via arc-jupyter with these parameters:
%log name="name" description="description" environments=production,test sqlParams=inputView=customer,inputField=id
SELECT
"this message will be logged" AS message
FROM ${inputView}
Examples
Minimal
{
"type": "LogExecute",
"name": "log customer counts",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/customer_error_threshold.sql"
}
Complete
{
"type": "LogExecute",
"id": "00000000-0000-0000-0000-000000000000",
"name": "log customer counts",
"description": "log customer counts",
"environments": [
"production",
"test"
],
"inputURI": "hdfs://datalake/sql/customer_error_threshold_dynamic.sql",
"sql": "SELECT 'this message will appear in logs' AS message",
"authentication": {},
"sqlParams": {
"record_error_tolerance_percentage": "0.05"
}
}
PipelineExecute
Since: 1.0.9 - Supports Streaming: True
The PipelineExecute stage allows the embedding of another Arc pipeline within the current pipeline. This means it is possible to compose pipelines together without having to serialise and deserialise the results.
An example use case could be a pipeline that captures how your organisation defines active customer records, which could then be embedded in multiple downstream pipelines to ensure definition consistency.
Parameters
Attribute | Type | Required | Description |
---|---|---|---|
name | String | true | Name of the stage for logging. |
environments | Array[String] | true | A list of environments under which this stage will be executed. See environments documentation. |
uri | String | true | URI of the input file containing the definition of the pipeline to include. |
authentication | Map[String, String] | false | An authentication map for authenticating with a remote service. See authentication documentation. |
description | String | false | An optional stage description to help document job files and print to job logs to assist debugging. |
id | String | false | An optional unique identifier for this stage. |
Examples
Minimal
{
"type": "PipelineExecute",
"name": "embed the active customer pipeline",
"environments": [
"production",
"test"
],
"uri": "hdfs://datalake/job/active_customers.json"
}
Complete
{
"type": "PipelineExecute",
"id": "00000000-0000-0000-0000-000000000000",
"name": "embed the active customer pipeline",
"description": "embed the active customer pipeline",
"environments": [
"production",
"test"
],
"uri": "hdfs://datalake/job/active_customers.json",
"authentication": {}
}