Deploy
Arc has been packaged as a Docker image to simplify deployment as a stateless process on cloud infrastructure. As there are multiple combinations of Arc, Spark, Scala and Hadoop versions, see https://github.com/orgs/tripl-ai/packages for the relevant image. The Arc container is built on top of the official Spark Kubernetes Docker image.
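For example, to pull the most recent image (version-specific tags are listed on the packages page linked above):
docker pull ghcr.io/tripl-ai/arc:latest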
Deploy Repository
The deploy repository has examples of how to run Arc jobs on common cloud environments, including examples of how to set up Kubernetes with the required permissions.
Arc Local
An example command to start a job from the Arc Starter base directory:
docker run \
--rm \
--volume $(pwd)/examples:/home/jovyan/examples:Z \
--env "ETL_CONF_ENV=production" \
--entrypoint='' \
--publish 4040:4040 \
ghcr.io/tripl-ai/arc:latest \
bin/spark-submit \
--master local[*] \
--driver-memory 4g \
--driver-java-options "-XX:+UseG1GC" \
--conf spark.authenticate=true \
--conf spark.authenticate.secret=$(openssl rand -hex 64) \
--conf spark.io.encryption.enabled=true \
--conf spark.network.crypto.enabled=true \
--class ai.tripl.arc.ARC \
/opt/spark/jars/arc.jar \
--etl.config.uri=file:///home/jovyan/examples/tutorial/0/nyctaxi.ipynb
This example is included to demonstrate:
- ETL_CONF_ENV is a reserved environment variable which determines which stages to execute in the current mode. For each stage the job designer can specify an array of environments under which that stage will be executed (in the case above production and test are specified). This makes it possible to add or remove stages for execution modes like test or integration, which are executed by a CI/CD tool prior to deployment and which you do not want to run in production mode - for example, a comparison against a known 'good' test dataset might be executed only in test mode.
- In this sample job the Spark master is local[*], indicating that this is a single instance 'cluster' where Arc relies on vertical rather than horizontal scaling. Depending on the constraints of the job (i.e. CPU vs disk IO) it is often better to execute with vertical scaling on cloud compute rather than pay the cost of network shuffling.
- etl.config.uri is a reserved JVM property which tells Arc which job to execute. See below for all the properties that can be passed to Arc.
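As an illustrative sketch only, other settings can be supplied in the same --key=value argument form used for etl.config.uri above, assuming the property names from the tables below are accepted as arguments (if in doubt, use the environment variable form shown in the docker run example instead):
bin/spark-submit \
--master local[*] \
--class ai.tripl.arc.ARC \
/opt/spark/jars/arc.jar \
--etl.config.environment=production \
--etl.config.job.name=nyctaxi \
--etl.config.uri=file:///home/jovyan/examples/tutorial/0/nyctaxi.ipynb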
Arc on Kubernetes
Arc is built using the official Spark Kubernetes image build process which allows Arc to be easily deployed to a Kubernetes cluster.
bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name arc \
--class ai.tripl.arc.ARC \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=ghcr.io/tripl-ai/arc:latest \
--conf spark.kubernetes.driverEnv.ETL_CONF_ENV=production \
--conf spark.kubernetes.driverEnv.ETL_CONF_DATA_URL=s3a://nyc-tlc/trip*data \
--conf spark.kubernetes.driverEnv.ETL_CONF_JOB_URL=https://raw.githubusercontent.com/tripl-ai/arc-starter/master/examples/kubernetes \
local:///opt/spark/jars/arc.jar \
--etl.config.uri=https://raw.githubusercontent.com/tripl-ai/arc-starter/master/examples/kubernetes/nyctaxi.ipynb
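The spark.kubernetes.authenticate.driver.serviceAccountName=spark setting assumes a service account with permission to create driver and executor pods. One minimal way to create it, following the standard Spark-on-Kubernetes RBAC example (the default namespace and the spark-role binding name are assumptions to adapt to your cluster):
kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role \
--clusterrole=edit \
--serviceaccount=default:spark \
--namespace=default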
Configuration Parameters
Environment Variable | Property | Description |
---|---|---|
ETL_CONF_ENABLE_STACKTRACE | etl.config.enableStackTrace | Whether to enable stack traces in the event of an exception, which can be useful for debugging but is not very intuitive for many users. Boolean. Default: false. |
ETL_CONF_ENV | etl.config.environment | The environment to run under. E.g. if ETL_CONF_ENV is set to production then a stage with "environments": ["production", "test"] would be executed and one with "environments": ["test"] would not be executed. |
ETL_CONF_IGNORE_ENVIRONMENTS | etl.config.ignoreEnvironments | Allows skipping the environments test and executing all stages/plugins. |
ETL_CONF_JOB_ID | etl.config.job.id | A job identifier added to all the logging messages. |
ETL_CONF_JOB_NAME | etl.config.job.name | A job name added to all logging messages and Spark history server. |
ETL_CONF_LINT_ONLY | etl.config.lintOnly | Verify the job file and exit with success/failure. |
ETL_CONF_STORAGE_LEVEL | etl.config.storageLevel | The StorageLevel used when persisting datasets. String. Default: MEMORY_AND_DISK_SER . |
ETL_CONF_STREAMING | etl.config.streaming | Run in Structured Streaming mode or not. Boolean. Default: false . |
ETL_CONF_TAGS | etl.config.tags | Custom key/value tags separated by space to add to all logging messages. E.g. ETL_CONF_TAGS=cost_center=123456 owner=jovyan . |
ETL_CONF_URI | etl.config.uri | The URI of the job file to execute. |
ETL_CONF_COMPLETION_ENVIRONMENTS | etl.config.completion.environments | A comma-separated list of environments to be returned when invoking the Completer API. Default: production,test. |
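For illustration, several of these can be combined by adding extra --env flags to the docker run example shown earlier; the job name and tag values here are placeholders:
docker run \
--rm \
--env "ETL_CONF_ENV=production" \
--env "ETL_CONF_JOB_NAME=nyctaxi" \
--env "ETL_CONF_TAGS=cost_center=123456 owner=jovyan" \
--env "ETL_CONF_LINT_ONLY=true" \
--entrypoint='' \
ghcr.io/tripl-ai/arc:latest \
bin/spark-submit \
--master local[*] \
--class ai.tripl.arc.ARC \
/opt/spark/jars/arc.jar \
--etl.config.uri=https://raw.githubusercontent.com/tripl-ai/arc-starter/master/examples/kubernetes/nyctaxi.ipynb
With ETL_CONF_LINT_ONLY=true the container only verifies the job file and exits, which can be useful as a fast pre-deployment check in CI.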
Policy Parameters
Environment Variable | Property | Description |
---|---|---|
ETL_POLICY_INLINE_SCHEMA | etl.policy.inline.schema | Whether to support inline schemas (such as the schema attribute in TypingTransform) as opposed to forcing them to be read from an external file. Boolean. Default: true. |
ETL_POLICY_INLINE_SQL | etl.policy.inline.sql | Whether to support inline SQL (such as the sql attribute in SQLTransform) as opposed to forcing it to be read from an external file. Boolean. Default: true. |
ETL_POLICY_IPYNB | etl.policy.ipynb | Whether to support submission of IPython Notebook (.ipynb) files as opposed to only the Arc HOCON format. Boolean. Default: true. |
ETL_POLICY_DROP_UNSUPPORTED | etl.policy.drop.unsupported | Whether to enable automatic dropping of unsupported types when performing *Load stages (e.g. ParquetLoad cannot support NullType, so such columns would be dropped if enabled). If NullType columns have been created by a SQL query (like SELECT NULL AS fieldname) it is sometimes possible to correctly type the column by CASTing it, like SELECT CAST(NULL AS INTEGER) AS fieldname, which will treat the column as an IntegerType containing only NULL values. Default: false. |
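For example, a locked-down deployment might force all SQL and schemas to come from external, version-controlled files and accept only HOCON job definitions. A sketch of the flags that could be added to the docker run example above:
--env "ETL_POLICY_INLINE_SQL=false" \
--env "ETL_POLICY_INLINE_SCHEMA=false" \
--env "ETL_POLICY_IPYNB=false" \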
Authentication Parameters
These parameters can be used to supply the permissions needed to retrieve the job file from cloud storage:
Variable | Property | Description |
---|---|---|
ETL_CONF_ADL_OAUTH2_CLIENT_ID | etl.config.fs.adl.oauth2.client.id | The OAuth client identifier for connecting to Azure Data Lake. |
ETL_CONF_ADL_OAUTH2_REFRESH_TOKEN | etl.config.fs.adl.oauth2.refresh.token | The OAuth refresh token for connecting to Azure Data Lake. |
ETL_CONF_AZURE_ACCOUNT_KEY | etl.config.fs.azure.account.key | The account key for connecting to Azure Blob Storage. |
ETL_CONF_AZURE_ACCOUNT_NAME | etl.config.fs.azure.account.name | The account name for connecting to Azure Blob Storage. |
ETL_CONF_GOOGLE_CLOUD_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE | etl.config.fs.google.cloud.auth.service.account.json.keyfile | The service account json keyfile path for connecting to Google Cloud Storage. |
ETL_CONF_GOOGLE_CLOUD_PROJECT_ID | etl.config.fs.gs.project.id | The project identifier for connecting to Google Cloud Storage. |
ETL_CONF_S3A_ACCESS_KEY | etl.config.fs.s3a.access.key | The access key for connecting to Amazon S3. |
ETL_CONF_S3A_CONNECTION_SSL_ENABLED | etl.config.fs.s3a.connection.ssl.enabled | Whether to enable SSL connection to Amazon S3. |
ETL_CONF_S3A_ENDPOINT | etl.config.fs.s3a.endpoint | The endpoint for connecting to Amazon S3. |
ETL_CONF_S3A_SECRET_KEY | etl.config.fs.s3a.secret.key | The secret for connecting to Amazon S3. |
ETL_CONF_S3A_ANONYMOUS | etl.config.fs.s3a.anonymous | Whether to connect to Amazon S3 in anonymous mode. e.g. ETL_CONF_S3A_ANONYMOUS=true . |
ETL_CONF_S3A_ENCRYPTION_ALGORITHM | etl.config.fs.s3a.encryption.algorithm | The bucket encryption algorithm: SSE-S3, SSE-KMS or SSE-C. |
ETL_CONF_S3A_KMS_ARN | etl.config.fs.s3a.kms.arn | The Key Management Service Amazon Resource Name to use when the SSE-KMS encryption algorithm is selected, e.g. arn:aws:kms:us-west-2:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab. |
ETL_CONF_S3A_CUSTOM_KEY | etl.config.fs.s3a.custom.key | The key to use when using Customer-Provided Encryption Keys (SSE-C ). |
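For example, if the job file itself is stored in a private S3 bucket, the S3A credentials can be supplied as extra --env flags on the docker run example above, with --etl.config.uri pointed at the corresponding s3a:// URI. The credential values below are placeholders:
--env "ETL_CONF_S3A_ACCESS_KEY=<access-key>" \
--env "ETL_CONF_S3A_SECRET_KEY=<secret-key>" \
--env "ETL_CONF_S3A_CONNECTION_SSL_ENABLED=true" \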
Dynamic Variables
Sometimes it is useful to utilise runtime-only variables in an Arc job (i.e. lazy evaluation), for example to dynamically calculate a partition to be read.
By default all stages have an implicit resolution key defaulting to strict, which will try to resolve all parameters at the start of the job. By setting resolution to lazy it is possible to defer the resolution of the variables until execution time for that stage.
Examples
This example calculates a list of distinct dates from the new_transactions dataset (like CAST('2020-01-13' AS DATE),CAST('2020-01-14' AS DATE)) and returns it as a variable named ETL_CONF_DYNAMIC_PUSHDOWN which is then used to read a subset of the transactions dataset. This pattern has been used successfully to force a certain behavior in the Spark SQL optimizer (predicate pushdown). Without resolution set to lazy the job would fail, as the ${ETL_CONF_DYNAMIC_PUSHDOWN} parameter would not be present at the beginning of the job.
{
"type": "ConfigExecute",
"name": "test",
"description": "test",
"environments": [
"production",
"test"
],
"sql": """
SELECT TO_JSON(
NAMED_STRUCT(
'ETL_CONF_DYNAMIC_PUSHDOWN', ARRAY_JOIN(COLLECT_LIST(CONCAT("CAST(\'",DATE_FORMAT(transaction_date,"yyyy-MM-dd"),"\' AS DATE)")), ',')
)
) AS parameters
FROM (
SELECT transaction_date FROM new_transactions GROUP BY transaction_date
)
"""
},
{
"resolution": "lazy",
"type": "SQLTransform",
"name": "load the partitions impacted by new records",
"environments": [
"production",
"test"
],
"sql": "SELECT * FROM transactions WHERE transaction_date IN (${ETL_CONF_DYNAMIC_PUSHDOWN})",
"sqlParams": {
"ETL_CONF_DYNAMIC_PUSHDOWN": ${ETL_CONF_DYNAMIC_PUSHDOWN}
},
"outputView": "outputView"
}
Environments
The environments attribute specifies a list of environments under which the stage will be executed. The list must contain the value of the ETL_CONF_ENV environment variable or the etl.config.environment spark-submit argument for the stage to be executed.
Examples
If a stage is to be executed in both production and testing, and the ETL_CONF_ENV environment variable is set to production or test, then the DelimitedExtract stage defined here will be executed. If the ETL_CONF_ENV environment variable is set to something else, like user_acceptance_testing, then this stage will not be executed and a warning message will be logged.
{
"type": "DelimitedExtract",
...
"environments": ["production", "test"],
...
}
A practical use case of this is to execute additional stages in testing which, if they fail, prevent the job from being automatically deployed to production via Continuous Delivery:
{
"type": "ParquetExtract",
"name": "load the manually verified known good set of data from testing",
"environments": ["test"],
"outputView": "known_correct_dataset",
...
},
{
"type": "EqualityValidate",
"name": "ensure the business logic produces the same result as the known good set of data from testing",
"environments": ["test"],
"leftView": "newly_caluclated_dataset",
"rightView": "known_correct_dataset",
...
}
Spark and ulimit
On larger instances with many cores per machine it is possible to exceed the default (1024) maximum number of open files (ulimit). This should be verified on your instances if you are receiving 'too many open files' errors.
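To check the current limit and, if needed, raise it (the 65536 value is only an illustrative starting point; Docker also accepts an equivalent --ulimit flag):
# check the current maximum number of open files
ulimit -n
# raise it for the current shell
ulimit -n 65536
# or set it for the container at launch
docker run --ulimit nofile=65536:65536 ...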