Getting Started with RelationalAI#

RelationalAI (RAI) augments your Snowflake data cloud with a powerful toolkit for turning data into decisions.

At RAI’s core is a model: an executable knowledge graph that captures the rules and logic that govern your organization’s decision-making processes. Models are written using the relationalai Python package and run securely in your Snowflake account on Snowpark Container Services.

IMPORTANT

Before you can use relationalai, ensure that your Snowflake account administrator has:

  1. Installed the RelationalAI Native App from the Snowflake Marketplace.
  2. Granted you access to a Snowflake role with the necessary privileges for importing data into a RelationalAI model.

See the Native App Installation guide for details.

Install the relationalai Python Package#

Follow the steps for your operating system to install Python and the relationalai package.

NOTE

The relationalai Python package requires Python 3.9, 3.10, or 3.11. If you have an existing compatible Python installation, skip to the last step below.

  1. Navigate to the Python 3.11 download page, scroll down to the Files section, and download the macOS 64-bit universal2 installer.

  2. Open the installer and follow the prompts to install Python 3.11. Use the default installation options. When the installation is complete, double click the Install Certificates command in the Finder window.

  3. Open a terminal and verify that Python 3.11 is installed by running:

    #python3.11 --version
    

    The output should be similar to Python 3.11.9.

  4. Create a new project directory and virtual environment and use pip to install the relationalai package:

    #mkdir rai-getting-started && cd rai-getting-started
    
    # Replace `python3.11` with `python3.9` or `python3.10`
    # if you're using a different version.
    python3.11 -m venv .venv  # Create a virtual environment.
    source .venv/bin/activate  # Activate the virtual environment.
    
    python -m pip install relationalai
    

    Activating your virtual environment adds .venv/bin to your PATH, so you can use its Python executable as python.

    TIP

    Activate your virtual environment each time you open a new terminal to use the correct Python version and access the relationalai package.

Load Data into Snowflake#

In this guide, you’ll explore a scenario in which an electrical utility company must prioritize repairs to its power grid after a storm. You can follow along in a Jupyter Notebook or using your favorite Python editor.

  1. Download the dataset files.
  2. Create or use an existing Snowflake database and schema. In this guide, we use a database named rai_getting_started and a schema named power_transmission_network.
  3. Load the CSV data into tables named node and powerlines. Refer to the Snowflake documentation for more details.

RelationalAI models are built on top of Snowflake schemas. In the next step, you’ll initialize a model for the power_transmission_network schema.

Create a RelationalAI Model#

The relationalai package includes a command-line interface (CLI) for tasks such as configuring a new model and managing imports. Access the CLI by running rai in a terminal with the project’s virtual environment activated.

Configure Your Model#

Run rai init to connect to your Snowflake account and configure a new model:

#rai init

Follow the interactive prompts to enter your Snowflake credentials and choose the:

IMPORTANT

rai init saves your model’s configuration to a file named raiconfig.toml in the current directory. This file contains sensitive information, so be sure to exclude it from version control!

Run a Test Query#

To verify that your model is configured correctly, open a Jupyter Notebook or Python editor and run:

#import relationalai as rai

# Create a model named "power_transmission_network".
model = rai.Model("power_transmission_network")

# Send a test query.
with model.query() as select:
    response = select(1 + 1)

# The results are stored as a pandas DataFrame in the `results` attribute.
print(response.results)
# Output:
#    v
# 0  2

Queries are executed with the model.query() context manager, and are written using the RAI query-builder syntax. You’ll see several examples of this syntax as you build the model for prioritizing power grid repairs. But first, let’s hook the model up to the data in Snowflake.

Import Data into Your Model#

Use the rai imports:stream CLI command to stream data from the nodes and powerlines tables into the power_transmission_network model:

#rai imports:stream --source rai_getting_started.power_transmission_network.nodes --model power_transmission_network
rai imports:stream --source rai_getting_started.power_transmission_network.powerlines --model power_transmission_network

The stream keeps your model up-to-date with the Snowflake tables by ingesting change data capture (CDC) events once every minute.

It may take a few moments for your data to become available to the model. Use rai imports:list to check the status of the imports:

#rai imports:list --model power_transmission_network

When both imports report the status LOADED, the data is ready for use.

Define Model Objects and Types#

Models consist of three components:

  1. Objects represent entities in the model.
  2. Types group objects into categories.
  3. Rules describe objects and the relationships between them.

Remove the test query you wrote earlier and replace it with the following code to create NetworkNode and PowerLine types containing objects from the nodes and powerlines tables:

