By James Wing on 2016-09-12

In an earlier post, I wrote about using Apache NiFi to ingest data into a relational database. Today, we'll reverse the polarity of the stream and show how to use NiFi to extract records from a relational database for ingest into something else -- a different database, Hadoop on EMR, text files, anything you can do with NiFi. Apache NiFi has a couple of built-in processors for extracting database data into NiFi FlowFiles, and we'll look at the pros and cons of each in building your database flow. Lastly, I'll walk through a sample flow I built that reads database records incrementally.

Concept

As before, the concept is pretty simple - we extract records from a database as an input to a NiFi flow. For our immediate purpose, it is not very important what we do with the records after we have them in NiFi; we will focus on getting the right records out of the database and into NiFi.

NiFi Database Extract Concept

So what does it mean to get the data out of the database? Are we getting all of the records for a query as a point-in-time snapshot, only the records incrementally inserted or updated since we last checked, or something else? NiFi can handle all these cases, but does so with different processors.

Processors for Querying Databases

Apache NiFi has two processors for extracting rows from relational databases, plus a new helper in 1.0.0; you want to pick the right one for your needs.

  • ExecuteSQL - executes an arbitrary SQL statement and returns the results as one FlowFile, in Avro format, containing all of the result records. Very simple and flexible, and works with a broad set of statements, including stored procedure calls. Designed for general-purpose use, it does not have specific features for incremental extraction. ExecuteSQL can accept incoming FlowFiles, and FlowFile attributes may be used in expression language to build the SQL statement.
  • QueryDatabaseTable - designed specifically for incremental extraction. Computes SQL queries based on a given table name and incrementing column. Maintains NiFi state data tracking the last incremental value retrieved. Results are formatted as Avro files.
  • GenerateTableFetch - New in NiFi 1.0.0. May be used to generate a sequence of paged query statements for use with ExecuteSQL, making it practical to query very large data sets in manageable chunks.

If you are doing an incremental extract, trying to get only the latest records, then QueryDatabaseTable is probably your processor. If you need to customize the SQL statement to individual input FlowFiles, ExecuteSQL is the only way to go. If you wish to run a routine query on a schedule, ExecuteSQL is probably a better fit.
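To make the difference concrete, here is a rough sketch of the kind of SQL each processor ends up running against the tweets table from my example below. The column names and the 24780 value come from the query logged later in this post; the ${min.id} attribute in the ExecuteSQL version is a hypothetical name I made up for illustration:

-- QueryDatabaseTable builds the incremental query itself, filtering on the
-- configured maximum-value column (id in this flow) using its saved state:
SELECT id, uuid, created_at, screen_name, location, text
FROM tweets
WHERE id > 24780;

-- ExecuteSQL runs whatever statement you give it; with expression language,
-- attributes from an incoming FlowFile can be spliced into the statement
-- (${min.id} is a made-up attribute name for this sketch):
SELECT id, uuid, created_at, screen_name, location, text
FROM tweets
WHERE id > ${min.id};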

Dealing with Avro Files

No matter which of the processors you choose above, the output of your query will be an Avro document containing one or more records. Avro is a binary serialization format for compact storage of records and their schema. In many cases, this is ideal for ingesting data into systems like HDFS for processing in Hadoop/Hive, where Avro files are first-class citizens. But that might not always be the case, and as a binary format, Avro can be more difficult to inspect and work with.

Fortunately, NiFi contains processors for working with Avro files:

  • SplitAvro - splits an Avro file with multiple records into individual FlowFiles, or FlowFiles of an arbitrary size. Since an Avro file may contain more than one record, using SplitAvro can give you consistently sized FlowFiles in your flow.
  • ConvertAvroToJSON - appropriately named processor that converts Avro files to -- wait for it -- JSON. This is extremely useful both for the flexibility of processing JSON and the ease of visually inspecting JSON data. Combined with the SplitAvro processor, you can easily convert a stream of database records into a stream of single-record JSON FlowFiles.

Example - Extracting Database Rows to JSON Text

As an example, let's extract some records from a database into text files containing one JSON record per line. The sample database I'm using has a table containing simplified Tweet records, so we can test that incremental queries are pulling out only the latest tweets. We'll convert the tweets into JSON records as separate lines of text, merged into files saved on disk. Single-object JSON records are useful because they can be processed individually as JSON, then merged as text, to be read by systems like Hive.
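To give a feel for the end product, here are a couple of output lines in the shape we're after -- one JSON object per line, with field names matching the tweets table columns queried later in this post. The values are made up for illustration:

{"id": 24781, "uuid": "aaaa-1111", "created_at": "2016-09-12 16:05:01", "screen_name": "nifi_fan", "location": "Seattle, WA", "text": "Testing incremental extracts with Apache NiFi"}
{"id": 24782, "uuid": "bbbb-2222", "created_at": "2016-09-12 16:06:13", "screen_name": "avro_ingest", "location": "Portland, OR", "text": "One JSON object per line, please"}

If those files land somewhere Hive can reach (HDFS or S3, say), a minimal external table over them might look like the sketch below. This assumes the hive-hcatalog JSON SerDe is available in your Hive environment, and the location path is just a placeholder:

-- Sketch only: the path and column types are assumptions for this example.
CREATE EXTERNAL TABLE tweets_json (
  id BIGINT,
  uuid STRING,
  created_at STRING,
  screen_name STRING,
  `location` STRING,
  `text` STRING
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/data/tweets_json/';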

My sample flow includes the following processors:

  • QueryDatabaseTable - queries records from the tweets table using an auto-incremented column, id, to keep track of the records we've processed so far.
  • SplitAvro - reduces the Avro files to a single record each, if they are not already.
  • ConvertAvroToJSON - converts Avro records to individual JSON objects, not in arrays.
  • MergeContent - groups our JSON lines back into files. For now, I'm just grouping 100 FlowFiles per merged file; you might want to do a lot more (see the configuration sketch after this list).
  • PutFile - saves our JSON files to local disk for later processing.
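For reference, the MergeContent properties that matter most for this step are the ones controlling how many FlowFiles go into each merged file and what separates the records. This is a sketch of how I would configure it for newline-delimited JSON, not necessarily exactly what the sample template uses:

Merge Strategy                Bin-Packing Algorithm
Merge Format                  Binary Concatenation
Minimum Number of Entries     100
Delimiter Strategy            Text
Demarcator                    (a newline character)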

Incremental Database Extract Flow

The most interesting part of this is really the state data captured for QueryDatabaseTable, which is how it keeps track of which records have been read and which have not. If you need to start over, you can use this dialog to clear the state while the processor is stopped.

QueryDatabaseTable state data for the id column

Sample References

You can see the resulting flow template as a (gist). Below, I've captured some of the key processor properties:

QueryDatabaseTable processor properties

SplitAvro processor properties

ConvertAvroToJSON processor properties

DBCPConnectionPool service TweetBase Pool

Troubleshooting

There are a couple of things you can do to diagnose and fix issues in your database flow.

Logging

Turning on debug logging will give you more detailed information about what the processors are doing, which may be helpful. Logging is configured in conf/logback.xml, where you can add entries setting the logging level for specific processors. It is also possible to configure the bulletin level on individual processors through the UI, but at DEBUG level I find they fly by awfully fast.

<!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->

<logger name="org.apache.nifi" level="INFO"/>

...

<!-- Add this for QueryDatabaseTable processor. -->
<logger name="org.apache.nifi.processors.standard.QueryDatabaseTable" level="DEBUG"/>

<!-- Add this for ExecuteSQL processor. -->
<logger name="org.apache.nifi.processors.standard.ExecuteSQL" level="DEBUG"/>

Both QueryDatabaseTable and ExecuteSQL log their queries at DEBUG level, so you can see what is being executed:

2016-09-12 16:07:14,909 DEBUG [Timer-Driven Process Thread-2] o.a.n.p.standard.QueryDatabaseTable QueryDatabaseTable[id=2d63a256-6502-4735-a362-8ec6b9b81fc8]
Executing query SELECT id, uuid, created_at, screen_name, location, text FROM tweets WHERE id > 24780

Large Result Sets

Due to the wide variance in SQL support for limiting result sets, NiFi does not provide a means to automagically keep result sets down to hundreds or thousands of rows. Reading a large table with QueryDatabaseTable may not be practical.

But remember that ExecuteSQL allows you to customize the SQL statement with an incoming FlowFile. If you have NiFi 1.0.0, you can use the GenerateTableFetch processor to build a stream of incremental queries. In earlier versions of NiFi, you can devise such queries using expression language and perhaps an ExecuteScript processor.
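As a rough illustration of that pattern, the goal is to turn one huge query into a series of bounded statements, each delivered to ExecuteSQL as its own FlowFile. The statements below are hand-written sketches of what such a sequence might look like for the tweets table; GenerateTableFetch's actual output depends on the database dialect and the processor's settings:

-- Page 1 (the 10000-row page size is an arbitrary choice for this sketch)
SELECT id, uuid, created_at, screen_name, location, text
FROM tweets
WHERE id > 0 AND id <= 10000
ORDER BY id;

-- Page 2
SELECT id, uuid, created_at, screen_name, location, text
FROM tweets
WHERE id > 10000 AND id <= 20000
ORDER BY id;

-- ...and so on, until the pages cover the full range of ids.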