fenic.api.lineage

Query interface for tracing data lineage through a query plan.

Classes:

  • Lineage

    Query interface for tracing data lineage through a query plan.

Lineage

Lineage(lineage: BaseLineage)

Query interface for tracing data lineage through a query plan.

This class allows you to navigate through the query plan both forwards and backwards, tracing how specific rows are transformed through each operation.
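One way to picture this navigation is as a cursor that sits on one operator. From there it steps toward the sources (backwards) or toward the root (forwards). The stdlib-only toy below illustrates that idea under stated assumptions. `ToyOperator`, `ToyLineage`, `chain` and the `backward_map` records are hypothetical names. Each call is assumed to move the cursor one step. Plain UUID lists stand in for the Polars DataFrames the real methods return. This is not fenic's implementation.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ToyOperator:
    """Hypothetical plan node: maps each output row UUID to the input UUIDs that produced it."""

    name: str
    backward_map: Dict[str, List[str]]
    # Links are excluded from repr/eq so the parent <-> child cycle cannot recurse.
    child: Optional["ToyOperator"] = field(default=None, repr=False, compare=False)   # toward the source
    parent: Optional["ToyOperator"] = field(default=None, repr=False, compare=False)  # toward the root


def chain(*ops: ToyOperator) -> ToyOperator:
    """Link operators listed source-first into a linear plan and return the root."""
    for lower, upper in zip(ops, ops[1:]):
        lower.parent = upper
        upper.child = lower
    return ops[-1]


class ToyLineage:
    """Toy cursor over a linear plan; an illustration, not fenic's implementation."""

    def __init__(self, root: ToyOperator):
        self.current = root  # assumption: traversal starts at the root (final) operator

    def backwards(self, ids: List[str]) -> List[str]:
        if self.current.child is None:
            raise ValueError(f"{self.current.name} has no input to trace back to")
        inputs: List[str] = []
        for uuid in ids:
            for src in self.current.backward_map.get(uuid, []):
                if src not in inputs:  # keep first-seen order, drop duplicates
                    inputs.append(src)
        self.current = self.current.child  # assumption: each call moves the cursor one step
        return inputs

    def forwards(self, ids: List[str]) -> List[str]:
        if self.current.parent is None:
            raise ValueError("already at the root node; nothing to trace forward to")
        nxt = self.current.parent
        wanted = set(ids)
        # An output of the next operator is affected if any of its inputs is in `ids`.
        outputs = [out for out, srcs in nxt.backward_map.items() if wanted & set(srcs)]
        self.current = nxt
        return outputs


source = ToyOperator("source:orders", {})
flt = ToyOperator("filter", {"f1": ["s1"], "f2": ["s3"]})
agg = ToyOperator("group_by", {"g1": ["f1", "f2"]})
lineage = ToyLineage(chain(source, flt, agg))
print(lineage.backwards(["g1"]))  # ['f1', 'f2']; cursor moves to "filter"
print(lineage.forwards(["f1"]))   # ['g1']; cursor moves back to "group_by"
```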

Example
# Create a lineage query starting from the root
query = Lineage(lineage)  # lineage: the underlying BaseLineage implementation

# Or start from a specific source
query.start_from_source("my_table")

# Trace rows backwards through a transformation
result = query.backwards(["uuid1", "uuid2"])

# Trace rows forward to see their outputs
result = query.forwards(["uuid3", "uuid4"])

Initialize a Lineage instance.

Parameters:

  • lineage (BaseLineage) –

    The underlying lineage implementation.

Methods:

  • backwards

    Trace rows backwards to see which input rows produced them.

  • forwards

    Trace rows forward to see how they are transformed by the next operation.

  • get_result_df

    Get the result of the query as a Polars DataFrame.

  • get_source_df

    Get a query source by name as a Polars DataFrame.

  • get_source_names

    Get the names of all sources in the query plan. Used to determine where to start the lineage traversal.

  • show

    Print the operator tree of the query.

  • skip_backwards

    [Not Implemented] Trace rows backwards through multiple operations at once.

  • skip_forwards

    [Not Implemented] Trace rows forward through multiple operations at once.

  • start_from_source

    Set the current position to a specific source in the query plan.

Source code in src/fenic/api/lineage.py
def __init__(self, lineage: BaseLineage):
    """Initialize a Lineage instance.

    Args:
        lineage: The underlying lineage implementation.
    """
    self.lineage = lineage

backwards

backwards(ids: List[str], branch_side: Optional[BranchSide] = None) -> pl.DataFrame

Trace rows backwards to see which input rows produced them.

Parameters:

  • ids (List[str]) –

    List of UUIDs identifying the rows to trace back

  • branch_side (Optional[BranchSide], default: None ) –

    For operators with multiple inputs (like joins), specify which input to trace ("left" or "right"). Not needed for single-input operations.

Returns:

  • DataFrame

    DataFrame containing the source rows that produced the specified outputs

Raises:

  • ValueError

    If the ids format is invalid or branch_side is specified incorrectly

Example
# Simple backward trace
source_rows = query.backwards(["result_uuid1"])

# Trace back through a join
left_rows = query.backwards(["join_uuid1"], branch_side="left")
right_rows = query.backwards(["join_uuid1"], branch_side="right")
Source code in src/fenic/api/lineage.py
@validate_call(config=ConfigDict(strict=True))
def backwards(
    self, ids: List[str], branch_side: Optional[BranchSide] = None
) -> pl.DataFrame:
    """Trace rows backwards to see which input rows produced them.

    Args:
        ids: List of UUIDs identifying the rows to trace back
        branch_side: For operators with multiple inputs (like joins), specify which
            input to trace ("left" or "right"). Not needed for single-input operations.

    Returns:
        DataFrame containing the source rows that produced the specified outputs

    Raises:
        ValueError: If the ids format is invalid or branch_side is specified incorrectly

    Example:
        ```python
        # Simple backward trace
        source_rows = query.backwards(["result_uuid1"])

        # Trace back through a join
        left_rows = query.backwards(["join_uuid1"], branch_side="left")
        right_rows = query.backwards(["join_uuid1"], branch_side="right")
        ```
    """
    return self.lineage.backwards(ids, branch_side)
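To show why `branch_side` matters for multi-input operators, here is a toy backward step through a join. It assumes each joined output UUID records its input UUIDs separately for the left and right side. `JoinLineage` and `join_backwards` are hypothetical names. Rejecting a missing side is an assumption modeled on the documented ValueError. This is not how fenic stores join lineage.

```python
from typing import Dict, List, Optional

# Hypothetical join lineage: joined output UUID -> {"left": [...], "right": [...]}.
JoinLineage = Dict[str, Dict[str, List[str]]]


def join_backwards(join_lineage: JoinLineage, ids: List[str], branch_side: Optional[str] = None) -> List[str]:
    """Toy backward step through a two-input operator; not fenic's implementation."""
    if branch_side not in ("left", "right"):
        # Without a side, a two-input operator is ambiguous (assumption mirroring the ValueError).
        raise ValueError("branch_side must be 'left' or 'right' when tracing through a join")
    result: List[str] = []
    for uuid in ids:
        for src in join_lineage.get(uuid, {}).get(branch_side, []):
            if src not in result:  # keep first-seen order, drop duplicates
                result.append(src)
    return result


join_lineage: JoinLineage = {
    "join_uuid1": {"left": ["customer_1"], "right": ["order_7"]},
    "join_uuid2": {"left": ["customer_1"], "right": ["order_9"]},
}
left_rows = join_backwards(join_lineage, ["join_uuid1", "join_uuid2"], branch_side="left")    # ['customer_1']
right_rows = join_backwards(join_lineage, ["join_uuid1", "join_uuid2"], branch_side="right")  # ['order_7', 'order_9']
```

Both joined rows trace back to the same customer on the left, so the left trace collapses to one row while the right trace keeps two distinct orders.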

forwards

forwards(row_ids: List[str]) -> pl.DataFrame

Trace rows forward to see how they are transformed by the next operation.

Parameters:

  • row_ids (List[str]) –

    List of UUIDs identifying the rows to trace

Returns:

  • DataFrame

    DataFrame containing the transformed rows in the next operation

Raises:

  • ValueError

    If the current position is the root node, or if the row_ids format is invalid

Example
# Trace how specific customer rows are transformed
transformed = query.forwards(["customer_uuid1", "customer_uuid2"])
Source code in src/fenic/api/lineage.py
@validate_call(config=ConfigDict(strict=True))
def forwards(self, row_ids: List[str]) -> pl.DataFrame:
    """Trace rows forward to see how they are transformed by the next operation.

    Args:
        row_ids: List of UUIDs identifying the rows to trace

    Returns:
        DataFrame containing the transformed rows in the next operation

    Raises:
        ValueError: If the current position is the root node, or if the row_ids format is invalid

    Example:
        ```python
        # Trace how specific customer rows are transformed
        transformed = query.forwards(["customer_uuid1", "customer_uuid2"])
        ```
    """
    return self.lineage.forwards(row_ids)
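Forward tracing answers the inverse question of backward tracing. If lineage is recorded as output-to-inputs links (an assumption), a forward lookup can be served by inverting those links once into an input-to-outputs index. The sketch below does that for an explode-style step, where one input row fans out into several outputs. `invert` and `trace_forwards` are hypothetical helpers. The isinstance check only stands in for the argument validation that the real method gets from its `@validate_call(config=ConfigDict(strict=True))` decorator.

```python
from collections import defaultdict
from typing import Dict, List


def invert(backward_map: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Turn output -> inputs records into an input -> outputs index for forward lookups."""
    forward: Dict[str, List[str]] = defaultdict(list)
    for out_uuid, in_uuids in backward_map.items():
        for in_uuid in in_uuids:
            forward[in_uuid].append(out_uuid)
    return dict(forward)


def trace_forwards(forward_index: Dict[str, List[str]], row_ids: List[str]) -> List[str]:
    """Return the next operation's output UUIDs that derive from any of row_ids."""
    if not isinstance(row_ids, list) or not all(isinstance(r, str) for r in row_ids):
        raise ValueError("row_ids must be a list of strings")
    outputs: List[str] = []
    for row_id in row_ids:
        for out_uuid in forward_index.get(row_id, []):
            if out_uuid not in outputs:  # keep first-seen order, drop duplicates
                outputs.append(out_uuid)
    return outputs


# Explode-style step: customer_uuid1 fans out into two output rows.
explode_lineage = {"e1": ["customer_uuid1"], "e2": ["customer_uuid1"], "e3": ["customer_uuid2"]}
index = invert(explode_lineage)
transformed = trace_forwards(index, ["customer_uuid1"])  # ['e1', 'e2']
```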

get_result_df

get_result_df() -> pl.DataFrame

Get the result of the query as a Polars DataFrame.

Source code in src/fenic/api/lineage.py
def get_result_df(self) -> pl.DataFrame:
    """Get the result of the query as a Polars DataFrame."""
    return self.lineage.get_result_df()

get_source_df

get_source_df(source_name: str) -> pl.DataFrame

Get a query source by name as a Polars DataFrame.

Source code in src/fenic/api/lineage.py
@validate_call(config=ConfigDict(strict=True))
def get_source_df(self, source_name: str) -> pl.DataFrame:
    """Get a query source by name as a Polars DataFrame."""
    return self.lineage.get_source_df(source_name)

get_source_names

get_source_names() -> List[str]

Get the names of all sources in the query plan. Used to determine where to start the lineage traversal.

Source code in src/fenic/api/lineage.py
@validate_call(config=ConfigDict(strict=True))
def get_source_names(self) -> List[str]:
    """Get the names of all sources in the query plan. Used to determine where to start the lineage traversal."""
    return self.lineage.get_source_names()

show

show() -> None

Print the operator tree of the query.

Source code in src/fenic/api/lineage.py
def show(self) -> None:
    """Print the operator tree of the query."""
    print(self.lineage.stringify_graph())
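show() prints whatever the underlying `stringify_graph()` returns. As a rough illustration of rendering an operator tree as text, the sketch below prints each operator and indents its inputs one level deeper. `PlanNode` and `stringify_tree` are hypothetical names. fenic's actual plan classes and output format will differ.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """Hypothetical operator node; fenic's real plan classes and output format differ."""

    name: str
    children: List["PlanNode"] = field(default_factory=list)


def stringify_tree(node: PlanNode, depth: int = 0) -> str:
    """Render the node, then its inputs, indenting each level by two spaces."""
    lines = ["  " * depth + node.name]
    for child in node.children:
        lines.append(stringify_tree(child, depth + 1))
    return "\n".join(lines)


plan = PlanNode("Join", [
    PlanNode("Source(customers)"),
    PlanNode("Filter", [PlanNode("Source(orders)")]),
])
print(stringify_tree(plan))
# Join
#   Source(customers)
#   Filter
#     Source(orders)
```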

skip_backwards

skip_backwards(ids: List[str]) -> Dict[str, pl.DataFrame]

[Not Implemented] Trace rows backwards through multiple operations at once.

This method will allow efficient tracing through multiple operations without intermediate results.

Parameters:

  • ids (List[str]) –

    List of UUIDs identifying the rows to trace back

Returns:

  • Dict[str, DataFrame]

    Dictionary mapping operation names to their source DataFrames

Raises:

  • NotImplementedError

    This method is not yet implemented

Source code in src/fenic/api/lineage.py
def skip_backwards(self, ids: List[str]) -> Dict[str, pl.DataFrame]:
    """[Not Implemented] Trace rows backwards through multiple operations at once.

    This method will allow efficient tracing through multiple operations without
    intermediate results.

    Args:
        ids: List of UUIDs identifying the rows to trace back

    Returns:
        Dictionary mapping operation names to their source DataFrames

    Raises:
        NotImplementedError: This method is not yet implemented
    """
    raise NotImplementedError("Skip backwards not yet implemented")
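skip_backwards is not implemented yet. The sketch below is only a guess at what a multi-step backward trace could look like. It composes one-step backward traces over a linear plan listed root-first. For each operator name, it records the input row UUIDs found at that step, which mirrors the documented "operation name to source rows" return shape. `skip_backwards_sketch`, `Step` and the plan data are hypothetical. UUID lists stand in for DataFrames.

```python
from typing import Dict, List, Tuple

# Hypothetical plan step: (operator name, output UUID -> input UUIDs), listed root-first.
Step = Tuple[str, Dict[str, List[str]]]


def skip_backwards_sketch(steps: List[Step], ids: List[str]) -> Dict[str, List[str]]:
    """Compose one-step backward traces, recording the input rows found at each operator."""
    results: Dict[str, List[str]] = {}
    frontier = list(ids)
    for op_name, backward_map in steps:
        next_frontier: List[str] = []
        for uuid in frontier:
            for src in backward_map.get(uuid, []):
                if src not in next_frontier:  # keep first-seen order, drop duplicates
                    next_frontier.append(src)
        results[op_name] = next_frontier
        frontier = next_frontier
    return results


plan = [
    ("group_by", {"g1": ["f1", "f2"]}),
    ("filter", {"f1": ["s1"], "f2": ["s3"]}),
]
print(skip_backwards_sketch(plan, ["g1"]))  # {'group_by': ['f1', 'f2'], 'filter': ['s1', 's3']}
```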

skip_forwards

skip_forwards(row_ids: List[str]) -> pl.DataFrame

[Not Implemented] Trace rows forward through multiple operations at once.

This method will allow efficient tracing through multiple operations without intermediate results.

Parameters:

  • row_ids (List[str]) –

    List of UUIDs identifying the rows to trace

Returns:

  • DataFrame

    DataFrame containing the final transformed rows

Raises:

  • NotImplementedError

    This method is not yet implemented

Source code in src/fenic/api/lineage.py
def skip_forwards(self, row_ids: List[str]) -> pl.DataFrame:
    """[Not Implemented] Trace rows forward through multiple operations at once.

    This method will allow efficient tracing through multiple operations without
    intermediate results.

    Args:
        row_ids: List of UUIDs identifying the rows to trace

    Returns:
        DataFrame containing the final transformed rows

    Raises:
        NotImplementedError: This method is not yet implemented
    """
    raise NotImplementedError("Skip forwards not yet implemented")
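skip_forwards is likewise unimplemented. Its documented return value is the final transformed rows only, so a plausible sketch applies input-to-outputs indexes one after another and keeps just the last frontier. This is an assumption about intended semantics. `skip_forwards_sketch` and the step data are hypothetical.

```python
from typing import Dict, List


def skip_forwards_sketch(forward_steps: List[Dict[str, List[str]]], row_ids: List[str]) -> List[str]:
    """Apply input -> outputs indexes in order (source toward root); return only the last frontier."""
    frontier = list(row_ids)
    for forward_index in forward_steps:
        # dict.fromkeys drops duplicate UUIDs while keeping first-seen order.
        frontier = list(dict.fromkeys(
            out for row_id in frontier for out in forward_index.get(row_id, [])
        ))
    return frontier


steps = [
    {"s1": ["f1"], "s3": ["f2"]},  # filter: source row -> surviving row (s2 was filtered out)
    {"f1": ["g1"], "f2": ["g1"]},  # group_by: filtered row -> group row
]
final_rows = skip_forwards_sketch(steps, ["s1", "s3"])  # ['g1']
```

A row that is dropped along the way (here `s2`) simply produces an empty result.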

start_from_source

start_from_source(source_name: str) -> None

Set the current position to a specific source in the query plan.

Parameters:

  • source_name (str) –

    Name of the source table to start from

Example
query.start_from_source("customers")
# Now you can trace forward from the customers table
Source code in src/fenic/api/lineage.py
@validate_call(config=ConfigDict(strict=True))
def start_from_source(self, source_name: str) -> None:
    """Set the current position to a specific source in the query plan.

    Args:
        source_name: Name of the source table to start from

    Example:
        ```python
        query.start_from_source("customers")
        # Now you can trace forward from the customers table
        ```
    """
    self.lineage.start_from_source(source_name)
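A typical way to begin a forward trace is to list the sources with get_source_names(), pick one, and pass it to start_from_source(). The toy class below mimics only that control flow, plus a rows-by-name lookup in the spirit of get_source_df(). `ToySources`, `get_source_rows` and the `position` attribute are hypothetical. Raising ValueError for an unknown name is an assumption, since the documented method does not say how it handles one.

```python
from typing import Dict, List, Optional


class ToySources:
    """Hypothetical stand-in for the source-related calls; not fenic's implementation."""

    def __init__(self, sources: Dict[str, List[str]]):
        self._sources = sources  # source name -> row UUIDs (a stand-in for a DataFrame)
        self.position: Optional[str] = None  # None until a source is chosen

    def get_source_names(self) -> List[str]:
        return sorted(self._sources)

    def get_source_rows(self, source_name: str) -> List[str]:
        return list(self._sources[source_name])

    def start_from_source(self, source_name: str) -> None:
        if source_name not in self._sources:
            # Assumption: unknown names are rejected; the documented method does not say how.
            raise ValueError(f"unknown source {source_name!r}; choose from {self.get_source_names()}")
        self.position = source_name


query = ToySources({"orders": ["o7", "o9"], "customers": ["c1", "c2"]})
names = query.get_source_names()  # ['customers', 'orders']
query.start_from_source(names[0])
print(query.position, query.get_source_rows(query.position))  # customers ['c1', 'c2']
```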