Files
crypto-trader/scanner.py
2026-05-16 00:08:45 -04:00

185 lines
6.6 KiB
Python

from __future__ import annotations
import logging
from dataclasses import dataclass, field
from config import Config
from kraken_client import KrakenClient, KrakenError
log = logging.getLogger(__name__)
@dataclass
class Opportunity:
    """A USD trading pair that passed the scanner's filters.

    Attributes:
        pair: Altname used for orders, e.g. "XBTUSD".
        last_price: Last traded price taken from the ticker.
        open_price: Opening price taken from the ticker.
        change_pct: Percent change from ``open_price`` to ``last_price``.
        volume_usd: 24h volume expressed in USD.
        lot_decimals: Decimal precision for order quantity.
        order_min: Kraken's minimum order quantity.
        rsi: RSI recorded once the velocity check passes; 0.0 until then.
        recent_change_pct: Recent hourly percent change recorded alongside
            ``rsi``; 0.0 until then.
    """

    # Required fields, filled from the ticker and asset-pair metadata.
    pair: str
    last_price: float
    open_price: float
    change_pct: float
    volume_usd: float
    lot_decimals: int
    order_min: float

    # Optional fields, assigned after the per-pair OHLC check.
    rsi: float = 0.0
    recent_change_pct: float = 0.0
