Files
cfray/scanner.py
2026-02-20 23:30:57 -06:00

3105 lines
113 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
#
# ┌─────────────────────────────────────────────────────────────────┐
# │ │
# │ ⚡ CF CONFIG SCANNER v1.0 │
# │ │
# │ Test VLESS/VMess proxy configs for latency + download speed │
# │ │
# │ • Latency test (TCP + TLS) all IPs in seconds │
# │ • Download speed test via progressive funnel │
# │ • Live TUI dashboard with real-time results │
# │ • Smart rate limiting with CDN fallback │
# │ • Clean IP Finder — scan all Cloudflare ranges (up to 3M) │
# │ • Multi-port scanning (443, 8443) for maximum coverage │
# │ • Zero dependencies — Python 3.8+ stdlib only │
# │ │
# │ GitHub: https://github.com/SamNet-dev/cfray │
# │ │
# └─────────────────────────────────────────────────────────────────┘
#
# Usage:
# python3 scanner.py Interactive TUI
# python3 scanner.py -i configs.txt Normal mode
# python3 scanner.py --sub https://example.com/sub Fetch from subscription
# python3 scanner.py --template "vless://..." -i addrs.json Generate + test
# python3 scanner.py --find-clean --no-tui --clean-mode mega Clean IP scan
#
import asyncio
import argparse
import base64
import csv
import glob as globmod
import ipaddress
import json
import os
import random
import re
import signal
import socket
import ssl
import statistics
import sys
import time
import urllib.parse
import urllib.request
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
# Tool version string.
VERSION = "1.0"
# Host and path of the download endpoint. _dl_one() appends "?bytes=<size>" to
# SPEED_PATH, and phase1() also uses SPEED_HOST as the TLS SNI.
SPEED_HOST = "speed.cloudflare.com"
SPEED_PATH = "/__down"
# Debug log written by _dbg(); it is rotated to "<name>.1" once it grows past
# LOG_MAX_BYTES.
DEBUG_LOG = os.path.join("results", "debug.log")
LOG_MAX_BYTES = 5 * 1024 * 1024
# Worker counts and timeouts (seconds) for the latency and speed phases.
# Presumably the CLI defaults; their consumers are outside this part of the
# file -- TODO confirm.
LATENCY_WORKERS = 50
SPEED_WORKERS = 10
LATENCY_TIMEOUT = 5.0
SPEED_TIMEOUT = 30.0
# (host, path) pair. Presumably an alternate download source for when
# speed.cloudflare.com is rate-limited -- TODO confirm against the caller.
CDN_FALLBACK = ("cloudflaremirrors.com", "/archlinux/iso/latest/archlinux-x86_64.iso")
# Cloudflare published IPv4 ranges (https://www.cloudflare.com/ips-v4/)
# NOTE(review): the published list may also contain 131.0.72.0/22, which is not
# present here -- confirm whether the omission is intentional.
CF_SUBNETS = [
    "173.245.48.0/20", "103.21.244.0/22", "103.22.200.0/22",
    "103.31.4.0/22", "141.101.64.0/18", "108.162.192.0/18",
    "190.93.240.0/20", "188.114.96.0/20", "197.234.240.0/22",
    "198.41.128.0/17", "162.158.0.0/15", "104.16.0.0/13",
    "104.24.0.0/14", "172.64.0.0/13",
]
# HTTPS-capable ports; usage is outside this part of the file.
CF_HTTPS_PORTS = [443, 8443, 2053, 2083, 2087, 2096]
# Clean-IP scan profiles. The keys look like the arguments of generate_cf_ips()
# ("sample" -> sample_per_24, where 0 means every host) and scan_clean_ips()
# ("workers", "validate", "ports") -- presumably wired up by the caller.
# NOTE(review): the ranges in CF_SUBNETS split into 5,952 /24 blocks, so
# sample=1 and sample=3 work out to roughly 6K and 18K addresses rather than
# the ~4K / ~12K quoted in the descriptions -- confirm and update the text.
CLEAN_MODES = {
    "quick": {"label": "Quick", "sample": 1, "workers": 500, "validate": False,
              "ports": [443], "desc": "1 random IP per /24 (~4K IPs, ~30s)"},
    "normal": {"label": "Normal", "sample": 3, "workers": 500, "validate": True,
               "ports": [443], "desc": "3 IPs per /24 + CF verify (~12K IPs, ~2 min)"},
    "full": {"label": "Full", "sample": 0, "workers": 1000, "validate": True,
             "ports": [443], "desc": "All IPs + CF verify (~1.5M IPs, 20+ min)"},
    "mega": {"label": "Mega", "sample": 0, "workers": 1500, "validate": True,
             "ports": [443, 8443], "desc": "All IPs × 2 ports (~3M probes, 30-60 min)"},
}
# Speed-test funnels consumed by build_dynamic_rounds(). The four "round_*"
# lists are read in lockstep, one element per round:
#   round_sizes  download size in bytes
#   round_pcts   percentage of alive IPs to keep (100 keeps all of them)
#   round_min    lower clamp on the kept count (0 disables the clamp)
#   round_max    upper clamp on the kept count (0 disables the clamp)
# "latency_cut", "data" and "time" are not read in this part of the file.
PRESETS = {
    "quick": {
        "label": "Quick",
        "desc": "Latency sort -> 1MB top 100 -> 5MB top 20",
        "dynamic": True,
        "latency_cut": 50,
        "round_sizes": [1_000_000, 5_000_000],
        "round_pcts": [100, 20],
        "round_min": [50, 10],
        "round_max": [100, 20],
        "data": "~200 MB",
        "time": "~2-3 min",
    },
    "normal": {
        "label": "Normal",
        "desc": "Latency sort -> 1MB top 200 -> 5MB top 50 -> 20MB top 20",
        "dynamic": True,
        "latency_cut": 40,
        "round_sizes": [1_000_000, 5_000_000, 20_000_000],
        "round_pcts": [100, 25, 10],
        "round_min": [50, 20, 10],
        "round_max": [200, 50, 20],
        "data": "~850 MB",
        "time": "~5-10 min",
    },
    "thorough": {
        "label": "Thorough",
        "desc": "Deep funnel: 5MB / 25MB / 50MB",
        "dynamic": True,
        "latency_cut": 15,
        "round_sizes": [5_000_000, 25_000_000, 50_000_000],
        "round_pcts": [100, 25, 10],
        "round_min": [0, 30, 15],
        "round_max": [0, 150, 50],
        "data": "~5-10 GB",
        "time": "~20-45 min",
    },
}
class A:
    """Namespace of ANSI escape sequences used for styling and cursor control."""

    # --- text attributes ---
    RST = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"
    ITAL = "\033[3m"
    ULINE = "\033[4m"

    # --- foreground colours ---
    RED = "\033[31m"
    GRN = "\033[32m"
    YEL = "\033[33m"
    BLU = "\033[34m"
    MAG = "\033[35m"
    CYN = "\033[36m"
    WHT = "\033[97m"

    # --- background colours ---
    BGBL = "\033[44m"
    BGDG = "\033[100m"

    # --- cursor movement / screen and line erasing ---
    HOME = "\033[H"
    CLR = "\033[H\033[J"
    EL = "\033[2K"

    # --- cursor visibility ---
    HIDE = "\033[?25l"
    SHOW = "\033[?25h"
# Matches "ESC [ ... m" sequences; _vl() strips them before measuring width.
# NOTE(review): the body is "anything up to the next 'm'", so a sequence that
# does not end in "m" (e.g. A.EL) would match through to the next "m" in the
# following text -- only feed it colour/attribute-styled strings.
_ansi_re = re.compile(r"\033\[[^m]*m")
def _dbg(msg: str):
    """Append a timestamped debug line to DEBUG_LOG, rotating the file when large.

    Once the log exceeds LOG_MAX_BYTES it is moved to ``DEBUG_LOG + ".1"``
    (replacing any previous backup) and a fresh log is started.

    Logging is strictly best-effort: every failure (unwritable directory,
    full disk, ...) is swallowed so diagnostics can never break a scan.
    """
    try:
        # Derive the directory from DEBUG_LOG so the two cannot drift apart.
        log_dir = os.path.dirname(DEBUG_LOG)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        try:
            if os.path.getsize(DEBUG_LOG) > LOG_MAX_BYTES:
                # os.replace overwrites an existing backup on every platform,
                # so no separate exists()/remove() step is needed.
                os.replace(DEBUG_LOG, DEBUG_LOG + ".1")
        except OSError:
            # Missing log (first run) or failed rotation: just keep appending.
            pass
        with open(DEBUG_LOG, "a", encoding="utf-8") as f:
            f.write(f"{time.strftime('%H:%M:%S')} {msg}\n")
    except Exception:
        pass
def _char_width(c: str) -> int:
"""Return terminal column width of a single character (1 or 2)."""
o = ord(c)
# Common wide ranges: CJK, emojis, dingbats, symbols, etc.
if (
0x1100 <= o <= 0x115F # Hangul Jamo
or 0x2329 <= o <= 0x232A # angle brackets
or 0x2E80 <= o <= 0x303E # CJK radicals / ideographic
or 0x3040 <= o <= 0x33BF # Hiragana / Katakana / CJK compat
or 0x3400 <= o <= 0x4DBF # CJK Unified Extension A
or 0x4E00 <= o <= 0xA4CF # CJK Unified / Yi
or 0xA960 <= o <= 0xA97C # Hangul Jamo Extended-A
or 0xAC00 <= o <= 0xD7A3 # Hangul Syllables
or 0xF900 <= o <= 0xFAFF # CJK Compatibility Ideographs
or 0xFE10 <= o <= 0xFE6F # CJK compat forms / small forms
or 0xFF01 <= o <= 0xFF60 # Fullwidth forms
or 0xFFE0 <= o <= 0xFFE6 # Fullwidth signs
or 0x1F000 <= o <= 0x1FAFF # Mahjong, Domino, Playing Cards, Emojis, Symbols
or 0x20000 <= o <= 0x2FA1F # CJK Unified Extension B-F
or 0x2600 <= o <= 0x27BF # Misc symbols, Dingbats
or 0x2700 <= o <= 0x27BF # Dingbats
or 0xFE00 <= o <= 0xFE0F # Variation selectors (zero-width but paired with emoji)
or 0x200D == o # ZWJ (zero-width joiner)
or 0x231A <= o <= 0x231B # Watch, Hourglass
or 0x23E9 <= o <= 0x23F3 # Various symbols
or 0x23F8 <= o <= 0x23FA # Various symbols
or 0x25AA <= o <= 0x25AB # Small squares
or 0x25B6 == o or 0x25C0 == o # Play buttons
or 0x25FB <= o <= 0x25FE # Medium squares
or 0x2614 <= o <= 0x2615 # Umbrella, Hot beverage
or 0x2648 <= o <= 0x2653 # Zodiac signs
or 0x267F == o # Wheelchair
or 0x2693 == o # Anchor
or 0x26A1 == o # High voltage (⚡)
or 0x26AA <= o <= 0x26AB # Circles
or 0x26BD <= o <= 0x26BE # Soccer, Baseball
or 0x26C4 <= o <= 0x26C5 # Snowman, Sun behind cloud
or 0x26D4 == o # No entry
or 0x26EA == o # Church
or 0x26F2 <= o <= 0x26F3 # Fountain, Golf
or 0x26F5 == o # Sailboat
or 0x26FA == o # Tent
or 0x26FD == o # Fuel pump
or 0x2702 == o # Scissors
or 0x2705 == o # Check mark
or 0x2708 <= o <= 0x270D # Various
or 0x270F == o # Pencil
or 0x2753 <= o <= 0x2755 # Question marks (❓❔❕)
or 0x2757 == o # Exclamation
or 0x2795 <= o <= 0x2797 # Plus, Minus, Divide
or 0x27B0 == o or 0x27BF == o # Curly loop
):
return 2
# Zero-width characters
if o in (0xFE0F, 0xFE0E, 0x200D, 0x200B, 0x200C, 0x200E, 0x200F):
return 0
return 1
def _vl(s: str) -> int:
    """Return how many terminal columns *s* occupies.

    Sequences matched by ``_ansi_re`` are removed first, then the per-character
    widths reported by ``_char_width`` are added up.
    """
    visible = _ansi_re.sub("", s)
    columns = 0
    for ch in visible:
        columns += _char_width(ch)
    return columns
def _w(text: str):
sys.stdout.write(text)
def _fl():
sys.stdout.flush()
def enable_ansi():
    """Switch the Windows console to ANSI escape processing; no-op elsewhere.

    Failures are ignored: the worst case is that escape codes show up as text.
    """
    if sys.platform != "win32":
        return
    # Running an empty shell command is a commonly used trick to make the
    # console honour ANSI sequences.
    os.system("")
    try:
        import ctypes

        kernel32 = ctypes.windll.kernel32
        stdout_handle = kernel32.GetStdHandle(-11)  # STD_OUTPUT_HANDLE
        mode = ctypes.c_ulong()
        kernel32.GetConsoleMode(stdout_handle, ctypes.byref(mode))
        # 0x0004 = ENABLE_VIRTUAL_TERMINAL_PROCESSING
        kernel32.SetConsoleMode(stdout_handle, mode.value | 0x0004)
    except Exception:
        pass
def term_size() -> Tuple[int, int]:
    """Return the terminal size as (columns, rows).

    The result is never smaller than 60x20; when the size cannot be
    determined, 80x24 is returned.
    """
    try:
        cols, rows = os.get_terminal_size()
    except Exception:
        return 80, 24
    width = cols if cols > 60 else 60
    height = rows if rows > 20 else 20
    return width, height
def _read_key_blocking() -> str:
    """Read a single key press (blocking). Returns key name.

    Returns "up"/"down"/"left"/"right" for arrow keys, "enter", "esc",
    "ctrl-c", or the typed character itself. On Windows an unrecognised
    extended key yields "".
    """
    if sys.platform == "win32":
        import msvcrt
        k = msvcrt.getch()
        # Extended keys arrive as a 0x00/0xE0 prefix followed by a key code.
        if k in (b"\x00", b"\xe0"):
            k2 = msvcrt.getch()
            return {b"H": "up", b"P": "down", b"K": "left", b"M": "right"}.get(k2, "")
        if k == b"\r":
            return "enter"
        if k == b"\x03":
            return "ctrl-c"
        if k == b"\x1b":
            return "esc"
        return k.decode("latin-1", errors="replace")
    else:
        import select as _sel
        import termios, tty
        fd = sys.stdin.fileno()
        # Remember the terminal settings so the finally block can restore them.
        old = termios.tcgetattr(fd)
        try:
            tty.setcbreak(fd)
            ch = sys.stdin.read(1)
            if ch == "\x1b":
                # ESC may be the start of an arrow-key sequence (ESC [ A..D);
                # wait up to 0.2s for each following byte.
                # NOTE(review): select() polls the file descriptor while the
                # reads go through sys.stdin's buffered text layer. If the
                # rest of the sequence was already buffered by the first read,
                # select() may report nothing pending -- confirm arrow keys
                # work on the target terminals.
                rdy, _, _ = _sel.select([sys.stdin], [], [], 0.2)
                if rdy:
                    ch2 = sys.stdin.read(1)
                    if ch2 == "[":
                        rdy2, _, _ = _sel.select([sys.stdin], [], [], 0.2)
                        if rdy2:
                            ch3 = sys.stdin.read(1)
                            return {"A": "up", "B": "down", "C": "right", "D": "left"}.get(ch3, "esc")
                    return "esc"
                return "esc"
            if ch == "\r" or ch == "\n":
                return "enter"
            if ch == "\x03":
                return "ctrl-c"
            return ch
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old)
def _read_key_nb(timeout: float = 0.05) -> Optional[str]:
    """Non-blocking key read. Returns None if no key.

    Waits at most *timeout* seconds for input. Key names match
    _read_key_blocking(), except that an unrecognised escape sequence yields
    "" instead of "esc".
    """
    if sys.platform == "win32":
        import msvcrt
        if msvcrt.kbhit():
            return _read_key_blocking()
        # No key pending: sleep for the timeout so callers polling in a loop
        # do not spin.
        time.sleep(timeout)
        return None
    else:
        import select
        import termios, tty
        fd = sys.stdin.fileno()
        # Remember the terminal settings so the finally block can restore them.
        old = termios.tcgetattr(fd)
        try:
            tty.setcbreak(fd)
            rdy, _, _ = select.select([sys.stdin], [], [], timeout)
            if rdy:
                ch = sys.stdin.read(1)
                if ch == "\x1b":
                    # Wait for escape sequence bytes (longer timeout for SSH)
                    # NOTE(review): select() polls the file descriptor while
                    # the reads go through sys.stdin's buffered text layer;
                    # bytes already buffered are invisible to select() --
                    # confirm arrow keys work on the target terminals.
                    rdy2, _, _ = select.select([sys.stdin], [], [], 0.2)
                    if rdy2:
                        ch2 = sys.stdin.read(1)
                        if ch2 == "[":
                            rdy3, _, _ = select.select([sys.stdin], [], [], 0.2)
                            if rdy3:
                                ch3 = sys.stdin.read(1)
                                return {"A": "up", "B": "down", "C": "right", "D": "left"}.get(ch3, "")
                        return ""
                    return "esc"  # bare Esc key
                if ch in ("\r", "\n"):
                    return "enter"
                if ch == "\x03":
                    return "ctrl-c"
                return ch
            return None
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old)
def _wait_any_key():
    """Block until one key is pressed and discard it.

    On POSIX the terminal is put into raw mode for the read and its previous
    settings are restored afterwards, even if the read fails.
    """
    if sys.platform == "win32":
        import msvcrt

        msvcrt.getch()
        return

    import termios
    import tty

    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    try:
        tty.setraw(stdin_fd)
        sys.stdin.read(1)
    finally:
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
def _prompt_number(prompt: str, max_val: int) -> Optional[int]:
    """Show *prompt* and read a decimal number typed by the user.

    Digits are echoed as typed and Backspace edits the buffer. Enter submits;
    Esc, Ctrl-C or end-of-input cancels.

    Returns the number when it lies in ``1..max_val``; otherwise (cancelled,
    empty or out of range) returns None. The cursor is shown for the duration
    of the prompt and always hidden again afterwards.
    """
    buf = ""

    def feed(ch: str) -> Optional[bool]:
        """Handle one key: True = submit, False = cancel, None = keep reading."""
        nonlocal buf
        if ch in ("\r", "\n"):
            return True
        # "" is what read(1) yields at EOF; without this check a closed stdin
        # would spin forever.
        if ch in ("", "\x1b", "\x03"):
            return False
        if ch in ("\x7f", "\x08"):  # backspace (DEL or Ctrl-H)
            if buf:
                buf = buf[:-1]
                _w("\b \b")
                _fl()
            return None
        # ASCII digits only: str.isdigit() alone also accepts characters such
        # as "²" that int() rejects.
        if ch.isascii() and ch.isdigit():
            buf += ch
            _w(ch)
            _fl()
        return None

    _w(A.SHOW)
    _w(f"\n {prompt}")
    _fl()
    submitted = False
    try:
        if sys.platform == "win32":
            import msvcrt

            while True:
                k = msvcrt.getch()
                if k in (b"\x00", b"\xe0"):
                    # Extended key: swallow its second byte so a key code is
                    # never mistaken for a typed digit.
                    msvcrt.getch()
                    continue
                verdict = feed(k.decode("latin-1", errors="replace"))
                if verdict is not None:
                    submitted = verdict
                    break
        else:
            import termios
            import tty

            fd = sys.stdin.fileno()
            old = termios.tcgetattr(fd)
            try:
                tty.setcbreak(fd)
                while True:
                    verdict = feed(sys.stdin.read(1))
                    if verdict is not None:
                        submitted = verdict
                        break
            finally:
                termios.tcsetattr(fd, termios.TCSADRAIN, old)
    finally:
        # Hide the cursor on every exit path, including cancel and errors.
        _w(A.HIDE)
    if not submitted:
        _w("\n")
        return None
    if buf:
        n = int(buf)
        if 1 <= n <= max_val:
            return n
    return None
