185 lines
6.6 KiB
Python
185 lines
6.6 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
|
|
from config import Config
|
|
from kraken_client import KrakenClient, KrakenError
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class Opportunity:
|
|
pair: str # Altname used for orders, e.g. "XBTUSD"
|
|
last_price: float
|
|
open_price: float
|
|
change_pct: float # 24h price change percentage
|
|
volume_usd: float # 24h volume in USD
|
|
lot_decimals: int # Decimal precision for order quantity
|
|
order_min: float # Kraken's minimum order quantity
|
|
rsi: float = field(default=0.0)
|
|
recent_change_pct: float = field(default=0.0)
|
|
|
|
|
|
def _calculate_rsi(closes: list[float], period: int) -> float:
|
|
"""Wilder's smoothed RSI. Returns 50 (neutral) if there isn't enough data."""
|
|
if len(closes) < period + 1:
|
|
return 50.0
|
|
deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
|
|
gains = [max(d, 0.0) for d in deltas]
|
|
losses = [max(-d, 0.0) for d in deltas]
|
|
avg_gain = sum(gains[:period]) / period
|
|
avg_loss = sum(losses[:period]) / period
|
|
for i in range(period, len(gains)):
|
|
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
|
|
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
|
|
if avg_loss == 0:
|
|
return 100.0
|
|
return 100.0 - (100.0 / (1 + avg_gain / avg_loss))
|
|
|
|
|
|
class Scanner:
|
|
def __init__(self, client: KrakenClient, config: Config):
|
|
self.client = client
|
|
self.config = config
|
|
|
|
def _velocity_check(self, pair: str) -> tuple[bool, float, float]:
|
|
"""
|
|
Fetch hourly OHLC and verify:
|
|
1. RSI is below rsi_max (not overbought)
|
|
2. Price is higher now than it was recent_candles hours ago (still moving up)
|
|
|
|
Returns (passes, rsi, recent_change_pct).
|
|
"""
|
|
try:
|
|
candles = self.client.get_ohlc(pair, interval=60)
|
|
except KrakenError as exc:
|
|
log.debug("OHLC fetch failed for %s: %s", pair, exc)
|
|
return False, 0.0, 0.0
|
|
|
|
needed = self.config.rsi_period + self.config.recent_candles + 2
|
|
if len(candles) < needed:
|
|
log.debug("%s: only %d candles, need %d — skipping", pair, len(candles), needed)
|
|
return False, 0.0, 0.0
|
|
|
|
closes = [float(c[4]) for c in candles]
|
|
|
|
rsi = _calculate_rsi(closes, self.config.rsi_period)
|
|
|
|
# Compare current close to the close N hours ago
|
|
ago_close = closes[-(self.config.recent_candles + 1)]
|
|
recent_change = (closes[-1] - ago_close) / ago_close * 100 if ago_close else 0.0
|
|
|
|
passes = rsi <= self.config.rsi_max and recent_change > 0
|
|
return passes, rsi, recent_change
|
|
|
|
def scan(self, exclude_pairs: set[str] | None = None) -> list[Opportunity]:
|
|
"""
|
|
Two-phase scan:
|
|
Phase 1 (cheap): bulk ticker call — filter by 24h change and volume.
|
|
Phase 2 (per-coin): OHLC call — filter by RSI and recent hourly momentum.
|
|
"""
|
|
exclude = exclude_pairs or set()
|
|
|
|
# ── Phase 1: bulk ticker pre-filter ──────────────────────────────────
|
|
all_pairs = self.client.get_asset_pairs()
|
|
|
|
usd_pairs: dict[str, dict] = {}
|
|
internal_to_alt: dict[str, str] = {}
|
|
|
|
for internal_key, info in all_pairs.items():
|
|
altname = info.get("altname", "")
|
|
if (
|
|
info.get("quote") in ("ZUSD", "USD")
|
|
and info.get("status") == "online"
|
|
and not altname.endswith(".d")
|
|
and altname not in exclude
|
|
):
|
|
usd_pairs[altname] = info
|
|
internal_to_alt[internal_key] = altname
|
|
|
|
if not usd_pairs:
|
|
log.info("No eligible USD pairs found after filtering")
|
|
return []
|
|
|
|
log.info("Fetching tickers for %d USD pairs...", len(usd_pairs))
|
|
|
|
try:
|
|
raw_tickers = self.client.get_tickers(list(usd_pairs.keys()))
|
|
except KrakenError as exc:
|
|
log.error("Ticker fetch failed: %s", exc)
|
|
return []
|
|
|
|
ticker_by_alt: dict[str, dict] = {}
|
|
for key, ticker in raw_tickers.items():
|
|
if key in usd_pairs:
|
|
ticker_by_alt[key] = ticker
|
|
elif key in internal_to_alt:
|
|
ticker_by_alt[internal_to_alt[key]] = ticker
|
|
|
|
candidates: list[Opportunity] = []
|
|
|
|
for altname, info in usd_pairs.items():
|
|
ticker = ticker_by_alt.get(altname)
|
|
if ticker is None:
|
|
continue
|
|
try:
|
|
last_price = float(ticker["c"][0])
|
|
open_price = float(ticker["o"])
|
|
volume_24h = float(ticker["v"][1])
|
|
|
|
if open_price <= 0 or last_price <= 0:
|
|
continue
|
|
|
|
change_pct = (last_price - open_price) / open_price * 100
|
|
volume_usd = volume_24h * last_price
|
|
|
|
if volume_usd < self.config.min_volume_usd:
|
|
continue
|
|
if not (self.config.min_price_change_pct <= change_pct <= self.config.max_price_change_pct):
|
|
continue
|
|
|
|
candidates.append(Opportunity(
|
|
pair=altname,
|
|
last_price=last_price,
|
|
open_price=open_price,
|
|
change_pct=change_pct,
|
|
volume_usd=volume_usd,
|
|
lot_decimals=int(info.get("lot_decimals", 8)),
|
|
order_min=float(info.get("ordermin", 0)),
|
|
))
|
|
except (KeyError, ValueError, ZeroDivisionError):
|
|
continue
|
|
|
|
log.info(
|
|
"Phase 1 complete: %d pairs checked, %d candidates pass volume/change filter",
|
|
len(usd_pairs), len(candidates),
|
|
)
|
|
|
|
if not candidates:
|
|
return []
|
|
|
|
# ── Phase 2: velocity check (OHLC per candidate) ─────────────────────
|
|
opportunities: list[Opportunity] = []
|
|
|
|
for opp in candidates:
|
|
passes, rsi, recent_change = self._velocity_check(opp.pair)
|
|
log.info(
|
|
" %-12s change=%+.2f%% rsi=%.1f recent_%dh=%+.2f%% %s",
|
|
opp.pair, opp.change_pct, rsi,
|
|
self.config.recent_candles, recent_change,
|
|
"OK" if passes else "SKIP",
|
|
)
|
|
if passes:
|
|
opp.rsi = rsi
|
|
opp.recent_change_pct = recent_change
|
|
opportunities.append(opp)
|
|
|
|
log.info(
|
|
"Phase 2 complete: %d of %d candidates passed velocity check",
|
|
len(opportunities), len(candidates),
|
|
)
|
|
|
|
return opportunities
|