Advanced examples for common use cases with the Sinkove Python SDK. See SDK Installation for setup.

Basic Operations

Create and Download Dataset

import uuid
from sinkove import Client

client = Client(uuid.UUID("your-organization-id"))

dataset = client.datasets.create(
    model_id=uuid.UUID("your-model-id"),
    num_samples=25,
    args={"prompt": "chest x-ray showing cardiomegaly"}
)

dataset.wait()
dataset.download("cardiomegaly_dataset.zip", strategy="replace")
print(f"Dataset {dataset.id} downloaded!")

Monitor Dataset Progress

import time

# Get dataset
dataset = client.datasets.get(uuid.UUID("your-dataset-id"))

# Check status periodically
while not dataset.finished:
    dataset._reload_metadata()  # refresh the cached state from the API (private helper)
    
    if dataset.metadata:
        print(f"Status: {dataset.state} - Progress: {dataset.metadata.progress}%")
    
    if dataset.ready:
        print("Dataset is ready!")
        break
    elif dataset.state == "FAILED":
        print("Dataset generation failed!")
        break
    
    time.sleep(30)

Filter and Manage Datasets

from datetime import datetime, timedelta

# Get all datasets
datasets = client.datasets.list()

# Filter by state and date
ready_datasets = [d for d in datasets if d.ready]

# Assumes created_at is an ISO-8601 string, so lexicographic
# comparison matches chronological order
recent_cutoff = (datetime.now() - timedelta(days=7)).isoformat()
recent_datasets = [d for d in datasets if d.created_at > recent_cutoff]

print(f"Ready: {len(ready_datasets)}, Recent: {len(recent_datasets)}")

Advanced Patterns

Batch Dataset Creation

import concurrent.futures

def create_dataset_with_prompt(client, model_id, prompt, num_samples=20):
    try:
        dataset = client.datasets.create(model_id, num_samples, {"prompt": prompt})
        return {"prompt": prompt, "dataset_id": dataset.id, "success": True}
    except Exception as e:
        return {"prompt": prompt, "error": str(e), "success": False}

# Multiple prompts
prompts = [
    "chest x-ray showing pneumonia",
    "chest x-ray showing cardiomegaly", 
    "normal chest x-ray"
]

client = Client(uuid.UUID("your-organization-id"))
model_id = uuid.UUID("your-model-id")

# Create in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(create_dataset_with_prompt, client, model_id, p) for p in prompts]
    
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        if result["success"]:
            print(f"✓ Created: {result['dataset_id']}")
        else:
            print(f"✗ Failed: {result['error']}")

Robust Download with Retry

def download_with_retry(dataset, output_file, max_retries=3, retry_delay=10):
    for attempt in range(max_retries):
        try:
            if not dataset.ready:
                dataset.wait(timeout=300)
            dataset.download(output_file, strategy="replace")
            return True
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
    return False

# Usage
success = download_with_retry(dataset, "medical_dataset.zip")

Progress Monitoring with Callbacks

def monitor_progress(dataset, callback, poll_interval=15):
    previous_state, previous_progress = None, -1
    
    while not dataset.finished:
        dataset._reload_metadata()  # refresh the cached state from the API
        state = dataset.state
        progress = dataset.metadata.progress if dataset.metadata else 0
        
        if state != previous_state or progress != previous_progress:
            callback(state, progress)
            previous_state, previous_progress = state, progress
        
        time.sleep(poll_interval)
    
    return dataset.ready

def progress_callback(state, progress):
    timestamp = time.strftime("%H:%M:%S")
    print(f"[{timestamp}] {state}: {progress}%")

# Usage
success = monitor_progress(dataset, progress_callback)
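
Since monitor_progress returns dataset.ready, it composes directly with a download step (filename illustrative):

if success:
    dataset.download("monitored_dataset.zip", strategy="replace")
else:
    print(f"Generation ended in state: {dataset.state}")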

Multiple Organizations

from sinkove.connector import Connector
from sinkove.organizations.client import OrganizationClient

connector = Connector(api_key="your-api-key")
org_client = OrganizationClient(connector)

organizations = org_client.list()
for org in organizations:
    datasets = org.datasets.list()
    ready_count = sum(1 for d in datasets if d.ready)
    print(f"{org.organization_name}: {ready_count}/{len(datasets)} ready")

Error Handling

Comprehensive Error Handling

def safe_dataset_creation(org_id, model_id, num_samples, args):
    try:
        client = Client(org_id)
        dataset = client.datasets.create(model_id, num_samples, args)
        
        try:
            dataset.wait(timeout=1800)  # 30 minutes
            return dataset, "success" if dataset.ready else "failed"
        except TimeoutError:
            return dataset, "timeout"
            
    except ValueError as e:
        return None, f"config_error: {e}"
    except Exception as e:
        return None, f"error: {e}"

# Usage
dataset, status = safe_dataset_creation(
    uuid.UUID("your-org-id"), 
    uuid.UUID("your-model-id"), 
    50, 
    {"prompt": "chest x-ray"}
)

if status == "success":
    dataset.download("output.zip")
elif dataset and status == "timeout":
    print("Dataset still processing...")
else:
    print(f"Failed: {status}")

API Error Handling

import json

def handle_api_error(error):
    try:
        error_data = json.loads(str(error))
        error_code = error_data.get("code", "UNKNOWN")
        message = error_data.get("message", str(error))
        
        error_messages = {
            "RATE_LIMIT_EXCEEDED": "Rate limit hit. Wait and retry.",
            "INVALID_MODEL": "Invalid model ID provided.",
            "INSUFFICIENT_CREDITS": "Not enough credits."
        }
        
        print(error_messages.get(error_code, f"API Error: {message}"))
    except json.JSONDecodeError:
        print(f"Error: {error}")

# Usage
try:
    dataset = client.datasets.create(uuid.UUID("invalid-id"), 10, {"prompt": "test"})
except Exception as e:
    handle_api_error(e)
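
For transient failures such as rate limiting, it is often worth retrying the call itself. A minimal sketch layered on handle_api_error; the backoff values are illustrative:

import time

def create_with_backoff(client, model_id, num_samples, args, max_retries=3):
    for attempt in range(max_retries):
        try:
            return client.datasets.create(model_id, num_samples, args)
        except Exception as e:
            handle_api_error(e)
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt * 10)  # 10s, 20s, 40s
    return None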

Integration Examples

Save Dataset Metadata

import json
from datetime import datetime

def save_dataset_info(dataset, filename="dataset_info.json"):
    info = {
        "dataset_id": str(dataset.id),
        "model_id": str(dataset.model_id), 
        "num_samples": dataset.num_samples,
        "args": dataset.args,
        "state": dataset.state,
        "created_at": dataset.created_at,
        "download_timestamp": datetime.now().isoformat()
    }
    
    with open(filename, "w") as f:
        json.dump(info, f, indent=2)

# Usage
if dataset.ready:
    dataset.download("dataset.zip")
    save_dataset_info(dataset)
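
The saved record makes it easy to re-fetch the same dataset in a later session without keeping the object around:

# Re-load a saved record and fetch the dataset again
with open("dataset_info.json") as f:
    info = json.load(f)

dataset = client.datasets.get(uuid.UUID(info["dataset_id"]))
print(f"State: {dataset.state}")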

Dataset Processing Pipeline

import zipfile
from pathlib import Path

class DatasetPipeline:
    def __init__(self, organization_id, output_dir="./datasets"):
        self.client = Client(organization_id)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
    
    def create_and_process(self, model_id, prompt, num_samples=20):
        # Create and wait
        dataset = self.client.datasets.create(model_id, num_samples, {"prompt": prompt})
        dataset.wait()
        
        # Download and extract
        zip_path = self.output_dir / f"{dataset.id}.zip"
        dataset.download(str(zip_path), strategy="replace")
        
        extract_dir = self.output_dir / str(dataset.id)
        extract_dir.mkdir(exist_ok=True)
        
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        
        # Count files
        image_files = list(extract_dir.glob("**/*.png")) + list(extract_dir.glob("**/*.jpg"))
        print(f"Extracted {len(image_files)} images to {extract_dir}")
        
        # Cleanup
        zip_path.unlink()
        return dataset.id, extract_dir

# Usage
pipeline = DatasetPipeline(uuid.UUID("your-organization-id"))
dataset_id, output_path = pipeline.create_and_process(
    uuid.UUID("your-model-id"), 
    "chest x-ray showing pneumonia", 
    50
)
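
The pipeline also lends itself to batch use. A sketch that runs the prompts list from Batch Dataset Creation sequentially (the ThreadPoolExecutor pattern above parallelizes it):

results = {}
for prompt in prompts:
    dataset_id, extract_dir = pipeline.create_and_process(
        uuid.UUID("your-model-id"), prompt, num_samples=20
    )
    results[prompt] = extract_dir

for prompt, path in results.items():
    print(f"{prompt} -> {path}")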

Next Steps