# DNS Guardian
import os
import subprocess
import json
import time
import threading
import socket
from flask import Flask, request
try:
from telegram import Update, Bot
from telegram.ext import Updater, CommandHandler, CallbackContext
TELEGRAM_AVAILABLE = True
except ImportError:
TELEGRAM_AVAILABLE = False
try:
import dns.resolver
except ImportError:
subprocess.run(["pip", "install", "dnspython"])
import dns.resolver
try:
from flask import Flask, request
except ImportError:
subprocess.run(["pip", "install", "flask"])
from flask import Flask, request
# Configuration
# Working directory for this tool; downloads and the log file live beneath it.
BASE_DIR = os.path.expanduser("~/dnsfixer")
DOWNLOAD_DIR = os.path.join(BASE_DIR, "downloads")
LOG_FILE = os.path.join(BASE_DIR, "fixer_log.txt")
# Creating the download directory also creates BASE_DIR when it is missing.
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

# Display name -> APK download URL.
APK_LIST = {
    "Nebulo": "https://github.com/ch4t4r/Nebulo/releases/download/v1.0.5/Nebulo_v1.0.5.apk",
    "Intra": "https://github.com/Jigsaw-Code/Intra/releases/download/v1.3.6/intra-release.apk",
}

# Public resolvers used as the reference when checking the system resolver.
SAFE_DNS = [
    "1.1.1.1",
    "8.8.8.8",
]
# Domains looked up on every check cycle.
TEST_DOMAINS = [
    "google.com",
    "facebook.com",
    "cloudflare.com",
]

# Optional Telegram Credentials (empty string when the variable is unset)
TELEGRAM_TOKEN = os.environ.get("TG_TOKEN", "")
TELEGRAM_USER_ID = os.environ.get("TG_USER_ID", "")  # example: '123456789'
# === Utilities ===
def log(msg):
    """Record *msg* in the log file and echo it to stdout.

    The file entry is prefixed with a ``[YYYY-MM-DD HH:MM:SS]`` local
    timestamp; the console copy is printed without the timestamp.

    Args:
        msg: Text to record.
    """
    stamp = time.strftime("[%Y-%m-%d %H:%M:%S]")
    entry = f"{stamp} {msg}\n"
    # Append mode: earlier entries are kept across runs.
    with open(LOG_FILE, "a") as handle:
        handle.write(entry)
    print(msg)
