
Building an AI-Powered Rebalancing Engine for Uniswap V3: A Technical Deep Dive

Ansyn Engineering · 9 min read

Uniswap V3's concentrated liquidity model revolutionized decentralized exchange mechanics by allowing liquidity providers (LPs) to allocate capital within specific price ranges. While this significantly improved capital efficiency, it introduced a new challenge: when and how to rebalance positions as prices move.

At Ansyn, we built Uniswap Advisor—a production-grade system that combines real-time blockchain data, statistical forecasting models, and impermanent loss-aware decision logic to provide intelligent rebalancing recommendations.

System Architecture

Our system is organized into four distinct layers:

Client Layer

  • FastAPI REST API (Port 8000) — Programmatic access for integrations
  • Streamlit Dashboard (Port 8501) — Interactive analysis and visualization

Service Layer

  • Position Evaluator — Core evaluation logic
  • Price History Service — Blockchain data fetching with intelligent caching
  • Decision Logger — Comprehensive audit trail

Core Layer

  • Strategies — 5 distinct rebalancing approaches
  • Uniswap Math — Tick/price conversions, liquidity calculations
  • Analyzers — Volatility, fees, and position analysis

Data Layer

  • Alchemy RPC — Multi-chain blockchain access
  • Price Cache — Parquet files for efficient storage
  • Model Cache — Trained ML models

The Rebalancing Problem

In Uniswap V3, LPs provide liquidity within a price range [P_lower, P_upper]. The position earns fees only when the current price is within this range. When price exits the range, the position becomes 100% concentrated in one asset and earns zero fees.
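
In code, the in-range check reduces to a tick comparison, since Uniswap V3 defines price as 1.0001^tick. A minimal sketch (function names are ours, not the protocol's):

import math

def tick_to_price(tick: int) -> float:
    # Uniswap V3 price convention: price = 1.0001 ** tick
    return 1.0001 ** tick

def is_in_range(current_tick: int, lower_tick: int, upper_tick: int) -> bool:
    # Fees accrue only while the pool's current tick sits inside
    # the position's [lower_tick, upper_tick) interval.
    return lower_tick <= current_tick < upper_tick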

The core question: When price exits your range, should you:

  1. Rebalance immediately to a new range centered on current price?
  2. Wait for price to return to your original range?

This decision depends on:

  • Transaction costs (gas fees)
  • Impermanent loss from rebalancing
  • Probability of price returning vs. continuing to drift
  • Foregone fee earnings while out of range
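
A back-of-the-envelope way to combine these factors is a simple expected-value comparison. The sketch below is illustrative only, with all inputs assumed to be pre-computed elsewhere:

def should_rebalance(gas_cost: float, il_locked_in: float,
                     p_price_returns: float,
                     fees_new_range: float, fees_old_range: float) -> bool:
    # EV of rebalancing now: earn fees in the new range, but pay gas
    # and crystallize the impermanent loss.
    ev_rebalance = fees_new_range - gas_cost - il_locked_in
    # EV of waiting: earn old-range fees only if price comes back.
    ev_wait = p_price_returns * fees_old_range
    return ev_rebalance > ev_wait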

Impermanent Loss Awareness

A critical insight: rebalancing locks in impermanent loss. If you rebalance when price is 10% below your range, you're selling ETH at a 10% discount compared to your entry. If price subsequently returns to your original range, you would have been better off waiting.

Our AI strategies explicitly model this trade-off.
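
To put numbers on that intuition, the classic constant-product divergence-loss formula gives the cost of an LP position versus simply holding the assets; concentrated ranges amplify it by their leverage factor. A quick illustration:

import math

def divergence_loss(price_ratio: float) -> float:
    # Constant-product IL as a function of r = P_now / P_entry:
    # LP value / hold value - 1 = 2*sqrt(r) / (1 + r) - 1
    return 2 * math.sqrt(price_ratio) / (1 + price_ratio) - 1

# A 10% price drop (r = 0.9) costs a full-range LP about 0.14%
# versus holding; a narrow V3 range multiplies this several-fold.
print(f"{divergence_loss(0.9):.4%}")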

Strategy Implementations

Strategy 1: Tau Reset (Baseline)

The simplest approach—rebalance every 24 hours regardless of price.

class TauResetStrategy(BaseStrategy):
    def evaluate(self, position: Position, pool_state: PoolState) -> Decision:
        hours_since_last = (now() - position.last_rebalance).total_seconds() / 3600

        if hours_since_last >= 24:
            return Decision(
                action="REBALANCE",
                new_range=self._center_range_on_price(pool_state.current_price),
                reason="24-hour interval reached"
            )
        return Decision(action="WAIT", reason=f"Next rebalance in ...")

Pros: Predictable, simple, no ML required
Cons: Ignores market conditions, may rebalance unnecessarily

Strategy 2: Auto-Rebalance (Price-Triggered)

Rebalance when price moves more than 1% or exits the range.

class AutoRebalanceStrategy(BaseStrategy):
    def __init__(self, threshold_pct: float = 0.01):
        self.threshold = threshold_pct

    def evaluate(self, position: Position, pool_state: PoolState) -> Decision:
        price_change = abs(pool_state.current_price - position.center_price) / position.center_price

        if not position.is_in_range(pool_state.current_price) or price_change > self.threshold:
            return Decision(action="REBALANCE", ...)
        return Decision(action="WAIT", ...)

Pros: Responsive to volatility
Cons: Can over-trade in choppy markets, no forecasting

Strategy 3: Bucket Tau Reset (Volatility-Adaptive)

Adjusts rebalance frequency based on realized volatility regime.

from math import inf

class BucketTauResetStrategy(BaseStrategy):
    # (bucket name, realized-volatility ceiling, reset interval in hours)
    VOLATILITY_BUCKETS = [
        ("low", 0.02, 48),      # calm markets: rebalance every 48h
        ("medium", 0.05, 24),   # normal conditions: every 24h
        ("high", inf, 12),      # volatile markets: every 12h
    ]
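
The lookup that maps realized volatility to a reset interval is a simple walk over the buckets. A sketch (method name is ours):

    def _tau_hours(self, realized_vol: float) -> int:
        # Return the reset interval for the first bucket whose
        # volatility ceiling covers the realized volatility.
        for _name, vol_ceiling, tau_hours in self.VOLATILITY_BUCKETS:
            if realized_vol <= vol_ceiling:
                return tau_hours
        return self.VOLATILITY_BUCKETS[-1][2]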

Pros: Adapts to market conditions
Cons: Still reactive, not predictive

Strategies 4 and 5: AI-SES and AI-ARIMA (ML-Powered)

Our flagship strategies use time-series forecasting to predict price movements and make probabilistic decisions.

Deep Dive: AI-SES Strategy

Algorithm Overview

  1. Data Collection: Fetch 45 days of hourly OHLCV data from Alchemy
  2. Model Training: Fit an Exponential Smoothing (ETS) model
  3. Forecasting: Generate 12-hour ahead predictions with confidence intervals
  4. Decision Logic: Use probabilistic reasoning based on forecast distribution

ETS Model Implementation

We use the Holt-Winters Exponential Smoothing model from statsmodels:

import numpy as np
from statsmodels.tsa.holtwinters import ExponentialSmoothing

class AIForecastModel:
    def train(self, prices):
        fit = ExponentialSmoothing(
            prices,
            trend='add',
            seasonal='add',
            seasonal_periods=24,  # hourly bars -> daily seasonality
            damped_trend=True
        ).fit(optimized=True)

        # HoltWintersResults exposes point forecasts only, so we
        # approximate the 90% interval from the in-sample residual
        # spread (a 90% normal interval spans +/- 1.645 sigma).
        mean = np.asarray(fit.forecast(12))
        resid_std = float(np.std(fit.resid))
        lower = mean - 1.645 * resid_std
        upper = mean + 1.645 * resid_std

        return mean, lower, upper

IL-Aware Decision Logic

The key innovation is our impermanent loss-aware decision logic:

def evaluate(self, position, pool_state, forecast):
    current_price = pool_state.current_price
    in_range = position.is_in_range(current_price)

    if in_range:
        # Calculate probability of exiting range
        exit_prob = self._calc_exit_probability(
            forecast.mean, forecast.lower_90, forecast.upper_90,
            position.range_lower, position.range_upper
        )

        if exit_prob > 0.5:
            return Decision(
                action="REBALANCE",
                confidence=exit_prob,
                reason="High probability of range exit"
            )
        return Decision(action="WAIT", reason="Low exit probability")

    else:
        # OUT OF RANGE - This is where IL awareness matters
        reentry_prob = self._calc_reentry_probability(
            forecast.mean, forecast.lower_90, forecast.upper_90,
            position.range_lower, position.range_upper
        )

        if reentry_prob > 0.5:
            # CRITICAL: Wait to avoid locking in IL
            return Decision(
                action="WAIT",
                confidence=reentry_prob,
                reason="Price likely to re-enter range. Waiting avoids IL."
            )

        return Decision(
            action="REBALANCE",
            new_range=self._optimal_range(forecast.mean),
            reason="Low re-entry probability, rebalancing to capture fees"
        )

Probability Calculations

We model price distribution using the forecast confidence intervals:

from scipy.stats import norm

def _calc_exit_probability(self, forecast_mean, lower_90, upper_90,
                            range_lower, range_upper):
    # Estimate the standard deviation from the 90% CI width:
    # a 90% normal interval spans 2 * 1.645 ≈ 3.29 standard deviations
    std = (upper_90[-1] - lower_90[-1]) / 3.29
    final_price_mean = forecast_mean[-1]

    # P(price below range_lower) + P(price above range_upper)
    p_below = norm.cdf(range_lower, loc=final_price_mean, scale=std)
    p_above = 1 - norm.cdf(range_upper, loc=final_price_mean, scale=std)

    return p_below + p_above
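
The re-entry probability used in the out-of-range branch is the complementary mass over the same fitted distribution. A sketch mirroring the calculation above:

def _calc_reentry_probability(self, forecast_mean, lower_90, upper_90,
                              range_lower, range_upper):
    # P(range_lower <= price <= range_upper) at the forecast horizon
    std = (upper_90[-1] - lower_90[-1]) / 3.29
    final_price_mean = forecast_mean[-1]
    return (norm.cdf(range_upper, loc=final_price_mean, scale=std)
            - norm.cdf(range_lower, loc=final_price_mean, scale=std))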

Data Pipeline

Intelligent Caching

We implemented an intelligent caching system that minimizes RPC calls:

class PriceHistoryService:
    def get_history(self, pool, days=45):
        cache_key = f"{pool}_{days}d"
        cached = self._load_cache(cache_key)

        if cached is not None:
            # Top up the cache if it is more than an hour stale
            cache_end = cached.index[-1]
            if (now() - cache_end).total_seconds() > 3600:
                # Fetch only the missing hours
                new_data = self._fetch_from_blockchain(pool, start=cache_end, end=now())
                combined = pd.concat([cached, new_data])
                combined = combined[~combined.index.duplicated(keep="last")]
                self._save_cache(cache_key, combined)
                return combined
            return cached

        return self._fetch_full_history(pool, days)

Alchemy Rate Limit Handling

Alchemy's API has a 30-day limit for historical queries. We handle this transparently:

def _fetch_full_history(self, pool, days):
    chunks = []
    end_time = now()

    while days > 0:
        chunk_days = min(days, 30)
        start_time = end_time - timedelta(days=chunk_days)

        chunk_data = self._fetch_chunk(pool, start_time, end_time)
        chunks.append(chunk_data)

        end_time = start_time
        days -= chunk_days

    return pd.concat(reversed(chunks))

GPU Training Infrastructure

RunPod Integration

For computationally intensive model training, we built a fully automated GPU orchestration system:

class RunPodOrchestrator:
    async def auto_train(self, config):
        # 1. Create GPU pod
        pod = await self.runpod.create_pod(
            gpu_type=config.gpu_type,
            image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime"
        )

        try:
            # 2. Wait for pod ready
            await self._wait_for_ready(pod.id, timeout=300)

            # 3. SSH into pod and run training
            ssh = await self._establish_ssh(pod.ssh_host, pod.ssh_port)
            await ssh.execute("pip install -r requirements.txt")

            # 4. Run training with progress streaming
            async for line in ssh.execute_stream(
                f"python train.py --pool {config.pool} --epochs {config.epochs}"
            ):
                yield TrainingProgress(line)

            # 5. Download trained models
            await ssh.download("/workspace/models/*", config.local_model_dir)

            # An async generator cannot `return` a value, so the final
            # result is yielded as the last item of the stream
            yield TrainingResult(success=True, model_path=config.local_model_dir)

        finally:
            if config.auto_terminate:
                await self.runpod.terminate_pod(pod.id)

CLI Interface

python scripts/runpod_cli.py auto \
    --pool 0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640 \
    --chain ethereum \
    --epochs 100 \
    --gpu RTX4090 \
    --auto-terminate

Testing Strategy

Walk-Forward Validation

We validate our AI strategies using walk-forward testing—training on historical data and testing on subsequent unseen periods:

@pytest.mark.regression
def test_ai_ses_walkforward():
    pool = "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"  # ETH/USDC

    # Load 45 days of historical data
    data = load_price_history(pool, days=45)

    # Split: 30 days training, 15 days testing
    train_data = data[:30*24]
    test_data = data[30*24:]

    # Train model
    model = AIForecastModel()
    model.train(train_data)

    # Evaluate on the test period, keeping full forecast objects
    forecasts, actuals = [], []
    for i in range(0, len(test_data) - 12, 6):
        forecast = model.predict(test_data[:i], horizon=12)
        forecasts.append(forecast)
        actuals.append(test_data.iloc[i + 11])

    # Point accuracy from forecast means; coverage from the 90% intervals
    predictions = [f.mean[-1] for f in forecasts]
    mae = mean_absolute_error(actuals, predictions)
    coverage = calculate_ci_coverage(forecasts, actuals)

    assert mae < 50, "MAE too high"
    assert coverage > 0.85, "90% CI coverage too low"

API Design

Main Evaluation Endpoint

The main endpoint accepts a position and returns a rebalancing recommendation:

Request:

POST /api/v1/rebalance/evaluate

pool_address: "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"
chain: "ethereum"
position:
  range_lower: 1800.0
  range_upper: 2200.0
  liquidity: 1000000
strategy: "ai-ses"

Response:

action: "WAIT"
confidence: 0.73
reason: "Price likely to re-enter range (73%). Waiting avoids IL."
current_price: 1750.45
is_in_range: false
forecast:
  mean_12h: 1823.50
  lower_90: 1756.20
  upper_90: 1890.80
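
Calling the endpoint from Python takes a few lines. A hypothetical client snippet, assuming the API is served locally on port 8000 as described above:

import requests

payload = {
    "pool_address": "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640",
    "chain": "ethereum",
    "position": {"range_lower": 1800.0, "range_upper": 2200.0, "liquidity": 1000000},
    "strategy": "ai-ses",
}

resp = requests.post("http://localhost:8000/api/v1/rebalance/evaluate", json=payload)
result = resp.json()
print(result["action"], result["confidence"])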

Technology Stack

| Component | Technology | Rationale |
|---|---|---|
| API Framework | FastAPI | Async support, automatic OpenAPI docs |
| ML Framework | PyTorch + Statsmodels | GPU acceleration + robust statistical models |
| Time Series | Darts + Statsmodels | ETS and ARIMA with confidence intervals |
| Blockchain | Web3.py + Alchemy | Reliable RPC with historical data |
| Data Storage | Parquet + CSV | Efficient storage + readable logs |
| GPU Training | RunPod | On-demand GPU resources |

Performance Metrics

System Statistics

| Metric | Value |
|---|---|
| Total Lines of Code | 462,090 |
| Core Strategy Logic | ~3,000 lines |
| Supported Chains | 4 (Ethereum, Base, Arbitrum, Polygon) |
| Strategies | 5 |
| Average Response Time | Under 500 ms (cached), under 3 s (cold) |

Forecasting Accuracy (ETH/USDC)

| Metric | Value |
|---|---|
| MAE (12h forecast) | $23.45 |
| MAPE | 1.2% |
| 90% CI Coverage | 91.3% |
| Direction Accuracy | 67% |

Lessons Learned

1. Cache Everything

Blockchain RPC calls are expensive and slow. We cache price history (Parquet), trained models (Pickle), and pool metadata (in-memory with TTL).
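
For the in-memory TTL layer, a few dozen lines suffice. A minimal sketch (ours, not the production implementation):

import time

class TTLCache:
    def __init__(self, ttl_seconds: float = 300):
        self.ttl = ttl_seconds
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is not None:
            value, stored_at = entry
            if time.time() - stored_at < self.ttl:
                return value
            del self._store[key]  # expired
        return None

    def set(self, key, value):
        self._store[key] = (value, time.time())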

2. Separate Dependencies

Different deployment modes need different dependencies: the API server ships without matplotlib, while the dashboard carries the full visualization stack.

3. Log Decisions, Not Just Actions

Logging the full decision context enables post-hoc analysis, model improvement, and audit trails.

4. IL Awareness is Critical

The biggest win came from the IL-aware waiting logic. Simply predicting price direction isn't enough—you need to consider the cost of being wrong.

Future Work

  1. Reinforcement Learning — Train RL agents for cumulative return optimization
  2. Multi-Position Optimization — Consider correlations across positions
  3. MEV-Aware Execution — Optimize transaction timing
  4. Cross-Protocol Support — Extend to other concentrated liquidity AMMs

Conclusion

Building Uniswap Advisor required combining expertise in DeFi mechanics, time-series forecasting, blockchain data engineering, and production ML systems. The result is a system that provides genuine value to liquidity providers—actionable, probabilistic recommendations backed by rigorous testing.


Ansyn builds AI-powered financial tools that make sophisticated investment strategies accessible. For technical inquiries, visit ansyn.ai