Execute

Execute stages are used to execute arbitrary commands against external systems such as databases and APIs.

CassandraExecute

Since: 2.0.0 - Supports Streaming: False

Plugin

The CassandraExecute is provided by the https://github.com/tripl-ai/arc-cassandra-pipeline-plugin package.

The CassandraExecute executes a CQL statement against an external Cassandra cluster.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI of the input file containing the CQL statement.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
params Map[String, String] false Map of configuration parameters. Any parameters provided will be added to the Cassandra connection object.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "CassandraExecute",
  "name": "create table",
  "environments": [
    "production",
    "test"
  ],  
  "inputURI": "hdfs://datalake/sql/create_table.cql",
  "params": {
    "spark.cassandra.connection.host": "cassandra"
  },
  "sqlParams": {
    "keyspace": ${ETL_CONF_ENVIRONMENT}
  }                 
}

Complete

{
  "type": "CassandraExecute",
  "name": "create table",
  "description": "create table",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/create_table.cql",
  "authentication": {},
  "params": {
    "spark.cassandra.connection.host": "cassandra"
  },
  "sqlParams": {
    "keyspace": ${ETL_CONF_ENVIRONMENT}
  }
}
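
As a guide, the create_table.cql file referenced by inputURI could contain a CQL statement such as the one below. The keyspace, table and column names are hypothetical, and the ${keyspace} placeholder is replaced with the value supplied via sqlParams before execution.

CREATE TABLE IF NOT EXISTS ${keyspace}.customer (
  customer_id int PRIMARY KEY,
  customer_name text
);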

HTTPExecute

Since: 1.0.0 - Supports Streaming: False

The HTTPExecute takes an input Map[String, String] from the configuration and executes a POST request against a remote HTTP service. This could be used to initialise another process that depends on the output of the data pipeline.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
uri URI true URI of the HTTP server.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
headers Map[String, String] false HTTP Headers to set for the HTTP request. These are not limited to the Internet Engineering Task Force standard headers.
payloads Map[String, String] false A set of key/value pairs that will be encoded as JSON and sent to the HTTP server.
validStatusCodes Array[Integer] false A list of valid status codes which will result in a successful stage if the list contains the HTTP server response code. If not provided the default values are [200, 201, 202].

Examples

Minimal

{
  "type": "HTTPExecute",
  "name": "notify the customer api of job completion",
  "environments": [
    "production",
    "test"
  ],
  "uri": "http://internalserver/api/customer"
}

Complete

{
  "type": "HTTPExecute",
  "name": "notify the customer api of job completion",
  "description": "notify the customer api of job completion",
  "environments": [
    "production",
    "test"
  ],
  "uri": "http://internalserver/api/customer",
  "headers": {
    "Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==",
    "custom-header": "payload"
  },
  "payloads": {
    "jobName": "customer",
    "jobStatus": "complete"
  },
  "validStatusCodes": [
    200
  ]
}
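
Given the payloads map above, the request body sent to http://internalserver/api/customer is simply the JSON encoding of that map:

{
  "jobName": "customer",
  "jobStatus": "complete"
}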

JDBCExecute

Since: 1.0.0 - Supports Streaming: False

The JDBCExecute executes a SQL statement against an external JDBC connection.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI true URI of the input file containing the SQL statement.
jdbcURL String true The JDBC URL to connect to. e.g., jdbc:mysql://localhost:3306.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
params Map[String, String] false Map of configuration parameters. Any parameters provided will be added to the JDBC connection object. These are not logged, so it is safe to put passwords here.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "JDBCExecute",
  "name": "execute the load date table",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/update_customer_load_date.sql",
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer"
}

Complete

{
  "type": "JDBCExecute",
  "name": "execute the load date table",
  "description": "execute the load date table",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/update_customer_load_date.sql",
  "jdbcURL": "jdbc:postgresql://localhost:5432/customer?user=fred&password=secret&ssl=true",
  "authentication": {},
  "params": {},
  "sqlParams": {
    "current_timestamp": "2018-11-24 14:48:56"
  }
}
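
As a guide, the update_customer_load_date.sql file referenced by inputURI could contain a statement like the one below. The table and column names are hypothetical, and ${current_timestamp} is replaced with the value supplied via sqlParams before the statement is sent over the JDBC connection.

UPDATE customer
SET load_date = '${current_timestamp}'
WHERE load_date IS NULL;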

KafkaCommitExecute

Since: 1.0.8 - Supports Streaming: False

Plugin

The KafkaCommitExecute is provided by the https://github.com/tripl-ai/arc-kafka-pipeline-plugin package.

The KafkaCommitExecute takes the resulting DataFrame from a KafkaExtract stage and commits the offsets back to Kafka. This is used so that a user is able to perform a quasi-transaction by specifying a series of stages that must be successfully executed prior to committing the offsets back to Kafka. To use this stage, ensure that the autoCommit option on the KafkaExtract stage is set to false.

For example, if a job reads from a Kafka topic and writes the results to Parquet, it would be good to ensure the ParquetLoad stage has completed successfully before updating the offsets in Kafka.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputView String true Name of incoming Spark dataset.
bootstrapServers String true A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. e.g. host1:port1,host2:port2,...
groupID String true A string that uniquely identifies the group of consumer processes to which this consumer belongs. This will retain the offset of the job between executions.
description String false An optional stage description to help document job files and print to job logs to assist debugging.

Examples

Minimal

{
  "type": "KafkaCommitExecute",
  "name": "update the offsets in kafka after processing data",
  "environments": [
    "production",
    "test"
  ],
  "inputView": "customer",
  "bootstrapServers": "kafka:29092",
  "groupID": "spark-customer-extract-job"
}
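
As a sketch of the quasi-transaction described above, the stages could be ordered so that the offsets are only committed after the load has succeeded. The KafkaExtract and ParquetLoad parameters shown here are indicative only; refer to their own documentation for the full set of options.

{
  "stages": [
    {
      "type": "KafkaExtract",
      "name": "extract customer records from kafka",
      "environments": ["production", "test"],
      "outputView": "customer",
      "bootstrapServers": "kafka:29092",
      "topic": "customer",
      "groupID": "spark-customer-extract-job",
      "autoCommit": false
    },
    {
      "type": "ParquetLoad",
      "name": "write customer records to the datalake",
      "environments": ["production", "test"],
      "inputView": "customer",
      "outputURI": "hdfs://datalake/raw/customer.parquet"
    },
    {
      "type": "KafkaCommitExecute",
      "name": "update the offsets in kafka after processing data",
      "environments": ["production", "test"],
      "inputView": "customer",
      "bootstrapServers": "kafka:29092",
      "groupID": "spark-customer-extract-job"
    }
  ]
}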

LogExecute

Since: 2.12.0 - Supports Streaming: True

The LogExecute takes an input SQL statement which must return a string and writes the result to the Arc logs. If the returned message is a JSON string manually created in the SQL statement, LogExecute will try to parse it so that the log output is easier to process with log aggregation tools.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
inputURI URI *true URI of the input file containing the SQL statement. Required if sql not provided.
sql String *true A SQL statement to execute. Required if inputURI not provided.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.
sqlParams Map[String, String] false Parameters to inject into the SQL statement before executing. The parameters use the ${} format.

Examples

Minimal

{
  "type": "LogExecute",
  "name": "log customer counts",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_error_threshold.sql"
}

Complete

{
  "type": "LogExecute",
  "name": "log customer counts",
  "description": "log customer counts",
  "environments": [
    "production",
    "test"
  ],
  "inputURI": "hdfs://datalake/sql/customer_error_threshold_dynamic.sql",
  "sql": "SELECT 'this message will appear in logs' AS message",
  "authentication": {},
  "sqlParams": {
    "record_error_tolerance_percentage": "0.05"
  }
}
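
As a guide, the SQL statement can manually build the JSON string using Spark SQL functions so that the logged message is easy for log aggregation tools to parse. The view and field names below are hypothetical:

SELECT
  TO_JSON(
    NAMED_STRUCT(
      'event', 'customer_count',
      'customers', COUNT(*)
    )
  ) AS message
FROM customer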

PipelineExecute

Since: 1.0.9 - Supports Streaming: True

The PipelineExecute stage allows the embedding of another Arc pipeline within the current pipeline. This means it is possible to compose pipelines together without having to serialise and deserialise the results.

An example use case could be a pipeline which captures how your organisation defines active customer records, which could then be embedded in multiple downstream pipelines to ensure a consistent definition.

Parameters

Attribute Type Required Description
name String true Name of the stage for logging.
environments Array[String] true A list of environments under which this stage will be executed. See environments documentation.
uri String true URI of the input file containing the definition of the pipeline to include.
authentication Map[String, String] false An authentication map for authenticating with a remote service. See authentication documentation.
description String false An optional stage description to help document job files and print to job logs to assist debugging.

Examples

Minimal

{
  "type": "PipelineExecute",
  "name": "embed the active customer pipeline",
  "environments": [
    "production",
    "test"
  ],
  "uri": "hdfs://datalake/job/active_customers.json"
}

Complete

{
  "type": "PipelineExecute",
  "name": "embed the active customer pipeline",
  "description": "embed the active customer pipeline",
  "environments": [
    "production",
    "test"
  ],
  "uri": "hdfs://datalake/job/active_customers.json",
  "authentication": {}
}
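
As a guide, the active_customers.json file referenced by uri is itself a standard Arc job definition. A minimal sketch, assuming a SQLTransform stage and hypothetical paths, might look like:

{
  "stages": [
    {
      "type": "SQLTransform",
      "name": "define active customer records",
      "environments": ["production", "test"],
      "inputURI": "hdfs://datalake/sql/active_customers.sql",
      "outputView": "active_customers",
      "persist": false
    }
  ]
}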