from __future__ import annotations import json import logging from dataclasses import asdict, dataclass from datetime import datetime, timezone from pathlib import Path log = logging.getLogger(__name__) @dataclass class Position: pair: str entry_price: float quantity: float entry_time: str # ISO 8601 UTC peak_price: float # Highest price seen since entry (drives trailing stop) cost_usd: float # USD spent including the allocation amount order_id: str = "" def update_peak(self, current_price: float) -> None: if current_price > self.peak_price: self.peak_price = current_price def pnl_pct(self, current_price: float) -> float: return (current_price - self.entry_price) / self.entry_price * 100 def drop_from_peak_pct(self, current_price: float) -> float: if self.peak_price <= 0: return 0.0 return (self.peak_price - current_price) / self.peak_price * 100 def hours_held(self) -> float: entry = datetime.fromisoformat(self.entry_time) if entry.tzinfo is None: entry = entry.replace(tzinfo=timezone.utc) now = datetime.now(tz=timezone.utc) return (now - entry).total_seconds() / 3600 class Portfolio: def __init__(self, filepath: str): self._path = Path(filepath) self.positions: dict[str, Position] = {} self._load() def _load(self) -> None: if not self._path.exists(): return try: content = self._path.read_text().strip() if not content: return data = json.loads(content) self.positions = {pair: Position(**fields) for pair, fields in data.items()} log.info("Loaded %d position(s) from %s", len(self.positions), self._path) except Exception as exc: log.error("Could not load positions file: %s", exc) def _save(self) -> None: try: self._path.write_text( json.dumps( {pair: asdict(pos) for pair, pos in self.positions.items()}, indent=2, ) ) except Exception as exc: log.error("Could not save positions file: %s", exc) def add(self, position: Position) -> None: self.positions[position.pair] = position self._save() log.info( "Position opened: %s qty=%.8f entry=$%.6f cost=$%.2f", position.pair, position.quantity, position.entry_price, position.cost_usd, ) def remove(self, pair: str) -> Position | None: pos = self.positions.pop(pair, None) if pos: self._save() return pos def get(self, pair: str) -> Position | None: return self.positions.get(pair) def open_pairs(self) -> set[str]: return set(self.positions.keys()) def all(self) -> list[Position]: return list(self.positions.values()) def __len__(self) -> int: return len(self.positions)