import asyncio import aiohttp import aiodns import json import argparse import logging import textwrap import sys from rich.console import Console from rich.table import Table from rich.progress import Progress, BarColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s') console = Console(force_terminal=True) class TakeoverScanner: def __init__(self, fingerprints_file, concurrency, timeout, output_file=None, debug=False): self.fingerprints_data = self._load_json(fingerprints_file) if not self.fingerprints_data: raise ValueError(f"Fingerprint file '{fingerprints_file}' could not be loaded or is empty.") self.concurrency = concurrency self.timeout = aiohttp.ClientTimeout(total=timeout) self.output_file = output_file self.debug = debug self.results = [] self.semaphore = asyncio.Semaphore(concurrency) self.resolver = None def _load_json(self, file_path): try: with open(file_path, 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError) as e: raise ValueError(f"Error loading file {file_path}: {e}") async def _get_dns_records(self, domain): try: cname_res = await self.resolver.query(domain, 'CNAME') return "cname", cname_res.cname except aiodns.error.DNSError: try: a_res = await self.resolver.query(domain, 'A') return "a", [r.host for r in a_res] except aiodns.error.DNSError: return None, None async def _check_nxdomain_on_cname(self, domain): try: await self.resolver.query(domain, 'A') return False except aiodns.error.DNSError as e: return e.args[0] == aiodns.error.ARES_ENOTFOUND async def _fetch_http_responses(self, session, domain): urls = [f"http://{domain}", f"https://{domain}"] content = "" headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'} for url in urls: try: async with session.get(url, timeout=self.timeout, headers=headers, ssl=False) as response: content += await response.text(encoding='utf-8', errors='ignore') except Exception: continue return content async def check_domain(self, domain, http_session, progress): async with self.semaphore: record_type, record_value = await self._get_dns_records(domain) if not record_type: if self.debug: progress.print(f"[dim]Debug | Domain: {domain:<45} | INFO: No CNAME or A record found.[/dim]") return if self.debug: progress.print(f"[dim]Debug | Domain: {domain:<45} | Type: {record_type.upper():<5} | Value: {record_value}[/dim]") if record_type == 'cname': for service_info in self.fingerprints_data: cname_patterns = service_info.get("cname", []) if any(pattern in record_value for pattern in cname_patterns): if self.debug: progress.print(f" [yellow]Debug | ✓ CNAME Match:[/yellow] {domain} matches the [bold]{service_info['service']}[/bold] service.") if service_info.get("nxdomain"): if await self._check_nxdomain_on_cname(domain): vulnerability = {"domain": domain, "service": service_info["service"], "type": "Dangling CNAME (NXDOMAIN)", "record": record_value, "details": "CNAME points to non-existent domain."} self.results.append(vulnerability) progress.print(f"[bold red]VULN (NXDOMAIN):[/bold red] [cyan]{domain}[/cyan] -> [green]{service_info['service']}[/green]") return else: http_content = await self._fetch_http_responses(http_session, domain) if http_content: for fingerprint in service_info.get("fingerprint", []): if fingerprint and fingerprint.lower() in http_content.lower(): vulnerability = {"domain": domain, "service": service_info["service"], "type": "HTTP Fingerprint", "record": record_value, "details": f"Matched: '{fingerprint}'"} self.results.append(vulnerability) progress.print(f"[bold red]VULN (HTTP):[/bold red] [cyan]{domain}[/cyan] -> [green]{service_info['service']}[/green]") return elif record_type == 'a': http_content = await self._fetch_http_responses(http_session, domain) if not http_content: return for service_info in self.fingerprints_data: if service_info.get("nxdomain") is False: for fingerprint in service_info.get("fingerprint", []): if fingerprint and fingerprint.lower() in http_content.lower(): if self.debug: progress.print(f" [yellow]Debug | ✓ A-Record HTTP Match:[/yellow] Content of {domain} matches the [bold]{service_info['service']}[/bold] fingerprint.") vulnerability = {"domain": domain, "service": service_info["service"], "type": "A Record Fingerprint", "record": record_value, "details": f"Matched: '{fingerprint}'"} self.results.append(vulnerability) progress.print(f"[bold red]VULN (A-Record):[/bold red] [cyan]{domain}[/cyan] -> [green]{service_info['service']}[/green]") return async def scan(self, domains): self.resolver = aiodns.DNSResolver() progress_columns = [TextColumn("[bold cyan]{task.description}"), BarColumn(bar_width=None), TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), TextColumn("•"), TextColumn("{task.completed} of {task.total}"), TextColumn("•"), TimeRemainingColumn()] conn = aiohttp.TCPConnector(ssl=False) async with aiohttp.ClientSession(connector=conn) as http_session: with Progress(*progress_columns, console=console) as progress: if self.debug: console.print("[bold yellow]Debug Mode Active. Output will be more detailed.[/bold yellow]") console.print(f"[bold green]Scanning {len(domains)} subdomains with a concurrency of {self.concurrency}...[/bold green]") main_task_id = progress.add_task("Checking Subdomains", total=len(domains)) tasks = [self.check_domain(domain, http_session, progress) for domain in domains] for future in asyncio.as_completed(tasks): try: await future except Exception as e: if self.debug: progress.print(f"[red]Error in task: {e}[/red]") finally: progress.update(main_task_id, advance=1) self._display_results() def _display_results(self): console.print() if not self.results: console.print("[bold yellow]Scan complete. No vulnerabilities found.[/bold yellow]"); return table = Table(title="[bold magenta]Potential Subdomain Takeover Summary[/bold magenta]", show_header=True, header_style="bold blue") table.add_column("Domain", style="cyan", no_wrap=True) table.add_column("Identified Service", style="green") table.add_column("Vulnerability Type", style="yellow") table.add_column("Record Value", style="white") table.add_column("Details", style="red") for res in self.results: table.add_row(res["domain"], res["service"], res["type"], str(res["record"]), res["details"]) console.print(table) if self.output_file: try: with open(self.output_file, 'w') as f: json.dump(self.results, f, indent=4) console.print(f"[bold green]Results also saved to: {self.output_file}[/bold green]") except IOError as e: console.print(f"[bold red]Failed to save results to file: {e}[/bold red]") def main(): banner_raw = r""" [bold cyan] _________ ____________ _________________________ / _____/ \_____ \ \ / /\_ _____/\______ \ \_____ \ ______ / | \ Y / | __)_ | _/ / \ /_____/ / | \ / | \ | | \ /_______ / \_______ /\___/ /_______ / |____|_ / \/ \/ \/ \/ [/bold cyan] [bold green] The Ultimate Subdomain Takeover Scanner - HaxorSec@2025 t.me/ntKiL22[/bold green] """ banner = textwrap.dedent(banner_raw) console.print(banner) parser = argparse.ArgumentParser(description="Ultimate Subdomain Takeover Scanner.") parser.add_argument("-f", "--file", help="File containing a list of subdomains. If not provided, reads from stdin.") parser.add_argument("-p", "--fingerprints", default="fingerprints.json", help="JSON file containing fingerprints.") parser.add_argument("-c", "--concurrency", type=int, default=100, help="Number of concurrent checks.") parser.add_argument("-t", "--timeout", type=int, default=10, help="Timeout for HTTP requests.") parser.add_argument("-o", "--output", help="Save the results in JSON format to this file.") parser.add_argument("--debug", action="store_true", help="Enable debug output.") args = parser.parse_args() domains = [] if args.file: try: with open(args.file, 'r') as f: domains = [line.strip() for line in f if line.strip()] except FileNotFoundError: console.print(f"[bold red]Error: File '{args.file}' not found.[/bold red]") sys.exit(1) elif not sys.stdin.isatty(): domains = [line.strip() for line in sys.stdin if line.strip()] if not domains: console.print("[bold red]Error: No domain input. Use -f or pipe from another command.[/bold red]") parser.print_help() sys.exit(1) try: scanner = TakeoverScanner( fingerprints_file=args.fingerprints, concurrency=args.concurrency, timeout=args.timeout, output_file=args.output, debug=args.debug ) asyncio.run(scanner.scan(domains)) except ValueError as e: console.print(f"[bold red]Error: {e}[/bold red]") except KeyboardInterrupt: console.print("\n[bold yellow]Scan stopped by user.[/bold yellow]") if __name__ == "__main__": main()