Source code for cloudflare_saas.r2_client

"""R2 client for bucket operations using aiobotocore."""

import asyncio
import os
import mimetypes
from pathlib import Path
from typing import List, Optional, AsyncIterator
from contextlib import asynccontextmanager

from aiobotocore.session import get_session
from botocore.exceptions import ClientError
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

from .exceptions import R2OperationError
from .config import Config
from .logging_config import LoggerMixin


[docs] class R2Client(LoggerMixin): """Async R2 client for object operations."""
[docs] def __init__(self, config: Config): self.config = config self.session = get_session() self.logger.info(f"Initialized R2Client for bucket: {config.r2_bucket_name}")
@asynccontextmanager async def _get_client(self): """Get S3 client context manager.""" async with self.session.create_client( 's3', endpoint_url=self.config.r2_endpoint, region_name='auto', aws_access_key_id=self.config.r2_access_key_id, aws_secret_access_key=self.config.r2_secret_access_key, ) as client: yield client
[docs] @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type(ClientError), ) async def upload_file( self, tenant_id: str, local_path: Path, object_key: str, content_type: Optional[str] = None, ) -> str: """Upload a file to R2 under tenant namespace.""" if content_type is None: content_type, _ = mimetypes.guess_type(str(local_path)) if content_type is None: content_type = 'application/octet-stream' # Construct namespaced key namespaced_key = f"{tenant_id}/{object_key}" try: async with self._get_client() as client: with open(local_path, 'rb') as f: await client.put_object( Bucket=self.config.r2_bucket_name, Key=namespaced_key, Body=f, ContentType=content_type, ) return namespaced_key except ClientError as e: raise R2OperationError( f"Failed to upload {local_path} to {namespaced_key}: {e}" ) except Exception as e: raise R2OperationError(f"Unexpected error uploading file: {e}")
[docs] async def upload_directory( self, tenant_id: str, local_dir: Path, base_prefix: str = "", ) -> List[str]: """Upload entire directory to R2.""" if not local_dir.is_dir(): raise R2OperationError(f"{local_dir} is not a directory") uploaded_keys = [] tasks = [] for file_path in local_dir.rglob('*'): if file_path.is_file(): # Calculate relative path rel_path = file_path.relative_to(local_dir) object_key = str(rel_path).replace('\\', '/') if base_prefix: object_key = f"{base_prefix}/{object_key}" # Create upload task task = self.upload_file(tenant_id, file_path, object_key) tasks.append(task) # Upload files concurrently try: uploaded_keys = await asyncio.gather(*tasks) except Exception as e: raise R2OperationError(f"Failed to upload directory: {e}") return uploaded_keys
[docs] @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type(ClientError), ) async def delete_object(self, tenant_id: str, object_key: str) -> None: """Delete an object from R2.""" namespaced_key = f"{tenant_id}/{object_key}" try: async with self._get_client() as client: await client.delete_object( Bucket=self.config.r2_bucket_name, Key=namespaced_key, ) except ClientError as e: raise R2OperationError(f"Failed to delete {namespaced_key}: {e}")
[docs] async def delete_tenant_objects(self, tenant_id: str) -> int: """Delete all objects for a tenant.""" keys_to_delete = [] try: async with self._get_client() as client: # List all objects with tenant prefix paginator = client.get_paginator('list_objects_v2') async for page in paginator.paginate( Bucket=self.config.r2_bucket_name, Prefix=f"{tenant_id}/", ): if 'Contents' in page: for obj in page['Contents']: keys_to_delete.append({'Key': obj['Key']}) # Delete in batches of 1000 (S3 limit) deleted_count = 0 for i in range(0, len(keys_to_delete), 1000): batch = keys_to_delete[i:i+1000] await client.delete_objects( Bucket=self.config.r2_bucket_name, Delete={'Objects': batch}, ) deleted_count += len(batch) return deleted_count except ClientError as e: raise R2OperationError(f"Failed to delete tenant objects: {e}")
[docs] async def list_tenant_objects( self, tenant_id: str, max_keys: int = 1000, ) -> List[dict]: """List objects for a tenant.""" objects = [] try: async with self._get_client() as client: response = await client.list_objects_v2( Bucket=self.config.r2_bucket_name, Prefix=f"{tenant_id}/", MaxKeys=max_keys, ) if 'Contents' in response: objects = response['Contents'] return objects except ClientError as e: raise R2OperationError(f"Failed to list tenant objects: {e}")
[docs] async def object_exists(self, tenant_id: str, object_key: str) -> bool: """Check if object exists.""" namespaced_key = f"{tenant_id}/{object_key}" try: async with self._get_client() as client: await client.head_object( Bucket=self.config.r2_bucket_name, Key=namespaced_key, ) return True except ClientError as e: if e.response['Error']['Code'] == '404': return False raise R2OperationError(f"Failed to check object existence: {e}")