diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 23173f9..f01c621 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -28,7 +28,7 @@ applyTo: '/**' * The documentation for quantflow is available at `https://quantflow.quantmid.com` * Documentation is built using [mkdocs](https://www.mkdocs.org/) and stored in the `docs/` directory. The documentation source files are written in markdown format. * Do not use em dashes (—) in documentation files or docstrings. Use colons, parentheses, or restructure the sentence instead. -* Math in documentation and docstrings uses `$...$` for inline and `$$...$$` or `\begin{equation}...\end{equation}` for block equations. Do not use `.. math::` or `:math:` (RST syntax). +* Math in documentation and docstrings: always use `\begin{equation}...\end{equation}` for any formula or equation. Use `$...$` only for brief inline references to variables (e.g. $F$, $K$). Do not use `$$...$$`, `` `...` ``, or RST syntax (`.. math::`, `:math:`). * Glossary entries in `docs/glossary.md` must be kept in alphabetical order. * To rebuild doc examples run `uv run ./dev/build-examples` — runs all scripts in `docs/examples/` and writes their output to `docs/examples_output/` diff --git a/app/volatility_surface.py b/app/volatility_surface.py index 5f56aef..7c30ceb 100644 --- a/app/volatility_surface.py +++ b/app/volatility_surface.py @@ -42,12 +42,6 @@ def _(mo): return -@app.cell -def _(): - kwargs = dict() - return - - @app.cell def _(mo): asset = mo.ui.dropdown(["btc", "eth", "sol"], value="btc", label="asset") @@ -57,7 +51,7 @@ def _(mo): @app.cell -async def _(asset, inverse): +async def _(asset, inverse, mo): import pandas as pd from quantflow.data.deribit import Deribit @@ -74,18 +68,43 @@ async def _(asset, inverse): surface.bs() # disable outliers surface.disable_outliers() - surface.plot3d() - return pd, surface + # + def int_or_none(v): + try: + return int(v) + except TypeError: + return None + + maturites = [c.maturity for c in surface.maturities] + maturity_dropdown = mo.ui.dropdown( + options={m.strftime("%Y-%m-%d"): i for i, m in enumerate(maturites)}, + label="Maturity" + ) + maturity_dropdown + return int_or_none, maturity_dropdown, pd, surface @app.cell -def _(pd, surface): +def _(int_or_none, maturity_dropdown, surface): + index = int_or_none(maturity_dropdown.value) + surface.plot3d(index=index) + return (index,) + + +@app.cell +def _(index, pd, surface): # display inputs - only options with converged implied volatility - surface_inputs = surface.inputs(converged=True) + surface_inputs = surface.inputs(converged=True, index=index) pd.DataFrame([i.model_dump() for i in surface_inputs.inputs]) return +@app.cell +def _(surface): + surface.term_structure() + return + + @app.cell def _(): return diff --git a/docs/api/utils/rates.md b/docs/api/utils/rates.md new file mode 100644 index 0000000..2c97d65 --- /dev/null +++ b/docs/api/utils/rates.md @@ -0,0 +1,4 @@ +# Numbers + + +::: quantflow.utils.interest_rates.Rate diff --git a/docs/glossary.md b/docs/glossary.md index 83fced8..e30fc39 100644 --- a/docs/glossary.md +++ b/docs/glossary.md @@ -97,6 +97,24 @@ The [probability density function](https://en.wikipedia.org/wiki/Probability_den F_x(x) = \int_{-\infty}^x f_x(s) ds \end{equation} +## Put-Call Parity + +Put-call parity is a no-arbitrage relationship between the prices of European call +and put options with the same strike $K$ and maturity. Denoting forward-space prices +$c = C/F$ and $p = P/F$ (see [Black Pricing](api/options/black.md)), the relationship +reads: + +\begin{equation} + c - p = 1 - \frac{K}{F} = 1 - e^k +\end{equation} + +where $k$ is the [log-strike](#log-strike). +In quoting currency terms, multiplying through by $F$: + +\begin{equation} + C - P = F - K +\end{equation} + ## Time To Maturity (TTM) Time to maturity is the time remaining until an option or forward contract expires, diff --git a/quantflow/options/inputs.py b/quantflow/options/inputs.py index 09e198c..9c4bb91 100644 --- a/quantflow/options/inputs.py +++ b/quantflow/options/inputs.py @@ -47,6 +47,10 @@ class VolSurfaceSecurity(BaseModel): def vol_surface_type(self) -> VolSecurityType: raise NotImplementedError("vol_surface_type must be implemented by subclasses") + @classmethod + def forward(cls) -> Self: + raise NotImplementedError("forward_input must be implemented by subclasses") + class DefaultVolSecurity(VolSurfaceSecurity): security_type: VolSecurityType = Field( diff --git a/quantflow/options/surface.py b/quantflow/options/surface.py index 037bf2a..3185722 100644 --- a/quantflow/options/surface.py +++ b/quantflow/options/surface.py @@ -1,6 +1,7 @@ from __future__ import annotations import enum +import math import warnings from datetime import datetime, timedelta from decimal import Decimal @@ -14,7 +15,7 @@ from quantflow.utils import plot from quantflow.utils.dates import utcnow -from quantflow.utils.interest_rates import rate_from_spot_and_forward +from quantflow.utils.interest_rates import Rate from quantflow.utils.numbers import ( ZERO, DecimalNumber, @@ -80,6 +81,20 @@ class Price(BaseModel, Generic[S]): def mid(self) -> Decimal: return (self.bid + self.ask) / 2 + @property + def spread(self) -> Decimal: + return self.ask - self.bid + + @property + def bp_spread(self) -> Decimal: + """Bid-ask spread in basis points, calculated as spread divided by mid + price and multiplied by 10000""" + mid = self.mid + if mid > ZERO: + return 10000 * self.spread / mid + else: + return Decimal("inf") + class SpotPrice(Price[S]): """Represents the spot bid/ask price of an underlying asset""" @@ -117,6 +132,80 @@ def inputs(self) -> ForwardInput: volume=self.volume, ) + def is_valid(self) -> bool: + """Check if the forward price is valid, which means that the bid and ask + are positive and the bid is less than or equal to the ask""" + return self.bid > ZERO and self.ask > ZERO and self.bid <= self.ask + + +class ImpliedFwdPrice(FwdPrice[S]): + """Represents the implied forward price of an underlying asset at a specific + maturity, extracted from option prices via put-call parity""" + + strike: DecimalNumber = Field( + description="Strike price of the options used to extract the forward price" + ) + + def moneyness(self, ttm: float) -> float: + """Moneyness of the implied forward""" + return math.log(float(self.strike / self.mid)) / math.sqrt(ttm) + + @classmethod + def aggregate( + cls, forwards: list[Self], ttm: float, default: FwdPrice[S] | None = None + ) -> FwdPrice[S] | None: + """Aggregate multiple implied forward prices into a single forward price""" + forwards = [f for f in forwards if f.is_valid()] + if not forwards: + return default + weights = 0.0 + values = 0.0 + spreads = 0.0 + cleaned: list[Self] = [] + spread_bp_cutoff = 50 + while True: + cleaned = [ + forward for forward in forwards if forward.bp_spread < spread_bp_cutoff + ] + if not cleaned: + spread_bp_cutoff *= 2 + else: + forwards = cleaned + break + for forward in forwards: + m = forward.moneyness(ttm) + moneyness_weight = math.exp(-(m**2) / 2) + spread_weight = math.exp( + -10000 * float(forward.spread) / float(forward.mid) + ) + w = moneyness_weight * spread_weight + weights += w + values += w * float(forward.mid) + spreads += w * float(forward.spread) + if ( + default is not None + and default.is_valid() + and default.bp_spread < spread_bp_cutoff + ): + w = math.exp(-10000 * float(default.spread) / float(default.mid)) + weights += w + values += w * float(default.mid) + spreads += w * float(default.spread) + mid = to_decimal(values / weights) + spread = to_decimal(spreads / weights) + if ( + default is not None + and default.is_valid() + and abs(mid - default.mid) / default.spread < 1 + ): + return default + return FwdPrice( + security=forwards[0].security.forward(), + bid=mid - spread / 2, + ask=mid + spread / 2, + maturity=forwards[0].maturity, + ) + class OptionMetadata(BaseModel): """Represents the metadata of an option, including its strike, type, maturity, @@ -370,6 +459,11 @@ def converged(self) -> bool: for both bid and ask""" return self.bid.converged and self.ask.converged + @property + def mid(self) -> Decimal: + """Calculate the mid option price by averaging the bid and ask prices""" + return (self.bid.price + self.ask.price) / 2 + def iv_bid_ask_spread(self) -> float: """Calculate the bid-ask spread of the implied volatility""" return self.ask.implied_vol - self.bid.implied_vol @@ -440,6 +534,52 @@ class Strike(BaseModel, Generic[S]): default=None, description="Put option prices for the strike" ) + def implied_forward(self) -> ImpliedFwdPrice[S] | None: + r"""Extract the implied forward price from put-call parity. + + Requires both a call and a put at this strike. Uses mid prices. + For inverse options (prices quoted in the underlying currency) + put-call parity reads + + \begin{equation} + F = \frac{K}{1 - c + p} + \end{equation} + + For non-inverse options (prices quoted in the quote currency) + + \begin{equation} + F = K + C - P + \end{equation} + + Returns None when the strike does not have both a call and a put, + or when the denominator is non-positive (arbitrage condition violated). + """ + if self.call is None or self.put is None: + return None + cp_bid = self.call.bid.price - self.put.ask.price + cp_ask = self.call.ask.price - self.put.bid.price + if self.call.meta.inverse: + d_bid = 1 - cp_bid + d_ask = 1 - cp_ask + if d_bid <= ZERO or d_ask <= ZERO: + return None + bid = self.strike / d_bid + ask = self.strike / d_ask + else: + bid = self.strike + cp_bid + ask = self.strike + cp_ask + if bid <= ZERO or ask <= ZERO: + return None + if bid > ask: + return None + return ImpliedFwdPrice( + security=self.call.security.forward(), + strike=self.strike, + maturity=self.call.meta.maturity, + bid=bid, + ask=ask, + ) + def options_iter( self, forward: Annotated[Decimal, Doc("Forward price of the underlying asset")], @@ -548,6 +688,23 @@ def ttm(self, ref_date: datetime) -> float: """Time to maturity in years""" return self.day_counter.dcf(ref_date, self.maturity) + def forward_rate(self, ref_date: datetime, spot: SpotPrice[S]) -> Rate: + """Compute the implied continuous rate from spot and forward mid""" + return Rate.from_spot_and_forward( + spot.mid, + self.forward.mid, + ref_date, + self.maturity, + day_counter=self.day_counter, + ) + + def forward_spread_fraction(self) -> Decimal: + """Bid-ask spread of the forward as a fraction of its mid price""" + mid = self.forward.mid + if mid <= ZERO: + return Decimal("Inf") + return (self.forward.ask - self.forward.bid) / mid + def info_dict(self, ref_date: datetime, spot: SpotPrice[S]) -> dict: """Return a dictionary with information about the cross section""" return dict( @@ -555,9 +712,8 @@ def info_dict(self, ref_date: datetime, spot: SpotPrice[S]) -> dict: ttm=self.ttm(ref_date), forward=self.forward.mid, basis=self.forward.mid - spot.mid, - rate_percent=rate_from_spot_and_forward( - spot.mid, self.forward.mid, self.maturity - ref_date - ).percent, + rate_percent=self.forward_rate(ref_date, spot).percent, + fwd_spread_pct=round(100 * self.forward_spread_fraction(), 4), open_interest=self.forward.open_interest, volume=self.forward.volume, ) @@ -753,6 +909,9 @@ def securities( select: Annotated[ OptionSelection, Doc("Option selection method") ] = OptionSelection.all, + index: Annotated[ + int | None, Doc("Index of the cross section to use, if None use all") + ] = None, converged: Annotated[ bool, Doc( @@ -764,8 +923,13 @@ def securities( ) -> Iterator[SpotPrice[S] | FwdPrice[S] | OptionPrices[S]]: """Iterator over securities in the volatility surface""" yield self.spot - for maturity in self.maturities: - yield from maturity.securities(select=select, converged=converged) + if index is not None: + yield from self.maturities[index].securities( + select=select, converged=converged + ) + else: + for maturity in self.maturities: + yield from maturity.securities(select=select, converged=converged) def inputs( self, @@ -773,6 +937,9 @@ def inputs( select: Annotated[ OptionSelection, Doc("Option selection method") ] = OptionSelection.all, + index: Annotated[ + int | None, Doc("Index of the cross section to use, if None use all") + ] = None, converged: Annotated[ bool, Doc( @@ -788,12 +955,15 @@ def inputs( asset=self.asset, ref_date=self.ref_date, inputs=list( - s.inputs() for s in self.securities(select=select, converged=converged) + s.inputs() + for s in self.securities( + select=select, converged=converged, index=index + ) ), ) def term_structure(self) -> pd.DataFrame: - """Return the term structure of the volatility surface""" + """Return the term structure of the volatility surface as a DataFrame""" return pd.DataFrame( cross.info_dict(self.ref_date, self.spot) for cross in self.maturities ) @@ -1038,6 +1208,70 @@ def disable_outliers( ) return self + def calibrate_forwards( + self, + *, + max_spread_fraction: Annotated[ + float, + Doc( + "Maximum allowed forward bid-ask spread as a fraction of the mid " + "price. Forwards exceeding this threshold are considered unreliable " + "and replaced with a synthetic price derived from interpolated rates. " + "A value of 0.05 flags forwards whose spread is more than 5% of mid." + ), + ] = 0.05, + ) -> Self: + """Replace forwards with wide bid-ask spreads with synthetic prices + interpolated from the smooth rate term structure. + + For each maturity the implied continuous rate is computed as + `r = log(F_mid / S) / T`. Maturities whose forward bid-ask spread + exceeds `max_spread_fraction` of the mid are treated as unreliable. + A piecewise-linear interpolation (with flat extrapolation at the + boundaries) is fitted through the reliable `(T, r)` pairs, and the + synthetic forward is: + + `F_synth = S * exp(r_interp * T)` + + The synthetic bid and ask are both set to this value, giving a + zero spread, and the cross-section forward is replaced accordingly. + Returns a new `VolSurface` instance leaving the original unchanged. + """ + spot = self.spot.mid + max_spread = to_decimal(max_spread_fraction) + good_ttms: list[float] = [] + good_rates: list[float] = [] + bad_indices: list[int] = [] + + for i, cross in enumerate(self.maturities): + ttm = cross.ttm(self.ref_date) + spread_frac = cross.forward_spread_fraction() + rate = cross.forward_rate(self.ref_date, self.spot) + if ttm > 0 and spread_frac <= max_spread: + good_ttms.append(ttm) + good_rates.append(float(rate.rate)) + else: + bad_indices.append(i) + + if not good_ttms or not bad_indices: + return self + + ttm_arr = np.array(good_ttms) + rate_arr = np.array(good_rates) + + new_maturities = list(self.maturities) + for i in bad_indices: + cross = self.maturities[i] + ttm = cross.ttm(self.ref_date) + if ttm <= 0: + continue + r_synth = float(np.interp(ttm, ttm_arr, rate_arr)) + f_synth = to_decimal(float(spot) * math.exp(r_synth * ttm)) + new_fwd = cross.forward.model_copy(update=dict(bid=f_synth, ask=f_synth)) + new_maturities[i] = cross.model_copy(update=dict(forward=new_fwd)) + + return self.model_copy(update=dict(maturities=tuple(new_maturities))) + def plot( self, *, @@ -1059,13 +1293,16 @@ def plot3d( select: Annotated[ OptionSelection, Doc("Option selection method") ] = OptionSelection.best, + index: Annotated[ + int | None, Doc("Index of the cross section to use, if None use all") + ] = None, dragmode: Annotated[ str, Doc("Drag interaction mode for the 3D scene") ] = "turntable", **kwargs: Any, ) -> Any: """Plot the volatility surface""" - df = self.options_df(select=select, converged=True) + df = self.options_df(select=select, index=index, converged=True) return plot.plot_vol_surface_3d(df, dragmode=dragmode, **kwargs) @@ -1123,19 +1360,31 @@ def add_option( else: self.strikes[strike].put = option - def cross_section(self) -> VolCrossSection[S] | None: - if self.forward is None or self.forward.mid == ZERO: - return None + def cross_section( + self, + ref_date: Annotated[ + datetime | None, Doc("Reference date for the volatility surface") + ] = None, + ) -> VolCrossSection[S] | None: strikes = [] + implied_forwards = [] for strike in sorted(self.strikes): sk = self.strikes[strike] if sk.call is None and sk.put is None: continue + if implied_forward := sk.implied_forward(): + implied_forwards.append(implied_forward) strikes.append(sk) + forward = self.forward + if implied_forwards: + ttm = self.day_counter.dcf(ref_date or utcnow(), self.maturity) + forward = ImpliedFwdPrice.aggregate(implied_forwards, ttm, self.forward) + if forward is None or not forward.is_valid(): + return None return ( VolCrossSection( maturity=self.maturity, - forward=self.forward, + forward=forward, strikes=tuple(strikes), day_counter=self.day_counter, ) @@ -1280,12 +1529,13 @@ def surface( if not self.spot or self.spot.mid == ZERO: raise ValueError("No spot price provided") maturities = [] + ref_date = ref_date or utcnow() for maturity in sorted(self.maturities): - if section := self.maturities[maturity].cross_section(): + if section := self.maturities[maturity].cross_section(ref_date=ref_date): maturities.append(section) return VolSurface( asset=self.asset, - ref_date=ref_date or utcnow(), + ref_date=ref_date, spot=self.spot, maturities=tuple(maturities), day_counter=self.day_counter, diff --git a/quantflow/utils/interest_rates.py b/quantflow/utils/interest_rates.py index 3c2cdd2..d4fd45f 100644 --- a/quantflow/utils/interest_rates.py +++ b/quantflow/utils/interest_rates.py @@ -1,51 +1,100 @@ from __future__ import annotations import math -from datetime import timedelta +from datetime import datetime from decimal import Decimal -from typing import NamedTuple +from typing import Self -from .numbers import to_decimal +from ccy import DayCounter, Period +from pydantic import BaseModel, Field +from typing_extensions import Annotated, Doc +from .numbers import ZERO, Number, to_decimal -class Rate(NamedTuple): - rate: Decimal = Decimal("0") - frequency: int = 0 +ROUND_RATE = 7 + + +class Rate(BaseModel, arbitrary_types_allowed=True): + """Class representing an interest rate with optional compounding frequency""" + + rate: Decimal = Field( + default=ZERO, description="Interest rate as a decimal (e.g. 0.05 for 5%)" + ) + day_counter: DayCounter = Field( + default=DayCounter.ACTACT, + description="Day count convention to use", + ) + frequency: Period | None = Field( + default=None, + description=( + "Compounding frequency, when None it is considered as " + "continuous compounding" + ), + ) @classmethod - def from_number(cls, rate: float, frequency: int = 0) -> Rate: - return cls(rate=round(to_decimal(rate), 7), frequency=frequency) + def from_number( + cls, + rate: Annotated[Number, Doc("interest rate as a decimal (e.g. 0.05 for 5%)")], + *, + frequency: Annotated[ + Period | None, + Doc( + "Compounding frequency, when None it is considered as " + "continuous compounding" + ), + ] = None, + day_counter: Annotated[ + DayCounter, Doc("Day count convention to use") + ] = DayCounter.ACTACT, + ) -> Self: + """Create a Rate instance from a Number""" + return cls( + rate=round(to_decimal(rate), ROUND_RATE), + frequency=frequency, + day_counter=day_counter, + ) @property def percent(self) -> Decimal: - return round(100 * self.rate, 5) + """Interest rate as a percentage""" + return round(100 * self.rate, ROUND_RATE - 2) @property def bps(self) -> Decimal: - return round(10000 * self.rate, 3) - - -def rate_from_spot_and_forward( - spot: Decimal, forward: Decimal, maturity: timedelta, frequency: int = 0 -) -> Rate: - """Calculate rate from spot and forward - - Args: - basis: basis point - maturity: maturity in years - frequency: number of payments per year - 0 for continuous compounding - - Returns: - Rate - """ - # use Act/365 for now - ttm = maturity.days / 365 - if ttm <= 0: - return Rate(frequency=frequency) - if frequency == 0: - return Rate.from_number( - rate=math.log(forward / spot) / ttm, frequency=frequency - ) - else: - # TODO: implement this - raise NotImplementedError + """Interest rate as basis points, 1 bps = 0.01% = 0.0001 in decimal""" + return round(10000 * self.rate, ROUND_RATE - 4) + + @classmethod + def from_spot_and_forward( + cls, + spot: Annotated[Decimal, Doc("Spot price of the underlying asset")], + forward: Annotated[Decimal, Doc("Forward price of the underlying asset")], + ref_date: Annotated[datetime, Doc("Reference date for the calculation")], + maturity_date: Annotated[datetime, Doc("Maturity date for the calculation")], + *, + frequency: Annotated[ + Period | None, + Doc( + "Compounding frequency, when None it is considered as " + "continuous compounding" + ), + ] = None, + day_counter: Annotated[ + DayCounter, Doc("Day count convention to use") + ] = DayCounter.ACTACT, + ) -> Self: + """Calculate rate from spot and forward""" + # use Act/365 for now + ttm = day_counter.dcf(ref_date, maturity_date) + if ttm <= 0: + return cls(frequency=frequency, day_counter=day_counter) + if frequency is None: + return cls.from_number( + rate=math.log(float(forward / spot)) / ttm, + day_counter=day_counter, + frequency=frequency, + ) + else: + # TODO: implement this + raise NotImplementedError("Discrete compounding is not implemented yet") diff --git a/quantflow_tests/conftest.py b/quantflow_tests/conftest.py index 76acad3..e6e341e 100644 --- a/quantflow_tests/conftest.py +++ b/quantflow_tests/conftest.py @@ -1,3 +1,25 @@ +import json +from pathlib import Path + import dotenv +import pytest + +from quantflow.options.surface import VolSurfaceInputs, surface_from_inputs dotenv.load_dotenv() + +FIXTURES = Path(__file__).parent / "fixtures" + + +def load_fixture(name: str) -> list[dict]: + return json.loads((FIXTURES / name).read_text()) + + +def load_fixture_dict(name: str) -> dict: + return json.loads((FIXTURES / name).read_text()) + + +@pytest.fixture +def vol_surface(): + inputs = load_fixture_dict("volsurface.json") + return surface_from_inputs(VolSurfaceInputs(**inputs)) diff --git a/quantflow_tests/volsurface.json b/quantflow_tests/fixtures/volsurface.json similarity index 100% rename from quantflow_tests/volsurface.json rename to quantflow_tests/fixtures/volsurface.json diff --git a/quantflow_tests/test_ai.py b/quantflow_tests/test_ai.py index 0c0ddcd..55ebb33 100644 --- a/quantflow_tests/test_ai.py +++ b/quantflow_tests/test_ai.py @@ -14,7 +14,6 @@ from quantflow.ai.tools import charts, crypto, fred, stocks, vault from quantflow.ai.tools.base import McpTool from quantflow.data.vault import Vault -from quantflow.options.surface import VolSurfaceInputs, surface_from_inputs # --------------------------------------------------------------------------- # Helpers @@ -67,12 +66,6 @@ def mock_fred() -> AsyncMock: return mock -@pytest.fixture -def vol_surface(): - with open("quantflow_tests/volsurface.json") as fp: - return surface_from_inputs(VolSurfaceInputs(**json.load(fp))) - - @pytest.fixture def vault_server(mcp_tool: McpTool) -> FastMCP: mcp = FastMCP("test-vault")