
How to use lakeFS-spec with third-party data science libraries

lakeFS-spec is built on top of the fsspec library, which allows third-party libraries to make use of its file system abstraction to offer high-level features. The fsspec documentation lists examples of its users, mostly data science libraries.

This user guide page adds more detail on how lakeFS-spec can be used with five prominent data science libraries.

Code Examples

The code examples assume access to an existing lakeFS server with a quickstart repository containing the sample data already set up.

Please see the Quickstart guide or lakeFS quickstart guide if you need guidance in getting started.

The relevant lines for the lakeFS-spec integration in the following code snippets are highlighted.

Pandas

Pandas can read and write data from remote locations, and uses fsspec for all URLs that are not local or HTTP(S).

This means that (almost) all pd.read_* and pd.DataFrame.to_* operations can benefit from the lakeFS integration offered by our library without any additional configuration. See the Pandas documentation on reading/writing remote files for additional details.

The following code snippet illustrates how to read and write Pandas data frames in various formats from/to a lakeFS repository in the context of a transaction:

import pandas as pd

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

with fs.transaction("quickstart", "main") as tx:
    lakes = pd.read_parquet(f"lakefs://quickstart/{tx.branch.id}/lakes.parquet")
    german_lakes = lakes.query('Country == "Germany"')
    german_lakes.to_csv(f"lakefs://quickstart/{tx.branch.id}/german_lakes.csv")

    tx.commit(message="Add German lakes")
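
The URIs in the snippet above follow the pattern `lakefs://<repository>/<ref>/<path>`. As a minimal sketch, the helper below builds such URIs in one place instead of repeating the f-string. The name `lakefs_uri` is hypothetical and not part of lakeFS-spec or Pandas.

```python
def lakefs_uri(repository: str, ref: str, path: str) -> str:
    """Compose a lakefs:// URI from a repository, a ref, and an object path.

    Hypothetical helper for illustration only; not part of lakeFS-spec.
    """
    # Drop leading slashes so the parts are joined by exactly one separator.
    return f"lakefs://{repository}/{ref}/{path.lstrip('/')}"


print(lakefs_uri("quickstart", "main", "lakes.parquet"))
```

Inside a transaction, `tx.branch.id` would be passed as the ref, for example `lakefs_uri("quickstart", tx.branch.id, "german_lakes.csv")`.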

DuckDB

DuckDB, an in-process database management system, includes support for fsspec file systems as part of its Python API (see the official documentation on using fsspec filesystems for details). This allows DuckDB to transparently query and store data located in lakeFS repositories through lakeFS-spec.

Similar to the example above, the following code snippet illustrates how to read and write data from/to a lakeFS repository in the context of a transaction through the DuckDB Python API:

import duckdb

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()
duckdb.register_filesystem(fs)  # (1)!

with fs.transaction("quickstart", "main") as tx:
    lakes = duckdb.read_parquet("lakefs://quickstart/main/lakes.parquet")
    italian_lakes = duckdb.sql("SELECT * FROM lakes WHERE Country = 'Italy'")
    italian_lakes.to_csv(f"lakefs://quickstart/{tx.branch.id}/italian_lakes.csv")

    tx.commit(message="Add Italian lakes")
  1. Makes the lakeFS-spec file system known to DuckDB. To avoid importing LakeFSFileSystem directly, duckdb.register_filesystem(fsspec.filesystem("lakefs")) can be used instead.

Hugging Face Datasets

Hugging Face 🤗 Datasets is a library for easily accessing and sharing datasets for Audio, Computer Vision, and Natural Language Processing (NLP) tasks. It uses fsspec internally to retrieve and store datasets located outside the Hugging Face Hub.

Reading a dataset from a lakeFS repository is as simple as passing the data_files argument to the load_dataset function with a generic dataset script (e.g., csv or parquet; see the docs for a list of available types). Datasets can be saved to a lakeFS repository by passing the repository URL to the dataset's save_to_disk() method.

from datasets import load_dataset

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

with fs.transaction("quickstart", "main") as tx:
    lakes = load_dataset("parquet", data_files="lakefs://quickstart/main/lakes.parquet")
    irish_lakes = lakes.filter(lambda lake: lake["Country"] == "Ireland")
    irish_lakes.save_to_disk(f"lakefs://quickstart/{tx.branch.id}/irish_lakes")

    tx.commit(message="Add Irish lakes dataset")

Polars

Warning

There is an ongoing discussion in the Polars development team about whether to remove support for fsspec file systems, with no clear outcome at the time this page was written. Please refer to the discussion on the relevant GitHub issue in case you encounter any problems.

The Python API wrapper for the Rust-based Polars DataFrame library can access remote storage through fsspec, similar to Pandas (see the official documentation on cloud storage).

Again, the following code example demonstrates how to read a Parquet file and save a modified version back in CSV format to a lakeFS repository from Polars in the context of a transaction:

import polars as pl

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

with fs.transaction("quickstart", "main") as tx:
    lakes = pl.read_parquet(f"lakefs://quickstart/{tx.branch.id}/lakes.parquet")
    us_lakes = lakes.filter(pl.col("Country") == "United States of America")

    with fs.open(f"lakefs://quickstart/{tx.branch.id}/us_lakes.csv", "wb") as f:
        us_lakes.write_csv(f)  # (1)!

    tx.commit(message="Add US lakes")
  1. Polars does not support directly writing to remote storage through the pl.DataFrame.write_* API (see docs)

PyArrow

Apache Arrow and its Python API, PyArrow, can also use fsspec file systems to perform I/O operations on data objects. The documentation has additional details on using fsspec-compatible file systems with Arrow.

PyArrow read_* and write_* functions take an explicit filesystem parameter, which accepts any fsspec file system, such as the LakeFSFileSystem provided by this library.

The following example code illustrates the use of lakeFS-spec with PyArrow, reading a Parquet file and writing it back to a lakeFS repository as a partitioned CSV dataset in the context of a transaction:

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

with fs.transaction("quickstart", "main") as tx:
    lakes_table = pq.read_table(f"quickstart/{tx.branch.id}/lakes.parquet", filesystem=fs)

    ds.write_dataset(
        lakes_table,
        f"quickstart/{tx.branch.id}/lakes",
        filesystem=fs,
        format="csv",
        partitioning=ds.partitioning(pa.schema([lakes_table.schema.field("Country")])),
    )

    tx.commit(message="Add partitioned lakes data set")
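
The example above passes paths without the `lakefs://` scheme, since the file system is given explicitly. If other parts of a pipeline carry full URIs, a small helper along the following lines can split them into repository, ref, and path and rebuild the bare form used above. This is a sketch only: `LakeFSLocation` and `parse_lakefs_uri` are hypothetical names, not part of lakeFS-spec or PyArrow.

```python
from typing import NamedTuple


class LakeFSLocation(NamedTuple):
    """Parts of a lakefs:// URI (hypothetical helper type, for illustration)."""

    repository: str
    ref: str
    path: str

    @property
    def bare(self) -> str:
        # Scheme-less form, matching the paths used with filesystem=fs above.
        return f"{self.repository}/{self.ref}/{self.path}"


def parse_lakefs_uri(uri: str) -> LakeFSLocation:
    """Split 'lakefs://<repository>/<ref>/<path>' into its three parts."""
    prefix = "lakefs://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a lakefs:// URI: {uri!r}")
    # Split at most twice so that slashes inside the object path are kept.
    parts = uri[len(prefix):].split("/", 2)
    if len(parts) < 3 or not all(parts):
        raise ValueError(f"expected lakefs://<repository>/<ref>/<path>, got {uri!r}")
    return LakeFSLocation(*parts)


location = parse_lakefs_uri("lakefs://quickstart/main/lakes.parquet")
print(location.bare)
```

The `bare` value could then be handed to calls such as `pq.read_table(location.bare, filesystem=fs)`.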