#NetworkNode = model.Type("NetworkNode", source="rai_getting_started.power_transmission_network.nodes")
PowerLine = model.Type("PowerLines", source="rai_getting_started.power_transmission_network.powerlines")

Run the following query to verify that the objects have been loaded into the model:

#from relationalai.std import aggregates, alias

# Count the number of nodes in the network.
with model.query() as select:
    node = NetworkNode()
    num_nodes = select(alias(aggregates.count(node), "num_nodes"))

print(num_nodes.results)
# Output:
#    num_nodes
# 0       1000

# Count the number of powerlines in the network.
with model.query() as select:
    powerline = PowerLine()
    num_powerlines = select(alias(aggregates.count(powerline), "num_powerlines"))

print(num_powerlines.results)
# Output:
#    num_powerlines
# 0             999

NetworkNode and PowerLine objects have properties derived from their corresponding Snowflake table’s columns:

NetworkNodePowerLine
idThe unique numeric identifier of the node.source_node_idThe ID of the node on the transmitting side.
typeThe type of the node (e.g., “substation”).target_node_idThe ID of the node on the receiving side.
descriptionA description of the node (e.g., “hospital”).
statusThe status of the node (e.g., “ok” or “fail”).

You can access these properties using dot notation, such as node.id or powerline.source_node_id:

## Count the number of failed nodes in the network.
with model.query() as select:
    node = NetworkNode()
    node.status == "fail"  # Filter nodes with a status of "fail".
    num_failed = select(alias(aggregates.count(node), "num_failed"))

print(num_failed.results)
# Output:
#    num_failed
# 0          20

To see all known properties available for a type, use the Type.known_properties() method:

#print(NetworkNode.known_properties())
# Output:
# ['snowflake_id', 'type', 'description', 'id', 'status']

print(PowerLine.known_properties())
# Output:
# ['snowflake_id', 'target_node_id', 'source_node_id']

PowerLine.source_node_id and .target_node_id are foreign keys that reference the NetworkNode.id property. However, it is often more convenient to reference the source and target NetworkNode objects directly. Use the .define() method to create new properties named source and target that reference the source and target node objects that the IDs refer to:

#PowerLine.define(
    source=(NetworkNode, "source_node_id", "id"),
    target=(NetworkNode, "target_node_id", "id"),
)

Similar to a join in SQL, this creates a relationship between the PowerLine and NetworkNode types such that accessing the source or target property of a PowerLine object returns the NetworkNode with the corresponding ID.

Next, use model.Type() to define a type for nodes that need repair:

#NeedsRepair = model.Type("NeedsRepair")

NeedsRepair exists only in the model and is not associated with any Snowflake table. It’s also empty! Let’s populate it and determine the repair priority of each node.

Add Rules to Your Model#

You write rules using the RAI query-builder syntax, which describes rules declaratively as pipelines that objects pass through during model execution.

In our scenario, some of the power grid nodes have been damaged in a storm and need repair. Add the following rule to your Python file to assign nodes with a status of "fail" to the NeedsRepair type:

#with model.rule():
    node = NetworkNode()
    node.status == "fail"
    node.set(NeedsRepair)

Rules are evaluated lazily. The model.rule() context manager translates query-builder code in the with block into a query plan, which only executes when you query the model. As the model runs, objects flow through the rule:

The primary purpose of rules is to capture knowledge for decision-making.

In our dataset, nodes have a description property that indicates the type of load attached to the node. Some nodes in the power grid are more critical than others. For example, nodes that supply power to critical infrastructure need to be repaired first.

The next rule prioritizes nodes based on their description property:

#with model.rule():
    node = NeedsRepair(type="load")
    with model.match():
        # Nodes with a description of "hospital", "military", or
        # "government" are given the highest priority.
        with node.description.in_(["hospital", "military", "government"]):
            node.set(load_priority=2)
        # "Load" nodes with other descriptions are given a lower priority.
        with model.case():
            node.set(load_priority=1)

Objects flow through this rule as follows:

You might wonder: “Why not use a simple if statement?”

You can’t use if in a rule because objects and their property values aren’t known at your Python program’s runtime. Instead, they are determined at query execution time. This principle extends to other Python constructs, like in, which are substituted by RAI query-builder methods, such as node.description.in_() for checking if a node’s description is in a list of values.

RelationalAI makes it easy to leverage the network of relationships between objects in your model. You can tap into this network—also known as a graph—to power your rules with more advanced logic. For example, we can use the inherent graph structure of the power grid in our scenario to prioritize nodes that are upstream of critical loads or have more downstream connections.

