Data Management#
RelationalAI (RAI) Python users build models on top of data stored in Snowflake tables and views. Before models can be queried, the data must be shared with the RAI Native App using a data stream. These data streams are maintained by the RAI Native App’s CDC Service.
Support for data streams is disabled by default. You must enable the CDC service before data can be shared with the RAI Native App.
The CDC Service#
Data streams use change data capture (CDC) to stream updates from Snowflake tables and views to the RAI Native App. The CDC service processes the change tracking data consumed by data streams to keep RAI models synchronized with their source data.
The CDC service requires that your RAI Native App be granted the EXECUTE TASK and EXECUTE MANAGED TASK privileges.
This is typically done when you install the app.
You can manage the CDC service using SQL, Python, or the RAI CLI.
Enable CDC#
Requires the cdc_admin application role.
To enable the CDC service, or to resume the service after suspending it, use the app.resume_cdc() procedure:
-- Enable the CDC service.
CALL relationalai.app.resume_cdc();
/*+--------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been resumed |
+--------------------------------------------------------------------+ */
To enable the CDC service, or to resume the service after suspending it, create a Provider instance and use its .sql() method to execute the app.resume_cdc() SQL procedure:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Enable the CDC service.
app.sql("CALL relationalai.app.resume_cdc()")
To enable the CDC service, or to resume the service after suspending it, use the imports:setup command’s --resume flag:
# Enable the CDC service.
rai imports:setup --resume
While CDC is enabled, a dedicated engine responsible for processing changes tracked by data streams is provisioned whenever changes are detected. See CDC Engine for details.
Disable CDC#
Requires the cdc_admin application role.
To disable the CDC service, use the app.suspend_cdc() procedure:
-- Disable the CDC service.
CALL relationalai.app.suspend_cdc();
/*+-----------------------------------------------------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been suspended and its associated engine has been dropped |
+-----------------------------------------------------------------------------------------------------------------+ */
To disable the CDC service, create a Provider instance and use its .sql() method to execute the app.suspend_cdc() procedure:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Disable the CDC service.
app.sql("CALL relationalai.app.suspend_cdc()")
To disable the CDC service, use the imports:setup command’s --suspend flag:
# Disable the CDC service.
rai imports:setup --suspend
Disabling CDC suspends the CDC engine. Change tracking data is still consumed by data streams, but is not processed until the service is resumed. Data streams cannot be created while CDC is disabled.
When you disable the CDC service, RAI Python models retain access to a snapshot of the source data. Model queries are evaluated against the snapshot.
View CDC Service Status#
Requires the app_user application role.
To view the status of the CDC service, use the app.cdc_status() procedure:
-- Get the CDC service status.
CALL relationalai.app.cdc_status();
/*+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
| CDC_ENABLED | CDC_ENGINE_NAME | CDC_ENGINE_STATUS | CDC_ENGINE_SIZE | CDC_TASK_STATUS | CDC_TASK_INFO |
|--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
| TRUE | CDC_MANAGED_ENGINE | READY | HIGHMEM_X64_S | started | {"createdOn": "2024-10-15 21:58:11.291 -0700", "lastSuspendedOn": null, "lastSuspendedReason": null, "state": "started"} |
+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------| */
Refer to the reference docs for more details on the output of the cdc_status() procedure.
To view the status of the CDC service, create a Provider instance and use its .sql() method to execute the app.cdc_status() procedure:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Get the CDC service status.
cdc_status = app.sql("CALL relationalai.app.cdc_status()")
print(cdc_status)
# [Row(CDC_ENABLED=True, CDC_ENGINE_NAME='CDC_MANAGED_ENGINE', CDC_ENGINE_STATUS='READY', CDC_ENGINE_SIZE='M', CDC_TASK_STATUS='started', CDC_TASK_INFO='{\n "createdOn": "2024-10-24 22:53:15.200 -0700",\n "lastSuspendedOn": null,\n "lastSuspendedReason": null,\n "state": "started"\n}')]
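The CDC_TASK_INFO column in the output above is a JSON string, so it can be decoded with Python's standard json module. The following is a minimal sketch that works on the sample value shown above rather than on a live query. The helper name summarize_task_info is hypothetical and is not part of the relationalai package.

```python
import json

# Sample CDC_TASK_INFO value copied from the output above (not a live query).
SAMPLE_TASK_INFO = (
    '{\n "createdOn": "2024-10-24 22:53:15.200 -0700",\n'
    ' "lastSuspendedOn": null,\n "lastSuspendedReason": null,\n'
    ' "state": "started"\n}'
)


def summarize_task_info(task_info_json: str) -> dict:
    """Decode a CDC_TASK_INFO JSON string into a small summary dict."""
    info = json.loads(task_info_json)
    return {
        "state": info.get("state"),
        "created_on": info.get("createdOn"),
        # Assumption: a null lastSuspendedOn means the task was never suspended.
        "ever_suspended": info.get("lastSuspendedOn") is not None,
    }


summary = summarize_task_info(SAMPLE_TASK_INFO)
print(summary)
```

With a real result, the string to decode would come from the CDC_TASK_INFO field of the first returned row.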
To view the status of the CDC service, use the imports:setup command:
$ rai imports:setup
---------------------------------------------------
▰▰▰▰ Imports setup fetched
To suspend imports, use 'rai imports:setup --suspend'
Field Value
───────────────────────────────────────────
engine CDC_MANAGED_ENGINE
engine_size M
engine_status READY
status STARTED
enabled True
createdOn 2024-10-24 22:53:15
lastSuspendedOn N/A
lastSuspendedReason N/A
---------------------------------------------------
Configure CDC Engine Size#
Requires the cdc_admin application role.
To change the size of the CDC engine, use the app.alter_cdc_engine_size() procedure:
-- Change the size of the CDC engine to HIGHMEM_X64_M.
CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M');
/*+--------------------------------------+
| CDC engine size set to HIGHMEM_X64_M |
+--------------------------------------+ */
To change the size of the CDC engine, create a Provider instance and use its .sql() method to execute the app.alter_cdc_engine_size() procedure:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Change the size of the CDC engine to HIGHMEM_X64_M.
app.sql("CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M')")
To change the size of the CDC engine, pass the new engine size to the imports:setup command’s --engine_size flag:
# Change the size of the CDC engine to HIGHMEM_X64_M.
rai imports:setup --engine_size HIGHMEM_X64_M
If a batch of data stream changes is currently being processed, it is completed using the previously configured engine. A new engine with the new size is created whenever the next batch of changes is processed, at which point the old engine is deleted.
When the RAI Native App is installed, the CDC engine is configured with the HIGHMEM_X64_S size.
The size of the CDC engine is determined by the instance family of the engine’s host compute pool.
See Compute Pools for available sizes.
Data Streams#
Data streams track changes to tables and views in your Snowflake account to ensure that queries from RAI Python models are evaluated against the most recent Snowflake data. Changes are batched and processed by the CDC Service once per minute.
You must enable the CDC service before creating data streams.
Supported Column Types#
Data streams support the following Snowflake column types:
Tables or views with unsupported column types cannot be streamed into the RAI Native App.
If you need to stream data from a source object with unsupported column types, consider creating a view without those columns or that casts the unsupported columns to a supported type.
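As a rough illustration of the view-based workaround, the sketch below builds a CREATE VIEW statement that keeps only selected columns and casts some of them. The helper build_streamable_view_sql, the object names, and the column names are all hypothetical, and the choice of VARCHAR as the cast target assumes it is among the supported types.

```python
def build_streamable_view_sql(view_name, source_name, columns, casts=None):
    """Build a CREATE VIEW statement that keeps `columns` and casts some of them.

    `casts` maps a column name to the Snowflake type it should be cast to.
    Names are interpolated as-is, so pass only trusted identifiers.
    """
    casts = casts or {}
    select_items = []
    for col in columns:
        if col in casts:
            select_items.append(f"CAST({col} AS {casts[col]}) AS {col}")
        else:
            select_items.append(col)
    select_list = ",\n    ".join(select_items)
    return (
        f"CREATE OR REPLACE VIEW {view_name} AS\n"
        f"SELECT\n    {select_list}\n"
        f"FROM {source_name}"
    )


# Hypothetical example: leave out a RAW_PAYLOAD column by not listing it, and
# cast EVENT_DETAILS to VARCHAR. All names here are placeholders.
sql = build_streamable_view_sql(
    view_name="MY_DB.MY_SCHEMA.EVENTS_STREAMABLE",
    source_name="MY_DB.MY_SCHEMA.EVENTS",
    columns=["ID", "EVENT_DETAILS", "CREATED_AT"],
    casts={"EVENT_DETAILS": "VARCHAR"},
)
print(sql)
```

The generated statement could then be run with a Provider instance's .sql() method, after which the view, rather than the original table, would serve as the stream's source.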
Change Tracking#
Data streams use Snowflake’s native change tracking feature to capture changes to tables and views.
Change tracking stores information about all Data Manipulation Language (DML) statements committed to a table or view, including INSERT, UPDATE, and DELETE operations.
You must enable change tracking on the source table or view before creating a data stream.
Changes to a table or view’s schema are not tracked. If columns are added or removed from a table or view, or if the column types change, you must delete the stream and re-create it before RAI Python models can query the updated data.
Change tracking data consumed by data streams is temporarily stored in a stage in the app’s Snowflake database. If the CDC Service is enabled, the staged data is processed in batches and made available to RAI models. You may suspend a data stream to stop it from consuming change tracking data.
Staleness#
When a stream is suspended, change tracking data for its source table or view is no longer consumed by the data stream. Snowflake retains change tracking data for a limited period, after which the data is inaccessible. If a stream is suspended for longer than its data retention period, the stream becomes stale.
Resume suspended data streams periodically to avoid staleness. Note that the length of a stream’s data retention period is configurable. See the Snowflake documentation for details.
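To make the staleness window concrete, here is a small sketch that computes how much time is left before a suspended stream could become stale. It assumes you already know when the stream was suspended and the retention period in days. The function time_until_stale is hypothetical, and the 14-day value is only an example, not a documented default.

```python
from datetime import datetime, timedelta, timezone


def time_until_stale(suspended_at, retention_days, now=None):
    """Return the time left before a suspended stream could become stale.

    A zero or negative result means the retention window has already elapsed.
    """
    now = now or datetime.now(timezone.utc)
    deadline = suspended_at + timedelta(days=retention_days)
    return deadline - now


# Example values only: suspended on Oct 1, checked on Oct 10, 14-day retention.
suspended_at = datetime(2024, 10, 1, 12, 0, tzinfo=timezone.utc)
now = datetime(2024, 10, 10, 12, 0, tzinfo=timezone.utc)
remaining = time_until_stale(suspended_at, retention_days=14, now=now)
print(remaining.days)  # 5
```

A check like this could run on a schedule and prompt you to resume a stream before the remaining time reaches zero.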
Quarantined Streams#
If the CDC Service encounters too many failures while attempting to process a data stream, the stream is given a QUARANTINED status.
Change tracking data for quarantined streams is consumed, but not processed.
Resolve the errors causing the stream to be quarantined, then resume the data stream to process the pending changes.
See View Quarantined Streams for details on viewing errors for quarantined streams.
Data Stream Management#
You can manage data streams using SQL, Python, or the RAI CLI.
Enable Change Tracking on a Table or View#
Requires ownership privileges on the table or view.
To enable change tracking on a table or view, use the ALTER TABLE or ALTER VIEW statement:
-- Enable change tracking on a table.
ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE;
-- Enable change tracking on a view.
ALTER VIEW MyView SET CHANGE_TRACKING = TRUE;
-- Verify that change tracking is enabled. If change tracking is enabled, the
-- CHANGE_TRACKING column will be set to ON.
SHOW TABLES LIKE 'MyTable';
To enable change tracking on a table or view, create a Provider instance and use its .sql() method to execute the ALTER TABLE or ALTER VIEW statement:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Enable change tracking on a table.
app.sql("ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE")
# Enable change tracking on a view.
app.sql("ALTER VIEW MyView SET CHANGE_TRACKING = TRUE")
# Verify that change tracking is enabled. If change tracking is enabled, the
# CHANGE_TRACKING column will be set to ON.
print(app.sql("SHOW TABLES LIKE 'MyTable'"))
See Change Tracking for details on what operations are tracked.
Create a Data Stream#
Requires the cdc_admin application role.
To create a data stream, use the api.create_data_stream() procedure:
-- Replace the placeholders with your database, schema, and table/view names.
SET obj_name = '<db>.<schema>.<table_or_view>';
SET obj_type = 'TABLE'; -- Set to 'VIEW' if needed.
SET obj_ref = relationalai.api.object_reference($obj_type, $obj_name);
-- Enable change tracking on the table or view
ALTER TABLE IDENTIFIER($obj_name) SET CHANGE_TRACKING = TRUE;
-- Stream the table or view into the model 'MyModel', using the fully-qualified
-- name as the name of the data stream.
CALL relationalai.api.create_data_stream($obj_ref, 'MyModel', $obj_name, TRUE);
/*+----------------------------------+
| Datastream created successfully. |
+----------------------------------+ */
To create a data stream, create a Provider instance and use its .create_streams() method:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
table_or_view = "<db>.<schema>.<table_or_view>"
# Enable change tracking on the table or view.
# IDENTIFIER() takes a string literal, so the name must be wrapped in quotes.
app.sql(f"ALTER TABLE IDENTIFIER('{table_or_view}') SET CHANGE_TRACKING = TRUE")
# Stream data from a table or view into a model. Note that multiple streams can
# be created simultaneously.
app.create_streams([table_or_view], 'MyModel')
To create a data stream, pass the fully-qualified table or view name and the model name to the imports:stream command’s --source and --model flags:
rai imports:stream --source <db>.<schema>.<table_or_view> --model MyModel
Note that change tracking must be enabled on the source table or view before creating a data stream. Not all column types are supported by data streams. See Supported Column Types for details.
Currently, you may use up to 100 tables and 100 views as source objects for data streams. However, each source object may have multiple streams associated with it.
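One way to keep an eye on this limit is to count the distinct source objects behind your existing streams. The sketch below assumes rows shaped like those of the api.data_streams view, represented here as plain dicts, and assumes that the REFERENCE_NAME values DATA_STREAM_TABLE and DATA_STREAM_VIEW distinguish tables from views. The helper count_source_objects is hypothetical.

```python
MAX_SOURCES_PER_TYPE = 100  # Limit stated above: 100 tables and 100 views.


def count_source_objects(stream_rows):
    """Count distinct source tables and views across data stream rows."""
    sources = {"DATA_STREAM_TABLE": set(), "DATA_STREAM_VIEW": set()}
    for row in stream_rows:
        kind = row["REFERENCE_NAME"]
        if kind in sources:
            # Several streams may share one source object, so deduplicate.
            sources[kind].add(row["FQ_OBJECT_NAME"])
    return {
        "tables": len(sources["DATA_STREAM_TABLE"]),
        "views": len(sources["DATA_STREAM_VIEW"]),
    }


# Illustrative rows; the second table appears twice because two models
# stream from the same source object.
rows = [
    {"REFERENCE_NAME": "DATA_STREAM_VIEW", "FQ_OBJECT_NAME": "example_db.sales.view1"},
    {"REFERENCE_NAME": "DATA_STREAM_TABLE", "FQ_OBJECT_NAME": "example_db.hr.employees"},
    {"REFERENCE_NAME": "DATA_STREAM_TABLE", "FQ_OBJECT_NAME": "example_db.hr.employees"},
]
counts = count_source_objects(rows)
remaining = {kind: MAX_SOURCES_PER_TYPE - n for kind, n in counts.items()}
print(counts, remaining)
```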
Suspend or Resume a Data Stream#
Requires the cdc_admin application role.
To suspend a data stream, use the api.suspend_data_stream() procedure:
-- Suspend a data stream for the model named MyModel.
CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+-----------------------+
| Data stream suspended |
+-----------------------+ */
While a data stream is suspended, the RAI Native App no longer consumes change tracking data for the stream’s source table or view. Suspended streams should be resumed at regular intervals to avoid becoming stale.
When you suspend a data stream, RAI Python models retain access to a snapshot of the source data. Queries from models that consume data from a suspended stream are evaluated against the snapshot.
To resume a suspended data stream, use the api.resume_data_stream() procedure:
-- Resume a data stream for the model named MyModel.
CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+---------------------+
| Data stream resumed |
+---------------------+ */
To suspend or resume a data stream, create a Provider instance and use its .sql() method to execute the api.suspend_data_stream() SQL procedure:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Suspend a data stream for the model named MyModel.
app.sql("CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>', 'MyModel')")
To resume a suspended data stream, use the api.resume_data_stream() procedure:
# Resume a data stream for the model named MyModel.
app.sql("CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>', 'MyModel')")
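Because a suspended stream should always be resumed eventually, it can help to pair the two calls so the resume cannot be forgotten. The following sketch wraps them in a context manager. The name stream_suspended and the run_sql parameter are hypothetical. run_sql stands in for any callable that executes a SQL string, such as a Provider instance's .sql() method, and the demonstration uses a list's append method so that no Snowflake connection is needed.

```python
from contextlib import contextmanager


@contextmanager
def stream_suspended(run_sql, source, model):
    """Suspend a data stream for the duration of a `with` block, then resume it."""
    # Assumes the source and model names contain no single quotes.
    args = f"'{source}', '{model}'"
    run_sql(f"CALL relationalai.api.suspend_data_stream({args})")
    try:
        yield
    finally:
        # Resume even if the block raises, so the stream is not left suspended.
        run_sql(f"CALL relationalai.api.resume_data_stream({args})")


# Demonstration with a stand-in for Provider.sql that only records statements.
executed = []
with stream_suspended(executed.append, "<db>.<schema>.<table_or_view>", "MyModel"):
    pass  # Work that should happen while the stream is suspended goes here.
print(executed)
```

With a real Provider instance named app, the first argument would be app.sql instead of executed.append.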
Delete a Data Stream#
Requires the cdc_admin application role.
To delete a data stream, use the api.delete_data_stream() procedure:
-- Delete a data stream for the model named MyModel. Replace the placeholders
-- with your database, schema, and table or view name.
CALL relationalai.api.delete_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+-----------------------------------+
| Data stream deleted successfully. |
+-----------------------------------+ */
To delete a data stream, create a Provider instance and use its .delete_stream() method:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Delete a data stream for the model named MyModel.
app.delete_stream('<db>.<schema>.<table_or_view>', 'MyModel')
To delete a data stream, pass the fully-qualified table or view name and the model name to the imports:delete command’s --object and --model flags:
rai imports:delete --object <db>.<schema>.<table_or_view> --model MyModel
When you delete a data stream, RAI Python models retain access to a snapshot of the source data. Queries from models that consume data from a deleted stream are evaluated against the snapshot.
List Data Streams#
Requires the cdc_admin application role.
To view a list of all data streams, query the api.data_streams view:
SELECT * FROM relationalai.api.data_streams;
/*+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION |
|-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------|
| ds_a1b2c3d4_e5f6_7a89_b123_d456e789 | 2024-10-23 12:23:45.250 | john.doe@company.com | CREATED | DATA_STREAM_VIEW | 1234abcd-5678-90ef-ab12-3456cdef7890 | example_db.sales.view1 | SalesModel | example_db.sales.view1 |
| ds_8e7f6d5c_4a3b_2c1d_0e9f_7b6a8d9f | 2024-10-22 15:37:29.580 | maria.garcia@company.com | CREATED | DATA_STREAM_TABLE | bcd123ef-4567-890a-bcde-abcdef678901 | example_db.hr.employees | HRModel | example_db.hr.employees |
| ds_9a8b7c6d_5e4f_3d2a_1b0e_f7g6h5i3 | 2024-10-21 17:44:10.300 | mark.jones@company.com | DELETING | DATA_STREAM_VIEW | 7890abcd-1234-5678-90ef-bcde4567890f | example_db.finance.budget | FinanceModel | example_db.finance.budget|
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------+ */
To view a list of all data streams, create a Provider instance and use its .list_streams() method:
from pprint import pprint
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# List all data streams.
streams = app.list_streams()
pprint(streams)
To view a list of all data streams, use the imports:list command:
rai imports:list
To list data streams for a specific model, pass the model name to the --model flag:
rai imports:list --model MyModel
Get Data Stream Details#
Requires the cdc_admin application role.
To get details about a specific data stream, pass the stream name and model name to the api.get_data_stream() procedure:
-- Get details about the data stream for the model named MyModel. Replace the
-- placeholders with your database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION | DATA_SYNC_STATUS | PENDING_BATCHES_COUNT | NEXT_BATCH_STATUS | NEXT_BATCH_UNLOADED_TIMESTAMP | NEXT_BATCH_DETAILS | LAST_BATCH_DETAILS | LAST_BATCH_UNLOADED_TIMESTAMP | LAST_TRANSACTION_ID | ERRORS | CDC_STATUS |
|-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------|
| ds_abcd1234_ef56_7890_abcd_1234ef567890 | 2024-10-23 10:12:34.567 | jane.doe@example.com | ACTIVE | DATA_STREAM_TABLE | a1bcdef2-3456-7890-1234-b567c890d123 | <db>.<schema>.<table_or_view> | MyModel | <db>.<schema>.<table_or_view> | SYNCED | 0 | NULL | NULL | NULL | {"rows": 10, "size": 512, ... } | 2024-10-23 10:50:00.456 | 02a1b234-5678-1234-abcdef-0123456789ab | [] | STARTED |
+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+ */
To get details about a specific data stream, create a Provider instance and use its .sql() method to execute the api.get_data_stream() SQL procedure with the name of the data stream and model:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Get details about a data stream. Replace the placeholders with your model,
# database, schema, and table or view name.
model_name = 'MyModel'
stream_name = '<db>.<schema>.<table_or_view>'
# Both arguments are SQL string literals, so they must be wrapped in single quotes.
stream_details = app.sql(f"CALL relationalai.api.get_data_stream('{stream_name}', '{model_name}')")
print(stream_details)
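When procedure arguments come from Python variables, they have to reach Snowflake as quoted SQL string literals. The sketch below shows one way to build such a CALL statement, doubling any embedded single quotes. Both helper functions are hypothetical and are not part of the relationalai package.

```python
def sql_string_literal(value: str) -> str:
    """Quote a Python string as a SQL string literal, doubling single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_get_data_stream_call(stream_name: str, model_name: str) -> str:
    """Build the CALL statement for api.get_data_stream() with quoted arguments."""
    args = ", ".join(sql_string_literal(v) for v in (stream_name, model_name))
    return f"CALL relationalai.api.get_data_stream({args})"


statement = build_get_data_stream_call("<db>.<schema>.<table_or_view>", "MyModel")
print(statement)
```

The resulting string can be passed to a Provider instance's .sql() method in place of a hand-written f-string.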
To get details about a specific data stream, pass the stream ID to the --id option of the imports:get command:
rai imports:get --id <stream_id>
The stream ID is the unique identifier for the data stream, as returned in the output of the imports:list command.
View Quarantined Streams#
Requires the cdc_admin application role.
To view quarantined streams and their errors, use the api.data_stream_batches view:
SELECT
data_stream_id,
fq_object_name,
status,
error.VALUE::string AS processing_error
FROM
relationalai.api.data_stream_batches,
LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE
status = 'QUARANTINED';
To view quarantined streams and their errors, create a Provider instance and use its .sql() method to query the api.data_stream_batches view:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# View quarantined streams and their errors.
quarantined_streams = app.sql("""
SELECT
data_stream_id,
fq_object_name,
status,
error.VALUE::string AS processing_error
FROM
relationalai.api.data_stream_batches,
LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE
status = 'QUARANTINED';
""")
print(quarantined_streams)
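Because the query flattens the error array, a stream with several errors appears in several rows. The sketch below groups those rows by stream so each stream's errors can be reviewed together. It assumes the rows have been converted to dicts with uppercase column names. The sample rows are made up for illustration, and group_errors_by_stream is a hypothetical helper.

```python
from collections import defaultdict


def group_errors_by_stream(rows):
    """Map each quarantined stream and its source object to its distinct errors."""
    errors = defaultdict(list)
    for row in rows:
        key = (row["DATA_STREAM_ID"], row["FQ_OBJECT_NAME"])
        message = row["PROCESSING_ERROR"]
        if message not in errors[key]:  # Skip repeated messages.
            errors[key].append(message)
    return dict(errors)


# Made-up rows shaped like the query result above; not real output.
sample_rows = [
    {"DATA_STREAM_ID": "ds_1", "FQ_OBJECT_NAME": "db.s.t1", "STATUS": "QUARANTINED", "PROCESSING_ERROR": "error A"},
    {"DATA_STREAM_ID": "ds_1", "FQ_OBJECT_NAME": "db.s.t1", "STATUS": "QUARANTINED", "PROCESSING_ERROR": "error A"},
    {"DATA_STREAM_ID": "ds_1", "FQ_OBJECT_NAME": "db.s.t1", "STATUS": "QUARANTINED", "PROCESSING_ERROR": "error B"},
    {"DATA_STREAM_ID": "ds_2", "FQ_OBJECT_NAME": "db.s.v1", "STATUS": "QUARANTINED", "PROCESSING_ERROR": "error C"},
]
grouped = group_errors_by_stream(sample_rows)
for (stream_id, source), messages in grouped.items():
    print(f"{stream_id} ({source}): {len(messages)} distinct error(s)")
```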
Once the errors have been corrected, resume the data stream to continue processing. Note that quarantined streams continue to consume change tracking data from the source data, but these changes will not be processed by the CDC engine until the stream is resumed.