By James Wing on 2017-02-14

Receiving and processing S3 event notifications is a common pattern for processing files written to S3. Event notifications convert S3 from just a key/value store to a stream of events. Handling these events is the best way to perform low-latency processing of S3 objects. This pattern consists of several components:

  • S3 bucket for data collection
  • SQS queue to receive S3 event notifications
  • Apache NiFi to process the notifications and incoming data files

The following diagram illustrates the solution:

Processing S3 Event Notifications

When to Design for S3 Event Notifications

Processing S3 event notifications is not for every solution. It is a good fit when the following apply:

  • Data is written to S3 by other applications, maybe even customers or partners
  • Files written to S3 must be processed as soon as possible
  • Files are collected from many buckets, possibly across AWS accounts

Hopefully, that all sounds good. To double-check, make sure you do NOT need a front-end application receiving and filtering data before it is written to S3. If you do need one, you might prefer to use NiFi as an ingest pipeline in front of S3. See S3 Ingest with NiFi for more on that.

AWS Setup

A good deal of this design involves configuring AWS to set up an SQS queue, tell S3 to send notifications to the queue, and give NiFi permissions to read from the queue.

SQS Queue

First, create an SQS queue to receive and hold S3 event notifications. I'd like to start with the helpful note that the SQS Queue must be in the same region as the S3 bucket. I always forget this and end up having to configure the queue twice. If you want the queue in a separate region, you would need to configure an SNS topic in the same region as the bucket and subscribe the SQS queue to the SNS topic.

The queue needs to have two permission grants:

  1. Amazon S3 needs to have the sqs:SendMessage permission.
  2. NiFi needs to have both sqs:ReceiveMessage and sqs:DeleteMessage permissions.

From the AWS Management Console for SQS, you should add a policy to the SQS queue as described in Granting Permissions to Publish Event Notification Messages to a Destination.
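
As a rough illustration of the first grant, here is a minimal Python sketch that builds a queue policy allowing S3 to send messages. The statement ID and the bucket name my-bucket are assumptions for the example, and the condition restricting the source to a single bucket is one common approach rather than the only valid policy; check the AWS documentation referenced above before relying on it.

```python
import json


def s3_to_sqs_policy(queue_arn: str, bucket_name: str) -> dict:
    """Build a queue policy that lets one bucket send notifications to one queue."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowS3SendMessage",  # hypothetical statement ID
                "Effect": "Allow",
                "Principal": {"Service": "s3.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                # Only accept messages that originate from this bucket.
                "Condition": {
                    "ArnLike": {"aws:SourceArn": f"arn:aws:s3:::{bucket_name}"}
                },
            }
        ],
    }


policy = s3_to_sqs_policy(
    "arn:aws:sqs:us-west-2:123456789012:my-s3-event-notifications-queue",
    "my-bucket",  # assumed bucket name
)
print(json.dumps(policy, indent=4))
```

The printed JSON can be pasted into the queue's access policy in the console.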

For NiFi, I recommend attaching a policy like the following to the EC2 instance role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "NiFiReadSQSMessages",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage"
            ],
            "Resource": [
                "arn:aws:sqs:us-west-2:123456789012:my-s3-event-notifications-queue"
            ]
        }
    ]
}

S3 Configuration

You configure event notifications in the AWS Management Console for S3, in the Properties window for the bucket. I recommend selecting the ObjectCreated(All) event filter to start with. Receiving delete events can be very useful, but I usually prefer to send these to a separate queue for separate processing. Your configuration might look something like this:

Configuring S3 Event Notifications
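
If you would rather script this than click through the console, the same setting can be expressed as a notification configuration document. The sketch below only builds that document in Python; the helper name is made up for illustration, and the queue ARN and configuration ID are taken from the examples in this post. I believe this is the shape boto3 accepts as the NotificationConfiguration argument of put_bucket_notification_configuration, but treat that as an assumption and verify it against the current SDK documentation.

```python
import json


def queue_notification_config(queue_arn: str, config_id: str) -> dict:
    """Describe 'send every ObjectCreated event to this SQS queue'."""
    return {
        "QueueConfigurations": [
            {
                "Id": config_id,
                "QueueArn": queue_arn,
                # Equivalent to choosing ObjectCreated (All) in the console.
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    }


config = queue_notification_config(
    "arn:aws:sqs:us-west-2:123456789012:my-s3-event-notifications-queue",
    "PutNotificationsForNiFi",
)
print(json.dumps(config, indent=2))
```

Delete events sent to a separate queue would be a second entry in the same list, with its own queue ARN and event types.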

NiFi Flow

Since half of the work has been done in AWS configuration, our NiFi flow is conceptually simple. First, we need to parse the notification messages to determine the bucket and key of the file written to S3. Second, we fetch that file from S3. It's almost that easy: NiFi's built-in processors will do all of the work, with just a few tricky configurations.

NiFi Flow - Receiving S3 Event Notifications

EvaluateJsonPath Processor Properties

Receiving Event Notifications

Receiving the events is very straightforward with NiFi's GetSQS processor. GetSQS both receives messages from our SQS queue and deletes them from the queue. The notifications sent by S3 contain JSON documents like the following:

{
  "Records" : [ {
    "eventVersion" : "2.0",
    "eventSource" : "aws:s3",
    "awsRegion" : "us-west-2",
    "eventTime" : "2017-02-14T01:19:37.534Z",
    "eventName" : "ObjectCreated:Put",
    "userIdentity" : {
      "principalId" : "AWS:AROAJFBRSNQHCZV6SXIHG:i-0ae5f34ea50cf5b3f"
    },
    "requestParameters" : {
      "sourceIPAddress" : "10.10.0.13"
    },
    "responseElements" : {
      "x-amz-request-id" : "928651D0811ACEDE",
      "x-amz-id-2" : "H1z2CJYV3ehoHJ6H90XjeWdWbqo0Y0zuPItuYuVh8VA4WI+PTHYoGI5my4S1/HREMzyABjG+mDM="
    },
    "s3" : {
      "s3SchemaVersion" : "1.0",
      "configurationId" : "PutNotificationsForNiFi",
      "bucket" : {
        "name" : "my-bucket",
        "ownerIdentity" : {
          "principalId" : "AM51FGC2R9WV2"
        },
        "arn" : "arn:aws:s3:::my-bucket"
      },
      "object" : {
        "key" : "path/to/some/file.txt",
        "size" : 7301,
        "eTag" : "e15124c70f6c5049f1b9ea7b4a8696b4",
        "sequencer" : "0058A25B296B2924E7"
      }
    }
  } ]
}

Parsing Event Notifications

The first thing you may note is that each notification is a Records array of one or more individual events. We'll use NiFi's SplitJson processor to handle splitting the array into individual flowfile records, using a JsonPath Expression of $.Records. Each record is itself a JSON object with nested properties.
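
To make the split concrete outside of NiFi, here is a small Python sketch of what splitting on $.Records amounts to. It is an approximation of the idea, not SplitJson's implementation, and the trimmed two-record message is invented for the example.

```python
import json


def split_records(notification_body: str) -> list:
    """Return one JSON document (as a string) per entry in the Records array."""
    message = json.loads(notification_body)
    # A message without a Records array simply yields no records.
    return [json.dumps(record) for record in message.get("Records", [])]


# Hypothetical, heavily trimmed notification carrying two events.
body = json.dumps(
    {
        "Records": [
            {"eventName": "ObjectCreated:Put", "s3": {"object": {"key": "a.txt"}}},
            {"eventName": "ObjectCreated:Put", "s3": {"object": {"key": "b.txt"}}},
        ]
    }
)

for record_json in split_records(body):
    print(record_json)
```

Each string returned corresponds to the content of one flowfile after the split.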

From the events, we need to at least get the S3 bucket and key to fetch content, but we might be interested in more. NiFi's EvaluateJsonPath processor will read these values out of the record and into flowfile attributes. For example, the JSON path expression $.s3.bucket.name will read the bucket name, and we will assign that to an attribute, s3.bucket. Likewise, $.s3.object.key reads the object key, which we assign to s3.key.
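
The extraction step can be sketched in Python as a lookup table from attribute names to paths within a single record. The s3.bucket and s3.key names match the attributes used in this flow; s3.size and s3.event are hypothetical extras to show that more values can be pulled the same way. The path walking below is a simplified stand-in for JsonPath, not a JsonPath evaluator.

```python
import json

# Attribute name -> path of keys inside one split record.
ATTRIBUTE_PATHS = {
    "s3.bucket": ("s3", "bucket", "name"),
    "s3.key": ("s3", "object", "key"),
    "s3.size": ("s3", "object", "size"),  # hypothetical extra attribute
    "s3.event": ("eventName",),           # hypothetical extra attribute
}


def extract_attributes(record_json: str) -> dict:
    """Read selected values out of a record into string attributes."""
    record = json.loads(record_json)
    attributes = {}
    for name, path in ATTRIBUTE_PATHS.items():
        value = record
        for step in path:
            value = value[step]
        # Flowfile attributes are strings, so convert numbers as well.
        attributes[name] = str(value)
    return attributes


# A single record, trimmed from the sample notification shown earlier.
record_json = json.dumps(
    {
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "my-bucket"},
            "object": {"key": "path/to/some/file.txt", "size": 7301},
        },
    }
)

print(extract_attributes(record_json))
```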

Reading Content from S3

Using the bucket and key to read content from S3 is quite easy using NiFi's FetchS3Object processor. The important configuration is to use expression language to apply the bucket and key values as ${s3.bucket} and ${s3.key}.
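
As a toy illustration of how those references resolve, the Python sketch below substitutes simple ${name} references from a dictionary of attributes. It handles only plain attribute references and is not NiFi's expression language. One caveat that is my addition and not part of the flow described here: S3 URL-encodes object keys in event notifications (a space arrives as +), so keys containing special characters may need decoding before the fetch. The key with a + in it is a made-up example.

```python
import re
from urllib.parse import unquote_plus

# Matches simple references such as ${s3.bucket}.
_REFERENCE = re.compile(r"\$\{([^}]+)\}")


def resolve(expression: str, attributes: dict) -> str:
    """Replace each ${name} in the expression with the named attribute value."""
    return _REFERENCE.sub(lambda match: attributes[match.group(1)], expression)


# Hypothetical attributes as they might look after the extraction step.
attributes = {"s3.bucket": "my-bucket", "s3.key": "path/to/some+file.txt"}

bucket = resolve("${s3.bucket}", attributes)
# Decode the URL-encoded key so it matches the object's real name.
key = unquote_plus(resolve("${s3.key}", attributes))

print(bucket)
print(key)
```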

Processing the Content

Now that your S3 object content is in NiFi, what do you do with it? Anything.

NiFi Flow Template

I saved this flow as a template which you can use as a starting point.

For Further Reference