fenic.api.functions

Functions for working with DataFrame columns.

Functions:

  • array

    Creates a new array column from multiple input columns.

  • array_agg

    Alias for collect_list().

  • array_contains

    Checks if array column contains a specific value.

  • array_size

    Returns the number of elements in an array column.

  • asc

    Mark this column for ascending sort order with nulls first.

  • asc_nulls_first

    Alias for asc().

  • asc_nulls_last

    Mark this column for ascending sort order with nulls last.

  • async_udf

    A decorator for creating async user-defined functions (UDFs) with configurable concurrency and retries.

  • avg

    Aggregate function: returns the average (mean) of all values in the specified column. Applies to numeric and embedding types.

  • coalesce

    Returns the first non-null value from the given columns for each row.

  • col

    Creates a Column expression referencing a column in the DataFrame.

  • collect_list

    Aggregate function: collects all values from the specified column into a list.

  • count

    Aggregate function: returns the count of non-null values in the specified column.

  • desc

    Mark this column for descending sort order with nulls first.

  • desc_nulls_first

    Alias for desc().

  • desc_nulls_last

    Mark this column for descending sort order with nulls last.

  • empty

    Creates a Column expression representing an empty value of the given type.

  • first

    Aggregate function: returns the first non-null value in the specified column.

  • greatest

    Returns the greatest value from the given columns for each row.

  • least

    Returns the least value from the given columns for each row.

  • lit

    Creates a Column expression representing a literal value.

  • max

    Aggregate function: returns the maximum value in the specified column.

  • mean

    Aggregate function: returns the mean (average) of all values in the specified column.

  • min

    Aggregate function: returns the minimum value in the specified column.

  • null

    Creates a Column expression representing a null value of the specified data type.

  • stddev

    Aggregate function: returns the sample standard deviation of the specified column.

  • struct

    Creates a new struct column from multiple input columns.

  • sum

    Aggregate function: returns the sum of all values in the specified column.

  • tool_param

    Creates an unresolved literal placeholder column with a declared data type.

  • udf

    A decorator or function for creating user-defined functions (UDFs) that can be applied to DataFrame rows.

  • when

    Evaluates a condition and returns a value if true.

array

array(*args: Union[ColumnOrName, List[ColumnOrName], Tuple[ColumnOrName, ...]]) -> Column

Creates a new array column from multiple input columns.

Parameters:

  • *args (Union[ColumnOrName, List[ColumnOrName], Tuple[ColumnOrName, ...]], default: () ) –

    Columns or column names to combine into an array. Can be:

    • Individual arguments
    • Lists of columns/column names
    • Tuples of columns/column names

Returns:

  • Column

    A Column expression representing an array containing values from the input columns

Raises:

  • TypeError

    If any argument is not a Column, string, or collection of Columns/strings

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def array(
    *args: Union[ColumnOrName, List[ColumnOrName], Tuple[ColumnOrName, ...]]
) -> Column:
    """Creates a new array column from multiple input columns.

    Args:
        *args: Columns or column names to combine into an array. Can be:

            - Individual arguments
            - Lists of columns/column names
            - Tuples of columns/column names

    Returns:
        A Column expression representing an array containing values from the input columns

    Raises:
        TypeError: If any argument is not a Column, string, or collection of
            Columns/strings
    """
    flattened_args = []
    for arg in args:
        if isinstance(arg, (list, tuple)):
            flattened_args.extend(arg)
        else:
            flattened_args.append(arg)

    expr_columns = [Column._from_col_or_name(c)._logical_expr for c in flattened_args]

    return Column._from_logical_expr(ArrayExpr(expr_columns))
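The row-wise behavior can be sketched in plain Python (illustrative only, not the fenic implementation): `array` gathers the value of each input column for a row into a single list.

```python
# Plain-Python sketch of array()'s row-wise semantics (not fenic code):
# each output element is the list of the input columns' values for that row.
def array_rowwise(*columns):
    """Zip per-column value lists into one list-valued column."""
    return [list(values) for values in zip(*columns)]

a = [1, 2, 3]
b = [10, 20, 30]
print(array_rowwise(a, b))  # [[1, 10], [2, 20], [3, 30]]
```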

array_agg

array_agg(column: ColumnOrName) -> Column

Alias for collect_list().

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def array_agg(column: ColumnOrName) -> Column:
    """Alias for collect_list()."""
    return collect_list(column)

array_contains

array_contains(column: ColumnOrName, value: Union[str, int, float, bool, Column]) -> Column

Checks if array column contains a specific value.

This function returns True if the array in the specified column contains the given value, and False otherwise. Returns False if the array is None.

Parameters:

  • column (ColumnOrName) –

    Column or column name containing the arrays to check.

  • value (Union[str, int, float, bool, Column]) –

    Value to search for in the arrays. Can be:

    • A literal value (string, number, boolean)
    • A Column expression

Returns:

  • Column

    A boolean Column expression (True if value is found, False otherwise).

Raises:

  • TypeError

    If value type is incompatible with the array element type.

  • TypeError

    If the column does not contain array data.

Check for values in arrays
# Check if 'python' exists in arrays in the 'tags' column
df.select(array_contains("tags", "python"))

# Check using a value from another column
df.select(array_contains("tags", col("search_term")))
Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def array_contains(
    column: ColumnOrName, value: Union[str, int, float, bool, Column]
) -> Column:
    """Checks if array column contains a specific value.

    This function returns True if the array in the specified column contains the given value,
    and False otherwise. Returns False if the array is None.

    Args:
        column: Column or column name containing the arrays to check.

        value: Value to search for in the arrays. Can be:
            - A literal value (string, number, boolean)
            - A Column expression

    Returns:
        A boolean Column expression (True if value is found, False otherwise).

    Raises:
        TypeError: If value type is incompatible with the array element type.
        TypeError: If the column does not contain array data.

    Example: Check for values in arrays
        ```python
        # Check if 'python' exists in arrays in the 'tags' column
        df.select(array_contains("tags", "python"))

        # Check using a value from another column
        df.select(array_contains("tags", col("search_term")))
        ```
    """
    value_column = None
    if isinstance(value, Column):
        value_column = value
    else:
        value_column = lit(value)
    return Column._from_logical_expr(
        ArrayContainsExpr(
            Column._from_col_or_name(column)._logical_expr, value_column._logical_expr
        )
    )
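The membership check described above, including the documented None-array behavior, can be sketched in plain Python (illustrative only, not fenic code):

```python
# Plain-Python sketch of array_contains semantics (not fenic code):
# True if the value is present, False otherwise, and False for a None array.
def contains(arr, value):
    if arr is None:
        return False
    return value in arr

print(contains(["python", "sql"], "python"))  # True
print(contains(None, "python"))               # False
```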

array_size

array_size(column: ColumnOrName) -> Column

Returns the number of elements in an array column.

This function computes the length of arrays stored in the specified column. Returns None for None arrays.

Parameters:

  • column (ColumnOrName) –

    Column or column name containing arrays whose length to compute.

Returns:

  • Column

    A Column expression representing the array length.

Raises:

  • TypeError

    If the column does not contain array data.

Get array sizes
# Get the size of arrays in 'tags' column
df.select(array_size("tags"))

# Use with column reference
df.select(array_size(col("tags")))
Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def array_size(column: ColumnOrName) -> Column:
    """Returns the number of elements in an array column.

    This function computes the length of arrays stored in the specified column.
    Returns None for None arrays.

    Args:
        column: Column or column name containing arrays whose length to compute.

    Returns:
        A Column expression representing the array length.

    Raises:
        TypeError: If the column does not contain array data.

    Example: Get array sizes
        ```python
        # Get the size of arrays in 'tags' column
        df.select(array_size("tags"))

        # Use with column reference
        df.select(array_size(col("tags")))
        ```
    """
    return Column._from_logical_expr(
        ArrayLengthExpr(Column._from_col_or_name(column)._logical_expr)
    )
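Per the description above, the result is the element count of each array, with None arrays mapping to None. A plain-Python sketch (illustrative only, not fenic code):

```python
# Plain-Python sketch of array_size semantics (not fenic code):
# len() of each array, with None arrays mapping to None.
def array_size_value(arr):
    return None if arr is None else len(arr)

print([array_size_value(a) for a in [["a", "b"], [], None]])  # [2, 0, None]
```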

asc

asc(column: ColumnOrName) -> Column

Mark this column for ascending sort order with nulls first.

Parameters:

  • column (ColumnOrName) –

    The column to apply the ascending ordering to.

Returns:

  • Column

    A sort expression with ascending order and nulls first.

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def asc(column: ColumnOrName) -> Column:
    """Mark this column for ascending sort order with nulls first.

    Args:
        column: The column to apply the ascending ordering to.

    Returns:
        A sort expression with ascending order and nulls first.
    """
    return Column._from_col_or_name(column).asc()

asc_nulls_first

asc_nulls_first(column: ColumnOrName) -> Column

Alias for asc().

Parameters:

  • column (ColumnOrName) –

    The column to apply the ascending ordering to.

Returns:

  • Column

    A sort expression with ascending order and nulls first.

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def asc_nulls_first(column: ColumnOrName) -> Column:
    """Alias for asc().

    Args:
        column: The column to apply the ascending ordering to.

    Returns:
        A sort expression with ascending order and nulls first.
    """
    return Column._from_col_or_name(column).asc_nulls_first()

asc_nulls_last

asc_nulls_last(column: ColumnOrName) -> Column

Mark this column for ascending sort order with nulls last.

Parameters:

  • column (ColumnOrName) –

    The column to apply the ascending ordering to.

Returns:

  • Column

    A Column expression representing the column and the ascending sort order with nulls last.

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def asc_nulls_last(column: ColumnOrName) -> Column:
    """Mark this column for ascending sort order with nulls last.

    Args:
        column: The column to apply the ascending ordering to.

    Returns:
        A Column expression representing the column and the ascending sort order with nulls last.
    """
    return Column._from_col_or_name(column).asc_nulls_last()

async_udf

async_udf(f: Optional[Callable[..., Awaitable[Any]]] = None, *, return_type: DataType, max_concurrency: int = 10, timeout_seconds: float = 30, num_retries: int = 0)

A decorator for creating async user-defined functions (UDFs) with configurable concurrency and retries.

Async UDFs allow IO-bound operations (API calls, database queries, MCP tool calls) to be executed concurrently while maintaining DataFrame semantics.

Parameters:

  • f (Optional[Callable[..., Awaitable[Any]]], default: None ) –

    Async function to convert to UDF

  • return_type (DataType) –

    Expected return type of the UDF. Required parameter.

  • max_concurrency (int, default: 10 ) –

    Maximum number of concurrent executions (default: 10)

  • timeout_seconds (float, default: 30 ) –

    Per-item timeout in seconds (default: 30)

  • num_retries (int, default: 0 ) –

    Number of retries for failed items (default: 0)

Basic async UDF
# Use as a decorator
@async_udf(return_type=IntegerType)
async def slow_add(x: int, y: int) -> int:
    await asyncio.sleep(1)
    return x + y

df = df.select(slow_add(fc.col("x"), fc.col("y")).alias("slow_sum"))

# Or wrap an async function directly
async def slow_add_fn(x: int, y: int) -> int:
    await asyncio.sleep(1)
    return x + y

slow_add = async_udf(
    slow_add_fn,
    return_type=IntegerType
)
API call with custom concurrency and retries
@async_udf(
    return_type=StructType([
        StructField("status", IntegerType),
        StructField("data", StringType)
    ]),
    max_concurrency=20,
    timeout_seconds=5,
    num_retries=2
)
async def fetch_data(id: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/{id}") as resp:
            return {
                "status": resp.status,
                "data": await resp.text()
            }

Note:

  • Individual failures return None instead of raising exceptions.
  • Async UDFs should not block or do CPU-intensive work, as they will block execution of other instances of the function call.

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def async_udf(
    f: Optional[Callable[..., Awaitable[Any]]] = None,
    *,
    return_type: DataType,
    max_concurrency: int = 10,
    timeout_seconds: float = 30,
    num_retries: int = 0,
):
    """A decorator for creating async user-defined functions (UDFs) with configurable concurrency and retries.

    Async UDFs allow IO-bound operations (API calls, database queries, MCP tool calls)
    to be executed concurrently while maintaining DataFrame semantics.

    Args:
        f: Async function to convert to UDF
        return_type: Expected return type of the UDF. Required parameter.
        max_concurrency: Maximum number of concurrent executions (default: 10)
        timeout_seconds: Per-item timeout in seconds (default: 30)
        num_retries: Number of retries for failed items (default: 0)

    Example: Basic async UDF
        ```python
        @async_udf(return_type=IntegerType)
        async def slow_add(x: int, y: int) -> int:
            await asyncio.sleep(1)
            return x + y

        df = df.select(slow_add(fc.col("x"), fc.col("y")).alias("slow_sum"))

        # Or
        async def slow_add_fn(x: int, y: int) -> int:
            await asyncio.sleep(1)
            return x + y

        slow_add = async_udf(
            slow_add_fn,
            return_type=IntegerType
        )
    ```

    Example: API call with custom concurrency and retries
        ```python
        @async_udf(
            return_type=StructType([
                StructField("status", IntegerType),
                StructField("data", StringType)
            ]),
            max_concurrency=20,
            timeout_seconds=5,
            num_retries=2
        )
        async def fetch_data(id: str) -> dict:
            async with aiohttp.ClientSession() as session:
                async with session.get(f"https://api.example.com/{id}") as resp:
                    return {
                        "status": resp.status,
                        "data": await resp.text()
                    }
        ```

    Note:
        - Individual failures return None instead of raising exceptions
        - Async UDFs should not block or do CPU-intensive work, as they
          will block execution of other instances of the function call.
    """

    def _create_async_udf(func: Callable[..., Awaitable[Any]]) -> Callable:
        if not inspect.iscoroutinefunction(func):
            raise ValidationError(
                f"@async_udf requires an async function, but found a synchronous "
                f"function {func.__name__!r} of type {type(func)}"
            )

        @wraps(func)
        def _async_udf_wrapper(*cols: ColumnOrName) -> Column:
            col_exprs = [Column._from_col_or_name(c)._logical_expr for c in cols]
            return Column._from_logical_expr(
                AsyncUDFExpr(
                    func,
                    col_exprs,
                    return_type,
                    max_concurrency=max_concurrency,
                    timeout_seconds=timeout_seconds,
                    num_retries=num_retries
                )
            )
        return _async_udf_wrapper

    if _is_logical_type(return_type):
        raise NotImplementedError(f"return_type {return_type} is not supported for async UDFs")

    # Support both @async_udf and async_udf(...) syntax
    if f is None:
        return _create_async_udf
    else:
        return _create_async_udf(f)
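One way to understand how the three knobs interact is a plain-asyncio sketch (illustrative only; fenic's actual executor differs): items run under a shared semaphore (`max_concurrency`), each attempt is bounded by `timeout_seconds`, failures are retried `num_retries` times, and, per the note above, an item that exhausts its attempts yields None rather than raising.

```python
# Sketch of bounded-concurrency execution with per-item timeout and retries
# (illustrative only, not fenic's implementation).
import asyncio

async def run_items(func, items, max_concurrency=10, timeout_seconds=30, num_retries=0):
    sem = asyncio.Semaphore(max_concurrency)

    async def one(item):
        async with sem:
            for _ in range(num_retries + 1):
                try:
                    return await asyncio.wait_for(func(item), timeout_seconds)
                except Exception:
                    continue
            return None  # all attempts failed: yield None instead of raising

    # gather() preserves input order, matching DataFrame row semantics
    return await asyncio.gather(*(one(i) for i in items))

async def double(x):
    await asyncio.sleep(0.01)
    if x < 0:
        raise ValueError("negative")
    return x * 2

print(asyncio.run(run_items(double, [1, 2, -1], max_concurrency=2)))  # [2, 4, None]
```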

avg

avg(column: ColumnOrName) -> Column

Aggregate function: returns the average (mean) of all values in the specified column. Applies to numeric and embedding types.

Parameters:

  • column (ColumnOrName) –

    Column or column name to compute the average of

Returns:

  • Column

    A Column expression representing the average aggregation

Raises:

  • TypeError

    If column is not a Column or string

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def avg(column: ColumnOrName) -> Column:
    """Aggregate function: returns the average (mean) of all values in the specified column. Applies to numeric and embedding types.

    Args:
        column: Column or column name to compute the average of

    Returns:
        A Column expression representing the average aggregation

    Raises:
        TypeError: If column is not a Column or string
    """
    return Column._from_logical_expr(
        AvgExpr(Column._from_col_or_name(column)._logical_expr)
    )
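For numeric columns the result is the ordinary mean; for embedding columns, a common interpretation (assumed here, not stated in the signature) is an element-wise mean producing a single averaged vector. A plain-Python sketch, illustrative only:

```python
# Plain-Python sketch of avg semantics (not fenic code).
def avg_numeric(values):
    # ordinary mean over a numeric column
    return sum(values) / len(values)

def avg_embeddings(vectors):
    # element-wise mean over an embedding column (assumed interpretation)
    return [sum(dim) / len(dim) for dim in zip(*vectors)]

print(avg_numeric([1, 2, 3]))                    # 2.0
print(avg_embeddings([[1.0, 2.0], [3.0, 4.0]]))  # [2.0, 3.0]
```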

coalesce

coalesce(*cols: ColumnOrName) -> Column

Returns the first non-null value from the given columns for each row.

This function mimics the behavior of SQL's COALESCE function. It evaluates the input columns in order and returns the first non-null value encountered. If all values are null, returns null.

Parameters:

  • *cols (ColumnOrName, default: () ) –

    Column expressions or column names to evaluate. Each argument should be a single column expression or column name string.

Returns:

  • Column

    A Column expression containing the first non-null value from the input columns.

Raises:

  • ValidationError

    If no columns are provided.

coalesce usage
df.select(coalesce("col1", "col2", "col3"))
Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def coalesce(*cols: ColumnOrName) -> Column:
    """Returns the first non-null value from the given columns for each row.

    This function mimics the behavior of SQL's COALESCE function. It evaluates the input columns
    in order and returns the first non-null value encountered. If all values are null, returns null.

    Args:
        *cols: Column expressions or column names to evaluate. Each argument should be a single
            column expression or column name string.

    Returns:
        A Column expression containing the first non-null value from the input columns.

    Raises:
        ValidationError: If no columns are provided.

    Example: coalesce usage
        ```python
        df.select(coalesce("col1", "col2", "col3"))
        ```
    """
    if not cols:
        raise ValidationError("No columns were provided. Please specify at least one column to use with the coalesce method.")

    exprs = [
        Column._from_col_or_name(c)._logical_expr for c in cols
    ]
    return Column._from_logical_expr(CoalesceExpr(exprs))
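The per-row COALESCE semantics described above can be sketched in plain Python (illustrative only, not fenic code): scan the values in order, keep the first non-null one, and fall through to null when every input is null.

```python
# Plain-Python sketch of coalesce's per-row semantics (not fenic code).
def coalesce_row(*values):
    for v in values:
        if v is not None:
            return v
    return None  # all inputs were null

print(coalesce_row(None, None, "c"))  # 'c'
print(coalesce_row(None, None))       # None
```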

col

col(col_name: str) -> Column

Creates a Column expression referencing a column in the DataFrame.

Parameters:

  • col_name (str) –

    Name of the column to reference

Returns:

  • Column

    A Column expression for the specified column

Raises:

  • TypeError

    If col_name is not a string

Source code in src/fenic/api/functions/core.py
@validate_call(config=ConfigDict(strict=True))
def col(col_name: str) -> Column:
    """Creates a Column expression referencing a column in the DataFrame.

    Args:
        col_name: Name of the column to reference

    Returns:
        A Column expression for the specified column

    Raises:
        TypeError: If col_name is not a string
    """
    return Column._from_column_name(col_name)

collect_list

collect_list(column: ColumnOrName) -> Column

Aggregate function: collects all values from the specified column into a list.

Parameters:

  • column (ColumnOrName) –

    Column or column name to collect values from

Returns:

  • Column

    A Column expression representing the list aggregation

Raises:

  • TypeError

    If column is not a Column or string

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def collect_list(column: ColumnOrName) -> Column:
    """Aggregate function: collects all values from the specified column into a list.

    Args:
        column: Column or column name to collect values from

    Returns:
        A Column expression representing the list aggregation

    Raises:
        TypeError: If column is not a Column or string
    """
    return Column._from_logical_expr(
        ListExpr(Column._from_col_or_name(column)._logical_expr)
    )

count

count(column: ColumnOrName) -> Column

Aggregate function: returns the count of non-null values in the specified column.

Parameters:

  • column (ColumnOrName) –

    Column or column name to count values in

Returns:

  • Column

    A Column expression representing the count aggregation

Raises:

  • TypeError

    If column is not a Column or string

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def count(column: ColumnOrName) -> Column:
    """Aggregate function: returns the count of non-null values in the specified column.

    Args:
        column: Column or column name to count values in

    Returns:
        A Column expression representing the count aggregation

    Raises:
        TypeError: If column is not a Column or string
    """
    if isinstance(column, str) and column == "*":
        return Column._from_logical_expr(CountExpr(lit("*")._logical_expr))
    return Column._from_logical_expr(
        CountExpr(Column._from_col_or_name(column)._logical_expr)
    )
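Because only non-null values are counted, nulls reduce the result. A plain-Python sketch of the aggregation (illustrative only, not fenic code):

```python
# Plain-Python sketch of count semantics (not fenic code):
# nulls are excluded from the tally.
def count_non_null(values):
    return sum(1 for v in values if v is not None)

print(count_non_null([1, None, 3, None]))  # 2
```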

desc

desc(column: ColumnOrName) -> Column

Mark this column for descending sort order with nulls first.

Parameters:

  • column (ColumnOrName) –

    The column to apply the descending ordering to.

Returns:

  • Column

    A sort expression with descending order and nulls first.

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def desc(column: ColumnOrName) -> Column:
    """Mark this column for descending sort order with nulls first.

    Args:
        column: The column to apply the descending ordering to.

    Returns:
        A sort expression with descending order and nulls first.
    """
    return Column._from_col_or_name(column).desc()

desc_nulls_first

desc_nulls_first(column: ColumnOrName) -> Column

Alias for desc().

Parameters:

  • column (ColumnOrName) –

    The column to apply the descending ordering to.

Returns:

  • Column

    A sort expression with descending order and nulls first.

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def desc_nulls_first(column: ColumnOrName) -> Column:
    """Alias for desc().

    Args:
        column: The column to apply the descending ordering to.

    Returns:
        A sort expression with descending order and nulls first.
    """
    return Column._from_col_or_name(column).desc_nulls_first()

desc_nulls_last

desc_nulls_last(column: ColumnOrName) -> Column

Mark this column for descending sort order with nulls last.

Parameters:

  • column (ColumnOrName) –

    The column to apply the descending ordering to.

Returns:

  • Column

    A sort expression with descending order and nulls last.

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def desc_nulls_last(column: ColumnOrName) -> Column:
    """Mark this column for descending sort order with nulls last.

    Args:
        column: The column to apply the descending ordering to.

    Returns:
        A sort expression with descending order and nulls last.
    """
    return Column._from_col_or_name(column).desc_nulls_last()

empty

empty(data_type: DataType) -> Column

Creates a Column expression representing an empty value of the given type.

  • If the data type is ArrayType(...), the empty value will be an empty array.
  • If the data type is StructType(...), the empty value will be an instance of the struct type with all fields set to None.
  • For all other data types, the empty value is None (equivalent to calling null(data_type)).

This function is useful for creating columns with empty values of a particular type.

Parameters:

  • data_type (DataType) –

    The data type of the empty value

Returns:

  • Column

    A Column expression representing the empty value

Raises:

  • ValidationError

    If the data type is not a valid data type.

Creating a column with an empty array type
# The newly created `b` column will have a value of `[]` for all rows
df.select(fc.col("a"), fc.empty(fc.ArrayType(fc.IntegerType)).alias("b"))
Creating a column with an empty struct type
# The newly created `b` column will have a value of `{b: None}` for all rows
df.select(fc.col("a"), fc.empty(fc.StructType([fc.StructField("b", fc.IntegerType)])).alias("b"))
Creating a column with an empty primitive type
# The newly created `b` column will have a value of `None` for all rows
df.select(fc.col("a"), fc.empty(fc.IntegerType).alias("b"))
Source code in src/fenic/api/functions/core.py
def empty(data_type: DataType) -> Column:
    """Creates a Column expression representing an empty value of the given type.

    - If the data type is `ArrayType(...)`, the empty value will be an empty array.
    - If the data type is `StructType(...)`, the empty value will be an instance of the struct type with all fields set to `None`.
    - For all other data types, the empty value is None (equivalent to calling `null(data_type)`)

    This function is useful for creating columns with empty values of a particular type.

    Args:
        data_type: The data type of the empty value

    Returns:
        A Column expression representing the empty value

    Raises:
        ValidationError: If the data type is not a valid data type

    Example: Creating a column with an empty array type
        ```python
        # The newly created `b` column will have a value of `[]` for all rows
        df.select(fc.col("a"), fc.empty(fc.ArrayType(fc.IntegerType)).alias("b"))
        ```

    Example: Creating a column with an empty struct type
        ```python
        # The newly created `b` column will have a value of `{b: None}` for all rows
        df.select(fc.col("a"), fc.empty(fc.StructType([fc.StructField("b", fc.IntegerType)])).alias("b"))
        ```

    Example: Creating a column with an empty primitive type
        ```python
        # The newly created `b` column will have a value of `None` for all rows
        df.select(fc.col("a"), fc.empty(fc.IntegerType).alias("b"))
        ```
    """
    if isinstance(data_type, ArrayType):
        return Column._from_logical_expr(LiteralExpr([], data_type))
    elif isinstance(data_type, StructType):
        return Column._from_logical_expr(LiteralExpr({}, data_type))
    return null(data_type)
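The three cases in the bullet list above can be sketched in plain Python (illustrative only, not fenic code): `[]` for arrays, a struct with every field set to None, and None for everything else.

```python
# Plain-Python sketch of the empty value empty() produces per type
# (not fenic code; the string "kind" tags stand in for fenic DataTypes).
def empty_value(kind, struct_fields=()):
    if kind == "array":
        return []
    if kind == "struct":
        return {name: None for name in struct_fields}
    return None  # all other types: same as null(data_type)

print(empty_value("array"))          # []
print(empty_value("struct", ["b"]))  # {'b': None}
print(empty_value("integer"))        # None
```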

first

first(column: ColumnOrName) -> Column

Aggregate function: returns the first non-null value in the specified column.

Typically used in aggregations to select the first observed value per group.

Parameters:

  • column (ColumnOrName) –

    Column or column name.

Returns:

  • Column

    Column expression for the first value.

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def first(column: ColumnOrName) -> Column:
    """Aggregate function: returns the first non-null value in the specified column.

    Typically used in aggregations to select the first observed value per group.

    Args:
        column: Column or column name.

    Returns:
        Column expression for the first value.
    """
    return Column._from_logical_expr(
        FirstExpr(Column._from_col_or_name(column)._logical_expr)
    )
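Within a group, the first non-null value in observed order is kept, as described above. A plain-Python sketch (illustrative only, not fenic code):

```python
# Plain-Python sketch of first's per-group semantics (not fenic code):
# the first non-null value in observed order wins.
def first_non_null(values):
    for v in values:
        if v is not None:
            return v
    return None

print(first_non_null([None, "a", "b"]))  # 'a'
```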

greatest

greatest(*cols: ColumnOrName) -> Column

Returns the greatest value from the given columns for each row.

This function mimics the behavior of SQL's GREATEST function. It evaluates the input columns in order and returns the greatest value encountered. If all values are null, returns null.

All arguments must be of the same primitive type (e.g., StringType, BooleanType, FloatType, IntegerType, etc).

Parameters:

  • *cols (ColumnOrName, default: () ) –

    Column expressions or column names to evaluate. Each argument should be a single column expression or column name string.

Returns:

  • Column

    A Column expression containing the greatest value from the input columns.

Raises:

  • ValidationError

    If fewer than two columns are provided.

greatest usage
df.select(fc.greatest("col1", "col2", "col3"))
Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def greatest(*cols: ColumnOrName) -> Column:
    """Returns the greatest value from the given columns for each row.

    This function mimics the behavior of SQL's GREATEST function. It evaluates the input columns
    in order and returns the greatest value encountered. If all values are null, returns null.

    All arguments must be of the same primitive type (e.g., StringType, BooleanType, FloatType, IntegerType, etc).

    Args:
        *cols: Column expressions or column names to evaluate. Each argument should be a single
            column expression or column name string.

    Returns:
        A Column expression containing the greatest value from the input columns.

    Raises:
        ValidationError: If fewer than two columns are provided.

    Example: greatest usage
        ```python
        df.select(fc.greatest("col1", "col2", "col3"))
        ```
    """
    if len(cols) < 2:
        raise ValidationError(f"greatest() requires at least 2 columns, got {len(cols)}")

    exprs = [
        Column._from_col_or_name(c)._logical_expr for c in cols
    ]
    return Column._from_logical_expr(GreatestExpr(exprs))

least

least(*cols: ColumnOrName) -> Column

Returns the least value from the given columns for each row.

This function mimics the behavior of SQL's LEAST function. It evaluates the input columns in order and returns the least value encountered. If all values are null, returns null.

All arguments must be of the same primitive type (e.g., StringType, BooleanType, FloatType, IntegerType, etc).

Parameters:

  • *cols (ColumnOrName, default: () ) –

    Column expressions or column names to evaluate. Each argument should be a single column expression or column name string.

Returns:

  • Column

    A Column expression containing the least value from the input columns.

Raises:

  • ValidationError

    If fewer than two columns are provided.

least usage
df.select(fc.least("col1", "col2", "col3"))
Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def least(*cols: ColumnOrName) -> Column:
    """Returns the least value from the given columns for each row.

    This function mimics the behavior of SQL's LEAST function. It evaluates the input columns
    in order and returns the least value encountered. If all values are null, returns null.

    All arguments must be of the same primitive type (e.g., StringType, BooleanType, FloatType, IntegerType, etc).

    Args:
        *cols: Column expressions or column names to evaluate. Each argument should be a single
            column expression or column name string.

    Returns:
        A Column expression containing the least value from the input columns.

    Raises:
        ValidationError: If fewer than two columns are provided.

    Example: least usage
        ```python
        df.select(fc.least("col1", "col2", "col3"))
        ```
    """
    if len(cols) < 2:
        raise ValidationError(f"least() requires at least 2 columns, got {len(cols)}")

    exprs = [
        Column._from_col_or_name(c)._logical_expr for c in cols
    ]
    return Column._from_logical_expr(LeastExpr(exprs))

lit

lit(value: Any) -> Column

Creates a Column expression representing a literal value.

Parameters:

  • value (Any) –

    The literal value to create a column for

Returns:

  • Column

    A Column expression representing the literal value

Raises:

  • ValidationError

    If the type of the value cannot be inferred
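The cases lit rejects follow from type inference: `None` and empty containers carry no element type to infer, so they are redirected to the explicitly typed constructors. A plain-Python sketch of that up-front validation (mirroring the guards in the source code shown here):

```python
def describe_literal(value):
    # Mirrors lit()'s guards: values whose type cannot be inferred from the
    # value alone are redirected to the explicitly typed constructors.
    if value is None:
        return "use null(<DataType>)"
    if value == [] or value == {}:
        return "use empty(<DataType>)"
    return "inferable"

# describe_literal(None) -> "use null(<DataType>)"
# describe_literal([])   -> "use empty(<DataType>)"
# describe_literal(42)   -> "inferable"
```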

Source code in src/fenic/api/functions/core.py
def lit(value: Any) -> Column:
    """Creates a Column expression representing a literal value.

    Args:
        value: The literal value to create a column for

    Returns:
        A Column expression representing the literal value

    Raises:
        ValidationError: If the type of the value cannot be inferred
    """
    if value is None:
        raise ValidationError("Cannot create a literal with value `None`. Use `null(...)` instead.")
    elif value == []:
        raise ValidationError(f"Cannot create a literal with empty value `{value}`. Use `empty(ArrayType(...))` instead.")
    elif value == {}:
        raise ValidationError(f"Cannot create a literal with empty value `{value}`. Use `empty(StructType(...))` instead.")
    try:
        inferred_type = infer_dtype_from_pyobj(value)
    except TypeInferenceError as e:
        raise ValidationError(f"`lit` failed to infer type for value `{value}`") from e
    literal_expr = LiteralExpr(value, inferred_type)
    return Column._from_logical_expr(literal_expr)

max

max(column: ColumnOrName) -> Column

Aggregate function: returns the maximum value in the specified column.

Parameters:

  • column (ColumnOrName) –

    Column or column name to compute the maximum of

Returns:

  • Column

    A Column expression representing the maximum aggregation

Raises:

  • TypeError

    If column is not a Column or string

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def max(column: ColumnOrName) -> Column:
    """Aggregate function: returns the maximum value in the specified column.

    Args:
        column: Column or column name to compute the maximum of

    Returns:
        A Column expression representing the maximum aggregation

    Raises:
        TypeError: If column is not a Column or string
    """
    return Column._from_logical_expr(
        MaxExpr(Column._from_col_or_name(column)._logical_expr)
    )

mean

mean(column: ColumnOrName) -> Column

Aggregate function: returns the mean (average) of all values in the specified column.

Alias for avg().

Parameters:

  • column (ColumnOrName) –

    Column or column name to compute the mean of

Returns:

  • Column

    A Column expression representing the mean aggregation

Raises:

  • TypeError

    If column is not a Column or string

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def mean(column: ColumnOrName) -> Column:
    """Aggregate function: returns the mean (average) of all values in the specified column.

    Alias for avg().

    Args:
        column: Column or column name to compute the mean of

    Returns:
        A Column expression representing the mean aggregation

    Raises:
        TypeError: If column is not a Column or string
    """
    return Column._from_logical_expr(
        AvgExpr(Column._from_col_or_name(column)._logical_expr)
    )

min

min(column: ColumnOrName) -> Column

Aggregate function: returns the minimum value in the specified column.

Parameters:

  • column (ColumnOrName) –

    Column or column name to compute the minimum of

Returns:

  • Column

    A Column expression representing the minimum aggregation

Raises:

  • TypeError

    If column is not a Column or string

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def min(column: ColumnOrName) -> Column:
    """Aggregate function: returns the minimum value in the specified column.

    Args:
        column: Column or column name to compute the minimum of

    Returns:
        A Column expression representing the minimum aggregation

    Raises:
        TypeError: If column is not a Column or string
    """
    return Column._from_logical_expr(
        MinExpr(Column._from_col_or_name(column)._logical_expr)
    )

null

null(data_type: DataType) -> Column

Creates a Column expression representing a null value of the specified data type.

Regardless of the data type, the column will contain a null (None) value. This function is useful for creating columns with null values of a particular type.

Parameters:

  • data_type (DataType) –

    The data type of the null value

Returns:

  • Column

    A Column expression representing the null value

Raises:

  • ValidationError

    If the data type is not a valid data type

Creating a column with a null value of a primitive type
# The newly created `b` column will have a value of `None` for all rows
df.select(fc.col("a"), fc.null(fc.IntegerType).alias("b"))
Creating a column with a null value of an array/struct type
# The newly created `b` and `c` columns will have a value of `None` for all rows
df.select(
    fc.col("a"),
    fc.null(fc.ArrayType(fc.IntegerType)).alias("b"),
    fc.null(fc.StructType([fc.StructField("b", fc.IntegerType)])).alias("c"),
)
Source code in src/fenic/api/functions/core.py
def null(data_type: DataType) -> Column:
    """Creates a Column expression representing a null value of the specified data type.

    Regardless of the data type, the column will contain a null (None) value.
    This function is useful for creating columns with null values of a particular type.

    Args:
        data_type: The data type of the null value

    Returns:
        A Column expression representing the null value

    Raises:
        ValidationError: If the data type is not a valid data type

    Example: Creating a column with a null value of a primitive type
        ```python
        # The newly created `b` column will have a value of `None` for all rows
        df.select(fc.col("a"), fc.null(fc.IntegerType).alias("b"))
        ```

    Example: Creating a column with a null value of an array/struct type
        ```python
        # The newly created `b` and `c` columns will have a value of `None` for all rows
        df.select(
            fc.col("a"),
            fc.null(fc.ArrayType(fc.IntegerType)).alias("b"),
            fc.null(fc.StructType([fc.StructField("b", fc.IntegerType)])).alias("c"),
        )
        ```

    """
    return Column._from_logical_expr(LiteralExpr(None, data_type))

stddev

stddev(column: ColumnOrName) -> Column

Aggregate function: returns the sample standard deviation of the specified column.

Parameters:

  • column (ColumnOrName) –

    Column or column name.

Returns:

  • Column

    Column expression for sample standard deviation.
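"Sample" standard deviation here means the Bessel-corrected estimator (dividing by n - 1 rather than n). A plain-Python illustration of that distinction using the standard library, not fenic code:

```python
import statistics

values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

sample_sd = statistics.stdev(values)       # divides by n - 1 (the sample estimator)
population_sd = statistics.pstdev(values)  # divides by n

# With positive variance and n > 1 the sample estimator is strictly larger;
# here pstdev is exactly 2.0.
assert sample_sd > population_sd
```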

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def stddev(column: ColumnOrName) -> Column:
    """Aggregate function: returns the sample standard deviation of the specified column.

    Args:
        column: Column or column name.

    Returns:
        Column expression for sample standard deviation.
    """
    return Column._from_logical_expr(
        StdDevExpr(Column._from_col_or_name(column)._logical_expr)
    )

struct

struct(*args: Union[ColumnOrName, List[ColumnOrName], Tuple[ColumnOrName, ...]]) -> Column

Creates a new struct column from multiple input columns.

Parameters:

  • *args (Union[ColumnOrName, List[ColumnOrName], Tuple[ColumnOrName, ...]], default: () ) –

    Columns or column names to combine into a struct. Can be:

    • Individual arguments
    • Lists of columns/column names
    • Tuples of columns/column names

Returns:

  • Column

    A Column expression representing a struct containing the input columns

Raises:

  • TypeError

    If any argument is not a Column, string, or collection of Columns/strings
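Because struct() accepts individual columns, lists, and tuples interchangeably, arguments are flattened one level before being combined. A plain-Python sketch of that flattening (mirroring the loop in the source code shown here):

```python
def flatten_args(*args):
    # One-level flattening: lists/tuples are expanded, everything else is
    # kept as-is, so mixed argument styles produce one flat column sequence.
    flattened = []
    for arg in args:
        if isinstance(arg, (list, tuple)):
            flattened.extend(arg)
        else:
            flattened.append(arg)
    return flattened

flatten_args("a", ["b", "c"], ("d",))  # -> ["a", "b", "c", "d"]
```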

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def struct(
    *args: Union[ColumnOrName, List[ColumnOrName], Tuple[ColumnOrName, ...]]
) -> Column:
    """Creates a new struct column from multiple input columns.

    Args:
        *args: Columns or column names to combine into a struct. Can be:

            - Individual arguments
            - Lists of columns/column names
            - Tuples of columns/column names

    Returns:
        A Column expression representing a struct containing the input columns

    Raises:
        TypeError: If any argument is not a Column, string, or collection of
            Columns/strings
    """
    flattened_args = []
    for arg in args:
        if isinstance(arg, (list, tuple)):
            flattened_args.extend(arg)
        else:
            flattened_args.append(arg)

    expr_columns = [Column._from_col_or_name(c)._logical_expr for c in flattened_args]

    return Column._from_logical_expr(StructExpr(expr_columns))

sum

sum(column: ColumnOrName) -> Column

Aggregate function: returns the sum of all values in the specified column.

Parameters:

  • column (ColumnOrName) –

    Column or column name to compute the sum of

Returns:

  • Column

    A Column expression representing the sum aggregation

Raises:

  • TypeError

    If column is not a Column or string

Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def sum(column: ColumnOrName) -> Column:
    """Aggregate function: returns the sum of all values in the specified column.

    Args:
        column: Column or column name to compute the sum of

    Returns:
        A Column expression representing the sum aggregation

    Raises:
        TypeError: If column is not a Column or string
    """
    return Column._from_logical_expr(
        SumExpr(Column._from_col_or_name(column)._logical_expr)
    )

tool_param

tool_param(parameter_name: str, data_type: DataType) -> Column

Creates an unresolved literal placeholder column with a declared data type.

A placeholder argument for a DataFrame, representing a literal value to be provided at execution time. If no value is supplied, it defaults to null. Enables parameterized views and macros over fenic DataFrames.

Notes

Supports only Primitive/Object/ArrayLike Types (StringType, IntegerType, FloatType, DoubleType, BooleanType, StructType, ArrayType)

Parameters:

  • parameter_name (str) –

    The name of the parameter to reference.

  • data_type (DataType) –

    The expected data type for the parameter value.

Returns:

  • Column

    A Column wrapping an UnresolvedLiteralExpr for the given parameter.

A simple tool with one parameter

# Assume we are reading data with a `name` column.
df = session.read.csv("data.csv")
parameterized_df = df.filter(fc.col("name").contains(fc.tool_param('query', StringType)))
...
session.catalog.create_tool(
    tool_name="my_tool",
    tool_description="A tool that searches the name field",
    tool_query=parameterized_df,
    result_limit=100,
    tool_params=[ToolParam(name="query", description="The name should contain the following value")]
)

A tool with multiple filters

# Assume we are reading data with an `age` column.
df = session.read.csv("users.csv")
# Create multiple filters that evaluate to true if a param is not passed.
optional_min = fc.coalesce(fc.col("age") >= fc.tool_param("min_age", IntegerType), fc.lit(True))
optional_max = fc.coalesce(fc.col("age") <= fc.tool_param("max_age", IntegerType), fc.lit(True))
core_filter = df.filter(optional_min & optional_max)
session.catalog.create_tool(
    "users_filter",
    "Filter users by age",
    core_filter,
    tool_params=[
        ToolParam(name="min_age", description="Minimum age", has_default=True, default_value=None),
        ToolParam(name="max_age", description="Maximum age", has_default=True, default_value=None),
    ]
)
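The optional-filter pattern in the second example works because a comparison against an unset (null) parameter evaluates to null, and coalescing with lit(True) turns that into a pass. A plain-Python sketch of those semantics, not fenic code:

```python
def optional_min_filter(age, min_age):
    # Mirrors coalesce(col("age") >= tool_param("min_age", ...), lit(True)):
    # a comparison against a missing (None) parameter is None, and the
    # coalesce fallback of True means an unset parameter filters nothing.
    comparison = None if min_age is None else (age >= min_age)
    return comparison if comparison is not None else True

ages = [15, 30, 42]
[a for a in ages if optional_min_filter(a, None)]  # all rows pass
[a for a in ages if optional_min_filter(a, 18)]    # [30, 42]
```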

Source code in src/fenic/api/functions/core.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def tool_param(parameter_name: str, data_type: DataType) -> Column:
    """Creates an unresolved literal placeholder column with a declared data type.

    A placeholder argument for a DataFrame, representing a literal value to be provided at execution time. 
    If no value is supplied, it defaults to null. Enables parameterized views and macros over fenic DataFrames.

    Notes:
        Supports only Primitive/Object/ArrayLike Types (StringType, IntegerType, FloatType, DoubleType, BooleanType, StructType, ArrayType)

    Args:
        parameter_name: The name of the parameter to reference.
        data_type: The expected data type for the parameter value.

    Returns:
        A Column wrapping an UnresolvedLiteralExpr for the given parameter.

    Example: A simple tool with one parameter
        ```python
        # Assume we are reading data with a `name` column.
        df = session.read.csv("data.csv")
        parameterized_df = df.filter(fc.col("name").contains(fc.tool_param('query', StringType)))
        ...
        session.catalog.create_tool(
            tool_name="my_tool",
            tool_description="A tool that searches the name field",
            tool_query=parameterized_df,
            result_limit=100,
            tool_params=[ToolParam(name="query", description="The name should contain the following value")]
        )
        ```

    Example: A tool with multiple filters
        ```python
        # Assume we are reading data with an `age` column.
        df = session.read.csv("users.csv")
        # Create multiple filters that evaluate to true if a param is not passed.
        optional_min = fc.coalesce(fc.col("age") >= fc.tool_param("min_age", IntegerType), fc.lit(True))
        optional_max = fc.coalesce(fc.col("age") <= fc.tool_param("max_age", IntegerType), fc.lit(True))
        core_filter = df.filter(optional_min & optional_max)
        session.catalog.create_tool(
            "users_filter",
            "Filter users by age",
            core_filter,
            tool_params=[
                ToolParam(name="min_age", description="Minimum age", has_default=True, default_value=None),
                ToolParam(name="max_age", description="Maximum age", has_default=True, default_value=None),
            ]
        )
        ```
    """
    if isinstance(data_type, _LogicalType):
        raise ValidationError(f"Cannot use a logical type as a parameter type: {data_type}")

    return Column._from_logical_expr(UnresolvedLiteralExpr(data_type=data_type, parameter_name=parameter_name))

udf

udf(f: Optional[Callable] = None, *, return_type: DataType)

A decorator or function for creating user-defined functions (UDFs) that can be applied to DataFrame rows.

Warning

UDFs cannot be serialized and are not supported in cloud execution. User-defined functions contain arbitrary Python code that cannot be transmitted to remote workers. For cloud compatibility, use built-in fenic functions instead.

When applied, UDFs will:

  • Access StructType columns as Python dictionaries (dict[str, Any]).
  • Access ArrayType columns as Python lists (list[Any]).
  • Access primitive types (e.g., int, float, str) as their respective Python types.

Parameters:

  • f (Optional[Callable], default: None ) –

    Python function to convert to UDF

  • return_type (DataType) –

    Expected return type of the UDF. Required parameter.

UDF with primitive types
# UDF with primitive types
@udf(return_type=IntegerType)
def add_one(x: int):
    return x + 1

# Or
add_one = udf(lambda x: x + 1, return_type=IntegerType)
UDF with nested types
# UDF with nested types
@udf(return_type=StructType([StructField("value1", IntegerType), StructField("value2", IntegerType)]))
def example_udf(x: dict[str, int], y: list[int]):
    return {
        "value1": x["value1"] + x["value2"] + y[0],
        "value2": x["value1"] + x["value2"] + y[1],
    }
Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def udf(f: Optional[Callable] = None, *, return_type: DataType):
    """A decorator or function for creating user-defined functions (UDFs) that can be applied to DataFrame rows.

    Warning:
        UDFs cannot be serialized and are not supported in cloud execution.
        User-defined functions contain arbitrary Python code that cannot be transmitted
        to remote workers. For cloud compatibility, use built-in fenic functions instead.

    When applied, UDFs will:
    - Access `StructType` columns as Python dictionaries (`dict[str, Any]`).
    - Access `ArrayType` columns as Python lists (`list[Any]`).
    - Access primitive types (e.g., `int`, `float`, `str`) as their respective Python types.

    Args:
        f: Python function to convert to UDF

        return_type: Expected return type of the UDF. Required parameter.

    Example: UDF with primitive types
        ```python
        # UDF with primitive types
        @udf(return_type=IntegerType)
        def add_one(x: int):
            return x + 1

        # Or
        add_one = udf(lambda x: x + 1, return_type=IntegerType)
        ```

    Example: UDF with nested types
        ```python
        # UDF with nested types
        @udf(return_type=StructType([StructField("value1", IntegerType), StructField("value2", IntegerType)]))
        def example_udf(x: dict[str, int], y: list[int]):
            return {
                "value1": x["value1"] + x["value2"] + y[0],
                "value2": x["value1"] + x["value2"] + y[1],
            }
        ```
    """

    def _create_udf(func: Callable) -> Callable:
        @wraps(func)
        def _udf_wrapper(*cols: ColumnOrName) -> Column:
            col_exprs = [Column._from_col_or_name(c)._logical_expr for c in cols]
            return Column._from_logical_expr(UDFExpr(func, col_exprs, return_type))

        return _udf_wrapper

    if _is_logical_type(return_type):
        raise NotImplementedError(f"return_type {return_type} is not supported for UDFs")

    if f is not None:
        return _create_udf(f)
    return _create_udf

when

when(condition: Column, value: Column) -> Column

Evaluates a condition and returns a value if true.

This function is used to create conditional expressions. If Column.otherwise() is not invoked, None is returned for unmatched conditions.

Parameters:

  • condition (Column) –

    A boolean Column expression to evaluate.

  • value (Column) –

    A Column expression to return if the condition is true.

Returns:

  • Column

    A Column expression that evaluates the condition and returns the specified value when true, and None otherwise.

Raises:

  • TypeError

    If the condition is not a boolean Column expression.

Basic conditional expression
# Basic usage
df.select(when(col("age") > 18, lit("adult")))

# With otherwise
df.select(when(col("age") > 18, lit("adult")).otherwise(lit("minor")))
Source code in src/fenic/api/functions/builtin.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def when(condition: Column, value: Column) -> Column:
    """Evaluates a condition and returns a value if true.

    This function is used to create conditional expressions. If Column.otherwise() is not invoked,
    None is returned for unmatched conditions.

    Args:
        condition: A boolean Column expression to evaluate.

        value: A Column expression to return if the condition is true.

    Returns:
        A Column expression that evaluates the condition and returns the specified value when true,
        and None otherwise.

    Raises:
        TypeError: If the condition is not a boolean Column expression.

    Example: Basic conditional expression
        ```python
        # Basic usage
        df.select(when(col("age") > 18, lit("adult")))

        # With otherwise
        df.select(when(col("age") > 18, lit("adult")).otherwise(lit("minor")))
        ```
    """
    return Column._from_logical_expr(
        WhenExpr(None, condition._logical_expr, value._logical_expr)
    )