From Zero to Production: Price Action Scanner
A complete, plain-English + code-ready blueprint for building a real-time Nifty 500 breakout/breakdown scanner using Angel One API, XGBoost ML ranking, and Telegram alerts — explained so simply a 5-year-old could follow the logic, and so thoroughly a senior engineer could ship it.
Explained Like You're 5
Imagine a toy store where a toy's price tag is like a stock price. Some toys have a "floor price" — no matter what, the price never goes below ₹100. Some have a "ceiling price" — it never goes above ₹200. These are called Support (floor) and Resistance (ceiling).
Now imagine a toy that keeps bouncing between ₹100 and ₹200... but one day it breaks through the ₹200 ceiling. That's a Breakout! Our job is to spot this the moment it happens — for 500 stocks at once.
Angel One API
Our "price tag reader". It gives us live and historical prices for every Nifty 500 stock every 5 minutes (or 15 minutes).
Pattern Detector
The "rule book". It checks: has the price touched the same ceiling 3 times? Did it suddenly jump above? If yes → Breakout signal!
XGBoost Ranker
The "smartness layer". Out of 500 stocks, maybe 40 signal breakouts daily. XGBoost ranks which 5 are MOST likely to actually move big.
Telegram Alert
The "ding!" on your phone. The moment a ranked signal fires, your Telegram group gets a detailed message with entry, target, and stop loss.
Web Dashboard
The "scoreboard". A live web page showing all active signals, ranked by confidence, with charts and status updates.
Risk Rules
The "safety belt". Never risk more than 1% of capital per trade. Always define stop loss before entry. Non-negotiable rules baked in.
Think of a car factory assembly line. Raw metal goes in → stamped into parts → painted → assembled → quality checked → shipped. Our scanner is the same: Raw price data goes in → cleaned → patterns detected → ML scored → alert sent → shown on screen.
Pipeline stages (data flows left to right):

SmartAPI WebSocket + REST → Fetch + Resample OHLCV (5-min & 15-min bars) → Pivot Points + Touch Count → Signal Generator (5m + 15m) → 30+ Features → Score 0–1 → Real-time Alert Dispatch → /signals /health endpoints → Live Dashboard + Charts → Signal history + Backtest → Containerised Deploy (bar-close timing)
Environment Requirements
```
# Core Data + Numerics
pandas==2.2.0
numpy==1.26.4
scipy==1.12.0

# Angel One SmartAPI
smartapi-python==1.3.4
websocket-client==1.7.0
pyotp==2.9.0

# Machine Learning
xgboost==2.0.3
scikit-learn==1.4.0
optuna==3.5.0           # hyperparameter tuning

# API + Web Backend
fastapi==0.109.0
uvicorn==0.27.0
redis==5.0.1
sqlalchemy==2.0.25
psycopg2-binary==2.9.9

# Alerts
python-telegram-bot==20.7

# Visualisation (Jupyter)
plotly==5.18.0
mplfinance==0.12.10b0
jupyter==1.0.0
```
Folder Structure
```
nifty500_scanner/
├── data/
│   ├── ingestor.py        # Angel One fetch
│   ├── preprocessor.py    # clean + resample
│   └── cache.py           # Redis interface
├── patterns/
│   ├── sr_detector.py     # Support/Resistance
│   ├── breakout.py        # Signal logic
│   └── liquidity.py       # Grab detection
├── scanner/
│   ├── scanner_5m.py      # 5-min module
│   ├── scanner_15m.py     # 15-min module
│   └── watchlist.py       # Nifty 500 list
├── ml/
│   ├── features.py        # Feature engineering
│   ├── train.py           # XGBoost training
│   └── ranker.py          # Live inference
├── alerts/
│   └── telegram_bot.py    # Telegram sender
├── api/
│   └── main.py            # FastAPI endpoints
├── notebooks/
│   └── 01_prototype.ipynb # Jupyter MVP
├── docker-compose.yml
└── .env
```
Imagine a weather station sending you temperature readings every minute. But sometimes the sensor glitches and sends crazy numbers (99999°C), or gaps exist. Before we trust the data, we clean it — remove the crazy numbers, fill gaps, and then group readings into 5-minute or 15-minute buckets (like averaging temperature every 5 minutes instead of every second).
```python
import os
import logging
from datetime import datetime, timedelta

import pandas as pd
import pyotp
from SmartApi import SmartConnect

logger = logging.getLogger(__name__)


class AngelOneIngestor:
    """
    Handles all data fetching from Angel One SmartAPI.
    Supports historical OHLCV and live tick data.
    """

    TIMEFRAME_MAP = {
        "5m": "FIVE_MINUTE",
        "15m": "FIFTEEN_MINUTE",
        "1h": "ONE_HOUR",
        "1d": "ONE_DAY",
    }

    def __init__(self):
        self.api = SmartConnect(api_key=os.getenv("ANGEL_API_KEY"))
        totp = pyotp.TOTP(os.getenv("ANGEL_TOTP_SECRET"))
        self.api.generateSession(
            os.getenv("ANGEL_CLIENT_ID"),
            os.getenv("ANGEL_MPIN"),
            totp.now(),
        )
        logger.info("✅ Angel One session established")

    def fetch_candles(
        self,
        symbol_token: str,          # e.g. "2885" for RELIANCE
        exchange: str = "NSE",
        timeframe: str = "5m",
        lookback_days: int = 30,
    ) -> pd.DataFrame:
        """Fetch OHLCV candles for a single symbol."""
        to_date = datetime.now()
        from_date = to_date - timedelta(days=lookback_days)
        params = {
            "exchange": exchange,
            "symboltoken": symbol_token,
            "interval": self.TIMEFRAME_MAP[timeframe],
            "fromdate": from_date.strftime("%Y-%m-%d %H:%M"),
            "todate": to_date.strftime("%Y-%m-%d %H:%M"),
        }
        resp = self.api.getCandleData(params)
        if not resp["data"]:
            raise ValueError(f"No data returned for token {symbol_token}")
        df = pd.DataFrame(
            resp["data"],
            columns=["timestamp", "open", "high", "low", "close", "volume"],
        )
        df["timestamp"] = pd.to_datetime(df["timestamp"])
        df = df.set_index("timestamp").sort_index()
        return self._clean(df)

    def _clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Preprocessing pipeline:
        1. Remove pre-market / post-market bars (keep 09:15 - 15:30)
        2. Drop rows with zero volume (non-trading bars)
        3. Forward-fill micro-gaps (<= 2 bars)
        4. Remove clear outliers (price > 5x median or < 0)
        """
        # 1. Market hours filter
        df = df.between_time("09:15", "15:30")
        # 2. Zero-volume drop
        df = df[df["volume"] > 0]
        # 3. Forward-fill gaps up to 2 bars
        full_idx = pd.date_range(df.index.min(), df.index.max(), freq="5min")
        df = df.reindex(full_idx).ffill(limit=2).dropna()
        # reindex reintroduces off-hours timestamps, so re-apply the session filter
        df = df.between_time("09:15", "15:30")
        # 4. Outlier removal
        med_close = df["close"].median()
        df = df[(df["close"] > 0) & (df["close"] < med_close * 5)]
        return df.astype(float)

    def resample_to_15m(self, df_5m: pd.DataFrame) -> pd.DataFrame:
        """Convert 5-min bars to 15-min bars using OHLCV aggregation."""
        return df_5m.resample("15min").agg({
            "open": "first",
            "high": "max",
            "low": "min",
            "close": "last",
            "volume": "sum",
        }).dropna()
```
Handling Data Edge Cases
| Problem | Cause | Solution |
|---|---|---|
| Missing bars | Circuit breakers, no trades | ffill up to 2 bars; mark as synthetic |
| Extreme prices | API glitch, split unadjusted | Filter if close > 5× median |
| Pre/post market noise | SGX Nifty, ADR moves | Hard clip to 09:15–15:30 |
| Volume = 0 | Holiday partial session | Drop row entirely |
| Timezone mismatch | API returns UTC | Convert to Asia/Kolkata always |
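For the timezone row, pandas handles the conversion in two steps — localize the naive index, then convert it. A minimal sketch (whether the API really returns UTC-naive timestamps should be confirmed against actual responses):

```python
import pandas as pd

# Suppose the API returned naive timestamps that are actually UTC
idx = pd.to_datetime(["2024-01-15 04:00:00", "2024-01-15 04:05:00"])
df = pd.DataFrame({"close": [100.0, 101.0]}, index=idx)

# Localize as UTC, then convert to exchange time (IST = UTC+05:30)
df.index = df.index.tz_localize("UTC").tz_convert("Asia/Kolkata")
print(df.index[0])  # → 2024-01-15 09:30:00+05:30
```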
Drop a ball. It bounces off the floor (support) 3 times. The 4th time you drop it — it crashes through the floor. That's a breakdown! The opposite (hitting the ceiling 3 times then bursting through) is a breakout. Our detector counts these bounces and fires when the price "breaks through".
Support/Resistance Levels
```python
import numpy as np
import pandas as pd
from scipy.signal import argrelextrema


class SRDetector:
    """
    Identifies significant Support & Resistance levels
    using local extrema + cluster merging.
    """

    def __init__(
        self,
        window: int = 5,             # bars each side for pivot
        cluster_pct: float = 0.003,  # 0.3% merge radius
        min_touches: int = 2,        # min touches to be "valid"
    ):
        self.window = window
        self.cluster_pct = cluster_pct
        self.min_touches = min_touches

    def find_levels(self, df: pd.DataFrame) -> dict:
        """
        Returns dict with 'support' and 'resistance' lists,
        each level having: price, touches, strength score.
        """
        highs = df["high"].values
        lows = df["low"].values
        # Local maxima → Resistance candidates
        res_idx = argrelextrema(highs, np.greater_equal, order=self.window)[0]
        # Local minima → Support candidates
        sup_idx = argrelextrema(lows, np.less_equal, order=self.window)[0]
        res_prices = highs[res_idx]
        sup_prices = lows[sup_idx]
        resistance = self._cluster_levels(res_prices, df)
        support = self._cluster_levels(sup_prices, df)
        return {
            "resistance": [l for l in resistance if l["touches"] >= self.min_touches],
            "support": [l for l in support if l["touches"] >= self.min_touches],
        }

    def _cluster_levels(self, prices, df) -> list:
        """Merge nearby price levels into single S/R zones."""
        if len(prices) == 0:
            return []
        levels, seen = [], set()
        for price in np.sort(prices):
            if any(abs(price - s) / s < self.cluster_pct for s in seen):
                continue
            seen.add(price)
            touches = self._count_touches(price, df)
            levels.append({
                "price": round(price, 2),
                "touches": touches,
                "strength": min(touches / 5, 1.0),  # 0-1 score
            })
        return levels

    def _count_touches(self, level: float, df) -> int:
        """Count how many bars came within 0.2% of this level."""
        tol = level * 0.002  # 0.2% tolerance
        return int(((df["high"] >= level - tol) & (df["low"] <= level + tol)).sum())
```
Breakout/Breakdown Engine
```python
from dataclasses import dataclass
from typing import Optional, Literal

import pandas as pd

SignalType = Literal["BREAKOUT", "BREAKDOWN", "NONE"]


@dataclass
class Signal:
    symbol: str
    signal_type: SignalType
    timeframe: str    # "5m" or "15m"
    level: float      # broken S or R price
    close: float      # current candle close
    touches: int      # tests before breakout
    vol_ratio: float  # vol / avg_vol_20
    body_pct: float   # candle body size %
    liq_grab: bool    # liquidity grab detected
    timestamp: str


class BreakoutEngine:
    """
    Criteria for a VALID BREAKOUT:
    ─────────────────────────────
    ✅ R-level tested ≥ 2 times before this bar
    ✅ Close > resistance + (ATR * 0.25) → avoid false breaks
    ✅ Candle body > 50% of total range → strong conviction bar
    ✅ Volume ≥ 1.5x 20-bar average (configurable)

    Criteria for a VALID BREAKDOWN:
    ──────────────────────────────
    ✅ S-level tested ≥ 2 times before this bar
    ✅ Close < support - (ATR * 0.25)
    ✅ Candle body > 50% of total range
    ✅ Volume ≥ 1.5x 20-bar average
    """

    def __init__(
        self,
        atr_buffer_mult: float = 0.25,
        min_vol_ratio: float = 1.5,
        min_body_pct: float = 0.5,
        min_touches: int = 2,
    ):
        self.atr_mult = atr_buffer_mult
        self.vol_thresh = min_vol_ratio
        self.body_thresh = min_body_pct
        self.min_touches = min_touches

    def evaluate(self, symbol: str, df: pd.DataFrame,
                 levels: dict, timeframe: str) -> Optional[Signal]:
        """
        Evaluate the latest bar against detected S/R levels.
        Returns Signal if breakout/breakdown detected, else None.
        """
        bar = df.iloc[-1]
        atr = self._atr(df, 14)
        avg_vol = df["volume"].iloc[-21:-1].mean()
        vol_ratio = bar["volume"] / avg_vol if avg_vol > 0 else 0
        bar_range = bar["high"] - bar["low"]
        body_pct = (abs(bar["close"] - bar["open"]) / bar_range) if bar_range > 0 else 0

        # CHECK RESISTANCE BREAKOUT
        for level in levels.get("resistance", []):
            if (level["touches"] >= self.min_touches
                    and bar["close"] > level["price"] + atr * self.atr_mult
                    and body_pct >= self.body_thresh
                    and vol_ratio >= self.vol_thresh):
                return Signal(
                    symbol=symbol, signal_type="BREAKOUT", timeframe=timeframe,
                    level=level["price"], close=bar["close"],
                    touches=level["touches"], vol_ratio=round(vol_ratio, 2),
                    body_pct=round(body_pct, 2),
                    liq_grab=self._liq_grab_check(df, level["price"]),
                    timestamp=str(df.index[-1]),
                )

        # CHECK SUPPORT BREAKDOWN (mirror logic)
        for level in levels.get("support", []):
            if (level["touches"] >= self.min_touches
                    and bar["close"] < level["price"] - atr * self.atr_mult
                    and body_pct >= self.body_thresh
                    and vol_ratio >= self.vol_thresh):
                return Signal(
                    symbol=symbol, signal_type="BREAKDOWN", timeframe=timeframe,
                    level=level["price"], close=bar["close"],
                    touches=level["touches"], vol_ratio=round(vol_ratio, 2),
                    body_pct=round(body_pct, 2),
                    liq_grab=self._liq_grab_check(df, level["price"]),
                    timestamp=str(df.index[-1]),
                )
        return None

    def _atr(self, df, period=14) -> float:
        """Average True Range — measures volatility."""
        h, l, pc = df["high"], df["low"], df["close"].shift(1)
        tr = pd.concat([h - l, (h - pc).abs(), (l - pc).abs()], axis=1).max(axis=1)
        return tr.rolling(period).mean().iloc[-1]

    def _liq_grab_check(self, df, level: float) -> bool:
        """
        Liquidity Grab: prior bar wicks BELOW support (or above resistance)
        but closes back above/below — a stop hunt before the real move.
        Detected in last 3 bars.
        """
        recent = df.iloc[-4:-1]  # last 3 bars before signal
        tol = level * 0.003
        wicked_through = recent["low"] < level - tol
        closed_above = recent["close"] > level
        return bool((wicked_through & closed_above).any())
```
Signal Quality Thresholds — Quick Reference
| Parameter | 5-min Default | 15-min Default | Why |
|---|---|---|---|
| Min S/R Touches | 2 | 2 | Fewer touches = less reliable level |
| ATR Buffer Multiplier | 0.25× | 0.35× | 15-min needs bigger confirmation |
| Volume Ratio (min) | 1.5× | 1.3× | 15-min bars aggregate more volume |
| Body % of Range | 50% | 50% | Doji candles = weak signal |
| Lookback Window | 30 days | 30 days | More history = better S/R quality |
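The table's defaults can live in one config dict keyed by timeframe and be splatted into the engine's constructor. A sketch (`PRESETS` is a name introduced here, not part of the codebase above; parameter names follow `BreakoutEngine.__init__`):

```python
# Per-timeframe threshold presets taken from the table above
PRESETS = {
    "5m":  dict(atr_buffer_mult=0.25, min_vol_ratio=1.5, min_body_pct=0.5, min_touches=2),
    "15m": dict(atr_buffer_mult=0.35, min_vol_ratio=1.3, min_body_pct=0.5, min_touches=2),
}

# Usage sketch: engine_15m = BreakoutEngine(**PRESETS["15m"])
print(PRESETS["15m"]["atr_buffer_mult"])  # → 0.35
```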
Imagine 50 kids auditioning for a movie. A talent scout doesn't just pick randomly — they look at acting skill, confidence, camera presence, and past performance. XGBoost is our talent scout. It scores each breakout signal on 30+ factors and says "this one has an 87% chance of following through — send the alert!"
Feature Definitions (30+ features, zero look-ahead)
| Category | Feature Name | Formula | Type |
|---|---|---|---|
| Price Action | body_pct | abs(close - open) / (high - low) | CORE |
| Price Action | wick_ratio | upper_wick / body | CORE |
| Price Action | gap_pct | (open - prev_close) / prev_close | CORE |
| S/R Strength | sr_touches | # of touches at level | CORE |
| S/R Strength | sr_age_bars | bars since level first formed | CONTEXT |
| S/R Strength | level_break_pct | (close - level) / level × 100 | CORE |
| Volume | vol_ratio_20 | volume / mean(volume, 20) | CORE |
| Volume | vol_trend_5 | vol slope over last 5 bars | MOMENTUM |
| Momentum | rsi_14 | RSI(14) at signal bar | MOMENTUM |
| Momentum | adx_14 | ADX(14) — trend strength | MOMENTUM |
| Momentum | macd_hist | MACD histogram at signal | MOMENTUM |
| Volatility | atr_pct | ATR(14) / close × 100 | CONTEXT |
| Volatility | bbw_20 | Bollinger Band Width (20,2) | CONTEXT |
| Volatility | hist_vol_10 | σ(log returns, 10 bars) | CONTEXT |
| Liq. Grab | liq_grab_flag | 1/0 boolean | SPECIAL |
| Time | hour_of_day | int 9–15 (market hours) | SPECIAL |
| Time | bar_from_open | # bars since 09:15 | SPECIAL |
| Sector | sector_momentum | sector ETF return same day | CONTEXT |
```python
import numpy as np
import pandas as pd


class FeatureEngineer:
    """
    Builds feature vector for XGBoost inference.
    ⚠️ CRITICAL: all features use ONLY past data (df.iloc[:-1])
    to prevent look-ahead bias.
    """

    def build(self, df: pd.DataFrame, signal, levels: dict) -> dict:
        """Build full feature dict for a given signal + OHLCV dataframe."""
        bar = df.iloc[-1]
        hist = df.iloc[:-1]  # ← ONLY HISTORICAL, never current bar
        features = {}

        # ── Price Action Features ──────────────────────────────
        bar_range = bar["high"] - bar["low"]
        features["body_pct"]   = abs(bar["close"] - bar["open"]) / (bar_range + 1e-9)
        features["wick_upper"] = (bar["high"] - max(bar["open"], bar["close"])) / (bar_range + 1e-9)
        features["wick_lower"] = (min(bar["open"], bar["close"]) - bar["low"]) / (bar_range + 1e-9)
        features["close_pct"]  = (bar["close"] - bar["low"]) / (bar_range + 1e-9)
        prev = hist.iloc[-1]
        features["gap_pct"]  = (bar["open"] - prev["close"]) / (prev["close"] + 1e-9)
        features["ret_1bar"] = (bar["close"] - prev["close"]) / (prev["close"] + 1e-9)

        # ── S/R Level Features ────────────────────────────────
        features["sr_touches"] = signal.touches
        features["level_break_pct"] = abs(bar["close"] - signal.level) / signal.level
        features["liq_grab"] = int(signal.liq_grab)

        # ── Volume Features ───────────────────────────────────
        vol_window = hist["volume"].iloc[-20:]
        features["vol_ratio_20"] = bar["volume"] / (vol_window.mean() + 1)
        features["vol_trend_5"] = np.polyfit(
            range(5), hist["volume"].iloc[-5:].values, 1
        )[0]

        # ── Momentum Indicators (computed on hist only) ───────
        features["rsi_14"]    = self._rsi(hist, 14)
        features["adx_14"]    = self._adx(hist, 14)
        features["macd_hist"] = self._macd_hist(hist)

        # ── Volatility ────────────────────────────────────────
        atr = self._atr(hist, 14)
        features["atr_pct"] = atr / bar["close"]
        log_rets = np.log(hist["close"] / hist["close"].shift(1)).dropna()
        features["hist_vol_10"] = log_rets.iloc[-10:].std()

        # ── Time Context ──────────────────────────────────────
        ts = df.index[-1]
        features["hour_of_day"] = ts.hour + ts.minute / 60
        open_time = ts.replace(hour=9, minute=15, second=0)
        features["mins_from_open"] = int((ts - open_time).total_seconds() / 60)
        features["is_first_30min"] = int(features["mins_from_open"] <= 30)
        # Session is 09:15–15:30 = 375 min, so the last 30 min start at minute 345
        features["is_last_30min"] = int(features["mins_from_open"] >= 345)
        return features

    def _rsi(self, df, p=14):
        d = df["close"].diff()
        g = d.clip(lower=0).ewm(alpha=1 / p).mean()
        l = (-d).clip(lower=0).ewm(alpha=1 / p).mean()
        rs = g / (l + 1e-9)
        return (100 - 100 / (1 + rs)).iloc[-1]

    # _adx and _macd_hist follow the same pattern (omitted for brevity)

    def _atr(self, df, p=14):
        h, l, pc = df["high"], df["low"], df["close"].shift(1)
        tr = pd.concat([h - l, (h - pc).abs(), (l - pc).abs()], axis=1).max(axis=1)
        return tr.rolling(p).mean().iloc[-1]
```
Training Plan
- Collect 12 months of historical signals via backtest
- Label: did price move ≥1.5× ATR in target direction within 10 bars? → 1 else 0
- Train/Validation split: time-based (no shuffling!) — first 8 months train, last 4 validate
- Class imbalance: expect ~30% win rate; use scale_pos_weight in XGBoost
- Tune with Optuna (300 trials): max_depth, n_estimators, learning_rate, subsample
- Evaluate with precision-recall AUC (not accuracy — imbalanced classes)
- Retrain monthly with walk-forward validation
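The labeling rule from step 2 can be sketched as a small function that looks only at bars T+1 through T+10, so the label never sees the signal bar itself (a minimal illustration; `label_signal` is a name introduced here, and the 1.5 and 10 come from the plan above):

```python
import pandas as pd

def label_signal(close: pd.Series, t: int, atr: float,
                 direction: int, horizon: int = 10, mult: float = 1.5) -> int:
    """1 if price moved ≥ mult×ATR in `direction` within `horizon` bars after t.
    direction: +1 for breakout (long), -1 for breakdown (short)."""
    future = close.iloc[t + 1 : t + 1 + horizon]  # strictly after the signal bar
    move = direction * (future - close.iloc[t])   # favorable excursion
    return int((move >= mult * atr).any())

# Toy series: signal at bar 0 (close=100, ATR=2) → needs a 3-point favorable move
prices = pd.Series([100, 101, 102, 103.5, 102, 101, 100, 99, 98, 97, 96])
print(label_signal(prices, t=0, atr=2.0, direction=+1))  # → 1 (hit 103.5 at bar 3)
```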
Look-ahead Bias Prevention
⚠️ CRITICAL RULES — NEVER VIOLATE
- Feature computation: always use df.iloc[:-1] (exclude current bar's data)
- Labels (training): use future returns from T+1, never same-bar close
- Validation split: strictly time-ordered — no random shuffling
- No future indicators (tomorrow's volume, etc.) in features
- Bar-close signaling: compute signal ONLY after bar closes (09:20 for 5m, 09:30 for 15m)
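The time-ordered split rule amounts to a plain date cutoff rather than any shuffling utility. A toy sketch of the 8/4-month boundary from the training plan:

```python
import pandas as pd

# Signals with timestamps spanning 12 months (one per month start here)
signals = pd.DataFrame({
    "ts": pd.date_range("2024-01-01", periods=12, freq="MS"),
    "label": [0, 1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0],
})

cutoff = pd.Timestamp("2024-09-01")      # first 8 months train, last 4 validate
train = signals[signals["ts"] < cutoff]  # never shuffled
valid = signals[signals["ts"] >= cutoff]
print(len(train), len(valid))  # → 8 4
```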
When a smoke detector goes off, it doesn't just make a noise — it tells you which floor, which room, and what to do. Our Telegram alert tells traders: which stock, breakout or breakdown, at what price, where to put stop loss, and the ML confidence score.
```python
import asyncio
import os

from telegram import Bot


class TelegramAlerter:
    def __init__(self):
        self.bot = Bot(token=os.getenv("TELEGRAM_TOKEN"))
        self.chat_id = os.getenv("TELEGRAM_CHAT_ID")

    async def send_signal(self, signal, ml_score: float):
        """Send formatted signal alert to Telegram channel."""
        emoji = "🚀" if signal.signal_type == "BREAKOUT" else "📉"
        direction = "BUY" if signal.signal_type == "BREAKOUT" else "SELL/SHORT"

        # Calculate levels (example: ATR-based targets)
        sl_dist = signal.close * 0.007  # 0.7% stop
        target = signal.close + (2 * sl_dist) if direction == "BUY" else signal.close - (2 * sl_dist)
        stop = signal.close - sl_dist if direction == "BUY" else signal.close + sl_dist

        msg = f"""
{emoji} *{signal.signal_type} ALERT* — {signal.symbol}

📋 *Timeframe:* `{signal.timeframe}`
💰 *CMP:* `₹{signal.close:,.2f}`
🔑 *Key Level:* `₹{signal.level:,.2f}` ({signal.touches} tests)
📊 *Direction:* `{direction}`
🎯 *Target:* `₹{target:,.2f}` (+{(target/signal.close-1)*100:.1f}%)
🛑 *Stop Loss:* `₹{stop:,.2f}` (-{abs(stop/signal.close-1)*100:.1f}%)
📐 *Risk:Reward:* `1:2`
🔥 *Volume:* `{signal.vol_ratio:.1f}x average`
⚡ *Liq Grab:* `{"YES ✅" if signal.liq_grab else "NO"}`
🤖 *ML Score:* `{ml_score*100:.0f}/100`
⏰ `{signal.timestamp}`

_This is NOT financial advice. Trade at your own risk._
"""
        await self.bot.send_message(
            chat_id=self.chat_id, text=msg, parse_mode="Markdown"
        )

    def send(self, signal, ml_score: float):
        """Sync wrapper for async send."""
        asyncio.run(self.send_signal(signal, ml_score))
```
📋 Timeframe: 5m
💰 CMP: ₹2,847.50
🔑 Key Level: ₹2,835.00 (3 tests)
📊 Direction: BUY
🎯 Target: ₹2,887.30 (+1.4%)
🛑 Stop Loss: ₹2,827.60 (-0.7%)
📐 Risk:Reward: 1:2
🔥 Volume: 2.3x average
⚡ Liq Grab: YES ✅
🤖 ML Score: 83/100
⏰ 2026-01-15 10:35:00 IST
This is NOT financial advice.
Setup Checklist
- Create Telegram Bot via @BotFather → get TOKEN
- Create a channel or group → get CHAT_ID (use @getidsbot)
- Set env vars: TELEGRAM_TOKEN and TELEGRAM_CHAT_ID
- Test with send_test() method before live
- Rate limit: max 1 message per signal per symbol per session
- Add error handling for Telegram API rate limits (429)
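On the 429 item: python-telegram-bot (v20) raises a `RetryAfter` error that carries the advised wait time. A generic backoff wrapper might look like this — sketched with a stand-in exception class so the snippet runs standalone; in real code, catch `telegram.error.RetryAfter` instead:

```python
import time

class RetryAfterError(Exception):
    """Stand-in for telegram.error.RetryAfter (carries retry_after seconds)."""
    def __init__(self, retry_after: float):
        super().__init__(f"Flood control: retry in {retry_after}s")
        self.retry_after = retry_after

def send_with_backoff(send_fn, max_attempts: int = 3):
    """Call send_fn(); on a 429-style error, sleep the advised time and retry."""
    for attempt in range(max_attempts):
        try:
            return send_fn()
        except RetryAfterError as e:
            if attempt == max_attempts - 1:
                raise  # give up after the final attempt
            time.sleep(e.retry_after)

# Example: a sender that fails once with a 0.01s flood wait, then succeeds
state = {"calls": 0}
def flaky_send():
    state["calls"] += 1
    if state["calls"] == 1:
        raise RetryAfterError(0.01)
    return "delivered"

print(send_with_backoff(flaky_send))  # → delivered
```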
```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import schedule
import xgboost as xgb

from data.ingestor import AngelOneIngestor
from patterns.sr_detector import SRDetector
from patterns.breakout import BreakoutEngine
from ml.features import FeatureEngineer
from alerts.telegram_bot import TelegramAlerter
from scanner.watchlist import NIFTY500_TOKENS  # {symbol: token} dict

logger = logging.getLogger(__name__)


class Scanner5m:
    """
    5-Minute Intraday Breakout/Breakdown Scanner.
    Runs on every 5-min bar close (09:20, 09:25, ..., 15:25).
    Scans all 500 stocks in parallel using a thread pool.
    """

    MIN_ML_SCORE = 0.60  # only alert if model confidence ≥ 60%
    WORKERS = 20         # parallel threads for 500 stocks

    def __init__(self):
        self.ingestor = AngelOneIngestor()
        self.sr = SRDetector(min_touches=2)
        self.engine = BreakoutEngine()
        self.features = FeatureEngineer()
        self.alerter = TelegramAlerter()
        self.model = xgb.Booster()
        self.model.load_model("ml/model_5m.json")
        self.alerted_today = set()  # avoid duplicate alerts

    def scan_symbol(self, symbol: str, token: str):
        """Pipeline for a single stock. Returns Signal or None."""
        try:
            df = self.ingestor.fetch_candles(token, timeframe="5m")
            levels = self.sr.find_levels(df.iloc[:-1])  # ← no look-ahead!
            signal = self.engine.evaluate(symbol, df, levels, "5m")
            if signal is None:
                return None
            feats = self.features.build(df, signal, levels)
            dmat = xgb.DMatrix([list(feats.values())])
            score = float(self.model.predict(dmat)[0])
            signal.ml_score = score
            if score >= self.MIN_ML_SCORE:
                logger.info(f"🎯 {symbol} score={score:.2f} → ALERT")
                return signal
        except Exception as e:
            logger.warning(f"{symbol} scan failed: {e}")
        return None

    def run_scan(self):
        """Run full 500-stock scan at bar close. Called by scheduler."""
        now = datetime.now()
        logger.info(f"⏱ Scan started at {now.strftime('%H:%M:%S')}")
        items = list(NIFTY500_TOKENS.items())
        signals_found = []
        with ThreadPoolExecutor(max_workers=self.WORKERS) as ex:
            futures = {
                ex.submit(self.scan_symbol, sym, tok): sym
                for sym, tok in items
                if sym not in self.alerted_today
            }
            for f in as_completed(futures):
                result = f.result()
                if result:
                    signals_found.append(result)
                    self.alerted_today.add(result.symbol)

        # Sort by ML score, send top alerts
        signals_found.sort(key=lambda s: s.ml_score, reverse=True)
        for sig in signals_found[:10]:  # max 10 alerts per scan
            self.alerter.send(sig, sig.ml_score)
        logger.info(f"✅ Scan done. Found {len(signals_found)} signals.")

    def start(self):
        """Schedule scans at every 5-min bar close during market hours."""
        # Fire at every 5-minute mark so each just-closed bar is evaluated
        for minute in range(0, 60, 5):
            schedule.every().hour.at(f":{minute:02d}").do(self.run_scan)
        logger.info("🚀 5-min scanner running. Ctrl+C to stop.")
        while True:
            schedule.run_pending()
            time.sleep(1)


if __name__ == "__main__":
    scanner = Scanner5m()
    scanner.start()
```
FastAPI Web Endpoint
```python
import asyncio

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="Nifty500 Scanner API", version="1.0")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"])

# In-memory signal store (replace with Redis/DB in production)
active_signals = []


@app.get("/signals")
async def get_signals(
    timeframe: str = "all",
    min_score: float = 0.6,
    signal_type: str = "all",
):
    """Return filtered list of active signals."""
    results = active_signals
    if timeframe != "all":
        results = [s for s in results if s["timeframe"] == timeframe]
    if signal_type != "all":
        results = [s for s in results if s["type"] == signal_type]
    results = [s for s in results if s["ml_score"] >= min_score]
    return {"signals": sorted(results, key=lambda x: -x["ml_score"])}


@app.websocket("/ws/signals")
async def websocket_signals(ws: WebSocket):
    """WebSocket for real-time signal push to web UI."""
    await ws.accept()
    try:
        while True:
            await ws.send_json({"signals": active_signals[-20:]})
            await asyncio.sleep(5)
    except Exception:
        pass  # client disconnected


@app.get("/health")
async def health():
    return {"status": "ok", "active_signals": len(active_signals)}
```
Phase 1 — MVP (Week 1–2)
5-min scanner with pattern detection + Telegram alerts. No ML yet. Manual confirmation required.
- Setup Angel One SmartAPI credentials + test connection
- Build AngelOneIngestor with cleaning pipeline
- Implement SRDetector with pivot point method
- Build BreakoutEngine with basic criteria (no ML)
- Wire TelegramAlerter with formatted message template
- Create scheduler (APScheduler or schedule) at bar close
- Load Nifty 500 watchlist with token mapping
- Test end-to-end with paper: 5 stocks first, then 50, then 500
- Add basic logging and error handling
- Deliverable: Running Python script sending Telegram alerts live
Phase 2 — Intelligence Layer (Week 3–5)
15-min scanner, XGBoost model training, and signal scoring. Alerts now include ML confidence.
- Build 15-min scanner module (mirror of 5-min, adjusted thresholds)
- Build FeatureEngineer with full 30+ feature set
- Collect historical signals via backtest (12 months data)
- Label signals with forward return outcomes
- Train XGBoost with time-series cross-validation
- Tune with Optuna, evaluate on precision-recall AUC
- Integrate ranker into both scanner modules
- Add MIN_ML_SCORE threshold (start at 0.60, tune)
- Add signal deduplication (no repeat alerts same stock same day)
- Deliverable: Dual scanner with ML-ranked Telegram alerts
Phase 3 — Web Dashboard (Week 6–8)
FastAPI backend + React frontend. Live WebSocket signal feed. Sandboxed demo mode.
- Build FastAPI endpoints: /signals, /health, /ws/signals
- Build React dashboard: signal table, candlestick chart overlay
- Add filter controls: timeframe, signal type, min score
- WebSocket integration for real-time signal updates
- PostgreSQL persistence for signal history and backtest results
- Docker Compose: API + Scanner + Redis + PostgreSQL + Frontend
- Demo mode with recorded signals (no live API needed for showcase)
- HTTPS via Nginx reverse proxy + SSL (Let's Encrypt)
- Deliverable: Public-accessible web dashboard with live/demo signals
Even the best driver wears a seatbelt. Risk management is your seatbelt. It doesn't stop you from driving — it limits the damage if something goes wrong. These rules are built into the system, not optional suggestions.
Position Sizing Formula
```python
def calculate_position(
    capital: float,          # total account capital
    entry: float,            # entry price
    stop_loss: float,        # stop loss price
    risk_pct: float = 0.01,  # risk 1% per trade max
) -> dict:
    """
    Position sizing using the 1% Rule.

    Example: ₹5,00,000 capital, entry ₹500, stop ₹490
      → Risk per share = ₹10
      → Max risk = ₹5,000 (1% of capital)
      → Shares = 5000 / 10 = 500
      → Position value = 500 × ₹500 = ₹2,50,000 (50% of capital)
      → Exceeds the 20% cap below, so size is cut to 200 shares (₹1,00,000)
    """
    risk_per_share = abs(entry - stop_loss)
    if risk_per_share == 0:
        raise ValueError("Stop loss cannot equal entry!")
    max_risk_amount = capital * risk_pct
    shares = int(max_risk_amount / risk_per_share)
    position_value = shares * entry

    # Safety cap: never more than 20% of capital in one trade
    if position_value > capital * 0.20:
        shares = int((capital * 0.20) / entry)
        position_value = shares * entry

    return {
        "shares": shares,
        "position_value": round(position_value, 2),
        "risk_amount": round(shares * risk_per_share, 2),
        "risk_pct": round((shares * risk_per_share) / capital * 100, 2),
        "r_multiple_1": round(entry + risk_per_share * 2, 2),  # 1:2 target
    }
```
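Running the docstring's numbers through the same arithmetic shows the 20% position cap kicking in (a standalone recomputation for sanity-checking, not a call into the module):

```python
# Quick sanity check of the 1%-rule sizing with a 20% position cap,
# using the worked example's numbers: ₹5,00,000 capital, entry ₹500, stop ₹490
capital, entry, stop, risk_pct = 500_000, 500.0, 490.0, 0.01

risk_per_share = abs(entry - stop)                 # ₹10
shares = int(capital * risk_pct / risk_per_share)  # 5000 / 10 = 500
if shares * entry > capital * 0.20:                # ₹2,50,000 > ₹1,00,000 cap
    shares = int(capital * 0.20 / entry)           # trimmed to 200
print(shares, shares * entry, shares * risk_per_share)  # → 200 100000.0 2000.0
```

Note the cap means the realized risk (₹2,000 = 0.4% of capital) can end up below the 1% budget; the cap always wins.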
Hard Risk Rules
🛑 SYSTEM-ENFORCED LIMITS
- Max 1% capital risk per trade — hardcoded in sizer
- Max 20% capital in single position — even if 1% allows more
- Max 5% total open risk at any time — no more than 5 simultaneous trades
- Stop Loss mandatory — no signal sent without defined SL
- No trading in first 5 minutes (09:15–09:20) — avoid opening noise
- No new positions after 14:30 — intraday squareoff risk
- Max 3 alerts per stock per day — prevents overtrading
Backtesting Considerations
- Use out-of-sample data only for final evaluation
- Include realistic transaction costs: 0.03% brokerage + STT + GST
- Account for slippage: assume +0.05% worse entry than signal close
- Avoid optimisation bias — choose thresholds before seeing test results
- Minimum 200 signals for statistically significant backtest
- Report drawdown, Sharpe ratio, and win rate separately
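To make the cost and slippage bullets concrete, a rough per-trade net-return adjustment might look like this (the 0.03% and 0.05% figures are the assumptions from the list above; STT/GST are left out for simplicity):

```python
def net_return_pct(entry: float, exit_: float, side: str = "LONG",
                   cost_pct: float = 0.0003, slippage_pct: float = 0.0005) -> float:
    """Gross return adjusted for a slippage-worsened entry fill and
    two legs of brokerage. Returns percent."""
    if side == "LONG":
        fill_entry = entry * (1 + slippage_pct)   # filled 0.05% above signal close
        gross = (exit_ - fill_entry) / fill_entry
    else:  # SHORT
        fill_entry = entry * (1 - slippage_pct)   # filled 0.05% below signal close
        gross = (fill_entry - exit_) / fill_entry
    return (gross - 2 * cost_pct) * 100           # subtract entry + exit brokerage

# A 2% gross long move shrinks to roughly 1.89% net under these assumptions
print(round(net_return_pct(500.0, 510.0), 3))
```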
⚠️ LEGAL DISCLAIMER
This system is a research and educational tool. It does not constitute financial advice, investment recommendations, or trading signals endorsed by any regulated entity. All trading involves significant risk of loss. Past backtested performance does not guarantee future results. Consult a SEBI-registered investment advisor before making any trading decisions. The developer(s) assume no liability for financial losses arising from use of this system. Always test thoroughly in paper trading mode before risking real capital.
```yaml
version: "3.9"

services:
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    restart: always

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: scanner_db
      POSTGRES_USER: scanner
      POSTGRES_PASSWORD: "${DB_PASSWORD}"
    volumes: ["pgdata:/var/lib/postgresql/data"]
    restart: always

  scanner:
    build: .
    command: python -m scanner.scanner_5m
    env_file: .env
    depends_on: [redis, postgres]
    restart: always

  scanner_15m:
    build: .
    command: python -m scanner.scanner_15m
    env_file: .env
    depends_on: [redis, postgres]
    restart: always

  api:
    build: .
    command: uvicorn api.main:app --host 0.0.0.0 --port 8000
    ports: ["8000:8000"]
    env_file: .env
    depends_on: [redis, postgres]
    restart: always

  frontend:
    image: nginx:alpine
    ports: ["80:80", "443:443"]
    volumes:
      - ./frontend/dist:/usr/share/nginx/html
      - ./nginx.conf:/etc/nginx/conf.d/default.conf
    restart: always

volumes:
  pgdata:
```
Environment Variables (.env)
```
# Angel One SmartAPI
ANGEL_API_KEY=your_api_key_here
ANGEL_CLIENT_ID=your_client_id
ANGEL_MPIN=your_mpin
ANGEL_TOTP_SECRET=your_totp_base32_secret

# Telegram
TELEGRAM_TOKEN=bot123456:ABC-your-token
TELEGRAM_CHAT_ID=-100123456789

# Database
DATABASE_URL=postgresql://scanner:pass@postgres/scanner_db
DB_PASSWORD=change_me_in_production

# Redis
REDIS_URL=redis://redis:6379/0

# Scanner Config
MIN_ML_SCORE=0.60
MAX_ALERTS_PER_SCAN=10
ENABLE_15M_SCANNER=true
```
Quick Start Commands
```bash
# 1. Clone and setup
git clone https://github.com/you/nifty500-scanner
cd nifty500-scanner
cp .env.example .env
# → Fill in your credentials in .env

# 2. Install dependencies
pip install -r requirements.txt

# 3. Test data fetch (single stock)
python -c "from data.ingestor import *; i=AngelOneIngestor(); df=i.fetch_candles('2885'); print(df.tail())"

# 4. Run MVP scanner (Phase 1)
python -m scanner.scanner_5m

# 5. Full production deploy
docker-compose up -d

# 6. Check logs
docker-compose logs -f scanner

# 7. Access API
curl http://localhost:8000/signals
curl http://localhost:8000/health
```
📋 Confirmed Assumptions
Scope: All 500 Nifty 500 constituents, updated monthly from NSE official list
Data: 5-min and 15-min OHLCV bars via Angel One SmartAPI (historical + real-time WebSocket)
API Access: Angel One Pro subscription required for SmartAPI + TOTP 2FA authentication
Alert Channel: Telegram Bot API — supports both private messages and group/channel broadcasts
Execution: Signals only — NOT auto-execution. Manual trade placement by user at their own discretion.
🚀 Next Steps to Get Started TODAY
1. Register on Angel One → apply for SmartAPI access at smartapi.angelbroking.com
2. Create a Telegram Bot via @BotFather → save the TOKEN
3. Clone starter code structure (or build from snippets above)
4. Test data fetch with a single stock in Jupyter notebook first
5. Paper trade Phase 1 for 2 weeks before enabling ML ranking
Built with Angel One SmartAPI · XGBoost · FastAPI · React · Docker
⚠️ For educational purposes only. Not financial advice.