"""Create, save, reload and verify a safetensors checkpoint of random tensors."""

import logging
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional

import torch
from safetensors.torch import load_file, save_file

# Log to stdout and to a plain (non-rotating) file in the working directory.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("model_operations.log"),
    ],
)


class ModelHandler:
    """Initialize, save, load and compare dictionaries of named tensors.

    The handler is bound to one checkpoint path and one device ("cuda" when
    ``torch.cuda.is_available()`` is true, otherwise "cpu").
    """

    DEFAULT_CHECKPOINT = Path("Model_4_of_10.safetensors")
    # Side lengths of the square layers built when ``initialize_model`` is
    # called without ``layers``. A tuple, so the default cannot be mutated.
    DEFAULT_LAYERS = (8192, 16384, 32768)

    def __init__(self, checkpoint_path: str | Path = DEFAULT_CHECKPOINT):
        """Bind the handler to ``checkpoint_path`` and pick the target device."""
        self.checkpoint_path = Path(checkpoint_path)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _log_time(self, operation: str, start_time: float) -> None:
        """Log how long ``operation`` took, given its ``time.time()`` start."""
        elapsed = time.time() - start_time
        logging.info(f"{operation} completed in {elapsed:.2f} seconds")

    def load_model(self) -> Dict[str, torch.Tensor]:
        """Load every tensor from the checkpoint and move it to ``self.device``.

        Returns:
            Mapping of tensor name to tensor located on ``self.device``.

        Raises:
            RuntimeError: If loading or the device transfer fails; the
                original exception is chained as ``__cause__``.
        """
        start_time = time.time()
        try:
            logging.info(f"Loading model from {self.checkpoint_path}")
            # Load to CPU first to manage memory, then move to target device.
            model_data = load_file(str(self.checkpoint_path), device="cpu")
            # Replace entries one at a time (rather than building a second
            # dict) so each CPU copy can be released once it has been moved.
            for key in model_data:
                model_data[key] = model_data[key].to(self.device)
            self._log_time("Model loading", start_time)
            return model_data
        except Exception as e:
            logging.error(f"Model loading failed: {str(e)}")
            raise RuntimeError(f"Failed to load model: {str(e)}") from e

    def save_model(self, model_tensors: Dict[str, torch.Tensor]) -> None:
        """Write ``model_tensors`` to the checkpoint path.

        The parent directory of the checkpoint is created if it is missing.

        Args:
            model_tensors: Mapping of tensor name to tensor to serialize.

        Raises:
            RuntimeError: If directory creation or serialization fails; the
                original exception is chained as ``__cause__``.
        """
        start_time = time.time()
        try:
            logging.info(f"Saving model to {self.checkpoint_path}")
            self.checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
            save_file(model_tensors, str(self.checkpoint_path))
            self._log_time("Model saving", start_time)
        except Exception as e:
            logging.error(f"Model saving failed: {str(e)}")
            raise RuntimeError(f"Failed to save model: {str(e)}") from e

    def initialize_model(
        self,
        layers: Optional[List[int]] = None,
        dtype: torch.dtype = torch.bfloat16,
        seed: Optional[int] = 42,
    ) -> Dict[str, torch.Tensor]:
        """Build one random square tensor per entry of ``layers``.

        Each tensor is drawn from ``torch.randn`` on ``self.device`` and
        scaled by ``1 / sqrt(size)``. Tensors are named ``layer_1``,
        ``layer_2``, ... in the order given.

        Args:
            layers: Side length of each square tensor. ``None`` selects
                ``DEFAULT_LAYERS`` (8192, 16384, 32768).
            dtype: Data type of the created tensors.
            seed: Value passed to ``torch.manual_seed``; ``None`` leaves the
                global RNG state untouched.

        Returns:
            Mapping of layer name to the initialized tensor.

        Raises:
            RuntimeError: If tensor creation fails; the original exception is
                chained as ``__cause__``.
        """
        # A ``None`` sentinel instead of a list literal in the signature: a
        # mutable default is shared between calls and can be altered by them.
        if layers is None:
            layers = list(self.DEFAULT_LAYERS)
        if seed is not None:
            torch.manual_seed(seed)

        model_tensors = {}
        start_time = time.time()
        try:
            for i, size in enumerate(layers, 1):
                layer_name = f"layer_{i}"
                logging.info(f"Initializing {layer_name} [{size}x{size}] on {self.device}")
                # Scaled initialization for better stability.
                tensor = torch.randn(size, size, dtype=dtype, device=self.device) * (1.0 / size ** 0.5)
                model_tensors[layer_name] = tensor
            self._log_time("Model initialization", start_time)
            return model_tensors
        except Exception as e:
            logging.error(f"Model initialization failed: {str(e)}")
            raise RuntimeError(f"Failed to initialize model: {str(e)}") from e

    def verify_model(
        self,
        original: Dict[str, torch.Tensor],
        loaded: Dict[str, torch.Tensor],
        atol: float = 1e-5,
        rtol: float = 1e-3,
    ) -> bool:
        """Compare two tensor dictionaries key by key.

        A key passes when it exists in both mappings and its tensors have the
        same shape, the same dtype and values that satisfy ``torch.allclose``
        with the given tolerances. Every discrepancy is logged; checking
        continues so that all problems are reported in one pass.

        Args:
            original: Reference tensors.
            loaded: Tensors to check against ``original``.
            atol: Absolute tolerance forwarded to ``torch.allclose``.
            rtol: Relative tolerance forwarded to ``torch.allclose``.

        Returns:
            True only if both mappings hold the same keys and every pair of
            tensors matches; False otherwise.
        """
        all_match = True

        for key, orig in original.items():
            if key not in loaded:
                logging.warning(f"Missing tensor: {key}")
                all_match = False
                continue

            load = loaded[key]
            try:
                if orig.shape != load.shape:
                    logging.warning(f"Shape mismatch in {key}: {orig.shape} vs {load.shape}")
                    all_match = False
                    continue

                # torch.allclose raises on differing dtypes; report the real
                # cause instead of a generic comparison error.
                if orig.dtype != load.dtype:
                    logging.warning(f"Dtype mismatch in {key}: {orig.dtype} vs {load.dtype}")
                    all_match = False
                    continue

                # Tensors on different devices cannot be compared directly;
                # ``.to`` returns the same tensor when it is already there.
                load = load.to(orig.device)

                if not torch.allclose(orig, load, atol=atol, rtol=rtol):
                    diff = torch.max(torch.abs(orig - load))
                    logging.warning(f"Mismatch in {key}: max diff = {diff}")
                    all_match = False
                else:
                    logging.info(f"Tensor {key} verified (shape: {orig.shape})")
            except Exception as e:
                logging.error(f"Verification failed for {key}: {str(e)}")
                all_match = False

        # Tensors present only in ``loaded`` also mean the two differ.
        for key in loaded:
            if key not in original:
                logging.warning(f"Unexpected tensor: {key}")
                all_match = False

        return all_match


def main():
    """Initialize a model, round-trip it through the checkpoint, and verify it.

    Returns:
        Process exit code: 0 when the reloaded tensors match the originals,
        1 when verification fails or any step raises.
    """
    try:
        # Initialize handler
        handler = ModelHandler()

        # Create and save model
        model_data = handler.initialize_model()
        handler.save_model(model_data)

        # Load and verify
        loaded_model_data = handler.load_model()
        is_valid = handler.verify_model(model_data, loaded_model_data)

        logging.info(f"Model verification {'passed' if is_valid else 'failed'}")
        # A failed verification must not be reported as success to the shell.
        return 0 if is_valid else 1
    except Exception as e:
        logging.error(f"Execution failed: {str(e)}")
        return 1


if __name__ == "__main__":
    sys.exit(main())