from __future__ import annotations import logging from dataclasses import dataclass from config import Config from kraken_client import KrakenClient, KrakenError log = logging.getLogger(__name__) @dataclass class Opportunity: pair: str # Altname used for orders, e.g. "XBTUSD" last_price: float open_price: float change_pct: float # 24h price change percentage volume_usd: float # 24h volume in USD lot_decimals: int # Decimal precision for order quantity order_min: float # Kraken's minimum order quantity class Scanner: def __init__(self, client: KrakenClient, config: Config): self.client = client self.config = config def scan(self, exclude_pairs: set[str] | None = None) -> list[Opportunity]: """ Returns a list of trading opportunities sorted by volume (most liquid first). Filters by min_volume_usd and the configured 24h price change range. """ exclude = exclude_pairs or set() # Fetch all pair metadata in one call all_pairs = self.client.get_asset_pairs() # Filter to online USD-quoted pairs that aren't already held usd_pairs: dict[str, dict] = {} # altname → pair_info internal_to_alt: dict[str, str] = {} # internal_key → altname for internal_key, info in all_pairs.items(): altname = info.get("altname", "") quote = info.get("quote", "") if ( quote in ("ZUSD", "USD") and info.get("status") == "online" and not altname.endswith(".d") # skip dark-pool pairs and altname not in exclude ): usd_pairs[altname] = info internal_to_alt[internal_key] = altname if not usd_pairs: log.info("No eligible USD pairs found after filtering") return [] log.info("Fetching tickers for %d USD pairs...", len(usd_pairs)) try: raw_tickers = self.client.get_tickers(list(usd_pairs.keys())) except KrakenError as exc: log.error("Ticker fetch failed: %s", exc) return [] # Kraken may key the ticker response by altname or internal name — handle both ticker_by_alt: dict[str, dict] = {} for key, ticker in raw_tickers.items(): if key in usd_pairs: ticker_by_alt[key] = ticker elif key in internal_to_alt: ticker_by_alt[internal_to_alt[key]] = ticker opportunities: list[Opportunity] = [] for altname, info in usd_pairs.items(): ticker = ticker_by_alt.get(altname) if ticker is None: log.debug("No ticker for %s — skipping", altname) continue try: last_price = float(ticker["c"][0]) # last trade price open_price = float(ticker["o"]) # opening price (midnight UTC) volume_24h = float(ticker["v"][1]) # base volume over last 24h if open_price <= 0 or last_price <= 0: continue change_pct = (last_price - open_price) / open_price * 100 volume_usd = volume_24h * last_price if volume_usd < self.config.min_volume_usd: continue if not (self.config.min_price_change_pct <= change_pct <= self.config.max_price_change_pct): continue opportunities.append(Opportunity( pair=altname, last_price=last_price, open_price=open_price, change_pct=change_pct, volume_usd=volume_usd, lot_decimals=int(info.get("lot_decimals", 8)), order_min=float(info.get("ordermin", 0)), )) except (KeyError, ValueError, ZeroDivisionError): log.debug("Bad ticker data for %s — skipping", altname) continue # Most liquid first so we prefer established assets when slots are limited opportunities.sort(key=lambda o: o.volume_usd, reverse=True) log.info( "Scan complete: %d pairs checked, %d opportunities found", len(usd_pairs), len(opportunities), ) for opp in opportunities: log.info( " %-12s change=%+.2f%% volume=$%,.0f", opp.pair, opp.change_pct, opp.volume_usd, ) return opportunities