Developer Guide · 9 min read

The Complete Guide to CorePlexML's Python SDK

CorePlexML Team

Introduction

CorePlexML is an API-first platform. Every operation available in the web interface, from uploading datasets to deploying models to generating synthetic data, is backed by a REST API. The Python SDK wraps that API into an idiomatic, strongly-typed client library that handles authentication, serialization, error handling, and pagination so you can focus on your ML workflows.

Whether you are automating training pipelines in CI/CD, building custom dashboards, or integrating CorePlexML into a larger data platform, the SDK is your primary integration point.

Installation

Install the SDK from PyPI:

"hl-kw">pip install coreplexml

The SDK requires Python 3.9 or later. It has just two runtime dependencies, requests for HTTP and pydantic for response models, and pulls in no heavy ML frameworks. This keeps the install lightweight and suitable for CI environments.

Authentication

Initialize the client with your API base URL and API key:

from coreplexml import CorePlexMLClient

client = CorePlexMLClient(
    base_url="https://api.coreplexml.io",
    api_key="sk_your_api_key"
)

The client attaches your API key as a Bearer token to every request. For production use, store the API key in an environment variable rather than hardcoding it:

import os

client = CorePlexMLClient(
    base_url=os.environ["COREPLEXML_URL"],
    api_key=os.environ["COREPLEXML_API_KEY"]
)

API keys are scoped to your user account and inherit all project permissions. You can generate and revoke keys from the Account Settings page.

The Six SDK Modules

The SDK is organized into six modules, each corresponding to a major platform capability. All modules follow the same pattern: CRUD operations return dictionaries with typed fields, list operations support pagination, and long-running jobs return a job ID that you can poll.
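The pagination pattern can be wrapped in a small generator so callers never touch page bookkeeping. This is a sketch, not part of the SDK: it assumes list endpoints accept a cursor keyword and return a next_cursor key alongside the items list shown in the examples below; the actual parameter names may differ.

```python
def iter_pages(list_fn, **params):
    """Yield every item across all pages of a list endpoint.

    `list_fn` is any SDK list method (e.g. client.projects.list).
    Assumes the response carries an "items" list and a "next_cursor"
    value that is None on the last page -- an assumption, not a
    documented contract.
    """
    cursor = None
    while True:
        page = list_fn(cursor=cursor, **params)
        yield from page["items"]
        cursor = page.get("next_cursor")
        if cursor is None:
            break
```

With a helper like this, `for p in iter_pages(client.projects.list): ...` iterates every project regardless of page size.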

1. Projects

Projects are the top-level organizational unit. Every dataset, experiment, model, and deployment belongs to a project.

# Create a project
project = client.projects.create(name="Customer Churn Analysis")

# List all projects
projects = client.projects.list()
for p in projects["items"]:
    print(f"{p['id']}: {p['name']}")

# Get project details
project = client.projects.get(project_id="proj_abc123")

# Update a project
client.projects.update(project_id="proj_abc123", name="Churn Analysis v2")

# Delete a project (and all contents)
client.projects.delete(project_id="proj_abc123")

2. Datasets

Datasets support versioned file uploads with automatic schema detection. Each upload creates a new dataset version, preserving full lineage.

# Upload a CSV file
dataset = client.datasets.upload(
    project_id="proj_abc123",
    file_path="/data/customers.csv",
    name="Customer Data"
)
print(f"Dataset: {dataset['id']}, Version: {dataset['version_id']}")

# List dataset versions
versions = client.datasets.list_versions(dataset_id=dataset["id"])

# Get schema for a specific version
schema = client.datasets.get_schema(version_id=dataset["version_id"])
for col in schema["columns"]:
    print(f"  {col['name']}: {col['type']} "
          f"(missing: {col['missing_pct']}%)")

The upload method handles multipart file transfer automatically. Supported formats include CSV, Excel (xlsx), JSON, and XML. The platform infers column types, computes summary statistics, and stores everything in the schema metadata.

3. Experiments

Experiments run AutoML training jobs. You point the experiment at a dataset version and a target column, and the platform tests dozens of algorithms with automated hyperparameter optimization.

# Create an AutoML experiment
experiment = client.experiments.create(
    project_id="proj_abc123",
    dataset_version_id="dv_xyz789",
    target_column="Churn",
    problem_type="classification",
    max_runtime_secs=600,
    max_models=20
)
print(f"Training job: {experiment['job_id']}")

# Wait for training to complete
client.jobs.wait(experiment["job_id"], timeout=1200)

# Get the model leaderboard
leaderboard = client.experiments.get_leaderboard(
    experiment_id=experiment["experiment_id"]
)
for rank, model in enumerate(leaderboard["models"], 1):
    print(f"  #{rank} {model['algorithm']}: "
          f"AUC={model['metrics']['auc']:.4f}")

# Get detailed model information
model = client.experiments.get_model(
    model_id=leaderboard["models"][0]["id"]
)
print(f"Best model: {model['algorithm']}")
print(f"Features used: {len(model['feature_importance'])}")

The max_runtime_secs parameter controls how long AutoML searches for models. Longer runtimes generally produce better results, but with diminishing returns after 10-15 minutes for most datasets.

4. Deployments

Deployments take a trained model and make it available for real-time predictions. CorePlexML supports four deployment strategies with built-in monitoring.

# Deploy the best model with canary strategy
deployment = client.deployments.create(
    project_id="proj_abc123",
    model_id=leaderboard["models"][0]["id"],
    name="Churn Predictor v1",
    strategy="canary",
    canary_percent=10
)
print(f"Deployment ID: {deployment['id']}")

# Make a prediction
prediction = client.deployments.predict(
    deployment_id=deployment["id"],
    features={
        "tenure": 24,
        "MonthlyCharges": 70.0,
        "Contract": "Month-to-month",
        "InternetService": "Fiber optic"
    }
)
print(f"Prediction: {prediction['result']}")
print(f"Confidence: {prediction['probability']:.2f}")

# Get deployment metrics
metrics = client.deployments.get_metrics(
    deployment_id=deployment["id"]
)
print(f"Total predictions: {metrics['total_predictions']}")
print(f"Avg latency: {metrics['avg_latency_ms']}ms")

The four strategies are direct (instant swap), canary (gradual traffic shift), blue_green (parallel environments with atomic switch), and shadow (parallel execution without serving). Choose based on your risk tolerance and traffic volume.
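To make the canary split concrete, here is a client-side model of how a deterministic 10% traffic split behaves. This is illustrative only: the platform performs routing server-side, and the hashing scheme below is an assumption about the general technique, not CorePlexML's actual implementation.

```python
import hashlib

def route_canary(request_id, canary_percent=10, salt="dep_prod"):
    """Deterministically assign a request to the canary or stable model.

    Hashing (salt, request_id) gives each request a stable bucket in
    0..99; requests whose bucket falls below canary_percent go to the
    canary. The same request always routes the same way, which keeps
    per-user behavior consistent while the traffic share ramps up.
    """
    digest = hashlib.sha256(f"{salt}:{request_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 100
    return "canary" if bucket < canary_percent else "stable"
```

Raising canary_percent over time shifts more buckets to the new model, which is the "gradual traffic shift" the canary strategy refers to.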

5. Privacy

The Privacy module scans datasets for personally identifiable information (PII) and applies anonymization transforms to comply with regulatory frameworks.

# Scan a dataset for PII
scan = client.privacy.scan(
    dataset_version_id="dv_xyz789"
)
print(f"PII columns found: {scan['pii_count']}")
for finding in scan["findings"]:
    print(f"  {finding['column']}: {finding['pii_type']} "
          f"(confidence: {finding['confidence']:.0%})")

# Apply HIPAA-compliant anonymization
transform = client.privacy.transform(
    dataset_version_id="dv_xyz789",
    profile="HIPAA",
    rules=[
        {"column": "email", "action": "mask"},
        {"column": "ssn", "action": "redact"},
        {"column": "zip_code", "action": "generalize", "level": 3}
    ]
)
print(f"Anonymized version: {transform['output_version_id']}")

# Export audit report
audit = client.privacy.export_audit(
    dataset_version_id="dv_xyz789",
    format="pdf"
)

The Privacy module supports 72+ PII types across four compliance profiles: HIPAA, GDPR, PCI-DSS, and CCPA. Each profile comes with default transformation rules that you can customize per column.

6. SynthGen

SynthGen generates synthetic tabular data that preserves the statistical properties of real data without containing any actual records.

# Train a synthetic data model
model = client.synthgen.create_model(
    dataset_version_id="dv_xyz789",
    engine="CTGAN",
    epochs=300
)
client.jobs.wait(model["job_id"])

# Generate synthetic rows
synthetic = client.synthgen.generate(
    model_id=model["model_id"],
    num_rows=10000
)
print(f"Generated: {synthetic['row_count']} rows")

# Evaluate synthetic data quality
evaluation = client.synthgen.evaluate(
    model_id=model["model_id"],
    metric="ks_test"
)
print(f"Statistical similarity: {evaluation['aggregate_score']:.2f}")

Three engines are available: CTGAN for general-purpose generation, CopulaGAN for correlation-sensitive data, and TVAE for fast generation on large datasets.

Error Handling

The SDK raises typed exceptions that map to specific API error conditions. This makes it straightforward to handle different failure modes in your automation code.

from coreplexml.exceptions import (
    AuthenticationError,
    NotFoundError,
    ValidationError,
    RateLimitError,
    ServerError
)

try:
    prediction = client.deployments.predict(
        deployment_id="dep_nonexistent",
        features={"tenure": 24}
    )
except AuthenticationError:
    print("Invalid or expired API key")
except NotFoundError as e:
    print(f"Resource not found: {e.resource_id}")
except ValidationError as e:
    print(f"Invalid input: {e.details}")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")
except ServerError:
    print("Platform error. Retry or contact support")

For transient errors like rate limits and server errors, implement a retry strategy:

import time

def predict_with_retry(client, deployment_id, features, max_retries=3):
    for attempt in range(max_retries):
        try:
            return client.deployments.predict(
                deployment_id=deployment_id,
                features=features
            )
        except RateLimitError as e:
            if attempt < max_retries - 1:
                time.sleep(e.retry_after)
            else:
                raise
        except ServerError:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise

CI/CD Integration

The SDK is designed for pipeline automation. Here is a GitHub Actions workflow that trains a model, validates it, and deploys it if the metrics pass a quality gate.

name: ML Pipeline
on:
  push:
    branches: [main]
    paths: ["data/**", "config/**"]

jobs:
  train-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install SDK
        run: pip install coreplexml

      - name: Train Model
        env:
          COREPLEXML_URL: ${{ secrets.COREPLEXML_URL }}
          COREPLEXML_API_KEY: ${{ secrets.COREPLEXML_API_KEY }}
        run: python scripts/train.py

      - name: Validate Model
        run: python scripts/validate.py

      - name: Deploy Model
        if: success()
        run: python scripts/deploy.py
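The workflow's train step runs scripts/train.py, which is not shown here. One detail worth sketching is how it can hand the experiment ID to the later validation step: GitHub Actions lets a step append NAME=value lines to the file named by GITHUB_ENV, and subsequent steps see those as environment variables. The helper below is a hypothetical sketch of that handoff; the surrounding SDK calls are shown as comments and mirror the experiment example earlier in this guide.

```python
import os

def export_to_github_env(name, value):
    """Append NAME=value to the file GitHub Actions reads between steps,
    so a later step (validate.py here) can read it from os.environ."""
    with open(os.environ["GITHUB_ENV"], "a") as env_file:
        env_file.write(f"{name}={value}\n")

# In scripts/train.py (sketch), after training completes:
#   experiment = client.experiments.create(...)
#   client.jobs.wait(experiment["job_id"], timeout=1800)
#   export_to_github_env("EXPERIMENT_ID", experiment["experiment_id"])
```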

The validation script (validate.py) checks that the new model meets minimum performance thresholds before deployment proceeds:

import os
import sys
from coreplexml import CorePlexMLClient

client = CorePlexMLClient(
    base_url=os.environ["COREPLEXML_URL"],
    api_key=os.environ["COREPLEXML_API_KEY"]
)

experiment_id = os.environ["EXPERIMENT_ID"]
leaderboard = client.experiments.get_leaderboard(experiment_id)
best_model = leaderboard["models"][0]

auc = best_model["metrics"]["auc"]
min_auc = float(os.environ.get("MIN_AUC", "0.80"))

if auc < min_auc:
    print(f"Model AUC {auc:.4f} below threshold {min_auc}")
    sys.exit(1)

print(f"Model passed validation: AUC={auc:.4f}")

Environment-Based Configuration

Use different API endpoints and keys for each environment:

import os

ENV = os.environ.get("DEPLOY_ENV", "dev")

config = {
    "dev": {
        "url": "https://dev.coreplexml.io",
        "key": os.environ.get("COREPLEXML_DEV_KEY")
    },
    "staging": {
        "url": "https://staging.coreplexml.io",
        "key": os.environ.get("COREPLEXML_STAGING_KEY")
    },
    "prod": {
        "url": "https://api.coreplexml.io",
        "key": os.environ.get("COREPLEXML_PROD_KEY")
    }
}

client = CorePlexMLClient(
    base_url=config[ENV]["url"],
    api_key=config[ENV]["key"]
)

Tips

Use environment variables for API keys. Never commit API keys to version control. The SDK reads credentials from the constructor arguments, so you have full control over how they are sourced.

Batch predictions for throughput. If you need to score many records, use the batch prediction job endpoint rather than calling the single-prediction endpoint in a loop. Batch jobs run server-side and avoid HTTP round-trip overhead.

Poll job status for long operations. Training and synthetic data generation are asynchronous. Use client.jobs.wait() for simple scripts, or implement your own polling loop with client.jobs.get() for more control over timeout and retry behavior.

Pin your SDK version. Use coreplexml==X.Y.Z in your requirements file rather than an unpinned install. This prevents unexpected breaking changes in CI/CD pipelines when a new SDK version is released.

Check rate limits. The API enforces per-key rate limits. The SDK surfaces rate limit errors with a retry_after field. In production pipelines, always implement retry logic for rate limit and transient server errors.
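The custom polling loop suggested above might look like the sketch below. It takes the status-fetching call as a parameter so the loop itself is SDK-agnostic; the "status" field and its "completed"/"failed" values are assumptions about the job payload, not a documented contract.

```python
import time

def wait_for_job(get_status, job_id, timeout=600, interval=5):
    """Poll a job until it reaches a terminal state.

    `get_status` stands in for client.jobs.get. Raises TimeoutError if
    the job is still running at the deadline, RuntimeError on failure.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job = get_status(job_id)
        if job["status"] == "completed":
            return job
        if job["status"] == "failed":
            raise RuntimeError(f"Job {job_id} failed: {job.get('error')}")
        time.sleep(interval)
    raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")
```

Compared with client.jobs.wait(), rolling your own loop lets you log progress, tune the poll interval, or bail out early on custom conditions.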

The SDK puts the full power of the CorePlexML platform into your Python scripts, notebooks, and CI/CD pipelines. Combined with the web interface for exploration and the API for custom integrations, it gives you multiple ways to interact with every capability the platform offers.

New SDK Modules (v2.4+)

Recent platform releases have added several new SDK modules that extend the client into advanced MLOps territory:

Model Registry

Manage model versions with semantic versioning and stage transitions:

version = client.registry.create_version(
    project_id="proj_abc",
    model_id="mod_xgb_v2",
    version="1.2.0",
    model_card={"description": "Improved feature set", "metrics": {"auc": 0.94}}
)

client.registry.transition_stage(version["id"], stage="production")
versions = client.registry.list_versions(project_id="proj_abc")

A/B Testing

Create and monitor experiments between model variants:

test = client.ab_tests.create(
    project_id="proj_abc",
    model_a_id="mod_v1",
    model_b_id="mod_v2",
    traffic_split_a=50,
    primary_metric="accuracy",
    min_sample_size=1000
)

results = client.ab_tests.get_results(test["id"])
if results["is_significant"]:
    client.ab_tests.declare_winner(test["id"], variant="B")

Alerts and Monitoring

Configure alert rules with multi-channel notifications:

channel = client.alerts.create_channel(
    name="Slack Ops",
    channel_type="slack",
    config={"webhook_url": "https://hooks.slack.com/..."}
)

rule = client.alerts.create_rule(
    deployment_id="dep_prod",
    name="Drift Alert",
    metric="drift_psi",
    operator="gt",
    threshold=0.2,
    severity="critical",
    channel_ids=[channel["id"]]
)

Batch and Streaming Predictions

Run predictions at scale with async processing and WebSocket streaming:

job = client.predictions.create(
    deployment_id="dep_prod",
    file_path="batch_input.csv"
)
result = client.predictions.wait(job["id"])
client.predictions.download(job["id"], "predictions.csv")

for row in client.streaming.predict(deployment_id="dep_prod", data=records):
    process(row["prediction"])

For the complete SDK reference, see our Python SDK page and the API documentation.