fenic.api.io.reader

Reader interface for loading DataFrames from external storage systems.

Classes:

  • DataFrameReader

    Interface used to load a DataFrame from external storage systems.

DataFrameReader

DataFrameReader(session_state: BaseSessionState)

Interface used to load a DataFrame from external storage systems.

Similar to PySpark's DataFrameReader.

Supported External Storage Schemes (a combined usage sketch follows this list):

  • Amazon S3 (s3://)

    • Format: s3://{bucket_name}/{path_to_file}

    • Notes:

      • Uses boto3 to acquire AWS credentials.
    • Examples:

      • s3://my-bucket/data.csv
      • s3://my-bucket/data/*.parquet
  • Hugging Face Datasets (hf://)

    • Format: hf://{repo_type}/{repo_id}/{path_to_file}

    • Notes:

      • Supports glob patterns (**, *)
      • Supports dataset revisions and branch aliases (e.g., @refs/convert/parquet, @~parquet)
      • HF_TOKEN environment variable is required to read private datasets.
    • Examples:

      • hf://datasets/datasets-examples/doc-formats-csv-1/data.csv
      • hf://datasets/cais/mmlu/astronomy/*.parquet
      • hf://datasets/datasets-examples/doc-formats-csv-1@~parquet/**/*.parquet
  • Local Files (file:// or implicit)

    • Format: file://{absolute_or_relative_path}

    • Notes:

      • Paths without a scheme (e.g., ./data.csv or /tmp/data.parquet) are treated as local files
    • Examples:
      • file:///home/user/data.csv
      • ./data/*.parquet
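
A combined usage sketch across these schemes. The bucket, dataset, and local paths are the placeholder examples listed above; the csv and parquet readers used here are documented below:

```python
# Placeholder paths taken from the examples above — substitute your own locations.
df_s3 = session.read.csv("s3://my-bucket/data.csv")                          # Amazon S3 (boto3 credentials required)
df_hf = session.read.parquet("hf://datasets/cais/mmlu/astronomy/*.parquet")  # Hugging Face dataset
df_local = session.read.parquet("./data/*.parquet")                          # Local files (implicit scheme)
```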

Creates a DataFrameReader.

Parameters:

  • session_state (BaseSessionState) –

    The session state to use for reading

Methods:

  • csv

    Load a DataFrame from one or more CSV files.

  • docs

    Load a DataFrame from a list of paths of documents (markdown or json).

  • parquet

    Load a DataFrame from one or more Parquet files.

Source code in src/fenic/api/io/reader.py
def __init__(self, session_state: BaseSessionState):
    """Creates a DataFrameReader.

    Args:
        session_state: The session state to use for reading
    """
    self._options: Dict[str, Any] = {}
    self._session_state = session_state

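In practice this constructor is rarely called directly; the examples on this page reach the reader through a session. A minimal sketch, assuming `session` is an existing fenic session whose `read` attribute exposes this reader (as the method examples below do):

```python
# Assumption: session.read returns a DataFrameReader, matching the usage in the examples below.
reader = session.read
df = reader.csv("file.csv")  # equivalent to session.read.csv("file.csv")
```
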
csv

csv(paths: Union[str, Path, list[Union[str, Path]]], schema: Optional[Schema] = None, merge_schemas: bool = False) -> DataFrame

Load a DataFrame from one or more CSV files.

Parameters:

  • paths (Union[str, Path, list[Union[str, Path]]]) –

    A single file path, a glob pattern (e.g., "data/*.csv"), or a list of paths.

  • schema (Optional[Schema], default: None ) –

    (optional) A complete schema definition of column names and their types. Only primitive types are supported.
    For example: Schema([ColumnField(name="id", data_type=IntegerType), ColumnField(name="name", data_type=StringType)]).
    If provided, all files must match this schema exactly—all column names must be present, and values must be convertible to the specified types. Partial schemas are not allowed.

  • merge_schemas (bool, default: False ) –

    Whether to merge schemas across all files.

      • If True: Column names are unified across files. Missing columns are filled with nulls. Column types are inferred and widened as needed.
      • If False (default): Only columns from the first file are accepted. Column types from the first file are inferred and applied across all files. If subsequent files do not have the same column names and order as the first file, an error is raised.
      • The "first file" is the first file in lexicographic order (for glob patterns), or the first file in the provided list (for lists of paths).

Notes
  • The first row in each file is assumed to be a header row.
  • Delimiters (e.g., comma, tab) are automatically inferred.
  • You may specify either schema or merge_schemas=True, but not both.
  • Any date/datetime columns are cast to strings during ingestion.

Raises:

  • ValidationError

    If both schema and merge_schemas=True are provided.

  • ValidationError

    If any path does not end with .csv.

  • PlanError

    If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

Read a single CSV file

df = session.read.csv("file.csv")

Read multiple CSV files with schema merging

df = session.read.csv("data/*.csv", merge_schemas=True)

Read CSV files with explicit schema

df = session.read.csv(
    ["a.csv", "b.csv"],
    schema=Schema([
        ColumnField(name="id", data_type=IntegerType),
        ColumnField(name="value", data_type=FloatType)
    ])
)
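
A sketch of the schema-merging behavior described above; the file names and columns are hypothetical, for illustration only:

```python
# Suppose a.csv has columns (id, name) and b.csv has columns (id, city).
# With merge_schemas=True the result has columns id, name, city; values missing
# from either file are filled with nulls, and column types are widened as needed.
df = session.read.csv(["a.csv", "b.csv"], merge_schemas=True)

# With merge_schemas=False (the default), b.csv would have to match a.csv's
# column names and order exactly; otherwise a PlanError is raised.
```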

Source code in src/fenic/api/io/reader.py
def csv(
    self,
    paths: Union[str, Path, list[Union[str, Path]]],
    schema: Optional[Schema] = None,
    merge_schemas: bool = False,
) -> DataFrame:
    """Load a DataFrame from one or more CSV files.

    Args:
        paths: A single file path, a glob pattern (e.g., "data/*.csv"), or a list of paths.
        schema: (optional) A complete schema definition of column names and their types. Only primitive types are supported.
            - For example:
                - Schema([ColumnField(name="id", data_type=IntegerType), ColumnField(name="name", data_type=StringType)])
            - If provided, all files must match this schema exactly—all column names must be present, and values must be
            convertible to the specified types. Partial schemas are not allowed.
        merge_schemas: Whether to merge schemas across all files.
            - If True: Column names are unified across files. Missing columns are filled with nulls. Column types are
            inferred and widened as needed.
            - If False (default): Only accepts columns from the first file. Column types from the first file are
            inferred and applied across all files. If subsequent files do not have the same column names and order as the first file, an error is raised.
            - The "first file" is defined as:
                - The first file in lexicographic order (for glob patterns), or
                - The first file in the provided list (for lists of paths).

    Notes:
        - The first row in each file is assumed to be a header row.
        - Delimiters (e.g., comma, tab) are automatically inferred.
        - You may specify either `schema` or `merge_schemas=True`, but not both.
        - Any date/datetime columns are cast to strings during ingestion.

    Raises:
        ValidationError: If both `schema` and `merge_schemas=True` are provided.
        ValidationError: If any path does not end with `.csv`.
        PlanError: If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

    Example: Read a single CSV file
        ```python
        df = session.read.csv("file.csv")
        ```

    Example: Read multiple CSV files with schema merging
        ```python
        df = session.read.csv("data/*.csv", merge_schemas=True)
        ```

    Example: Read CSV files with explicit schema
        ```python
        df = session.read.csv(
            ["a.csv", "b.csv"],
            schema=Schema([
                ColumnField(name="id", data_type=IntegerType),
                ColumnField(name="value", data_type=FloatType)
            ])
        )
        ```
    """
    if schema is not None and merge_schemas:
        raise ValidationError(
            "Cannot specify both 'schema' and 'merge_schemas=True' - these options conflict. "
            "Choose one approach: "
            "1) Use 'schema' to enforce a specific schema: csv(paths, schema=your_schema), "
            "2) Use 'merge_schemas=True' to automatically merge schemas: csv(paths, merge_schemas=True), "
            "3) Use neither to inherit schema from the first file: csv(paths)"
        )
    if schema is not None:
        for col_field in schema.column_fields:
            if not isinstance(
                col_field.data_type,
                _PrimitiveType,
            ):
                raise ValidationError(
                    f"CSV files only support primitive data types in schema definitions. "
                    f"Column '{col_field.name}' has type {type(col_field.data_type).__name__}, but CSV schemas must use: "
                    f"IntegerType, FloatType, DoubleType, BooleanType, or StringType. "
                    f"Example: Schema([ColumnField(name='id', data_type=IntegerType), ColumnField(name='name', data_type=StringType)])"
                )
    options = {
        "merge_schemas": merge_schemas,
    }
    if schema:
        options["schema"] = schema
    return self._read_file(
        paths, file_format="csv", file_extension=".csv", **options
    )
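
The validation at the top of this method allows exactly three call shapes, matching the guidance in its error message (`paths` and `your_schema` below are placeholders):

```python
# 1) Enforce a specific schema (primitive types only).
df = session.read.csv(paths, schema=your_schema)

# 2) Merge schemas automatically across files.
df = session.read.csv(paths, merge_schemas=True)

# 3) Inherit the schema from the first file (the default).
df = session.read.csv(paths)
```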

docs

docs(paths: Union[str, list[str]], data_type: Union[MarkdownType, JsonType], exclude: Optional[str] = None, recursive: bool = False) -> DataFrame

Load a DataFrame from a list of paths of documents (markdown or json).

Parameters:

  • paths (Union[str, list[str]]) –

    Glob pattern (or list of glob patterns) to the folder(s) to load.

  • data_type (Union[MarkdownType, JsonType]) –

    Data type that will be used to cast the content of the files. One of MarkdownType or JsonType.

  • exclude (Optional[str], default: None ) –

    A regex pattern to exclude files. If not provided, no files are excluded.

  • recursive (bool, default: False ) –

    Whether to recursively load files from the folder.

Returns:

  • DataFrame ( DataFrame ) –

    A dataframe with all the documents found in the paths. Each document is a row in the dataframe.

Raises:

  • ValidationError

    If any file does not have a .md or .json extension matching the data_type.

  • UnsupportedFileTypeError

    If the data_type is not supported.

Notes
  • Each row in the dataframe corresponds to a file in the list of paths.
  • The dataframe has the following columns:
    • file_path: The path to the file.
    • error: The error message if the file failed to be loaded.
    • content: The content of the file cast to the data_type.
  • Recursive loading is supported in conjunction with the '**' glob pattern, e.g. data/**/*.md will load all markdown files in the data folder and all subfolders when recursive is set to True. Without recursive=True, ** behaves like a single '*' pattern.

Read all the markdown files in a folder and all its subfolders

df = session.read.docs("data/docs/**/*.md", data_type=MarkdownType, recursive=True)

Read a folder of markdown files excluding some files

df = session.read.docs("data/docs/*.md", data_type=MarkdownType, exclude=r"\.bak.md$")
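
Because every row carries an error column, a common follow-up is to separate successfully parsed documents from failures. A minimal sketch, assuming fenic DataFrames expose a PySpark-like filter and null-check API (not confirmed on this page):

```python
# Assumption: df.filter(...) and col(...).is_null() / is_not_null() exist, mirroring PySpark.
docs_df = session.read.docs("data/docs/**/*.md", data_type=MarkdownType, recursive=True)
loaded = docs_df.filter(col("error").is_null())      # files whose content was parsed
failed = docs_df.filter(col("error").is_not_null())  # files that produced a load error
```
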
Source code in src/fenic/api/io/reader.py
def docs(
        self,
        paths: Union[str, list[str]],
        data_type: Union[MarkdownType, JsonType],
        exclude: Optional[str] = None,
        recursive: bool = False,
) -> DataFrame:
    r"""Load a DataFrame from a list of paths of documents (markdown or json).

    Args:
        paths: Glob pattern (or list of glob patterns) to the folder(s) to load.
        data_type: Data type that will be used to cast the content of the files.
                   One of MarkdownType or JsonType.
        exclude: A regex pattern to exclude files.
                 If it is not provided no files will be excluded.
        recursive: Whether to recursively load files from the folder.

    Returns:
        DataFrame: A dataframe with all the documents found in the paths.
                   Each document is a row in the dataframe.

    Raises:
        ValidationError: If any file does not have a `.md` or `.json` extension matching the data_type.
        UnsupportedFileTypeError: If the data_type is not supported.

    Notes:
        - Each row in the dataframe corresponds to a file in the list of paths.
        - The dataframe has the following columns:
            - file_path: The path to the file.
            - error: The error message if the file failed to be loaded.
            - content: The content of the file cast to the data_type.
        - Recursive loading is supported in conjunction with the '**' glob pattern,
          e.g. `data/**/*.md` will load all markdown files in the `data` folder and all subfolders
               when recursive is set to True.
          Without recursive = True, then ** behaves like a single '*' pattern.

    Example: Read all the markdown files in a folder and all its subfolders.
        ```python
        df = session.read.docs("data/docs/**/*.md", data_type=MarkdownType, recursive=True)
        ```

    Example: Read a folder of markdown files excluding some files.
        ```python
        df = session.read.docs("data/docs/*.md", data_type=MarkdownType, exclude=r"\.bak.md$")
        ```

    """
    if data_type not in [MarkdownType, JsonType]:
        raise UnsupportedFileTypeError(f"Unsupported file type: {data_type}")

    if isinstance(paths, str):
        paths = [paths]

    valid_file_extension = "md" if data_type == MarkdownType else "json"
    logical_node = DocSource.from_session_state(
        paths=paths,
        valid_file_extension=valid_file_extension,
        exclude=exclude,
        recursive=recursive,
        session_state=self._session_state,
    )
    from fenic.api.dataframe import DataFrame

    df = DataFrame._from_logical_plan(logical_node, self._session_state)
    df = df.select(
        col("file_path"),
        col("error"),
        col("content").cast(data_type).alias("content"),
    )
    return df

parquet

parquet(paths: Union[str, Path, list[Union[str, Path]]], merge_schemas: bool = False) -> DataFrame

Load a DataFrame from one or more Parquet files.

Parameters:

  • paths (Union[str, Path, list[Union[str, Path]]]) –

    A single file path, a glob pattern (e.g., "data/*.parquet"), or a list of paths.

  • merge_schemas (bool, default: False ) –

    If True, infers and merges schemas across all files. Missing columns are filled with nulls, and differing types are widened to a common supertype.

Behavior
  • If merge_schemas=False (default), all files must match the schema of the first file exactly. Subsequent files must contain all columns from the first file with compatible data types. If any column is missing or has incompatible types, an error is raised.
  • If merge_schemas=True, column names are unified across all files, and data types are automatically widened to accommodate all values.
  • The "first file" is defined as:
    • The first file in lexicographic order (for glob patterns), or
    • The first file in the provided list (for lists of paths).
Notes
  • Date and datetime columns are cast to strings during ingestion.

Raises:

  • ValidationError

    If any file does not have a .parquet extension.

  • PlanError

    If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

Read a single Parquet file

df = session.read.parquet("file.parquet")

Read multiple Parquet files

df = session.read.parquet("data/*.parquet")

Read Parquet files with schema merging

df = session.read.parquet(["a.parquet", "b.parquet"], merge_schemas=True)
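
A sketch combining two features documented on this page: the s3:// scheme and schema merging. The bucket and prefix are placeholders:

```python
# Placeholder bucket/prefix; AWS credentials are resolved via boto3 for s3:// paths.
df = session.read.parquet("s3://my-bucket/data/*.parquet", merge_schemas=True)
```
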
Source code in src/fenic/api/io/reader.py
def parquet(
    self,
    paths: Union[str, Path, list[Union[str, Path]]],
    merge_schemas: bool = False,
) -> DataFrame:
    """Load a DataFrame from one or more Parquet files.

    Args:
        paths: A single file path, a glob pattern (e.g., "data/*.parquet"), or a list of paths.
        merge_schemas: If True, infers and merges schemas across all files.
            Missing columns are filled with nulls, and differing types are widened to a common supertype.

    Behavior:
        - If `merge_schemas=False` (default), all files must match the schema of the first file exactly.
        Subsequent files must contain all columns from the first file with compatible data types.
        If any column is missing or has incompatible types, an error is raised.
        - If `merge_schemas=True`, column names are unified across all files, and data types are automatically
        widened to accommodate all values.
        - The "first file" is defined as:
            - The first file in lexicographic order (for glob patterns), or
            - The first file in the provided list (for lists of paths).

    Notes:
        - Date and datetime columns are cast to strings during ingestion.

    Raises:
        ValidationError: If any file does not have a `.parquet` extension.
        PlanError: If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.

    Example: Read a single Parquet file
        ```python
        df = session.read.parquet("file.parquet")
        ```

    Example: Read multiple Parquet files
        ```python
        df = session.read.parquet("data/*.parquet")
        ```

    Example: Read Parquet files with schema merging
        ```python
        df = session.read.parquet(["a.parquet", "b.parquet"], merge_schemas=True)
        ```
    """
    options = {
        "merge_schemas": merge_schemas,
    }
    return self._read_file(
        paths, file_format="parquet", file_extension=".parquet", **options
    )