def _fmt_elapsed(secs: float) -> str:
m, s = divmod(int(secs), 60)
if m > 0:
return f"{m}m {s:02d}s"
return f"{s}s"
@dataclass
class ConfigEntry:
    """One proxy endpoint to test."""

    # Host taken from the config: the VLESS URI host, the VMess "add" field,
    # or the JSON "domain" value.
    address: str
    # Display name (URI fragment, VMess "ps", or a generated "d-N" label).
    name: str = ""
    # The URI this entry was parsed from; empty for JSON-sourced entries.
    original_uri: str = ""
    # IPv4 address: supplied by JSON input ("ipv4") or filled in by _resolve().
    ip: str = ""
@dataclass
class RoundCfg:
    """One speed-test round: download size in bytes and how many IPs it covers."""

    size: int
    keep: int

    @property
    def label(self) -> str:
        """Short size label: whole "MB" from 1,000,000 bytes up, else whole "KB"."""
        megabyte = 1_000_000
        if self.size >= megabyte:
            divisor, unit = megabyte, "MB"
        else:
            divisor, unit = 1000, "KB"
        return f"{self.size // divisor}{unit}"
@dataclass
class Result:
    """Measurements collected for a single IP address."""

    ip: str
    # Config addresses and original URIs that resolved to this IP
    # (populated by resolve_all()).
    domains: List[str] = field(default_factory=list)
    uris: List[str] = field(default_factory=list)
    # Latency figures in milliseconds; -1 means not measured or failed.
    tcp_ms: float = -1
    tls_ms: float = -1
    ttfb_ms: float = -1
    # Download results -- presumably one speed per completed round, in Mbps,
    # with best_mbps the maximum; written outside this part of the file.
    speeds: List[float] = field(default_factory=list)
    best_mbps: float = -1
    # Presumably the Cloudflare colo code taken from the cf-ray header.
    colo: str = ""
    score: float = 0
    # Last error label reported for this IP ("" when none).
    error: str = ""
    # True once a TLS handshake succeeded in the latency phase.
    alive: bool = False
class State:
    """Mutable state shared by the scan phases and the progress display."""

    def __init__(self):
        # --- input and per-IP bookkeeping ---
        self.input_file = ""
        self.configs: List[ConfigEntry] = []
        self.ip_map: Dict[str, List[ConfigEntry]] = defaultdict(list)
        self.ips: List[str] = []
        self.res: Dict[str, Result] = {}
        self.rounds: List[RoundCfg] = []

        # --- run configuration ---
        self.mode = "normal"
        self.top = 50  # export top N (0 = all)

        # --- progress ---
        self.phase = "init"
        self.phase_label = ""
        self.cur_round = 0
        self.total = 0
        self.done_count = 0
        self.alive_n = 0
        self.dead_n = 0
        self.best_speed = 0.0
        self.start_time = 0.0
        self.latency_cut_n = 0  # how many IPs were cut after latency phase

        # --- footer notification ---
        self.notify = ""  # notification message shown in footer
        self.notify_until = 0.0

        # --- lifecycle flags ---
        self.finished = False
        self.interrupted = False
        self.saved = False
class CFRateLimiter:
    """Respects Cloudflare's per-IP rate limit window.
    CF allows ~600 requests per 10-minute window to speed.cloudflare.com.
    When 429 is received, retry-after header tells us exactly when the
    window resets. We track request count and pause when budget runs out
    or when CF explicitly tells us to wait.

    All timestamps are time.monotonic() values. A value of 0.0 in
    ``window_start`` / ``blocked_until`` means "not set".
    """
    BUDGET = 550  # conservative limit (CF allows ~600)
    WINDOW = 600  # 10-minute window in seconds
    def __init__(self):
        self.count = 0  # requests counted in the current window
        self.window_start = 0.0  # start of the current window
        self.blocked_until = 0.0  # end of an explicit 429 block
        self._lock = asyncio.Lock()  # guards the bookkeeping in acquire()
    async def _wait_blocked(self, st: Optional["State"]):
        """Wait out a 429 block period (called outside lock).

        Polls once per second, publishing the remaining time in
        ``st.phase_label``; returns early when ``st.interrupted`` is set.
        """
        while time.monotonic() < self.blocked_until:
            if st and st.interrupted:
                return
            left = int(self.blocked_until - time.monotonic())
            if st:
                st.phase_label = f"CF rate limit — resuming in {left}s"
            await asyncio.sleep(1)
    async def _wait_budget(self, wait_until: float, st: Optional["State"]):
        """Wait for window reset when budget exhausted (called outside lock).

        Polls once per second until *wait_until*, publishing the remaining
        time in ``st.phase_label``; returns early when ``st.interrupted`` is
        set.
        """
        while time.monotonic() < wait_until:
            if st and st.interrupted:
                return
            left = int(wait_until - time.monotonic())
            if st:
                st.phase_label = f"Rate limit ({self.count} reqs) — next window in {left}s"
            await asyncio.sleep(1)
    async def acquire(self, st: Optional["State"] = None):
        """Wait if we're rate-limited, then count a request.

        *st* is optional; when given it receives status text while waiting
        and its ``interrupted`` flag cuts the waits short.
        """
        # Wait out any 429 block first (outside lock so others can also wait)
        if self.blocked_until > 0 and time.monotonic() < self.blocked_until:
            _dbg(f"RATE: waiting {self.blocked_until - time.monotonic():.0f}s for CF window reset")
            await self._wait_blocked(st)
        await self._lock.acquire()
        try:
            # Re-check after acquiring lock
            # An expired block starts a fresh window with a zero count.
            if self.blocked_until > 0 and time.monotonic() >= self.blocked_until:
                self.count = 0
                self.window_start = time.monotonic()
                self.blocked_until = 0.0
            now = time.monotonic()
            if self.window_start == 0.0:
                self.window_start = now
            # The window has elapsed on its own: start a new one.
            if now - self.window_start >= self.WINDOW:
                self.count = 0
                self.window_start = now
            if self.count >= self.BUDGET:
                remaining = self.WINDOW - (now - self.window_start)
                if remaining > 0:
                    _dbg(f"RATE: budget exhausted ({self.count} reqs), waiting {remaining:.0f}s")
                    wait_until = self.window_start + self.WINDOW
                    saved_window = self.window_start
                    # Drop the lock while sleeping so other coroutines are not
                    # serialised behind this wait. It is re-acquired in the
                    # inner finally so the outer finally's release() stays
                    # balanced.
                    self._lock.release()
                    try:
                        await self._wait_budget(wait_until, st)
                    finally:
                        await self._lock.acquire()
                    # Only reset if no other coroutine already did
                    if self.window_start == saved_window:
                        self.count = 0
                        self.window_start = time.monotonic()
                else:
                    self.count = 0
                    self.window_start = time.monotonic()
            self.count += 1
        finally:
            self._lock.release()
    def would_block(self) -> bool:
        """Check if speed.cloudflare.com is currently rate-limited.

        True while a 429 block is active, or when the request budget of the
        current (unexpired) window is used up.
        """
        now = time.monotonic()
        if self.blocked_until > 0 and now < self.blocked_until:
            return True
        if self.window_start > 0 and now - self.window_start < self.WINDOW:
            if self.count >= self.BUDGET:
                return True
        return False
    def report_429(self, retry_after: int):
        """CF told us to wait. Set blocked_until so all workers pause.
        Cap at 600s (10 min) — CF's actual window is 10 min but it sends
        punitive retry-after (3600+) after repeated violations.

        *retry_after* is clamped to 30..600 seconds; an existing later
        deadline is never shortened.
        """
        capped = min(max(retry_after, 30), 600)
        until = time.monotonic() + capped
        if until > self.blocked_until:
            self.blocked_until = until
        _dbg(f"RATE: 429 received (retry-after={retry_after}s, capped={capped}s)")
def build_dynamic_rounds(mode: str, alive_count: int) -> List[RoundCfg]:
    """Plan the speed-test rounds for *mode* given the number of alive IPs.

    An unknown *mode* falls back to the "normal" preset. A preset without the
    "dynamic" flag gets a single 1 MB round over every alive IP. Otherwise
    each round keeps a percentage of the alive IPs, clamped by the preset's
    min/max (0 disables a clamp). With 50 or fewer alive IPs every round
    covers all of them. Rounds that would keep nothing are dropped.
    """
    preset = PRESETS.get(mode, PRESETS["normal"])
    if not preset.get("dynamic"):
        return [RoundCfg(1_000_000, alive_count)]

    stages = zip(
        preset["round_sizes"],
        preset["round_pcts"],
        preset["round_min"],
        preset["round_max"],
    )
    # Small sets (50 IPs or fewer): test ALL in every round -- no funnel needed
    skip_funnel = alive_count <= 50

    plan = []
    for size, pct, floor, ceiling in stages:
        if skip_funnel:
            keep = alive_count
        else:
            keep = alive_count if pct >= 100 else int(alive_count * pct / 100)
            if floor > 0:
                keep = max(floor, keep)
            if ceiling > 0:
                keep = min(ceiling, keep)
        # A round can never cover more IPs than are alive.
        keep = min(keep, alive_count)
        if keep > 0:
            plan.append(RoundCfg(size, keep))
    return plan
def parse_vless(uri: str) -> Optional["ConfigEntry"]:
    """Parse a ``vless://`` URI into a ConfigEntry.

    Only the host is extracted. A bracketed IPv6 literal is returned without
    its brackets. The display name is the URL-decoded fragment.

    Returns None when the text is not a VLESS URI, has no ``user@host`` part,
    has an unterminated ``[`` literal, or has an empty host.
    """
    uri = uri.strip()
    if not uri.startswith("vless://"):
        return None
    rest = uri[len("vless://"):]
    # The fragment starts at the FIRST "#"; anything after it, including
    # further "#" characters, belongs to the name.
    rest, _, fragment = rest.partition("#")
    name = urllib.parse.unquote(fragment)
    rest = rest.split("?", 1)[0]
    if "@" not in rest:
        return None
    _, addr = rest.split("@", 1)
    # Drop any path so "host/path" (no port) does not leak into the address.
    addr = addr.split("/", 1)[0]
    if addr.startswith("["):
        end = addr.find("]")
        if end == -1:
            return None
        address = addr[1:end]
    else:
        address = addr.rsplit(":", 1)[0]
    if not address:
        # Same rule as parse_vmess(): an entry without a host is useless.
        return None
    return ConfigEntry(address=address, name=name, original_uri=uri)
def parse_vmess(uri: str) -> Optional["ConfigEntry"]:
    """Parse a ``vmess://<base64 JSON>`` URI into a ConfigEntry.

    The payload may use the standard or the URL-safe base64 alphabet, with or
    without padding. The address comes from the JSON "add" field and the name
    from "ps".

    Returns None when the text is not a VMess URI, the payload does not decode
    to a JSON object, or "add" is missing or empty.
    """
    uri = uri.strip()
    if not uri.startswith("vmess://"):
        return None
    b64 = uri[len("vmess://"):].split("#", 1)[0]
    # Map the URL-safe alphabet onto the standard one up front. b64decode()
    # silently drops "-" and "_" instead of raising, so a "try standard, then
    # URL-safe" fallback never fires and URL-safe payloads decode to garbage.
    b64 = b64.replace("-", "+").replace("_", "/")
    b64 += "=" * (-len(b64) % 4)
    try:
        raw = base64.b64decode(b64).decode("utf-8", errors="replace")
        obj = json.loads(raw)
    except (ValueError, RecursionError):
        # binascii.Error and json.JSONDecodeError are both ValueError.
        return None
    if not isinstance(obj, dict):
        return None
    # "or" also covers an explicit null, which str() would turn into "None".
    address = str(obj.get("add") or "")
    if not address:
        return None
    name = str(obj.get("ps") or "")
    return ConfigEntry(address=address, name=name, original_uri=uri)
def parse_config(uri: str) -> Optional[ConfigEntry]:
    """Parse *uri* as VLESS first, then as VMess; None when neither matches."""
    entry = parse_vless(uri)
    if entry:
        return entry
    return parse_vmess(uri)
def load_input(path: str) -> List["ConfigEntry"]:
    """Load configs from *path*.

    Two formats are recognised:

    * JSON: a list of objects (optionally wrapped as ``{"data": [...]}``),
      each with a "domain" and optionally an "ipv4" value;
    * plain text: one VLESS/VMess URI per line.

    Text parsing is also the fallback when the JSON yields no entries.
    Returns [] (after printing the error) when the file cannot be read.
    """
    try:
        # utf-8-sig strips a leading BOM, which would otherwise break both
        # json.loads() and the "vless://" prefix test on the first line.
        # errors="replace" keeps one bad byte from aborting the whole load.
        with open(path, "r", encoding="utf-8-sig", errors="replace") as f:
            raw = f.read()
    except OSError as e:
        print(f" Error reading {path}: {e}")
        return []

    try:
        data = json.loads(raw)
    except (ValueError, RecursionError):
        data = None
    if isinstance(data, dict) and "data" in data:
        data = data["data"]
    if isinstance(data, list):
        out: List["ConfigEntry"] = []
        for i, e in enumerate(data):
            # Skip malformed elements instead of discarding the whole file.
            if not isinstance(e, dict):
                continue
            d = e.get("domain", "")
            if d:
                out.append(
                    ConfigEntry(address=str(d), name=f"d-{i+1}", ip=str(e.get("ipv4") or ""))
                )
        if out:
            return out

    out = []
    for ln in raw.splitlines():
        ln = ln.strip()
        if not ln:
            continue
        c = parse_config(ln)
        if c:
            out.append(c)
    return out
def fetch_sub(url: str) -> List["ConfigEntry"]:
    """Fetch configs from a subscription URL (base64 or plain VLESS/VMess URIs).

    Only http:// and https:// URLs are accepted. The response body is either
    a plain list of URIs (one per line) or that list base64-encoded; the
    encoded form may be unpadded, line-wrapped, or URL-safe.

    Returns the parsed configs, or [] after printing an error when the URL is
    rejected or the download fails.
    """
    if not url.lower().startswith(("http://", "https://")):
        print(" Error: --sub only accepts http:// or https:// URLs")
        return []
    _dbg(f"Fetching subscription: {url}")
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read().decode("utf-8", errors="replace").strip()
    except Exception as e:
        _dbg(f"Subscription fetch failed: {e}")
        print(f" Error fetching subscription: {e}")
        return []

    # A body that already contains URIs is plain text; only try base64
    # decoding when it does not.
    if "://" not in raw:
        # Remove line wrapping, map the URL-safe alphabet to the standard one
        # and restore padding -- many providers send unpadded base64, which
        # b64decode() rejects.
        compact = "".join(raw.split()).replace("-", "+").replace("_", "/")
        compact += "=" * (-len(compact) % 4)
        try:
            decoded = base64.b64decode(compact).decode("utf-8", errors="replace")
        except ValueError:
            decoded = ""
        if "://" in decoded:
            raw = decoded

    out = []
    for ln in raw.splitlines():
        c = parse_config(ln.strip())
        if c:
            out.append(c)
    _dbg(f"Subscription loaded: {len(out)} configs")
    return out
def _split_addr_port(addr: str) -> Tuple[str, Optional[str]]:
    """Split an address-list entry into (host as written in a URI, port or None).

    Accepts "host", "host:port", "[v6]", "[v6]:port" and a bare IPv6 literal.
    A bare IPv6 literal is returned in brackets.
    """
    if addr.startswith("["):
        end = addr.find("]")
        if end == -1:
            return addr, None
        suffix = addr[end + 1:]
        if suffix.startswith(":") and suffix[1:].isascii() and suffix[1:].isdigit():
            return addr[: end + 1], suffix[1:]
        return addr[: end + 1], None
    try:
        ipaddress.IPv6Address(addr)
    except ValueError:
        pass
    else:
        # Its last group must not be mistaken for a port.
        return f"[{addr}]", None
    host, sep, port = addr.rpartition(":")
    if sep and port.isascii() and port.isdigit():
        return host, port
    return addr, None


def generate_from_template(template: str, addresses: List[str]) -> List[ConfigEntry]:
    """Generate configs by substituting addresses into a VLESS template.

    For every non-empty entry of *addresses* ("host" or "host:port", e.g. from
    a multi-port clean scan) the template's host is replaced. The port is
    replaced too when the entry carries one, or inserted if the template had
    none. The template's query string is always preserved. An existing
    ``#name`` fragment becomes ``#cfg-<n>-<host>``.

    NOTE: only ``vless://user@host...`` templates are substituted. A VMess
    template carries its address inside the base64 payload, so every generated
    entry keeps the template's own address (only the fragment is renamed).

    Returns [] when the template itself does not parse.
    """
    out: List[ConfigEntry] = []
    template = template.strip()
    if not parse_config(template):
        return out

    # Split the template once into: everything up to "@", host, optional
    # numeric port, and the remainder (path / query / fragment). The host
    # stops at ":/?#" so a port-less template keeps its query and fragment.
    parts = None
    if template.startswith("vless://"):
        parts = re.match(
            r"^([^@]*@)(\[[^\]]*\]|[^:/?#]*)(?::(\d+))?(.*)$", template, re.S
        )

    for i, addr in enumerate(addresses):
        addr = addr.strip()
        if not addr:
            continue
        addr_ip, addr_port = _split_addr_port(addr)
        if parts is None:
            uri = template
        else:
            head, _old_host, old_port, tail = parts.groups()
            port = addr_port or old_port
            uri = head + addr_ip + (f":{port}" if port else "") + tail
        # Plain string surgery: the address is never interpreted as a regex
        # replacement template.
        base, has_fragment, _old_name = uri.partition("#")
        if has_fragment:
            uri = f"{base}#cfg-{i+1}-{addr_ip[:20]}"
        c = parse_config(uri)
        if c:
            out.append(c)
    return out
