Files
resumeformatter/backend/app/services/r2_service.py
Laxmi Khilnani cda50356b4
Some checks failed
Profile Linker Docker Build / Build and push Docker image (push) Failing after 3s
feat: Complete Smart Resume Formatter with R2 and Gemini AI integration
- Integrated Cloudflare R2 for template storage and converted file management
- Added Google Gemini AI for resume parsing and HTML generation
- Created backend API endpoints for templates, conversion, and history
- Refactored frontend to use real API instead of mock data
- Fixed Docker networking issues (IPv6/IPv4) for R2 connectivity
- Added resumeService.ts for frontend API integration
- Updated Vite configuration for proper asset serving in Docker
- Successfully tested with 13 templates from R2 bucket
2025-10-14 21:43:41 +05:30

204 lines
7.0 KiB
Python

"""
Cloudflare R2 Storage Service
Handles all interactions with R2 bucket for templates and converted resumes
"""
import boto3
from botocore.client import Config
from typing import List, Dict, Optional
from datetime import datetime
import io
import socket
from app.core.config import settings
# Workaround for Docker IPv6 issues: inside the container the R2 endpoint can
# resolve to an unreachable IPv6 address, so pin all DNS lookups to IPv4.
original_getaddrinfo = socket.getaddrinfo


def getaddrinfo_ipv4_only(host, port, family=0, type=0, proto=0, flags=0):
    """Drop-in replacement for ``socket.getaddrinfo`` that resolves IPv4 only.

    The caller-supplied ``family`` is deliberately discarded; every other
    argument is forwarded unchanged to the real resolver.
    """
    forced_family = socket.AF_INET
    return original_getaddrinfo(host, port, forced_family, type, proto, flags)


