Building a Client CAPTCHA Pipeline with CaptchaAI

When you manage scraping or automation for many clients, every project eventually runs into CAPTCHAs. Instead of writing one-off solving code for each project, build a reusable pipeline. This guide walks through the architecture.


Pipeline Architecture

┌──────────────┐    ┌───────────────┐    ┌──────────────┐
│  Client A    │──▶ │               │    │              │
│  Client B    │──▶ │  Task Queue   │──▶ │  CaptchaAI   │
│  Client C    │──▶ │               │    │  API         │
└──────────────┘    └───────────────┘    └──────────────┘
                           │                    │
                           ▼                    ▼
                    ┌───────────────┐    ┌──────────────┐
                    │  Result Store │◀── │  Polling     │
                    │  (Redis/DB)   │    │  Workers     │
                    └───────────────┘    └──────────────┘

Components:

  1. Task intake - receives solve requests from client scrapers
  2. Queue - buffers tasks and enforces per-client concurrency limits
  3. Solver workers - submit tasks to CaptchaAI and poll for results
  4. Result store - holds solved tokens until consumers retrieve them

Python Pipeline

Core solver class

import requests
import time
from dataclasses import dataclass
from typing import Callable, Optional
from collections import deque
from threading import Lock

SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"

@dataclass
class SolveRequest:
    client_id: str
    method: str
    params: dict
    # Called with the SolveResult once the task has been solved
    callback: Optional[Callable[["SolveResult"], None]] = None

@dataclass
class SolveResult:
    client_id: str
    task_id: str
    token: Optional[str] = None
    error: Optional[str] = None


class CaptchaPipeline:
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.queue = deque()
        self.active = {}
        self.lock = Lock()

    def enqueue(self, request: SolveRequest):
        with self.lock:
            self.queue.append(request)

    def submit_task(self, request: SolveRequest) -> Optional[str]:
        data = {
            "key": self.api_key,
            "method": request.method,
            "json": 1,
            **request.params
        }

        try:
            resp = requests.post(SUBMIT_URL, data=data, timeout=15)
            result = resp.json()

            if result.get("status") == 1:
                return result["request"]
            else:
                print(f"[{request.client_id}] Submit error: {result.get('error_text', result.get('request'))}")
                return None
        except requests.RequestException as e:
            print(f"[{request.client_id}] Network error: {e}")
            return None

    def poll_result(self, task_id: str, max_wait: int = 120) -> Optional[str]:
        elapsed = 0
        interval = 5
        while elapsed < max_wait:
            time.sleep(interval)
            elapsed += interval

            try:
                resp = requests.get(RESULT_URL, params={
                    "key": self.api_key,
                    "action": "get",
                    "id": task_id,
                    "json": 1
                }, timeout=10)
                result = resp.json()

                if result.get("status") == 1:
                    return result["request"]
                elif result.get("request") == "CAPCHA_NOT_READY":
                    continue
                else:
                    print(f"Poll error for {task_id}: {result.get('error_text', result.get('request'))}")
                    return None
            except requests.RequestException:
                continue

        return None

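    def process_queue(self, task_timeout: int = 120):
        deadlines = {}  # task_id -> time.monotonic() value after which we give up
        while self.queue or self.active:
            # Fill active slots. Pop under the lock, but submit outside it so
            # enqueue() is never blocked on network I/O.
            while True:
                with self.lock:
                    if not self.queue or len(self.active) >= self.max_concurrent:
                        break
                    request = self.queue.popleft()
                # A failed submit is logged by submit_task and the request is dropped.
                task_id = self.submit_task(request)
                if task_id:
                    with self.lock:
                        self.active[task_id] = request
                    deadlines[task_id] = time.monotonic() + task_timeout

            # Poll active tasks
            completed = []
            for task_id, request in list(self.active.items()):
                token = self.poll_result(task_id, max_wait=10)
                if token:
                    result = SolveResult(
                        client_id=request.client_id,
                        task_id=task_id,
                        token=token
                    )
                    if request.callback:
                        request.callback(result)
                    completed.append(task_id)
                elif time.monotonic() >= deadlines[task_id]:
                    # poll_result returns None for "not ready" and for hard errors
                    # alike, so drop the task at its deadline instead of looping forever.
                    print(f"[{request.client_id}] Giving up on task {task_id}")
                    completed.append(task_id)

            with self.lock:
                for task_id in completed:
                    del self.active[task_id]
                    deadlines.pop(task_id, None)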

Multi-client usage

pipeline = CaptchaPipeline(api_key="YOUR_API_KEY", max_concurrent=15)

# Client A — reCAPTCHA v2
pipeline.enqueue(SolveRequest(
    client_id="client_a",
    method="userrecaptcha",
    params={
        "googlekey": "6Le-SITEKEY-A",
        "pageurl": "https://client-a-staging.example.com/qa-form"
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))

# Client B — Turnstile
pipeline.enqueue(SolveRequest(
    client_id="client_b",
    method="turnstile",
    params={
        "sitekey": "0x4AAAA-SITEKEY-B",
        "pageurl": "https://client-b-target.com/login"
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))

pipeline.process_queue()

Node.js Pipeline

const axios = require("axios");

const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";

class CaptchaPipeline {
  constructor(apiKey, maxConcurrent = 10) {
    this.apiKey = apiKey;
    this.maxConcurrent = maxConcurrent;
    this.queue = [];
    this.activeCount = 0;
  }

  enqueue(clientId, method, params) {
    return new Promise((resolve, reject) => {
      this.queue.push({ clientId, method, params, resolve, reject });
      this._processNext();
    });
  }

  async _processNext() {
    if (this.activeCount >= this.maxConcurrent || this.queue.length === 0) return;

    this.activeCount++;
    const task = this.queue.shift();

    try {
      const token = await this._solve(task);
      task.resolve({ clientId: task.clientId, token });
    } catch (err) {
      task.reject(err);
    } finally {
      this.activeCount--;
      this._processNext();
    }
  }

  async _solve(task) {
    const submitResp = await axios.post(SUBMIT_URL, null, {
      params: {
        key: this.apiKey,
        method: task.method,
        json: 1,
        ...task.params,
      },
      timeout: 15000,
    });

    if (submitResp.data.status !== 1) {
      throw new Error(submitResp.data.error_text || submitResp.data.request);
    }

    const taskId = submitResp.data.request;
    return this._poll(taskId);
  }

  async _poll(taskId, maxWait = 120000) {
    const interval = 5000;
    let elapsed = 0;

    while (elapsed < maxWait) {
      await new Promise((r) => setTimeout(r, interval));
      elapsed += interval;

      let data;
      try {
        const resp = await axios.get(RESULT_URL, {
          params: {
            key: this.apiKey,
            action: "get",
            id: taskId,
            json: 1,
          },
          timeout: 10000,
        });
        data = resp.data;
      } catch (err) {
        // Rethrow HTTP error responses; retry on network errors and timeouts
        if (err.response) throw err;
        continue;
      }

      // Checked outside the try block so API errors are not swallowed by the catch
      if (data.status === 1) return data.request;
      if (data.request !== "CAPCHA_NOT_READY") {
        throw new Error(data.error_text || data.request);
      }
    }

    throw new Error(`Timeout waiting for task ${taskId}`);
  }
}

// Usage
(async () => {
  const pipeline = new CaptchaPipeline("YOUR_API_KEY", 15);

  const results = await Promise.allSettled([
    pipeline.enqueue("client_a", "userrecaptcha", {
      googlekey: "6Le-SITEKEY-A",
      pageurl: "https://client-a-staging.example.com/qa-form",
    }),
    pipeline.enqueue("client_b", "turnstile", {
      sitekey: "0x4AAAA-SITEKEY-B",
      pageurl: "https://client-b-target.com/login",
    }),
  ]);

  results.forEach((r) => {
    if (r.status === "fulfilled") {
      console.log(`[${r.value.clientId}] Token: ${r.value.token.slice(0, 40)}...`);
    } else {
      console.error(`Failed: ${r.reason.message}`);
    }
  });
})();

Per-client configuration

Track per-client settings such as proxies, solver preferences, and rate limits:

CLIENT_CONFIG = {
    "client_a": {
        "proxy": "host:port:user:pass",
        "proxytype": "HTTP",
        "max_concurrent": 5,
        "default_method": "userrecaptcha"
    },
    "client_b": {
        "proxy": None,
        "proxytype": None,
        "max_concurrent": 10,
        "default_method": "turnstile"
    }
}

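def build_params(client_id, params):
    """Return a copy of params with the client's proxy settings added, if any."""
    config = CLIENT_CONFIG.get(client_id, {})
    merged = dict(params)  # copy so the caller's dict is not mutated
    if config.get("proxy"):
        merged["proxy"] = config["proxy"]
        merged["proxytype"] = config["proxytype"]
    return merged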
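
The max_concurrent values in CLIENT_CONFIG are not enforced by the pipeline shown earlier, which only applies one global limit. One possible way to apply them is a small per-client slot counter that is checked before a task is submitted. The sketch below is an illustration, not part of CaptchaAI; ClientLimiter, try_acquire, and release are hypothetical names.

```python
from collections import defaultdict
from threading import Lock


class ClientLimiter:
    """Tracks in-flight tasks per client and enforces a per-client cap."""

    def __init__(self, limits: dict, default_limit: int = 5):
        self.limits = limits                # client_id -> max concurrent tasks
        self.default_limit = default_limit  # assumed cap for unknown clients
        self.in_flight = defaultdict(int)
        self.lock = Lock()

    def try_acquire(self, client_id: str) -> bool:
        """Reserve a slot for the client; return False if it is at its cap."""
        with self.lock:
            limit = self.limits.get(client_id, self.default_limit)
            if self.in_flight[client_id] >= limit:
                return False
            self.in_flight[client_id] += 1
            return True

    def release(self, client_id: str) -> None:
        """Free a slot once the task has finished or failed."""
        with self.lock:
            if self.in_flight[client_id] > 0:
                self.in_flight[client_id] -= 1


limiter = ClientLimiter({"client_a": 2, "client_b": 10})
```

A queue loop could call try_acquire before submitting and move the request to the back of the queue when it returns False, then call release when the task completes.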

Error-handling strategy

Error                     Response
ERROR_ZERO_BALANCE        Stop the queue and alert all clients
ERROR_NO_SLOT_AVAILABLE   Requeue the task after a delay
ERROR_WRONG_CAPTCHA_ID    Discard the task and log the error
ERROR_CAPTCHA_UNSOLVABLE  Retry once, then fail
Network timeout           Retry with backoff (max 3 retries)
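
The table above can be expressed as a lookup that turns an error code into an action. This is a sketch under assumptions: the Action names, the decide and backoff_delay functions, the NETWORK_TIMEOUT label for local network failures, and the backoff constants are all illustrative. Only the ERROR_* codes come from the table.

```python
from enum import Enum


class Action(Enum):
    HALT = "halt"        # stop the queue and alert clients
    REQUEUE = "requeue"  # put the task back after a delay
    DISCARD = "discard"  # drop the task and log it
    RETRY = "retry"      # try the task again
    FAIL = "fail"        # give up and report the failure


# error code -> (action, retry cap); None means no cap applies
POLICY = {
    "ERROR_ZERO_BALANCE": (Action.HALT, None),
    "ERROR_NO_SLOT_AVAILABLE": (Action.REQUEUE, None),
    "ERROR_WRONG_CAPTCHA_ID": (Action.DISCARD, None),
    "ERROR_CAPTCHA_UNSOLVABLE": (Action.RETRY, 1),
    "NETWORK_TIMEOUT": (Action.RETRY, 3),  # hypothetical label, not an API code
}


def decide(error_code: str, attempts: int) -> Action:
    """Return the action for an error, given the retries already made."""
    action, max_retries = POLICY.get(error_code, (Action.FAIL, None))
    if action is Action.RETRY and attempts >= max_retries:
        return Action.FAIL
    return action


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 30.0) -> float:
    """Exponential backoff in seconds: base, 2*base, 4*base, ... up to cap."""
    return min(cap, base * (2 ** attempt))
```

Unknown error codes fall through to FAIL here, which is a conservative default rather than anything the API prescribes.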

Troubleshooting

Problem                          Cause                            Fix
Queue grows without bound        Active slots are full            Raise max_concurrent or add workers
Callback never fires             Task failed silently             Check the error returns in the polling loop
Tokens mixed up between clients  Shared result store              Key results by client_id + task_id
Rate-limit errors (429)          Too many concurrent submissions  Lower concurrency and add a delay between submissions
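
For the mixed-token case, keying results by client_id plus task_id could look like the in-memory sketch below. ResultStore and its methods are hypothetical names. A Redis-backed version might use a string key built from the same two values, but that detail is an assumption and is not shown here.

```python
from threading import Lock
from typing import Optional


class ResultStore:
    """In-memory result store keyed by (client_id, task_id)."""

    def __init__(self):
        self._results = {}
        self._lock = Lock()

    def put(self, client_id: str, task_id: str, token: str) -> None:
        """Save a solved token under the owning client and task."""
        with self._lock:
            self._results[(client_id, task_id)] = token

    def pop(self, client_id: str, task_id: str) -> Optional[str]:
        """Return and remove the token, or None if this client has no such task."""
        with self._lock:
            return self._results.pop((client_id, task_id), None)


store = ResultStore()
store.put("client_a", "1001", "token-for-a")
store.put("client_b", "1002", "token-for-b")
```

Because the client ID is part of the key, a consumer that asks for another client's task ID gets nothing back instead of the wrong token.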

FAQ

How many concurrent tasks should I run per client?

Start with 5–10. Monitor solve times and error rates, then adjust. CaptchaAI supports high concurrency, but your proxy pool may become the bottleneck.
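
To have numbers to adjust against, solve times and error rates can be tracked per client. The following is a minimal sketch. SolveStats, record, and summary are hypothetical names, and the caller is assumed to measure the duration of each solve itself.

```python
from collections import defaultdict
from statistics import mean


class SolveStats:
    """Collects per-client solve durations and failure counts."""

    def __init__(self):
        self.durations = defaultdict(list)  # client_id -> seconds per successful solve
        self.failures = defaultdict(int)    # client_id -> number of failed tasks

    def record(self, client_id: str, seconds: float, ok: bool) -> None:
        """Record one finished task; seconds is ignored for failures."""
        if ok:
            self.durations[client_id].append(seconds)
        else:
            self.failures[client_id] += 1

    def summary(self, client_id: str) -> dict:
        """Return the average solve time and error rate for one client."""
        solved = len(self.durations[client_id])
        failed = self.failures[client_id]
        total = solved + failed
        return {
            "avg_seconds": mean(self.durations[client_id]) if solved else None,
            "error_rate": failed / total if total else 0.0,
        }
```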

Should I use a separate API key for each client?

Separate keys simplify billing. If you need per-client tracking with a single key, use CaptchaAI's soft_id parameter.

How do I handle queues that run overnight?

Persist the queue (Redis or a database). On restart, reload the pending tasks and resume processing.
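
As an illustration of persisting the queue, the sketch below stores pending requests in SQLite using Python's standard sqlite3 module. SQLite is chosen only to keep the example self-contained. The PersistentQueue class, the table layout, and the file name are assumptions, and the same idea applies to Redis or another database.

```python
import json
import sqlite3


class PersistentQueue:
    """Stores pending solve requests in SQLite so they survive a restart."""

    def __init__(self, path: str = "captcha_queue.db"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS tasks ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "client_id TEXT NOT NULL, "
            "method TEXT NOT NULL, "
            "params TEXT NOT NULL, "
            "status TEXT NOT NULL DEFAULT 'pending')"
        )
        self.conn.commit()

    def add(self, client_id: str, method: str, params: dict) -> int:
        """Insert a pending task and return its row ID."""
        cur = self.conn.execute(
            "INSERT INTO tasks (client_id, method, params) VALUES (?, ?, ?)",
            (client_id, method, json.dumps(params)),
        )
        self.conn.commit()
        return cur.lastrowid

    def mark_done(self, row_id: int) -> None:
        """Mark a task as finished so it is not reloaded."""
        self.conn.execute("UPDATE tasks SET status = 'done' WHERE id = ?", (row_id,))
        self.conn.commit()

    def load_pending(self) -> list:
        """Return pending tasks in insertion order, for example after a restart."""
        rows = self.conn.execute(
            "SELECT id, client_id, method, params FROM tasks "
            "WHERE status = 'pending' ORDER BY id"
        ).fetchall()
        return [(r[0], r[1], r[2], json.loads(r[3])) for r in rows]
```

On startup, each tuple returned by load_pending could be turned back into a solve request and enqueued, with mark_done called once its token has been delivered.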


Build Your CAPTCHA Pipeline with CaptchaAI

Start building your client pipeline at captchaai.com.
