fenic.api.dataframe.semantic_extensions

Semantic extensions for DataFrames providing clustering and semantic join operations.

Classes:

SemanticExtensions

SemanticExtensions(df: DataFrame)

A namespace for semantic dataframe operators.

Initialize semantic extensions.

Parameters:

  • df (DataFrame) –

    The DataFrame to extend with semantic operations.

Methods:

  • join

    Performs a semantic join between two DataFrames using a natural language predicate.

  • sim_join

    Performs a semantic similarity join between two DataFrames using embedding expressions.

  • with_cluster_labels

    Cluster rows using K-means and add cluster metadata columns.

Source code in src/fenic/api/dataframe/semantic_extensions.py
def __init__(self, df: DataFrame):
    """Initialize semantic extensions.

    Args:
        df: The DataFrame to extend with semantic operations.
    """
    self._df = df

join

join(other: DataFrame, join_instruction: str, examples: Optional[JoinExampleCollection] = None, model_alias: Optional[str] = None) -> DataFrame

Performs a semantic join between two DataFrames using a natural language predicate.

The predicate evaluates to either true or false for each potential row pair.

The join works by:

  1. Evaluating the provided join_instruction as a boolean predicate for each possible pair of rows
  2. Including ONLY the row pairs where the predicate evaluates to True in the result set
  3. Excluding all row pairs where the predicate evaluates to False

The instruction must reference exactly two columns, one from each DataFrame, using the :left and :right suffixes to indicate column origin.

This is useful when row pairing decisions require complex reasoning based on a custom predicate rather than simple equality or similarity matching.
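
The steps above amount to a filtered cross product. The sketch below illustrates that idea in plain Python and is not fenic's implementation: `predicate_join` and `mentions_python` are hypothetical names, and a simple keyword check stands in for the language model that would evaluate the instruction.

```python
from itertools import product
from typing import Callable, Dict, List

Row = Dict[str, str]


def predicate_join(
    left_rows: List[Row],
    right_rows: List[Row],
    left_col: str,
    right_col: str,
    predicate: Callable[[str, str], bool],
) -> List[Row]:
    """Keep only the (left, right) row pairs for which the predicate is True."""
    joined = []
    for left, right in product(left_rows, right_rows):
        if predicate(left[left_col], right[right_col]):
            joined.append({**left, **right})
    return joined


# Hypothetical stand-in for the language model: a keyword check.
def mentions_python(resume: str, job: str) -> bool:
    return "python" in resume.lower() and "python" in job.lower()


resumes = [
    {"resume_summary": "Backend engineer, 5 years of Python"},
    {"resume_summary": "M&A analyst, private equity"},
]
jobs = [
    {"job_description": "Senior Python developer"},
    {"job_description": "Hardware product manager"},
]

matches = predicate_join(
    resumes, jobs, "resume_summary", "job_description", mentions_python
)
print(matches)
```

Because every pair is evaluated, the number of predicate calls grows with the product of the two row counts, which is worth keeping in mind when each call is a model request.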

Parameters:

  • other (DataFrame) –

    The DataFrame to join with.

  • join_instruction (str) –

    A natural language description of how to match values.

    • Must include one placeholder from the left DataFrame (e.g. {resume_summary:left}) and one from the right (e.g. {job_description:right}).
    • This instruction is evaluated as a boolean predicate - pairs where it's True are included, pairs where it's False are excluded.
  • examples (Optional[JoinExampleCollection], default: None ) –

    Optional JoinExampleCollection containing labeled pairs (left, right, output) to guide the semantic join behavior.

  • model_alias (Optional[str], default: None ) –

    Optional alias for the language model to use for the join. If None, will use the language model configured as the default.

Returns:

  • DataFrame ( DataFrame ) –

    A new DataFrame containing only the row pairs where the join_instruction predicate evaluates to True.

Raises:

  • TypeError

    If other is not a DataFrame or join_instruction is not a string.

  • ValueError

    If the instruction format is invalid or references invalid columns.
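
The placeholder rules can be checked before any model is involved. The following is a minimal sketch of such a check, assuming placeholders are written as `{column:left}` and `{column:right}`; `split_join_columns` and the regular expression are hypothetical and only approximate what the library's own parser does.

```python
import re
from typing import Tuple

# Assumed placeholder syntax: {column_name:left} or {column_name:right}.
_PLACEHOLDER = re.compile(r"\{([^{}]+)\}")


def split_join_columns(join_instruction: str) -> Tuple[str, str]:
    """Return (left_column, right_column) or raise ValueError."""
    placeholders = _PLACEHOLDER.findall(join_instruction)
    if len(placeholders) != 2:
        raise ValueError(
            f"expected exactly two placeholders, got {len(placeholders)}"
        )
    left = right = None
    for placeholder in placeholders:
        # Split on the last colon so the suffix is isolated from the name.
        name, _, side = placeholder.rpartition(":")
        if side == "left" and left is None:
            left = name
        elif side == "right" and right is None:
            right = name
        else:
            raise ValueError(
                f"invalid or duplicate placeholder: {{{placeholder}}}"
            )
    return left, right


instruction = "Does {resume_summary:left} qualify for {job_description:right}?"
print(split_join_columns(instruction))  # ('resume_summary', 'job_description')
```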

Basic semantic join
# Match candidate resumes with job listings based on skills
# Only includes pairs where the predicate evaluates to True
df_resumes.semantic.join(df_jobs,
    join_instruction="Given a candidate's resume_summary: {resume_summary:left} and a job description: {job_description:right}, does the candidate have the appropriate skills for the job?"
)
Semantic join with examples
# Improve join quality with examples
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="5 years experience building backend services in Python using asyncio, FastAPI, and PostgreSQL",
    right="Senior Software Engineer - Backend",
    output=True))  # This pair WILL be included in similar cases
examples.create_example(JoinExample(
    left="5 years experience with growth strategy, private equity due diligence, and M&A",
    right="Product Manager - Hardware",
    output=False))  # This pair will NOT be included in similar cases
df_resumes.semantic.join(df_jobs,
    join_instruction="Given a candidate's resume_summary: {resume_summary:left} and a job description: {job_description:right}, does the candidate have the appropriate skills for the job?",
    examples=examples)
Source code in src/fenic/api/dataframe/semantic_extensions.py
def join(
    self,
    other: DataFrame,
    join_instruction: str,
    examples: Optional[JoinExampleCollection] = None,
    model_alias: Optional[str] = None,
) -> DataFrame:
    """Performs a semantic join between two DataFrames using a natural language predicate.

    The predicate evaluates to either true or false for each potential row pair.

    The join works by:
    1. Evaluating the provided join_instruction as a boolean predicate for each possible pair of rows
    2. Including ONLY the row pairs where the predicate evaluates to True in the result set
    3. Excluding all row pairs where the predicate evaluates to False

    The instruction must reference **exactly two columns**, one from each DataFrame,
    using the `:left` and `:right` suffixes to indicate column origin.

    This is useful when row pairing decisions require complex reasoning based on a custom predicate rather than simple equality or similarity matching.

    Args:
        other: The DataFrame to join with.
        join_instruction: A natural language description of how to match values.

            - Must include one placeholder from the left DataFrame (e.g. `{resume_summary:left}`)
            and one from the right (e.g. `{job_description:right}`).
            - This instruction is evaluated as a boolean predicate - pairs where it's `True` are included,
            pairs where it's `False` are excluded.
        examples: Optional JoinExampleCollection containing labeled pairs (`left`, `right`, `output`)
            to guide the semantic join behavior.
        model_alias: Optional alias for the language model to use for the join. If None, will use the language model configured as the default.

    Returns:
        DataFrame: A new DataFrame containing only the row pairs where the join_instruction
                  predicate evaluates to True.

    Raises:
        TypeError: If `other` is not a DataFrame or `join_instruction` is not a string.
        ValueError: If the instruction format is invalid or references invalid columns.

    Example: Basic semantic join
        ```python
        # Match candidate resumes with job listings based on skills
        # Only includes pairs where the predicate evaluates to True
        df_resumes.semantic.join(df_jobs,
            join_instruction="Given a candidate's resume_summary: {resume_summary:left} and a job description: {job_description:right}, does the candidate have the appropriate skills for the job?"
        )
        ```

    Example: Semantic join with examples
        ```python
        # Improve join quality with examples
        examples = JoinExampleCollection()
        examples.create_example(JoinExample(
            left="5 years experience building backend services in Python using asyncio, FastAPI, and PostgreSQL",
            right="Senior Software Engineer - Backend",
            output=True))  # This pair WILL be included in similar cases
        examples.create_example(JoinExample(
            left="5 years experience with growth strategy, private equity due diligence, and M&A",
            right="Product Manager - Hardware",
            output=False))  # This pair will NOT be included in similar cases
        df_resumes.semantic.join(df_jobs,
            join_instruction="Given a candidate's resume_summary: {resume_summary:left} and a job description: {job_description:right}, does the candidate have the appropriate skills for the job?",
            examples=examples)
        ```
    """
    from fenic.api.dataframe.dataframe import DataFrame

    if not isinstance(other, DataFrame):
        raise TypeError(f"other argument must be a DataFrame, got {type(other)}")

    if not isinstance(join_instruction, str):
        raise TypeError(
            f"join_instruction argument must be a string, got {type(join_instruction)}"
        )
    join_columns = utils.parse_instruction(join_instruction)
    if len(join_columns) != 2:
        raise ValueError(
            f"join_instruction must contain exactly two columns, got {len(join_columns)}"
        )
    left_on = None
    right_on = None
    for join_col in join_columns:
        if join_col.endswith(":left"):
            if left_on is not None:
                raise ValueError(
                    "join_instruction cannot contain multiple :left columns"
                )
            left_on = col(join_col.split(":")[0])
        elif join_col.endswith(":right"):
            if right_on is not None:
                raise ValueError(
                    "join_instruction cannot contain multiple :right columns"
                )
            right_on = col(join_col.split(":")[0])
        else:
            raise ValueError(
                f"Column '{join_col}' must end with either :left or :right"
            )

    if left_on is None or right_on is None:
        raise ValueError(
            "join_instruction must contain exactly one :left and one :right column"
        )

    return self._df._from_logical_plan(
        SemanticJoin(
            left=self._df._logical_plan,
            right=other._logical_plan,
            left_on=left_on._logical_expr,
            right_on=right_on._logical_expr,
            join_instruction=join_instruction,
            examples=examples,
            model_alias=model_alias,
        ),
    )

sim_join

sim_join(other: DataFrame, left_on: ColumnOrName, right_on: ColumnOrName, k: int = 1, similarity_metric: SemanticSimilarityMetric = 'cosine', similarity_score_column: Optional[str] = None) -> DataFrame

Performs a semantic similarity join between two DataFrames using embedding expressions.

For each row in the left DataFrame, returns the top k most semantically similar rows from the right DataFrame based on the specified similarity metric.
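
To make the top-k behaviour concrete, here is a small plain-Python sketch of the idea, assuming precomputed, non-zero embedding vectors and the cosine metric. `top_k_join` and its `score_column` argument are hypothetical names that only mirror the documented parameters; this is not how the library executes the join.

```python
import math
from typing import Any, Dict, List, Optional, Sequence

Row = Dict[str, Any]


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k_join(
    left_rows: List[Row],
    right_rows: List[Row],
    left_on: str,
    right_on: str,
    k: int = 1,
    score_column: Optional[str] = None,
) -> List[Row]:
    """For each left row, emit its k most similar right rows."""
    joined = []
    for left in left_rows:
        scored = [
            (cosine_similarity(left[left_on], right[right_on]), right)
            for right in right_rows
        ]
        # Highest similarity first.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        for score, right in scored[:k]:
            row = {**left, **right}
            if score_column is not None:
                row[score_column] = score
            joined.append(row)
    return joined


queries = [{"query": "reset password", "q_vec": [1.0, 0.0]}]
faqs = [
    {"faq": "How do I change my password?", "f_vec": [0.9, 0.1]},
    {"faq": "Where is my invoice?", "f_vec": [0.0, 1.0]},
]

result = top_k_join(queries, faqs, "q_vec", "f_vec", k=1, score_column="score")
print(result[0]["faq"], result[0]["score"])
```

With `k=1` the output has one row per left row; raising `k` yields up to `k` rows per left row, capped by the size of the right side.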

Parameters:

  • other (DataFrame) –

    The right-hand DataFrame to join with.

  • left_on (ColumnOrName) –

    Expression or column representing embeddings in the left DataFrame.

  • right_on (ColumnOrName) –

    Expression or column representing embeddings in the right DataFrame.

  • k (int, default: 1 ) –

    Number of most similar matches to return per row.

  • similarity_metric (SemanticSimilarityMetric, default: 'cosine' ) –

    Similarity metric to use: "l2", "cosine", or "dot".

  • similarity_score_column (Optional[str], default: None ) –

    If set, adds a column with this name containing similarity scores. If None, the scores are omitted.
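
For reference, the three metric names correspond to standard vector comparisons. The sketch below shows the textbook definitions only; how the library turns them into the reported score (for example, how an L2 distance is ranked or scaled) is not stated here, so treat that part as unknown. The function names are illustrative.

```python
import math
from typing import Sequence


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product: larger means more aligned, and is sensitive to magnitude."""
    return sum(x * y for x, y in zip(a, b))


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance: smaller means closer."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity: dot product of the two vectors after normalization."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a = [3.0, 4.0]
b = [4.0, 3.0]
print(dot(a, b))          # 24.0
print(l2_distance(a, b))  # square root of 2
print(cosine(a, b))       # 24 / 25
```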

Returns:

  • DataFrame –

    A DataFrame containing one row for each of the top-k matches per row in the left DataFrame. The result includes all columns from both DataFrames, optionally augmented with a similarity score column if similarity_score_column is provided.

Raises:

  • ValidationError

    If k is not positive or if the columns are invalid.

  • ValidationError

    If similarity_metric is not one of "l2", "cosine", or "dot".

Match queries to FAQ entries
# Match customer queries to FAQ entries
df_queries.semantic.sim_join(
    df_faqs,
    left_on=embeddings(col("query_text")),
    right_on=embeddings(col("faq_question")),
    k=1
)
Link headlines to articles
# Link news headlines to full articles
df_headlines.semantic.sim_join(
    df_articles,
    left_on=embeddings(col("headline")),
    right_on=embeddings(col("content")),
    k=3,
    similarity_score_column="similarity_score"
)
Find similar job postings
# Find similar job postings across two sources
df_linkedin.semantic.sim_join(
    df_indeed,
    left_on=embeddings(col("job_title")),
    right_on=embeddings(col("job_description")),
    k=2
)
Source code in src/fenic/api/dataframe/semantic_extensions.py
def sim_join(
    self,
    other: DataFrame,
    left_on: ColumnOrName,
    right_on: ColumnOrName,
    k: int = 1,
    similarity_metric: SemanticSimilarityMetric = "cosine",
    similarity_score_column: Optional[str] = None,
) -> DataFrame:
    """Performs a semantic similarity join between two DataFrames using embedding expressions.

    For each row in the left DataFrame, returns the top `k` most semantically similar rows
    from the right DataFrame based on the specified similarity metric.

    Args:
        other: The right-hand DataFrame to join with.
        left_on: Expression or column representing embeddings in the left DataFrame.
        right_on: Expression or column representing embeddings in the right DataFrame.
        k: Number of most similar matches to return per row.
        similarity_metric: Similarity metric to use: "l2", "cosine", or "dot".
        similarity_score_column: If set, adds a column with this name containing similarity scores.
            If None, the scores are omitted.

    Returns:
        A DataFrame containing one row for each of the top-k matches per row in the left DataFrame.
        The result includes all columns from both DataFrames, optionally augmented with a similarity score column
        if `similarity_score_column` is provided.

    Raises:
        ValidationError: If `k` is not positive or if the columns are invalid.
        ValidationError: If `similarity_metric` is not one of "l2", "cosine", "dot"

    Example: Match queries to FAQ entries
        ```python
        # Match customer queries to FAQ entries
        df_queries.semantic.sim_join(
            df_faqs,
            left_on=embeddings(col("query_text")),
            right_on=embeddings(col("faq_question")),
            k=1
        )
        ```

    Example: Link headlines to articles
        ```python
        # Link news headlines to full articles
        df_headlines.semantic.sim_join(
            df_articles,
            left_on=embeddings(col("headline")),
            right_on=embeddings(col("content")),
            k=3,
            similarity_score_column="similarity_score"
        )
        ```

    Example: Find similar job postings
        ```python
        # Find similar job postings across two sources
        df_linkedin.semantic.sim_join(
            df_indeed,
            left_on=embeddings(col("job_title")),
            right_on=embeddings(col("job_description")),
            k=2
        )
        ```
    """
    from fenic.api.dataframe.dataframe import DataFrame

    if not isinstance(right_on, ColumnOrName):
        raise ValidationError(
            f"The `right_on` argument must be a `Column` or a string representing a column name, "
            f"but got `{type(right_on).__name__}` instead."
        )
    if not isinstance(other, DataFrame):
        raise ValidationError(
            f"The `other` argument to `sim_join()` must be a `DataFrame`, but got `{type(other).__name__}`."
        )
    if not (isinstance(k, int) and k > 0):
        raise ValidationError(
            f"The parameter `k` must be a positive integer, but received `{k}`."
        )
    args = get_args(SemanticSimilarityMetric)
    if similarity_metric not in args:
        raise ValidationError(
            f"The `similarity_metric` argument must be one of {args}, but got `{similarity_metric}`."
        )

    def _validate_column(column: ColumnOrName, name: str):
        if column is None:
            raise ValidationError(f"The `{name}` argument must not be None.")
        if not isinstance(column, ColumnOrName):
            raise ValidationError(
                f"The `{name}` argument must be a `Column` or a string representing a column name, "
                f"but got `{type(column).__name__}` instead."
            )

    _validate_column(left_on, "left_on")
    _validate_column(right_on, "right_on")

    return self._df._from_logical_plan(
        SemanticSimilarityJoin(
            self._df._logical_plan,
            other._logical_plan,
            Column._from_col_or_name(left_on)._logical_expr,
            Column._from_col_or_name(right_on)._logical_expr,
            k,
            similarity_metric,
            similarity_score_column,
        ),
    )

with_cluster_labels

with_cluster_labels(by: ColumnOrName, num_clusters: int, label_column: str = 'cluster_label', centroid_column: Optional[str] = None) -> DataFrame

Cluster rows using K-means and add cluster metadata columns.

This method clusters rows based on the given embedding column or expression using K-means. It adds a new column with cluster assignments, and optionally includes the centroid embedding for each assigned cluster.
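
As an illustration of what the added columns contain, here is a deliberately small K-means sketch in plain Python. It assumes a naive deterministic initialization (the first `num_clusters` vectors) and a fixed iteration count, neither of which is claimed to match the library; `kmeans` and the helper names are hypothetical.

```python
from typing import List, Sequence, Tuple

Vector = Sequence[float]


def squared_distance(a: Vector, b: Vector) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


def mean(vectors: List[Vector]) -> List[float]:
    """Component-wise mean of a non-empty list of vectors."""
    n = len(vectors)
    return [sum(component) / n for component in zip(*vectors)]


def kmeans(
    vectors: List[Vector], num_clusters: int, iterations: int = 10
) -> Tuple[List[int], List[List[float]]]:
    # Naive initialization (an assumption): use the first vectors as centroids.
    centroids = [list(v) for v in vectors[:num_clusters]]
    labels = [0] * len(vectors)
    for _ in range(iterations):
        # Assignment step: each vector takes the label of its nearest centroid.
        labels = [
            min(range(num_clusters), key=lambda c: squared_distance(v, centroids[c]))
            for v in vectors
        ]
        # Update step: each centroid moves to the mean of its members.
        for c in range(num_clusters):
            members = [v for v, label in zip(vectors, labels) if label == c]
            if members:
                centroids[c] = mean(members)
    return labels, centroids


vectors = [[0.0, 0.0], [10.0, 10.0], [0.0, 1.0], [10.0, 9.0]]
labels, centroids = kmeans(vectors, num_clusters=2)

# Mirror the output shape: a label per row plus that row's cluster centroid.
rows = [
    {"embedding": v, "cluster_label": label, "cluster_centroid": centroids[label]}
    for v, label in zip(vectors, labels)
]
print(rows)
```

Each output row carries an integer label in the range 0 to `num_clusters - 1` and, when requested, the centroid of the cluster it was assigned to.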

Parameters:

  • by (ColumnOrName) –

    Column or expression producing embeddings to cluster (e.g., embed(col("text"))).

  • num_clusters (int) –

    Number of clusters to compute (must be > 0).

  • label_column (str, default: 'cluster_label' ) –

    Name of the output column for cluster IDs. Default is "cluster_label".

  • centroid_column (Optional[str], default: None ) –

    If provided, adds a column with this name containing the centroid embedding for each row's assigned cluster.

Returns:

  • DataFrame –

    A DataFrame with all original columns plus:

    • <label_column>: integer cluster assignment (0 to num_clusters - 1)
    • <centroid_column>: cluster centroid embedding, if specified

Raises:

  • ValidationError

    If num_clusters is not a positive integer

  • ValidationError

    If label_column is not a non-empty string

  • ValidationError

    If centroid_column is provided but is not a non-empty string

  • TypeMismatchError

    If the column is not an EmbeddingType

Basic clustering
# Cluster customer feedback and add cluster metadata
clustered_df = df.semantic.with_cluster_labels("feedback_embeddings", 5)

# Then use regular operations to analyze clusters
clustered_df.group_by("cluster_label").agg(count("*"), avg("rating"))
Filter outliers using centroids
# Cluster and filter out rows far from their centroid
clustered_df = df.semantic.with_cluster_labels("embeddings", 3, centroid_column="cluster_centroid")
clean_df = clustered_df.filter(
    embedding.compute_similarity("embeddings", "cluster_centroid", metric="cosine") > 0.7
)
Source code in src/fenic/api/dataframe/semantic_extensions.py
def with_cluster_labels(self, by: ColumnOrName, num_clusters: int, label_column: str = "cluster_label", centroid_column: Optional[str] = None) -> DataFrame:
    """Cluster rows using K-means and add cluster metadata columns.

    This method clusters rows based on the given embedding column or expression using K-means.
    It adds a new column with cluster assignments, and optionally includes the centroid embedding
    for each assigned cluster.

    Args:
        by: Column or expression producing embeddings to cluster (e.g., `embed(col("text"))`).
        num_clusters: Number of clusters to compute (must be > 0).
        label_column: Name of the output column for cluster IDs. Default is "cluster_label".
        centroid_column: If provided, adds a column with this name containing the centroid embedding
                        for each row's assigned cluster.

    Returns:
        A DataFrame with all original columns plus:
        - `<label_column>`: integer cluster assignment (0 to num_clusters - 1)
        - `<centroid_column>`: cluster centroid embedding, if specified

    Raises:
        ValidationError: If num_clusters is not a positive integer
        ValidationError: If label_column is not a non-empty string
        ValidationError: If centroid_column is provided but is not a non-empty string
        TypeMismatchError: If the column is not an EmbeddingType

    Example: Basic clustering
        ```python
        # Cluster customer feedback and add cluster metadata
        clustered_df = df.semantic.with_cluster_labels("feedback_embeddings", 5)

        # Then use regular operations to analyze clusters
        clustered_df.group_by("cluster_label").agg(count("*"), avg("rating"))
        ```

    Example: Filter outliers using centroids
        ```python
        # Cluster and filter out rows far from their centroid
        clustered_df = df.semantic.with_cluster_labels("embeddings", 3, centroid_column="cluster_centroid")
        clean_df = clustered_df.filter(
            embedding.compute_similarity("embeddings", "cluster_centroid", metric="cosine") > 0.7
        )
        ```
    """
    # Validate num_clusters
    if not isinstance(num_clusters, int) or num_clusters <= 0:
        raise ValidationError("`num_clusters` must be a positive integer greater than 0.")

    # Validate clustering target
    if not isinstance(by, ColumnOrName):
        raise ValidationError(
            f"Invalid cluster by: expected a column name (str) or Column object, got {type(by).__name__}."
        )

    # Validate label_column
    if not isinstance(label_column, str) or not label_column:
        raise ValidationError("`label_column` must be a non-empty string.")

    # Validate centroid_column if provided
    if centroid_column is not None:
        if not isinstance(centroid_column, str) or not centroid_column:
            raise ValidationError("`centroid_column` must be a non-empty string if provided.")

    # Check that the expression isn't a literal
    by_expr = Column._from_col_or_name(by)._logical_expr
    if isinstance(by_expr, LiteralExpr):
        raise ValidationError(
            f"Invalid cluster by: Cannot cluster by a literal value: {by_expr}."
        )

    return self._df._from_logical_plan(
        SemanticCluster(
            self._df._logical_plan, by_expr, num_clusters, label_column, centroid_column
        )
    )