def _calculate_rsi(closes: list[float], period: int) -> float:
"""Wilder's smoothed RSI. Returns 50 (neutral) if there isn't enough data."""
if len(closes) < period + 1:
return 50.0
deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
gains = [max(d, 0.0) for d in deltas]
losses = [max(-d, 0.0) for d in deltas]
avg_gain = sum(gains[:period]) / period
avg_loss = sum(losses[:period]) / period
for i in range(period, len(gains)):
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
if avg_loss == 0:
return 100.0
return 100.0 - (100.0 / (1 + avg_gain / avg_loss))
class Scanner:
    """Two-phase scanner for USD pairs that are rising but not overbought.

    Phase 1 filters every eligible USD pair with a single bulk ticker call.
    Phase 2 fetches hourly OHLC data for each survivor and applies an RSI and
    recent-momentum check.
    """

    def __init__(self, client: KrakenClient, config: Config):
        """Store the exchange client and the filter settings.

        Args:
            client: Exchange client; this class calls its ``get_asset_pairs``,
                ``get_tickers`` and ``get_ohlc`` methods.
            config: Settings object; this class reads ``rsi_period``,
                ``rsi_max``, ``recent_candles``, ``min_volume_usd``,
                ``min_price_change_pct`` and ``max_price_change_pct``.
        """
        self.client = client
        self.config = config

    def _velocity_check(self, pair: str) -> tuple[bool, float, float]:
        """Check RSI and short-term momentum for one pair using hourly OHLC.

        A pair passes when both hold:
          1. RSI is at or below ``config.rsi_max`` (not overbought).
          2. The latest close is above the close ``config.recent_candles``
             candles earlier (still moving up).

        Fetch errors, too little history and malformed candles count as a
        failed check instead of raising, so one bad pair cannot abort a scan.

        Args:
            pair: Pair name passed to ``client.get_ohlc``.

        Returns:
            ``(passes, rsi, recent_change_pct)``; ``(False, 0.0, 0.0)`` when
            the check could not be evaluated.
        """
        try:
            candles = self.client.get_ohlc(pair, interval=60)
        except KrakenError as exc:
            log.debug("OHLC fetch failed for %s: %s", pair, exc)
            return False, 0.0, 0.0

        needed = self.config.rsi_period + self.config.recent_candles + 2
        if len(candles) < needed:
            log.debug("%s: only %d candles, need %d — skipping", pair, len(candles), needed)
            return False, 0.0, 0.0

        try:
            # Index 4 of each candle is read as the close price.
            closes = [float(c[4]) for c in candles]
        except (IndexError, KeyError, TypeError, ValueError) as exc:
            log.debug("%s: malformed OHLC candle (%s) — skipping", pair, exc)
            return False, 0.0, 0.0

        rsi = _calculate_rsi(closes, self.config.rsi_period)

        # Compare current close to the close N hours ago
        ago_close = closes[-(self.config.recent_candles + 1)]
        recent_change = (closes[-1] - ago_close) / ago_close * 100 if ago_close else 0.0

        passes = rsi <= self.config.rsi_max and recent_change > 0
        return passes, rsi, recent_change

    @staticmethod
    def _eligible_usd_pairs(
        all_pairs: dict[str, dict],
        exclude: set[str],
    ) -> tuple[dict[str, dict], dict[str, str]]:
        """Select online, USD-quoted pairs that are not excluded.

        Pairs whose altname ends in ".d" are dropped as well.

        Returns:
            ``(usd_pairs, internal_to_alt)``: pair info keyed by altname, and a
            map from each kept pair's internal key to its altname.
        """
        usd_pairs: dict[str, dict] = {}
        internal_to_alt: dict[str, str] = {}
        for internal_key, info in all_pairs.items():
            altname = info.get("altname", "")
            if (
                info.get("quote") in ("ZUSD", "USD")
                and info.get("status") == "online"
                and not altname.endswith(".d")
                and altname not in exclude
            ):
                usd_pairs[altname] = info
                internal_to_alt[internal_key] = altname
        return usd_pairs, internal_to_alt

    def _build_candidate(self, altname: str, info: dict, ticker: dict) -> Opportunity | None:
        """Build an Opportunity from one ticker if it passes the phase-1 filters.

        Returns ``None`` when the ticker or pair info is malformed, a price is
        not positive, the USD volume is below ``config.min_volume_usd``, or the
        price change lies outside the configured min/max range.
        """
        # Only the parsing sits inside the try, so a bad config value still
        # surfaces as an error instead of silently skipping every pair.
        try:
            last_price = float(ticker["c"][0])
            # NOTE(review): Kraken's Ticker docs describe "o" as today's
            # opening price, so change_pct may be measured from the daily open
            # rather than a rolling 24h window — confirm.
            open_price = float(ticker["o"])
            volume_24h = float(ticker["v"][1])
            lot_decimals = int(info.get("lot_decimals", 8))
            order_min = float(info.get("ordermin", 0))
        except (KeyError, IndexError, TypeError, ValueError) as exc:
            log.debug("%s: malformed ticker or pair info (%r) — skipping", altname, exc)
            return None

        # Non-positive prices would make the percentage change meaningless.
        if open_price <= 0 or last_price <= 0:
            return None

        change_pct = (last_price - open_price) / open_price * 100
        # Volume is in base units; price it at the last trade to get USD.
        volume_usd = volume_24h * last_price

        if volume_usd < self.config.min_volume_usd:
            return None
        if not (self.config.min_price_change_pct <= change_pct <= self.config.max_price_change_pct):
            return None

        return Opportunity(
            pair=altname,
            last_price=last_price,
            open_price=open_price,
            change_pct=change_pct,
            volume_usd=volume_usd,
            lot_decimals=lot_decimals,
            order_min=order_min,
        )

    def scan(self, exclude_pairs: set[str] | None = None) -> list[Opportunity]:
        """Run the two-phase scan and return the pairs that pass both phases.

        Phase 1 (cheap): bulk ticker call — filter by price change and volume.
        Phase 2 (per-coin): OHLC call — filter by RSI and recent hourly momentum.

        Args:
            exclude_pairs: Altnames to leave out of the scan; ``None`` means
                nothing is excluded.

        Returns:
            Opportunities that passed both phases, with ``rsi`` and
            ``recent_change_pct`` filled in. An empty list when nothing
            qualifies or when the asset-pair or ticker fetch fails (the
            failure is logged at error level).
        """
        exclude = exclude_pairs or set()

        # ── Phase 1: bulk ticker pre-filter ──────────────────────────────────
        # Handled the same way as the ticker fetch below: log and give up on
        # this scan rather than raising.
        try:
            all_pairs = self.client.get_asset_pairs()
        except KrakenError as exc:
            log.error("Asset pair fetch failed: %s", exc)
            return []

        usd_pairs, internal_to_alt = self._eligible_usd_pairs(all_pairs, exclude)
        if not usd_pairs:
            log.info("No eligible USD pairs found after filtering")
            return []

        log.info("Fetching tickers for %d USD pairs...", len(usd_pairs))
        try:
            raw_tickers = self.client.get_tickers(list(usd_pairs.keys()))
        except KrakenError as exc:
            log.error("Ticker fetch failed: %s", exc)
            return []

        # Ticker results may be keyed by altname or by the internal pair key;
        # normalise everything to altname.
        ticker_by_alt: dict[str, dict] = {}
        for key, ticker in raw_tickers.items():
            if key in usd_pairs:
                ticker_by_alt[key] = ticker
            elif key in internal_to_alt:
                ticker_by_alt[internal_to_alt[key]] = ticker

        candidates: list[Opportunity] = []
        for altname, info in usd_pairs.items():
            ticker = ticker_by_alt.get(altname)
            if ticker is None:
                continue
            candidate = self._build_candidate(altname, info, ticker)
            if candidate is not None:
                candidates.append(candidate)

        log.info(
            "Phase 1 complete: %d pairs checked, %d candidates pass volume/change filter",
            len(usd_pairs), len(candidates),
        )
        if not candidates:
            return []

        # ── Phase 2: velocity check (OHLC per candidate) ─────────────────────
        opportunities: list[Opportunity] = []
        for opp in candidates:
            passes, rsi, recent_change = self._velocity_check(opp.pair)
            log.info(
                " %-12s change=%+.2f%% rsi=%.1f recent_%dh=%+.2f%% %s",
                opp.pair, opp.change_pct, rsi,
                self.config.recent_candles, recent_change,
                "OK" if passes else "SKIP",
            )
            if passes:
                opp.rsi = rsi
                opp.recent_change_pct = recent_change
                opportunities.append(opp)

        log.info(
            "Phase 2 complete: %d of %d candidates passed velocity check",
            len(opportunities), len(candidates),
        )
        return opportunities