feat: Add export with filters #198

Merged · 6 commits · Apr 29, 2024
6 changes: 5 additions & 1 deletion examples/README.md
@@ -18,7 +18,11 @@ If your data is hosted in Google Cloud Storage (GCS), you can write a Python scr

 ## Export

-* [Export snapshots](export_snapshots.py) - This example shows how to export, check export status and download JSON shapshots from Label Studio.
+* [Export with filters](export_with_filters.py) - This example shows how to use the simplest version of exporting data with filters.
+* [Export snapshots](export_snapshots.py) - This example shows how to export, check export status, and download JSON snapshots from Label Studio. It provides detailed code for working with snapshots and includes the following steps (a short sketch follows the list):
+  * Create a snapshot
+  * Check the snapshot status
+  * Download the snapshot
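For reference, a minimal sketch of those three steps, using the snapshot calls that appear later in this PR's diff (`export_snapshot_create`, `export_snapshot_status`, `export_snapshot_download`); the host, API key, and project ID are placeholder assumptions:

```python
import time

from label_studio_sdk import Client

# Placeholder connection details - replace with your own
ls = Client(url='https://app.heartex.com', api_key='<your_api_key>')
project = ls.get_project(14528)

# 1. Create a snapshot
export_result = project.export_snapshot_create(title='Example snapshot')
export_id = export_result['id']

# 2. Poll the snapshot status until it leaves the in-progress state
while project.export_snapshot_status(export_id).is_in_progress():
    time.sleep(1.0)

# 3. Download the snapshot as a JSON file
status, file_name = project.export_snapshot_download(export_id, export_type='JSON')
print(f'Downloaded {file_name} with status {status}')
```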


## Machine Learning
45 changes: 45 additions & 0 deletions examples/export_with_filters.py
@@ -0,0 +1,45 @@
""" Export a snapshot with tasks filtered by ID range.

Note: at this moment it's not possible to export snapshots with filters,
LS API doesn't support it yet. However, it's achievable by creating a view
with a filter and then exporting a snapshot using this view.
This approach is hidden behind the `project.export()` method.
"""

from label_studio_sdk import Client
from label_studio_sdk.data_manager import Filters, Operator, Type, Column


# Usage example
host = 'https://app.heartex.com'
api_key = '<your_api_key>'
project_id = 14528
start_id = 10000
end_id = 20000

# Create a filter for task ID range
filters = Filters.create(
    Filters.AND,
    [
        Filters.item(
            Column.inner_id,
            Operator.GREATER_OR_EQUAL,
            Type.Number,
            Filters.value(start_id),
        ),
        Filters.item(
            Column.inner_id,
            Operator.LESS,
            Type.Number,
            Filters.value(end_id),
        ),
    ],
)

print('Export started ...')
ls = Client(url=host, api_key=api_key)
project = ls.get_project(project_id)
result = project.export(filters=filters, export_type="JSON", output_dir='exported')
print(
    f"Export file saved as: exported/{result['filename']}, status: {result['status']}, export_id: {result['export_id']}"
)
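As the module docstring notes, `project.export()` hides the view-creation workaround, and the example ends once the export is written to disk. A minimal follow-up sketch for inspecting the result (assuming the export succeeded with status 200 and that a JSON snapshot deserializes to a list of task dicts):

```python
import json
import os

# Continuation of the example above: `result` comes from project.export(...)
if result['status'] == 200:
    path = os.path.join('exported', result['filename'])
    with open(path) as f:
        tasks = json.load(f)  # assumption: a JSON snapshot is a list of tasks
    print(f'Loaded {len(tasks)} tasks from {path}')
```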
131 changes: 92 additions & 39 deletions examples/migrate_ls_to_ls/migrate-ls-to-ls.py
@@ -11,10 +11,13 @@
 import time

 from label_studio_sdk import Client
+from label_studio_sdk.data_manager import Filters, Operator, Type, Column
 from label_studio_sdk.users import User

 logger = logging.getLogger("migration-ls-to-ls")
 logger.setLevel(logging.DEBUG)

+CHUNK_SIZE = int(os.getenv('CHUNK_SIZE', 1000))
 DEFAULT_STORAGE = os.getenv('DEFAULT_STORAGE', '')  # 's3', 'gcs' or 'azure'
 DEFAULT_STORAGE_REGEX = os.getenv(
     'DEFAULT_STORAGE_REGEX', '.*'
@@ -45,6 +48,7 @@ def __init__(self, src_url, src_key, dst_url, dst_key, dest_workspace):
         :param src_key: source Label Studio token
         :param dst_url: destination Label Studio instance
         :param dst_key: destination Label Studio token
+        :param dest_workspace: destination workspace id
         """
         # Connect to the Label Studio API and check the connection
         self.src_ls = Client(url=src_url, api_key=src_key)
@@ -137,33 +141,47 @@ def run(self, project_ids=None):
         self.create_users(users)

         # start exporting projects
+        success = 0
         for project in projects:
-            logger.info(
-                f'Going to create export snapshot for project {project.id} {project.params["title"]}'
-            )
-            status, filename = self.export_snapshot(project)
-            logger.info(
-                f"Snapshot for project {project.id} created with status {status} and filename {filename}"
-            )
-            self.patch_snapshot_users(filename)
+            success += self.migrate_project(project)

-            if status != 200:
-                logger.info(f"Skipping project {project.id} because of errors {status}")
-                continue
+        logger.info(
+            f"Projects are processed, finishing with {success} successful and {len(projects)} total projects"
+        )

-            if self.dest_workspace is not None:
-                project.params["workspace"] = self.dest_workspace
-            new_project = self.create_project(project)
+    def migrate_project(self, project):
+        filenames = self.export_chunked_snapshots(project)
+        if not filenames:
+            logger.error(
+                f"No exported files found: skipping project {project.id}. Maybe project is empty?"
+            )
+            return False

-            logger.info(f"Going to import {filename} to project {new_project.id}")
-            new_project.import_tasks(filename)
-            logger.info(f"Import {filename} finished for project {new_project.id}")
+        logger.info(f"Patching snapshot users for project {project.id}")
+        for filename in filenames:
+            self.patch_snapshot_users(filename)

-            self.add_default_import_storage(new_project)
+        logger.info(f"New project creation for project {project.id}")
+        label_config = str(project.label_config)
+        project.params["label_config"] = '<View></View>'
+        new_project = self.create_project(project)

-        logger.info("All projects are processed, finish")
+        logger.info(f"Going to import {filenames} to project {new_project.id}")
+        for filename in filenames:
+            new_project.import_tasks(filename)
+            logger.info(f"Import {filename} finished for project {new_project.id}")
+            time.sleep(1)
+
+        project.set_params(label_config=label_config)
+        self.add_default_import_storage(new_project)
+        return True

     def create_project(self, project):
+        if self.dest_workspace is not None:
+            project.params["workspace"] = self.dest_workspace
+        else:
+            project.params.pop("workspace", None)
+
         logger.info(
             f'Going to create a new project "{project.params["title"]}" from old project {project.id}'
         )
@@ -273,30 +291,66 @@ def create_users(self, users: [User]):
logger.info(f"Created users: {[u.email for u in new_users]}")
return new_users

def export_snapshot(self, project):
"""Export all tasks from the project"""
# create new export snapshot
export_result = project.export_snapshot_create(
title="Migration snapshot",
serialization_options_drafts=False,
serialization_options_annotations__completed_by=False,
serialization_options_predictions=False,
)
assert "id" in export_result
export_id = export_result["id"]
def export_chunked_snapshots(self, project):
"""Export all tasks from the project in chunks and return filenames for each chunk"""

# wait until snapshot is ready
while project.export_snapshot_status(export_id).is_in_progress():
time.sleep(1.0)
logger.info(f'Creating chunked snapshots for project {project.id}')
file_size, filenames, chunk_i = 100, [], 0

# download snapshot file
status, file_name = project.export_snapshot_download(
export_id, export_type="JSON"
while file_size > 2: # 2 bytes is an empty json file
start_id = CHUNK_SIZE * chunk_i
end_id = CHUNK_SIZE * (chunk_i + 1)
logger.info(
f"Exporting chunk {chunk_i} from {start_id} to {end_id} tasks for project {project.id}"
)

# create a filters for inner ID range to split tasks into chunks
filters = self.inner_id_range_filter(start_id, end_id)

# create new export and save to disk
output_dir = "snapshots"
result = project.export(
filters=filters,
title=f"Migration snapshot for chunk {chunk_i}",
serialization_options_drafts=False,
serialization_options_annotations__completed_by=False,
serialization_options_predictions=False,
output_dir=output_dir,
)
status, filename = result["status"], result["filename"]
if status != 200:
logger.info(
f"Error while exporting chunk {chunk_i}: {status}, skipping export"
)
return []

chunk_i += 1
filename = os.path.join(output_dir, filename)
file_size = os.path.getsize(filename)
if file_size > 2:
filenames.append(filename)

return filenames

def inner_id_range_filter(self, start_id, end_id):
filters = Filters.create(
Filters.AND,
[
Filters.item(
Column.inner_id,
Operator.GREATER_OR_EQUAL,
Type.Number,
Filters.value(start_id),
),
Filters.item(
Column.inner_id,
Operator.LESS,
Type.Number,
Filters.value(end_id),
),
],
)
assert status == 200
assert file_name is not None
logger.info(f"Status of the export is {status}. File name is {file_name}")
return status, file_name
return filters

def patch_snapshot_users(self, filename):
"""
@@ -390,7 +444,6 @@ def run():
         dst_key=args.dst_key,
         dest_workspace=args.dest_workspace,
     )
-    logging.basicConfig(level=logging.INFO)

     project_ids = (
         [int(i) for i in args.project_ids.split(",")] if args.project_ids else None
6 changes: 4 additions & 2 deletions label_studio_sdk/data_manager.py
@@ -176,6 +176,8 @@ class Column:

id = "tasks:id"
"""Task ID"""
inner_id = "tasks:inner_id"
"""Task Inner ID, it starts from 1 for all projects"""
ground_truth = "tasks:ground_truth"
"""Ground truth status of the tasks"""
annotations_results = "tasks:annotations_results"
@@ -191,9 +193,9 @@ class Column:
     file_upload = "tasks:file_upload"
     """Name of the file uploaded to create the tasks"""
     created_at = "tasks:created_at"
-    """Time the task was updated at (e.g. new annotation was created, review added, etc)"""
-    created_at = "tasks:updated_at"
     """Time the task was created at"""
+    updated_at = "tasks:updated_at"
+    """Time the task was updated at (e.g. new annotation was created, review added, etc)"""
     annotators = "tasks:annotators"
     """Annotators that completed the task (Community). Can include assigned annotators (Enterprise only)"""
     total_predictions = "tasks:total_predictions"