Let’s build a graph from NeedsRepair nodes and the power lines that connect them:

#from relationalai.std.graphs import Graph

# Get a new graph object. By default graphs are directed.
graph = Graph(model)

# Add 'NeedsRepair' nodes to the `graph.Node` type.
with model.rule():
    repair_node = NeedsRepair()
    graph.Node.add(repair_node)

# Add edges between `NeedsRepair` nodes that are connected by a 'PowerLine'.
with model.rule():
    line = PowerLine()
    # `NeedsRepair(line.source)` and `NeedsRepair(line.target)` filter
    # `PowerLine` objects, only adding edges where both endpoints need repair.
    graph.Edge.add(from_=NeedsRepair(line.source), to=NeedsRepair(line.target))

Next, add a rule that increases the load_priority property of nodes that are upstream of critical loads, and sets the connection_priority property to the number of downstream connections:

#with model.rule():
    upstream, downstream = NeedsRepair(), NeedsRepair()
    graph.compute.is_reachable(upstream, downstream)
    upstream.set(
        load_priority=aggregates.max(downstream.load_priority.or_(0), per=[upstream]),
        connection_priority=aggregates.count(downstream, per=[upstream])
    )

In this rule:

Finally, combine the load_priority and connection_priority properties to create a single priority property:

#
# Rank nodes in descending order first by load priority, then by
# connection priority, and finally by the node's ID to break ties.
with model.rule():
    node = NeedsRepair()
    node.set(priority=aggregates.rank_desc(
        node.load_priority.or_(0),
        node.connection_priority.or_(0),
        node.id
    ))

The aggregates.rank_desc() method assigns a rank to each NeedsRepair node based on the values of the load_priority and connection_priority properties. Nodes are ranked in descending order first by load_priority, then by connection_priority, and finally by the node’s id to break ties.

Query Your Model#

Let’s see how well your model is prioritizing nodes for repair. Use the model.query() context manager to query the model:

#with model.query() as select:
    node = NeedsRepair()
    node.priority <= 10  # Limit the results to the top 10 nodes.
    response = select(node.priority, node.id, node.type, node.description.or_(""))

In this query:

Here are the results:

#print(response.results)
#    priority   id         type  description
# 0         1  447  transformer
# 1         2  448  transformer
# 2         3  449  transformer
# 3         4  450  transformer
# 4         5  813         load     hospital
# 5         6  451  transformer
# 6         7  830         load   commercial
# 7         8  806         load  residential
# 8         9  829  transformer
# 9        10  825  transformer

The top load node is a hospital, which makes sense given that we said nodes supplying critical infrastructure should be repaired first. You can get a better sense of how the nodes are prioritized by visualizing the graph:

## Pass properties of 'NeedsRepair' objects to the graph so they can be
# displayed in the visualization.
with model.rule():
    repair = NeedsRepair()
    # Get the graph's 'Node' object for each 'NeedsRepair' object and
    # set the properties that will be displayed in the visualization.
    graph.Node(repair).set(
        id=repair.id,
        type=repair.type,
        description=repair.description,
        priority=repair.priority,
    )

# Visualize the graph. The 'visualize()' method accepts a style dictionary
# that lets you customize the appearance of nodes and edges in the graph.
graph.visualize(
    style={
        "node": {
            # Color load nodes red and all other nodes black.
            "color": lambda n: {"load": "red"}.get(n["type"], "black"),
            # Label nodes by their priority.
            "label": lambda n: n["priority"],
            # Include additional information when hovering over a node.
            "hover": lambda n: f"ID: {n['id']}\nType: {n['type']}\nDescription: {n.get('description', 'none')}",
        },
    },
).display()  # In Jupyter Notebooks, .display() is not required.

The .visualize() method generates an interactive graph visualization of the model. When called in a Jupyter Notebook, the visualization is displayed inline. In other environments, use .display() to open the visualization in a new browser window.

Trace the path through the power lines from the highest priority node (labelled 1) to the least, and hover over nodes to see their type and descriptions. Your model has not only identified that the hospital load is the most critical, but also the sequence of repairs for the transformers upstream of the hospital!

Conclusion and Next Steps#

In just a few lines of code, you’ve built a RelationalAI model over power grid data in Snowflake, defined rules to prioritize repairs to the grid after a storm, and visualized the results. But you’ve only just scratched the surface of what’s possible with RelationalAI!

Whether you’re managing critical infrastructure, optimizing operational flows, predicting market trends, or identifying fraudulent activity, RelationalAI provides the secure, scalable, and dynamic modeling environment you need to power every decision with intelligence.

Want to learn more?