Introduction to Processing Click Stream Data Feeds with Hadoop and Map/Reduce

In an earlier post, Matt Moss showed how to process data feed data using an SQL database. This can be useful in a pinch when you have a smaller amount of data and need an answer quickly.

What happens though when you now need to process the data at a large scale?

For example, you may need to:

  • Identify visitors that have purchased a product during the last year
  • Summarize visitor activity, year-over-year in a single record per visitor
  • Calculate alternative campaign attribution models
  • Cut certain columns and reformat them for some other process
  • Merge data from some other data source with Adobe Data Feeds

There are a couple of tools out there that can help you do these things. Pig, Hive, and Spark can all run on top of Hadoop. Hadoop itself operates on the principle of Map/Reduce — that is, you have a part of your program that tags each record with a grouping key (the mapper), and a part of the program that aggregates the grouped records (the reducer). For more information on how Map/Reduce works, Wikipedia has a fairly thorough article on it.

Pig, Hive, and Spark are all great tools, but they have different use cases. I tend to use Pig and Hive for running ad-hoc queries against large datasets. It’s very possible to process data feeds with these tools, but it’s a little tricky. Parsing fields like the event_list or product_list requires User Defined Functions. The type of aggregation you want to do may be better expressed in Java than in Pig or Hive. Finally, you may not want to learn another language like Pig Latin or Scala. Whatever your reason, Hadoop Map/Reduce may be a good option for you.

When I’ve consulted for Data Feed users in the past, I’ve found that they start out projects like those above without a good understanding of Data Feed data. They’ll try to aggregate visitor data using the wrong visitor ID, or order it by timestamp alone. They’ll calculate time parting metrics using ‘hit_time_gmt’ instead of a timezone-aware ‘datetime’. They’ll build reducers in ways that blow out memory fast, or mappers that don’t key the data correctly. In this post, I’ll attempt to explain some best practices for processing data feed data in a Map/Reduce program, using Java in the examples.

If you need some help in getting a sandbox setup, I’ve found this blog post to be an excellent resource for quickly getting up and running.

While the most important code snippets are included here, you can download the full project on GitHub.

Grouping Data

In Map/Reduce, you need a common key that can be used to group records together. The mapper’s job is to identify this key from the rest of the record and provide that information to the shuffle/sort step.

In most cases, you’ll be grouping data by Visitor ID when processing Data Feed data. If you check the documentation, you’ll notice that there are several ID columns. Which one should you use?

The documentation recommends the following:

“When calculating metrics, you should use the value from the post_visid_high and post_visid_low columns since these columns will always contain the final visitor ID.”

This seems pretty clear. To add to this statement, I’ve personally spoken with Adobe’s platform engineering team about this very question and learned that, at least as recently as 2016, the Analytics UI uses these columns to identify visitors and calculate metrics. So, if your totals aren’t lining up, make sure you’re using post_visid_high/post_visid_low.

In a mapper, I usually just concatenate these two columns and provide them as the key:

post_visid_high:post_visid_low

For example, given post_visid_high value of ABC123 and post_visid_low value of XYZ789, I’d end up with the following value:

ABC123:XYZ789

This has worked well for me so far.
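
To make that concrete, here’s a minimal mapper sketch. The class name and column positions are illustrative assumptions; in a real job you’d read the positions from the feed’s column_headers file rather than hard-coding them.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class VisitorMapper extends Mapper<LongWritable, Text, Text, Text> {

  // Hypothetical column positions; in a real job, read them
  // from the column_headers file that ships with the feed.
  private static final int POST_VISID_HIGH = 10; // assumed position
  private static final int POST_VISID_LOW = 11;  // assumed position

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Data feed rows are tab-delimited; -1 keeps trailing empty columns
    String[] columns = value.toString().split("\\t", -1);

    // Emit post_visid_high:post_visid_low as the grouping key,
    // and the full hit as the value.
    String visitorId = columns[POST_VISID_HIGH] + ":" + columns[POST_VISID_LOW];
    context.write(new Text(visitorId), value);
  }
}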

It’s worth noting something about the mcvisid column. Other Marketing Cloud products associate data with the mcvisid. For example, if you’re trying to merge demographic data from Adobe Audience Manager (AAM) with visitor behavior from Analytics Data Feeds, you’ll need to use the mcvisid. Unfortunately, you may run into edge cases where data feed data is missing an mcvisid (or the other way around). This is very rare and usually happens when a customer hasn’t visited your site since the Visitor ID service was rolled out.

Sampling Data

For debugging purposes, it can be helpful to read in an entire dataset and only process a sample. The way the visitor ID is generated makes this a little easier. The visid_high and visid_low columns are just unsigned 64-bit integers. You can grab the last two digits and use a modulus operation to filter out some of the hits. Thankfully, there is enough randomness in the IDs themselves that you’ll typically get a uniform distribution.
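
As a rough sketch, assuming post_visid_low arrives as the decimal string form of that unsigned 64-bit integer, the simple approach looks something like this:

public static boolean sampleHitSimple(String postVisidLow, int samplePercent) {
  // The last two decimal digits of the ID are equivalent to (id % 100).
  // remainderUnsigned avoids a negative result for very large IDs.
  long lastTwoDigits = Long.remainderUnsigned(
      Long.parseUnsignedLong(postVisidLow), 100);
  // Keep roughly samplePercent% of visitors.
  return lastTwoDigits < samplePercent;
}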

If you need sub-1% sampling (e.g., 0.1%), you’ll need to hash the visid high/low, treat the hash as one really big number, and take the modulus of the entire hash. This works extremely well, but can be a little more computationally expensive.

import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public static boolean sampleHit(
    String postVisidHigh,
    String postVisidLow,
    double sampleRate) throws NoSuchAlgorithmException {
  // Convert a rate from something like
  // 4.5% to 450 for use later
  int rate = (int)(sampleRate * 100);

  // Convert the visitor ID to a SHA-1 hash
  MessageDigest digest = MessageDigest.getInstance("SHA-1");
  byte[] hash = digest.digest(
      String.format("%s:%s", postVisidHigh, postVisidLow)
          .getBytes()
  );

  // Treat the hash bytes as one large, non-negative integer.
  // (Calling .hashCode() on the byte array would only hash the
  // array reference, not its contents.) If that value, modded
  // by 10,000, is less than the rate we calculated above, then
  // include the hit.
  BigInteger hashValue = new BigInteger(1, hash);
  return hashValue.mod(BigInteger.valueOf(10000)).intValue() < rate;
}
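
As a quick, hypothetical usage example (the variable names are placeholders from the mapper sketch above), the mapper might call it like this. Since the decision is based only on the visitor ID, either all of a visitor’s hits are kept or none are:

// Keep roughly 4.5% of visitors; drop the rest before writing output.
if (!sampleHit(postVisidHigh, postVisidLow, 4.5)) {
  return; // this visitor wasn't sampled
}
context.write(visitorId, value);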

Sorting Data

If you’re grouping data by visitor ID, the reducer will basically receive a visitor ID for the key, and a list of hits/impressions for the value. Adobe Data Feed data isn’t in chronological order though! You may think, “Well, I just want a sum of revenue by visitor, so I don’t care that hits are out of order.”

As you work with this data, I guarantee that there will be use cases where you need it sorted. For example:

  • Calculating average time on site by visitor — basically anything to do with time!
  • First-touch, last-touch, J-shape, and U-shape campaign attribution all require hits to be in order
  • Identifying the site section most recently visited or originally visited

There are two ways to do this:

  1. Sort the hits within the reducer code. This is easiest to implement, but requires the most memory.
  2. Write some custom code to have Hadoop sort the hits for you. This can be a little more difficult but scales the best. This will be discussed in my next post.

In both cases, we’re going to use the visit_num and visit_page_num columns to sort the data. Some of you that are familiar with data feed columns may ask, “Why not use the datetime or hit_time_gmt columns?” It so happens that sometimes hits can reach collection within the same second and thus have the same timestamp. The visit_num and visit_page_num columns account for this and properly order the hits.

This blog post covers the first method. The post next week will cover the second method.

Sort the Hits in the Reducer

This option is easiest to implement, but doesn’t scale well. I once processed visitor data for a customer whose implementation had a bug that caused 150,000 hits to be associated with the same visitor ID. When my reducer tried to sort those hits, it ran out of memory. It’s an edge case, but it was a beast to debug and an even bigger beast to fix.

In Java, the reducer method’s ‘values’ parameter is an ‘Iterable’ type, which doesn’t have native sorting capabilities. You’ll have to read the data from this variable into something like an ArrayList. Make sure to check the Pitfalls section of this post for a description of the unusual way that Hadoop manages memory here.

From there, you can implement a custom Comparator and use List.sort() (or Collections.sort()) to sort your records.

Let’s build a factory that returns a comparator. The method below brings in the columnHeaders data because it needs that information in order to determine which columns are the visit_num and visit_page_num columns.

public static Comparator<Text> getComparator(
    List<String> columnHeaders) {
  return new Comparator<Text>() {
    @Override
    public int compare(Text o1, Text o2) {
      // Parse the visit_num and visit_page_num columns
      // out of both hits
      String[] hit1Columns = o1.toString().split("\\t", -1);
      int hit1VisitNum = Integer.parseInt(
          DataFeedTools.getValue("visit_num", hit1Columns, columnHeaders)
      );
      int hit1VisitPageNum = Integer.parseInt(
          DataFeedTools.getValue("visit_page_num", hit1Columns, columnHeaders)
      );
      String[] hit2Columns = o2.toString().split("\\t", -1);
      int hit2VisitNum = Integer.parseInt(
          DataFeedTools.getValue("visit_num", hit2Columns, columnHeaders)
      );
      int hit2VisitPageNum = Integer.parseInt(
          DataFeedTools.getValue("visit_page_num", hit2Columns, columnHeaders)
      );

      // Compare by visit_num first, then fall back to visit_page_num
      // when both hits belong to the same visit. (Packing the two
      // values into a double like 1.2 breaks down once visit_page_num
      // reaches two digits: 1.10 parses as 1.1 and sorts before 1.2.)
      // Return a negative number if o1 is before o2, 0 if they're
      // equal (should never happen), and a positive number if o1
      // is after o2.
      int visitOrder = Integer.compare(hit1VisitNum, hit2VisitNum);
      if (visitOrder != 0) {
        return visitOrder;
      }
      return Integer.compare(hit1VisitPageNum, hit2VisitPageNum);
    }
  };
}

In the reducer we should read our data from the “Iterable<Text> values” variable into an “ArrayList<Text> visitorTraffic” variable. From there, we can call visitorTraffic.sort(getComparator(columnHeaders)); and have sorted data.
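
Here’s a sketch of what that reducer might look like, assuming getComparator() from above is visible here and that columnHeaders has been populated elsewhere (for example, in setup() from the feed’s column_headers file):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class VisitorReducer extends Reducer<Text, Text, Text, Text> {

  // Assumed to be populated in setup() from the column_headers file.
  private List<String> columnHeaders;

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    // Copy each hit into memory. Note the new Text(value) deep copy;
    // the Pitfalls section below explains why it's required.
    List<Text> visitorTraffic = new ArrayList<>();
    for (Text value : values) {
      visitorTraffic.add(new Text(value));
    }

    // Sort this visitor's hits by visit_num/visit_page_num.
    visitorTraffic.sort(getComparator(columnHeaders));

    // From here you can walk the hits in order, e.g. to attribute
    // revenue to the first campaign the visitor touched.
  }
}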

Pitfalls

In Java, the Iterable that the reducer receives is not your typical collection. It is engineered to use as little memory as possible. For this reason, as you call next() on its iterator, instead of handing you a new object for each record, Hadoop deserializes the next value into the same object and gives you back the same reference.

This can produce some unexpected results.

For example, something like this:

List<Text> inMemoryList = new ArrayList<>();
for (Text value : values) {
   inMemoryList.add(value);
}

Will fill inMemoryList with entries that all match the last value read in from values. This is because every entry in inMemoryList points to the same object in memory, and that object only ever holds a single value.

To make this weirder, consider the following example:

Set<Text> inMemoryList = new HashSet<>();
for (Text value : values) {
   inMemoryList.add(value);
}

This produces the same result: all of the values in the HashSet inMemoryList will match the last value stored in values. The reason this is really weird is that a HashSet is supposed to store only unique values. You should never be able to have two duplicate values in a HashSet (at least as far as .equals() is concerned), but that’s exactly what happens here.

Instead, do the following:

Set<Text> inMemoryList = new HashSet<>();
for (Text value : values) {
   inMemoryList.add(new Text(value));
}

This code performs a ‘deep copy’ of each value as it is read and ensures that every entry in the HashSet points to a separate place in memory.

This type of behavior is very hard to catch with unit tests. I recommend implementing your own Iterable that mimics this behavior for that purpose.
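
Here’s one hypothetical way to do that: an Iterable that reuses a single Text instance the same way Hadoop’s values iterable does, so the copy-on-read bug shows up in a plain unit test.

import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.Text;

// A test helper that mimics Hadoop's object reuse: every call to
// next() returns the same Text instance with its contents replaced.
public class ReusingTextIterable implements Iterable<Text> {
  private final List<String> rows;

  public ReusingTextIterable(List<String> rows) {
    this.rows = rows;
  }

  @Override
  public Iterator<Text> iterator() {
    return new Iterator<Text>() {
      private final Text reused = new Text();
      private int index = 0;

      @Override
      public boolean hasNext() {
        return index < rows.size();
      }

      @Override
      public Text next() {
        // Overwrite the shared instance instead of allocating
        // a new Text for each record.
        reused.set(rows.get(index++));
        return reused;
      }
    };
  }
}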

Summary

You should have the tools that you need to start calculating some metrics and summarizing some simple visitor behavior. Stay tuned next week for more information on enabling Hadoop’s Secondary Sort feature for a more scalable solution.

If you have questions or would like to suggest corrections to any of the content here, please leave me a comment below. I’d also especially like to hear from you if there is a topic that you’re dying to have Trevor and me cover.

One last thing — an outstanding resource for learning more about custom grouper/comparator functions is the book Hadoop: The Definitive Guide. In general, I use this book frequently as a reference and also own the earlier editions.

Jared Stevens

Jared Stevens is a software engineer at Adobe and works on the Analytics reporting APIs. He has also worked as a Software Engineering consultant at Adobe for 7 years and has assisted many of Adobe's top tier customers with custom integrations and data processing requests. When he's not knee deep in data, he enjoys backpacking, video games, and learning about new things.