wip-python -> Python rewriting + better logs + backup + update at startup #3

Merged
Djeex merged 6 commits from wip-python into main 2025-05-31 23:22:30 +02:00
4 changed files with 85 additions and 112 deletions
Showing only changes of commit d97348cd17 - Show all commits

View File

@ -7,6 +7,12 @@ RUN apt-get update && apt-get install -y \
tzdata \ tzdata \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Install python dependencies
RUN pip install --no-cache-dir requests
# Create crontabs directory (if needed)
RUN mkdir -p /etc/crontabs
# Copy scripts # Copy scripts
COPY update-blocklist.py /usr/local/bin/update-blocklist.py COPY update-blocklist.py /usr/local/bin/update-blocklist.py
COPY entrypoint.py /usr/local/bin/entrypoint.py COPY entrypoint.py /usr/local/bin/entrypoint.py
@ -17,7 +23,7 @@ RUN chmod +x /usr/local/bin/update-blocklist.py /usr/local/bin/entrypoint.py
# Set default timezone (can be overridden with TZ env var) # Set default timezone (can be overridden with TZ env var)
ENV TZ=UTC ENV TZ=UTC
# Configure timezone (tzdata) — important for /usr/share/zoneinfo/* # Configure timezone (tzdata)
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Set entrypoint # Set entrypoint

View File

@ -73,6 +73,6 @@
5. **Check logs to verify updates** 5. **Check logs to verify updates**
```bash ```bash
docker-compose logs -f docker compose logs -f
``` ```

View File

