Practical examples and advanced patterns for the Sinkove Python SDK
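The snippets below build on one another: later examples reuse the `client` and `dataset` objects created earlier. Replace the placeholder IDs with real UUIDs from your account.

Create a dataset from a prompt, wait for generation to finish, and download the result: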
```python
import uuid

from sinkove import Client

client = Client(uuid.UUID("your-organization-id"))

dataset = client.datasets.create(
    model_id=uuid.UUID("your-model-id"),
    num_samples=25,
    args={"prompt": "chest x-ray showing cardiomegaly"},
)

# Block until generation completes, then fetch the archive
dataset.wait()
dataset.download("cardiomegaly_dataset.zip", strategy="replace")
print(f"Dataset {dataset.id} downloaded!")
```
```python
import time

# Get dataset
dataset = client.datasets.get(uuid.UUID("your-dataset-id"))

# Check status periodically
while not dataset.finished:
    dataset._reload_metadata()
    if dataset.metadata:
        print(f"Status: {dataset.state} - Progress: {dataset.metadata.progress}%")
    if dataset.ready:
        print("Dataset is ready!")
        break
    elif dataset.state == "FAILED":
        print("Dataset generation failed!")
        break
    time.sleep(30)
```
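To work with existing datasets, list them and filter client-side. The recency filter assumes `created_at` is an ISO 8601 string, so plain string comparison sorts chronologically: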
```python
from datetime import datetime, timedelta

# Get all datasets
datasets = client.datasets.list()

# Filter by state and date
ready_datasets = [d for d in datasets if d.ready]
recent_cutoff = (datetime.now() - timedelta(days=7)).isoformat()
recent_datasets = [d for d in datasets if d.created_at > recent_cutoff]

print(f"Ready: {len(ready_datasets)}, Recent: {len(recent_datasets)}")
```
```python
import concurrent.futures

def create_dataset_with_prompt(client, model_id, prompt, num_samples=20):
    try:
        dataset = client.datasets.create(model_id, num_samples, {"prompt": prompt})
        return {"prompt": prompt, "dataset_id": dataset.id, "success": True}
    except Exception as e:
        return {"prompt": prompt, "error": str(e), "success": False}

# Multiple prompts
prompts = [
    "chest x-ray showing pneumonia",
    "chest x-ray showing cardiomegaly",
    "normal chest x-ray",
]

client = Client(uuid.UUID("your-organization-id"))
model_id = uuid.UUID("your-model-id")

# Create in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(create_dataset_with_prompt, client, model_id, p)
        for p in prompts
    ]
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        if result["success"]:
            print(f"✓ Created: {result['dataset_id']}")
        else:
            print(f"✗ Failed: {result['error']}")
```
```python
def download_with_retry(dataset, output_file, max_retries=3, retry_delay=10):
    for attempt in range(max_retries):
        try:
            # Wait up to 5 minutes per attempt for the dataset to become ready
            if not dataset.ready:
                dataset.wait(timeout=300)
            dataset.download(output_file, strategy="replace")
            return True
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
    return False

# Usage
success = download_with_retry(dataset, "medical_dataset.zip")
```
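For user-facing progress reporting, poll in a loop and invoke a callback only when the state or progress value actually changes: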
```python
def monitor_progress(dataset, callback, poll_interval=15):
    previous_state, previous_progress = None, -1
    while not dataset.finished:
        dataset._reload_metadata()
        state = dataset.state
        progress = dataset.metadata.progress if dataset.metadata else 0
        # Only report changes, not every poll
        if state != previous_state or progress != previous_progress:
            callback(state, progress)
            previous_state, previous_progress = state, progress
        time.sleep(poll_interval)
    return dataset.ready

def progress_callback(state, progress):
    timestamp = time.strftime("%H:%M:%S")
    print(f"[{timestamp}] {state}: {progress}%")

# Usage
success = monitor_progress(dataset, progress_callback)
```
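If your API key has access to several organizations, enumerate them through the lower-level `Connector` and `OrganizationClient` and summarize each organization's datasets: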
```python
from sinkove.connector import Connector
from sinkove.organizations.client import OrganizationClient

connector = Connector(api_key="your-api-key")
org_client = OrganizationClient(connector)
organizations = org_client.list()

for org in organizations:
    datasets = org.datasets.list()
    ready_count = sum(1 for d in datasets if d.ready)
    print(f"{org.organization_name}: {ready_count}/{len(datasets)} ready")
```
```python
def safe_dataset_creation(org_id, model_id, num_samples, args):
    try:
        client = Client(org_id)
        dataset = client.datasets.create(model_id, num_samples, args)
        try:
            dataset.wait(timeout=1800)  # 30 minutes
            return dataset, "success" if dataset.ready else "failed"
        except TimeoutError:
            return dataset, "timeout"
    except ValueError as e:
        return None, f"config_error: {e}"
    except Exception as e:
        return None, f"error: {e}"

# Usage
dataset, status = safe_dataset_creation(
    uuid.UUID("your-org-id"),
    uuid.UUID("your-model-id"),
    50,
    {"prompt": "chest x-ray"},
)

if status == "success":
    dataset.download("output.zip")
elif dataset and status == "timeout":
    print("Dataset still processing...")
else:
    print(f"Failed: {status}")
```
```python
import json

def handle_api_error(error):
    try:
        error_data = json.loads(str(error))
        error_code = error_data.get("code", "UNKNOWN")
        message = error_data.get("message", str(error))
        error_messages = {
            "RATE_LIMIT_EXCEEDED": "Rate limit hit. Wait and retry.",
            "INVALID_MODEL": "Invalid model ID provided.",
            "INSUFFICIENT_CREDITS": "Not enough credits.",
        }
        print(error_messages.get(error_code, f"API Error: {message}"))
    except json.JSONDecodeError:
        # Not a JSON payload; fall back to the raw message
        print(f"Error: {error}")

# Usage
try:
    dataset = client.datasets.create(uuid.UUID("invalid-id"), 10, {"prompt": "test"})
except Exception as e:
    handle_api_error(e)
```
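For reproducibility, record a dataset's generation parameters alongside the downloaded archive: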
```python
import json
from datetime import datetime

def save_dataset_info(dataset, filename="dataset_info.json"):
    info = {
        "dataset_id": str(dataset.id),
        "model_id": str(dataset.model_id),
        "num_samples": dataset.num_samples,
        "args": dataset.args,
        "state": dataset.state,
        "created_at": dataset.created_at,
        "download_timestamp": datetime.now().isoformat(),
    }
    with open(filename, "w") as f:
        json.dump(info, f, indent=2)

# Usage
if dataset.ready:
    dataset.download("dataset.zip")
    save_dataset_info(dataset)
```
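Finally, the pieces combine into a small pipeline class that creates a dataset, downloads and extracts it, counts the extracted images, and removes the archive: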
```python
import zipfile
from pathlib import Path

class DatasetPipeline:
    def __init__(self, organization_id, output_dir="./datasets"):
        self.client = Client(organization_id)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

    def create_and_process(self, model_id, prompt, num_samples=20):
        # Create and wait
        dataset = self.client.datasets.create(model_id, num_samples, {"prompt": prompt})
        dataset.wait()

        # Download and extract
        zip_path = self.output_dir / f"{dataset.id}.zip"
        dataset.download(str(zip_path), strategy="replace")
        extract_dir = self.output_dir / str(dataset.id)
        extract_dir.mkdir(exist_ok=True)
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(extract_dir)

        # Count files
        image_files = list(extract_dir.glob("**/*.png")) + list(extract_dir.glob("**/*.jpg"))
        print(f"Extracted {len(image_files)} images to {extract_dir}")

        # Cleanup
        zip_path.unlink()
        return dataset.id, extract_dir

# Usage
pipeline = DatasetPipeline(uuid.UUID("your-organization-id"))
dataset_id, output_path = pipeline.create_and_process(
    uuid.UUID("your-model-id"),
    "chest x-ray showing pneumonia",
    50,
)
```