fenic.api.io.reader
Reader interface for loading DataFrames from external storage systems.
Classes:
-
DataFrameReader
–Interface used to load a DataFrame from external storage systems.
DataFrameReader
DataFrameReader(session_state: BaseSessionState)
Interface used to load a DataFrame from external storage systems.
Similar to PySpark's DataFrameReader.
Supported External Storage Schemes: - Amazon S3 (s3://) - Format: s3://{bucket_name}/{path_to_file}
- Notes:
- Uses boto3 to aquire AWS credentials.
- Examples:
- s3://my-bucket/data.csv
- s3://my-bucket/data/*.parquet
-
Hugging Face Datasets (hf://)
-
Format: hf://{repo_type}/{repo_id}/{path_to_file}
-
Notes:
- Supports glob patterns (, *)
- Supports dataset revisions and branch aliases (e.g., @refs/convert/parquet, @~parquet)
- HF_TOKEN environment variable is required to read private datasets.
-
Examples:
- hf://datasets/datasets-examples/doc-formats-csv-1/data.csv
- hf://datasets/cais/mmlu/astronomy/*.parquet
- hf://datasets/datasets-examples/doc-formats-csv-1@~parquet/*/.parquet
-
-
Local Files (file:// or implicit)
-
Format: file://{absolute_or_relative_path}
-
Notes:
- Paths without a scheme (e.g., ./data.csv or /tmp/data.parquet) are treated as local files
- Examples:
- file:///home/user/data.csv
- ./data/*.parquet
-
Creates a DataFrameReader.
Parameters:
-
session_state
(BaseSessionState
) –The session state to use for reading
Methods:
-
csv
–Load a DataFrame from one or more CSV files.
-
docs
–Load a DataFrame from a list of paths of documents (markdown or json).
-
parquet
–Load a DataFrame from one or more Parquet files.
Source code in src/fenic/api/io/reader.py
60 61 62 63 64 65 66 67 |
|
csv
csv(paths: Union[str, Path, list[Union[str, Path]]], schema: Optional[Schema] = None, merge_schemas: bool = False) -> DataFrame
Load a DataFrame from one or more CSV files.
Parameters:
-
paths
(Union[str, Path, list[Union[str, Path]]]
) –A single file path, a glob pattern (e.g., "data/*.csv"), or a list of paths.
-
schema
(Optional[Schema]
, default:None
) –(optional) A complete schema definition of column names and their types. Only primitive types are supported. - For e.g.: - Schema([ColumnField(name="id", data_type=IntegerType), ColumnField(name="name", data_type=StringType)]) - If provided, all files must match this schema exactly—all column names must be present, and values must be convertible to the specified types. Partial schemas are not allowed.
-
merge_schemas
(bool
, default:False
) –Whether to merge schemas across all files. - If True: Column names are unified across files. Missing columns are filled with nulls. Column types are inferred and widened as needed. - If False (default): Only accepts columns from the first file. Column types from the first file are inferred and applied across all files. If subsequent files do not have the same column name and order as the first file, an error is raised. - The "first file" is defined as: - The first file in lexicographic order (for glob patterns), or - The first file in the provided list (for lists of paths).
Notes
- The first row in each file is assumed to be a header row.
- Delimiters (e.g., comma, tab) are automatically inferred.
- You may specify either
schema
ormerge_schemas=True
, but not both. - Any date/datetime columns are cast to strings during ingestion.
Raises:
-
ValidationError
–If both
schema
andmerge_schemas=True
are provided. -
ValidationError
–If any path does not end with
.csv
. -
PlanError
–If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.
Read a single CSV file
df = session.read.csv("file.csv")
Read multiple CSV files with schema merging
df = session.read.csv("data/*.csv", merge_schemas=True)
Read CSV files with explicit schema
python
df = session.read.csv(
["a.csv", "b.csv"],
schema=Schema([
ColumnField(name="id", data_type=IntegerType),
ColumnField(name="value", data_type=FloatType)
])
)
Source code in src/fenic/api/io/reader.py
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
|
docs
docs(paths: Union[str, list[str]], data_type: Union[MarkdownType, JsonType], exclude: Optional[str] = None, recursive: bool = False) -> DataFrame
Load a DataFrame from a list of paths of documents (markdown or json).
Parameters:
-
paths
(Union[str, list[str]]
) –Glob pattern (or list of glob patterns) to the folder(s) to load.
-
data_type
(Union[MarkdownType, JsonType]
) –Data type that will be used to cast the content of the files. One of MarkdownType or JsonType.
-
exclude
(Optional[str]
, default:None
) –A regex pattern to exclude files. If it is not provided no files will be excluded.
-
recursive
(bool
, default:False
) –Whether to recursively load files from the folder.
Returns:
-
DataFrame
(DataFrame
) –A dataframe with all the documents found in the paths. Each document is a row in the dataframe.
Raises:
-
ValidationError
–If any file does not have a
.md
or.json
depending on the data_type. -
UnsupportedFileTypeError
–If the data_type is not supported.
Notes
- Each row in the dataframe corresponds to a file in the list of paths.
- The dataframe has the following columns:
- file_path: The path to the file.
- error: The error message if the file failed to be loaded.
- content: The content of the file casted to the data_type.
- Recursive loading is supported in conjunction with the '*' glob pattern,
e.g.
data/**/*.md
will load all markdown files in thedata
folder and all subfolders when recursive is set to True. Without recursive = True, then ** behaves like a single '' pattern.
Read all the markdown files in a folder and all its subfolders.
df = session.read.docs("data/docs/**/*.md", data_type=MarkdownType, recursive=True)
Read a folder of markdown files excluding some files.
df = session.read.docs("data/docs/*.md", data_type=MarkdownType, exclude=r"\.bak.md$")
Source code in src/fenic/api/io/reader.py
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 |
|
parquet
parquet(paths: Union[str, Path, list[Union[str, Path]]], merge_schemas: bool = False) -> DataFrame
Load a DataFrame from one or more Parquet files.
Parameters:
-
paths
(Union[str, Path, list[Union[str, Path]]]
) –A single file path, a glob pattern (e.g., "data/*.parquet"), or a list of paths.
-
merge_schemas
(bool
, default:False
) –If True, infers and merges schemas across all files. Missing columns are filled with nulls, and differing types are widened to a common supertype.
Behavior
- If
merge_schemas=False
(default), all files must match the schema of the first file exactly. Subsequent files must contain all columns from the first file with compatible data types. If any column is missing or has incompatible types, an error is raised. - If
merge_schemas=True
, column names are unified across all files, and data types are automatically widened to accommodate all values. - The "first file" is defined as:
- The first file in lexicographic order (for glob patterns), or
- The first file in the provided list (for lists of paths).
Notes
- Date and datetime columns are cast to strings during ingestion.
Raises:
-
ValidationError
–If any file does not have a
.parquet
extension. -
PlanError
–If schemas cannot be merged or if there's a schema mismatch when merge_schemas=False.
Read a single Parquet file
df = session.read.parquet("file.parquet")
Read multiple Parquet files
df = session.read.parquet("data/*.parquet")
Read Parquet files with schema merging
df = session.read.parquet(["a.parquet", "b.parquet"], merge_schemas=True)
Source code in src/fenic/api/io/reader.py
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
|