Amazon Athena is a recently launched service that provides interactive SQL queries over your data in S3. Athena uses the Hive Metastore to define your data structure, and Presto for processing queries. According to Amazon's marketing copy, "there’s no need for complex ETL jobs to prepare your data for analysis". Sounds great!
Notice, however, the fine print -- it doesn't say "no ETL", just no "complex ETL". My first Athena project was to query the summarized tweets I had stored from the earlier S3 ingest project. This didn't work because I had stored the data in Avro format, which Athena does not yet support. Another data set I tried had a mix of CSV data files with JSON metadata, which Athena rejected.
What ETL do we need for Athena? Apache NiFi is a great tool for building an ingest pipeline to the Amazon Athena query service, and to other AWS data tools. In this post, I will explain how to set up a data set in S3 for Athena using Apache NiFi. Then we'll revisit the Twitter to S3 flow, optimizing for Athena.
Why use Amazon Athena?
Amazon Athena can form a nutritious part of a lean data solution. It allows interactive, exploratory queries of data in files on S3, similar to what you would get from Hive on Hadoop, but without having to spin up and pay for an Elastic MapReduce (EMR) cluster. Also like Hive, Athena may be an intermediate step towards EMR Hadoop, Spark, or Redshift as a tool to extract structured tabular data from source files. There is the additional aspect that you are probably going to put your data files on S3 anyway, so the marginal effort to expose them to Athena is low. Athena is priced based on the amount of data scanned to process each query, which makes it very attractive for workloads that wouldn't justify a dedicated EMR Spark cluster or Redshift instance.
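To make the pay-per-scan pricing concrete, here is a back-of-the-envelope cost sketch in Python. It assumes Athena's launch-era price of $5 per TB scanned; check current AWS pricing before relying on the number.

```python
# Rough Athena cost estimate. PRICE_PER_TB assumes the launch-era rate of
# $5 per TB scanned -- verify against current AWS pricing.
PRICE_PER_TB = 5.00

def athena_query_cost(bytes_scanned: int) -> float:
    """Cost in USD for a single query, rounded to the nearest cent."""
    tb = bytes_scanned / 1024 ** 4
    return round(tb * PRICE_PER_TB, 2)

# Scanning 50 GB of gzipped tweets in one query:
print(athena_query_cost(50 * 1024 ** 3))  # → 0.24
```

Compression and columnar formats reduce the bytes scanned, which directly reduces the bill -- one more reason the ETL choices below matter.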
Structuring S3 Data for Amazon Athena with NiFi
So Amazon Athena reads files from S3, and NiFi does a great job converting data and routing files to S3. How do they match up?
| Feature | Amazon Athena | Apache NiFi |
|---|---|---|
| Compression | Accepts Snappy, Gzip, and Zlib compression | The CompressContent processor handles Snappy and Gzip |
| Text formats | Supports delimited text formats (CSV, TSV, etc.), defined per Apache Hive SerDes | Allows any format, with good support for delimited text |
| JSON | Supports JSON (one object per line) | Good support for JSON |
| Columnar formats | Supports ORC and Parquet | NiFi 1.1+ supports ORC |
| Directory structure | Athena tables are Hive "external tables" and must be defined with a directory location in S3 (e.g. `s3://test-bucket/athena-tweets/raw`) | NiFi easily handles this when writing files to S3, using NiFi Expression Language to define the S3 object key |
| Partitioning | Supports partitions, but the Hive metastore must be updated for each one | Again, NiFi can easily format the S3 keys for your data partitions |
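The last two rows of the table boil down to key naming: Hive-style partitions are just `key=value` path segments in the S3 object key. Here is a small Python sketch of the layout a NiFi Expression Language key definition would produce; the bucket prefix and filename pattern are invented for illustration.

```python
from datetime import datetime, timezone

def s3_key_for(ts: datetime, prefix: str = "athena-tweets/raw") -> str:
    """Build a Hive-style partitioned S3 key: prefix/year=YYYY/month=MM/day=DD/file.
    Mirrors what a NiFi Expression Language expression over ${now()} would emit."""
    return (f"{prefix}/year={ts:%Y}/month={ts:%m}/day={ts:%d}/"
            f"tweets-{ts:%Y%m%d%H%M%S}.json.gz")

key = s3_key_for(datetime(2017, 1, 15, 9, 30, 0, tzinfo=timezone.utc))
print(key)  # → athena-tweets/raw/year=2017/month=01/day=15/tweets-20170115093000.json.gz
```

Each `year=`/`month=`/`day=` directory then only needs an `ALTER TABLE ... ADD PARTITION` in the Hive metastore to become queryable.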
Some other characteristics of Athena that you may run into include:
- File formats must be consistent - As a Hive store, Athena has no ability to ignore files within a table directory location. So your files must consistently conform to the stated structure.
- File extensions are significant - Some file extensions are read and interpreted, so a file with a `.gz` extension is transparently un-gzipped.
- Columnar formats are read-only - Athena is read-only, so while it can read columnar formats like ORC and Parquet, it does not yet help you write them. Amazon suggests spinning up an EMR cluster for the conversion.
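Two of these points can be demonstrated together: the "one JSON object per line" layout Athena's JSON SerDe expects, written with a `.gz` extension so Athena decompresses it transparently. A minimal Python sketch (the sample tweets and filename are invented):

```python
import gzip
import json

# Sample records standing in for real tweets.
tweets = [
    {"id_str": "1", "text": "hello athena", "lang": "en"},
    {"id_str": "2", "text": "bonjour", "lang": "fr"},
]

# One JSON object per line, no enclosing array -- the layout the JSON SerDe expects.
with gzip.open("tweets-sample.json.gz", "wt", encoding="utf-8") as f:
    for tweet in tweets:
        f.write(json.dumps(tweet) + "\n")

# Reading it back yields the same records:
with gzip.open("tweets-sample.json.gz", "rt", encoding="utf-8") as f:
    assert [json.loads(line) for line in f] == tweets
```

Note what is *not* there: a top-level JSON array. Wrapping the objects in `[...]` is the most common way to produce a file Athena can't read.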
Querying Tweets with Athena
Let's modify the earlier Twitter to S3 sample to follow our Athena ETL guidance above. This is pretty easy: we'll just take the previous "raw" Twitter JSON feed and make a few tweaks:
- Remove the previous "summary" Tweet path that used Avro files
- Use the previous raw Tweet path
- Keep the data in the raw JSON format
- Bundle Tweets into files of up to 1,000 Tweets, flushed at least once an hour
- Keep gzip compression
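The bundling rule in the list above (1,000 Tweets or one hour, whichever comes first) is what NiFi's MergeContent processor handles in the flow. As a rough Python analogue of that logic -- a sketch, not the NiFi implementation:

```python
import time

class TweetBundler:
    """Buffer tweets; flush when a count limit or age limit is reached.
    Mirrors the MergeContent settings used in the flow: 1,000 tweets or 1 hour."""

    def __init__(self, max_count: int = 1000, max_age_secs: int = 3600):
        self.max_count = max_count
        self.max_age_secs = max_age_secs
        self.buffer = []
        self.oldest = None  # timestamp of the first tweet in the current bundle

    def add(self, tweet, now: float = None):
        """Add a tweet; return a completed bundle when a limit is hit, else None."""
        now = time.time() if now is None else now
        if not self.buffer:
            self.oldest = now
        self.buffer.append(tweet)
        if len(self.buffer) >= self.max_count or now - self.oldest >= self.max_age_secs:
            bundle, self.buffer, self.oldest = self.buffer, [], None
            return bundle
        return None
```

Larger, less frequent files are friendlier to both S3 and Athena than a trickle of tiny objects.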
The new flow looks astonishingly, amazingly, coincidentally similar to our old S3 flow, but without the Avro summaries.
In the old flow, we created summaries of Tweets in Avro files as part of the ingest pipeline, and queried those with Apache Drill. Since Athena doesn't read Avro files (yet), we'll use the Hive table definition to perform the summary function.
First, create a database in Athena:
CREATE DATABASE IF NOT EXISTS twitter;
Then create a table for Tweets:
CREATE EXTERNAL TABLE twitter.tweets (
  id_str string,
  created_at string,
  text string,
  lang string,
  user STRUCT<
    id_str: string,
    name: string,
    screen_name: string,
    location: string,
    description: string,
    followers_count: bigint,
    friends_count: bigint
  >,
  entities STRUCT<
    hashtags: ARRAY<STRUCT<text: string>>,
    urls: ARRAY<STRUCT<url: string>>,
    user_mentions: ARRAY<STRUCT<screen_name: string, name: string, id_str: string>>
  >
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://test-bucket/athena-tweets/raw';
Now we can query out the Tweet data using Presto:
SELECT * FROM twitter.tweets LIMIT 10;
And run aggregate queries across the data set:
SELECT format_datetime(parse_datetime(created_at, 'E MMM dd HH:mm:ss Z yyyy'), 'yyyy-MM-dd:HH') AS tweet_hour,
       count(*) AS count
FROM twitter.tweets
GROUP BY 1
ORDER BY 1 DESC;
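If the Presto date pattern above ever misbehaves, it helps to sanity-check the parsing outside Athena. Here is the equivalent round trip in Python; the sample `created_at` value is invented, but it follows Twitter's timestamp format.

```python
from datetime import datetime

# Twitter's created_at format, e.g. "Wed Jan 18 06:12:34 +0000 2017",
# corresponds to strptime pattern "%a %b %d %H:%M:%S %z %Y"
# (the Python analogue of Presto's 'E MMM dd HH:mm:ss Z yyyy').
created_at = "Wed Jan 18 06:12:34 +0000 2017"
ts = datetime.strptime(created_at, "%a %b %d %H:%M:%S %z %Y")

# Bucket into the same hourly granularity as the Presto query:
print(ts.strftime("%Y-%m-%d:%H"))  # → 2017-01-18:06
```

The string-typed `created_at` column plus a parse at query time is the trade-off for skipping ETL: the raw JSON stays untouched, and the table definition does the work.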