def check_termux_api():
    """Probe the Termux:API bridge by running ``termux-battery-status``.

    The command's output is discarded; only whether it can be started,
    finishes within 3 seconds, and exits with status 0 matters.

    Returns:
        bool: True when the probe succeeded, False on timeout, missing
        binary, non-zero exit status, or any other error. Each outcome is
        logged.
    """
    log("[*] Verifying Termux:API...")
    try:
        probe = subprocess.run(
            ["termux-battery-status"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=3,
        )
    except subprocess.TimeoutExpired:
        log("[!] Termux API call timed out. Likely missing permissions.")
        return False
    except FileNotFoundError:
        log("[!] Termux API not found. Run: pkg install termux-api")
        return False
    except Exception as e:
        log(f"[!] Unknown error during Termux API check: {e}")
        return False

    # subprocess.run() does not raise on a non-zero exit unless check=True,
    # so a failing probe must be detected explicitly.
    if probe.returncode != 0:
        log(f"[!] Termux API check exited with status {probe.returncode}.")
        return False

    log("[+] Termux API is working.")
    return True
def check_overlay_permission():
    """Check whether the "display over other apps" permission is reported.

    Runs ``termux-settings get draw_overlay`` and looks for ``true`` in its
    standard output.

    Returns:
        bool: True when the output contains ``true``; False when it does
        not, or when the command is missing, times out, or cannot be run.
        Failures are logged instead of being raised so that start-up can
        continue.
    """
    try:
        result = subprocess.run(
            ["termux-settings", "get", "draw_overlay"],
            capture_output=True,
            text=True,
            timeout=5,  # do not let a hung helper block start-up
        )
    except FileNotFoundError:
        log("[!] termux-settings not found; cannot verify overlay permission.")
        return False
    except subprocess.TimeoutExpired:
        log("[!] Overlay permission check timed out.")
        return False
    except OSError as e:
        log(f"[!] Unable to run overlay permission check: {e}")
        return False

    if "true" not in result.stdout.strip():
        log("[!] Display over other apps permission not granted.")
        log("Run: termux-setup-storage && termux-open-url 'package:com.termux'")
        return False
    return True
def resolve_dns(domain, dns=None):
    """Resolve *domain* and return the first answer as a string.

    Args:
        domain: Host name to look up.
        dns: Optional nameserver address. When given, it replaces the
            resolver's configured nameservers for this lookup; otherwise
            the resolver's default configuration is used.

    Returns:
        str: The first record of the answer, or ``"ERR: <reason>"`` when
        the lookup fails for any reason. This function does not raise.
    """
    try:
        # The ``dns`` parameter shadows the dnspython package inside this
        # function, so ``dns.resolver`` would be an attribute lookup on the
        # argument (None or a str). Bind the resolver module under its own
        # local name instead; ``import a.b as c`` does not rebind ``a``.
        import dns.resolver as dns_resolver

        resolver = dns_resolver.Resolver()
        if dns:
            resolver.nameservers = [dns]
        answer = resolver.resolve(domain)
        return str(answer[0])
    except Exception as e:
        # Callers detect failure through the "ERR" prefix.
        return f"ERR: {e}"
def check_dns_hijack():
    """Compare system DNS answers with a public resolver for each test domain.

    Every domain in ``TEST_DOMAINS`` is resolved twice: once with the
    default resolver configuration and once against ``SAFE_DNS[0]``. Both
    results are logged. A domain is flagged when both lookups succeeded and
    the two answers differ.

    Returns:
        bool: True when at least one domain was flagged, False otherwise.
    """
    hijacked = False
    for domain in TEST_DOMAINS:
        sys_ip = resolve_dns(domain)
        pub_ip = resolve_dns(domain, SAFE_DNS[0])
        log(f"{domain}: System = {sys_ip}, Public = {pub_ip}")

        # A failed lookup on either side gives nothing to compare; treating
        # an error string as an address would report a false hijack.
        if sys_ip.startswith("ERR:") or pub_ip.startswith("ERR:"):
            continue

        # NOTE(review): only the first record of each answer is compared, so
        # a domain that legitimately returns different addresses per
        # resolver would also be flagged - confirm this is acceptable.
        if sys_ip != pub_ip:
            hijacked = True
            log(f"[!] DNS Hijack Suspected for {domain}")
    return hijacked
def download_and_prompt_apks():
    """Download each APK in ``APK_LIST`` if missing, then open it for install.

    Files are stored as ``<name>.apk`` in ``DOWNLOAD_DIR``. A download is
    written to a ``.part`` file and only moved to its final name when
    ``wget`` exits successfully, so an interrupted or failed download is
    retried on the next call instead of being mistaken for a finished APK.
    An APK whose download failed is not offered for installation.
    """
    for name, url in APK_LIST.items():
        path = os.path.join(DOWNLOAD_DIR, f"{name}.apk")

        if not os.path.exists(path):
            partial = f"{path}.part"
            try:
                fetch = subprocess.run(["wget", "-q", "-O", partial, url])
            except FileNotFoundError:
                log("[!] wget not found. Run: pkg install wget")
                continue

            if fetch.returncode != 0:
                log(f"[!] Download failed for {name} (wget exit {fetch.returncode})")
                # wget -O creates the output file even when the transfer fails.
                try:
                    os.remove(partial)
                except OSError:
                    pass
                continue

            os.replace(partial, path)
            log(f"[+] Downloaded {name}")

        # Hand the APK to Android's package viewer via an ACTION_VIEW intent.
        subprocess.run([
            "am", "start", "-a", "android.intent.action.VIEW",
            "-d", f"file://{path}", "-t", "application/vnd.android.package-archive",
        ])
# === Flask App (SMS/HTTP) ===
app = Flask(__name__)


@app.route("/sms", methods=["POST"])
@app.route("/http", methods=["GET", "POST"])
def control_api():
    """Handle a text command received on ``/sms`` or ``/http``.

    The command is read from the ``text`` field and the originator from the
    ``sender`` field (default ``"http"``), taken from the JSON body when the
    request is JSON and the body is an object, otherwise from the query
    string / form values. Matching is by substring, checked in this order:

    * ``fix``    - run the hijack check and, if flagged, download/prompt APKs.
    * ``status`` - run the hijack check and report ``Hijacked`` or ``Clean``.
    * ``log``    - return the last 1000 characters of the log file.

    Returns:
        tuple: ``(body, 200)``; the body is ``"Unknown"`` when no command
        matched.

    NOTE(review): the endpoint performs no authentication of its own.
    """
    payload = request.json if request.is_json else None
    # A JSON body that is not an object (list, string, null, ...) has no
    # .get(); fall back to the combined query/form values in that case.
    data = payload if isinstance(payload, dict) else request.values

    sender = data.get("sender", "http")
    # JSON may carry a non-string or null "text"; normalise before .lower().
    msg = str(data.get("text") or "").lower()
    log(f"[CMD] {sender}: {msg}")

    if "fix" in msg:
        if check_dns_hijack():
            download_and_prompt_apks()
        return "Fix attempted", 200
    elif "status" in msg:
        hijack = check_dns_hijack()
        return "Hijacked" if hijack else "Clean", 200
    elif "log" in msg:
        # Context manager so the file handle is closed deterministically.
        with open(LOG_FILE) as f:
            tail = f.read()[-1000:]
        return tail, 200
    return "Unknown", 200
def run_flask():
    """Start the Flask control API on 127.0.0.1, port 8888."""
    bind = {"host": "127.0.0.1", "port": 8888}
    app.run(**bind)
# === Telegram Bot ===
def telegram_bot():
    """Run the optional Telegram command bot.

    Registers ``/fix``, ``/status`` and ``/log`` commands and starts
    polling. Commands are honoured only when the sender's user id equals
    ``TELEGRAM_USER_ID``; other senders are ignored without a reply.

    Returns immediately (after logging) when the telegram package could not
    be imported or when the token / user id is not configured.

    NOTE(review): uses the ``Updater(..., use_context=True)`` /
    ``updater.dispatcher`` style of API - confirm the installed
    python-telegram-bot version still provides it.
    """
    if not TELEGRAM_AVAILABLE:
        log("[*] Telegram library not installed. Skipping Telegram bot.")
        return
    if not TELEGRAM_TOKEN or not TELEGRAM_USER_ID:
        log("[*] Telegram credentials not provided. Skipping Telegram bot.")
        return

    updater = Updater(token=TELEGRAM_TOKEN, use_context=True)
    dispatcher = updater.dispatcher

    def auth_check(update):
        """Return True only for updates sent by the configured user."""
        user = update.effective_user
        # Some updates carry no user; treat those as unauthorised.
        return user is not None and str(user.id) == TELEGRAM_USER_ID

    def handle_fix(update: Update, context: CallbackContext):
        """/fix: run the hijack check and trigger the APK prompt if flagged."""
        if not auth_check(update):
            return
        update.message.reply_text("Checking and fixing DNS...")
        if check_dns_hijack():
            download_and_prompt_apks()
            update.message.reply_text("Fix attempted.")
        else:
            update.message.reply_text("DNS is clean.")

    def handle_status(update: Update, context: CallbackContext):
        """/status: report the result of a hijack check."""
        if not auth_check(update):
            return
        hijack = check_dns_hijack()
        update.message.reply_text("Hijacked" if hijack else "Clean")

    def handle_log(update: Update, context: CallbackContext):
        """/log: reply with the last 1000 characters of the log file."""
        if not auth_check(update):
            return
        with open(LOG_FILE) as f:
            tail = f.read()[-1000:]
        update.message.reply_text(tail)

    dispatcher.add_handler(CommandHandler("fix", handle_fix))
    dispatcher.add_handler(CommandHandler("status", handle_status))
    dispatcher.add_handler(CommandHandler("log", handle_log))
    updater.start_polling()

    # Updater.idle() installs signal handlers, which Python only permits
    # from the main thread (ValueError otherwise). When this function runs
    # in a worker thread, polling has already been started above, so simply
    # return instead of calling idle().
    if threading.current_thread() is threading.main_thread():
        updater.idle()
# === Main Loop ===
def main():
    """Run start-up checks, launch background services, then monitor DNS.

    After the Termux checks, the Flask control API (and the Telegram bot,
    when its library was imported) are started in daemon threads. The
    calling thread then loops forever: every 120 seconds it runs the hijack
    check and either triggers the APK download/prompt or logs a clean
    result.
    """
    check_termux_api()
    check_overlay_permission()

    services = [run_flask]
    if TELEGRAM_AVAILABLE:
        services.append(telegram_bot)
    for service in services:
        # Daemon threads do not keep the process alive on their own.
        threading.Thread(target=service, daemon=True).start()

    while True:
        log("[*] DNS check cycle...")
        flagged = check_dns_hijack()
        if not flagged:
            log("[+] DNS is clean.")
        else:
            download_and_prompt_apks()
        time.sleep(120)


if __name__ == "__main__":
    main()
# Comments
# Post a Comment