"""
Cloudflare R2 Storage Service

Handles all interactions with the R2 bucket for templates and converted resumes.
"""

import io
import logging
import socket
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional

import boto3
from botocore.client import Config

from app.core.config import settings

logger = logging.getLogger(__name__)

# Force IPv4 to avoid Docker IPv6 issues.
# NOTE: this replaces ``socket.getaddrinfo`` for the whole process as soon as
# this module is imported, not just for the R2 client.
original_getaddrinfo = socket.getaddrinfo


def getaddrinfo_ipv4_only(host, port, family=0, type=0, proto=0, flags=0):
    """Resolve *host* with IPv4 only.

    Drop-in replacement for ``socket.getaddrinfo``. The ``family`` argument is
    accepted for signature compatibility but deliberately ignored: resolution
    is always performed with ``socket.AF_INET``. The parameter names mirror
    the stdlib function (including ``type``) because callers may pass them by
    keyword.
    """
    return original_getaddrinfo(host, port, socket.AF_INET, type, proto, flags)


socket.getaddrinfo = getaddrinfo_ipv4_only


class R2Service:
    """Service for interacting with Cloudflare R2 storage."""

    def __init__(self):
        """Initialize the R2 client with credentials from settings."""
        self.s3_client = boto3.client(
            's3',
            endpoint_url=settings.R2_ENDPOINT,
            aws_access_key_id=settings.R2_ACCESS_KEY_ID,
            aws_secret_access_key=settings.R2_SECRET_ACCESS_KEY,
            config=Config(
                signature_version='s3v4',
                s3={'addressing_style': 'path'},
            ),
            region_name='auto',
        )
        self.bucket_name = settings.R2_BUCKET_NAME
        self.templates_prefix = "templates/"
        self.converted_prefix = "converted_resumes/"

    def _iter_objects(self, prefix: str) -> Iterator[Dict[str, Any]]:
        """Yield every object entry under *prefix*, following pagination."""
        paginator = self.s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
            # A page has no 'Contents' entry when nothing matched.
            yield from page.get('Contents', [])

    def _object_url(self, key: str) -> str:
        """Build the endpoint/bucket/key URL for an object key."""
        return f"{settings.R2_ENDPOINT}/{self.bucket_name}/{key}"

    def list_templates(self) -> List[str]:
        """
        List all available template names from R2.

        Returns:
            List of template names (without the templates prefix and without
            the .html extension). Empty list if nothing is found or the
            listing fails.
        """
        suffix = '.html'
        prefix = self.templates_prefix
        try:
            logger.debug(
                "Listing templates from bucket %s, prefix %s, endpoint %s",
                self.bucket_name, prefix, settings.R2_ENDPOINT,
            )

            templates = []
            for obj in self._iter_objects(prefix):
                key = obj['Key']
                if not key.endswith(suffix):
                    continue
                # Strip only the leading prefix and the trailing extension so
                # the name round-trips through get_template_content().
                name = key[:-len(suffix)]
                if name.startswith(prefix):
                    name = name[len(prefix):]
                if name:  # Skip if empty (i.e. the key was just the prefix)
                    templates.append(name)

            if not templates:
                logger.debug("No templates found with prefix %s", prefix)
            else:
                logger.debug("Found templates: %s", templates)
            return templates
        except Exception:
            # Best-effort contract: callers get an empty list on failure.
            logger.exception("Error listing templates")
            return []

    def get_template_content(self, template_name: str) -> Optional[str]:
        """
        Get the HTML content of a specific template.

        Args:
            template_name: Name of the template (without .html extension)

        Returns:
            HTML content decoded as UTF-8, or None if the object could not be
            fetched or decoded.
        """
        try:
            key = f"{self.templates_prefix}{template_name}.html"
            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=key,
            )
            body = response['Body']
            try:
                return body.read().decode('utf-8')
            finally:
                # Release the underlying HTTP connection even if decoding fails.
                body.close()
        except Exception:
            logger.exception(
                "Error getting template content for %s", template_name
            )
            return None

    def upload_converted_file(
        self,
        file_content: bytes,
        filename: str,
        content_type: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> Optional[str]:
        """
        Upload a converted resume file to R2.

        The object key is the converted-resumes prefix followed by a
        second-resolution local timestamp and the given filename.

        Args:
            file_content: File content as bytes
            filename: Name of the file
            content_type: MIME type (text/html or application/pdf)
            metadata: Optional metadata dict

        Returns:
            URL of the uploaded file built from the configured endpoint,
            bucket and key, or None if the upload failed.
            NOTE(review): whether that URL is readable without signing depends
            on the bucket configuration; get_file_url() returns a presigned
            link.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            key = f"{self.converted_prefix}{timestamp}_{filename}"

            upload_args = {
                'Bucket': self.bucket_name,
                'Key': key,
                'Body': file_content,
                'ContentType': content_type,
            }
            if metadata:
                upload_args['Metadata'] = metadata

            self.s3_client.put_object(**upload_args)
            return self._object_url(key)
        except Exception:
            logger.exception("Error uploading file %s", filename)
            return None

    def list_converted_resumes(self, limit: int = 50) -> List[Dict]:
        """
        List the most recently modified converted resumes from R2.

        All objects under the converted-resumes prefix are listed, sorted
        newest first, and only then truncated to *limit*. Truncating at the
        listing call instead would return the first keys in listing order
        rather than the newest files, and directory markers would count
        against the limit.

        Args:
            limit: Maximum number of files to return

        Returns:
            List of dicts with keys 'id', 'name', 'url', 'size',
            'lastModified' (ISO 8601 string) and 'timestamp' (the
            LastModified value as returned by the client). Empty list if
            nothing is found, *limit* is not positive, or the listing fails.
        """
        if limit <= 0:
            return []

        try:
            files = []
            for obj in self._iter_objects(self.converted_prefix):
                key = obj['Key']
                # Skip directory markers
                if key.endswith('/'):
                    continue

                modified = obj['LastModified']
                files.append({
                    'id': key,
                    'name': key.replace(self.converted_prefix, ''),
                    'url': self._object_url(key),
                    'size': obj['Size'],
                    'lastModified': modified.isoformat(),
                    'timestamp': modified,
                })

            # Sort by timestamp, newest first, then apply the limit.
            files.sort(key=lambda entry: entry['timestamp'], reverse=True)
            return files[:limit]
        except Exception:
            logger.exception("Error listing converted resumes")
            return []

    def get_file_url(self, key: str, expires_in: int = 3600) -> Optional[str]:
        """
        Generate a presigned GET URL for a file.

        Args:
            key: Object key in R2
            expires_in: URL expiration time in seconds (default 1 hour)

        Returns:
            Presigned URL, or None if generation failed.
        """
        try:
            return self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': key},
                ExpiresIn=expires_in,
            )
        except Exception:
            logger.exception("Error generating presigned URL for %s", key)
            return None


# Singleton instance
r2_service = R2Service()