Lyftrondata-managed Data Lake on an AWS S3 bucket using Apache Spark
Introduction
Lyftrondata is bundled with a single-node Apache Spark cluster that uses the local hard disk to store the tables loaded into the local Data Lake managed by Lyftrondata. This default configuration is intended for proof-of-concept solutions that do not have high performance or reliability requirements. By default, the tables loaded into the Spark Data Lake through Lyftrondata are stored on the local file system, in the C:\ProgramData\Lyftrondata\warehouse folder.
The default single-node, local-storage Data Lake configuration of the bundled Apache Spark may be extended to support different topologies:
External storage for Apache Spark
The default Data Lake root folder may be changed to use HDFS, AWS S3 or Azure Blob storage. Apache Spark uses the HDFS, AWS or Azure libraries for Hadoop to access the data. This manual covers the scenario of setting up the Data Lake on AWS S3.
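Conceptually, the switch consists of pointing the Hadoop default file system and the Hive warehouse directory at the S3 bucket; credentials and a few supporting parameters are also required. A preview sketch of the two key settings (the bucket name is a placeholder; the detailed steps are described later in this manual):

<property>
  <name>fs.defaultFS</name>
  <value>s3a://{bucket name}/</value>
</property>
<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>s3a://{bucket name}/warehouse</value>
</property>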
Distributed Apache Spark cluster
Lyftrondata may also be configured to use an external Apache Spark cluster instead of the bundled single-node instance. This configuration is outside the scope of this manual.
Architecture
Components
The Lyftrondata cluster has the following components:
| Component | Description | Location |
| --- | --- | --- |
| Lyftrondata Engine | Lyftrondata Windows Service that is responsible for managing the cluster. The Engine also hosts the administrative portal. | Windows Service installed in the C:\Program Files\Lyftrondata folder. Additional files are stored in the C:\ProgramData\Lyftrondata folder. |
| Apache Spark | Local Apache Spark libraries. | C:\ProgramData\Lyftrondata\lib\spark folder. |
| Hadoop and Hive libraries | Folders with the minimum set of Hadoop and Hive libraries that must be added to the classpath when a Spark driver is started. These folders contain the HDFS, AWS and Azure libraries that abstract the Hadoop file system. | C:\ProgramData\Lyftrondata\lib\hadoop and C:\ProgramData\Lyftrondata\lib\hive folders. |
| Lyftrondata Metadata database | Microsoft SQL Server database used by Lyftrondata to keep the configuration of connections, including Apache Spark parameters that may be injected into the SparkContext during the Spark driver initialization. | Microsoft SQL Server. |
| Apache Spark connection | Data warehouse connection defined on the list of connections in Lyftrondata. The connection stores the address of the Lyftrondata Spark Driver and a list of key/value configuration parameters that are injected into the Hadoop, Hive and Spark libraries when the connection is opened for the first time. | The default Apache Spark connection to the local Apache Spark is named “Apache Spark 2.3”. |
| Apache Spark metastore database (Hive metastore) | Microsoft SQL Server database used by Apache Spark to store the location of the Data Lake and the mapping of tables to folders in the Data Lake. Apache Spark uses Hive Metastore libraries to access this database. | Microsoft SQL Server database. The database may be changed to a different database that is supported by Hive Metastore (PostgreSQL, MySQL or many others). |
| Apache Spark storage | The root folder used by Apache Spark to store tables that were loaded to the Data Lake. Internally, Apache Spark uses Hadoop libraries to read and write files on the local file system, HDFS, AWS S3 or Azure Blob storage. The files of every table defined in the Data Lake are stored in a separate folder. The default file format is snappy-compressed Parquet. | Local hard disk, or a different location that must be configured in the Hadoop and Hive configuration. |
Configuration settings location
The following settings affect the Apache Spark storage configuration:
| Setting | Description | Location |
| --- | --- | --- |
| Local Spark Driver startup configuration | Configuration parameters used to start the Lyftrondata Spark Driver. These parameters form the command line arguments of the spark-submit.cmd script. The spark-submit parameters define the location of the Spark libraries (SPARK_HOME) and the Spark configuration folder with the core-site.xml and hive-site.xml configuration files (SPARK_CONF_DIR). The details of these settings may be found in the Apache Spark documentation. | The parameters are defined in the administrative portal under Administer -> Local Spark Instances -> Spark 2.3. |
| Apache Spark Connection Details | Connection parameters to the Apache Spark driver (host, secret key), plus a list of key/value parameters that are injected into the SparkContext during the driver initialization. Parameters with the “spark.hadoop.*” prefix are injected into the Hadoop core-site.xml configuration; these parameters identify the default file system for the Data Lake storage. Parameters with the “hive.*” prefix are dynamically injected into the hive-site.xml file; the Hive parameters are mostly responsible for the Hive Metastore configuration (see the example after this table). | Connection parameters of the default “Local Spark 2.3” connection in Lyftrondata. |
| Local Apache Spark configuration | The Spark configuration folder, also used to load the Hadoop (core-site.xml) and Hive (hive-site.xml) configuration files. | The Apache Spark configuration folder specified in the SPARK_CONF_DIR environment variable of the Local Spark Driver startup configuration. The default “Local Spark 2.3” driver is configured in the C:\ProgramData\Lyftrondata\conf\spark folder. |
| Hive Metastore database | The Hive Metastore database used by the Hive Metastore component that is hosted in embedded mode inside Apache Spark. The tables in the Hive Metastore are created automatically by the Hive Metastore library when Lyftrondata connects to Apache Spark for the first time. | Microsoft SQL Server database. |
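As a sketch of how the prefixes are applied: if the connection defines the parameter spark.hadoop.fs.defaultFS, the spark.hadoop. prefix is stripped when the value is copied into the Hadoop configuration, so the effect is the same as adding the following entry to core-site.xml (the bucket name is a placeholder):

<property>
  <name>fs.defaultFS</name>
  <value>s3a://{bucket name}/</value>
</property>

Parameters with the hive.* prefix, such as hive.metastore.warehouse.dir, are applied to the Hive configuration (hive-site.xml) in the same way, keeping their full names.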
Cluster topology for AWS S3 storage
The topology of a Lyftrondata cluster that uses an Apache Spark instance with the Data Lake storage on AWS S3 is shown in the following topology diagram:
AWS S3 configuration steps
The following steps must be executed in order to switch the location of the Data Lake storage from the file system to an AWS S3 bucket:
Prepare AWS Access Key and Secret Key
Log in to the AWS console and go to the Identity and Access Management (IAM) section
Click the “Add user” button. Pick a user name for a user with a valid email address. Enable “Programmatic access”. Click the “Next: Permissions” button.
The user may be added to a group, but this is not required if the access rights will be granted directly to the user at the S3 bucket level. Click “Next: Tags”:
Enter optional tags. This step may be skipped if the user will not be tagged for search and categorization purposes:
Create the user, even though the user has not been granted any subscription-wide AWS permissions:
Click the “Show” button to reveal the secret access key:
Copy the “Access key ID”; it will be used as the spark.hadoop.fs.s3a.access.key parameter in Apache Spark.
Copy the “Secret access key”; it will be used as the spark.hadoop.fs.s3a.secret.key parameter in Apache Spark.
Check the mailbox of the registered user. AWS will send an email to activate the AWS access. Follow the account activation instructions received in the email.
Create an AWS S3 bucket for the Data Lake
Go to the Amazon S3 service in the AWS console. Click the “Create bucket” button.
Enter a bucket name, for example “datalakebucketlyftron1”. The bucket name must be globally unique, and AWS may reject names that are already used by other AWS customers.
Click the “Create bucket” button.
After the bucket has been created, open the bucket details. The bucket is shown on the list of buckets, or you can click the “Go to bucket details” button.
Go to the “Permissions” tab:
Go to the “Access Control List”:
Click the “+ Add account” button. Enter the email address of the user and enable all access rights. Click the “Save” button.
Update Hive and Hadoop parameters in the Spark connection
Log in to the Lyftrondata portal. Go to the “Connect” section.
Open the connection to the Spark cluster that will be configured. Click “Local Spark 2.3” if you are updating the default local Spark cluster.
Click the “EDIT” button. Expand the panel with the panel expansion button:
Scroll down to the end of the “Spark variables” section. Use the “ADD PARAMETER” button to add the following parameters:
| Parameter | Value | Description |
| --- | --- | --- |
| spark.hadoop.fs.s3a.impl | org.apache.hadoop.fs.s3a.S3AFileSystem | AWS S3 file system registration for the “s3a” address scheme. |
| spark.hadoop.fs.s3a.access.key | Enter the AWS user access key that was created at the time of the user creation. | AWS user access key. |
| spark.hadoop.fs.s3a.secret.key | Enter the AWS user secret key that was revealed by clicking “Show” on the user creation screen. | AWS user secret key. |
| spark.hadoop.fs.s3a.fast.upload | true | Activates the fast data upload protocol for data loading. |
| spark.hadoop.fs.defaultFS | s3a://{bucket name}/, for example s3a://datalakebucketlyftron1/ | S3 bucket address, prefixed with “s3a://” and followed by “/”. |
Update the hive.metastore.warehouse.dir parameter in the Hive configuration section so that it points to a folder inside the bucket. The path must start with the S3 address used in the spark.hadoop.fs.defaultFS parameter; a nested folder is optional.
In this example, we are using s3a://datalakebucketlyftron1/warehouse as the warehouse folder location where Spark will create subfolders for tables.
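For reference, the effective Hadoop and Hive configuration after these parameters are injected looks roughly as follows. This is only a sketch: the access and secret keys are placeholders, the bucket name is the example used in this guide, and the spark.hadoop. prefix is dropped when the values reach the Hadoop configuration (core-site.xml).

core-site.xml:
<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
  <name>fs.s3a.access.key</name>
  <value>AKIAXXXXXXXXXXXXXXXX</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>your-secret-access-key</value>
</property>
<property>
  <name>fs.s3a.fast.upload</name>
  <value>true</value>
</property>
<property>
  <name>fs.defaultFS</name>
  <value>s3a://datalakebucketlyftron1/</value>
</property>

hive-site.xml:
<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>s3a://datalakebucketlyftron1/warehouse</value>
</property>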
Click “Save”.
Update the old warehouse location in the Hive/Spark Metastore
Use Microsoft SQL Server Management Studio to connect to the Hive Metastore database. The JDBC connection string to the database is specified in the C:\ProgramData\Lyftrondata\conf\spark\hive-site.xml file.
Once the connection to the SQL Server database has been established, expand the list of tables in the database that is used as the Hive Metastore database. Find the “dbo.DBS” table and click “Edit Top 200 Rows”:
Update the DB_LOCATION_URI column to match the value of the hive.metastore.warehouse.dir parameter.
NOTE: The dbo.DBS table may not exist if Spark was never used on the Lyftrondata server and Spark has never provisioned the Hive Metastore database schema.
Move to the next row to commit the update to the database. The red warning icon will disappear after the update.
Unlock Hadoop core-site.xml parameters
Use the file explorer and go to the C:\ProgramData\Lyftrondata\conf\spark folder:
Edit the core-site.xml file. Ensure that the fs.defaultFS parameter does not have a <final>true</final> entry. A correct core-site.xml should have the fs.defaultFS parameter specified as follows:
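The exact value depends on the installation; a minimal sketch, assuming the local file system default, is shown below. What matters for this step is that no <final> element is present on the property:

<property>
  <name>fs.defaultFS</name>
  <!-- keep the existing value from your file; the important part is that
       there is no <final>true</final> element on this property -->
  <value>file:///</value>
</property>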
Restart Apache Spark instance
Go to the Administer -> Local Spark instance section. Click on the Spark instance that was reconfigured.
Click the “KILL” button and confirm:
Wait 10 seconds and click the “START” button. Wait for the “Spark process started” message:
A List of S3A Configuration Properties
The following fs.s3a configuration properties are available. The names below carry the spark.hadoop. prefix used when the values are set through the Spark connection or spark-defaults.conf; when adding a setting directly to core-site.xml, drop the spark.hadoop. prefix (for example, fs.s3a.access.key instead of spark.hadoop.fs.s3a.access.key). To override these defaults, add your configuration to core-site.xml or to the Spark connection parameters.
<property>
<name>spark.hadoop.fs.s3a.access.key</name>
<description>AWS access key ID used by S3A file system. Omit for IAM role-based or provider-based authentication.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.secret.key</name>
<description>AWS secret key used by S3A file system. Omit for IAM role-based or provider-based authentication.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.aws.credentials.provider</name>
<description>
Comma-separated class names of credential provider classes which implement
com.amazonaws.auth.AWSCredentialsProvider.
These are loaded and queried in sequence for a valid set of credentials.
Each listed class must implement one of the following means of
construction, which are attempted in order:
1. a public constructor accepting java.net.URI and
org.apache.hadoop.conf.Configuration,
2. a public static method named getInstance that accepts no
arguments and returns an instance of
com.amazonaws.auth.AWSCredentialsProvider, or
3. a public default constructor.
Specifying org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider allows
anonymous access to a publicly accessible S3 bucket without any credentials.
Please note that allowing anonymous access to an S3 bucket compromises
security and therefore is unsuitable for most use cases. It can be useful
for accessing public data sets without requiring AWS credentials.
If unspecified, then the default list of credential provider classes,
queried in sequence, is:
1. org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider: supports static
configuration of AWS access key ID and secret access key. See also
spark.hadoop.fs.s3a.access.key and spark.hadoop.fs.s3a.secret.key.
2. com.amazonaws.auth.EnvironmentVariableCredentialsProvider: supports
configuration of AWS access key ID and secret access key in
environment variables named AWS_ACCESS_KEY_ID and
AWS_SECRET_ACCESS_KEY, as documented in the AWS SDK.
3. com.amazonaws.auth.InstanceProfileCredentialsProvider: supports use
of instance profile credentials if running in an EC2 VM.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.session.token</name>
<description>Session token, when using org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider
as one of the providers.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.security.credential.provider.path</name>
<value/>
<description>
Optional comma separated list of credential providers, a list
which is prepended to that set in hadoop.security.credential.provider.path
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.assumed.role.arn</name>
<value/>
<description>
AWS ARN for the role to be assumed.
Required if the spark.hadoop.fs.s3a.aws.credentials.provider contains
org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.assumed.role.session.name</name>
<value/>
<description>
Session name for the assumed role, must be valid characters according to
the AWS APIs.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
If not set, one is generated from the current Hadoop/Kerberos username.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.assumed.role.policy</name>
<value/>
<description>
JSON policy to apply to the role.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.assumed.role.session.duration</name>
<value>30m</value>
<description>
Duration of assumed roles before a refresh is attempted.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
Range: 15m to 1h
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.assumed.role.sts.endpoint</name>
<value/>
<description>
AWS Simple Token Service Endpoint. If unset, uses the default endpoint.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.assumed.role.credentials.provider</name>
<value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
<description>
List of credential providers to authenticate with the STS endpoint and
retrieve short-lived role credentials.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
If unset, uses "org.apache.hadoop.spark.hadoop.fs.s3a.SimpleAWSCredentialsProvider".
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.connection.maximum</name>
<value>15</value>
<description>Controls the maximum number of simultaneous connections to S3.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.connection.ssl.enabled</name>
<value>true</value>
<description>Enables or disables SSL connections to S3.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.endpoint</name>
<description>AWS S3 endpoint to connect to. An up-to-date list is
provided in the AWS Documentation: regions and endpoints. Without this
property, the standard region (s3.amazonaws.com) is assumed.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.path.style.access</name>
<value>false</value>
<description>Enable S3 path style access ie disabling the default virtual hosting behaviour.
Useful for S3A-compliant storage providers as it removes the need to set up DNS for virtual hosting.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.proxy.host</name>
<description>Hostname of the (optional) proxy server for S3 connections.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.proxy.port</name>
<description>Proxy server port. If this property is not set
but spark.hadoop.fs.s3a.proxy.host is, port 80 or 443 is assumed (consistent with
the value of spark.hadoop.fs.s3a.connection.ssl.enabled).
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.proxy.username</name>
<description>Username for authenticating with proxy server.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.proxy.password</name>
<description>Password for authenticating with proxy server.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.proxy.domain</name>
<description>Domain for authenticating with proxy server.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.proxy.workstation</name>
<description>Workstation for authenticating with proxy server.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.attempts.maximum</name>
<value>20</value>
<description>How many times we should retry commands on transient errors.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.connection.establish.timeout</name>
<value>5000</value>
<description>Socket connection setup timeout in milliseconds.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.connection.timeout</name>
<value>200000</value>
<description>Socket connection timeout in milliseconds.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.socket.send.buffer</name>
<value>8192</value>
<description>Socket send buffer hint to amazon connector. Represented in bytes.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.socket.recv.buffer</name>
<value>8192</value>
<description>Socket receive buffer hint to amazon connector. Represented in bytes.</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.paging.maximum</name>
<value>5000</value>
<description>How many keys to request from S3 when doing
directory listings at a time.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.threads.max</name>
<value>10</value>
<description>The total number of threads available in the filesystem for data
uploads *or any other queued filesystem operation*.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.threads.keepalivetime</name>
<value>60</value>
<description>Number of seconds a thread can be idle before being
terminated.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.max.total.tasks</name>
<value>5</value>
<description>The number of operations which can be queued for execution</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.multipart.size</name>
<value>100M</value>
<description>How big (in bytes) to split upload or copy operations up into.
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.multipart.threshold</name>
<value>2147483647</value>
<description>How big (in bytes) to split upload or copy operations up into.
This also controls the partition size in renamed files, as rename() involves
copying the source file(s).
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.multiobjectdelete.enable</name>
<value>true</value>
<description>When enabled, multiple single-object delete requests are replaced by
a single 'delete multiple objects'-request, reducing the number of requests.
Beware: legacy S3-compatible object stores might not support this request.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.acl.default</name>
<description>Set a canned ACL for newly created and copied objects. Value may be Private,
PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead,
or BucketOwnerFullControl.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.multipart.purge</name>
<value>false</value>
<description>True if you want to purge existing multipart uploads that may not have been
completed/aborted correctly. The corresponding purge age is defined in
spark.hadoop.fs.s3a.multipart.purge.age.
If set, when the filesystem is instantiated then all outstanding uploads
older than the purge age will be terminated -across the entire bucket.
This will impact multipart uploads by other applications and users, so it should
be used sparingly, with an age value chosen to stop failed uploads, without
breaking ongoing operations.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.multipart.purge.age</name>
<value>86400</value>
<description>Minimum age in seconds of multipart uploads to purge
on startup if "spark.hadoop.fs.s3a.multipart.purge" is true
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.server-side-encryption-algorithm</name>
<description>Specify a server-side encryption algorithm for s3a: file system.
Unset by default. It supports the following values: 'AES256' (for SSE-S3),
'SSE-KMS' and 'SSE-C'.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.server-side-encryption.key</name>
<description>Specific encryption key to use if spark.hadoop.fs.s3a.server-side-encryption-algorithm
has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,
you'll be using your default S3 KMS key, otherwise you should set this property to
the specific KMS key id.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.signing-algorithm</name>
<description>Override the default signing algorithm so legacy
implementations can still be used
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.block.size</name>
<value>32M</value>
<description>Block size to use when reading files using s3a: file system.
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.buffer.dir</name>
<value>${hadoop.tmp.dir}/s3a</value>
<description>Comma separated list of directories that will be used to buffer file
uploads to.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.fast.upload.buffer</name>
<value>disk</value>
<description>
The buffering mechanism to use for data being written.
Values: disk, array, bytebuffer.
"disk" will use the directories listed in spark.hadoop.fs.s3a.buffer.dir as
the location(s) to save data prior to being uploaded.
"array" uses arrays in the JVM heap
"bytebuffer" uses off-heap memory within the JVM.
Both "array" and "bytebuffer" will consume memory in a single stream up to the number
of blocks set by:
spark.hadoop.fs.s3a.multipart.size * spark.hadoop.fs.s3a.fast.upload.active.blocks.
If using either of these mechanisms, keep this value low
The total number of threads performing work across all threads is set by
spark.hadoop.fs.s3a.threads.max, with spark.hadoop.fs.s3a.max.total.tasks values setting the number of queued
work items.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.fast.upload.active.blocks</name>
<value>4</value>
<description>
Maximum Number of blocks a single output stream can have
active (uploading, or queued to the central FileSystem
instance's pool of queued operations).
This stops a single stream overloading the shared thread pool.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.readahead.range</name>
<value>64K</value>
<description>Bytes to read ahead during a seek() before closing and
re-opening the S3 HTTP connection. This option will be overridden if
any call to setReadahead() is made to an open stream.
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.user.agent.prefix</name>
<value></value>
<description>
Sets a custom value that will be prepended to the User-Agent header sent in
HTTP requests to the S3 back-end by S3AFileSystem. The User-Agent header
always includes the Hadoop version number followed by a string generated by
the AWS SDK. An example is "User-Agent: Hadoop 2.8.0, aws-sdk-java/1.10.6".
If this optional property is set, then its value is prepended to create a
customized User-Agent. For example, if this configuration property was set
to "MyApp", then an example of the resulting User-Agent would be
"User-Agent: MyApp, Hadoop 2.8.0, aws-sdk-java/1.10.6".
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.metadatastore.authoritative</name>
<value>false</value>
<description>
When true, allow MetadataStore implementations to act as source of
truth for getting file status and directory listings. Even if this
is set to true, MetadataStore implementations may choose not to
return authoritative results. If the configured MetadataStore does
not support being authoritative, this setting will have no effect.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.metadatastore.impl</name>
<value>org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore</value>
<description>
Fully-qualified name of the class that implements the MetadataStore
to be used by s3a. The default class, NullMetadataStore, has no
effect: s3a will continue to treat the backing S3 service as the one
and only source of truth for file and directory metadata.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.s3guard.cli.prune.age</name>
<value>86400000</value>
<description>
Default age (in milliseconds) after which to prune metadata from the
metadatastore when the prune command is run. Can be overridden on the
command-line.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
<description>The implementation class of the S3A Filesystem</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.s3guard.ddb.region</name>
<value></value>
<description>
AWS DynamoDB region to connect to. An up-to-date list is
provided in the AWS Documentation: regions and endpoints. Without this
property, S3Guard will operate the table in the associated S3 bucket region.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.s3guard.ddb.table</name>
<value></value>
<description>
The DynamoDB table name to operate. Without this property, the respective
S3 bucket name will be used.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.s3guard.ddb.table.create</name>
<value>false</value>
<description>
If true, the S3A client will create the table if it does not already exist.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.s3guard.ddb.table.capacity.read</name>
<value>500</value>
<description>
Provisioned throughput requirements for read operations in terms of capacity
units for the DynamoDB table. This config value will only be used when
creating a new DynamoDB table, though later you can manually provision by
increasing or decreasing read capacity as needed for existing tables.
See DynamoDB documents for more information.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.s3guard.ddb.table.capacity.write</name>
<value>100</value>
<description>
Provisioned throughput requirements for write operations in terms of
capacity units for the DynamoDB table. Refer to related config
spark.hadoop.fs.s3a.s3guard.ddb.table.capacity.read before usage.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.s3guard.ddb.max.retries</name>
<value>9</value>
<description>
Max retries on batched DynamoDB operations before giving up and
throwing an IOException. Each retry is delayed with an exponential
backoff timer which starts at 100 milliseconds and approximately
doubles each time. The minimum wait before throwing an exception is
sum(100, 200, 400, 800, .. 100*2^N-1 ) == 100 * ((2^N)-1)
So N = 9 yields at least 51.1 seconds (51,100) milliseconds of blocking
before throwing an IOException.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.s3guard.ddb.background.sleep</name>
<value>25</value>
<description>
Length (in milliseconds) of pause between each batch of deletes when
pruning metadata. Prevents prune operations (which can typically be low
priority background operations) from overly interfering with other I/O
operations.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.retry.limit</name>
<value>${spark.hadoop.fs.s3a.attempts.maximum}</value>
<description>
Number of times to retry any repeatable S3 client request on failure,
excluding throttling requests.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.retry.interval</name>
<value>500ms</value>
<description>
Interval between attempts to retry operations for any reason other
than S3 throttle errors.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.retry.throttle.limit</name>
<value>${spark.hadoop.fs.s3a.attempts.maximum}</value>
<description>
Number of times to retry any throttled request.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.retry.throttle.interval</name>
<value>1000ms</value>
<description>
Interval between retry attempts on throttled requests.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.committer.name</name>
<value>file</value>
<description>
Committer to create for output to S3A, one of:
"file", "directory", "partitioned", "magic".
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.committer.magic.enabled</name>
<value>false</value>
<description>
Enable support in the filesystem for the S3 "Magic" committer.
When working with AWS S3, S3Guard must be enabled for the destination
bucket, as consistent metadata listings are required.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.committer.threads</name>
<value>8</value>
<description>
Number of threads in committers for parallel operations on files
(upload, commit, abort, delete...)
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.committer.staging.tmp.path</name>
<value>tmp/staging</value>
<description>
Path in the cluster filesystem for temporary data.
This is for HDFS, not the local filesystem.
It is only for the summary data of each file, not the actual
data being committed.
Using an unqualified path guarantees that the full path will be
generated relative to the home directory of the user creating the job,
hence private (assuming home directory permissions are secure).
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.committer.staging.unique-filenames</name>
<value>true</value>
<description>
Option for final files to have a unique name through job attempt info,
or the value of spark.hadoop.fs.s3a.committer.staging.uuid
When writing data with the "append" conflict option, this guarantees
that new data will not overwrite any existing data.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.committer.staging.conflict-mode</name>
<value>fail</value>
<description>
Staging committer conflict resolution policy.
Supported: "fail", "append", "replace".
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads</name>
<value>true</value>
<description>
Should the staging committers abort all pending uploads to the destination
directory?
Change this if more than one partitioned committer is
writing to the same destination tree simultaneously; otherwise
the first job to complete will cancel all outstanding uploads from the
others. However, it may lead to leaked outstanding uploads from failed
tasks. If disabled, configure the bucket lifecycle to remove uploads
after a time period, and/or set up a workflow to explicitly delete
entries. Otherwise there is a risk that uncommitted uploads may run up
bills.
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.list.version</name>
<value>2</value>
<description>
Select which version of the S3 SDK's List Objects API to use. Currently
support 2 (default) and 1 (older API).
</description>
</property>
<property>
<name>spark.hadoop.fs.s3a.etag.checksum.enabled</name>
<value>false</value>
<description>
Should calls to getFileChecksum() return the etag value of the remote
object.
WARNING: if enabled, distcp operations between HDFS and S3 will fail unless
-skipcrccheck is set.
</description>
</property>
Improving Performance for S3A
This section includes tips for improving performance when working with data stored in Amazon S3.
The bandwidth between the Hadoop cluster and Amazon S3 is the upper limit to how fast data can be copied into S3. The further the Hadoop cluster is from the Amazon S3 installation, or the narrower the network connection is, the longer the operation will take. Even a Hadoop cluster deployed within Amazon's own infrastructure may encounter network delays from throttled VM network connections.
Network bandwidth limits notwithstanding, there are some options which can be used to tune the performance of an upload:
Working with Local S3 Buckets
A foundational step to getting good performance is working with buckets close to the Hadoop cluster, where "close" is measured in network terms.
Maximum performance is achieved from working with S3 buckets in the same AWS region as the cluster. For example, if your cluster is in North Virginia ("US East"), you will achieve best performance if your S3 bucket is in the same region.
In addition to improving performance, working with local buckets ensures that no bills are incurred for reading from the bucket.
Configuring and Tuning S3A Block Upload
Because of the nature of the S3 object store, data written to an S3A OutputStream is not written incrementally; instead, by default, it is buffered to disk until the stream is closed in its close() method. This can make output slow, because the execution time of OutputStream.close() is proportional to the amount of data buffered and inversely proportional to the bandwidth between the host and S3; that is, O(data/bandwidth). Other work in the same process, server, or network at the time of upload may increase the upload time.
In summary, the further the process is from the S3 store, or the smaller the EC2 VM is, the longer it will take to complete the work. This can create problems in application code:
Code often assumes that the close() call is fast; the delays can create bottlenecks in operations.
Very slow uploads sometimes cause applications to time out - generally, threads blocking during the upload stop reporting progress, triggering timeouts.
Streaming very large amounts of data may consume all disk space before the upload begins.
Tuning S3A Uploads
When data is written to S3, it is buffered locally and then uploaded in multi-Megabyte blocks, with the final upload taking place as the file is closed.
The following major configuration options are available for S3A block uploads. They are used whenever data is written to S3.
Table 3.3. S3A Fast Upload Configuration Options
| Parameter | Default Value | Description |
| --- | --- | --- |
| spark.hadoop.fs.s3a.multipart.size | 100M | Defines the size (in bytes) of the blocks into which upload or copy operations are split. A suffix from the set {K,M,G,T,P} may be used to scale the numeric value. |
| spark.hadoop.fs.s3a.fast.upload.active.blocks | 8 | Defines the maximum number of blocks a single output stream can have active (uploading, or queued to the central FileSystem instance's pool of queued operations). This stops a single stream overloading the shared thread pool. |
| spark.hadoop.fs.s3a.buffer.dir | Empty value | A comma-separated list of temporary directories used for storing blocks of data prior to their being uploaded to S3. When unset (the default), the Hadoop temporary directory hadoop.tmp.dir is used. |
| spark.hadoop.fs.s3a.fast.upload.buffer | disk | Determines the buffering mechanism used when uploading data. Allowed values are disk, array and bytebuffer: "disk" buffers the data on local disk in the directories listed in spark.hadoop.fs.s3a.buffer.dir, "array" uses arrays in the JVM heap, and "bytebuffer" uses off-heap memory within the JVM. Both "array" and "bytebuffer" will consume memory in a single stream up to the number of blocks set by spark.hadoop.fs.s3a.multipart.size * spark.hadoop.fs.s3a.fast.upload.active.blocks. If using either of these mechanisms, keep this value low. The total number of threads performing work across all streams is set by spark.hadoop.fs.s3a.threads.max, with spark.hadoop.fs.s3a.max.total.tasks setting the number of queued work items. |
Note that:
- If the amount of data written to a stream is below that set in spark.hadoop.fs.s3a.multipart.size, the upload takes place after the application has written all its data.
- The maximum size of a single file in S3 is ten thousand blocks, which, for uploads, means 10000 * spark.hadoop.fs.s3a.multipart.size. Too small a value of spark.hadoop.fs.s3a.multipart.size can limit the maximum size of files.
- Incremental writes are not visible; the object can only be listed or read when the multipart operation completes in the close() call, which will block until the upload is completed.
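As an illustration of the memory bound mentioned above, the following sketch switches to off-heap buffering. The values are illustrative starting points rather than recommendations for every workload, and can be set either in core-site.xml (as shown, without the spark.hadoop. prefix) or as prefixed connection parameters:

<!-- with memory buffering, each output stream may hold up to
     fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks = 128M * 4 = 512M awaiting upload -->
<property>
  <name>fs.s3a.fast.upload.buffer</name>
  <value>bytebuffer</value>
</property>
<property>
  <name>fs.s3a.multipart.size</name>
  <value>128M</value>
</property>
<property>
  <name>fs.s3a.fast.upload.active.blocks</name>
  <value>4</value>
</property>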
Buffering uploads to disk or RAM
This is the default buffer mechanism. The amount of data which can be buffered is limited by the amount of available disk space.
When spark.hadoop.fs.s3a.fast.upload.buffer is set to "disk", all data is buffered to local hard disks prior to upload. This minimizes the amount of memory consumed, and so eliminates heap size as the limiting factor in queued uploads.
Buffering uploads in Byte Buffers
When spark.hadoop.fs.s3a.fast.upload.buffer is set to "bytebuffer", all data is buffered in "direct" ByteBuffers prior to upload. This may be faster than buffering to disk in cases such as when disk space is small there may not be much disk space to buffer with (for example, when using "tiny" EC2 VMs).
The ByteBuffers are created in the memory of the JVM, but not in the Java Heap itself. The amount of data which can be buffered is limited by the Java runtime, the operating system, and, for YARN applications, the amount of memory requested for each container.
The slower the upload bandwidth to S3, the greater the risk of running out of memory — and so the more care is needed in tuning the upload thread settings to reduce the maximum amount of data which can be buffered awaiting upload (see below).
Buffering Uploads with Array Buffers
When spark.hadoop.fs.s3a.fast.upload.buffer is set to "array", all data is buffered in byte arrays in the JVM's heap prior to upload. This may be faster than buffering to disk.
The amount of data which can be buffered is limited by the available size of the JVM heap. The slower the write bandwidth to S3, the greater the risk of heap overflows. This risk can be mitigated by tuning the upload thread settings (see below).
Thread Tuning for S3A Data Upload
Both the array and bytebuffer buffer mechanisms can consume very large amounts of memory, on-heap or off-heap respectively. The disk buffer mechanism does not use much memory, but it consumes hard disk capacity.
If there are many output streams being written to in a single process, the total amount of memory or disk used is the combined active memory and disk use of all the streams.
You may need to perform careful tuning to reduce the risk of running out of memory, especially if the data is buffered in memory. There are a number of parameters which can be tuned:
Table 3.4. S3A Upload Tuning Options
| Parameter | Default Value | Description |
| --- | --- | --- |
| spark.hadoop.fs.s3a.fast.upload.active.blocks | 4 | Maximum number of blocks a single output stream can have active (uploading, or queued to the central FileSystem instance's pool of queued operations). This stops a single stream overloading the shared thread pool. |
| spark.hadoop.fs.s3a.threads.max | 10 | The total number of threads available in the filesystem for data uploads or any other queued filesystem operation. |
| spark.hadoop.fs.s3a.max.total.tasks | 5 | The number of operations which can be queued for execution. |
| spark.hadoop.fs.s3a.threads.keepalivetime | 60 | The number of seconds a thread can be idle before being terminated. |
When the maximum allowed number of active blocks of a single stream is reached, no more blocks can be uploaded from that stream until one or more of those active block uploads completes. That is, a write() call which would trigger an upload of a now full datablock will instead block until there is capacity in the queue.
Consider the following:
- As the pool of threads set in spark.hadoop.fs.s3a.threads.max is shared (and intended to be used across all threads), a larger number here can allow for more parallel operations. However, as uploads require network bandwidth, adding more threads does not guarantee speedup.
- The extra queue of tasks for the thread pool (spark.hadoop.fs.s3a.max.total.tasks) covers all ongoing background S3A operations.
- When using memory buffering, a small value of spark.hadoop.fs.s3a.fast.upload.active.blocks limits the amount of memory which can be consumed per stream.
- When using disk buffering, a larger value of spark.hadoop.fs.s3a.fast.upload.active.blocks does not consume much memory, but it may result in a large number of blocks competing with other filesystem operations.
We recommend a low value of spark.hadoop.fs.s3a.fast.upload.active.blocks — enough to start background upload without overloading other parts of the system. Then experiment to see if higher values deliver more throughput — especially from VMs running on EC2.
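A sketch of such a tuning, again with illustrative values only (set in core-site.xml without the spark.hadoop. prefix, or as prefixed connection parameters):

<!-- more upload parallelism shared across all streams -->
<property>
  <name>fs.s3a.threads.max</name>
  <value>20</value>
</property>
<!-- allow more queued background operations -->
<property>
  <name>fs.s3a.max.total.tasks</name>
  <value>20</value>
</property>
<!-- keep the per-stream number of active blocks low to bound memory use -->
<property>
  <name>fs.s3a.fast.upload.active.blocks</name>
  <value>4</value>
</property>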
Optimizing S3A read performance for different file types
The S3A filesystem client supports the notion of input policies, similar to that of the POSIX fadvise() API call. This tunes the behavior of the S3A client to optimize HTTP GET requests for reading different filetypes. To optimize HTTP GET requests, you can take advantage of the S3A input policy option spark.hadoop.fs.s3a.experimental.input.fadvise:
| Policy | Description |
| --- | --- |
| "normal" | Starts off as "sequential": it asks for the whole file. As soon as the application tries to seek backwards in the file, it switches into "random" IO mode. This is not quite as efficient for random IO as the "random" mode, because the first read may have to be aborted. However, because it is adaptive, it is the best choice if you do not know the data formats which will be read. |
| "sequential" (default) | Read through the file, possibly with some short forward seeks. The whole document is requested in a single HTTP request; forward seeks within the readahead range are supported by skipping over the intermediate data. This leads to maximum read throughput, but with very expensive backward seeks. |
| "random" | Optimized for random IO, specifically the Hadoop `PositionedReadable` operations, though `seek(offset); read(byte_buffer)` also benefits. Rather than asking for the whole file, the range of the HTTP request is set to the length of data desired in the `read` operation, rounded up to the readahead value set in `setReadahead()` if necessary. By reducing the cost of closing existing HTTP requests, this is highly efficient for file IO accessing a binary file through a series of PositionedReadable.read() and PositionedReadable.readFully() calls. Sequential reading of a file is expensive, as many HTTP requests must be made to read through the file. |
For operations simply reading through a file (copying, DistCp, reading gzip or other compressed formats, parsing .csv files, and so on) the sequential policy is appropriate. This is the default, so you don't need to configure it.
For the specific case of high-performance random access IO (for example, accessing ORC files), you may consider using the random policy in the following circumstances:
- Data is read using the PositionedReadable API.
- There are long distance (many MB) forward seeks.
- Backward seeks are as likely as forward seeks.
- There is little or no use of single character read() calls or small read(buffer) calls.
- Applications are running close to the Amazon S3 data store; that is, the EC2 VMs on which the applications run are in the same region as the Amazon S3 bucket.
You must set the desired fadvise policy in the configuration option spark.hadoop.fs.s3a.experimental.input.fadvise when the filesystem instance is created. It can only be set on a per-filesystem basis, not on a per-file-read basis. You can set it in core-site.xml:
<property>
<name>fs.s3a.experimental.input.fadvise</name>
<value>random</value>
</property>
Or, you can set it in the spark-defaults.conf configuration of Spark:
spark.hadoop.fs.s3a.experimental.input.fadvise random
Be aware that this random access performance comes at the expense of sequential IO — which includes reading files compressed with gzip.
Improving Load-Balancing Behavior for S3
S3 uses a set of front-end servers to provide access to the underlying data. The decision about which front-end server to use is handled by a load-balancing DNS service. When the IP address of an S3 bucket is looked up, the choice of which IP address to return to the client is made based on the current load of the front-end servers.
Over time, the load across the front-end changes, so those servers that are considered "lightly loaded" change. This means that if the DNS value is cached for any length of time, applications may end up talking to an overloaded server; or, in the case of failures, they may end up trying to talk to a server that is no longer there.
For historical security reasons dating back to the era of applets, the DNS TTL of a JVM is set to "infinity" by default.
To improve AWS load-balancing, set the DNS time-to-live of an application which works with Amazon S3 to something lower than the default. Refer to Setting the JVM TTL for DNS Name Lookups in the AWS documentation.
S3 Performance Checklist
Use this checklist to ensure optimal performance when working with data in S3.
Checklist for Data
- [ ] The Amazon S3 bucket is in the same region as the EC2-hosted cluster.
- [ ] The directory layout is "shallow". For directory listing performance, the directory layout prefers "shallow" directory trees with many files over deep directory trees with only a few files per directory.
- [ ] The "pseudo" block size set in fs.s3a.block.size is appropriate for the work to be performed on the data.
- [ ] Copy to HDFS any data that needs to be read repeatedly.
Checklist for Cluster Configs
- [ ] Set yarn.scheduler.capacity.node-locality-delay to 0 to improve container launch times.
- [ ] When copying data using DistCp, use the recommended DistCp performance optimizations.
- [ ] If planning to use Hive with S3, review Improving Hive Performance with Cloud Object Stores.
- [ ] If planning to use Spark with S3, review Improving Spark Performance with Cloud Object Stores.
Checklist for Code
- [ ] Application does not make rename() calls. Where it does, it does not assume the operation is immediate.
- [ ] Application does not assume that delete() is near-instantaneous.
- [ ] Application uses FileSystem.listFiles(path, recursive=true) to list a directory tree.
- [ ] Application prefers forward seeks through files, rather than full random IO.
- [ ] If making "random" IO through seek() and read() sequences or and Hadoop's PositionedReadable API, fs.s3a.experimental.input.fadvise is set to random. Learn more
Using S3Guard for Consistent S3 Metadata
S3Guard mitigates the issues related to S3's eventual consistency on listings by using a table in Amazon DynamoDB as a consistent metadata store. This guarantees a consistent view of data stored in S3. In addition, S3Guard may improve query performance by reducing the number of times S3 needs to be contacted, as DynamoDB lookups are significantly faster.
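A sketch of the properties that switch the metadata store from the default NullMetadataStore to DynamoDB; the table name and region values are placeholders, and the individual properties are described in the reference list earlier in this document:

<property>
  <name>fs.s3a.metadatastore.impl</name>
  <value>org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore</value>
</property>
<property>
  <!-- placeholder table name; when unset, the S3 bucket name is used -->
  <name>fs.s3a.s3guard.ddb.table</name>
  <value>datalake-s3guard</value>
</property>
<property>
  <!-- create the DynamoDB table automatically if it does not exist -->
  <name>fs.s3a.s3guard.ddb.table.create</name>
  <value>true</value>
</property>
<property>
  <!-- placeholder region; when unset, the region of the S3 bucket is used -->
  <name>fs.s3a.s3guard.ddb.region</name>
  <value>us-east-1</value>
</property>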