Source code for cloudflare_saas.dns_verifier

"""DNS verification utilities using aiodns."""

import asyncio
from typing import List, Optional

import aiodns
from aiodns.error import DNSError as AIODNSError
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

from .exceptions import DNSError


[docs] class DNSVerifier: """Async DNS verification."""
[docs] def __init__(self): self.resolver = aiodns.DNSResolver()
[docs] @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type(AIODNSError), ) async def verify_cname( self, domain: str, expected_target: str, ) -> bool: """Verify CNAME record points to expected target.""" try: result = await self.resolver.query(domain, 'CNAME') # Normalize targets (remove trailing dots) actual_targets = [r.host.rstrip('.') for r in result] expected = expected_target.rstrip('.') return expected in actual_targets except AIODNSError as e: # No CNAME record found if 'NXDOMAIN' in str(e) or 'No answer' in str(e): return False raise DNSError(f"DNS query failed for {domain}: {e}") except Exception as e: raise DNSError(f"Unexpected DNS error: {e}")
[docs] async def get_cname_records(self, domain: str) -> List[str]: """Get all CNAME records for a domain.""" try: result = await self.resolver.query(domain, 'CNAME') return [r.host.rstrip('.') for r in result] except AIODNSError: return [] except Exception as e: raise DNSError(f"Failed to get CNAME records: {e}")
[docs] async def verify_txt( self, domain: str, expected_value: str, ) -> bool: """Verify TXT record contains expected value.""" try: result = await self.resolver.query(domain, 'TXT') actual_values = [r.text.decode('utf-8') if isinstance(r.text, bytes) else r.text for r in result] return expected_value in actual_values except AIODNSError: return False except Exception as e: raise DNSError(f"Failed to verify TXT record: {e}")
[docs] async def wait_for_cname( self, domain: str, expected_target: str, max_attempts: int = 30, delay_seconds: int = 10, ) -> bool: """Poll for CNAME verification with retries.""" for attempt in range(max_attempts): try: if await self.verify_cname(domain, expected_target): return True except DNSError: pass if attempt < max_attempts - 1: await asyncio.sleep(delay_seconds) return False