@ -14,6 +14,20 @@ logging.basicConfig(
ADGUARD_YAML = Path("/adguard/AdGuardHome.yaml") ADGUARD_YAML = Path("/adguard/AdGuardHome.yaml")
FIRST_BACKUP = Path("/adguard/AdGuardHome.yaml.first-start.bak") FIRST_BACKUP = Path("/adguard/AdGuardHome.yaml.first-start.bak")
def setup_cron():
cron_expr = os.getenv("BLOCKLIST_CRON", "0 6 * * *")
cron_line = f"{cron_expr} root /usr/local/bin/update-blocklist.py\n"
cron_dir = "/etc/crontabs"
cron_file = f"{cron_dir}/root"
logging.info(f"Setting cron job: {cron_line.strip()}")
# Ensure cron directory exists
os.makedirs(cron_dir, exist_ok=True)
with open(cron_file, "w") as f:
f.write(cron_line)
def backup_first_start(): def backup_first_start():
if not FIRST_BACKUP.exists(): if not FIRST_BACKUP.exists():
logging.info("Creating first start backup...") logging.info("Creating first start backup...")
@ -34,17 +48,9 @@ def run_initial_update():
logging.error(f"Initial update script failed: {e}") logging.error(f"Initial update script failed: {e}")
sys.exit(1) sys.exit(1)
def setup_cron():
cron_expr = os.getenv("BLOCKLIST_CRON", "0 6 * * *")
cron_line = f"{cron_expr} root /usr/local/bin/update-blocklist.py\n"
cron_file = "/etc/crontabs/root"
logging.info(f"Setting cron job: {cron_line.strip()}")
with open(cron_file, "w") as f:
f.write(cron_line)
def start_cron_foreground(): def start_cron_foreground():
logging.info("Starting cron in foreground...") logging.info("Starting cron in foreground...")
os.execvp("crond", ["crond", "-f"]) os.execvp("cron", ["cron", "-f"])
def main(): def main():
# Check AdGuardHome.yaml exists # Check AdGuardHome.yaml exists

View File

@ -1,156 +1,117 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os import os
import sys import sys
import shutil
import logging import logging
import re
import requests import requests
from pathlib import Path from pathlib import Path
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='[update-blocklist] %(levelname)s: %(message)s', format='[update-blocklist] %(levelname)s: %(message)s',
stream=sys.stdout stream=sys.stdout,
) )
# Config / variables
ADGUARD_YAML = Path("/adguard/AdGuardHome.yaml") ADGUARD_YAML = Path("/adguard/AdGuardHome.yaml")
FIRST_BACKUP = Path("/adguard/AdGuardHome.yaml.first-start.bak") FIRST_BACKUP = Path("/adguard/AdGuardHome.yaml.first-start.bak")
LAST_CRON_BACKUP = Path("/adguard/AdGuardHome.yaml.last-cron.bak") LAST_CRON_BACKUP = Path("/adguard/AdGuardHome.yaml.last-cron.bak")
TMP_YAML = ADGUARD_YAML.parent / (ADGUARD_YAML.name + ".tmp")
MANUAL_IPS_FILE = Path("/adguard/manually_blocked_ips.conf") MANUAL_IPS_FILE = Path("/adguard/manually_blocked_ips.conf")
CIDR_BASE_URL = "https://raw.githubusercontent.com/vulnebify/cidre/main/output/cidr/ipv4" CIDR_BASE_URL = "https://raw.githubusercontent.com/vulnebify/cidre/main/output/cidr/ipv4"
COUNTRIES = os.getenv("BLOCK_COUNTRIES", "") COUNTRIES = os.getenv("BLOCK_COUNTRIES", "")
DOCKER_API_URL = os.getenv("DOCKER_API_URL", "http://socket-proxy-adguard:2375")
CONTAINER_NAME = os.getenv("ADGUARD_CONTAINER_NAME", "adguard-home")
TMP_YAML = Path("/tmp/AdGuardHome.yaml")
TMP_DIR = Path("/tmp/cidr")
def backup_first_start(): def backup_files():
if not FIRST_BACKUP.exists(): if not FIRST_BACKUP.exists():
logging.info(f"Creating first-start backup: {FIRST_BACKUP}") logging.info(f"Creating first-start backup: {FIRST_BACKUP}")
shutil.copy2(ADGUARD_YAML, FIRST_BACKUP) FIRST_BACKUP.write_text(ADGUARD_YAML.read_text())
else: else:
logging.info("First-start backup already exists, skipping.") logging.info("First-start backup already exists, skipping.")
def backup_last_cron():
logging.info(f"Creating last-cron backup: {LAST_CRON_BACKUP}") logging.info(f"Creating last-cron backup: {LAST_CRON_BACKUP}")
shutil.copy2(ADGUARD_YAML, LAST_CRON_BACKUP) LAST_CRON_BACKUP.write_text(ADGUARD_YAML.read_text())
def download_cidr_lists(countries): def download_cidr_lists(countries):
if not countries: combined_ips = []
logging.error("No countries specified in BLOCK_COUNTRIES environment variable.") for code in countries:
sys.exit(1) url = f"{CIDR_BASE_URL}/{code.lower()}.cidr"
TMP_DIR.mkdir(parents=True, exist_ok=True)
all_ips = []
codes = [c.strip().lower() for c in countries.split(",") if c.strip()]
for code in codes:
url = f"{CIDR_BASE_URL}/{code}.cidr"
logging.info(f"Downloading CIDR list for {code} from {url}") logging.info(f"Downloading CIDR list for {code} from {url}")
try: try:
r = requests.get(url, timeout=15) r = requests.get(url, timeout=30)
r.raise_for_status() r.raise_for_status()
lines = r.text.strip().splitlines() ips = r.text.strip().splitlines()
logging.info(f"Downloaded {len(lines)} CIDR entries for {code}") logging.info(f"Downloaded {len(ips)} CIDR entries for {code}")
all_ips.extend(lines) combined_ips.extend(ips)
except Exception as e: except Exception as e:
logging.warning(f"Failed to download {url}: {e}") logging.warning(f"Failed to download {code}: {e}")
return combined_ips
return all_ips
def read_manual_ips(): def read_manual_ips():
ips = []
if MANUAL_IPS_FILE.exists(): if MANUAL_IPS_FILE.exists():
logging.info(f"Reading manual IPs from {MANUAL_IPS_FILE}") logging.info(f"Reading manual IPs from {MANUAL_IPS_FILE}")
try: valid_ips = []
with MANUAL_IPS_FILE.open() as f: with MANUAL_IPS_FILE.open() as f:
for line in f: for line in f:
line = line.strip() line = line.strip()
if re.match(r'^(\d{1,3}\.){3}\d{1,3}(/\d{1,2})?$', line): # Simple regex match for IPv4 or IPv4 CIDR
ips.append(line) if line and line.count('.') == 3:
else: valid_ips.append(line)
logging.debug(f"Ignoring invalid manual IP line: {line}") logging.info(f"Added {len(valid_ips)} manual IP entries")
logging.info(f"Read {len(ips)} valid manual IP entries") return valid_ips
except Exception as e:
logging.warning(f"Error reading manual IPs: {e}")
else: else:
logging.info("Manual IPs file does not exist, skipping.") logging.info("Manual IPs file does not exist, skipping.")
return ips return []
def format_ips_yaml_list(ips):
return [f" - {ip}\n" for ip in ips]
def update_yaml_with_ips(ips): def update_yaml_with_ips(ips):
if not ADGUARD_YAML.exists(): # Format IPs for YAML list (4 spaces indent + dash)
logging.error(f"AdGuardHome.yaml not found at {ADGUARD_YAML}") formatted_ips = [f" - {ip}" for ip in ips]
sys.exit(1)
inside_disallowed = False
output_lines = []
with ADGUARD_YAML.open() as f: with ADGUARD_YAML.open() as f:
lines = f.readlines() for line in f:
if line.strip().startswith("disallowed_clients:"):
new_lines = [] # Replace existing disallowed_clients block
inside_disallowed = False output_lines.append("disallowed_clients:")
ips_inserted = False output_lines.extend(formatted_ips)
for line in lines:
stripped = line.rstrip("\n")
if stripped.startswith(" disallowed_clients:"):
# Write key line without any value (no [] etc)
new_lines.append(" disallowed_clients:\n")
# Insert ips
if ips:
new_lines.extend(format_ips_yaml_list(ips))
# mark inserted
inside_disallowed = True inside_disallowed = True
ips_inserted = True elif inside_disallowed:
continue # Skip old lines under disallowed_clients (assuming indentation)
if line.startswith(" ") and not line.startswith(" -"):
if inside_disallowed: # This is a new section, disallowed_clients block ended
# skip old IP entries starting with ' - '
if stripped.startswith(" - "):
continue
else:
inside_disallowed = False inside_disallowed = False
output_lines.append(line.rstrip("\n"))
new_lines.append(line) # Else skip line inside disallowed_clients block
if not ips_inserted:
# disallowed_clients not found - append at end
new_lines.append("\n disallowed_clients:\n")
if ips:
new_lines.extend(format_ips_yaml_list(ips))
with TMP_YAML.open("w") as f:
f.writelines(new_lines)
TMP_YAML.replace(ADGUARD_YAML)
logging.info(f"Updated {ADGUARD_YAML} with {len(ips)} disallowed_clients entries")
def restart_container():
url = f"{DOCKER_API_URL}/containers/{CONTAINER_NAME}/restart"
logging.info(f"Restarting container '{CONTAINER_NAME}' via {url}")
try:
r = requests.post(url, timeout=10)
if r.status_code == 204:
logging.info("Container restarted successfully.")
else: else:
logging.error(f"Failed to restart container. Status: {r.status_code} Response: {r.text}") output_lines.append(line.rstrip("\n"))
except Exception as e:
logging.error(f"Exception during container restart: {e}") # If the file ended while still inside disallowed_clients block, append nothing more (already done)
# Write to temporary YAML in same folder (to avoid cross-device rename error)
with TMP_YAML.open("w") as f:
f.write("\n".join(output_lines) + "\n")
# Atomic replace
TMP_YAML.replace(ADGUARD_YAML)
logging.info(f"Updated {ADGUARD_YAML} with new disallowed clients list.")
def main(): def main():
backup_first_start() if not ADGUARD_YAML.exists():
backup_last_cron() logging.error(f"{ADGUARD_YAML} not found, exiting.")
cidr_ips = download_cidr_lists(COUNTRIES) sys.exit(1)
if not COUNTRIES:
logging.error("No countries specified in BLOCK_COUNTRIES environment variable, exiting.")
sys.exit(1)
backup_files()
countries_list = [c.strip() for c in COUNTRIES.split(",") if c.strip()]
cidr_ips = download_cidr_lists(countries_list)
manual_ips = read_manual_ips() manual_ips = read_manual_ips()
combined_ips = cidr_ips + manual_ips combined_ips = cidr_ips + manual_ips
if not combined_ips:
logging.warning("No IPs to add to disallowed_clients. The list will be empty.")
update_yaml_with_ips(combined_ips) update_yaml_with_ips(combined_ips)
restart_container()
logging.info("Blocklist update complete.")
if __name__ == "__main__": if __name__ == "__main__":
main() main()