Rockfish Entity Data Generator
The rockfish.actions.ent module provides actions for generating synthetic data tables directly from data schema specifications.
This is useful for testing, development, and creating realistic datasets with complex relationships and temporal patterns.
Overview
The GenerateFromDataSchema action generates synthetic data based on a comprehensive schema definition that supports:
- Multiple entities (tables) with relationships
- Independent columns (IDs, categorical values, numerical distributions)
- Stateful columns (timeseries, state machines for temporal data)
- Derived columns (foreign keys, computed values)
- Temporal data with configurable timestamps and intervals
Each entity is generated as a separate PyArrow table and uploaded as a labeled Rockfish dataset.
Quick Start
Here's a simple example that generates a metadata-only user table:
import asyncio
import rockfish as rf
import rockfish.actions as ra
from rockfish.actions.ent import (
DataSchema,
Entity,
Column,
ColumnType,
ColumnCategoryType,
Domain,
DomainType,
IDParams,
NormalDistParams,
)
async def main():
    # Define the schema using typed Python objects
    schema = DataSchema(
        entities=[
            Entity(
                name="users",
                cardinality=50,
                columns=[
                    Column(
                        name="user_id",
                        data_type="string",
                        column_type=ColumnType.INDEPENDENT,
                        column_category_type=ColumnCategoryType.METADATA,
                        domain=Domain(
                            type=DomainType.ID,
                            params=IDParams(template_str="USER_{id}")
                        )
                    ),
                    Column(
                        name="age",
                        data_type="int64",
                        column_type=ColumnType.INDEPENDENT,
                        column_category_type=ColumnCategoryType.METADATA,
                        domain=Domain(
                            type=DomainType.NORMAL_DIST,
                            params=NormalDistParams(mean=35.0, std=10.0)
                        )
                    )
                ]
            )
        ]
    )
    action = ra.GenerateFromDataSchema(
        schema=schema,
        entity_labels={"users": {"use_for": "testing"}}
    )
    builder = rf.WorkflowBuilder()
    builder.add(action)
    async with rf.Connection.from_env() as conn:
        workflow = await builder.start(conn)
        print(f"Workflow ID: {workflow.id()}")
        remote_dataset = await workflow.datasets().nth(0)
        dataset = await remote_dataset.to_local(conn)
        # Save to file
        dataset.to_pandas().to_csv(f"{dataset.name()}.csv", index=False)

asyncio.run(main())
rockfish.actions.ent.GenerateFromDataSchemaConfig
Config class for the GenerateFromDataSchema action.
from rockfish.actions.ent import (
DataSchema, Entity, Column,
GenerateFromDataSchemaConfig
)
schema = DataSchema(entities=[...])
config = GenerateFromDataSchemaConfig(
schema=schema,
entity_labels={"users": {"use_for": "testing"}}
)
Attributes:
| Name | Type | Description |
|---|---|---|
| schema | DataSchema | DataSchema configuration defining entities and relationships. Use the rockfish.actions.ent module to construct schema objects programmatically, or provide a dict that will be structured into a DataSchema. |
| entity_labels | dict[str, LabelDict] | Optional mapping of entity names to a Rockfish LabelDict. Labels are applied to the generated datasets for organization. Example: {"users": {"use_for": "testing"}, "transactions": {"type": "fraud"}} |
| dataset_name_prefix | str | Prefix for dataset names (default: "") |
| upload_datasets | bool | If True, upload each entity as a dataset. If False, yield tables for downstream actions (default: True). |
rockfish.actions.ent.DataSchema
Root data schema specification.
The top-level specification defining all entities, their relationships, and a global timestamp configuration (optional for metadata-only entities).
DataSchema(
entities=[
Entity(name="users", cardinality=50, columns=[...]),
Entity(name="sessions", cardinality=200, columns=[...],
timestamp=Timestamp(column_name="timestamp"))
],
entity_relationships=[
EntityRelationship(
from_entity="sessions",
to_entity="users",
relationship_type=EntityRelationshipType.MANY_TO_ONE,
join_columns={"user_id": "user_id"}
)
],
global_timestamp=GlobalTimestamp(
t_start="2025-01-01T00:00:00Z",
t_end="2025-01-01T01:00:00Z",
time_interval="1min"
)
)
Attributes:
| Name | Type | Description |
|---|---|---|
| entities | list[Entity] | List of entity specifications |
| entity_relationships | list[EntityRelationship] | List of relationships between entities |
| global_timestamp | Optional[GlobalTimestamp] | Optional global timestamp configuration for entities with measurements |
Key validation rules:
- At least one entity must be defined
- Entity names must be unique
- If any entity has a timestamp, global_timestamp must be provided
- All relationship references must point to valid entities
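The rules above can be pictured with a small standalone checker. This is only a sketch over plain dicts; `validate_schema` and the dict layout are hypothetical and are not part of rockfish.actions.ent, whose own validation may differ in details and error messages.

```python
def validate_schema(entities, relationships, global_timestamp=None):
    """Return a list of rule violations for a dict-based schema sketch."""
    errors = []
    if not entities:
        errors.append("at least one entity must be defined")
    names = [entity["name"] for entity in entities]
    if len(names) != len(set(names)):
        errors.append("entity names must be unique")
    # A timestamp on any entity requires a global timestamp configuration.
    if any(entity.get("timestamp") for entity in entities) and global_timestamp is None:
        errors.append("global_timestamp is required when any entity has a timestamp")
    # Every relationship endpoint must name a defined entity.
    for rel in relationships:
        for key in ("from_entity", "to_entity"):
            if rel[key] not in names:
                errors.append(f"{key} refers to unknown entity {rel[key]!r}")
    return errors


ok = validate_schema(
    entities=[{"name": "users"}, {"name": "sessions", "timestamp": "timestamp"}],
    relationships=[{"from_entity": "sessions", "to_entity": "users"}],
    global_timestamp={"t_start": "2025-01-01T00:00:00Z", "t_end": "2025-01-01T01:00:00Z"},
)
broken = validate_schema(
    entities=[{"name": "sessions", "timestamp": "timestamp"}],
    relationships=[{"from_entity": "sessions", "to_entity": "users"}],
)
```

Here `ok` is empty, while `broken` reports both the missing global timestamp and the dangling reference to "users".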
rockfish.actions.ent.Entity
Entity specification.
Defines a complete entity (table) with its cardinality, columns, and optional timestamp configuration for time-series data.
Entity(
name="users",
cardinality=50,
columns=[
Column(
name="user_id",
data_type="string",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(type=DomainType.ID, params=IDParams(template_str="USER_{id}"))
)
]
)
Entity(
name="sessions",
cardinality=200,
timestamp=Timestamp(column_name="timestamp"),
columns=[
Column(
name="session_id",
data_type="string",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(type=DomainType.ID, params=IDParams(template_str="SESSION_{id}"))
),
Column(
name="response_time",
data_type="float64",
column_type=ColumnType.STATEFUL,
column_category_type=ColumnCategoryType.MEASUREMENT,
domain=Domain(type=DomainType.TIMESERIES, params=TimeseriesParams(...))
)
]
)
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Entity name (e.g., "users", "sessions", "transactions") |
| cardinality | int | Number of rows/instances to generate |
| columns | list[Column] | List of column specifications |
| timestamp | Optional[Timestamp] | Optional timestamp configuration for entities with measurements |
Key validation rules:
- Cardinality must be positive
- At least one column must be defined
- Column names must be unique within an entity
- If timestamp is specified, at least one measurement column must exist
rockfish.actions.ent.EntityRelationship
Specification for relationships between entities.
Defines how two entities are related through foreign key columns.
EntityRelationship(
from_entity="sessions",
to_entity="users",
relationship_type=EntityRelationshipType.MANY_TO_ONE,
join_columns={"user_id": "user_id"}
)
EntityRelationship(
from_entity="cell_sites",
to_entity="transport_interfaces",
relationship_type=EntityRelationshipType.MANY_TO_ONE,
join_columns={
"transport_device_id": "device_id",
"transport_interface_id": "interface_id"
}
)
Attributes:
| Name | Type | Description |
|---|---|---|
| from_entity | str | Source entity name |
| to_entity | str | Target entity name |
| relationship_type | EntityRelationshipType | Type of relationship (one_to_one, one_to_many, many_to_one, many_to_many) |
| join_columns | dict[str, str] | Mapping of column names from source to target entity. Keys are column names in from_entity, values are column names in to_entity |
Composite Foreign Keys:
When join_columns contains multiple column pairs, the system automatically:
- Samples matching tuples from the referenced entity
- Ensures referential integrity across all columns
- Marks FK columns with internal derivations
All FK columns must be declared as column_type=ColumnType.FOREIGN_KEY without derivations.
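To see why sampling whole tuples preserves referential integrity, consider the following sketch. It is a plain-Python illustration, not the library's implementation; `sample_composite_fk` is a hypothetical helper, and rows are modeled as dicts.

```python
import random


def sample_composite_fk(parent_rows, join_columns, n, seed=None):
    """Sample n child FK tuples, each copied from a single parent row."""
    rng = random.Random(seed)
    children = []
    for _ in range(n):
        parent = rng.choice(parent_rows)  # one parent row supplies every FK column
        children.append({child: parent[target] for child, target in join_columns.items()})
    return children


parents = [
    {"device_id": "DEV_1", "interface_id": "IF_1"},
    {"device_id": "DEV_1", "interface_id": "IF_2"},
    {"device_id": "DEV_2", "interface_id": "IF_3"},
]
join_columns = {
    "transport_device_id": "device_id",
    "transport_interface_id": "interface_id",
}
children = sample_composite_fk(parents, join_columns, n=5, seed=0)
valid_pairs = {(p["device_id"], p["interface_id"]) for p in parents}
```

Because both columns come from the same parent row, every child pair exists in `valid_pairs`. Sampling each column independently could produce pairs such as ("DEV_2", "IF_1") that match no parent.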
rockfish.actions.ent.EntityRelationshipType
Relationship semantics:
- ONE_TO_ONE: Each instance in from_entity relates to exactly one unique instance in to_entity
- ONE_TO_MANY: Each instance in to_entity can be referenced by multiple instances in from_entity (inverse perspective of MANY_TO_ONE)
- MANY_TO_ONE: Multiple instances in from_entity can relate to the same instance in to_entity
Note: MANY_TO_MANY relationships are not currently supported.
To model many-to-many relationships, create an explicit junction table entity with two MANY_TO_ONE relationships.
rockfish.actions.ent.Timestamp
Timestamp specification for entities with measurements.
Specifies that an entity should have timestamps, and what the timestamp column should be called. The actual timestamp range and interval are defined in the global_timestamp.
Timestamp(column_name="timestamp")
Timestamp(column_name="event_time", data_type="timestamp")
Attributes:
| Name | Type | Description |
|---|---|---|
| column_name | str | Name of the timestamp column (e.g., "timestamp", "event_time") |
| data_type | str | Data type for the timestamp column (default: "timestamp") |
rockfish.actions.ent.GlobalTimestamp
Global timestamp specification for entities.
Defines the time range and interval for all entities with timestamps.
GlobalTimestamp(
t_start="2025-01-01T00:00:00Z",
t_end="2025-01-01T01:00:00Z",
time_interval="1min"
)
Attributes:
| Name | Type | Description |
|---|---|---|
| t_start | str | Start timestamp in ISO 8601 format (e.g., "2025-01-01T00:00:00Z") |
| t_end | str | End timestamp in ISO 8601 format (e.g., "2025-01-01T23:59:59Z") |
| time_interval | str | Optional time interval between measurements (e.g., "1min", "15min", "1hour") |
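As a rough illustration of how t_start, t_end, and time_interval combine into a grid of timestamps, here is a standalone sketch. The helpers `parse_interval` and `timestamp_grid` are hypothetical, only minute, hour, and day intervals are handled, and treating t_end as inclusive is an assumption; the library's own parsing and endpoint handling may differ.

```python
import re
from datetime import datetime, timedelta

_UNITS = {"min": "minutes", "hour": "hours", "day": "days"}


def parse_interval(text):
    """Turn strings such as "15min" or "2hour" into a timedelta."""
    match = re.fullmatch(r"(\d+)(min|hour|day)", text)
    if match is None:
        raise ValueError(f"unsupported interval: {text!r}")
    return timedelta(**{_UNITS[match.group(2)]: int(match.group(1))})


def timestamp_grid(t_start, t_end, time_interval):
    """List evenly spaced timestamps from t_start to t_end (assumed inclusive)."""
    start = datetime.fromisoformat(t_start.replace("Z", "+00:00"))
    end = datetime.fromisoformat(t_end.replace("Z", "+00:00"))
    step = parse_interval(time_interval)
    grid = []
    current = start
    while current <= end:
        grid.append(current)
        current += step
    return grid


grid = timestamp_grid("2025-01-01T00:00:00Z", "2025-01-01T01:00:00Z", "15min")
```

Under these assumptions a one-hour range at "15min" yields five points, from 00:00 through 01:00.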
Supported time interval formats:
- "1min", "5min", "15min", etc.
- "1hour", "2hour", etc.
- "1day", "7day", etc.
- "1month", "3month", etc.
rockfish.actions.ent.Column
Column specification within an entity.
Defines a column's type, data type, category, and how its values are generated (via domain for independent/stateful columns or derivation for derived columns).
Column(
name="user_id",
data_type="string",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(
type=DomainType.ID,
params=IDParams(template_str="USER_{id}")
)
)
Column(
name="total",
data_type="float64",
column_type=ColumnType.DERIVED,
column_category_type=ColumnCategoryType.MEASUREMENT,
derivation=Derivation(
function_type=DerivationFunctionType.SUM,
dependent_columns=["amount1", "amount2"],
params=SumParams()
)
)
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Column name |
| data_type | str | String alias for a pyarrow data type (e.g., "string", "int64", "float64", "timestamp"). Data from the specified domain will be cast to this type on a best-effort basis. |
| column_type | ColumnType | Type of column (independent, stateful, derived, foreign_key) |
| column_category_type | ColumnCategoryType | Data model category type for column (metadata or measurement) |
| domain | Optional[Domain] | Domain specification (for independent/stateful columns only) |
| derivation | Optional[Derivation] | Derivation specification (for derived columns only) |
The column category can be metadata or measurement (see supported data models for examples).
rockfish.actions.ent.ColumnCategoryType
rockfish.actions.ent.ColumnType
- Independent: Generated independently using a domain (cannot use temporal domains)
- Stateful: Temporal columns using state machines or timeseries (must be of measurement category)
- Derived: Computed from other columns using derivation functions
- Foreign Key: References another entity (must be metadata category)
Validation rules by column type:
| Column Type | Requires | Cannot Have | Category Type |
|---|---|---|---|
| independent | domain (non-temporal) | derivation | Any |
| stateful | domain (STATE_MACHINE or TIMESERIES) | derivation | measurement only |
| derived | derivation | domain | Any |
| foreign_key | (auto-generated) | domain, derivation | metadata only |
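The table above translates into a handful of checks. The sketch below encodes them over plain strings; `check_column` is a hypothetical helper, and the lowercase names used for column types, categories, and domains are assumptions made for the illustration rather than the library's enum values.

```python
TEMPORAL_DOMAINS = {"state_machine", "timeseries"}


def check_column(column_type, category, domain_type=None, has_derivation=False):
    """Return the rule violations for one column description."""
    errors = []
    if column_type == "independent":
        if domain_type is None:
            errors.append("independent columns require a domain")
        elif domain_type in TEMPORAL_DOMAINS:
            errors.append("independent columns cannot use temporal domains")
        if has_derivation:
            errors.append("independent columns cannot have a derivation")
    elif column_type == "stateful":
        if domain_type not in TEMPORAL_DOMAINS:
            errors.append("stateful columns require a state_machine or timeseries domain")
        if has_derivation:
            errors.append("stateful columns cannot have a derivation")
        if category != "measurement":
            errors.append("stateful columns must use the measurement category")
    elif column_type == "derived":
        if not has_derivation:
            errors.append("derived columns require a derivation")
        if domain_type is not None:
            errors.append("derived columns cannot have a domain")
    elif column_type == "foreign_key":
        if domain_type is not None or has_derivation:
            errors.append("foreign_key columns cannot have a domain or derivation")
        if category != "metadata":
            errors.append("foreign_key columns must use the metadata category")
    else:
        errors.append(f"unknown column type: {column_type!r}")
    return errors


valid_fk = check_column("foreign_key", "metadata")
bad_stateful = check_column("stateful", "metadata", domain_type="timeseries")
```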
rockfish.actions.ent.Domain
Domain specification for independent and stateful columns.
Domain(
type=DomainType.CATEGORICAL,
params=CategoricalParams(
values=["alice", "bob", "charlie"],
with_replacement=False
)
)
Attributes:
| Name | Type | Description |
|---|---|---|
| type | DomainType | The type of domain/generator to use |
| params | Union[IDParams, SequentialIntParams, CategoricalParams, UniformDistParams, NormalDistParams, ExponentialDistParams, TimeseriesParams, StateMachineParams] | Typed parameters specific to the domain type |
rockfish.actions.ent.DomainType
Domain Parameters
Each domain type has specific parameters:
rockfish.actions.ent.IDParams
Parameters for ID domain generation. Generates unique ID strings using a template with {id} placeholder.
IDParams(template_str="USER_{id}")
Attributes:
| Name | Type | Description |
|---|---|---|
| template_str | str | Format string with {id} placeholder (e.g., "USER_{id}") |
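The {id} placeholder behaves like a named field in a Python format string. The sketch below shows the idea with str.format. Starting the counter at 1 is an assumption made for the example; the numbering the generator actually uses is not stated here.

```python
template_str = "USER_{id}"

# Fill the {id} placeholder with consecutive integers (start value assumed).
ids = [template_str.format(id=i) for i in range(1, 4)]
```

Each distinct integer yields a distinct string, which is what keeps the generated IDs unique.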
rockfish.actions.ent.SequentialIntParams
Parameters for sequential integer ID generation.
SequentialIntParams(start=1)
SequentialIntParams(start=100)
Attributes:
| Name | Type | Description |
|---|---|---|
| start | int | Starting value for the sequence |
rockfish.actions.ent.CategoricalParams
Parameters for categorical value sampling.
CategoricalParams(
values=["alice", "bob", "charlie"],
with_replacement=False
)
Attributes:
| Name | Type | Description |
|---|---|---|
| values | list[Any] | List of categorical values to sample from |
| weights | Optional[list[float]] | Optional probability weights for each value (will be normalized) |
| seed | Optional[int] | Random seed for reproducibility |
| with_replacement | bool | If True, allow repeated values; if False, sample without replacement |
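The difference between the two sampling modes, and what weight normalization means, can be shown with the standard library. This is a sketch only: `normalize` and `sample_categorical` are hypothetical helpers, and the restrictions they impose (no weights without replacement, at most len(values) draws) are simplifications of this example, not documented library behavior.

```python
import random


def normalize(weights):
    """Scale weights so they sum to 1.0."""
    total = sum(weights)
    return [w / total for w in weights]


def sample_categorical(values, n, weights=None, with_replacement=True, seed=None):
    rng = random.Random(seed)
    if with_replacement:
        # random.choices accepts unnormalized weights and may repeat values.
        return rng.choices(values, weights=weights, k=n)
    if weights is not None:
        raise ValueError("this sketch does not support weights without replacement")
    if n > len(values):
        raise ValueError("cannot draw more unique values than are available")
    return rng.sample(values, n)


names = ["alice", "bob", "charlie"]
unique_draw = sample_categorical(names, 3, with_replacement=False, seed=1)
repeated_draw = sample_categorical(names, 10, weights=[3, 1, 1], seed=1)
```

Without replacement, three draws from three values return each value exactly once, in random order.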
rockfish.actions.ent.UniformDistParams
rockfish.actions.ent.NormalDistParams
Parameters for normal (Gaussian) distribution generation.
NormalDistParams(mean=100.0, std=15.0)
Attributes:
| Name | Type | Description |
|---|---|---|
| mean | float | Mean (center) of the distribution |
| std | float | Standard deviation (spread) of the distribution |
| seed | Optional[int] | Random seed for reproducibility |
rockfish.actions.ent.ExponentialDistParams
Parameters for exponential distribution generation. Often used for modeling time between events or waiting times.
ExponentialDistParams(scale=2.0)
Attributes:
| Name | Type | Description |
|---|---|---|
| scale | float | Scale parameter (1/lambda), controls the mean of the distribution |
| seed | Optional[int] | Random seed for reproducibility |
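Since scale is 1/lambda, the mean of the distribution equals scale. The following sketch checks this empirically with the standard library's random.expovariate, which takes the rate lambda rather than the scale; it does not show how the library itself draws values.

```python
import random

scale = 2.0
rng = random.Random(42)

# expovariate expects the rate (lambda), so pass 1 / scale.
samples = [rng.expovariate(1.0 / scale) for _ in range(20000)]
sample_mean = sum(samples) / len(samples)
```

With this many samples, `sample_mean` lands close to 2.0, and every sample is non-negative.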
rockfish.actions.ent.TimeseriesParams
Parameters for timeseries generation with seasonality patterns.
TimeseriesParams(
base_value=150.0,
min_value=50.0,
max_value=300.0,
seasonality_type="symmetric",
seasonality_strength=0.3,
noise_level=0.2
)
Attributes:
| Name | Type | Description |
|---|---|---|
| base_value | float | Central value around which the series oscillates |
| min_value | float | Minimum value used to clip final values |
| max_value | float | Maximum value used to clip final values |
| seasonality_type | Literal['symmetric', 'peak_offpeak', 'none'] | Type of seasonal pattern ("symmetric", "peak_offpeak", "none") |
| peak_start_hour | int | Start hour for peak_offpeak type (default: 8) |
| peak_end_hour | int | End hour for peak_offpeak type (default: 22) |
| seasonality_strength | float | Strength of seasonal pattern (0-1) |
| noise_level | float | Amount of random noise (0-1) |
| spike_probability | float | Probability of anomalous spikes (0-1) |
| spike_magnitude | float | Magnitude of spikes relative to range (0-1) |
| interval_minutes | int | Time interval between points |
| seed | Optional[int] | Random seed for reproducibility |
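One way to picture how base_value, seasonality_strength, and the min/max clipping interact is the sketch below. The formulas are assumptions chosen for illustration (a 24-hour sine wave for "symmetric", a two-level step for "peak_offpeak", amplitude scaled by half the value range); the generator's real formulas are not documented here, and noise and spikes are left out.

```python
import math


def clip(value, min_value, max_value):
    return max(min_value, min(max_value, value))


def symmetric_value(hour, base_value, min_value, max_value, seasonality_strength):
    """Assumed daily sine wave around base_value, clipped to the allowed range."""
    amplitude = seasonality_strength * (max_value - min_value) / 2
    value = base_value + amplitude * math.sin(2 * math.pi * hour / 24)
    return clip(value, min_value, max_value)


def peak_offpeak_value(hour, base_value, min_value, max_value, seasonality_strength,
                       peak_start_hour=8, peak_end_hour=22):
    """Assumed step pattern: raised during peak hours, lowered otherwise."""
    amplitude = seasonality_strength * (max_value - min_value) / 2
    in_peak = peak_start_hour <= hour < peak_end_hour
    value = base_value + (amplitude if in_peak else -amplitude)
    return clip(value, min_value, max_value)


midday = peak_offpeak_value(12, 50.0, 10.0, 95.0, 0.4)
night = peak_offpeak_value(2, 50.0, 10.0, 95.0, 0.4)
clipped = symmetric_value(6, 90.0, 10.0, 95.0, 1.0)
```

In this sketch the midday value sits above the base value and the night value below it, and a wave that would overshoot max_value is clipped to it.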
Seasonality types:
- "symmetric": Smooth sinusoidal pattern throughout the day
- "peak_offpeak": Higher values during peak hours (peak_start_hour to peak_end_hour), lower during off-peak
- "none": No seasonal pattern, only base value + noise
rockfish.actions.ent.StateMachineParams
State machine definition for generating session-based timeseries data. Models sequential behavior patterns such as user browsing sessions, transaction flows, and system state progressions.
sm = StateMachineParams(
column_name="page",
trigger_column_name="action",
initial_state="homepage",
states=["homepage", "search", "product", "cart", "checkout", "exit"],
terminal_states=["exit"],
transitions=[
Transition(trigger="browse", source="homepage", dest="search", probability=0.6),
Transition(trigger="view_product", source="homepage", dest="product", probability=0.3),
Transition(trigger="leave", source="homepage", dest="exit", probability=0.1),
Transition(trigger="add_to_cart", source="product", dest="cart", probability=0.3),
Transition(trigger="checkout", source="cart", dest="checkout", probability=0.6),
Transition(trigger="complete", source="checkout", dest="exit", probability=0.8),
]
)
sm_with_context = StateMachineParams(
column_name="order_status",
trigger_column_name="event",
initial_state="pending",
states=["pending", "processing", "shipped", "delivered"],
terminal_states=["delivered"],
transitions=[
Transition(
trigger="process",
source="pending",
dest="processing",
probability=0.9,
conditions=["payment_received"],
context_updates={"in_fulfillment": True}
),
],
context_variables={"payment_received": False, "in_fulfillment": False}
)
Attributes:
| Name | Type | Description |
|---|---|---|
| column_name | str | Name of the column that will store state values (e.g., "page", "status") |
| trigger_column_name | str | Name of the column that will store trigger/action values (e.g., "action", "event", "user_action") |
| initial_state | str | The starting state for all sessions/sequences |
| states | list[str] | Complete list of all valid states in the state machine |
| terminal_states | list[str] | List of states that end the session (no outgoing transitions) |
| transitions | list[Transition] | List of Transition objects |
| context_variables | dict[str, bool] | Dictionary of boolean context variables with their initial values. Used for conditional transitions |

Note: All states in
rockfish.actions.ent.Transition
Represents a single transition in a state machine.
t1 = Transition(
trigger="browse",
source="homepage",
dest="search",
probability=0.6
)
t2 = Transition(
trigger="checkout",
source="cart",
dest="checkout",
probability=0.6,
conditions=["has_items"],
context_updates={"checkout_started": True}
)
t3 = Transition(
trigger="refine_search",
source="search",
dest="search",
probability=0.2
)
Attributes:
| Name | Type | Description |
|---|---|---|
| trigger | str | The action/event that causes this transition (e.g., "browse", "view_product") |
| source | str | The originating state (e.g., "homepage", "search") |
| dest | str | The destination state (e.g., "product", "cart") |
| probability | float | Probability weight for this transition (0 < p <= 1). When multiple transitions share the same source state, probabilities are normalized to sum to 1.0 |
| conditions | list[str] | List of context variable names that must be True for this transition to be eligible. Empty list means no conditions |
| context_updates | dict[str, bool] | Dictionary of context variable updates to apply after this transition executes. Keys are variable names, values are booleans |

Note: Probabilities are weights, not exact probabilities. Weights of [0.6, 0.3, 0.1] on a source state already sum to 1.0 and are used unchanged; weights of [0.4, 0.2, 0.2] would be normalized to [0.5, 0.25, 0.25].
Important notes:
- Probabilities are weights that get automatically normalized (they don't need to sum to 1.0)
- Multiple transitions from the same source state will have their probabilities normalized
- Conditions must reference context variables defined in context_variables
- Context updates can enable/disable transitions dynamically
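The notes above can be made concrete with a small sketch that filters transitions by their conditions and then normalizes the remaining weights. Transitions are modeled as plain dicts and `normalized_choices` is a hypothetical helper. Applying the condition filter before normalizing is an assumption of this example, not confirmed library behavior.

```python
def normalized_choices(transitions, state, context):
    """Map each eligible trigger from `state` to its normalized probability."""
    candidates = [
        t for t in transitions
        if t["source"] == state
        and all(context.get(name, False) for name in t.get("conditions", []))
    ]
    if not candidates:
        return {}
    total = sum(t["probability"] for t in candidates)
    return {t["trigger"]: t["probability"] / total for t in candidates}


transitions = [
    {"trigger": "add_to_cart", "source": "product", "dest": "cart", "probability": 0.3},
    {"trigger": "back", "source": "product", "dest": "search", "probability": 0.5},
    {"trigger": "checkout", "source": "cart", "dest": "checkout", "probability": 0.6,
     "conditions": ["has_items"]},
]

from_product = normalized_choices(transitions, "product", {})
blocked = normalized_choices(transitions, "cart", {"has_items": False})
allowed = normalized_choices(transitions, "cart", {"has_items": True})
```

From "product", the weights 0.3 and 0.5 become roughly 0.375 and 0.625. From "cart", the checkout transition is only eligible once has_items is True, which is how a context update on an earlier transition can switch a later one on.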
rockfish.actions.ent.Derivation
Derivation specification for derived columns.
Derivation(
function_type=DerivationFunctionType.SUM,
dependent_columns=["col1", "col2"],
params=SumParams()
)
Derivation(
function_type=DerivationFunctionType.SAMPLE_FROM_COLUMN,
dependent_columns=["users.user_id"],
params=SampleFromColumnParams(with_replacement=True, seed=42)
)
Derivation(
function_type=DerivationFunctionType.MAP_VALUES,
dependent_columns=["status"],
params=MapValuesParams(
mapping=[{"from": "active", "to": "high"}],
default="unknown"
)
)
Attributes:
| Name | Type | Description |
|---|---|---|
| function_type | DerivationFunctionType | Type of derivation function to apply |
| dependent_columns | list[str] | List of column references this derivation depends on. Format: "column_name" for same entity, "entity.column" for cross-entity |
| params | Union[SumParams, MultiplyParams, SampleFromColumnParams, MapValuesParams] | Typed parameters specific to the derivation function |
Column reference formats:
- Same entity: "column_name"
- Cross-entity: "entity_name.column_name"
rockfish.actions.ent.DerivationFunctionType
Note: For composite foreign keys, use column_type=ColumnType.FOREIGN_KEY with entity relationships instead of explicit derivations.
The system automatically handles multi-column sampling to maintain referential integrity.
Derivation Parameters
rockfish.actions.ent.SumParams
Parameters for sum derivation function. Sums multiple columns element-wise.
SumParams()
rockfish.actions.ent.MultiplyParams
Parameters for multiply derivation function. Multiplies multiple columns element-wise.
MultiplyParams()
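Element-wise here means row by row across the dependent columns. The sketch below shows the idea on plain Python lists; `sum_columns` and `multiply_columns` are hypothetical helpers, and the library operates on its own table columns rather than lists.

```python
import math


def sum_columns(*columns):
    """Add the values found at the same row position in each column."""
    return [sum(row) for row in zip(*columns)]


def multiply_columns(*columns):
    """Multiply the values found at the same row position in each column."""
    return [math.prod(row) for row in zip(*columns)]


amount1 = [1.0, 2.0, 3.0]
amount2 = [10.0, 20.0, 30.0]
totals = sum_columns(amount1, amount2)
products = multiply_columns(amount1, amount2)
```

Each output row depends only on the same row of the inputs, so the derived column has the same length as its dependent columns.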
rockfish.actions.ent.SampleFromColumnParams
Parameters for sample from column derivation function. Commonly used for foreign keys and derived references.
SampleFromColumnParams(with_replacement=True, seed=42)
Attributes:
| Name | Type | Description |
|---|---|---|
| with_replacement | bool | If True, allow repeated values; if False, sample without replacement |
| seed | Optional[int] | Random seed for reproducibility |
rockfish.actions.ent.MapValuesParams
Parameters for map values derivation function. Maps values from one or more source columns to new values using mapping rules.
MapValuesParams(
mapping=[
{"from": "active", "to": "high"},
{"from": "idle", "to": "low"}
],
default="unknown"
)
Attributes:
| Name | Type | Description |
|---|---|---|
| mapping | list[dict[str, Any]] | List of mapping rules, each a dict with "from" and "to" keys. "from" can be str (single column) or list[str] (tuple mapping), "to" is the mapped value |
| default | Any | Default value for unmapped entries |
Mapping rule format:
{
"from": "source_value", # or ["value1", "value2"] for tuple mapping
"to": "mapped_value"
}
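A rough sketch of how such rules could be applied is shown below. `apply_mapping` is a hypothetical helper written for this illustration. It treats a list-valued "from" as a tuple matched against one value per source column, which is an interpretation of the format above rather than confirmed behavior.

```python
def apply_mapping(rows, mapping, default=None):
    """Map each row (a value, or a tuple of values) through the rules."""
    lookup = {}
    for rule in mapping:
        key = rule["from"]
        # Lists are not hashable, so tuple mappings are stored as tuples.
        lookup[tuple(key) if isinstance(key, list) else key] = rule["to"]
    result = []
    for row in rows:
        key = tuple(row) if isinstance(row, (list, tuple)) else row
        result.append(lookup.get(key, default))
    return result


rules = [
    {"from": "active", "to": "high"},
    {"from": "idle", "to": "low"},
]
priorities = apply_mapping(["active", "idle", "paused"], rules, default="unknown")

tuple_rules = [{"from": ["active", "eu"], "to": "eu-high"}]
regional = apply_mapping([("active", "eu"), ("idle", "eu")], tuple_rules, default="unknown")
```

Values with no matching rule, such as "paused" here, fall back to the default.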
Complete Examples
Example 1: E-commerce Session Data
Generate user sessions with state machine transitions:
import rockfish.actions as ra
from rockfish.actions.ent import (
DataSchema,
Entity,
Column,
ColumnType,
ColumnCategoryType,
Domain,
DomainType,
IDParams,
NormalDistParams,
Timestamp,
GlobalTimestamp,
Derivation,
DerivationFunctionType,
SampleFromColumnParams,
StateMachineParams,
Transition,
EntityRelationship,
EntityRelationshipType,
)
schema = DataSchema(
entities=[
Entity(
name="users",
cardinality=20,
columns=[
Column(
name="user_id",
data_type="string",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(
type=DomainType.ID,
params=IDParams(template_str="USER_{id}")
)
),
Column(
name="age",
data_type="int64",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(
type=DomainType.NORMAL_DIST,
params=NormalDistParams(mean=35.0, std=10.0)
)
)
]
),
Entity(
name="sessions",
cardinality=100,
timestamp=Timestamp(column_name="timestamp"),
columns=[
Column(
name="session_id",
data_type="string",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(
type=DomainType.ID,
params=IDParams(template_str="SESSION_{id}")
)
),
Column(
name="user_id",
data_type="string",
column_type=ColumnType.DERIVED,
column_category_type=ColumnCategoryType.METADATA,
derivation=Derivation(
function_type=DerivationFunctionType.SAMPLE_FROM_COLUMN,
dependent_columns=["users.user_id"],
params=SampleFromColumnParams(with_replacement=True, seed=42)
)
),
Column(
name="page",
data_type="string",
column_type=ColumnType.STATEFUL,
column_category_type=ColumnCategoryType.MEASUREMENT,
domain=Domain(
type=DomainType.STATE_MACHINE,
params=StateMachineParams(
column_name="page",
trigger_column_name="action",
initial_state="homepage",
states=["homepage", "search", "product", "cart", "checkout", "exit"],
terminal_states=["exit"],
transitions=[
Transition(trigger="browse", source="homepage", dest="search", probability=0.6),
Transition(trigger="view_product", source="homepage", dest="product", probability=0.3),
Transition(trigger="leave", source="homepage", dest="exit", probability=0.1),
Transition(trigger="add_to_cart", source="product", dest="cart", probability=0.3),
Transition(trigger="back", source="product", dest="search", probability=0.5),
Transition(trigger="checkout", source="cart", dest="checkout", probability=0.6),
Transition(trigger="complete", source="checkout", dest="exit", probability=0.8),
]
)
)
)
]
)
],
entity_relationships=[
EntityRelationship(
from_entity="sessions",
to_entity="users",
relationship_type=EntityRelationshipType.MANY_TO_ONE,
join_columns={"user_id": "user_id"}
)
],
global_timestamp=GlobalTimestamp(
t_start="2025-01-01T00:00:00Z",
t_end="2025-01-01T06:00:00Z",
time_interval="5min"
)
)
action = ra.GenerateFromDataSchema(
schema=schema,
entity_labels={
"users": {"use_for": "testing"},
"sessions": {"use_for": "testing", "domain": "retail"}
}
)
Example 2: IoT Device Monitoring
Generate device metrics with timeseries data:
import rockfish.actions as ra
from rockfish.actions.ent import (
DataSchema,
Entity,
Column,
ColumnType,
ColumnCategoryType,
Domain,
DomainType,
IDParams,
CategoricalParams,
TimeseriesParams,
Timestamp,
GlobalTimestamp,
)
schema = DataSchema(
entities=[
Entity(
name="devices",
cardinality=10,
timestamp=Timestamp(column_name="timestamp"),
columns=[
Column(
name="device_id",
data_type="string",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(
type=DomainType.ID,
params=IDParams(template_str="DEV_{id}")
)
),
Column(
name="location",
data_type="string",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(
type=DomainType.CATEGORICAL,
params=CategoricalParams(
values=["datacenter-1", "datacenter-2", "datacenter-3"],
with_replacement=True
)
)
),
Column(
name="cpu_usage",
data_type="float64",
column_type=ColumnType.STATEFUL,
column_category_type=ColumnCategoryType.MEASUREMENT,
domain=Domain(
type=DomainType.TIMESERIES,
params=TimeseriesParams(
base_value=50.0,
min_value=10.0,
max_value=95.0,
seasonality_type="peak_offpeak",
peak_start_hour=8,
peak_end_hour=22,
seasonality_strength=0.4,
noise_level=0.15,
spike_probability=0.05,
spike_magnitude=0.3
)
)
),
Column(
name="memory_usage",
data_type="float64",
column_type=ColumnType.STATEFUL,
column_category_type=ColumnCategoryType.MEASUREMENT,
domain=Domain(
type=DomainType.TIMESERIES,
params=TimeseriesParams(
base_value=60.0,
min_value=20.0,
max_value=90.0,
seasonality_type="symmetric",
seasonality_strength=0.3,
noise_level=0.1
)
)
)
]
)
],
global_timestamp=GlobalTimestamp(
t_start="2025-01-01T00:00:00Z",
t_end="2025-01-02T00:00:00Z",
time_interval="15min"
)
)
action = ra.GenerateFromDataSchema(
schema=schema,
entity_labels={"devices": {"device_type": "iot"}}
)
Example 3: Composite Foreign Keys
Generate data with multi-column relationships:
import rockfish.actions as ra
from rockfish.actions.ent import (
DataSchema,
Entity,
Column,
ColumnType,
ColumnCategoryType,
Domain,
DomainType,
IDParams,
CategoricalParams,
EntityRelationship,
EntityRelationshipType,
)
schema = DataSchema(
entities=[
Entity(
name="transport_interfaces",
cardinality=50,
columns=[
Column(
name="device_id",
data_type="string",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(
type=DomainType.CATEGORICAL,
params=CategoricalParams(
values=["DEV_1", "DEV_2", "DEV_3"],
with_replacement=True
)
)
),
Column(
name="interface_id",
data_type="string",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(
type=DomainType.ID,
params=IDParams(template_str="IF_{id}")
)
),
Column(
name="bandwidth",
data_type="int64",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(
type=DomainType.CATEGORICAL,
params=CategoricalParams(
values=[1000, 10000, 100000],
with_replacement=True
)
)
)
]
),
Entity(
name="cell_sites",
cardinality=200,
columns=[
Column(
name="site_id",
data_type="string",
column_type=ColumnType.INDEPENDENT,
column_category_type=ColumnCategoryType.METADATA,
domain=Domain(
type=DomainType.ID,
params=IDParams(template_str="SITE_{id}")
)
),
# Composite foreign key - both columns marked as foreign_key
Column(
name="transport_device_id",
data_type="string",
column_type=ColumnType.FOREIGN_KEY,
column_category_type=ColumnCategoryType.METADATA
),
Column(
name="transport_interface_id",
data_type="string",
column_type=ColumnType.FOREIGN_KEY,
column_category_type=ColumnCategoryType.METADATA
)
]
)
],
entity_relationships=[
EntityRelationship(
from_entity="cell_sites",
to_entity="transport_interfaces",
relationship_type=EntityRelationshipType.MANY_TO_ONE,
join_columns={
"transport_device_id": "device_id",
"transport_interface_id": "interface_id"
}
)
]
)
action = ra.GenerateFromDataSchema(schema=schema)
Tips and Best Practices
Schema Design
- Start simple: Begin with metadata-only entities, then add measurements and relationships
- Validate incrementally: Build your schema step by step to catch validation errors early
- Use meaningful names: Entity and column names should reflect the domain you're modeling
- Use typed objects: Prefer Python objects over dicts for better type checking and IDE support
Column Types
- Independent columns are best for:
  - Entity IDs and identifiers
  - Static attributes (age, name, category)
  - Random categorical values
- Stateful columns are best for:
  - Time-varying measurements (CPU usage, temperature)
  - User behavior patterns (page navigation, transaction flows)
  - Sequential state progressions
- Derived columns are best for:
  - Computed values (totals, aggregations)
  - Mapped/transformed values
Common Patterns
- Composite foreign keys: Use the foreign_key column type with entity relationships; mark all FK columns as FOREIGN_KEY type and define the multi-column relationship in entity_relationships
- Conditional behavior over time: Use state machine context variables for dynamic transitions
- Realistic timeseries: Use TIMESERIES columns with peak_offpeak seasonality; configure noise and anomalies