# Sharing On-Demand Logs with RAI

## Overview

This notebook loads your on-demand logs into a table, exposes them through a secure view, and shares that view with RAI using a [Snowflake secure share](https://docs.snowflake.com/en/user-guide/data-sharing-intro).


In [None]:
import snowflake.connector
from datetime import datetime, timedelta, date

In [None]:
def get_date_range(start_date, end_date):
    """Return every calendar day from start_date to end_date, joined by '|'.

    The result is spliced into a regex alternation such as ``({date_range})``
    so that a file pattern matches a log file from any day in the range.

    Args:
        start_date: First day (inclusive), formatted ``YYYY-MM-DD``.
        end_date: Last day (inclusive), formatted ``YYYY-MM-DD``.

    Returns:
        The days formatted ``YYYY-MM-DD`` and separated by ``|``, e.g.
        ``"2024-01-30|2024-01-31|2024-02-01"``.

    Raises:
        ValueError: If either date does not match ``YYYY-MM-DD``, or if
            ``end_date`` is earlier than ``start_date``. An empty range would
            otherwise yield ``""``, which turns the regex group into ``()``,
            a group that matches every file and silently disables the
            date filter.
    """
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    if end < start:
        raise ValueError(
            f"end_date ({end_date}) must not be earlier than start_date ({start_date})"
        )
    # +1 so that end_date itself is included.
    return "|".join(
        (start + timedelta(days=offset)).strftime("%Y-%m-%d")
        for offset in range((end - start).days + 1)
    )

Edit the next cell, replacing the placeholder values with your own:


In [None]:
# Snowflake connection settings.
# Override this with the account name in the format ORG-account_name
snowflake_account = ""
snowflake_user = ""
snowflake_password = ""

# Override this to the date you want to get logs starting from, in the format YYYY-MM-DD (defaults to today):
start_date = date.today().strftime("%Y-%m-%d")

# Override this to the date you want to get logs until (inclusive), in the format YYYY-MM-DD (defaults to today):
end_date = date.today().strftime("%Y-%m-%d")
# Every day from start_date to end_date joined with "|", used as a regex alternation in the file pattern.
date_range = get_date_range(start_date, end_date)

# Override this to True if you also want to share ERPS (spcs_control_plane, clientlogs-cp-*) logs
include_erps_logs = False

# Override this to the engine name (e.g. 'testEngine'), or leave it as ".*" to get logs for all engines.
# This value is inserted into the stage file pattern as a regular expression.
engine_name = ".*"
# The warehouse used to run the setup and verification SQL below.
warehouse = ""

# Override this to a unique id and share it with RAI. It is appended to the names of the
# tables, secure view, and share that this notebook creates.
# NOTE: `id` shadows the Python builtin of the same name; it is kept because later cells read it.
id = ""

# The RAI account you want to share the logs with (the share is granted to NDSOEBE.<event_sharing_account>)
event_sharing_account = ""

# Your native app name (usually relationalai); its client log stage is the source of the logs.
native_app_name = "relationalai"

In [None]:
# Build the regex used by COPY INTO ... PATTERN to select files on the client
# log stage. engine_name is a regex fragment (".*" means every engine), and
# date_range is a "|"-separated list of YYYY-MM-DD days.
#
# Literal dots are written as "[.]" instead of "\\." because the pattern ends up
# inside a single-quoted SQL string literal. Snowflake applies its own backslash
# escape processing there, so a single backslash may not reach the regex engine.
# A bracket expression is unaffected by that escaping.
engine_file_pattern = (
    f'{engine_name}/clientlogs-.*({date_range}).*[.]json'
    f'|{engine_name}/clientlogs-engine[.]json'
)
# Control-plane (ERPS) log files; only included when include_erps_logs is True.
erp_file_pattern = f'clientlogs-cp-({date_range}).*[.]json|.*clientlogs-cp[.]json'
if include_erps_logs:
    file_pattern = f'.*({engine_file_pattern}|{erp_file_pattern}).*'
else:
    file_pattern = f'.*({engine_file_pattern}).*'

In [None]:
# SQL template for the whole setup. Placeholders of the form `&name` are
# replaced below with values from the configuration cells, and the result is
# split on ';' into individual statements for the next cell to execute.
# The script:
#   * creates TELEMETRY_SHARE_TABLE_<id> and a secure view over it,
#   * shares that view with NDSOEBE.<event_sharing_account>,
#   * loads the matching files from the native app's client log stage into
#     TELEMETRY_LOAD_TABLE_<id>,
#   * copies the filtered log records into the share table.
sql_query = """
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE &warehouse; -- Update to use another warehouse if necessary.

CREATE DATABASE IF NOT EXISTS TELEMETRY_SHARING;
USE DATABASE TELEMETRY_SHARING;
CREATE SCHEMA IF NOT EXISTS LOGS;
USE SCHEMA LOGS;

--*****--
-- Load staged data files to temporary tables
--*****--
CREATE OR REPLACE TABLE TELEMETRY_SHARING.LOGS.TELEMETRY_LOAD_TABLE_&id (
  LOG_RECORD VARCHAR
);

CREATE OR REPLACE FILE FORMAT json_format TYPE = 'json';

CREATE OR REPLACE TABLE TELEMETRY_SHARING.LOGS.TELEMETRY_SHARE_TABLE_&id  (
  TIMESTAMP TIMESTAMP,
  OBSERVED_TIMESTAMP TIMESTAMP,
  SPAN_ID VARCHAR,
  TRACE_ID VARCHAR,
  MESSAGE VARCHAR,
  LOG_RECORD VARCHAR
);

--*****--
-- Create secure view from table with target log records
--*****--
CREATE OR REPLACE SECURE VIEW TELEMETRY_SHARING.LOGS.TELEMETRY_SHARE_VIEW_&id
COMMENT = 'View containing telemetry records to share with the RAI provider account'
AS
    SELECT *
    FROM TELEMETRY_SHARING.LOGS.TELEMETRY_SHARE_TABLE_&id ;

--*****--
-- Share secure view with the RAI provider account
--*****--
CREATE OR REPLACE SHARE TELEMETRY_SHARE_&id;

CREATE DATABASE ROLE IF NOT EXISTS TELEMETRY_SHARE_ROLE;
GRANT USAGE ON DATABASE TELEMETRY_SHARING TO SHARE TELEMETRY_SHARE_&id;
GRANT USAGE ON SCHEMA TELEMETRY_SHARING.LOGS TO DATABASE ROLE TELEMETRY_SHARE_ROLE;
GRANT SELECT ON VIEW TELEMETRY_SHARE_VIEW_&id TO DATABASE ROLE TELEMETRY_SHARE_ROLE;
GRANT DATABASE ROLE TELEMETRY_SHARE_ROLE TO SHARE TELEMETRY_SHARE_&id;

ALTER SHARE TELEMETRY_SHARE_&id ADD ACCOUNTS = NDSOEBE.&event_sharing_account;

COPY INTO TELEMETRY_SHARING.LOGS.TELEMETRY_LOAD_TABLE_&id
FROM (
    SELECT
        $1 AS log_record
    FROM @&native_app_name.app_state.client_log_stage
)
PATTERN =  '&file_pattern'
FILE_FORMAT = (TYPE = JSON)
ON_ERROR = CONTINUE; -- This will skip any log records that are invalid JSON.
-- The output of the query will indicate if any records were skipped due to errors.


-- Copy from TELEMETRY_LOAD_TABLE_&id into TELEMETRY_SHARE_TABLE_&id and remove safe logs
INSERT INTO TELEMETRY_SHARING.LOGS.TELEMETRY_SHARE_TABLE_&id
    SELECT
        to_timestamp(timeUnixNano) as timestamp,
        to_timestamp(observedTimeUnixNano) as observed_timestamp,
        spanId,
        traceId,
        a.value:value:stringValue as message,
        log_record
    FROM (SELECT
            value:timeUnixNano as timeUnixNano,
            value:observedTimeUnixNano as observedTimeUnixNano,
            value:spanId as spanId,
            value:traceId as traceId,
            value as log_record
        FROM TELEMETRY_SHARING.LOGS.TELEMETRY_LOAD_TABLE_&id, LATERAL FLATTEN( INPUT => TRY_PARSE_JSON($1):resourceLogs[0]:scopeLogs[0]:logRecords, OUTER => TRUE )),
        LATERAL FLATTEN( INPUT => log_record:body:kvlistValue:values, OUTER => TRUE) a, LATERAL FLATTEN( INPUT => log_record:attributes, OUTER => TRUE) b
    WHERE a.VALUE:key = 'message'
    and
        (
            ( -- engine unsafe logs
                b.value:key = 'log.file.name'
                and b.value:value:stringValue ='engine-unsafe.log'
            )
            or
            (
                -- erps unsafe logs
                log_record not like '%___safe_to_log%'
                and log_record not like '%engine-safe.log%'
                and log_record like '%spcs_control_plane%'

            )
        )
;
"""

# Substitutions applied to the template, in this order. `&date_range` is not
# referenced by the template (the dates are already inside file_pattern), so
# no replacement is needed for it.
_sql_substitutions = {
    '&warehouse': warehouse,
    '&id': id,
    '&event_sharing_account': event_sharing_account,
    '&native_app_name': native_app_name,
    '&file_pattern': file_pattern,
}

# Fail fast, before anything is sent to Snowflake. An empty value would leave a
# dangling reference such as `USE WAREHOUSE ;` or `NDSOEBE.` part-way through
# the script. A ';' inside a value would be mistaken for a statement boundary
# by the split below.
for _placeholder, _value in _sql_substitutions.items():
    _setting = _placeholder.lstrip('&')
    if not _value:
        raise ValueError(f"'{_setting}' is empty; set it in the configuration cell above.")
    if ';' in _value:
        raise ValueError(f"'{_setting}' must not contain ';'.")
# file_pattern is placed inside a single-quoted SQL string literal.
if "'" in file_pattern:
    raise ValueError("The file pattern must not contain a single quote ('); check engine_name.")

for _placeholder, _value in _sql_substitutions.items():
    sql_query = sql_query.replace(_placeholder, _value)

sql_statements = sql_query.split(';')

In [None]:
# Run the generated setup statements one at a time over a single connection.
# The nested try/finally blocks ensure the connection is closed even if opening
# the cursor fails, and the cursor is closed if any statement fails.
conn = snowflake.connector.connect(
    account=snowflake_account,
    user=snowflake_user,
    password=snowflake_password,
)
try:
    cur = conn.cursor()
    try:
        for statement in sql_statements:
            statement = statement.strip()
            if not statement:
                continue  # e.g. the whitespace after the final ';'
            try:
                cur.execute(statement)
            except Exception:
                # Show which statement failed, then let the error propagate.
                print(f"Failed while executing:\n{statement}")
                raise
            # Fetch and discard each statement's result.
            cur.fetchall()
    finally:
        cur.close()
finally:
    conn.close()

Run the following cell to confirm that the logs were copied into the shared table (it prints the first five rows):


In [None]:
import pandas as pd

# Read the first five rows of the share table to confirm that logs were copied.
query = """
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE &warehouse;
SELECT * FROM TELEMETRY_SHARING.LOGS.TELEMETRY_SHARE_TABLE_&id limit 5;
"""
query = query.replace('&id', id)
query = query.replace('&warehouse', warehouse)
# Use a dedicated name so this cell does not overwrite `sql_statements`, which
# the setup cells build and execute.
check_statements = [part.strip() for part in query.split(';') if part.strip()]

conn = snowflake.connector.connect(
    account=snowflake_account,
    user=snowflake_user,
    password=snowflake_password,
)
try:
    cur = conn.cursor()
    try:
        for statement in check_statements:
            cur.execute(statement)
            result = cur.fetchall()

        # After the loop, `result` and `cur.description` belong to the last
        # statement: the SELECT.
        columns = [desc[0] for desc in cur.description]
        df = pd.DataFrame(result, columns=columns)
        print(df)
    finally:
        cur.close()
finally:
    conn.close()