DevOps & Scalability

Auto-Scaling CAPTCHA Solving Workers

A static worker pool wastes money during quiet periods and creates bottlenecks at peak hours. Auto-scaling matches the number of workers to actual demand, which optimizes both cost and throughput.


Scaling Signals

Signal             | Scale Up When               | Scale Down When
Queue depth        | > 20 pending tasks          | < 5 pending tasks
Worker utilization | > 80% busy                  | < 20% busy
Solve latency      | P95 > 60 s                  | P95 < 20 s
Error rate         | > 5% (needs fresh workers)  | Stable < 1%
Balance            | N/A                         | Balance < $1 (halt scaling)
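
The thresholds in the table can be folded into a single decision function. The sketch below is one possible reading of the table, not part of the pool classes in this article: the names `Signals` and `decide_scaling` are assumptions, as is the rule that any one scale-up signal is enough while scale-down needs every signal to agree.

```python
from dataclasses import dataclass


@dataclass
class Signals:
    queue_depth: int     # pending tasks in the queue
    utilization: float   # percent of workers currently busy
    p95_latency: float   # P95 solve latency in seconds
    error_rate: float    # percent of failed solves
    balance: float       # account balance in dollars


def decide_scaling(s: Signals) -> str:
    """Return "halt", "up", "down", or "hold" using the table's thresholds."""
    # A low balance overrides every other signal.
    if s.balance < 1.0:
        return "halt"

    # Assumption: any single scale-up signal is enough to add workers.
    if (s.queue_depth > 20 or s.utilization > 80
            or s.p95_latency > 60 or s.error_rate > 5):
        return "up"

    # Assumption: scale down only when every signal agrees that load is low.
    if (s.queue_depth < 5 and s.utilization < 20
            and s.p95_latency < 20 and s.error_rate < 1):
        return "down"

    return "hold"


print(decide_scaling(Signals(35, 90.0, 45.0, 0.5, 12.0)))  # up
print(decide_scaling(Signals(2, 10.0, 12.0, 0.2, 12.0)))   # down
print(decide_scaling(Signals(35, 90.0, 45.0, 0.5, 0.4)))   # halt
```

Requiring agreement before scaling down is the more cautious choice: a single noisy reading can add workers, but it cannot remove them.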

Thread-Based Auto-Scaler

Scale worker threads within a single process:

import os
import time
import threading
import requests
import json
import redis


class AutoScalingPool:
    """Dynamically scale CaptchaAI worker threads."""

    def __init__(self, api_key, redis_url="redis://localhost:6379"):
        self.api_key = api_key
        self.redis = redis.from_url(redis_url)
        self.base = "https://ocr.captchaai.com"
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"

        self.min_workers = 2
        self.max_workers = 20
        self.workers = []
        self.active_count = 0
        self.lock = threading.Lock()
        self.running = True

    def start(self):
        """Start the pool with minimum workers."""
        for _ in range(self.min_workers):
            self._add_worker()

        # Start scaler in background
        scaler = threading.Thread(target=self._scaling_loop, daemon=True)
        scaler.start()
        print(f"Pool started with {self.min_workers} workers")

    def _add_worker(self):
        """Add a worker thread."""
        if len(self.workers) >= self.max_workers:
            return
        t = threading.Thread(target=self._worker_loop, daemon=True)
        t.start()
        self.workers.append(t)

    def _remove_worker(self):
        """Signal one worker to stop (lazy removal)."""
        if len(self.workers) <= self.min_workers:
            return
        self.workers.pop()  # Thread will exit on next idle cycle

    def _worker_loop(self):
        """Worker loop: fetch and process tasks."""
        while self.running and threading.current_thread() in self.workers:
            result = self.redis.blpop(self.queue_key, timeout=10)
            if result is None:
                continue

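            _, raw = result
            try:
                task = json.loads(raw)
                task_id = task["id"]
            except (ValueError, KeyError, TypeError):
                continue  # Skip malformed tasks instead of crashing the worker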

            with self.lock:
                self.active_count += 1

            try:
                token = self._solve(task["method"], task["params"])
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "success", "token": token,
                }))
            except Exception as e:
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "error", "error": str(e),
                }))
            finally:
                with self.lock:
                    self.active_count -= 1

    def _scaling_loop(self):
        """Periodically adjust worker count."""
        while self.running:
            time.sleep(10)

            queue_depth = self.redis.llen(self.queue_key)
            current = len(self.workers)
            utilization = (
                self.active_count / current * 100 if current > 0 else 0
            )

            # Scale up: queue growing and workers busy
            if queue_depth > 20 and utilization > 70:
                new_count = min(current + 2, self.max_workers)
                while len(self.workers) < new_count:
                    self._add_worker()
                print(f"Scaled up: {current} → {len(self.workers)} workers")

            # Scale down: queue empty and workers idle
            elif queue_depth < 5 and utilization < 20:
                target = max(current - 1, self.min_workers)
                while len(self.workers) > target:
                    self._remove_worker()
                if len(self.workers) < current:
                    print(f"Scaled down: {current} → {len(self.workers)} workers")

    def _solve(self, method, params, timeout=120):
        data = {"key": self.api_key, "method": method, "json": 1}
        data.update(params)

        resp = requests.post(
            f"{self.base}/in.php", data=data, timeout=30,
        )
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])

        raise TimeoutError("Solve timeout")

    def stats(self):
        return {
            "workers": len(self.workers),
            "active": self.active_count,
            "queue": self.redis.llen(self.queue_key),
        }


# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()

# Monitor
while True:
    print(pool.stats())
    time.sleep(30)
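
The pool only consumes tasks. Something else has to push them onto the `captcha:tasks` list and read answers from the `captcha:results` hash. A minimal producer sketch follows. The helper names `enqueue_task` and `fetch_result` are hypothetical, and `client` is assumed to be a redis-py client like the one the pool creates with `redis.from_url`.

```python
import json
import uuid

QUEUE_KEY = "captcha:tasks"      # same keys the pool uses
RESULTS_KEY = "captcha:results"


def enqueue_task(client, method, params):
    """Push one task onto the queue and return its generated id."""
    task_id = uuid.uuid4().hex
    payload = json.dumps({"id": task_id, "method": method, "params": params})
    # RPUSH pairs with the worker's BLPOP, so tasks are handled first-in, first-out.
    client.rpush(QUEUE_KEY, payload)
    return task_id


def fetch_result(client, task_id):
    """Return the result dict for a task, or None if it is not ready yet."""
    raw = client.hget(RESULTS_KEY, task_id)
    if raw is None:
        return None
    return json.loads(raw)
```

The `method` and `params` values should be whatever the CaptchaAI API expects for the CAPTCHA type being solved. Results stay in the hash until they are deleted, so a long-running producer would likely call `client.hdel(RESULTS_KEY, task_id)` after reading one.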

Process-Based Auto-Scaler

Scale worker processes for CPU isolation:

import multiprocessing
import time
import redis
import os


class ProcessScaler:
    """Scale worker processes based on queue depth."""

    def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
        self.worker_fn = worker_fn
        self.redis = redis.from_url(redis_url)
        self.processes = []
        self.min_workers = 2
        self.max_workers = 16

    def run(self, check_interval=15):
        """Run the scaler loop."""
        # Start minimum workers
        for _ in range(self.min_workers):
            self._spawn()

        while True:
            time.sleep(check_interval)
            self._cleanup_dead()

            queue_depth = self.redis.llen("captcha:tasks")
            current = len(self.processes)

            # Scale up
            if queue_depth > current * 5 and current < self.max_workers:
                to_add = min(
                    max(1, queue_depth // 10),
                    self.max_workers - current,
                )
                for _ in range(to_add):
                    self._spawn()
                print(f"Scaled up to {len(self.processes)} workers")

            # Scale down
            elif queue_depth < 3 and current > self.min_workers:
                to_remove = min(2, current - self.min_workers)
                for _ in range(to_remove):
                    p = self.processes.pop()
                    p.terminate()
                    p.join(timeout=5)  # Reap the child so it does not linger as a zombie
                print(f"Scaled down to {len(self.processes)} workers")

    def _spawn(self):
        p = multiprocessing.Process(target=self.worker_fn)
        p.start()
        self.processes.append(p)

    def _cleanup_dead(self):
        self.processes = [p for p in self.processes if p.is_alive()]
        # Ensure minimum
        while len(self.processes) < self.min_workers:
            self._spawn()
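
`ProcessScaler` expects a `worker_fn`, and scale-down stops workers with `terminate()`, which sends SIGTERM on POSIX systems. A worker killed mid-solve loses its task, so one option is to catch the signal and exit once the current task is finished. The sketch below assumes that approach. `StopFlag`, `run_worker`, `fetch_task`, and `handle_task` are illustrative names, not part of the article's code.

```python
import signal


class StopFlag:
    """Flips to True when the process receives SIGTERM."""

    def __init__(self):
        self.stop = False

    def install(self):
        # Call once inside the worker process, from its main thread.
        signal.signal(signal.SIGTERM, self._on_term)

    def _on_term(self, signum, frame):
        self.stop = True


def run_worker(fetch_task, handle_task, flag):
    """Process tasks until the flag is set; return how many were handled.

    fetch_task() returns a task, or None when the queue is idle.
    handle_task(task) solves the CAPTCHA and stores the result.
    """
    handled = 0
    while not flag.stop:
        task = fetch_task()
        if task is None:
            continue  # idle poll timed out; re-check the stop flag
        handle_task(task)
        handled += 1
    return handled
```

A `worker_fn` built on this would create a `StopFlag`, call `install()`, and pass a `fetch_task` that wraps `blpop` with a short timeout so the flag is checked regularly.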

Balance-Aware Scaling

Halt scale-up when funds run low:

import requests


def check_balance(api_key, min_balance=2.0):
    """Check if balance is sufficient for scaling."""
    resp = requests.get("https://ocr.captchaai.com/res.php", params={
        "key": api_key,
        "action": "getbalance",
        "json": 1,
    }, timeout=15)
    balance = float(resp.json()["request"])

    if balance < min_balance:
        print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
        return False
    return True

Integrate it into the scaling loop:

# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
    if check_balance(self.api_key, min_balance=2.0):
        # Scale up
        ...
    else:
        print("Scaling paused — low balance")
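
Because the scaling loop wakes up every 10 seconds, calling the balance endpoint on each pass adds avoidable API traffic while the queue stays deep. One option, sketched here as an assumption and not something the original loop does, is to cache the last balance for a short time. `BalanceGate`, its `ttl` parameter, and the injected `fetch_balance` callable are hypothetical names.

```python
import time


class BalanceGate:
    """Cache a balance lookup and answer whether scale-up is allowed."""

    def __init__(self, fetch_balance, min_balance=2.0, ttl=60.0,
                 clock=time.monotonic):
        self.fetch_balance = fetch_balance  # callable returning the balance as a float
        self.min_balance = min_balance
        self.ttl = ttl                      # seconds a cached value stays valid
        self.clock = clock                  # injectable for testing
        self._cached = None
        self._fetched_at = None

    def allows_scale_up(self):
        now = self.clock()
        stale = self._fetched_at is None or now - self._fetched_at >= self.ttl
        if stale:
            # Refresh only when the cached value has expired.
            self._cached = self.fetch_balance()
            self._fetched_at = now
        return self._cached >= self.min_balance
```

The trade-off is that the pool can keep scaling for up to `ttl` seconds after the balance drops below the floor, so the value should stay small relative to how fast funds are spent.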

Scaling Strategies Compared

Strategy       | Best For                   | Latency | Complexity
Thread pool    | I/O-bound work (API calls) | Low     | Low
Process pool   | CPU-bound preprocessing    | Medium  | Medium
Kubernetes HPA | Cloud-native deployments   | Higher  | High
KEDA           | Event-driven scaling       | Medium  | Medium
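
For comparison with the hand-rolled loops above, the Kubernetes Horizontal Pod Autoscaler documents its core rule as desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric). The sketch below applies that rule to queue depth per worker. The function name, the target of 5 tasks per worker, and the clamp values are assumptions chosen to match numbers used in this article, and the sketch leaves out HPA's tolerance band and stabilization windows.

```python
import math


def desired_replicas(current_replicas, queue_depth, target_per_worker=5,
                     min_replicas=2, max_replicas=16):
    """Apply the HPA proportional rule to a queue-depth-per-worker metric."""
    if current_replicas <= 0:
        return min_replicas
    current_metric = queue_depth / current_replicas  # tasks per worker right now
    desired = math.ceil(current_replicas * current_metric / target_per_worker)
    return max(min_replicas, min(desired, max_replicas))


print(desired_replicas(4, 60))   # 60 tasks at 5 per worker -> 12
print(desired_replicas(4, 3))    # clamps to the minimum -> 2
print(desired_replicas(4, 500))  # clamps to the maximum -> 16
```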

Troubleshooting

Problem                   | Cause                    | Solution
Workers keep scaling up   | Queue never drains       | Check that workers are actually processing tasks
Scale-down too aggressive | Thresholds too low       | Increase the scale-down delay to 30+ seconds
Zombie processes          | Processes not cleaned up | Call _cleanup_dead() regularly
Balance drains quickly    | Too many workers         | Add a balance check to the scaling logic

Frequently Asked Questions

What is the right worker-to-queue ratio?

Target 1 worker for every 5-10 queued tasks. Each worker processes roughly 3-6 CAPTCHAs per minute, depending on the CAPTCHA type.
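
As a worked example of that ratio, the sketch below sizes a pool for a given backlog and estimates how long the backlog would take to drain. The function names are illustrative. The defaults of 5 tasks per worker and 3 solves per minute are the lower ends of the ranges quoted above, and the estimate ignores tasks that arrive while the queue is draining.

```python
import math


def workers_for_backlog(queued, tasks_per_worker=5, min_workers=2, max_workers=20):
    """Workers needed at one worker per `tasks_per_worker` queued tasks."""
    needed = math.ceil(queued / tasks_per_worker)
    return max(min_workers, min(needed, max_workers))


def drain_minutes(queued, workers, solves_per_minute=3):
    """Rough time to clear the backlog, ignoring newly arriving tasks."""
    return queued / (workers * solves_per_minute)


workers = workers_for_backlog(60)
print(workers)                     # 12
print(drain_minutes(60, workers))  # about 1.67 minutes
```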

Should I use threads or processes?

Use threads for pure API calls (CaptchaAI is I/O-bound). Use processes when you also do image preprocessing or other heavy computation alongside solving.

How fast should I scale?

Scale up quickly (act on every 10-15 second check) and scale down slowly (wait for 30-60 seconds of sustained low load). This prevents flapping between states.
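
One way to get that asymmetry is to let scale-up act immediately but require low load to persist before any worker is removed. The sketch below assumes a 30 second hold. `ScaleDownDelay` and its injected `clock` are hypothetical names, not part of the pool classes above.

```python
import time


class ScaleDownDelay:
    """Allow scale-down only after load has stayed low for `hold_seconds`."""

    def __init__(self, hold_seconds=30.0, clock=time.monotonic):
        self.hold_seconds = hold_seconds
        self.clock = clock       # injectable for testing
        self._low_since = None   # when the current low-load streak began

    def should_scale_down(self, load_is_low):
        if not load_is_low:
            self._low_since = None  # any busy check resets the streak
            return False
        now = self.clock()
        if self._low_since is None:
            self._low_since = now
        return now - self._low_since >= self.hold_seconds
```

In `_scaling_loop`, the scale-down branch could then be guarded with `delay.should_scale_down(queue_depth < 5 and utilization < 20)`, so a single quiet check never removes a worker.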


Related Guides

  • Building a Python CAPTCHA Solving Queue
  • CaptchaAI Monitoring with Datadog

Scale smart: get your CaptchaAI key today.
