127 lines
4.3 KiB
Python
127 lines
4.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from dataclasses import asdict, dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class Position:
|
|
pair: str
|
|
entry_price: float
|
|
quantity: float
|
|
entry_time: str # ISO 8601 UTC
|
|
peak_price: float # Highest price seen since entry (drives trailing stop)
|
|
cost_usd: float # USD spent including the allocation amount
|
|
order_id: str = ""
|
|
|
|
def update_peak(self, current_price: float) -> None:
|
|
if current_price > self.peak_price:
|
|
self.peak_price = current_price
|
|
|
|
def pnl_pct(self, current_price: float) -> float:
|
|
return (current_price - self.entry_price) / self.entry_price * 100
|
|
|
|
def drop_from_peak_pct(self, current_price: float) -> float:
|
|
if self.peak_price <= 0:
|
|
return 0.0
|
|
return (self.peak_price - current_price) / self.peak_price * 100
|
|
|
|
def hours_held(self) -> float:
|
|
entry = datetime.fromisoformat(self.entry_time)
|
|
if entry.tzinfo is None:
|
|
entry = entry.replace(tzinfo=timezone.utc)
|
|
now = datetime.now(tz=timezone.utc)
|
|
return (now - entry).total_seconds() / 3600
|
|
|
|
|
|
class Portfolio:
|
|
def __init__(self, filepath: str):
|
|
self._path = Path(filepath)
|
|
self._cooldown_path = self._path.with_suffix(".cooldown.json")
|
|
self.positions: dict[str, Position] = {}
|
|
self._cooldowns: dict[str, str] = {} # pair → ISO sell time
|
|
self._load()
|
|
|
|
def _load(self) -> None:
|
|
if self._path.exists():
|
|
try:
|
|
content = self._path.read_text().strip()
|
|
if content:
|
|
data = json.loads(content)
|
|
self.positions = {pair: Position(**fields) for pair, fields in data.items()}
|
|
log.info("Loaded %d position(s) from %s", len(self.positions), self._path)
|
|
except Exception as exc:
|
|
log.error("Could not load positions file: %s", exc)
|
|
|
|
if self._cooldown_path.exists():
|
|
try:
|
|
content = self._cooldown_path.read_text().strip()
|
|
if content:
|
|
self._cooldowns = json.loads(content)
|
|
except Exception as exc:
|
|
log.error("Could not load cooldown file: %s", exc)
|
|
|
|
def _save(self) -> None:
|
|
try:
|
|
self._path.write_text(
|
|
json.dumps(
|
|
{pair: asdict(pos) for pair, pos in self.positions.items()},
|
|
indent=2,
|
|
)
|
|
)
|
|
except Exception as exc:
|
|
log.error("Could not save positions file: %s", exc)
|
|
|
|
def _save_cooldowns(self) -> None:
|
|
try:
|
|
self._cooldown_path.write_text(json.dumps(self._cooldowns, indent=2))
|
|
except Exception as exc:
|
|
log.error("Could not save cooldown file: %s", exc)
|
|
|
|
def add(self, position: Position) -> None:
|
|
self.positions[position.pair] = position
|
|
self._save()
|
|
log.info(
|
|
"Position opened: %s qty=%.8f entry=$%.6f cost=$%.2f",
|
|
position.pair, position.quantity, position.entry_price, position.cost_usd,
|
|
)
|
|
|
|
def remove(self, pair: str) -> Position | None:
|
|
pos = self.positions.pop(pair, None)
|
|
if pos:
|
|
self._save()
|
|
self._cooldowns[pair] = datetime.now(tz=timezone.utc).isoformat()
|
|
self._save_cooldowns()
|
|
return pos
|
|
|
|
def on_cooldown(self, pair: str, cooldown_hours: float) -> bool:
|
|
sell_time_str = self._cooldowns.get(pair)
|
|
if not sell_time_str:
|
|
return False
|
|
sell_time = datetime.fromisoformat(sell_time_str)
|
|
elapsed = (datetime.now(tz=timezone.utc) - sell_time).total_seconds() / 3600
|
|
if elapsed < cooldown_hours:
|
|
log.debug("%s on cooldown: %.1fh of %.1fh elapsed", pair, elapsed, cooldown_hours)
|
|
return True
|
|
# Cooldown expired — clean it up
|
|
del self._cooldowns[pair]
|
|
self._save_cooldowns()
|
|
return False
|
|
|
|
def get(self, pair: str) -> Position | None:
|
|
return self.positions.get(pair)
|
|
|
|
def open_pairs(self) -> set[str]:
|
|
return set(self.positions.keys())
|
|
|
|
def all(self) -> list[Position]:
|
|
return list(self.positions.values())
|
|
|
|
def __len__(self) -> int:
|
|
return len(self.positions)
|