Source code for cloudflare_saas.platform
"""Platform orchestrator with pluggable storage."""
import asyncio
import secrets
import time
from pathlib import Path
from typing import Optional, List, Dict
from .config import Config
from .logging_config import LoggerMixin, configure_logging, LogLevel, LogFormat
from .models import (
Tenant,
CustomDomain,
DomainStatus,
DeploymentResult,
VerificationMethod,
HostnameVerificationInstructions,
)
from .r2_client import R2Client
from .cloudflare_client import CloudflareClient
from .d1_client import D1Client
from .dns_verifier import DNSVerifier
from .storage_adapter import StorageAdapter, InMemoryStorageAdapter
from .exceptions import (
TenantNotFoundError,
DeploymentError,
DomainVerificationError,
)
[docs]
class CloudflareSaaSPlatform(LoggerMixin):
"""
Main platform orchestrator with pluggable storage.
This class coordinates R2 storage, Cloudflare custom hostnames,
DNS verification, and tenant management with configurable persistence.
"""
[docs]
def __init__(
self,
config: Config,
storage: Optional[StorageAdapter] = None,
):
self.config = config
self.r2 = R2Client(config)
self.cloudflare = CloudflareClient(config)
self.d1 = D1Client(config) if config.d1_database_id else None
self.dns = DNSVerifier()
# Use provided storage or default to in-memory
self.storage = storage or InMemoryStorageAdapter()
# Configure logging based on config
configure_logging(
level=LogLevel(config.log_level),
log_format=LogFormat(config.log_format),
log_file=config.log_file,
enable_console=config.enable_console_logging,
)
self.logger.info(
f"Initialized CloudflareSaaSPlatform for domain: {config.platform_domain}"
)
# ==================== Tenant Management ====================
[docs]
async def create_tenant(
self,
name: str,
slug: str,
owner_id: Optional[str] = None,
metadata: Optional[Dict] = None,
) -> Tenant:
"""Create a new tenant."""
self.logger.info(f"Creating tenant: name={name}, slug={slug}")
tenant_id = f"tenant-{slug}"
subdomain = f"{tenant_id}.{self.config.platform_domain}"
tenant = Tenant(
tenant_id=tenant_id,
name=name,
slug=slug,
subdomain=subdomain,
owner_id=owner_id,
metadata=metadata or {},
)
await self.storage.save_tenant(tenant)
self.logger.info(f"Successfully created tenant: {tenant_id}")
return tenant
[docs]
async def get_tenant(self, tenant_id: str) -> Tenant:
"""Get tenant by ID."""
self.logger.debug(f"Fetching tenant: {tenant_id}")
tenant = await self.storage.get_tenant(tenant_id)
if not tenant:
self.logger.error(f"Tenant not found: {tenant_id}")
raise TenantNotFoundError(f"Tenant {tenant_id} not found")
self.logger.debug(f"Found tenant: {tenant_id}")
return tenant
[docs]
async def list_tenants(self, limit: int = 100, offset: int = 0) -> List[Tenant]:
"""List tenants with pagination."""
return await self.storage.list_tenants(limit, offset)
[docs]
async def delete_tenant(self, tenant_id: str) -> None:
"""Delete tenant and all associated resources."""
self.logger.warning(f"Deleting tenant and all resources: {tenant_id}")
await self.get_tenant(tenant_id)
# Delete R2 objects
self.logger.info(f"Deleting R2 objects for tenant: {tenant_id}")
await self.r2.delete_tenant_objects(tenant_id)
# Delete custom domains
domains = await self.storage.list_tenant_domains(tenant_id)
self.logger.info(f"Deleting {len(domains)} custom domains for tenant: {tenant_id}")
for domain in domains:
try:
await self.remove_custom_domain(domain.domain)
except Exception as e:
self.logger.error(f"Failed to remove domain {domain.domain}: {e}")
pass # Continue cleanup
await self.storage.delete_tenant(tenant_id)
self.logger.info(f"Successfully deleted tenant: {tenant_id}")
[docs]
async def resolve_tenant_from_host(self, host: str) -> Optional[str]:
"""Resolve tenant ID from hostname."""
# Fast path: subdomain
if host.endswith(f".{self.config.platform_domain}"):
tenant_id = host.replace(f".{self.config.platform_domain}", "")
tenant = await self.storage.get_tenant(tenant_id)
if tenant:
return tenant_id
# Slow path: custom domain lookup
return await self.storage.get_domain_by_tenant(host)
# ==================== Site Deployment ====================
[docs]
async def deploy_tenant_site(
self,
tenant_id: str,
local_path: str,
base_prefix: str = "",
) -> DeploymentResult:
"""Deploy static site for tenant."""
self.logger.info(f"Starting deployment for tenant: {tenant_id}, path: {local_path}")
# Try to get the tenant, create it if it doesn't exist
try:
tenant = await self.get_tenant(tenant_id)
except TenantNotFoundError:
self.logger.info(f"Tenant {tenant_id} not found, creating it automatically")
# Extract name from tenant_id (remove any suffix after last hyphen if it looks like an ID)
name_parts = tenant_id.rsplit('-', 1)
if len(name_parts) == 2 and name_parts[1].replace(' ', '').isdigit():
# Looks like "name-12345", use the name part
tenant_name = name_parts[0].replace('-', ' ').title()
else:
# Use tenant_id as name, replace hyphens with spaces
tenant_name = tenant_id.replace('-', ' ').title()
tenant = await self.create_tenant(
name=tenant_name,
slug=tenant_id,
owner_id=None,
metadata={"auto_created": True, "source": "deployment"}
)
local_dir = Path(local_path)
if not local_dir.exists():
self.logger.error(f"Deployment path does not exist: {local_path}")
raise DeploymentError(f"Path {local_path} does not exist")
start_time = time.time()
try:
uploaded_keys = await self.r2.upload_directory(
tenant_id,
local_dir,
base_prefix,
)
total_size = sum(
f.stat().st_size
for f in local_dir.rglob('*')
if f.is_file()
)
deployment_time = time.time() - start_time
self.logger.info(
f"Deployment successful for {tenant_id}: "
f"{len(uploaded_keys)} files, {total_size} bytes, {deployment_time:.2f}s"
)
return DeploymentResult(
tenant_id=tenant_id,
files_uploaded=len(uploaded_keys),
total_size_bytes=total_size,
deployment_time_seconds=deployment_time,
success=True,
uploaded_paths=uploaded_keys,
)
except Exception as e:
self.logger.error(f"Deployment failed for {tenant_id}: {e}")
return DeploymentResult(
tenant_id=tenant_id,
files_uploaded=0,
total_size_bytes=0,
deployment_time_seconds=time.time() - start_time,
success=False,
error_message=str(e),
)
[docs]
async def get_deployment_status(self, tenant_id: str) -> Dict:
"""Get deployment status for tenant."""
await self.get_tenant(tenant_id)
objects = await self.r2.list_tenant_objects(tenant_id)
return {
"tenant_id": tenant_id,
"object_count": len(objects),
"total_size_bytes": sum(obj.get('Size', 0) for obj in objects),
"objects": objects,
}
# ==================== Custom Domain Management ====================
[docs]
async def add_custom_domain(
self,
tenant_id: str,
domain: str,
verification_method: VerificationMethod = VerificationMethod.HTTP,
) -> HostnameVerificationInstructions:
"""Start custom domain onboarding process."""
tenant = await self.get_tenant(tenant_id)
existing = await self.storage.get_domain(domain)
if existing:
if existing.tenant_id != tenant_id:
raise DomainVerificationError(
f"Domain {domain} is already registered to another tenant"
)
verification_token = secrets.token_urlsafe(32)
cname_target = tenant.subdomain
custom_domain = CustomDomain(
domain=domain,
tenant_id=tenant_id,
status=DomainStatus.PENDING,
verification_method=verification_method,
verification_token=verification_token,
cname_target=cname_target,
)
await self.storage.save_domain(custom_domain)
# Start async verification
asyncio.create_task(self._verify_and_provision_domain(domain))
instructions = f"""
To activate your custom domain '{domain}', please:
1. Add a CNAME record:
- Name: {domain}
- Value: {cname_target}
2. The SSL certificate will be automatically provisioned once DNS is verified.
3. Verification typically takes 5-10 minutes after DNS propagation.
"""
result = HostnameVerificationInstructions(
domain=domain,
cname_target=cname_target,
verification_method=verification_method,
instructions=instructions,
)
if verification_method == VerificationMethod.HTTP:
result.http_verification_url = f"http://{domain}/.well-known/cf-custom-hostname-challenge/{verification_token}"
result.http_verification_token = verification_token
elif verification_method == VerificationMethod.TXT:
result.txt_record_name = f"_cf-custom-hostname.{domain}"
result.txt_record_value = verification_token
return result
async def _verify_and_provision_domain(self, domain: str) -> None:
"""Background task to verify DNS and provision custom hostname."""
custom_domain = await self.storage.get_domain(domain)
if not custom_domain:
return
try:
custom_domain.status = DomainStatus.VERIFYING
await self.storage.save_domain(custom_domain)
cname_verified = await self.dns.wait_for_cname(
domain,
custom_domain.cname_target,
max_attempts=30,
delay_seconds=10,
)
if not cname_verified:
custom_domain.status = DomainStatus.FAILED
custom_domain.error_message = "CNAME verification timed out"
await self.storage.save_domain(custom_domain)
return
custom_domain.status = DomainStatus.VERIFIED
await self.storage.save_domain(custom_domain)
if self.config.enable_custom_hostnames:
cf_result = await self.cloudflare.create_custom_hostname(
domain,
custom_domain.verification_method,
)
custom_domain.cloudflare_hostname_id = cf_result["id"]
custom_domain.ssl_status = cf_result["ssl_status"]
custom_domain.status = DomainStatus.ACTIVE
else:
custom_domain.status = DomainStatus.ACTIVE
await self.storage.save_domain(custom_domain)
except Exception as e:
custom_domain.status = DomainStatus.FAILED
custom_domain.error_message = str(e)
await self.storage.save_domain(custom_domain)
[docs]
async def get_domain_status(self, domain: str) -> CustomDomain:
"""Get status of custom domain provisioning."""
custom_domain = await self.storage.get_domain(domain)
if not custom_domain:
raise DomainVerificationError(f"Domain {domain} not found")
if custom_domain.cloudflare_hostname_id:
try:
cf_status = await self.cloudflare.get_custom_hostname(
custom_domain.cloudflare_hostname_id
)
custom_domain.ssl_status = cf_status["ssl_status"]
if cf_status["status"] == "active":
custom_domain.status = DomainStatus.ACTIVE
await self.storage.save_domain(custom_domain)
except Exception:
pass
return custom_domain
[docs]
async def remove_custom_domain(self, domain: str) -> None:
"""Remove custom domain."""
custom_domain = await self.storage.get_domain(domain)
if not custom_domain:
raise DomainVerificationError(f"Domain {domain} not found")
if custom_domain.cloudflare_hostname_id:
try:
await self.cloudflare.delete_custom_hostname(
custom_domain.cloudflare_hostname_id
)
except Exception:
pass
await self.storage.delete_domain(domain)
[docs]
async def list_tenant_domains(self, tenant_id: str) -> List[CustomDomain]:
"""List all domains for a tenant."""
await self.get_tenant(tenant_id)
return await self.storage.list_tenant_domains(tenant_id)