
fenic.api.io.reader

Reader interface for loading DataFrames from external storage systems.

Classes:

  • DataFrameReader

    Interface used to load a DataFrame from external storage systems.

DataFrameReader

DataFrameReader(session_state: BaseSessionState)

Interface used to load a DataFrame from external storage systems.

Similar to PySpark's DataFrameReader.

Creates a DataFrameReader.

Parameters:

  • session_state (BaseSessionState) –

    The session state to use for reading

Methods:

  • csv

    Load a DataFrame from one or more CSV files.

  • parquet

    Load a DataFrame from one or more Parquet files.

Source code in src/fenic/api/io/reader.py
```python
def __init__(self, session_state: BaseSessionState):
    """Creates a DataFrameReader.

    Args:
        session_state: The session state to use for reading
    """
    self._options: Dict[str, Any] = {}
    self._session_state = session_state
```
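
In practice, a DataFrameReader is rarely constructed directly; it is obtained from a session, as the method examples below do. A minimal sketch, assuming an already-created fenic `session` object:

```python
# Assuming an existing fenic `session`; per the examples on this page,
# `session.read` exposes the DataFrameReader interface.
reader = session.read
df = reader.csv("file.csv")  # equivalent to session.read.csv("file.csv")
```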

csv

csv(paths: Union[str, Path, list[Union[str, Path]]], schema: Optional[Schema] = None, merge_schemas: bool = False) -> DataFrame

Load a DataFrame from one or more CSV files.

Parameters:

  • paths (Union[str, Path, list[Union[str, Path]]]) –

    A single file path, a glob pattern (e.g., "data/*.csv"), or a list of paths.

  • schema (Optional[Schema], default: None ) –

    (optional) A complete schema definition of column names and their types. Only primitive types are supported.

      • For example: Schema([ColumnField(name="id", data_type=IntegerType), ColumnField(name="name", data_type=StringType)])
      • If provided, all files must match this schema exactly: all column names must be present, and values must be convertible to the specified types. Partial schemas are not allowed.

  • merge_schemas (bool, default: False ) –

    Whether to merge schemas across all files.

      • If True: Column names are unified across files. Missing columns are filled with nulls. Column types are inferred and widened as needed.
      • If False (default): Only columns from the first file are accepted. Column types inferred from the first file are applied across all files. If subsequent files do not have the same column names and order as the first file, an error is raised.
      • The "first file" is defined as:
          • The first file in lexicographic order (for glob patterns), or
          • The first file in the provided list (for lists of paths).
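
A minimal sketch of the merge behavior, using two hypothetical files with overlapping columns:

```python
# Hypothetical files: a.csv has columns (id, name); b.csv has columns (id, score).
df = session.read.csv(["a.csv", "b.csv"], merge_schemas=True)
# Per the semantics above, the result has columns id, name, and score;
# rows from a.csv carry nulls for score, and rows from b.csv carry nulls for name.
```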

Notes
  • The first row in each file is assumed to be a header row.
  • Delimiters (e.g., comma, tab) are automatically inferred.
  • You may specify either schema or merge_schemas=True, but not both.
  • Any date/datetime columns are cast to strings during ingestion.
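
To illustrate the last note, a sketch with a hypothetical file containing a date column:

```python
# events.csv (hypothetical) has a column `day` with values like 2024-01-15.
df = session.read.csv("events.csv")
# Per the notes above, `day` is ingested as a StringType column rather than a
# date type; convert it downstream if date semantics are needed.
```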

Raises:

  • ValidationError

    If both schema and merge_schemas=True are provided.

  • ValidationError

    If any path does not end with .csv.

  • PlanError

    If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.
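
A sketch of the first validation rule. The import path for ValidationError is not shown on this page, so the handler below stays generic:

```python
try:
    # `my_schema` is a hypothetical Schema; combining it with merge_schemas=True
    # conflicts, so this call raises ValidationError.
    df = session.read.csv("data/*.csv", schema=my_schema, merge_schemas=True)
except Exception as err:
    print(err)  # the message suggests the three valid alternatives
```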

Example: Read a single CSV file

```python
df = session.read.csv("file.csv")
```

Example: Read multiple CSV files with schema merging

```python
df = session.read.csv("data/*.csv", merge_schemas=True)
```

Example: Read CSV files with explicit schema

```python
df = session.read.csv(
    ["a.csv", "b.csv"],
    schema=Schema([
        ColumnField(name="id", data_type=IntegerType),
        ColumnField(name="value", data_type=FloatType)
    ])
)
```

Source code in src/fenic/api/io/reader.py
```python
def csv(
    self,
    paths: Union[str, Path, list[Union[str, Path]]],
    schema: Optional[Schema] = None,
    merge_schemas: bool = False,
) -> DataFrame:
    """Load a DataFrame from one or more CSV files.

    Args:
        paths: A single file path, a glob pattern (e.g., "data/*.csv"), or a list of paths.
        schema: (optional) A complete schema definition of column names and their types. Only primitive types are supported.
            - For example:
                - Schema([ColumnField(name="id", data_type=IntegerType), ColumnField(name="name", data_type=StringType)])
            - If provided, all files must match this schema exactly: all column names must be present, and values must be
            convertible to the specified types. Partial schemas are not allowed.
        merge_schemas: Whether to merge schemas across all files.
            - If True: Column names are unified across files. Missing columns are filled with nulls. Column types are
            inferred and widened as needed.
            - If False (default): Only columns from the first file are accepted. Column types inferred from the first file
            are applied across all files. If subsequent files do not have the same column names and order as the first file, an error is raised.
            - The "first file" is defined as:
                - The first file in lexicographic order (for glob patterns), or
                - The first file in the provided list (for lists of paths).

    Notes:
        - The first row in each file is assumed to be a header row.
        - Delimiters (e.g., comma, tab) are automatically inferred.
        - You may specify either `schema` or `merge_schemas=True`, but not both.
        - Any date/datetime columns are cast to strings during ingestion.

    Raises:
        ValidationError: If both `schema` and `merge_schemas=True` are provided.
        ValidationError: If any path does not end with `.csv`.
        PlanError: If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

    Example: Read a single CSV file
        ```python
        df = session.read.csv("file.csv")
        ```

    Example: Read multiple CSV files with schema merging
        ```python
        df = session.read.csv("data/*.csv", merge_schemas=True)
        ```

    Example: Read CSV files with explicit schema
        ```python
        df = session.read.csv(
            ["a.csv", "b.csv"],
            schema=Schema([
                ColumnField(name="id", data_type=IntegerType),
                ColumnField(name="value", data_type=FloatType)
            ])
        )
        ```
    """
    if schema is not None and merge_schemas:
        raise ValidationError(
            "Cannot specify both 'schema' and 'merge_schemas=True' - these options conflict. "
            "Choose one approach: "
            "1) Use 'schema' to enforce a specific schema: csv(paths, schema=your_schema), "
            "2) Use 'merge_schemas=True' to automatically merge schemas: csv(paths, merge_schemas=True), "
            "3) Use neither to inherit schema from the first file: csv(paths)"
        )
    if schema is not None:
        for col_field in schema.column_fields:
            if not isinstance(
                col_field.data_type,
                _PrimitiveType,
            ):
                raise ValidationError(
                    f"CSV files only support primitive data types in schema definitions. "
                    f"Column '{col_field.name}' has type {type(col_field.data_type).__name__}, but CSV schemas must use: "
                    f"IntegerType, FloatType, DoubleType, BooleanType, or StringType. "
                    f"Example: Schema([ColumnField(name='id', data_type=IntegerType), ColumnField(name='name', data_type=StringType)])"
                )
    options = {
        "schema": schema,
        "merge_schemas": merge_schemas,
    }
    return self._read_file(
        paths, file_format="csv", file_extension=".csv", **options
    )
```

parquet

parquet(paths: Union[str, Path, list[Union[str, Path]]], merge_schemas: bool = False) -> DataFrame

Load a DataFrame from one or more Parquet files.

Parameters:

  • paths (Union[str, Path, list[Union[str, Path]]]) –

    A single file path, a glob pattern (e.g., "data/*.parquet"), or a list of paths.

  • merge_schemas (bool, default: False ) –

    If True, infers and merges schemas across all files. Missing columns are filled with nulls, and differing types are widened to a common supertype.

Behavior
  • If merge_schemas=False (default), all files must match the schema of the first file exactly. Subsequent files must contain all columns from the first file with compatible data types. If any column is missing or has incompatible types, an error is raised.
  • If merge_schemas=True, column names are unified across all files, and data types are automatically widened to accommodate all values.
  • The "first file" is defined as:
    • The first file in lexicographic order (for glob patterns), or
    • The first file in the provided list (for lists of paths).
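
A sketch of the widening behavior under merge_schemas=True, using two hypothetical files that store the same column at different widths:

```python
# Hypothetical files: a.parquet stores column `x` as an integer type,
# while b.parquet stores `x` as a float.
df = session.read.parquet(["a.parquet", "b.parquet"], merge_schemas=True)
# Per the behavior above, `x` is widened to a common supertype (a float here)
# so that values from both files fit.
```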
Notes
  • Date and datetime columns are cast to strings during ingestion.

Raises:

  • ValidationError

    If any file does not have a .parquet extension.

  • PlanError

    If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

Example: Read a single Parquet file

```python
df = session.read.parquet("file.parquet")
```

Example: Read multiple Parquet files

```python
df = session.read.parquet("data/*.parquet")
```

Example: Read Parquet files with schema merging

```python
df = session.read.parquet(["a.parquet", "b.parquet"], merge_schemas=True)
```
Source code in src/fenic/api/io/reader.py
```python
def parquet(
    self,
    paths: Union[str, Path, list[Union[str, Path]]],
    merge_schemas: bool = False,
) -> DataFrame:
    """Load a DataFrame from one or more Parquet files.

    Args:
        paths: A single file path, a glob pattern (e.g., "data/*.parquet"), or a list of paths.
        merge_schemas: If True, infers and merges schemas across all files.
            Missing columns are filled with nulls, and differing types are widened to a common supertype.

    Behavior:
        - If `merge_schemas=False` (default), all files must match the schema of the first file exactly.
        Subsequent files must contain all columns from the first file with compatible data types.
        If any column is missing or has incompatible types, an error is raised.
        - If `merge_schemas=True`, column names are unified across all files, and data types are automatically
        widened to accommodate all values.
        - The "first file" is defined as:
            - The first file in lexicographic order (for glob patterns), or
            - The first file in the provided list (for lists of paths).

    Notes:
        - Date and datetime columns are cast to strings during ingestion.

    Raises:
        ValidationError: If any file does not have a `.parquet` extension.
        PlanError: If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

    Example: Read a single Parquet file
        ```python
        df = session.read.parquet("file.parquet")
        ```

    Example: Read multiple Parquet files
        ```python
        df = session.read.parquet("data/*.parquet")
        ```

    Example: Read Parquet files with schema merging
        ```python
        df = session.read.parquet(["a.parquet", "b.parquet"], merge_schemas=True)
        ```
    """
    options = {
        "merge_schemas": merge_schemas,
    }
    return self._read_file(
        paths, file_format="parquet", file_extension=".parquet", **options
    )
```