# Source code for twikit._captcha.capsolver

from __future__ import annotations

import json
from time import sleep

import httpx

from .base import CaptchaSolver


class Capsolver(CaptchaSolver):
    """
    Captcha solver backed by the Capsolver HTTP API.

    You can automatically unlock the account by passing the `captcha_solver`
    argument when initialising the :class:`.Client`.

    First, visit https://capsolver.com and obtain your Capsolver API key.
    Next, pass the Capsolver instance to the client as shown in the example.

    .. code-block:: python

        from twikit.twikit_async import Capsolver, Client
        solver = Capsolver(
            api_key='your_api_key',
            max_attempts=10
        )
        client = Client(captcha_solver=solver)

    Parameters
    ----------
    api_key : :class:`str`
        Capsolver API key.
    max_attempts : :class:`int`, default=3
        The maximum number of attempts to solve the captcha.
    get_result_interval : :class:`float`, default=1.0
        Number of seconds to sleep before each poll of the task result.
    use_blob_data : :class:`bool`, default=False
        When true, the challenge blob and the client's user agent are
        included in the task submitted to Capsolver.
    """

    def __init__(
        self,
        api_key: str,
        max_attempts: int = 3,
        get_result_interval: float = 1.0,
        use_blob_data: bool = False
    ) -> None:
        self.api_key = api_key
        self.get_result_interval = get_result_interval
        # Only stored here; no method of this class reads it.
        # NOTE(review): presumably consumed by the client that drives the
        # unlock retries -- confirm against the caller.
        self.max_attempts = max_attempts
        self.use_blob_data = use_blob_data

    def create_task(self, task_data: dict) -> dict:
        """
        Submit a new task to Capsolver.

        Parameters
        ----------
        task_data : :class:`dict`
            The task description, sent as the ``task`` field of the request.

        Returns
        -------
        :class:`dict`
            The decoded JSON body of the ``createTask`` response.
        """
        data = {
            'clientKey': self.api_key,
            'task': task_data
        }
        response = httpx.post(
            'https://api.capsolver.com/createTask',
            json=data,
            headers={'content-type': 'application/json'}
        ).json()
        return response

    def get_task_result(self, task_id: str) -> dict:
        """
        Fetch the current state of a previously created task.

        Parameters
        ----------
        task_id : :class:`str`
            The identifier of the task to query.

        Returns
        -------
        :class:`dict`
            The decoded JSON body of the ``getTaskResult`` response.
        """
        data = {
            'clientKey': self.api_key,
            'taskId': task_id
        }
        response = httpx.post(
            'https://api.capsolver.com/getTaskResult',
            json=data,
            headers={'content-type': 'application/json'}
        ).json()
        return response

    def solve_funcaptcha(self, blob: str) -> dict:
        """
        Create a FunCaptcha task and poll until it is finished.

        The method sleeps ``get_result_interval`` seconds before every poll
        and keeps polling, without an upper bound, until the reported status
        is ``'ready'`` or ``'failed'``.

        Parameters
        ----------
        blob : :class:`str`
            The challenge blob. It is only sent when ``use_blob_data`` is
            true.

        Returns
        -------
        :class:`dict`
            The ``getTaskResult`` response whose ``status`` is ``'ready'``
            or ``'failed'``.

        Raises
        ------
        KeyError
            If the ``createTask`` response has no ``taskId`` or a
            ``getTaskResult`` response has no ``status``.
        """
        # NOTE(review): `self.client` and `self.CAPTCHA_SITE_KEY` are not
        # defined in this class; presumably provided by `CaptchaSolver` or
        # attached by the client -- confirm.
        if self.client.proxy is None:
            captcha_type = 'FunCaptchaTaskProxyLess'
        else:
            captcha_type = 'FunCaptchaTask'

        task_data = {
            'type': captcha_type,
            'websiteURL': 'https://iframe.arkoselabs.com',
            'websitePublicKey': self.CAPTCHA_SITE_KEY,
            'funcaptchaApiJSSubdomain': 'https://client-api.arkoselabs.com',
            'proxy': self.client.proxy
        }
        if self.use_blob_data:
            # Serialise with json.dumps instead of string interpolation so a
            # blob containing quotes, backslashes or control characters still
            # yields valid JSON. Compact separators and ensure_ascii=False
            # keep the output identical to the old format for blobs that
            # need no escaping.
            task_data['data'] = json.dumps(
                {'blob': blob},
                separators=(',', ':'),
                ensure_ascii=False
            )
            task_data['userAgent'] = self.client._user_agent

        task = self.create_task(task_data)
        while True:
            sleep(self.get_result_interval)
            result = self.get_task_result(task['taskId'])
            if result['status'] in ('ready', 'failed'):
                return result