Change Data Capture (CDC) Stream Management#

Streams share Snowflake data with the RAI Native App using change data capture (CDC) to capture source table and view changes once every minute.

Users of the relationalai Python package use streams to provide data to models within a RAI schema. Each stream is tied to a single schema and can be used by multiple models within that schema. Ensure Python users have CDC privileges on their model’s schema to create streams.

Managing Change Data Capture Support#

You can enable, suspend, and resume CDC using the RAI CLI or the SQL procedures provided by the RAI Native App. These SQL procedures require the cdc_admin application role.

Enable CDC#

CDC requires a dedicated engine to process CDC data and must be enabled before streams can be created. A HighMem|S engine dedicated to CDC is the recommended default, but a larger engine may be necessary depending on the amount of data it needs to process. Use an existing engine compute pool or create a new one dedicated to CDC if workload isolation is a concern.

IMPORTANT

Your Native App instance must have EXECUTE TASK and EXECUTE MANAGED TASK privileges in order to enable CDC. These privileges are granted to the RAI Native App during installation.

  1. Use the create_engine() procedure to create a CDC engine:

    CALL relationalai.api.create_engine('<engine_name>', '<compute_pool_name>', 'HighMem|S');
    
  2. Enable CDC using the setup_cdc() procedure, passing the name of the CDC engine that you created in the previous step:

    -- NOTE: Engine names are case-sensitive.
    CALL relationalai.app.setup_cdc('<engine_name>');
    

Get CDC Status#

Use the cdc_status() procedure to check CDC status:

CALL relationalai.app.cdc_status();

The output has three columns:

See the reference documentation for more details.

Suspend the CDC Service#

To suspend the CDC service, use the suspend_cdc() procedure:

CALL relationalai.app.suspend_cdc();

Suspending CDC stops the ingestion of CDC data into the RAI Native App. RAI schemas retain access to the data they have already received.

All batches currently being processed are stopped. Changes to the source tables and views are tracked while CDC is suspended, but not ingested. Change tracking consumes compute and storage resources, so disable CDC if you do not plan to resume it to avoid unnecessary costs.

Resume the CDC Service#

To resume the CDC service after suspending it, use the resume_cdc() procedure:

CALL relationalai.app.resume_cdc();

Changes tracked while the service was suspended are ingested once the service is resumed. New changes to the source tables and views are tracked and ingested as usual.

Disable CDC#

To disable CDC, delete all streams using delete_data_stream(), then delete the CDC engine using the delete_engine() procedure:

-- Delete each data stream in every RAI schema. Replace <rai_schema>
-- with the name of each schem and <sf_db>.<sf_schema>.<table_or_view>
-- with the fully-qualified name of each source object.
CALL relationalai.<rai_schema>.delete_data_stream('<sf_db>.<sf_schema>.<table_or_view>');

-- Delete the CDC engine. Replace <engine_name> with the name of the
-- CDC engine. The second argument is `true` to force deletion without
-- assigning a new CDC engine.
CALL relationalai.api.delete_engine('<engine_name>', true);

When CDC is disabled, no costs related to CDC are incurred. Note that RAI schemas retain access to a snapshot of the data they have already received.

Managing Streams#

You can manage streams using the RAI CLI or SQL procedures provided by the RAI Native App. These SQL procedures are available in relationalai.<rai_schema>, where <rai_schema> is the name of the RAI schema where the stream is created. To manage streams in a RAI schema, you must have either the cdc_admin or the schema-specific <schema>_cdc application roles.

Create a Stream#

To stream data from a Snowflake table or view into a RAI schema:

  1. Make sure change tracking is enabled for the source table or view, and that you have SELECT privileges on it.

  2. Use the create_data_stream() procedure to create a stream in the RAI schema:

    -- NOTE: Replace <rai_schema> with the name of the RAI schema.
     CALL relationalai.<rai_schema>.create_data_stream(
          relationalai.api.object_reference('TABLE', '<sf_db>.<sf_schema>.<table_or_view>'),
          '<stream_name>'
     );
    

    You may create references for up to 100 tables and 100 views in a single RAI schema, for a total of 200 source objects per schema. Each source object may have multiple streams associated with it.

Get Stream Status#

Pass the fully-qualified name of a stream’s source table or view to the get_data_stream() procedure to view its status:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
CALL relationalai.<rai_schema>.get_data_stream('<sf_db>.<sf_schema>.<table_or_view>');

The status field in the output indicates the stream’s current status and may be one of CREATING, CREATED, or DELETING.

The data_sync_status field indicates the stream’s data synchronization status:

List Streams#

To list all streams in a RAI schema, query the data_streams view:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
SELECT * FROM relationalai.<rai_schema>.data_streams;

List Stream Batches#

Query the data_stream_batches view to see batch processing information for a stream:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
SELECT * from relationalai.<rai_schema>.data_stream_batches;

Use this view to understand data synchronization progress and diagnose issues.

Delete a Stream#

To delete a stream, use the delete_data_stream() procedure:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
CALL relationalai.<rai_schema>.delete_data_stream('<sf_db>.<sf_schema>.<table_or_view>');

Suspend or Resume a Stream#

You may suspend a stream to stop data synchronization temporarily. Consumers of the stream retain access to a snapshot of the data without receiving updates.

To suspend a stream, use the suspend_data_stream() procedure:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
CALL relationalai.<rai_schema>.suspend_data_stream('<sf_db>.<sf_schema>.<table_or_view>');

When a stream is suspended, schemas retain access to the data they have already received. Changes to the source table or view are still tracked, but not ingested. Change tracking consumes compute and storage resources. Delete the stream if you do not plan to resume it to avoid unnecessary costs.

Use the resume_data_stream() procedure to resume a suspended stream:

-- NOTE: Replace <rai_schema> with the name of the RAI schema.
CALL relationalai.<rai_schema>.resume_data_stream('<sf_db>.<sf_schema>.<table_or_view>');

Changes tracked while the stream was suspended are ingested once the stream is resumed. New changes to the source table or view are tracked and ingested as usual.