CDC Stream Management

Streams share Snowflake data with the RAI Native App using change data capture (CDC), which captures changes to source tables and views once per minute.

Users of the RAI Python package use streams to provide data to models within a RAI schema. Each stream is tied to a single schema and can be used by multiple models within that schema. To create streams, Python users must have CDC privileges on their model's schema.

Managing Change Data Capture Support

You can enable, suspend, and resume CDC using the RAI CLI or the SQL procedures provided by the RAI Native App. These SQL procedures require the cdc_admin application role.

Enable or Resume CDC

To enable CDC, or to resume CDC after suspending it, use the resume_cdc() procedure:

CALL relationalai.app.resume_cdc();

Enabling CDC provisions an engine that ingests CDC data from Snowflake. The engine is not provisioned immediately: it is created only when updates to a stream are detected.

By default, the CDC engine size is HIGHMEM_X64_S. You can change it with the alter_cdc_engine_size() procedure.
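As a sketch, resizing the engine and then confirming the new size might look like the following. It assumes that alter_cdc_engine_size() takes the new size as a single string argument; HIGHMEM_X64_M is an illustrative size name, so check which engine sizes are available to your account before using it.

```sql
-- Assumption: the procedure takes the target engine size as a string.
-- HIGHMEM_X64_M is an illustrative size name, not a recommendation.
CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M');

-- The cdc_engine_size column of the cdc_status() output reports the configured size.
CALL relationalai.app.cdc_status();
```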

IMPORTANT

Your Native App instance must have EXECUTE TASK and EXECUTE MANAGED TASK privileges in order to enable CDC. These privileges are granted to the RAI Native App during installation.
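If these grants are missing, for example because they were revoked after installation, an account administrator could restore them with Snowflake's GRANT ... ON ACCOUNT TO APPLICATION statement. This sketch assumes the app is installed under the name relationalai, as in the other examples on this page, and that it is run with a role such as ACCOUNTADMIN that can grant account-level privileges.

```sql
-- Use a role that can grant account-level privileges.
USE ROLE ACCOUNTADMIN;

-- Grant the task privileges that CDC depends on to the RAI Native App.
GRANT EXECUTE TASK, EXECUTE MANAGED TASK ON ACCOUNT TO APPLICATION relationalai;

-- List the privileges held by the app to confirm the grant.
SHOW GRANTS TO APPLICATION relationalai;
```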

Get CDC Status

Use the cdc_status() procedure to check CDC status:

CALL relationalai.app.cdc_status();

The output has six columns:

| Column name       | Description                                                  |
|-------------------|--------------------------------------------------------------|
| cdc_enabled       | Whether CDC is enabled.                                      |
| cdc_engine_name   | The name of the CDC engine.                                  |
| cdc_engine_status | The status of the CDC engine.                                |
| cdc_engine_size   | The configured size of the CDC engine.                       |
| cdc_task_status   | The status of the CDC service. May be started or suspended.  |
| cdc_task_info     | Additional information about the CDC service.                |
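Because cdc_status() returns a result set, you can also re-read its output with an ordinary query through Snowflake's RESULT_SCAN table function, which is convenient when collecting status checks in a monitoring worksheet. A minimal sketch; it assumes the RESULT_SCAN query runs immediately after the CALL in the same session, so that LAST_QUERY_ID() refers to it:

```sql
-- Call the status procedure first.
CALL relationalai.app.cdc_status();

-- Re-read the output of the previous statement as a table.
SELECT *
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
```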

Suspend CDC

To suspend the CDC service, use the suspend_cdc() procedure:

CALL relationalai.app.suspend_cdc();

Suspending CDC stops the ingestion of CDC data into the RAI Native App and deletes the CDC engine, if one exists. RAI schemas retain access to the data they have already received.

Change tracking consumes compute and storage resources. To disable CDC completely and avoid all CDC-related charges, delete all streams using delete_data_stream().

-- Delete each data stream in every RAI schema. Replace <rai_schema>
-- with the name of each schema and <sf_db>.<sf_schema>.<table_or_view>
-- with the fully-qualified name of each source object.
CALL relationalai.<rai_schema>.delete_data_stream('<sf_db>.<sf_schema>.<table_or_view>');
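A complete teardown might list the streams in each RAI schema, delete them one at a time, confirm that none remain, and then suspend CDC. The sketch below assumes a hypothetical RAI schema named my_rai_schema with streams on two hypothetical source objects, my_db.sales.orders and my_db.sales.customers_v; substitute the names returned by your own query and repeat for every RAI schema.

```sql
-- 1. See which streams exist in the (hypothetical) RAI schema.
SELECT * FROM relationalai.my_rai_schema.data_streams;

-- 2. Delete each stream by the fully-qualified name of its source object.
--    These source names are hypothetical placeholders.
CALL relationalai.my_rai_schema.delete_data_stream('my_db.sales.orders');
CALL relationalai.my_rai_schema.delete_data_stream('my_db.sales.customers_v');

-- 3. Re-run until the deleted streams no longer appear; a stream
--    passes through the DELETING status before it is removed.
SELECT * FROM relationalai.my_rai_schema.data_streams;

-- 4. Stop CDC ingestion and remove the CDC engine.
CALL relationalai.app.suspend_cdc();
```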

Managing Streams

You can manage streams using the RAI CLI or SQL procedures provided by the RAI Native App. These SQL procedures are available in relationalai.<rai_schema>, where <rai_schema> is the name of the RAI schema where the stream is created. To manage streams in a RAI schema, you must have either the cdc_admin application role or the schema-specific <schema>_cdc application role.
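For example, an administrator could give one role full CDC control and another role stream access in a single RAI schema using Snowflake's GRANT APPLICATION ROLE statement. The account roles data_platform_admin and model_developer and the RAI schema name my_rai_schema are hypothetical; the schema-specific role name follows the <schema>_cdc pattern described above.

```sql
-- Full CDC administration: enable, suspend, and resume CDC, and manage streams.
GRANT APPLICATION ROLE relationalai.cdc_admin TO ROLE data_platform_admin;

-- Stream management limited to the hypothetical RAI schema my_rai_schema.
GRANT APPLICATION ROLE relationalai.my_rai_schema_cdc TO ROLE model_developer;
```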

Create a Stream

To stream data from a Snowflake table or view into a RAI schema:

  1. Make sure change tracking is enabled for the source table or view, and that you have SELECT privileges on it.

  2. Use the create_data_stream() procedure to create a stream in the RAI schema:

    -- NOTE: Replace <rai_schema> with the name of the RAI schema.
    CALL relationalai.<rai_schema>.create_data_stream(
        relationalai.api.object_reference('TABLE', '<sf_db>.<sf_schema>.<table_or_view>'),
        '<stream_name>'
    );

    You may create references for up to 100 tables and 100 views in a single RAI schema, for a total of 200 source objects per schema. Each source object may have multiple streams associated with it.
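Putting both steps together for a single table, a sketch might look like this. The table my_db.sales.orders, the account role model_developer, the RAI schema my_rai_schema, and the stream name orders_stream are all hypothetical; the ALTER TABLE, SHOW TABLES, and GRANT statements are standard Snowflake SQL.

```sql
-- Step 1a: enable change tracking on the source table.
ALTER TABLE my_db.sales.orders SET CHANGE_TRACKING = TRUE;

-- Check: the change_tracking column of this output should read ON.
SHOW TABLES LIKE 'ORDERS' IN SCHEMA my_db.sales;

-- Step 1b: make sure the role that creates the stream can read the source.
GRANT SELECT ON TABLE my_db.sales.orders TO ROLE model_developer;

-- Step 2: create the stream in the RAI schema.
CALL relationalai.my_rai_schema.create_data_stream(
    relationalai.api.object_reference('TABLE', 'my_db.sales.orders'),
    'orders_stream'
);
```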

The following Snowflake data types are supported for columns in the stream’s source table or view:

Get Stream Status

Pass the fully-qualified name of a stream’s source table or view to the get_data_stream() procedure to view its status:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
CALL relationalai.<rai_schema>.get_data_stream('<sf_db>.<sf_schema>.<table_or_view>');

The status field in the output indicates the stream’s current status and may be one of CREATING, CREATED, or DELETING.
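Because a new stream starts in the CREATING state, one way to follow it is to call get_data_stream() until status reads CREATED, and to look at batch processing in the data_stream_batches view while you wait or if data_sync_status suggests a problem. A sketch using a hypothetical RAI schema my_rai_schema and source table my_db.sales.orders:

```sql
-- Check the stream's status (CREATING, CREATED, or DELETING) and data_sync_status.
CALL relationalai.my_rai_schema.get_data_stream('my_db.sales.orders');

-- Inspect batch processing for the schema's streams to follow synchronization progress.
SELECT * FROM relationalai.my_rai_schema.data_stream_batches;
```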

The data_sync_status field indicates the stream’s data synchronization status:

List Streams

To list all streams in a RAI schema, query the data_streams view:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
SELECT * FROM relationalai.<rai_schema>.data_streams;

List Stream Batches

Query the data_stream_batches view to see batch processing information for a stream:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
SELECT * FROM relationalai.<rai_schema>.data_stream_batches;

Use this view to understand data synchronization progress and diagnose issues.

Delete a Stream

To delete a stream, use the delete_data_stream() procedure:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
CALL relationalai.<rai_schema>.delete_data_stream('<sf_db>.<sf_schema>.<table_or_view>');

Suspend or Resume a Stream

You may suspend a stream to stop data synchronization temporarily. Consumers of the stream retain access to a snapshot of the data without receiving updates.

To suspend a stream, use the suspend_data_stream() procedure:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
CALL relationalai.<rai_schema>.suspend_data_stream('<sf_db>.<sf_schema>.<table_or_view>');

When a stream is suspended, schemas retain access to the data they have already received. Changes to the source table or view are still tracked, but not ingested. Because change tracking consumes compute and storage resources, delete the stream if you do not plan to resume it to avoid unnecessary costs.

Use the resume_data_stream() procedure to resume a suspended stream:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
CALL relationalai.<rai_schema>.resume_data_stream('<sf_db>.<sf_schema>.<table_or_view>');

Changes tracked while the stream was suspended are ingested once the stream is resumed. New changes to the source table or view are tracked and ingested as usual.
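As an illustration, you might suspend a stream while its source table undergoes maintenance, so that models keep working from the existing snapshot, and resume it afterward so the tracked changes are ingested together. The RAI schema my_rai_schema and the source table my_db.sales.orders are hypothetical placeholders, and the commented middle section stands in for whatever maintenance you run.

```sql
-- Stop ingestion for the stream whose source is my_db.sales.orders.
CALL relationalai.my_rai_schema.suspend_data_stream('my_db.sales.orders');

-- ... maintenance on my_db.sales.orders runs here ...
-- Changes made now are tracked, but not ingested.

-- Resume the stream; changes tracked while it was suspended are ingested.
CALL relationalai.my_rai_schema.resume_data_stream('my_db.sales.orders');

-- Check the stream's status and data_sync_status afterward.
CALL relationalai.my_rai_schema.get_data_stream('my_db.sales.orders');
```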