By James Wing on 2016-10-31

A frequent goal for an Apache NiFi flow is to ingest data into S3 object storage. You might be using S3 as a Data Lake. Maybe S3 is an intermediate destination, awaiting another pipeline to Redshift or HDFS. S3 has become a default parking lot for data because it is general-purpose, cheap, accessible, and reliable. But the results aren't always pretty; accumulated data frequently exemplifies the "Data Swamp" label.

Just saving data on S3 doesn't make it an analytic data store. But it could be. In this post, we'll explore how Apache NiFi can help you get your S3 data storage into proper shape for analytic processing with EMR, Hadoop, Drill, and other tools.

Why Ingest to S3

There are a lot of good reasons to include S3 in your data pipeline, both as an intermediate step and as an end state:

  • Easy - simple to use, little or no maintenance
  • Cost - not only cheap, but also fairly transparent costs
  • Reliable and Available - better than most other storage systems we use
  • Accessible - to many tools and connecting systems like Elastic MapReduce (EMR) and Apache Drill, and it can be made available to customers or partners
  • Secure - a variety of security options to control access
  • Performance - not the best, but it's frequently good enough

Despite these many advantages, S3 is not a miracle alternative to HDFS. Disadvantages include:

  • No data locality - unlike HDFS, S3 is always remote storage. However, colocating your bucket in the same region as your compute resources remains important.
  • Does not support appending data to existing files, an important difference from HDFS
  • Supports a subset of file system operations

How Is This Hard?

You might wonder how I can simultaneously claim that S3 storage is easy while writing an article about how it is hard. What typically happens is that we cheaply and easily store a bunch of data, but then have a hard time processing it later. I would put it this way:

It's easy to store files in S3, but it's hard to maintain an organized, governed data set.

Some organization and governance is in order. However, we don't want to make this too hard, or we'll miss out on the cheap, easy, accessible nature of S3 storage. The solutions we are looking for are quick but effective guidelines for storing data in S3. Apache NiFi will help us apply improvements incrementally as we learn from our data.

Considerations for Analytic Data Storage in S3

Our purpose here is analytic data, so our concerns about S3 storage are not quite the same as they would be for general-purpose file storage.

  • Organization - A nice Data Lake in an S3 bucket has its directories and files structured much as you would structure them in HDFS. Separate data sets. Separate raw data from processed data. Structure directory trees for time-based data by year, month, day, etc. Keep in mind that this key naming scheme is not optimal for S3 transactional throughput; optimal throughput requires an even lexicographical distribution of keys. No worries, NiFi will help us buffer many incoming files into fewer S3 writes.
  • Cost - S3 is reasonably priced, but nobody wants to pay more than necessary.
  • Accessibility - Make it easy for your processing tools to work with your data:
    • Format - CSV, TSV, Avro, JSON, etc.
    • Compression - Good balance between size reduction (low storage cost and network usage), speed (CPU), and compatibility with analysis tools
    • Size - Right-size your files for analysis.
    • Frequency - For time-sensitive collection, make sure you are delivering at regular intervals.
  • Security - Configure server-side encryption and any object-level permissions that are not controlled at the bucket level.
  • Expiration - It's best to configure lifecycle expiration up front, partially for cost savings, but it may also play a role in your data set definition (see the lifecycle rule sketch just after this list).
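
As a concrete example of that last point, a lifecycle rule that expires objects under a raw-data prefix could look roughly like the JSON below, applied with the AWS CLI's put-bucket-lifecycle-configuration command. This is only a sketch; the rule ID, the "raw/" prefix, and the 90-day window are illustrative placeholders, not settings from the flow in this post.

{
  "Rules": [
    {
      "ID": "expire-raw-data",
      "Prefix": "raw/",
      "Status": "Enabled",
      "Expiration": { "Days": 90 }
    }
  ]
}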

NiFi Solutions

Apache NiFi offers solutions to many of these problems for storage in S3, HDFS, and other file systems. For S3 storage, there are two processors in particular that you must know:

  • PutS3Object - Obviously, you use this to send files to S3. Importantly, it can also define S3 properties like custom security access rules, server-side encryption, and expiration.
  • MergeContent - Aggregates flowfiles, based on a combination of rules for count, total size, and elapsed time.

The table below maps our list of considerations above to the NiFi features that will provide elements of the solution:

Problem        Solution
Organization   NiFi attributes and expression language allow flexible mapping of flowfiles to S3 keys by format, dates, purpose, etc.
Format         NiFi includes a number of processors for format conversion, such as ConvertCSVToAvro, ConvertAvroToJSON, etc. ExecuteScript or custom processors can handle custom conversions.
Size           MergeContent includes many options for aggregating based on record count, bytes, and time windows.
Compression    MergeContent can compress Avro and other container formats; CompressContent performs arbitrary compression.
Frequency      MergeContent can balance delivery frequency with size concerns.
Security       PutS3Object allows per-object access rules using expression language. Server-side encryption can be enabled to provide additional protection for data at rest in S3.
Expiration     PutS3Object can assign objects to named expiration rules configured on your bucket.

Twitter to S3 Example

As an example, I built a NiFi flow pulling data from the ubiquitous GetTwitter processor, and storing the tweets in S3. Along the way, I went through the considerations outlined above to create a more proper data set in S3, accessible to both Apache Drill and Hive on Elastic MapReduce.

The flow template is available as a Gist twitter_to_s3_sample_v1.xml, with screenshots and explanatory notes below.

Twitter to S3 Flow

Twitter data source process group

Process group extracting summaries of Tweets in Avro format

Process group aggregating original JSON Tweets

Process group sending Tweet container files to S3

Organizing Tweets

In the example flow, I decided to save two copies of each tweet - one copy of the raw, unmodified JSON response from GetTwitter, and a summarized extract saved in Avro format. Keeping the raw data preserves our options for future processing, while the summarized tweets will make immediate analysis easier. Our S3 bucket's directory structure will break down like this:

tweet-lake
 |-- raw
 |   |-- 2016
 |       |-- 10
 |           |-- 30
 |           +-- 31
 +-- summary
     |-- 2016
         |-- 10
             |-- 30
             +-- 31

Within this structure, we will have directories for year, month, and day to give us some simple time-based query options. A typical NiFi pattern is to use two flowfile attributes, path and filename, and we will use this later in PutS3Object. Our paths will consist of the raw/summary bundle type, bundle.type, plus levels for the year, month, and day. In NiFi expression language:

tweet-lake/${bundle.type}/${now():format("yyyy/MM/dd")}

We'll repeat some of the date information in the file names, so they stand on their own. A good alternative here would be to start the filename with the hour, minute, and second, so additional time filtering could be applied on partial S3 keys. I also want to add the count of tweets contained in the bundle and a flowfile UUID for traceability and uniqueness. In NiFi expression language:

${now():format("yyyy-MM-dd'T'HH-mm-ss")}-${merge.count}-${bundle.type}-tweets-${uuid}.${'format'}

A sample ${path}/${filename} result would look like this:

tweet-lake/raw/2016/10/28/2016-10-28T19-48-21-1000-raw-tweets-535428fb-3aa9-4823-8d4d-65afa6b1437d.json

But which date? In my example I use the current date now() when the container files are renamed. While this is probably close to the timestamp of the Tweet, it is not the same thing, and using the Tweet timestamp would improve my example.

This organization is applied by an UpdateAttribute processor immediately before sending the files to S3.

Organizing S3 files using ${path} and ${filename}
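
Pulling the pieces together, the UpdateAttribute configuration amounts to two dynamic properties, sketched below using the expressions shown earlier (the attribute names path and filename are the conventional NiFi ones):

path        tweet-lake/${bundle.type}/${now():format("yyyy/MM/dd")}
filename    ${now():format("yyyy-MM-dd'T'HH-mm-ss")}-${merge.count}-${bundle.type}-tweets-${uuid}.${'format'}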

Raw JSON Tweets

Storing the raw Tweet JSON is fairly straightforward; we just want to bundle the tweets into groups.

I merged the raw Tweets using a MergeContent processor with the Binary Concatenation merge format, a newline demarcator, a maximum of 100,000 records per bin, and a maximum bin age of 1 hour. In practice, a bin fills up to 100,000 records before an hour passes.

This results in files of about 430 - 440 MB, a bit bigger than we need. I added a CompressContent processor to apply gzip compression, which gets each bundled file down to about 75 - 80 MB.
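
Roughly, the settings on these two processors look like the listing below. This is a sketch reconstructed from the description above, and property labels may vary slightly between NiFi versions:

MergeContent (raw JSON bundles)
    Merge Strategy: Bin-Packing Algorithm
    Merge Format: Binary Concatenation
    Maximum Number of Entries: 100000
    Max Bin Age: 1 hour
    Delimiter Strategy: Text
    Demarcator: (a single newline)

CompressContent
    Mode: compress
    Compression Format: gzip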

MergeContent properties for raw JSON Tweets

Avro Summary Tweets

Converting raw JSON Tweets to an Avro summary requires an Avro schema and the ConvertJSONToAvro processor. One of the nice features of the Avro conversion is that we can take a proper subset of the original JSON simply by using a subset schema. I made a simple Avro schema for a summarized Tweet, with just id, timestamp, text, and simplified user, hashtags, and user mention entities.
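
The real schema is embedded in the flow template; a minimal sketch of the kind of subset schema I mean is shown below. The field names and nesting here are illustrative only, and in practice they have to line up with (or be aliased to) the actual Twitter JSON fields, such as created_at and the entities structures:

{
  "type": "record",
  "name": "TweetSummary",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "timestamp", "type": "string"},
    {"name": "text", "type": "string"},
    {"name": "user", "type": {
      "type": "record",
      "name": "TweetUser",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "screen_name", "type": "string"}
      ]
    }},
    {"name": "hashtags", "type": {"type": "array", "items": "string"}, "default": []},
    {"name": "user_mentions", "type": {"type": "array", "items": "string"}, "default": []}
  ]
}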

I used MergeContent to bundle up summary Tweets using the Avro format, with settings for a maximum of 100,000 records and a maximum age of 15 minutes. In my testing, this resulted in one file every 15 minutes with about 35,000 records each, and about 12 - 14 MB in size. The Avro files from NiFi are compressed with Snappy. ~15 MB files are smaller than I might like, but I think 15 minutes is a good interval for the summaries, and I don't want to deliver less frequently.
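
The corresponding MergeContent settings for the summaries are, approximately (again a sketch, not a copy of the template):

MergeContent (Avro summary bundles)
    Merge Strategy: Bin-Packing Algorithm
    Merge Format: Avro
    Maximum Number of Entries: 100000
    Max Bin Age: 15 min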

MergeContent properties for Avro summary Tweets

Sending to S3

Sending the flowfiles to S3 is the easy part. No surprise, I used the PutS3Object processor. An important setting is the Object Key, set to ${path}/${filename} as described in the Organization section above. Don't forget to configure Server-Side Encryption. In my case, permissions and expiration rules have been set at the bucket level, so I'm not going to override them on a per-object basis. But it's nice to know that PutS3Object can support that when you need it.
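
For reference, the handful of PutS3Object properties that matter here are roughly the following. This is a sketch: the bucket name and credentials are placeholders for your own, and property labels may differ slightly by NiFi version.

PutS3Object
    Bucket: your-bucket-name
    Object Key: ${path}/${filename}
    Region: (the same region as your compute resources)
    Server Side Encryption: AES256
    Access Key / Secret Key, or an AWS Credentials Provider service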

Test Query from Drill

I used Apache Drill to run a test query directly against my S3 data set. Drill provides a SQL interface over many data sources, including both Avro and JSON files, and including data stored on S3.
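
Here, s3scratch is a file-type Drill storage plugin pointed at my bucket over s3a, with the S3 credentials supplied separately in Drill's configuration. A rough sketch of such a plugin definition, with a placeholder bucket name and only the formats used in this post:

{
  "type": "file",
  "enabled": true,
  "connection": "s3a://your-bucket-name",
  "workspaces": {
    "root": {
      "location": "/",
      "writable": false,
      "defaultInputFormat": null
    }
  },
  "formats": {
    "json": {"type": "json", "extensions": ["json"]},
    "avro": {"type": "avro"}
  }
}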

0: jdbc:drill:zk=local> SELECT COUNT(*) FROM s3scratch.`tweet-lake/summary/2016/10/31`;
+----------+
|  EXPR$0  |
+----------+
| 3276577  |
+----------+
1 row selected (702.782 seconds)

Not quick, I know, but this is a test setup running on a laptop. And querying one of the gzipped JSON bundles:

0: jdbc:drill:zk=local> SELECT COUNT(*) FROM s3scratch.`tweet-lake/raw/2016/10/31/2016-10-31T04-05-19-100000-raw-tweets-0781f16e-35f6-42ac-acb0-00c86d4b64c3.json.gz`;
+---------+
| EXPR$0  |
+---------+
| 100000  |
+---------+
1 row selected (331.841 seconds)

It is a basic measurement of success that our data is accessible from a query tool like Drill.

For Further Reference