When you run scraping or automation for multiple clients, every project eventually hits a CAPTCHA. Rather than writing one-off solving code for each project, build a reusable pipeline. This guide walks through the architecture.
Pipeline Architecture
┌──────────────┐    ┌───────────────┐    ┌──────────────┐
│ Client A     │──▶ │               │    │              │
│ Client B     │──▶ │  Task Queue   │──▶ │  CaptchaAI   │
│ Client C     │──▶ │               │    │  API         │
└──────────────┘    └───────────────┘    └──────────────┘
                            │                    │
                            ▼                    ▼
                    ┌───────────────┐    ┌──────────────┐
                    │ Result Store  │◀── │  Polling     │
                    │ (Redis/DB)    │    │  Workers     │
                    └───────────────┘    └──────────────┘
Components:
- Task intake - accepts solve requests from client scrapers
- Queue - buffers tasks and enforces per-client concurrency limits
- Solver workers - submit tasks to CaptchaAI and poll for results
- Result store - holds solved tokens until the consumer retrieves them
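The queue's per-client concurrency limit can be sketched as follows. This is a minimal in-memory illustration under stated assumptions, separate from the pipeline class shown later: `ClientLimitedQueue`, `next_task`, and `done` are hypothetical names, and the limits are example values.

```python
from collections import deque, defaultdict
from typing import Optional


class ClientLimitedQueue:
    """FIFO queue that skips tasks whose client is already at its limit."""

    def __init__(self, limits: dict, default_limit: int = 5):
        self.limits = limits                # client_id -> max tasks in flight
        self.default_limit = default_limit  # used for clients without an entry
        self.pending = deque()              # (client_id, task) in arrival order
        self.in_flight = defaultdict(int)   # client_id -> tasks currently running

    def put(self, client_id: str, task: dict) -> None:
        self.pending.append((client_id, task))

    def next_task(self) -> Optional[tuple]:
        """Return the oldest task whose client has a free slot, or None."""
        for index, (client_id, task) in enumerate(self.pending):
            limit = self.limits.get(client_id, self.default_limit)
            if self.in_flight[client_id] < limit:
                del self.pending[index]
                self.in_flight[client_id] += 1
                return client_id, task
        return None

    def done(self, client_id: str) -> None:
        """Release the slot once a task finishes (solved or failed)."""
        self.in_flight[client_id] -= 1


queue = ClientLimitedQueue(limits={"client_a": 1, "client_b": 2})
queue.put("client_a", {"n": 1})
queue.put("client_a", {"n": 2})
queue.put("client_b", {"n": 3})

first = queue.next_task()   # client_a takes its only slot
second = queue.next_task()  # client_a is full, so client_b's task is picked
third = queue.next_task()   # nothing eligible until client_a calls done()
```

Scanning past blocked tasks keeps one busy client from stalling the others, at the cost of a linear scan per dequeue.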
Python Pipeline
Core solver class
import requests
import time
from dataclasses import dataclass
from typing import Callable, Optional
from collections import deque
from threading import Lock

SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"


@dataclass
class SolveRequest:
    client_id: str
    method: str
    params: dict
    # Called with a SolveResult once the task is solved
    callback: Optional[Callable] = None


@dataclass
class SolveResult:
    client_id: str
    task_id: str
    token: Optional[str] = None
    error: Optional[str] = None
class CaptchaPipeline:
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.queue = deque()
        self.active = {}    # task_id -> SolveRequest
        self.lock = Lock()  # guards queue and active

    def enqueue(self, request: SolveRequest):
        with self.lock:
            self.queue.append(request)

    def submit_task(self, request: SolveRequest) -> Optional[str]:
        """Submit a task; return the CaptchaAI task ID, or None on failure."""
        data = {
            "key": self.api_key,
            "method": request.method,
            "json": 1,
            **request.params,
        }
        try:
            resp = requests.post(SUBMIT_URL, data=data, timeout=15)
            result = resp.json()
        except (requests.RequestException, ValueError) as e:
            print(f"[{request.client_id}] Network error: {e}")
            return None
        if result.get("status") == 1:
            return result["request"]
        print(f"[{request.client_id}] Submit error: {result.get('error_text', result.get('request'))}")
        return None

    def poll_result(self, task_id: str, max_wait: int = 120) -> Optional[str]:
        """Return the token, or None if the task is not ready within max_wait.

        Raises RuntimeError when the API reports an error for the task.
        """
        elapsed = 0
        interval = 5
        while elapsed < max_wait:
            time.sleep(interval)
            elapsed += interval
            try:
                resp = requests.get(RESULT_URL, params={
                    "key": self.api_key,
                    "action": "get",
                    "id": task_id,
                    "json": 1,
                }, timeout=10)
                result = resp.json()
            except (requests.RequestException, ValueError):
                continue  # transient network problem; poll again
            if result.get("status") == 1:
                return result["request"]
            if result.get("request") != "CAPCHA_NOT_READY":
                raise RuntimeError(result.get("error_text", result.get("request")))
        return None

    def process_queue(self):
        while self.queue or self.active:
            # Fill free slots. Pop under the lock, but submit outside it so
            # enqueue() is never blocked by a network call.
            with self.lock:
                free = self.max_concurrent - len(self.active)
                batch = [self.queue.popleft() for _ in range(min(free, len(self.queue)))]
            for request in batch:
                task_id = self.submit_task(request)
                if task_id:
                    with self.lock:
                        self.active[task_id] = request

            # Poll active tasks
            finished = []
            for task_id, request in list(self.active.items()):
                try:
                    token = self.poll_result(task_id, max_wait=10)
                except RuntimeError as e:
                    # Drop failed tasks; otherwise this loop would never end.
                    print(f"[{request.client_id}] Task {task_id} failed: {e}")
                    finished.append(task_id)
                    continue
                if token:
                    result = SolveResult(
                        client_id=request.client_id,
                        task_id=task_id,
                        token=token,
                    )
                    if request.callback:
                        request.callback(result)
                    finished.append(task_id)

            with self.lock:
                for task_id in finished:
                    del self.active[task_id]
Multi-client usage
pipeline = CaptchaPipeline(api_key="YOUR_API_KEY", max_concurrent=15)

# Client A: reCAPTCHA v2
pipeline.enqueue(SolveRequest(
    client_id="client_a",
    method="userrecaptcha",
    params={
        "googlekey": "6Le-SITEKEY-A",
        "pageurl": "https://client-a-staging.example.com/qa-form",
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}..."),
))

# Client B: Turnstile
pipeline.enqueue(SolveRequest(
    client_id="client_b",
    method="turnstile",
    params={
        "sitekey": "0x4AAAA-SITEKEY-B",
        "pageurl": "https://client-b-target.com/login",
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}..."),
))

pipeline.process_queue()
Node.js Pipeline
const axios = require("axios");

const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";

class CaptchaPipeline {
  constructor(apiKey, maxConcurrent = 10) {
    this.apiKey = apiKey;
    this.maxConcurrent = maxConcurrent;
    this.queue = [];
    this.activeCount = 0;
  }

  // Returns a promise that settles when the task is solved or fails.
  enqueue(clientId, method, params) {
    return new Promise((resolve, reject) => {
      this.queue.push({ clientId, method, params, resolve, reject });
      this._processNext();
    });
  }

  async _processNext() {
    if (this.activeCount >= this.maxConcurrent || this.queue.length === 0) return;
    this.activeCount++;
    const task = this.queue.shift();
    try {
      const token = await this._solve(task);
      task.resolve({ clientId: task.clientId, token });
    } catch (err) {
      task.reject(err);
    } finally {
      // Free the slot and pick up the next queued task.
      this.activeCount--;
      this._processNext();
    }
  }

  async _solve(task) {
    const submitResp = await axios.post(SUBMIT_URL, null, {
      params: {
        key: this.apiKey,
        method: task.method,
        json: 1,
        ...task.params,
      },
      timeout: 15000,
    });
    if (submitResp.data.status !== 1) {
      throw new Error(submitResp.data.error_text || submitResp.data.request);
    }
    const taskId = submitResp.data.request;
    return this._poll(taskId);
  }

  async _poll(taskId, maxWait = 120000) {
    const interval = 5000;
    let elapsed = 0;
    while (elapsed < maxWait) {
      await new Promise((r) => setTimeout(r, interval));
      elapsed += interval;
      let data;
      try {
        const resp = await axios.get(RESULT_URL, {
          params: {
            key: this.apiKey,
            action: "get",
            id: taskId,
            json: 1,
          },
          timeout: 10000,
        });
        data = resp.data;
      } catch (err) {
        // Rethrow HTTP errors; keep polling on network errors (no response).
        if (err.response) throw err;
        continue;
      }
      // Checked outside the try block so API errors are not swallowed.
      if (data.status === 1) return data.request;
      if (data.request !== "CAPCHA_NOT_READY") {
        throw new Error(data.error_text || data.request);
      }
    }
    throw new Error(`Timeout waiting for task ${taskId}`);
  }
}
// Usage
(async () => {
  const pipeline = new CaptchaPipeline("YOUR_API_KEY", 15);
  const results = await Promise.allSettled([
    pipeline.enqueue("client_a", "userrecaptcha", {
      googlekey: "6Le-SITEKEY-A",
      pageurl: "https://client-a-staging.example.com/qa-form",
    }),
    pipeline.enqueue("client_b", "turnstile", {
      sitekey: "0x4AAAA-SITEKEY-B",
      pageurl: "https://client-b-target.com/login",
    }),
  ]);
  results.forEach((r) => {
    if (r.status === "fulfilled") {
      console.log(`[${r.value.clientId}] Token: ${r.value.token.slice(0, 40)}...`);
    } else {
      console.error(`Failed: ${r.reason.message}`);
    }
  });
})();
Per-client configuration
Track per-client settings such as proxy, solver preference, and rate limits:
CLIENT_CONFIG = {
    "client_a": {
        "proxy": "host:port:user:pass",
        "proxytype": "HTTP",
        "max_concurrent": 5,
        "default_method": "userrecaptcha",
    },
    "client_b": {
        "proxy": None,
        "proxytype": None,
        "max_concurrent": 10,
        "default_method": "turnstile",
    },
}


def build_params(client_id, params):
    """Return a copy of params with the client's proxy settings added."""
    config = CLIENT_CONFIG.get(client_id, {})
    merged = dict(params)  # copy so the caller's dict is not mutated
    if config.get("proxy"):
        merged["proxy"] = config["proxy"]
        merged["proxytype"] = config["proxytype"]
    return merged
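One way to wire this configuration into the pipeline is to apply the client's settings just before enqueueing. The sketch below repeats a trimmed `CLIENT_CONFIG` so it runs on its own. `make_request_params` is a hypothetical helper, and falling back to `default_method` when the caller passes no method is an assumption about how that field is meant to be used.

```python
CLIENT_CONFIG = {
    "client_a": {"proxy": "host:port:user:pass", "proxytype": "HTTP",
                 "default_method": "userrecaptcha"},
    "client_b": {"proxy": None, "proxytype": None,
                 "default_method": "turnstile"},
}


def make_request_params(client_id, params, method=None):
    """Return (method, params) with the client's defaults applied."""
    config = CLIENT_CONFIG.get(client_id, {})
    merged = dict(params)  # copy so the caller's dict is not mutated
    if config.get("proxy"):
        merged["proxy"] = config["proxy"]
        merged["proxytype"] = config["proxytype"]
    # Assumption: use the client's default method when none is given.
    chosen = method or config.get("default_method")
    if chosen is None:
        raise ValueError(f"no method configured for {client_id}")
    return chosen, merged


method, params = make_request_params(
    "client_a",
    {"googlekey": "6Le-SITEKEY-A",
     "pageurl": "https://client-a-staging.example.com/qa-form"},
)
```

The returned pair can be passed straight to `SolveRequest(client_id=..., method=method, params=params)`.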
Error-handling strategy
| Error | Response |
|---|---|
| ERROR_ZERO_BALANCE | Halt the queue and alert all clients |
| ERROR_NO_SLOT_AVAILABLE | Requeue the task with a delay |
| ERROR_WRONG_CAPTCHA_ID | Discard the task and log the error |
| ERROR_CAPTCHA_UNSOLVABLE | Retry once, then fail |
| Network timeout | Retry with backoff (max 3 retries) |
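The table above could be turned into a small decision function. This is a sketch under stated assumptions: `decide_action` and `backoff_delay` are hypothetical helpers, `"NETWORK_TIMEOUT"` is an illustrative label for network failures rather than an API error code, and the backoff base and cap are example values.

```python
import random

MAX_NETWORK_RETRIES = 3  # from the table: at most 3 retries on network timeouts


def decide_action(error_code: str, attempts: int) -> str:
    """Map an error to 'halt', 'requeue', 'discard', 'retry', or 'fail'.

    `attempts` is the number of tries already made for this task.
    """
    if error_code == "ERROR_ZERO_BALANCE":
        return "halt"       # stop the queue and alert all clients
    if error_code == "ERROR_NO_SLOT_AVAILABLE":
        return "requeue"    # put the task back with a delay
    if error_code == "ERROR_WRONG_CAPTCHA_ID":
        return "discard"    # log and drop; retrying cannot help
    if error_code == "ERROR_CAPTCHA_UNSOLVABLE":
        return "retry" if attempts < 2 else "fail"  # one retry only
    if error_code == "NETWORK_TIMEOUT":  # hypothetical label, see lead-in
        return "retry" if attempts <= MAX_NETWORK_RETRIES else "fail"
    return "fail"           # unknown errors are not retried in this sketch


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter, capped at `cap` seconds."""
    return random.uniform(0, min(cap, base * 2 ** attempt))
```

A worker would call `decide_action` with the error string from the API response and, for `"retry"` or `"requeue"`, sleep for `backoff_delay(attempts)` before trying again.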
Troubleshooting
| Problem | Cause | Fix |
|---|---|---|
| Queue grows without bound | Active slots are full | Increase max_concurrent or add workers |
| Callbacks never fire | Tasks fail silently | Check the error returns in the polling loop |
| Tokens get mixed up between clients | Shared result store | Key results by client_id + task_id |
| Rate limit errors (429) | Too many concurrent submissions | Lower concurrency and add a delay between submissions |
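Keying results by client_id + task_id, as the table suggests, might look like the sketch below. It uses an in-memory dict as a stand-in for Redis or a database. `ResultStore` is a hypothetical class, and the 120-second expiry is an assumed value chosen for illustration.

```python
import time
from typing import Optional


class ResultStore:
    """In-memory stand-in for Redis or a database table."""

    def __init__(self, ttl_seconds: float = 120.0):
        self.ttl_seconds = ttl_seconds  # assumed token lifetime
        self._items = {}  # (client_id, task_id) -> (token, stored_at)

    def put(self, client_id: str, task_id: str, token: str) -> None:
        self._items[(client_id, task_id)] = (token, time.monotonic())

    def take(self, client_id: str, task_id: str) -> Optional[str]:
        """Return and remove the token; None if missing or expired."""
        entry = self._items.pop((client_id, task_id), None)
        if entry is None:
            return None
        token, stored_at = entry
        if time.monotonic() - stored_at > self.ttl_seconds:
            return None
        return token


store = ResultStore()
store.put("client_a", "123", "token-for-a")
store.put("client_b", "123", "token-for-b")  # same task ID, different client
token_b = store.take("client_b", "123")
token_a = store.take("client_a", "123")
```

Because the key is the pair, two clients can never read each other's tokens even if task IDs collide.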
Frequently Asked Questions
How many concurrent tasks should I run per client?
Start with 5-10. Monitor solve times and error rates, then adjust. CaptchaAI supports high concurrency, but your proxy pool may become the bottleneck.
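To monitor solve times and error rates per client, a small tracker is enough to start with. This is a minimal sketch; `ClientStats` and its method names are hypothetical, and it deliberately stops short of suggesting thresholds, since those depend on your traffic.

```python
from collections import defaultdict
from statistics import mean


class ClientStats:
    def __init__(self):
        self.solve_times = defaultdict(list)  # client_id -> seconds per solved task
        self.failures = defaultdict(int)      # client_id -> failed task count

    def record(self, client_id, seconds=None, ok=True):
        """Record one finished task: its solve time if ok, else a failure."""
        if ok:
            self.solve_times[client_id].append(seconds)
        else:
            self.failures[client_id] += 1

    def summary(self, client_id):
        solved = self.solve_times[client_id]
        total = len(solved) + self.failures[client_id]
        return {
            "avg_solve_seconds": mean(solved) if solved else None,
            "error_rate": self.failures[client_id] / total if total else 0.0,
        }


stats = ClientStats()
stats.record("client_a", seconds=10.0)
stats.record("client_a", seconds=20.0)
stats.record("client_a", ok=False)
stats.record("client_a", ok=False)
report = stats.summary("client_a")
```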
Should I use a separate API key for each client?
Separate keys simplify billing. If you need per-client tracking with a single key, use CaptchaAI's soft_id parameter.
How do I handle queues that run overnight?
Persist the queue (in Redis or a database). On restart, reload the pending tasks and resume processing.
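A persisted queue with reload-on-restart could be sketched as below. SQLite from the Python standard library stands in here for Redis or a full database, purely so the example runs without extra services. The table layout and the function names (`open_queue`, `save_task`, `mark_done`, `load_pending`) are hypothetical.

```python
import json
import sqlite3


def open_queue(path: str) -> sqlite3.Connection:
    """Open (or create) the task table. Use a file path in real use."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS tasks ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "client_id TEXT NOT NULL, method TEXT NOT NULL, "
        "params TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending')"
    )
    conn.commit()
    return conn


def save_task(conn, client_id, method, params) -> int:
    cur = conn.execute(
        "INSERT INTO tasks (client_id, method, params) VALUES (?, ?, ?)",
        (client_id, method, json.dumps(params)),
    )
    conn.commit()
    return cur.lastrowid


def mark_done(conn, row_id) -> None:
    conn.execute("UPDATE tasks SET status = 'done' WHERE id = ?", (row_id,))
    conn.commit()


def load_pending(conn) -> list:
    """Return unfinished tasks in insertion order, e.g. after a restart."""
    rows = conn.execute(
        "SELECT id, client_id, method, params FROM tasks "
        "WHERE status = 'pending' ORDER BY id"
    ).fetchall()
    return [(row_id, client_id, method, json.loads(params))
            for row_id, client_id, method, params in rows]


conn = open_queue(":memory:")  # in-memory only for this demo
first_id = save_task(conn, "client_a", "userrecaptcha", {"googlekey": "6Le-SITEKEY-A"})
second_id = save_task(conn, "client_b", "turnstile", {"sitekey": "0x4AAAA-SITEKEY-B"})
mark_done(conn, first_id)
pending = load_pending(conn)
```

On startup, each row from `load_pending` can be turned back into a solve request and enqueued before new work is accepted.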
Build Your CAPTCHA Pipeline with CaptchaAI
Start building your client pipeline at captchaai.com.