Track 4 · Advanced · Lesson 11

Production feedback loop: log → tag → augment → retrain

After this lesson you can wire request/response logging at your endpoint with PII redaction and a sampling budget, tag bad rows (manually, from drift signals, from user feedback), convert tags into SFT examples, and retrain on a measured cadence — closing the loop that Lesson 4.9's drift detection opens.

Level: advanced · Read time: ~10 min · Prerequisites: Observability & drift in production

Lesson 4.9 ended with a question: drift detection tells you that your live model has gotten worse — what do you do about it? You retrain. Retrain on what? That's the question this lesson answers. The "what" is production rows you collected, tagged, and curated. Without that data-collection loop, drift is a fire alarm with no water nearby.

Drift detects; the loop supplies

Lesson 4.9's drift check re-runs your gold set against the live endpoint on a schedule. When the gold set's pass rate drops, you know something is wrong. You don't know what. Your gold set was curated months ago; what shifted in production isn't in it. The feedback loop's job is to make sure that, when drift fires, you have recent real inputs the model got wrong ready to add to the next training round.

Two halves of one cycle

Drift detection (Lesson 4.9) = the gold set, on a schedule, vs the live endpoint. Catches the problem. Feedback loop (this lesson) = continuous logging + tagging + conversion of production rows into training examples. Supplies the cure. You need both — drift without the loop is alarms without action; the loop without drift is data piling up with no signal that it's needed.

Step 1: log requests and responses

A thin middleware around your inference endpoint writes one record per request. Pydantic for the schema, async writes so the request path stays fast.

import json, secrets, time
from datetime import datetime
from typing import Optional

from fastapi import BackgroundTasks, FastAPI, Request
from pydantic import BaseModel

class LogEntry(BaseModel):
    request_id: str
    ts: datetime
    model_version: str
    prompt: str
    response: str
    latency_ms: int
    user_id_hash: Optional[str] = None   # hashed, never raw
    response_logprob: Optional[float] = None
    tags: list[str] = []                  # filled in later by the tagger

app = FastAPI()
LOG_PATH = "data/inference.jsonl"

def write_log(entry: LogEntry) -> None:
    # One JSON object per line, appended to the end of the file.
    with open(LOG_PATH, "a", encoding="utf-8") as f:
        f.write(entry.model_dump_json() + "\n")

@app.post("/chat")
async def chat(req: Request, background: BackgroundTasks):
    body = await req.json()
    prompt = body["prompt"]
    ts = datetime.utcnow()                      # naive UTC timestamp for the log row
    t0 = time.perf_counter()                    # monotonic clock for latency
    response, logprob = generate(prompt)        # your model.generate(...) wrapper
    latency = int((time.perf_counter() - t0) * 1000)

    entry = LogEntry(
        request_id=secrets.token_hex(8),
        ts=ts,
        model_version="qwen-pii-v6",            # match what's deployed
        prompt=redact_pii(prompt),              # see Step 2
        response=redact_pii(response),          # responses can echo PII too
        latency_ms=latency,
        user_id_hash=hash_user(body.get("user_id")),
        response_logprob=logprob,
    )
    if should_log_full(entry):                  # sampling budget, see Step 2
        background.add_task(write_log, entry)   # write happens after the response is sent
    return {"response": response, "request_id": entry.request_id}
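Steps 3 and 4 read this file back one line at a time. Below is a minimal sketch of a reader, assuming the JSONL layout that write_log produces. The name iter_log is hypothetical, and skipping malformed lines (for example a half-written final line after a crash) is an assumption about what you want, not something the lesson's pipeline prescribes.

```python
import json
from typing import Iterator

def iter_log(path: str) -> Iterator[dict]:
    """Yield one dict per well-formed JSONL row; skip blank or truncated lines."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue          # e.g. a partially written final line
            if isinstance(row, dict):
                yield row
```

Because it is a generator, the whole log never has to fit in memory; wrap it in list(...) only for small files.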

Step 2: the privacy + cost honest beat

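Two things make naive "log everything" wrong in production: privacy and cost. Prompts and responses carry personal data, so redact identifiers before anything reaches disk and store user ids only as hashes. Full logging is also too expensive at scale, so keep every interesting row (low-confidence, flagged, drift-window) and sample the rest. Both fit in a few lines: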

import re, hashlib, random

_PII_PATTERNS = [
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"), "[EMAIL]"),
    (re.compile(r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b"), "[PHONE]"),
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
]

def redact_pii(text: str) -> str:
    for pat, tag in _PII_PATTERNS:
        text = pat.sub(tag, text)
    return text

def hash_user(uid: str | None) -> str | None:
    if not uid: return None
    return hashlib.sha256(uid.encode()).hexdigest()[:16]

# Sampling budget: log 100% of low-confidence/flagged, ~5% of healthy responses.
def should_log_full(entry: LogEntry) -> bool:
    if entry.response_logprob is not None and entry.response_logprob < -3.0:
        return True                              # uncertain — always log
    return random.random() < 0.05                # 5% healthy sample
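One caveat on hash_user: a plain SHA-256 of a user id involves no secret, so anyone holding the log can hash candidate ids and match them. A keyed hash closes that gap. This is a sketch, not the lesson's implementation: hash_user_keyed and the LOG_HASH_KEY environment variable are hypothetical names, and the fallback key is for local experiments only.

```python
import hashlib
import hmac
import os
from typing import Optional

# Hypothetical variable name; in production load the key from your secret store.
_HASH_KEY = os.environ.get("LOG_HASH_KEY", "dev-only-key").encode()

def hash_user_keyed(uid: Optional[str], key: bytes = _HASH_KEY) -> Optional[str]:
    """Keyed variant of hash_user: the same id and key always give the same token."""
    if not uid:
        return None
    # HMAC-SHA256, truncated to 16 hex chars like the original hash_user.
    return hmac.new(key, uid.encode(), hashlib.sha256).hexdigest()[:16]
```

Rotating the key changes every token, so rows logged before and after a rotation can no longer be joined on user_id_hash.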

Step 3: tag bad responses

Tags are how you turn a stream of logs into a queue of training examples. Three sources:

  1. Manual: an operator reviews a queue of low-confidence rows + drift-period rows and tags them. Slow but high-quality.
  2. Auto-tag from drift: every row in the period preceding a deployment_drift_detected event (Lesson 4.9) gets a "drift_window" tag automatically — these are the production inputs your model started failing on.
  3. User feedback: thumbs-down, "regenerate," "report" buttons all map to a "user_negative" tag.

Tags are opaque strings on the log entry. Anything you might later want to filter by becomes a tag.
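Source 3 can be as small as a lookup table. A minimal sketch, assuming log rows are plain dicts as read back from the JSONL file; the event names are hypothetical stand-ins for whatever your UI emits, and only the "user_negative" tag comes from this lesson.

```python
# Hypothetical UI event names; only the "user_negative" tag comes from the lesson.
FEEDBACK_TAGS = {
    "thumbs_down": "user_negative",
    "regenerate": "user_negative",
    "report": "user_negative",
}

def apply_feedback(entry: dict, event: str) -> bool:
    """Add the tag for a feedback event to one log row. Returns True if the row changed."""
    tag = FEEDBACK_TAGS.get(event)
    tags = entry.setdefault("tags", [])
    if tag is None or tag in tags:
        return False              # unknown event, or the row is already tagged
    tags.append(tag)
    return True
```

If you later want to tell a "report" from a "regenerate", change the right-hand side of the table; the rest of the pipeline only sees opaque strings.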

# Auto-tag from drift event (run after drift_detected fires)
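import json, os
from datetime import datetime, timedelta

def tag_drift_window(drift_ts: datetime, window=timedelta(hours=12)) -> int:
    """Tag rows logged in the `window` before drift_ts; return how many rows carry the tag."""
    start = drift_ts - window
    tagged = 0
    tmp_path = LOG_PATH + ".tmp"
    # Stream row by row into a temp file instead of holding the whole log in memory.
    with open(LOG_PATH) as src, open(tmp_path, "w") as dst:
        for line in src:
            entry = json.loads(line)
            if start <= datetime.fromisoformat(entry["ts"]) <= drift_ts:
                if "drift_window" not in entry["tags"]:
                    entry["tags"].append("drift_window")
            if "drift_window" in entry["tags"]:
                tagged += 1
            dst.write(json.dumps(entry) + "\n")
    # Atomic swap: a crash mid-rewrite cannot truncate the log. Rows appended while
    # this runs are lost on the swap, so pause the endpoint's writer first.
    os.replace(tmp_path, LOG_PATH)
    return tagged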
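Source 1, the manual queue, can be built from the same rows. A rough sketch, assuming rows are dicts with the LogEntry fields: review_queue is a hypothetical helper, the -3.0 floor mirrors the threshold in should_log_full, and treating "operator_review" as "an operator has already looked at this row" is an assumption about how you use that tag.

```python
def review_queue(entries: list[dict], logprob_floor: float = -3.0, limit: int = 50) -> list[dict]:
    """Rows an operator should look at first: low-confidence or drift-window, not yet reviewed."""
    def needs_review(e: dict) -> bool:
        if "operator_review" in e["tags"]:
            return False                      # assumed to mean "already reviewed"
        lp = e.get("response_logprob")
        low_conf = lp is not None and lp < logprob_floor
        return low_conf or "drift_window" in e["tags"]

    picked = [e for e in entries if needs_review(e)]
    # Least confident first; rows with no logprob sort last.
    picked.sort(key=lambda e: (e.get("response_logprob") is None,
                               e.get("response_logprob") or 0.0))
    return picked[:limit]
```

The limit keeps the queue sized to what an operator can actually review in one sitting.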

Step 4: convert tags into training examples

This is where the loop closes. Filtered + tagged log rows become SFT pairs.

import json

from datasets import Dataset

def logs_to_sft(log_path: str, tags: set[str], gold_corrections: dict[str, str]) -> Dataset:
    """Pull tagged rows, replace the bad response with the corrected one.

    gold_corrections maps request_id -> the corrected completion an operator wrote.
    """
    rows = []
    with open(log_path) as f:
        for line in f:
            e = json.loads(line)
            if not (set(e["tags"]) & tags):  # only rows with at least one matching tag
                continue
            corrected = gold_corrections.get(e["request_id"])
            if not corrected:
                continue                             # tagged rows with no correction yet are skipped
            rows.append({
                "messages": [
                    {"role": "user",      "content": e["prompt"]},
                    {"role": "assistant", "content": corrected},
                ]
            })
    return Dataset.from_list(rows)

The corrected completion comes from the operator review (Step 3, source 1) or from a re-run of a stronger model whose output was verified. Either way, every example carries a correction the model should have produced, not just a flag of "this was wrong."
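Since tagged rows without a correction are skipped, it helps to know how many are still waiting before you build the dataset. A small sketch under the same assumptions as logs_to_sft (rows as dicts, gold_corrections keyed by request_id); correction_coverage is a hypothetical helper name.

```python
def correction_coverage(
    entries: list[dict], tags: set[str], gold_corrections: dict[str, str]
) -> tuple[int, int, list[str]]:
    """Return (tagged_count, corrected_count, request_ids still waiting for a correction)."""
    tagged = [e for e in entries if set(e["tags"]) & tags]
    missing = [e["request_id"] for e in tagged
               if not gold_corrections.get(e["request_id"])]
    return len(tagged), len(tagged) - len(missing), missing
```

The returned id list is a ready-made to-do list for the operator queue from Step 3.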

Step 5: retrain cadence

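How often is the harder question. Three honest patterns: retrain when a drift signal fires, retrain on a fixed schedule, or retrain when a failure cluster crosses a count threshold.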
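The three triggers this lesson names for retrain cadence (drift signal, schedule, failure-cluster count) can share one decision function. A minimal sketch: RetrainPolicy and retrain_reason are hypothetical names, and both default numbers are placeholders to tune, not recommendations.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class RetrainPolicy:
    # Placeholder values; tune both to your traffic and labeling capacity.
    max_days_between_retrains: int = 30
    min_cluster_size: int = 25

def retrain_reason(drift_fired: bool, days_since_last: int, largest_cluster: int,
                   policy: Optional[RetrainPolicy] = None) -> Optional[str]:
    """Return which trigger fired ("drift", "failure_cluster", "schedule"), or None."""
    policy = policy or RetrainPolicy()
    if drift_fired:
        return "drift"
    if largest_cluster >= policy.min_cluster_size:
        return "failure_cluster"
    if days_since_last >= policy.max_days_between_retrains:
        return "schedule"
    return None
```

Returning the reason rather than a bare boolean lets you record why each retrain happened alongside the model version.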

Don't ship a regression — re-run the eval suite

Every retrain has to clear the same eval-pack gates the previous deploy cleared (Track 3, Lesson 3.13's required gates). Otherwise you fix the new failures and reintroduce the old ones. The promotability decision is your safety net; treat it as such.
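In code, the gate check is a comparison of the candidate's scores against the required minimums. A sketch only: the gate names below are invented for illustration (the real ones come from your eval pack), and it assumes every metric is higher-is-better.

```python
def failed_gates(scores: dict[str, float], gates: dict[str, float]) -> list[str]:
    """Names of required gates the candidate model does not clear."""
    failed = []
    for gate, minimum in gates.items():
        score = scores.get(gate)
        if score is None or score < minimum:
            failed.append(gate)   # a missing score counts as a failure
    return failed

# Hypothetical gate names and thresholds, for illustration only.
gates = {"gold_pass_rate": 0.90, "format_valid": 0.99}
candidate = {"gold_pass_rate": 0.93, "format_valid": 0.995}
promotable = not failed_gates(candidate, gates)
```

Treating a missing score as a failure means a skipped eval can never promote a model by accident.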

Key idea

Drift detection (Lesson 4.9) catches the problem; the feedback loop supplies the cure. Log with PII redaction and a sampling budget, tag bad rows (manual / drift / user), convert tagged rows to SFT examples using operator corrections, and retrain on a measured cadence — every retrain re-clearing the eval-pack gates the previous deploy cleared.

One more advanced technique remains: tool-use / function-calling fine-tuning, where the model's output isn't text but a structured call to an external tool.

Key terms

Production logging
Recording request/response pairs at the inference endpoint into a queryable, append-only log (commonly JSONL).
PII redaction
Replacing personal identifiers in logged text with tokens like [EMAIL] before disk; hashing user ids.
Sampling budget
The rule that decides which rows get logged; full logging is too expensive at scale. Keep all interesting rows (low-confidence, flagged, drift-window), sample the rest.
Tag
An opaque string attached to a log entry that marks it as candidate training data (e.g. drift_window, user_negative, operator_review).
Operator correction
The corrected completion an operator (or stronger model) wrote for a tagged bad row; the source of the SFT example.
Retrain cadence
The trigger for when to retrain: on drift signal, on schedule, or on a failure-cluster count threshold.
