Rockfish Entity Data Generator

The rockfish.actions.ent module provides actions for generating synthetic data tables from data schema specifications alone. This is useful for testing, development, and creating realistic datasets with complex relationships and temporal patterns.

Overview

The GenerateFromDataSchema action generates synthetic data based on a comprehensive schema definition that supports:

  • Multiple entities (tables) with relationships
  • Independent columns (IDs, categorical values, numerical distributions)
  • Stateful columns (timeseries, state machines for temporal data)
  • Derived columns (foreign keys, computed values)
  • Temporal data with configurable timestamps and intervals

Each entity is generated as a separate PyArrow table and, by default, uploaded as a labeled Rockfish dataset.

Quick Start

Here's a simple example that generates a metadata-only user table:

import asyncio
import rockfish as rf
import rockfish.actions as ra
from rockfish.actions.ent import (
    DataSchema,
    Entity,
    Column,
    ColumnType,
    ColumnCategoryType,
    Domain,
    DomainType,
    IDParams,
    NormalDistParams,
)


async def main():
    # Define the schema using typed Python objects
    schema = DataSchema(
        entities=[
            Entity(
                name="users",
                cardinality=50,
                columns=[
                    Column(
                        name="user_id",
                        data_type="string",
                        column_type=ColumnType.INDEPENDENT,
                        column_category_type=ColumnCategoryType.METADATA,
                        domain=Domain(
                            type=DomainType.ID,
                            params=IDParams(template_str="USER_{id}")
                        )
                    ),
                    Column(
                        name="age",
                        data_type="int64",
                        column_type=ColumnType.INDEPENDENT,
                        column_category_type=ColumnCategoryType.METADATA,
                        domain=Domain(
                            type=DomainType.NORMAL_DIST,
                            params=NormalDistParams(mean=35.0, std=10.0)
                        )
                    )
                ]
            )
        ]
    )

    action = ra.GenerateFromDataSchema(
        schema=schema,
        entity_labels={"users": {"use_for": "testing"}}
    )

    builder = rf.WorkflowBuilder()
    builder.add(action)

    async with rf.Connection.from_env() as conn:
        workflow = await builder.start(conn)
        print(f"Workflow ID: {workflow.id()}")

        remote_dataset = await workflow.datasets().nth(0)
        dataset = await remote_dataset.to_local(conn)

        # Save to file
        dataset.to_pandas().to_csv(f"{dataset.name()}.csv", index=False)


asyncio.run(main())

rockfish.actions.ent.GenerateFromDataSchemaConfig

Config class for the GenerateFromDataSchema action.

from rockfish.actions.ent import (
    DataSchema, Entity, Column,
    GenerateFromDataSchemaConfig
)

schema = DataSchema(entities=[...])
config = GenerateFromDataSchemaConfig(
    schema=schema,
    entity_labels={"users": {"use_for": "testing"}}
)

Attributes:

Name Type Description
schema DataSchema

DataSchema configuration defining entities and relationships. Use the rockfish.actions.ent module to construct schema objects programmatically, or provide a dict that will be structured into a DataSchema.

entity_labels dict[str, LabelDict]

Optional mapping of entity names to a Rockfish LabelDict. Labels are applied to the generated datasets for organization. Example: {"users": {"use_for": "testing"}, "transactions": {"type": "fraud"}}

dataset_name_prefix str

Prefix for dataset names (default: "")

upload_datasets bool

If True, upload each entity as a dataset. If False, yield tables for downstream actions (default: True).

rockfish.actions.ent.DataSchema

Root data schema specification.

The top-level specification defining all entities, their relationships, and a global timestamp configuration (optional for metadata-only entities).

Complete data schema with entities and relationships
DataSchema(
    entities=[
        Entity(name="users", cardinality=50, columns=[...]),
        Entity(name="sessions", cardinality=200, columns=[...],
               timestamp=Timestamp(column_name="timestamp"))
    ],
    entity_relationships=[
        EntityRelationship(
            from_entity="sessions",
            to_entity="users",
            relationship_type=EntityRelationshipType.MANY_TO_ONE,
            join_columns={"user_id": "user_id"}
        )
    ],
    global_timestamp=GlobalTimestamp(
        t_start="2025-01-01T00:00:00Z",
        t_end="2025-01-01T01:00:00Z",
        time_interval="1min"
    )
)

Attributes:

Name Type Description
entities list[Entity]

List of entity specifications

entity_relationships list[EntityRelationship]

List of relationships between entities

global_timestamp Optional[GlobalTimestamp]

Optional global timestamp configuration for entities with measurements

Key validation rules:

  • At least one entity must be defined
  • Entity names must be unique
  • If any entity has a timestamp, global_timestamp must be provided
  • All relationship references must point to valid entities
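
The rules above can be illustrated with a small standalone checker. This is only a sketch: it works on plain dicts whose layout is assumed for illustration, and check_schema is a hypothetical helper, not part of rockfish.actions.ent.

```python
def check_schema(schema: dict) -> list[str]:
    """Return rule violations for a dict-shaped schema (hypothetical layout)."""
    errors = []
    entities = schema.get("entities", [])
    names = [e["name"] for e in entities]

    if not entities:
        errors.append("at least one entity must be defined")
    if len(names) != len(set(names)):
        errors.append("entity names must be unique")
    # A per-entity timestamp only names the column; the range lives globally.
    if any(e.get("timestamp") for e in entities) and not schema.get("global_timestamp"):
        errors.append("global_timestamp is required when any entity has a timestamp")
    for rel in schema.get("entity_relationships", []):
        for key in ("from_entity", "to_entity"):
            if rel[key] not in names:
                errors.append(f"relationship references unknown entity {rel[key]!r}")
    return errors


bad_schema = {
    "entities": [
        {"name": "users"},
        {"name": "sessions", "timestamp": {"column_name": "timestamp"}},
    ],
    "entity_relationships": [{"from_entity": "sessions", "to_entity": "accounts"}],
}
problems = check_schema(bad_schema)
```

Here problems holds two entries: the missing global_timestamp and the relationship that points at the undefined "accounts" entity.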

rockfish.actions.ent.Entity

Entity specification.

Defines a complete entity (table) with its cardinality, columns, and optional timestamp configuration for time-series data.

Metadata-only entity (no timestamps)
Entity(
    name="users",
    cardinality=50,
    columns=[
        Column(
            name="user_id",
            data_type="string",
            column_type=ColumnType.INDEPENDENT,
            column_category_type=ColumnCategoryType.METADATA,
            domain=Domain(type=DomainType.ID, params=IDParams(template_str="USER_{id}"))
        )
    ]
)
Entity with measurements and a primary timestamp
Entity(
    name="sessions",
    cardinality=200,
    timestamp=Timestamp(column_name="timestamp"),
    columns=[
        Column(
            name="session_id",
            data_type="string",
            column_type=ColumnType.INDEPENDENT,
            column_category_type=ColumnCategoryType.METADATA,
            domain=Domain(type=DomainType.ID, params=IDParams(template_str="SESSION_{id}"))
        ),
        Column(
            name="response_time",
            data_type="float64",
            column_type=ColumnType.STATEFUL,
            column_category_type=ColumnCategoryType.MEASUREMENT,
            domain=Domain(type=DomainType.TIMESERIES, params=TimeseriesParams(...))
        )
    ]
)

Attributes:

Name Type Description
name str

Entity name (e.g., "users", "sessions", "transactions")

cardinality int

Number of rows/instances to generate

columns list[Column]

List of column specifications

timestamp Optional[Timestamp]

Optional timestamp configuration for entities with measurements

Key validation rules:

  • Cardinality must be positive
  • At least one column must be defined
  • Column names must be unique within an entity
  • If timestamp is specified, at least one measurement column must exist

rockfish.actions.ent.EntityRelationship

Specification for relationships between entities.

Defines how two entities are related through foreign key columns.

Many sessions belong to one user
EntityRelationship(
    from_entity="sessions",
    to_entity="users",
    relationship_type=EntityRelationshipType.MANY_TO_ONE,
    join_columns={"user_id": "user_id"}
)
Composite foreign key
EntityRelationship(
    from_entity="cell_sites",
    to_entity="transport_interfaces",
    relationship_type=EntityRelationshipType.MANY_TO_ONE,
    join_columns={
        "transport_device_id": "device_id",
        "transport_interface_id": "interface_id"
    }
)

Attributes:

Name Type Description
from_entity str

Source entity name

to_entity str

Target entity name

relationship_type EntityRelationshipType

Type of relationship (one_to_one, one_to_many, many_to_one). many_to_many is not currently supported.

join_columns dict[str, str]

Mapping of column names from source to target entity. Keys are column names in from_entity, values are column names in to_entity

Composite Foreign Keys:

When join_columns contains multiple column pairs, the system automatically:

  1. Samples matching tuples from the referenced entity
  2. Ensures referential integrity across all columns
  3. Marks FK columns with internal derivations

All FK columns must be declared as column_type=ColumnType.FOREIGN_KEY without derivations.
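
To see why the columns of a composite key have to be sampled together, consider the following standalone sketch. It does not use Rockfish; the parent rows and the sample_composite_fk helper are hypothetical and only mirror the behavior described above.

```python
import random

# Hypothetical rows of the referenced entity (transport_interfaces).
transport_interfaces = [
    {"device_id": "DEV_1", "interface_id": "eth0"},
    {"device_id": "DEV_1", "interface_id": "eth1"},
    {"device_id": "DEV_2", "interface_id": "eth0"},
]

# Keys are columns in from_entity, values are columns in to_entity.
join_columns = {
    "transport_device_id": "device_id",
    "transport_interface_id": "interface_id",
}


def sample_composite_fk(parent_rows, join_columns, n, seed=None):
    """Pick one parent row per child row and copy all join columns together."""
    rng = random.Random(seed)
    children = []
    for _ in range(n):
        parent = rng.choice(parent_rows)
        children.append({fk: parent[pk] for fk, pk in join_columns.items()})
    return children


cell_sites = sample_composite_fk(transport_interfaces, join_columns, n=5, seed=0)
valid_pairs = {(p["device_id"], p["interface_id"]) for p in transport_interfaces}
```

Sampling each column independently could produce a pair such as ("DEV_2", "eth1"), which does not exist in the parent table. Copying whole tuples keeps every generated pair inside valid_pairs.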

rockfish.actions.ent.EntityRelationshipType
Source code in src/rockfish/actions/ent/generate.py
class EntityRelationshipType(str, Enum):
    ONE_TO_ONE = "one_to_one"
    ONE_TO_MANY = "one_to_many"
    MANY_TO_ONE = "many_to_one"

Relationship semantics:

  • ONE_TO_ONE: Each instance in from_entity relates to exactly one unique instance in to_entity
  • ONE_TO_MANY: Each instance in to_entity can be referenced by multiple instances in from_entity (inverse perspective of MANY_TO_ONE)
  • MANY_TO_ONE: Multiple instances in from_entity can relate to the same instance in to_entity

Note: MANY_TO_MANY relationships are not currently supported. To model many-to-many relationships, create an explicit junction table entity with two MANY_TO_ONE relationships.

rockfish.actions.ent.Timestamp

Timestamp specification for entities with measurements.

Specifies that an entity should have timestamps, and what the timestamp column should be called. The actual timestamp range and interval are defined in the global_timestamp.

Timestamp with default data type
Timestamp(column_name="timestamp")
Timestamp with custom column name
Timestamp(column_name="event_time", data_type="timestamp")

Attributes:

Name Type Description
column_name str

Name of the timestamp column (e.g., "timestamp", "event_time")

data_type str

Data type for the timestamp column (default: "timestamp")

rockfish.actions.ent.GlobalTimestamp

Global timestamp specification for entities.

Defines the time range and interval for all entities with timestamps.

Define global timestamp with interval
GlobalTimestamp(
    t_start="2025-01-01T00:00:00Z",
    t_end="2025-01-01T01:00:00Z",
    time_interval="1min"
)

Attributes:

Name Type Description
t_start str

Start timestamp in ISO 8601 format (e.g., "2025-01-01T00:00:00Z")

t_end str

End timestamp in ISO 8601 format (e.g., "2025-01-01T23:59:59Z")

time_interval str

Optional time interval between measurements (e.g., "1min", "15min", "1hour")

Supported time interval formats:

  • "1min", "5min", "15min", etc.
  • "1hour", "2hour", etc.
  • "1day", "7day", etc.
  • "1month", "3month", etc.
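
As a rough illustration of how an interval string relates to the global time range, the sketch below parses the fixed-length formats with the standard library and counts how many intervals fit between t_start and t_end. The helpers parse_interval and parse_iso are hypothetical, and whether Rockfish treats t_end as inclusive is not specified here.

```python
import re
from datetime import datetime, timedelta

# Fixed-length units only; "month" is left out because its length varies.
_UNITS = {"min": "minutes", "hour": "hours", "day": "days"}


def parse_interval(text: str) -> timedelta:
    """Turn strings such as "15min" or "2hour" into a timedelta."""
    match = re.fullmatch(r"(\d+)(min|hour|day)", text)
    if match is None:
        raise ValueError(f"unsupported interval: {text!r}")
    count, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(count)})


def parse_iso(text: str) -> datetime:
    # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11 on,
    # so the suffix is rewritten as an explicit UTC offset.
    return datetime.fromisoformat(text.replace("Z", "+00:00"))


t_start = parse_iso("2025-01-01T00:00:00Z")
t_end = parse_iso("2025-01-01T01:00:00Z")
step = parse_interval("1min")
n_steps = (t_end - t_start) // step  # whole intervals that fit in the range
```

For the one-hour range and "1min" interval used in the example above, n_steps is 60.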

rockfish.actions.ent.Column

Column specification within an entity.

Defines a column's type, data type, category, and how its values are generated (via domain for independent/stateful columns or derivation for derived columns).

Independent column
Column(
    name="user_id",
    data_type="string",
    column_type=ColumnType.INDEPENDENT,
    column_category_type=ColumnCategoryType.METADATA,
    domain=Domain(
        type=DomainType.ID,
        params=IDParams(template_str="USER_{id}")
    )
)
Derived column
Column(
    name="total",
    data_type="float64",
    column_type=ColumnType.DERIVED,
    column_category_type=ColumnCategoryType.MEASUREMENT,
    derivation=Derivation(
        function_type=DerivationFunctionType.SUM,
        dependent_columns=["amount1", "amount2"],
        params=SumParams()
    )
)

Attributes:

Name Type Description
name str

Column name

data_type str

String alias for a pyarrow data type (e.g., "string", "int64", "float64", "timestamp"). Data from the specified domain will be cast to this type on a best-effort basis.

column_type ColumnType

Type of column (independent, stateful, derived, foreign_key)

column_category_type ColumnCategoryType

Data model category type for column (metadata or measurement)

domain Optional[Domain]

Domain specification (for independent/stateful columns only)

derivation Optional[Derivation]

Derivation specification (for derived columns only)

The column category can be metadata or measurement (see supported data models for examples).

rockfish.actions.ent.ColumnCategoryType

Source code in src/rockfish/actions/ent/generate.py
class ColumnCategoryType(str, Enum):
    METADATA = "metadata"
    MEASUREMENT = "measurement"

rockfish.actions.ent.ColumnType

Source code in src/rockfish/actions/ent/generate.py
class ColumnType(str, Enum):
    INDEPENDENT = "independent"
    STATEFUL = "stateful"
    DERIVED = "derived"
    FOREIGN_KEY = "foreign_key"

Column type semantics:

  • Independent: Generated independently using a domain (cannot use temporal domains)
  • Stateful: Temporal columns using state machines or timeseries (must be of measurement category)
  • Derived: Computed from other columns using derivation functions
  • Foreign Key: References another entity (must be metadata category)

Validation rules by column type:

| Column Type | Requires | Cannot Have | Category Type |
|---|---|---|---|
| independent | domain (non-temporal) | derivation | Any |
| stateful | domain (STATE_MACHINE or TIMESERIES) | derivation | measurement only |
| derived | derivation | domain | Any |
| foreign_key | (auto-generated) | domain, derivation | metadata only |
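
The per-type rules can be read as a simple decision procedure. The sketch below encodes them for a column described by a plain dict; the dict keys and the check_column helper are assumptions made for illustration and are not the validation code Rockfish runs.

```python
TEMPORAL_DOMAINS = {"state_machine", "timeseries"}


def check_column(column: dict) -> list[str]:
    """Return rule violations for one dict-shaped column (hypothetical layout)."""
    kind = column["column_type"]
    category = column["column_category_type"]
    domain_type = column.get("domain_type")  # None when no domain is set
    has_derivation = column.get("derivation") is not None
    errors = []

    if kind == "independent":
        if domain_type is None or domain_type in TEMPORAL_DOMAINS:
            errors.append("independent columns require a non-temporal domain")
        if has_derivation:
            errors.append("independent columns cannot have a derivation")
    elif kind == "stateful":
        if domain_type not in TEMPORAL_DOMAINS:
            errors.append("stateful columns require a state_machine or timeseries domain")
        if has_derivation:
            errors.append("stateful columns cannot have a derivation")
        if category != "measurement":
            errors.append("stateful columns must be in the measurement category")
    elif kind == "derived":
        if not has_derivation:
            errors.append("derived columns require a derivation")
        if domain_type is not None:
            errors.append("derived columns cannot have a domain")
    elif kind == "foreign_key":
        if domain_type is not None or has_derivation:
            errors.append("foreign_key columns cannot have a domain or derivation")
        if category != "metadata":
            errors.append("foreign_key columns must be in the metadata category")
    else:
        errors.append(f"unknown column type: {kind!r}")
    return errors


ok = check_column({"column_type": "stateful", "column_category_type": "measurement",
                   "domain_type": "timeseries"})
bad = check_column({"column_type": "independent", "column_category_type": "metadata",
                    "domain_type": "timeseries"})
```

In this sketch ok is empty, while bad reports that an independent column was given a temporal domain.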

rockfish.actions.ent.Domain

Domain specification for independent and stateful columns.

Categorical domain
Domain(
    type=DomainType.CATEGORICAL,
    params=CategoricalParams(
        values=["alice", "bob", "charlie"],
        with_replacement=False
    )
)

Attributes:

Name Type Description
type DomainType

The type of domain/generator to use

params Union[IDParams, SequentialIntParams, CategoricalParams, UniformDistParams, NormalDistParams, ExponentialDistParams, TimeseriesParams, StateMachineParams]

Typed parameters specific to the domain type

rockfish.actions.ent.DomainType

Source code in src/rockfish/actions/ent/generate.py
class DomainType(str, Enum):
    ID = "id"
    SEQUENTIAL_INT = "sequential_int"
    CATEGORICAL = "categorical"
    UNIFORM_DIST = "uniform_dist"
    NORMAL_DIST = "normal_dist"
    EXPONENTIAL_DIST = "exponential_dist"
    STATE_MACHINE = "state_machine"
    TIMESERIES = "timeseries"

Domain Parameters

Each domain type has specific parameters:

rockfish.actions.ent.IDParams

Parameters for ID domain generation. Generates unique ID strings using a template with {id} placeholder.

Create ID parameters
IDParams(template_str="USER_{id}")

Attributes:

Name Type Description
template_str str

Format string with {id} placeholder (e.g., "USER_{id}")
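
The template behaves like an ordinary Python format string with a single {id} field. The sketch below shows the idea with str.format; the make_ids helper is hypothetical, and the assumption that ids are consecutive integers starting at 1 is only for illustration, since the numbering Rockfish uses is not specified here.

```python
def make_ids(template_str: str, count: int, start: int = 1) -> list[str]:
    """Fill the {id} placeholder with consecutive integers (assumed numbering)."""
    return [template_str.format(id=i) for i in range(start, start + count)]


user_ids = make_ids("USER_{id}", 3)
```

Because each integer is used once, the resulting strings are unique as long as the template contains the {id} placeholder.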

rockfish.actions.ent.SequentialIntParams

Parameters for sequential integer ID generation.

Sequential integers starting from 1
SequentialIntParams(start=1)
Sequential integers starting from 100
SequentialIntParams(start=100)

Attributes:

Name Type Description
start int

Starting value for the sequence

rockfish.actions.ent.CategoricalParams

Parameters for categorical value sampling.

Sample from categorical values without replacement
CategoricalParams(
    values=["alice", "bob", "charlie"],
    with_replacement=False
)

Attributes:

Name Type Description
values list[Any]

List of categorical values to sample from

weights Optional[list[float]]

Optional probability weights for each value (will be normalized)

seed Optional[int]

Random seed for reproducibility

with_replacement bool

If True, allow repeated values; if False, sample without replacement
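
The interaction between weights and with_replacement can be sketched with the standard library. This is an illustration of the described semantics, not the Rockfish sampler: sample_categorical is a hypothetical helper, and raising an error when more unique values are requested than exist is an assumption.

```python
import random


def sample_categorical(values, n, weights=None, with_replacement=True, seed=None):
    """Draw n values, normalizing the weights so they sum to 1."""
    rng = random.Random(seed)
    if weights is None:
        weights = [1.0] * len(values)
    total = sum(weights)
    probs = [w / total for w in weights]
    if with_replacement:
        return rng.choices(values, weights=probs, k=n)
    if n > len(values):
        raise ValueError("cannot draw more unique values than are available")
    # Weighted draw without replacement: pick one value, remove it, repeat.
    pool, pool_w, picked = list(values), list(probs), []
    for _ in range(n):
        i = rng.choices(range(len(pool)), weights=pool_w, k=1)[0]
        picked.append(pool.pop(i))
        pool_w.pop(i)
    return picked


names = sample_categorical(["alice", "bob", "charlie"], 3, with_replacement=False, seed=1)
draws = sample_categorical(["a", "b"], 1000, weights=[3, 1], seed=0)
```

With with_replacement=False every value appears at most once, so names is a permutation of the three inputs. With weights [3, 1], "a" is drawn roughly three times as often as "b".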

rockfish.actions.ent.UniformDistParams

Parameters for uniform distribution generation.

Uniform distribution between 0 and 10
UniformDistParams(lower=0.0, upper=10.0)

Attributes:

Name Type Description
lower float

Lower bound (inclusive)

upper float

Upper bound (exclusive)

seed Optional[int]

Random seed for reproducibility

rockfish.actions.ent.NormalDistParams

Parameters for normal (Gaussian) distribution generation.

Normal distribution with mean 100 and std 15
NormalDistParams(mean=100.0, std=15.0)

Attributes:

Name Type Description
mean float

Mean (center) of the distribution

std float

Standard deviation (spread) of the distribution

seed Optional[int]

Random seed for reproducibility

rockfish.actions.ent.ExponentialDistParams

Parameters for exponential distribution generation. Often used for modeling time between events or waiting times.

Exponential distribution with scale 2.0
ExponentialDistParams(scale=2.0)

Attributes:

Name Type Description
scale float

Scale parameter (1/lambda), controls the mean of the distribution

seed Optional[int]

Random seed for reproducibility
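
Since scale is 1/lambda, the mean of the generated values should be close to scale. The sketch below checks this with Python's random.expovariate, which takes the rate (lambda) rather than the scale; it is an independent illustration and does not call Rockfish.

```python
import random

scale = 2.0
rng = random.Random(42)

# expovariate expects the rate lambda, so the scale is inverted.
samples = [rng.expovariate(1.0 / scale) for _ in range(10_000)]
sample_mean = sum(samples) / len(samples)
```

With 10,000 draws, sample_mean lands close to 2.0, the value of scale.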

rockfish.actions.ent.TimeseriesParams

Parameters for timeseries generation with seasonality patterns.

Timeseries with symmetric seasonality
TimeseriesParams(
    base_value=150.0,
    min_value=50.0,
    max_value=300.0,
    seasonality_type="symmetric",
    seasonality_strength=0.3,
    noise_level=0.2
)

Attributes:

Name Type Description
base_value float

Central value around which the series oscillates

min_value float

Minimum value used to clip final values

max_value float

Maximum value used to clip final values

seasonality_type Literal['symmetric', 'peak_offpeak', 'none']

Type of seasonal pattern ("symmetric", "peak_offpeak", "none")

peak_start_hour int

Start hour for peak_offpeak type (default: 8)

peak_end_hour int

End hour for peak_offpeak type (default: 22)

seasonality_strength float

Strength of seasonal pattern (0-1)

noise_level float

Amount of random noise (0-1)

spike_probability float

Probability of anomalous spikes (0-1)

spike_magnitude float

Magnitude of spikes relative to range (0-1)

interval_minutes int

Time interval between points

seed Optional[int]

Random seed for reproducibility

Seasonality types:

  • "symmetric": Smooth sinusoidal pattern throughout the day
  • "peak_offpeak": Higher values between peak_start_hour and peak_end_hour, lower values outside that window
  • "none": No seasonal pattern, only base value + noise

rockfish.actions.ent.StateMachineParams

State machine definition for generating session-based timeseries data. Models sequential behavior patterns such as user browsing sessions, transaction flows, and system state progressions.

Simple e-commerce browsing state machine
sm = StateMachineParams(
    column_name="page",
    trigger_column_name="action",
    initial_state="homepage",
    states=["homepage", "search", "product", "cart", "checkout", "exit"],
    terminal_states=["exit"],
    transitions=[
        Transition(trigger="browse", source="homepage", dest="search", probability=0.6),
        Transition(trigger="view_product", source="homepage", dest="product", probability=0.3),
        Transition(trigger="leave", source="homepage", dest="exit", probability=0.1),
        Transition(trigger="add_to_cart", source="product", dest="cart", probability=0.3),
        Transition(trigger="checkout", source="cart", dest="checkout", probability=0.6),
        Transition(trigger="complete", source="checkout", dest="exit", probability=0.8),
    ]
)
State machine with context variables
sm_with_context = StateMachineParams(
    column_name="order_status",
    trigger_column_name="event",
    initial_state="pending",
    states=["pending", "processing", "shipped", "delivered"],
    terminal_states=["delivered"],
    transitions=[
        Transition(
            trigger="process",
            source="pending",
            dest="processing",
            probability=0.9,
            conditions=["payment_received"],
            context_updates={"in_fulfillment": True}
        ),
    ],
    context_variables={"payment_received": False, "in_fulfillment": False}
)

Attributes:

Name Type Description
column_name str

Name of the column that will store state values (e.g., "page", "status")

trigger_column_name str

Name of the column that will store trigger/action values (e.g., "action", "event", "user_action")

initial_state str

The starting state for all sessions/sequences

states list[str]

Complete list of all valid states in the state machine

terminal_states list[str]

List of states that end the session (no outgoing transitions)

transitions list[Transition]

List of Transition objects defining all possible state changes

context_variables dict[str, bool]

Dictionary of boolean context variables with their initial values. Used for conditional transitions.

Note:

  • All states in terminal_states must be present in states
  • initial_state must be present in states
  • Each transition's source and dest must be valid states
  • Multiple transitions from the same source state will have their probabilities normalized

rockfish.actions.ent.Transition

Represents a single transition in a state machine.

Simple transition from homepage to search
t1 = Transition(
    trigger="browse",
    source="homepage",
    dest="search",
    probability=0.6
)
Transition with conditions and context updates
t2 = Transition(
    trigger="checkout",
    source="cart",
    dest="checkout",
    probability=0.6,
    conditions=["has_items"],
    context_updates={"checkout_started": True}
)
Self-loop transition (staying in same state)
t3 = Transition(
    trigger="refine_search",
    source="search",
    dest="search",
    probability=0.2
)

Attributes:

Name Type Description
trigger str

The action/event that causes this transition (e.g., "browse", "view_product")

source str

The originating state (e.g., "homepage", "search")

dest str

The destination state (e.g., "product", "cart")

probability float

Probability weight for this transition (0 < p <= 1). When multiple transitions share the same source state, probabilities are normalized to sum to 1.0

conditions list[str]

List of context variable names that must be True for this transition to be eligible. Empty list means no conditions

context_updates dict[str, bool]

Dictionary of context variable updates to apply after this transition executes. Keys are variable names, values are booleans.

Note: Probabilities are weights, not exact probabilities. If a source state has transitions with probabilities [0.6, 0.3, 0.1], they stay as they are, since they already sum to 1.0. If they were [0.4, 0.2, 0.2], they would normalize to [0.5, 0.25, 0.25].

Important notes:

  • Probabilities are weights: the transitions leaving the same source state are normalized automatically, so they don't need to sum to 1.0
  • Conditions must reference context variables defined in context_variables
  • Context updates can enable/disable transitions dynamically
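
The notes above can be sketched as a small simulator. This is not the Rockfish implementation: Step is a hypothetical stand-in for Transition, run_session is an illustrative helper, and normalizing only over the transitions whose conditions currently hold is an assumption.

```python
import random
from dataclasses import dataclass, field


@dataclass
class Step:  # hypothetical stand-in for Transition
    trigger: str
    source: str
    dest: str
    probability: float
    conditions: list = field(default_factory=list)
    context_updates: dict = field(default_factory=dict)


def normalized_weights(steps):
    """Scale the probability weights so they sum to 1."""
    total = sum(s.probability for s in steps)
    return [s.probability / total for s in steps]


def run_session(steps, initial_state, terminal_states, context, seed=None, max_steps=50):
    """Walk the machine until a terminal state, a dead end, or max_steps."""
    rng = random.Random(seed)
    state, context, path = initial_state, dict(context), []
    for _ in range(max_steps):
        if state in terminal_states:
            break
        # A transition is eligible only if all of its conditions are True.
        eligible = [
            s for s in steps
            if s.source == state and all(context.get(c, False) for c in s.conditions)
        ]
        if not eligible:
            break
        chosen = rng.choices(eligible, weights=normalized_weights(eligible), k=1)[0]
        context.update(chosen.context_updates)
        state = chosen.dest
        path.append((chosen.trigger, state))
    return path


steps = [
    Step("add_item", "cart", "cart", 0.4, context_updates={"has_items": True}),
    Step("checkout", "cart", "checkout", 0.2, conditions=["has_items"]),
    Step("leave", "cart", "exit", 0.2),
]
path = run_session(steps, "cart", {"checkout", "exit"}, {"has_items": False}, seed=7)
```

In this sketch the "checkout" transition only becomes eligible after "add_item" has set has_items to True, which shows how context updates enable transitions dynamically.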

rockfish.actions.ent.Derivation

Derivation specification for derived columns.

SUM derivation
Derivation(
    function_type=DerivationFunctionType.SUM,
    dependent_columns=["col1", "col2"],
    params=SumParams()
)
SAMPLE_FROM_COLUMN derivation
Derivation(
    function_type=DerivationFunctionType.SAMPLE_FROM_COLUMN,
    dependent_columns=["users.user_id"],
    params=SampleFromColumnParams(with_replacement=True, seed=42)
)
MAP_VALUES derivation
Derivation(
    function_type=DerivationFunctionType.MAP_VALUES,
    dependent_columns=["status"],
    params=MapValuesParams(
        mapping=[{"from": "active", "to": "high"}],
        default="unknown"
    )
)

Attributes:

Name Type Description
function_type DerivationFunctionType

Type of derivation function to apply

dependent_columns list[str]

List of column references this derivation depends on. Format: "column_name" for same entity, "entity.column" for cross-entity

params Union[SumParams, MultiplyParams, SampleFromColumnParams, MapValuesParams]

Typed parameters specific to the derivation function

Column reference formats:

  • Same entity: "column_name"
  • Cross-entity: "entity_name.column_name"

rockfish.actions.ent.DerivationFunctionType

Source code in src/rockfish/actions/ent/generate.py
class DerivationFunctionType(str, Enum):
    SUM = "sum"
    MULTIPLY = "multiply"
    SAMPLE_FROM_COLUMN = "sample_from_column"
    SAMPLE_FROM_COLUMNS = "sample_from_columns"
    MAP_VALUES = "map_values"

Note: For composite foreign keys, use column_type=ColumnType.FOREIGN_KEY with entity relationships instead of explicit derivations. The system automatically handles multi-column sampling to maintain referential integrity.

Derivation Parameters

rockfish.actions.ent.SumParams

Parameters for sum derivation function. Sums multiple columns element-wise.

Create sum parameters
SumParams()

rockfish.actions.ent.MultiplyParams

Parameters for multiply derivation function. Multiplies multiple columns element-wise.

Create multiply parameters
MultiplyParams()

rockfish.actions.ent.SampleFromColumnParams

Parameters for sample from column derivation function. Commonly used for foreign keys and derived references.

Sample from column with replacement
SampleFromColumnParams(with_replacement=True, seed=42)

Attributes:

Name Type Description
with_replacement bool

If True, allow repeated values; if False, sample without replacement

seed Optional[int]

Random seed for reproducibility

rockfish.actions.ent.MapValuesParams

Parameters for map values derivation function. Maps values from one or more source columns to new values using mapping rules.

Map categorical values
MapValuesParams(
    mapping=[
        {"from": "active", "to": "high"},
        {"from": "idle", "to": "low"}
    ],
    default="unknown"
)

Attributes:

Name Type Description
mapping list[dict[str, Any]]

List of mapping rules, each a dict with "from" and "to" keys. "from" can be str (single column) or list[str] (tuple mapping), "to" is the mapped value

default Any

Default value for unmapped entries

Mapping rule format:

{
    "from": "source_value",  # or ["value1", "value2"] for tuple mapping
    "to": "mapped_value"
}
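
How the rules and the default combine can be sketched in a few lines of plain Python. The map_values function below is a hypothetical helper that follows the rule format described above; it treats a single source value as a one-element tuple so that single-column and tuple mappings share one lookup.

```python
def map_values(rows, dependent_columns, mapping, default=None):
    """Apply "from"/"to" rules to each row; unmatched rows get the default."""
    lookup = {}
    for rule in mapping:
        key = rule["from"]
        # A list is a tuple mapping; a single value becomes a one-element tuple.
        key = tuple(key) if isinstance(key, list) else (key,)
        lookup[key] = rule["to"]
    return [
        lookup.get(tuple(row[c] for c in dependent_columns), default)
        for row in rows
    ]


rows = [{"status": "active"}, {"status": "idle"}, {"status": "offline"}]
mapping = [{"from": "active", "to": "high"}, {"from": "idle", "to": "low"}]
result = map_values(rows, ["status"], mapping, default="unknown")
```

Here result is ["high", "low", "unknown"]: the third row has no matching rule, so it falls back to the default.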

Complete Examples

Example 1: E-commerce Session Data

Generate user sessions with state machine transitions:

import rockfish.actions as ra
from rockfish.actions.ent import (
    DataSchema,
    Entity,
    Column,
    ColumnType,
    ColumnCategoryType,
    Domain,
    DomainType,
    IDParams,
    NormalDistParams,
    Timestamp,
    GlobalTimestamp,
    Derivation,
    DerivationFunctionType,
    SampleFromColumnParams,
    StateMachineParams,
    Transition,
    EntityRelationship,
    EntityRelationshipType,
)

schema = DataSchema(
    entities=[
        Entity(
            name="users",
            cardinality=20,
            columns=[
                Column(
                    name="user_id",
                    data_type="string",
                    column_type=ColumnType.INDEPENDENT,
                    column_category_type=ColumnCategoryType.METADATA,
                    domain=Domain(
                        type=DomainType.ID,
                        params=IDParams(template_str="USER_{id}")
                    )
                ),
                Column(
                    name="age",
                    data_type="int64",
                    column_type=ColumnType.INDEPENDENT,
                    column_category_type=ColumnCategoryType.METADATA,
                    domain=Domain(
                        type=DomainType.NORMAL_DIST,
                        params=NormalDistParams(mean=35.0, std=10.0)
                    )
                )
            ]
        ),
        Entity(
            name="sessions",
            cardinality=100,
            timestamp=Timestamp(column_name="timestamp"),
            columns=[
                Column(
                    name="session_id",
                    data_type="string",
                    column_type=ColumnType.INDEPENDENT,
                    column_category_type=ColumnCategoryType.METADATA,
                    domain=Domain(
                        type=DomainType.ID,
                        params=IDParams(template_str="SESSION_{id}")
                    )
                ),
                Column(
                    name="user_id",
                    data_type="string",
                    column_type=ColumnType.DERIVED,
                    column_category_type=ColumnCategoryType.METADATA,
                    derivation=Derivation(
                        function_type=DerivationFunctionType.SAMPLE_FROM_COLUMN,
                        dependent_columns=["users.user_id"],
                        params=SampleFromColumnParams(with_replacement=True, seed=42)
                    )
                ),
                Column(
                    name="page",
                    data_type="string",
                    column_type=ColumnType.STATEFUL,
                    column_category_type=ColumnCategoryType.MEASUREMENT,
                    domain=Domain(
                        type=DomainType.STATE_MACHINE,
                        params=StateMachineParams(
                            column_name="page",
                            trigger_column_name="action",
                            initial_state="homepage",
                            states=["homepage", "search", "product", "cart", "checkout", "exit"],
                            terminal_states=["exit"],
                            transitions=[
                                Transition(trigger="browse", source="homepage", dest="search", probability=0.6),
                                Transition(trigger="view_product", source="homepage", dest="product", probability=0.3),
                                Transition(trigger="leave", source="homepage", dest="exit", probability=0.1),
                                Transition(trigger="add_to_cart", source="product", dest="cart", probability=0.3),
                                Transition(trigger="back", source="product", dest="search", probability=0.5),
                                Transition(trigger="checkout", source="cart", dest="checkout", probability=0.6),
                                Transition(trigger="complete", source="checkout", dest="exit", probability=0.8),
                            ]
                        )
                    )
                )
            ]
        )
    ],
    entity_relationships=[
        EntityRelationship(
            from_entity="sessions",
            to_entity="users",
            relationship_type=EntityRelationshipType.MANY_TO_ONE,
            join_columns={"user_id": "user_id"}
        )
    ],
    global_timestamp=GlobalTimestamp(
        t_start="2025-01-01T00:00:00Z",
        t_end="2025-01-01T06:00:00Z",
        time_interval="5min"
    )
)

action = ra.GenerateFromDataSchema(
    schema=schema,
    entity_labels={
        "users": {"use_for": "testing"},
        "sessions": {"use_for": "testing", "domain": "retail"}
    }
)

Example 2: IoT Device Monitoring

Generate device metrics with timeseries data:

import rockfish.actions as ra
from rockfish.actions.ent import (
    DataSchema,
    Entity,
    Column,
    ColumnType,
    ColumnCategoryType,
    Domain,
    DomainType,
    IDParams,
    CategoricalParams,
    TimeseriesParams,
    Timestamp,
    GlobalTimestamp,
)

schema = DataSchema(
    entities=[
        Entity(
            name="devices",
            cardinality=10,
            timestamp=Timestamp(column_name="timestamp"),
            columns=[
                Column(
                    name="device_id",
                    data_type="string",
                    column_type=ColumnType.INDEPENDENT,
                    column_category_type=ColumnCategoryType.METADATA,
                    domain=Domain(
                        type=DomainType.ID,
                        params=IDParams(template_str="DEV_{id}")
                    )
                ),
                Column(
                    name="location",
                    data_type="string",
                    column_type=ColumnType.INDEPENDENT,
                    column_category_type=ColumnCategoryType.METADATA,
                    domain=Domain(
                        type=DomainType.CATEGORICAL,
                        params=CategoricalParams(
                            values=["datacenter-1", "datacenter-2", "datacenter-3"],
                            with_replacement=True
                        )
                    )
                ),
                Column(
                    name="cpu_usage",
                    data_type="float64",
                    column_type=ColumnType.STATEFUL,
                    column_category_type=ColumnCategoryType.MEASUREMENT,
                    domain=Domain(
                        type=DomainType.TIMESERIES,
                        params=TimeseriesParams(
                            base_value=50.0,
                            min_value=10.0,
                            max_value=95.0,
                            seasonality_type="peak_offpeak",
                            peak_start_hour=8,
                            peak_end_hour=22,
                            seasonality_strength=0.4,
                            noise_level=0.15,
                            spike_probability=0.05,
                            spike_magnitude=0.3
                        )
                    )
                ),
                Column(
                    name="memory_usage",
                    data_type="float64",
                    column_type=ColumnType.STATEFUL,
                    column_category_type=ColumnCategoryType.MEASUREMENT,
                    domain=Domain(
                        type=DomainType.TIMESERIES,
                        params=TimeseriesParams(
                            base_value=60.0,
                            min_value=20.0,
                            max_value=90.0,
                            seasonality_type="symmetric",
                            seasonality_strength=0.3,
                            noise_level=0.1
                        )
                    )
                )
            ]
        )
    ],
    global_timestamp=GlobalTimestamp(
        t_start="2025-01-01T00:00:00Z",
        t_end="2025-01-02T00:00:00Z",
        time_interval="15min"
    )
)

action = ra.GenerateFromDataSchema(
    schema=schema,
    entity_labels={"devices": {"device_type": "iot"}}
)
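To get a feel for how much data a schema like this might produce, it can help to count the steps in the global timestamp window. The sketch below uses only the standard library; the helper `count_steps` is hypothetical and not part of Rockfish. Whether the generator includes the end timestamp, and whether it emits exactly one row per device per step, is not stated here, so treat the result as a rough estimate.

```python
from datetime import datetime, timedelta


def count_steps(t_start: str, t_end: str, step: timedelta) -> int:
    """Number of whole steps between two ISO 8601 timestamps (hypothetical helper)."""
    # Older Python versions do not accept a trailing "Z" in fromisoformat()
    start = datetime.fromisoformat(t_start.replace("Z", "+00:00"))
    end = datetime.fromisoformat(t_end.replace("Z", "+00:00"))
    return (end - start) // step


# Window and interval taken from the global_timestamp in the example above
steps = count_steps(
    "2025-01-01T00:00:00Z",
    "2025-01-02T00:00:00Z",
    timedelta(minutes=15),
)
print(steps)  # 96
```

If each of the 10 devices gets one row per step (an assumption), the devices table would hold on the order of 960 rows.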

Example 3: Composite Foreign Keys

Generate data with multi-column relationships:

import rockfish.actions as ra
from rockfish.actions.ent import (
    DataSchema,
    Entity,
    Column,
    ColumnType,
    ColumnCategoryType,
    Domain,
    DomainType,
    IDParams,
    CategoricalParams,
    EntityRelationship,
    EntityRelationshipType,
)

schema = DataSchema(
    entities=[
        Entity(
            name="transport_interfaces",
            cardinality=50,
            columns=[
                Column(
                    name="device_id",
                    data_type="string",
                    column_type=ColumnType.INDEPENDENT,
                    column_category_type=ColumnCategoryType.METADATA,
                    domain=Domain(
                        type=DomainType.CATEGORICAL,
                        params=CategoricalParams(
                            values=["DEV_1", "DEV_2", "DEV_3"],
                            with_replacement=True
                        )
                    )
                ),
                Column(
                    name="interface_id",
                    data_type="string",
                    column_type=ColumnType.INDEPENDENT,
                    column_category_type=ColumnCategoryType.METADATA,
                    domain=Domain(
                        type=DomainType.ID,
                        params=IDParams(template_str="IF_{id}")
                    )
                ),
                Column(
                    name="bandwidth",
                    data_type="int64",
                    column_type=ColumnType.INDEPENDENT,
                    column_category_type=ColumnCategoryType.METADATA,
                    domain=Domain(
                        type=DomainType.CATEGORICAL,
                        params=CategoricalParams(
                            values=[1000, 10000, 100000],
                            with_replacement=True
                        )
                    )
                )
            ]
        ),
        Entity(
            name="cell_sites",
            cardinality=200,
            columns=[
                Column(
                    name="site_id",
                    data_type="string",
                    column_type=ColumnType.INDEPENDENT,
                    column_category_type=ColumnCategoryType.METADATA,
                    domain=Domain(
                        type=DomainType.ID,
                        params=IDParams(template_str="SITE_{id}")
                    )
                ),
                # Composite foreign key - both columns marked as foreign_key
                Column(
                    name="transport_device_id",
                    data_type="string",
                    column_type=ColumnType.FOREIGN_KEY,
                    column_category_type=ColumnCategoryType.METADATA
                ),
                Column(
                    name="transport_interface_id",
                    data_type="string",
                    column_type=ColumnType.FOREIGN_KEY,
                    column_category_type=ColumnCategoryType.METADATA
                )
            ]
        )
    ],
    entity_relationships=[
        EntityRelationship(
            from_entity="cell_sites",
            to_entity="transport_interfaces",
            relationship_type=EntityRelationshipType.MANY_TO_ONE,
            join_columns={
                "transport_device_id": "device_id",
                "transport_interface_id": "interface_id"
            }
        )
    ]
)

action = ra.GenerateFromDataSchema(schema=schema)
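With a composite key, each foreign-key column can look valid on its own while the pair still points at no parent row. A minimal, library-independent sketch of a check for this is shown below. It assumes the generated tables have already been loaded as lists of row dicts; the helper `find_orphans` and the sample rows are hypothetical.

```python
def find_orphans(child_rows, parent_rows, join_columns):
    """Return child rows whose composite key has no match in the parent."""
    child_cols = list(join_columns.keys())
    parent_cols = list(join_columns.values())
    # Build the set of key tuples that exist in the parent table
    parent_keys = {tuple(row[c] for c in parent_cols) for row in parent_rows}
    return [
        row for row in child_rows
        if tuple(row[c] for c in child_cols) not in parent_keys
    ]


# Same mapping as join_columns in the schema above
join_columns = {
    "transport_device_id": "device_id",
    "transport_interface_id": "interface_id",
}

# Hypothetical rows for illustration only
parent_rows = [
    {"device_id": "DEV_1", "interface_id": "IF_1"},
    {"device_id": "DEV_2", "interface_id": "IF_2"},
]
child_rows = [
    {"site_id": "SITE_1", "transport_device_id": "DEV_1", "transport_interface_id": "IF_1"},
    # Both values exist in the parent, but not together as a pair
    {"site_id": "SITE_2", "transport_device_id": "DEV_1", "transport_interface_id": "IF_2"},
]

orphans = find_orphans(child_rows, parent_rows, join_columns)
print([row["site_id"] for row in orphans])  # ['SITE_2']
```

Comparing whole key tuples rather than individual columns is what makes this a composite check.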

Tips and Best Practices

Schema Design

  1. Start simple: Begin with metadata-only entities, then add measurements and relationships
  2. Validate incrementally: Build your schema step by step to catch validation errors early
  3. Use meaningful names: Entity and column names should reflect the domain you're modeling
  4. Use typed objects: Prefer Python objects over dicts for better type checking and IDE support
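The last tip can be illustrated without Rockfish at all. The sketch below uses a standard-library dataclass, `ColumnSpec`, as a hypothetical stand-in for a typed schema object. It shows the general benefit: a misspelled field is rejected when the object is built, while a dict accepts it silently.

```python
from dataclasses import dataclass


@dataclass
class ColumnSpec:
    """Hypothetical stand-in for a typed schema object (not a Rockfish class)."""
    name: str
    data_type: str


# A dict accepts a misspelled key without complaint
as_dict = {"name": "age", "data_typ": "int64"}

# A typed object rejects the same typo at construction time
error = None
try:
    ColumnSpec(name="age", data_typ="int64")
except TypeError as exc:
    error = str(exc)

print(error)
```

Static type checkers and IDEs can flag the same mistake before the code runs.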

Column Types

  1. Independent columns are best for:

    • Entity IDs and identifiers
    • Static attributes (age, name, category)
    • Random categorical values
  2. Stateful columns are best for:

    • Time-varying measurements (CPU usage, temperature)
    • User behavior patterns (page navigation, transaction flows)
    • Sequential state progressions
  3. Derived columns are best for:

    • Computed values (totals, aggregations)
    • Mapped/transformed values

Common Patterns

  1. Composite foreign keys: Mark every foreign-key column as ColumnType.FOREIGN_KEY, then map all of them to the parent columns in the join_columns of a single EntityRelationship in entity_relationships
  2. Conditional behavior over time: Use state machine context variables for dynamic transitions
  3. Realistic timeseries: Use TIMESERIES columns with peak_offpeak seasonality, then tune noise_level, spike_probability, and spike_magnitude to add noise and anomalies
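To build intuition for how seasonality, noise, and spike settings can interact, here is a purely illustrative sketch. It is not Rockfish's algorithm: the formula the generator actually uses is not documented in this section, and `sample_value` is a hypothetical function that only borrows the parameter names and values from the cpu_usage column in Example 2.

```python
import random


def sample_value(hour, rng, base_value=50.0, min_value=10.0, max_value=95.0,
                 peak_start_hour=8, peak_end_hour=22,
                 seasonality_strength=0.4, noise_level=0.15,
                 spike_probability=0.05, spike_magnitude=0.3):
    """Illustrative peak/off-peak sample; an assumed model, not the library's."""
    # Raise the baseline during peak hours, lower it otherwise
    in_peak = peak_start_hour <= hour < peak_end_hour
    seasonal = seasonality_strength if in_peak else -seasonality_strength
    value = base_value * (1.0 + seasonal)
    # Add Gaussian noise scaled to the base value
    value += rng.gauss(0.0, noise_level * base_value)
    # Occasionally add a spike to mimic an anomaly
    if rng.random() < spike_probability:
        value += spike_magnitude * base_value
    # Clamp to the configured bounds
    return max(min_value, min(max_value, value))


rng = random.Random(0)  # fixed seed so runs are repeatable
values = [sample_value(hour % 24, rng) for hour in range(240)]
print(min(values), max(values))
```

In this assumed model, a larger seasonality_strength widens the gap between peak and off-peak levels, while noise_level and the spike parameters control how irregular the series looks around those levels.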