fenic.api.io

IO module for reading and writing DataFrames to external storage.

Classes:

  • DataFrameReader

    Interface used to load a DataFrame from external storage systems.

  • DataFrameWriter

    Interface used to write a DataFrame to external storage systems.

DataFrameReader

DataFrameReader(session_state: BaseSessionState)

Interface used to load a DataFrame from external storage systems.

Similar to PySpark's DataFrameReader.

Creates a DataFrameReader.

Parameters:

  • session_state (BaseSessionState) –

    The session state to use for reading

Methods:

  • csv

    Load a DataFrame from one or more CSV files.

  • parquet

    Load a DataFrame from one or more Parquet files.

Source code in src/fenic/api/io/reader.py
def __init__(self, session_state: BaseSessionState):
    """Creates a DataFrameReader.

    Args:
        session_state: The session state to use for reading
    """
    self._options: Dict[str, Any] = {}
    self._session_state = session_state

csv

csv(paths: Union[str, Path, list[Union[str, Path]]], schema: Optional[Schema] = None, merge_schemas: bool = False) -> DataFrame

Load a DataFrame from one or more CSV files.

Parameters:

  • paths (Union[str, Path, list[Union[str, Path]]]) –

    A single file path, a glob pattern (e.g., "data/*.csv"), or a list of paths.

  • schema (Optional[Schema], default: None ) –

    (optional) A complete schema definition of column names and their types. Only primitive types are supported.

      • For example: Schema([ColumnField(name="id", data_type=IntegerType), ColumnField(name="name", data_type=StringType)])
      • If provided, all files must match this schema exactly: all column names must be present, and values must be convertible to the specified types. Partial schemas are not allowed.

  • merge_schemas (bool, default: False ) –

    Whether to merge schemas across all files.

      • If True: Column names are unified across files. Missing columns are filled with nulls. Column types are inferred and widened as needed.
      • If False (default): Only columns from the first file are accepted. Column types are inferred from the first file and applied to all files. If a subsequent file does not have the same column names and order as the first file, an error is raised.
      • The "first file" is defined as:
        • The first file in lexicographic order (for glob patterns), or
        • The first file in the provided list (for lists of paths).

Notes
  • The first row in each file is assumed to be a header row.
  • Delimiters (e.g., comma, tab) are automatically inferred.
  • You may specify either schema or merge_schemas=True, but not both.
  • Any date/datetime columns are cast to strings during ingestion.
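
The merge_schemas=True behavior described above (unified column names, missing columns filled with nulls) can be pictured with a small pure-Python sketch. It does not use fenic and is not how fenic implements merging; the helper name merge_rows and the in-memory "files" are hypothetical, and type widening is not modeled.

```python
from typing import Any, Optional

Row = dict[str, Any]


def merge_rows(files: list[list[Row]]) -> list[dict[str, Optional[Any]]]:
    """Union the column names of all files and fill gaps with None."""
    columns: list[str] = []
    for rows in files:
        for row in rows:
            for name in row:
                if name not in columns:
                    columns.append(name)  # keep first-seen column order
    # Re-emit every row with the full column set; absent values become None.
    return [{name: row.get(name) for name in columns} for rows in files for row in rows]


# Two hypothetical "files" whose columns only partly overlap.
file_a = [{"id": 1, "name": "ada"}]
file_b = [{"id": 2, "score": 9.5}]
merged = merge_rows([file_a, file_b])
print(merged)
```

Each output row carries all three columns (id, name, score), with None standing in for the nulls the documentation mentions.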

Raises:

  • ValidationError

    If both schema and merge_schemas=True are provided.

  • ValidationError

    If any path does not end with .csv.

  • PlanError

    If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

Read a single CSV file
df = session.read.csv("file.csv")
Read multiple CSV files with schema merging
df = session.read.csv("data/*.csv", merge_schemas=True)
Read CSV files with explicit schema
df = session.read.csv(
    ["a.csv", "b.csv"],
    schema=Schema([
        ColumnField(name="id", data_type=IntegerType),
        ColumnField(name="value", data_type=FloatType)
    ])
)
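
To make the explicit-schema rule concrete (every schema column must be present, and every value must be convertible to its declared type), here is a minimal sketch built on the standard csv module. It is an illustration only, not fenic's reader: SchemaSketch and read_with_schema are hypothetical names, plain Python converters stand in for fenic data types, and extra columns in the file are simply dropped, which is an assumption the text above does not confirm.

```python
import csv
import io
from typing import Any, Callable

# Hypothetical stand-in for a fenic Schema: column name -> converter function.
SchemaSketch = dict[str, Callable[[str], Any]]


def read_with_schema(text: str, schema: SchemaSketch) -> list[dict[str, Any]]:
    """Parse CSV text, requiring every schema column and converting each value."""
    reader = csv.DictReader(io.StringIO(text))  # first row is treated as the header
    header = reader.fieldnames or []
    missing = [name for name in schema if name not in header]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    # int() and float() raise ValueError when a value is not convertible.
    return [
        {name: convert(row[name]) for name, convert in schema.items()}
        for row in reader
    ]


schema: SchemaSketch = {"id": int, "value": float}
rows = read_with_schema("id,value\n1,2.5\n2,4\n", schema)
print(rows)
```

A file lacking the value column, or containing a non-numeric id, raises ValueError in this sketch; in fenic the corresponding failure would surface as one of the errors listed under Raises.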

Source code in src/fenic/api/io/reader.py
def csv(
    self,
    paths: Union[str, Path, list[Union[str, Path]]],
    schema: Optional[Schema] = None,
    merge_schemas: bool = False,
) -> DataFrame:
    """Load a DataFrame from one or more CSV files.

    Args:
        paths: A single file path, a glob pattern (e.g., "data/*.csv"), or a list of paths.
        schema: (optional) A complete schema definition of column names and their types. Only primitive types are supported.
            - For example:
                - Schema([ColumnField(name="id", data_type=IntegerType), ColumnField(name="name", data_type=StringType)])
            - If provided, all files must match this schema exactly—all column names must be present, and values must be
            convertible to the specified types. Partial schemas are not allowed.
        merge_schemas: Whether to merge schemas across all files.
            - If True: Column names are unified across files. Missing columns are filled with nulls. Column types are
            inferred and widened as needed.
            - If False (default): Only accepts columns from the first file. Column types from the first file are
            inferred and applied across all files. If subsequent files do not have the same column name and order as the first file, an error is raised.
            - The "first file" is defined as:
                - The first file in lexicographic order (for glob patterns), or
                - The first file in the provided list (for lists of paths).

    Notes:
        - The first row in each file is assumed to be a header row.
        - Delimiters (e.g., comma, tab) are automatically inferred.
        - You may specify either `schema` or `merge_schemas=True`, but not both.
        - Any date/datetime columns are cast to strings during ingestion.

    Raises:
        ValidationError: If both `schema` and `merge_schemas=True` are provided.
        ValidationError: If any path does not end with `.csv`.
        PlanError: If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

    Example: Read a single CSV file
        ```python
        df = session.read.csv("file.csv")
        ```

    Example: Read multiple CSV files with schema merging
        ```python
        df = session.read.csv("data/*.csv", merge_schemas=True)
        ```

    Example: Read CSV files with explicit schema
        ```python
        df = session.read.csv(
            ["a.csv", "b.csv"],
            schema=Schema([
                ColumnField(name="id", data_type=IntegerType),
                ColumnField(name="value", data_type=FloatType)
            ])
        )
        ```
    """
    if schema is not None and merge_schemas:
        raise ValidationError(
            "Cannot specify both 'schema' and 'merge_schemas=True' - these options conflict. "
            "Choose one approach: "
            "1) Use 'schema' to enforce a specific schema: csv(paths, schema=your_schema), "
            "2) Use 'merge_schemas=True' to automatically merge schemas: csv(paths, merge_schemas=True), "
            "3) Use neither to inherit schema from the first file: csv(paths)"
        )
    if schema is not None:
        for col_field in schema.column_fields:
            if not isinstance(
                col_field.data_type,
                _PrimitiveType,
            ):
                raise ValidationError(
                    f"CSV files only support primitive data types in schema definitions. "
                    f"Column '{col_field.name}' has type {type(col_field.data_type).__name__}, but CSV schemas must use: "
                    f"IntegerType, FloatType, DoubleType, BooleanType, or StringType. "
                    f"Example: Schema([ColumnField(name='id', data_type=IntegerType), ColumnField(name='name', data_type=StringType)])"
                )
    options = {
        "schema": schema,
        "merge_schemas": merge_schemas,
    }
    return self._read_file(
        paths, file_format="csv", file_extension=".csv", **options
    )

parquet

parquet(paths: Union[str, Path, list[Union[str, Path]]], merge_schemas: bool = False) -> DataFrame

Load a DataFrame from one or more Parquet files.

Parameters:

  • paths (Union[str, Path, list[Union[str, Path]]]) –

    A single file path, a glob pattern (e.g., "data/*.parquet"), or a list of paths.

  • merge_schemas (bool, default: False ) –

    If True, infers and merges schemas across all files. Missing columns are filled with nulls, and differing types are widened to a common supertype.

Behavior
  • If merge_schemas=False (default), all files must match the schema of the first file exactly. Subsequent files must contain all columns from the first file with compatible data types. If any column is missing or has incompatible types, an error is raised.
  • If merge_schemas=True, column names are unified across all files, and data types are automatically widened to accommodate all values.
  • The "first file" is defined as:
    • The first file in lexicographic order (for glob patterns), or
    • The first file in the provided list (for lists of paths).
Notes
  • Date and datetime columns are cast to strings during ingestion.
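
The "first file" rule above can be sketched in a few lines of standard-library Python. This is an assumption-laden illustration rather than fenic's path resolution: first_file is a hypothetical helper, and it treats any string containing *, ?, or [ as a glob pattern.

```python
import glob
import tempfile
from pathlib import Path
from typing import Union

PathLike = Union[str, Path]


def first_file(paths: Union[PathLike, list[PathLike]]) -> str:
    """Return the file whose schema acts as the reference when merge_schemas=False."""
    if isinstance(paths, list):
        if not paths:
            raise ValueError("no paths given")
        return str(paths[0])  # lists: keep the caller's order
    pattern = str(paths)
    if not any(ch in pattern for ch in "*?["):
        return pattern  # a single plain path is its own first file
    matches = sorted(glob.glob(pattern))  # globs: lexicographic order
    if not matches:
        raise FileNotFoundError(f"no files match {pattern!r}")
    return matches[0]


# Demonstrate both rules with throwaway empty files.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("b.parquet", "a.parquet"):
        Path(tmp, name).touch()
    from_glob = Path(first_file(f"{tmp}/*.parquet")).name

from_list = first_file(["b.parquet", "a.parquet"])
print(from_glob, from_list)
```

The glob resolves to a.parquet because matches are sorted, while the list keeps b.parquet first because list order is preserved.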

Raises:

  • ValidationError

    If any file does not have a .parquet extension.

  • PlanError

    If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

Read a single Parquet file
df = session.read.parquet("file.parquet")
Read multiple Parquet files
df = session.read.parquet("data/*.parquet")
Read Parquet files with schema merging
df = session.read.parquet(["a.parquet", "b.parquet"], merge_schemas=True)
Source code in src/fenic/api/io/reader.py
def parquet(
    self,
    paths: Union[str, Path, list[Union[str, Path]]],
    merge_schemas: bool = False,
) -> DataFrame:
    """Load a DataFrame from one or more Parquet files.

    Args:
        paths: A single file path, a glob pattern (e.g., "data/*.parquet"), or a list of paths.
        merge_schemas: If True, infers and merges schemas across all files.
            Missing columns are filled with nulls, and differing types are widened to a common supertype.

    Behavior:
        - If `merge_schemas=False` (default), all files must match the schema of the first file exactly.
        Subsequent files must contain all columns from the first file with compatible data types.
        If any column is missing or has incompatible types, an error is raised.
        - If `merge_schemas=True`, column names are unified across all files, and data types are automatically
        widened to accommodate all values.
        - The "first file" is defined as:
            - The first file in lexicographic order (for glob patterns), or
            - The first file in the provided list (for lists of paths).

    Notes:
        - Date and datetime columns are cast to strings during ingestion.

    Raises:
        ValidationError: If any file does not have a `.parquet` extension.
        PlanError: If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

    Example: Read a single Parquet file
        ```python
        df = session.read.parquet("file.parquet")
        ```

    Example: Read multiple Parquet files
        ```python
        df = session.read.parquet("data/*.parquet")
        ```

    Example: Read Parquet files with schema merging
        ```python
        df = session.read.parquet(["a.parquet", "b.parquet"], merge_schemas=True)
        ```
    """
    options = {
        "merge_schemas": merge_schemas,
    }
    return self._read_file(
        paths, file_format="parquet", file_extension=".parquet", **options
    )

DataFrameWriter

DataFrameWriter(dataframe: DataFrame)

Interface used to write a DataFrame to external storage systems.

Similar to PySpark's DataFrameWriter.

Initialize a DataFrameWriter.

Parameters:

  • dataframe (DataFrame) –

    The DataFrame to write.

Methods:

  • csv

    Saves the content of the DataFrame as a single CSV file with comma as the delimiter and headers in the first row.

  • parquet

    Saves the content of the DataFrame as a single Parquet file.

  • save_as_table

    Saves the content of the DataFrame as the specified table.

Source code in src/fenic/api/io/writer.py
def __init__(self, dataframe: DataFrame):
    """Initialize a DataFrameWriter.

    Args:
        dataframe: The DataFrame to write.
    """
    self._dataframe = dataframe

csv

csv(file_path: Union[str, Path], mode: Literal['error', 'overwrite', 'ignore'] = 'overwrite') -> QueryMetrics

Saves the content of the DataFrame as a single CSV file with comma as the delimiter and headers in the first row.

Parameters:

  • file_path (Union[str, Path]) –

    Path to save the CSV file to

  • mode (Literal['error', 'overwrite', 'ignore'], default: 'overwrite' ) –

    Write mode. Default is "overwrite".

      • error: Raises an error if file exists
      • overwrite: Overwrites the file if it exists
      • ignore: Silently ignores operation if file exists

Returns:

  • QueryMetrics –

    The query metrics

Save with overwrite mode (default)
df.write.csv("output.csv")  # Overwrites if exists
Save with error mode
df.write.csv("output.csv", mode="error")  # Raises error if exists
Save with ignore mode
df.write.csv("output.csv", mode="ignore")  # Skips if exists
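
The three file write modes can be summarized with a small standard-library sketch. It mirrors the documented semantics only; write_text is a hypothetical helper, not part of fenic, and raising FileExistsError in error mode is an assumption about the kind of error raised.

```python
import tempfile
from pathlib import Path
from typing import Literal

Mode = Literal["error", "overwrite", "ignore"]


def write_text(path: Path, content: str, mode: Mode = "overwrite") -> bool:
    """Write content to path; return True if written, False if skipped."""
    if path.exists():
        if mode == "error":
            raise FileExistsError(f"{path} already exists")
        if mode == "ignore":
            return False  # leave the existing file untouched
    path.write_text(content)  # new file, or mode == "overwrite"
    return True


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "output.csv"
    created = write_text(target, "id\n1\n")                 # file is new: written
    skipped = write_text(target, "id\n2\n", mode="ignore")  # exists: skipped
    after_ignore = target.read_text()
    write_text(target, "id\n3\n", mode="overwrite")         # exists: replaced
    after_overwrite = target.read_text()
    try:
        write_text(target, "id\n4\n", mode="error")
        raised = False
    except FileExistsError:
        raised = True

print(created, skipped, raised)
```

The same pattern applies to the Parquet writer, which accepts the same three modes.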
Source code in src/fenic/api/io/writer.py
def csv(
    self,
    file_path: Union[str, Path],
    mode: Literal["error", "overwrite", "ignore"] = "overwrite",
) -> QueryMetrics:
    """Saves the content of the DataFrame as a single CSV file with comma as the delimiter and headers in the first row.

    Args:
        file_path: Path to save the CSV file to
        mode: Write mode. Default is "overwrite".
             - error: Raises an error if file exists
             - overwrite: Overwrites the file if it exists
             - ignore: Silently ignores operation if file exists

    Returns:
        QueryMetrics: The query metrics

    Example: Save with overwrite mode (default)
        ```python
        df.write.csv("output.csv")  # Overwrites if exists
        ```

    Example: Save with error mode
        ```python
        df.write.csv("output.csv", mode="error")  # Raises error if exists
        ```

    Example: Save with ignore mode
        ```python
        df.write.csv("output.csv", mode="ignore")  # Skips if exists
        ```
    """
    file_path = str(file_path)
    if not file_path.endswith(".csv"):
        raise ValidationError(
            f"CSV writer requires a '.csv' file extension. "
            f"Your path '{file_path}' is missing the extension."
        )

    sink_plan = FileSink(
        child=self._dataframe._logical_plan,
        sink_type="csv",
        path=file_path,
        mode=mode,
    )

    metrics = self._dataframe._logical_plan.session_state.execution.save_to_file(
        sink_plan, file_path=file_path, mode=mode
    )
    logger.info(metrics.get_summary())
    return metrics

parquet

parquet(file_path: Union[str, Path], mode: Literal['error', 'overwrite', 'ignore'] = 'overwrite') -> QueryMetrics

Saves the content of the DataFrame as a single Parquet file.

Parameters:

  • file_path (Union[str, Path]) –

    Path to save the Parquet file to

  • mode (Literal['error', 'overwrite', 'ignore'], default: 'overwrite' ) –

    Write mode. Default is "overwrite".

      • error: Raises an error if file exists
      • overwrite: Overwrites the file if it exists
      • ignore: Silently ignores operation if file exists

Returns:

  • QueryMetrics –

    The query metrics

Save with overwrite mode (default)
df.write.parquet("output.parquet")  # Overwrites if exists
Save with error mode
df.write.parquet("output.parquet", mode="error")  # Raises error if exists
Save with ignore mode
df.write.parquet("output.parquet", mode="ignore")  # Skips if exists
Source code in src/fenic/api/io/writer.py
def parquet(
    self,
    file_path: Union[str, Path],
    mode: Literal["error", "overwrite", "ignore"] = "overwrite",
) -> QueryMetrics:
    """Saves the content of the DataFrame as a single Parquet file.

    Args:
        file_path: Path to save the Parquet file to
        mode: Write mode. Default is "overwrite".
             - error: Raises an error if file exists
             - overwrite: Overwrites the file if it exists
             - ignore: Silently ignores operation if file exists

    Returns:
        QueryMetrics: The query metrics

    Example: Save with overwrite mode (default)
        ```python
        df.write.parquet("output.parquet")  # Overwrites if exists
        ```

    Example: Save with error mode
        ```python
        df.write.parquet("output.parquet", mode="error")  # Raises error if exists
        ```

    Example: Save with ignore mode
        ```python
        df.write.parquet("output.parquet", mode="ignore")  # Skips if exists
        ```
    """
    file_path = str(file_path)
    if not file_path.endswith(".parquet"):
        raise ValidationError(
            f"Parquet writer requires a '.parquet' file extension. "
            f"Your path '{file_path}' is missing the extension."
        )

    sink_plan = FileSink(
        child=self._dataframe._logical_plan,
        sink_type="parquet",
        path=file_path,
        mode=mode,
    )

    metrics = self._dataframe._logical_plan.session_state.execution.save_to_file(
        sink_plan, file_path=file_path, mode=mode
    )
    logger.info(metrics.get_summary())
    return metrics

save_as_table

save_as_table(table_name: str, mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error') -> QueryMetrics

Saves the content of the DataFrame as the specified table.

Parameters:

  • table_name (str) –

    Name of the table to save to

  • mode (Literal['error', 'append', 'overwrite', 'ignore'], default: 'error' ) –

    Write mode. Default is "error".

      • error: Raises an error if table exists
      • append: Appends data to table if it exists
      • overwrite: Overwrites existing table
      • ignore: Silently ignores operation if table exists

Returns:

  • QueryMetrics –

    The query metrics

Save with error mode (default)
df.write.save_as_table("my_table")  # Raises error if table exists
Save with append mode
df.write.save_as_table("my_table", mode="append")  # Adds to existing table
Save with overwrite mode
df.write.save_as_table("my_table", mode="overwrite")  # Replaces existing table
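
The four table modes differ from the file modes in adding append and in defaulting to error. The sketch below models them against an in-memory dictionary standing in for a catalog. It is illustrative only: save_rows and the catalog dictionary are hypothetical, the error type is an assumption, and creating the table when it does not yet exist (in any mode, including append) is also an assumption the text above does not spell out.

```python
from typing import Literal

TableMode = Literal["error", "append", "overwrite", "ignore"]
Row = dict[str, object]


def save_rows(
    catalog: dict[str, list[Row]],
    table_name: str,
    rows: list[Row],
    mode: TableMode = "error",
) -> None:
    """Apply one of the four documented write modes to an in-memory table."""
    if table_name not in catalog or mode == "overwrite":
        # Assumption: a missing table is created regardless of mode.
        catalog[table_name] = list(rows)
    elif mode == "append":
        catalog[table_name].extend(rows)
    elif mode == "error":
        raise ValueError(f"table '{table_name}' already exists")
    # mode == "ignore": the existing table is left untouched.


catalog: dict[str, list[Row]] = {}
save_rows(catalog, "my_table", [{"id": 1}])                 # table is new: created
save_rows(catalog, "my_table", [{"id": 2}], mode="append")  # now two rows
save_rows(catalog, "my_table", [{"id": 3}], mode="ignore")  # no change
after_ignore = list(catalog["my_table"])
save_rows(catalog, "my_table", [{"id": 4}], mode="overwrite")
print(after_ignore, catalog["my_table"])
```

Calling save_rows again with the default mode on the existing table raises, which matches the documented default of refusing to touch an existing table.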
Source code in src/fenic/api/io/writer.py
def save_as_table(
    self,
    table_name: str,
    mode: Literal["error", "append", "overwrite", "ignore"] = "error",
) -> QueryMetrics:
    """Saves the content of the DataFrame as the specified table.

    Args:
        table_name: Name of the table to save to
        mode: Write mode. Default is "error".
             - error: Raises an error if table exists
             - append: Appends data to table if it exists
             - overwrite: Overwrites existing table
             - ignore: Silently ignores operation if table exists

    Returns:
        QueryMetrics: The query metrics

    Example: Save with error mode (default)
        ```python
        df.write.save_as_table("my_table")  # Raises error if table exists
        ```

    Example: Save with append mode
        ```python
        df.write.save_as_table("my_table", mode="append")  # Adds to existing table
        ```

    Example: Save with overwrite mode
        ```python
        df.write.save_as_table("my_table", mode="overwrite")  # Replaces existing table
        ```
    """
    sink_plan = TableSink(
        child=self._dataframe._logical_plan, table_name=table_name, mode=mode
    )

    metrics = self._dataframe._logical_plan.session_state.execution.save_as_table(
        sink_plan, table_name=table_name, mode=mode
    )
    logger.info(metrics.get_summary())
    return metrics