Data Management#

RelationalAI (RAI) Python users build models on top of data stored in Snowflake tables and views. Before models can be queried, the data must be shared with the RAI Native App using a data stream. These data streams are maintained by the RAI Native App’s CDC Service.

IMPORTANT

Support for data streams is disabled by default. You must enable the CDC service before data can be shared with the RAI Native App.

Table of Contents#

The CDC Service#

Data streams use change data capture (CDC) to stream updates from Snowflake tables and views to the RAI Native App. The CDC service processes the change tracking data consumed by data streams to keep RAI models synchronized with their source data.

NOTE

The CDC service requires that your RAI Native App be granted the EXECUTE TASK and EXECUTE MANAGED TASK privileges. This is typically done when you install the app.

You can manage the CDC service using SQL, Python, or the RAI CLI.

Enable CDC#

Requires the cdc_admin application role.

To enable the CDC service, or to resume the service after suspending it, use the app.resume_cdc() procedure:

#-- Enable the CDC service.
CALL relationalai.app.resume_cdc();
/*+--------------------------------------------------------------------+
  | CDC functionality on the RelationalAI application has been resumed |
  +--------------------------------------------------------------------+ */
NOTE

While CDC is enabled, a dedicated engine responsible for processing changes tracked by data streams is provisioned whenever changes are detected. See CDC Engine for details.

Disable CDC#

Requires the cdc_admin application role.

To disable the CDC service, use the app.suspend_cdc() procedure:

#-- Disable the CDC service.
CALL relationalai.app.suspend_cdc();
/*+-----------------------------------------------------------------------------------------------------------------+
  | CDC functionality on the RelationalAI application has been suspended and its associated engine has been dropped |
  +-----------------------------------------------------------------------------------------------------------------+ */

Disabling CDC suspends the CDC engine. Change tracking data is still consumed by data streams, but is not processed until the service is resumed. Data streams cannot be created while CDC is disabled.

NOTE

When you disable the CDC service, RAI Python models retain access to a snapshot of the source data. Model queries are evaluated against the snapshot.

View CDC Service Status#

Requires the app_user application role.

To view the status of the CDC service, use the app.cdc_status() procedure:

#-- Get the CDC service status.
CALL relationalai.app.cdc_status();
/*+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
  | CDC_ENABLED  | CDC_ENGINE_NAME    | CDC_ENGINE_STATUS | CDC_ENGINE_SIZE  | CDC_TASK_STATUS | CDC_TASK_INFO                                                                                                            |
  |--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
  | TRUE         | CDC_MANAGED_ENGINE | READY             | HIGHMEM_X64_S    | started         | {"createdOn": "2024-10-15 21:58:11.291 -0700", "lastSuspendedOn": null, "lastSuspendedReason": null, "state": "started"} |
  +--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------| */

Refer to the reference docs for more details on the output of the cdc_status() procedure.

Configure CDC Engine Size#

Requires the cdc_admin application role.

To change the size of the CDC engine, use the app.alter_cdc_engine_size() procedure:

#-- Change the size of the CDC engine to HIGHMEM_X64_M.
CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M');
/*+--------------------------------------+
  | CDC engine size set to HIGHMEM_X64_M |
  +--------------------------------------+ */

If a batch of data stream changes is currently being processed, it is completed using the previously configured engine. A new engine with the new size is created whenever the next batch of changes is processed, at which point the old engine is deleted.

NOTE

Whe the RAI Native App is installed, the CDC engine is configured with the HIGHMEM_X64_S size. The size of the CDC engine is determined by the instance family of the engine’s host compute pool. See Compute Pools for available sizes.

Data Streams#

Data streams track changes to tables and views in your Snowflake account to ensure that queries from RAI Python models are evaluated against the most recent Snowflake data. Changes are batched and processed by the CDC Service once per minute.

NOTE

You must enable the CDC service before creating data streams.

Supported Column Types#

Data streams support the following Snowflake column types:

Tables or views with unsupported column types cannot be streamed into the RAI Native App.

NOTE

If you need to stream data from a source object with unsupported column types, consider creating a view without those columns or that casts the unsupported columns to a supported type.

