| | import torch |
| | import torch.nn as nn |
| | from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedTokenizerFast |
| | from torch.utils.data import DataLoader |
| |
|
| | class Charm15Model(nn.Module): |
| | def __init__(self, model_name: str, device: str = "cuda" if torch.cuda.is_available() else "cpu"): |
| | """Initialize Charm 15 with a pretrained model.""" |
| | super(Charm15Model, self).__init__() |
| | self.device = device |
| | self.model_name = model_name |
| | |
| | try: |
| | |
| | self.tokenizer = AutoTokenizer.from_pretrained(model_name) |
| | if self.tokenizer.pad_token is None: |
| | self.tokenizer.pad_token = self.tokenizer.eos_token |
| | self.tokenizer.pad_token_id = self.tokenizer.eos_token_id |
| | |
| | |
| | self.model = AutoModelForCausalLM.from_pretrained( |
| | model_name, |
| | torch_dtype=torch.bfloat16, |
| | device_map="auto", |
| | low_cpu_mem_usage=True |
| | ).to(self.device) |
| | print(f"Loaded model {model_name} on {self.device}") |
| | except Exception as e: |
| | print(f"Error initializing model/tokenizer: {e}") |
| | raise |
| |
|
| | def generate_text(self, prompt: str, max_length: int = 2048, temperature: float = 0.7, |
| | top_k: int = 50, top_p: float = 0.9): |
| | """Generate text with the model.""" |
| | try: |
| | inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device) |
| | with torch.no_grad(): |
| | output = self.model.generate( |
| | **inputs, |
| | max_length=max_length, |
| | temperature=temperature, |
| | top_k=top_k, |
| | top_p=top_p, |
| | do_sample=True, |
| | repetition_penalty=1.1, |
| | pad_token_id=self.tokenizer.pad_token_id, |
| | use_cache=True |
| | ) |
| | return self.tokenizer.decode(output[0], skip_special_tokens=True) |
| | except Exception as e: |
| | print(f"Error generating text: {e}") |
| | return None |
| |
|
| | def fine_tune(self, train_dataloader: DataLoader, eval_dataloader: DataLoader = None, |
| | epochs: int = 3, lr: float = 5e-5, gradient_accumulation_steps: int = 4): |
| | """Fine-tune the model with a DataLoader.""" |
| | optimizer = torch.optim.AdamW(self.model.parameters(), lr=lr) |
| | self.model.train() |
| | |
| | try: |
| | for epoch in range(epochs): |
| | total_loss = 0 |
| | for step, batch in enumerate(train_dataloader): |
| | batch = {k: v.to(self.device) for k, v in batch.items()} |
| | outputs = self.model(**batch) |
| | loss = outputs.loss / gradient_accumulation_steps |
| | |
| | loss.backward() |
| | if (step + 1) % gradient_accumulation_steps == 0: |
| | optimizer.step() |
| | optimizer.zero_grad() |
| | |
| | total_loss += loss.item() * gradient_accumulation_steps |
| | |
| | avg_loss = total_loss / len(train_dataloader) |
| | print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_loss:.4f}") |
| | |
| | |
| | if eval_dataloader: |
| | eval_loss = self._evaluate(eval_dataloader) |
| | print(f"Eval Loss: {eval_loss:.4f}") |
| | except Exception as e: |
| | print(f"Error during fine-tuning: {e}") |
| | raise |
| |
|
| | def _evaluate(self, dataloader: DataLoader): |
| | """Evaluate the model on a DataLoader.""" |
| | self.model.eval() |
| | total_loss = 0 |
| | with torch.no_grad(): |
| | for batch in dataloader: |
| | batch = {k: v.to(self.device) for k, v in batch.items()} |
| | outputs = self.model(**batch) |
| | total_loss += outputs.loss.item() |
| | self.model.train() |
| | return total_loss / len(dataloader) |
| |
|
| | def save_model(self, save_path: str): |
| | """Save model and tokenizer.""" |
| | try: |
| | os.makedirs(save_path, exist_ok=True) |
| | self.model.save_pretrained(save_path) |
| | self.tokenizer.save_pretrained(save_path) |
| | print(f"Model saved to {save_path}") |
| | except Exception as e: |
| | print(f"Error saving model: {e}") |
| |
|
| | def load_model(self, load_path: str): |
| | """Load model and tokenizer from a path.""" |
| | try: |
| | self.model = AutoModelForCausalLM.from_pretrained( |
| | load_path, torch_dtype=torch.bfloat16, device_map="auto" |
| | ).to(self.device) |
| | self.tokenizer = AutoTokenizer.from_pretrained(load_path) |
| | if self.tokenizer.pad_token is None: |
| | self.tokenizer.pad_token = self.tokenizer.eos_token |
| | print(f"Model loaded from {load_path}") |
| | except Exception as e: |
| | print(f"Error loading model: {e}") |
| | raise |
| |
|
| | def quantize_model(self, bits: int = 8): |
| | """Quantize model for efficiency (basic dynamic quantization).""" |
| | try: |
| | if bits != 8: |
| | print("⚠️ Only 8-bit quantization supported with torch.qint8") |
| | self.model = torch.quantization.quantize_dynamic( |
| | self.model, {nn.Linear}, dtype=torch.qint8 |
| | ) |
| | print("Model quantized to 8 bits (dynamic quantization)") |
| | except Exception as e: |
| | print(f"Error quantizing model: {e}") |
| |
|
| | if __name__ == "__main__": |
| | |
| | model = Charm15Model(model_name="mistralai/Mixtral-8x7B-Instruct-v0.1") |
| | |
| | |
| | prompt = "Charm 15 is amazing because" |
| | text = model.generate_text(prompt) |
| | print(f"Generated: {text}") |
| | |
| | |
| | from your_dataloader_script import DataLoaderHandler |
| | train_loader = DataLoaderHandler( |
| | "../datasets/eclipse_corpuz_1.1.jsonl", |
| | "../finetuned_charm15/tokenizer.json", |
| | batch_size=4 |
| | ).get_dataloader() |
| | |
| | |
| | model.fine_tune(train_loader) |
| | |
| | |
| | model.save_model("../finetuned_charm15") |
| | |
| | |
| | model.quantize_model() |
| | |
| | |
| | model.load_model("../finetuned_charm15") |
| | print(model.generate_text("Testing reloaded model")) |