# Install the patch process-wide so boto3/urllib3 pick it up transparently.
socket.getaddrinfo = getaddrinfo_ipv4_only
class R2Service:
    """Service for interacting with Cloudflare R2 storage.

    R2 exposes an S3-compatible API, so all access goes through a boto3 S3
    client configured for the R2 endpoint. Resume templates live under the
    ``templates/`` prefix and generated output under ``converted_resumes/``.
    All methods are best-effort: failures are printed and reported as
    ``None``/``[]`` rather than raised, so API handlers degrade gracefully.
    """

    def __init__(self):
        """Initialize the R2 client with credentials from settings."""
        self.s3_client = boto3.client(
            's3',
            endpoint_url=settings.R2_ENDPOINT,
            aws_access_key_id=settings.R2_ACCESS_KEY_ID,
            aws_secret_access_key=settings.R2_SECRET_ACCESS_KEY,
            # R2 requires SigV4 signing; path-style addressing keeps the
            # bucket in the URL path instead of a virtual-host subdomain.
            config=Config(
                signature_version='s3v4',
                s3={'addressing_style': 'path'}
            ),
            region_name='auto'  # 'auto' is the documented region for R2
        )
        self.bucket_name = settings.R2_BUCKET_NAME
        # Object-key prefixes acting as folders within the bucket.
        self.templates_prefix = "templates/"
        self.converted_prefix = "converted_resumes/"

    def list_templates(self) -> List[str]:
        """
        List all available template names from R2.

        Returns: List of template names (without the .html extension).
            Empty list when the prefix is empty or any error occurs.
        """
        try:
            print(f"Attempting to list templates from bucket: {self.bucket_name}, prefix: {self.templates_prefix}")
            print(f"Using endpoint: {settings.R2_ENDPOINT}")
            # NOTE(review): list_objects_v2 returns at most 1000 keys per
            # call and no pagination is done here — fine for the current
            # template count, revisit if the bucket grows.
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=self.templates_prefix
            )
            print(f"R2 Response: {response}")
            if 'Contents' not in response:
                print(f"No contents found in bucket with prefix {self.templates_prefix}")
                return []
            templates = []
            for obj in response['Contents']:
                key = obj['Key']
                # Strip the prefix and '.html' extension by position rather
                # than str.replace, which would corrupt template names that
                # happen to contain the prefix or '.html' in the middle.
                if key.endswith('.html'):
                    template_name = key[len(self.templates_prefix):-len('.html')]
                    if template_name:  # Skip if empty (key was just the prefix)
                        templates.append(template_name)
            print(f"Found templates: {templates}")
            return templates
        except Exception as e:
            # Best-effort: log and return [] so the UI shows an empty picker
            # instead of a 500 error.
            print(f"Error listing templates: {e}")
            import traceback
            traceback.print_exc()
            return []

    def get_template_content(self, template_name: str) -> Optional[str]:
        """
        Get the HTML content of a specific template.

        Args:
            template_name: Name of the template (without .html extension)
        Returns: HTML content as string, or None if not found or on error.
        """
        try:
            key = f"{self.templates_prefix}{template_name}.html"
            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=key
            )
            content = response['Body'].read().decode('utf-8')
            return content
        except Exception as e:
            print(f"Error getting template content for {template_name}: {e}")
            return None

    def upload_converted_file(
        self,
        file_content: bytes,
        filename: str,
        content_type: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> Optional[str]:
        """
        Upload a converted resume file to R2.

        Args:
            file_content: File content as bytes
            filename: Name of the file
            content_type: MIME type (text/html or application/pdf)
            metadata: Optional metadata dict
        Returns: URL of the uploaded file, or None if the upload failed.
        """
        try:
            # Timestamp-prefix the key so repeated uploads of the same
            # filename never overwrite each other.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            key = f"{self.converted_prefix}{timestamp}_{filename}"
            upload_args = {
                'Bucket': self.bucket_name,
                'Key': key,
                'Body': file_content,
                'ContentType': content_type
            }
            if metadata:
                upload_args['Metadata'] = metadata
            self.s3_client.put_object(**upload_args)
            # NOTE(review): this endpoint-based URL is only reachable without
            # auth if the bucket permits anonymous reads — confirm, or use
            # get_file_url() for a presigned link.
            url = f"{settings.R2_ENDPOINT}/{self.bucket_name}/{key}"
            return url
        except Exception as e:
            print(f"Error uploading file {filename}: {e}")
            return None

    def list_converted_resumes(self, limit: int = 50) -> List[Dict]:
        """
        List converted resumes from R2, newest first.

        Args:
            limit: Maximum number of files to return
        Returns: List of dicts with keys 'id', 'name', 'url', 'size',
            'lastModified' (ISO string) and 'timestamp' (datetime, used for
            sorting). Empty list on error.
        """
        try:
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=self.converted_prefix,
                MaxKeys=limit
            )
            if 'Contents' not in response:
                return []
            files = []
            for obj in response['Contents']:
                key = obj['Key']
                # Skip directory markers
                if key.endswith('/'):
                    continue
                filename = key.replace(self.converted_prefix, '')
                file_url = f"{settings.R2_ENDPOINT}/{self.bucket_name}/{key}"
                files.append({
                    'id': key,
                    'name': filename,
                    'url': file_url,
                    'size': obj['Size'],
                    'lastModified': obj['LastModified'].isoformat(),
                    # Raw datetime kept for sorting; note it is not
                    # JSON-serializable as-is.
                    'timestamp': obj['LastModified']
                })
            # Sort by timestamp, newest first
            files.sort(key=lambda x: x['timestamp'], reverse=True)
            return files
        except Exception as e:
            print(f"Error listing converted resumes: {e}")
            return []

    def get_file_url(self, key: str, expires_in: int = 3600) -> Optional[str]:
        """
        Generate a presigned URL for a file.

        Args:
            key: Object key in R2
            expires_in: URL expiration time in seconds (default 1 hour)
        Returns: Presigned URL or None if failed.
        """
        try:
            url = self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': key},
                ExpiresIn=expires_in
            )
            return url
        except Exception as e:
            print(f"Error generating presigned URL for {key}: {e}")
            return None
# Singleton instance
# Shared module-level instance; importing this module eagerly constructs the
# boto3 client using credentials from settings.
r2_service = R2Service()