def load_addresses(path: str) -> List[str]:
    """Load an address list from *path*.

    Accepted formats:

    * a JSON array;
    * a JSON object holding the array under "addresses", "domains", "ips" or
      "data" (checked in that order);
    * plain text, one address per line (blank lines ignored).

    Falsy JSON elements are dropped and the rest converted with ``str()``.
    Returns [] (after printing the error) when the file cannot be read.
    """
    try:
        # utf-8-sig strips a leading BOM that would otherwise be glued to the
        # first address. errors="replace" keeps a stray byte from raising.
        with open(path, "r", encoding="utf-8-sig", errors="replace") as f:
            raw = f.read()
    except OSError as e:
        print(f" Error reading {path}: {e}")
        return []

    try:
        data = json.loads(raw)
    except (ValueError, RecursionError):
        data = None

    if isinstance(data, dict):
        for key in ("addresses", "domains", "ips", "data"):
            if isinstance(data.get(key), list):
                data = data[key]
                break
    if isinstance(data, list):
        return [str(d) for d in data if d]

    return [ln.strip() for ln in raw.splitlines() if ln.strip()]
def _split_to_24s(subnets: List[str]) -> list:
"""Split CIDR subnets into /24 blocks, deduplicate."""
seen = set()
blocks = []
for sub in subnets:
try:
net = ipaddress.IPv4Network(sub.strip(), strict=False)
if net.prefixlen <= 24:
for block in net.subnets(new_prefix=24):
key = int(block.network_address)
if key not in seen:
seen.add(key)
blocks.append(block)
else:
key = int(net.network_address)
if key not in seen:
seen.add(key)
blocks.append(net)
except (ValueError, TypeError):
continue
return blocks
def generate_cf_ips(subnets: List[str], sample_per_24: int = 0) -> List[str]:
    """Return host addresses drawn from *subnets*, block by block in random order.

    The subnets are split by ``_split_to_24s`` and the blocks shuffled. With
    ``sample_per_24`` > 0, that many random hosts are taken from each block
    that has more; otherwise (or for smaller blocks) every host is returned.
    """
    blocks = _split_to_24s(subnets)
    random.shuffle(blocks)
    picked = []
    for block in blocks:
        candidates = [str(host) for host in block.hosts()]
        if 0 < sample_per_24 < len(candidates):
            candidates = random.sample(candidates, sample_per_24)
        picked.extend(candidates)
    return picked
async def _tls_probe(
ip: str, sni: str, timeout: float, validate: bool = True, port: int = 443,
) -> Tuple[float, bool, str]:
"""TLS probe with optional Cloudflare header validation.
Returns (latency_ms, is_cloudflare, error)."""
w = None
try:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
t0 = time.monotonic()
r, w = await asyncio.wait_for(
asyncio.open_connection(ip, port, ssl=ctx, server_hostname=sni),
timeout=timeout,
)
tls_ms = (time.monotonic() - t0) * 1000
is_cf = True
if validate:
is_cf = False
try:
req = f"GET / HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n"
w.write(req.encode())
await w.drain()
hdr = await asyncio.wait_for(r.read(2048), timeout=min(timeout, 3))
htxt = hdr.decode("latin-1", errors="replace").lower()
is_cf = "server: cloudflare" in htxt or "cf-ray:" in htxt
except Exception:
pass
w.close()
try:
await w.wait_closed()
except Exception:
pass
w = None
return tls_ms, is_cf, ""
except asyncio.TimeoutError:
return -1, False, "timeout"
except Exception as e:
return -1, False, str(e)[:40]
finally:
if w:
try:
w.close()
except Exception:
pass
@dataclass
class CleanScanState:
    """Progress and results of a clean-IP scan, updated by scan_clean_ips()."""

    # Probe counters: planned, finished, and passed.
    total: int = 0
    done: int = 0
    found: int = 0
    # Set from outside to make the scan stop early.
    interrupted: bool = False
    # (address, latency_ms) pairs.
    results: List[Tuple[str, float]] = field(default_factory=list)  # top 20 for display
    all_results: List[Tuple[str, float]] = field(default_factory=list)  # full reference
    # time.monotonic() value taken when the scan started.
    start_time: float = 0.0
async def scan_clean_ips(
    ips: List[str],
    sni: str = "speed.cloudflare.com",
    workers: int = 500,
    timeout: float = 3.0,
    validate: bool = True,
    cs: Optional["CleanScanState"] = None,
    ports: Optional[List[int]] = None,
) -> List[Tuple[str, float]]:
    """Scan IPs for TLS + optional CF validation. Returns [(addr, latency_ms)] sorted.

    addr is 'ip' for port 443, or 'ip:port' for other ports.

    Every (ip, port) pair is probed with ``_tls_probe``, at most *workers* at
    a time. A probe counts as a hit when it has a positive latency and passed
    the Cloudflare check. When *cs* is given it is kept up to date (``total``,
    ``done``, ``found``, ``results`` = 20 fastest so far, ``all_results``),
    and setting ``cs.interrupted`` makes the scan wind down; hits collected
    so far are still returned. *ports* defaults to [443].
    """
    if ports is None:
        ports = [443]
    # A semaphore of 0 would deadlock and a negative one raises.
    sem = asyncio.Semaphore(max(1, workers))
    results: List[Tuple[str, float]] = []
    # Running leaderboard of the 20 fastest hits. Maintaining it incrementally
    # avoids re-sorting the whole (possibly huge) result list over and over.
    top20: List[Tuple[str, float]] = []
    total_probes = len(ips) * len(ports)
    if cs:
        cs.total = total_probes
        cs.done = 0
        cs.found = 0
        cs.start_time = time.monotonic()

    async def probe(ip: str, port: int):
        if cs and cs.interrupted:
            return
        async with sem:
            if cs and cs.interrupted:
                return
            lat, is_cf, _err = await _tls_probe(ip, sni, timeout, validate, port)
            if lat > 0 and is_cf:
                hit = (ip if port == 443 else f"{ip}:{port}", lat)
                # No lock needed: there is no await between these statements,
                # so no other coroutine can interleave.
                results.append(hit)
                top20.append(hit)
                top20.sort(key=lambda x: x[1])  # stable: ties keep arrival order
                del top20[20:]
                if cs:
                    cs.found += 1
                    cs.all_results = results  # full reference for Ctrl+C recovery
                    cs.results = list(top20)
            if cs:
                cs.done += 1

    # Build flat list of (ip, port) pairs
    probes = [(ip, p) for ip in ips for p in ports]
    random.shuffle(probes)  # spread ports across batches for better coverage
    # Create tasks in batches so a multi-million probe scan never holds
    # millions of task objects at once.
    BATCH = 50_000
    for i in range(0, len(probes), BATCH):
        if cs and cs.interrupted:
            break
        batch = probes[i : i + BATCH]
        tasks = [asyncio.ensure_future(probe(ip, port)) for ip, port in batch]
        try:
            await asyncio.gather(*tasks, return_exceptions=True)
        except asyncio.CancelledError:
            break
        finally:
            for t in tasks:
                if not t.done():
                    t.cancel()
    results.sort(key=lambda x: x[1])
    return results
def load_configs_from_args(args) -> Tuple[List[ConfigEntry], str]:
    """Pick the config source from the CLI arguments and load it.

    Sources in priority order: ``sub`` (subscription URL), ``template``
    (requires ``input`` as the address list), then ``input`` alone.

    Returns (configs, source_label). With no source both are empty. A
    template without an input file yields ([], "ERROR: ...").
    """
    sub_url = getattr(args, "sub", None)
    if sub_url:
        return fetch_sub(sub_url), sub_url

    template = getattr(args, "template", None)
    input_path = getattr(args, "input", None)
    if template:
        if not input_path:
            return [], "ERROR: --template requires -i (address list file)"
        addrs = load_addresses(input_path)
        generated = generate_from_template(template, addrs)
        return generated, f"{input_path} ({len(addrs)} addresses)"

    if input_path:
        return load_input(input_path), input_path
    return [], ""
def parse_size(s: str) -> int:
    """Convert a size such as "5MB", "1.5 kb" or "100" into bytes.

    Units are decimal (KB = 1,000) and case-insensitive; a number without a
    unit is taken as bytes. The result is never below 1. Text that is neither
    a recognised size nor an integer yields the default of 1,000,000.
    """
    text = s.strip().upper()
    match = re.match(r"^(\d+(?:\.\d+)?)\s*(MB|KB|GB|B)?$", text)
    if match is None:
        try:
            return max(1, int(text))
        except ValueError:
            return 1_000_000  # default 1MB
    number, unit = match.groups()
    factors = {"B": 1, "KB": 1_000, "MB": 1_000_000, "GB": 1_000_000_000}
    factor = factors.get(unit or "B", 1)
    return max(1, int(float(number) * factor))
def parse_rounds_str(s: str) -> List[RoundCfg]:
    """Parse a "SIZE:TOP,SIZE:TOP,..." string into round configs.

    Each item gives a download size (see ``parse_size``) and the number of
    IPs to keep. Items without a colon or with a non-integer count are
    skipped.
    """
    rounds = []
    for item in s.split(","):
        size_text, colon, keep_text = item.strip().partition(":")
        if not colon:
            continue
        try:
            rounds.append(RoundCfg(parse_size(size_text), int(keep_text)))
        except ValueError:
            pass  # skip malformed round
    return rounds
def find_config_files() -> List[Tuple[str, str, int]]:
    """Find config files in cwd. Returns [(path, type, count)].

    Looks at ``*.txt``, ``*.json``, ``*.conf`` and ``*.lst``. A file whose
    content starts with "{" or "[" and parses to a JSON list (directly or
    under "data") is reported as type "json" with the list length. Otherwise a
    file with a VLESS/VMess URI in its first 2048 characters is reported as
    type "configs" with the number of URI lines. Files that cannot be read are
    skipped. The list is sorted by count, largest first.
    """
    found = []

    def inspect(path: str) -> None:
        """Classify one file and append it to *found* if it qualifies."""
        with open(path, "r", encoding="utf-8", errors="replace") as fh:
            head = fh.read(2048)
        opening = head.strip()
        if opening.startswith("{") or opening.startswith("["):
            try:
                with open(path, encoding="utf-8") as jf:
                    doc = json.loads(jf.read())
                if isinstance(doc, dict) and "data" in doc:
                    doc = doc["data"]
                if isinstance(doc, list):
                    found.append((path, "json", len(doc)))
                    return
            except Exception:
                pass
        if "vless://" in head or "vmess://" in head:
            with open(path, encoding="utf-8") as fh:
                uri_lines = sum(
                    1 for line in fh if line.strip().startswith(("vless://", "vmess://"))
                )
            found.append((path, "configs", uri_lines))

    for pattern in ("*.txt", "*.json", "*.conf", "*.lst"):
        for path in globmod.glob(pattern):
            try:
                inspect(path)
            except Exception:
                pass
    found.sort(key=lambda item: item[2], reverse=True)
    return found
async def _resolve(e: ConfigEntry, sem: asyncio.Semaphore, counter: List[int]) -> ConfigEntry:
    """Fill in ``e.ip`` with the first IPv4 address of ``e.address``.

    Entries that already have an IP are returned untouched. A failed lookup
    leaves ``ip`` empty. ``counter[0]`` is incremented once per entry either
    way, and *sem* limits how many lookups run at the same time.
    """
    if not e.ip:
        async with sem:
            try:
                loop = asyncio.get_running_loop()
                answers = await loop.getaddrinfo(e.address, 443, family=socket.AF_INET)
                if answers:
                    # getaddrinfo rows are (family, type, proto, canonname,
                    # sockaddr); sockaddr[0] is the IP string.
                    e.ip = answers[0][4][0]
            except Exception:
                e.ip = ""
    counter[0] += 1
    return e
async def resolve_all(st: State, workers: int = 100):
    """Resolve every config's address and index the results by IP.

    Runs ``_resolve`` for all of ``st.configs`` (at most *workers* at a time)
    while a spinner reports progress. Afterwards ``st.ip_map`` groups the
    configs by IP, ``st.ips`` lists the unique IPs, and ``st.res`` holds a
    fresh ``Result`` per IP with its domains and original URIs. Configs that
    did not resolve are left out.
    """
    sem = asyncio.Semaphore(workers)
    counter = [0]  # mutable for closure
    total = len(st.configs)

    async def _progress():
        spin = "|/-\\"
        i = 0
        while counter[0] < total:
            s = spin[i % len(spin)]
            pct = counter[0] * 100 // max(1, total)
            _w(f"\r {A.CYN}{s}{A.RST} Resolving DNS... {counter[0]}/{total} ({pct}%) ")
            _fl()
            i += 1
            await asyncio.sleep(0.15)

    prog_task = asyncio.create_task(_progress())
    try:
        st.configs = list(await asyncio.gather(*[_resolve(c, sem, counter) for c in st.configs]))
    finally:
        prog_task.cancel()
        try:
            await prog_task
        except asyncio.CancelledError:
            pass

    # Print the summary here rather than at the end of _progress(): that task
    # is cancelled the moment resolution finishes (usually mid-sleep), so a
    # message placed there is almost never shown.
    unique_ips = len({c.ip for c in st.configs if c.ip})
    _w(f"\r {A.GRN}OK{A.RST} Resolved {total} domains -> {unique_ips} unique IPs\n")
    _fl()

    for c in st.configs:
        if c.ip:
            st.ip_map[c.ip].append(c)
    st.ips = list(st.ip_map.keys())
    for ip in st.ips:
        cs = st.ip_map[ip]
        st.res[ip] = Result(
            ip=ip,
            domains=[c.address for c in cs],
            uris=[c.original_uri for c in cs if c.original_uri],
        )
async def _lat_one(ip: str, sni: str, timeout: float) -> Tuple[float, float, str]:
"""Measure TCP RTT and full TLS connection time (TCP+TLS handshake)."""
try:
t0 = time.monotonic()
r, w = await asyncio.wait_for(
asyncio.open_connection(ip, 443), timeout=timeout
)
tcp = (time.monotonic() - t0) * 1000
w.close()
try:
await w.wait_closed()
except Exception:
pass
except asyncio.TimeoutError:
return -1, -1, "tcp-timeout"
except Exception as e:
return -1, -1, f"tcp:{str(e)[:50]}"
try:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
t0 = time.monotonic()
r, w = await asyncio.wait_for(
asyncio.open_connection(ip, 443, ssl=ctx, server_hostname=sni),
timeout=timeout,
)
tls_full = (time.monotonic() - t0) * 1000 # full TCP+TLS time
w.close()
try:
await w.wait_closed()
except Exception:
pass
return tcp, tls_full, ""
except asyncio.TimeoutError:
return tcp, -1, "tls-timeout"
except Exception as e:
return tcp, -1, f"tls:{str(e)[:50]}"
async def phase1(st: State, workers: int, timeout: float):
    """Latency phase: measure TCP and TLS latency for every IP in ``st.ips``.

    Each IP's ``Result`` gets ``tcp_ms``, ``tls_ms``, ``error`` and ``alive``
    (TLS handshake succeeded). ``st.done_count``, ``st.alive_n`` and
    ``st.dead_n`` are advanced as results arrive. At most *workers* probes
    run at once; IPs not yet started when ``st.interrupted`` is set are
    skipped.
    """
    st.phase = "latency"
    st.phase_label = "Testing latency"
    st.total = len(st.ips)
    st.done_count = 0
    gate = asyncio.Semaphore(workers)

    async def measure(ip: str):
        async with gate:
            if st.interrupted:
                return
            record = st.res[ip]
            # Use speed.cloudflare.com as SNI -- filters out non-CF IPs early
            # (non-CF IPs will fail TLS since they don't serve this cert)
            tcp_ms, tls_ms, err = await _lat_one(ip, SPEED_HOST, timeout)
            record.tcp_ms = tcp_ms
            record.tls_ms = tls_ms
            record.error = err
            record.alive = tls_ms > 0
            st.done_count += 1
            if record.alive:
                st.alive_n += 1
            else:
                st.dead_n += 1

    pending = [asyncio.ensure_future(measure(ip)) for ip in st.ips]
    try:
        await asyncio.gather(*pending, return_exceptions=True)
    except asyncio.CancelledError:
        pass
    finally:
        # Make sure nothing keeps running if we were cancelled mid-phase.
        for task in pending:
            if not task.done():
                task.cancel()