Change Tracking#

Data streams use Snowflake’s native change tracking feature to capture changes to tables and views. Change tracking stores information about all Data Manipulation Language (DML) statements committed to a table or view, including INSERT, UPDATE, and DELETE operations.

NOTE

Change tracking must be enabled on a source table or view before a data stream is created on it. If you want the RAI Python library to perform this step for you as needed, you can set ensure_change_tracking to true in raiconfig.toml or use the ensure_change_tracking parameter of Model.

Change tracking data consumed by data streams is temporarily stored in a stage in the app’s Snowflake database. If the CDC Service is enabled, the staged data is processed in batches and made available to RAI models. You may suspend a data stream to stop it from consuming change tracking data.

Staleness#

When a stream is suspended, change tracking data for its source table or view is no longer consumed by the data stream. Snowflake retains change tracking data for a limited period, after which the data is inaccessible. If a stream is suspended for longer than its data retention period, the stream becomes stale.

TIP

Resume suspended data streams periodically to avoid staleness. Note that the length of a stream’s data retention period is configurable. See the Snowflake documentation for details.

Quarantined Streams#

If the CDC Service encounters too many failures while attempting to process a data stream, the stream is given a QUARANTINED status. Change tracking data for quarantined streams is consumed, but not processed. Resolve the errors causing the stream to be quarantined, then resume the data stream to process the pending changes. See View Quarantined Streams for details on viewing errors for quarantined streams.

Data Stream Management#

You can manage data streams using SQL, Python, or the RAI CLI.

Enable Change Tracking on a Table or View#

Requires ownership privileges on the table or view.

To enable change tracking on a table or view, use the ALTER TABLE statement:

#-- Enable change tracking on a table.
ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE;

-- Enable change tracking on a view.
ALTER VIEW MyView SET CHANGE_TRACKING = TRUE;

-- Verify that change tracking is enabled. If change tracking is enabled, the
-- CHANGE_TRACKING column will be set to ON.
SHOW TABLES LIKE 'MyTable';

See Change Tracking for details on what operations are tracked.

Create a Data Stream#

Requires the cdc_admin application role.

To create a data stream, use the api.create_data_stream() procedure:

#-- Replace the placeholders with your database, schema, and table/view names.
SET obj_name = '<db>.<schema>.<table_or_view>';
SET obj_type = 'TABLE';  -- Set to 'VIEW' if needed.
SET obj_ref = relationalai.api.object_reference($obj_type, $obj_name);

-- Enable change tracking on the table or view
ALTER TABLE IDENTIFIER($obj_name) SET CHANGE_TRACKING = TRUE;

-- Stream the table or view into the model 'MyModel', using the fully-qualified
-- name as the name of the data stream.
CALL relationalai.api.create_data_stream($obj_ref, 'MyModel', $obj_name, TRUE);
/*+----------------------------------+
  | Datastream created successfully. |
  +----------------------------------+ */

Note that change tracking must be enabled on the source table or view before creating a data stream. Not all column types are supported by data streams. See Supported Column Types for details.

IMPORTANT

Currently, you may use up to 100 tables and 100 views as source objects for data streams. However, each source object may have multiple streams associated with it.

Suspend or Resume a Data Stream#

Requires the cdc_admin application role.

To suspend a data stream, use the api.suspend_data_stream() procedure:

#-- Suspend a data stream for the model named MyModel.
CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+-----------------------+
  | Data stream suspended |
  +-----------------------+ */

While a data stream is suspended, the RAI Native App no longer consumes change tracking data for the stream’s source table or view. Suspended streams should be resumed at regular intervals to avoid becoming stale.

NOTE

When you suspend a data stream, RAI Python models retain access to a snapshot of the source data. Queries from models that consume data from a suspended stream are evaluated against the snapshot.

To resume a suspended data stream, use the api.resume_data_stream() procedure:

#-- Resume a data stream for the model named MyModel.
CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+---------------------+
  | Data stream resumed |
  +---------------------+ */

Delete a Data Stream#

Requires the cdc_admin application role.

