spec

Core interface definitions for file system interaction with lakeFS from Python.

In particular, this module provides the core LakeFSFileSystem and LakeFSFile classes.

LakeFSFileSystem

Bases: AbstractFileSystem

lakeFS file system implementation.

Instances of this class are cached based on their constructor arguments.

For more information, see the fsspec documentation https://filesystem-spec.readthedocs.io/en/latest/features.html#instance-caching.
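
To illustrate what caching on constructor arguments means in practice, here is a toy sketch using only the standard library. It is not fsspec's actual implementation: `CachedByArgs` and `ToyFileSystem` are hypothetical names invented for this example, and the host URL is a placeholder.

```python
class CachedByArgs(type):
    """Toy metaclass: return the same instance for identical constructor arguments."""

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        cls._instances = {}  # one cache per class

    def __call__(cls, *args, **kwargs):
        # Build a hashable key from positional and (sorted) keyword arguments.
        key = (args, tuple(sorted(kwargs.items())))
        if key not in cls._instances:
            cls._instances[key] = super().__call__(*args, **kwargs)
        return cls._instances[key]


class ToyFileSystem(metaclass=CachedByArgs):
    """Hypothetical stand-in for a file system class with cached instances."""

    def __init__(self, host=None, username=None):
        self.host = host
        self.username = username


fs_a = ToyFileSystem(host="https://lakefs.example.com", username="alice")
fs_b = ToyFileSystem(host="https://lakefs.example.com", username="alice")
fs_c = ToyFileSystem(host="https://lakefs.example.com", username="bob")

print(fs_a is fs_b)  # identical arguments: the cached instance is reused
print(fs_a is fs_c)  # different arguments: a separate instance is created
```

Constructing the toy class twice with identical arguments yields the same object. Per the fsspec documentation linked above, the same kind of reuse is to be expected from `LakeFSFileSystem`.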

PARAMETER DESCRIPTION
host

The address of your lakeFS instance.

TYPE: str | None DEFAULT: None

username

The access key name to use in case of access key authentication.

TYPE: str | None DEFAULT: None

password

The access key secret to use in case of access key authentication.

TYPE: str | None DEFAULT: None

api_key

The API key to use in case of authentication with an API key.

TYPE: str | None DEFAULT: None

api_key_prefix

A string prefix to use for the API key in authentication.

TYPE: str | None DEFAULT: None

access_token

An access token to use in case of access token authentication.

TYPE: str | None DEFAULT: None

verify_ssl

Whether to verify SSL certificates in API interactions. Do not disable in production.

TYPE: bool DEFAULT: True

ssl_ca_cert

A custom certificate PEM file to use to verify the peer in SSL connections.

TYPE: str | None DEFAULT: None

proxy

Proxy address to use when connecting to a lakeFS server.

TYPE: str | None DEFAULT: None

create_branch_ok

Whether to create branches implicitly when nonexistent branches are referenced on file uploads.

TYPE: bool DEFAULT: True

source_branch

Source branch set as origin when a new branch is implicitly created.

TYPE: str DEFAULT: 'main'

**storage_options

Configuration options to pass to the file system's directory cache.

TYPE: Any DEFAULT: {}
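
The source listed below builds the lakeFS client without any arguments when none of the connection parameters are set, which leaves configuration to environment variables or a config file. A minimal sketch of that decision follows, assuming a hypothetical helper `client_kwargs` that is not part of lakefs-spec. Note that `verify_ssl` and `proxy` are not part of this check, since the source applies them to the client configuration afterwards.

```python
from typing import Any, Dict, Optional


def client_kwargs(
    host: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    api_key: Optional[str] = None,
    api_key_prefix: Optional[str] = None,
    access_token: Optional[str] = None,
    ssl_ca_cert: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: compute the keyword arguments forwarded to the client."""
    explicit = {
        "host": host,
        "username": username,
        "password": password,
        "api_key": api_key,
        "api_key_prefix": api_key_prefix,
        "access_token": access_token,
        "ssl_ca_cert": ssl_ca_cert,
    }
    if all(value is None for value in explicit.values()):
        # Nothing given: an empty dict stands for "let the client autodiscover".
        return {}
    # At least one value given: forward everything explicitly, including Nones.
    return explicit


autodiscover = client_kwargs()
explicit = client_kwargs(host="https://lakefs.example.com", username="key-id")
```

Under this reading, setting even a single connection parameter switches the client to explicit configuration, so the remaining credentials should be passed as well.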

Source code in src/lakefs_spec/spec.py
class LakeFSFileSystem(AbstractFileSystem):
    """
    lakeFS file system implementation.

    Instances of this class are cached based on their constructor arguments.

    For more information, see the fsspec documentation <https://filesystem-spec.readthedocs.io/en/latest/features.html#instance-caching>.

    Parameters
    ----------
    host: str | None
        The address of your lakeFS instance.
    username: str | None
        The access key name to use in case of access key authentication.
    password: str | None
        The access key secret to use in case of access key authentication.
    api_key: str | None
        The API key to use in case of authentication with an API key.
    api_key_prefix: str | None
        A string prefix to use for the API key in authentication.
    access_token: str | None
        An access token to use in case of access token authentication.
    verify_ssl: bool
        Whether to verify SSL certificates in API interactions. Do not disable in production.
    ssl_ca_cert: str | None
        A custom certificate PEM file to use to verify the peer in SSL connections.
    proxy: str | None
        Proxy address to use when connecting to a lakeFS server.
    create_branch_ok: bool
        Whether to create branches implicitly when nonexistent branches are referenced on file uploads.
    source_branch: str
        Source branch set as origin when a new branch is implicitly created.
    **storage_options: Any
        Configuration options to pass to the file system's directory cache.
    """

    protocol = "lakefs"
    transaction_type = LakeFSTransaction

    def __init__(
        self,
        host: str | None = None,
        username: str | None = None,
        password: str | None = None,
        api_key: str | None = None,
        api_key_prefix: str | None = None,
        access_token: str | None = None,
        verify_ssl: bool = True,
        ssl_ca_cert: str | None = None,
        proxy: str | None = None,
        create_branch_ok: bool = True,
        source_branch: str = "main",
        **storage_options: Any,
    ):
        super().__init__(**storage_options)

        # lakeFS client arguments
        cargs = [host, username, password, api_key, api_key_prefix, access_token, ssl_ca_cert]

        if all(arg is None for arg in cargs):
            # empty kwargs means envvar and configfile autodiscovery
            self.client = Client()
        else:
            self.client = Client(
                host=host,
                username=username,
                password=password,
                api_key=api_key,
                api_key_prefix=api_key_prefix,
                access_token=access_token,
                ssl_ca_cert=ssl_ca_cert,
            )

        # proxy address, not part of the constructor
        self.client.config.proxy = proxy
        # whether to verify SSL certs, not part of the constructor
        self.client.config.verify_ssl = verify_ssl

        self.create_branch_ok = create_branch_ok
        self.source_branch = source_branch

    @cached_property
    def _lakefs_server_version(self):
        with self.wrapped_api_call():
            return tuple(int(t) for t in self.client.version.split("."))

    @classmethod
    @overload
    def _strip_protocol(cls, path: str | os.PathLike[str] | Path) -> str: ...

    @classmethod
    @overload
    def _strip_protocol(cls, path: list[str | os.PathLike[str] | Path]) -> list[str]: ...

    @classmethod
    def _strip_protocol(cls, path):
        """Copied verbatim from the base class, save for the slash rstrip."""
        if isinstance(path, list):
            return [cls._strip_protocol(p) for p in path]
        spath = super()._strip_protocol(path)
        if stringify_path(path).endswith("/"):
            return spath + "/"
        return spath

    @contextmanager
    def wrapped_api_call(
        self, rpath: str | None = None, message: str | None = None, set_cause: bool = True
    ) -> Generator[None, None, None]:
        """
        A context manager to wrap lakeFS API calls, translating any API errors to Python-native OS errors.

        Meant for internal use.

        Parameters
        ----------
        rpath: str | None
            The remote path involved in the requested API call.
        message: str | None
            A custom error message to emit instead of parsing the API error response.
        set_cause: bool
            Whether to include the original lakeFS API error in the resulting traceback.

        Yields
        ------
        None
            An empty generator, to be used as a context manager.

        Raises
        ------
        OSError
            Translated error from the lakeFS API call, if any.
        """
        try:
            yield
        except ServerException as e:
            raise translate_lakefs_error(e, rpath=rpath, message=message, set_cause=set_cause)

    def checksum(self, path: str | os.PathLike[str]) -> str | None:
        """
        Get a remote lakeFS file object's checksum.

        This is usually its MD5 hash, unless another hash function was used on upload.

        Parameters
        ----------
        path: str | os.PathLike[str]
            The remote path to look up the lakeFS checksum for. Must point to a single file object.

        Returns
        -------
        str | None
            The remote file's checksum, or ``None`` if ``path`` points to a directory or does not exist.
        """
        path = stringify_path(path)
        try:
            return self.info(path).get("checksum")
        except FileNotFoundError:
            return None

    def exists(self, path: str | os.PathLike[str], **kwargs: Any) -> bool:
        """
        Check existence of a remote path in a lakeFS repository.

        Input paths can either be files or directories.

        If the path refers to the root of the repository, this method will return
        ``True`` if the reference or branch exists.

        Parameters
        ----------
        path: str | os.PathLike[str]
            The remote path whose existence to check. Must be a fully qualified lakeFS URI.
        **kwargs: Any
            Additional keyword arguments for fsspec compatibility, unused.

        Returns
        -------
        bool
            ``True`` if the requested path exists, ``False`` if it does not.

        Raises
        ------
        PermissionError
            If the user does not have sufficient permissions to query object existence.
        """
        path = stringify_path(path)
        repository, ref, resource = parse(path)
        try:
            reference = lakefs.Reference(repository, ref, client=self.client)

            # Repo root (i.e., empty resource) boils down to checking if the ref exists
            if resource == "":
                return reference.get_commit() is not None

            if reference.object(resource).exists():
                return True
            # if it isn't an object, it might be a common prefix (i.e. "directory").
            children = reference.objects(
                max_amount=1, prefix=resource.rstrip("/") + "/", delimiter="/"
            )
            return len(list(children)) > 0
        except NotFoundException:
            return False
        except ServerException as e:
            # in case of an error other than "not found", existence cannot be
            # decided, so raise the translated error.
            raise translate_lakefs_error(e)

    def cp_file(
        self, path1: str | os.PathLike[str], path2: str | os.PathLike[str], **kwargs: Any
    ) -> None:
        """
        Copy a single file from one remote location to another in lakeFS.

        Parameters
        ----------
        path1: str | os.PathLike[str]
            The remote file location to be copied.
        path2: str | os.PathLike[str]
            The (remote) target location to which to copy the file.
        **kwargs: Any
            Additional keyword arguments for fsspec compatibility, unused.

        Raises
        ------
        ValueError
            When attempting to copy objects between repositories.
        """
        path1 = stringify_path(path1)
        path2 = stringify_path(path2)
        if path1 == path2:
            return

        orig_repo, orig_ref, orig_path = parse(path1)
        dest_repo, dest_ref, dest_path = parse(path2)

        if orig_repo != dest_repo:
            raise ValueError(
                "can only copy objects within a repository, but got source "
                f"repository {orig_repo!r} and destination repository {dest_repo!r}"
            )

        with self.wrapped_api_call():
            reference = lakefs.Reference(orig_repo, orig_ref, client=self.client)
            reference.object(orig_path).copy(dest_ref, dest_path)

    def get_file(
        self,
        rpath: str | os.PathLike[str],
        lpath: str | os.PathLike[str],
        callback: fsspec.callbacks.Callback = _DEFAULT_CALLBACK,
        outfile: Any = None,
        precheck: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        Download a single file from a remote lakeFS server to local storage.

        Parameters
        ----------
        rpath: str | os.PathLike[str]
            The remote path to download to local storage. Must be a fully qualified lakeFS URI, and point to a single file.
        lpath: str | os.PathLike[str]
            The local path on disk to save the downloaded file to.
        callback: fsspec.callbacks.Callback
            An fsspec callback to use during the operation. Can be used to report download progress.
        outfile: Any
            A file-like object to save the downloaded content to. Can be used in place of ``lpath``.
        precheck: bool
            Check if ``lpath`` already exists and compare its checksum with that of ``rpath``, skipping the download if they match.
        **kwargs: Any
            Additional keyword arguments passed to ``AbstractFileSystem.open()``.
        """
        rpath = stringify_path(rpath)
        lpath = stringify_path(lpath)

        if precheck and Path(lpath).is_file():
            local_checksum = md5_checksum(lpath, blocksize=self.blocksize)
            remote_checksum = self.info(rpath).get("checksum")
            if local_checksum == remote_checksum:
                logger.info(
                    f"Skipping download of resource {rpath!r} to local path {lpath!r}: "
                    f"Resource {lpath!r} exists and checksums match."
                )
                return

        with self.wrapped_api_call(rpath=rpath):
            super().get_file(rpath, lpath, callback=callback, outfile=outfile, **kwargs)

    def info(self, path: str | os.PathLike[str], **kwargs: Any) -> dict[str, Any]:
        """
        Query a remote lakeFS object's metadata.

        Parameters
        ----------
        path: str | os.PathLike[str]
            The object for which to obtain metadata. Must be a fully qualified lakeFS URI, can either point to a file or a directory.
        **kwargs: Any
            Additional keyword arguments to pass to ``LakeFSFileSystem.ls()`` if ``path`` points to a directory.

        Returns
        -------
        dict[str, Any]
            A dictionary containing metadata on the object, including its full remote path and object type (file or directory).

        Raises
        ------
        FileNotFoundError
            If the ``path`` refers to a non-file path that does not exist in the repository.
        """
        path = stringify_path(path)
        repository, ref, resource = parse(path)
        # first, try with `stat_object` in case of a file.
        # the condition below checks edge cases of resources that cannot be files.
        if resource and not resource.endswith("/"):
            try:
                reference = lakefs.Reference(repository, ref, client=self.client)
                res = reference.object(resource).stat()
                return {
                    "checksum": res.checksum,
                    "content-type": res.content_type,
                    "mtime": res.mtime,
                    "name": f"{repository}/{ref}/{res.path}",
                    "size": res.size_bytes,
                    "type": "file",
                }
            except NotFoundException:
                # fall through, retry with `ls` if it's a directory.
                pass
            except ServerException as e:
                raise translate_lakefs_error(e, rpath=path)

        out = self.ls(path, detail=True, recursive=True, **kwargs)
        if not out:
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)

        return {
            "name": path.rstrip("/"),
            "size": sum(o.get("size") or 0 for o in out),
            "type": "directory",
        }

    def _update_dircache(self, info: list) -> None:
        """Update logic for dircache (optionally recursive) based on lakeFS API response"""
        parents = {self._parent(i["name"].rstrip("/")) for i in info}
        for pp in parents:
            # subset of info entries which are direct descendants of `parent`
            dir_info = [i for i in info if self._parent(i["name"].rstrip("/")) == pp]
            if pp not in self.dircache:
                self.dircache[pp] = dir_info
                continue

            # Merge existing dircache entry with updated listing, which contains either:
            # - files not present in the cache yet
            # - a fresh listing (if `refresh=True`)

            cache_entry = self.dircache[pp][:]

            old_names = {e["name"] for e in cache_entry}
            new_names = {e["name"] for e in dir_info}

            to_remove = old_names - new_names
            to_update = old_names.intersection(new_names)

            # Remove all entries no longer present in the current listing
            cache_entry = [e for e in cache_entry if e["name"] not in to_remove]

            # Overwrite existing entries in the cache with their updated values
            for name in to_update:
                old_idx = next(idx for idx, e in enumerate(cache_entry) if e["name"] == name)
                new_entry = next(e for e in info if e["name"] == name)

                cache_entry[old_idx] = new_entry
                dir_info.remove(new_entry)

            # Add the remaining (new) entries to the cache
            cache_entry.extend(dir_info)
            self.dircache[pp] = sorted(cache_entry, key=operator.itemgetter("name"))

    def _ls_from_cache(self, path: str, recursive: bool = False) -> list[dict[str, Any]] | None:
        """Override of ``AbstractFileSystem._ls_from_cache`` with support for recursive listings."""
        if not recursive:
            return super()._ls_from_cache(path)

        result = None
        for key, files in self.dircache.items():
            if not (key.startswith(path) or path == key + "/"):
                continue
            if result is None:
                result = []
            result.extend(files)
        if not result:
            return result
        return sorted(result, key=operator.itemgetter("name"))

    @overload
    def ls(
        self,
        path: str | os.PathLike[str],
        detail: Literal[True] = ...,
        **kwargs: Any,
    ) -> list[dict[str, Any]]: ...

    @overload
    def ls(
        self,
        path: str | os.PathLike[str],
        detail: Literal[False],
        **kwargs: Any,
    ) -> list[str]: ...

    @overload
    def ls(
        self,
        path: str | os.PathLike[str],
        detail: bool = True,
        **kwargs: Any,
    ) -> list[str] | list[dict[str, Any]]: ...

    def ls(
        self,
        path: str | os.PathLike[str],
        detail: bool = True,
        **kwargs: Any,
    ) -> list[str] | list[dict[str, Any]]:
        """
        List all available objects under a given path in lakeFS.

        Parameters
        ----------
        path: str | os.PathLike[str]
            The path under which to list objects. Must be a fully qualified lakeFS URI.
            Can also point to a file, in which case the file's metadata will be returned.
        detail: bool
            Whether to obtain all metadata on the requested objects or just their names.
        **kwargs: Any
            Additional keyword arguments for fsspec compatibility.

            In particular:
                `refresh: bool`: whether to skip the directory listing cache,
                `recursive: bool`: whether to list subdirectory contents recursively

        Returns
        -------
        list[str] | list[dict[str, Any]]
            A list of all objects' metadata under the given remote path if ``detail=True``, or alternatively only their names if ``detail=False``.
        """
        path = self._strip_protocol(path)
        repository, ref, prefix = parse(path)

        recursive = kwargs.pop("recursive", False)

        # Try lookup in dircache unless explicitly disabled by `refresh=True` kwarg
        use_dircache = not kwargs.pop("refresh", False)

        if use_dircache:
            cache_entry: list[Any] | None = None
            try:
                cache_entry = self._ls_from_cache(path, recursive=recursive)
            except FileNotFoundError:
                # we patch files missing from an ls call in the cache entry below,
                # so this should not be an error.
                pass

            if cache_entry is not None:
                if not detail:
                    return [e["name"] for e in cache_entry]
                return cache_entry[:]

        kwargs["prefix"] = prefix

        info = []
        # stat infos are either the path only (`detail=False`) or a dict full of metadata
        delimiter = "" if recursive else "/"
        reference = lakefs.Reference(repository, ref, client=self.client)

        with self.wrapped_api_call(rpath=path):
            for obj in reference.objects(prefix=prefix, delimiter=delimiter):
                if isinstance(obj, CommonPrefix):
                    # common prefixes are reported as (virtual) directory entries.
                    info.append(
                        {
                            "name": f"{repository}/{ref}/{obj.path}",
                            "size": 0,
                            "type": "directory",
                        }
                    )
                elif isinstance(obj, ObjectInfo):
                    info.append(
                        {
                            "checksum": obj.checksum,
                            "content-type": obj.content_type,
                            "mtime": obj.mtime,
                            "name": f"{repository}/{ref}/{obj.path}",
                            "size": obj.size_bytes,
                            "type": "object",
                        }
                    )

        # Retry the API call with appended slash if the current result
        # is just a single directory entry only (not its contents).
        # This is useful to allow `ls("repo/branch/dir")` calls without a trailing slash.
        if len(info) == 1 and info[0]["type"] == "directory" and info[0]["name"] == path + "/":
            return self.ls(
                path + "/",
                detail=detail,
                **kwargs | {"refresh": not use_dircache, "recursive": recursive},
            )

        if recursive:
            # To make recursive ls behave identical to the non-recursive case,
            # add back virtual `directory` entries, which are only returned by
            # the lakeFS API when querying non-recursively.
            here = self._strip_protocol(path).rstrip("/")
            subdirs = {parent for o in info if (parent := self._parent(o["name"])) != here}
            for subdir in subdirs:
                info.append(
                    {
                        "name": subdir + "/",
                        "size": 0,
                        "type": "directory",
                    }
                )

        if info:
            self._update_dircache(info[:])

        if not detail:
            info = [o["name"] for o in info]  # type: ignore

        return info

    def open(
        self,
        path: str | os.PathLike[str],
        mode: Literal["r", "rb", "rt", "w", "wb", "wt", "x", "xb", "xt"] = "rb",
        pre_sign: bool = False,
        content_type: str | None = None,
        metadata: dict[str, str] | None = None,
        autocommit: bool = True,
        **kwargs: Any,
    ) -> LakeFSIOBase:
        """
        Dispatch a lakeFS file-like object (local buffer on disk) for the given remote path for up- or downloads depending on ``mode``.

        Parameters
        ----------
        path: str | os.PathLike[str]
            The remote path for which to open a local ``LakeFSFile``. Must be a fully qualified lakeFS URI.
        mode: Literal["r", "rb", "rt", "w", "wb", "wt", "x", "xb", "xt"]
            The file mode indicating its purpose. Use ``r/rb`` for downloads from lakeFS, ``w/wb/x/xb`` for uploads to lakeFS.
        pre_sign: bool
            Whether to use a pre-signed URL for the file up-/download.
        content_type: str | None
            Content type to use for the file, relevant for uploads only.
        metadata: dict[str, str] | None
            Additional metadata to attach to the file, relevant for uploads only.
        autocommit: bool
            Whether to process the file immediately instead of queueing it for transaction while in a transaction context.
        **kwargs: Any
            Additional keyword arguments for fsspec compatibility, unused.

        Returns
        -------
        LakeFSIOBase
            A local file-like object ready to hold data to be received from / sent to a lakeFS server.

        Raises
        ------
        NotImplementedError
            If ``mode`` is not supported.
        """
        if mode.endswith("t"):
            # text modes {r,w,x}t are equivalent to {r,w,x} here respectively.
            mode = mode[:-1]  # type: ignore

        if mode not in {"r", "rb", "w", "wb", "x", "xb"}:
            raise NotImplementedError(f"unsupported mode {mode!r}")

        path = stringify_path(path)
        repo, ref, resource = parse(path)

        if mode.startswith("r"):
            reference = lakefs.Reference(repo, ref, client=self.client)
            obj = reference.object(resource)

            if not obj.exists():
                raise FileNotFoundError(path)
            handler = ObjectReader(obj, mode=mode, pre_sign=pre_sign, client=self.client)
        else:
            # for writing ops, ref must be a branch
            branch = lakefs.Branch(repo, ref, client=self.client)
            if self.create_branch_ok:
                branch.create(self.source_branch, exist_ok=True)

            obj = branch.object(resource)
            handler = ObjectWriter(
                obj,
                mode=mode,
                pre_sign=pre_sign,
                content_type=content_type,
                metadata=metadata,
                client=self.client,
            )

        ac = kwargs.pop("autocommit", not self._intrans)
        if not ac and "r" not in mode:
            self._transaction.files.append(handler)

        return handler

    def put_file(
        self,
        lpath: str | os.PathLike[str],
        rpath: str | os.PathLike[str],
        callback: fsspec.callbacks.Callback = _DEFAULT_CALLBACK,
        precheck: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        Upload a local file to a remote location on a lakeFS server.

        Note that depending on the block store type, additional configuration like credentials may need to be configured when ``use_blockstore=True`` and ``presign=False``.

        Parameters
        ----------
        lpath: str | os.PathLike[str]
            The local path on disk to upload to the lakeFS server.
        rpath: str | os.PathLike[str]
            The remote target path to upload the local file to. Must be a fully qualified lakeFS URI.
        callback: fsspec.callbacks.Callback
            An fsspec callback to use during the operation. Can be used to report upload progress.
        precheck: bool
            Check if ``rpath`` already exists and compare its checksum with that of ``lpath``, skipping the upload if they match.
        **kwargs: Any
            Additional keyword arguments to pass to ``LakeFSFileSystem.open()``.
        """
        lpath = stringify_path(lpath)
        rpath = stringify_path(rpath)

        if precheck and Path(lpath).is_file():
            remote_checksum = self.checksum(rpath)
            local_checksum = md5_checksum(lpath, blocksize=self.blocksize)
            if local_checksum == remote_checksum:
                logger.info(
                    f"Skipping upload of resource {lpath!r} to remote path {rpath!r}: "
                    f"Resource {rpath!r} exists and checksums match."
                )
                return

        with self.wrapped_api_call(rpath=rpath):
            super().put_file(lpath, rpath, callback=callback, **kwargs)

    def rm_file(self, path: str | os.PathLike[str]) -> None:  # pragma: no cover
        """
        Stage a remote file for removal on a lakeFS server.

        The file will not actually be removed from the requested branch until a commit is created.

        Parameters
        ----------
        path: str | os.PathLike[str]
            The remote file to delete. Must be a fully qualified lakeFS URI.
        """
        self.rm(path)

    def rm(
        self, path: str | os.PathLike[str], recursive: bool = False, maxdepth: int | None = None
    ) -> None:
        """
        Stage multiple remote files for removal on a lakeFS server.

        The files will not actually be removed from the requested branch until a commit is created.

        Parameters
        ----------
        path: str | os.PathLike[str]
            File(s) to delete.
        recursive: bool
            If file(s) include nested directories, recursively delete their contents.
        maxdepth: int | None
            Depth to pass to walk for finding files to delete, if recursive.
            If None, there will be no limit and infinite recursion may be
            possible.
        """

        path = stringify_path(path)
        repository, ref, prefix = parse(path)

        with self.wrapped_api_call(rpath=path):
            branch = lakefs.Branch(repository, ref, client=self.client)
            objgen_batched = batched(
                branch.objects(prefix=prefix, delimiter="" if recursive else "/"), n=MAX_DELETE_OBJS
            )
            if maxdepth is None:
                for objgen in objgen_batched:
                    branch.delete_objects(obj.path for obj in objgen)
            else:
                for objgen in objgen_batched:
                    # nesting level is just the amount of "/"s in the path, no leading "/".
                    branch.delete_objects(
                        obj.path for obj in objgen if obj.path.count("/") <= maxdepth
                    )

            # Directory listing cache for the containing folder must be invalidated
            self.dircache.pop(self._parent(path), None)

    def touch(self, path: str | os.PathLike[str], truncate: bool = True, **kwargs: Any) -> None:
        """
        Create an empty file or update an existing file on a lakeFS server.

        Parameters
        ----------
        path: str | os.PathLike[str]
            The file path to create or update. Must be a fully qualified lakeFS URI.
        truncate: bool
            Whether to set the file size to 0 (zero) bytes, even if the path already exists.
        **kwargs: Any
            Additional keyword arguments to pass to ``LakeFSFileSystem.open()``.

        Raises
        ------
        NotImplementedError
            If the targeted lakeFS server version does not support `touch()` operations.
        """

        # empty buffer upload errors were fixed in https://github.com/treeverse/lakeFS/issues/7130,
        # which was first released in lakeFS v1.3.1.
        if self._lakefs_server_version < (1, 3, 1):
            version_string = ".".join(str(v) for v in self._lakefs_server_version)
            raise NotImplementedError(
                "LakeFSFileSystem.touch() is not supported for your lakeFS server version. "
                f"minimum required version: '1.3.1', actual version: {version_string!r}"
            )

        super().touch(path=path, truncate=truncate, **kwargs)

    def tail(self, path: str | os.PathLike[str], size: int = 1024) -> bytes:
        """
        Get the last ``size`` bytes from a remote file.

        Parameters
        ----------
        path: str | os.PathLike[str]
            The file path to read. Must be a fully qualified lakeFS URI.
        size: int
            The number of bytes to get.

        Returns
        -------
        bytes
            The bytes at the end of the requested file.
        """
        f: ObjectReader
        with self.open(path, "rb") as f:
            f.seek(max(-size, -f._obj.stat().size_bytes), 2)
            return f.read()

wrapped_api_call

wrapped_api_call(
    rpath: str | None = None, message: str | None = None, set_cause: bool = True
) -> Generator[None, None, None]

A context manager to wrap lakeFS API calls, translating any API errors to Python-native OS errors.

Meant for internal use.

PARAMETER DESCRIPTION
rpath

The remote path involved in the requested API call.

TYPE: str | None DEFAULT: None

message

A custom error message to emit instead of parsing the API error response.

TYPE: str | None DEFAULT: None

set_cause

Whether to include the original lakeFS API error in the resulting traceback.

TYPE: bool DEFAULT: True

YIELDS DESCRIPTION
None

An empty generator, to be used as a context manager.

RAISES DESCRIPTION
OSError

Translated error from the lakeFS API call, if any.
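
The translation pattern can be tried out without a lakeFS server. The following is a minimal sketch only: `FakeServerException`, `translate`, and `wrapped_call` are hypothetical stand-ins for the SDK's `ServerException`, for `translate_lakefs_error`, and for this method, and the status-code mapping is an assumption made for illustration, not the library's actual table.

```python
import errno
import os
from contextlib import contextmanager
from typing import Generator, Optional


class FakeServerException(Exception):
    """Hypothetical stand-in for an API error that carries an HTTP status code."""

    def __init__(self, status_code: int, reason: str = "") -> None:
        super().__init__(reason)
        self.status_code = status_code


def translate(error: FakeServerException, rpath: Optional[str] = None) -> OSError:
    # Assumed mapping, for illustration only.
    if error.status_code == 404:
        return FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), rpath)
    if error.status_code in (401, 403):
        return PermissionError(errno.EACCES, os.strerror(errno.EACCES), rpath)
    return OSError(errno.EIO, str(error) or "API error", rpath)


@contextmanager
def wrapped_call(
    rpath: Optional[str] = None, set_cause: bool = True
) -> Generator[None, None, None]:
    try:
        yield
    except FakeServerException as e:
        translated = translate(e, rpath=rpath)
        # `from e` keeps the original error in the traceback, `from None` hides it.
        raise translated from (e if set_cause else None)
```

Callers then only need to handle built-in `OSError` subclasses, regardless of which API error was raised inside the `with` block.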

Source code in src/lakefs_spec/spec.py
@contextmanager
def wrapped_api_call(
    self, rpath: str | None = None, message: str | None = None, set_cause: bool = True
) -> Generator[None, None, None]:
    """
    A context manager to wrap lakeFS API calls, translating any API errors to Python-native OS errors.

    Meant for internal use.

    Parameters
    ----------
    rpath: str | None
        The remote path involved in the requested API call.
    message: str | None
        A custom error message to emit instead of parsing the API error response.
    set_cause: bool
        Whether to include the original lakeFS API error in the resulting traceback.

    Yields
    ------
    None
        An empty generator, to be used as a context manager.

    Raises
    ------
    OSError
        Translated error from the lakeFS API call, if any.
    """
    try:
        yield
    except ServerException as e:
        raise translate_lakefs_error(e, rpath=rpath, message=message, set_cause=set_cause)

checksum

checksum(path: str | PathLike[str]) -> str | None

Get a remote lakeFS file object's checksum.

This is usually its MD5 hash, unless another hash function was used on upload.

PARAMETER DESCRIPTION
path

The remote path to look up the lakeFS checksum for. Must point to a single file object.

TYPE: str | PathLike[str]

RETURNS DESCRIPTION
str | None

The remote file's checksum, or None if path points to a directory or does not exist.

Source code in src/lakefs_spec/spec.py
def checksum(self, path: str | os.PathLike[str]) -> str | None:
    """
    Get a remote lakeFS file object's checksum.

    This is usually its MD5 hash, unless another hash function was used on upload.

    Parameters
    ----------
    path: str | os.PathLike[str]
        The remote path to look up the lakeFS checksum for. Must point to a single file object.

    Returns
    -------
    str | None
        The remote file's checksum, or ``None`` if ``path`` points to a directory or does not exist.
    """
    path = stringify_path(path)
    try:
        return self.info(path).get("checksum")
    except FileNotFoundError:
        return None

exists

exists(path: str | PathLike[str], **kwargs: Any) -> bool

Check existence of a remote path in a lakeFS repository.

Input paths can either be files or directories.

If the path refers to the root of the repository, this method will return True if the reference or branch exists.

PARAMETER DESCRIPTION
path

The remote path whose existence to check. Must be a fully qualified lakeFS URI.

TYPE: str | PathLike[str]

**kwargs

Additional keyword arguments for fsspec compatibility, unused.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
bool

True if the requested path exists, False if it does not.

RAISES DESCRIPTION
PermissionError

If the user does not have sufficient permissions to query object existence.
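
The lookup order (exact object first, then common prefix) can be illustrated on an in-memory set of object keys. This is a sketch under stated assumptions: `path_exists` is a hypothetical helper, and the repository-root case is simplified to always return `True`, whereas the real method checks that the ref exists.

```python
def path_exists(object_keys: set[str], resource: str) -> bool:
    """Mimic the two-step existence check on a set of object keys."""
    # An empty resource denotes the repository root (simplified here).
    if resource == "":
        return True
    # Step 1: the resource is an object.
    if resource in object_keys:
        return True
    # Step 2: the resource is a common prefix, i.e. a "directory".
    prefix = resource.rstrip("/") + "/"
    return any(key.startswith(prefix) for key in object_keys)
```

Appending `/` before the prefix match is what keeps a partial name such as `dat` from matching `data/a.csv`.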

Source code in src/lakefs_spec/spec.py
def exists(self, path: str | os.PathLike[str], **kwargs: Any) -> bool:
    """
    Check existence of a remote path in a lakeFS repository.

    Input paths can either be files or directories.

    If the path refers to the root of the repository, this method will return
    ``True`` if the reference or branch exists.

    Parameters
    ----------
    path: str | os.PathLike[str]
        The remote path whose existence to check. Must be a fully qualified lakeFS URI.
    **kwargs: Any
        Additional keyword arguments for fsspec compatibility, unused.

    Returns
    -------
    bool
        ``True`` if the requested path exists, ``False`` if it does not.

    Raises
    ------
    PermissionError
        If the user does not have sufficient permissions to query object existence.
    """
    path = stringify_path(path)
    repository, ref, resource = parse(path)
    try:
        reference = lakefs.Reference(repository, ref, client=self.client)

        # Repo root (i.e., empty resource) boils down to checking if the ref exists
        if resource == "":
            return reference.get_commit() is not None

        if reference.object(resource).exists():
            return True
        # if it isn't an object, it might be a common prefix (i.e. "directory").
        children = reference.objects(
            max_amount=1, prefix=resource.rstrip("/") + "/", delimiter="/"
        )
        return len(list(children)) > 0
    except NotFoundException:
        return False
    except ServerException as e:
        # in case of an error other than "not found", existence cannot be
        # decided, so raise the translated error.
        raise translate_lakefs_error(e)

cp_file

cp_file(path1: str | PathLike[str], path2: str | PathLike[str], **kwargs: Any) -> None

Copy a single file from one remote location to another in lakeFS.

PARAMETER DESCRIPTION
path1

The remote file location to be copied.

TYPE: str | PathLike[str]

path2

The (remote) target location to which to copy the file.

TYPE: str | PathLike[str]

**kwargs

Additional keyword arguments for fsspec compatibility, unused.

TYPE: Any DEFAULT: {}

RAISES DESCRIPTION
ValueError

When attempting to copy objects between repositories.
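
The same-repository restriction can be sketched as follows. `split_uri` and `plan_copy` are hypothetical helpers; in particular, `split_uri` is a simplified parser that assumes paths of the form `repo/ref/resource` with an optional `lakefs://` scheme, and is not the library's `parse` function.

```python
def split_uri(path: str) -> tuple[str, str, str]:
    """Split 'repo/ref/resource' into its three parts (simplified parser)."""
    scheme = "lakefs://"  # assumed URI scheme
    if path.startswith(scheme):
        path = path[len(scheme):]
    repo, ref, resource = path.split("/", maxsplit=2)
    return repo, ref, resource


def plan_copy(path1: str, path2: str) -> tuple[str, str, str]:
    """Return (source path, destination ref, destination path) for a copy."""
    orig_repo, _orig_ref, orig_path = split_uri(path1)
    dest_repo, dest_ref, dest_path = split_uri(path2)
    if orig_repo != dest_repo:
        raise ValueError(
            "can only copy objects within a repository, but got source "
            f"repository {orig_repo!r} and destination repository {dest_repo!r}"
        )
    return orig_path, dest_ref, dest_path
```

Copies across branches of one repository pass the check; only a differing repository name raises.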

Source code in src/lakefs_spec/spec.py
def cp_file(
    self, path1: str | os.PathLike[str], path2: str | os.PathLike[str], **kwargs: Any
) -> None:
    """
    Copy a single file from one remote location to another in lakeFS.

    Parameters
    ----------
    path1: str | os.PathLike[str]
        The remote file location to be copied.
    path2: str | os.PathLike[str]
        The (remote) target location to which to copy the file.
    **kwargs: Any
        Additional keyword arguments for fsspec compatibility, unused.

    Raises
    ------
    ValueError
        When attempting to copy objects between repositories.
    """
    path1 = stringify_path(path1)
    path2 = stringify_path(path2)
    if path1 == path2:
        return

    orig_repo, orig_ref, orig_path = parse(path1)
    dest_repo, dest_ref, dest_path = parse(path2)

    if orig_repo != dest_repo:
        raise ValueError(
            "can only copy objects within a repository, but got source "
            f"repository {orig_repo!r} and destination repository {dest_repo!r}"
        )

    with self.wrapped_api_call():
        reference = lakefs.Reference(orig_repo, orig_ref, client=self.client)
        reference.object(orig_path).copy(dest_ref, dest_path)

get_file

get_file(
    rpath: str | PathLike[str],
    lpath: str | PathLike[str],
    callback: Callback = _DEFAULT_CALLBACK,
    outfile: Any = None,
    precheck: bool = True,
    **kwargs: Any
) -> None

Download a single file from a remote lakeFS server to local storage.

PARAMETER DESCRIPTION
rpath

The remote path to download to local storage. Must be a fully qualified lakeFS URI, and point to a single file.

TYPE: str | PathLike[str]

lpath

The local path on disk to save the downloaded file to.

TYPE: str | PathLike[str]

callback

An fsspec callback to use during the operation. Can be used to report download progress.

TYPE: Callback DEFAULT: _DEFAULT_CALLBACK

outfile

A file-like object to save the downloaded content to. Can be used in place of lpath.

TYPE: Any DEFAULT: None

precheck

Check if lpath already exists and compare its checksum with that of rpath, skipping the download if they match.

TYPE: bool DEFAULT: True

**kwargs

Additional keyword arguments passed to AbstractFileSystem.open().

TYPE: Any DEFAULT: {}
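
The precheck can be approximated with the standard library. In this sketch, `local_md5` and `can_skip_download` are hypothetical helpers, and the remote checksum is assumed to be a plain MD5 hex digest, which may not hold if another hash function was used on upload.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Optional


def local_md5(path: str, blocksize: int = 2**20) -> str:
    """Compute a file's MD5 hex digest, reading it in fixed-size blocks."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(blocksize), b""):
            digest.update(block)
    return digest.hexdigest()


def can_skip_download(lpath: str, remote_checksum: Optional[str]) -> bool:
    """Return True if the local file exists and its MD5 matches the remote checksum."""
    return Path(lpath).is_file() and local_md5(lpath) == remote_checksum
```

Reading in blocks keeps memory use bounded for large files; a missing local file or a mismatching digest both lead to a regular download.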

Source code in src/lakefs_spec/spec.py
def get_file(
    self,
    rpath: str | os.PathLike[str],
    lpath: str | os.PathLike[str],
    callback: fsspec.callbacks.Callback = _DEFAULT_CALLBACK,
    outfile: Any = None,
    precheck: bool = True,
    **kwargs: Any,
) -> None:
    """
    Download a single file from a remote lakeFS server to local storage.

    Parameters
    ----------
    rpath: str | os.PathLike[str]
        The remote path to download to local storage. Must be a fully qualified lakeFS URI, and point to a single file.
    lpath: str | os.PathLike[str]
        The local path on disk to save the downloaded file to.
    callback: fsspec.callbacks.Callback
        An fsspec callback to use during the operation. Can be used to report download progress.
    outfile: Any
        A file-like object to save the downloaded content to. Can be used in place of ``lpath``.
    precheck: bool
        Check if ``lpath`` already exists and compare its checksum with that of ``rpath``, skipping the download if they match.
    **kwargs: Any
        Additional keyword arguments passed to ``AbstractFileSystem.open()``.
    """
    rpath = stringify_path(rpath)
    lpath = stringify_path(lpath)

    if precheck and Path(lpath).is_file():
        local_checksum = md5_checksum(lpath, blocksize=self.blocksize)
        remote_checksum = self.info(rpath).get("checksum")
        if local_checksum == remote_checksum:
            logger.info(
                f"Skipping download of resource {rpath!r} to local path {lpath!r}: "
                f"Resource {lpath!r} exists and checksums match."
            )
            return

    with self.wrapped_api_call(rpath=rpath):
        super().get_file(rpath, lpath, callback=callback, outfile=outfile, **kwargs)

info

info(path: str | PathLike[str], **kwargs: Any) -> dict[str, Any]

Query a remote lakeFS object's metadata.

PARAMETER DESCRIPTION
path

The object for which to obtain metadata. Must be a fully qualified lakeFS URI that points to either a file or a directory.

TYPE: str | PathLike[str]

**kwargs

Additional keyword arguments to pass to LakeFSFileSystem.ls() if path points to a directory.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
dict[str, Any]

A dictionary containing metadata on the object, including its full remote path and object type (file or directory).

RAISES DESCRIPTION
FileNotFoundError

If the path refers to a non-file path that does not exist in the repository.
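
For directories, the returned metadata is aggregated from a recursive listing. The sketch below reproduces that aggregation on plain dictionaries; `directory_info` is a hypothetical helper and the listing entries are made-up sample data.

```python
import errno
import os
from typing import Any


def directory_info(path: str, listing: list[dict[str, Any]]) -> dict[str, Any]:
    """Build directory metadata from the entries listed under ``path``."""
    if not listing:
        # An empty listing means the prefix does not exist.
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)
    return {
        "name": path.rstrip("/"),
        # Entries without a usable size (missing or None) count as 0 bytes.
        "size": sum(o.get("size") or 0 for o in listing),
        "type": "directory",
    }
```

The `or 0` guard matters because an entry may carry `None` as its size, which `sum` could not add.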

Source code in src/lakefs_spec/spec.py
def info(self, path: str | os.PathLike[str], **kwargs: Any) -> dict[str, Any]:
    """
    Query a remote lakeFS object's metadata.

    Parameters
    ----------
    path: str | os.PathLike[str]
        The object for which to obtain metadata. Must be a fully qualified lakeFS URI that points to either a file or a directory.
    **kwargs: Any
        Additional keyword arguments to pass to ``LakeFSFileSystem.ls()`` if ``path`` points to a directory.

    Returns
    -------
    dict[str, Any]
        A dictionary containing metadata on the object, including its full remote path and object type (file or directory).

    Raises
    ------
    FileNotFoundError
        If the ``path`` refers to a non-file path that does not exist in the repository.
    """
    path = stringify_path(path)
    repository, ref, resource = parse(path)
    # first, try with `stat_object` in case of a file.
    # the condition below checks edge cases of resources that cannot be files.
    if resource and not resource.endswith("/"):
        try:
            reference = lakefs.Reference(repository, ref, client=self.client)
            res = reference.object(resource).stat()
            return {
                "checksum": res.checksum,
                "content-type": res.content_type,
                "mtime": res.mtime,
                "name": f"{repository}/{ref}/{res.path}",
                "size": res.size_bytes,
                "type": "file",
            }
        except NotFoundException:
            # fall through, retry with `ls` if it's a directory.
            pass
        except ServerException as e:
            raise translate_lakefs_error(e, rpath=path)

    out = self.ls(path, detail=True, recursive=True, **kwargs)
    if not out:
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)

    return {
        "name": path.rstrip("/"),
        "size": sum(o.get("size") or 0 for o in out),
        "type": "directory",
    }

ls

ls(
    path: str | PathLike[str], detail: bool = True, **kwargs: Any
) -> list[str] | list[dict[str, Any]]

List all available objects under a given path in lakeFS.

PARAMETER DESCRIPTION
path

The path under which to list objects. Must be a fully qualified lakeFS URI. Can also point to a file, in which case the file's metadata will be returned.

TYPE: str | PathLike[str]

detail

Whether to obtain all metadata on the requested objects or just their names.

TYPE: bool DEFAULT: True

**kwargs

Additional keyword arguments for fsspec compatibility.

In particular, refresh: bool controls whether to skip the directory listing cache, and recursive: bool controls whether to list subdirectory contents recursively.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[str] | list[dict[str, Any]]

A list of all objects' metadata under the given remote path if detail=True, or alternatively only their names if detail=False.
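
With recursive=True, directory entries are not part of the raw object listing, so they are derived from the parents of the listed objects. A minimal sketch of that step follows; `parent` is a simplified stand-in for fsspec's `_parent`, `add_virtual_directories` is a hypothetical helper, and the entries are sample data.

```python
from typing import Any


def parent(path: str) -> str:
    """Return the parent of a slash-separated path (simplified)."""
    stripped = path.rstrip("/")
    return stripped.rsplit("/", 1)[0] if "/" in stripped else ""


def add_virtual_directories(
    here: str, entries: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Append a directory entry for every parent that differs from ``here``."""
    here = here.rstrip("/")
    subdirs = {p for e in entries if (p := parent(e["name"])) != here}
    virtual = [{"name": d + "/", "size": 0, "type": "directory"} for d in sorted(subdirs)]
    return entries + virtual
```

Only the immediate parent of each object is added, which mirrors the set comprehension in the method's source.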

Source code in src/lakefs_spec/spec.py
def ls(
    self,
    path: str | os.PathLike[str],
    detail: bool = True,
    **kwargs: Any,
) -> list[str] | list[dict[str, Any]]:
    """
    List all available objects under a given path in lakeFS.

    Parameters
    ----------
    path: str | os.PathLike[str]
        The path under which to list objects. Must be a fully qualified lakeFS URI.
        Can also point to a file, in which case the file's metadata will be returned.
    detail: bool
        Whether to obtain all metadata on the requested objects or just their names.
    **kwargs: Any
        Additional keyword arguments for fsspec compatibility.

        In particular:
            `refresh: bool`: whether to skip the directory listing cache,
            `recursive: bool`: whether to list subdirectory contents recursively

    Returns
    -------
    list[str] | list[dict[str, Any]]
        A list of all objects' metadata under the given remote path if ``detail=True``, or alternatively only their names if ``detail=False``.
    """
    path = self._strip_protocol(path)
    repository, ref, prefix = parse(path)

    recursive = kwargs.pop("recursive", False)

    # Try lookup in dircache unless explicitly disabled by `refresh=True` kwarg
    use_dircache = not kwargs.pop("refresh", False)

    if use_dircache:
        cache_entry: list[Any] | None = None
        try:
            cache_entry = self._ls_from_cache(path, recursive=recursive)
        except FileNotFoundError:
            # we patch files missing from an ls call in the cache entry below,
            # so this should not be an error.
            pass

        if cache_entry is not None:
            if not detail:
                return [e["name"] for e in cache_entry]
            return cache_entry[:]

    kwargs["prefix"] = prefix

    info = []
    # stat infos are either the path only (`detail=False`) or a dict full of metadata
    delimiter = "" if recursive else "/"
    reference = lakefs.Reference(repository, ref, client=self.client)

    with self.wrapped_api_call(rpath=path):
        for obj in reference.objects(prefix=prefix, delimiter=delimiter):
            if isinstance(obj, CommonPrefix):
                # prefixes are added below.
                info.append(
                    {
                        "name": f"{repository}/{ref}/{obj.path}",
                        "size": 0,
                        "type": "directory",
                    }
                )
            elif isinstance(obj, ObjectInfo):
                info.append(
                    {
                        "checksum": obj.checksum,
                        "content-type": obj.content_type,
                        "mtime": obj.mtime,
                        "name": f"{repository}/{ref}/{obj.path}",
                        "size": obj.size_bytes,
                        "type": "object",
                    }
                )

    # Retry the API call with appended slash if the current result
    # is just a single directory entry only (not its contents).
    # This is useful to allow `ls("repo/branch/dir")` calls without a trailing slash.
    if len(info) == 1 and info[0]["type"] == "directory" and info[0]["name"] == path + "/":
        return self.ls(
            path + "/",
            detail=detail,
            **kwargs | {"refresh": not use_dircache, "recursive": recursive},
        )

    if recursive:
        # To make recursive ls behave identical to the non-recursive case,
        # add back virtual `directory` entries, which are only returned by
        # the lakeFS API when querying non-recursively.
        here = self._strip_protocol(path).rstrip("/")
        subdirs = {parent for o in info if (parent := self._parent(o["name"])) != here}
        for subdir in subdirs:
            info.append(
                {
                    "name": subdir + "/",
                    "size": 0,
                    "type": "directory",
                }
            )

    if info:
        self._update_dircache(info[:])

    if not detail:
        info = [o["name"] for o in info]  # type: ignore

    return info

open

open(
    path: str | PathLike[str],
    mode: Literal["r", "rb", "rt", "w", "wb", "wt", "x", "xb", "xt"] = "rb",
    pre_sign: bool = False,
    content_type: str | None = None,
    metadata: dict[str, str] | None = None,
    autocommit: bool = True,
    **kwargs: Any
) -> LakeFSIOBase

Dispatch a lakeFS file-like object (local buffer on disk) for the given remote path for up- or downloads depending on mode.

PARAMETER DESCRIPTION
path

The remote path for which to open a local LakeFSFile. Must be a fully qualified lakeFS URI.

TYPE: str | PathLike[str]

mode

The file mode indicating its purpose. Use r/rb for downloads from lakeFS, w/wb/x/xb for uploads to lakeFS.

TYPE: Literal['r', 'rb', 'rt', 'w', 'wb', 'wt', 'x', 'xb', 'xt'] DEFAULT: 'rb'

pre_sign

Whether to use a pre-signed URL for the file up-/download.

TYPE: bool DEFAULT: False

content_type

Content type to use for the file, relevant for uploads only.

TYPE: str | None DEFAULT: None

metadata

Additional metadata to attach to the file, relevant for uploads only.

TYPE: dict[str, str] | None DEFAULT: None

autocommit

Whether to process the file immediately instead of queueing it for the transaction when inside a transaction context.

TYPE: bool DEFAULT: True

**kwargs

Additional keyword arguments for fsspec compatibility, unused.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
LakeFSIOBase

A local file-like object ready to hold data to be received from / sent to a lakeFS server.

RAISES DESCRIPTION
NotImplementedError

If mode is not supported.
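
The mode handling reduces to stripping a trailing `t` and validating the result. The sketch below isolates that logic; `normalize_mode` and `is_upload` are hypothetical helper names, not part of the library.

```python
SUPPORTED_MODES = {"r", "rb", "w", "wb", "x", "xb"}


def normalize_mode(mode: str) -> str:
    """Map text modes onto their base mode and reject anything unsupported."""
    if mode.endswith("t"):
        # Text modes rt/wt/xt are treated like r/w/x.
        mode = mode[:-1]
    if mode not in SUPPORTED_MODES:
        raise NotImplementedError(f"unsupported mode {mode!r}")
    return mode


def is_upload(mode: str) -> bool:
    """Read modes download from lakeFS; all other supported modes upload."""
    return not normalize_mode(mode).startswith("r")
```

Append modes such as `a` or `ab` are not in the supported set and therefore raise `NotImplementedError`.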

Source code in src/lakefs_spec/spec.py
def open(
    self,
    path: str | os.PathLike[str],
    mode: Literal["r", "rb", "rt", "w", "wb", "wt", "x", "xb", "xt"] = "rb",
    pre_sign: bool = False,
    content_type: str | None = None,
    metadata: dict[str, str] | None = None,
    autocommit: bool = True,
    **kwargs: Any,
) -> LakeFSIOBase:
    """
    Dispatch a lakeFS file-like object (local buffer on disk) for the given remote path for up- or downloads depending on ``mode``.

    Parameters
    ----------
    path: str | os.PathLike[str]
        The remote path for which to open a local ``LakeFSFile``. Must be a fully qualified lakeFS URI.
    mode: Literal["r", "rb", "rt", "w", "wb", "wt", "x", "xb", "xt"]
        The file mode indicating its purpose. Use ``r/rb`` for downloads from lakeFS, ``w/wb/x/xb`` for uploads to lakeFS.
    pre_sign: bool
        Whether to use a pre-signed URL for the file up-/download.
    content_type: str | None
        Content type to use for the file, relevant for uploads only.
    metadata: dict[str, str] | None
        Additional metadata to attach to the file, relevant for uploads only.
    autocommit: bool
        Whether to process the file immediately instead of queueing it for the transaction when inside a transaction context.
    **kwargs: Any
        Additional keyword arguments for fsspec compatibility, unused.

    Returns
    -------
    LakeFSIOBase
        A local file-like object ready to hold data to be received from / sent to a lakeFS server.

    Raises
    ------
    NotImplementedError
        If ``mode`` is not supported.
    """
    if mode.endswith("t"):
        # text modes {r,w,x}t are equivalent to {r,w,x} here respectively.
        mode = mode[:-1]  # type: ignore

    if mode not in {"r", "rb", "w", "wb", "x", "xb"}:
        raise NotImplementedError(f"unsupported mode {mode!r}")

    path = stringify_path(path)
    repo, ref, resource = parse(path)

    if mode.startswith("r"):
        reference = lakefs.Reference(repo, ref, client=self.client)
        obj = reference.object(resource)

        if not obj.exists():
            raise FileNotFoundError(path)
        handler = ObjectReader(obj, mode=mode, pre_sign=pre_sign, client=self.client)
    else:
        # for writing ops, ref must be a branch
        branch = lakefs.Branch(repo, ref, client=self.client)
        if self.create_branch_ok:
            branch.create(self.source_branch, exist_ok=True)

        obj = branch.object(resource)
        handler = ObjectWriter(
            obj,
            mode=mode,
            pre_sign=pre_sign,
            content_type=content_type,
            metadata=metadata,
            client=self.client,
        )

    ac = kwargs.pop("autocommit", not self._intrans)
    if not ac and "r" not in mode:
        self._transaction.files.append(handler)

    return handler

put_file

put_file(
    lpath: str | PathLike[str],
    rpath: str | PathLike[str],
    callback: Callback = _DEFAULT_CALLBACK,
    precheck: bool = True,
    **kwargs: Any
) -> None

Upload a local file to a remote location on a lakeFS server.

Note that, depending on the block store type, additional configuration such as credentials may be required when use_blockstore=True and presign=False.

PARAMETER DESCRIPTION
lpath

The local path on disk to upload to the lakeFS server.

TYPE: str | PathLike[str]

rpath

The remote target path to upload the local file to. Must be a fully qualified lakeFS URI.

TYPE: str | PathLike[str]

callback

An fsspec callback to use during the operation. Can be used to report upload progress.

TYPE: Callback DEFAULT: _DEFAULT_CALLBACK

precheck

Compare the checksum of lpath with that of rpath (if it exists), skipping the upload if they match.

TYPE: bool DEFAULT: True

**kwargs

Additional keyword arguments to pass to LakeFSFileSystem.open().

TYPE: Any DEFAULT: {}

Source code in src/lakefs_spec/spec.py
def put_file(
    self,
    lpath: str | os.PathLike[str],
    rpath: str | os.PathLike[str],
    callback: fsspec.callbacks.Callback = _DEFAULT_CALLBACK,
    precheck: bool = True,
    **kwargs: Any,
) -> None:
    """
    Upload a local file to a remote location on a lakeFS server.

    Note that, depending on the block store type, additional configuration such as credentials may be required when ``use_blockstore=True`` and ``presign=False``.

    Parameters
    ----------
    lpath: str | os.PathLike[str]
        The local path on disk to upload to the lakeFS server.
    rpath: str | os.PathLike[str]
        The remote target path to upload the local file to. Must be a fully qualified lakeFS URI.
    callback: fsspec.callbacks.Callback
        An fsspec callback to use during the operation. Can be used to report upload progress.
    precheck: bool
        Compare the checksum of ``lpath`` with that of ``rpath`` (if it exists), skipping the upload if they match.
    **kwargs: Any
        Additional keyword arguments to pass to ``LakeFSFileSystem.open()``.
    """
    lpath = stringify_path(lpath)
    rpath = stringify_path(rpath)

    if precheck and Path(lpath).is_file():
        remote_checksum = self.checksum(rpath)
        local_checksum = md5_checksum(lpath, blocksize=self.blocksize)
        if local_checksum == remote_checksum:
            logger.info(
                f"Skipping upload of resource {lpath!r} to remote path {rpath!r}: "
                f"Resource {rpath!r} exists and checksums match."
            )
            return

    with self.wrapped_api_call(rpath=rpath):
        super().put_file(lpath, rpath, callback=callback, **kwargs)

rm_file

rm_file(path: str | PathLike[str]) -> None

Stage a remote file for removal on a lakeFS server.

The file will not actually be removed from the requested branch until a commit is created.

PARAMETER DESCRIPTION
path

The remote file to delete. Must be a fully qualified lakeFS URI.

TYPE: str | PathLike[str]

Source code in src/lakefs_spec/spec.py
def rm_file(self, path: str | os.PathLike[str]) -> None:  # pragma: no cover
    """
    Stage a remote file for removal on a lakeFS server.

    The file will not actually be removed from the requested branch until a commit is created.

    Parameters
    ----------
    path: str | os.PathLike[str]
        The remote file to delete. Must be a fully qualified lakeFS URI.
    """
    self.rm(path)

rm

rm(path: str | PathLike[str], recursive: bool = False, maxdepth: int | None = None) -> None

Stage multiple remote files for removal on a lakeFS server.

The files will not actually be removed from the requested branch until a commit is created.

PARAMETER DESCRIPTION
path

File(s) to delete.

TYPE: str | PathLike[str]

recursive

If file(s) include nested directories, recursively delete their contents.

TYPE: bool DEFAULT: False

maxdepth

Maximum nesting depth of objects to delete, if recursive. Objects whose path contains more than maxdepth "/" characters are skipped. If None, no depth limit is applied.

TYPE: int | None DEFAULT: None
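
Deletion proceeds in batches, with an optional depth filter based on the number of "/" characters in each object path. The following sketch reproduces that flow on plain strings; `batched` is a minimal reimplementation, `deletion_batches` is a hypothetical helper, and the batch size of 2 in the usage below is arbitrary (the library's `MAX_DELETE_OBJS` value is not shown here).

```python
from itertools import islice
from typing import Iterable, Iterator, Optional


def batched(items: Iterable[str], n: int) -> Iterator[list[str]]:
    """Yield successive lists of at most ``n`` items."""
    it = iter(items)
    while batch := list(islice(it, n)):
        yield batch


def deletion_batches(
    paths: Iterable[str], batch_size: int, maxdepth: Optional[int] = None
) -> Iterator[list[str]]:
    """Yield the object paths that each delete request would contain."""
    for batch in batched(paths, batch_size):
        if maxdepth is None:
            yield batch
        else:
            # Nesting level is the number of "/" characters in the path.
            yield [p for p in batch if p.count("/") <= maxdepth]


paths = ["a.txt", "d/b.txt", "d/e/c.txt"]
unlimited = list(deletion_batches(paths, batch_size=2))
limited = list(deletion_batches(paths, batch_size=2, maxdepth=1))
```

Because filtering happens after batching, a batch can end up empty when all of its paths exceed the depth limit.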

Source code in src/lakefs_spec/spec.py
def rm(
    self, path: str | os.PathLike[str], recursive: bool = False, maxdepth: int | None = None
) -> None:
    """
    Stage multiple remote files for removal on a lakeFS server.

    The files will not actually be removed from the requested branch until a commit is created.

    Parameters
    ----------
    path: str | os.PathLike[str]
        File(s) to delete.
    recursive: bool
        If file(s) include nested directories, recursively delete their contents.
    maxdepth: int | None
        Maximum nesting depth of objects to delete, if recursive. Objects whose
        path contains more than ``maxdepth`` "/" characters are skipped.
        If None, no depth limit is applied.
    """

    path = stringify_path(path)
    repository, ref, prefix = parse(path)

    with self.wrapped_api_call(rpath=path):
        branch = lakefs.Branch(repository, ref, client=self.client)
        objgen_batched = batched(
            branch.objects(prefix=prefix, delimiter="" if recursive else "/"), n=MAX_DELETE_OBJS
        )
        if maxdepth is None:
            for objgen in objgen_batched:
                branch.delete_objects(obj.path for obj in objgen)
        else:
            for objgen in objgen_batched:
                # nesting level is just the amount of "/"s in the path, no leading "/".
                branch.delete_objects(
                    obj.path for obj in objgen if obj.path.count("/") <= maxdepth
                )

        # Directory listing cache for the containing folder must be invalidated
        self.dircache.pop(self._parent(path), None)

touch

touch(path: str | PathLike[str], truncate: bool = True, **kwargs: Any) -> None

Create an empty file or update an existing file on a lakeFS server.

PARAMETER DESCRIPTION
path

The file path to create or update. Must be a fully qualified lakeFS URI.

TYPE: str | PathLike[str]

truncate

Whether to set the file size to 0 (zero) bytes, even if the path already exists.

TYPE: bool DEFAULT: True

**kwargs

Additional keyword arguments to pass to LakeFSFileSystem.open().

TYPE: Any DEFAULT: {}

RAISES DESCRIPTION
NotImplementedError

If the targeted lakeFS server version does not support touch() operations.
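
The version gate relies on tuple comparison, which compares element by element. A small sketch, assuming the server version is available as a dotted string of integers; `parse_version` and `supports_touch` are hypothetical helpers.

```python
def parse_version(version: str) -> tuple[int, ...]:
    """Turn a dotted version string such as '1.3.1' into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def supports_touch(
    server_version: tuple[int, ...], minimum: tuple[int, ...] = (1, 3, 1)
) -> bool:
    """Return True if the server version is at least the minimum version."""
    return server_version >= minimum
```

Comparing tuples of integers avoids the pitfall of comparing version strings, where `"1.10.0"` would sort before `"1.3.1"`.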

Source code in src/lakefs_spec/spec.py
def touch(self, path: str | os.PathLike[str], truncate: bool = True, **kwargs: Any) -> None:
    """
    Create an empty file or update an existing file on a lakeFS server.

    Parameters
    ----------
    path: str | os.PathLike[str]
        The file path to create or update. Must be a fully qualified lakeFS URI.
    truncate: bool
        Whether to set the file size to 0 (zero) bytes, even if the path already exists.
    **kwargs: Any
        Additional keyword arguments to pass to ``LakeFSFileSystem.open()``.

    Raises
    ------
    NotImplementedError
        If the targeted lakeFS server version does not support `touch()` operations.
    """

    # empty buffer upload errors were fixed in https://github.com/treeverse/lakeFS/issues/7130,
    # which was first released in lakeFS v1.3.1.
    if self._lakefs_server_version < (1, 3, 1):
        version_string = ".".join(str(v) for v in self._lakefs_server_version)
        raise NotImplementedError(
            "LakeFSFileSystem.touch() is not supported for your lakeFS server version. "
            f"minimum required version: '1.3.1', actual version: {version_string!r}"
        )

    super().touch(path=path, truncate=truncate, **kwargs)

tail

tail(path: str | PathLike[str], size: int = 1024) -> bytes

Get the last size bytes from a remote file.

PARAMETER DESCRIPTION
path

The file path to read. Must be a fully qualified lakeFS URI.

TYPE: str | PathLike[str]

size

The number of bytes to get.

TYPE: int DEFAULT: 1024

RETURNS DESCRIPTION
bytes

The bytes at the end of the requested file.
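
The seek offset is clamped to the file size so that requesting more bytes than the file holds returns the whole file. A minimal sketch using an in-memory buffer in place of a remote reader; `tail_bytes` is a hypothetical helper and the total size is passed in explicitly rather than taken from an object stat call.

```python
import io


def tail_bytes(f: io.BufferedIOBase, total_size: int, size: int = 1024) -> bytes:
    """Read the last ``size`` bytes of ``f``, or everything if it is shorter."""
    # Clamp the negative offset so the seek never targets a position before the start.
    f.seek(max(-size, -total_size), io.SEEK_END)
    return f.read()


data = b"hello world"
last_five = tail_bytes(io.BytesIO(data), len(data), size=5)
everything = tail_bytes(io.BytesIO(data), len(data), size=100)
```

Here `last_five` holds `b"world"` and `everything` holds the full buffer.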

Source code in src/lakefs_spec/spec.py
def tail(self, path: str | os.PathLike[str], size: int = 1024) -> bytes:
    """
    Get the last ``size`` bytes from a remote file.

    Parameters
    ----------
    path: str | os.PathLike[str]
        The file path to read. Must be a fully qualified lakeFS URI.
    size: int
        The number of bytes to get.

    Returns
    -------
    bytes
        The bytes at the end of the requested file.
    """
    f: ObjectReader
    with self.open(path, "rb") as f:
        f.seek(max(-size, -f._obj.stat().size_bytes), 2)
        return f.read()