adguard-cidre/blocklist_scheduler.py
2025-06-06 12:09:41 +00:00

201 lines
7.1 KiB
Python

#!/usr/bin/env python3
import os
import sys
import logging
import requests
import yaml
import schedule
import time
import re
from pathlib import Path
logging.basicConfig(
level=logging.INFO,
format='[blocklist] %(levelname)s: %(message)s',
stream=sys.stdout,
)
ADGUARD_YAML = Path("/adguard/AdGuardHome.yaml")
TMP_YAML = ADGUARD_YAML.parent / (ADGUARD_YAML.name + ".tmp")
MANUAL_IPS_FILE = Path("/adguard/manually_blocked_ips.conf")
CIDR_BASE_URL = "https://raw.githubusercontent.com/vulnebify/cidre/main/output/cidr/ipv4"
COUNTRY_LIST_URL = "https://raw.githubusercontent.com/vulnebify/cidre/refs/heads/main/cidre/countries.py"
FIRST_BACKUP = ADGUARD_YAML.parent / "AdGuardHome.yaml.first-start.bak"
LAST_UPDATE_BACKUP = ADGUARD_YAML.parent / "AdGuardHome.yaml.last-update.bak"
BLOCK_COUNTRIES = os.getenv("BLOCK_COUNTRIES", "")
BLOCKLIST_CRON_TYPE = os.getenv("BLOCKLIST_CRON_TYPE", "daily").lower()
BLOCKLIST_CRON_TIME = os.getenv("BLOCKLIST_CRON_TIME", "06:00")
BLOCKLIST_CRON_DAY = os.getenv("BLOCKLIST_CRON_DAY", "mon").lower()
ADGUARD_CONTAINER_NAME = os.getenv("ADGUARD_CONTAINER_NAME", "adguardhome")
DOCKER_API_URL = os.getenv("DOCKER_API_URL", "http://socket-proxy-adguard:2375")
def backup_first_start():
if not FIRST_BACKUP.exists():
logging.info(f"Creating first start backup: {FIRST_BACKUP}")
FIRST_BACKUP.write_text(ADGUARD_YAML.read_text())
else:
logging.info("First start backup already exists, skipping.")
def backup_last_update():
logging.info(f"Creating last update backup: {LAST_UPDATE_BACKUP}")
LAST_UPDATE_BACKUP.write_text(ADGUARD_YAML.read_text())
def fetch_all_country_codes():
try:
resp = requests.get(COUNTRY_LIST_URL, timeout=15)
resp.raise_for_status()
matches = re.findall(r'"([A-Z]{2})"', resp.text)
return set(code.lower() for code in matches)
except Exception as e:
logging.error(f"Failed to fetch available country codes: {e}")
return set()
def get_selected_countries():
if not BLOCK_COUNTRIES:
logging.error("BLOCK_COUNTRIES is not set. Skipping update.")
return []
raw_codes = [c.strip() for c in BLOCK_COUNTRIES.split(",") if c.strip()]
if not raw_codes:
logging.error("No valid country codes provided.")
return []
is_exclusion = all(c.startswith("!") for c in raw_codes)
is_inclusion = all(not c.startswith("!") for c in raw_codes)
if not (is_exclusion or is_inclusion):
logging.error("Mixed syntax in BLOCK_COUNTRIES. Use only inclusion (e.g. 'fr,de') or only exclusion (e.g. '!fr,!de').")
sys.exit(1)
available = fetch_all_country_codes()
selected = {c.lstrip("!") for c in raw_codes}
unknown = selected - available
if unknown:
logging.warning(f"Unknown country codes: {', '.join(sorted(unknown))}")
if is_exclusion:
return sorted(available - selected)
else:
return sorted(selected & available)
def download_cidr_lists(countries):
combined_ips = []
for code in countries:
url = f"{CIDR_BASE_URL}/{code.lower()}.cidr"
logging.info(f"Downloading CIDR list for {code} from {url}")
try:
r = requests.get(url, timeout=30)
r.raise_for_status()
ips = r.text.strip().splitlines()
logging.info(f"Downloaded {len(ips)} CIDR entries for {code}")
combined_ips.extend(ips)
except Exception as e:
logging.warning(f"Failed to download {code}: {e}")
return combined_ips
def read_manual_ips():
if MANUAL_IPS_FILE.exists():
logging.info(f"Reading manual IPs from {MANUAL_IPS_FILE}")
valid_ips = []
with MANUAL_IPS_FILE.open() as f:
for line in f:
line = line.strip()
if line and (line.count('.') == 3 or '/' in line):
valid_ips.append(line)
logging.info(f"Added {len(valid_ips)} manual IP entries")
return valid_ips
else:
logging.info("Manual IPs file does not exist, skipping.")
return []
def update_yaml_with_ips(ips):
if not ADGUARD_YAML.exists():
logging.error(f"{ADGUARD_YAML} does not exist. Cannot update.")
return False
try:
with ADGUARD_YAML.open() as f:
data = yaml.safe_load(f)
except Exception as e:
logging.error(f"Failed to parse YAML file: {e}")
return False
if not isinstance(data, dict):
logging.error("Invalid YAML format.")
return False
data['dns']['disallowed_clients'] = ips
with TMP_YAML.open('w') as f:
yaml.safe_dump(data, f)
TMP_YAML.replace(ADGUARD_YAML)
logging.info(f"Updated {ADGUARD_YAML} with new disallowed clients list.")
return True
def restart_adguard_container():
restart_url = f"{DOCKER_API_URL}/containers/{ADGUARD_CONTAINER_NAME}/restart"
logging.info(f"Restarting AdGuard container '{ADGUARD_CONTAINER_NAME}'...")
try:
resp = requests.post(restart_url, timeout=10)
if resp.status_code == 204:
logging.info("AdGuard container restarted successfully.")
else:
logging.error(f"Failed to restart container: {resp.status_code} {resp.text}")
except Exception as e:
logging.error(f"Error restarting container: {e}")
def update_blocklist():
countries = get_selected_countries()
if not countries:
logging.error("No valid countries to process. Skipping update.")
return
cidr_ips = download_cidr_lists(countries)
manual_ips = read_manual_ips()
combined_ips = cidr_ips + manual_ips
backup_last_update()
success = update_yaml_with_ips(combined_ips)
if success:
restart_adguard_container()
def schedule_job():
try:
hour, minute = [int(x) for x in BLOCKLIST_CRON_TIME.split(":")]
except Exception:
logging.error(f"Invalid BLOCKLIST_CRON_TIME '{BLOCKLIST_CRON_TIME}', must be HH:MM. Defaulting to 06:00.")
hour, minute = 6, 0
if BLOCKLIST_CRON_TYPE == "daily":
schedule.every().day.at(f"{hour:02d}:{minute:02d}").do(update_blocklist)
logging.info(f"Scheduled daily update at {hour:02d}:{minute:02d}")
elif BLOCKLIST_CRON_TYPE == "weekly":
valid_days = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]
day = BLOCKLIST_CRON_DAY[:3]
if day not in valid_days:
logging.error(f"Invalid BLOCKLIST_CRON_DAY '{BLOCKLIST_CRON_DAY}', must be one of {valid_days}. Defaulting to Monday.")
day = "mon"
getattr(schedule.every(), day).at(f"{hour:02d}:{minute:02d}").do(update_blocklist)
logging.info(f"Scheduled weekly update on {day.capitalize()} at {hour:02d}:{minute:02d}")
else:
logging.error(f"Invalid BLOCKLIST_CRON_TYPE '{BLOCKLIST_CRON_TYPE}', must be 'daily' or 'weekly'. Defaulting to daily.")
schedule.every().day.at(f"{hour:02d}:{minute:02d}").do(update_blocklist)
logging.info(f"Scheduled daily update at {hour:02d}:{minute:02d}")
def main():
logging.info("Starting blocklist scheduler...")
backup_first_start()
update_blocklist()
schedule_job()
while True:
schedule.run_pending()
time.sleep(10)
if __name__ == "__main__":
main()