This article builds on a previous post, how to capture clickstream events in Kafka with Snowplow, in which we streamed clickstream events into Kafka (specifically Confluent Cloud).

In this post we’ll build a near real-time, Neo4j-based recommendation engine from the clickstream of page views.

Snowplow tag and recommender data flow

When a reader of this blog opens a page, the Snowplow tag executes. If a cookie doesn’t already exist, the tag creates one that lasts for 365 days. The cookie contains a domain_userid which can be retrieved by calling the getSnowplowDuid() function. Here’s an example, executed in Chrome’s console:

getSnowplowDuid output in Chrome console

The Snowplow domain_userid is the identifier we’ll use to build the graph via the clickstream, and retrieve recommendations from the Neo4j-based recommender service.

dual flow clickstream recommender

A JavaScript function, getRecommendations(), in recommendations.js runs in the browser, calls the recommender service, and displays the results on the page.

To reduce the risk of data loss, cost, and the level of effort to implement, we decided to use managed services (specifically Neo4j Aura and Confluent Cloud) for the stateful components. The stateless components were run in Docker containers.

build the graph in real-time with the Neo4j sink connector

The first step was to sink the clickstream events from Kafka into Neo4j. We created a Dockerized Kafka Connect instance with the Neo4j connector installed. The steps to create the custom Docker image with the Neo4j connector were documented in a previous blog post.

We created unique constraints to ensure that there can only ever be a single Page node for each page_url and, similarly, there can only be a single User node for each domain_userid:

CREATE CONSTRAINT page_page_url_unique IF NOT EXISTS ON (p:Page) ASSERT p.page_url IS UNIQUE;
CREATE CONSTRAINT user_domain_userid_unique IF NOT EXISTS ON (u:User) ASSERT u.domain_userid IS UNIQUE;

Adding these constraints also created indexes on these properties, which speed up the lookups that each write performs:

neo4j@neo4j> call db.indexes;
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | name                        | state    | populationPercent | uniqueness | type    | entityType | labelsOrTypes | properties        | provider           |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 1  | "page_page_url_unique"      | "ONLINE" | 100.0             | "UNIQUE"   | "BTREE" | "NODE"     | ["Page"]      | ["page_url"]      | "native-btree-1.0" |
| 3  | "user_domain_userid_unique" | "ONLINE" | 100.0             | "UNIQUE"   | "BTREE" | "NODE"     | ["User"]      | ["domain_userid"] | "native-btree-1.0" |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+

Without indexes, each MERGE would have to scan every node with the matching label, so writes would get steadily slower as the graph grows.

Here’s the PUT call, submitted via HTTPie to the Dockerized Kafka Connect instance, to deploy the Neo4j sink connector:

http PUT snowplow.woolford.io:8083/connectors/snowplow-neo4j/config <<< '
{
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "name": "snowplow-neo4j",
    "neo4j.server.uri": "neo4j+s://45940f18.databases.neo4j.io:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "********",
    "neo4j.topic.cypher.snowplow-enriched-good-json": "MERGE(u:User {domain_userid: event.domain_userid}) MERGE(p:Page {page_url: event.page_url}) SET p.page_title = event.page_title MERGE(u)-[:VIEWED {timestamp: apoc.date.fromISO8601(event.derived_tstamp)}]->(p)",
    "topics": "snowplow-enriched-good-json",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
}'
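
After deploying the connector, it’s worth confirming that the connector and its tasks are actually running. The sketch below is not part of the original setup. It assumes the standard Kafka Connect REST endpoint GET /connectors/<name>/status is reachable on the same host and port as the PUT call above, and the function names fetch_connector_status and is_healthy are hypothetical.

```python
import json
import urllib.request

# Assumption: the Connect worker from the PUT call above is reachable here.
CONNECT_URL = "http://snowplow.woolford.io:8083"


def fetch_connector_status(name, connect_url=CONNECT_URL):
    """Fetch the status document for a connector from the Kafka Connect REST API."""
    url = f"{connect_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=10) as response:
        return json.load(response)


def is_healthy(status):
    """Return True if the connector and all of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)
```

Calling is_healthy(fetch_connector_status("snowplow-neo4j")) would then give a quick yes/no answer. A failed task usually points to a problem in the Cypher statement or the Neo4j credentials.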

Let’s take a closer look at the Cypher statement that’s executed for each event:

MERGE(u:User {domain_userid: event.domain_userid})
MERGE(p:Page {page_url: event.page_url})
SET p.page_title = event.page_title 
MERGE(u)-[:VIEWED {timestamp: apoc.date.fromISO8601(event.derived_tstamp)}]->(p)

The MERGE is effectively an upsert. If an object (i.e. node or relationship) with the specific properties inside the MERGE doesn’t already exist, that object is created.

For the User nodes, we’re only storing a single property: the domain_userid. For the Page nodes, the page_url is the key, and the page_title is an additional property that could change. The recommender needs both the URL and title in order to create links on the page. We MERGE on the unique constraint key (e.g. page_url) and SET any other properties associated with that key (e.g. the page_title).

The first line matches the User with the given domain_userid, creating it if necessary, and binds it to the variable u. The second line does the same for the Page, keyed on page_url and bound to p, and the SET clause then updates its page_title. Finally, we merge a :VIEWED relationship from the user (u) to the page (p), with the event timestamp as a property of that relationship.
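
To make the upsert behaviour concrete, here’s a minimal in-memory sketch of what the Cypher statement above does for each event. It is only an illustration, not how Neo4j implements MERGE. The event keys come from the Cypher statement, while the functions new_graph and apply_page_view and the sample values used with them are hypothetical.

```python
def new_graph():
    """Create an empty in-memory stand-in for the graph."""
    return {"users": set(), "pages": {}, "viewed": set()}


def apply_page_view(graph, event):
    """Apply one page-view event, mimicking the MERGE / SET / MERGE steps."""
    # MERGE (u:User {domain_userid: ...}): add the user only if it is not there yet.
    graph["users"].add(event["domain_userid"])

    # MERGE (p:Page {page_url: ...}) followed by SET p.page_title = ...:
    # create the page if needed, then overwrite the title with the latest value.
    page = graph["pages"].setdefault(event["page_url"], {})
    page["page_title"] = event["page_title"]

    # MERGE (u)-[:VIEWED {timestamp: ...}]->(p): the timestamp is part of the
    # pattern, so each distinct timestamp produces its own relationship.
    graph["viewed"].add(
        (event["domain_userid"], event["page_url"], event["derived_tstamp"])
    )
```

Applying the same event twice leaves the graph unchanged, which is what makes the sink safe to replay. A second view of the same page at a different time adds another :VIEWED relationship but no new nodes.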

Since the clickstream and recommendation engine are both brand new, the timestamp doesn’t provide a lot of informational value at the time of writing. We decided to capture it in the graph because we will eventually use the timestamp to boost the relevance of more recent activity and discount page views that happened a long time ago.
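
One way the timestamp could eventually be used is to weight each page view by its age. The sketch below shows a simple exponential decay. It is purely illustrative: we haven’t implemented recency boosting yet, and the recency_weight function and the 30-day half-life are assumptions.

```python
from datetime import datetime, timezone


def recency_weight(viewed_at, now, half_life_days=30.0):
    """Weight a page view so that its value halves every `half_life_days`."""
    age_days = (now - viewed_at).total_seconds() / 86400.0
    # Views with a timestamp in the future are treated as brand new.
    return 0.5 ** (max(age_days, 0.0) / half_life_days)
```

With a weight like this, the recommender could sum weights rather than count views, so a page viewed yesterday would contribute more than one viewed last year.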

flag the pages containing articles

Not all the pages on this blog are articles: there’s the homepage and tag index pages. We don’t want to include these in the recommendation results. Neo4j’s data model is extremely flexible, and it’s easy to write small utilities to add data elements to the graph. Here’s a simple Python example that adds an is_post=true property to each of the article pages:

#!/usr/bin/env python
import os
from neo4j import GraphDatabase


def set_neo4j_post_flag():
    # Connection details are read from the environment.
    NEO4J_URI = os.getenv('NEO4J_URI')
    NEO4J_USER = os.getenv('NEO4J_USER')
    NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD')

    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

    # Open one session for the whole run rather than one per file.
    with driver.session(database="neo4j") as session:
        for posts_file in os.listdir("_posts/"):
            # The post's file name, minus its extension, is the URL slug.
            post_file_no_extension = os.path.splitext(posts_file)[0]
            page_url = "https://woolford.io/{}/".format(post_file_no_extension)
            # MATCH (not MERGE): only pages already in the graph are flagged.
            session.run("MATCH (p:Page {page_url: $page_url}) SET p.is_post = true", page_url=page_url)

    # Release the driver's connections.
    driver.close()


if __name__ == "__main__":
    set_neo4j_post_flag()

This property will be used to ensure that only articles are returned as recommendations.

In addition to Python, there are supported client libraries for .NET, Java, JavaScript, and Go.

“If you liked this, you might also like that…”

The snowplow-neo4j-recommender service is a Spring Boot REST service that executes Cypher statements against a Neo4j database. Here’s an example API call to get recommendations for domain_userid ea1a9c2f-062e-4c15-9b42-36e9358f1462:

http https://snowplow.woolford.io:7443/recommendations/ea1a9c2f-062e-4c15-9b42-36e9358f1462
[
    {
        "id": 4,
        "pageTitle": "Zeek, Kafka, and Neo4j",
        "pageUrl": "https://woolford.io/2019-12-11-zeek-neo4j/"
    },
    {
        "id": 1,
        "pageTitle": "streaming joins in ksqlDB",
        "pageUrl": "https://woolford.io/2020-07-11-streaming-joins/"
    },
    {
        "id": 5,
        "pageTitle": "a $10 pot of honey",
        "pageUrl": "https://woolford.io/2018-02-11-cowrie/"
    }
]
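
The same call can be made from code. Here’s a minimal Python sketch of a client, using only the standard library. It assumes the host, port, and response fields shown in the example above. The function names are hypothetical, and this is not the getRecommendations() function that runs in the browser.

```python
import json
import urllib.request
from urllib.parse import quote

# Assumption: the service is reachable at the host and port used in the call above.
RECOMMENDER_URL = "https://snowplow.woolford.io:7443"


def recommendations_url(domain_userid, base_url=RECOMMENDER_URL):
    """Build the endpoint URL for a given domain_userid."""
    return f"{base_url}/recommendations/{quote(domain_userid, safe='')}"


def parse_recommendations(body):
    """Turn the JSON response body into (title, url) pairs, preserving the ranking."""
    return [(item["pageTitle"], item["pageUrl"]) for item in json.loads(body)]


def get_recommendations(domain_userid):
    """Call the recommender service and return the parsed recommendations."""
    with urllib.request.urlopen(recommendations_url(domain_userid), timeout=10) as response:
        return parse_recommendations(response.read())
```

get_recommendations() returns the pages in the order the service ranked them, so a caller can render them as links without any further sorting.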

This service was packaged as a Docker image and pushed to Docker Hub so it’s easy to deploy:

docker run -d -p 8081:8081/tcp --env-file snowplow-neo4j.env alexwoolford/snowplow-neo4j-recommender:1.0.0

The snowplow-neo4j.env file contains:

SPRING_NEO4J_URI=neo4j+s://45940f18.databases.neo4j.io:7687
SPRING_NEO4J_AUTHENTICATION_USERNAME=neo4j
SPRING_NEO4J_AUTHENTICATION_PASSWORD=********
SPRING_DATA_NEO4J_DATABASE=neo4j

Now let’s walk through the recommender logic. Suppose user a viewed page x, and three other users also viewed page x. All three of those users also viewed page y, and two of the three viewed page z. Our page recommendations are ranked by popularity amongst these common users, so our first recommendation for user a would be page y and our second recommendation would be page z:

recommender Arrows diagram

This logic was translated into a Cypher query that’s executed by the recommender REST service:

MATCH (user:User {domain_userid: 'ea1a9c2f-062e-4c15-9b42-36e9358f1462'})-[:VIEWED]->(page:Page)<-[:VIEWED]-(other_user:User)-[:VIEWED]->(other_page:Page) 
WHERE user <> other_user 
AND other_page.is_post = true 
WITH other_page, COUNT(other_user) AS frequency 
ORDER BY frequency DESC 
RETURN other_page 
LIMIT 3

The ORDER BY clause ranks the recommendations in descending order of frequency, so the best recommendation is listed first. The LIMIT clause returns just the top 3 results.
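
The ranking logic can also be sketched in plain Python, using the example of user a and pages x, y, and z. This is an illustration of the idea rather than a translation of the Cypher query. It makes two simplifications that are not in the query: each other user is counted once per page, and pages the user has already viewed are skipped. It also leaves out the is_post filter. The recommend function and the views dictionary are hypothetical.

```python
from collections import Counter


def recommend(views, user, limit=3):
    """Rank pages by how many co-viewers of the user's pages also viewed them."""
    my_pages = views.get(user, set())
    frequency = Counter()
    for other_user, other_pages in views.items():
        # Skip the user themselves, and anyone who shares no page with them.
        if other_user == user or not (my_pages & other_pages):
            continue
        # Simplification: only count pages the user has not seen yet.
        for page in other_pages - my_pages:
            frequency[page] += 1
    # Equivalent of ORDER BY frequency DESC ... LIMIT.
    return [page for page, _ in frequency.most_common(limit)]


# User a viewed x; b, c, and d also viewed x. All three viewed y, two viewed z.
views = {
    "a": {"x"},
    "b": {"x", "y", "z"},
    "c": {"x", "y", "z"},
    "d": {"x", "y"},
}
print(recommend(views, "a"))  # ['y', 'z']
```

Page y is viewed by three of the common users and page z by two, so y ranks first.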

next

Now that we have a basic recommender service in place, we can begin the iterative process of making it better. We will explore chains of events (i.e. the :NEXT relationship), boosting, community detection, and A/B testing in a follow-up post.

Thanks for reading this post and becoming a node in our graph.