DNS Guardian

 


import os

import subprocess

import json

import time

import threading

import socket


from flask import Flask, request


try:

    from telegram import Update, Bot

    from telegram.ext import Updater, CommandHandler, CallbackContext

    TELEGRAM_AVAILABLE = True

except ImportError:

    TELEGRAM_AVAILABLE = False


try:

    import dns.resolver

except ImportError:

    subprocess.run(["pip", "install", "dnspython"])

    import dns.resolver


try:

    from flask import Flask, request

except ImportError:

    subprocess.run(["pip", "install", "flask"])

    from flask import Flask, request


# Configuration

BASE_DIR = os.path.expanduser("~/dnsfixer")

DOWNLOAD_DIR = os.path.join(BASE_DIR, "downloads")

LOG_FILE = os.path.join(BASE_DIR, "fixer_log.txt")

os.makedirs(DOWNLOAD_DIR, exist_ok=True)


APK_LIST = {

    "Nebulo": "https://github.com/ch4t4r/Nebulo/releases/download/v1.0.5/Nebulo_v1.0.5.apk",

    "Intra": "https://github.com/Jigsaw-Code/Intra/releases/download/v1.3.6/intra-release.apk"

}

SAFE_DNS = ["1.1.1.1", "8.8.8.8"]

TEST_DOMAINS = ["google.com", "facebook.com", "cloudflare.com"]


# Optional Telegram Credentials

TELEGRAM_TOKEN = os.getenv("TG_TOKEN", "")

TELEGRAM_USER_ID = os.getenv("TG_USER_ID", "")  # example: '123456789'


# === Utilities ===


def log(msg):

    timestamp = time.strftime("[%Y-%m-%d %H:%M:%S]")

    with open(LOG_FILE, "a") as f:

        f.write(f"{timestamp} {msg}\n")

    print(msg)


def check_termux_api():

    log("[*] Verifying Termux:API...")

    try:

        result = subprocess.run(

            ["termux-battery-status"],

            stdout=subprocess.DEVNULL,

            stderr=subprocess.DEVNULL,

            timeout=3

        )

        log("[+] Termux API is working.")

        return True

    except subprocess.TimeoutExpired:

        log("[!] Termux API call timed out. Likely missing permissions.")

        return False

    except FileNotFoundError:

        log("[!] Termux API not found. Run: pkg install termux-api")

        return False

    except Exception as e:

        log(f"[!] Unknown error during Termux API check: {e}")

        return False


def check_overlay_permission():

    result = subprocess.run(["termux-settings", "get", "draw_overlay"], capture_output=True, text=True)

    if "true" not in result.stdout.strip():

        log("[!] Display over other apps permission not granted.")

        log("Run: termux-setup-storage && termux-open-url 'package:com.termux'")

        return False

    return True


def resolve_dns(domain, dns=None):

    try:

        resolver = dns.resolver.Resolver()

        if dns:

            resolver.nameservers = [dns]

        answer = resolver.resolve(domain)

        return str(answer[0])

    except Exception as e:

        return f"ERR: {e}"


def check_dns_hijack():

    hijacked = False

    for domain in TEST_DOMAINS:

        sys_ip = resolve_dns(domain)

        pub_ip = resolve_dns(domain, SAFE_DNS[0])

        log(f"{domain}: System = {sys_ip}, Public = {pub_ip}")

        if "ERR" not in sys_ip and sys_ip != pub_ip:

            hijacked = True

            log(f"[!] DNS Hijack Suspected for {domain}")

    return hijacked


def download_and_prompt_apks():

    for name, url in APK_LIST.items():

        path = os.path.join(DOWNLOAD_DIR, f"{name}.apk")

        if not os.path.exists(path):

            subprocess.run(["wget", "-q", "-O", path, url])

            log(f"[+] Downloaded {name}")

        subprocess.run([

            "am", "start", "-a", "android.intent.action.VIEW",

            "-d", f"file://{path}", "-t", "application/vnd.android.package-archive"

        ])


# === Flask App (SMS/HTTP) ===


app = Flask(__name__)


@app.route("/sms", methods=["POST"])

@app.route("/http", methods=["GET", "POST"])

def control_api():

    data = request.json if request.is_json else request.values

    sender = data.get("sender", "http")

    msg = data.get("text", "").lower()

    log(f"[CMD] {sender}: {msg}")

    if "fix" in msg:

        if check_dns_hijack():

            download_and_prompt_apks()

        return "Fix attempted", 200

    elif "status" in msg:

        hijack = check_dns_hijack()

        return "Hijacked" if hijack else "Clean", 200

    elif "log" in msg:

        return open(LOG_FILE).read()[-1000:], 200

    return "Unknown", 200


def run_flask():

    app.run(host="127.0.0.1", port=8888)


# === Telegram Bot ===


def telegram_bot():

    if not TELEGRAM_TOKEN or not TELEGRAM_USER_ID:

        log("[*] Telegram credentials not provided. Skipping Telegram bot.")

        return

    updater = Updater(token=TELEGRAM_TOKEN, use_context=True)

    dispatcher = updater.dispatcher


    def auth_check(update):

        return str(update.effective_user.id) == TELEGRAM_USER_ID


    def handle_fix(update: Update, context: CallbackContext):

        if not auth_check(update): return

        update.message.reply_text("Checking and fixing DNS...")

        if check_dns_hijack():

            download_and_prompt_apks()

            update.message.reply_text("Fix attempted.")

        else:

            update.message.reply_text("DNS is clean.")


    def handle_status(update: Update, context: CallbackContext):

        if not auth_check(update): return

        hijack = check_dns_hijack()

        update.message.reply_text("Hijacked" if hijack else "Clean")


    def handle_log(update: Update, context: CallbackContext):

        if not auth_check(update): return

        with open(LOG_FILE) as f:

            update.message.reply_text(f.read()[-1000:])


    dispatcher.add_handler(CommandHandler("fix", handle_fix))

    dispatcher.add_handler(CommandHandler("status", handle_status))

    dispatcher.add_handler(CommandHandler("log", handle_log))


    updater.start_polling()

    updater.idle()


# === Main Loop ===


def main():

    check_termux_api()

    check_overlay_permission()

    threading.Thread(target=run_flask, daemon=True).start()

    if TELEGRAM_AVAILABLE:

        threading.Thread(target=telegram_bot, daemon=True).start()


    while True:

        log("[*] DNS check cycle...")

        if check_dns_hijack():

            download_and_prompt_apks()

        else:

            log("[+] DNS is clean.")

        time.sleep(120)


if __name__ == "__main__":

    main()

Comments

Popular posts from this blog

The End of Modern Slavery and Human Trafficking

Why Has No One Asked Me What Happened…Ever?

A Letter to Every City In America