added coin cooldown to stop coin chase

This commit is contained in:
2026-05-04 11:09:17 -04:00
parent 2ce401c487
commit 61987482e1
4 changed files with 51 additions and 13 deletions

6
bot.py
View File

@@ -139,7 +139,11 @@ def run_cycle(config: Config) -> bool:
return bool(closed) return bool(closed)
try: try:
opportunities = scanner.scan(exclude_pairs=portfolio.open_pairs()) cooled_down = {
pair for pair in portfolio._cooldowns
if portfolio.on_cooldown(pair, config.sold_cooldown_hours)
}
opportunities = scanner.scan(exclude_pairs=portfolio.open_pairs() | cooled_down)
except KrakenError as exc: except KrakenError as exc:
log.error("Market scan failed: %s", exc) log.error("Market scan failed: %s", exc)
return bool(closed) return bool(closed)

View File

@@ -41,6 +41,10 @@ class Config:
# Prevents dead money from tying up capital indefinitely. # Prevents dead money from tying up capital indefinitely.
max_hold_hours: int = 72 max_hold_hours: int = 72
# Cooldown: hours to wait before re-buying a pair that was just sold.
# Prevents immediately re-entering a coin whose momentum has stalled or reversed.
sold_cooldown_hours: float = 4.0
# ── Execution ──────────────────────────────────────────────────────────── # ── Execution ────────────────────────────────────────────────────────────
# ALWAYS start with paper_trading=True and verify behaviour before going live. # ALWAYS start with paper_trading=True and verify behaviour before going live.
# Set to False only after you understand the bot's decisions. # Set to False only after you understand the bot's decisions.

View File

@@ -42,22 +42,30 @@ class Position:
class Portfolio: class Portfolio:
def __init__(self, filepath: str): def __init__(self, filepath: str):
self._path = Path(filepath) self._path = Path(filepath)
self._cooldown_path = self._path.with_suffix(".cooldown.json")
self.positions: dict[str, Position] = {} self.positions: dict[str, Position] = {}
self._cooldowns: dict[str, str] = {} # pair → ISO sell time
self._load() self._load()
def _load(self) -> None: def _load(self) -> None:
if not self._path.exists(): if self._path.exists():
return
try: try:
content = self._path.read_text().strip() content = self._path.read_text().strip()
if not content: if content:
return
data = json.loads(content) data = json.loads(content)
self.positions = {pair: Position(**fields) for pair, fields in data.items()} self.positions = {pair: Position(**fields) for pair, fields in data.items()}
log.info("Loaded %d position(s) from %s", len(self.positions), self._path) log.info("Loaded %d position(s) from %s", len(self.positions), self._path)
except Exception as exc: except Exception as exc:
log.error("Could not load positions file: %s", exc) log.error("Could not load positions file: %s", exc)
if self._cooldown_path.exists():
try:
content = self._cooldown_path.read_text().strip()
if content:
self._cooldowns = json.loads(content)
except Exception as exc:
log.error("Could not load cooldown file: %s", exc)
def _save(self) -> None: def _save(self) -> None:
try: try:
self._path.write_text( self._path.write_text(
@@ -69,6 +77,12 @@ class Portfolio:
except Exception as exc: except Exception as exc:
log.error("Could not save positions file: %s", exc) log.error("Could not save positions file: %s", exc)
def _save_cooldowns(self) -> None:
try:
self._cooldown_path.write_text(json.dumps(self._cooldowns, indent=2))
except Exception as exc:
log.error("Could not save cooldown file: %s", exc)
def add(self, position: Position) -> None: def add(self, position: Position) -> None:
self.positions[position.pair] = position self.positions[position.pair] = position
self._save() self._save()
@@ -81,8 +95,24 @@ class Portfolio:
pos = self.positions.pop(pair, None) pos = self.positions.pop(pair, None)
if pos: if pos:
self._save() self._save()
self._cooldowns[pair] = datetime.now(tz=timezone.utc).isoformat()
self._save_cooldowns()
return pos return pos
def on_cooldown(self, pair: str, cooldown_hours: float) -> bool:
sell_time_str = self._cooldowns.get(pair)
if not sell_time_str:
return False
sell_time = datetime.fromisoformat(sell_time_str)
elapsed = (datetime.now(tz=timezone.utc) - sell_time).total_seconds() / 3600
if elapsed < cooldown_hours:
log.debug("%s on cooldown: %.1fh of %.1fh elapsed", pair, elapsed, cooldown_hours)
return True
# Cooldown expired — clean it up
del self._cooldowns[pair]
self._save_cooldowns()
return False
def get(self, pair: str) -> Position | None: def get(self, pair: str) -> Position | None:
return self.positions.get(pair) return self.positions.get(pair)