We demonstrate two semantically equivalent HiveQL queries that use Amazon DynamoDB's storage handler but compile to MapReduce jobs with vastly different performance.
The combination of Amazon DynamoDB, Amazon Elastic MapReduce (Amazon EMR) and Apache Hive is a useful one when storing and analyzing large amounts of data. We can use Amazon DynamoDB to store time-stamped information across several applications by creating a table, myData, with the following structure:
We set the primary key type to be a hash and range. We set the hash attribute name to appId and type to number, and we set the range attribute name to ts (for a Unix timestamp) and type to number. We can store any number of items with an arbitrary set of key-value pairs that are indexed by an appId and ts. If we want to store more than one item with the same appId and ts then we can use time-based UUIDs instead of plain Unix timestamps.
In the following example we set the read and write provisioned throughputs to one unit each. We populate the table using a Node.js script. This script sequentially adds 100 items to the table. Each item contains the two required attributes: appId, which ranges from 0 to 9, and ts, which ranges from 0 to 99 for each appId. The script warns when it exceeds the level of provisioned read throughput for the table, automatically backing off and retrying to ensure that all items are properly recorded.
Now, we spin up an Amazon EMR cluster, myCluster, to perform the analyses. We chose "Hive program" for the job flow and "Start an Interactive Hive Session". We leave all other options unchanged: the master instance type is small (m1.small), the core instance count is two, the core instance type is small (m1.small) and the task instance count is zero.
When the cluster has started, we SSH into the master node and start an interactive Hive session. We connect to myData as follows:
CREATE EXTERNAL TABLE myData (appId BIGINT, ts BIGINT)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES (
'dynamodb.table.name' = 'myData',
'dynamodb.column.mapping' = 'appId:appId,ts:ts',
'dynamodb.throughput.read.percent' = '1.0'
);
We can count the total number of items in the table using:
SELECT count(1) FROM myData;
This correctly returns 1000 and takes approximately 1045 seconds to complete. The bottleneck is the table's provisioned read throughput. It takes approximately 1000 seconds to read the 1000 items; the remaining time is spent compiling the HiveQL statements into MapReduce jobs and submitting the jobs to the cluster for execution. If we want to improve the query's performance we can increase the provisioned read throughput.
Suppose we only want to analyze a subset of the data in our table. Of our ten appIds, maybe only some of them are active at any one time and we only want to count the numbers of items with a specified appId. We can use the following:
SELECT count(1) FROM myData WHERE appId = 0;
This correctly returns 100 and takes approximately 143 seconds to complete. This is reasonable. The hash for the table is appId so we only need to read 100 items from the table, but of course we incur a similar penalty as before for compiling the HiveQL statements and submitting the jobs to the cluster.
What happens if we have more than one active application?
SELECT count(1) FROM myData WHERE appId = 0 OR appId = 9;
This correctly returns 200 but takes approximately 1037 seconds to complete! The query is scanning the entire table even though the attribute or column in the predicate is indexed.
What about the following query?
SELECT
sum(value)
FROM (
SELECT count(1) AS value FROM myData WHERE appId = 0
UNION ALL
SELECT count(1) AS value FROM myData WHERE appId = 9
) u;
This also returns 200 but takes approximately 327 seconds to complete. Amazon DynamoDB's storage handler (org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler) uses a query when there is only one condition on the hash in the WHERE clause. However, it uses a scan when there is more than one condition. This implementation is defined in hive-bigbird-handler.jar on the Amazon EMR cluster. Unfortunately, Amazon hasn't released the source-code for the storage handler. It is also against Amazon's rules to decompile this file (see 8.5 License Restrictions).
The HiveQL language abstracts away the procedural steps of a series of MapReduce jobs, allowing us to focus on what we want to retrieve and/or manipulate. However, this means that statements that are semantically equivalent, may compile to MapReduce jobs with vastly different performance. All course, all non-trivial abstractions are, to some degree, leaky.
The combination of Amazon DynamoDB, Amazon Elastic MapReduce (Amazon EMR) and Apache Hive is a useful one when storing and analyzing large amounts of data. We can use Amazon DynamoDB to store time-stamped information across several applications by creating a table, myData, with the following structure:
We set the primary key type to be a hash and range. We set the hash attribute name to appId and type to number, and we set the range attribute name to ts (for a Unix timestamp) and type to number. We can store any number of items with an arbitrary set of key-value pairs that are indexed by an appId and ts. If we want to store more than one item with the same appId and ts then we can use time-based UUIDs instead of plain Unix timestamps.
In the following example we set the read and write provisioned throughputs to one unit each. We populate the table using a Node.js script. This script sequentially adds 100 items to the table. Each item contains the two required attributes: appId, which ranges from 0 to 9, and ts, which ranges from 0 to 99 for each appId. The script warns when it exceeds the level of provisioned read throughput for the table, automatically backing off and retrying to ensure that all items are properly recorded.
Now, we spin up an Amazon EMR cluster, myCluster, to perform the analyses. We chose "Hive program" for the job flow and "Start an Interactive Hive Session". We leave all other options unchanged: the master instance type is small (m1.small), the core instance count is two, the core instance type is small (m1.small) and the task instance count is zero.
When the cluster has started, we SSH into the master node and start an interactive Hive session. We connect to myData as follows:
CREATE EXTERNAL TABLE myData (appId BIGINT, ts BIGINT)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES (
'dynamodb.table.name' = 'myData',
'dynamodb.column.mapping' = 'appId:appId,ts:ts',
'dynamodb.throughput.read.percent' = '1.0'
);
We can count the total number of items in the table using:
SELECT count(1) FROM myData;
This correctly returns 1000 and takes approximately 1045 seconds to complete. The bottleneck is the table's provisioned read throughput. It takes approximately 1000 seconds to read the 1000 items; the remaining time is spent compiling the HiveQL statements into MapReduce jobs and submitting the jobs to the cluster for execution. If we want to improve the query's performance we can increase the provisioned read throughput.
Suppose we only want to analyze a subset of the data in our table. Of our ten appIds, maybe only some of them are active at any one time and we only want to count the numbers of items with a specified appId. We can use the following:
SELECT count(1) FROM myData WHERE appId = 0;
This correctly returns 100 and takes approximately 143 seconds to complete. This is reasonable. The hash for the table is appId so we only need to read 100 items from the table, but of course we incur a similar penalty as before for compiling the HiveQL statements and submitting the jobs to the cluster.
What happens if we have more than one active application?
SELECT count(1) FROM myData WHERE appId = 0 OR appId = 9;
This correctly returns 200 but takes approximately 1037 seconds to complete! The query is scanning the entire table even though the attribute or column in the predicate is indexed.
What about the following query?
SELECT
sum(value)
FROM (
SELECT count(1) AS value FROM myData WHERE appId = 0
UNION ALL
SELECT count(1) AS value FROM myData WHERE appId = 9
) u;
This also returns 200 but takes approximately 327 seconds to complete. Amazon DynamoDB's storage handler (org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler) uses a query when there is only one condition on the hash in the WHERE clause. However, it uses a scan when there is more than one condition. This implementation is defined in hive-bigbird-handler.jar on the Amazon EMR cluster. Unfortunately, Amazon hasn't released the source-code for the storage handler. It is also against Amazon's rules to decompile this file (see 8.5 License Restrictions).
The HiveQL language abstracts away the procedural steps of a series of MapReduce jobs, allowing us to focus on what we want to retrieve and/or manipulate. However, this means that statements that are semantically equivalent, may compile to MapReduce jobs with vastly different performance. All course, all non-trivial abstractions are, to some degree, leaky.
No comments:
Post a Comment