async def _dl_one(
    ip: str, size: int, timeout: float,
    host: str = "", path: str = "",
) -> Tuple[float, float, int, str, str]:
    """Download test. Returns (ttfb_ms, mbps, bytes, colo, error).

    Opens a TLS connection to ``ip``:443 (SNI/Host = ``host``), sends an
    HTTP/1.1 GET for ``path`` and times the body transfer.

    Args:
        ip: Address to connect to.
        size: Number of bytes to request.
        timeout: Caller-supplied timeout in seconds. The connect timeout is
            capped at 15 s; the overall budget is at least
            ``30 + 2 s per requested MB``.
        host: SNI / Host header value; empty means ``SPEED_HOST``.
        path: Request path; empty means ``SPEED_PATH?bytes=<size>``.

    Returns:
        ``(ttfb_ms, mbps, bytes, colo, error)``. ``mbps`` is computed as
        megabytes (10^6 bytes) per second. On failure the tuple is
        ``(-1, 0, 0, "", <error>)``; a partial download that produced a
        positive speed is still reported as a success with an empty error.
        Error "429:<retry-after>" means rate-limited — caller should back off.
    """
    if not host:
        host = SPEED_HOST
    if not path:
        path = f"{SPEED_PATH}?bytes={size}"
    dl_timeout = max(timeout, 30 + (size / 1_000_000) * 2)
    conn_timeout = min(timeout, 15)
    w = None
    total = 0
    dl_start = 0.0
    ttfb = 0.0
    colo = ""

    def _cleanup():
        # Best-effort close of whichever writer is currently open.
        nonlocal w
        if w is not None:
            try:
                w.close()
            except Exception:
                pass
            w = None

    try:
        ctx = ssl.create_default_context()
        t_start = time.monotonic()
        try:
            t0 = t_start
            r, w = await asyncio.wait_for(
                asyncio.open_connection(ip, 443, ssl=ctx, server_hostname=host),
                timeout=conn_timeout,
            )
        except ssl.SSLCertVerificationError:
            # Retry once without certificate verification; restart the
            # connect timer so conn_ms only covers the successful attempt.
            _cleanup()
            ctx2 = ssl.create_default_context()
            ctx2.check_hostname = False
            ctx2.verify_mode = ssl.CERT_NONE
            t0 = time.monotonic()
            r, w = await asyncio.wait_for(
                asyncio.open_connection(
                    ip, 443, ssl=ctx2, server_hostname=host
                ),
                timeout=conn_timeout,
            )
        conn_ms = (time.monotonic() - t0) * 1000
        range_hdr = ""
        if "bytes=" not in path:
            # Path carries no size parameter: ask for the first `size` bytes.
            range_hdr = f"Range: bytes=0-{size - 1}\r\n"
        req = (
            f"GET {path} HTTP/1.1\r\n"
            f"Host: {host}\r\n"
            f"User-Agent: Mozilla/5.0 (X11; Linux x86_64) Chrome/120\r\n"
            f"Accept: */*\r\n"
            f"{range_hdr}"
            f"Connection: close\r\n\r\n"
        )
        w.write(req.encode())
        await w.drain()

        # Read until the end of the response headers.
        hbuf = b""
        while b"\r\n\r\n" not in hbuf:
            ch = await asyncio.wait_for(r.read(4096), timeout=min(conn_timeout, 10))
            if not ch:
                _dbg(f"DL {ip} {size}: empty response (no headers)")
                return -1, 0, 0, "", "empty"
            hbuf += ch
            if len(hbuf) > 65536:
                _dbg(f"DL {ip} {size}: header too big")
                return -1, 0, 0, "", "hdr-too-big"
        sep = hbuf.index(b"\r\n\r\n") + 4
        htxt = hbuf[:sep].decode("latin-1", errors="replace")
        body0 = hbuf[sep:]  # body bytes that arrived together with the headers
        status_line = htxt.split("\r\n")[0]
        status_parts = status_line.split(None, 2)
        status_code = status_parts[1] if len(status_parts) >= 2 else ""
        if status_code == "429":
            ra = ""
            for line in htxt.split("\r\n"):
                if line.lower().startswith("retry-after:"):
                    ra = line.split(":", 1)[1].strip()
                    break
            _dbg(f"DL {ip} {size}: 429 rate-limited (retry-after={ra})")
            return -1, 0, 0, "", f"429:{ra}"
        if status_code not in ("200", "206"):
            _dbg(f"DL {ip} {size}: HTTP error: {status_line[:80]}")
            return -1, 0, 0, "", f"http:{status_line[:40]}"
        for line in htxt.split("\r\n"):
            if line.lower().startswith("cf-ray:"):
                # The text after the last "-" of the cf-ray value is kept as colo.
                ray = line.split(":", 1)[1].strip()
                if "-" in ray:
                    colo = ray.rsplit("-", 1)[-1]
                break
        ttfb = (time.monotonic() - t0) * 1000 - conn_ms
        dl_start = time.monotonic()
        total = len(body0)
        # Throughput is sampled every 1 MB for downloads of 5 MB and more;
        # smaller downloads never reach the first sample point.
        sample_interval = 1_000_000 if size >= 5_000_000 else size + 1
        next_sample = sample_interval
        samples: List[Tuple[int, float]] = []
        min_for_stable = min(size // 2, 20_000_000) if size >= 5_000_000 else size
        min_samples = 5 if size >= 10_000_000 else 3
        while True:
            try:
                left = dl_timeout - (time.monotonic() - t_start)
                if left <= 0:
                    # Overall budget used up: stop and report what was
                    # measured instead of reading for as long as data trickles.
                    break
                ch = await asyncio.wait_for(r.read(65536), timeout=min(left, 10))
                if not ch:
                    break
                total += len(ch)
                if total >= next_sample:
                    elapsed = time.monotonic() - dl_start
                    samples.append((total, elapsed))
                    next_sample += sample_interval
                    # only check stability after enough data downloaded
                    if len(samples) >= min_samples and total >= min_for_stable:
                        recent = samples[-4:]
                        sp = []
                        for j in range(1, len(recent)):
                            db = recent[j][0] - recent[j - 1][0]
                            dt = recent[j][1] - recent[j - 1][1]
                            if dt > 0:
                                sp.append(db / dt)
                        if len(sp) >= 2:
                            mn = statistics.mean(sp)
                            if mn > 0:
                                try:
                                    sd = statistics.stdev(sp)
                                    # Speed has settled (<10% spread): stop early.
                                    if sd / mn < 0.10:
                                        break
                                except statistics.StatisticsError:
                                    pass
            except asyncio.TimeoutError:
                break
            except Exception:
                break
        dl_t = time.monotonic() - dl_start
        mbps = (total / 1_000_000) / dl_t if dl_t > 0 else 0
        _dbg(f"DL {ip} {size}: OK {mbps:.2f}MB/s total={total} dt={dl_t:.1f}s host={host}")
        return ttfb, mbps, total, colo, ""
    except asyncio.TimeoutError:
        if total > 0 and dl_start > 0:
            dl_t = time.monotonic() - dl_start
            mbps = (total / 1_000_000) / dl_t if dl_t > 0 else 0
            _dbg(f"DL {ip} {size}: TIMEOUT partial={total}B mbps={mbps:.2f} dt={dl_t:.1f}s")
            if mbps > 0:
                return ttfb, mbps, total, colo, ""
        _dbg(f"DL {ip} {size}: TIMEOUT no data total={total}")
        return -1, 0, 0, "", "timeout"
    except Exception as e:
        if total > 0 and dl_start > 0:
            dl_t = time.monotonic() - dl_start
            mbps = (total / 1_000_000) / dl_t if dl_t > 0 else 0
            _dbg(f"DL {ip} {size}: ERR partial={total}B mbps={mbps:.2f} err={e}")
            if mbps > 0:
                return ttfb, mbps, total, colo, ""
        _dbg(f"DL {ip} {size}: ERR no data err={e}")
        # Some exceptions stringify to ""; fall back to the class name so a
        # failure is never reported with an empty (success-looking) error.
        return -1, 0, 0, "", str(e)[:60] or type(e).__name__
    finally:
        _cleanup()
async def phase2_round(
    st: State,
    rcfg: RoundCfg,
    candidates: List[str],
    workers: int,
    timeout: float,
    rlim: Optional[CFRateLimiter] = None,
    cdn_host: str = "",
    cdn_path: str = "",
):
    """Run one speed-test round over ``candidates``.

    Each candidate gets up to two download attempts of ``rcfg.size`` bytes.
    The outcome of the round (0 when every attempt failed) is appended to
    the candidate's ``speeds`` list, and its best speed / TTFB / colo fields
    plus ``st.best_speed`` are updated on success.

    Args:
        st: Shared scan state; progress counters and results are updated.
        rcfg: Round configuration; only ``size`` is read here.
        candidates: IPs to test in this round.
        workers: Maximum concurrent downloads (reduced for large sizes).
        timeout: Per-download timeout passed to ``_dl_one``.
        rlim: Optional rate limiter consulted before each attempt.
        cdn_host: Download host; empty means the ``_dl_one`` default
            (``SPEED_HOST``).
        cdn_path: Download path; empty means the ``_dl_one`` default.
    """
    st.total = len(candidates)
    st.done_count = 0
    # Large payloads saturate the link quickly, so run fewer in parallel.
    if rcfg.size >= 50_000_000:
        workers = min(workers, 6)
    elif rcfg.size >= 10_000_000:
        workers = min(workers, 8)
    sem = asyncio.Semaphore(workers)
    max_retries = 2

    async def go(ip: str):
        best_mbps_this = 0.0
        best_ttfb = -1.0
        best_colo = ""
        last_err = ""
        force_cdn = False  # set True when CF rejects (403/429)
        for _ in range(max_retries):
            if st.interrupted:
                break
            # Pick endpoint: speed.cloudflare.com if budget available, else fallback CDN
            use_host = cdn_host
            use_path = cdn_path
            if force_cdn and CDN_FALLBACK:
                use_host, use_path = CDN_FALLBACK
                _dbg(f"DL {ip}: forced fallback CDN {use_host}")
            elif rlim and rlim.would_block() and CDN_FALLBACK:
                use_host, use_path = CDN_FALLBACK
                _dbg(f"DL {ip}: using fallback CDN {use_host}")
            elif rlim:
                await rlim.acquire(st)
            # An empty host is resolved to SPEED_HOST inside _dl_one, so
            # compare against the host that is actually contacted.
            target_host = use_host or SPEED_HOST
            from_cf = target_host == SPEED_HOST
            # acquire sem for the actual download
            await sem.acquire()
            try:
                if st.interrupted:
                    break
                ttfb, mbps, _total, colo, err = await _dl_one(
                    ip, rcfg.size, timeout, host=use_host, path=use_path,
                )
            finally:
                sem.release()  # free slot immediately after download
            if mbps > 0:
                best_mbps_this = mbps
                best_ttfb = ttfb
                best_colo = colo
                break
            # 429 from speed.cloudflare.com: report + force CDN on retry
            if err.startswith("429") and from_cf:
                ra_str = err.split(":", 1)[1] if ":" in err else ""
                try:
                    ra = int(ra_str)
                except (ValueError, TypeError):
                    ra = 60
                if rlim:
                    rlim.report_429(ra)
                    _dbg(f"DL {ip}: 429 reported to limiter (retry-after={ra})")
                force_cdn = True
            # 403 from speed.cloudflare.com: CF rejected size, force CDN
            elif err.startswith("http:") and from_cf:
                _dbg(f"DL {ip}: {err} from CF, switching to CDN fallback")
                force_cdn = True
            # error from fallback CDN
            elif err.startswith("429") or err.startswith("http:"):
                _dbg(f"DL {ip}: {err} from {target_host}, will retry")
            last_err = err
        res = st.res[ip]
        res.speeds.append(best_mbps_this)
        if best_mbps_this > 0:
            if best_mbps_this > res.best_mbps:
                res.best_mbps = best_mbps_this
            if best_ttfb > 0 and (res.ttfb_ms < 0 or best_ttfb < res.ttfb_ms):
                res.ttfb_ms = best_ttfb
            if best_colo and not res.colo:
                res.colo = best_colo
            if best_mbps_this > st.best_speed:
                st.best_speed = best_mbps_this
        elif last_err:
            res.error = last_err
        st.done_count += 1

    tasks = [asyncio.ensure_future(go(ip)) for ip in candidates]
    try:
        await asyncio.gather(*tasks, return_exceptions=True)
    except asyncio.CancelledError:
        pass
    finally:
        for t in tasks:
            if not t.done():
                t.cancel()
def calc_scores(st: State):
    """Assign a 0-100 ``score`` to every result in ``st.res``.

    Dead entries score 0. Entries with a measured speed blend latency (35%),
    speed (50%) and TTFB (15%). Entries without a speed get the latency part
    only, scaled to 35% when any other entry has a speed and left at full
    weight when no entry has one.
    """
    speed_seen = any(entry.best_mbps > 0 for entry in st.res.values())
    for entry in st.res.values():
        if not entry.alive:
            entry.score = 0
            continue

        lat_pts = 0
        if entry.tls_ms > 0:
            lat_pts = max(0, 100 - entry.tls_ms / 10)
        spd_pts = 0
        if entry.best_mbps > 0:
            spd_pts = min(100, entry.best_mbps * 20)
        ttfb_pts = 0
        if entry.ttfb_ms > 0:
            ttfb_pts = max(0, 100 - entry.ttfb_ms / 5)

        if entry.best_mbps > 0:
            entry.score = round(lat_pts * 0.35 + spd_pts * 0.50 + ttfb_pts * 0.15, 1)
        elif speed_seen:
            # Speed rounds ran but this IP wasn't tested - rank below tested ones
            entry.score = round(lat_pts * 0.35, 1)
        else:
            # No speed rounds at all (latency-only mode)
            entry.score = round(lat_pts, 1)
def sorted_alive(st: State, key: str = "score") -> List[Result]:
    """Return the alive results ordered by ``key``.

    ``"score"`` and ``"speed"`` sort best-first (descending); ``"latency"``
    sorts lowest TLS time first. Any other key leaves the entries in the
    iteration order of ``st.res``.
    """
    orderings = {
        "score": (lambda r: r.score, True),
        "latency": (lambda r: r.tls_ms, False),
        "speed": (lambda r: r.best_mbps, True),
    }
    survivors = [r for r in st.res.values() if r.alive]
    if key in orderings:
        key_fn, descending = orderings[key]
        survivors.sort(key=key_fn, reverse=descending)
    return survivors
def sorted_all(st: State, key: str = "score") -> List[Result]:
    """Return every result: alive ones ordered by ``key``, dead ones after them by IP."""
    graveyard = sorted(
        (r for r in st.res.values() if not r.alive),
        key=lambda r: r.ip,
    )
    return sorted_alive(st, key) + graveyard
def draw_menu_header(cols: int) -> List[str]:
    """Build the three-line title banner used at the top of menu screens.

    ``cols`` is the terminal width; the banner's inner width is ``cols - 2``.
    """
    inner = cols - 2
    title = f" {A.BOLD}{A.WHT}CF Config Scanner{A.RST} {A.DIM}v{VERSION}{A.RST}"
    # Pad on visible length so colour codes do not count towards the width.
    fill = " " * (inner - _vl(title))
    return [
        f"{A.CYN}{'' * inner}{A.RST}",
        f"{A.CYN}{A.RST}" + title + fill + f"{A.CYN}{A.RST}",
        f"{A.CYN}{'' * inner}{A.RST}",
    ]
def draw_box_line(content: str, cols: int) -> str:
    """Wrap ``content`` in the box's side borders, padded to the inner width.

    Padding is based on the visible length of ``content``; content wider than
    the box is emitted unpadded (it is not truncated).
    """
    shortfall = (cols - 2) - _vl(content)
    filler = " " * shortfall if shortfall > 0 else ""
    return f"{A.CYN}{A.RST}{content}{filler}{A.CYN}{A.RST}"
def draw_box_sep(cols: int) -> str:
    """Return the horizontal rule that separates sections inside a box."""
    span = cols - 2
    return f"{A.CYN}{'' * span}{A.RST}"
def draw_box_bottom(cols: int) -> str:
    """Return the closing (bottom) border line of a box."""
    span = cols - 2
    return f"{A.CYN}{'' * span}{A.RST}"
def tui_show_guide():
    """Show help/guide screen explaining input formats.

    Clears the screen, renders the guide inside a box, and blocks until a
    key is pressed. The guide text is cut from the bottom so the whole box
    fits in the terminal; on very short terminals no guide lines are shown.
    """
    _w(A.CLR + A.HOME)
    cols, rows = term_size()
    lines = draw_menu_header(cols)
    lines.append(draw_box_line(f" {A.BOLD}{A.WHT}How to prepare input files{A.RST}", cols))
    lines.append(draw_box_sep(cols))
    guide = [
        "",
        f" {A.BOLD}{A.CYN}Local Files (auto-detected){A.RST}",
        f" {A.DIM}Place config files in the same directory where you run cfray.{A.RST}",
        f" {A.DIM}Supported formats: {A.WHT}.txt .json .conf .lst{A.RST}",
        f" {A.DIM}They will appear automatically under {A.WHT}LOCAL FILES{A.DIM} in the menu.{A.RST}",
        f" {A.GRN}Example:{A.RST} {A.DIM}cp configs.txt /root/ && cd /root && python3 scanner.py{A.RST}",
        "",
        f" {A.BOLD}{A.CYN}[P] Enter File Path{A.RST}",
        f" {A.DIM}Load a config file from any location by typing its full path.{A.RST}",
        f" {A.GRN}Example:{A.RST} {A.DIM}/home/user/configs/my_vless.txt{A.RST}",
        "",
        f" {A.CYN}{'' * 46}{A.RST}",
        "",
        f" {A.BOLD}{A.CYN}[1-9] VLESS/VMess URI file (.txt){A.RST}",
        f" {A.DIM}Text file, one URI per line. Can mix VLESS and VMess.{A.RST}",
        f" {A.GRN}vless://uuid@domain:443?type=ws&host=sni.com&...#name{A.RST}",
        "",
        f" {A.BOLD}{A.CYN}[1-9] Domain JSON file (.json){A.RST}",
        f' {A.DIM}JSON with domain+IP:{A.RST} {A.GRN}{{"data": [{{"domain":"x.ir","ipv4":"1.2.3.4"}}]}}{A.RST}',
        "",
        f" {A.BOLD}{A.CYN}[S] Subscription URL{A.RST}",
        f" {A.DIM}Fetches VLESS/VMess configs from a remote URL (plain or base64).{A.RST}",
        f" {A.GRN}https://example.com/sub.txt{A.RST}",
        f" {A.DIM}CLI: python3 scanner.py --sub URL{A.RST}",
        "",
        f" {A.BOLD}{A.CYN}[T] Template + Address list{A.RST}",
        f" {A.DIM}Give one VLESS/VMess template + a file of CF IPs/domains.{A.RST}",
        f" {A.DIM}Scanner replaces the address for each entry and tests them all.{A.RST}",
        f" {A.WHT}Template:{A.RST} {A.GRN}vless://uuid@ADDR:443?type=ws&...#name{A.RST}",
        f" {A.WHT}Addresses:{A.RST} {A.GRN}one IP or domain per line (.txt){A.RST}",
        f" {A.DIM}CLI: python3 scanner.py --template 'vless://...' -i addrs.txt{A.RST}",
        "",
        f" {A.BOLD}{A.CYN}[F] Find Clean Cloudflare IPs{A.RST}",
        f" {A.DIM}Scans all CF IP ranges to find reachable edge IPs.{A.RST}",
        f" {A.DIM}Modes: Quick (~4K), Normal (~12K), Full (~1.5M), Mega (~3M multi-port){A.RST}",
        f" {A.DIM}Mega tests all IPs on ports 443+8443 for maximum coverage.{A.RST}",
        f" {A.DIM}Found IPs can be saved or used with a template for speed test.{A.RST}",
        f" {A.DIM}CLI: python3 scanner.py --find-clean --no-tui --clean-mode mega{A.RST}",
        "",
        f" {A.CYN}{'' * 46}{A.RST}",
        f" {A.BOLD}{A.CYN}How it works:{A.RST}",
        f" {A.DIM}1. Resolve domains to CF edge IPs, deduplicate{A.RST}",
        f" {A.DIM}2. Test TCP+TLS latency, cut bottom by latency{A.RST}",
        f" {A.DIM}3. Speed test top candidates in progressive rounds{A.RST}",
        f" {A.DIM}4. Score = latency 35% + speed 50% + TTFB 15%{A.RST}",
        f" {A.DIM}5. Export top configs ranked by score{A.RST}",
        "",
        f" {A.BOLD}{A.WHT}Made By Sam - SamNet Technologies{A.RST}",
        f" {A.DIM}https://github.com/SamNet-dev/cfray{A.RST}",
    ]
    footer = [
        draw_box_line("", cols),
        draw_box_sep(cols),
        draw_box_line(f" {A.DIM}Press any key to go back{A.RST}", cols),
        draw_box_bottom(cols),
    ]
    # Rows left for guide text: everything already queued above, the footer,
    # and one extra row for the cursor after the trailing newline. Clamp at 0
    # so a tiny terminal cannot turn the slice bound negative (which would
    # drop lines from the end instead of truncating).
    max_guide = max(0, rows - len(lines) - len(footer) - 1)
    lines.extend(draw_box_line(g, cols) for g in guide[:max_guide])
    lines.extend(footer)
    _w("\n".join(lines) + "\n")
    _fl()
    _read_key_blocking()
def _clean_pick_mode() -> Optional[str]:
    """Ask which scan scope the clean-IP finder should use.

    Returns the chosen key of ``CLEAN_MODES``, ``"__back__"`` when the user
    goes back, or ``None`` when the user quits. Enter selects ``"normal"``.
    """
    selection = {
        "1": "quick",
        "2": "normal",
        "enter": "normal",
        "3": "full",
        "4": "mega",
    }
    while True:
        _w(A.CLR + A.HOME + A.HIDE)
        cols, _ = term_size()
        lines = draw_menu_header(cols)
        lines += [
            draw_box_line(f" {A.BOLD}Find Clean Cloudflare IPs{A.RST}", cols),
            draw_box_line(f" {A.DIM}Scans Cloudflare IP ranges to find reachable edge IPs{A.RST}", cols),
            draw_box_line("", cols),
            draw_box_sep(cols),
            draw_box_line(f" {A.BOLD}Select scan scope:{A.RST}", cols),
            draw_box_line("", cols),
        ]
        for idx, name in enumerate(("quick", "normal", "full", "mega"), 1):
            cfg = CLEAN_MODES[name]
            num = f"{A.CYN}{A.BOLD}{idx}{A.RST}"
            lbl = f"{A.BOLD}{cfg['label']}{A.RST}"
            if name == "normal":
                lbl += f" {A.GRN}(recommended){A.RST}"
            lines.append(draw_box_line(f" {num} {lbl}", cols))
            desc = cfg["desc"]
            # Only mention ports when the mode probes more than one.
            if len(cfg.get("ports", [])) > 1:
                desc += f" (ports: {', '.join(str(p) for p in cfg['ports'])})"
            lines.append(draw_box_line(f" {A.DIM}{desc}{A.RST}", cols))
            lines.append(draw_box_line("", cols))
        lines += [
            draw_box_sep(cols),
            draw_box_line(f" {A.DIM}[1-4] Select [B] Back [Q] Quit{A.RST}", cols),
            draw_box_bottom(cols),
        ]
        _w("\n".join(lines) + "\n")
        _fl()

        pressed = _read_key_blocking()
        if pressed in ("q", "ctrl-c"):
            return None
        if pressed in ("b", "esc"):
            return "__back__"
        if pressed in selection:
            return selection[pressed]
def _draw_clean_progress(cs: CleanScanState):
    """Draw live progress screen for clean IP scan.

    Renders elapsed time, a progress bar, the running count of clean IPs and
    the first entries of ``cs.results`` (up to 15, fewer on short terminals),
    then writes the frame starting at the terminal's home position.
    """
    cols, rows = term_size()
    W = cols - 2
    out: List[str] = []

    def bx(c: str):
        out.append(f"{A.CYN}{A.RST}" + c + " " * max(0, W - _vl(c)) + f"{A.CYN}{A.RST}")

    out.append(f"{A.CYN}{'' * W}{A.RST}")
    elapsed = _fmt_elapsed(time.monotonic() - cs.start_time) if cs.start_time else "0s"
    title = f" {A.BOLD}{A.WHT}Finding Clean Cloudflare IPs{A.RST}"
    right = f"{A.DIM}{elapsed} | ^C stop{A.RST}"
    bx(title + " " * max(1, W - _vl(title) - _vl(right)) + right)
    out.append(f"{A.CYN}{'' * W}{A.RST}")
    pct = cs.done * 100 // max(1, cs.total)
    bw = max(1, min(30, W - 40))
    filled = int(bw * pct / 100)
    bar = f"{A.GRN}{'' * filled}{A.DIM}{'' * (bw - filled)}{A.RST}"
    bx(f" Probing [{bar}] {cs.done:,}/{cs.total:,} {pct}%")
    found_line = f" {A.GRN}Found: {cs.found:,} clean IPs{A.RST}"
    if cs.results:
        # The first entry is shown as the best latency.
        best_lat = cs.results[0][1]
        found_line += f" {A.DIM}Best: {best_lat:.0f}ms{A.RST}"
    bx(found_line)
    out.append(f"{A.CYN}{'' * W}{A.RST}")
    bx(f" {A.BOLD}Top IPs found (by latency):{A.RST}")
    # Keep at least one row: a negative bound would make the slice below
    # return nearly the whole result list on a very short terminal.
    vis = max(1, min(15, rows - 12))
    if cs.results:
        for i, (ip, lat) in enumerate(cs.results[:vis]):
            bx(f" {A.CYN}{i+1:>3}.{A.RST} {ip:<22} {A.GRN}{lat:>6.0f}ms{A.RST}")
    else:
        bx(f" {A.DIM}Scanning...{A.RST}")
    # Fill remaining space
    used = len(cs.results[:vis]) if cs.results else 1
    for _ in range(vis - used):
        bx("")
    out.append(f"{A.CYN}{'' * W}{A.RST}")
    bx(f" {A.DIM}Press Ctrl+C to stop early and show results{A.RST}")
    out.append(f"{A.CYN}{'' * W}{A.RST}")
    _w(A.HOME)
    _w("\n".join(out) + "\n")
    _fl()
def _clean_show_results(results: List[Tuple[str, float]], elapsed: str) -> Optional[str]:
    """Scrollable result screen for the clean-IP scan.

    At most the first 300 results are listed. Returns ``"back"``,
    ``"save"``, or ``"template:<uri>"`` once the user has entered a template
    URI that ``parse_config`` accepts.
    """
    MAX_SHOW = 300
    shown = results[:MAX_SHOW]
    top = 0
    while True:
        _w(A.CLR + A.HOME + A.HIDE)
        cols, rows = term_size()
        # header + separator = 2 rows, footer area = 5 rows, menu header = 3 rows
        page = max(5, rows - 13)
        last_top = max(0, len(shown) - page)
        scrollable = len(shown) > page

        lines = draw_menu_header(cols)
        if results:
            summary = (
                f" {A.BOLD}{A.GRN}Scan Complete!{A.RST} "
                f"Found {A.BOLD}{len(results):,}{A.RST} clean IPs in {elapsed}"
            )
        else:
            summary = f" {A.YEL}Scan Complete — no clean IPs found.{A.RST}"
        lines.append(draw_box_line(summary, cols))
        lines.append(draw_box_sep(cols))

        if shown:
            stop = min(len(shown), top + page)
            hdr = f" {A.BOLD}{'#':>4} {'Address':<22} {'Latency':>8}{A.RST}"
            if scrollable:
                pos = f"{A.DIM}[{top+1}-{stop} of {len(shown)}"
                if len(results) > MAX_SHOW:
                    pos += f", {len(results):,} total"
                pos += f"]{A.RST}"
                # Right-align the position indicator on the header row.
                hdr += " " * max(1, cols - 2 - _vl(hdr) - _vl(pos) - 1) + pos
            lines.append(draw_box_line(hdr, cols))
            lines.append(draw_box_line(
                f" {A.DIM}{''*4} {''*22} {''*8}{A.RST}", cols))
            for idx in range(top, stop):
                ip, lat = shown[idx]
                lines.append(draw_box_line(
                    f" {idx+1:>4} {ip:<22} {A.GRN}{lat:>6.0f}ms{A.RST}", cols))

        lines.append(draw_box_line("", cols))
        lines.append(draw_box_sep(cols))
        ft = f" {A.CYN}[S]{A.RST} Save all {A.CYN}[T]{A.RST} Template+SpeedTest " if results else ""
        ft += f" {A.CYN}[B]{A.RST} Back"
        lines.append(draw_box_line(ft, cols))
        if scrollable:
            lines.append(draw_box_line(
                f" {A.DIM}j/↓ down k/↑ up n/p page down/up{A.RST}", cols))
        lines.append(draw_box_bottom(cols))
        _w("\n".join(lines) + "\n")
        _fl()

        pressed = _read_key_blocking()
        if pressed in ("b", "esc", "q", "ctrl-c"):
            return "back"
        if pressed in ("j", "down"):
            top = min(top + 1, last_top)
        elif pressed in ("k", "up"):
            top = max(0, top - 1)
        elif pressed == "n":
            top = min(top + page, last_top)
        elif pressed == "p":
            top = max(0, top - page)
        elif pressed == "s" and results:
            return "save"
        elif pressed == "t" and results:
            _w(A.SHOW)
            _w(f"\n {A.BOLD}{A.CYN}Speed Test with Clean IPs{A.RST}\n")
            _w(f" {A.DIM}Paste a VLESS/VMess config URI. The address in it will be{A.RST}\n")
            _w(f" {A.DIM}replaced with each clean IP, then all configs get speed-tested.{A.RST}\n\n")
            _w(f" {A.CYN}Template:{A.RST} ")
            _fl()
            try:
                tpl = input().strip()
            except (EOFError, KeyboardInterrupt):
                continue
            if tpl and parse_config(tpl):
                return f"template:{tpl}"
            _w(f" {A.RED}Invalid VLESS/VMess URI.{A.RST}\n")
            _fl()
            time.sleep(1.5)
async def tui_run_clean_finder() -> Optional[Tuple[str, str]]:
    """Run the clean IP finder flow. Returns (input_method, input_value) or None.

    Steps: pick a scan scope, generate candidate IPs, run the scan with a
    live progress screen (Ctrl+C stops it early and keeps partial results),
    then show the results and act on the user's choice.

    Returns:
        ``None`` when the user quits from the scope picker,
        ``("__back__", "")`` to return to the previous menu, or
        ``("template", "<uri>|||<path>")`` where ``<path>`` is the file the
        found IPs were written to.
    """
    mode = _clean_pick_mode()
    if mode is None:
        return None
    if mode == "__back__":
        return ("__back__", "")
    scan_cfg = CLEAN_MODES[mode]
    # Generate IPs
    _w(A.CLR + A.HOME)
    cols, _ = term_size()
    lines = draw_menu_header(cols)
    lines.append(draw_box_line(
        f" {A.BOLD}Generating IPs from {len(CF_SUBNETS)} Cloudflare ranges...{A.RST}", cols))
    lines.append(draw_box_bottom(cols))
    _w("\n".join(lines) + "\n")
    _fl()
    ips = generate_cf_ips(CF_SUBNETS, scan_cfg["sample"])
    ports = scan_cfg.get("ports", [443])
    _dbg(f"CLEAN: Generated {len(ips):,} IPs × {len(ports)} port(s), sample={scan_cfg['sample']}")
    # Run scan with live progress
    cs = CleanScanState()
    scan_task = asyncio.ensure_future(
        scan_clean_ips(
            ips, workers=scan_cfg["workers"], timeout=3.0,
            validate=scan_cfg["validate"], cs=cs, ports=ports,
        )
    )
    old_sigint = signal.getsignal(signal.SIGINT)

    def _sig(sig, frame):
        # Ctrl+C: flag the scan as interrupted and cancel it; partial
        # results are recovered from `cs` below.
        cs.interrupted = True
        scan_task.cancel()

    signal.signal(signal.SIGINT, _sig)
    _w(A.CLR + A.HIDE)
    try:
        while not scan_task.done():
            _draw_clean_progress(cs)
            await asyncio.sleep(0.3)
    except (asyncio.CancelledError, Exception):
        pass
    finally:
        signal.signal(signal.SIGINT, old_sigint)
    try:
        results = await scan_task
    except (asyncio.CancelledError, Exception):
        results = sorted(cs.all_results or cs.results, key=lambda x: x[1])
    # Guard against an unset start time, as the progress renderer does;
    # otherwise the whole monotonic clock value is reported as the duration.
    elapsed = _fmt_elapsed(time.monotonic() - cs.start_time) if cs.start_time else "0s"
    _dbg(f"CLEAN: Done in {elapsed}. Found {len(results):,} / {len(ips):,}")

    def _write_ips() -> str:
        """Write one found address per line to the results file; return its path."""
        os.makedirs(RESULTS_DIR, exist_ok=True)
        target = os.path.abspath(_results_path("clean_ips.txt"))
        with open(target, "w", encoding="utf-8") as f:
            f.writelines(f"{ip}\n" for ip, _ in results)
        return target

    # Show results and get user action
    action = _clean_show_results(results, elapsed)
    if action is None or action == "back":
        return ("__back__", "")
    if action == "save":
        try:
            path = _write_ips()
            _w(f"\n {A.GRN}Saved {len(results):,} IPs to {path}{A.RST}\n")
        except Exception as e:
            _w(f"\n {A.RED}Save error: {e}{A.RST}\n")
        _w(f" {A.DIM}Press any key...{A.RST}\n")
        _fl()
        _wait_any_key()
        return ("__back__", "")
    if action.startswith("template:"):
        template_uri = action[len("template:"):]
        try:
            path = _write_ips()
        except Exception as e:
            _w(f"\n {A.RED}Save error: {e}{A.RST}\n")
            _fl()
            time.sleep(2)
            return ("__back__", "")
        return ("template", f"{template_uri}|||{path}")
    return None
def _tui_prompt_text(label: str) -> Optional[str]:
    """Prompt for one line of text with the cursor visible.

    Returns the stripped input, or ``None`` when it is empty or the read is
    aborted (EOF / Ctrl+C).
    """
    _w(A.SHOW)
    _w(f"\n {A.CYN}{label}{A.RST} ")
    _fl()
    try:
        typed = input()
    except (EOFError, KeyboardInterrupt):
        return None
    typed = typed.strip()
    return typed or None
def tui_pick_file() -> Optional[Tuple[str, str]]:
    """Interactive file/input picker. Returns (method, value) or None.

    method is one of: 'file', 'sub', 'template', 'find_clean'.
    For 'file': value is the file path.
    For 'sub': value is the subscription URL.
    For 'template': value is 'template_uri|||address_file_path'.
    For 'find_clean': value is an empty string.
    None means the user quit.
    """
    enable_ansi()
    files = find_config_files()

    def _existing_file(raw: str) -> Optional[str]:
        """Return a path to an existing file for ``raw``, or None.

        The text is tried as typed first, then with ``~`` expanded, so
        paths that already worked keep resolving exactly as before.
        """
        for candidate in (raw, os.path.expanduser(raw)):
            if os.path.isfile(candidate):
                return candidate
        return None

    while True:
        _w(A.CLR + A.HOME + A.HIDE)
        cols, rows = term_size()
        W = cols - 2
        out: List[str] = []

        def bx(c: str):
            out.append(f"{A.CYN}{A.RST}" + c + " " * max(0, W - _vl(c)) + f"{A.CYN}{A.RST}")

        # Single clean box — no internal double-line separators
        out.append(f"{A.CYN}{'' * W}{A.RST}")
        title = f"{A.BOLD}{A.WHT}cfray{A.RST} {A.DIM}v{VERSION}{A.RST}"
        subtitle = f"{A.DIM}Cloudflare Config Scanner{A.RST}"
        bx(title + " " + subtitle)
        bx("")
        # Section: Local Files
        bx(f" {A.DIM}── {A.BOLD}{A.WHT}📁 LOCAL FILES{A.RST} {A.DIM}{'' * max(1, W - 19)}{A.RST}")
        if files:
            # Only the first nine entries are listed (one digit key each).
            for i, (path, ftype, count) in enumerate(files[:9]):
                num = f" {A.CYN}{A.BOLD}{i + 1}{A.RST}."
                name = os.path.basename(path)
                desc = f"{A.DIM}{ftype}, {count} entries{A.RST}"
                bx(f" {num} 📄 {name:<28} {desc}")
        else:
            bx(f" {A.DIM}No config files found in current directory{A.RST}")
            bx(f" {A.DIM}Drop .txt or .json files here, or use options below{A.RST}")
        bx("")
        # Section: Remote Sources
        bx(f" {A.DIM}── {A.BOLD}{A.WHT}🌐 REMOTE SOURCES{A.RST} {A.DIM}{'' * max(1, W - 22)}{A.RST}")
        bx(f" {A.CYN}{A.BOLD}s{A.RST}. 🔗 {A.WHT}Subscription URL{A.RST} {A.DIM}Fetch configs from remote URL{A.RST}")
        bx(f" {A.CYN}{A.BOLD}p{A.RST}. 📂 {A.WHT}Enter File Path{A.RST} {A.DIM}Load from custom file path{A.RST}")
        bx("")
        # Section: Tools
        bx(f" {A.DIM}── {A.BOLD}{A.WHT}🔧 TOOLS{A.RST} {A.DIM}{'' * max(1, W - 13)}{A.RST}")
        bx(f" {A.CYN}{A.BOLD}t{A.RST}. 🧩 {A.WHT}Template + Addresses{A.RST} {A.DIM}Test one config against many IPs{A.RST}")
        bx(f" {A.CYN}{A.BOLD}f{A.RST}. 🔍 {A.WHT}Clean IP Finder{A.RST} {A.DIM}Scan Cloudflare IP ranges{A.RST}")
        bx("")
        bx(f" {A.DIM}{'' * (W - 2)}{A.RST}")
        bx(f" {A.DIM}[h] ❓ Help [q] 🚪 Quit{A.RST}")
        out.append(f"{A.CYN}{'' * W}{A.RST}")
        _w("\n".join(out) + "\n")
        _fl()
        key = _read_key_blocking()
        if key in ("q", "ctrl-c", "esc"):
            _w(A.SHOW)
            _fl()
            return None
        if key == "h":
            tui_show_guide()
            # Re-scan after the help screen in case files were added meanwhile.
            files = find_config_files()
            continue
        if key == "p":
            path = _tui_prompt_text("Enter file path:")
            if path is None:
                continue
            found = _existing_file(path)
            if found is not None:
                return ("file", found)
            _w(f" {A.RED}File not found.{A.RST}\n")
            _fl()
            time.sleep(1)
            continue
        if key == "s":
            _w(A.SHOW)
            _w(f"\n {A.BOLD}{A.CYN}Subscription URL{A.RST}\n")
            _w(f" {A.DIM}Paste a URL that contains VLESS/VMess configs (plain text or base64).{A.RST}\n")
            _w(f" {A.DIM}Example: https://example.com/sub.txt{A.RST}\n\n")
            _fl()
            url = _tui_prompt_text("URL:")
            if url is None:
                continue
            if not url.lower().startswith(("http://", "https://")):
                _w(f" {A.RED}URL must start with http:// or https://{A.RST}\n")
                _fl()
                time.sleep(1.5)
                continue
            return ("sub", url)
        if key == "t":
            _w(A.SHOW)
            _w(f"\n {A.BOLD}{A.CYN}Template + Address List{A.RST}\n")
            _w(f" {A.DIM}This mode takes ONE working config and a list of Cloudflare IPs/domains.{A.RST}\n")
            _w(f" {A.DIM}It replaces the address in your config with each IP from the list,{A.RST}\n")
            _w(f" {A.DIM}then tests all of them to find the fastest.{A.RST}\n\n")
            _w(f" {A.BOLD}Step 1:{A.RST} {A.CYN}Paste your VLESS/VMess config URI:{A.RST}\n")
            _w(f" {A.DIM}(a full vless://... or vmess://... URI){A.RST}\n ")
            _fl()
            try:
                tpl = input().strip()
            except (EOFError, KeyboardInterrupt):
                continue
            if not tpl or not parse_config(tpl):
                _w(f" {A.RED}Invalid VLESS/VMess URI.{A.RST}\n")
                _fl()
                time.sleep(1.5)
                continue
            _w(f"\n {A.BOLD}Step 2:{A.RST} {A.CYN}Enter path to address list file:{A.RST}\n")
            _w(f" {A.DIM}(a .txt file with one IP or domain per line){A.RST}\n")
            _fl()
            addr_path = _tui_prompt_text("Path:")
            if addr_path is None:
                continue
            addr_found = _existing_file(addr_path)
            if addr_found is None:
                _w(f" {A.RED}File not found.{A.RST}\n")
                _fl()
                time.sleep(1)
                continue
            return ("template", f"{tpl}|||{addr_found}")
        if key == "f":
            return ("find_clean", "")
        # isascii() keeps characters such as "²" out: str.isdigit() accepts
        # them but int() raises ValueError on them.
        if key.isascii() and key.isdigit() and 1 <= int(key) <= len(files):
            return ("file", files[int(key) - 1][0])
def tui_pick_mode() -> Optional[str]:
    """Ask which scan preset to run.

    Returns the chosen key of ``PRESETS``, ``"__back__"`` when the user goes
    back, or ``None`` when the user quits. Enter selects ``"normal"``.
    """
    menu = (("1", "quick"), ("2", "normal"), ("3", "thorough"))
    selection = dict(menu)
    selection["enter"] = "normal"
    while True:
        _w(A.CLR + A.HOME + A.HIDE)
        cols, _ = term_size()
        lines = draw_menu_header(cols)
        lines.append(draw_box_line(f" {A.BOLD}Select scan mode:{A.RST}", cols))
        lines.append(draw_box_line("", cols))
        for hotkey, name in menu:
            p = PRESETS[name]
            num = f"{A.CYN}{A.BOLD}{hotkey}{A.RST}"
            lbl = f"{A.BOLD}{p['label']}{A.RST}"
            if name == "normal":
                lbl += f" {A.GRN}(recommended){A.RST}"
            lines += [
                draw_box_line(f" {num} {lbl}", cols),
                draw_box_line(f" {A.DIM}{p['desc']}{A.RST}", cols),
                draw_box_line(
                    f" {A.DIM}Data: {p['data']} | Est. time: {p['time']}{A.RST}",
                    cols,
                ),
                draw_box_line("", cols),
            ]
        lines += [
            draw_box_sep(cols),
            draw_box_line(f" {A.DIM}[1-3] Select [B] Back [Q] Quit{A.RST}", cols),
            draw_box_bottom(cols),
        ]
        _w("\n".join(lines) + "\n")
        _fl()

        pressed = _read_key_blocking()
        if pressed in ("q", "ctrl-c"):
            # Restore the cursor before leaving the TUI.
            _w(A.SHOW)
            _fl()
            return None
        if pressed == "b":
            return "__back__"
        if pressed in selection:
            return selection[pressed]
class Dashboard:
def __init__(self, st: State):
self.st = st
self.sort = "score"
self.offset = 0
self.show_domains = False
def _bar(self, cur: int, tot: int, w: int = 24) -> str:
    """Render a ``w``-cell progress bar for ``cur`` out of ``tot``.

    The filled share is capped at 100%. A zero ``tot`` yields the plain
    placeholder string without colour codes.
    """
    if tot == 0:
        return "" * w
    share = min(1.0, cur / tot)
    lit = int(w * share)
    unlit = w - lit
    return f"{A.GRN}{'' * lit}{A.DIM}{'' * unlit}{A.RST}"
def _cscore(self, v: float) -> str:
    """Colour a score: green from 70, yellow from 40, red above 0, dim dash otherwise."""
    for floor, colour in ((70, A.GRN), (40, A.YEL)):
        if v >= floor:
            return f"{colour}{v:5.1f}{A.RST}"
    if v > 0:
        return f"{A.RED}{v:5.1f}{A.RST}"
    return f"{A.DIM} -{A.RST}"
def _speed_str(self, v: float) -> str:
    """Format a speed cell.

    Non-positive values show a dim dash, values of 1 and above are printed
    as-is in green, and values below 1 are multiplied by 1000 and suffixed
    with ``K`` in yellow.
    """
    if v <= 0:
        colour, text = A.DIM, " -"
    elif v >= 1:
        colour, text = A.GRN, f"{v:5.1f}"
    else:
        colour, text = A.YEL, f"{v * 1000:4.0f}K"
    return f"{colour}{text}{A.RST}"
def draw(self):
    """Render one complete dashboard frame and write it to the terminal.

    The frame holds a title bar, input summary, per-phase progress lines,
    aggregate stats, one page of the result table (starting at
    ``self.offset`` and sorted by ``self.sort``) and a footer with either a
    notification, the key hints (once finished) or the running-phase hint.
    """
    cols, rows = term_size()
    W = cols - 2
    s = self.st
    n_rounds = len(s.rounds)
    vis = max(3, rows - 18 - n_rounds)
    out: List[str] = []

    def bx(c: str):
        pad = " " * max(0, W - _vl(c))
        out.append(f"{A.CYN}{A.RST}" + c + pad + f"{A.CYN}{A.RST}")

    # --- title bar ---
    out.append(f"{A.CYN}{'' * W}{A.RST}")
    elapsed = _fmt_elapsed(time.monotonic() - s.start_time) if s.start_time else "0s"
    title = f" {A.BOLD}{A.WHT}CF Config Scanner{A.RST}"
    right = f"{A.DIM}{elapsed} | {s.mode} | ^C stop{A.RST}"
    gap = " " * max(1, W - _vl(title) - _vl(right))
    bx(title + gap + right)
    out.append(f"{A.CYN}{'' * W}{A.RST}")

    # --- input summary ---
    fname = os.path.basename(s.input_file)
    info = f" {A.DIM}File:{A.RST} {fname} {A.DIM}Configs:{A.RST} {len(s.configs)} {A.DIM}Unique IPs:{A.RST} {len(s.ips)}"
    if s.latency_cut_n > 0:
        info += f" {A.DIM}Cut:{A.RST} {s.latency_cut_n}"
    bx(info)
    out.append(f"{A.CYN}{'' * W}{A.RST}")

    # --- phase progress ---
    bw = min(24, W - 55)
    if s.phase == "latency":
        pct = s.done_count * 100 // max(1, s.total)
        bx(f" {A.GRN}{A.RST} {A.BOLD}Latency{A.RST} [{self._bar(s.done_count, s.total, bw)}] {s.done_count}/{s.total} {pct}%")
    elif s.alive_n > 0:
        cut_info = ""
        if s.latency_cut_n > 0:
            cut_info = f" {A.DIM}cut {s.latency_cut_n}{A.RST}"
        bx(f" {A.GRN}{A.RST} Latency {A.GRN}{s.alive_n} alive{A.RST} {A.DIM}{s.dead_n} dead{A.RST}{cut_info}")
    else:
        bx(f" {A.DIM}○ Latency waiting...{A.RST}")
    for rn, rc in enumerate(s.rounds, 1):
        lbl = f"Speed R{rn} ({rc.label}x{rc.keep})"
        if s.cur_round == rn and s.phase.startswith("speed") and not s.finished:
            pct = s.done_count * 100 // max(1, s.total)
            bx(f" {A.GRN}{A.RST} {A.BOLD}{lbl:<18}{A.RST}[{self._bar(s.done_count, s.total, bw)}] {s.done_count}/{s.total} {pct}%")
        elif s.cur_round > rn or (s.cur_round >= rn and s.finished):
            bx(f" {A.GRN}{A.RST} {lbl:<18}{A.GRN}done{A.RST}")
        else:
            bx(f" {A.DIM}{lbl:<18}waiting...{A.RST}")
    out.append(f"{A.CYN}{'' * W}{A.RST}")

    # --- aggregate stats ---
    parts = []
    if s.alive_n > 0:
        alats = [r.tls_ms for r in s.res.values() if r.alive and r.tls_ms > 0]
        avg_lat = statistics.mean(alats) if alats else 0
        parts.append(f"{A.GRN}{s.alive_n}{A.RST} alive")
        parts.append(f"{A.RED}{s.dead_n}{A.RST} dead")
        if avg_lat:
            parts.append(f"{A.DIM}avg latency:{A.RST} {avg_lat:.0f}ms")
    if s.best_speed > 0:
        parts.append(f"{A.CYN}best:{A.RST} {s.best_speed:.2f} MB/s")
    if parts:
        bx(" " + " ".join(parts))
    else:
        bx(" ")
    out.append(f"{A.CYN}{'' * W}{A.RST}")

    # --- table header: one speed column per configured round ---
    hdr = f" {A.BOLD}{'#':>3} {'IP':<16} {'Dom':>3} {'Ping':>6} {'Conn':>6}"
    hdr += "".join(f" {'R' + str(n):>5}" for n in range(1, n_rounds + 1))
    hdr += f" {'Colo':>4} {'Score':>5}{A.RST}"
    bx(hdr)
    sep = f" {'' * 3} {'' * 16} {'' * 3} {'' * 6} {'' * 6}"
    sep += f" {'' * 5}" * n_rounds
    sep += f" {'' * 4} {'' * 5}"
    bx(f"{A.DIM}{sep}{A.RST}")

    # --- table body ---
    results = sorted_all(s, self.sort)
    total_results = len(results)
    page = results[self.offset : self.offset + vis]
    for rank, r in enumerate(page, self.offset + 1):
        if not r.alive:
            row = f" {A.DIM}{rank:>3} {r.ip:<16} {len(r.domains):>3} {A.RED}{'dead':>6}{A.RST}{A.DIM} {'':>6}"
            row += f" {'':>5}" * n_rounds
            row += f" {'':>4} {A.RED}{'--':>5}{A.RST}"
            bx(row)
            continue
        tcp = f"{r.tcp_ms:6.0f}" if r.tcp_ms > 0 else f"{A.DIM} -{A.RST}"
        tls = f"{r.tls_ms:6.0f}" if r.tls_ms > 0 else f"{A.DIM} -{A.RST}"
        row = f" {rank:>3} {r.ip:<16} {len(r.domains):>3} {tcp} {tls}"
        for col in range(n_rounds):
            # A round without a positive speed for this IP shows a dash.
            measured = col < len(r.speeds) and r.speeds[col] > 0
            if measured:
                row += f" {self._speed_str(r.speeds[col])}"
            else:
                row += f" {A.DIM} -{A.RST}"
        cl = f"{r.colo:>4}" if r.colo else f"{A.DIM} -{A.RST}"
        row += f" {cl} {self._cscore(r.score)}"
        bx(row)
    # Pad so the frame keeps the same height on a short last page.
    for _ in range(vis - len(page)):
        bx("")
    out.append(f"{A.CYN}{'' * W}{A.RST}")

    # --- footer ---
    if s.notify and time.monotonic() < s.notify_until:
        bx(f" {A.GRN}{A.BOLD}{s.notify}{A.RST}")
    elif s.finished:
        sort_hint = f"sort:{A.BOLD}{self.sort}{A.RST}"
        page_hint = f"{self.offset + 1}-{min(self.offset + vis, total_results)}/{total_results}"
        ft = (
            f" {A.CYN}[S]{A.RST} {sort_hint} "
            f"{A.CYN}[E]{A.RST} Export "
            f"{A.CYN}[A]{A.RST} ExportAll "
            f"{A.CYN}[C]{A.RST} Configs "
            f"{A.CYN}[D]{A.RST} Domains "
            f"{A.CYN}[H]{A.RST} Help "
            f"{A.CYN}[J/K]{A.RST}"
        )
        ft2 = (
            f" Scroll {A.CYN}[N/P]{A.RST} Page ({page_hint}) "
            f"{A.CYN}[B]{A.RST} Back "
            f"{A.CYN}[Q]{A.RST} Quit"
        )
        bx(ft)
        bx(ft2)
    else:
        bx(f" {A.DIM}{s.phase_label}... Press Ctrl+C to stop and export partial results{A.RST}")
    out.append(f"{A.CYN}{'' * W}{A.RST}")
    _w(A.HOME)
    _w("\n".join(out) + "\n")
    _fl()
def draw_domain_popup(self, r: Result):
    """Show domains for the selected IP.

    Lists as many of ``r.domains`` as fit on screen, followed by an
    "...and N more" line for the remainder, then waits for a key press and
    clears the screen so the dashboard can redraw.
    """
    _w(A.CLR)
    cols, rows = term_size()
    # Clamp at 0: on a terminal shorter than 10 rows a negative bound would
    # slice from the end of the list and miscount the "more" remainder.
    vis = max(0, min(len(r.domains), rows - 10))
    lines = []
    lines.append(f"{A.CYN}{'' * (cols - 2)}{A.RST}")
    lines.append(draw_box_line(f" {A.BOLD}Domains for {r.ip} ({len(r.domains)} total){A.RST}", cols))
    ping_s = f"{r.tcp_ms:.0f}ms" if r.tcp_ms > 0 else "-"
    conn_s = f"{r.tls_ms:.0f}ms" if r.tls_ms > 0 else "-"
    lines.append(draw_box_line(f" {A.DIM}Score: {r.score:.1f} | Ping: {ping_s} | Conn: {conn_s}{A.RST}", cols))
    lines.append(draw_box_sep(cols))
    for d in r.domains[:vis]:
        lines.append(draw_box_line(f" {d}", cols))
    if len(r.domains) > vis:
        lines.append(draw_box_line(f" {A.DIM}...and {len(r.domains) - vis} more{A.RST}", cols))
    lines.append(draw_box_sep(cols))
    lines.append(draw_box_line(f" {A.DIM}Press any key to go back{A.RST}", cols))
    lines.append(draw_box_bottom(cols))
    _w("\n".join(lines) + "\n")
    _fl()
    _wait_any_key()
    _w(A.CLR)  # clear before dashboard redraws
def draw_config_popup(self, r: Result):
    """Show all VLESS/VMess URIs for the selected IP.

    Clears the screen, renders a boxed, numbered list of ``r.uris`` (each
    truncated to the terminal width, and the list truncated to the terminal
    height), blocks until a key is pressed, then clears the screen again.

    Args:
        r: The result row whose stored URIs are displayed.
    """
    _w(A.CLR)
    cols, rows = term_size()
    lines = []
    lines.append(f"{A.CYN}{'' * (cols - 2)}{A.RST}")
    lines.append(draw_box_line(f" {A.BOLD}Configs for {r.ip} ({len(r.uris)} URIs){A.RST}", cols))
    ping_s = f"{r.tcp_ms:.0f}ms" if r.tcp_ms > 0 else "-"
    conn_s = f"{r.tls_ms:.0f}ms" if r.tls_ms > 0 else "-"
    speed_s = f"{r.best_mbps:.1f} MB/s" if r.best_mbps > 0 else "-"
    lines.append(draw_box_line(
        f" {A.DIM}Score: {r.score:.1f} | Ping: {ping_s} | Conn: {conn_s} | Speed: {speed_s}{A.RST}", cols
    ))
    lines.append(draw_box_sep(cols))
    if r.uris:
        # Ten rows are reserved for frame/header/footer. Clamp at zero so a
        # tiny terminal cannot produce a negative slice bound (which would
        # show the wrong URIs and a wrong "more" count).
        max_show = max(0, rows - 10)
        # Width available for a URI; keep room for at least one character
        # plus the "..." ellipsis so the truncation slice stays non-negative.
        max_uri = max(4, cols - 8)
        for i, uri in enumerate(r.uris[:max_show]):
            # Truncate long URIs to fit terminal width
            tag = f" {A.CYN}{i+1}.{A.RST} "
            display = uri if len(uri) <= max_uri else uri[:max_uri - 3] + "..."
            lines.append(draw_box_line(f"{tag}{A.GRN}{display}{A.RST}", cols))
        if len(r.uris) > max_show:
            lines.append(draw_box_line(f" {A.DIM}...and {len(r.uris) - max_show} more{A.RST}", cols))
    else:
        lines.append(draw_box_line(f" {A.DIM}No VLESS/VMess URIs stored for this IP{A.RST}", cols))
        lines.append(draw_box_line(f" {A.DIM}(only available when loaded from URIs or subscriptions){A.RST}", cols))
    lines.append(draw_box_sep(cols))
    lines.append(draw_box_line(f" {A.DIM}Press any key to go back{A.RST}", cols))
    lines.append(draw_box_bottom(cols))
    _w("\n".join(lines) + "\n")
    _fl()
    _wait_any_key()
    _w(A.CLR)
def draw_help_popup(self):
    """Render the help overlay: key bindings followed by a column legend.

    The screen is cleared, the overlay is written in one go, and the method
    blocks until any key is pressed before clearing the screen again.
    """
    _w(A.CLR)
    cols, _ = term_size()
    W = min(64, cols - 4)
    heavy_rule = f" {A.CYN}{'=' * W}{A.RST}"
    light_rule = f" {A.CYN}{'-' * W}{A.RST}"

    shortcuts = (
        ("S", "Cycle sort order: score / latency / speed"),
        ("E", "Export results (CSV + top N configs)"),
        ("A", "Export ALL configs sorted best to worst"),
        ("C", "View VLESS/VMess URIs for an IP (enter rank #)"),
        ("D", "View domains for an IP (enter rank #)"),
        ("J / K", "Scroll down / up one row"),
        ("N / P", "Page down / up"),
        ("B", "Back to main menu (new scan)"),
        ("H", "Show this help screen"),
        ("Q", "Quit (results auto-saved on exit)"),
    )
    columns = (
        ("#", "Rank (sorted by current sort order)"),
        ("IP", "Cloudflare edge IP address"),
        ("Dom", "How many domains share this IP"),
        ("Ping", "TCP connect time in ms (like ping)"),
        ("Conn", "Full connection time in ms (TCP + TLS handshake)"),
        ("R1,R2..", "Download speed per round (MB/s or KB/s)"),
        ("Colo", "CF datacenter code (e.g. FRA, IAH, MRS)"),
        ("Score", "Combined score (0-100, higher = better)"),
    )
    sections = (
        (f" {A.BOLD}{A.WHT} Keyboard Shortcuts{A.RST}", shortcuts),
        (f" {A.BOLD}{A.WHT} Column Guide{A.RST}", columns),
    )

    overlay = []
    for heading, entries in sections:
        # Each section: heavy rule, title, light rule, entries, blank line.
        overlay += [heavy_rule, heading, light_rule]
        overlay.extend(f" {A.CYN}{name:<10}{A.RST} {text}" for name, text in entries)
        overlay.append("")
    overlay += [
        f" {A.DIM}Score = Conn latency (35%) + speed (50%) + TTFB (15%){A.RST}",
        f" {A.DIM}'-' means not tested yet (only top IPs get speed tested){A.RST}",
        heavy_rule,
        f" {A.BOLD}{A.WHT} Made By Sam - SamNet Technologies{A.RST}",
        f" {A.DIM} https://github.com/SamNet-dev/cfray{A.RST}",
        heavy_rule,
        f" {A.DIM}Press any key to go back{A.RST}",
    ]
    _w("\n".join(overlay) + "\n")
    _fl()
    _wait_any_key()
    _w(A.CLR)  # clear before dashboard redraws
def handle(self, key: str) -> Optional[str]:
    """Apply a key press to the dashboard state.

    Sorting and scrolling keys mutate ``self.sort`` / ``self.offset`` and
    return ``None``. Action keys return the name of the action the caller
    should perform; unrecognised keys return ``None``.
    """
    if key == "s":
        # Advance to the next sort order, restarting from the first one if
        # the current value is not a known order.
        cycle = ["score", "latency", "speed"]
        pos = cycle.index(self.sort) if self.sort in cycle else 0
        self.sort = cycle[(pos + 1) % len(cycle)]
        return None

    if key in ("j", "down", "n"):
        step = 1
        if key == "n":
            # page down
            _, rows = term_size()
            step = max(3, rows - 18 - len(self.st.rounds))
        # Never scroll past the point where fewer than 3 rows would remain.
        last = max(0, len(sorted_all(self.st, self.sort)) - 3)
        self.offset = min(self.offset + step, last)
        return None

    if key in ("k", "up", "p"):
        step = 1
        if key == "p":
            # page up
            _, rows = term_size()
            step = max(3, rows - 18 - len(self.st.rounds))
        self.offset = max(0, self.offset - step)
        return None

    # Remaining keys only name an action for the caller to carry out.
    actions = {
        "e": "export",
        "a": "export-all",
        "c": "configs",
        "d": "domains",
        "h": "help",
        "b": "back",
        "q": "quit",
        "ctrl-c": "quit",
    }
    return actions.get(key)
def save_csv(st: State, path: str, sort_by: str = "score"):
    """Write the alive results, ranked by ``sort_by``, to a CSV file at ``path``.

    One speed column is emitted per configured round in ``st.rounds``.
    Measurements that are not positive are written as empty cells.
    """
    def _cell(value, spec):
        # Non-positive measurements are left blank.
        return format(value, spec) if value > 0 else ""

    ranked = sorted_alive(st, sort_by)
    round_count = len(st.rounds)

    header = ["Rank", "IP", "Domains", "Domain_Count", "Ping_ms", "Conn_ms", "TTFB_ms"]
    header.extend(f"R{idx + 1}_{rc.label}_MBps" for idx, rc in enumerate(st.rounds))
    header.extend(["Best_MBps", "Colo", "Score", "Error"])

    with open(path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        for position, res in enumerate(ranked, 1):
            record = [
                position,
                res.ip,
                "|".join(res.domains[:5]),
                len(res.domains),
                _cell(res.tcp_ms, ".1f"),
                _cell(res.tls_ms, ".1f"),
                _cell(res.ttfb_ms, ".1f"),
            ]
            # Rounds this IP never reached have no entry in res.speeds.
            record.extend(
                _cell(res.speeds[idx], ".3f") if idx < len(res.speeds) else ""
                for idx in range(round_count)
            )
            record.extend([
                _cell(res.best_mbps, ".3f"),
                res.colo,
                f"{res.score:.1f}",
                res.error,
            ])
            writer.writerow(record)
def save_configs(st: State, path: str, top: int = 50, sort_by: str = "score"):
    """Save top configs. Use top=0 for ALL configs sorted best to worst.

    When any alive result carries URIs, the file contains one URI per line,
    walking results from best to worst. Otherwise (e.g. JSON input) it
    contains one reference line per IP with its score and first domains.

    Args:
        st: Scan state holding the results.
        path: Destination text file (overwritten).
        top: Maximum number of lines to write; ``top <= 0`` writes everything.
        sort_by: Sort order passed to ``sorted_alive``.
    """
    results = sorted_alive(st, sort_by)
    has_uris = any(r.uris for r in results)

    def _entries():
        # Yield output lines (without newline) in best-to-worst order.
        for r in results:
            if has_uris:
                yield from r.uris
            else:
                # JSON input: write IP and domains as a reference list
                doms = ", ".join(r.domains[:3])
                extra = f" (+{len(r.domains) - 3} more)" if len(r.domains) > 3 else ""
                yield f"{r.ip} # score={r.score:.1f} domains={doms}{extra}"

    with open(path, "w", encoding="utf-8") as f:
        for n, entry in enumerate(_entries()):
            # The limit counts written lines. The previous "ALL" limit was the
            # number of IPs, which truncated the output whenever IPs had more
            # than one URI each; top <= 0 now means no limit at all.
            if top > 0 and n >= top:
                break
            f.write(entry + "\n")
def save_all_configs_sorted(st: State, path: str, sort_by: str = "score"):
    """Write every config to ``path``: alive IPs best-to-worst, then dead IPs.

    If any alive result has URIs, the file lists raw URIs only. Otherwise one
    reference line per IP is written, with dead IPs tagged ``DEAD``.
    """
    alive = sorted_alive(st, sort_by)
    dead = [res for res in st.res.values() if not res.alive]
    uri_mode = any(res.uris for res in alive)

    def _alive_line(res):
        shown = ", ".join(res.domains[:3])
        more = f" (+{len(res.domains) - 3} more)" if len(res.domains) > 3 else ""
        return f"{res.ip} # score={res.score:.1f} domains={shown}{more}\n"

    def _dead_line(res):
        shown = ", ".join(res.domains[:3])
        return f"{res.ip} # DEAD domains={shown}\n"

    with open(path, "w", encoding="utf-8") as out:
        for group, describe in ((alive, _alive_line), (dead, _dead_line)):
            for res in group:
                if uri_mode:
                    out.writelines(uri + "\n" for uri in res.uris)
                else:
                    out.write(describe(res))
RESULTS_DIR = "results"
def _results_path(filename: str) -> str:
"""Return path inside the results/ directory, creating it if needed."""
os.makedirs(RESULTS_DIR, exist_ok=True)
return os.path.join(RESULTS_DIR, filename)
def do_export(
    st: State, base_path: str, sort_by: str = "score", top: int = 50,
    output_csv: str = "", output_configs: str = "",
) -> Tuple[str, str, str]:
    """Export the scan results as CSV, a top-N config list and a full sorted list.

    Output names are derived from the base name of ``base_path`` (extension
    removed) unless an explicit path is given. Sets ``st.saved`` on success.

    Args:
        st: Scan state holding the results.
        base_path: Input identifier used to name the output files. Callers
            may pass a file path, a subscription URL or a template string.
        sort_by: Sort order forwarded to the writers.
        top: Number of configs for the "top" file; ``top <= 0`` means all.
        output_csv: Explicit CSV path; overrides the derived name.
        output_configs: Explicit config-list path; overrides the derived name.

    Returns:
        ``(csv_path, cfg_path, full_path)`` of the files written.
    """
    stem = os.path.basename(base_path).rsplit(".", 1)[0] if base_path else ""
    # base_path is not always a plain file name: URLs and template strings
    # can leave characters that are illegal in file names ("?", "|", ":" ...)
    # or nothing at all (URL ending in "/"). Replace the illegal runs, cap the
    # length, drop trailing dots/spaces and fall back to "scan" when empty.
    stem = re.sub(r'[<>:"/\\|?*\x00-\x1f]+', "_", stem)[:100].rstrip(" .") or "scan"
    csv_path = output_csv if output_csv else _results_path(stem + "_results.csv")
    if output_configs:
        cfg_path = output_configs
    elif top <= 0:
        cfg_path = _results_path(stem + "_all_sorted.txt")
    else:
        cfg_path = _results_path(stem + f"_top{top}.txt")
    full_path = _results_path(stem + "_full_sorted.txt")
    save_csv(st, csv_path, sort_by)
    save_configs(st, cfg_path, top, sort_by)
    save_all_configs_sorted(st, full_path, sort_by)
    st.saved = True
    return csv_path, cfg_path, full_path
async def _refresh_loop(dash: Dashboard, st: State):
    """Redraw the dashboard every 0.3 s until ``st.finished`` becomes true."""
    interval = 0.3
    while True:
        if st.finished:
            return
        try:
            dash.draw()
        except Exception:
            # Rendering is best-effort: a failed frame is skipped and the
            # next tick tries again.
            pass
        await asyncio.sleep(interval)
async def run_scan(st: State, workers: int, speed_workers: int, timeout: float, speed_timeout: float):
    """Run the scan phases with dynamic round sizing.

    Sequence: reset the debug log, run the latency phase (``phase1``), drop
    the slowest share of alive IPs according to the mode preset, then run the
    speed rounds in ``st.rounds`` over a shrinking candidate list. Scores are
    recalculated after every round and once more at the end, and
    ``st.finished`` is set on every exit path.
    """
    # Start a fresh debug log; failing to create it must not stop the scan.
    try:
        os.makedirs("results", exist_ok=True)
        with open(DEBUG_LOG, "w") as log_file:
            log_file.write(f"=== Scan started {time.strftime('%Y-%m-%d %H:%M:%S')} mode={st.mode} ===\n")
    except Exception:
        pass

    st.start_time = time.monotonic()
    if not st.interrupted:
        await phase1(st, workers, timeout)

    # Nothing to speed-test: interrupted, or no IP answered.
    if st.interrupted or st.alive_n == 0:
        st.finished = True
        calc_scores(st)
        return

    preset = PRESETS.get(st.mode, PRESETS["normal"])

    # Alive IPs, fastest TLS handshake first.
    alive = [ip for ip, res in st.res.items() if res.alive]
    alive.sort(key=lambda ip: st.res[ip].tls_ms)

    cut_pct = preset.get("latency_cut", 0)
    if cut_pct > 0 and len(alive) > 50:
        cut_n = max(1, int(len(alive) * cut_pct / 100))
        alive = alive[:-cut_n]
        st.latency_cut_n = cut_n
        _dbg(f"=== Latency cut: removed bottom {cut_pct}% = {cut_n} IPs, {len(alive)} remaining ===")

    # NOTE(review): an empty st.rounds also lands here, so a caller that sets
    # st.rounds = [] (as done for --skip-download) still gets dynamic rounds
    # unless build_dynamic_rounds returns none -- confirm this is intended.
    if not st.rounds:
        st.rounds = build_dynamic_rounds(st.mode, len(alive))
        _dbg(f"=== Dynamic rounds: {[(r.label, r.keep) for r in st.rounds]} ===")

    if not st.interrupted and st.rounds:
        rlim = CFRateLimiter()
        cands = list(alive)
        speed_host = SPEED_HOST
        speed_path = ""  # _dl_one uses default
        for idx, rc in enumerate(st.rounds):
            if st.interrupted:
                break
            round_no = idx + 1
            st.cur_round = round_no
            st.phase = f"speed_r{round_no}"
            actual_count = min(rc.keep, len(cands))
            st.phase_label = f"Speed R{round_no} ({rc.label} x {actual_count})"
            _dbg(f"=== Round R{round_no}: {rc.size}B x {actual_count} IPs, workers={speed_workers}, timeout={speed_timeout}s, budget={rlim.BUDGET - rlim.count} left ===")
            if idx > 0:
                # Later rounds rank candidates by the score earned so far.
                calc_scores(st)
                cands = sorted(cands, key=lambda ip: st.res[ip].score, reverse=True)
            cands = cands[: rc.keep]
            await phase2_round(
                st, rc, cands, speed_workers, speed_timeout,
                rlim=rlim, cdn_host=speed_host, cdn_path=speed_path,
            )
            calc_scores(st)

    st.finished = True
    calc_scores(st)
async def run_tui(args):
    """TUI mode: interactive startup + dashboard.

    Flow per iteration of the outer loop:
      1. Pick the input source (from CLI args on the first pass, otherwise
         via the interactive pickers) and the scan mode.
      2. Load configs, resolve DNS, and run the scan while a background task
         redraws the dashboard.
      3. Auto-export the results, then serve dashboard key presses until the
         user quits or chooses "back" (which restarts at step 1).

    Args:
        args: Parsed CLI namespace. Mutated on "back": ``input``, ``sub``,
            ``template`` and ``_mode_set`` are reset.
    """
    enable_ansi()
    # Determine initial input source from CLI args
    input_method = None  # "file", "sub", or "template"
    input_value = None
    if getattr(args, "sub", None):
        input_method, input_value = "sub", args.sub
    elif getattr(args, "template", None):
        if getattr(args, "input", None):
            # Template and address file travel together in one string,
            # separated by "|||"; it is split again when configs are loaded.
            input_method, input_value = "template", f"{args.template}|||{args.input}"
        else:
            print("Error: --template requires -i (address list file)")
            return
    elif getattr(args, "find_clean", False):
        input_method, input_value = "find_clean", ""
    elif getattr(args, "input", None):
        input_method, input_value = "file", args.input
    while True:  # outer loop: back returns here
        # Only prompt for a mode when the input was not given on the CLI.
        interactive = input_method is None
        while True:
            # Inner loop: repeat the pickers until both input and mode are set.
            if input_method is None:
                pick = tui_pick_file()
                if not pick:
                    _w(A.SHOW)
                    return
                input_method, input_value = pick
            if input_method == "find_clean":
                result = await tui_run_clean_finder()
                if result is None:
                    _w(A.SHOW)
                    return
                if result[0] == "__back__":
                    input_method = None
                    input_value = None
                    continue
                # The clean finder hands back a regular (method, value) pair.
                input_method, input_value = result
            mode = args.mode
            if not getattr(args, "_mode_set", False) and interactive:
                picked = tui_pick_mode()
                if not picked:
                    _w(A.SHOW)
                    return
                if picked == "__back__":
                    input_method = None
                    input_value = None
                    continue
                mode = picked
            break
        st = State()
        st.mode = mode
        st.top = args.top
        if args.rounds:
            st.rounds = parse_rounds_str(args.rounds)
        elif args.skip_download:
            st.rounds = []
        # Determine display label for loading screen
        if input_method == "sub":
            load_label = input_value.split("/")[-1][:40] or "subscription"
        elif input_method == "template":
            parts = input_value.split("|||", 1)
            load_label = os.path.basename(parts[1]) if len(parts) > 1 else "template"
        else:
            load_label = os.path.basename(input_value)
        _w(A.CLR + A.HOME)
        cols, _ = term_size()
        lines = draw_menu_header(cols)
        lines.append(draw_box_line(f" {A.BOLD}Starting scan...{A.RST}", cols))
        lines.append(draw_box_line("", cols))
        lines.append(draw_box_line(f" {A.CYN}>{A.RST} Loading {load_label}...", cols))
        lines.append(draw_box_bottom(cols))
        _w("\n".join(lines) + "\n")
        _fl()
        # Load configs based on input method
        if input_method == "sub":
            st.configs = fetch_sub(input_value)
            st.input_file = input_value
        elif input_method == "template":
            tpl_uri, addr_path = input_value.split("|||", 1)
            addrs = load_addresses(addr_path)
            st.configs = generate_from_template(tpl_uri, addrs)
            st.input_file = f"{addr_path} ({len(addrs)} addresses)"
        else:
            st.configs = load_input(input_value)
            st.input_file = input_value
        if not st.configs:
            _w(A.SHOW)
            print(f"No configs found in {st.input_file}")
            return
        _w(A.CLR + A.HOME)
        lines = draw_menu_header(cols)
        lines.append(draw_box_line(f" {A.BOLD}Starting scan...{A.RST}", cols))
        lines.append(draw_box_line("", cols))
        lines.append(draw_box_line(f" {A.GRN}OK{A.RST} Loaded {len(st.configs)} configs", cols))
        lines.append(draw_box_line("", cols))
        _w("\n".join(lines) + "\n")
        _fl()
        st.phase = "dns"
        st.phase_label = "Resolving DNS"
        try:
            await resolve_all(st)
        except Exception as e:
            _w(A.SHOW + "\n")
            print(f"DNS resolution error: {e}")
            return
        if not st.ips:
            _w(A.SHOW + "\n")
            print("No IPs resolved — check network or config addresses.")
            return
        dash = Dashboard(st)
        # The refresh task redraws the dashboard while the scan task runs.
        refresh = asyncio.create_task(_refresh_loop(dash, st))
        scan_task = asyncio.ensure_future(
            run_scan(st, args.workers, args.speed_workers, args.timeout, args.speed_timeout)
        )
        old_sigint = signal.getsignal(signal.SIGINT)
        def _sig(sig, frame):
            # Ctrl+C during the scan: flag the state and cancel the scan task
            # so partial results can still be scored and exported below.
            st.interrupted = True
            st.finished = True
            scan_task.cancel()
        signal.signal(signal.SIGINT, _sig)
        try:
            await scan_task
        except asyncio.CancelledError:
            st.interrupted = True
            st.finished = True
            calc_scores(st)
        # Restore original SIGINT so Ctrl+C works in post-scan loop
        signal.signal(signal.SIGINT, old_sigint)
        if refresh:
            refresh.cancel()
            try:
                await refresh
            except asyncio.CancelledError:
                pass
        # Auto-save as soon as the scan ends; the outcome is shown as a
        # dashboard notification for five seconds.
        try:
            csv_p, cfg_p, full_p = do_export(st, input_value, dash.sort, st.top)
            st.notify = f"Saved to results/ folder"
        except Exception as e:
            # Keep the path variables defined for the "back" summary screen.
            csv_p = cfg_p = full_p = ""
            st.notify = f"Export error: {e}"
        st.notify_until = time.monotonic() + 5
        dash.draw()
        go_back = False
        try:
            # Post-scan key loop: poll for a key, dispatch it, redraw.
            while True:
                key = _read_key_nb(0.1)
                if key is None:
                    # refresh notification timeout
                    if st.notify and time.monotonic() >= st.notify_until:
                        st.notify = ""
                        dash.draw()
                    continue
                act = dash.handle(key)
                if act == "quit":
                    break
                elif act == "back":
                    # show save summary and go to main menu
                    _w(A.CLR)
                    save_lines = [
                        f" {A.CYN}{'=' * 50}{A.RST}",
                        f" {A.BOLD}{A.WHT} Results saved:{A.RST}",
                        f" {A.CYN}{'-' * 50}{A.RST}",
                        f" {A.GRN}CSV:{A.RST} {csv_p}",
                        f" {A.GRN}Top:{A.RST} {cfg_p}",
                        f" {A.GRN}Full:{A.RST} {full_p}",
                        f" {A.CYN}{'=' * 50}{A.RST}",
                        "",
                        f" {A.DIM}Press any key to go to main menu...{A.RST}",
                    ]
                    _w("\n".join(save_lines) + "\n")
                    _fl()
                    _wait_any_key()
                    go_back = True
                    break
                elif act == "export":
                    try:
                        csv_p, cfg_p, full_p = do_export(st, input_value, dash.sort, st.top)
                        st.notify = f"Exported to results/ folder"
                    except Exception as e:
                        st.notify = f"Export error: {e}"
                    st.notify_until = time.monotonic() + 4
                elif act == "export-all":
                    try:
                        # top=0 asks do_export for the "all sorted" config file.
                        csv_p, cfg_p, full_p = do_export(st, input_value, dash.sort, 0)
                        st.notify = f"Exported ALL to results/ folder"
                    except Exception as e:
                        st.notify = f"Export error: {e}"
                    st.notify_until = time.monotonic() + 4
                elif act == "configs":
                    results = sorted_all(st, dash.sort)
                    if results:
                        n = _prompt_number(f"{A.CYN}Enter rank # to view configs (1-{len(results)}):{A.RST} ", len(results))
                        if n is not None:
                            # Ranks are 1-based; list indices are 0-based.
                            dash.draw_config_popup(results[n - 1])
                elif act == "domains":
                    results = sorted_all(st, dash.sort)
                    if results:
                        n = _prompt_number(f"{A.CYN}Enter rank # to view domains (1-{len(results)}):{A.RST} ", len(results))
                        if n is not None:
                            dash.draw_domain_popup(results[n - 1])
                elif act == "help":
                    dash.draw_help_popup()
                dash.draw()
        except (KeyboardInterrupt, EOFError, OSError):
            # Treated the same as quitting the dashboard.
            pass
        if go_back:
            # reset for next run — clear CLI input so file picker shows
            args.input = None
            args.sub = None
            args.template = None
            args._mode_set = False
            input_method = None
            input_value = None
            continue
        _w(A.SHOW + "\n")
        _fl()
        print(f"Results saved to {RESULTS_DIR}/ folder")
        break
async def run_headless(args):
    """Headless mode (--no-tui).

    Loads configs from the CLI arguments, resolves DNS, runs the scan, prints
    a plain-text table of the top 50 alive results and exports the result
    files. Ctrl+C during the scan cancels it and exports partial results.

    Args:
        args: Parsed CLI namespace.
    """
    st = State()
    st.input_file = args.input
    st.mode = args.mode
    if args.rounds:
        st.rounds = parse_rounds_str(args.rounds)
    elif args.skip_download:
        st.rounds = []
    print(f"CF Config Scanner v{VERSION}")
    st.configs, src = load_configs_from_args(args)
    print(f"Loading: {src}")
    print(f"Loaded {len(st.configs)} configs")
    if not st.configs:
        return
    print("Resolving DNS...")
    await resolve_all(st)
    print(f" {len(st.ips)} unique IPs")
    if not st.ips:
        return
    scan_task = asyncio.ensure_future(
        run_scan(st, args.workers, args.speed_workers, args.timeout, args.speed_timeout)
    )
    old_sigint = signal.getsignal(signal.SIGINT)

    def _sig(sig, frame):
        # Ctrl+C: flag the state and cancel the scan so partial results
        # can still be scored and exported.
        st.interrupted = True
        st.finished = True
        scan_task.cancel()

    signal.signal(signal.SIGINT, _sig)
    try:
        await scan_task
    except asyncio.CancelledError:
        st.interrupted = True
        st.finished = True
        calc_scores(st)
        print("\n Interrupted! Exporting partial results...")
    finally:
        # Restore the previous handler even when the scan raises, so a
        # failure does not leave the custom handler installed.
        signal.signal(signal.SIGINT, old_sigint)
    results = sorted_alive(st, "score")
    elapsed = _fmt_elapsed(time.monotonic() - st.start_time)
    print(f"\nDone in {elapsed}. {st.alive_n} alive IPs.\n")
    print(f"{'=' * 95}")
    hdr = f"{'#':>4} {'IP':<16} {'Dom':>4} {'Ping ms':>7} {'Conn ms':>7}"
    for i in range(len(st.rounds)):
        hdr += f" {'R' + str(i + 1) + ' MB/s':>9}"
    hdr += f" {'Colo':>5} {'Score':>6}"
    print(hdr)
    print("=" * 95)
    for rank, r in enumerate(results[:50], 1):
        tcp = f"{r.tcp_ms:7.1f}" if r.tcp_ms > 0 else " -"
        tls = f"{r.tls_ms:7.1f}" if r.tls_ms > 0 else " -"
        row = f"{rank:>4} {r.ip:<16} {len(r.domains):>4} {tcp} {tls}"
        for j in range(len(st.rounds)):
            # Rounds this IP never reached have no entry in r.speeds.
            if j < len(r.speeds) and r.speeds[j] > 0:
                row += f" {r.speeds[j]:>9.2f}"
            else:
                row += " -"
        cl = f"{r.colo:>5}" if r.colo else " -"
        sc = f"{r.score:>6.1f}" if r.score > 0 else " -"
        row += f" {cl} {sc}"
        print(row)
    try:
        csv_p, cfg_p, full_p = do_export(
            st, args.input or "scan", top=args.top,
            output_csv=getattr(args, "output", "") or "",
            output_configs=getattr(args, "output_configs", "") or "",
        )
        print(f"\nResults saved:")
        print(f" CSV: {csv_p}")
        print(f" Configs: {cfg_p}")
        print(f" Full: {full_p}")
    except Exception as e:
        print(f"\nError saving results: {e}")
async def run_headless_clean(args):
    """Headless clean IP finder (--find-clean --no-tui).

    Probes sampled Cloudflare IPs on the ports of the selected clean mode,
    prints progress and the 30 lowest-latency hits, and saves all hits to
    ``clean_ips.txt`` in the results directory. When ``--template`` is also
    given, configs are generated from the clean IPs and a regular
    latency/speed scan with export follows.

    Args:
        args: Parsed CLI namespace.
    """
    scan_cfg = CLEAN_MODES.get(getattr(args, "clean_mode", "normal"), CLEAN_MODES["normal"])
    subnets = CF_SUBNETS
    custom = getattr(args, "subnets", None)
    if custom:
        if os.path.isfile(custom):
            with open(custom, encoding="utf-8") as f:
                # Strip before testing for "#" so indented comment lines are
                # skipped too instead of being treated as subnets.
                stripped = (ln.strip() for ln in f)
                subnets = [s for s in stripped if s and not s.startswith("#")]
        else:
            subnets = [s.strip() for s in custom.split(",") if s.strip()]
    ports = scan_cfg.get("ports", [443])
    print(f"CF Config Scanner v{VERSION} — Clean IP Finder")
    print(f"Ranges: {len(subnets)} | Sample: {scan_cfg['sample'] or 'all'} | Workers: {scan_cfg['workers']} | Ports: {', '.join(str(p) for p in ports)}")
    ips = generate_cf_ips(subnets, scan_cfg["sample"])
    total_probes = len(ips) * len(ports)
    print(f"Scanning {len(ips):,} IPs × {len(ports)} port(s) = {total_probes:,} probes...")
    cs = CleanScanState()
    start = time.monotonic()
    scan_task = asyncio.ensure_future(
        scan_clean_ips(
            ips, workers=scan_cfg["workers"], timeout=3.0,
            validate=scan_cfg["validate"], cs=cs, ports=ports,
        )
    )
    old_sigint = signal.getsignal(signal.SIGINT)

    def _sig(sig, frame):
        cs.interrupted = True
        scan_task.cancel()

    signal.signal(signal.SIGINT, _sig)
    last_pct = -1
    try:
        # Poll once per second, printing a progress line at each new 5% step.
        while not scan_task.done():
            pct = cs.done * 100 // max(1, cs.total)
            if pct != last_pct and pct % 5 == 0:
                print(f" {pct}% ({cs.done:,}/{cs.total:,}) found {cs.found:,} clean")
                last_pct = pct
            await asyncio.sleep(1)
    except (asyncio.CancelledError, Exception):
        # Progress reporting is best-effort; results are collected below.
        pass
    finally:
        signal.signal(signal.SIGINT, old_sigint)
    try:
        results = await scan_task
    except (asyncio.CancelledError, Exception):
        # Cancelled or failed: fall back to whatever the shared state holds.
        results = sorted(cs.all_results or cs.results, key=lambda x: x[1])
    elapsed = _fmt_elapsed(time.monotonic() - start)
    print(f"\nDone in {elapsed}. Found {len(results):,} clean IPs.\n")
    print(f"{'='*50}")
    print(f"{'#':>4} {'Address':<22} {'Latency':>8}")
    print(f"{'='*50}")
    for i, (ip, lat) in enumerate(results[:30]):
        print(f"{i+1:>4} {ip:<22} {lat:>6.0f}ms")
    if len(results) > 30:
        print(f" ...and {len(results)-30:,} more")
    if results:
        try:
            # _results_path creates the results directory as needed.
            path = os.path.abspath(_results_path("clean_ips.txt"))
            with open(path, "w", encoding="utf-8") as f:
                f.writelines(f"{ip}\n" for ip, _ in results)
            print(f"\nSaved {len(results):,} IPs to {path}")
        except Exception as e:
            print(f"\nSave error: {e}")
            path = ""
    else:
        print("\nNo clean IPs found. Nothing saved.")
        path = ""
    # If --template also given, proceed to speed test
    if getattr(args, "template", None) and results:
        print(f"\nContinuing to speed test with template...")
        addrs = [ip for ip, _ in results]
        configs = generate_from_template(args.template, addrs)
        if configs:
            args.input = path
            st = State()
            st.input_file = f"clean ({len(results)} IPs)"
            st.mode = args.mode
            st.configs = configs
            if args.rounds:
                st.rounds = parse_rounds_str(args.rounds)
            elif args.skip_download:
                st.rounds = []
            print(f"Generated {len(configs)} configs")
            print("Resolving DNS...")
            await resolve_all(st)
            print(f" {len(st.ips)} unique IPs")
            if st.ips:
                start2 = time.monotonic()
                scan2 = asyncio.ensure_future(
                    run_scan(st, args.workers, args.speed_workers, args.timeout, args.speed_timeout)
                )
                old2 = signal.getsignal(signal.SIGINT)

                def _sig2(sig, frame):
                    st.interrupted = True
                    st.finished = True
                    scan2.cancel()

                signal.signal(signal.SIGINT, _sig2)
                try:
                    await scan2
                except asyncio.CancelledError:
                    st.interrupted = True
                    st.finished = True
                    calc_scores(st)
                finally:
                    # Restore the handler even when the scan raises, as the
                    # first phase of this function already does.
                    signal.signal(signal.SIGINT, old2)
                alive_results = sorted_alive(st, "score")
                elapsed2 = _fmt_elapsed(time.monotonic() - start2)
                print(f"\nSpeed test done in {elapsed2}. {st.alive_n} alive.")
                print(f"{'='*80}")
                for rank, r in enumerate(alive_results[:20], 1):
                    spd = f"{r.best_mbps:.2f}" if r.best_mbps > 0 else " -"
                    lat_s = f"{r.tls_ms:.0f}" if r.tls_ms > 0 else " -"
                    print(f"{rank:>3} {r.ip:<16} {lat_s:>6}ms {spd:>8} MB/s score={r.score:.1f}")
                try:
                    csv_p, cfg_p, full_p = do_export(st, path, top=args.top)
                    print(f"\nSaved: {csv_p} | {cfg_p} | {full_p}")
                except Exception as e:
                    print(f"Export error: {e}")
def main():
    """CLI entry point: parse arguments and dispatch to the TUI or a headless runner.

    Dispatch:
      * ``--find-clean --no-tui`` -> ``run_headless_clean``
      * ``--no-tui``              -> ``run_headless`` (needs an input source)
      * otherwise                 -> ``run_tui``

    The terminal cursor is re-shown on every exit path.
    """
    if hasattr(sys.stdout, "reconfigure"):
        try:
            # Avoid encoding errors when printing non-ASCII characters on
            # consoles whose default encoding cannot represent them.
            sys.stdout.reconfigure(encoding="utf-8", errors="replace")
        except Exception:
            pass
    p = argparse.ArgumentParser(
        description="CF Config Scanner - test VLESS configs for latency + download speed",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""Run with no arguments for interactive TUI.
Modes (sort by latency first, then speed-test the best):
quick Cut 50%% latency, 1MB x100 -> 5MB x20 (~200 MB, ~2-3 min)
normal Cut 40%% latency, 1MB x200 -> 5MB x50 -> 20MB x20 (~850 MB, ~5-10 min)
thorough Cut 15%% latency, 5MB xALL -> 25MB x150 -> 100MB x50 (~8-15 GB, ~30-60 min)
Examples:
%(prog)s Interactive TUI
%(prog)s -i configs.txt TUI with file
%(prog)s --sub https://example.com/sub.txt Fetch from subscription URL
%(prog)s --template "vless://UUID@{ip}:443?..." -i addrs.json Generate from template
%(prog)s -i configs.txt --mode quick Quick scan
%(prog)s -i configs.txt --top 0 Export ALL sorted
%(prog)s -i configs.txt --no-tui -o results.csv Headless
%(prog)s --find-clean --no-tui Find clean CF IPs (headless)
%(prog)s --find-clean --no-tui --template "vless://..." Find + speed test
""",
    )
    p.add_argument("-i", "--input", help="Input file (VLESS URIs or domains.json)")
    p.add_argument("--sub", help="Subscription URL (fetches VLESS URIs from URL)")
    p.add_argument("--template", help="Base VLESS URI template (use with -i address list)")
    # default=None lets us tell "not given" apart from an explicit choice;
    # the effective default ("normal") is applied right after parsing.
    p.add_argument("-m", "--mode", choices=["quick", "normal", "thorough"], default=None)
    p.add_argument("--rounds", help='Custom rounds, e.g. "1MB:200,5MB:50,20MB:20"')
    p.add_argument("-w", "--workers", type=int, default=LATENCY_WORKERS, help="Latency workers")
    p.add_argument("--speed-workers", type=int, default=SPEED_WORKERS, help="Download workers")
    p.add_argument("--timeout", type=float, default=LATENCY_TIMEOUT, help="Latency timeout (s)")
    p.add_argument("--speed-timeout", type=float, default=SPEED_TIMEOUT, help="Download timeout (s)")
    p.add_argument("--skip-download", action="store_true", help="Latency only")
    p.add_argument("--top", type=int, default=50, help="Export top N configs (0 = ALL sorted best to worst)")
    p.add_argument("--no-tui", action="store_true", help="Plain text output")
    p.add_argument("-o", "--output", help="CSV output path (headless)")
    p.add_argument("--output-configs", help="Save top VLESS URIs (headless)")
    p.add_argument("--find-clean", action="store_true", help="Find clean Cloudflare IPs")
    p.add_argument("--clean-mode", choices=["quick", "normal", "full", "mega"], default="normal",
                   help="Clean IP scan scope (quick=~4K, normal=~12K, full=~1.5M, mega=~3M multi-port)")
    p.add_argument("--subnets", help="Custom subnets file or comma-separated CIDRs")
    args = p.parse_args()
    # Ask argparse whether the mode was given instead of scanning sys.argv,
    # which missed attached short forms ("-mquick") and abbreviated long
    # options ("--mo quick").
    args._mode_set = args.mode is not None
    if args.mode is None:
        args.mode = "normal"
    try:
        if getattr(args, "find_clean", False) and args.no_tui:
            asyncio.run(run_headless_clean(args))
        elif args.no_tui:
            if not args.input and not args.sub and not args.template:
                p.error("--input, --sub, or --template is required in --no-tui mode")
            asyncio.run(run_headless(args))
        else:
            asyncio.run(run_tui(args))
    except KeyboardInterrupt:
        pass
    finally:
        # Always re-show the cursor, however we exit.
        _w(A.SHOW + "\n")
        _fl()
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()