import datetime
import logging
import math
import os.path
import re
import time
from typing import Optional, List
from hbutils.string import plural_word
from huggingface_hub import CommitOperationAdd, CommitOperationDelete
from .base import RepoTypeTyping, get_hf_client, list_files_in_repository, _IGNORE_PATTERN_UNSET
from ..archive import get_archive_type, archive_pack
from ..utils import walk_files, TemporaryDirectory, tqdm
def upload_file_to_file(local_file, repo_id: str, file_in_repo: str,
repo_type: RepoTypeTyping = 'dataset', revision: str = 'main',
message: Optional[str] = None, hf_token: Optional[str] = None):
"""
    Upload a local file to a specified path in a Hugging Face repository.

:param local_file: The local file path to be uploaded.
:type local_file: str
:param repo_id: The identifier of the repository.
:type repo_id: str
:param file_in_repo: The file path within the repository.
:type file_in_repo: str
:param repo_type: The type of the repository ('dataset', 'model', 'space').
:type repo_type: RepoTypeTyping
:param revision: The revision of the repository (e.g., branch, tag, commit hash).
:type revision: str
:param message: The commit message for the upload.
:type message: Optional[str]
    :param hf_token: Hugging Face token for the API client; the ``HF_TOKEN`` environment variable is used if not assigned.
    :type hf_token: Optional[str]
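
    Example (illustrative sketch; ``your/dataset-repo`` and the file paths are hypothetical,
    and the ``HF_TOKEN`` environment variable is assumed to be set)::

        >>> from hfutils.operate import upload_file_to_file
        >>> upload_file_to_file(
        ...     local_file='./data.csv',
        ...     repo_id='your/dataset-repo',
        ...     file_in_repo='data/data.csv',
        ... )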
"""
hf_client = get_hf_client(hf_token)
hf_client.upload_file(
repo_id=repo_id,
repo_type=repo_type,
path_or_fileobj=local_file,
path_in_repo=file_in_repo,
revision=revision,
commit_message=message,
)
def upload_directory_as_archive(local_directory, repo_id: str, archive_in_repo: str,
repo_type: RepoTypeTyping = 'dataset', revision: str = 'main',
message: Optional[str] = None, silent: bool = False,
hf_token: Optional[str] = None):
"""
    Upload a local directory as an archive file to a specified path in a Hugging Face repository.

:param local_directory: The local directory path to be uploaded.
:type local_directory: str
:param repo_id: The identifier of the repository.
:type repo_id: str
:param archive_in_repo: The archive file path within the repository.
:type archive_in_repo: str
:param repo_type: The type of the repository ('dataset', 'model', 'space').
:type repo_type: RepoTypeTyping
:param revision: The revision of the repository (e.g., branch, tag, commit hash).
:type revision: str
:param message: The commit message for the upload.
:type message: Optional[str]
:param silent: If True, suppress progress bar output.
:type silent: bool
    :param hf_token: Hugging Face token for the API client; the ``HF_TOKEN`` environment variable is used if not assigned.
    :type hf_token: Optional[str]
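
    Example (illustrative sketch; ``your/dataset-repo`` and the local directory are hypothetical;
    the archive format is inferred from the ``.zip`` extension of ``archive_in_repo``)::

        >>> from hfutils.operate import upload_directory_as_archive
        >>> upload_directory_as_archive(
        ...     local_directory='./images',
        ...     repo_id='your/dataset-repo',
        ...     archive_in_repo='images.zip',
        ... )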
"""
archive_type = get_archive_type(archive_in_repo)
with TemporaryDirectory() as td:
local_archive_file = os.path.join(td, os.path.basename(archive_in_repo))
archive_pack(archive_type, local_directory, local_archive_file, silent=silent)
upload_file_to_file(local_archive_file, repo_id, archive_in_repo,
repo_type, revision, message, hf_token=hf_token)
_PATH_SEP = re.compile(r'[/\\]+')
def upload_directory_as_directory(
local_directory, repo_id: str, path_in_repo: str,
repo_type: RepoTypeTyping = 'dataset', revision: str = 'main',
message: Optional[str] = None, time_suffix: bool = True,
clear: bool = False, ignore_patterns: List[str] = _IGNORE_PATTERN_UNSET,
hf_token: Optional[str] = None, operation_chunk_size: Optional[int] = None,
upload_timespan: float = 5.0,
):
"""
    Upload a local directory and its files to a specified path in a Hugging Face repository.

:param local_directory: The local directory path to be uploaded.
:type local_directory: str
:param repo_id: The identifier of the repository.
:type repo_id: str
:param path_in_repo: The directory path within the repository.
:type path_in_repo: str
:param repo_type: The type of the repository ('dataset', 'model', 'space').
:type repo_type: RepoTypeTyping
:param revision: The revision of the repository (e.g., branch, tag, commit hash).
:type revision: str
:param message: The commit message for the upload.
:type message: Optional[str]
:param time_suffix: If True, append a timestamp to the commit message.
:type time_suffix: bool
:param clear: If True, remove files in the repository not present in the local directory.
:type clear: bool
:param ignore_patterns: List of file patterns to ignore.
:type ignore_patterns: List[str]
    :param hf_token: Hugging Face token for the API client; the ``HF_TOKEN`` environment variable is used if not assigned.
    :type hf_token: Optional[str]
    :param operation_chunk_size: Chunk size of the operations. When this is set, the operations
        will be separated into multiple commits.
    :type operation_chunk_size: Optional[int]
    :param upload_timespan: Minimum time interval (in seconds) between commits when chunked uploading is enabled.
    :type upload_timespan: float

    .. note::
        When `operation_chunk_size` is set, multiple commits will be created. If any commit fails,
        the revision is rolled back to the initial commit with the :func:`hfutils.repository.hf_hub_rollback` function.

    .. warning::
        When `operation_chunk_size` is set, multiple commits will be created, but the Hugging Face repository
        API cannot guarantee the atomicity of your data, so **this function is not thread-safe**.

    .. note::
        The rate limit for Hugging Face repository commit creation is approximately 120 commits per hour.
        If you have a large number of chunks to create, set `upload_timespan` to a value no less than
        `30.0` so that your upload will not be rate-limited.
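
    Example (illustrative sketch; the repository id and paths are hypothetical, and the
    ``HF_TOKEN`` environment variable is assumed to be set)::

        >>> from hfutils.operate import upload_directory_as_directory
        >>> upload_directory_as_directory(
        ...     local_directory='./dataset',
        ...     repo_id='your/dataset-repo',
        ...     path_in_repo='data',
        ...     clear=True,                 # delete remote files that are absent locally
        ...     operation_chunk_size=500,   # at most 500 operations per commit
        ...     upload_timespan=30.0,       # stay clear of the commit rate limit
        ... )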
"""
hf_client = get_hf_client(hf_token)
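    # When `clear` is enabled, snapshot the files currently under `path_in_repo`;
    # every file that is not re-uploaded below will later be scheduled for deletion.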
if clear:
pre_exist_files = {
tuple(file.split('/')) for file in
list_files_in_repository(
repo_id=repo_id,
repo_type=repo_type,
subdir=path_in_repo,
revision=revision,
ignore_patterns=ignore_patterns,
hf_token=hf_token
)
}
else:
pre_exist_files = set()
operations = []
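    # Queue an add operation for each local file, and drop it from the pre-existing
    # set so that it will not be scheduled for deletion afterwards.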
for file in walk_files(local_directory):
segments = tuple(seg for seg in _PATH_SEP.split(file) if seg)
if segments in pre_exist_files:
pre_exist_files.remove(segments)
operations.append(CommitOperationAdd(
path_or_fileobj=os.path.join(local_directory, file),
path_in_repo=f'{path_in_repo}/{"/".join(segments)}',
))
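    # Files left in `pre_exist_files` exist in the repository but not locally;
    # schedule them for deletion (this set is only populated when `clear` is True).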
for segments in sorted(pre_exist_files):
operations.append(CommitOperationDelete(
path_in_repo=f'{path_in_repo}/{"/".join(segments)}',
))
current_time = datetime.datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')
commit_message = message or f'Upload directory {os.path.basename(os.path.abspath(local_directory))!r}'
if time_suffix:
commit_message = f'{commit_message}, on {current_time}'
if operation_chunk_size:
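        # Record the current head commit of the revision, so that it can be
        # rolled back to if any of the chunked commits fails.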
initial_commit_id = hf_client.list_repo_commits(
repo_id=repo_id,
repo_type=repo_type,
revision=revision
)[0].commit_id
last_upload_at = None
try:
total_chunks = int(math.ceil(len(operations) / operation_chunk_size))
for chunk_id in tqdm(range(total_chunks), desc='Chunked Commits'):
operation_chunk = operations[chunk_id * operation_chunk_size:(chunk_id + 1) * operation_chunk_size]
                # wait so that at least `upload_timespan` seconds pass between chunked commits
if last_upload_at:
sleep_time = last_upload_at + upload_timespan - time.time()
if sleep_time > 0:
logging.info(f'Sleep for {sleep_time:.1f}s due to the timespan limitation ...')
time.sleep(sleep_time)
last_upload_at = time.time()
logging.info(f'Uploading chunk #{chunk_id + 1}, '
f'with {plural_word(len(operation_chunk), "operation")} ...')
hf_client.create_commit(
repo_id=repo_id,
repo_type=repo_type,
revision=revision,
operations=operation_chunk,
commit_message=f'[Chunk #{chunk_id + 1}/{total_chunks}] {commit_message}',
)
except Exception:
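            # Roll back the revision to the recorded head commit so that it is
            # left in the state it had before this chunked upload started.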
from ..repository import hf_hub_rollback
            logging.error(f'Error found when executing chunked uploading, '
                          f'revision {revision!r} will be rolled back to {initial_commit_id!r} ...')
hf_hub_rollback(
repo_id=repo_id,
repo_type=repo_type,
revision=revision,
rollback_to=initial_commit_id,
hf_token=hf_token,
)
raise
else:
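        # Chunking disabled: push all operations in a single commit.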
hf_client.create_commit(
repo_id=repo_id,
repo_type=repo_type,
revision=revision,
operations=operations,
commit_message=commit_message,
)