#!/usr/bin/env python3 # # ┌─────────────────────────────────────────────────────────────────┐ # │ │ # │ ⚡ CF CONFIG SCANNER v1.0 │ # │ │ # │ Test VLESS/VMess proxy configs for latency + download speed │ # │ │ # │ • Latency test (TCP + TLS) all IPs in seconds │ # │ • Download speed test via progressive funnel │ # │ • Live TUI dashboard with real-time results │ # │ • Smart rate limiting with CDN fallback │ # │ • Clean IP Finder — scan all Cloudflare ranges (up to 3M) │ # │ • Multi-port scanning (443, 8443) for maximum coverage │ # │ • Zero dependencies — Python 3.8+ stdlib only │ # │ │ # │ GitHub: https://github.com/SamNet-dev/cfray │ # │ │ # └─────────────────────────────────────────────────────────────────┘ # # Usage: # python3 scanner.py Interactive TUI # python3 scanner.py -i configs.txt Normal mode # python3 scanner.py --sub https://example.com/sub Fetch from subscription # python3 scanner.py --template "vless://..." -i addrs.json Generate + test # python3 scanner.py --find-clean --no-tui --clean-mode mega Clean IP scan # import asyncio import argparse import base64 import csv import glob as globmod import ipaddress import json import os import random import re import signal import socket import ssl import statistics import sys import time import urllib.parse import urllib.request from collections import defaultdict from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple VERSION = "1.0" SPEED_HOST = "speed.cloudflare.com" SPEED_PATH = "/__down" DEBUG_LOG = os.path.join("results", "debug.log") LOG_MAX_BYTES = 5 * 1024 * 1024 LATENCY_WORKERS = 50 SPEED_WORKERS = 10 LATENCY_TIMEOUT = 5.0 SPEED_TIMEOUT = 30.0 CDN_FALLBACK = ("cloudflaremirrors.com", "/archlinux/iso/latest/archlinux-x86_64.iso") # Cloudflare published IPv4 ranges (https://www.cloudflare.com/ips-v4/) CF_SUBNETS = [ "173.245.48.0/20", "103.21.244.0/22", "103.22.200.0/22", "103.31.4.0/22", "141.101.64.0/18", "108.162.192.0/18", "190.93.240.0/20", "188.114.96.0/20", "197.234.240.0/22", "198.41.128.0/17", "162.158.0.0/15", "104.16.0.0/13", "104.24.0.0/14", "172.64.0.0/13", ] CF_HTTPS_PORTS = [443, 8443, 2053, 2083, 2087, 2096] CLEAN_MODES = { "quick": {"label": "Quick", "sample": 1, "workers": 500, "validate": False, "ports": [443], "desc": "1 random IP per /24 (~4K IPs, ~30s)"}, "normal": {"label": "Normal", "sample": 3, "workers": 500, "validate": True, "ports": [443], "desc": "3 IPs per /24 + CF verify (~12K IPs, ~2 min)"}, "full": {"label": "Full", "sample": 0, "workers": 1000, "validate": True, "ports": [443], "desc": "All IPs + CF verify (~1.5M IPs, 20+ min)"}, "mega": {"label": "Mega", "sample": 0, "workers": 1500, "validate": True, "ports": [443, 8443], "desc": "All IPs × 2 ports (~3M probes, 30-60 min)"}, } PRESETS = { "quick": { "label": "Quick", "desc": "Latency sort -> 1MB top 100 -> 5MB top 20", "dynamic": True, "latency_cut": 50, "round_sizes": [1_000_000, 5_000_000], "round_pcts": [100, 20], "round_min": [50, 10], "round_max": [100, 20], "data": "~200 MB", "time": "~2-3 min", }, "normal": { "label": "Normal", "desc": "Latency sort -> 1MB top 200 -> 5MB top 50 -> 20MB top 20", "dynamic": True, "latency_cut": 40, "round_sizes": [1_000_000, 5_000_000, 20_000_000], "round_pcts": [100, 25, 10], "round_min": [50, 20, 10], "round_max": [200, 50, 20], "data": "~850 MB", "time": "~5-10 min", }, "thorough": { "label": "Thorough", "desc": "Deep funnel: 5MB / 25MB / 50MB", "dynamic": True, "latency_cut": 15, "round_sizes": [5_000_000, 25_000_000, 50_000_000], "round_pcts": [100, 25, 10], "round_min": [0, 30, 15], "round_max": [0, 150, 50], "data": "~5-10 GB", "time": "~20-45 min", }, } class A: RST = "\033[0m" BOLD = "\033[1m" DIM = "\033[2m" ITAL = "\033[3m" ULINE = "\033[4m" RED = "\033[31m" GRN = "\033[32m" YEL = "\033[33m" BLU = "\033[34m" MAG = "\033[35m" CYN = "\033[36m" WHT = "\033[97m" BGBL = "\033[44m" BGDG = "\033[100m" HOME = "\033[H" CLR = "\033[H\033[J" EL = "\033[2K" HIDE = "\033[?25l" SHOW = "\033[?25h" _ansi_re = re.compile(r"\033\[[^m]*m") def _dbg(msg: str): """Append a debug line to results/debug.log with rotation.""" try: os.makedirs("results", exist_ok=True) if os.path.exists(DEBUG_LOG): try: sz = os.path.getsize(DEBUG_LOG) if sz > LOG_MAX_BYTES: bak = DEBUG_LOG + ".1" if os.path.exists(bak): os.remove(bak) os.rename(DEBUG_LOG, bak) except Exception: pass with open(DEBUG_LOG, "a", encoding="utf-8") as f: f.write(f"{time.strftime('%H:%M:%S')} {msg}\n") except Exception: pass def _char_width(c: str) -> int: """Return terminal column width of a single character (1 or 2).""" o = ord(c) # Common wide ranges: CJK, emojis, dingbats, symbols, etc. if ( 0x1100 <= o <= 0x115F # Hangul Jamo or 0x2329 <= o <= 0x232A # angle brackets or 0x2E80 <= o <= 0x303E # CJK radicals / ideographic or 0x3040 <= o <= 0x33BF # Hiragana / Katakana / CJK compat or 0x3400 <= o <= 0x4DBF # CJK Unified Extension A or 0x4E00 <= o <= 0xA4CF # CJK Unified / Yi or 0xA960 <= o <= 0xA97C # Hangul Jamo Extended-A or 0xAC00 <= o <= 0xD7A3 # Hangul Syllables or 0xF900 <= o <= 0xFAFF # CJK Compatibility Ideographs or 0xFE10 <= o <= 0xFE6F # CJK compat forms / small forms or 0xFF01 <= o <= 0xFF60 # Fullwidth forms or 0xFFE0 <= o <= 0xFFE6 # Fullwidth signs or 0x1F000 <= o <= 0x1FAFF # Mahjong, Domino, Playing Cards, Emojis, Symbols or 0x20000 <= o <= 0x2FA1F # CJK Unified Extension B-F or 0x2600 <= o <= 0x27BF # Misc symbols, Dingbats or 0x2700 <= o <= 0x27BF # Dingbats or 0xFE00 <= o <= 0xFE0F # Variation selectors (zero-width but paired with emoji) or 0x200D == o # ZWJ (zero-width joiner) or 0x231A <= o <= 0x231B # Watch, Hourglass or 0x23E9 <= o <= 0x23F3 # Various symbols or 0x23F8 <= o <= 0x23FA # Various symbols or 0x25AA <= o <= 0x25AB # Small squares or 0x25B6 == o or 0x25C0 == o # Play buttons or 0x25FB <= o <= 0x25FE # Medium squares or 0x2614 <= o <= 0x2615 # Umbrella, Hot beverage or 0x2648 <= o <= 0x2653 # Zodiac signs or 0x267F == o # Wheelchair or 0x2693 == o # Anchor or 0x26A1 == o # High voltage (⚡) or 0x26AA <= o <= 0x26AB # Circles or 0x26BD <= o <= 0x26BE # Soccer, Baseball or 0x26C4 <= o <= 0x26C5 # Snowman, Sun behind cloud or 0x26D4 == o # No entry or 0x26EA == o # Church or 0x26F2 <= o <= 0x26F3 # Fountain, Golf or 0x26F5 == o # Sailboat or 0x26FA == o # Tent or 0x26FD == o # Fuel pump or 0x2702 == o # Scissors or 0x2705 == o # Check mark or 0x2708 <= o <= 0x270D # Various or 0x270F == o # Pencil or 0x2753 <= o <= 0x2755 # Question marks (❓❔❕) or 0x2757 == o # Exclamation or 0x2795 <= o <= 0x2797 # Plus, Minus, Divide or 0x27B0 == o or 0x27BF == o # Curly loop ): return 2 # Zero-width characters if o in (0xFE0F, 0xFE0E, 0x200D, 0x200B, 0x200C, 0x200E, 0x200F): return 0 return 1 def _vl(s: str) -> int: """Visible length of a string, accounting for ANSI codes and wide chars.""" clean = _ansi_re.sub("", s) return sum(_char_width(c) for c in clean) def _w(text: str): sys.stdout.write(text) def _fl(): sys.stdout.flush() def enable_ansi(): if sys.platform == "win32": os.system("") try: import ctypes k = ctypes.windll.kernel32 h = k.GetStdHandle(-11) m = ctypes.c_ulong() k.GetConsoleMode(h, ctypes.byref(m)) k.SetConsoleMode(h, m.value | 0x0004) except Exception: pass def term_size() -> Tuple[int, int]: try: c, r = os.get_terminal_size() return max(c, 60), max(r, 20) except Exception: return 80, 24 def _read_key_blocking() -> str: """Read a single key press (blocking). Returns key name.""" if sys.platform == "win32": import msvcrt k = msvcrt.getch() if k in (b"\x00", b"\xe0"): k2 = msvcrt.getch() return {b"H": "up", b"P": "down", b"K": "left", b"M": "right"}.get(k2, "") if k == b"\r": return "enter" if k == b"\x03": return "ctrl-c" if k == b"\x1b": return "esc" return k.decode("latin-1", errors="replace") else: import select as _sel import termios, tty fd = sys.stdin.fileno() old = termios.tcgetattr(fd) try: tty.setcbreak(fd) ch = sys.stdin.read(1) if ch == "\x1b": rdy, _, _ = _sel.select([sys.stdin], [], [], 0.2) if rdy: ch2 = sys.stdin.read(1) if ch2 == "[": rdy2, _, _ = _sel.select([sys.stdin], [], [], 0.2) if rdy2: ch3 = sys.stdin.read(1) return {"A": "up", "B": "down", "C": "right", "D": "left"}.get(ch3, "esc") return "esc" return "esc" if ch == "\r" or ch == "\n": return "enter" if ch == "\x03": return "ctrl-c" return ch finally: termios.tcsetattr(fd, termios.TCSADRAIN, old) def _read_key_nb(timeout: float = 0.05) -> Optional[str]: """Non-blocking key read. Returns None if no key.""" if sys.platform == "win32": import msvcrt if msvcrt.kbhit(): return _read_key_blocking() time.sleep(timeout) return None else: import select import termios, tty fd = sys.stdin.fileno() old = termios.tcgetattr(fd) try: tty.setcbreak(fd) rdy, _, _ = select.select([sys.stdin], [], [], timeout) if rdy: ch = sys.stdin.read(1) if ch == "\x1b": # Wait for escape sequence bytes (longer timeout for SSH) rdy2, _, _ = select.select([sys.stdin], [], [], 0.2) if rdy2: ch2 = sys.stdin.read(1) if ch2 == "[": rdy3, _, _ = select.select([sys.stdin], [], [], 0.2) if rdy3: ch3 = sys.stdin.read(1) return {"A": "up", "B": "down", "C": "right", "D": "left"}.get(ch3, "") return "" return "esc" # bare Esc key if ch in ("\r", "\n"): return "enter" if ch == "\x03": return "ctrl-c" return ch return None finally: termios.tcsetattr(fd, termios.TCSADRAIN, old) def _wait_any_key(): """Simple blocking wait for any keypress. More robust than _read_key_blocking for popups.""" if sys.platform == "win32": import msvcrt msvcrt.getch() else: import termios, tty fd = sys.stdin.fileno() old = termios.tcgetattr(fd) try: tty.setraw(fd) sys.stdin.read(1) finally: termios.tcsetattr(fd, termios.TCSADRAIN, old) def _prompt_number(prompt: str, max_val: int) -> Optional[int]: """Show prompt, read a number from user. Returns None if cancelled.""" _w(A.SHOW) _w(f"\n {prompt}") _fl() buf = "" if sys.platform == "win32": import msvcrt while True: k = msvcrt.getch() if k == b"\r": break if k == b"\x1b" or k == b"\x03": _w("\n") return None if k == b"\x08" and buf: buf = buf[:-1] _w("\b \b") _fl() continue ch = k.decode("latin-1", errors="replace") if ch.isdigit(): buf += ch _w(ch) _fl() else: import termios, tty fd = sys.stdin.fileno() old = termios.tcgetattr(fd) try: tty.setcbreak(fd) while True: ch = sys.stdin.read(1) if ch in ("\r", "\n"): break if ch == "\x1b" or ch == "\x03": _w("\n") return None if ch == "\x7f" and buf: # backspace buf = buf[:-1] _w("\b \b") _fl() continue if ch.isdigit(): buf += ch _w(ch) _fl() finally: termios.tcsetattr(fd, termios.TCSADRAIN, old) _w(A.HIDE) if buf and buf.isdigit(): n = int(buf) if 1 <= n <= max_val: return n return None def _fmt_elapsed(secs: float) -> str: m, s = divmod(int(secs), 60) if m > 0: return f"{m}m {s:02d}s" return f"{s}s" @dataclass class ConfigEntry: address: str name: str = "" original_uri: str = "" ip: str = "" @dataclass class RoundCfg: size: int keep: int @property def label(self) -> str: if self.size >= 1_000_000: return f"{self.size // 1_000_000}MB" return f"{self.size // 1000}KB" @dataclass class Result: ip: str domains: List[str] = field(default_factory=list) uris: List[str] = field(default_factory=list) tcp_ms: float = -1 tls_ms: float = -1 ttfb_ms: float = -1 speeds: List[float] = field(default_factory=list) best_mbps: float = -1 colo: str = "" score: float = 0 error: str = "" alive: bool = False class State: def __init__(self): self.input_file = "" self.configs: List[ConfigEntry] = [] self.ip_map: Dict[str, List[ConfigEntry]] = defaultdict(list) self.ips: List[str] = [] self.res: Dict[str, Result] = {} self.rounds: List[RoundCfg] = [] self.mode = "normal" self.phase = "init" self.phase_label = "" self.cur_round = 0 self.total = 0 self.done_count = 0 self.alive_n = 0 self.dead_n = 0 self.best_speed = 0.0 self.start_time = 0.0 self.notify = "" # notification message shown in footer self.notify_until = 0.0 self.top = 50 # export top N (0 = all) self.finished = False self.interrupted = False self.saved = False self.latency_cut_n = 0 # how many IPs were cut after latency phase class CFRateLimiter: """Respects Cloudflare's per-IP rate limit window. CF allows ~600 requests per 10-minute window to speed.cloudflare.com. When 429 is received, retry-after header tells us exactly when the window resets. We track request count and pause when budget runs out or when CF explicitly tells us to wait. """ BUDGET = 550 # conservative limit (CF allows ~600) WINDOW = 600 # 10-minute window in seconds def __init__(self): self.count = 0 self.window_start = 0.0 self.blocked_until = 0.0 self._lock = asyncio.Lock() async def _wait_blocked(self, st: Optional["State"]): """Wait out a 429 block period (called outside lock).""" while time.monotonic() < self.blocked_until: if st and st.interrupted: return left = int(self.blocked_until - time.monotonic()) if st: st.phase_label = f"CF rate limit — resuming in {left}s" await asyncio.sleep(1) async def _wait_budget(self, wait_until: float, st: Optional["State"]): """Wait for window reset when budget exhausted (called outside lock).""" while time.monotonic() < wait_until: if st and st.interrupted: return left = int(wait_until - time.monotonic()) if st: st.phase_label = f"Rate limit ({self.count} reqs) — next window in {left}s" await asyncio.sleep(1) async def acquire(self, st: Optional["State"] = None): """Wait if we're rate-limited, then count a request.""" # Wait out any 429 block first (outside lock so others can also wait) if self.blocked_until > 0 and time.monotonic() < self.blocked_until: _dbg(f"RATE: waiting {self.blocked_until - time.monotonic():.0f}s for CF window reset") await self._wait_blocked(st) await self._lock.acquire() try: # Re-check after acquiring lock if self.blocked_until > 0 and time.monotonic() >= self.blocked_until: self.count = 0 self.window_start = time.monotonic() self.blocked_until = 0.0 now = time.monotonic() if self.window_start == 0.0: self.window_start = now if now - self.window_start >= self.WINDOW: self.count = 0 self.window_start = now if self.count >= self.BUDGET: remaining = self.WINDOW - (now - self.window_start) if remaining > 0: _dbg(f"RATE: budget exhausted ({self.count} reqs), waiting {remaining:.0f}s") wait_until = self.window_start + self.WINDOW saved_window = self.window_start self._lock.release() try: await self._wait_budget(wait_until, st) finally: await self._lock.acquire() # Only reset if no other coroutine already did if self.window_start == saved_window: self.count = 0 self.window_start = time.monotonic() else: self.count = 0 self.window_start = time.monotonic() self.count += 1 finally: self._lock.release() def would_block(self) -> bool: """Check if speed.cloudflare.com is currently rate-limited.""" now = time.monotonic() if self.blocked_until > 0 and now < self.blocked_until: return True if self.window_start > 0 and now - self.window_start < self.WINDOW: if self.count >= self.BUDGET: return True return False def report_429(self, retry_after: int): """CF told us to wait. Set blocked_until so all workers pause. Cap at 600s (10 min) — CF's actual window is 10 min but it sends punitive retry-after (3600+) after repeated violations.""" capped = min(max(retry_after, 30), 600) until = time.monotonic() + capped if until > self.blocked_until: self.blocked_until = until _dbg(f"RATE: 429 received (retry-after={retry_after}s, capped={capped}s)") def build_dynamic_rounds(mode: str, alive_count: int) -> List[RoundCfg]: """Build round configs dynamically based on mode and alive IP count.""" preset = PRESETS.get(mode, PRESETS["normal"]) if not preset.get("dynamic"): return [RoundCfg(1_000_000, alive_count)] sizes = preset["round_sizes"] pcts = preset["round_pcts"] mins = preset["round_min"] maxs = preset["round_max"] # Small sets (<50 IPs): test ALL in every round — no funnel needed small_set = alive_count <= 50 rounds = [] for size, pct, mn, mx in zip(sizes, pcts, mins, maxs): if small_set: keep = alive_count else: keep = int(alive_count * pct / 100) if pct < 100 else alive_count if mn > 0: keep = max(mn, keep) if mx > 0: keep = min(mx, keep) keep = min(keep, alive_count) if keep > 0: rounds.append(RoundCfg(size, keep)) return rounds def parse_vless(uri: str) -> Optional[ConfigEntry]: uri = uri.strip() if not uri.startswith("vless://"): return None rest = uri[8:] name = "" if "#" in rest: rest, name = rest.rsplit("#", 1) name = urllib.parse.unquote(name) if "?" in rest: rest = rest.split("?", 1)[0] if "@" not in rest: return None _, addr = rest.split("@", 1) if addr.startswith("["): if "]" not in addr: return None address = addr[1 : addr.index("]")] else: address = addr.rsplit(":", 1)[0] return ConfigEntry(address=address, name=name, original_uri=uri.strip()) def parse_vmess(uri: str) -> Optional[ConfigEntry]: uri = uri.strip() if not uri.startswith("vmess://"): return None b64 = uri[8:] if "#" in b64: b64 = b64.split("#", 1)[0] b64 += "=" * (-len(b64) % 4) try: try: raw = base64.b64decode(b64).decode("utf-8", errors="replace") except Exception: raw = base64.urlsafe_b64decode(b64).decode("utf-8", errors="replace") obj = json.loads(raw) if not isinstance(obj, dict): return None except Exception: return None address = str(obj.get("add", "")) if not address: return None name = str(obj.get("ps", "")) return ConfigEntry(address=address, name=name, original_uri=uri.strip()) def parse_config(uri: str) -> Optional[ConfigEntry]: """Try parsing as VLESS or VMess.""" return parse_vless(uri) or parse_vmess(uri) def load_input(path: str) -> List[ConfigEntry]: try: with open(path, "r", encoding="utf-8") as f: raw = f.read() except (FileNotFoundError, PermissionError, OSError) as e: print(f" Error reading {path}: {e}") return [] try: data = json.loads(raw) if isinstance(data, dict) and "data" in data: data = data["data"] out: List[ConfigEntry] = [] for i, e in enumerate(data): d = e.get("domain", "") if d: out.append( ConfigEntry(address=d, name=f"d-{i+1}", ip=e.get("ipv4", "")) ) if out: return out except (json.JSONDecodeError, TypeError, AttributeError): pass out = [] for ln in raw.splitlines(): c = parse_config(ln) if c: out.append(c) return out def fetch_sub(url: str) -> List[ConfigEntry]: """Fetch configs from a subscription URL (base64 or plain VLESS URIs).""" if not url.lower().startswith(("http://", "https://")): print(f" Error: --sub only accepts http:// or https:// URLs") return [] _dbg(f"Fetching subscription: {url}") try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=15) as resp: raw = resp.read().decode("utf-8", errors="replace").strip() except Exception as e: _dbg(f"Subscription fetch failed: {e}") print(f" Error fetching subscription: {e}") return [] try: decoded = base64.b64decode(raw).decode("utf-8", errors="replace") if "://" in decoded: raw = decoded except Exception: pass out = [] for ln in raw.splitlines(): c = parse_config(ln.strip()) if c: out.append(c) _dbg(f"Subscription loaded: {len(out)} configs") return out def generate_from_template(template: str, addresses: List[str]) -> List[ConfigEntry]: """Generate configs by substituting addresses into a VLESS/VMess template.""" out = [] parsed = parse_config(template) if not parsed: return out for i, addr in enumerate(addresses): addr = addr.strip() if not addr: continue # Handle ip:port format (e.g. from multi-port clean scan) addr_ip = addr addr_port = None if ":" in addr and not addr.startswith("["): parts = addr.rsplit(":", 1) if parts[1].isdigit(): addr_ip, addr_port = parts[0], parts[1] uri = re.sub( r"(@)(\[[^\]]+\]|[^:]+)(:|$)", lambda m: m.group(1) + addr_ip + m.group(3), template, count=1, ) if addr_port: # Replace existing port, or insert port if template had none if re.search(r"@[^:/?#]+:\d+", uri): uri = re.sub(r"(@[^:/?#]+:)\d+", lambda m: m.group(1) + addr_port, uri, count=1) else: uri = re.sub(r"(@[^/?#]+)([?/#])", lambda m: m.group(1) + ":" + addr_port + m.group(2), uri, count=1) uri = re.sub(r"#.*$", f"#cfg-{i+1}-{addr_ip[:20]}", uri) c = parse_config(uri) if c: out.append(c) return out def load_addresses(path: str) -> List[str]: """Load address list from JSON array or plain text (one per line).""" try: with open(path, "r", encoding="utf-8") as f: raw = f.read() except (FileNotFoundError, PermissionError, OSError) as e: print(f" Error reading {path}: {e}") return [] try: data = json.loads(raw) if isinstance(data, list): return [str(d) for d in data if d] if isinstance(data, dict): for key in ("addresses", "domains", "ips", "data"): if key in data and isinstance(data[key], list): return [str(d) for d in data[key] if d] except (json.JSONDecodeError, TypeError): pass return [ln.strip() for ln in raw.splitlines() if ln.strip()] def _split_to_24s(subnets: List[str]) -> list: """Split CIDR subnets into /24 blocks, deduplicate.""" seen = set() blocks = [] for sub in subnets: try: net = ipaddress.IPv4Network(sub.strip(), strict=False) if net.prefixlen <= 24: for block in net.subnets(new_prefix=24): key = int(block.network_address) if key not in seen: seen.add(key) blocks.append(block) else: key = int(net.network_address) if key not in seen: seen.add(key) blocks.append(net) except (ValueError, TypeError): continue return blocks def generate_cf_ips(subnets: List[str], sample_per_24: int = 0) -> List[str]: """Generate IPs from CIDR subnets. sample_per_24=0 means all hosts.""" blocks = _split_to_24s(subnets) random.shuffle(blocks) ips = [] for net in blocks: hosts = [str(ip) for ip in net.hosts()] if sample_per_24 > 0 and sample_per_24 < len(hosts): hosts = random.sample(hosts, sample_per_24) ips.extend(hosts) return ips async def _tls_probe( ip: str, sni: str, timeout: float, validate: bool = True, port: int = 443, ) -> Tuple[float, bool, str]: """TLS probe with optional Cloudflare header validation. Returns (latency_ms, is_cloudflare, error).""" w = None try: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE t0 = time.monotonic() r, w = await asyncio.wait_for( asyncio.open_connection(ip, port, ssl=ctx, server_hostname=sni), timeout=timeout, ) tls_ms = (time.monotonic() - t0) * 1000 is_cf = True if validate: is_cf = False try: req = f"GET / HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n" w.write(req.encode()) await w.drain() hdr = await asyncio.wait_for(r.read(2048), timeout=min(timeout, 3)) htxt = hdr.decode("latin-1", errors="replace").lower() is_cf = "server: cloudflare" in htxt or "cf-ray:" in htxt except Exception: pass w.close() try: await w.wait_closed() except Exception: pass w = None return tls_ms, is_cf, "" except asyncio.TimeoutError: return -1, False, "timeout" except Exception as e: return -1, False, str(e)[:40] finally: if w: try: w.close() except Exception: pass @dataclass class CleanScanState: """State for clean IP scanning progress.""" total: int = 0 done: int = 0 found: int = 0 interrupted: bool = False results: List[Tuple[str, float]] = field(default_factory=list) # top 20 for display all_results: List[Tuple[str, float]] = field(default_factory=list) # full reference start_time: float = 0.0 async def scan_clean_ips( ips: List[str], sni: str = "speed.cloudflare.com", workers: int = 500, timeout: float = 3.0, validate: bool = True, cs: Optional[CleanScanState] = None, ports: Optional[List[int]] = None, ) -> List[Tuple[str, float]]: """Scan IPs for TLS + optional CF validation. Returns [(addr, latency_ms)] sorted. addr is 'ip' for port 443, or 'ip:port' for other ports.""" if ports is None: ports = [443] sem = asyncio.Semaphore(workers) results: List[Tuple[str, float]] = [] lock = asyncio.Lock() total_probes = len(ips) * len(ports) if cs: cs.total = total_probes cs.done = 0 cs.found = 0 cs.start_time = time.monotonic() async def probe(ip: str, port: int): if cs and cs.interrupted: return async with sem: if cs and cs.interrupted: return lat, is_cf, _err = await _tls_probe(ip, sni, timeout, validate, port) if lat > 0 and is_cf: addr = ip if port == 443 else f"{ip}:{port}" async with lock: results.append((addr, lat)) if cs: cs.found += 1 cs.all_results = results # full reference for Ctrl+C recovery if cs.found % 10 == 0 or cs.found <= 20: cs.results = sorted(results, key=lambda x: x[1])[:20] if cs: cs.done += 1 # Build flat list of (ip, port) pairs probes = [(ip, p) for ip in ips for p in ports] random.shuffle(probes) # spread ports across batches for better coverage BATCH = 50_000 for i in range(0, len(probes), BATCH): if cs and cs.interrupted: break batch = probes[i : i + BATCH] tasks = [asyncio.ensure_future(probe(ip, port)) for ip, port in batch] try: await asyncio.gather(*tasks, return_exceptions=True) except asyncio.CancelledError: break finally: for t in tasks: if not t.done(): t.cancel() results.sort(key=lambda x: x[1]) return results def load_configs_from_args(args) -> Tuple[List[ConfigEntry], str]: """Load configs based on CLI args. Returns (configs, source_label).""" if getattr(args, "sub", None): configs = fetch_sub(args.sub) return configs, args.sub if getattr(args, "template", None): if not getattr(args, "input", None): return [], "ERROR: --template requires -i (address list file)" addrs = load_addresses(args.input) configs = generate_from_template(args.template, addrs) return configs, f"{args.input} ({len(addrs)} addresses)" if getattr(args, "input", None): configs = load_input(args.input) return configs, args.input return [], "" def parse_size(s: str) -> int: s = s.strip().upper() m = re.match(r"^(\d+(?:\.\d+)?)\s*(MB|KB|GB|B)?$", s) if not m: try: return max(1, int(s)) except ValueError: return 1_000_000 # default 1MB n = float(m.group(1)) u = m.group(2) or "B" mul = {"B": 1, "KB": 1_000, "MB": 1_000_000, "GB": 1_000_000_000} return max(1, int(n * mul.get(u, 1))) def parse_rounds_str(s: str) -> List[RoundCfg]: out = [] for p in s.split(","): p = p.strip() if ":" in p: sz, top = p.split(":", 1) try: out.append(RoundCfg(parse_size(sz), int(top))) except ValueError: pass # skip malformed round return out def find_config_files() -> List[Tuple[str, str, int]]: """Find config files in cwd. Returns [(path, type, count)].""" results = [] for pat in ("*.txt", "*.json", "*.conf", "*.lst"): for p in globmod.glob(pat): try: with open(p, "r", encoding="utf-8", errors="replace") as f: head = f.read(2048) count = 0 json_ok = False if head.strip().startswith("{") or head.strip().startswith("["): try: with open(p, encoding="utf-8") as jf: d = json.loads(jf.read()) if isinstance(d, dict) and "data" in d: d = d["data"] if isinstance(d, list): count = len(d) results.append((p, "json", count)) json_ok = True except Exception: pass if not json_ok and ("vless://" in head or "vmess://" in head): with open(p, encoding="utf-8") as f: count = sum(1 for ln in f if ln.strip().startswith(("vless://", "vmess://"))) results.append((p, "configs", count)) except Exception: pass results.sort(key=lambda x: x[2], reverse=True) return results async def _resolve(e: ConfigEntry, sem: asyncio.Semaphore, counter: List[int]) -> ConfigEntry: if e.ip: counter[0] += 1 return e async with sem: try: loop = asyncio.get_running_loop() info = await loop.getaddrinfo(e.address, 443, family=socket.AF_INET) if info: e.ip = info[0][4][0] except Exception: e.ip = "" counter[0] += 1 return e async def resolve_all(st: State, workers: int = 100): sem = asyncio.Semaphore(workers) counter = [0] # mutable for closure total = len(st.configs) async def _progress(): spin = "|/-\\" i = 0 while counter[0] < total: s = spin[i % len(spin)] pct = counter[0] * 100 // max(1, total) _w(f"\r {A.CYN}{s}{A.RST} Resolving DNS... {counter[0]}/{total} ({pct}%) ") _fl() i += 1 await asyncio.sleep(0.15) _w(f"\r {A.GRN}OK{A.RST} Resolved {total} domains -> {len(set(c.ip for c in st.configs if c.ip))} unique IPs\n") _fl() prog_task = asyncio.create_task(_progress()) try: st.configs = list(await asyncio.gather(*[_resolve(c, sem, counter) for c in st.configs])) finally: prog_task.cancel() try: await prog_task except asyncio.CancelledError: pass for c in st.configs: if c.ip: st.ip_map[c.ip].append(c) st.ips = list(st.ip_map.keys()) for ip in st.ips: cs = st.ip_map[ip] st.res[ip] = Result( ip=ip, domains=[c.address for c in cs], uris=[c.original_uri for c in cs if c.original_uri], ) async def _lat_one(ip: str, sni: str, timeout: float) -> Tuple[float, float, str]: """Measure TCP RTT and full TLS connection time (TCP+TLS handshake).""" try: t0 = time.monotonic() r, w = await asyncio.wait_for( asyncio.open_connection(ip, 443), timeout=timeout ) tcp = (time.monotonic() - t0) * 1000 w.close() try: await w.wait_closed() except Exception: pass except asyncio.TimeoutError: return -1, -1, "tcp-timeout" except Exception as e: return -1, -1, f"tcp:{str(e)[:50]}" try: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE t0 = time.monotonic() r, w = await asyncio.wait_for( asyncio.open_connection(ip, 443, ssl=ctx, server_hostname=sni), timeout=timeout, ) tls_full = (time.monotonic() - t0) * 1000 # full TCP+TLS time w.close() try: await w.wait_closed() except Exception: pass return tcp, tls_full, "" except asyncio.TimeoutError: return tcp, -1, "tls-timeout" except Exception as e: return tcp, -1, f"tls:{str(e)[:50]}" async def phase1(st: State, workers: int, timeout: float): st.phase = "latency" st.phase_label = "Testing latency" st.total = len(st.ips) st.done_count = 0 sem = asyncio.Semaphore(workers) async def go(ip: str): async with sem: if st.interrupted: return res = st.res[ip] # Use speed.cloudflare.com as SNI — filters out non-CF IPs early # (non-CF IPs will fail TLS since they don't serve this cert) tcp, tls, err = await _lat_one(ip, SPEED_HOST, timeout) res.tcp_ms = tcp res.tls_ms = tls res.error = err res.alive = tls > 0 st.done_count += 1 if res.alive: st.alive_n += 1 else: st.dead_n += 1 tasks = [asyncio.ensure_future(go(ip)) for ip in st.ips] try: await asyncio.gather(*tasks, return_exceptions=True) except asyncio.CancelledError: pass finally: for t in tasks: if not t.done(): t.cancel() async def _dl_one( ip: str, size: int, timeout: float, host: str = "", path: str = "", ) -> Tuple[float, float, int, str, str]: """Download test. Returns (ttfb_ms, mbps, bytes, colo, error). Error "429" means rate-limited — caller should back off.""" if not host: host = SPEED_HOST if not path: path = f"{SPEED_PATH}?bytes={size}" dl_timeout = max(timeout, 30 + (size / 1_000_000) * 2) conn_timeout = min(timeout, 15) w = None total = 0 dl_start = 0.0 ttfb = 0.0 colo = "" def _cleanup(): nonlocal w if w is not None: try: w.close() except Exception: pass w = None try: ctx = ssl.create_default_context() t_start = time.monotonic() try: t0 = t_start r, w = await asyncio.wait_for( asyncio.open_connection(ip, 443, ssl=ctx, server_hostname=host), timeout=conn_timeout, ) except ssl.SSLCertVerificationError: _cleanup() ctx2 = ssl.create_default_context() ctx2.check_hostname = False ctx2.verify_mode = ssl.CERT_NONE t0 = time.monotonic() r, w = await asyncio.wait_for( asyncio.open_connection( ip, 443, ssl=ctx2, server_hostname=host ), timeout=conn_timeout, ) conn_ms = (time.monotonic() - t0) * 1000 range_hdr = "" if "bytes=" not in path: range_hdr = f"Range: bytes=0-{size - 1}\r\n" req = ( f"GET {path} HTTP/1.1\r\n" f"Host: {host}\r\n" f"User-Agent: Mozilla/5.0 (X11; Linux x86_64) Chrome/120\r\n" f"Accept: */*\r\n" f"{range_hdr}" f"Connection: close\r\n\r\n" ) w.write(req.encode()) await w.drain() hbuf = b"" while b"\r\n\r\n" not in hbuf: ch = await asyncio.wait_for(r.read(4096), timeout=min(conn_timeout, 10)) if not ch: _dbg(f"DL {ip} {size}: empty response (no headers)") return -1, 0, 0, "", "empty" hbuf += ch if len(hbuf) > 65536: _dbg(f"DL {ip} {size}: header too big") return -1, 0, 0, "", "hdr-too-big" sep = hbuf.index(b"\r\n\r\n") + 4 htxt = hbuf[:sep].decode("latin-1", errors="replace") body0 = hbuf[sep:] status_line = htxt.split("\r\n")[0] status_parts = status_line.split(None, 2) status_code = status_parts[1] if len(status_parts) >= 2 else "" if status_code == "429": ra = "" for line in htxt.split("\r\n"): if line.lower().startswith("retry-after:"): ra = line.split(":", 1)[1].strip() break _dbg(f"DL {ip} {size}: 429 rate-limited (retry-after={ra})") return -1, 0, 0, "", f"429:{ra}" if status_code not in ("200", "206"): _dbg(f"DL {ip} {size}: HTTP error: {status_line[:80]}") return -1, 0, 0, "", f"http:{status_line[:40]}" for line in htxt.split("\r\n"): if line.lower().startswith("cf-ray:"): ray = line.split(":", 1)[1].strip() if "-" in ray: colo = ray.rsplit("-", 1)[-1] break ttfb = (time.monotonic() - t0) * 1000 - conn_ms dl_start = time.monotonic() total = len(body0) sample_interval = 1_000_000 if size >= 5_000_000 else size + 1 next_sample = sample_interval samples: List[Tuple[int, float]] = [] min_for_stable = min(size // 2, 20_000_000) if size >= 5_000_000 else size min_samples = 5 if size >= 10_000_000 else 3 while True: try: elapsed_total = time.monotonic() - t_start left = max(1.0, dl_timeout - elapsed_total) ch = await asyncio.wait_for(r.read(65536), timeout=min(left, 10)) if not ch: break total += len(ch) if total >= next_sample: elapsed = time.monotonic() - dl_start samples.append((total, elapsed)) next_sample += sample_interval # only check stability after enough data downloaded if len(samples) >= min_samples and total >= min_for_stable: recent = samples[-4:] sp = [] for j in range(1, len(recent)): db = recent[j][0] - recent[j - 1][0] dt = recent[j][1] - recent[j - 1][1] if dt > 0: sp.append(db / dt) if len(sp) >= 2: mn = statistics.mean(sp) if mn > 0: try: sd = statistics.stdev(sp) if sd / mn < 0.10: break except statistics.StatisticsError: pass except asyncio.TimeoutError: break except Exception: break dl_t = time.monotonic() - dl_start mbps = (total / 1_000_000) / dl_t if dl_t > 0 else 0 _dbg(f"DL {ip} {size}: OK {mbps:.2f}MB/s total={total} dt={dl_t:.1f}s host={host}") return ttfb, mbps, total, colo, "" except asyncio.TimeoutError: if total > 0 and dl_start > 0: dl_t = time.monotonic() - dl_start mbps = (total / 1_000_000) / dl_t if dl_t > 0 else 0 _dbg(f"DL {ip} {size}: TIMEOUT partial={total}B mbps={mbps:.2f} dt={dl_t:.1f}s") if mbps > 0: return ttfb, mbps, total, colo, "" _dbg(f"DL {ip} {size}: TIMEOUT no data total={total}") return -1, 0, 0, "", "timeout" except Exception as e: if total > 0 and dl_start > 0: dl_t = time.monotonic() - dl_start mbps = (total / 1_000_000) / dl_t if dl_t > 0 else 0 _dbg(f"DL {ip} {size}: ERR partial={total}B mbps={mbps:.2f} err={e}") if mbps > 0: return ttfb, mbps, total, colo, "" _dbg(f"DL {ip} {size}: ERR no data err={e}") return -1, 0, 0, "", str(e)[:60] finally: _cleanup() async def phase2_round( st: State, rcfg: RoundCfg, candidates: List[str], workers: int, timeout: float, rlim: Optional[CFRateLimiter] = None, cdn_host: str = "", cdn_path: str = "", ): st.total = len(candidates) st.done_count = 0 if rcfg.size >= 50_000_000: workers = min(workers, 6) elif rcfg.size >= 10_000_000: workers = min(workers, 8) sem = asyncio.Semaphore(workers) max_retries = 2 async def go(ip: str): best_mbps_this = 0.0 best_ttfb = -1.0 best_colo = "" last_err = "" force_cdn = False # set True when CF rejects (403/429) for attempt in range(max_retries): if st.interrupted: break # Pick endpoint: speed.cloudflare.com if budget available, else fallback CDN use_host = cdn_host use_path = cdn_path if force_cdn and CDN_FALLBACK: use_host, use_path = CDN_FALLBACK _dbg(f"DL {ip}: forced fallback CDN {use_host}") elif rlim and rlim.would_block() and CDN_FALLBACK: use_host, use_path = CDN_FALLBACK _dbg(f"DL {ip}: using fallback CDN {use_host}") elif rlim: await rlim.acquire(st) # acquire sem for the actual download await sem.acquire() try: if st.interrupted: break ttfb, mbps, _total, colo, err = await _dl_one( ip, rcfg.size, timeout, host=use_host, path=use_path, ) finally: sem.release() # free slot immediately after download if mbps > 0: best_mbps_this = mbps best_ttfb = ttfb best_colo = colo break # 429 from speed.cloudflare.com: report + force CDN on retry if err.startswith("429") and use_host == SPEED_HOST: ra_str = err.split(":", 1)[1] if ":" in err else "" try: ra = int(ra_str) except (ValueError, TypeError): ra = 60 if rlim: rlim.report_429(ra) _dbg(f"DL {ip}: 429 reported to limiter (retry-after={ra})") force_cdn = True # 403 from speed.cloudflare.com: CF rejected size, force CDN elif err.startswith("http:") and use_host == SPEED_HOST: _dbg(f"DL {ip}: {err} from CF, switching to CDN fallback") force_cdn = True # error from fallback CDN elif err.startswith("429") or err.startswith("http:"): _dbg(f"DL {ip}: {err} from {use_host}, will retry") last_err = err res = st.res[ip] res.speeds.append(best_mbps_this) if best_mbps_this > 0: if best_mbps_this > res.best_mbps: res.best_mbps = best_mbps_this if best_ttfb > 0 and (res.ttfb_ms < 0 or best_ttfb < res.ttfb_ms): res.ttfb_ms = best_ttfb if best_colo and not res.colo: res.colo = best_colo if best_mbps_this > st.best_speed: st.best_speed = best_mbps_this elif last_err: res.error = last_err st.done_count += 1 tasks = [asyncio.ensure_future(go(ip)) for ip in candidates] try: await asyncio.gather(*tasks, return_exceptions=True) except asyncio.CancelledError: pass finally: for t in tasks: if not t.done(): t.cancel() def calc_scores(st: State): has_speed = any(r.best_mbps > 0 for r in st.res.values()) for r in st.res.values(): if not r.alive: r.score = 0 continue lat = max(0, 100 - r.tls_ms / 10) if r.tls_ms > 0 else 0 spd = min(100, r.best_mbps * 20) if r.best_mbps > 0 else 0 ttfb = max(0, 100 - r.ttfb_ms / 5) if r.ttfb_ms > 0 else 0 if r.best_mbps > 0: r.score = round(lat * 0.35 + spd * 0.50 + ttfb * 0.15, 1) elif has_speed: # Speed rounds ran but this IP wasn't tested - rank below tested ones r.score = round(lat * 0.35, 1) else: # No speed rounds at all (latency-only mode) r.score = round(lat, 1) def sorted_alive(st: State, key: str = "score") -> List[Result]: alive = [r for r in st.res.values() if r.alive] if key == "score": alive.sort(key=lambda r: r.score, reverse=True) elif key == "latency": alive.sort(key=lambda r: r.tls_ms) elif key == "speed": alive.sort(key=lambda r: r.best_mbps, reverse=True) return alive def sorted_all(st: State, key: str = "score") -> List[Result]: """Return all results: alive sorted by key, then dead at the bottom.""" alive = sorted_alive(st, key) dead = [r for r in st.res.values() if not r.alive] dead.sort(key=lambda r: r.ip) return alive + dead def draw_menu_header(cols: int) -> List[str]: W = cols - 2 lines = [] lines.append(f"{A.CYN}╔{'═' * W}╗{A.RST}") t = f" {A.BOLD}{A.WHT}CF Config Scanner{A.RST} {A.DIM}v{VERSION}{A.RST}" lines.append(f"{A.CYN}║{A.RST}" + t + " " * (W - _vl(t)) + f"{A.CYN}║{A.RST}") lines.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") return lines def draw_box_line(content: str, cols: int) -> str: W = cols - 2 vl = _vl(content) pad = " " * max(0, W - vl) return f"{A.CYN}║{A.RST}{content}{pad}{A.CYN}║{A.RST}" def draw_box_sep(cols: int) -> str: return f"{A.CYN}╠{'═' * (cols - 2)}╣{A.RST}" def draw_box_bottom(cols: int) -> str: return f"{A.CYN}╚{'═' * (cols - 2)}╝{A.RST}" def tui_show_guide(): """Show help/guide screen explaining input formats.""" _w(A.CLR + A.HOME) cols, rows = term_size() lines = draw_menu_header(cols) lines.append(draw_box_line(f" {A.BOLD}{A.WHT}How to prepare input files{A.RST}", cols)) lines.append(draw_box_sep(cols)) guide = [ "", f" {A.BOLD}{A.CYN}Local Files (auto-detected){A.RST}", f" {A.DIM}Place config files in the same directory where you run cfray.{A.RST}", f" {A.DIM}Supported formats: {A.WHT}.txt .json .conf .lst{A.RST}", f" {A.DIM}They will appear automatically under {A.WHT}LOCAL FILES{A.DIM} in the menu.{A.RST}", f" {A.GRN}Example:{A.RST} {A.DIM}cp configs.txt /root/ && cd /root && python3 scanner.py{A.RST}", "", f" {A.BOLD}{A.CYN}[P] Enter File Path{A.RST}", f" {A.DIM}Load a config file from any location by typing its full path.{A.RST}", f" {A.GRN}Example:{A.RST} {A.DIM}/home/user/configs/my_vless.txt{A.RST}", "", f" {A.CYN}{'─' * 46}{A.RST}", "", f" {A.BOLD}{A.CYN}[1-9] VLESS/VMess URI file (.txt){A.RST}", f" {A.DIM}Text file, one URI per line. Can mix VLESS and VMess.{A.RST}", f" {A.GRN}vless://uuid@domain:443?type=ws&host=sni.com&...#name{A.RST}", "", f" {A.BOLD}{A.CYN}[1-9] Domain JSON file (.json){A.RST}", f' {A.DIM}JSON with domain+IP:{A.RST} {A.GRN}{{"data": [{{"domain":"x.ir","ipv4":"1.2.3.4"}}]}}{A.RST}', "", f" {A.BOLD}{A.CYN}[S] Subscription URL{A.RST}", f" {A.DIM}Fetches VLESS/VMess configs from a remote URL (plain or base64).{A.RST}", f" {A.GRN}https://example.com/sub.txt{A.RST}", f" {A.DIM}CLI: python3 scanner.py --sub URL{A.RST}", "", f" {A.BOLD}{A.CYN}[T] Template + Address list{A.RST}", f" {A.DIM}Give one VLESS/VMess template + a file of CF IPs/domains.{A.RST}", f" {A.DIM}Scanner replaces the address for each entry and tests them all.{A.RST}", f" {A.WHT}Template:{A.RST} {A.GRN}vless://uuid@ADDR:443?type=ws&...#name{A.RST}", f" {A.WHT}Addresses:{A.RST} {A.GRN}one IP or domain per line (.txt){A.RST}", f" {A.DIM}CLI: python3 scanner.py --template 'vless://...' -i addrs.txt{A.RST}", "", f" {A.BOLD}{A.CYN}[F] Find Clean Cloudflare IPs{A.RST}", f" {A.DIM}Scans all CF IP ranges to find reachable edge IPs.{A.RST}", f" {A.DIM}Modes: Quick (~4K), Normal (~12K), Full (~1.5M), Mega (~3M multi-port){A.RST}", f" {A.DIM}Mega tests all IPs on ports 443+8443 for maximum coverage.{A.RST}", f" {A.DIM}Found IPs can be saved or used with a template for speed test.{A.RST}", f" {A.DIM}CLI: python3 scanner.py --find-clean --no-tui --clean-mode mega{A.RST}", "", f" {A.CYN}{'─' * 46}{A.RST}", f" {A.BOLD}{A.CYN}How it works:{A.RST}", f" {A.DIM}1. Resolve domains to CF edge IPs, deduplicate{A.RST}", f" {A.DIM}2. Test TCP+TLS latency, cut bottom by latency{A.RST}", f" {A.DIM}3. Speed test top candidates in progressive rounds{A.RST}", f" {A.DIM}4. Score = latency 35% + speed 50% + TTFB 15%{A.RST}", f" {A.DIM}5. Export top configs ranked by score{A.RST}", "", f" {A.BOLD}{A.WHT}Made By Sam - SamNet Technologies{A.RST}", f" {A.DIM}https://github.com/SamNet-dev/cfray{A.RST}", ] # Fit within terminal: header(3) + title(1) + sep(1) + guide + footer(3) max_guide = rows - 8 if max_guide < len(guide): guide = guide[:max_guide] for g in guide: lines.append(draw_box_line(g, cols)) lines.append(draw_box_line("", cols)) lines.append(draw_box_sep(cols)) lines.append(draw_box_line(f" {A.DIM}Press any key to go back{A.RST}", cols)) lines.append(draw_box_bottom(cols)) _w("\n".join(lines) + "\n") _fl() _read_key_blocking() def _clean_pick_mode() -> Optional[str]: """Pick scan scope for clean IP finder. Returns mode or None/'__back__'.""" while True: _w(A.CLR + A.HOME + A.HIDE) cols, _ = term_size() lines = draw_menu_header(cols) lines.append(draw_box_line(f" {A.BOLD}Find Clean Cloudflare IPs{A.RST}", cols)) lines.append(draw_box_line(f" {A.DIM}Scans Cloudflare IP ranges to find reachable edge IPs{A.RST}", cols)) lines.append(draw_box_line("", cols)) lines.append(draw_box_sep(cols)) lines.append(draw_box_line(f" {A.BOLD}Select scan scope:{A.RST}", cols)) lines.append(draw_box_line("", cols)) for name, key in [("quick", "1"), ("normal", "2"), ("full", "3"), ("mega", "4")]: cfg = CLEAN_MODES[name] num = f"{A.CYN}{A.BOLD}{key}{A.RST}" lbl = f"{A.BOLD}{cfg['label']}{A.RST}" if name == "normal": lbl += f" {A.GRN}(recommended){A.RST}" lines.append(draw_box_line(f" {num} {lbl}", cols)) desc = cfg["desc"] if len(cfg.get("ports", [])) > 1: desc += f" (ports: {', '.join(str(p) for p in cfg['ports'])})" lines.append(draw_box_line(f" {A.DIM}{desc}{A.RST}", cols)) lines.append(draw_box_line("", cols)) lines.append(draw_box_sep(cols)) lines.append(draw_box_line(f" {A.DIM}[1-4] Select [B] Back [Q] Quit{A.RST}", cols)) lines.append(draw_box_bottom(cols)) _w("\n".join(lines) + "\n") _fl() key = _read_key_blocking() if key in ("q", "ctrl-c"): return None if key in ("b", "esc"): return "__back__" if key == "1": return "quick" if key == "2" or key == "enter": return "normal" if key == "3": return "full" if key == "4": return "mega" def _draw_clean_progress(cs: CleanScanState): """Draw live progress screen for clean IP scan.""" cols, rows = term_size() W = cols - 2 out: List[str] = [] def bx(c: str): out.append(f"{A.CYN}║{A.RST}" + c + " " * max(0, W - _vl(c)) + f"{A.CYN}║{A.RST}") out.append(f"{A.CYN}╔{'═' * W}╗{A.RST}") elapsed = _fmt_elapsed(time.monotonic() - cs.start_time) if cs.start_time else "0s" title = f" {A.BOLD}{A.WHT}Finding Clean Cloudflare IPs{A.RST}" right = f"{A.DIM}{elapsed} | ^C stop{A.RST}" bx(title + " " * max(1, W - _vl(title) - _vl(right)) + right) out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") pct = cs.done * 100 // max(1, cs.total) bw = max(1, min(30, W - 40)) filled = int(bw * pct / 100) bar = f"{A.GRN}{'█' * filled}{A.DIM}{'░' * (bw - filled)}{A.RST}" bx(f" Probing [{bar}] {cs.done:,}/{cs.total:,} {pct}%") found_line = f" {A.GRN}Found: {cs.found:,} clean IPs{A.RST}" if cs.results: best_lat = cs.results[0][1] found_line += f" {A.DIM}Best: {best_lat:.0f}ms{A.RST}" bx(found_line) out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") bx(f" {A.BOLD}Top IPs found (by latency):{A.RST}") vis = min(15, rows - 12) if cs.results: for i, (ip, lat) in enumerate(cs.results[:vis]): bx(f" {A.CYN}{i+1:>3}.{A.RST} {ip:<22} {A.GRN}{lat:>6.0f}ms{A.RST}") else: bx(f" {A.DIM}Scanning...{A.RST}") # Fill remaining space used = len(cs.results[:vis]) if cs.results else 1 for _ in range(vis - used): bx("") out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") bx(f" {A.DIM}Press Ctrl+C to stop early and show results{A.RST}") out.append(f"{A.CYN}╚{'═' * W}╝{A.RST}") _w(A.HOME) _w("\n".join(out) + "\n") _fl() def _clean_show_results(results: List[Tuple[str, float]], elapsed: str) -> Optional[str]: """Show clean IP results with j/k scrolling. Returns action string or None.""" MAX_SHOW = 300 display = results[:MAX_SHOW] offset = 0 while True: _w(A.CLR + A.HOME + A.HIDE) cols, rows = term_size() lines = draw_menu_header(cols) if results: lines.append(draw_box_line( f" {A.BOLD}{A.GRN}Scan Complete!{A.RST} " f"Found {A.BOLD}{len(results):,}{A.RST} clean IPs in {elapsed}", cols)) else: lines.append(draw_box_line(f" {A.YEL}Scan Complete — no clean IPs found.{A.RST}", cols)) lines.append(draw_box_sep(cols)) if display: # header + separator = 2 rows, footer area = 5 rows, menu header = 3 rows vis = max(5, rows - 13) end = min(len(display), offset + vis) hdr = f" {A.BOLD}{'#':>4} {'Address':<22} {'Latency':>8}{A.RST}" if len(display) > vis: pos = f"{A.DIM}[{offset+1}-{end} of {len(display)}" if len(results) > MAX_SHOW: pos += f", {len(results):,} total" pos += f"]{A.RST}" hdr += " " * max(1, cols - 2 - _vl(hdr) - _vl(pos) - 1) + pos lines.append(draw_box_line(hdr, cols)) lines.append(draw_box_line( f" {A.DIM}{'─'*4} {'─'*22} {'─'*8}{A.RST}", cols)) for i in range(offset, end): ip, lat = display[i] lines.append(draw_box_line( f" {i+1:>4} {ip:<22} {A.GRN}{lat:>6.0f}ms{A.RST}", cols)) lines.append(draw_box_line("", cols)) lines.append(draw_box_sep(cols)) ft = "" if results: ft += f" {A.CYN}[S]{A.RST} Save all {A.CYN}[T]{A.RST} Template+SpeedTest " ft += f" {A.CYN}[B]{A.RST} Back" lines.append(draw_box_line(ft, cols)) if display and len(display) > vis: lines.append(draw_box_line( f" {A.DIM}j/↓ down k/↑ up n/p page down/up{A.RST}", cols)) lines.append(draw_box_bottom(cols)) _w("\n".join(lines) + "\n") _fl() key = _read_key_blocking() if key in ("b", "esc", "q", "ctrl-c"): return "back" if key in ("j", "down"): vis = max(5, rows - 13) offset = min(offset + 1, max(0, len(display) - vis)) continue if key in ("k", "up"): offset = max(0, offset - 1) continue if key == "n": vis = max(5, rows - 13) offset = min(offset + vis, max(0, len(display) - vis)) continue if key == "p": vis = max(5, rows - 13) offset = max(0, offset - vis) continue if key == "s" and results: return "save" if key == "t" and results: _w(A.SHOW) _w(f"\n {A.BOLD}{A.CYN}Speed Test with Clean IPs{A.RST}\n") _w(f" {A.DIM}Paste a VLESS/VMess config URI. The address in it will be{A.RST}\n") _w(f" {A.DIM}replaced with each clean IP, then all configs get speed-tested.{A.RST}\n\n") _w(f" {A.CYN}Template:{A.RST} ") _fl() try: tpl = input().strip() except (EOFError, KeyboardInterrupt): continue if not tpl or not parse_config(tpl): _w(f" {A.RED}Invalid VLESS/VMess URI.{A.RST}\n") _fl() time.sleep(1.5) continue return f"template:{tpl}" async def tui_run_clean_finder() -> Optional[Tuple[str, str]]: """Run the clean IP finder flow. Returns (input_method, input_value) or None.""" mode = _clean_pick_mode() if mode is None: return None if mode == "__back__": return ("__back__", "") scan_cfg = CLEAN_MODES[mode] # Generate IPs _w(A.CLR + A.HOME) cols, _ = term_size() lines = draw_menu_header(cols) lines.append(draw_box_line( f" {A.BOLD}Generating IPs from {len(CF_SUBNETS)} Cloudflare ranges...{A.RST}", cols)) lines.append(draw_box_bottom(cols)) _w("\n".join(lines) + "\n") _fl() ips = generate_cf_ips(CF_SUBNETS, scan_cfg["sample"]) ports = scan_cfg.get("ports", [443]) _dbg(f"CLEAN: Generated {len(ips):,} IPs × {len(ports)} port(s), sample={scan_cfg['sample']}") # Run scan with live progress cs = CleanScanState() scan_task = asyncio.ensure_future( scan_clean_ips( ips, workers=scan_cfg["workers"], timeout=3.0, validate=scan_cfg["validate"], cs=cs, ports=ports, ) ) old_sigint = signal.getsignal(signal.SIGINT) def _sig(sig, frame): cs.interrupted = True scan_task.cancel() signal.signal(signal.SIGINT, _sig) _w(A.CLR + A.HIDE) try: while not scan_task.done(): _draw_clean_progress(cs) await asyncio.sleep(0.3) except (asyncio.CancelledError, Exception): pass finally: signal.signal(signal.SIGINT, old_sigint) try: results = await scan_task except (asyncio.CancelledError, Exception): results = sorted(cs.all_results or cs.results, key=lambda x: x[1]) elapsed = _fmt_elapsed(time.monotonic() - cs.start_time) _dbg(f"CLEAN: Done in {elapsed}. Found {len(results):,} / {len(ips):,}") # Show results and get user action action = _clean_show_results(results, elapsed) if action is None or action == "back": return ("__back__", "") if action == "save": try: os.makedirs(RESULTS_DIR, exist_ok=True) path = os.path.abspath(_results_path("clean_ips.txt")) with open(path, "w", encoding="utf-8") as f: for ip, lat in results: f.write(f"{ip}\n") _w(f"\n {A.GRN}Saved {len(results):,} IPs to {path}{A.RST}\n") except Exception as e: _w(f"\n {A.RED}Save error: {e}{A.RST}\n") _w(f" {A.DIM}Press any key...{A.RST}\n") _fl() _wait_any_key() return ("__back__", "") if action.startswith("template:"): template_uri = action[9:] try: os.makedirs(RESULTS_DIR, exist_ok=True) path = os.path.abspath(_results_path("clean_ips.txt")) with open(path, "w", encoding="utf-8") as f: for ip, lat in results: f.write(f"{ip}\n") except Exception as e: _w(f"\n {A.RED}Save error: {e}{A.RST}\n") _fl() time.sleep(2) return ("__back__", "") return ("template", f"{template_uri}|||{path}") return None def _tui_prompt_text(label: str) -> Optional[str]: """Show cursor, prompt for text input, return stripped text or None.""" _w(A.SHOW) _w(f"\n {A.CYN}{label}{A.RST} ") _fl() try: val = input().strip() except (EOFError, KeyboardInterrupt): return None return val if val else None def tui_pick_file() -> Optional[Tuple[str, str]]: """Interactive file/input picker. Returns (method, value) or None. method is one of: 'file', 'sub', 'template'. For 'file': value is the file path. For 'sub': value is the subscription URL. For 'template': value is 'template_uri|||address_file_path'. """ enable_ansi() files = find_config_files() while True: _w(A.CLR + A.HOME + A.HIDE) cols, rows = term_size() W = cols - 2 out: List[str] = [] def bx(c: str): out.append(f"{A.CYN}║{A.RST}" + c + " " * max(0, W - _vl(c)) + f"{A.CYN}║{A.RST}") # Single clean box — no internal double-line separators out.append(f"{A.CYN}╔{'═' * W}╗{A.RST}") title = f" ⚡ {A.BOLD}{A.WHT}cfray{A.RST} {A.DIM}v{VERSION}{A.RST}" subtitle = f"{A.DIM}Cloudflare Config Scanner{A.RST}" bx(title + " " + subtitle) bx("") # Section: Local Files bx(f" {A.DIM}── {A.BOLD}{A.WHT}📁 LOCAL FILES{A.RST} {A.DIM}{'─' * max(1, W - 19)}{A.RST}") if files: for i, (path, ftype, count) in enumerate(files[:9]): num = f" {A.CYN}{A.BOLD}{i + 1}{A.RST}." name = os.path.basename(path) desc = f"{A.DIM}{ftype}, {count} entries{A.RST}" bx(f" {num} 📄 {name:<28} {desc}") else: bx(f" {A.DIM}No config files found in current directory{A.RST}") bx(f" {A.DIM}Drop .txt or .json files here, or use options below{A.RST}") bx("") # Section: Remote Sources bx(f" {A.DIM}── {A.BOLD}{A.WHT}🌐 REMOTE SOURCES{A.RST} {A.DIM}{'─' * max(1, W - 22)}{A.RST}") bx(f" {A.CYN}{A.BOLD}s{A.RST}. 🔗 {A.WHT}Subscription URL{A.RST} {A.DIM}Fetch configs from remote URL{A.RST}") bx(f" {A.CYN}{A.BOLD}p{A.RST}. 📂 {A.WHT}Enter File Path{A.RST} {A.DIM}Load from custom file path{A.RST}") bx("") # Section: Tools bx(f" {A.DIM}── {A.BOLD}{A.WHT}🔧 TOOLS{A.RST} {A.DIM}{'─' * max(1, W - 13)}{A.RST}") bx(f" {A.CYN}{A.BOLD}t{A.RST}. 🧩 {A.WHT}Template + Addresses{A.RST} {A.DIM}Test one config against many IPs{A.RST}") bx(f" {A.CYN}{A.BOLD}f{A.RST}. 🔍 {A.WHT}Clean IP Finder{A.RST} {A.DIM}Scan Cloudflare IP ranges{A.RST}") bx("") bx(f" {A.DIM}{'─' * (W - 2)}{A.RST}") bx(f" {A.DIM}[h] ❓ Help [q] 🚪 Quit{A.RST}") out.append(f"{A.CYN}╚{'═' * W}╝{A.RST}") _w("\n".join(out) + "\n") _fl() key = _read_key_blocking() if key in ("q", "ctrl-c", "esc"): _w(A.SHOW) _fl() return None if key == "h": tui_show_guide() files = find_config_files() continue if key == "p": path = _tui_prompt_text("Enter file path:") if path is None: continue if os.path.isfile(path): return ("file", path) _w(f" {A.RED}File not found.{A.RST}\n") _fl() time.sleep(1) continue if key == "s": _w(A.SHOW) _w(f"\n {A.BOLD}{A.CYN}Subscription URL{A.RST}\n") _w(f" {A.DIM}Paste a URL that contains VLESS/VMess configs (plain text or base64).{A.RST}\n") _w(f" {A.DIM}Example: https://example.com/sub.txt{A.RST}\n\n") _fl() url = _tui_prompt_text("URL:") if url is None: continue if not url.lower().startswith(("http://", "https://")): _w(f" {A.RED}URL must start with http:// or https://{A.RST}\n") _fl() time.sleep(1.5) continue return ("sub", url) if key == "t": _w(A.SHOW) _w(f"\n {A.BOLD}{A.CYN}Template + Address List{A.RST}\n") _w(f" {A.DIM}This mode takes ONE working config and a list of Cloudflare IPs/domains.{A.RST}\n") _w(f" {A.DIM}It replaces the address in your config with each IP from the list,{A.RST}\n") _w(f" {A.DIM}then tests all of them to find the fastest.{A.RST}\n\n") _w(f" {A.BOLD}Step 1:{A.RST} {A.CYN}Paste your VLESS/VMess config URI:{A.RST}\n") _w(f" {A.DIM}(a full vless://... or vmess://... URI){A.RST}\n ") _fl() try: tpl = input().strip() except (EOFError, KeyboardInterrupt): continue if not tpl or not parse_config(tpl): _w(f" {A.RED}Invalid VLESS/VMess URI.{A.RST}\n") _fl() time.sleep(1.5) continue _w(f"\n {A.BOLD}Step 2:{A.RST} {A.CYN}Enter path to address list file:{A.RST}\n") _w(f" {A.DIM}(a .txt file with one IP or domain per line){A.RST}\n") _fl() addr_path = _tui_prompt_text("Path:") if addr_path is None: continue if not os.path.isfile(addr_path): _w(f" {A.RED}File not found.{A.RST}\n") _fl() time.sleep(1) continue return ("template", f"{tpl}|||{addr_path}") if key == "f": return ("find_clean", "") if key.isdigit() and 1 <= int(key) <= len(files): return ("file", files[int(key) - 1][0]) def tui_pick_mode() -> Optional[str]: """Interactive mode picker. Returns mode name or None.""" while True: _w(A.CLR + A.HOME + A.HIDE) cols, _ = term_size() lines = draw_menu_header(cols) lines.append(draw_box_line(f" {A.BOLD}Select scan mode:{A.RST}", cols)) lines.append(draw_box_line("", cols)) modes = [("quick", "1"), ("normal", "2"), ("thorough", "3")] for name, key in modes: p = PRESETS[name] num = f"{A.CYN}{A.BOLD}{key}{A.RST}" lbl = f"{A.BOLD}{p['label']}{A.RST}" if name == "normal": lbl += f" {A.GRN}(recommended){A.RST}" lines.append(draw_box_line(f" {num} {lbl}", cols)) lines.append( draw_box_line(f" {A.DIM}{p['desc']}{A.RST}", cols) ) lines.append( draw_box_line( f" {A.DIM}Data: {p['data']} | Est. time: {p['time']}{A.RST}", cols, ) ) lines.append(draw_box_line("", cols)) lines.append(draw_box_sep(cols)) lines.append( draw_box_line( f" {A.DIM}[1-3] Select [B] Back [Q] Quit{A.RST}", cols ) ) lines.append(draw_box_bottom(cols)) _w("\n".join(lines) + "\n") _fl() key = _read_key_blocking() if key in ("q", "ctrl-c"): _w(A.SHOW) _fl() return None if key == "b": return "__back__" if key == "1": return "quick" if key == "2" or key == "enter": return "normal" if key == "3": return "thorough" class Dashboard: def __init__(self, st: State): self.st = st self.sort = "score" self.offset = 0 self.show_domains = False def _bar(self, cur: int, tot: int, w: int = 24) -> str: if tot == 0: return "░" * w p = min(1.0, cur / tot) f = int(w * p) return f"{A.GRN}{'█' * f}{A.DIM}{'░' * (w - f)}{A.RST}" def _cscore(self, v: float) -> str: if v >= 70: return f"{A.GRN}{v:5.1f}{A.RST}" if v >= 40: return f"{A.YEL}{v:5.1f}{A.RST}" if v > 0: return f"{A.RED}{v:5.1f}{A.RST}" return f"{A.DIM} -{A.RST}" def _speed_str(self, v: float) -> str: if v <= 0: return f"{A.DIM} -{A.RST}" if v >= 1: return f"{A.GRN}{v:5.1f}{A.RST}" return f"{A.YEL}{v * 1000:4.0f}K{A.RST}" def draw(self): cols, rows = term_size() W = cols - 2 s = self.st vis = max(3, rows - 18 - len(s.rounds)) out: List[str] = [] def bx(c: str): out.append(f"{A.CYN}║{A.RST}" + c + " " * max(0, W - _vl(c)) + f"{A.CYN}║{A.RST}") out.append(f"{A.CYN}╔{'═' * W}╗{A.RST}") elapsed = _fmt_elapsed(time.monotonic() - s.start_time) if s.start_time else "0s" title = f" {A.BOLD}{A.WHT}CF Config Scanner{A.RST}" right = f"{A.DIM}{elapsed} | {s.mode} | ^C stop{A.RST}" bx(title + " " * max(1, W - _vl(title) - _vl(right)) + right) out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") fname = os.path.basename(s.input_file) info = f" {A.DIM}File:{A.RST} {fname} {A.DIM}Configs:{A.RST} {len(s.configs)} {A.DIM}Unique IPs:{A.RST} {len(s.ips)}" if s.latency_cut_n > 0: info += f" {A.DIM}Cut:{A.RST} {s.latency_cut_n}" bx(info) out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") bw = min(24, W - 55) if s.phase == "latency": pct = s.done_count * 100 // max(1, s.total) bx(f" {A.GRN}▶{A.RST} {A.BOLD}Latency{A.RST} [{self._bar(s.done_count, s.total, bw)}] {s.done_count}/{s.total} {pct}%") elif s.alive_n > 0: cut_info = f" {A.DIM}cut {s.latency_cut_n}{A.RST}" if s.latency_cut_n > 0 else "" bx(f" {A.GRN}✓{A.RST} Latency {A.GRN}{s.alive_n} alive{A.RST} {A.DIM}{s.dead_n} dead{A.RST}{cut_info}") else: bx(f" {A.DIM}○ Latency waiting...{A.RST}") for i, rc in enumerate(s.rounds): rn = i + 1 lbl = f"Speed R{rn} ({rc.label}x{rc.keep})" if s.cur_round == rn and s.phase.startswith("speed") and not s.finished: pct = s.done_count * 100 // max(1, s.total) bx(f" {A.GRN}▶{A.RST} {A.BOLD}{lbl:<18}{A.RST}[{self._bar(s.done_count, s.total, bw)}] {s.done_count}/{s.total} {pct}%") elif s.cur_round > rn or (s.cur_round >= rn and s.finished): bx(f" {A.GRN}✓{A.RST} {lbl:<18}{A.GRN}done{A.RST}") else: bx(f" {A.DIM}○ {lbl:<18}waiting...{A.RST}") out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") parts = [] if s.alive_n > 0: alats = [r.tls_ms for r in s.res.values() if r.alive and r.tls_ms > 0] avg_lat = statistics.mean(alats) if alats else 0 parts.append(f"{A.GRN}● {s.alive_n}{A.RST} alive") parts.append(f"{A.RED}● {s.dead_n}{A.RST} dead") if avg_lat: parts.append(f"{A.DIM}avg latency:{A.RST} {avg_lat:.0f}ms") if s.best_speed > 0: parts.append(f"{A.CYN}best:{A.RST} {s.best_speed:.2f} MB/s") bx(" " + " ".join(parts) if parts else " ") out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") hdr = f" {A.BOLD}{'#':>3} {'IP':<16} {'Dom':>3} {'Ping':>6} {'Conn':>6}" for i, rc in enumerate(s.rounds): hdr += f" {'R' + str(i + 1):>5}" hdr += f" {'Colo':>4} {'Score':>5}{A.RST}" bx(hdr) sep = f" {'─' * 3} {'─' * 16} {'─' * 3} {'─' * 6} {'─' * 6}" for _ in s.rounds: sep += f" {'─' * 5}" sep += f" {'─' * 4} {'─' * 5}" bx(f"{A.DIM}{sep}{A.RST}") results = sorted_all(s, self.sort) total_results = len(results) page = results[self.offset : self.offset + vis] for rank, r in enumerate(page, self.offset + 1): if not r.alive: row = f" {A.DIM}{rank:>3} {r.ip:<16} {len(r.domains):>3} {A.RED}{'dead':>6}{A.RST}{A.DIM} {'':>6}" for j in range(len(s.rounds)): row += f" {'':>5}" row += f" {'':>4} {A.RED}{'--':>5}{A.RST}" bx(row) continue tcp = f"{r.tcp_ms:6.0f}" if r.tcp_ms > 0 else f"{A.DIM} -{A.RST}" tls = f"{r.tls_ms:6.0f}" if r.tls_ms > 0 else f"{A.DIM} -{A.RST}" row = f" {rank:>3} {r.ip:<16} {len(r.domains):>3} {tcp} {tls}" for j in range(len(s.rounds)): if j < len(r.speeds) and r.speeds[j] > 0: row += f" {self._speed_str(r.speeds[j])}" else: row += f" {A.DIM} -{A.RST}" if r.colo: cl = f"{r.colo:>4}" else: cl = f"{A.DIM} -{A.RST}" row += f" {cl} {self._cscore(r.score)}" bx(row) for _ in range(vis - len(page)): bx("") out.append(f"{A.CYN}╠{'═' * W}╣{A.RST}") if s.notify and time.monotonic() < s.notify_until: bx(f" {A.GRN}{A.BOLD}{s.notify}{A.RST}") elif s.finished: sort_hint = f"sort:{A.BOLD}{self.sort}{A.RST}" page_hint = f"{self.offset + 1}-{min(self.offset + vis, total_results)}/{total_results}" ft = ( f" {A.CYN}[S]{A.RST} {sort_hint} " f"{A.CYN}[E]{A.RST} Export " f"{A.CYN}[A]{A.RST} ExportAll " f"{A.CYN}[C]{A.RST} Configs " f"{A.CYN}[D]{A.RST} Domains " f"{A.CYN}[H]{A.RST} Help " f"{A.CYN}[J/K]{A.RST}" ) ft2 = ( f" Scroll {A.CYN}[N/P]{A.RST} Page ({page_hint}) " f"{A.CYN}[B]{A.RST} Back " f"{A.CYN}[Q]{A.RST} Quit" ) bx(ft) bx(ft2) else: bx(f" {A.DIM}{s.phase_label}... Press Ctrl+C to stop and export partial results{A.RST}") out.append(f"{A.CYN}╚{'═' * W}╝{A.RST}") _w(A.HOME) _w("\n".join(out) + "\n") _fl() def draw_domain_popup(self, r: Result): """Show domains for the selected IP.""" _w(A.CLR) cols, rows = term_size() vis = min(len(r.domains), rows - 10) lines = [] lines.append(f"{A.CYN}╔{'═' * (cols - 2)}╗{A.RST}") lines.append(draw_box_line(f" {A.BOLD}Domains for {r.ip} ({len(r.domains)} total){A.RST}", cols)) ping_s = f"{r.tcp_ms:.0f}ms" if r.tcp_ms > 0 else "-" conn_s = f"{r.tls_ms:.0f}ms" if r.tls_ms > 0 else "-" lines.append(draw_box_line(f" {A.DIM}Score: {r.score:.1f} | Ping: {ping_s} | Conn: {conn_s}{A.RST}", cols)) lines.append(draw_box_sep(cols)) for d in r.domains[:vis]: lines.append(draw_box_line(f" {d}", cols)) if len(r.domains) > vis: lines.append(draw_box_line(f" {A.DIM}...and {len(r.domains) - vis} more{A.RST}", cols)) lines.append(draw_box_sep(cols)) lines.append(draw_box_line(f" {A.DIM}Press any key to go back{A.RST}", cols)) lines.append(draw_box_bottom(cols)) _w("\n".join(lines) + "\n") _fl() _wait_any_key() _w(A.CLR) # clear before dashboard redraws def draw_config_popup(self, r: Result): """Show all VLESS/VMess URIs for the selected IP.""" _w(A.CLR) cols, rows = term_size() lines = [] lines.append(f"{A.CYN}╔{'═' * (cols - 2)}╗{A.RST}") lines.append(draw_box_line(f" {A.BOLD}Configs for {r.ip} ({len(r.uris)} URIs){A.RST}", cols)) ping_s = f"{r.tcp_ms:.0f}ms" if r.tcp_ms > 0 else "-" conn_s = f"{r.tls_ms:.0f}ms" if r.tls_ms > 0 else "-" speed_s = f"{r.best_mbps:.1f} MB/s" if r.best_mbps > 0 else "-" lines.append(draw_box_line( f" {A.DIM}Score: {r.score:.1f} | Ping: {ping_s} | Conn: {conn_s} | Speed: {speed_s}{A.RST}", cols )) lines.append(draw_box_sep(cols)) if r.uris: max_show = rows - 10 for i, uri in enumerate(r.uris[:max_show]): # Truncate long URIs to fit terminal width tag = f" {A.CYN}{i+1}.{A.RST} " max_uri = cols - 8 display = uri if len(uri) <= max_uri else uri[:max_uri - 3] + "..." lines.append(draw_box_line(f"{tag}{A.GRN}{display}{A.RST}", cols)) if len(r.uris) > max_show: lines.append(draw_box_line(f" {A.DIM}...and {len(r.uris) - max_show} more{A.RST}", cols)) else: lines.append(draw_box_line(f" {A.DIM}No VLESS/VMess URIs stored for this IP{A.RST}", cols)) lines.append(draw_box_line(f" {A.DIM}(only available when loaded from URIs or subscriptions){A.RST}", cols)) lines.append(draw_box_sep(cols)) lines.append(draw_box_line(f" {A.DIM}Press any key to go back{A.RST}", cols)) lines.append(draw_box_bottom(cols)) _w("\n".join(lines) + "\n") _fl() _wait_any_key() _w(A.CLR) def draw_help_popup(self): """Show keybinding help + column explanations overlay.""" _w(A.CLR) cols, rows = term_size() W = min(64, cols - 4) lines = [] lines.append(f" {A.CYN}{'=' * W}{A.RST}") lines.append(f" {A.BOLD}{A.WHT} Keyboard Shortcuts{A.RST}") lines.append(f" {A.CYN}{'-' * W}{A.RST}") help_items = [ ("S", "Cycle sort order: score / latency / speed"), ("E", "Export results (CSV + top N configs)"), ("A", "Export ALL configs sorted best to worst"), ("C", "View VLESS/VMess URIs for an IP (enter rank #)"), ("D", "View domains for an IP (enter rank #)"), ("J / K", "Scroll down / up one row"), ("N / P", "Page down / up"), ("B", "Back to main menu (new scan)"), ("H", "Show this help screen"), ("Q", "Quit (results auto-saved on exit)"), ] for key, desc in help_items: lines.append(f" {A.CYN}{key:<10}{A.RST} {desc}") lines.append("") lines.append(f" {A.CYN}{'=' * W}{A.RST}") lines.append(f" {A.BOLD}{A.WHT} Column Guide{A.RST}") lines.append(f" {A.CYN}{'-' * W}{A.RST}") col_items = [ ("#", "Rank (sorted by current sort order)"), ("IP", "Cloudflare edge IP address"), ("Dom", "How many domains share this IP"), ("Ping", "TCP connect time in ms (like ping)"), ("Conn", "Full connection time in ms (TCP + TLS handshake)"), ("R1,R2..", "Download speed per round (MB/s or KB/s)"), ("Colo", "CF datacenter code (e.g. FRA, IAH, MRS)"), ("Score", "Combined score (0-100, higher = better)"), ] for key, desc in col_items: lines.append(f" {A.CYN}{key:<10}{A.RST} {desc}") lines.append("") lines.append(f" {A.DIM}Score = Conn latency (35%) + speed (50%) + TTFB (15%){A.RST}") lines.append(f" {A.DIM}'-' means not tested yet (only top IPs get speed tested){A.RST}") lines.append(f" {A.CYN}{'=' * W}{A.RST}") lines.append(f" {A.BOLD}{A.WHT} Made By Sam - SamNet Technologies{A.RST}") lines.append(f" {A.DIM} https://github.com/SamNet-dev/cfray{A.RST}") lines.append(f" {A.CYN}{'=' * W}{A.RST}") lines.append(f" {A.DIM}Press any key to go back{A.RST}") _w("\n".join(lines) + "\n") _fl() _wait_any_key() _w(A.CLR) # clear before dashboard redraws def handle(self, key: str) -> Optional[str]: sorts = ["score", "latency", "speed"] if key == "s": idx = sorts.index(self.sort) if self.sort in sorts else 0 self.sort = sorts[(idx + 1) % len(sorts)] elif key in ("j", "down"): self.offset = min(self.offset + 1, max(0, len(sorted_all(self.st, self.sort)) - 3)) elif key in ("k", "up"): self.offset = max(0, self.offset - 1) elif key == "n": # page down _, rows = term_size() page = max(3, rows - 18 - len(self.st.rounds)) self.offset = min(self.offset + page, max(0, len(sorted_all(self.st, self.sort)) - 3)) elif key == "p": # page up _, rows = term_size() page = max(3, rows - 18 - len(self.st.rounds)) self.offset = max(0, self.offset - page) elif key == "e": return "export" elif key == "a": return "export-all" elif key == "c": return "configs" elif key == "d": return "domains" elif key == "h": return "help" elif key == "b": return "back" elif key in ("q", "ctrl-c"): return "quit" return None def save_csv(st: State, path: str, sort_by: str = "score"): results = sorted_alive(st, sort_by) with open(path, "w", newline="", encoding="utf-8") as f: w = csv.writer(f) hdr = ["Rank", "IP", "Domains", "Domain_Count", "Ping_ms", "Conn_ms", "TTFB_ms"] for i, rc in enumerate(st.rounds): hdr.append(f"R{i + 1}_{rc.label}_MBps") hdr += ["Best_MBps", "Colo", "Score", "Error"] w.writerow(hdr) for rank, r in enumerate(results, 1): row = [ rank, r.ip, "|".join(r.domains[:5]), len(r.domains), f"{r.tcp_ms:.1f}" if r.tcp_ms > 0 else "", f"{r.tls_ms:.1f}" if r.tls_ms > 0 else "", f"{r.ttfb_ms:.1f}" if r.ttfb_ms > 0 else "", ] for i in range(len(st.rounds)): row.append( f"{r.speeds[i]:.3f}" if i < len(r.speeds) and r.speeds[i] > 0 else "" ) row += [ f"{r.best_mbps:.3f}" if r.best_mbps > 0 else "", r.colo, f"{r.score:.1f}", r.error, ] w.writerow(row) def save_configs(st: State, path: str, top: int = 50, sort_by: str = "score"): """Save top configs. Use top=0 for ALL configs sorted best to worst.""" results = sorted_alive(st, sort_by) has_uris = any(r.uris for r in results) limit = top if top > 0 else len(results) with open(path, "w", encoding="utf-8") as f: n = 0 for r in results: if n >= limit: break if has_uris: for uri in r.uris: f.write(uri + "\n") n += 1 if n >= limit: break else: # JSON input: write IP and domains as a reference list doms = ", ".join(r.domains[:3]) extra = f" (+{len(r.domains) - 3} more)" if len(r.domains) > 3 else "" f.write(f"{r.ip} # score={r.score:.1f} domains={doms}{extra}\n") n += 1 def save_all_configs_sorted(st: State, path: str, sort_by: str = "score"): """Save ALL raw configs (every URI) sorted by their IP's score, best to worst.""" results = sorted_alive(st, sort_by) dead = [r for r in st.res.values() if not r.alive] has_uris = any(r.uris for r in results) with open(path, "w", encoding="utf-8") as f: for r in results: if has_uris: for uri in r.uris: f.write(uri + "\n") else: doms = ", ".join(r.domains[:3]) extra = f" (+{len(r.domains) - 3} more)" if len(r.domains) > 3 else "" f.write(f"{r.ip} # score={r.score:.1f} domains={doms}{extra}\n") for r in dead: if has_uris: for uri in r.uris: f.write(uri + "\n") else: doms = ", ".join(r.domains[:3]) f.write(f"{r.ip} # DEAD domains={doms}\n") RESULTS_DIR = "results" def _results_path(filename: str) -> str: """Return path inside the results/ directory, creating it if needed.""" os.makedirs(RESULTS_DIR, exist_ok=True) return os.path.join(RESULTS_DIR, filename) def do_export( st: State, base_path: str, sort_by: str = "score", top: int = 50, output_csv: str = "", output_configs: str = "", ) -> Tuple[str, str, str]: stem = os.path.basename(base_path).rsplit(".", 1)[0] if base_path else "scan" csv_path = output_csv if output_csv else _results_path(stem + "_results.csv") if output_configs: cfg_path = output_configs elif top <= 0: cfg_path = _results_path(stem + "_all_sorted.txt") else: cfg_path = _results_path(stem + f"_top{top}.txt") full_path = _results_path(stem + "_full_sorted.txt") save_csv(st, csv_path, sort_by) save_configs(st, cfg_path, top, sort_by) save_all_configs_sorted(st, full_path, sort_by) st.saved = True return csv_path, cfg_path, full_path async def _refresh_loop(dash: Dashboard, st: State): while not st.finished: try: dash.draw() except Exception: pass await asyncio.sleep(0.3) async def run_scan(st: State, workers: int, speed_workers: int, timeout: float, speed_timeout: float): """Run the scan phases with dynamic round sizing.""" try: os.makedirs("results", exist_ok=True) with open(DEBUG_LOG, "w") as f: f.write(f"=== Scan started {time.strftime('%Y-%m-%d %H:%M:%S')} mode={st.mode} ===\n") except Exception: pass st.start_time = time.monotonic() if not st.interrupted: await phase1(st, workers, timeout) if st.interrupted or st.alive_n == 0: st.finished = True calc_scores(st) return preset = PRESETS.get(st.mode, PRESETS["normal"]) alive = sorted( (ip for ip, r in st.res.items() if r.alive), key=lambda ip: st.res[ip].tls_ms, ) cut_pct = preset.get("latency_cut", 0) if cut_pct > 0 and len(alive) > 50: cut_n = max(1, int(len(alive) * cut_pct / 100)) alive = alive[:-cut_n] st.latency_cut_n = cut_n _dbg(f"=== Latency cut: removed bottom {cut_pct}% = {cut_n} IPs, {len(alive)} remaining ===") if not st.rounds: st.rounds = build_dynamic_rounds(st.mode, len(alive)) _dbg(f"=== Dynamic rounds: {[(r.label, r.keep) for r in st.rounds]} ===") if not st.interrupted and st.rounds: rlim = CFRateLimiter() cands = list(alive) cdn_host = SPEED_HOST cdn_path = "" # _dl_one uses default for i, rc in enumerate(st.rounds): if st.interrupted: break st.cur_round = i + 1 st.phase = f"speed_r{i + 1}" actual_count = min(rc.keep, len(cands)) st.phase_label = f"Speed R{i + 1} ({rc.label} x {actual_count})" _dbg(f"=== Round R{i+1}: {rc.size}B x {actual_count} IPs, workers={speed_workers}, timeout={speed_timeout}s, budget={rlim.BUDGET - rlim.count} left ===") if i > 0: calc_scores(st) cands = sorted(cands, key=lambda ip: st.res[ip].score, reverse=True) cands = cands[: rc.keep] await phase2_round( st, rc, cands, speed_workers, speed_timeout, rlim=rlim, cdn_host=cdn_host, cdn_path=cdn_path, ) calc_scores(st) st.finished = True calc_scores(st) async def run_tui(args): """TUI mode: interactive startup + dashboard.""" enable_ansi() # Determine initial input source from CLI args input_method = None # "file", "sub", or "template" input_value = None if getattr(args, "sub", None): input_method, input_value = "sub", args.sub elif getattr(args, "template", None): if getattr(args, "input", None): input_method, input_value = "template", f"{args.template}|||{args.input}" else: print("Error: --template requires -i (address list file)") return elif getattr(args, "find_clean", False): input_method, input_value = "find_clean", "" elif getattr(args, "input", None): input_method, input_value = "file", args.input while True: # outer loop: back returns here interactive = input_method is None while True: if input_method is None: pick = tui_pick_file() if not pick: _w(A.SHOW) return input_method, input_value = pick if input_method == "find_clean": result = await tui_run_clean_finder() if result is None: _w(A.SHOW) return if result[0] == "__back__": input_method = None input_value = None continue input_method, input_value = result mode = args.mode if not getattr(args, "_mode_set", False) and interactive: picked = tui_pick_mode() if not picked: _w(A.SHOW) return if picked == "__back__": input_method = None input_value = None continue mode = picked break st = State() st.mode = mode st.top = args.top if args.rounds: st.rounds = parse_rounds_str(args.rounds) elif args.skip_download: st.rounds = [] # Determine display label for loading screen if input_method == "sub": load_label = input_value.split("/")[-1][:40] or "subscription" elif input_method == "template": parts = input_value.split("|||", 1) load_label = os.path.basename(parts[1]) if len(parts) > 1 else "template" else: load_label = os.path.basename(input_value) _w(A.CLR + A.HOME) cols, _ = term_size() lines = draw_menu_header(cols) lines.append(draw_box_line(f" {A.BOLD}Starting scan...{A.RST}", cols)) lines.append(draw_box_line("", cols)) lines.append(draw_box_line(f" {A.CYN}>{A.RST} Loading {load_label}...", cols)) lines.append(draw_box_bottom(cols)) _w("\n".join(lines) + "\n") _fl() # Load configs based on input method if input_method == "sub": st.configs = fetch_sub(input_value) st.input_file = input_value elif input_method == "template": tpl_uri, addr_path = input_value.split("|||", 1) addrs = load_addresses(addr_path) st.configs = generate_from_template(tpl_uri, addrs) st.input_file = f"{addr_path} ({len(addrs)} addresses)" else: st.configs = load_input(input_value) st.input_file = input_value if not st.configs: _w(A.SHOW) print(f"No configs found in {st.input_file}") return _w(A.CLR + A.HOME) lines = draw_menu_header(cols) lines.append(draw_box_line(f" {A.BOLD}Starting scan...{A.RST}", cols)) lines.append(draw_box_line("", cols)) lines.append(draw_box_line(f" {A.GRN}OK{A.RST} Loaded {len(st.configs)} configs", cols)) lines.append(draw_box_line("", cols)) _w("\n".join(lines) + "\n") _fl() st.phase = "dns" st.phase_label = "Resolving DNS" try: await resolve_all(st) except Exception as e: _w(A.SHOW + "\n") print(f"DNS resolution error: {e}") return if not st.ips: _w(A.SHOW + "\n") print("No IPs resolved — check network or config addresses.") return dash = Dashboard(st) refresh = asyncio.create_task(_refresh_loop(dash, st)) scan_task = asyncio.ensure_future( run_scan(st, args.workers, args.speed_workers, args.timeout, args.speed_timeout) ) old_sigint = signal.getsignal(signal.SIGINT) def _sig(sig, frame): st.interrupted = True st.finished = True scan_task.cancel() signal.signal(signal.SIGINT, _sig) try: await scan_task except asyncio.CancelledError: st.interrupted = True st.finished = True calc_scores(st) # Restore original SIGINT so Ctrl+C works in post-scan loop signal.signal(signal.SIGINT, old_sigint) if refresh: refresh.cancel() try: await refresh except asyncio.CancelledError: pass try: csv_p, cfg_p, full_p = do_export(st, input_value, dash.sort, st.top) st.notify = f"Saved to results/ folder" except Exception as e: csv_p = cfg_p = full_p = "" st.notify = f"Export error: {e}" st.notify_until = time.monotonic() + 5 dash.draw() go_back = False try: while True: key = _read_key_nb(0.1) if key is None: # refresh notification timeout if st.notify and time.monotonic() >= st.notify_until: st.notify = "" dash.draw() continue act = dash.handle(key) if act == "quit": break elif act == "back": # show save summary and go to main menu _w(A.CLR) save_lines = [ f" {A.CYN}{'=' * 50}{A.RST}", f" {A.BOLD}{A.WHT} Results saved:{A.RST}", f" {A.CYN}{'-' * 50}{A.RST}", f" {A.GRN}CSV:{A.RST} {csv_p}", f" {A.GRN}Top:{A.RST} {cfg_p}", f" {A.GRN}Full:{A.RST} {full_p}", f" {A.CYN}{'=' * 50}{A.RST}", "", f" {A.DIM}Press any key to go to main menu...{A.RST}", ] _w("\n".join(save_lines) + "\n") _fl() _wait_any_key() go_back = True break elif act == "export": try: csv_p, cfg_p, full_p = do_export(st, input_value, dash.sort, st.top) st.notify = f"Exported to results/ folder" except Exception as e: st.notify = f"Export error: {e}" st.notify_until = time.monotonic() + 4 elif act == "export-all": try: csv_p, cfg_p, full_p = do_export(st, input_value, dash.sort, 0) st.notify = f"Exported ALL to results/ folder" except Exception as e: st.notify = f"Export error: {e}" st.notify_until = time.monotonic() + 4 elif act == "configs": results = sorted_all(st, dash.sort) if results: n = _prompt_number(f"{A.CYN}Enter rank # to view configs (1-{len(results)}):{A.RST} ", len(results)) if n is not None: dash.draw_config_popup(results[n - 1]) elif act == "domains": results = sorted_all(st, dash.sort) if results: n = _prompt_number(f"{A.CYN}Enter rank # to view domains (1-{len(results)}):{A.RST} ", len(results)) if n is not None: dash.draw_domain_popup(results[n - 1]) elif act == "help": dash.draw_help_popup() dash.draw() except (KeyboardInterrupt, EOFError, OSError): pass if go_back: # reset for next run — clear CLI input so file picker shows args.input = None args.sub = None args.template = None args._mode_set = False input_method = None input_value = None continue _w(A.SHOW + "\n") _fl() print(f"Results saved to {RESULTS_DIR}/ folder") break async def run_headless(args): """Headless mode (--no-tui).""" st = State() st.input_file = args.input st.mode = args.mode if args.rounds: st.rounds = parse_rounds_str(args.rounds) elif args.skip_download: st.rounds = [] print(f"CF Config Scanner v{VERSION}") st.configs, src = load_configs_from_args(args) print(f"Loading: {src}") print(f"Loaded {len(st.configs)} configs") if not st.configs: return print("Resolving DNS...") await resolve_all(st) print(f" {len(st.ips)} unique IPs") if not st.ips: return scan_task = asyncio.ensure_future( run_scan(st, args.workers, args.speed_workers, args.timeout, args.speed_timeout) ) old_sigint = signal.getsignal(signal.SIGINT) def _sig(sig, frame): st.interrupted = True st.finished = True scan_task.cancel() signal.signal(signal.SIGINT, _sig) try: await scan_task except asyncio.CancelledError: st.interrupted = True st.finished = True calc_scores(st) print("\n Interrupted! Exporting partial results...") signal.signal(signal.SIGINT, old_sigint) results = sorted_alive(st, "score") elapsed = _fmt_elapsed(time.monotonic() - st.start_time) print(f"\nDone in {elapsed}. {st.alive_n} alive IPs.\n") print(f"{'=' * 95}") hdr = f"{'#':>4} {'IP':<16} {'Dom':>4} {'Ping ms':>7} {'Conn ms':>7}" for i in range(len(st.rounds)): hdr += f" {'R' + str(i + 1) + ' MB/s':>9}" hdr += f" {'Colo':>5} {'Score':>6}" print(hdr) print("=" * 95) for rank, r in enumerate(results[:50], 1): tcp = f"{r.tcp_ms:7.1f}" if r.tcp_ms > 0 else " -" tls = f"{r.tls_ms:7.1f}" if r.tls_ms > 0 else " -" row = f"{rank:>4} {r.ip:<16} {len(r.domains):>4} {tcp} {tls}" for j in range(len(st.rounds)): if j < len(r.speeds) and r.speeds[j] > 0: row += f" {r.speeds[j]:>9.2f}" else: row += " -" cl = f"{r.colo:>5}" if r.colo else " -" sc = f"{r.score:>6.1f}" if r.score > 0 else " -" row += f" {cl} {sc}" print(row) try: csv_p, cfg_p, full_p = do_export( st, args.input or "scan", top=args.top, output_csv=getattr(args, "output", "") or "", output_configs=getattr(args, "output_configs", "") or "", ) print(f"\nResults saved:") print(f" CSV: {csv_p}") print(f" Configs: {cfg_p}") print(f" Full: {full_p}") except Exception as e: print(f"\nError saving results: {e}") async def run_headless_clean(args): """Headless clean IP finder (--find-clean --no-tui).""" scan_cfg = CLEAN_MODES.get(getattr(args, "clean_mode", "normal"), CLEAN_MODES["normal"]) subnets = CF_SUBNETS if getattr(args, "subnets", None): if os.path.isfile(args.subnets): with open(args.subnets, encoding="utf-8") as f: subnets = [ln.strip() for ln in f if ln.strip() and not ln.startswith("#")] else: subnets = [s.strip() for s in args.subnets.split(",") if s.strip()] ports = scan_cfg.get("ports", [443]) print(f"CF Config Scanner v{VERSION} — Clean IP Finder") print(f"Ranges: {len(subnets)} | Sample: {scan_cfg['sample'] or 'all'} | Workers: {scan_cfg['workers']} | Ports: {', '.join(str(p) for p in ports)}") ips = generate_cf_ips(subnets, scan_cfg["sample"]) total_probes = len(ips) * len(ports) print(f"Scanning {len(ips):,} IPs × {len(ports)} port(s) = {total_probes:,} probes...") cs = CleanScanState() start = time.monotonic() scan_task = asyncio.ensure_future( scan_clean_ips( ips, workers=scan_cfg["workers"], timeout=3.0, validate=scan_cfg["validate"], cs=cs, ports=ports, ) ) old_sigint = signal.getsignal(signal.SIGINT) def _sig(sig, frame): cs.interrupted = True scan_task.cancel() signal.signal(signal.SIGINT, _sig) last_pct = -1 try: while not scan_task.done(): pct = cs.done * 100 // max(1, cs.total) if pct != last_pct and pct % 5 == 0: print(f" {pct}% ({cs.done:,}/{cs.total:,}) found {cs.found:,} clean") last_pct = pct await asyncio.sleep(1) except (asyncio.CancelledError, Exception): pass finally: signal.signal(signal.SIGINT, old_sigint) try: results = await scan_task except (asyncio.CancelledError, Exception): results = sorted(cs.all_results or cs.results, key=lambda x: x[1]) elapsed = _fmt_elapsed(time.monotonic() - start) print(f"\nDone in {elapsed}. Found {len(results):,} clean IPs.\n") print(f"{'='*50}") print(f"{'#':>4} {'Address':<22} {'Latency':>8}") print(f"{'='*50}") for i, (ip, lat) in enumerate(results[:30]): print(f"{i+1:>4} {ip:<22} {lat:>6.0f}ms") if len(results) > 30: print(f" ...and {len(results)-30:,} more") if results: try: os.makedirs(RESULTS_DIR, exist_ok=True) path = os.path.abspath(_results_path("clean_ips.txt")) with open(path, "w", encoding="utf-8") as f: for ip, lat in results: f.write(f"{ip}\n") print(f"\nSaved {len(results):,} IPs to {path}") except Exception as e: print(f"\nSave error: {e}") path = "" else: print("\nNo clean IPs found. Nothing saved.") path = "" # If --template also given, proceed to speed test if getattr(args, "template", None) and results: print(f"\nContinuing to speed test with template...") addrs = [ip for ip, _ in results] configs = generate_from_template(args.template, addrs) if configs: args.input = path st = State() st.input_file = f"clean ({len(results)} IPs)" st.mode = args.mode st.configs = configs if args.rounds: st.rounds = parse_rounds_str(args.rounds) elif args.skip_download: st.rounds = [] print(f"Generated {len(configs)} configs") print("Resolving DNS...") await resolve_all(st) print(f" {len(st.ips)} unique IPs") if st.ips: start2 = time.monotonic() scan2 = asyncio.ensure_future( run_scan(st, args.workers, args.speed_workers, args.timeout, args.speed_timeout) ) old2 = signal.getsignal(signal.SIGINT) def _sig2(sig, frame): st.interrupted = True st.finished = True scan2.cancel() signal.signal(signal.SIGINT, _sig2) try: await scan2 except asyncio.CancelledError: st.interrupted = True st.finished = True calc_scores(st) signal.signal(signal.SIGINT, old2) alive_results = sorted_alive(st, "score") elapsed2 = _fmt_elapsed(time.monotonic() - start2) print(f"\nSpeed test done in {elapsed2}. {st.alive_n} alive.") print(f"{'='*80}") for rank, r in enumerate(alive_results[:20], 1): spd = f"{r.best_mbps:.2f}" if r.best_mbps > 0 else " -" lat_s = f"{r.tls_ms:.0f}" if r.tls_ms > 0 else " -" print(f"{rank:>3} {r.ip:<16} {lat_s:>6}ms {spd:>8} MB/s score={r.score:.1f}") try: csv_p, cfg_p, full_p = do_export(st, path, top=args.top) print(f"\nSaved: {csv_p} | {cfg_p} | {full_p}") except Exception as e: print(f"Export error: {e}") def main(): if hasattr(sys.stdout, "reconfigure"): try: sys.stdout.reconfigure(encoding="utf-8", errors="replace") except Exception: pass p = argparse.ArgumentParser( description="CF Config Scanner - test VLESS configs for latency + download speed", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""Run with no arguments for interactive TUI. Modes (sort by latency first, then speed-test the best): quick Cut 50%% latency, 1MB x100 -> 5MB x20 (~200 MB, ~2-3 min) normal Cut 40%% latency, 1MB x200 -> 5MB x50 -> 20MB x20 (~850 MB, ~5-10 min) thorough Cut 15%% latency, 5MB xALL -> 25MB x150 -> 100MB x50 (~8-15 GB, ~30-60 min) Examples: %(prog)s Interactive TUI %(prog)s -i configs.txt TUI with file %(prog)s --sub https://example.com/sub.txt Fetch from subscription URL %(prog)s --template "vless://UUID@{ip}:443?..." -i addrs.json Generate from template %(prog)s -i configs.txt --mode quick Quick scan %(prog)s -i configs.txt --top 0 Export ALL sorted %(prog)s -i configs.txt --no-tui -o results.csv Headless %(prog)s --find-clean --no-tui Find clean CF IPs (headless) %(prog)s --find-clean --no-tui --template "vless://..." Find + speed test """, ) p.add_argument("-i", "--input", help="Input file (VLESS URIs or domains.json)") p.add_argument("--sub", help="Subscription URL (fetches VLESS URIs from URL)") p.add_argument("--template", help="Base VLESS URI template (use with -i address list)") p.add_argument("-m", "--mode", choices=["quick", "normal", "thorough"], default="normal") p.add_argument("--rounds", help='Custom rounds, e.g. "1MB:200,5MB:50,20MB:20"') p.add_argument("-w", "--workers", type=int, default=LATENCY_WORKERS, help="Latency workers") p.add_argument("--speed-workers", type=int, default=SPEED_WORKERS, help="Download workers") p.add_argument("--timeout", type=float, default=LATENCY_TIMEOUT, help="Latency timeout (s)") p.add_argument("--speed-timeout", type=float, default=SPEED_TIMEOUT, help="Download timeout (s)") p.add_argument("--skip-download", action="store_true", help="Latency only") p.add_argument("--top", type=int, default=50, help="Export top N configs (0 = ALL sorted best to worst)") p.add_argument("--no-tui", action="store_true", help="Plain text output") p.add_argument("-o", "--output", help="CSV output path (headless)") p.add_argument("--output-configs", help="Save top VLESS URIs (headless)") p.add_argument("--find-clean", action="store_true", help="Find clean Cloudflare IPs") p.add_argument("--clean-mode", choices=["quick", "normal", "full", "mega"], default="normal", help="Clean IP scan scope (quick=~4K, normal=~12K, full=~1.5M, mega=~3M multi-port)") p.add_argument("--subnets", help="Custom subnets file or comma-separated CIDRs") args = p.parse_args() args._mode_set = any(a == "-m" or a.startswith("--mode") for a in sys.argv) try: if getattr(args, "find_clean", False) and args.no_tui: asyncio.run(run_headless_clean(args)) elif args.no_tui: if not args.input and not args.sub and not args.template: p.error("--input, --sub, or --template is required in --no-tui mode") asyncio.run(run_headless(args)) else: asyncio.run(run_tui(args)) except KeyboardInterrupt: pass finally: _w(A.SHOW + "\n") _fl() if __name__ == "__main__": main()