App Management#
The RelationalAI (RAI) Native App runs securely in your organization’s Snowflake account as a service on Snowpark Container Services (SPCS). This section provides an overview of the RAI Native App and how to manage and monitor its status.
The RAI SPCS Service#
The RAI Native App allows Python users to build and query models over data in your organization’s Snowflake account using the RAI Python API. Queries from RAI models are evaluated by RAI engines. The RAI SPCS service manages these engines, as well as the app’s CDC Service.
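For example, here is a minimal sketch of that workflow in Python. The model, type, and table names are placeholders, and the builder API shown is abbreviated; see the RAI Python API documentation for the authoritative interface:

```python
import relationalai as rai

# Placeholder names: substitute your own model name and Snowflake table.
model = rai.Model("MyModel")
Person = model.Type("Person", source="my_db.my_schema.people")

# Queries against the model are evaluated by a RAI engine.
with model.query() as select:
    person = Person()
    response = select(person.name)

print(response.results)
```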
Resources#
The RAI SPCS service utilizes the following Snowflake resources:
| Component | Description |
|---|---|
| App Database | The Snowflake database where the RAI Native App is installed. This database is named `RELATIONALAI` by default. It contains the stored procedures and views available in the RAI SQL API. |
| App Warehouse | The Snowflake warehouse used by the RAI Native App to execute Snowflake queries. This warehouse is named `RELATIONAL_AI_ERP_WAREHOUSE` by default. |
| App Compute Pool | The Snowflake compute pool used to run the RAI SPCS service. This compute pool is named `RELATIONAL_AI_ERP_COMPUTE_POOL` by default. |
| Engine Compute Pools | The Snowflake compute pools used to run the RAI engines that evaluate queries from RAI Python models. |
See Compute Resources for more information on the RAI Native App’s resources. For information on managing costs associated with these resources, see Cost Management.
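You can verify that these resources exist in your account with standard Snowflake `SHOW` commands. The `LIKE` patterns below assume the default names from the table above:

```sql
-- Adjust the patterns if your installation uses custom resource names.
SHOW COMPUTE POOLS LIKE 'RELATIONAL_AI_ERP%';
SHOW WAREHOUSES LIKE 'RELATIONAL_AI_ERP%';
SHOW DATABASES LIKE 'RELATIONALAI';
```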
Shared Data#
Before Python users can create and query models, the underlying data must be shared with the RAI Native App using data streams. These streams are managed by the CDC Service and ensure that queries from RAI models are evaluated against the most up-to-date data. See Data Management for more information on sharing data with the RAI Native App.
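As a rough illustration, setting up a stream for a table might look like the following. The procedure name and signature here are assumptions for illustration only; see Data Management for the actual API:

```sql
-- Hypothetical sketch: the procedure name and arguments are illustrative,
-- not the documented API. See Data Management for details.
CALL relationalai.api.create_data_stream('my_db.my_schema.my_table');
```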
Service Management#
The RAI SPCS service can be managed using SQL or Python.
Activate the App#
Requires the `app_admin` application role.
To activate the RAI Native App after installing it, or to re-activate an app that has been deactivated, use the `app.activate()` SQL procedure:
```sql
CALL relationalai.app.activate();

/*+----------------------------------------------+
  | RelationalAI service activated successfully. |
  +----------------------------------------------+ */
```
Alternatively, in Python, create a `Provider` instance and call its `.activate()` method:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Activate the app.
app.activate()
```
When you activate the app for the first time, several Snowflake compute resources are provisioned. Provisioning may take several minutes before the app is fully activated. See Compute Resources for more information.
When you reactivate a deactivated app:
- All engines that were deleted when the app was deactivated are recreated.
- The CDC service is resumed if it was active when the app was deactivated.
It may take several minutes to recreate engines after the app is reactivated. You can view a list of engines to check their status, as sketched below.
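A minimal Python sketch for checking engine status, assuming the `Provider` class exposes a `list_engines()` method as described in the Python API reference:

```python
import relationalai as rai

app = rai.Provider()

# Assumption: list_engines() returns one record per engine, including its
# name, size, and current state. Verify the exact shape in the API reference.
for engine in app.list_engines():
    print(engine)
```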
Deactivate the App#
Requires the `app_admin` application role.
You can deactivate the RAI Native App to reduce costs when you are not using it.
Use the `app.deactivate()` SQL procedure to deactivate the RAI Native App:
```sql
CALL relationalai.app.deactivate();

/*+------------------------------------------------+
  | RelationalAI service deactivated successfully. |
  +------------------------------------------------+ */
```
Alternatively, in Python, create a `Provider` instance and call its `.deactivate()` method:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Deactivate the app.
app.deactivate()
```
Python users cannot create or query models using the RAI Python API while the app is deactivated.
When you deactivate the app:
- The CDC service is suspended.
- All engines are deleted and all in-progress transactions are cancelled. Engines deleted during deactivation are automatically re-created when the app is re-activated.
Get Service Details#
Requires the `app_user` application role.
Use the `app.get_service()` SQL procedure to get details about the RAI SPCS service:
```sql
CALL relationalai.app.get_service();

/*+-----------------------------------------------------------+
  | [                                                         |
  |   {                                                       |
  |     "name": "SPCS_CONTROL_PLANE",                         |
  |     "owner": "RELATIONALAI",                              |
  |     "compute_pool": "RELATIONAL_AI_ERP_COMPUTE_POOL",     |
  |     "query_warehouse": "RELATIONAL_AI_ERP_WAREHOUSE",     |
  |     "resumed_on": "2024-10-27T22:10:05Z",                 |
  |     "updated_on": "2024-10-27T22:10:05Z"                  |
  |   }                                                       |
  | ]                                                         |
  +-----------------------------------------------------------+ */
```
Alternatively, in Python, create a `Provider` instance and use its `.sql()` method to execute the `app.get_service()` procedure:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Get service details as a pandas DataFrame.
service = app.sql("CALL relationalai.app.get_service()", format="pandas")
print(service)
```
Refer to the reference documentation for more information on the output fields.
Check Service Status#
Requires the `app_user` application role.
Use the `app.service_status()` SQL procedure to check the status of the RAI SPCS service:
```sql
CALL relationalai.app.service_status();

/*+----------------------------------------------+
  | [                                            |
  |   {                                          |
  |     "message": "Running",                    |
  |     "name": "main",                          |
  |     "restartCount": 0,                       |
  |     "startTime": "2024-10-27T22:10:05Z",     |
  |     "status": "READY"                        |
  |   },                                         |
  |   {                                          |
  |     "message": "Running",                    |
  |     "name": "registry",                      |
  |     "restartCount": 0,                       |
  |     "startTime": "2024-10-27T22:10:06Z",     |
  |     "status": "READY"                        |
  |   },                                         |
  |   {                                          |
  |     "message": "Running",                    |
  |     "name": "otel-collector",                |
  |     "restartCount": 0,                       |
  |     "startTime": "2024-10-27T22:10:07Z",     |
  |     "status": "READY"                        |
  |   }                                          |
  | ]                                            |
  +----------------------------------------------+ */
```
Alternatively, in Python, create a `Provider` instance and use its `.sql()` method to execute the `app.service_status()` procedure:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Check the app status.
status = app.sql("CALL relationalai.app.service_status()")
print(status)
```
The output of `app.service_status()` contains one JSON object with status details for each SPCS container associated with the RAI SPCS service. If every container's status is `READY`, the service is activated and ready to use.
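For example, here is a minimal sketch that checks readiness programmatically. It assumes the call returns a single row whose one column contains the JSON array shown above; verify the actual result shape against the reference documentation:

```python
import json

import relationalai as rai

app = rai.Provider()

# Assumption: the result is one row with one column holding the JSON array.
status = app.sql("CALL relationalai.app.service_status()", format="pandas")
containers = json.loads(status.iloc[0, 0])

if all(container["status"] == "READY" for container in containers):
    print("Service is ready.")
else:
    print("Service is not ready:", containers)
```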
Refer to the reference documentation for more information on the output fields.
Telemetry and Logs#
Application Telemetry#
When you install the RAI Native App, you are prompted to share continuous telemetry data with RelationalAI. This data is written to your account’s active event table and contains operational information such as internal system logs, engine sizes, and usage data. Customer data and personally identifiable information are not included in continuous telemetry data.
See the Snowflake documentation for details on viewing and working with event tables.
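For example, you can locate your account's active event table and inspect recent records with standard Snowflake SQL. `SNOWFLAKE.TELEMETRY.EVENTS` is the account default; substitute your own event table if you have configured a different one:

```sql
-- Find the active event table for the account.
SHOW PARAMETERS LIKE 'EVENT_TABLE' IN ACCOUNT;

-- Inspect the last 24 hours of records.
SELECT timestamp, record_type, value
FROM snowflake.telemetry.events
WHERE timestamp > DATEADD('hour', -24, CURRENT_TIMESTAMP())
ORDER BY timestamp DESC
LIMIT 100;
```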
Share On-Demand Logs#
Requires the `sensitive_logs` application role.
Use the On-Demand Logs Notebook to create a secure-share view that can be shared with RelationalAI for support.
Here is a preview of the notebook:
Sharing On-Demand Logs with RAI#
Overview#
This notebook creates and shares a secure-share view with RAI to provide access to on-demand logs.
```python
import snowflake.connector
from datetime import datetime, timedelta, date
```
```python
def get_date_range(start_date, end_date):
    # Expand [start_date, end_date] into a '|'-delimited list of dates for
    # use inside a regular expression.
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    dates = [(start + timedelta(days=i)).strftime("%Y-%m-%d")
             for i in range((end - start).days + 1)]
    return "|".join(dates)
```
Edit the next cell to set your account-specific values:
```python
# Override this with your account name, in the format ORG-account_name.
snowflake_account = ""
snowflake_user = ""
snowflake_password = ""

# Override this with the date you want to get logs from, in the format YYYY-MM-DD.
start_date = date.today().strftime("%Y-%m-%d")

# Override this with the date you want to get logs until, in the format YYYY-MM-DD.
end_date = date.today().strftime("%Y-%m-%d")

date_range = get_date_range(start_date, end_date)

# Override this to True if you want to share spcs_control_plane logs.
include_erps_logs = False

# Override this with an engine name (e.g., 'testEngine'), or leave it as-is to
# get logs for all engines.
engine_name = ".*"

warehouse = ""

# Override this with a unique ID and share it with RAI.
id = ""

# The account you want to share the logs with.
event_sharing_account = ""

# Your native app name (usually relationalai).
native_app_name = "relationalai"
```
```python
engine_file_pattern = f'{engine_name}/clientlogs-.*({date_range}).*\\.json|{engine_name}/clientlogs-engine.json'
erp_file_pattern = f'|clientlogs-cp-({date_range}).*\\.json|.*clientlogs-cp.json'

if include_erps_logs:
    file_pattern = f'.*({engine_file_pattern}{erp_file_pattern}).*'
else:
    file_pattern = f'.*({engine_file_pattern}).*'
```
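As a sanity check, you can print the assembled pattern before running the copy. With the defaults above (`engine_name = ".*"`) and a single-day range, it expands to something like:

```python
print(file_pattern)
# .*(.*/clientlogs-.*(2024-10-27).*\.json|.*/clientlogs-engine.json).*
```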
```python
sql_query = """
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE &warehouse; -- Update to use another warehouse if necessary.

CREATE DATABASE IF NOT EXISTS TELEMETRY_SHARING;
USE DATABASE TELEMETRY_SHARING;
CREATE SCHEMA IF NOT EXISTS LOGS;
USE SCHEMA LOGS;

--*****--
-- Load staged data files to temporary tables
--*****--
CREATE OR REPLACE TABLE TELEMETRY_SHARING.LOGS.TELEMETRY_LOAD_TABLE_&id (
    LOG_RECORD VARCHAR
);

CREATE OR REPLACE FILE FORMAT json_format TYPE = 'json';

CREATE OR REPLACE TABLE TELEMETRY_SHARING.LOGS.TELEMETRY_SHARE_TABLE_&id (
    TIMESTAMP TIMESTAMP,
    OBSERVED_TIMESTAMP TIMESTAMP,
    SPAN_ID VARCHAR,
    TRACE_ID VARCHAR,
    MESSAGE VARCHAR,
    LOG_RECORD VARCHAR
);

--*****--
-- Create secure view from table with target log records
--*****--
CREATE OR REPLACE SECURE VIEW TELEMETRY_SHARING.LOGS.TELEMETRY_SHARE_VIEW_&id
COMMENT = 'View containing telemetry records to share with the RAI provider account'
AS
SELECT *
FROM TELEMETRY_SHARING.LOGS.TELEMETRY_SHARE_TABLE_&id;

--*****--
-- Share secure view with the RAI provider account
--*****--
CREATE OR REPLACE SHARE TELEMETRY_SHARE_&id;
CREATE DATABASE ROLE IF NOT EXISTS TELEMETRY_SHARE_ROLE;
GRANT USAGE ON DATABASE TELEMETRY_SHARING TO SHARE TELEMETRY_SHARE_&id;
GRANT USAGE ON SCHEMA TELEMETRY_SHARING.LOGS TO DATABASE ROLE TELEMETRY_SHARE_ROLE;
GRANT SELECT ON VIEW TELEMETRY_SHARE_VIEW_&id TO DATABASE ROLE TELEMETRY_SHARE_ROLE;
GRANT DATABASE ROLE TELEMETRY_SHARE_ROLE TO SHARE TELEMETRY_SHARE_&id;
ALTER SHARE TELEMETRY_SHARE_&id ADD ACCOUNTS = NDSOEBE.&event_sharing_account;

COPY INTO TELEMETRY_SHARING.LOGS.TELEMETRY_LOAD_TABLE_&id
FROM (
    SELECT $1 AS log_record
    FROM @&native_app_name.app_state.client_log_stage
)
PATTERN = '&file_pattern'
FILE_FORMAT = (TYPE = JSON)
ON_ERROR = CONTINUE; -- This will skip any log records that are invalid JSON.
-- The output of the query will indicate if any records were skipped due to errors.

-- Copy from TELEMETRY_LOAD_TABLE_&id into TELEMETRY_SHARE_TABLE_&id and remove safe logs
INSERT INTO TELEMETRY_SHARING.LOGS.TELEMETRY_SHARE_TABLE_&id
SELECT
    to_timestamp(timeUnixNano) AS timestamp,
    to_timestamp(observedTimeUnixNano) AS observed_timestamp,
    spanId,
    traceId,
    a.value:value:stringValue AS message,
    log_record
FROM (
    SELECT
        value:timeUnixNano AS timeUnixNano,
        value:observedTimeUnixNano AS observedTimeUnixNano,
        value:spanId AS spanId,
        value:traceId AS traceId,
        value AS log_record
    FROM TELEMETRY_SHARING.LOGS.TELEMETRY_LOAD_TABLE_&id,
        LATERAL FLATTEN(INPUT => TRY_PARSE_JSON($1):resourceLogs[0]:scopeLogs[0]:logRecords, OUTER => TRUE)
),
LATERAL FLATTEN(INPUT => log_record:body:kvlistValue:values, OUTER => TRUE) a,
LATERAL FLATTEN(INPUT => log_record:attributes, OUTER => TRUE) b
WHERE a.value:key = 'message'
  AND (
    ( -- engine unsafe logs
        b.value:key = 'log.file.name'
        AND b.value:value:stringValue = 'engine-unsafe.log'
    )
    OR (
        -- erps unsafe logs
        log_record NOT LIKE '%___safe_to_log%'
        AND log_record NOT LIKE '%engine-safe.log%'
        AND log_record LIKE '%spcs_control_plane%'
    )
  )
;
"""

# Substitute the placeholder values into the SQL script.
sql_query = (
    sql_query
    .replace('&warehouse', warehouse)
    .replace('&id', id)
    .replace('&event_sharing_account', event_sharing_account)
    .replace('&date_range', date_range)
    .replace('&native_app_name', native_app_name)
    .replace('&file_pattern', file_pattern)
)

# Split the script into individual statements for execution.
sql_statements = sql_query.split(';')
```
```python
conn = snowflake.connector.connect(
    account=snowflake_account,
    user=snowflake_user,
    password=snowflake_password,
)

cur = conn.cursor()
try:
    # Execute each statement in order, consuming results as we go.
    for statement in sql_statements:
        statement = statement.strip()
        if statement:
            cur.execute(statement)
            cur.fetchall()
finally:
    cur.close()
    conn.close()
```
Run the following cell to confirm that the logs were copied to the secure share:
```python
import pandas as pd

query = """
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE &warehouse;
SELECT * FROM TELEMETRY_SHARING.LOGS.TELEMETRY_SHARE_TABLE_&id LIMIT 5;
"""
query = query.replace('&id', id)
query = query.replace('&warehouse', warehouse)

conn = snowflake.connector.connect(
    account=snowflake_account,
    user=snowflake_user,
    password=snowflake_password,
)

cur = conn.cursor()
try:
    sql_statements = query.split(';')
    for statement in sql_statements:
        statement = statement.strip()
        if statement:
            cur.execute(statement)
            result = cur.fetchall()
            # Display the result of each statement; the final SELECT shows a
            # sample of the shared log records.
            columns = [desc[0] for desc in cur.description]
            df = pd.DataFrame(result, columns=columns)
            print(df)
finally:
    cur.close()
    conn.close()
```