Building Pipelines to Understand User Behavior

Data Engineering Best Practices  |  June 23rd, 2016

In early March, I spoke at the Hadoop with the Best online conference. I had fun sharing one of my total passions: data pipelines! In particular, I talked about some techniques for catching raw user events, acting on those events, and understanding user activity from the sessionization of such events. Here, I’ll give just a taste of what was covered. For more detail, please check out the video above (courtesy of With the Best).1

First, some background and motivation on my end: Silicon Valley Data Science (SVDS) is a boutique data science consulting firm. We help folks with their hardest data strategy, data science, and/or data engineering problems. This puts us in a unique position to solve different kinds of problems across various industries, and to recognize the solution patterns that emerge. I'm very interested in cultivating and sharing the data engineering best practices that we've learned.

Key takeaways from this post include knowing what’s needed to understand user activity, and seeing some pipeline architectures that support this analysis. To achieve these goals, let’s walk through three data pipeline patterns that form the backbone of a business’ ability to understand user activity and behavior:

  • Ingesting events
  • Taking action on those events
  • Recognizing activity based on those events

Ingesting Events

The primary goal of an ingestion pipeline is simply to ingest events; all other considerations are secondary for now. We will walk through an example pipeline and discuss how that architecture changes as we scale up to handle billions of events per day. We'll note along the way how general concepts of immutability and lazy evaluation can have large ramifications on data ingestion pipeline architecture.

Let's start by covering typical classes and types of events, some common event fields, and various ways that events are represented. These vary greatly across current and legacy systems, and you should always expect that munging will be involved as you're working to ingest events from various data sources over time.

For our sessionization examples, we're interested in user events such as `login`, `checkout`, `add friend`, etc.

These user events can be “flat”:

```
    {
      "time_utc": "1457741907.959400112",
      "user_id": "688b60d1-c361-445b-b2f6-27f2eecfc217",
      "event_type": "button_pressed",
      "button_type": "one-click purchase",
      "item_sku": "1 23456 78999 9",
      "item_description": "Tony's Run-flat Tire",
      "item_unit_price": ...
      ...
    }
```

Or have some structure:

```
    {
      "time_utc": "1457741907.959400112",
      "user_id": "688b60d1-c361-445b-b2f6-27f2eecfc217",
      "event_type": "button_pressed",
      "event_details": {
        "button_type": "one-click purchase",
        "puchased_items": [
          {
            "sku": "1 23456 78999 9",
            "description": "Tony's Run-flat Tire",
            "unit_price": ...
            ...
          },
        ],
      },
      ...
    }
```

Both formats are often found within the same systems out in the wild, so you have to intelligently detect or classify events rather than just making blanket assumptions about them.
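
As a concrete illustration, here's a minimal Python sketch (not from the talk) that tolerates both shapes and normalizes them into one flat record before they're landed; everything beyond the sample fields shown above is an assumption.

```
def normalize_event(event):
    """Flatten a raw event dict, tolerating both the flat and nested shapes."""
    flat = {
        "time_utc": event.get("time_utc"),
        "user_id": event.get("user_id"),
        "event_type": event.get("event_type"),
    }
    details = event.get("event_details")
    if details is not None:
        # Structured shape: hoist the nested fields we care about.
        flat["button_type"] = details.get("button_type")
        flat["item_skus"] = [item.get("sku") for item in details.get("purchased_items", [])]
    else:
        # Flat shape: the fields are already at the top level.
        flat["button_type"] = event.get("button_type")
        flat["item_skus"] = [event["item_sku"]] if "item_sku" in event else []
    return flat
```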

Stages of an ingestion pipeline

Before we dive into what ingestion pipelines usually look like, there are some things to keep in mind. You want to be sure to build a pipeline that is immutable, lazily evaluated, and made up of simple/composable (testable) components. We’ll see below how these abstract (CS-101) concepts really matter when it comes to pipeline implementation, maintenance, and scale.

We'll start by focusing on just ingesting, or landing, events, beginning with a simple pipeline such as:

[Figure: Event ingestion without streaming]

At first glance, it seems like that's all you would need for most problems, especially since query-side tools are so fast and effective these days. Ingestion should be straightforward: the ingest pipeline simply needs to get the events, as raw as possible, as far back into the system as possible, in a format that's amenable to fast queries.

Let’s state that again—it’s important.

The pipeline’s core job is to get events that are as raw as possible (immutable processing pipeline) as far back into the system as possible (lazily evaluated analysis) before any expensive computation is done. Modern query-side tools support these paradigms quite well. Better performance is obtained when events land in query-optimized formats and are grouped into query-optimized files and partitions where possible:

[Figure: Event ingestion without streaming]
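
As a rough sketch of that landing step, a PySpark job might read the raw JSON and write it back out as Parquet, partitioned by day; the paths, the partition column, and the choice of Parquet are illustrative assumptions.

```
# Land raw JSON events as Parquet, partitioned by day, so query-side tools
# (Impala, Hive, SparkSQL) can prune partitions. Paths and columns are illustrative.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("land-events").getOrCreate()

raw = spark.read.json("hdfs:///data/raw/events/")  # raw, immutable input

events = raw.withColumn(
    "event_date",
    F.to_date(F.from_unixtime(F.col("time_utc").cast("double").cast("long"))),
)

(events.write
    .mode("append")
    .partitionBy("event_date")
    .parquet("hdfs:///data/landed/events/"))  # query-optimized layout
```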

That’s simple enough and seems pretty straightforward in theory. In practice, you can ingest events straight into HDFS only up to a certain scale and degree of event complexity.

As scale increases, an ingestion pipeline has to effectively become a dynamic impedance matching network. It’s the funnel that’s catching events from what can be a highly distributed, large number of data sources and trying to slam all these events into a relatively small number of filesystem datanodes.

How do we catch events from a large number of data sources and efficiently land them into HDFS?

[Figure: Events without streaming]

Use Spark!

No, but seriously, add a streaming solution in-between (I do like Spark Streaming here):

[Figure: Streaming bare]

And use Kafka to decouple all the bits:

 

[Figure: Streaming events at scale]

Kafka decouples the data sources on the left from the datanodes on the right, and the two sides can scale independently. They also scale independently of any stream-computation infrastructure you might need for in-stream decisions in the future.
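
As a sketch of that middle layer, here is roughly what consuming the topic and landing events with Spark's Structured Streaming Kafka source could look like; the broker addresses, topic name, paths, and trigger interval are all assumptions, and the job needs the spark-sql-kafka connector on its classpath.

```
# Consume events from Kafka and land them continuously on HDFS.
# Broker addresses, topic name, and paths are illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ingest-events").getOrCreate()

events = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-1:9092,kafka-2:9092")
    .option("subscribe", "user-events")
    .load()
    .selectExpr("CAST(value AS STRING) AS event_json", "timestamp"))

query = (events.writeStream
    .format("parquet")
    .option("path", "hdfs:///data/landed/events/")
    .option("checkpointLocation", "hdfs:///data/checkpoints/ingest/")
    .trigger(processingTime="1 minute")
    .start())

query.awaitTermination()
```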

Impedance or size mismatches between data sources and data storage are really only one half of the story. Note that another culprit, event complexity, can limit ingest throughput for a number of reasons. A common example of where this happens is when event “types” are either poorly defined or are changing so much they’re hard to identify. As event complexity increases, so does the logic you use to group or partition the events so they’re fast to query. In practice, this quickly grows from simple logic to full-blown event classification algorithms. Often, those classification algorithms have to learn from the body of events that’ve already landed. You’re making decisions on events in front of you based on all the events you’ve ever seen. More on that in the “Recognize Activity” section later.

Ingest pipelines can get complicated as you try to scale in size and complexity—expect it and plan for it. The best way to do this is to build or use a toolchain that lets you add a streaming and queueing solution without a lot of rearchitecture or downtime. Folks often don't try to solve this problem until it's already painful in production, but there are solutions available. My current favorite uses a hybrid combination of Terraform, Ansible, Consul, and Cloudera Manager/Ambari.

Note also that we haven’t talked about any real-time processing or low-latency business requirements here at all. The need for a stream processing solution arises when we’re simply trying to catch events at scale.

Taking Action

Catching events within the system is an interesting challenge all by itself. However, just efficiently and faithfully capturing events isn’t the end of the story. Consider the middle-of-the-road version of the ingestion pipeline we discussed above:

[Figure: Streaming bare]

That's sorta boring if we're not taking action on events as we catch them. Actions such as notifications, decorations, and routing/gating can be taken in either "batch" or "real-time" mode (see figure below).

[Figure: Streaming simple]

Unfortunately, folks mean all sorts of things by the terms "batch" and "real-time." Let's clear that up and be a little more precise.

For every action you intend to take, and really for every data product of your pipeline, you need to determine the latency requirements. What is the required timeliness of the resulting action? That is, how soon after either (a) an event was generated, or (b) an event was seen within the system, must that action be taken for it to still be valid? The answers might surprise you.

Latency requirements let you make a first-pass attempt at specifying the execution context of each action. There are two separate execution contexts we talk about here:

  • Batch: Asynchronous jobs that are potentially run against the entire body of
    events and event histories. These can be highly complex, computationally
    expensive tasks that might involve a large amount of data from various
    sources. Implementations of these jobs can involve Spark or Hadoop
    MapReduce code, Cascading-style frameworks, or even SQL-based analysis via
    Impala, Hive, or SparkSQL.
  • Stream: Jobs that are run against either an individual event or a small
    window of events. These are typically simple, low-computation jobs that
    don't require context or information from other events, and are usually
    implemented using Spark Streaming or Storm.

When I say “real-time” here, I mean that the action will be taken from within the stream execution context.

It's important to realize that not all actions require "real-time" latency. There are plenty of actions that are perfectly valid even if they're operating on "stale" day-old, hour-old, or 15-minute-old data. Of course, this sensitivity to latency varies greatly by action, domain, and industry. Also, how stale stream versus batch events are depends on the actual performance characteristics of your ingestion pipeline under load. Measure all the things!

An approach I particularly like is to initially act from a batch context. There’s generally less development effort, more computational resources, more robustness, more flexibility, and more forgiveness involved when you’re working in a batch execution context. You’re less likely to interrupt or congest your ingestion pipeline.

Once you have basic actions working from the batch layer, then do some profiling and identify which of the actions you’re working with really require less stale data. Selectively bring those actions or analyses forward. Tools such as Spark can help tremendously with this. It’s not all fully baked yet, but there are ways to write Spark code where the same business logic can be optionally bound in either stream or batch execution contexts. You can move code around based on pipeline requirements and performance.
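
Here's a hedged sketch of that idea: the business logic lives in a plain DataFrame-to-DataFrame function, and only the binding (a batch read versus a streaming read) changes. The column names, the "flag large purchases" rule, and the paths are illustrative.

```
# The same business logic bound to a batch source and a streaming source.
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F

def flag_large_purchases(events: DataFrame) -> DataFrame:
    """Pure business logic: works on any DataFrame with these columns."""
    return (events
        .filter(F.col("event_type") == "button_pressed")
        .withColumn("needs_review", F.col("item_unit_price") > 1000))

spark = SparkSession.builder.appName("actions").getOrCreate()

# Batch binding: run over everything that has already landed.
batch_events = spark.read.parquet("hdfs:///data/landed/events/")
(flag_large_purchases(batch_events).write
    .mode("overwrite")
    .parquet("hdfs:///data/actions/review_queue/"))

# Stream binding: the same function over a streaming DataFrame.
stream_events = (spark.readStream
    .schema(batch_events.schema)
    .parquet("hdfs:///data/landed/events/"))
(flag_large_purchases(stream_events).writeStream
    .format("parquet")
    .option("path", "hdfs:///data/actions/review_queue_stream/")
    .option("checkpointLocation", "hdfs:///data/checkpoints/review/")
    .start())
```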

In practice, a good deal of architecting such a pipeline is all about preserving or protecting your stream ingestion and decision-making capabilities for when you really need them.

A real system often additionally protects and decouples your stream processing from potentially blocking service API calls (sending emails, for example) by adding Kafka queues for things like outbound notifications downstream of ingestion:

[Figure: Streaming with notify queues]
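
A minimal sketch of that notify-queue idea, assuming the kafka-python client; the broker address, topic name, and message shape are illustrative.

```
# Instead of calling a (potentially blocking) email API from inside the stream
# job, publish a small notification record to a downstream Kafka topic and let
# a separate consumer do the actual send.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["kafka-1:9092"],
    value_serializer=lambda record: json.dumps(record).encode("utf-8"),
)

def queue_notification(user_id, message):
    """Fire-and-forget: the stream job never waits on the email service."""
    producer.send("outbound-notifications", {"user_id": user_id, "message": message})

queue_notification("688b60d1-c361-445b-b2f6-27f2eecfc217", "Your order has shipped!")
producer.flush()
```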

It also helps to isolate your streaming system from writes to HDFS, using the same trick we used for event ingestion in the previous section:

[Figure: Streaming two layers]

Recognizing Activity

What’s user activity? Usually it’s a sequence of one or more events associated with a user. From an infrastructure standpoint, the key distinction is that activity usually needs to be constructed from a sequence of user events that don’t all fit within a single window of stream processing. This can either be because there are too many of them or because they’re spread out over too long a period of time.
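
As a toy illustration of building activity out of a sequence of events, here's a gap-based sessionizer in plain Python; the 30-minute timeout is an assumption, not a recommendation.

```
SESSION_GAP_SECONDS = 30 * 60  # illustrative inactivity threshold

def sessionize(events):
    """Group one user's events (dicts with a numeric 'time_utc', sorted by time)
    into sessions, starting a new session whenever the gap exceeds the threshold."""
    sessions, current = [], []
    for event in events:
        if current and event["time_utc"] - current[-1]["time_utc"] > SESSION_GAP_SECONDS:
            sessions.append(current)
            current = []
        current.append(event)
    if current:
        sessions.append(current)
    return sessions
```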

Another way to think of this is that event context matters. In order to recognize activity as such, you often need to capture or create user context (let’s call it “state”) in such a way that it’s easily read by (and possibly updated from) processing in-stream.

We add HBase to our standard stack, and use it to store state:

[Figure: Classifying with state]

That state is then accessible from either stream or batch processing. HBase is attractive as a fast key-value store, though several other key-value stores could work here. I'll often start with one simply because it's easier to deploy and manage at first, and then refine the choice of tool once more precise performance requirements of the state store have emerged from use.

It's important to note that you want fast key-based reads and writes. Full-table scans of columns are pretty much verboten in this setup; they're simply too slow to deliver value from the stream.

The usual approach is to update state in batch. My favorite example when first talking to folks about this approach is to consider a user’s credit score. Events coming into the system are routed in stream based on the associated user’s credit score.

The stream system can simply (hopefully quickly) look that up in HBase keyed on a user ID of some sort:

[Figure: HBase state credit score]

The credit score is some number calculated by scanning across all of a user's events over the years. It's a big, long-running, expensive computation. Do that continuously in batch, updating HBase as you go, and that information becomes available for decisions in stream.
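
A sketch of that slow-path/fast-path split, assuming the happybase Thrift client for HBase; the table name, column family, and encoding are illustrative.

```
# The batch job periodically recomputes each user's credit score and writes it
# to HBase; the stream job only ever does a fast key-based read.
import happybase

connection = happybase.Connection("hbase-thrift-host")
user_state = connection.table("user_state")

def update_score_from_batch(user_id, score):
    """Called by the long-running batch job as scores are recomputed."""
    user_state.put(user_id.encode("utf-8"),
                   {b"s:credit_score": str(score).encode("utf-8")})

def lookup_score_in_stream(user_id):
    """Called per event in the stream job: a single key-based read, no scans."""
    row = user_state.row(user_id.encode("utf-8"))
    value = row.get(b"s:credit_score")
    return int(value) if value is not None else None
```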

Note that this is effectively a way to base fast-path decisions on information learned from slow-path computation. A way for the system to quite literally learn from the past.

Another example of this is tracking a package. The events involved are the various independent scans the package undergoes throughout its journey.

For “state,” you might just want to keep an abbreviated version of the raw history of each package:

[Figure: HBase state tracking package]

Or just some derived notion of its state:

[Figure: HBase state tracking package derived]

Those derived notions of state are tough to define from a single scan in a warehouse somewhere, but make perfect sense when viewed in the context of the entire package history.
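
As a toy example, deriving a package's state from its full scan history might look something like this; the scan types and status rules are purely illustrative.

```
def derive_package_state(scans):
    """Reduce a package's scan history (sorted by time) to a single derived status."""
    if not scans:
        return "unknown"
    last = scans[-1]["scan_type"]
    if last == "delivered":
        return "delivered"
    if last in ("arrived_at_facility", "departed_facility", "out_for_delivery"):
        return "in_transit"
    return "accepted"
```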

Wrap Up

We’ve had a glimpse of a selection of pipeline patterns to handle the following, at various scales:

  • Ingesting events
  • Taking action on those events
  • Recognizing activity based on those events

There are more details covered in the video.

These patterns are high-level views of some of our best practices for data engineering. These are far from comprehensive at this point, so I’m interested in learning more about what other folks are doing out there. Do you have other pipeline patterns you’d add to this list? Other ways to solve some of the problems we’ve investigated here? Please post comments or links!

You can find the slides here. Note that this isn’t an ordinary slide deck—be sure to use the “down” arrow to drill deeper into the content.

1. Note Mark’s audio drops out at the very end, due to connectivity issues. The content in the lost bit is covered in the “Wrap Up” of this post.