| | import torch |
| | from safetensors.torch import load_file, save_file |
| | import logging |
| | from typing import Dict, List, Optional |
| | import time |
| | from pathlib import Path |
| | import sys |
| |
|
| | |
# Root-logger setup: every record at INFO or above is written both to
# stdout and to the file "model_operations.log" in the working directory.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(sys.stdout),            # console output
        logging.FileHandler("model_operations.log"),  # persistent log file
    ],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
| |
|
| | class ModelHandler: |
| | """Class to handle model operations with improved efficiency.""" |
| | |
| | DEFAULT_CHECKPOINT = Path("Model_4_of_10.safetensors") |
| | |
| | def __init__(self, checkpoint_path: str | Path = DEFAULT_CHECKPOINT): |
| | self.checkpoint_path = Path(checkpoint_path) |
| | self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| | |
| | def _log_time(self, operation: str, start_time: float) -> None: |
| | """Helper method for consistent timing logging.""" |
| | elapsed = time.time() - start_time |
| | logging.info(f"{operation} completed in {elapsed:.2f} seconds") |
| |
|
| | def load_model(self) -> Dict[str, torch.Tensor]: |
| | """Loads model with memory-efficient handling.""" |
| | start_time = time.time() |
| | try: |
| | logging.info(f"Loading model from {self.checkpoint_path}") |
| | |
| | model_data = load_file(str(self.checkpoint_path), device="cpu") |
| | for key in model_data: |
| | model_data[key] = model_data[key].to(self.device) |
| | self._log_time("Model loading", start_time) |
| | return model_data |
| | except Exception as e: |
| | logging.error(f"Model loading failed: {str(e)}") |
| | raise RuntimeError(f"Failed to load model: {str(e)}") from e |
| |
|
| | def save_model(self, model_tensors: Dict[str, torch.Tensor]) -> None: |
| | """Saves model with validation and error handling.""" |
| | start_time = time.time() |
| | try: |
| | logging.info(f"Saving model to {self.checkpoint_path}") |
| | self.checkpoint_path.parent.mkdir(parents=True, exist_ok=True) |
| | save_file(model_tensors, str(self.checkpoint_path)) |
| | self._log_time("Model saving", start_time) |
| | except Exception as e: |
| | logging.error(f"Model saving failed: {str(e)}") |
| | raise RuntimeError(f"Failed to save model: {str(e)}") from e |
| |
|
| | def initialize_model( |
| | self, |
| | layers: List[int] = [8192, 16384, 32768], |
| | dtype: torch.dtype = torch.bfloat16, |
| | seed: Optional[int] = 42 |
| | ) -> Dict[str, torch.Tensor]: |
| | """Initializes model with optimized parameters.""" |
| | if seed is not None: |
| | torch.manual_seed(seed) |
| | |
| | model_tensors = {} |
| | start_time = time.time() |
| | try: |
| | for i, size in enumerate(layers, 1): |
| | layer_name = f"layer_{i}" |
| | logging.info(f"Initializing {layer_name} [{size}x{size}] on {self.device}") |
| | |
| | tensor = torch.randn(size, size, dtype=dtype, device=self.device) * (1.0 / size ** 0.5) |
| | model_tensors[layer_name] = tensor |
| | self._log_time("Model initialization", start_time) |
| | return model_tensors |
| | except Exception as e: |
| | logging.error(f"Model initialization failed: {str(e)}") |
| | raise RuntimeError(f"Failed to initialize model: {str(e)}") from e |
| |
|
| | def verify_model( |
| | self, |
| | original: Dict[str, torch.Tensor], |
| | loaded: Dict[str, torch.Tensor], |
| | atol: float = 1e-5, |
| | rtol: float = 1e-3 |
| | ) -> bool: |
| | """Verifies model integrity with detailed comparison.""" |
| | all_match = True |
| | for key in original: |
| | if key not in loaded: |
| | logging.warning(f"Missing tensor: {key}") |
| | all_match = False |
| | continue |
| | |
| | orig, load = original[key], loaded[key] |
| | try: |
| | if orig.shape != load.shape: |
| | logging.warning(f"Shape mismatch in {key}: {orig.shape} vs {load.shape}") |
| | all_match = False |
| | continue |
| | |
| | if not torch.allclose(orig, load, atol=atol, rtol=rtol): |
| | diff = torch.max(torch.abs(orig - load)) |
| | logging.warning(f"Mismatch in {key}: max diff = {diff}") |
| | all_match = False |
| | else: |
| | logging.info(f"Tensor {key} verified (shape: {orig.shape})") |
| | except Exception as e: |
| | logging.error(f"Verification failed for {key}: {str(e)}") |
| | all_match = False |
| | return all_match |
| |
|
def main() -> int:
    """Run an initialize, save, load and verify round trip.

    A ModelHandler with the default checkpoint path generates a model,
    writes it to disk, reads it back and compares the two copies.

    Returns:
        0 when the reloaded tensors match the generated ones; 1 when
        verification fails or any step raises.
    """
    try:
        handler = ModelHandler()

        # Generate and persist the reference model.
        model_data = handler.initialize_model()
        handler.save_model(model_data)

        # Read it back and compare against what was generated.
        loaded_model_data = handler.load_model()
        is_valid = handler.verify_model(model_data, loaded_model_data)
    except Exception as e:
        logging.error(f"Execution failed: {str(e)}")
        return 1
    else:
        logging.info(f"Model verification {'passed' if is_valid else 'failed'}")
        # A failed verification must be visible in the process exit status.
        return 0 if is_valid else 1
| |
|
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())