commit 25403009f2514f82e85b91e68933c994e94c0142 Author: SamNet-dev Date: Thu Feb 19 18:12:57 2026 -0600 Initial release — cfray v1.0 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f111d4f --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__/ +*.pyc +results/ +debug.log +*.bak diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..959aafb --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 SamNet Technologies + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..074744d --- /dev/null +++ b/README.md @@ -0,0 +1,462 @@ +
+ +# ⚡ cfray + +### Cloudflare Config Scanner & Clean IP Finder + +**Test VLESS/VMess proxy configs for latency & speed + Scan all ~1.5M Cloudflare IPs to find clean, reachable edges** + +[![Python 3.8+](https://img.shields.io/badge/Python-3.8+-blue.svg)](https://python.org) +[![Zero Dependencies](https://img.shields.io/badge/Dependencies-Zero-green.svg)](#) +[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE) + +[English](#-english) • [فارسی](#-فارسی) + +--- + +
+ +## 🇬🇧 English + +### 📖 What is cfray? + +cfray is a single-file Python tool that finds the **fastest Cloudflare edge IPs** for your VLESS/VMess proxy configs. It tests latency (ping) and download speed across hundreds of IPs, ranks them by score, and exports the best configs — ready to use. It also includes a **Clean IP Finder** that scans all ~1.5M Cloudflare IPv4 addresses (from 14 published subnets) to discover reachable edge IPs. Mega mode tests each IP on 2 ports for ~3M total probes. + +**Zero dependencies.** Just Python 3.8+ and one file. + +--- + +### 🚀 Quick Start + +```bash +# Download +git clone https://github.com/SamNet-dev/cfray.git +cd cfray + +# Run interactive TUI +python3 scanner.py + +# That's it! Pick your input, choose a mode, and watch the results. +``` + +--- + +### 📥 Input Methods + +cfray supports **5 ways** to load your configs: + +#### 1️⃣ Config File `[1-9]` +A `.txt` file with one VLESS or VMess URI per line: +``` +vless://uuid@domain1.ir:443?type=ws&host=sni.com&path=/dl&security=tls#config-1 +vless://uuid@domain2.ir:443?type=ws&host=sni.com&path=/dl&security=tls#config-2 +vmess://eyJ2IjoiMiIsImFkZCI6... +``` +Drop your file in the same folder as `scanner.py` and it shows up automatically. + +#### 2️⃣ Subscription URL `[S]` +Paste a remote URL that serves VLESS/VMess configs: +```bash +# Interactive +python3 scanner.py +# Press S, paste URL + +# CLI +python3 scanner.py --sub https://example.com/sub.txt +``` +Supports both **plain text** (one URI per line) and **base64-encoded** subscriptions. + +#### 3️⃣ Template + Address List `[T]` +Have ONE working config but want to test it against many Cloudflare IPs? This is for you. + +**How it works:** +1. You give a VLESS/VMess config as a **template** +2. You give a `.txt` file with **Cloudflare IPs or domains** (one per line) +3. cfray creates a config for **each address** by replacing the IP in your template +4. Tests them all and finds the fastest + +```bash +# Interactive +python3 scanner.py +# Press T, paste your config, enter path to address list + +# CLI +python3 scanner.py --template 'vless://uuid@placeholder:443?type=ws&...' -i addresses.txt +``` + +**Example address list** (`addresses.txt`): +``` +104.21.12.206 +188.114.96.7 +172.67.132.102 +172.67.166.192 +``` + +#### 4️⃣ Domain JSON File +A JSON file with domain + IP data: +```json +{"data": [ + {"domain": "example.ir", "ipv4": "104.21.x.x"}, + {"domain": "other.ir", "ipv4": "172.67.x.x"} +]} +``` + +#### 5️⃣ Clean IP Finder `[F]` +Don't have any configs or IPs? cfray can **scan all Cloudflare IP ranges** to find clean, reachable edge IPs — then use them directly with a template for speed testing. + +**How it works:** +1. Generates IPs from all 14 Cloudflare IPv4 subnets (~1.5M IPs) +2. Probes each IP with TLS handshake + Cloudflare header validation +3. Returns all responding IPs sorted by latency +4. Save the results or feed them into Template mode for a full speed test + +**Scan modes:** + +| Mode | IPs Tested | Ports | Est. Time | Description | +|-----------|-------------|-------------|-------------|------------------------------------------| +| Quick | ~4,000 | 443 | ~30 sec | 1 random IP per /24 block | +| Normal | ~12,000 | 443 | ~2 min | 3 IPs per /24 + CF verify (recommended) | +| Full | ~1,500,000 | 443 | 20+ min | All IPs + CF verify | +| Mega | ~3,000,000 | 443 + 8443 | 30-60 min | All IPs on 2 ports for maximum coverage | + +Cloudflare publishes [14 IPv4 subnets](https://www.cloudflare.com/ips-v4/) totaling **~1,511,808 unique IPs**. 
Full and Mega modes scan all of them. **Mega mode** tests every IP on both port 443 and 8443 (Cloudflare's alternate HTTPS port), doubling the probes to **~3M** (1.5M IPs × 2 ports). This is useful when some IPs are blocked on one port but reachable on another. Results include the port (e.g., `104.16.5.20:8443`). + +```bash +# Interactive +python3 scanner.py +# Press F, choose scan mode + +# CLI +python3 scanner.py --find-clean --no-tui --clean-mode mega + +# With custom subnets +python3 scanner.py --find-clean --no-tui --subnets "104.16.0.0/13,172.64.0.0/13" +``` + +Found IPs are saved to `results/clean_ips.txt` (full absolute path shown). You can then use them with Template mode to speed test a config against all discovered IPs. + +--- + +### 🔬 How the Scan Works + +``` +Step 1: 🔍 DNS Resolution + Resolve all domains to their Cloudflare edge IPs + Group configs by IP (many domains share the same CF edge) + +Step 2: 📡 Latency Test + TCP connect + TLS handshake to each unique IP + Mark alive/dead, measure ping & connection time + +Step 3: 📊 Speed Test (progressive rounds) + R1: Small file (1-5MB) → test all alive IPs + R2: Medium file (5-25MB) → test top candidates + R3: Large file (20-50MB) → test the best ones + (For <50 IPs, ALL are tested in every round) + +Step 4: 🏆 Scoring & Export + Score = Latency (35%) + Speed (50%) + TTFB (15%) + Export top configs ranked by score +``` + +--- + +### ⚙️ Scan Modes + +| Mode | Rounds | Est. Data | Est. Time | Best For | +|----------------|----------------------|-----------|------------|------------------------| +| ⚡ Quick | 1MB → 5MB | ~200 MB | ~2-3 min | Fast check | +| 🔄 Normal | 1MB → 5MB → 20MB | ~850 MB | ~5-10 min | Balanced (recommended) | +| 🔬 Thorough | 5MB → 25MB → 50MB | ~5-10 GB | ~20-45 min | Maximum accuracy | + +--- + +### 🖥️ Dashboard Controls + +After the scan, you get an interactive dashboard: + +| Key | Action | +|-------|--------------------------------------------| +| `S` | 🔄 Cycle sort: score → latency → speed | +| `C` | 📋 View all VLESS/VMess URIs for an IP | +| `D` | 🌐 View domains for an IP | +| `E` | 💾 Export results (CSV + top N configs) | +| `A` | 📦 Export ALL configs sorted best → worst | +| `J/K` | ⬆️⬇️ Scroll up/down | +| `N/P` | 📄 Page up/down | +| `B` | ◀️ Back to main menu (new scan) | +| `H` | ❓ Help | +| `Q` | 🚪 Quit | + +--- + +### 🔧 CLI Options + +```bash +python3 scanner.py [options] +``` + +| Option | Description | Default | +|---------------------|------------------------------------------|----------| +| `-i, --input` | Input file (VLESS URIs or .json) | — | +| `--sub` | Subscription URL | — | +| `--template` | VLESS/VMess template URI (use with `-i`) | — | +| `-m, --mode` | `quick` / `normal` / `thorough` | `normal` | +| `--rounds` | Custom rounds, e.g. 
`"1MB:200,5MB:50"` | auto | +| `-w, --workers` | Latency test workers | 300 | +| `--speed-workers` | Download test workers | 10 | +| `--timeout` | Latency timeout (seconds) | 5 | +| `--speed-timeout` | Download timeout (seconds) | 30 | +| `--skip-download` | Latency only, no speed test | off | +| `--top` | Export top N configs (0 = all) | 50 | +| `--no-tui` | Headless mode (plain text output) | off | +| `-o, --output` | CSV output path | auto | +| `--output-configs` | Config file output path | auto | +| `--find-clean` | Find clean Cloudflare IPs | off | +| `--clean-mode` | `quick` / `normal` / `full` / `mega` | `normal` | +| `--subnets` | Custom subnets (file or comma-separated) | all CF | + +--- + +### 💡 Examples + +```bash +# Interactive TUI — easiest way +python3 scanner.py + +# Quick scan with subscription +python3 scanner.py --sub https://example.com/sub.txt --mode quick + +# Template: test one config against 500 IPs +python3 scanner.py --template 'vless://uuid@x:443?type=ws&host=sni.com&security=tls#test' -i ips.txt + +# Headless mode for scripts/cron +python3 scanner.py -i configs.txt --no-tui --mode normal -o results.csv + +# Latency only (no download test) +python3 scanner.py -i configs.txt --skip-download + +# Custom rounds +python3 scanner.py -i configs.txt --rounds "2MB:100,10MB:30,50MB:10" + +# Find clean Cloudflare IPs (interactive) +python3 scanner.py # Press F + +# Find clean IPs (headless, mega mode — ~3M probes) +python3 scanner.py --find-clean --no-tui --clean-mode mega +``` + +--- + +### 📁 Output Files + +Results are saved to the `results/` folder: + +| File | Contents | +|----------------------|-----------------------------------------| +| `*_results.csv` | Full CSV with all metrics | +| `*_top50.txt` | Top 50 VLESS/VMess URIs (ready to use) | +| `*_full_sorted.txt` | ALL configs sorted best → worst | +| `clean_ips.txt` | Clean Cloudflare IPs from IP finder | + +--- + +### 🛡️ Rate Limiting & CDN Fallback + +cfray is smart about Cloudflare's speed test limits: +- Tracks request budget (550 requests per 10-minute window) +- When rate-limited (429), automatically switches to **CDN mirror** (`cloudflaremirrors.com`) +- When CF blocks large downloads (403), retries through CDN +- Shows countdown timer when waiting for rate limit reset + +--- + +## 🇮🇷 فارسی + +### 📖 cfray چیه؟ + +cfray یه ابزار پایتونه که **سریع‌ترین آی‌پی‌های کلادفلر** رو برای کانفیگ‌های VLESS/VMess پیدا می‌کنه. پینگ و سرعت دانلود رو تست می‌کنه، بهترین‌ها رو امتیاز میده و خروجی آماده استفاده میده. همچنین شامل **جستجوگر آی‌پی تمیز** هست که تمام ~۱.۵ میلیون آی‌پی IPv4 کلادفلر (از ۱۴ زیرشبکه) رو اسکن می‌کنه. حالت Mega هر آی‌پی رو روی ۲ پورت تست می‌کنه (~۳ میلیون پروب). + +**بدون نیاز به نصب چیز اضافه.** فقط Python 3.8+ و یه فایل. + +--- + +### 🚀 شروع سریع + +```bash +# دانلود +git clone https://github.com/SamNet-dev/cfray.git +cd cfray + +# اجرا +python3 scanner.py +``` + +--- + +### 📥 روش‌های ورودی + +cfray **۵ روش** برای بارگذاری کانفیگ‌ها داره: + +#### 1️⃣ فایل کانفیگ `[1-9]` +یه فایل `.txt` که هر خط یه آدرس VLESS یا VMess داره: +``` +vless://uuid@domain1.ir:443?type=ws&host=sni.com&path=/dl&security=tls#config-1 +vmess://eyJ2IjoiMiIsImFkZCI6... +``` +فایلتون رو کنار `scanner.py` بذارید، خودش پیداش می‌کنه. + +#### 2️⃣ لینک اشتراک (Subscription) `[S]` +یه لینک بدید که توش کانفیگ‌های VLESS/VMess هست: +```bash +python3 scanner.py --sub https://example.com/sub.txt +``` +هم **متن ساده** (هر خط یه URI) و هم **base64** رو ساپورت می‌کنه. 
+ +#### 3️⃣ قالب + لیست آدرس (Template) `[T]` +یه کانفیگ دارید ولی می‌خواید با کلی آی‌پی کلادفلر تستش کنید؟ این روش مال شماست! + +**چطوری کار می‌کنه:** +1. یه کانفیگ VLESS/VMess به عنوان **قالب** میدید +2. یه فایل `.txt` با **آی‌پی‌ها یا دامنه‌های کلادفلر** میدید (هر خط یکی) +3. cfray به تعداد آدرس‌ها **کانفیگ می‌سازه** — آدرس توی قالب رو با هر آی‌پی عوض می‌کنه +4. همه رو تست می‌کنه و سریع‌ترین رو پیدا می‌کنه + +```bash +# تعاملی +python3 scanner.py +# T رو بزن، کانفیگتو پیست کن، مسیر فایل آدرس‌ها رو بده + +# خط فرمان +python3 scanner.py --template 'vless://uuid@x:443?type=ws&...' -i addresses.txt +``` + +**مثال فایل آدرس** (`addresses.txt`): +``` +104.21.12.206 +188.114.96.7 +172.67.132.102 +``` + +#### 4️⃣ فایل JSON دامنه‌ها +```json +{"data": [ + {"domain": "example.ir", "ipv4": "104.21.x.x"}, + {"domain": "other.ir", "ipv4": "172.67.x.x"} +]} +``` + +#### 5️⃣ پیدا کردن آی‌پی تمیز کلادفلر `[F]` +کانفیگ یا آی‌پی ندارید؟ cfray می‌تونه **تمام رنج آی‌پی‌های کلادفلر** رو اسکن کنه و آی‌پی‌های تمیز و قابل دسترس رو پیدا کنه — بعد مستقیم با حالت Template تست سرعت کنید. + +**چطوری کار می‌کنه:** +1. آی‌پی‌ها رو از ۱۴ زیرشبکه IPv4 کلادفلر تولید می‌کنه (~۱.۵ میلیون آی‌پی) +2. هر آی‌پی رو با TLS handshake + بررسی هدر کلادفلر تست می‌کنه +3. آی‌پی‌های جواب‌دهنده رو بر اساس پینگ مرتب برمی‌گردونه +4. نتایج رو ذخیره کنید یا با حالت Template برای تست سرعت استفاده کنید + +**حالت‌های اسکن:** + +| حالت | تعداد آی‌پی | پورت‌ها | زمان تقریبی | توضیحات | +|-----------|---------------|-------------|--------------|-------------------------------------------| +| Quick | ~۴,۰۰۰ | 443 | ~۳۰ ثانیه | ۱ آی‌پی تصادفی از هر بلاک /24 | +| Normal | ~۱۲,۰۰۰ | 443 | ~۲ دقیقه | ۳ آی‌پی از هر بلاک + تایید CF (پیشنهادی) | +| Full | ~۱,۵۰۰,۰۰۰ | 443 | ۲۰+ دقیقه | همه آی‌پی‌ها + تایید CF | +| Mega | ~۳,۰۰۰,۰۰۰ | 443 + 8443 | ۳۰-۶۰ دقیقه | همه آی‌پی‌ها روی ۲ پورت برای حداکثر پوشش | + +کلادفلر [۱۴ زیرشبکه IPv4](https://www.cloudflare.com/ips-v4/) منتشر کرده که مجموعاً **~۱,۵۱۱,۸۰۸ آی‌پی یکتا** هستن. حالت‌های Full و Mega همه رو اسکن می‌کنن. **حالت Mega** هر آی‌پی رو روی پورت 443 و 8443 (پورت جایگزین HTTPS کلادفلر) تست می‌کنه و تعداد پروب‌ها رو به **~۳ میلیون** می‌رسونه (۱.۵ میلیون آی‌پی × ۲ پورت). وقتی بعضی آی‌پی‌ها روی یه پورت مسدود هستن ولی روی پورت دیگه کار می‌کنن، این حالت خیلی مفیده. + +```bash +# تعاملی +python3 scanner.py +# F رو بزن، حالت اسکن رو انتخاب کن + +# خط فرمان +python3 scanner.py --find-clean --no-tui --clean-mode mega +``` + +آی‌پی‌های پیدا شده توی `results/clean_ips.txt` ذخیره میشن. بعد می‌تونید با حالت Template تست سرعت کنید. 
+ +--- + +### 🔬 اسکن چطوری کار می‌کنه؟ + +``` +مرحله ۱: 🔍 تبدیل دامنه به آی‌پی (DNS) + هر دامنه رو به آی‌پی کلادفلرش تبدیل می‌کنه + کانفیگ‌هایی که آی‌پی مشترک دارن رو گروه می‌کنه + +مرحله ۲: 📡 تست پینگ (Latency) + اتصال TCP + TLS به هر آی‌پی + زنده/مرده مشخص میشه، پینگ اندازه‌گیری میشه + +مرحله ۳: 📊 تست سرعت (دانلود مرحله‌ای) + R1: فایل کوچک (1-5MB) → همه آی‌پی‌ها + R2: فایل متوسط (5-25MB) → بهترین‌ها + R3: فایل بزرگ (20-50MB) → برترین‌ها + (اگه کمتر از ۵۰ آی‌پی باشه، همه توی هر مرحله تست میشن) + +مرحله ۴: 🏆 امتیازدهی و خروجی + امتیاز = پینگ (۳۵%) + سرعت (۵۰%) + TTFB (۱۵%) + بهترین کانفیگ‌ها رتبه‌بندی و ذخیره میشن +``` + +--- + +### ⚙️ حالت‌های اسکن + +| حالت | مراحل | حجم تقریبی | زمان تقریبی | مناسب برای | +|---------------------|----------------------|------------|--------------|---------------------| +| ⚡ سریع (Quick) | 1MB → 5MB | ~200 MB | ~2-3 دقیقه | بررسی سریع | +| 🔄 معمولی (Normal) | 1MB → 5MB → 20MB | ~850 MB | ~5-10 دقیقه | متعادل (پیشنهادی) | +| 🔬 دقیق (Thorough) | 5MB → 25MB → 50MB | ~5-10 GB | ~20-45 دقیقه | حداکثر دقت | + +--- + +### 🖥️ کلیدهای داشبورد + +بعد از اتمام اسکن، یه داشبورد تعاملی دارید: + +| کلید | عملکرد | +|-------|--------------------------------------------------| +| `S` | 🔄 تغییر مرتب‌سازی: امتیاز → پینگ → سرعت | +| `C` | 📋 نمایش کانفیگ‌های VLESS/VMess یه آی‌پی | +| `D` | 🌐 نمایش دامنه‌های یه آی‌پی | +| `E` | 💾 خروجی گرفتن (CSV + بهترین N تا) | +| `A` | 📦 خروجی همه کانفیگ‌ها (مرتب شده) | +| `J/K` | ⬆️⬇️ بالا/پایین | +| `N/P` | 📄 صفحه بعد/قبل | +| `B` | ◀️ برگشت به منو (اسکن جدید) | +| `H` | ❓ راهنما | +| `Q` | 🚪 خروج | + +--- + +### 📁 فایل‌های خروجی + +نتایج توی پوشه `results/` ذخیره میشن: + +| فایل | محتوا | +|----------------------|-----------------------------------------| +| `*_results.csv` | فایل CSV با تمام اطلاعات | +| `*_top50.txt` | ۵۰ تا بهترین کانفیگ (آماده استفاده) | +| `*_full_sorted.txt` | همه کانفیگ‌ها مرتب شده | +| `clean_ips.txt` | آی‌پی‌های تمیز کلادفلر از IP Finder | + +--- + +### 🛡️ مدیریت محدودیت کلادفلر + +cfray هوشمندانه با محدودیت‌های سرعت‌سنجی کلادفلر کار می‌کنه: +- بودجه درخواست‌ها رو پیگیری می‌کنه (۵۵۰ درخواست در هر ۱۰ دقیقه) +- وقتی محدود بشه (429)، خودکار به **سرور CDN** سوئیچ می‌کنه +- وقتی فایل بزرگ رد بشه (403)، از طریق CDN دوباره امتحان می‌کنه +- تایمر شمارش معکوس نشون میده + +--- + +
+ +### ⭐ Made by Sam — SamNet Technologies + +
diff --git a/scanner.py b/scanner.py new file mode 100644 index 0000000..500b455 --- /dev/null +++ b/scanner.py @@ -0,0 +1,3104 @@ +#!/usr/bin/env python3 +# +# ┌─────────────────────────────────────────────────────────────────┐ +# │ │ +# │ ⚡ CF CONFIG SCANNER v1.0 │ +# │ │ +# │ Test VLESS/VMess proxy configs for latency + download speed │ +# │ │ +# │ • Latency test (TCP + TLS) all IPs in seconds │ +# │ • Download speed test via progressive funnel │ +# │ • Live TUI dashboard with real-time results │ +# │ • Smart rate limiting with CDN fallback │ +# │ • Clean IP Finder — scan all Cloudflare ranges (up to 3M) │ +# │ • Multi-port scanning (443, 8443) for maximum coverage │ +# │ • Zero dependencies — Python 3.8+ stdlib only │ +# │ │ +# │ GitHub: https://github.com/SamNet-dev/cfray │ +# │ │ +# └─────────────────────────────────────────────────────────────────┘ +# +# Usage: +# python3 scanner.py Interactive TUI +# python3 scanner.py -i configs.txt Normal mode +# python3 scanner.py --sub https://example.com/sub Fetch from subscription +# python3 scanner.py --template "vless://..." -i addrs.json Generate + test +# python3 scanner.py --find-clean --no-tui --clean-mode mega Clean IP scan +# + +import asyncio +import argparse +import base64 +import csv +import glob as globmod +import ipaddress +import json +import os +import random +import re +import signal +import socket +import ssl +import statistics +import sys +import time +import urllib.parse +import urllib.request +from collections import defaultdict +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple + + +VERSION = "1.0" +SPEED_HOST = "speed.cloudflare.com" +SPEED_PATH = "/__down" +DEBUG_LOG = os.path.join("results", "debug.log") +LOG_MAX_BYTES = 5 * 1024 * 1024 + +LATENCY_WORKERS = 50 +SPEED_WORKERS = 10 +LATENCY_TIMEOUT = 5.0 +SPEED_TIMEOUT = 30.0 + +CDN_FALLBACK = ("cloudflaremirrors.com", "/archlinux/iso/latest/archlinux-x86_64.iso") + +# Cloudflare published IPv4 ranges (https://www.cloudflare.com/ips-v4/) +CF_SUBNETS = [ + "173.245.48.0/20", "103.21.244.0/22", "103.22.200.0/22", + "103.31.4.0/22", "141.101.64.0/18", "108.162.192.0/18", + "190.93.240.0/20", "188.114.96.0/20", "197.234.240.0/22", + "198.41.128.0/17", "162.158.0.0/15", "104.16.0.0/13", + "104.24.0.0/14", "172.64.0.0/13", +] + +CF_HTTPS_PORTS = [443, 8443, 2053, 2083, 2087, 2096] + +CLEAN_MODES = { + "quick": {"label": "Quick", "sample": 1, "workers": 500, "validate": False, + "ports": [443], "desc": "1 random IP per /24 (~4K IPs, ~30s)"}, + "normal": {"label": "Normal", "sample": 3, "workers": 500, "validate": True, + "ports": [443], "desc": "3 IPs per /24 + CF verify (~12K IPs, ~2 min)"}, + "full": {"label": "Full", "sample": 0, "workers": 1000, "validate": True, + "ports": [443], "desc": "All IPs + CF verify (~1.5M IPs, 20+ min)"}, + "mega": {"label": "Mega", "sample": 0, "workers": 1500, "validate": True, + "ports": [443, 8443], "desc": "All IPs × 2 ports (~3M probes, 30-60 min)"}, +} + +PRESETS = { + "quick": { + "label": "Quick", + "desc": "Latency sort -> 1MB top 100 -> 5MB top 20", + "dynamic": True, + "latency_cut": 50, + "round_sizes": [1_000_000, 5_000_000], + "round_pcts": [100, 20], + "round_min": [50, 10], + "round_max": [100, 20], + "data": "~200 MB", + "time": "~2-3 min", + }, + "normal": { + "label": "Normal", + "desc": "Latency sort -> 1MB top 200 -> 5MB top 50 -> 20MB top 20", + "dynamic": True, + "latency_cut": 40, + "round_sizes": [1_000_000, 5_000_000, 20_000_000], + "round_pcts": [100, 25, 10], + "round_min": [50, 
20, 10], + "round_max": [200, 50, 20], + "data": "~850 MB", + "time": "~5-10 min", + }, + "thorough": { + "label": "Thorough", + "desc": "Deep funnel: 5MB / 25MB / 50MB", + "dynamic": True, + "latency_cut": 15, + "round_sizes": [5_000_000, 25_000_000, 50_000_000], + "round_pcts": [100, 25, 10], + "round_min": [0, 30, 15], + "round_max": [0, 150, 50], + "data": "~5-10 GB", + "time": "~20-45 min", + }, +} + + +class A: + RST = "\033[0m" + BOLD = "\033[1m" + DIM = "\033[2m" + ITAL = "\033[3m" + ULINE = "\033[4m" + RED = "\033[31m" + GRN = "\033[32m" + YEL = "\033[33m" + BLU = "\033[34m" + MAG = "\033[35m" + CYN = "\033[36m" + WHT = "\033[97m" + BGBL = "\033[44m" + BGDG = "\033[100m" + HOME = "\033[H" + CLR = "\033[H\033[J" + EL = "\033[2K" + HIDE = "\033[?25l" + SHOW = "\033[?25h" + + +_ansi_re = re.compile(r"\033\[[^m]*m") + + +def _dbg(msg: str): + """Append a debug line to results/debug.log with rotation.""" + try: + os.makedirs("results", exist_ok=True) + if os.path.exists(DEBUG_LOG): + try: + sz = os.path.getsize(DEBUG_LOG) + if sz > LOG_MAX_BYTES: + bak = DEBUG_LOG + ".1" + if os.path.exists(bak): + os.remove(bak) + os.rename(DEBUG_LOG, bak) + except Exception: + pass + with open(DEBUG_LOG, "a", encoding="utf-8") as f: + f.write(f"{time.strftime('%H:%M:%S')} {msg}\n") + except Exception: + pass + + +def _char_width(c: str) -> int: + """Return terminal column width of a single character (1 or 2).""" + o = ord(c) + # Common wide ranges: CJK, emojis, dingbats, symbols, etc. + if ( + 0x1100 <= o <= 0x115F # Hangul Jamo + or 0x2329 <= o <= 0x232A # angle brackets + or 0x2E80 <= o <= 0x303E # CJK radicals / ideographic + or 0x3040 <= o <= 0x33BF # Hiragana / Katakana / CJK compat + or 0x3400 <= o <= 0x4DBF # CJK Unified Extension A + or 0x4E00 <= o <= 0xA4CF # CJK Unified / Yi + or 0xA960 <= o <= 0xA97C # Hangul Jamo Extended-A + or 0xAC00 <= o <= 0xD7A3 # Hangul Syllables + or 0xF900 <= o <= 0xFAFF # CJK Compatibility Ideographs + or 0xFE10 <= o <= 0xFE6F # CJK compat forms / small forms + or 0xFF01 <= o <= 0xFF60 # Fullwidth forms + or 0xFFE0 <= o <= 0xFFE6 # Fullwidth signs + or 0x1F000 <= o <= 0x1FAFF # Mahjong, Domino, Playing Cards, Emojis, Symbols + or 0x20000 <= o <= 0x2FA1F # CJK Unified Extension B-F + or 0x2600 <= o <= 0x27BF # Misc symbols, Dingbats + or 0x2700 <= o <= 0x27BF # Dingbats + or 0xFE00 <= o <= 0xFE0F # Variation selectors (zero-width but paired with emoji) + or 0x200D == o # ZWJ (zero-width joiner) + or 0x231A <= o <= 0x231B # Watch, Hourglass + or 0x23E9 <= o <= 0x23F3 # Various symbols + or 0x23F8 <= o <= 0x23FA # Various symbols + or 0x25AA <= o <= 0x25AB # Small squares + or 0x25B6 == o or 0x25C0 == o # Play buttons + or 0x25FB <= o <= 0x25FE # Medium squares + or 0x2614 <= o <= 0x2615 # Umbrella, Hot beverage + or 0x2648 <= o <= 0x2653 # Zodiac signs + or 0x267F == o # Wheelchair + or 0x2693 == o # Anchor + or 0x26A1 == o # High voltage (⚡) + or 0x26AA <= o <= 0x26AB # Circles + or 0x26BD <= o <= 0x26BE # Soccer, Baseball + or 0x26C4 <= o <= 0x26C5 # Snowman, Sun behind cloud + or 0x26D4 == o # No entry + or 0x26EA == o # Church + or 0x26F2 <= o <= 0x26F3 # Fountain, Golf + or 0x26F5 == o # Sailboat + or 0x26FA == o # Tent + or 0x26FD == o # Fuel pump + or 0x2702 == o # Scissors + or 0x2705 == o # Check mark + or 0x2708 <= o <= 0x270D # Various + or 0x270F == o # Pencil + or 0x2753 <= o <= 0x2755 # Question marks (❓❔❕) + or 0x2757 == o # Exclamation + or 0x2795 <= o <= 0x2797 # Plus, Minus, Divide + or 0x27B0 == o or 0x27BF == o # Curly loop + ): + return 2 + # 
Zero-width characters + if o in (0xFE0F, 0xFE0E, 0x200D, 0x200B, 0x200C, 0x200E, 0x200F): + return 0 + return 1 + + +def _vl(s: str) -> int: + """Visible length of a string, accounting for ANSI codes and wide chars.""" + clean = _ansi_re.sub("", s) + return sum(_char_width(c) for c in clean) + + +def _w(text: str): + sys.stdout.write(text) + + +def _fl(): + sys.stdout.flush() + + +def enable_ansi(): + if sys.platform == "win32": + os.system("") + try: + import ctypes + k = ctypes.windll.kernel32 + h = k.GetStdHandle(-11) + m = ctypes.c_ulong() + k.GetConsoleMode(h, ctypes.byref(m)) + k.SetConsoleMode(h, m.value | 0x0004) + except Exception: + pass + + +def term_size() -> Tuple[int, int]: + try: + c, r = os.get_terminal_size() + return max(c, 60), max(r, 20) + except Exception: + return 80, 24 + + +def _read_key_blocking() -> str: + """Read a single key press (blocking). Returns key name.""" + if sys.platform == "win32": + import msvcrt + k = msvcrt.getch() + if k in (b"\x00", b"\xe0"): + k2 = msvcrt.getch() + return {b"H": "up", b"P": "down", b"K": "left", b"M": "right"}.get(k2, "") + if k == b"\r": + return "enter" + if k == b"\x03": + return "ctrl-c" + if k == b"\x1b": + return "esc" + return k.decode("latin-1", errors="replace") + else: + import select as _sel + import termios, tty + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + try: + tty.setcbreak(fd) + ch = sys.stdin.read(1) + if ch == "\x1b": + rdy, _, _ = _sel.select([sys.stdin], [], [], 0.2) + if rdy: + ch2 = sys.stdin.read(1) + if ch2 == "[": + rdy2, _, _ = _sel.select([sys.stdin], [], [], 0.2) + if rdy2: + ch3 = sys.stdin.read(1) + return {"A": "up", "B": "down", "C": "right", "D": "left"}.get(ch3, "esc") + return "esc" + return "esc" + if ch == "\r" or ch == "\n": + return "enter" + if ch == "\x03": + return "ctrl-c" + return ch + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old) + + +def _read_key_nb(timeout: float = 0.05) -> Optional[str]: + """Non-blocking key read. Returns None if no key.""" + if sys.platform == "win32": + import msvcrt + if msvcrt.kbhit(): + return _read_key_blocking() + time.sleep(timeout) + return None + else: + import select + import termios, tty + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + try: + tty.setcbreak(fd) + rdy, _, _ = select.select([sys.stdin], [], [], timeout) + if rdy: + ch = sys.stdin.read(1) + if ch == "\x1b": + # Wait for escape sequence bytes (longer timeout for SSH) + rdy2, _, _ = select.select([sys.stdin], [], [], 0.2) + if rdy2: + ch2 = sys.stdin.read(1) + if ch2 == "[": + rdy3, _, _ = select.select([sys.stdin], [], [], 0.2) + if rdy3: + ch3 = sys.stdin.read(1) + return {"A": "up", "B": "down", "C": "right", "D": "left"}.get(ch3, "") + return "" + return "esc" # bare Esc key + if ch in ("\r", "\n"): + return "enter" + if ch == "\x03": + return "ctrl-c" + return ch + return None + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old) + + +def _wait_any_key(): + """Simple blocking wait for any keypress. More robust than _read_key_blocking for popups.""" + if sys.platform == "win32": + import msvcrt + msvcrt.getch() + else: + import termios, tty + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + try: + tty.setraw(fd) + sys.stdin.read(1) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old) + + +def _prompt_number(prompt: str, max_val: int) -> Optional[int]: + """Show prompt, read a number from user. 
Returns None if cancelled.""" + _w(A.SHOW) + _w(f"\n {prompt}") + _fl() + buf = "" + if sys.platform == "win32": + import msvcrt + while True: + k = msvcrt.getch() + if k == b"\r": + break + if k == b"\x1b" or k == b"\x03": + _w("\n") + return None + if k == b"\x08" and buf: + buf = buf[:-1] + _w("\b \b") + _fl() + continue + ch = k.decode("latin-1", errors="replace") + if ch.isdigit(): + buf += ch + _w(ch) + _fl() + else: + import termios, tty + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + try: + tty.setcbreak(fd) + while True: + ch = sys.stdin.read(1) + if ch in ("\r", "\n"): + break + if ch == "\x1b" or ch == "\x03": + _w("\n") + return None + if ch == "\x7f" and buf: # backspace + buf = buf[:-1] + _w("\b \b") + _fl() + continue + if ch.isdigit(): + buf += ch + _w(ch) + _fl() + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old) + _w(A.HIDE) + if buf and buf.isdigit(): + n = int(buf) + if 1 <= n <= max_val: + return n + return None + + +def _fmt_elapsed(secs: float) -> str: + m, s = divmod(int(secs), 60) + if m > 0: + return f"{m}m {s:02d}s" + return f"{s}s" + + +@dataclass +class ConfigEntry: + address: str + name: str = "" + original_uri: str = "" + ip: str = "" + + +@dataclass +class RoundCfg: + size: int + keep: int + + @property + def label(self) -> str: + if self.size >= 1_000_000: + return f"{self.size // 1_000_000}MB" + return f"{self.size // 1000}KB" + + +@dataclass +class Result: + ip: str + domains: List[str] = field(default_factory=list) + uris: List[str] = field(default_factory=list) + tcp_ms: float = -1 + tls_ms: float = -1 + ttfb_ms: float = -1 + speeds: List[float] = field(default_factory=list) + best_mbps: float = -1 + colo: str = "" + score: float = 0 + error: str = "" + alive: bool = False + + +class State: + def __init__(self): + self.input_file = "" + self.configs: List[ConfigEntry] = [] + self.ip_map: Dict[str, List[ConfigEntry]] = defaultdict(list) + self.ips: List[str] = [] + self.res: Dict[str, Result] = {} + self.rounds: List[RoundCfg] = [] + self.mode = "normal" + + self.phase = "init" + self.phase_label = "" + self.cur_round = 0 + self.total = 0 + self.done_count = 0 + self.alive_n = 0 + self.dead_n = 0 + self.best_speed = 0.0 + self.start_time = 0.0 + self.notify = "" # notification message shown in footer + self.notify_until = 0.0 + + self.top = 50 # export top N (0 = all) + self.finished = False + self.interrupted = False + self.saved = False + self.latency_cut_n = 0 # how many IPs were cut after latency phase + + +class CFRateLimiter: + """Respects Cloudflare's per-IP rate limit window. + + CF allows ~600 requests per 10-minute window to speed.cloudflare.com. + When 429 is received, retry-after header tells us exactly when the + window resets. We track request count and pause when budget runs out + or when CF explicitly tells us to wait. 
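+
+    Typical call pattern (as used by phase2_round): await acquire(st) before
+    each request to speed.cloudflare.com, check would_block() to decide when
+    to fall back to the CDN mirror instead, and call report_429(retry_after)
+    when a 429 arrives so every worker pauses together.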
+ """ + BUDGET = 550 # conservative limit (CF allows ~600) + WINDOW = 600 # 10-minute window in seconds + + def __init__(self): + self.count = 0 + self.window_start = 0.0 + self.blocked_until = 0.0 + self._lock = asyncio.Lock() + + async def _wait_blocked(self, st: Optional["State"]): + """Wait out a 429 block period (called outside lock).""" + while time.monotonic() < self.blocked_until: + if st and st.interrupted: + return + left = int(self.blocked_until - time.monotonic()) + if st: + st.phase_label = f"CF rate limit — resuming in {left}s" + await asyncio.sleep(1) + + async def _wait_budget(self, wait_until: float, st: Optional["State"]): + """Wait for window reset when budget exhausted (called outside lock).""" + while time.monotonic() < wait_until: + if st and st.interrupted: + return + left = int(wait_until - time.monotonic()) + if st: + st.phase_label = f"Rate limit ({self.count} reqs) — next window in {left}s" + await asyncio.sleep(1) + + async def acquire(self, st: Optional["State"] = None): + """Wait if we're rate-limited, then count a request.""" + # Wait out any 429 block first (outside lock so others can also wait) + if self.blocked_until > 0 and time.monotonic() < self.blocked_until: + _dbg(f"RATE: waiting {self.blocked_until - time.monotonic():.0f}s for CF window reset") + await self._wait_blocked(st) + + await self._lock.acquire() + try: + # Re-check after acquiring lock + if self.blocked_until > 0 and time.monotonic() >= self.blocked_until: + self.count = 0 + self.window_start = time.monotonic() + self.blocked_until = 0.0 + + now = time.monotonic() + if self.window_start == 0.0: + self.window_start = now + + if now - self.window_start >= self.WINDOW: + self.count = 0 + self.window_start = now + + if self.count >= self.BUDGET: + remaining = self.WINDOW - (now - self.window_start) + if remaining > 0: + _dbg(f"RATE: budget exhausted ({self.count} reqs), waiting {remaining:.0f}s") + wait_until = self.window_start + self.WINDOW + saved_window = self.window_start + self._lock.release() + try: + await self._wait_budget(wait_until, st) + finally: + await self._lock.acquire() + # Only reset if no other coroutine already did + if self.window_start == saved_window: + self.count = 0 + self.window_start = time.monotonic() + else: + self.count = 0 + self.window_start = time.monotonic() + + self.count += 1 + finally: + self._lock.release() + + def would_block(self) -> bool: + """Check if speed.cloudflare.com is currently rate-limited.""" + now = time.monotonic() + if self.blocked_until > 0 and now < self.blocked_until: + return True + if self.window_start > 0 and now - self.window_start < self.WINDOW: + if self.count >= self.BUDGET: + return True + return False + + def report_429(self, retry_after: int): + """CF told us to wait. Set blocked_until so all workers pause. 
+ Cap at 600s (10 min) — CF's actual window is 10 min but it sends + punitive retry-after (3600+) after repeated violations.""" + capped = min(max(retry_after, 30), 600) + until = time.monotonic() + capped + if until > self.blocked_until: + self.blocked_until = until + _dbg(f"RATE: 429 received (retry-after={retry_after}s, capped={capped}s)") + + +def build_dynamic_rounds(mode: str, alive_count: int) -> List[RoundCfg]: + """Build round configs dynamically based on mode and alive IP count.""" + preset = PRESETS.get(mode, PRESETS["normal"]) + + if not preset.get("dynamic"): + return [RoundCfg(1_000_000, alive_count)] + + sizes = preset["round_sizes"] + pcts = preset["round_pcts"] + mins = preset["round_min"] + maxs = preset["round_max"] + + # Small sets (<50 IPs): test ALL in every round — no funnel needed + small_set = alive_count <= 50 + + rounds = [] + for size, pct, mn, mx in zip(sizes, pcts, mins, maxs): + if small_set: + keep = alive_count + else: + keep = int(alive_count * pct / 100) if pct < 100 else alive_count + if mn > 0: + keep = max(mn, keep) + if mx > 0: + keep = min(mx, keep) + keep = min(keep, alive_count) + if keep > 0: + rounds.append(RoundCfg(size, keep)) + + return rounds + + +def parse_vless(uri: str) -> Optional[ConfigEntry]: + uri = uri.strip() + if not uri.startswith("vless://"): + return None + rest = uri[8:] + name = "" + if "#" in rest: + rest, name = rest.rsplit("#", 1) + name = urllib.parse.unquote(name) + if "?" in rest: + rest = rest.split("?", 1)[0] + if "@" not in rest: + return None + _, addr = rest.split("@", 1) + if addr.startswith("["): + if "]" not in addr: + return None + address = addr[1 : addr.index("]")] + else: + address = addr.rsplit(":", 1)[0] + return ConfigEntry(address=address, name=name, original_uri=uri.strip()) + + +def parse_vmess(uri: str) -> Optional[ConfigEntry]: + uri = uri.strip() + if not uri.startswith("vmess://"): + return None + b64 = uri[8:] + if "#" in b64: + b64 = b64.split("#", 1)[0] + b64 += "=" * (-len(b64) % 4) + try: + try: + raw = base64.b64decode(b64).decode("utf-8", errors="replace") + except Exception: + raw = base64.urlsafe_b64decode(b64).decode("utf-8", errors="replace") + obj = json.loads(raw) + if not isinstance(obj, dict): + return None + except Exception: + return None + address = str(obj.get("add", "")) + if not address: + return None + name = str(obj.get("ps", "")) + return ConfigEntry(address=address, name=name, original_uri=uri.strip()) + + +def parse_config(uri: str) -> Optional[ConfigEntry]: + """Try parsing as VLESS or VMess.""" + return parse_vless(uri) or parse_vmess(uri) + + +def load_input(path: str) -> List[ConfigEntry]: + try: + with open(path, "r", encoding="utf-8") as f: + raw = f.read() + except (FileNotFoundError, PermissionError, OSError) as e: + print(f" Error reading {path}: {e}") + return [] + try: + data = json.loads(raw) + if isinstance(data, dict) and "data" in data: + data = data["data"] + out: List[ConfigEntry] = [] + for i, e in enumerate(data): + d = e.get("domain", "") + if d: + out.append( + ConfigEntry(address=d, name=f"d-{i+1}", ip=e.get("ipv4", "")) + ) + if out: + return out + except (json.JSONDecodeError, TypeError, AttributeError): + pass + out = [] + for ln in raw.splitlines(): + c = parse_config(ln) + if c: + out.append(c) + return out + + +def fetch_sub(url: str) -> List[ConfigEntry]: + """Fetch configs from a subscription URL (base64 or plain VLESS URIs).""" + if not url.lower().startswith(("http://", "https://")): + print(f" Error: --sub only accepts http:// or https:// 
URLs") + return [] + _dbg(f"Fetching subscription: {url}") + try: + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + with urllib.request.urlopen(req, timeout=15) as resp: + raw = resp.read().decode("utf-8", errors="replace").strip() + except Exception as e: + _dbg(f"Subscription fetch failed: {e}") + print(f" Error fetching subscription: {e}") + return [] + try: + decoded = base64.b64decode(raw).decode("utf-8", errors="replace") + if "://" in decoded: + raw = decoded + except Exception: + pass + out = [] + for ln in raw.splitlines(): + c = parse_config(ln.strip()) + if c: + out.append(c) + _dbg(f"Subscription loaded: {len(out)} configs") + return out + + +def generate_from_template(template: str, addresses: List[str]) -> List[ConfigEntry]: + """Generate configs by substituting addresses into a VLESS/VMess template.""" + out = [] + parsed = parse_config(template) + if not parsed: + return out + for i, addr in enumerate(addresses): + addr = addr.strip() + if not addr: + continue + # Handle ip:port format (e.g. from multi-port clean scan) + addr_ip = addr + addr_port = None + if ":" in addr and not addr.startswith("["): + parts = addr.rsplit(":", 1) + if parts[1].isdigit(): + addr_ip, addr_port = parts[0], parts[1] + uri = re.sub( + r"(@)(\[[^\]]+\]|[^:]+)(:|$)", + lambda m: m.group(1) + addr_ip + m.group(3), + template, + count=1, + ) + if addr_port: + # Replace existing port, or insert port if template had none + if re.search(r"@[^:/?#]+:\d+", uri): + uri = re.sub(r"(@[^:/?#]+:)\d+", lambda m: m.group(1) + addr_port, uri, count=1) + else: + uri = re.sub(r"(@[^/?#]+)([?/#])", lambda m: m.group(1) + ":" + addr_port + m.group(2), uri, count=1) + uri = re.sub(r"#.*$", f"#cfg-{i+1}-{addr_ip[:20]}", uri) + c = parse_config(uri) + if c: + out.append(c) + return out + + +def load_addresses(path: str) -> List[str]: + """Load address list from JSON array or plain text (one per line).""" + try: + with open(path, "r", encoding="utf-8") as f: + raw = f.read() + except (FileNotFoundError, PermissionError, OSError) as e: + print(f" Error reading {path}: {e}") + return [] + try: + data = json.loads(raw) + if isinstance(data, list): + return [str(d) for d in data if d] + if isinstance(data, dict): + for key in ("addresses", "domains", "ips", "data"): + if key in data and isinstance(data[key], list): + return [str(d) for d in data[key] if d] + except (json.JSONDecodeError, TypeError): + pass + return [ln.strip() for ln in raw.splitlines() if ln.strip()] + + +def _split_to_24s(subnets: List[str]) -> list: + """Split CIDR subnets into /24 blocks, deduplicate.""" + seen = set() + blocks = [] + for sub in subnets: + try: + net = ipaddress.IPv4Network(sub.strip(), strict=False) + if net.prefixlen <= 24: + for block in net.subnets(new_prefix=24): + key = int(block.network_address) + if key not in seen: + seen.add(key) + blocks.append(block) + else: + key = int(net.network_address) + if key not in seen: + seen.add(key) + blocks.append(net) + except (ValueError, TypeError): + continue + return blocks + + +def generate_cf_ips(subnets: List[str], sample_per_24: int = 0) -> List[str]: + """Generate IPs from CIDR subnets. 
sample_per_24=0 means all hosts.""" + blocks = _split_to_24s(subnets) + random.shuffle(blocks) + ips = [] + for net in blocks: + hosts = [str(ip) for ip in net.hosts()] + if sample_per_24 > 0 and sample_per_24 < len(hosts): + hosts = random.sample(hosts, sample_per_24) + ips.extend(hosts) + return ips + + +async def _tls_probe( + ip: str, sni: str, timeout: float, validate: bool = True, port: int = 443, +) -> Tuple[float, bool, str]: + """TLS probe with optional Cloudflare header validation. + Returns (latency_ms, is_cloudflare, error).""" + w = None + try: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + t0 = time.monotonic() + r, w = await asyncio.wait_for( + asyncio.open_connection(ip, port, ssl=ctx, server_hostname=sni), + timeout=timeout, + ) + tls_ms = (time.monotonic() - t0) * 1000 + + is_cf = True + if validate: + is_cf = False + try: + req = f"GET / HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n" + w.write(req.encode()) + await w.drain() + hdr = await asyncio.wait_for(r.read(2048), timeout=min(timeout, 3)) + htxt = hdr.decode("latin-1", errors="replace").lower() + is_cf = "server: cloudflare" in htxt or "cf-ray:" in htxt + except Exception: + pass + + w.close() + try: + await w.wait_closed() + except Exception: + pass + w = None + return tls_ms, is_cf, "" + except asyncio.TimeoutError: + return -1, False, "timeout" + except Exception as e: + return -1, False, str(e)[:40] + finally: + if w: + try: + w.close() + except Exception: + pass + + +@dataclass +class CleanScanState: + """State for clean IP scanning progress.""" + total: int = 0 + done: int = 0 + found: int = 0 + interrupted: bool = False + results: List[Tuple[str, float]] = field(default_factory=list) # top 20 for display + all_results: List[Tuple[str, float]] = field(default_factory=list) # full reference + start_time: float = 0.0 + + +async def scan_clean_ips( + ips: List[str], + sni: str = "speed.cloudflare.com", + workers: int = 500, + timeout: float = 3.0, + validate: bool = True, + cs: Optional[CleanScanState] = None, + ports: Optional[List[int]] = None, +) -> List[Tuple[str, float]]: + """Scan IPs for TLS + optional CF validation. Returns [(addr, latency_ms)] sorted. 
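+    e.g. [("104.16.5.20", 11.8), ("104.16.5.20:8443", 15.2), ...] (illustrative values).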
+ addr is 'ip' for port 443, or 'ip:port' for other ports.""" + if ports is None: + ports = [443] + sem = asyncio.Semaphore(workers) + results: List[Tuple[str, float]] = [] + lock = asyncio.Lock() + + total_probes = len(ips) * len(ports) + if cs: + cs.total = total_probes + cs.done = 0 + cs.found = 0 + cs.start_time = time.monotonic() + + async def probe(ip: str, port: int): + if cs and cs.interrupted: + return + async with sem: + if cs and cs.interrupted: + return + lat, is_cf, _err = await _tls_probe(ip, sni, timeout, validate, port) + if lat > 0 and is_cf: + addr = ip if port == 443 else f"{ip}:{port}" + async with lock: + results.append((addr, lat)) + if cs: + cs.found += 1 + cs.all_results = results # full reference for Ctrl+C recovery + if cs.found % 10 == 0 or cs.found <= 20: + cs.results = sorted(results, key=lambda x: x[1])[:20] + if cs: + cs.done += 1 + + # Build flat list of (ip, port) pairs + probes = [(ip, p) for ip in ips for p in ports] + random.shuffle(probes) # spread ports across batches for better coverage + + BATCH = 50_000 + for i in range(0, len(probes), BATCH): + if cs and cs.interrupted: + break + batch = probes[i : i + BATCH] + tasks = [asyncio.ensure_future(probe(ip, port)) for ip, port in batch] + try: + await asyncio.gather(*tasks, return_exceptions=True) + except asyncio.CancelledError: + break + finally: + for t in tasks: + if not t.done(): + t.cancel() + + results.sort(key=lambda x: x[1]) + return results + + +def load_configs_from_args(args) -> Tuple[List[ConfigEntry], str]: + """Load configs based on CLI args. Returns (configs, source_label).""" + if getattr(args, "sub", None): + configs = fetch_sub(args.sub) + return configs, args.sub + if getattr(args, "template", None): + if not getattr(args, "input", None): + return [], "ERROR: --template requires -i (address list file)" + addrs = load_addresses(args.input) + configs = generate_from_template(args.template, addrs) + return configs, f"{args.input} ({len(addrs)} addresses)" + if getattr(args, "input", None): + configs = load_input(args.input) + return configs, args.input + return [], "" + + +def parse_size(s: str) -> int: + s = s.strip().upper() + m = re.match(r"^(\d+(?:\.\d+)?)\s*(MB|KB|GB|B)?$", s) + if not m: + try: + return max(1, int(s)) + except ValueError: + return 1_000_000 # default 1MB + n = float(m.group(1)) + u = m.group(2) or "B" + mul = {"B": 1, "KB": 1_000, "MB": 1_000_000, "GB": 1_000_000_000} + return max(1, int(n * mul.get(u, 1))) + + +def parse_rounds_str(s: str) -> List[RoundCfg]: + out = [] + for p in s.split(","): + p = p.strip() + if ":" in p: + sz, top = p.split(":", 1) + try: + out.append(RoundCfg(parse_size(sz), int(top))) + except ValueError: + pass # skip malformed round + return out + + +def find_config_files() -> List[Tuple[str, str, int]]: + """Find config files in cwd. 
Returns [(path, type, count)].""" + results = [] + for pat in ("*.txt", "*.json", "*.conf", "*.lst"): + for p in globmod.glob(pat): + try: + with open(p, "r", encoding="utf-8", errors="replace") as f: + head = f.read(2048) + count = 0 + json_ok = False + if head.strip().startswith("{") or head.strip().startswith("["): + try: + with open(p, encoding="utf-8") as jf: + d = json.loads(jf.read()) + if isinstance(d, dict) and "data" in d: + d = d["data"] + if isinstance(d, list): + count = len(d) + results.append((p, "json", count)) + json_ok = True + except Exception: + pass + if not json_ok and ("vless://" in head or "vmess://" in head): + with open(p, encoding="utf-8") as f: + count = sum(1 for ln in f if ln.strip().startswith(("vless://", "vmess://"))) + results.append((p, "configs", count)) + except Exception: + pass + results.sort(key=lambda x: x[2], reverse=True) + return results + + +async def _resolve(e: ConfigEntry, sem: asyncio.Semaphore, counter: List[int]) -> ConfigEntry: + if e.ip: + counter[0] += 1 + return e + async with sem: + try: + loop = asyncio.get_running_loop() + info = await loop.getaddrinfo(e.address, 443, family=socket.AF_INET) + if info: + e.ip = info[0][4][0] + except Exception: + e.ip = "" + counter[0] += 1 + return e + + +async def resolve_all(st: State, workers: int = 100): + sem = asyncio.Semaphore(workers) + counter = [0] # mutable for closure + total = len(st.configs) + + async def _progress(): + spin = "|/-\\" + i = 0 + while counter[0] < total: + s = spin[i % len(spin)] + pct = counter[0] * 100 // max(1, total) + _w(f"\r {A.CYN}{s}{A.RST} Resolving DNS... {counter[0]}/{total} ({pct}%) ") + _fl() + i += 1 + await asyncio.sleep(0.15) + _w(f"\r {A.GRN}OK{A.RST} Resolved {total} domains -> {len(set(c.ip for c in st.configs if c.ip))} unique IPs\n") + _fl() + + prog_task = asyncio.create_task(_progress()) + try: + st.configs = list(await asyncio.gather(*[_resolve(c, sem, counter) for c in st.configs])) + finally: + prog_task.cancel() + try: + await prog_task + except asyncio.CancelledError: + pass + for c in st.configs: + if c.ip: + st.ip_map[c.ip].append(c) + st.ips = list(st.ip_map.keys()) + for ip in st.ips: + cs = st.ip_map[ip] + st.res[ip] = Result( + ip=ip, + domains=[c.address for c in cs], + uris=[c.original_uri for c in cs if c.original_uri], + ) + + +async def _lat_one(ip: str, sni: str, timeout: float) -> Tuple[float, float, str]: + """Measure TCP RTT and full TLS connection time (TCP+TLS handshake).""" + try: + t0 = time.monotonic() + r, w = await asyncio.wait_for( + asyncio.open_connection(ip, 443), timeout=timeout + ) + tcp = (time.monotonic() - t0) * 1000 + w.close() + try: + await w.wait_closed() + except Exception: + pass + except asyncio.TimeoutError: + return -1, -1, "tcp-timeout" + except Exception as e: + return -1, -1, f"tcp:{str(e)[:50]}" + try: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + t0 = time.monotonic() + r, w = await asyncio.wait_for( + asyncio.open_connection(ip, 443, ssl=ctx, server_hostname=sni), + timeout=timeout, + ) + tls_full = (time.monotonic() - t0) * 1000 # full TCP+TLS time + w.close() + try: + await w.wait_closed() + except Exception: + pass + return tcp, tls_full, "" + except asyncio.TimeoutError: + return tcp, -1, "tls-timeout" + except Exception as e: + return tcp, -1, f"tls:{str(e)[:50]}" + + +async def phase1(st: State, workers: int, timeout: float): + st.phase = "latency" + st.phase_label = "Testing latency" + st.total = len(st.ips) + st.done_count = 0 + sem = 
asyncio.Semaphore(workers) + + async def go(ip: str): + async with sem: + if st.interrupted: + return + res = st.res[ip] + # Use speed.cloudflare.com as SNI — filters out non-CF IPs early + # (non-CF IPs will fail TLS since they don't serve this cert) + tcp, tls, err = await _lat_one(ip, SPEED_HOST, timeout) + res.tcp_ms = tcp + res.tls_ms = tls + res.error = err + res.alive = tls > 0 + st.done_count += 1 + if res.alive: + st.alive_n += 1 + else: + st.dead_n += 1 + + tasks = [asyncio.ensure_future(go(ip)) for ip in st.ips] + try: + await asyncio.gather(*tasks, return_exceptions=True) + except asyncio.CancelledError: + pass + finally: + for t in tasks: + if not t.done(): + t.cancel() + + +async def _dl_one( + ip: str, size: int, timeout: float, + host: str = "", path: str = "", +) -> Tuple[float, float, int, str, str]: + """Download test. Returns (ttfb_ms, mbps, bytes, colo, error). + Error "429" means rate-limited — caller should back off.""" + if not host: + host = SPEED_HOST + if not path: + path = f"{SPEED_PATH}?bytes={size}" + + dl_timeout = max(timeout, 30 + (size / 1_000_000) * 2) + conn_timeout = min(timeout, 15) + + w = None + total = 0 + dl_start = 0.0 + ttfb = 0.0 + colo = "" + + def _cleanup(): + nonlocal w + if w is not None: + try: + w.close() + except Exception: + pass + w = None + + try: + ctx = ssl.create_default_context() + t_start = time.monotonic() + try: + t0 = t_start + r, w = await asyncio.wait_for( + asyncio.open_connection(ip, 443, ssl=ctx, server_hostname=host), + timeout=conn_timeout, + ) + except ssl.SSLCertVerificationError: + _cleanup() + ctx2 = ssl.create_default_context() + ctx2.check_hostname = False + ctx2.verify_mode = ssl.CERT_NONE + t0 = time.monotonic() + r, w = await asyncio.wait_for( + asyncio.open_connection( + ip, 443, ssl=ctx2, server_hostname=host + ), + timeout=conn_timeout, + ) + conn_ms = (time.monotonic() - t0) * 1000 + + range_hdr = "" + if "bytes=" not in path: + range_hdr = f"Range: bytes=0-{size - 1}\r\n" + req = ( + f"GET {path} HTTP/1.1\r\n" + f"Host: {host}\r\n" + f"User-Agent: Mozilla/5.0 (X11; Linux x86_64) Chrome/120\r\n" + f"Accept: */*\r\n" + f"{range_hdr}" + f"Connection: close\r\n\r\n" + ) + w.write(req.encode()) + await w.drain() + + hbuf = b"" + while b"\r\n\r\n" not in hbuf: + ch = await asyncio.wait_for(r.read(4096), timeout=min(conn_timeout, 10)) + if not ch: + _dbg(f"DL {ip} {size}: empty response (no headers)") + return -1, 0, 0, "", "empty" + hbuf += ch + if len(hbuf) > 65536: + _dbg(f"DL {ip} {size}: header too big") + return -1, 0, 0, "", "hdr-too-big" + + sep = hbuf.index(b"\r\n\r\n") + 4 + htxt = hbuf[:sep].decode("latin-1", errors="replace") + body0 = hbuf[sep:] + + status_line = htxt.split("\r\n")[0] + status_parts = status_line.split(None, 2) + status_code = status_parts[1] if len(status_parts) >= 2 else "" + if status_code == "429": + ra = "" + for line in htxt.split("\r\n"): + if line.lower().startswith("retry-after:"): + ra = line.split(":", 1)[1].strip() + break + _dbg(f"DL {ip} {size}: 429 rate-limited (retry-after={ra})") + return -1, 0, 0, "", f"429:{ra}" + if status_code not in ("200", "206"): + _dbg(f"DL {ip} {size}: HTTP error: {status_line[:80]}") + return -1, 0, 0, "", f"http:{status_line[:40]}" + + for line in htxt.split("\r\n"): + if line.lower().startswith("cf-ray:"): + ray = line.split(":", 1)[1].strip() + if "-" in ray: + colo = ray.rsplit("-", 1)[-1] + break + + ttfb = (time.monotonic() - t0) * 1000 - conn_ms + dl_start = time.monotonic() + total = len(body0) + + sample_interval = 1_000_000 if 
size >= 5_000_000 else size + 1 + next_sample = sample_interval + samples: List[Tuple[int, float]] = [] + + min_for_stable = min(size // 2, 20_000_000) if size >= 5_000_000 else size + min_samples = 5 if size >= 10_000_000 else 3 + + while True: + try: + elapsed_total = time.monotonic() - t_start + left = max(1.0, dl_timeout - elapsed_total) + ch = await asyncio.wait_for(r.read(65536), timeout=min(left, 10)) + if not ch: + break + total += len(ch) + if total >= next_sample: + elapsed = time.monotonic() - dl_start + samples.append((total, elapsed)) + next_sample += sample_interval + # only check stability after enough data downloaded + if len(samples) >= min_samples and total >= min_for_stable: + recent = samples[-4:] + sp = [] + for j in range(1, len(recent)): + db = recent[j][0] - recent[j - 1][0] + dt = recent[j][1] - recent[j - 1][1] + if dt > 0: + sp.append(db / dt) + if len(sp) >= 2: + mn = statistics.mean(sp) + if mn > 0: + try: + sd = statistics.stdev(sp) + if sd / mn < 0.10: + break + except statistics.StatisticsError: + pass + except asyncio.TimeoutError: + break + except Exception: + break + + dl_t = time.monotonic() - dl_start + mbps = (total / 1_000_000) / dl_t if dl_t > 0 else 0 + _dbg(f"DL {ip} {size}: OK {mbps:.2f}MB/s total={total} dt={dl_t:.1f}s host={host}") + return ttfb, mbps, total, colo, "" + + except asyncio.TimeoutError: + if total > 0 and dl_start > 0: + dl_t = time.monotonic() - dl_start + mbps = (total / 1_000_000) / dl_t if dl_t > 0 else 0 + _dbg(f"DL {ip} {size}: TIMEOUT partial={total}B mbps={mbps:.2f} dt={dl_t:.1f}s") + if mbps > 0: + return ttfb, mbps, total, colo, "" + _dbg(f"DL {ip} {size}: TIMEOUT no data total={total}") + return -1, 0, 0, "", "timeout" + except Exception as e: + if total > 0 and dl_start > 0: + dl_t = time.monotonic() - dl_start + mbps = (total / 1_000_000) / dl_t if dl_t > 0 else 0 + _dbg(f"DL {ip} {size}: ERR partial={total}B mbps={mbps:.2f} err={e}") + if mbps > 0: + return ttfb, mbps, total, colo, "" + _dbg(f"DL {ip} {size}: ERR no data err={e}") + return -1, 0, 0, "", str(e)[:60] + finally: + _cleanup() + + +async def phase2_round( + st: State, + rcfg: RoundCfg, + candidates: List[str], + workers: int, + timeout: float, + rlim: Optional[CFRateLimiter] = None, + cdn_host: str = "", + cdn_path: str = "", +): + st.total = len(candidates) + st.done_count = 0 + if rcfg.size >= 50_000_000: + workers = min(workers, 6) + elif rcfg.size >= 10_000_000: + workers = min(workers, 8) + sem = asyncio.Semaphore(workers) + + max_retries = 2 + + async def go(ip: str): + best_mbps_this = 0.0 + best_ttfb = -1.0 + best_colo = "" + last_err = "" + force_cdn = False # set True when CF rejects (403/429) + + for attempt in range(max_retries): + if st.interrupted: + break + + # Pick endpoint: speed.cloudflare.com if budget available, else fallback CDN + use_host = cdn_host + use_path = cdn_path + if force_cdn and CDN_FALLBACK: + use_host, use_path = CDN_FALLBACK + _dbg(f"DL {ip}: forced fallback CDN {use_host}") + elif rlim and rlim.would_block() and CDN_FALLBACK: + use_host, use_path = CDN_FALLBACK + _dbg(f"DL {ip}: using fallback CDN {use_host}") + elif rlim: + await rlim.acquire(st) + + # acquire sem for the actual download + await sem.acquire() + try: + if st.interrupted: + break + ttfb, mbps, _total, colo, err = await _dl_one( + ip, rcfg.size, timeout, host=use_host, path=use_path, + ) + finally: + sem.release() # free slot immediately after download + + if mbps > 0: + best_mbps_this = mbps + best_ttfb = ttfb + best_colo = colo + break + + # 429 from 
speed.cloudflare.com: report + force CDN on retry + if err.startswith("429") and use_host == SPEED_HOST: + ra_str = err.split(":", 1)[1] if ":" in err else "" + try: + ra = int(ra_str) + except (ValueError, TypeError): + ra = 60 + if rlim: + rlim.report_429(ra) + _dbg(f"DL {ip}: 429 reported to limiter (retry-after={ra})") + force_cdn = True + # 403 from speed.cloudflare.com: CF rejected size, force CDN + elif err.startswith("http:") and use_host == SPEED_HOST: + _dbg(f"DL {ip}: {err} from CF, switching to CDN fallback") + force_cdn = True + # error from fallback CDN + elif err.startswith("429") or err.startswith("http:"): + _dbg(f"DL {ip}: {err} from {use_host}, will retry") + last_err = err + + res = st.res[ip] + res.speeds.append(best_mbps_this) + if best_mbps_this > 0: + if best_mbps_this > res.best_mbps: + res.best_mbps = best_mbps_this + if best_ttfb > 0 and (res.ttfb_ms < 0 or best_ttfb < res.ttfb_ms): + res.ttfb_ms = best_ttfb + if best_colo and not res.colo: + res.colo = best_colo + if best_mbps_this > st.best_speed: + st.best_speed = best_mbps_this + elif last_err: + res.error = last_err + st.done_count += 1 + + tasks = [asyncio.ensure_future(go(ip)) for ip in candidates] + try: + await asyncio.gather(*tasks, return_exceptions=True) + except asyncio.CancelledError: + pass + finally: + for t in tasks: + if not t.done(): + t.cancel() + + +def calc_scores(st: State): + has_speed = any(r.best_mbps > 0 for r in st.res.values()) + for r in st.res.values(): + if not r.alive: + r.score = 0 + continue + lat = max(0, 100 - r.tls_ms / 10) if r.tls_ms > 0 else 0 + spd = min(100, r.best_mbps * 20) if r.best_mbps > 0 else 0 + ttfb = max(0, 100 - r.ttfb_ms / 5) if r.ttfb_ms > 0 else 0 + if r.best_mbps > 0: + r.score = round(lat * 0.35 + spd * 0.50 + ttfb * 0.15, 1) + elif has_speed: + # Speed rounds ran but this IP wasn't tested - rank below tested ones + r.score = round(lat * 0.35, 1) + else: + # No speed rounds at all (latency-only mode) + r.score = round(lat, 1) + + +def sorted_alive(st: State, key: str = "score") -> List[Result]: + alive = [r for r in st.res.values() if r.alive] + if key == "score": + alive.sort(key=lambda r: r.score, reverse=True) + elif key == "latency": + alive.sort(key=lambda r: r.tls_ms) + elif key == "speed": + alive.sort(key=lambda r: r.best_mbps, reverse=True) + return alive + + +def sorted_all(st: State, key: str = "score") -> List[Result]: + """Return all results: alive sorted by key, then dead at the bottom.""" + alive = sorted_alive(st, key) + dead = [r for r in st.res.values() if not r.alive] + dead.sort(key=lambda r: r.ip) + return alive + dead + + +def draw_menu_header(cols: int) -> List[str]: + W = cols - 2 + lines = [] + lines.append(f"{A.CYN}╔{'═' * W}╗{A.RST}") + t = f" {A.BOLD}{A.WHT}CF Config Scanner{A.RST} {A.DIM}v{VERSION}{A.RST}" + lines.append(f"{A.CYN}║{A.RST}" + t + " " * (W - _vl(t)) + f"{A.CYN}║{A.RST}") + lines.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") + return lines + + +def draw_box_line(content: str, cols: int) -> str: + W = cols - 2 + vl = _vl(content) + pad = " " * max(0, W - vl) + return f"{A.CYN}║{A.RST}{content}{pad}{A.CYN}║{A.RST}" + + +def draw_box_sep(cols: int) -> str: + return f"{A.CYN}╠{'═' * (cols - 2)}╣{A.RST}" + + +def draw_box_bottom(cols: int) -> str: + return f"{A.CYN}╚{'═' * (cols - 2)}╝{A.RST}" + + +def tui_show_guide(): + """Show help/guide screen explaining input formats.""" + _w(A.CLR + A.HOME) + cols, rows = term_size() + lines = draw_menu_header(cols) + lines.append(draw_box_line(f" {A.BOLD}{A.WHT}How to prepare input 
files{A.RST}", cols)) + lines.append(draw_box_sep(cols)) + + guide = [ + "", + f" {A.BOLD}{A.CYN}Local Files (auto-detected){A.RST}", + f" {A.DIM}Place config files in the same directory where you run cfray.{A.RST}", + f" {A.DIM}Supported formats: {A.WHT}.txt .json .conf .lst{A.RST}", + f" {A.DIM}They will appear automatically under {A.WHT}LOCAL FILES{A.DIM} in the menu.{A.RST}", + f" {A.GRN}Example:{A.RST} {A.DIM}cp configs.txt /root/ && cd /root && python3 scanner.py{A.RST}", + "", + f" {A.BOLD}{A.CYN}[P] Enter File Path{A.RST}", + f" {A.DIM}Load a config file from any location by typing its full path.{A.RST}", + f" {A.GRN}Example:{A.RST} {A.DIM}/home/user/configs/my_vless.txt{A.RST}", + "", + f" {A.CYN}{'─' * 46}{A.RST}", + "", + f" {A.BOLD}{A.CYN}[1-9] VLESS/VMess URI file (.txt){A.RST}", + f" {A.DIM}Text file, one URI per line. Can mix VLESS and VMess.{A.RST}", + f" {A.GRN}vless://uuid@domain:443?type=ws&host=sni.com&...#name{A.RST}", + "", + f" {A.BOLD}{A.CYN}[1-9] Domain JSON file (.json){A.RST}", + f' {A.DIM}JSON with domain+IP:{A.RST} {A.GRN}{{"data": [{{"domain":"x.ir","ipv4":"1.2.3.4"}}]}}{A.RST}', + "", + f" {A.BOLD}{A.CYN}[S] Subscription URL{A.RST}", + f" {A.DIM}Fetches VLESS/VMess configs from a remote URL (plain or base64).{A.RST}", + f" {A.GRN}https://example.com/sub.txt{A.RST}", + f" {A.DIM}CLI: python3 scanner.py --sub URL{A.RST}", + "", + f" {A.BOLD}{A.CYN}[T] Template + Address list{A.RST}", + f" {A.DIM}Give one VLESS/VMess template + a file of CF IPs/domains.{A.RST}", + f" {A.DIM}Scanner replaces the address for each entry and tests them all.{A.RST}", + f" {A.WHT}Template:{A.RST} {A.GRN}vless://uuid@ADDR:443?type=ws&...#name{A.RST}", + f" {A.WHT}Addresses:{A.RST} {A.GRN}one IP or domain per line (.txt){A.RST}", + f" {A.DIM}CLI: python3 scanner.py --template 'vless://...' -i addrs.txt{A.RST}", + "", + f" {A.BOLD}{A.CYN}[F] Find Clean Cloudflare IPs{A.RST}", + f" {A.DIM}Scans all CF IP ranges to find reachable edge IPs.{A.RST}", + f" {A.DIM}Modes: Quick (~4K), Normal (~12K), Full (~1.5M), Mega (~3M multi-port){A.RST}", + f" {A.DIM}Mega tests all IPs on ports 443+8443 for maximum coverage.{A.RST}", + f" {A.DIM}Found IPs can be saved or used with a template for speed test.{A.RST}", + f" {A.DIM}CLI: python3 scanner.py --find-clean --no-tui --clean-mode mega{A.RST}", + "", + f" {A.CYN}{'─' * 46}{A.RST}", + f" {A.BOLD}{A.CYN}How it works:{A.RST}", + f" {A.DIM}1. Resolve domains to CF edge IPs, deduplicate{A.RST}", + f" {A.DIM}2. Test TCP+TLS latency, cut bottom by latency{A.RST}", + f" {A.DIM}3. Speed test top candidates in progressive rounds{A.RST}", + f" {A.DIM}4. Score = latency 35% + speed 50% + TTFB 15%{A.RST}", + f" {A.DIM}5. Export top configs ranked by score{A.RST}", + "", + f" {A.BOLD}{A.WHT}Made By Sam - SamNet Technologies{A.RST}", + f" {A.DIM}https://github.com/SamNet-dev/cfray{A.RST}", + ] + + # Fit within terminal: header(3) + title(1) + sep(1) + guide + footer(3) + max_guide = rows - 8 + if max_guide < len(guide): + guide = guide[:max_guide] + + for g in guide: + lines.append(draw_box_line(g, cols)) + + lines.append(draw_box_line("", cols)) + lines.append(draw_box_sep(cols)) + lines.append(draw_box_line(f" {A.DIM}Press any key to go back{A.RST}", cols)) + lines.append(draw_box_bottom(cols)) + + _w("\n".join(lines) + "\n") + _fl() + _read_key_blocking() + + +def _clean_pick_mode() -> Optional[str]: + """Pick scan scope for clean IP finder. 
Returns mode or None/'__back__'.""" + while True: + _w(A.CLR + A.HOME + A.HIDE) + cols, _ = term_size() + lines = draw_menu_header(cols) + lines.append(draw_box_line(f" {A.BOLD}Find Clean Cloudflare IPs{A.RST}", cols)) + lines.append(draw_box_line(f" {A.DIM}Scans Cloudflare IP ranges to find reachable edge IPs{A.RST}", cols)) + lines.append(draw_box_line("", cols)) + lines.append(draw_box_sep(cols)) + lines.append(draw_box_line(f" {A.BOLD}Select scan scope:{A.RST}", cols)) + lines.append(draw_box_line("", cols)) + + for name, key in [("quick", "1"), ("normal", "2"), ("full", "3"), ("mega", "4")]: + cfg = CLEAN_MODES[name] + num = f"{A.CYN}{A.BOLD}{key}{A.RST}" + lbl = f"{A.BOLD}{cfg['label']}{A.RST}" + if name == "normal": + lbl += f" {A.GRN}(recommended){A.RST}" + lines.append(draw_box_line(f" {num} {lbl}", cols)) + desc = cfg["desc"] + if len(cfg.get("ports", [])) > 1: + desc += f" (ports: {', '.join(str(p) for p in cfg['ports'])})" + lines.append(draw_box_line(f" {A.DIM}{desc}{A.RST}", cols)) + lines.append(draw_box_line("", cols)) + + lines.append(draw_box_sep(cols)) + lines.append(draw_box_line(f" {A.DIM}[1-4] Select [B] Back [Q] Quit{A.RST}", cols)) + lines.append(draw_box_bottom(cols)) + + _w("\n".join(lines) + "\n") + _fl() + + key = _read_key_blocking() + if key in ("q", "ctrl-c"): + return None + if key in ("b", "esc"): + return "__back__" + if key == "1": + return "quick" + if key == "2" or key == "enter": + return "normal" + if key == "3": + return "full" + if key == "4": + return "mega" + + +def _draw_clean_progress(cs: CleanScanState): + """Draw live progress screen for clean IP scan.""" + cols, rows = term_size() + W = cols - 2 + out: List[str] = [] + + def bx(c: str): + out.append(f"{A.CYN}║{A.RST}" + c + " " * max(0, W - _vl(c)) + f"{A.CYN}║{A.RST}") + + out.append(f"{A.CYN}╔{'═' * W}╗{A.RST}") + elapsed = _fmt_elapsed(time.monotonic() - cs.start_time) if cs.start_time else "0s" + title = f" {A.BOLD}{A.WHT}Finding Clean Cloudflare IPs{A.RST}" + right = f"{A.DIM}{elapsed} | ^C stop{A.RST}" + bx(title + " " * max(1, W - _vl(title) - _vl(right)) + right) + out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") + + pct = cs.done * 100 // max(1, cs.total) + bw = max(1, min(30, W - 40)) + filled = int(bw * pct / 100) + bar = f"{A.GRN}{'█' * filled}{A.DIM}{'░' * (bw - filled)}{A.RST}" + bx(f" Probing [{bar}] {cs.done:,}/{cs.total:,} {pct}%") + + found_line = f" {A.GRN}Found: {cs.found:,} clean IPs{A.RST}" + if cs.results: + best_lat = cs.results[0][1] + found_line += f" {A.DIM}Best: {best_lat:.0f}ms{A.RST}" + bx(found_line) + + out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") + bx(f" {A.BOLD}Top IPs found (by latency):{A.RST}") + + vis = min(15, rows - 12) + if cs.results: + for i, (ip, lat) in enumerate(cs.results[:vis]): + bx(f" {A.CYN}{i+1:>3}.{A.RST} {ip:<22} {A.GRN}{lat:>6.0f}ms{A.RST}") + else: + bx(f" {A.DIM}Scanning...{A.RST}") + + # Fill remaining space + used = len(cs.results[:vis]) if cs.results else 1 + for _ in range(vis - used): + bx("") + + out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") + bx(f" {A.DIM}Press Ctrl+C to stop early and show results{A.RST}") + out.append(f"{A.CYN}╚{'═' * W}╝{A.RST}") + + _w(A.HOME) + _w("\n".join(out) + "\n") + _fl() + + +def _clean_show_results(results: List[Tuple[str, float]], elapsed: str) -> Optional[str]: + """Show clean IP results with j/k scrolling. 
Returns action string or None.""" + MAX_SHOW = 300 + display = results[:MAX_SHOW] + offset = 0 + + while True: + _w(A.CLR + A.HOME + A.HIDE) + cols, rows = term_size() + lines = draw_menu_header(cols) + + if results: + lines.append(draw_box_line( + f" {A.BOLD}{A.GRN}Scan Complete!{A.RST} " + f"Found {A.BOLD}{len(results):,}{A.RST} clean IPs in {elapsed}", cols)) + else: + lines.append(draw_box_line(f" {A.YEL}Scan Complete — no clean IPs found.{A.RST}", cols)) + lines.append(draw_box_sep(cols)) + + if display: + # header + separator = 2 rows, footer area = 5 rows, menu header = 3 rows + vis = max(5, rows - 13) + end = min(len(display), offset + vis) + + hdr = f" {A.BOLD}{'#':>4} {'Address':<22} {'Latency':>8}{A.RST}" + if len(display) > vis: + pos = f"{A.DIM}[{offset+1}-{end} of {len(display)}" + if len(results) > MAX_SHOW: + pos += f", {len(results):,} total" + pos += f"]{A.RST}" + hdr += " " * max(1, cols - 2 - _vl(hdr) - _vl(pos) - 1) + pos + lines.append(draw_box_line(hdr, cols)) + lines.append(draw_box_line( + f" {A.DIM}{'─'*4} {'─'*22} {'─'*8}{A.RST}", cols)) + + for i in range(offset, end): + ip, lat = display[i] + lines.append(draw_box_line( + f" {i+1:>4} {ip:<22} {A.GRN}{lat:>6.0f}ms{A.RST}", cols)) + + lines.append(draw_box_line("", cols)) + lines.append(draw_box_sep(cols)) + ft = "" + if results: + ft += f" {A.CYN}[S]{A.RST} Save all {A.CYN}[T]{A.RST} Template+SpeedTest " + ft += f" {A.CYN}[B]{A.RST} Back" + lines.append(draw_box_line(ft, cols)) + if display and len(display) > vis: + lines.append(draw_box_line( + f" {A.DIM}j/↓ down k/↑ up n/p page down/up{A.RST}", cols)) + lines.append(draw_box_bottom(cols)) + + _w("\n".join(lines) + "\n") + _fl() + + key = _read_key_blocking() + if key in ("b", "esc", "q", "ctrl-c"): + return "back" + if key in ("j", "down"): + vis = max(5, rows - 13) + offset = min(offset + 1, max(0, len(display) - vis)) + continue + if key in ("k", "up"): + offset = max(0, offset - 1) + continue + if key == "n": + vis = max(5, rows - 13) + offset = min(offset + vis, max(0, len(display) - vis)) + continue + if key == "p": + vis = max(5, rows - 13) + offset = max(0, offset - vis) + continue + if key == "s" and results: + return "save" + if key == "t" and results: + _w(A.SHOW) + _w(f"\n {A.BOLD}{A.CYN}Speed Test with Clean IPs{A.RST}\n") + _w(f" {A.DIM}Paste a VLESS/VMess config URI. The address in it will be{A.RST}\n") + _w(f" {A.DIM}replaced with each clean IP, then all configs get speed-tested.{A.RST}\n\n") + _w(f" {A.CYN}Template:{A.RST} ") + _fl() + try: + tpl = input().strip() + except (EOFError, KeyboardInterrupt): + continue + if not tpl or not parse_config(tpl): + _w(f" {A.RED}Invalid VLESS/VMess URI.{A.RST}\n") + _fl() + time.sleep(1.5) + continue + return f"template:{tpl}" + + +async def tui_run_clean_finder() -> Optional[Tuple[str, str]]: + """Run the clean IP finder flow. 
Returns (input_method, input_value) or None.""" + + mode = _clean_pick_mode() + if mode is None: + return None + if mode == "__back__": + return ("__back__", "") + + scan_cfg = CLEAN_MODES[mode] + + # Generate IPs + _w(A.CLR + A.HOME) + cols, _ = term_size() + lines = draw_menu_header(cols) + lines.append(draw_box_line( + f" {A.BOLD}Generating IPs from {len(CF_SUBNETS)} Cloudflare ranges...{A.RST}", cols)) + lines.append(draw_box_bottom(cols)) + _w("\n".join(lines) + "\n") + _fl() + + ips = generate_cf_ips(CF_SUBNETS, scan_cfg["sample"]) + ports = scan_cfg.get("ports", [443]) + _dbg(f"CLEAN: Generated {len(ips):,} IPs × {len(ports)} port(s), sample={scan_cfg['sample']}") + + # Run scan with live progress + cs = CleanScanState() + scan_task = asyncio.ensure_future( + scan_clean_ips( + ips, workers=scan_cfg["workers"], timeout=3.0, + validate=scan_cfg["validate"], cs=cs, ports=ports, + ) + ) + + old_sigint = signal.getsignal(signal.SIGINT) + def _sig(sig, frame): + cs.interrupted = True + scan_task.cancel() + signal.signal(signal.SIGINT, _sig) + + _w(A.CLR + A.HIDE) + try: + while not scan_task.done(): + _draw_clean_progress(cs) + await asyncio.sleep(0.3) + except (asyncio.CancelledError, Exception): + pass + finally: + signal.signal(signal.SIGINT, old_sigint) + + try: + results = await scan_task + except (asyncio.CancelledError, Exception): + results = sorted(cs.all_results or cs.results, key=lambda x: x[1]) + + elapsed = _fmt_elapsed(time.monotonic() - cs.start_time) + _dbg(f"CLEAN: Done in {elapsed}. Found {len(results):,} / {len(ips):,}") + + # Show results and get user action + action = _clean_show_results(results, elapsed) + + if action is None or action == "back": + return ("__back__", "") + + if action == "save": + try: + os.makedirs(RESULTS_DIR, exist_ok=True) + path = os.path.abspath(_results_path("clean_ips.txt")) + with open(path, "w", encoding="utf-8") as f: + for ip, lat in results: + f.write(f"{ip}\n") + _w(f"\n {A.GRN}Saved {len(results):,} IPs to {path}{A.RST}\n") + except Exception as e: + _w(f"\n {A.RED}Save error: {e}{A.RST}\n") + _w(f" {A.DIM}Press any key...{A.RST}\n") + _fl() + _wait_any_key() + return ("__back__", "") + + if action.startswith("template:"): + template_uri = action[9:] + try: + os.makedirs(RESULTS_DIR, exist_ok=True) + path = os.path.abspath(_results_path("clean_ips.txt")) + with open(path, "w", encoding="utf-8") as f: + for ip, lat in results: + f.write(f"{ip}\n") + except Exception as e: + _w(f"\n {A.RED}Save error: {e}{A.RST}\n") + _fl() + time.sleep(2) + return ("__back__", "") + return ("template", f"{template_uri}|||{path}") + + return None + + +def _tui_prompt_text(label: str) -> Optional[str]: + """Show cursor, prompt for text input, return stripped text or None.""" + _w(A.SHOW) + _w(f"\n {A.CYN}{label}{A.RST} ") + _fl() + try: + val = input().strip() + except (EOFError, KeyboardInterrupt): + return None + return val if val else None + + +def tui_pick_file() -> Optional[Tuple[str, str]]: + """Interactive file/input picker. Returns (method, value) or None. + method is one of: 'file', 'sub', 'template'. + For 'file': value is the file path. + For 'sub': value is the subscription URL. + For 'template': value is 'template_uri|||address_file_path'. 
+ """ + enable_ansi() + files = find_config_files() + + while True: + _w(A.CLR + A.HOME + A.HIDE) + cols, rows = term_size() + W = cols - 2 + + out: List[str] = [] + def bx(c: str): + out.append(f"{A.CYN}║{A.RST}" + c + " " * max(0, W - _vl(c)) + f"{A.CYN}║{A.RST}") + + # Single clean box — no internal double-line separators + out.append(f"{A.CYN}╔{'═' * W}╗{A.RST}") + title = f" ⚡ {A.BOLD}{A.WHT}cfray{A.RST} {A.DIM}v{VERSION}{A.RST}" + subtitle = f"{A.DIM}Cloudflare Config Scanner{A.RST}" + bx(title + " " + subtitle) + bx("") + + # Section: Local Files + bx(f" {A.DIM}── {A.BOLD}{A.WHT}📁 LOCAL FILES{A.RST} {A.DIM}{'─' * max(1, W - 19)}{A.RST}") + if files: + for i, (path, ftype, count) in enumerate(files[:9]): + num = f" {A.CYN}{A.BOLD}{i + 1}{A.RST}." + name = os.path.basename(path) + desc = f"{A.DIM}{ftype}, {count} entries{A.RST}" + bx(f" {num} 📄 {name:<28} {desc}") + else: + bx(f" {A.DIM}No config files found in current directory{A.RST}") + bx(f" {A.DIM}Drop .txt or .json files here, or use options below{A.RST}") + bx("") + + # Section: Remote Sources + bx(f" {A.DIM}── {A.BOLD}{A.WHT}🌐 REMOTE SOURCES{A.RST} {A.DIM}{'─' * max(1, W - 22)}{A.RST}") + bx(f" {A.CYN}{A.BOLD}s{A.RST}. 🔗 {A.WHT}Subscription URL{A.RST} {A.DIM}Fetch configs from remote URL{A.RST}") + bx(f" {A.CYN}{A.BOLD}p{A.RST}. 📂 {A.WHT}Enter File Path{A.RST} {A.DIM}Load from custom file path{A.RST}") + bx("") + + # Section: Tools + bx(f" {A.DIM}── {A.BOLD}{A.WHT}🔧 TOOLS{A.RST} {A.DIM}{'─' * max(1, W - 13)}{A.RST}") + bx(f" {A.CYN}{A.BOLD}t{A.RST}. 🧩 {A.WHT}Template + Addresses{A.RST} {A.DIM}Test one config against many IPs{A.RST}") + bx(f" {A.CYN}{A.BOLD}f{A.RST}. 🔍 {A.WHT}Clean IP Finder{A.RST} {A.DIM}Scan Cloudflare IP ranges{A.RST}") + bx("") + bx(f" {A.DIM}{'─' * (W - 2)}{A.RST}") + bx(f" {A.DIM}[h] ❓ Help [q] 🚪 Quit{A.RST}") + out.append(f"{A.CYN}╚{'═' * W}╝{A.RST}") + + _w("\n".join(out) + "\n") + _fl() + + key = _read_key_blocking() + if key in ("q", "ctrl-c", "esc"): + _w(A.SHOW) + _fl() + return None + if key == "h": + tui_show_guide() + files = find_config_files() + continue + if key == "p": + path = _tui_prompt_text("Enter file path:") + if path is None: + continue + if os.path.isfile(path): + return ("file", path) + _w(f" {A.RED}File not found.{A.RST}\n") + _fl() + time.sleep(1) + continue + if key == "s": + _w(A.SHOW) + _w(f"\n {A.BOLD}{A.CYN}Subscription URL{A.RST}\n") + _w(f" {A.DIM}Paste a URL that contains VLESS/VMess configs (plain text or base64).{A.RST}\n") + _w(f" {A.DIM}Example: https://example.com/sub.txt{A.RST}\n\n") + _fl() + url = _tui_prompt_text("URL:") + if url is None: + continue + if not url.lower().startswith(("http://", "https://")): + _w(f" {A.RED}URL must start with http:// or https://{A.RST}\n") + _fl() + time.sleep(1.5) + continue + return ("sub", url) + if key == "t": + _w(A.SHOW) + _w(f"\n {A.BOLD}{A.CYN}Template + Address List{A.RST}\n") + _w(f" {A.DIM}This mode takes ONE working config and a list of Cloudflare IPs/domains.{A.RST}\n") + _w(f" {A.DIM}It replaces the address in your config with each IP from the list,{A.RST}\n") + _w(f" {A.DIM}then tests all of them to find the fastest.{A.RST}\n\n") + _w(f" {A.BOLD}Step 1:{A.RST} {A.CYN}Paste your VLESS/VMess config URI:{A.RST}\n") + _w(f" {A.DIM}(a full vless://... or vmess://... 
URI){A.RST}\n ") + _fl() + try: + tpl = input().strip() + except (EOFError, KeyboardInterrupt): + continue + if not tpl or not parse_config(tpl): + _w(f" {A.RED}Invalid VLESS/VMess URI.{A.RST}\n") + _fl() + time.sleep(1.5) + continue + _w(f"\n {A.BOLD}Step 2:{A.RST} {A.CYN}Enter path to address list file:{A.RST}\n") + _w(f" {A.DIM}(a .txt file with one IP or domain per line){A.RST}\n") + _fl() + addr_path = _tui_prompt_text("Path:") + if addr_path is None: + continue + if not os.path.isfile(addr_path): + _w(f" {A.RED}File not found.{A.RST}\n") + _fl() + time.sleep(1) + continue + return ("template", f"{tpl}|||{addr_path}") + if key == "f": + return ("find_clean", "") + if key.isdigit() and 1 <= int(key) <= len(files): + return ("file", files[int(key) - 1][0]) + + +def tui_pick_mode() -> Optional[str]: + """Interactive mode picker. Returns mode name or None.""" + while True: + _w(A.CLR + A.HOME + A.HIDE) + cols, _ = term_size() + lines = draw_menu_header(cols) + lines.append(draw_box_line(f" {A.BOLD}Select scan mode:{A.RST}", cols)) + lines.append(draw_box_line("", cols)) + + modes = [("quick", "1"), ("normal", "2"), ("thorough", "3")] + for name, key in modes: + p = PRESETS[name] + num = f"{A.CYN}{A.BOLD}{key}{A.RST}" + lbl = f"{A.BOLD}{p['label']}{A.RST}" + if name == "normal": + lbl += f" {A.GRN}(recommended){A.RST}" + lines.append(draw_box_line(f" {num} {lbl}", cols)) + lines.append( + draw_box_line(f" {A.DIM}{p['desc']}{A.RST}", cols) + ) + lines.append( + draw_box_line( + f" {A.DIM}Data: {p['data']} | Est. time: {p['time']}{A.RST}", + cols, + ) + ) + lines.append(draw_box_line("", cols)) + + lines.append(draw_box_sep(cols)) + lines.append( + draw_box_line( + f" {A.DIM}[1-3] Select [B] Back [Q] Quit{A.RST}", cols + ) + ) + lines.append(draw_box_bottom(cols)) + + _w("\n".join(lines) + "\n") + _fl() + + key = _read_key_blocking() + if key in ("q", "ctrl-c"): + _w(A.SHOW) + _fl() + return None + if key == "b": + return "__back__" + if key == "1": + return "quick" + if key == "2" or key == "enter": + return "normal" + if key == "3": + return "thorough" + + +class Dashboard: + def __init__(self, st: State): + self.st = st + self.sort = "score" + self.offset = 0 + self.show_domains = False + + def _bar(self, cur: int, tot: int, w: int = 24) -> str: + if tot == 0: + return "░" * w + p = min(1.0, cur / tot) + f = int(w * p) + return f"{A.GRN}{'█' * f}{A.DIM}{'░' * (w - f)}{A.RST}" + + def _cscore(self, v: float) -> str: + if v >= 70: + return f"{A.GRN}{v:5.1f}{A.RST}" + if v >= 40: + return f"{A.YEL}{v:5.1f}{A.RST}" + if v > 0: + return f"{A.RED}{v:5.1f}{A.RST}" + return f"{A.DIM} -{A.RST}" + + def _speed_str(self, v: float) -> str: + if v <= 0: + return f"{A.DIM} -{A.RST}" + if v >= 1: + return f"{A.GRN}{v:5.1f}{A.RST}" + return f"{A.YEL}{v * 1000:4.0f}K{A.RST}" + + def draw(self): + cols, rows = term_size() + W = cols - 2 + s = self.st + vis = max(3, rows - 18 - len(s.rounds)) + out: List[str] = [] + + def bx(c: str): + out.append(f"{A.CYN}║{A.RST}" + c + " " * max(0, W - _vl(c)) + f"{A.CYN}║{A.RST}") + + out.append(f"{A.CYN}╔{'═' * W}╗{A.RST}") + elapsed = _fmt_elapsed(time.monotonic() - s.start_time) if s.start_time else "0s" + title = f" {A.BOLD}{A.WHT}CF Config Scanner{A.RST}" + right = f"{A.DIM}{elapsed} | {s.mode} | ^C stop{A.RST}" + bx(title + " " * max(1, W - _vl(title) - _vl(right)) + right) + out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") + + fname = os.path.basename(s.input_file) + info = f" {A.DIM}File:{A.RST} {fname} {A.DIM}Configs:{A.RST} {len(s.configs)} {A.DIM}Unique IPs:{A.RST} 
{len(s.ips)}" + if s.latency_cut_n > 0: + info += f" {A.DIM}Cut:{A.RST} {s.latency_cut_n}" + bx(info) + out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") + + bw = min(24, W - 55) + + if s.phase == "latency": + pct = s.done_count * 100 // max(1, s.total) + bx(f" {A.GRN}▶{A.RST} {A.BOLD}Latency{A.RST} [{self._bar(s.done_count, s.total, bw)}] {s.done_count}/{s.total} {pct}%") + elif s.alive_n > 0: + cut_info = f" {A.DIM}cut {s.latency_cut_n}{A.RST}" if s.latency_cut_n > 0 else "" + bx(f" {A.GRN}✓{A.RST} Latency {A.GRN}{s.alive_n} alive{A.RST} {A.DIM}{s.dead_n} dead{A.RST}{cut_info}") + else: + bx(f" {A.DIM}○ Latency waiting...{A.RST}") + + for i, rc in enumerate(s.rounds): + rn = i + 1 + lbl = f"Speed R{rn} ({rc.label}x{rc.keep})" + if s.cur_round == rn and s.phase.startswith("speed") and not s.finished: + pct = s.done_count * 100 // max(1, s.total) + bx(f" {A.GRN}▶{A.RST} {A.BOLD}{lbl:<18}{A.RST}[{self._bar(s.done_count, s.total, bw)}] {s.done_count}/{s.total} {pct}%") + elif s.cur_round > rn or (s.cur_round >= rn and s.finished): + bx(f" {A.GRN}✓{A.RST} {lbl:<18}{A.GRN}done{A.RST}") + else: + bx(f" {A.DIM}○ {lbl:<18}waiting...{A.RST}") + + out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") + parts = [] + if s.alive_n > 0: + alats = [r.tls_ms for r in s.res.values() if r.alive and r.tls_ms > 0] + avg_lat = statistics.mean(alats) if alats else 0 + parts.append(f"{A.GRN}● {s.alive_n}{A.RST} alive") + parts.append(f"{A.RED}● {s.dead_n}{A.RST} dead") + if avg_lat: + parts.append(f"{A.DIM}avg latency:{A.RST} {avg_lat:.0f}ms") + if s.best_speed > 0: + parts.append(f"{A.CYN}best:{A.RST} {s.best_speed:.2f} MB/s") + bx(" " + " ".join(parts) if parts else " ") + + out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") + + hdr = f" {A.BOLD}{'#':>3} {'IP':<16} {'Dom':>3} {'Ping':>6} {'Conn':>6}" + for i, rc in enumerate(s.rounds): + hdr += f" {'R' + str(i + 1):>5}" + hdr += f" {'Colo':>4} {'Score':>5}{A.RST}" + bx(hdr) + + sep = f" {'─' * 3} {'─' * 16} {'─' * 3} {'─' * 6} {'─' * 6}" + for _ in s.rounds: + sep += f" {'─' * 5}" + sep += f" {'─' * 4} {'─' * 5}" + bx(f"{A.DIM}{sep}{A.RST}") + + results = sorted_all(s, self.sort) + total_results = len(results) + page = results[self.offset : self.offset + vis] + + for rank, r in enumerate(page, self.offset + 1): + if not r.alive: + row = f" {A.DIM}{rank:>3} {r.ip:<16} {len(r.domains):>3} {A.RED}{'dead':>6}{A.RST}{A.DIM} {'':>6}" + for j in range(len(s.rounds)): + row += f" {'':>5}" + row += f" {'':>4} {A.RED}{'--':>5}{A.RST}" + bx(row) + continue + tcp = f"{r.tcp_ms:6.0f}" if r.tcp_ms > 0 else f"{A.DIM} -{A.RST}" + tls = f"{r.tls_ms:6.0f}" if r.tls_ms > 0 else f"{A.DIM} -{A.RST}" + row = f" {rank:>3} {r.ip:<16} {len(r.domains):>3} {tcp} {tls}" + for j in range(len(s.rounds)): + if j < len(r.speeds) and r.speeds[j] > 0: + row += f" {self._speed_str(r.speeds[j])}" + else: + row += f" {A.DIM} -{A.RST}" + if r.colo: + cl = f"{r.colo:>4}" + else: + cl = f"{A.DIM} -{A.RST}" + row += f" {cl} {self._cscore(r.score)}" + bx(row) + + for _ in range(vis - len(page)): + bx("") + + out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") + + if s.notify and time.monotonic() < s.notify_until: + bx(f" {A.GRN}{A.BOLD}{s.notify}{A.RST}") + elif s.finished: + sort_hint = f"sort:{A.BOLD}{self.sort}{A.RST}" + page_hint = f"{self.offset + 1}-{min(self.offset + vis, total_results)}/{total_results}" + ft = ( + f" {A.CYN}[S]{A.RST} {sort_hint} " + f"{A.CYN}[E]{A.RST} Export " + f"{A.CYN}[A]{A.RST} ExportAll " + f"{A.CYN}[C]{A.RST} Configs " + f"{A.CYN}[D]{A.RST} Domains " + f"{A.CYN}[H]{A.RST} Help " + f"{A.CYN}[J/K]{A.RST}" + ) + 
ft2 = ( + f" Scroll {A.CYN}[N/P]{A.RST} Page ({page_hint}) " + f"{A.CYN}[B]{A.RST} Back " + f"{A.CYN}[Q]{A.RST} Quit" + ) + bx(ft) + bx(ft2) + else: + bx(f" {A.DIM}{s.phase_label}... Press Ctrl+C to stop and export partial results{A.RST}") + + out.append(f"{A.CYN}╚{'═' * W}╝{A.RST}") + + _w(A.HOME) + _w("\n".join(out) + "\n") + _fl() + + def draw_domain_popup(self, r: Result): + """Show domains for the selected IP.""" + _w(A.CLR) + cols, rows = term_size() + vis = min(len(r.domains), rows - 10) + lines = [] + lines.append(f"{A.CYN}╔{'═' * (cols - 2)}╗{A.RST}") + lines.append(draw_box_line(f" {A.BOLD}Domains for {r.ip} ({len(r.domains)} total){A.RST}", cols)) + ping_s = f"{r.tcp_ms:.0f}ms" if r.tcp_ms > 0 else "-" + conn_s = f"{r.tls_ms:.0f}ms" if r.tls_ms > 0 else "-" + lines.append(draw_box_line(f" {A.DIM}Score: {r.score:.1f} | Ping: {ping_s} | Conn: {conn_s}{A.RST}", cols)) + lines.append(draw_box_sep(cols)) + for d in r.domains[:vis]: + lines.append(draw_box_line(f" {d}", cols)) + if len(r.domains) > vis: + lines.append(draw_box_line(f" {A.DIM}...and {len(r.domains) - vis} more{A.RST}", cols)) + lines.append(draw_box_sep(cols)) + lines.append(draw_box_line(f" {A.DIM}Press any key to go back{A.RST}", cols)) + lines.append(draw_box_bottom(cols)) + _w("\n".join(lines) + "\n") + _fl() + _wait_any_key() + _w(A.CLR) # clear before dashboard redraws + + def draw_config_popup(self, r: Result): + """Show all VLESS/VMess URIs for the selected IP.""" + _w(A.CLR) + cols, rows = term_size() + lines = [] + lines.append(f"{A.CYN}╔{'═' * (cols - 2)}╗{A.RST}") + lines.append(draw_box_line(f" {A.BOLD}Configs for {r.ip} ({len(r.uris)} URIs){A.RST}", cols)) + ping_s = f"{r.tcp_ms:.0f}ms" if r.tcp_ms > 0 else "-" + conn_s = f"{r.tls_ms:.0f}ms" if r.tls_ms > 0 else "-" + speed_s = f"{r.best_mbps:.1f} MB/s" if r.best_mbps > 0 else "-" + lines.append(draw_box_line( + f" {A.DIM}Score: {r.score:.1f} | Ping: {ping_s} | Conn: {conn_s} | Speed: {speed_s}{A.RST}", cols + )) + lines.append(draw_box_sep(cols)) + if r.uris: + max_show = rows - 10 + for i, uri in enumerate(r.uris[:max_show]): + # Truncate long URIs to fit terminal width + tag = f" {A.CYN}{i+1}.{A.RST} " + max_uri = cols - 8 + display = uri if len(uri) <= max_uri else uri[:max_uri - 3] + "..." 
+ lines.append(draw_box_line(f"{tag}{A.GRN}{display}{A.RST}", cols)) + if len(r.uris) > max_show: + lines.append(draw_box_line(f" {A.DIM}...and {len(r.uris) - max_show} more{A.RST}", cols)) + else: + lines.append(draw_box_line(f" {A.DIM}No VLESS/VMess URIs stored for this IP{A.RST}", cols)) + lines.append(draw_box_line(f" {A.DIM}(only available when loaded from URIs or subscriptions){A.RST}", cols)) + lines.append(draw_box_sep(cols)) + lines.append(draw_box_line(f" {A.DIM}Press any key to go back{A.RST}", cols)) + lines.append(draw_box_bottom(cols)) + _w("\n".join(lines) + "\n") + _fl() + _wait_any_key() + _w(A.CLR) + + def draw_help_popup(self): + """Show keybinding help + column explanations overlay.""" + _w(A.CLR) + cols, rows = term_size() + W = min(64, cols - 4) + lines = [] + lines.append(f" {A.CYN}{'=' * W}{A.RST}") + lines.append(f" {A.BOLD}{A.WHT} Keyboard Shortcuts{A.RST}") + lines.append(f" {A.CYN}{'-' * W}{A.RST}") + help_items = [ + ("S", "Cycle sort order: score / latency / speed"), + ("E", "Export results (CSV + top N configs)"), + ("A", "Export ALL configs sorted best to worst"), + ("C", "View VLESS/VMess URIs for an IP (enter rank #)"), + ("D", "View domains for an IP (enter rank #)"), + ("J / K", "Scroll down / up one row"), + ("N / P", "Page down / up"), + ("B", "Back to main menu (new scan)"), + ("H", "Show this help screen"), + ("Q", "Quit (results auto-saved on exit)"), + ] + for key, desc in help_items: + lines.append(f" {A.CYN}{key:<10}{A.RST} {desc}") + lines.append("") + lines.append(f" {A.CYN}{'=' * W}{A.RST}") + lines.append(f" {A.BOLD}{A.WHT} Column Guide{A.RST}") + lines.append(f" {A.CYN}{'-' * W}{A.RST}") + col_items = [ + ("#", "Rank (sorted by current sort order)"), + ("IP", "Cloudflare edge IP address"), + ("Dom", "How many domains share this IP"), + ("Ping", "TCP connect time in ms (like ping)"), + ("Conn", "Full connection time in ms (TCP + TLS handshake)"), + ("R1,R2..", "Download speed per round (MB/s or KB/s)"), + ("Colo", "CF datacenter code (e.g. 
FRA, IAH, MRS)"), + ("Score", "Combined score (0-100, higher = better)"), + ] + for key, desc in col_items: + lines.append(f" {A.CYN}{key:<10}{A.RST} {desc}") + lines.append("") + lines.append(f" {A.DIM}Score = Conn latency (35%) + speed (50%) + TTFB (15%){A.RST}") + lines.append(f" {A.DIM}'-' means not tested yet (only top IPs get speed tested){A.RST}") + lines.append(f" {A.CYN}{'=' * W}{A.RST}") + lines.append(f" {A.BOLD}{A.WHT} Made By Sam - SamNet Technologies{A.RST}") + lines.append(f" {A.DIM} https://github.com/SamNet-dev/cfray{A.RST}") + lines.append(f" {A.CYN}{'=' * W}{A.RST}") + lines.append(f" {A.DIM}Press any key to go back{A.RST}") + + _w("\n".join(lines) + "\n") + _fl() + _wait_any_key() + _w(A.CLR) # clear before dashboard redraws + + def handle(self, key: str) -> Optional[str]: + sorts = ["score", "latency", "speed"] + if key == "s": + idx = sorts.index(self.sort) if self.sort in sorts else 0 + self.sort = sorts[(idx + 1) % len(sorts)] + elif key in ("j", "down"): + self.offset = min(self.offset + 1, max(0, len(sorted_all(self.st, self.sort)) - 3)) + elif key in ("k", "up"): + self.offset = max(0, self.offset - 1) + elif key == "n": + # page down + _, rows = term_size() + page = max(3, rows - 18 - len(self.st.rounds)) + self.offset = min(self.offset + page, max(0, len(sorted_all(self.st, self.sort)) - 3)) + elif key == "p": + # page up + _, rows = term_size() + page = max(3, rows - 18 - len(self.st.rounds)) + self.offset = max(0, self.offset - page) + elif key == "e": + return "export" + elif key == "a": + return "export-all" + elif key == "c": + return "configs" + elif key == "d": + return "domains" + elif key == "h": + return "help" + elif key == "b": + return "back" + elif key in ("q", "ctrl-c"): + return "quit" + return None + + +def save_csv(st: State, path: str, sort_by: str = "score"): + results = sorted_alive(st, sort_by) + with open(path, "w", newline="", encoding="utf-8") as f: + w = csv.writer(f) + hdr = ["Rank", "IP", "Domains", "Domain_Count", "Ping_ms", "Conn_ms", "TTFB_ms"] + for i, rc in enumerate(st.rounds): + hdr.append(f"R{i + 1}_{rc.label}_MBps") + hdr += ["Best_MBps", "Colo", "Score", "Error"] + w.writerow(hdr) + for rank, r in enumerate(results, 1): + row = [ + rank, + r.ip, + "|".join(r.domains[:5]), + len(r.domains), + f"{r.tcp_ms:.1f}" if r.tcp_ms > 0 else "", + f"{r.tls_ms:.1f}" if r.tls_ms > 0 else "", + f"{r.ttfb_ms:.1f}" if r.ttfb_ms > 0 else "", + ] + for i in range(len(st.rounds)): + row.append( + f"{r.speeds[i]:.3f}" + if i < len(r.speeds) and r.speeds[i] > 0 + else "" + ) + row += [ + f"{r.best_mbps:.3f}" if r.best_mbps > 0 else "", + r.colo, + f"{r.score:.1f}", + r.error, + ] + w.writerow(row) + + +def save_configs(st: State, path: str, top: int = 50, sort_by: str = "score"): + """Save top configs. 
Use top=0 for ALL configs sorted best to worst.""" + results = sorted_alive(st, sort_by) + has_uris = any(r.uris for r in results) + limit = top if top > 0 else len(results) + with open(path, "w", encoding="utf-8") as f: + n = 0 + for r in results: + if n >= limit: + break + if has_uris: + for uri in r.uris: + f.write(uri + "\n") + n += 1 + if n >= limit: + break + else: + # JSON input: write IP and domains as a reference list + doms = ", ".join(r.domains[:3]) + extra = f" (+{len(r.domains) - 3} more)" if len(r.domains) > 3 else "" + f.write(f"{r.ip} # score={r.score:.1f} domains={doms}{extra}\n") + n += 1 + + +def save_all_configs_sorted(st: State, path: str, sort_by: str = "score"): + """Save ALL raw configs (every URI) sorted by their IP's score, best to worst.""" + results = sorted_alive(st, sort_by) + dead = [r for r in st.res.values() if not r.alive] + has_uris = any(r.uris for r in results) + with open(path, "w", encoding="utf-8") as f: + for r in results: + if has_uris: + for uri in r.uris: + f.write(uri + "\n") + else: + doms = ", ".join(r.domains[:3]) + extra = f" (+{len(r.domains) - 3} more)" if len(r.domains) > 3 else "" + f.write(f"{r.ip} # score={r.score:.1f} domains={doms}{extra}\n") + for r in dead: + if has_uris: + for uri in r.uris: + f.write(uri + "\n") + else: + doms = ", ".join(r.domains[:3]) + f.write(f"{r.ip} # DEAD domains={doms}\n") + + +RESULTS_DIR = "results" + + +def _results_path(filename: str) -> str: + """Return path inside the results/ directory, creating it if needed.""" + os.makedirs(RESULTS_DIR, exist_ok=True) + return os.path.join(RESULTS_DIR, filename) + + +def do_export( + st: State, base_path: str, sort_by: str = "score", top: int = 50, + output_csv: str = "", output_configs: str = "", +) -> Tuple[str, str, str]: + stem = os.path.basename(base_path).rsplit(".", 1)[0] if base_path else "scan" + csv_path = output_csv if output_csv else _results_path(stem + "_results.csv") + if output_configs: + cfg_path = output_configs + elif top <= 0: + cfg_path = _results_path(stem + "_all_sorted.txt") + else: + cfg_path = _results_path(stem + f"_top{top}.txt") + full_path = _results_path(stem + "_full_sorted.txt") + save_csv(st, csv_path, sort_by) + save_configs(st, cfg_path, top, sort_by) + save_all_configs_sorted(st, full_path, sort_by) + st.saved = True + return csv_path, cfg_path, full_path + + +async def _refresh_loop(dash: Dashboard, st: State): + while not st.finished: + try: + dash.draw() + except Exception: + pass + await asyncio.sleep(0.3) + + +async def run_scan(st: State, workers: int, speed_workers: int, timeout: float, speed_timeout: float): + """Run the scan phases with dynamic round sizing.""" + try: + os.makedirs("results", exist_ok=True) + with open(DEBUG_LOG, "w") as f: + f.write(f"=== Scan started {time.strftime('%Y-%m-%d %H:%M:%S')} mode={st.mode} ===\n") + except Exception: + pass + st.start_time = time.monotonic() + + if not st.interrupted: + await phase1(st, workers, timeout) + + if st.interrupted or st.alive_n == 0: + st.finished = True + calc_scores(st) + return + + preset = PRESETS.get(st.mode, PRESETS["normal"]) + + alive = sorted( + (ip for ip, r in st.res.items() if r.alive), + key=lambda ip: st.res[ip].tls_ms, + ) + + cut_pct = preset.get("latency_cut", 0) + if cut_pct > 0 and len(alive) > 50: + cut_n = max(1, int(len(alive) * cut_pct / 100)) + alive = alive[:-cut_n] + st.latency_cut_n = cut_n + _dbg(f"=== Latency cut: removed bottom {cut_pct}% = {cut_n} IPs, {len(alive)} remaining ===") + + if not st.rounds: + st.rounds = 
build_dynamic_rounds(st.mode, len(alive)) + _dbg(f"=== Dynamic rounds: {[(r.label, r.keep) for r in st.rounds]} ===") + + if not st.interrupted and st.rounds: + rlim = CFRateLimiter() + cands = list(alive) + cdn_host = SPEED_HOST + cdn_path = "" # _dl_one uses default + + for i, rc in enumerate(st.rounds): + if st.interrupted: + break + st.cur_round = i + 1 + st.phase = f"speed_r{i + 1}" + actual_count = min(rc.keep, len(cands)) + st.phase_label = f"Speed R{i + 1} ({rc.label} x {actual_count})" + _dbg(f"=== Round R{i+1}: {rc.size}B x {actual_count} IPs, workers={speed_workers}, timeout={speed_timeout}s, budget={rlim.BUDGET - rlim.count} left ===") + + if i > 0: + calc_scores(st) + cands = sorted(cands, key=lambda ip: st.res[ip].score, reverse=True) + cands = cands[: rc.keep] + + await phase2_round( + st, rc, cands, speed_workers, speed_timeout, + rlim=rlim, cdn_host=cdn_host, cdn_path=cdn_path, + ) + calc_scores(st) + + st.finished = True + calc_scores(st) + + +async def run_tui(args): + """TUI mode: interactive startup + dashboard.""" + enable_ansi() + + # Determine initial input source from CLI args + input_method = None # "file", "sub", or "template" + input_value = None + if getattr(args, "sub", None): + input_method, input_value = "sub", args.sub + elif getattr(args, "template", None): + if getattr(args, "input", None): + input_method, input_value = "template", f"{args.template}|||{args.input}" + else: + print("Error: --template requires -i (address list file)") + return + elif getattr(args, "find_clean", False): + input_method, input_value = "find_clean", "" + elif getattr(args, "input", None): + input_method, input_value = "file", args.input + + while True: # outer loop: back returns here + interactive = input_method is None + while True: + if input_method is None: + pick = tui_pick_file() + if not pick: + _w(A.SHOW) + return + input_method, input_value = pick + + if input_method == "find_clean": + result = await tui_run_clean_finder() + if result is None: + _w(A.SHOW) + return + if result[0] == "__back__": + input_method = None + input_value = None + continue + input_method, input_value = result + + mode = args.mode + if not getattr(args, "_mode_set", False) and interactive: + picked = tui_pick_mode() + if not picked: + _w(A.SHOW) + return + if picked == "__back__": + input_method = None + input_value = None + continue + mode = picked + break + + st = State() + st.mode = mode + st.top = args.top + + if args.rounds: + st.rounds = parse_rounds_str(args.rounds) + elif args.skip_download: + st.rounds = [] + + # Determine display label for loading screen + if input_method == "sub": + load_label = input_value.split("/")[-1][:40] or "subscription" + elif input_method == "template": + parts = input_value.split("|||", 1) + load_label = os.path.basename(parts[1]) if len(parts) > 1 else "template" + else: + load_label = os.path.basename(input_value) + + _w(A.CLR + A.HOME) + cols, _ = term_size() + lines = draw_menu_header(cols) + lines.append(draw_box_line(f" {A.BOLD}Starting scan...{A.RST}", cols)) + lines.append(draw_box_line("", cols)) + lines.append(draw_box_line(f" {A.CYN}>{A.RST} Loading {load_label}...", cols)) + lines.append(draw_box_bottom(cols)) + _w("\n".join(lines) + "\n") + _fl() + + # Load configs based on input method + if input_method == "sub": + st.configs = fetch_sub(input_value) + st.input_file = input_value + elif input_method == "template": + tpl_uri, addr_path = input_value.split("|||", 1) + addrs = load_addresses(addr_path) + st.configs = generate_from_template(tpl_uri, 
addrs) + st.input_file = f"{addr_path} ({len(addrs)} addresses)" + else: + st.configs = load_input(input_value) + st.input_file = input_value + + if not st.configs: + _w(A.SHOW) + print(f"No configs found in {st.input_file}") + return + + _w(A.CLR + A.HOME) + lines = draw_menu_header(cols) + lines.append(draw_box_line(f" {A.BOLD}Starting scan...{A.RST}", cols)) + lines.append(draw_box_line("", cols)) + lines.append(draw_box_line(f" {A.GRN}OK{A.RST} Loaded {len(st.configs)} configs", cols)) + lines.append(draw_box_line("", cols)) + _w("\n".join(lines) + "\n") + _fl() + + st.phase = "dns" + st.phase_label = "Resolving DNS" + try: + await resolve_all(st) + except Exception as e: + _w(A.SHOW + "\n") + print(f"DNS resolution error: {e}") + return + if not st.ips: + _w(A.SHOW + "\n") + print("No IPs resolved — check network or config addresses.") + return + + dash = Dashboard(st) + refresh = asyncio.create_task(_refresh_loop(dash, st)) + + scan_task = asyncio.ensure_future( + run_scan(st, args.workers, args.speed_workers, args.timeout, args.speed_timeout) + ) + + old_sigint = signal.getsignal(signal.SIGINT) + + def _sig(sig, frame): + st.interrupted = True + st.finished = True + scan_task.cancel() + signal.signal(signal.SIGINT, _sig) + + try: + await scan_task + except asyncio.CancelledError: + st.interrupted = True + st.finished = True + calc_scores(st) + + # Restore original SIGINT so Ctrl+C works in post-scan loop + signal.signal(signal.SIGINT, old_sigint) + + if refresh: + refresh.cancel() + try: + await refresh + except asyncio.CancelledError: + pass + + try: + csv_p, cfg_p, full_p = do_export(st, input_value, dash.sort, st.top) + st.notify = f"Saved to results/ folder" + except Exception as e: + csv_p = cfg_p = full_p = "" + st.notify = f"Export error: {e}" + st.notify_until = time.monotonic() + 5 + + dash.draw() + + go_back = False + try: + while True: + key = _read_key_nb(0.1) + if key is None: + # refresh notification timeout + if st.notify and time.monotonic() >= st.notify_until: + st.notify = "" + dash.draw() + continue + + act = dash.handle(key) + if act == "quit": + break + elif act == "back": + # show save summary and go to main menu + _w(A.CLR) + save_lines = [ + f" {A.CYN}{'=' * 50}{A.RST}", + f" {A.BOLD}{A.WHT} Results saved:{A.RST}", + f" {A.CYN}{'-' * 50}{A.RST}", + f" {A.GRN}CSV:{A.RST} {csv_p}", + f" {A.GRN}Top:{A.RST} {cfg_p}", + f" {A.GRN}Full:{A.RST} {full_p}", + f" {A.CYN}{'=' * 50}{A.RST}", + "", + f" {A.DIM}Press any key to go to main menu...{A.RST}", + ] + _w("\n".join(save_lines) + "\n") + _fl() + _wait_any_key() + go_back = True + break + elif act == "export": + try: + csv_p, cfg_p, full_p = do_export(st, input_value, dash.sort, st.top) + st.notify = f"Exported to results/ folder" + except Exception as e: + st.notify = f"Export error: {e}" + st.notify_until = time.monotonic() + 4 + elif act == "export-all": + try: + csv_p, cfg_p, full_p = do_export(st, input_value, dash.sort, 0) + st.notify = f"Exported ALL to results/ folder" + except Exception as e: + st.notify = f"Export error: {e}" + st.notify_until = time.monotonic() + 4 + elif act == "configs": + results = sorted_all(st, dash.sort) + if results: + n = _prompt_number(f"{A.CYN}Enter rank # to view configs (1-{len(results)}):{A.RST} ", len(results)) + if n is not None: + dash.draw_config_popup(results[n - 1]) + elif act == "domains": + results = sorted_all(st, dash.sort) + if results: + n = _prompt_number(f"{A.CYN}Enter rank # to view domains (1-{len(results)}):{A.RST} ", len(results)) + if n is not None: + 
dash.draw_domain_popup(results[n - 1]) + elif act == "help": + dash.draw_help_popup() + dash.draw() + except (KeyboardInterrupt, EOFError, OSError): + pass + + if go_back: + # reset for next run — clear CLI input so file picker shows + args.input = None + args.sub = None + args.template = None + args._mode_set = False + input_method = None + input_value = None + continue + + _w(A.SHOW + "\n") + _fl() + print(f"Results saved to {RESULTS_DIR}/ folder") + break + + +async def run_headless(args): + """Headless mode (--no-tui).""" + st = State() + st.input_file = args.input + st.mode = args.mode + + if args.rounds: + st.rounds = parse_rounds_str(args.rounds) + elif args.skip_download: + st.rounds = [] + + print(f"CF Config Scanner v{VERSION}") + st.configs, src = load_configs_from_args(args) + print(f"Loading: {src}") + print(f"Loaded {len(st.configs)} configs") + if not st.configs: + return + + print("Resolving DNS...") + await resolve_all(st) + print(f" {len(st.ips)} unique IPs") + if not st.ips: + return + + scan_task = asyncio.ensure_future( + run_scan(st, args.workers, args.speed_workers, args.timeout, args.speed_timeout) + ) + + old_sigint = signal.getsignal(signal.SIGINT) + + def _sig(sig, frame): + st.interrupted = True + st.finished = True + scan_task.cancel() + signal.signal(signal.SIGINT, _sig) + + try: + await scan_task + except asyncio.CancelledError: + st.interrupted = True + st.finished = True + calc_scores(st) + print("\n Interrupted! Exporting partial results...") + + signal.signal(signal.SIGINT, old_sigint) + + results = sorted_alive(st, "score") + elapsed = _fmt_elapsed(time.monotonic() - st.start_time) + print(f"\nDone in {elapsed}. {st.alive_n} alive IPs.\n") + print(f"{'=' * 95}") + hdr = f"{'#':>4} {'IP':<16} {'Dom':>4} {'Ping ms':>7} {'Conn ms':>7}" + for i in range(len(st.rounds)): + hdr += f" {'R' + str(i + 1) + ' MB/s':>9}" + hdr += f" {'Colo':>5} {'Score':>6}" + print(hdr) + print("=" * 95) + for rank, r in enumerate(results[:50], 1): + tcp = f"{r.tcp_ms:7.1f}" if r.tcp_ms > 0 else " -" + tls = f"{r.tls_ms:7.1f}" if r.tls_ms > 0 else " -" + row = f"{rank:>4} {r.ip:<16} {len(r.domains):>4} {tcp} {tls}" + for j in range(len(st.rounds)): + if j < len(r.speeds) and r.speeds[j] > 0: + row += f" {r.speeds[j]:>9.2f}" + else: + row += " -" + cl = f"{r.colo:>5}" if r.colo else " -" + sc = f"{r.score:>6.1f}" if r.score > 0 else " -" + row += f" {cl} {sc}" + print(row) + + try: + csv_p, cfg_p, full_p = do_export( + st, args.input or "scan", top=args.top, + output_csv=getattr(args, "output", "") or "", + output_configs=getattr(args, "output_configs", "") or "", + ) + print(f"\nResults saved:") + print(f" CSV: {csv_p}") + print(f" Configs: {cfg_p}") + print(f" Full: {full_p}") + except Exception as e: + print(f"\nError saving results: {e}") + + +async def run_headless_clean(args): + """Headless clean IP finder (--find-clean --no-tui).""" + scan_cfg = CLEAN_MODES.get(getattr(args, "clean_mode", "normal"), CLEAN_MODES["normal"]) + + subnets = CF_SUBNETS + if getattr(args, "subnets", None): + if os.path.isfile(args.subnets): + with open(args.subnets, encoding="utf-8") as f: + subnets = [ln.strip() for ln in f if ln.strip() and not ln.startswith("#")] + else: + subnets = [s.strip() for s in args.subnets.split(",") if s.strip()] + + ports = scan_cfg.get("ports", [443]) + print(f"CF Config Scanner v{VERSION} — Clean IP Finder") + print(f"Ranges: {len(subnets)} | Sample: {scan_cfg['sample'] or 'all'} | Workers: {scan_cfg['workers']} | Ports: {', '.join(str(p) for p in ports)}") + + ips 
= generate_cf_ips(subnets, scan_cfg["sample"]) + total_probes = len(ips) * len(ports) + print(f"Scanning {len(ips):,} IPs × {len(ports)} port(s) = {total_probes:,} probes...") + + cs = CleanScanState() + start = time.monotonic() + + scan_task = asyncio.ensure_future( + scan_clean_ips( + ips, workers=scan_cfg["workers"], timeout=3.0, + validate=scan_cfg["validate"], cs=cs, ports=ports, + ) + ) + + old_sigint = signal.getsignal(signal.SIGINT) + def _sig(sig, frame): + cs.interrupted = True + scan_task.cancel() + signal.signal(signal.SIGINT, _sig) + + last_pct = -1 + try: + while not scan_task.done(): + pct = cs.done * 100 // max(1, cs.total) + if pct != last_pct and pct % 5 == 0: + print(f" {pct}% ({cs.done:,}/{cs.total:,}) found {cs.found:,} clean") + last_pct = pct + await asyncio.sleep(1) + except (asyncio.CancelledError, Exception): + pass + finally: + signal.signal(signal.SIGINT, old_sigint) + + try: + results = await scan_task + except (asyncio.CancelledError, Exception): + results = sorted(cs.all_results or cs.results, key=lambda x: x[1]) + + elapsed = _fmt_elapsed(time.monotonic() - start) + print(f"\nDone in {elapsed}. Found {len(results):,} clean IPs.\n") + print(f"{'='*50}") + print(f"{'#':>4} {'Address':<22} {'Latency':>8}") + print(f"{'='*50}") + for i, (ip, lat) in enumerate(results[:30]): + print(f"{i+1:>4} {ip:<22} {lat:>6.0f}ms") + if len(results) > 30: + print(f" ...and {len(results)-30:,} more") + + if results: + try: + os.makedirs(RESULTS_DIR, exist_ok=True) + path = os.path.abspath(_results_path("clean_ips.txt")) + with open(path, "w", encoding="utf-8") as f: + for ip, lat in results: + f.write(f"{ip}\n") + print(f"\nSaved {len(results):,} IPs to {path}") + except Exception as e: + print(f"\nSave error: {e}") + path = "" + else: + print("\nNo clean IPs found. Nothing saved.") + path = "" + + # If --template also given, proceed to speed test + if getattr(args, "template", None) and results: + print(f"\nContinuing to speed test with template...") + addrs = [ip for ip, _ in results] + configs = generate_from_template(args.template, addrs) + if configs: + args.input = path + st = State() + st.input_file = f"clean ({len(results)} IPs)" + st.mode = args.mode + st.configs = configs + if args.rounds: + st.rounds = parse_rounds_str(args.rounds) + elif args.skip_download: + st.rounds = [] + print(f"Generated {len(configs)} configs") + print("Resolving DNS...") + await resolve_all(st) + print(f" {len(st.ips)} unique IPs") + if st.ips: + start2 = time.monotonic() + scan2 = asyncio.ensure_future( + run_scan(st, args.workers, args.speed_workers, args.timeout, args.speed_timeout) + ) + old2 = signal.getsignal(signal.SIGINT) + def _sig2(sig, frame): + st.interrupted = True + st.finished = True + scan2.cancel() + signal.signal(signal.SIGINT, _sig2) + try: + await scan2 + except asyncio.CancelledError: + st.interrupted = True + st.finished = True + calc_scores(st) + signal.signal(signal.SIGINT, old2) + + alive_results = sorted_alive(st, "score") + elapsed2 = _fmt_elapsed(time.monotonic() - start2) + print(f"\nSpeed test done in {elapsed2}. 
{st.alive_n} alive.") + print(f"{'='*80}") + for rank, r in enumerate(alive_results[:20], 1): + spd = f"{r.best_mbps:.2f}" if r.best_mbps > 0 else " -" + lat_s = f"{r.tls_ms:.0f}" if r.tls_ms > 0 else " -" + print(f"{rank:>3} {r.ip:<16} {lat_s:>6}ms {spd:>8} MB/s score={r.score:.1f}") + try: + csv_p, cfg_p, full_p = do_export(st, path, top=args.top) + print(f"\nSaved: {csv_p} | {cfg_p} | {full_p}") + except Exception as e: + print(f"Export error: {e}") + + +def main(): + if hasattr(sys.stdout, "reconfigure"): + try: + sys.stdout.reconfigure(encoding="utf-8", errors="replace") + except Exception: + pass + + p = argparse.ArgumentParser( + description="CF Config Scanner - test VLESS configs for latency + download speed", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""Run with no arguments for interactive TUI. + +Modes (sort by latency first, then speed-test the best): + quick Cut 50%% latency, 1MB x100 -> 5MB x20 (~200 MB, ~2-3 min) + normal Cut 40%% latency, 1MB x200 -> 5MB x50 -> 20MB x20 (~850 MB, ~5-10 min) + thorough Cut 15%% latency, 5MB xALL -> 25MB x150 -> 100MB x50 (~8-15 GB, ~30-60 min) + +Examples: + %(prog)s Interactive TUI + %(prog)s -i configs.txt TUI with file + %(prog)s --sub https://example.com/sub.txt Fetch from subscription URL + %(prog)s --template "vless://UUID@{ip}:443?..." -i addrs.json Generate from template + %(prog)s -i configs.txt --mode quick Quick scan + %(prog)s -i configs.txt --top 0 Export ALL sorted + %(prog)s -i configs.txt --no-tui -o results.csv Headless + %(prog)s --find-clean --no-tui Find clean CF IPs (headless) + %(prog)s --find-clean --no-tui --template "vless://..." Find + speed test +""", + ) + p.add_argument("-i", "--input", help="Input file (VLESS URIs or domains.json)") + p.add_argument("--sub", help="Subscription URL (fetches VLESS URIs from URL)") + p.add_argument("--template", help="Base VLESS URI template (use with -i address list)") + p.add_argument("-m", "--mode", choices=["quick", "normal", "thorough"], default="normal") + p.add_argument("--rounds", help='Custom rounds, e.g. 
"1MB:200,5MB:50,20MB:20"') + p.add_argument("-w", "--workers", type=int, default=LATENCY_WORKERS, help="Latency workers") + p.add_argument("--speed-workers", type=int, default=SPEED_WORKERS, help="Download workers") + p.add_argument("--timeout", type=float, default=LATENCY_TIMEOUT, help="Latency timeout (s)") + p.add_argument("--speed-timeout", type=float, default=SPEED_TIMEOUT, help="Download timeout (s)") + p.add_argument("--skip-download", action="store_true", help="Latency only") + p.add_argument("--top", type=int, default=50, help="Export top N configs (0 = ALL sorted best to worst)") + p.add_argument("--no-tui", action="store_true", help="Plain text output") + p.add_argument("-o", "--output", help="CSV output path (headless)") + p.add_argument("--output-configs", help="Save top VLESS URIs (headless)") + p.add_argument("--find-clean", action="store_true", help="Find clean Cloudflare IPs") + p.add_argument("--clean-mode", choices=["quick", "normal", "full", "mega"], default="normal", + help="Clean IP scan scope (quick=~4K, normal=~12K, full=~1.5M, mega=~3M multi-port)") + p.add_argument("--subnets", help="Custom subnets file or comma-separated CIDRs") + args = p.parse_args() + + args._mode_set = any(a == "-m" or a.startswith("--mode") for a in sys.argv) + + try: + if getattr(args, "find_clean", False) and args.no_tui: + asyncio.run(run_headless_clean(args)) + elif args.no_tui: + if not args.input and not args.sub and not args.template: + p.error("--input, --sub, or --template is required in --no-tui mode") + asyncio.run(run_headless(args)) + else: + asyncio.run(run_tui(args)) + except KeyboardInterrupt: + pass + finally: + _w(A.SHOW + "\n") + _fl() + + +if __name__ == "__main__": + main()