
fenic.api.dataframe.semantic_extensions

Semantic extensions for DataFrames providing clustering and semantic join operations.

Classes:

SemanticExtensions

SemanticExtensions(df: DataFrame)

A namespace for semantic dataframe operators.

Initialize semantic extensions.

Parameters:

  • df (DataFrame) –

    The DataFrame to extend with semantic operations.

Methods:

  • join

    Performs a semantic join between two DataFrames using a natural language predicate.

  • sim_join

    Performs a semantic similarity join between two DataFrames using embedding expressions.

  • with_cluster_labels

    Cluster rows using K-means and add cluster metadata columns.

Source code in src/fenic/api/dataframe/semantic_extensions.py
def __init__(self, df: DataFrame):
    """Initialize semantic extensions.

    Args:
        df: The DataFrame to extend with semantic operations.
    """
    self._df = df

join

join(other: DataFrame, predicate: str, left_on: Column, right_on: Column, strict: bool = True, examples: Optional[JoinExampleCollection] = None, model_alias: Optional[Union[str, ModelAlias]] = None) -> DataFrame

Performs a semantic join between two DataFrames using a natural language predicate.

This method evaluates a boolean predicate for each potential row pair between the two DataFrames, including only those pairs where the predicate evaluates to True.

The join process:

  1. For each row in the left DataFrame, evaluates the predicate in the Jinja template against each row in the right DataFrame
  2. Includes row pairs where the predicate returns True
  3. Excludes row pairs where the predicate returns False
  4. Returns a new DataFrame containing all columns from both DataFrames for the matched pairs

The Jinja template must use exactly two column placeholders:

  • One from the left DataFrame: {{ left_on }}
  • One from the right DataFrame: {{ right_on }}

Parameters:

  • other (DataFrame) –

    The DataFrame to join with.

  • predicate (str) –

    A Jinja2 template containing the natural language predicate. Must include placeholders for exactly one column from each DataFrame. The template is evaluated as a boolean - True includes the pair, False excludes it.

  • left_on (Column) –

    The column from the left DataFrame (self) to use in the join predicate.

  • right_on (Column) –

    The column from the right DataFrame (other) to use in the join predicate.

  • strict (bool, default: True ) –

    If True, when either the left_on or right_on column has a None value for a row pair, that pair is automatically excluded from the join (predicate is not evaluated). If False, None values are rendered according to Jinja2's null rendering behavior. Default is True.

  • examples (Optional[JoinExampleCollection], default: None ) –

    Optional JoinExampleCollection containing labeled examples to guide the join. Each example should have:

      • left: Sample value from the left column
      • right: Sample value from the right column
      • output: Boolean indicating whether this pair should be joined (True) or not (False)

  • model_alias (Optional[Union[str, ModelAlias]], default: None ) –

    Optional alias for the language model to use. If None, uses the default model.

Returns:

  • DataFrame ( DataFrame ) –

    A new DataFrame containing matched row pairs with all columns from both DataFrames.

Basic semantic join
# Match job listings with candidate resumes based on title/skills
# Only includes pairs where the predicate evaluates to True
df_jobs.semantic.join(df_resumes,
    predicate=dedent('''\
        Job Description: {{left_on}}
        Candidate Background: {{right_on}}
        The candidate is qualified for the job.'''),
    left_on=col("job_description"),
    right_on=col("work_experience"),
    examples=examples
)
Semantic join with examples
# Improve join quality with examples
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="5 years experience building backend services in Python using asyncio, FastAPI, and PostgreSQL",
    right="Senior Software Engineer - Backend",
    output=True))  # This pair WILL be included in similar cases
examples.create_example(JoinExample(
    left="5 years experience with growth strategy, private equity due diligence, and M&A",
    right="Product Manager - Hardware",
    output=False))  # This pair will NOT be included in similar cases
df_jobs.semantic.join(
    other=df_resumes,
    predicate=dedent('''\
        Job Description: {{left_on}}
        Candidate Background: {{right_on}}
        The candidate is qualified for the job.'''),
    left_on=col("job_description"),
    right_on=col("work_experience"),
    examples=examples
)
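The strict parameter's None handling can be pictured with a minimal plain-Python sketch. This is illustrative only, not fenic's implementation; candidate_pairs is a hypothetical helper showing why, with strict=True, pairs containing None never reach the model:

```python
# Illustrative sketch (not fenic internals): with strict=True, any row pair
# where either side is None is dropped before the predicate is evaluated.
def candidate_pairs(left_values, right_values, strict=True):
    pairs = []
    for left in left_values:
        for right in right_values:
            if strict and (left is None or right is None):
                continue  # excluded without evaluating the predicate
            pairs.append((left, right))
    return pairs

print(candidate_pairs(["Backend engineer", None], ["Python role"]))
# → [('Backend engineer', 'Python role')]
```

With strict=False, the None values would instead be rendered into the Jinja template, which usually degrades predicate quality; strict=True is therefore the safer default.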
Source code in src/fenic/api/dataframe/semantic_extensions.py
def join(
    self,
    other: DataFrame,
    predicate: str,
    left_on: Column,
    right_on: Column,
    strict: bool = True,
    examples: Optional[JoinExampleCollection] = None,
    model_alias: Optional[Union[str, ModelAlias]] = None
) -> DataFrame:
    """Performs a semantic join between two DataFrames using a natural language predicate.

    This method evaluates a boolean predicate for each potential row pair between the two DataFrames,
    including only those pairs where the predicate evaluates to True.

    The join process:
    1. For each row in the left DataFrame, evaluates the predicate in the jinja template against each row in the right DataFrame
    2. Includes row pairs where the predicate returns True
    3. Excludes row pairs where the predicate returns False
    4. Returns a new DataFrame containing all columns from both DataFrames for the matched pairs

    The jinja template must use exactly two column placeholders:
    - One from the left DataFrame: `{{ left_on }}`
    - One from the right DataFrame: `{{ right_on }}`

    Args:
        other: The DataFrame to join with.
        predicate: A Jinja2 template containing the natural language predicate.
            Must include placeholders for exactly one column from each DataFrame.
            The template is evaluated as a boolean - True includes the pair, False excludes it.
        left_on: The column from the left DataFrame (self) to use in the join predicate.
        right_on: The column from the right DataFrame (other) to use in the join predicate.
        strict: If True, when either the left_on or right_on column has a None value for a row pair,
                that pair is automatically excluded from the join (predicate is not evaluated).
                If False, None values are rendered according to Jinja2's null rendering behavior.
                Default is True.
        examples: Optional JoinExampleCollection containing labeled examples to guide the join.
            Each example should have:
            - left: Sample value from the left column
            - right: Sample value from the right column
            - output: Boolean indicating whether this pair should be joined (True) or not (False)
        model_alias: Optional alias for the language model to use. If None, uses the default model.

    Returns:
        DataFrame: A new DataFrame containing matched row pairs with all columns from both DataFrames.

    Example: Basic semantic join
        ```python
        # Match job listings with candidate resumes based on title/skills
        # Only includes pairs where the predicate evaluates to True
        df_jobs.semantic.join(df_resumes,
            predicate=dedent('''\
                Job Description: {{left_on}}
                Candidate Background: {{right_on}}
                The candidate is qualified for the job.'''),
            left_on=col("job_description"),
            right_on=col("work_experience"),
            examples=examples
        )
        ```

    Example: Semantic join with examples
        ```python
        # Improve join quality with examples
        examples = JoinExampleCollection()
        examples.create_example(JoinExample(
            left="5 years experience building backend services in Python using asyncio, FastAPI, and PostgreSQL",
            right="Senior Software Engineer - Backend",
            output=True))  # This pair WILL be included in similar cases
        examples.create_example(JoinExample(
            left="5 years experience with growth strategy, private equity due diligence, and M&A",
            right="Product Manager - Hardware",
            output=False))  # This pair will NOT be included in similar cases
        df_jobs.semantic.join(
            other=df_resumes,
            predicate=dedent('''\
                Job Description: {{left_on}}
                Candidate Background: {{right_on}}
                The candidate is qualified for the job.'''),
            left_on=col("job_description"),
            right_on=col("work_experience"),
            examples=examples
        )
        ```
    """
    from fenic.api.dataframe.dataframe import DataFrame

    if not isinstance(other, DataFrame):
        raise ValidationError(f"other argument must be a DataFrame, got {type(other)}")

    if not isinstance(predicate, str):
        raise ValidationError(
            f"The `predicate` argument to `semantic.join` must be a string, got {type(predicate)}"
        )
    if not isinstance(left_on, Column):
        raise ValidationError(f"`left_on` argument must be a Column, got {type(left_on)} instead.")
    if not isinstance(right_on, Column):
        raise ValidationError(f"`right_on` argument must be a Column, got {type(right_on)} instead.")
    if examples is not None and not isinstance(examples, JoinExampleCollection):
        raise ValidationError(f"`examples` argument must be a JoinExampleCollection, got {type(examples)} instead.")
    if model_alias is not None and not isinstance(model_alias, (str, ModelAlias)):
        raise ValidationError(f"`model_alias` argument must be a string or ModelAlias, got {type(model_alias)} instead.")

    resolved_model_alias = _resolve_model_alias(model_alias)
    DataFrame._ensure_same_session(self._df._session_state, [other._session_state])

    return self._df._from_logical_plan(
        SemanticJoin.from_session_state(
            left=self._df._logical_plan,
            right=other._logical_plan,
            left_on=left_on._logical_expr,
            right_on=right_on._logical_expr,
            jinja_template=predicate,
            strict=strict,
            model_alias=resolved_model_alias,
            examples=examples,
            session_state=self._df._session_state,
        ),
        self._df._session_state,
    )

sim_join

sim_join(other: DataFrame, left_on: ColumnOrName, right_on: ColumnOrName, k: int = 1, similarity_metric: SemanticSimilarityMetric = 'cosine', similarity_score_column: Optional[str] = None) -> DataFrame

Performs a semantic similarity join between two DataFrames using embedding expressions.

For each row in the left DataFrame, returns the top k most semantically similar rows from the right DataFrame based on the specified similarity metric.

Parameters:

  • other (DataFrame) –

    The right-hand DataFrame to join with.

  • left_on (ColumnOrName) –

    Expression or column representing embeddings in the left DataFrame.

  • right_on (ColumnOrName) –

    Expression or column representing embeddings in the right DataFrame.

  • k (int, default: 1 ) –

    Number of most similar matches to return per row.

  • similarity_metric (SemanticSimilarityMetric, default: 'cosine' ) –

    Similarity metric to use: "l2", "cosine", or "dot".

  • similarity_score_column (Optional[str], default: None ) –

    If set, adds a column with this name containing similarity scores. If None, the scores are omitted.

Returns:

  • DataFrame –

    A DataFrame containing one row for each of the top-k matches per row in the left DataFrame. The result includes all columns from both DataFrames, optionally augmented with a similarity score column if similarity_score_column is provided.

Raises:

  • ValidationError

    If k is not positive or if the columns are invalid.

  • ValidationError

    If similarity_metric is not one of "l2", "cosine", or "dot".

Match queries to FAQ entries
# Match customer queries to FAQ entries
df_queries.semantic.sim_join(
    df_faqs,
    left_on=embeddings(col("query_text")),
    right_on=embeddings(col("faq_question")),
    k=1
)
Link headlines to articles
# Link news headlines to full articles
df_headlines.semantic.sim_join(
    df_articles,
    left_on=embeddings(col("headline")),
    right_on=embeddings(col("content")),
    k=3,
    similarity_score_column="similarity_score"
)
Find similar job postings
# Find similar job postings across two sources
df_linkedin.semantic.sim_join(
    df_indeed,
    left_on=embeddings(col("job_title")),
    right_on=embeddings(col("job_description")),
    k=2
)
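Conceptually, sim_join scores every right-row embedding against each left-row embedding and keeps the top k. The following is a minimal sketch of that idea for the "cosine" metric, using plain Python; cosine and top_k are illustrative helpers, not fenic's implementation:

```python
import math

# Illustrative sketch (not fenic internals): rank candidate embeddings by
# cosine similarity and keep the top-k indices, as sim_join does per left row.
def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query, candidates, k=1):
    scored = [(cosine(query, c), i) for i, c in enumerate(candidates)]
    scored.sort(reverse=True)  # highest similarity first
    return scored[:k]

faq_embeddings = [[1.0, 0.0], [0.0, 1.0], [0.7, 0.7]]
print(top_k([0.9, 0.1], faq_embeddings, k=2))  # best two matches, scored
```

The "l2" metric would instead rank by ascending Euclidean distance, and "dot" by the raw inner product without normalization.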
Source code in src/fenic/api/dataframe/semantic_extensions.py
def sim_join(
    self,
    other: DataFrame,
    left_on: ColumnOrName,
    right_on: ColumnOrName,
    k: int = 1,
    similarity_metric: SemanticSimilarityMetric = "cosine",
    similarity_score_column: Optional[str] = None,
) -> DataFrame:
    """Performs a semantic similarity join between two DataFrames using embedding expressions.

    For each row in the left DataFrame, returns the top `k` most semantically similar rows
    from the right DataFrame based on the specified similarity metric.

    Args:
        other: The right-hand DataFrame to join with.
        left_on: Expression or column representing embeddings in the left DataFrame.
        right_on: Expression or column representing embeddings in the right DataFrame.
        k: Number of most similar matches to return per row.
        similarity_metric: Similarity metric to use: "l2", "cosine", or "dot".
        similarity_score_column: If set, adds a column with this name containing similarity scores.
            If None, the scores are omitted.

    Returns:
        A DataFrame containing one row for each of the top-k matches per row in the left DataFrame.
        The result includes all columns from both DataFrames, optionally augmented with a similarity score column
        if `similarity_score_column` is provided.

    Raises:
        ValidationError: If `k` is not positive or if the columns are invalid.
        ValidationError: If `similarity_metric` is not one of "l2", "cosine", "dot"

    Example: Match queries to FAQ entries
        ```python
        # Match customer queries to FAQ entries
        df_queries.semantic.sim_join(
            df_faqs,
            left_on=embeddings(col("query_text")),
            right_on=embeddings(col("faq_question")),
            k=1
        )
        ```

    Example: Link headlines to articles
        ```python
        # Link news headlines to full articles
        df_headlines.semantic.sim_join(
            df_articles,
            left_on=embeddings(col("headline")),
            right_on=embeddings(col("content")),
            k=3,
            similarity_score_column="similarity_score"
        )
        ```

    Example: Find similar job postings
        ```python
        # Find similar job postings across two sources
        df_linkedin.semantic.sim_join(
            df_indeed,
            left_on=embeddings(col("job_title")),
            right_on=embeddings(col("job_description")),
            k=2
        )
        ```
    """
    from fenic.api.dataframe.dataframe import DataFrame

    if not isinstance(right_on, ColumnOrName):
        raise ValidationError(
            f"The `right_on` argument must be a `Column` or a string representing a column name, "
            f"but got `{type(right_on).__name__}` instead."
        )
    if not isinstance(other, DataFrame):
        raise ValidationError(
            f"The `other` argument to `sim_join()` must be a DataFrame, but got `{type(other).__name__}`."
        )
    if not (isinstance(k, int) and k > 0):
        raise ValidationError(
            f"The parameter `k` must be a positive integer, but received `{k}`."
        )
    args = get_args(SemanticSimilarityMetric)
    if similarity_metric not in args:
        raise ValidationError(
            f"The `similarity_metric` argument must be one of {args}, but got `{similarity_metric}`."
        )

    def _validate_column(column: ColumnOrName, name: str):
        if column is None:
            raise ValidationError(f"The `{name}` argument must not be None.")
        if not isinstance(column, ColumnOrName):
            raise ValidationError(
                f"The `{name}` argument must be a `Column` or a string representing a column name, "
                f"but got `{type(column).__name__}` instead."
            )

    _validate_column(left_on, "left_on")
    _validate_column(right_on, "right_on")

    DataFrame._ensure_same_session(self._df._session_state, [other._session_state])
    return self._df._from_logical_plan(
        SemanticSimilarityJoin.from_session_state(
            self._df._logical_plan,
            other._logical_plan,
            Column._from_col_or_name(left_on)._logical_expr,
            Column._from_col_or_name(right_on)._logical_expr,
            k,
            similarity_metric,
            similarity_score_column,
            self._df._session_state,
        ),
        self._df._session_state,
    )

with_cluster_labels

with_cluster_labels(by: ColumnOrName, num_clusters: int, max_iter: int = 300, num_init: int = 1, label_column: str = 'cluster_label', centroid_column: Optional[str] = None) -> DataFrame

Cluster rows using K-means and add cluster metadata columns.

This method clusters rows based on the given embedding column or expression using K-means. It adds a new column with cluster assignments, and optionally includes the centroid embedding for each assigned cluster.

Parameters:

  • by (ColumnOrName) –

    Column or expression producing embeddings to cluster (e.g., embed(col("text"))).

  • num_clusters (int) –

    Number of clusters to compute (must be > 0).

  • max_iter (int, default: 300 ) –

    Maximum iterations for a single run of the k-means algorithm. The algorithm stops when it either converges or reaches this limit.

  • num_init (int, default: 1 ) –

    Number of independent runs of k-means with different centroid seeds. The best result is selected.

  • label_column (str, default: 'cluster_label' ) –

    Name of the output column for cluster IDs. Default is "cluster_label".

  • centroid_column (Optional[str], default: None ) –

    If provided, adds a column with this name containing the centroid embedding for each row's assigned cluster.

Returns:

  • DataFrame –

    A DataFrame with all original columns plus:

      • <label_column>: integer cluster assignment (0 to num_clusters - 1)
      • <centroid_column>: cluster centroid embedding, if specified
Basic clustering
# Cluster customer feedback and add cluster metadata
clustered_df = df.semantic.with_cluster_labels("feedback_embeddings", num_clusters=5)

# Then use regular operations to analyze clusters
clustered_df.group_by("cluster_label").agg(count("*"), avg("rating"))
Filter outliers using centroids
# Cluster and filter out rows far from their centroid
clustered_df = df.semantic.with_cluster_labels("embeddings", num_clusters=3, num_init=10, centroid_column="cluster_centroid")
clean_df = clustered_df.filter(
    embedding.compute_similarity("embeddings", "cluster_centroid", metric="cosine") > 0.7
)
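The <label_column> values come from the K-means assignment step: each row receives the index of its nearest centroid. A minimal sketch of that step in plain Python (assign_labels is an illustrative helper, not fenic's implementation):

```python
# Illustrative sketch (not fenic internals): K-means label assignment.
# Each embedding gets the index of the closest centroid by squared L2 distance.
def assign_labels(embeddings, centroids):
    def sq_dist(a, b):
        return sum((x - y) ** 2 for x, y in zip(a, b))
    return [
        min(range(len(centroids)), key=lambda j: sq_dist(e, centroids[j]))
        for e in embeddings
    ]

centroids = [[0.0, 0.0], [10.0, 10.0]]
rows = [[1.0, 0.5], [9.0, 11.0], [0.2, 0.1]]
print(assign_labels(rows, centroids))  # → [0, 1, 0]
```

num_init controls how many times this whole fit is repeated from different random centroid seeds (keeping the best run), and max_iter bounds the assign/update iterations within a single run.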
Source code in src/fenic/api/dataframe/semantic_extensions.py
def with_cluster_labels(
    self,
    by: ColumnOrName,
    num_clusters: int,
    max_iter: int = 300,
    num_init: int = 1,
    label_column: str = "cluster_label",
    centroid_column: Optional[str] = None,
) -> DataFrame:
    """Cluster rows using K-means and add cluster metadata columns.

    This method clusters rows based on the given embedding column or expression using K-means.
    It adds a new column with cluster assignments, and optionally includes the centroid embedding
    for each assigned cluster.

    Args:
        by: Column or expression producing embeddings to cluster (e.g., `embed(col("text"))`).
        num_clusters: Number of clusters to compute (must be > 0).
        max_iter: Maximum iterations for a single run of the k-means algorithm. The algorithm stops when it either converges or reaches this limit.
        num_init: Number of independent runs of k-means with different centroid seeds. The best result is selected.
        label_column: Name of the output column for cluster IDs. Default is "cluster_label".
        centroid_column: If provided, adds a column with this name containing the centroid embedding
                        for each row's assigned cluster.

    Returns:
        A DataFrame with all original columns plus:
        - `<label_column>`: integer cluster assignment (0 to num_clusters - 1)
        - `<centroid_column>`: cluster centroid embedding, if specified

    Example: Basic clustering
        ```python
        # Cluster customer feedback and add cluster metadata
        clustered_df = df.semantic.with_cluster_labels("feedback_embeddings", num_clusters=5)

        # Then use regular operations to analyze clusters
        clustered_df.group_by("cluster_label").agg(count("*"), avg("rating"))
        ```

    Example: Filter outliers using centroids
        ```python
        # Cluster and filter out rows far from their centroid
        clustered_df = df.semantic.with_cluster_labels("embeddings", num_clusters=3, num_init=10, centroid_column="cluster_centroid")
        clean_df = clustered_df.filter(
            embedding.compute_similarity("embeddings", "cluster_centroid", metric="cosine") > 0.7
        )
        ```
    """
    # Validate num_clusters
    if not isinstance(num_clusters, int) or num_clusters <= 0:
        raise ValidationError("`num_clusters` must be a positive integer.")

    # Validate max_iter
    if not isinstance(max_iter, int) or max_iter <= 0:
        raise ValidationError("`max_iter` must be a positive integer.")

    # Validate num_init
    if not isinstance(num_init, int) or num_init <= 0:
        raise ValidationError("`num_init` must be a positive integer.")

    # Validate clustering target
    if not isinstance(by, ColumnOrName):
        raise ValidationError(
            f"Invalid cluster by: expected a column name (str) or Column object, got {type(by).__name__}."
        )

    # Validate label_column
    if not isinstance(label_column, str) or not label_column:
        raise ValidationError("`label_column` must be a non-empty string.")

    # Validate centroid_column if provided
    if centroid_column is not None:
        if not isinstance(centroid_column, str) or not centroid_column:
            raise ValidationError("`centroid_column` must be a non-empty string if provided.")

    # Check that the expression isn't a literal
    by_expr = Column._from_col_or_name(by)._logical_expr
    if isinstance(by_expr, LiteralExpr):
        raise ValidationError(
            f"Invalid cluster by: Cannot cluster by a literal value: {by_expr}."
        )

    return self._df._from_logical_plan(
        SemanticCluster.from_session_state(
            self._df._logical_plan,
            by_expr,
            num_clusters=num_clusters,
            max_iter=max_iter,
            num_init=num_init,
            label_column=label_column,
            centroid_column=centroid_column,
            session_state=self._df._session_state,
        ),
        self._df._session_state,
    )