# Source code for cloudflare_saas.postgres_adapter

"""PostgreSQL storage adapter using asyncpg."""

import json
from typing import Optional, List

import asyncpg
from asyncpg.pool import Pool

from .models import Tenant, CustomDomain, DomainStatus, VerificationMethod
from .storage_adapter import StorageAdapter
from .exceptions import CloudflareSaaSException


class PostgresStorageAdapter(StorageAdapter):
    """PostgreSQL storage implementation backed by an asyncpg connection pool.

    ``initialize()`` must be awaited before any data method is used; until
    then every data method raises
    ``CloudflareSaaSException("Storage not initialized")``.
    """

    def __init__(self, connection_string: str):
        """Remember the connection string; the pool is created in ``initialize()``."""
        self.connection_string = connection_string
        self.pool: Optional[Pool] = None

    async def initialize(self) -> None:
        """Initialize connection pool and create tables."""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=5,
            max_size=20,
            # Runs once per new pooled connection so every connection can
            # round-trip the JSONB ``metadata`` column as Python objects.
            init=self._init_connection,
        )
        await self._create_tables()

    async def close(self) -> None:
        """Close connection pool."""
        if self.pool:
            await self.pool.close()

    @staticmethod
    async def _init_connection(conn: asyncpg.Connection) -> None:
        """Register a JSON codec for ``jsonb`` on a freshly opened connection.

        Without a codec asyncpg only accepts ``str`` for jsonb parameters and
        returns jsonb values as raw JSON text, so ``tenants.metadata`` could
        neither be written from nor read back into a Python mapping.
        """
        await conn.set_type_codec(
            "jsonb",
            encoder=json.dumps,
            decoder=json.loads,
            schema="pg_catalog",
        )

    def _require_pool(self) -> Pool:
        """Return the live pool, or raise if ``initialize()`` has not been run."""
        if not self.pool:
            raise CloudflareSaaSException("Storage not initialized")
        return self.pool

    @staticmethod
    def _row_to_domain(row: asyncpg.Record) -> CustomDomain:
        """Build a ``CustomDomain`` from a ``custom_domains`` row.

        ``status`` and ``verification_method`` are stored as their enum
        ``.value`` strings (see ``save_domain``) and are converted back to
        enum members here.
        """
        data = dict(row)
        data["status"] = DomainStatus(data["status"])
        data["verification_method"] = VerificationMethod(data["verification_method"])
        return CustomDomain(**data)

    async def _create_tables(self) -> None:
        """Create database tables if they don't exist."""
        async with self._require_pool().acquire() as conn:
            # Tenants table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS tenants (
                    tenant_id VARCHAR(255) PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    slug VARCHAR(255) NOT NULL UNIQUE,
                    subdomain VARCHAR(255) NOT NULL UNIQUE,
                    owner_id VARCHAR(255),
                    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
                    metadata JSONB DEFAULT '{}'::jsonb
                )
            """)

            # Custom domains table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS custom_domains (
                    domain VARCHAR(255) PRIMARY KEY,
                    tenant_id VARCHAR(255) NOT NULL REFERENCES tenants(tenant_id) ON DELETE CASCADE,
                    status VARCHAR(50) NOT NULL,
                    verification_method VARCHAR(50) NOT NULL,
                    verification_token VARCHAR(255),
                    cname_target VARCHAR(255),
                    cloudflare_hostname_id VARCHAR(255),
                    ssl_status VARCHAR(50),
                    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
                    verified_at TIMESTAMP,
                    error_message TEXT
                )
            """)

            # Create indexes
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_domains_tenant_id
                ON custom_domains(tenant_id)
            """)
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_domains_status
                ON custom_domains(status)
            """)

    async def save_tenant(self, tenant: Tenant) -> None:
        """Insert a tenant, or update an existing one with the same ``tenant_id``.

        On conflict only ``name`` and ``metadata`` are overwritten; the other
        columns of the existing row are left as they are.

        Raises:
            CloudflareSaaSException: if the adapter has not been initialized.
        """
        async with self._require_pool().acquire() as conn:
            await conn.execute(
                """
                INSERT INTO tenants (tenant_id, name, slug, subdomain, owner_id, created_at, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (tenant_id) DO UPDATE SET
                    name = EXCLUDED.name,
                    metadata = EXCLUDED.metadata
                """,
                tenant.tenant_id,
                tenant.name,
                tenant.slug,
                tenant.subdomain,
                tenant.owner_id,
                tenant.created_at,
                tenant.metadata,
            )

    async def get_tenant(self, tenant_id: str) -> Optional[Tenant]:
        """Return the tenant with the given id, or ``None`` if no row matches.

        Raises:
            CloudflareSaaSException: if the adapter has not been initialized.
        """
        async with self._require_pool().acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT * FROM tenants WHERE tenant_id = $1
                """,
                tenant_id,
            )
        return Tenant(**dict(row)) if row else None

    async def delete_tenant(self, tenant_id: str) -> None:
        """Delete the tenant row with the given id.

        The ``custom_domains`` foreign key is declared ``ON DELETE CASCADE``,
        so the tenant's domain rows are removed by the database as well.

        Raises:
            CloudflareSaaSException: if the adapter has not been initialized.
        """
        async with self._require_pool().acquire() as conn:
            await conn.execute(
                """
                DELETE FROM tenants WHERE tenant_id = $1
                """,
                tenant_id,
            )

    async def list_tenants(self, limit: int = 100, offset: int = 0) -> List[Tenant]:
        """Return one page of tenants, newest ``created_at`` first.

        Args:
            limit: maximum number of rows to return.
            offset: number of rows to skip before the page starts.

        Raises:
            CloudflareSaaSException: if the adapter has not been initialized.
        """
        async with self._require_pool().acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT * FROM tenants
                ORDER BY created_at DESC
                LIMIT $1 OFFSET $2
                """,
                limit,
                offset,
            )
        return [Tenant(**dict(row)) for row in rows]

    async def save_domain(self, domain: CustomDomain) -> None:
        """Insert a custom domain, or update the existing row for that domain.

        On conflict only ``status``, ``cloudflare_hostname_id``, ``ssl_status``,
        ``verified_at`` and ``error_message`` are overwritten. The enum fields
        are persisted as their ``.value`` strings.

        Raises:
            CloudflareSaaSException: if the adapter has not been initialized.
        """
        async with self._require_pool().acquire() as conn:
            await conn.execute(
                """
                INSERT INTO custom_domains (
                    domain, tenant_id, status, verification_method,
                    verification_token, cname_target, cloudflare_hostname_id,
                    ssl_status, created_at, verified_at, error_message
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                ON CONFLICT (domain) DO UPDATE SET
                    status = EXCLUDED.status,
                    cloudflare_hostname_id = EXCLUDED.cloudflare_hostname_id,
                    ssl_status = EXCLUDED.ssl_status,
                    verified_at = EXCLUDED.verified_at,
                    error_message = EXCLUDED.error_message
                """,
                domain.domain,
                domain.tenant_id,
                domain.status.value,
                domain.verification_method.value,
                domain.verification_token,
                domain.cname_target,
                domain.cloudflare_hostname_id,
                domain.ssl_status,
                domain.created_at,
                domain.verified_at,
                domain.error_message,
            )

    async def get_domain(self, domain: str) -> Optional[CustomDomain]:
        """Return the custom domain record for ``domain``, or ``None`` if absent.

        Raises:
            CloudflareSaaSException: if the adapter has not been initialized.
        """
        async with self._require_pool().acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT * FROM custom_domains WHERE domain = $1
                """,
                domain,
            )
        return self._row_to_domain(row) if row else None

    async def delete_domain(self, domain: str) -> None:
        """Delete the custom domain row for ``domain``.

        Raises:
            CloudflareSaaSException: if the adapter has not been initialized.
        """
        async with self._require_pool().acquire() as conn:
            await conn.execute(
                """
                DELETE FROM custom_domains WHERE domain = $1
                """,
                domain,
            )

    async def list_tenant_domains(self, tenant_id: str) -> List[CustomDomain]:
        """Return all custom domains of a tenant, newest ``created_at`` first.

        Raises:
            CloudflareSaaSException: if the adapter has not been initialized.
        """
        async with self._require_pool().acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT * FROM custom_domains
                WHERE tenant_id = $1
                ORDER BY created_at DESC
                """,
                tenant_id,
            )
        return [self._row_to_domain(row) for row in rows]

    async def get_domain_by_tenant(self, domain: str) -> Optional[str]:
        """Return the ``tenant_id`` that owns ``domain``, or ``None``.

        Despite the method name, the lookup key is the domain and the result
        is the tenant id. Only rows whose ``status`` is ``'active'`` match.

        Raises:
            CloudflareSaaSException: if the adapter has not been initialized.
        """
        async with self._require_pool().acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT tenant_id FROM custom_domains
                WHERE domain = $1 AND status = 'active'
                """,
                domain,
            )
        return row["tenant_id"] if row else None