Nifty 500 Intraday Scanner — Complete Blueprint

From Zero
To Production
Price Action Scanner

A complete, plain-English + code-ready blueprint for building a real-time Nifty 500 breakout/breakdown scanner using Angel One API, XGBoost ML ranking, and Telegram alerts — explained so simply a 5-year-old could follow the logic, and so thoroughly a senior engineer could ship it.

Phase 1 · MVP 5-Min Scanner
Phase 2 · 15-Min + XGBoost
Phase 3 · Web UI + Live Demo
What Are We Actually Building?
Explained Like You're 5
🧒 The Toy Store Analogy

Imagine a toy store where a toy's price tag is like a stock price. Some toys have a "floor price" — no matter what, the price never goes below ₹100. Some have a "ceiling price" — it never goes above ₹200. These are called Support (floor) and Resistance (ceiling).

Now imagine a toy that keeps bouncing between ₹100 and ₹200... but one day it breaks through the ₹200 ceiling. That's a Breakout! Our job is to spot this the moment it happens — for 500 stocks at once.

01
📡

Angel One API

Our "price tag reader". It gives us live and historical prices for every Nifty 500 stock every 5 minutes (or 15 minutes).

02
🔍

Pattern Detector

The "rule book". It checks: has the price touched the same ceiling 3 times? Did it suddenly jump above? If yes → Breakout signal!

03
🤖

XGBoost Ranker

The "smartness layer". Out of 500 stocks, maybe 40 signal breakouts daily. XGBoost ranks which 5 are MOST likely to actually move big.

04
📲

Telegram Alert

The "ding!" on your phone. The moment a ranked signal fires, your Telegram group gets a detailed message with entry, target, and stop loss.

05
🌐

Web Dashboard

The "scoreboard". A live web page showing all active signals, ranked by confidence, with charts and status updates.

06
🛡️

Risk Rules

The "safety belt". Never risk more than 1% of capital per trade. Always define stop loss before entry. Non-negotiable rules baked in.

How All the Pieces Connect
🧒 The Assembly Line Analogy

Think of a car factory assembly line. Raw metal goes in → stamped into parts → painted → assembled → quality checked → shipped. Our scanner is the same: Raw price data goes in → cleaned → patterns detected → ML scored → alert sent → shown on screen.

// SYSTEM ARCHITECTURE DIAGRAM
ANGEL ONE API (SmartAPI WebSocket + REST)
  ──▶ DATA INGESTOR (fetch + resample OHLCV)
  ──▶ REDIS CACHE (5-min & 15-min bars)
  ──▶ S/R DETECTOR (pivot points + touch count)
  ──▶ BREAKOUT ENGINE (signal generator, 5m + 15m)
  ──▶ XGBOOST RANKER (30+ features → score 0–1)
  ──▶ TELEGRAM BOT (real-time alert dispatch)
  ──▶ FASTAPI BACKEND (/signals, /health endpoints)
  ──▶ REACT WEB UI (live dashboard + charts)

Supporting infrastructure:
  POSTGRESQL: signal history + backtest storage
  DOCKER: containerised deploy
  CRON / APScheduler: bar-close timing

Environment Requirements

requirements.txt
# Core Data + Numerics
pandas==2.2.0
numpy==1.26.4
scipy==1.12.0

# Angel One SmartAPI
smartapi-python==1.3.4
websocket-client==1.7.0
pyotp==2.9.0

# Machine Learning
xgboost==2.0.3
scikit-learn==1.4.0
optuna==3.5.0          # hyperparameter tuning

# API + Web Backend
fastapi==0.109.0
uvicorn==0.27.0
redis==5.0.1
sqlalchemy==2.0.25
psycopg2-binary==2.9.9

# Alerts
python-telegram-bot==20.7

# Visualisation (Jupyter)
plotly==5.18.0
mplfinance==0.12.10b0
jupyter==1.0.0

Folder Structure

project layout
nifty500_scanner/
├── data/
│   ├── ingestor.py        # Angel One fetch
│   ├── preprocessor.py    # clean + resample
│   └── cache.py           # Redis interface
├── patterns/
│   ├── sr_detector.py     # Support/Resistance
│   ├── breakout.py        # Signal logic
│   └── liquidity.py       # Grab detection
├── scanner/
│   ├── scanner_5m.py      # 5-min module
│   ├── scanner_15m.py     # 15-min module
│   └── watchlist.py       # Nifty 500 list
├── ml/
│   ├── features.py        # Feature engineering
│   ├── train.py           # XGBoost training
│   └── ranker.py          # Live inference
├── alerts/
│   └── telegram_bot.py    # Telegram sender
├── api/
│   └── main.py            # FastAPI endpoints
├── notebooks/
│   └── 01_prototype.ipynb # Jupyter MVP
├── docker-compose.yml
└── .env
Getting the Raw Prices In
🧒 The Weather Station Analogy

Imagine a weather station sending you temperature readings every minute. But sometimes the sensor glitches and sends crazy numbers (99999°C), or leaves gaps in the record. Before we trust the data, we clean it — remove the crazy numbers, fill the gaps, and then group readings into 5-minute or 15-minute buckets (like averaging temperature over every 5 minutes instead of reading it every second).

data/ingestor.py — Angel One SmartAPI Connection
import os
from SmartApi import SmartConnect
import pyotp
import pandas as pd
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class AngelOneIngestor:
    """
    Handles all data fetching from Angel One SmartAPI.
    Supports historical OHLCV and live tick data.
    """

    TIMEFRAME_MAP = {
        "5m":  "FIVE_MINUTE",
        "15m": "FIFTEEN_MINUTE",
        "1h":  "ONE_HOUR",
        "1d":  "ONE_DAY",
    }

    def __init__(self):
        self.api = SmartConnect(api_key=os.getenv("ANGEL_API_KEY"))
        totp = pyotp.TOTP(os.getenv("ANGEL_TOTP_SECRET"))
        self.api.generateSession(
            os.getenv("ANGEL_CLIENT_ID"),
            os.getenv("ANGEL_MPIN"),
            totp.now()
        )
        logger.info("✅ Angel One session established")

    def fetch_candles(
        self,
        symbol_token: str,    # e.g. "2885" for RELIANCE
        exchange: str = "NSE",
        timeframe: str = "5m",
        lookback_days: int = 30
    ) -> pd.DataFrame:
        """Fetch OHLCV candles for a single symbol."""
        
        to_date   = datetime.now()
        from_date = to_date - timedelta(days=lookback_days)
        
        params = {
            "exchange":    exchange,
            "symboltoken": symbol_token,
            "interval":    self.TIMEFRAME_MAP[timeframe],
            "fromdate":    from_date.strftime("%Y-%m-%d %H:%M"),
            "todate":      to_date.strftime("%Y-%m-%d %H:%M"),
        }
        
        resp = self.api.getCandleData(params)
        if not resp or not resp.get("data"):
            raise ValueError(f"No data returned for token {symbol_token}")
        
        df = pd.DataFrame(
            resp["data"],
            columns=["timestamp","open","high","low","close","volume"]
        )
        df["timestamp"] = pd.to_datetime(df["timestamp"])
        df = df.set_index("timestamp").sort_index()
        
        return self._clean(df)

    def _clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Preprocessing pipeline:
        1. Remove pre-market / post-market bars (keep 09:15 - 15:30)
        2. Drop rows with zero volume (non-trading bars)
        3. Forward-fill micro-gaps (<= 2 bars)
        4. Remove clear outliers (price > 5x median or < 0)
        """
        # 1. Market hours filter
        df = df.between_time("09:15", "15:30")
        
        # 2. Zero-volume drop
        df = df[df["volume"] > 0]
        
        # 3. Forward-fill micro-gaps up to 2 bars (assumes 5-min input)
        full_idx = pd.date_range(
            df.index.min(), df.index.max(), freq="5min"
        )
        df = df.reindex(full_idx).ffill(limit=2).dropna()
        # reindexing re-adds off-hours slots, so clip to session again
        df = df.between_time("09:15", "15:30")
        
        # 4. Outlier removal
        med_close = df["close"].median()
        df = df[(df["close"] > 0) & (df["close"] < med_close * 5)]
        
        return df.astype(float)

    def resample_to_15m(self, df_5m: pd.DataFrame) -> pd.DataFrame:
        """Convert 5-min bars to 15-min bars using OHLCV aggregation."""
        return df_5m.resample("15min").agg({
            "open":   "first",
            "high":   "max",
            "low":    "min",
            "close":  "last",
            "volume": "sum",
        }).dropna()

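The 5-to-15-minute aggregation above can be sanity-checked on a tiny synthetic frame. This is a standalone pandas sketch, separate from the ingestor class, using three 5-min bars that fall into one 15-min bucket:

```python
import pandas as pd

# three 5-min bars inside the 09:15 - 09:30 window
idx = pd.date_range("2026-01-15 09:15", periods=3, freq="5min")
df_5m = pd.DataFrame({
    "open":   [100.0, 102.0, 101.0],
    "high":   [103.0, 104.0, 102.0],
    "low":    [ 99.0, 101.0, 100.0],
    "close":  [102.0, 101.0, 101.5],
    "volume": [1000,  1500,   500],
}, index=idx)

# same aggregation rules as resample_to_15m above
df_15m = df_5m.resample("15min").agg({
    "open": "first", "high": "max", "low": "min",
    "close": "last", "volume": "sum",
}).dropna()

print(df_15m)
```

The single 15-min bar should open at the first bar's open (100), take the highest high (104) and lowest low (99), close at the last bar's close (101.5), and sum the volume (3000).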
Handling Data Edge Cases

Problem | Cause | Solution
Missing bars | Circuit breakers, no trades | ffill up to 2 bars; mark as synthetic
Extreme prices | API glitch, split unadjusted | Filter if close > 5× median
Pre/post-market noise | SGX Nifty, ADR moves | Hard clip to 09:15–15:30
Volume = 0 | Holiday partial session | Drop row entirely
Timezone mismatch | API returns UTC | Convert to Asia/Kolkata always
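The timezone row deserves a concrete snippet. A minimal pandas sketch, assuming the API hands back naive timestamps that are actually UTC (verify this against your actual SmartAPI payload before relying on it):

```python
import pandas as pd

# naive timestamps that are actually UTC
idx_utc = pd.to_datetime(["2026-01-15 03:45:00", "2026-01-15 03:50:00"])
df = pd.DataFrame({"close": [100.0, 101.0]}, index=idx_utc)

# localise to UTC, then convert to exchange time (IST = UTC+5:30)
df.index = df.index.tz_localize("UTC").tz_convert("Asia/Kolkata")
print(df.index[0])  # 2026-01-15 09:15:00+05:30 (the NSE open)
```

If the API already returns IST, skip the conversion and just `tz_localize("Asia/Kolkata")`; double conversion would shift every bar by 5:30.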
Detecting Breakout & Breakdown Patterns
🧒 The Bouncing Ball Analogy

Drop a ball. It bounces off the floor (support) 3 times. The 4th time you drop it — it crashes through the floor. That's a breakdown! The opposite (hitting the ceiling 3 times then bursting through) is a breakout. Our detector counts these bounces and fires when the price "breaks through".
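Before the full detector below, here is the pivot idea in isolation: a toy sketch with scipy's argrelextrema, separate from the project code, finding "ceiling" candidates in a bouncing price series:

```python
import numpy as np
from scipy.signal import argrelextrema

# a price series that bounces: peaks at indices 2 and 8, trough at 5
highs = np.array([100, 102, 105, 103, 101, 99,
                  101, 103, 105, 103, 101], dtype=float)

# local maxima with a 2-bar window on each side → resistance candidates
peaks = argrelextrema(highs, np.greater_equal, order=2)[0]
print(peaks)  # the two 105 peaks
```

`order` is the number of bars on each side a point must dominate; the SRDetector below exposes the same knob as `window`.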

Support/Resistance Levels

patterns/sr_detector.py
import numpy as np
import pandas as pd
from scipy.signal import argrelextrema

class SRDetector:
    """
    Identifies significant Support & Resistance levels
    using local extrema + cluster merging.
    """
    def __init__(self,
        window: int = 5,          # bars each side for pivot
        cluster_pct: float = 0.003, # 0.3% merge radius
        min_touches: int = 2        # min touches to be "valid"
    ):
        self.window      = window
        self.cluster_pct = cluster_pct
        self.min_touches = min_touches

    def find_levels(self, df: pd.DataFrame) -> dict:
        """
        Returns dict with 'support' and 'resistance' lists,
        each level having: price, touches, strength_score.
        """
        highs = df["high"].values
        lows  = df["low"].values

        # Local maxima → Resistance candidates
        res_idx = argrelextrema(highs, np.greater_equal,
                                order=self.window)[0]
        # Local minima → Support candidates
        sup_idx = argrelextrema(lows, np.less_equal,
                                order=self.window)[0]

        res_prices = highs[res_idx]
        sup_prices = lows[sup_idx]

        resistance = self._cluster_levels(res_prices, df)
        support    = self._cluster_levels(sup_prices, df)

        return {
            "resistance": [l for l in resistance
                           if l["touches"] >= self.min_touches],
            "support":    [l for l in support
                           if l["touches"] >= self.min_touches],
        }

    def _cluster_levels(self, prices, df) -> list:
        """Merge nearby price levels into single S/R zones."""
        if len(prices) == 0:
            return []
        
        levels, seen = [], set()
        for price in np.sort(prices):
            if any(abs(price - s) / s < self.cluster_pct
                   for s in seen):
                continue
            seen.add(price)
            touches = self._count_touches(price, df)
            levels.append({
                "price":   round(price, 2),
                "touches": touches,
                "strength": min(touches / 5, 1.0),  # 0-1 score
            })
        return levels

    def _count_touches(self, level: float, df) -> int:
        """Count how many bars came within 0.2% of this level."""
        tol = level * 0.002  # 0.2% tolerance
        return int(((df["high"] >= level - tol) &
                    (df["low"]  <= level + tol)).sum())

Breakout/Breakdown Engine

patterns/breakout.py
from dataclasses import dataclass
from typing import Optional, Literal
import pandas as pd

SignalType = Literal["BREAKOUT", "BREAKDOWN", "NONE"]

@dataclass
class Signal:
    symbol:       str
    signal_type:  SignalType
    timeframe:    str          # "5m" or "15m"
    level:        float        # broken S or R price
    close:        float        # current candle close
    touches:      int          # tests before breakout
    vol_ratio:    float        # vol / avg_vol_20
    body_pct:     float        # candle body size %
    liq_grab:     bool         # liquidity grab detected
    timestamp:    str

class BreakoutEngine:
    """
    Criteria for a VALID BREAKOUT:
    ─────────────────────────────
    ✅ R-level tested ≥ 2 times before this bar
    ✅ Close > resistance + (ATR * 0.25)  → avoid false breaks
    ✅ Candle body > 50% of total range   → strong conviction bar
    ✅ Volume ≥ 1.5x 20-bar average        (configurable)

    Criteria for a VALID BREAKDOWN:
    ────────────────────────────────
    ✅ S-level tested ≥ 2 times before this bar  
    ✅ Close < support - (ATR * 0.25)
    ✅ Candle body > 50% of total range
    ✅ Volume ≥ 1.5x 20-bar average
    """
    
    def __init__(self,
        atr_buffer_mult: float = 0.25,
        min_vol_ratio:   float = 1.5,
        min_body_pct:    float = 0.5,
        min_touches:     int   = 2,
    ):
        self.atr_mult    = atr_buffer_mult
        self.vol_thresh  = min_vol_ratio
        self.body_thresh = min_body_pct
        self.min_touches = min_touches

    def evaluate(self, symbol: str,
                 df: pd.DataFrame,
                 levels: dict,
                 timeframe: str) -> Optional[Signal]:
        """
        Evaluate the latest bar against detected S/R levels.
        Returns Signal if breakout/breakdown detected, else None.
        """
        bar  = df.iloc[-1]
        atr  = self._atr(df, 14)
        avg_vol = df["volume"].iloc[-21:-1].mean()
        vol_ratio = bar["volume"] / avg_vol if avg_vol > 0 else 0
        bar_range = bar["high"] - bar["low"]
        body_pct  = (abs(bar["close"] - bar["open"]) /
                     bar_range) if bar_range > 0 else 0

        # CHECK RESISTANCE BREAKOUT
        for level in levels.get("resistance", []):
            if (level["touches"] >= self.min_touches and
                bar["close"] > level["price"] + atr * self.atr_mult and
                body_pct >= self.body_thresh and
                vol_ratio >= self.vol_thresh):
                return Signal(
                    symbol=symbol, signal_type="BREAKOUT",
                    timeframe=timeframe,
                    level=level["price"], close=bar["close"],
                    touches=level["touches"], vol_ratio=round(vol_ratio,2),
                    body_pct=round(body_pct,2),
                    liq_grab=self._liq_grab_check(df, level["price"]),
                    timestamp=str(df.index[-1])
                )

        # CHECK SUPPORT BREAKDOWN (mirror logic)
        for level in levels.get("support", []):
            if (level["touches"] >= self.min_touches and
                bar["close"] < level["price"] - atr * self.atr_mult and
                body_pct >= self.body_thresh and
                vol_ratio >= self.vol_thresh):
                return Signal(
                    symbol=symbol, signal_type="BREAKDOWN",
                    timeframe=timeframe,
                    level=level["price"], close=bar["close"],
                    touches=level["touches"], vol_ratio=round(vol_ratio,2),
                    body_pct=round(body_pct,2),
                    liq_grab=self._liq_grab_check(df, level["price"]),
                    timestamp=str(df.index[-1])
                )
        return None

    def _atr(self, df, period=14) -> float:
        """Average True Range — measures volatility."""
        h, l, pc = df["high"], df["low"], df["close"].shift(1)
        tr = pd.concat([h-l, (h-pc).abs(), (l-pc).abs()], axis=1).max(axis=1)
        return tr.rolling(period).mean().iloc[-1]

    def _liq_grab_check(self, df, level: float) -> bool:
        """
        Liquidity Grab: a prior bar wicks through the level (below
        support or above resistance) but closes back on the original
        side — a stop hunt before the real move. Checked over the
        last 3 bars before the signal bar.
        """
        recent = df.iloc[-4:-1]  # last 3 bars before signal
        tol = level * 0.003
        # wick below the level but close back above it (support-side grab)
        grab_below = ((recent["low"] < level - tol) &
                      (recent["close"] > level))
        # wick above the level but close back below it (resistance-side grab)
        grab_above = ((recent["high"] > level + tol) &
                      (recent["close"] < level))
        return bool((grab_below | grab_above).any())

Signal Quality Thresholds — Quick Reference

Parameter | 5-min Default | 15-min Default | Why
Min S/R Touches | 2 | 2 | Fewer touches = less reliable level
ATR Buffer Multiplier | 0.25× | 0.35× | 15-min needs bigger confirmation
Volume Ratio (min) | 1.5× | 1.3× | 15-min bars aggregate more volume
Body % of Range | 50% | 50% | Doji candles = weak signal
Lookback Window | 30 days | 30 days | More history = better S/R quality
Teaching the Machine to Pick Winners
🧒 The Talent Scout Analogy

Imagine 50 kids auditioning for a movie. A talent scout doesn't just pick randomly — they look at acting skill, confidence, camera presence, and past performance. XGBoost is our talent scout. It scores each breakout signal on 30+ factors and says "this one has an 87% chance of following through — send the alert!"

Feature Definitions (representative subset of the 30+ features, zero look-ahead)

Category | Feature Name | Formula | Type
Price Action | body_pct | abs(close − open) / (high − low) | CORE
Price Action | wick_ratio | upper_wick / body | CORE
Price Action | gap_pct | (open − prev_close) / prev_close | CORE
S/R Strength | sr_touches | # of touches at level | CORE
S/R Strength | sr_age_bars | bars since level first formed | CONTEXT
S/R Strength | level_break_pct | (close − level) / level × 100 | CORE
Volume | vol_ratio_20 | volume / mean(volume, 20) | CORE
Volume | vol_trend_5 | vol slope over last 5 bars | MOMENTUM
Momentum | rsi_14 | RSI(14) at signal bar | MOMENTUM
Momentum | adx_14 | ADX(14), trend strength | MOMENTUM
Momentum | macd_hist | MACD histogram at signal | MOMENTUM
Volatility | atr_pct | ATR(14) / close × 100 | CONTEXT
Volatility | bbw_20 | Bollinger Band Width (20, 2) | CONTEXT
Volatility | hist_vol_10 | σ(log returns, 10 bars) | CONTEXT
Liq. Grab | liq_grab_flag | 1/0 boolean | SPECIAL
Time | hour_of_day | int 9–15 (market hours) | SPECIAL
Time | bar_from_open | # bars since 09:15 | SPECIAL
Sector | sector_momentum | sector ETF return same day | CONTEXT
ml/features.py — Feature Engineering Pipeline
import pandas as pd
import numpy as np

class FeatureEngineer:
    """
    Builds feature vector for XGBoost inference.
    ⚠️  CRITICAL: all features use ONLY past data (df.iloc[:-1])
                  to prevent look-ahead bias.
    """

    def build(self, df: pd.DataFrame,
               signal, levels: dict) -> dict:
        """Build full feature dict for a given signal + OHLCV dataframe."""
        bar = df.iloc[-1]
        hist = df.iloc[:-1]  # ← ONLY HISTORICAL, never current bar

        features = {}

        # ── Price Action Features ──────────────────────────────
        bar_range = bar["high"] - bar["low"]
        features["body_pct"]  = abs(bar["close"] - bar["open"]) / (bar_range + 1e-9)
        features["wick_upper"]= (bar["high"] - max(bar["open"],bar["close"])) / (bar_range + 1e-9)
        features["wick_lower"]= (min(bar["open"],bar["close"]) - bar["low"]) / (bar_range + 1e-9)
        features["close_pct"] = (bar["close"] - bar["low"]) / (bar_range + 1e-9)
        prev = hist.iloc[-1]
        features["gap_pct"]   = (bar["open"] - prev["close"]) / (prev["close"] + 1e-9)
        features["ret_1bar"]  = (bar["close"] - prev["close"]) / (prev["close"] + 1e-9)

        # ── S/R Level Features ────────────────────────────────
        features["sr_touches"]   = signal.touches
        features["level_break_pct"] = abs(bar["close"] - signal.level) / signal.level
        features["liq_grab"]     = int(signal.liq_grab)

        # ── Volume Features ───────────────────────────────────
        vol_window = hist["volume"].iloc[-20:]
        features["vol_ratio_20"] = bar["volume"] / (vol_window.mean() + 1)
        features["vol_trend_5"]  = np.polyfit(
            range(5), hist["volume"].iloc[-5:].values, 1
        )[0]

        # ── Momentum Indicators (computed on hist only) ───────
        features["rsi_14"]  = self._rsi(hist, 14)
        features["adx_14"]  = self._adx(hist, 14)
        features["macd_hist"] = self._macd_hist(hist)

        # ── Volatility ────────────────────────────────────────
        atr = self._atr(hist, 14)
        features["atr_pct"]  = atr / bar["close"]
        log_rets = np.log(hist["close"] / hist["close"].shift(1)).dropna()
        features["hist_vol_10"] = log_rets.iloc[-10:].std()

        # ── Time Context ──────────────────────────────────────
        ts = df.index[-1]
        features["hour_of_day"]   = ts.hour + ts.minute / 60
        open_time = ts.replace(hour=9, minute=15)
        features["mins_from_open"] = int((ts - open_time).total_seconds() / 60)
        features["is_first_30min"] = int(features["mins_from_open"] <= 30)
        features["is_last_30min"]  = int(features["mins_from_open"] >= 330)

        return features

    def _rsi(self, df, p=14):
        d = df["close"].diff()
        g = d.clip(lower=0).ewm(alpha=1/p).mean()
        l = (-d).clip(lower=0).ewm(alpha=1/p).mean()
        rs = g / (l + 1e-9)
        return (100 - 100 / (1 + rs)).iloc[-1]

    # _adx and _macd_hist follow same pattern (omitted for brevity)
    def _atr(self, df, p=14):
        h, l, pc = df["high"], df["low"], df["close"].shift(1)
        tr = pd.concat([h-l,(h-pc).abs(),(l-pc).abs()],axis=1).max(axis=1)
        return tr.rolling(p).mean().iloc[-1]
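A quick sanity check on the _rsi formula above (a standalone reproduction of the same ewm smoothing, not an import from the module): a steadily rising series should read near 100, a steadily falling one near 0.

```python
import pandas as pd

def rsi_last(close: pd.Series, p: int = 14) -> float:
    """Last-bar RSI using Wilder-style ewm smoothing, as in _rsi."""
    d = close.diff()
    gain = d.clip(lower=0).ewm(alpha=1 / p).mean()
    loss = (-d).clip(lower=0).ewm(alpha=1 / p).mean()
    rs = gain / (loss + 1e-9)
    return float((100 - 100 / (1 + rs)).iloc[-1])

rising  = pd.Series(range(1, 31), dtype=float)    # +1 every bar
falling = pd.Series(range(30, 0, -1), dtype=float)
print(rsi_last(rising), rsi_last(falling))  # ≈ 100 and ≈ 0
```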

Training Plan

  • Collect 12 months of historical signals via backtest
  • Label: did price move ≥1.5× ATR in target direction within 10 bars? → 1 else 0
  • Train/Validation split: time-based (no shuffling!) — first 8 months train, last 4 validate
  • Class imbalance: expect ~30% win rate; use scale_pos_weight in XGBoost
  • Tune with Optuna (300 trials): max_depth, n_estimators, learning_rate, subsample
  • Evaluate with precision-recall AUC (not accuracy — imbalanced classes)
  • Retrain monthly with walk-forward validation

Look-ahead Bias Prevention

⚠️ CRITICAL RULES — NEVER VIOLATE

  • Feature computation: always use df.iloc[:-1] (exclude current bar's data)
  • Labels (training): use future returns from T+1, never same-bar close
  • Validation split: strictly time-ordered — no random shuffling
  • No future indicators (tomorrow's volume, etc.) in features
  • Bar-close signaling: compute signal ONLY after bar closes (09:20 for 5m, 09:30 for 15m)
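Rule two (labels from T+1 onward) in miniature: a standalone pandas sketch of the labelling step with made-up numbers and a fixed ATR, not the project's backtest code.

```python
import pandas as pd

close = pd.Series([100.0, 101.0, 103.0, 102.0, 106.0, 104.0])
atr_at_signal = 2.0   # pretend ATR(14) frozen at signal time
horizon = 3           # look forward 3 bars, starting at T+1

# max future close over bars T+1 .. T+horizon (reverse-rolling trick)
future_max = (
    close.shift(-1)          # drop the signal bar itself: no same-bar leak
         .iloc[::-1]
         .rolling(horizon, min_periods=1).max()
         .iloc[::-1]
)
label = ((future_max - close) >= 1.5 * atr_at_signal).astype(int)
print(label.tolist())  # → [1, 1, 1, 1, 0, 0]
```

The last bars get label 0 automatically because their forward window is empty or falls short of the 1.5×ATR move.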
The Instant Ping System
🧒 The Fire Alarm Analogy

When a smoke detector goes off, it doesn't just make a noise — it tells you which floor, which room, and what to do. Our Telegram alert tells traders: which stock, breakout or breakdown, at what price, where to put stop loss, and the ML confidence score.

alerts/telegram_bot.py
import asyncio
from telegram import Bot
import os

class TelegramAlerter:
    def __init__(self):
        self.bot = Bot(token=os.getenv("TELEGRAM_TOKEN"))
        self.chat_id = os.getenv("TELEGRAM_CHAT_ID")

    async def send_signal(self, signal, ml_score: float):
        """Send formatted signal alert to Telegram channel."""
        emoji = "🚀" if signal.signal_type == "BREAKOUT" else "📉"
        direction = "BUY" if signal.signal_type == "BREAKOUT" else "SELL/SHORT"

        # Calculate levels (example: ATR-based targets)
        sl_dist  = signal.close * 0.007  # 0.7% stop
        target   = signal.close + (2 * sl_dist) if direction == "BUY" else signal.close - (2 * sl_dist)
        stop     = signal.close - sl_dist if direction == "BUY" else signal.close + sl_dist

        msg = f"""
{emoji} *{signal.signal_type} ALERT* — {signal.symbol}

📋 *Timeframe:* `{signal.timeframe}`
💰 *CMP:* `₹{signal.close:,.2f}`
🔑 *Key Level:* `₹{signal.level:,.2f}` ({signal.touches} tests)
📊 *Direction:* `{direction}`

🎯 *Target:*   `₹{target:,.2f}`  ({(target/signal.close-1)*100:+.1f}%)
🛑 *Stop Loss:* `₹{stop:,.2f}`  ({(stop/signal.close-1)*100:+.1f}%)
📐 *Risk:Reward:* `1:2`

🔥 *Volume:* `{signal.vol_ratio:.1f}x average`
⚡ *Liq Grab:* `{"YES ✅" if signal.liq_grab else "NO"}`
🤖 *ML Score:* `{ml_score*100:.0f}/100`

⏰ `{signal.timestamp}`

_This is NOT financial advice. Trade at your own risk._
"""
        await self.bot.send_message(
            chat_id=self.chat_id,
            text=msg,
            parse_mode="Markdown"
        )

    def send(self, signal, ml_score: float):
        """Sync wrapper for async send."""
        asyncio.run(self.send_signal(signal, ml_score))
📱 SAMPLE TELEGRAM MESSAGE PREVIEW
🚀 BREAKOUT ALERT — RELIANCE

📋 Timeframe: 5m
💰 CMP: ₹2,847.50
🔑 Key Level: ₹2,835.00 (3 tests)
📊 Direction: BUY

🎯 Target: ₹2,887.30 (+1.4%)
🛑 Stop Loss: ₹2,827.60 (-0.7%)
📐 Risk:Reward: 1:2

🔥 Volume: 2.3x average
Liq Grab: YES ✅
🤖 ML Score: 83/100

⏰ 2026-01-15 10:35:00 IST
This is NOT financial advice.

Setup Checklist

  • Create Telegram Bot via @BotFather → get TOKEN
  • Create a channel or group → get CHAT_ID (use @getidsbot)
  • Set env vars: TELEGRAM_TOKEN and TELEGRAM_CHAT_ID
  • Test with send_test() method before live
  • Rate limit: max 1 message per signal per symbol per session
  • Add error handling for Telegram API rate limits (429)
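For the last checklist item, a generic retry-with-backoff wrapper is one way to absorb 429s. This is a hedged sketch; python-telegram-bot also ships optional rate-limit handling of its own, so check the library docs before rolling your own.

```python
import time

def with_backoff(fn, max_tries: int = 3, base_delay: float = 1.0,
                 sleep=time.sleep):
    """Call fn(); on failure wait base_delay * 2**attempt and retry."""
    for attempt in range(max_tries):
        try:
            return fn()
        except Exception:
            if attempt == max_tries - 1:
                raise          # out of retries, surface the error
            sleep(base_delay * (2 ** attempt))

# usage (hypothetical): with_backoff(lambda: alerter.send(signal, score))
```

The `sleep` parameter is injectable so the wrapper can be unit-tested without real delays.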
The Main Loop That Ties It Together
scanner/scanner_5m.py — Full Scanner Orchestration
import schedule, time, logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import xgboost as xgb

from data.ingestor import AngelOneIngestor
from patterns.sr_detector import SRDetector
from patterns.breakout import BreakoutEngine
from ml.features import FeatureEngineer
from alerts.telegram_bot import TelegramAlerter
from scanner.watchlist import NIFTY500_TOKENS   # {symbol: token} dict

logger = logging.getLogger(__name__)

class Scanner5m:
    """
    5-Minute Intraday Breakout/Breakdown Scanner.
    Runs on every 5-min bar close (09:20, 09:25, ..., 15:25).
    Scans all 500 stocks in parallel using thread pool.
    """

    MIN_ML_SCORE = 0.60  # only alert if model confidence ≥ 60%
    WORKERS      = 20    # parallel threads for 500 stocks

    def __init__(self):
        self.ingestor  = AngelOneIngestor()
        self.sr        = SRDetector(min_touches=2)
        self.engine    = BreakoutEngine()
        self.features  = FeatureEngineer()
        self.alerter   = TelegramAlerter()
        self.model     = xgb.Booster()
        self.model.load_model("ml/model_5m.json")
        self.alerted_today = set()  # avoid duplicate alerts

    def scan_symbol(self, symbol: str, token: str):
        """Pipeline for a single stock. Returns Signal or None."""
        try:
            df     = self.ingestor.fetch_candles(token, timeframe="5m")
            levels = self.sr.find_levels(df.iloc[:-1])  # ← no look-ahead!
            signal = self.engine.evaluate(symbol, df, levels, "5m")
            if signal is None:
                return None
            
            feats   = self.features.build(df, signal, levels)
            dmat    = xgb.DMatrix([list(feats.values())])
            score   = float(self.model.predict(dmat)[0])
            signal.ml_score = score

            if score >= self.MIN_ML_SCORE:
                logger.info(f"🎯 {symbol} score={score:.2f} → ALERT")
                return signal
        except Exception as e:
            logger.warning(f"{symbol} scan failed: {e}")
        return None

    def run_scan(self):
        """Run full 500-stock scan at bar close. Called by scheduler."""
        now = datetime.now()
        logger.info(f"⏱ Scan started at {now.strftime('%H:%M:%S')}")
        
        items = list(NIFTY500_TOKENS.items())
        signals_found = []

        with ThreadPoolExecutor(max_workers=self.WORKERS) as ex:
            futures = {
                ex.submit(self.scan_symbol, sym, tok): sym
                for sym, tok in items
                if sym not in self.alerted_today
            }
            for f in futures:
                result = f.result()
                if result:
                    signals_found.append(result)
                    self.alerted_today.add(result.symbol)

        # Sort by ML score, send top alerts
        signals_found.sort(key=lambda s: s.ml_score, reverse=True)
        for sig in signals_found[:10]:  # max 10 alerts per scan
            self.alerter.send(sig, sig.ml_score)
        
        logger.info(f"✅ Scan done. Found {len(signals_found)} signals.")

    def start(self):
        """Schedule scans at every 5-min bar close during market hours."""
        # Every 5-min bar is fully closed at the :00/:05/.../:55 marks,
        # so schedule all twelve and skip runs outside market hours.
        def run_if_market_open():
            now = datetime.now()
            if (9, 20) <= (now.hour, now.minute) <= (15, 30):
                self.run_scan()

        for minute in range(0, 60, 5):
            schedule.every().hour.at(f":{minute:02d}").do(run_if_market_open)

        logger.info("🚀 5-min scanner running. Ctrl+C to stop.")
        while True:
            schedule.run_pending()
            time.sleep(1)

if __name__ == "__main__":
    scanner = Scanner5m()
    scanner.start()

FastAPI Web Endpoint

api/main.py — REST API for Web Dashboard
import asyncio

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="Nifty500 Scanner API", version="1.0")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"])

# In-memory signal store (replace with Redis/DB in production)
active_signals = []

@app.get("/signals")
async def get_signals(
    timeframe: str = "all",
    min_score: float = 0.6,
    signal_type: str = "all"
):
    """Return filtered list of active signals."""
    results = active_signals
    if timeframe != "all":
        results = [s for s in results if s["timeframe"] == timeframe]
    if signal_type != "all":
        results = [s for s in results if s["type"] == signal_type]
    results = [s for s in results if s["ml_score"] >= min_score]
    return {"signals": sorted(results, key=lambda x: -x["ml_score"])}

@app.websocket("/ws/signals")
async def websocket_signals(ws: WebSocket):
    """WebSocket for real-time signal push to web UI."""
    await ws.accept()
    try:
        while True:
            await ws.send_json({"signals": active_signals[-20:]})
            await asyncio.sleep(5)
    except Exception:
        pass  # client disconnected; let the connection close quietly

@app.get("/health")
async def health():
    return {"status": "ok", "active_signals": len(active_signals)}
Your Roadmap from Day 1 to Production
1

Phase 1 — MVP (Week 1–2)

5-min scanner with pattern detection + Telegram alerts. No ML yet. Manual confirmation required.

  • Setup Angel One SmartAPI credentials + test connection
  • Build AngelOneIngestor with cleaning pipeline
  • Implement SRDetector with pivot point method
  • Build BreakoutEngine with basic criteria (no ML)
  • Wire TelegramAlerter with formatted message template
  • Create scheduler (APScheduler or schedule) at bar close
  • Load Nifty 500 watchlist with token mapping
  • Test end-to-end with paper: 5 stocks first, then 50, then 500
  • Add basic logging and error handling
  • Deliverable: Running Python script sending Telegram alerts live
2

Phase 2 — Intelligence Layer (Week 3–5)

15-min scanner, XGBoost model training, and signal scoring. Alerts now include ML confidence.

  • Build 15-min scanner module (mirror of 5-min, adjusted thresholds)
  • Build FeatureEngineer with full 30+ feature set
  • Collect historical signals via backtest (12 months data)
  • Label signals with forward return outcomes
  • Train XGBoost with time-series cross-validation
  • Tune with Optuna, evaluate on precision-recall AUC
  • Integrate ranker into both scanner modules
  • Add MIN_ML_SCORE threshold (start at 0.60, tune)
  • Add signal deduplication (no repeat alerts same stock same day)
  • Deliverable: Dual scanner with ML-ranked Telegram alerts
3

Phase 3 — Web Dashboard (Week 6–8)

FastAPI backend + React frontend. Live WebSocket signal feed. Sandboxed demo mode.

  • Build FastAPI endpoints: /signals, /health, /ws/signals
  • Build React dashboard: signal table, candlestick chart overlay
  • Add filter controls: timeframe, signal type, min score
  • WebSocket integration for real-time signal updates
  • PostgreSQL persistence for signal history and backtest results
  • Docker Compose: API + Scanner + Redis + PostgreSQL + Frontend
  • Demo mode with recorded signals (no live API needed for showcase)
  • HTTPS via Nginx reverse proxy + SSL (Let's Encrypt)
  • Deliverable: Publicly accessible web dashboard with live/demo signals
The Safety Rules — Non-Negotiable
🧒 The Seatbelt Analogy

Even the best driver wears a seatbelt. Risk management is your seatbelt. It doesn't stop you from driving — it limits the damage if something goes wrong. These rules are built into the system, not optional suggestions.

Position Sizing Formula

risk/position_sizer.py
def calculate_position(
    capital: float,       # total account capital
    entry: float,         # entry price
    stop_loss: float,     # stop loss price
    risk_pct: float = 0.01 # risk 1% per trade max
) -> dict:
    """
    Position sizing using the 1% Rule.
    
    Example: ₹5,00,000 capital, entry ₹500, stop ₹490
    → Risk per share = ₹10
    → Max risk = ₹5000 (1% of capital)
    → Shares = 5000 / 10 = 500
    → Position value = 500 × ₹500 = ₹2,50,000 (50% of capital)
    """
    risk_per_share = abs(entry - stop_loss)
    if risk_per_share == 0:
        raise ValueError("Stop loss cannot equal entry!")
    
    max_risk_amount = capital * risk_pct
    shares          = int(max_risk_amount / risk_per_share)
    position_value  = shares * entry
    
    # Safety cap: never more than 20% of capital in one trade
    if position_value > capital * 0.20:
        shares = int((capital * 0.20) / entry)
        position_value = shares * entry
    
    return {
        "shares":         shares,
        "position_value": round(position_value, 2),
        "risk_amount":    round(shares * risk_per_share, 2),
        "risk_pct":       round((shares * risk_per_share) / capital * 100, 2),
        "r_multiple_1":   round(entry + risk_per_share * 2, 2),  # 1:2 target
    }

Hard Risk Rules

🛑 SYSTEM-ENFORCED LIMITS

  • Max 1% capital risk per trade — hardcoded in sizer
  • Max 20% capital in single position — even if 1% allows more
  • Max 5% total open risk at any time — no more than 5 simultaneous trades
  • Stop Loss mandatory — no signal sent without defined SL
  • No trading in first 5 minutes (09:15–09:20) — avoid opening noise
  • No new positions after 14:30 — avoid intraday square-off risk
  • Max 3 alerts per stock per day — prevents overtrading
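The time-window and open-risk limits above can be enforced as a single pre-alert gate. A sketch under the stated assumptions: times are IST, and each new position risks the full 1% the sizer allows (so 5% total open risk means at most 5 concurrent trades).

```python
# Hard-rule gate evaluated before any signal is alerted or acted on.
from datetime import time

NO_TRADE_BEFORE   = time(9, 20)   # skip first 5 minutes (opening noise)
NO_NEW_AFTER      = time(14, 30)  # no fresh intraday positions after this
MAX_OPEN_RISK_PCT = 5.0           # total risk across simultaneous trades

def signal_allowed(now: time, open_risk_pct: float, has_stop_loss: bool) -> bool:
    """True only if every hard rule passes for a prospective new position."""
    if not has_stop_loss:
        return False                           # SL is mandatory
    if now < NO_TRADE_BEFORE or now > NO_NEW_AFTER:
        return False                           # outside the allowed window
    if open_risk_pct + 1.0 > MAX_OPEN_RISK_PCT:
        return False                           # would breach 5% total open risk
    return True
```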

Backtesting Considerations

  • Use out-of-sample data only for final evaluation
  • Include realistic transaction costs: 0.03% brokerage + STT + GST
  • Account for slippage: assume +0.05% worse entry than signal close
  • Avoid optimisation bias — choose thresholds before seeing test results
  • Minimum 200 signals for statistically significant backtest
  • Report drawdown, Sharpe ratio, and win rate separately
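Applying the slippage and cost assumptions above to each backtested signal can be sketched like this. The 0.10% round-trip figure is an illustrative stand-in for brokerage + STT + GST on both legs; substitute your broker's actual schedule.

```python
# Cost-adjusted backtest return per signal, in percent.
SLIPPAGE_PCT   = 0.0005   # entry filled 0.05% worse than signal close
ROUND_TRIP_PCT = 0.0010   # assumed total transaction costs, both legs

def net_return_pct(signal_close: float, exit_price: float, long: bool = True) -> float:
    """Gross move adjusted for slippage and transaction costs."""
    entry = signal_close * ((1 + SLIPPAGE_PCT) if long else (1 - SLIPPAGE_PCT))
    gross = (exit_price - entry) / entry if long else (entry - exit_price) / entry
    return round((gross - ROUND_TRIP_PCT) * 100, 3)
```

Note that a trade exiting flat at the signal price still books a small loss, which is exactly the realism the backtest needs.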

⚠️ LEGAL DISCLAIMER

This system is a research and educational tool. It does not constitute financial advice, investment recommendations, or trading signals endorsed by any regulated entity. All trading involves significant risk of loss. Past backtested performance does not guarantee future results. Consult a SEBI-registered investment advisor before making any trading decisions. The developer(s) assume no liability for financial losses arising from use of this system. Always test thoroughly in paper trading mode before risking real capital.

Shipping to Production
docker-compose.yml
version: "3.9"
services:

  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    restart: always

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB:       scanner_db
      POSTGRES_USER:     scanner
      POSTGRES_PASSWORD: "${DB_PASSWORD}"
    volumes: ["pgdata:/var/lib/postgresql/data"]
    restart: always

  scanner:
    build: .
    command: python -m scanner.scanner_5m
    env_file: .env
    depends_on: [redis, postgres]
    restart: always

  scanner_15m:
    build: .
    command: python -m scanner.scanner_15m
    env_file: .env
    depends_on: [redis, postgres]
    restart: always

  api:
    build: .
    command: uvicorn api.main:app --host 0.0.0.0 --port 8000
    ports: ["8000:8000"]
    env_file: .env
    depends_on: [redis, postgres]
    restart: always

  frontend:
    image: nginx:alpine
    ports: ["80:80", "443:443"]
    volumes:
      - ./frontend/dist:/usr/share/nginx/html
      - ./nginx.conf:/etc/nginx/conf.d/default.conf
    restart: always

volumes:
  pgdata:

Environment Variables (.env)

.env.example
# Angel One SmartAPI
ANGEL_API_KEY=your_api_key_here
ANGEL_CLIENT_ID=your_client_id
ANGEL_MPIN=your_mpin
ANGEL_TOTP_SECRET=your_totp_base32_secret

# Telegram
TELEGRAM_TOKEN=bot123456:ABC-your-token
TELEGRAM_CHAT_ID=-100123456789

# Database — keep the URL password in sync with DB_PASSWORD
DATABASE_URL=postgresql://scanner:change_me_in_production@postgres:5432/scanner_db
DB_PASSWORD=change_me_in_production

# Redis
REDIS_URL=redis://redis:6379/0

# Scanner Config
MIN_ML_SCORE=0.60
MAX_ALERTS_PER_SCAN=10
ENABLE_15M_SCANNER=true
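A missing credential should fail the scanner at startup, not mid-session. A small validation sketch; the names mirror `.env.example` above, and the function returns the missing names so the caller decides how to fail.

```python
# Startup check: report required env vars that are unset or empty.
import os

REQUIRED_VARS = [
    "ANGEL_API_KEY", "ANGEL_CLIENT_ID", "ANGEL_MPIN", "ANGEL_TOTP_SECRET",
    "TELEGRAM_TOKEN", "TELEGRAM_CHAT_ID", "DATABASE_URL", "REDIS_URL",
]

def missing_env_vars(env=None) -> list:
    """Names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]

# At startup:
# if missing_env_vars():
#     raise SystemExit(f"Missing env vars: {missing_env_vars()}")
```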

Quick Start Commands

terminal
# 1. Clone and setup
git clone https://github.com/you/nifty500-scanner
cd nifty500-scanner
cp .env.example .env
# → Fill in your credentials in .env

# 2. Install dependencies
pip install -r requirements.txt

# 3. Test data fetch (single stock)
python -c "from data.ingestor import *;
i=AngelOneIngestor();
df=i.fetch_candles('2885');
print(df.tail())"

# 4. Run MVP scanner (Phase 1)
python -m scanner.scanner_5m

# 5. Full production deploy
docker-compose up -d

# 6. Check logs
docker-compose logs -f scanner

# 7. Access API
curl http://localhost:8000/signals
curl http://localhost:8000/health