La Guia Completa del Python SDK de CorePlexML
Introduccion
CorePlexML es una plataforma API-first. Cada operacion disponible en la interfaz web, desde la carga de datasets hasta el despliegue de modelos y la generacion de datos sinteticos, esta respaldada por una API REST. El Python SDK envuelve esa API en una biblioteca cliente idiomatica y fuertemente tipada que maneja la autenticacion, la serializacion, el manejo de errores y la paginacion para que puedas enfocarte en tus flujos de trabajo de ML.
Ya sea que estes automatizando pipelines de entrenamiento en CI/CD, construyendo dashboards personalizados o integrando CorePlexML en una plataforma de datos mas grande, el SDK es tu punto de integracion principal.
Instalacion
Instala el SDK desde PyPI:
"hl-kw">pip install coreplexml
El SDK requiere Python 3.9 o posterior. Tiene dependencias minimas: requests para HTTP, pydantic para modelos de respuesta, y ningun framework pesado de ML. Esto mantiene la instalacion liviana y adecuada para entornos de CI.
Autenticacion
Inicializa el cliente con la URL base de tu API y tu clave API:
from coreplexml import CorePlexMLClient
client = CorePlexMLClient(
base_url="https://api.coreplexml.io",
api_key="sk_your_api_key"
)
El cliente adjunta tu clave API como un Bearer token a cada solicitud. Para uso en produccion, almacena la clave API en una variable de entorno en lugar de codificarla directamente:
import os
client = CorePlexMLClient(
base_url=os.environ["COREPLEXML_URL"],
api_key=os.environ["COREPLEXML_API_KEY"]
)
Las claves API estan asociadas a tu cuenta de usuario y heredan todos los permisos de proyecto. Puedes generar y revocar claves desde la pagina de Configuracion de Cuenta.
Los Seis Modulos del SDK
El SDK esta organizado en seis modulos, cada uno correspondiente a una capacidad principal de la plataforma. Todos los modulos siguen el mismo patron: las operaciones CRUD devuelven diccionarios con campos tipados, las operaciones de listado soportan paginacion, y los trabajos de larga duracion devuelven un ID de trabajo que puedes consultar.
1. Projects
Los proyectos son la unidad organizacional de nivel superior. Cada dataset, experimento, modelo y despliegue pertenece a un proyecto.
# Create a project
project = client.projects.create(name="Customer Churn Analysis")
# List all projects
projects = client.projects.list()
for p in projects["items"]:
print(f"{p['id']}: {p['name']}")
# Get project details
project = client.projects.get(project_id="proj_abc123")
# Update a project
client.projects.update(project_id="proj_abc123", name="Churn Analysis v2")
# Delete a project (and all contents)
client.projects.delete(project_id="proj_abc123")
2. Datasets
Los datasets soportan cargas de archivos versionados con deteccion automatica de esquema. Cada carga crea una nueva version del dataset, preservando el linaje completo.
# Upload a CSV file
dataset = client.datasets.upload(
project_id="proj_abc123",
file_path="/data/customers.csv",
name="Customer Data"
)
print(f"Dataset: {dataset['id']}, Version: {dataset['version_id']}")
# List dataset versions
versions = client.datasets.list_versions(dataset_id=dataset["id"])
# Get schema for a specific version
schema = client.datasets.get_schema(version_id=dataset["version_id"])
for col in schema["columns"]:
print(f" {col['name']}: {col['type']} "
f"(missing: {col['missing_pct']}%)")
El metodo de carga maneja la transferencia de archivos multipart automaticamente. Los formatos soportados incluyen CSV, Excel (xlsx), JSON y XML. La plataforma infiere los tipos de columna, calcula estadisticas resumidas y almacena todo en los metadatos del esquema.
3. Experiments
Los experimentos ejecutan trabajos de entrenamiento AutoML. Apuntas el experimento a una version de dataset y una columna objetivo, y la plataforma prueba docenas de algoritmos con optimizacion automatizada de hiperparametros.
# Create an AutoML experiment
experiment = client.experiments.create(
project_id="proj_abc123",
dataset_version_id="dv_xyz789",
target_column="Churn",
problem_type="classification",
max_runtime_secs=600,
max_models=20
)
print(f"Training job: {experiment['job_id']}")
# Wait for training to complete
client.jobs.wait(experiment["job_id"], timeout=1200)
# Get the model leaderboard
leaderboard = client.experiments.get_leaderboard(
experiment_id=experiment["experiment_id"]
)
for rank, model in enumerate(leaderboard["models"], 1):
print(f" #{rank} {model['algorithm']}: "
f"AUC={model['metrics']['auc']:.4f}")
# Get detailed model information
model = client.experiments.get_model(
model_id=leaderboard["models"][0]["id"]
)
print(f"Best model: {model['algorithm']}")
print(f"Features used: {len(model['feature_importance'])}")
El parametro max_runtime_secs controla cuanto tiempo AutoML busca modelos. Tiempos de ejecucion mas largos generalmente producen mejores resultados, pero con rendimientos decrecientes despues de 10-15 minutos para la mayoria de los datasets.
4. Deployments
Los despliegues toman un modelo entrenado y lo ponen disponible para predicciones en tiempo real. CorePlexML soporta cuatro estrategias de despliegue con monitoreo integrado.
# Deploy the best model with canary strategy
deployment = client.deployments.create(
project_id="proj_abc123",
model_id=leaderboard["models"][0]["id"],
name="Churn Predictor v1",
strategy="canary",
canary_percent=10
)
print(f"Deployment ID: {deployment['id']}")
# Make a prediction
prediction = client.deployments.predict(
deployment_id=deployment["id"],
features={
"tenure": 24,
"MonthlyCharges": 70.0,
"Contract": "Month-to-month",
"InternetService": "Fiber optic"
}
)
print(f"Prediction: {prediction['result']}")
print(f"Confidence: {prediction['probability']:.2f}")
# Get deployment metrics
metrics = client.deployments.get_metrics(
deployment_id=deployment["id"]
)
print(f"Total predictions: {metrics['total_predictions']}")
print(f"Avg latency: {metrics['avg_latency_ms']}ms")
Las cuatro estrategias son direct (intercambio instantaneo), canary (cambio gradual de trafico), blue_green (entornos paralelos con cambio atomico) y shadow (ejecucion paralela sin servir). Elige segun tu tolerancia al riesgo y volumen de trafico.
5. Privacy
El modulo Privacy escanea datasets en busca de informacion de identificacion personal (PII) y aplica transformaciones de anonimizacion para cumplir con marcos regulatorios.
# Scan a dataset for PII
scan = client.privacy.scan(
dataset_version_id="dv_xyz789"
)
print(f"PII columns found: {scan['pii_count']}")
for finding in scan["findings"]:
print(f" {finding['column']}: {finding['pii_type']} "
f"(confidence: {finding['confidence']:.0%})")
# Apply HIPAA-compliant anonymization
transform = client.privacy.transform(
dataset_version_id="dv_xyz789",
profile="HIPAA",
rules=[
{"column": "email", "action": "mask"},
{"column": "ssn", "action": "redact"},
{"column": "zip_code", "action": "generalize", "level": 3}
]
)
print(f"Anonymized version: {transform['output_version_id']}")
# Export audit report
audit = client.privacy.export_audit(
dataset_version_id="dv_xyz789",
format="pdf"
)
El modulo Privacy soporta mas de 72 tipos de PII a traves de cuatro perfiles de cumplimiento: HIPAA, GDPR, PCI-DSS y CCPA. Cada perfil viene con reglas de transformacion predeterminadas que puedes personalizar por columna.
6. SynthGen
SynthGen genera datos tabulares sinteticos que preservan las propiedades estadisticas de los datos reales sin contener ningun registro real.
# Train a synthetic data model
model = client.synthgen.create_model(
dataset_version_id="dv_xyz789",
engine="CTGAN",
epochs=300
)
client.jobs.wait(model["job_id"])
# Generate synthetic rows
synthetic = client.synthgen.generate(
model_id=model["model_id"],
num_rows=10000
)
print(f"Generated: {synthetic['row_count']} rows")
# Evaluate synthetic data quality
evaluation = client.synthgen.evaluate(
model_id=model["model_id"],
metric="ks_test"
)
print(f"Statistical similarity: {evaluation['aggregate_score']:.2f}")
Tres motores estan disponibles: CTGAN para generacion de proposito general, CopulaGAN para datos sensibles a correlaciones, y TVAE para generacion rapida en datasets grandes.
Manejo de Errores
El SDK lanza excepciones tipadas que se mapean a condiciones de error especificas de la API. Esto facilita el manejo de diferentes modos de fallo en tu codigo de automatizacion.
from coreplexml.exceptions import (
AuthenticationError,
NotFoundError,
ValidationError,
RateLimitError,
ServerError
)
try:
prediction = client.deployments.predict(
deployment_id="dep_nonexistent",
features={"tenure": 24}
)
except AuthenticationError:
print("Invalid or expired API key")
except NotFoundError as e:
print(f"Resource not found: {e.resource_id}")
except ValidationError as e:
print(f"Invalid input: {e.details}")
except RateLimitError as e:
print(f"Rate limited. Retry after {e.retry_after} seconds")
except ServerError:
print("Platform error. Retry or contact support")
Para errores transitorios como limites de tasa y errores del servidor, implementa una estrategia de reintentos:
import time
def predict_with_retry(client, deployment_id, features, max_retries=3):
for attempt in range(max_retries):
try:
return client.deployments.predict(
deployment_id=deployment_id,
features=features
)
except RateLimitError as e:
if attempt < max_retries - 1:
time.sleep(e.retry_after)
else:
raise
except ServerError:
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
else:
raise
Integracion CI/CD
El SDK esta disenado para la automatizacion de pipelines. Aqui tienes un workflow de GitHub Actions que entrena un modelo, lo valida y lo despliega si las metricas pasan un umbral de calidad.
name: ML Pipeline
on:
push:
branches: [main]
paths: ["data/**", "config/**"]
jobs:
train-and-deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install SDK
run: pip install coreplexml
- name: Train Model
env:
COREPLEXML_URL: ${{ secrets.COREPLEXML_URL }}
COREPLEXML_API_KEY: ${{ secrets.COREPLEXML_API_KEY }}
run: python scripts/train.py
- name: Validate Model
run: python scripts/validate.py
- name: Deploy Model
if: success()
run: python scripts/deploy.py
El script de validacion (validate.py) verifica que el nuevo modelo cumple con los umbrales minimos de rendimiento antes de proceder con el despliegue:
import os
import sys
from coreplexml import CorePlexMLClient
client = CorePlexMLClient(
base_url=os.environ["COREPLEXML_URL"],
api_key=os.environ["COREPLEXML_API_KEY"]
)
experiment_id = os.environ["EXPERIMENT_ID"]
leaderboard = client.experiments.get_leaderboard(experiment_id)
best_model = leaderboard["models"][0]
auc = best_model["metrics"]["auc"]
min_auc = float(os.environ.get("MIN_AUC", "0.80"))
if auc < min_auc:
print(f"Model AUC {auc:.4f} below threshold {min_auc}")
sys.exit(1)
print(f"Model passed validation: AUC={auc:.4f}")
Configuracion Basada en Entorno
Usa diferentes endpoints de API y claves para cada entorno:
import os
ENV = os.environ.get("DEPLOY_ENV", "dev")
config = {
"dev": {
"url": "https://dev.coreplexml.io",
"key": os.environ.get("COREPLEXML_DEV_KEY")
},
"staging": {
"url": "https://staging.coreplexml.io",
"key": os.environ.get("COREPLEXML_STAGING_KEY")
},
"prod": {
"url": "https://api.coreplexml.io",
"key": os.environ.get("COREPLEXML_PROD_KEY")
}
}
client = CorePlexMLClient(
base_url=config[ENV]["url"],
api_key=config[ENV]["key"]
)
Consejos
Usa variables de entorno para las claves API. Nunca incluyas claves API en el control de versiones. El SDK lee las credenciales desde los argumentos del constructor, asi que tienes control total sobre como se obtienen.
Predicciones por lote para mayor rendimiento. Si necesitas puntuar muchos registros, usa el endpoint de trabajo de prediccion por lotes en lugar de llamar al endpoint de prediccion individual en un bucle. Los trabajos por lotes se ejecutan del lado del servidor y evitan la sobrecarga de ida y vuelta HTTP.
Consulta el estado del trabajo para operaciones largas. El entrenamiento y la generacion de datos sinteticos son asincronos. Usa client.jobs.wait() para scripts simples, o implementa tu propio bucle de consulta con client.jobs.get() para mayor control sobre el tiempo de espera y el comportamiento de reintentos.
Fija la version de tu SDK. Usa coreplexml==X.Y.Z en tu archivo de requerimientos en lugar de una instalacion sin version fija. Esto previene cambios inesperados en los pipelines de CI/CD cuando se lanza una nueva version del SDK.
Verifica los limites de tasa. La API aplica limites de tasa por clave. El SDK expone errores de limite de tasa con un campo retry_after. En pipelines de produccion, siempre implementa logica de reintentos para errores de limite de tasa y errores transitorios del servidor.
El SDK pone todo el poder de la plataforma CorePlexML en tus scripts de Python, notebooks y pipelines de CI/CD. Combinado con la interfaz web para exploracion y la API para integraciones personalizadas, te da multiples formas de interactuar con cada capacidad que la plataforma ofrece.
Nuevos Modulos del SDK (v2.4+)
Las versiones recientes de la plataforma han agregado varios nuevos modulos al SDK que extienden el cliente hacia territorio avanzado de MLOps:
Model Registry
Gestiona versiones de modelos con versionado semantico y transiciones de etapa:
version = client.registry.create_version(
project_id="proj_abc",
model_id="mod_xgb_v2",
version="1.2.0",
model_card={"description": "Improved feature set", "metrics": {"auc": 0.94}}
)
client.registry.transition_stage(version["id"], stage="production")
versions = client.registry.list_versions(project_id="proj_abc")
A/B Testing
Crea y monitorea experimentos entre variantes de modelos:
test = client.ab_tests.create(
project_id="proj_abc",
model_a_id="mod_v1",
model_b_id="mod_v2",
traffic_split_a=50,
primary_metric="accuracy",
min_sample_size=1000
)
results = client.ab_tests.get_results(test["id"])
if results["is_significant"]:
client.ab_tests.declare_winner(test["id"], variant="B")
Alertas y Monitoreo
Configura reglas de alerta con notificaciones multicanal:
channel = client.alerts.create_channel(
name="Slack Ops",
channel_type="slack",
config={"webhook_url": "https://hooks.slack.com/..."}
)
rule = client.alerts.create_rule(
deployment_id="dep_prod",
name="Drift Alert",
metric="drift_psi",
operator="gt",
threshold=0.2,
severity="critical",
channel_ids=[channel["id"]]
)
Predicciones por Lote y Streaming
Ejecuta predicciones a escala con procesamiento asincrono y streaming por WebSocket:
job = client.predictions.create(
deployment_id="dep_prod",
file_path="batch_input.csv"
)
result = client.predictions.wait(job["id"])
client.predictions.download(job["id"], "predictions.csv")
for row in client.streaming.predict(deployment_id="dep_prod", data=records):
process(row["prediction"])
Para la referencia completa del SDK, consulta nuestra pagina del Python SDK y la documentacion de la API.