Execute

*Execute stages are used to execute arbitrary commands against external systems such as Databases and APIs.

BigQueryExecute

Supports Streaming: False

Plugin

The BigQueryExecute is provided by the https://github.com/tripl-ai/arc-big-query-pipeline-plugin package.

The BigQueryExecute executes a SQL statement against BigQuery.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI *true URI of the input file containing the SQL statement. Required if sql not provided.
sql String *true A SQL statement to execute. Required if inputURI not provided.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
jobName String false BigQuery Job name useful for identifying events in log messages.
location String false Location in which to invoke the BigQuery job.
projectId String false The Google Cloud Project ID of the table. Defaults to the project of the Service Account being used.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "BigQueryExecute",
  "name": "execute on bigquery",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://customer.sql",
}

Complete

{
  "type": "BigQueryExecute",
  "name": "execute the load date table",
  "description": "execute the load date table",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/update_customer_load_date.sql",
  "authentication": {},
  "sqlParams": {
    "current_timestamp": "2018-11-24 14:48:56"
  }
}
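
The SQL file referenced by inputURI contains a standard BigQuery SQL statement, with any ${} tokens replaced from sqlParams before execution. A minimal sketch of what update_customer_load_date.sql might contain (the dataset and table names are hypothetical):

-- hypothetical contents of update_customer_load_date.sql
-- ${current_timestamp} is replaced from sqlParams before the statement is sent to BigQuery
UPDATE customer_dataset.customer
SET load_date = TIMESTAMP('${current_timestamp}')
WHERE load_date IS NULL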

CassandraExecute

Since: 2.0.0 - Supports Streaming: False

Plugin

The CassandraExecute is provided by the https://github.com/tripl-ai/arc-cassandra-pipeline-plugin package.

The CassandraExecute executes a CQL statement against an external Cassandra cluster.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI of the input file containing the CQL statement.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
params Map[String, String] false Map of configuration parameters. Any parameters provided will be added to the Cassandra connection object.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "CassandraExecute",
  "name": "create table",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/create_table.cql",
  "params": {
    "spark.cassandra.connection.host": "cassandra"
  },
  "sqlParams": {
    "keyspace": ${ETL_CONF_ENVIRONMENT}
  }
}

Complete

{
  "type": "CassandraExecute",
  "name": "create table",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/create_table.cql",
  "params": {
    "spark.cassandra.connection.host": "cassandra"
  },
  "sqlParams": {
    "keyspace": ${ETL_CONF_ENVIRONMENT}
  }
}
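
The CQL file referenced by inputURI is a plain CQL statement with ${} tokens substituted from sqlParams. A sketch of what create_table.cql might contain, assuming a hypothetical customer table; the ${keyspace} value is resolved from sqlParams (here the ETL_CONF_ENVIRONMENT substitution shown above):

-- hypothetical contents of create_table.cql
-- ${keyspace} is injected from sqlParams before execution
CREATE TABLE IF NOT EXISTS ${keyspace}.customer (
  customer_id text PRIMARY KEY,
  customer_name text,
  load_date timestamp
)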

ConfigExecute

Since: 3.4.0 - Supports Streaming: True

The ConfigExecute takes an input SQL statement which must return a string-formatted JSON object, allowing job configuration substitution values to be created at runtime. ConfigExecute is intended to be used with Dynamic Variables to allow the creation of variables that depend on runtime data.
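
For example, the statement (whether supplied inline via sql or in the file referenced by inputURI) must produce a single string containing a JSON object whose keys and values can then be used as substitution values. A minimal sketch, assuming a hypothetical ETL_CONF_CURRENT_DATE variable derived from runtime data:

SELECT
  TO_JSON(
    NAMED_STRUCT(
      'ETL_CONF_CURRENT_DATE', DATE_FORMAT(CURRENT_DATE(), 'yyyy-MM-dd')
    )
  )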

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI *true URI of the input file containing the SQL statement. Required if sql not provided.
sql String *true A SQL statement to execute. Required if inputURI not provided.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
outputView String true Name of outgoing Spark dataset after processing.
persist Boolean false Whether to persist dataset to Spark cache. Will also log row count. Default: false.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "ConfigExecute",
  "name": "calculate dynamic variable",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/calculate_dynamic_variable.sql"
}

Complete

{
  "type": "ConfigExecute",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "calculate dynamic variable",
  "description": "calculate dynamic variable",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/calculate_dynamic_variable.sql",
  "sql": "SELECT TO_JSON(NAMED_STRUCT('ETL_CONF_DYNAMIC', 'SELECT MAKE_DATE(2016,12,18) AS date'))"
  "authentication": {},
  "sqlParams": {
  }
}

ControlFlowExecute

Since: 3.6.0 - Supports Streaming: True

The ControlFlowExecute provides a way to conditionally exit a job partway through execution and return success. This functionality is intended to allow work avoidance, i.e. not executing certain stages if the work has already been done or is not required. The statement must return [Boolean, Option[String]]: the subsequent job stages will not run if the first return value is false, and will continue as normal if it is true.

It requires the ControlFlow plugin to be explicitly enabled to have any effect.
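
A sketch of the kind of work-avoidance check that control_flow.sql might contain, assuming a hypothetical daily_output view that already holds today's results; the first (Boolean) column decides whether the remaining stages run and the second (String) column is the message written to the logs:

SELECT
  COUNT(*) = 0 AS valid,
  CONCAT('rows already present for today: ', CAST(COUNT(*) AS STRING)) AS message
FROM daily_output
WHERE load_date = CURRENT_DATE()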

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI *true URI of the input file containing the SQL statement. Required if sql not provided.
sql String *true A SQL statement to execute. Required if inputURI not provided.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
key String false The name of the key that carries the result of this execution to the ControlFlow plugin.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "ControlFlowExecute",
  "name": "determine whether to execute remainder of job",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/control_flow.sql"
}

Complete

{
  "type": "ControlFlowExecute",
  "name": "determine whether to execute remainder of job",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/control_flow.sql"
  "sql": "SELECT TRUE AS valid, 'this message will appear in logs' AS message",
  "authentication": {},
  "sqlParams": {
  }
}

HTTPExecute

Since: 1.0.0 - Supports Streaming: False

The HTTPExecute takes an input Map[String, String] from the configuration and executes a POST request against a remote HTTP service. This could be used to initialise another process that depends on the output of the data pipeline.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
uri URI true URI of the HTTP server.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
headers Map[String, String] false HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers.
id String false An optional unique identifier for this stage.
payloads Map[String, String] false A set of key/value pairs that will be encoded as JSON and sent to the HTTP server.
validStatusCodes Array[Integer] false A list of valid status codes; the stage succeeds if the HTTP server response code is in the list. If not provided the default values are [200, 201, 202].

Examples

Minimal

{
  "type": "HTTPExecute",
  "name": "notify the customer api of job completion",
  "environments": [
    "production",
    "test"
  ],
  "uri": "http://internalserver/api/customer"
}

Complete

{
  "type": "HTTPExecute",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "notify the customer api of job completion",
  "description": "notify the customer api of job completion",
  "environments": [
    "production",
    "test"
  ],
  "uri": "http://internalserver/api/customer",
  "headers": {
    "Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
    "custom-header": "payload"
  },
  "payloads": {
    "jobName": "customer",
    "jobStatus": "complete"
  },
  "validStatusCodes": [
    200
  ]
}
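
Given the payloads map in the example above, the body of the POST request is simply those key/value pairs encoded as a single JSON object, i.e. something like the following sketch (key ordering may differ):

{
  "jobName": "customer",
  "jobStatus": "complete"
}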

JDBCExecute

Since: 1.0.0 - Supports Streaming: False

The JDBCExecute executes a SQL statement against an external JDBC connection.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI of the input file containing the SQL statement.
jdbcURL String true The JDBC URL to connect to. e.g., jdbc:mysql://localhost:3306. You may be required to set allowMultiQueries=true in the connection string to execute multiple statements.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
params Map[String, String] false Map of configuration parameters. Any parameters provided will be added to the JDBC connection object. These are not logged so it is safe to put passwords here.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "JDBCExecute",
  "name": "execute the load date table",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/update_customer_load_date.sql",
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer"
}

Complete

{
  "type": "JDBCExecute",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "execute the load date table",
  "description": "execute the load date table",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/update_customer_load_date.sql",
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer?user=fred&password=secret&ssl=true",
  "authentication": {},
  "params": {},
  "sqlParams": {
    "current_timestamp": "2018-11-24 14:48:56"
  }
}
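
The SQL file referenced by inputURI is executed as-is against the JDBC connection after ${} substitution from sqlParams. A sketch of what update_customer_load_date.sql might contain, with hypothetical table names; a multi-statement file like this is the case where some drivers (e.g. MySQL) need allowMultiQueries=true in the connection string:

-- hypothetical contents of update_customer_load_date.sql
-- ${current_timestamp} is injected from sqlParams before execution
UPDATE customer SET load_date = '${current_timestamp}' WHERE load_date IS NULL;
UPDATE job_control SET last_run = '${current_timestamp}' WHERE job_name = 'customer';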

KafkaCommitExecute

Since: 1.0.8 - Supports Streaming: False

Plugin

The KafkaCommitExecute is provided by the https://github.com/tripl-ai/arc-kafka-pipeline-plugin package.

The KafkaCommitExecute takes the resulting DataFrame from a KafkaExtract stage and commits the offsets back to Kafka. This is used so that a user is able to perform a quasi-transaction by specifying a series of stages that must be successfully executed prior to committing the offsets back to Kafka. To use this stage ensure that the autoCommit option on the KafkaExtract stage is set to false.

For example, if a job reads from a Kafka topic and writes the results to Parquet, it would be prudent to ensure the ParquetLoad stage has completed successfully before updating the offsets in Kafka.
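
A sketch of the stage ordering this implies is shown below. The KafkaExtract and ParquetLoad parameters are illustrative only (refer to those stages' documentation for their exact options); the key points are that autoCommit is disabled on the extract and that KafkaCommitExecute reuses the same inputView, bootstrapServers and groupID:

{
  "type": "KafkaExtract",
  "name": "extract customer records from kafka",
  "environments": ["production", "test"],
  "outputView": "customer",
  "bootstrapServers": "kafka:29092",
  "topic": "customer",
  "groupID": "spark-customer-extract-job",
  "autoCommit": false
},
{
  "type": "ParquetLoad",
  "name": "write customer records to parquet",
  "environments": ["production", "test"],
  "inputView": "customer",
  "outputURI": "hdfs://datalake/raw/customer.parquet"
},
{
  "type": "KafkaCommitExecute",
  "name": "update the offsets in kafka after processing data",
  "environments": ["production", "test"],
  "inputView": "customer",
  "bootstrapServers": "kafka:29092",
  "groupID": "spark-customer-extract-job"
}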

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
bootstrapServers String true A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. e.g. host1:port1,host2:port2,...
groupID String true A string that uniquely identifies the group of consumer processes to which this consumer belongs. This will retain the offset of the job between executions.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.

Examples

Minimal

{
  "type": "KafkaCommitExecute",
  "name": "update the offsets in kafka after processing data",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "bootstrapServers": "kafka:29092",
  "groupID": "spark-customer-extract-job"
}

LogExecute

Since: 2.12.0 - Supports Streaming: True

The LogExecute takes an input SQL statement which must return a string and writes the output to the Arc logs. LogExecute will attempt to parse the returned message as a JSON string (which can be constructed manually in the SQL statement) so that the logged output is easier to consume with log aggregation tools.
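
For example, building the message as a JSON string inside the SQL statement produces structured log output. A sketch, with hypothetical field names, assuming ${inputView} is supplied via sqlParams as in the %log magic shown below:

SELECT
  TO_JSON(
    NAMED_STRUCT(
      'event', 'customer_count',
      'count', COUNT(*)
    )
  ) AS message
FROM ${inputView}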

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI *true URI of the input file containing the SQL statement. Required if sql not provided.
sql String *true A SQL statement to execute. Required if inputURI not provided.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Magic

The %log magic is available via arc-jupyter with the following parameters:

%log name="name" description="description" environments=production,test sqlParams=inputView=customer,inputField=id
SELECT
  "this message will be logged" AS message
FROM ${inputView}

Examples

Minimal

{
  "type": "LogExecute",
  "name": "log customer counts",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_error_threshold.sql"
}

Complete

{
  "type": "LogExecute",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "log customer counts",
  "description": "log customer counts",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_error_threshold_dynamic.sql",
  "sql": "SELECT 'this message will appear in logs' AS message",
  "authentication": {},
  "sqlParams": {
    "record_error_tolerance_percentage": "0.05"
  }
}

PipelineExecute

Since: 1.0.9 - Supports Streaming: True

The PipelineExecute stage allows the embedding of another Arc pipeline within the current pipeline. This means it is possible to compose pipelines together without having to serialise and deserialise the results.

An example use case is a pipeline that defines how your organisation identifies active customer records, which can then be embedded in multiple downstream pipelines to ensure a consistent definition.
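
The file referenced by uri is an ordinary Arc job configuration that is embedded into the current pipeline. A sketch of what active_customers.json might contain; the SQLTransform stage and view names shown here are purely illustrative, and any valid Arc job file can be embedded:

{
  "stages": [
    {
      "type": "SQLTransform",
      "name": "define active customers",
      "environments": ["production", "test"],
      "sql": "SELECT * FROM customer WHERE active = TRUE",
      "outputView": "active_customer"
    }
  ]
}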

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
uri String true URI of the input file containing the definition of the pipeline to include.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
id String false An optional unique identifier for this stage.

Examples

Minimal

{
  "type": "PipelineExecute",
  "name": "embed the active customer pipeline",
  "environments": [
    "production",
    "test"
  ],
  "uri": "hdfs://datalake/job/active_customers.json"
}

Complete

{
  "type": "PipelineExecute",
  "id": "00000000-0000-0000-0000-000000000000",
  "name": "embed the active customer pipeline",
  "description": "embed the active customer pipeline",
  "environments": [
    "production",
    "test"
  ],
  "uri": "hdfs://datalake/job/active_customers.json",
  "authentication": {}
}