customerio-load-scale by jeremylongshore
Implement Customer.io load testing and scaling. Use when preparing for high traffic, load testing, or scaling integrations for enterprise workloads. Trigger with phrases like "customer.io load test", "customer.io scale", "customer.io high volume", "customer.io performance test".
Content & Writing
1.9K Stars
265 Forks
Updated Apr 3, 2026, 03:47 AM
Why Use This
This skill provides load testing and scaling guidance for high-volume Customer.io integrations.
Use Cases
- Load testing a Customer.io integration before a high-traffic event
- Designing queue-based architectures for enterprise event volumes
- Autoscaling and rate-limiting Customer.io workers in production
Install Guide
2 steps
1. Skip this step if Ananke is already installed.
2.
Skill Snapshot
Auto scan of skill assets. Informational only.
Valid SKILL.md
Checks against SKILL.md specification
Source & Community
Repository claude-code-plugins-plus-skills
Skill Version
main
Community
1.9K Stars · 265 Forks
Updated At Apr 3, 2026, 03:47 AM
Skill Stats
SKILL.md 335 Lines
Total Files 2
Total Size 9.2 KB
License MIT
---
name: customerio-load-scale
description: |
  Implement Customer.io load testing and horizontal scaling. Use when
  preparing for high traffic, running load tests, or designing queue-based
  architectures for scale. Trigger: "customer.io load test", "customer.io
  scale", "customer.io high volume", "customer.io k6", "customer.io
  performance test".
allowed-tools: Read, Write, Edit, Bash(npm:*), Bash(npx:*), Bash(kubectl:*), Glob, Grep
version: 1.0.0
license: MIT
author: Jeremy Longshore <[email protected]>
compatible-with: claude-code, codex, openclaw
tags: [saas, customer-io, load-testing, scaling, performance]
---

# Customer.io Load & Scale

## Overview

Load testing and scaling strategies for high-volume Customer.io integrations: k6 load test scripts, scaling architecture selection based on volume tier, Kubernetes HPA autoscaling, message queue buffering, and rate-limit-aware batch processing.

## Scaling Architecture by Volume

| Daily Events | Architecture | Key Components |
|--------------|--------------|----------------|
| < 100K       | Direct API   | Singleton client, retry, connection pooling |
| 100K - 1M    | Batched API  | Event queue, batch processor, rate limiter |
| 1M - 10M     | Queue-backed | Redis/Kafka queue, worker pool, backpressure |
| > 10M        | Distributed  | Multiple workspaces, sharded queues, regional routing |

Customer.io's rate limit is ~100 req/sec per workspace. Plan your architecture around this.
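The rate limit is what drives the tier boundaries: at a sustained 80 req/sec (headroom under the ~100/sec limit), a single workspace can absorb roughly 80 × 86,400 ≈ 6.9M events/day, which is why volumes above ~10M/day call for multiple workspaces. A minimal sketch encoding the table above; `selectArchitecture` is a hypothetical helper, not part of this skill's code:

```typescript
// lib/cio-architecture.ts (hypothetical helper, illustrating the tiers above)
type Architecture = "direct-api" | "batched-api" | "queue-backed" | "distributed";

// At ~80 req/sec sustained, one workspace absorbs about
// 80 * 86_400 ≈ 6.9M events/day; beyond that, shard across workspaces.
export function selectArchitecture(dailyEvents: number): Architecture {
  if (dailyEvents < 100_000) return "direct-api";
  if (dailyEvents < 1_000_000) return "batched-api";
  if (dailyEvents < 10_000_000) return "queue-backed";
  return "distributed";
}
```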
## Instructions

### Step 1: k6 Load Test Script

```javascript
// load-tests/customerio.js
// Run: k6 run --vus 10 --duration 60s load-tests/customerio.js
import http from "k6/http";
import encoding from "k6/encoding";
import { check, sleep } from "k6";
import { Counter, Trend } from "k6/metrics";

const SITE_ID = __ENV.CUSTOMERIO_SITE_ID;
const API_KEY = __ENV.CUSTOMERIO_TRACK_API_KEY;
const BASE_URL = "https://track.customer.io/api/v1";
const AUTH = `${SITE_ID}:${API_KEY}`;

const identifyLatency = new Trend("cio_identify_latency");
const trackLatency = new Trend("cio_track_latency");
const errors = new Counter("cio_errors");

export const options = {
  scenarios: {
    identify_load: {
      executor: "ramping-arrival-rate",
      startRate: 10,
      timeUnit: "1s",
      preAllocatedVUs: 20,
      maxVUs: 50,
      stages: [
        { duration: "30s", target: 50 }, // Ramp to 50/sec
        { duration: "60s", target: 80 }, // Hold at 80/sec (near limit)
        { duration: "30s", target: 10 }, // Cool down
      ],
    },
  },
  thresholds: {
    cio_identify_latency: ["p(95)<500", "p(99)<2000"],
    cio_track_latency: ["p(95)<500", "p(99)<2000"],
    cio_errors: ["count<50"],
  },
};

export default function () {
  const userId = `k6-load-${__VU}-${__ITER}`;
  const headers = {
    "Content-Type": "application/json",
    Authorization: `Basic ${encoding.b64encode(AUTH)}`,
  };

  // Identify
  const identifyRes = http.put(
    `${BASE_URL}/customers/${userId}`,
    JSON.stringify({
      email: `${userId}@loadtest.example.com`,
      _load_test: true,
      created_at: Math.floor(Date.now() / 1000),
    }),
    { headers }
  );
  identifyLatency.add(identifyRes.timings.duration);
  check(identifyRes, { "identify 200": (r) => r.status === 200 }) || errors.add(1);

  // Track event
  const trackRes = http.post(
    `${BASE_URL}/customers/${userId}/events`,
    JSON.stringify({
      name: "load_test_event",
      data: { iteration: __ITER, vu: __VU },
    }),
    { headers }
  );
  trackLatency.add(trackRes.timings.duration);
  check(trackRes, { "track 200": (r) => r.status === 200 }) || errors.add(1);

  sleep(0.1); // Small delay between iterations
}

// Cleanup reminder: suppress test users after the run
export function teardown() {
  console.log("Load test complete. Clean up k6-load-* users in the CIO dashboard.");
}
```

Run:

```bash
k6 run --env CUSTOMERIO_SITE_ID="$CUSTOMERIO_SITE_ID" \
  --env CUSTOMERIO_TRACK_API_KEY="$CUSTOMERIO_TRACK_API_KEY" \
  load-tests/customerio.js
```

### Step 2: Queue-Based Architecture

```typescript
// services/cio-queue-worker.ts
import { Queue, Worker, QueueEvents } from "bullmq";
import { TrackClient, RegionUS } from "customerio-node";
import Bottleneck from "bottleneck";

const REDIS_URL = process.env.REDIS_URL ?? "redis://localhost:6379";

// Rate limiter: 80 requests per second (leave headroom under the 100/sec limit)
const limiter = new Bottleneck({
  maxConcurrent: 15,
  reservoir: 80,
  reservoirRefreshAmount: 80,
  reservoirRefreshInterval: 1000,
});

const eventQueue = new Queue("cio:events", {
  connection: { url: REDIS_URL },
  defaultJobOptions: {
    attempts: 5,
    backoff: { type: "exponential", delay: 2000 },
    removeOnComplete: { count: 10000 },
    removeOnFail: { count: 50000 },
  },
});

// Producer: your application enqueues events here
export async function enqueueEvent(
  type: "identify" | "track",
  userId: string,
  data: Record<string, any>
): Promise<void> {
  await eventQueue.add(type, { userId, data, enqueuedAt: Date.now() });
}

// Consumer: workers process events with rate limiting
export function startEventWorkers(concurrency = 10): void {
  const cio = new TrackClient(
    process.env.CUSTOMERIO_SITE_ID!,
    process.env.CUSTOMERIO_TRACK_API_KEY!,
    { region: RegionUS }
  );

  const worker = new Worker(
    "cio:events",
    async (job) => {
      await limiter.schedule(async () => {
        if (job.name === "identify") {
          await cio.identify(job.data.userId, job.data.data);
        } else {
          await cio.track(job.data.userId, job.data.data);
        }
      });
    },
    {
      connection: { url: REDIS_URL },
      concurrency,
    }
  );

  worker.on("failed", (job, err) => {
    console.error(`CIO event failed: ${job?.id}: ${err.message}`);
  });

  // Monitor queue health
  const events = new QueueEvents("cio:events", {
    connection: { url: REDIS_URL },
  });

  setInterval(async () => {
    const counts = await eventQueue.getJobCounts();
    console.log(
      `CIO queue: waiting=${counts.waiting} active=${counts.active} ` +
        `failed=${counts.failed} completed=${counts.completed}`
    );
  }, 30000);
}
```
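The HPA in Step 3 scales on a custom `cio_queue_depth` metric. Kubernetes only sees that metric if the workers export it and a metrics pipeline (for example Prometheus plus prometheus-adapter) surfaces it as a pods metric. A minimal sketch of the export side, assuming `prom-client`; the module path, port, and function name are illustrative, not part of this skill:

```typescript
// services/cio-queue-metrics.ts (sketch; assumes prom-client and a metrics adapter)
import http from "node:http";
import { Gauge, register } from "prom-client";
import { Queue } from "bullmq";

const queueDepth = new Gauge({
  name: "cio_queue_depth",
  help: "Jobs waiting in the cio:events queue",
});

// Poll BullMQ for the waiting count and expose it at /metrics for scraping.
export function exposeQueueDepth(queue: Queue, port = 9464): void {
  setInterval(async () => {
    const counts = await queue.getJobCounts("waiting");
    queueDepth.set(counts.waiting ?? 0);
  }, 10_000);

  http
    .createServer(async (_req, res) => {
      res.setHeader("Content-Type", register.contentType);
      res.end(await register.metrics());
    })
    .listen(port);
}
```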
### Step 3: Kubernetes HPA Autoscaling

```yaml
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: cio-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: cio-event-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Pods
      pods:
        metric:
          name: cio_queue_depth
        target:
          type: AverageValue
          averageValue: "500"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 2
          periodSeconds: 120
```

### Step 4: Batch Sender for Bulk Operations

```typescript
// lib/cio-batch-sender.ts
import { TrackClient, RegionUS } from "customerio-node";
import Bottleneck from "bottleneck";

export async function batchSend(
  operations: Array<{
    type: "identify" | "track";
    userId: string;
    data: Record<string, any>;
  }>,
  ratePerSec = 80
): Promise<{ succeeded: number; failed: number }> {
  const cio = new TrackClient(
    process.env.CUSTOMERIO_SITE_ID!,
    process.env.CUSTOMERIO_TRACK_API_KEY!,
    { region: RegionUS }
  );

  const limiter = new Bottleneck({
    maxConcurrent: 15,
    reservoir: ratePerSec,
    reservoirRefreshAmount: ratePerSec,
    reservoirRefreshInterval: 1000,
  });

  let succeeded = 0;
  let failed = 0;

  const promises = operations.map((op) =>
    limiter.schedule(async () => {
      try {
        if (op.type === "identify") {
          await cio.identify(op.userId, op.data);
        } else {
          await cio.track(op.userId, op.data);
        }
        succeeded++;
      } catch {
        failed++;
      }
      if ((succeeded + failed) % 1000 === 0) {
        console.log(`Progress: ${succeeded + failed}/${operations.length}`);
      }
    })
  );

  await Promise.all(promises);
  return { succeeded, failed };
}
```

Install: `npm install bottleneck bullmq`

## Load Test Checklist

- [ ] Test against a staging workspace (NEVER production)
- [ ] Start at 10% of the target rate, ramp up gradually
- [ ] Monitor the 429 error rate during the test
- [ ] Check the Customer.io dashboard for processing lag
- [ ] Verify cleanup of test users after the load test
- [ ] Document baseline latency and throughput numbers
- [ ] Set up alerts before running at production scale

## Error Handling

| Issue | Solution |
|-------|----------|
| 429 during load test | Reduce rate, check limiter config |
| Queue backlog growing | Scale workers, increase concurrency |
| Memory pressure | Limit batch and queue sizes, enable GC |
| k6 VU exhaustion | Increase `preAllocatedVUs` and `maxVUs` |
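For the first row above: when a burst does trip the limit, back off instead of retrying immediately. A minimal backoff sketch; the 429 detection is an assumption, since customerio-node's error shape varies by version, so adjust the check to your client:

```typescript
// lib/cio-retry.ts (sketch; inspect your customerio-node version's error shape)
export async function withBackoff<T>(
  fn: () => Promise<T>,
  maxAttempts = 5
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (err: any) {
      // Assumption: a 429 is detectable from the thrown error.
      const is429 = err?.statusCode === 429 || /429/.test(String(err?.message));
      if (!is429 || attempt >= maxAttempts) throw err;
      // Exponential backoff with jitter: 2s, 4s, 8s... plus up to 1s random.
      const delay = 2000 * 2 ** (attempt - 1) + Math.random() * 1000;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```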
## Resources

- [k6 Documentation](https://k6.io/docs/)
- [Bottleneck npm](https://www.npmjs.com/package/bottleneck)
- [BullMQ Documentation](https://bullmq.io/)
- [Kubernetes HPA](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/)

## Next Steps

After load testing, proceed to `customerio-known-pitfalls` for anti-patterns to avoid.