Advanced examples for common use cases with the Sinkove Python SDK. See SDK Installation for setup.

Basic Operations

Create and Download Dataset

import uuid
from sinkove import Client

client = Client(uuid.UUID("your-organization-id"))

dataset = client.datasets.create(
    model_id=uuid.UUID("your-model-id"),
    num_samples=25,
    args={"prompt": "chest x-ray showing cardiomegaly"}
)

dataset.wait()
dataset.download("cardiomegaly_dataset.zip", strategy="replace")
print(f"Dataset {dataset.id} downloaded!")

Monitor Dataset Progress

import time

# Get dataset
dataset = client.datasets.get(uuid.UUID("your-dataset-id"))

# Check status periodically
while not dataset.finished:
    dataset._reload_metadata()  # refresh state and progress from the API
    
    if dataset.metadata:
        print(f"Status: {dataset.state} - Progress: {dataset.metadata.progress}%")
    
    if dataset.ready:
        print("Dataset is ready!")
        break
    elif dataset.state == "FAILED":
        print("Dataset generation failed!")
        break
    
    time.sleep(30)

Filter and Manage Datasets

from datetime import datetime, timedelta

# Get all datasets
datasets = client.datasets.list()

# Filter by state and creation date
# (ISO-8601 timestamps compare chronologically as strings)
ready_datasets = [d for d in datasets if d.ready]
recent_cutoff = (datetime.now() - timedelta(days=7)).isoformat()
recent_datasets = [d for d in datasets if d.created_at > recent_cutoff]

print(f"Ready: {len(ready_datasets)}, Recent: {len(recent_datasets)}")

Advanced Patterns

Batch Dataset Creation

import concurrent.futures

def create_dataset_with_prompt(client, model_id, prompt, num_samples=20):
    try:
        dataset = client.datasets.create(model_id, num_samples, {"prompt": prompt})
        return {"prompt": prompt, "dataset_id": dataset.id, "success": True}
    except Exception as e:
        return {"prompt": prompt, "error": str(e), "success": False}

# Multiple prompts
prompts = [
    "chest x-ray showing pneumonia",
    "chest x-ray showing cardiomegaly", 
    "normal chest x-ray"
]

client = Client(uuid.UUID("your-organization-id"))
model_id = uuid.UUID("your-model-id")

# Create in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(create_dataset_with_prompt, client, model_id, p) for p in prompts]
    
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        if result["success"]:
            print(f"✓ Created: {result['dataset_id']}")
        else:
            print(f"✗ Failed: {result['error']}")

Robust Download with Retry

def download_with_retry(dataset, output_file, max_retries=3, retry_delay=10):
    for attempt in range(max_retries):
        try:
            if not dataset.ready:
                dataset.wait(timeout=300)
            dataset.download(output_file, strategy="replace")
            return True
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
    return False

# Usage
success = download_with_retry(dataset, "medical_dataset.zip")
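
If failures are transient (network hiccups, temporary load), the fixed delay can be swapped for exponential backoff; a minimal variant of the same helper:

def download_with_backoff(dataset, output_file, max_retries=3, base_delay=5):
    for attempt in range(max_retries):
        try:
            dataset.download(output_file, strategy="replace")
            return True
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(base_delay * 2 ** attempt)  # 5s, 10s, 20s, ...
    return False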

Progress Monitoring with Callbacks

def monitor_progress(dataset, callback, poll_interval=15):
    previous_state, previous_progress = None, -1
    
    while not dataset.finished:
        dataset._reload_metadata()
        state = dataset.state
        progress = dataset.metadata.progress if dataset.metadata else 0
        
        if state != previous_state or progress != previous_progress:
            callback(state, progress)
            previous_state, previous_progress = state, progress
        
        time.sleep(poll_interval)
    
    return dataset.ready

def progress_callback(state, progress):
    timestamp = time.strftime("%H:%M:%S")
    print(f"[{timestamp}] {state}: {progress}%")

# Usage
success = monitor_progress(dataset, progress_callback)
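
Because monitor_progress returns dataset.ready, it chains directly into a download:

if monitor_progress(dataset, progress_callback):
    dataset.download("monitored_dataset.zip", strategy="replace")
else:
    print("Generation did not complete successfully.")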

Multiple Organizations

from sinkove.connector import Connector
from sinkove.organizations.client import OrganizationClient

connector = Connector(api_key="your-api-key")
org_client = OrganizationClient(connector)

organizations = org_client.list()
for org in organizations:
    datasets = org.datasets.list()
    ready_count = sum(1 for d in datasets if d.ready)
    print(f"{org.organization_name}: {ready_count}/{len(datasets)} ready")

Error Handling

Comprehensive Error Handling

import logging

def safe_dataset_creation(org_id, model_id, num_samples, args):
    try:
        client = Client(org_id)
        dataset = client.datasets.create(model_id, num_samples, args)
        
        try:
            dataset.wait(timeout=1800)  # 30 minutes
            return dataset, ("success" if dataset.ready else "failed")
        except TimeoutError:
            return dataset, "timeout"
            
    except ValueError as e:
        return None, f"config_error: {e}"
    except Exception as e:
        return None, f"error: {e}"

# Usage
dataset, status = safe_dataset_creation(
    uuid.UUID("your-org-id"), 
    uuid.UUID("your-model-id"), 
    50, 
    {"prompt": "chest x-ray"}
)

if status == "success":
    dataset.download("output.zip")
elif dataset and status == "timeout":
    print("Dataset still processing...")
else:
    print(f"Failed: {status}")

API Error Handling

import json

def handle_api_error(error):
    try:
        error_data = json.loads(str(error))
        error_code = error_data.get("code", "UNKNOWN")
        message = error_data.get("message", str(error))
        
        error_messages = {
            "RATE_LIMIT_EXCEEDED": "Rate limit hit. Wait and retry.",
            "INVALID_MODEL": "Invalid model ID provided.",
            "INSUFFICIENT_CREDITS": "Not enough credits."
        }
        
        print(error_messages.get(error_code, f"API Error: {message}"))
    except json.JSONDecodeError:
        print(f"Error: {error}")

# Usage
try:
    dataset = client.datasets.create(uuid.UUID("invalid-id"), 10, {"prompt": "test"})
except Exception as e:
    handle_api_error(e)
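
The same parsing can also drive a retry decision. A sketch that retries only on rate limiting, reusing the error format assumed above (is_rate_limited is a hypothetical helper):

def is_rate_limited(error):
    # Hypothetical helper: retry only when the API reports rate limiting
    try:
        return json.loads(str(error)).get("code") == "RATE_LIMIT_EXCEEDED"
    except json.JSONDecodeError:
        return False

for attempt in range(3):
    try:
        dataset = client.datasets.create(model_id, 10, {"prompt": "test"})
        break
    except Exception as e:
        if is_rate_limited(e) and attempt < 2:
            time.sleep(60)  # back off before retrying
        else:
            handle_api_error(e)
            break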

Integration Examples

Save Dataset Metadata

import json
from datetime import datetime

def save_dataset_info(dataset, filename="dataset_info.json"):
    info = {
        "dataset_id": str(dataset.id),
        "model_id": str(dataset.model_id), 
        "num_samples": dataset.num_samples,
        "args": dataset.args,
        "state": dataset.state,
        "created_at": dataset.created_at,
        "download_timestamp": datetime.now().isoformat()
    }
    
    with open(filename, "w") as f:
        json.dump(info, f, indent=2)

# Usage
if dataset.ready:
    dataset.download("dataset.zip")
    save_dataset_info(dataset)

Dataset Processing Pipeline

import zipfile
from pathlib import Path

class DatasetPipeline:
    def __init__(self, organization_id, output_dir="./datasets"):
        self.client = Client(organization_id)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
    
    def create_and_process(self, model_id, prompt, num_samples=20):
        # Create and wait
        dataset = self.client.datasets.create(model_id, num_samples, {"prompt": prompt})
        dataset.wait()
        
        # Download and extract
        zip_path = self.output_dir / f"{dataset.id}.zip"
        dataset.download(str(zip_path), strategy="replace")
        
        extract_dir = self.output_dir / str(dataset.id)
        extract_dir.mkdir(exist_ok=True)
        
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        
        # Count files
        image_files = list(extract_dir.glob("**/*.png")) + list(extract_dir.glob("**/*.jpg"))
        print(f"Extracted {len(image_files)} images to {extract_dir}")
        
        # Cleanup
        zip_path.unlink()
        return dataset.id, extract_dir

# Usage
pipeline = DatasetPipeline(uuid.UUID("your-organization-id"))
dataset_id, output_path = pipeline.create_and_process(
    uuid.UUID("your-model-id"), 
    "chest x-ray showing pneumonia", 
    50
)
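
The pipeline also composes with the batch pattern from earlier; a simple sequential sketch over several prompts:

extracted = {}
for prompt in ["chest x-ray showing pneumonia", "normal chest x-ray"]:
    dataset_id, path = pipeline.create_and_process(uuid.UUID("your-model-id"), prompt)
    extracted[prompt] = path

print(f"Processed {len(extracted)} prompts")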

Next Steps