To delete a data stream, use the api.delete_data_stream() procedure:

#-- Delete a data stream for the model named MyModel. Replace the placeholders
-- with your database, schema, and table or view name.
CALL relationalai.api.delete_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+-----------------------------------+
  | Data stream deleted successfully. |
  +-----------------------------------+ */
NOTE

When you delete a data stream, RAI Python models retain access to a snapshot of the source data. Queries from models that consume data from a deleted stream are evaluated against the snapshot.

List Data Streams#

Requires the cdc_admin application role.

To view a list all data streams, query the api.data_streams view:

#SELECT * FROM relationalai.api.data_streams;
/*+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+------------------------- -+--------------+--------------------------+
  | ID                                  | CREATED_AT              | CREATED_BY               | STATUS   | REFERENCE_NAME    | REFERENCE_ALIAS                      | FQ_OBJECT_NAME            | RAI_DATABASE | RAI_RELATION             |
  |-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------|
  | ds_a1b2c3d4_e5f6_7a89_b123_d456e789 | 2024-10-23 12:23:45.250 | john.doe@company.com     | CREATED  | DATA_STREAM_VIEW  | 1234abcd-5678-90ef-ab12-3456cdef7890 | example_db.sales.view1    | SalesModel   | example_db.sales.view1   |
  | ds_8e7f6d5c_4a3b_2c1d_0e9f_7b6a8d9f | 2024-10-22 15:37:29.580 | maria.garcia@company.com | CREATED  | DATA_STREAM_TABLE | bcd123ef-4567-890a-bcde-abcdef678901 | example_db.hr.employees   | HRModel      | example_db.hr.employees  |
  | ds_9a8b7c6d_5e4f_3d2a_1b0e_f7g6h5i3 | 2024-10-21 17:44:10.300 | mark.jones@company.com   | DELETING | DATA_STREAM_VIEW  | 7890abcd-1234-5678-90ef-bcde4567890f | example_db.finance.budget | FinanceModel | example_db.finance.budget|
  +-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------+ */

Get Data Stream Details#

Requires the cdc_admin application role.

To get details about a specific data stream, pass the stream name and model name to the api.get_data_stream() procedure:

#-- Get details about the data stream for the model named MyModel. Replace the
-- placeholders with your database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
  | ID                                      | CREATED_AT              | CREATED_BY            | STATUS | REFERENCE_NAME    | REFERENCE_ALIAS                      | FQ_OBJECT_NAME                | RAI_DATABASE | RAI_RELATION                  | DATA_SYNC_STATUS | PENDING_BATCHES_COUNT  | NEXT_BATCH_STATUS | NEXT_BATCH_UNLOADED_TIMESTAMP | NEXT_BATCH_DETAILS              | LAST_BATCH_DETAILS            | LAST_BATCH_UNLOADED_TIMESTAMP | LAST_TRANSACTION_ID                    | ERRORS | CDC_STATUS |
  |-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------|
  | ds_abcd1234_ef56_7890_abcd_1234ef567890 | 2024-10-23 10:12:34.567 | jane.doe@example.com  | ACTIVE | DATA_STREAM_TABLE | a1bcdef2-3456-7890-1234-b567c890d123 | <db>.<schema>.<table_or_view> | MyModel      | <db>.<schema>.<table_or_view> | SYNCED           | 0                      | NULL              | NULL                          | NULL                            | {"rows": 10, "size": 512, ... } 2024-10-23 10:50:00.456       | 02a1b234-5678-1234-abcdef-0123456789ab | []     | STARTED    |
  +-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+ */

View Quarantined Streams#

Requires the cdc_admin application role.

To view quarantined streams and their errors, use the api.data_stream_batches view:

#SELECT
  data_stream_id,
  fq_object_name,
  status,
  error.VALUE::string AS processing_error
FROM
  relationalai.api.data_stream_batches,
  LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE
  status = 'QUARANTINED';

Once the errors have been corrected, resume the data stream to continue processing. Note that quarantined streams continue to consume change tracking data from the source data, but these changes will not be processed by the CDC engine until the stream is resumed.