在GKE上为多租户生成式AI服务构建基于JWT和InfluxDB的安全遥测管道


我们面临的挑战是为部署在GKE上的一个多租户生成式AI推理服务设计并实现一套遥测系统。该系统不仅要能处理由模型推理产生的高基数(high-cardinality)时序数据,还必须保证严格的租户隔离和安全性。具体来说,每个API请求都携带了租户ID、用户ID和模型版本等信息,我们需要基于这些维度来追踪token生成速率、请求延迟、GPU利用率等关键指标。

架构决策:为何放弃主流的Prometheus方案

在项目初期,我们首先评估了云原生领域最常见的监控方案:Prometheus + Grafana。

方案A: 基于Prometheus Exporter的传统方案

这个方案的思路是在每个AI推理服务的Pod中内置一个Prometheus Exporter,暴露带有标签(labels)的指标。例如:
inference_latency_seconds{tenant_id="t-abc", model_id="llama3-8b-v1", api_key_id="key-xyz"} 0.75

优势:

  • 生态成熟,与Kubernetes集成度高,服务发现配置简单。
  • 社区庞大,有大量的现成文档和实践。

劣势:

  • 高基数问题: 这是此方案的致命缺陷。Prometheus的设计哲学不适合处理标签基数爆炸的场景。在我们的多租户系统中,tenant_id, api_key_id 甚至 user_id 的组合数量可能是数万甚至数百万。每一个独特的标签组合都会在Prometheus中创建一个新的时间序列。这会导致Prometheus内存消耗急剧上升、查询性能断崖式下跌,甚至频繁OOM。
  • Push与Pull模型不匹配: AI推理服务通常是无状态的短生命周期任务,更适合主动推送(Push)指标,而不是等待Prometheus来拉取(Pull)。使用Prometheus Pushgateway可以解决部分问题,但它本身又会成为新的单点瓶颈和高基数问题聚合点。
  • 安全与认证复杂: Exporter端点通常暴露在内部网络。要为每个拉取请求实现精细化的认证和授权,验证请求来源是否合法,并将认证信息与指标关联起来,需要复杂的网络策略和附加组件,实现起来非常笨拙。

方案B: 构建基于InfluxDB和JWT的遥测网关

考虑到方案A的根本性问题,我们设计了第二套架构。其核心思想是引入一个独立的遥tery Gateway)服务,并选用InfluxDB作为时序数据库。

graph TD
    subgraph GKE Cluster
        A[GenAI Inference Pod 1] -- Metrics Push (HTTP + JWT) --> B{Telemetry Gateway Service};
        C[GenAI Inference Pod N] -- Metrics Push (HTTP + JWT) --> B;
        B -- InfluxDB Line Protocol --> D[(InfluxDB)];
    end

    E[User/Client] -- API Request w/ JWT --> F[API Gateway];
    F -- Forwards Request --> A;
    A -- Generates new JWT for Telemetry --> B;

    subgraph Monitoring
        D -- Flux Query --> G[Grafana / Chronograf];
    end

架构组件解析:

  1. 生成式AI推理Pod: 负责模型推理。在处理完每个请求后,它会收集性能指标,并使用一个内部生成的、包含遥测元数据的JWT,将指标主动推送到遥测网关。
  2. 遥测网关 (Telemetry Gateway): 一个轻量级的Go服务。它的职责是:
    • 接收指标推送的HTTP请求。
    • 验证请求中的JWT签名和声明(claims)。
    • 从JWT的claims中提取租户ID、模型ID等元数据。
    • 将这些元数据作为InfluxDB的Tags,将性能指标作为Fields,组装成InfluxDB的Line Protocol格式。
    • 批量、异步地将数据写入InfluxDB。
  3. InfluxDB: 专为处理高写入和高查询负载的时序数据而设计。其Tags和Fields分离的数据模型,天然适合解决高基数问题。Tags被索引,用于快速过滤和分组;Fields不被索引,存储原始数值。
  4. JWT (JSON Web Tokens): 作为安全凭证。它不仅用于认证,更关键的是它承载了不可篡改的遥测上下文信息。

最终选择与理由:

我们最终选择了方案B。尽管它引入了自定义的服务组件(遥测网关),增加了少量的开发和维护成本,但它从根本上解决了方案A的两个核心痛点:

  • 高基数处理: InfluxDB将高基数的tenant_id等作为可索引的Tags,查询性能远超Prometheus。
  • 安全与隔离: JWT提供了一种健壮的、分布式的认证和授权机制。遥测网关作为单一入口,可以集中实施安全策略,确保只有合法的服务才能写入指标,且指标的租户归属信息是可信的。

在真实项目中,系统的可预测性和稳定性远比初期的便捷性重要。一个在高负载下会崩溃的监控系统是毫无价值的。

核心实现概览

以下是遥测网关和客户端instrumentation的关键代码实现。我们选择Go语言来构建遥测网关,因为它在网络编程和并发处理方面表现出色。

1. JWT的生成与声明设计

客户端(AI推理Pod)在推送指标前,需要生成一个专用于遥测的JWT。这个JWT的生命周期很短(例如60秒),仅用于单次或短时间的指标推送。

# ai_pod/telemetry_client.py
# 这是一个在Python AI服务中生成JWT并推送指标的示例

import jwt
import time
import requests
import os
import logging

# 配置应从环境变量或配置中心获取
TELEMETRY_GATEWAY_URL = os.environ.get("TELEMETRY_GATEWAY_URL", "http://telemetry-gateway.default.svc.cluster.local/v1/write")
JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY") # 必须与网关的密钥一致
JWT_ISSUER = "genai-inference-service"

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def generate_telemetry_jwt(tenant_id: str, model_id: str, api_key_id: str) -> str:
    """
    为遥测数据生成一个短生命周期的JWT
    """
    payload = {
        'iss': JWT_ISSUER,
        'iat': int(time.time()),
        'exp': int(time.time()) + 60,  # 60秒过期
        'aud': 'telemetry-gateway',
        # --- 自定义声明,用于遥测上下文 ---
        'ctx': {
            'tenant_id': tenant_id,
            'model_id': model_id,
            'api_key_id': api_key_id,
            # 可以在这里添加更多维度, e.g., 'region', 'gpu_type'
        }
    }
    if not JWT_SECRET_KEY:
        raise ValueError("JWT_SECRET_KEY is not set.")
        
    encoded_jwt = jwt.encode(payload, JWT_SECRET_KEY, algorithm="HS256")
    return encoded_jwt

def push_inference_metrics(tenant_id: str, model_id: str, api_key_id: str, latency_ms: float, tokens_out: int):
    """
    收集指标并推送到遥测网关
    """
    try:
        token = generate_telemetry_jwt(tenant_id, model_id, api_key_id)
        
        # 使用 InfluxDB Line Protocol 格式
        # measurement,tag_key=tag_value field_key=field_value timestamp
        # 这里的 measurement 是 'inference_performance'
        # tags 是从JWT中提取的,这里为了演示,在客户端也组装了,但实际上网关会用JWT中的信息
        # fields 是具体的性能指标
        # timestamp 是纳秒级时间戳
        line_protocol_data = (
            f"inference_performance,tenant_id={tenant_id},model_id={model_id},api_key_id={api_key_id} "
            f"latency_ms={latency_ms},tokens_out={tokens_out}i {int(time.time() * 1e9)}"
        )

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "text/plain; charset=utf-8",
        }

        response = requests.post(
            TELEMETRY_GATEWAY_URL, 
            data=line_protocol_data.encode('utf-8'), 
            headers=headers,
            timeout=5 # 设置超时
        )

        if response.status_code != 204: # InfluxDB写入成功通常返回204 No Content
            logging.error(f"Failed to push metrics: {response.status_code} - {response.text}")
        else:
            logging.info("Successfully pushed metrics.")

    except jwt.PyJWTError as e:
        logging.error(f"JWT generation failed: {e}")
    except requests.exceptions.RequestException as e:
        logging.error(f"HTTP request to telemetry gateway failed: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred during metric push: {e}")

# --- 模拟一次AI推理后的调用 ---
if __name__ == "__main__":
    # 在真实应用中,这些值来自API请求的上下文
    if not JWT_SECRET_KEY:
        print("Error: JWT_SECRET_KEY environment variable must be set.")
    else:
        push_inference_metrics(
            tenant_id="customer-a-123",
            model_id="gemma-7b-it",
            api_key_id="ak_f9e8a1c3",
            latency_ms=850.5,
            tokens_out=256
        )

设计要点:

  • JWT的payload中,我们使用了一个自定义的ctx (context) 字段来封装所有遥测相关的维度。这使得JWT的结构清晰,易于扩展。
  • JWT的生命周期必须很短,以降低token泄露的风险。
  • 客户端代码包含了完整的错误处理和日志记录,这在生产环境中至关重要。

2. 遥测网关 (Go)

这是系统的核心。它是一个无状态服务,可以水平扩展以应对高流量。

// telemetry-gateway/main.go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/golang-jwt/jwt/v5"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
)

// TelemetryClaims 定义了我们期望从JWT中获取的遥测上下文
type TelemetryClaims struct {
	Context map[string]string `json:"ctx"`
	jwt.RegisteredClaims
}

var (
	influxAPI   api.WriteAPI
	jwtSecret   []byte
	logger      *slog.Logger
)

func init() {
	// --- 日志初始化 ---
	logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))

	// --- InfluxDB客户端初始化 ---
	// 生产环境中,这些配置应来自环境变量或ConfigMap
	influxURL := getEnv("INFLUXDB_URL", "http://localhost:8086")
	influxToken := getEnv("INFLUXDB_TOKEN", "")
	influxOrg := getEnv("INFLUXDB_ORG", "my-org")
	influxBucket := getEnv("INFLUXDB_BUCKET", "genai-metrics")

	if influxToken == "" {
		logger.Error("INFLUXDB_TOKEN is not set. Exiting.")
		os.Exit(1)
	}

	client := influxdb2.NewClient(influxURL, influxToken)
	// 使用带批处理的异步写入API
	// BatchSize: 达到500条数据时刷新
	// FlushInterval: 或者每1000毫秒刷新一次
	influxAPI = client.WriteAPI(influxOrg, influxBucket)

	// 监听写入错误
	go func() {
		for err := range influxAPI.Errors() {
			logger.Error("InfluxDB async write error", "error", err)
		}
	}()

	// --- JWT密钥初始化 ---
	secret := getEnv("JWT_SECRET_KEY", "")
	if secret == "" {
		logger.Error("JWT_SECRET_KEY is not set. Exiting.")
		os.Exit(1)
	}
	jwtSecret = []byte(secret)
}


func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}


// jwtAuthMiddleware 验证JWT并将其声明附加到请求上下文中
func jwtAuthMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		authHeader := r.Header.Get("Authorization")
		if authHeader == "" {
			http.Error(w, "Authorization header required", http.StatusUnauthorized)
			return
		}

		tokenString := strings.TrimPrefix(authHeader, "Bearer ")
		if tokenString == authHeader {
			http.Error(w, "Bearer token required", http.StatusUnauthorized)
			return
		}

		claims := &TelemetryClaims{}
		token, err := jwt.ParseWithClaims(tokenString, claims, func(token *jwt.Token) (interface{}, error) {
			if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
				return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
			}
			return jwtSecret, nil
		})

		if err != nil || !token.Valid {
			logger.Warn("Invalid JWT token received", "error", err)
			http.Error(w, "Invalid token", http.StatusUnauthorized)
			return
		}
		
		// 单元测试思路:可以模拟一个带有有效/无效/过期token的请求,断言中间件的行为是否正确。

		// 将解析后的声明存入context,供下游handler使用
		ctx := context.WithValue(r.Context(), "telemetry_claims", claims)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

// writeHandler 处理指标写入请求
func writeHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
		return
	}
	
	// 从上下文中获取经过验证的claims
	claims, ok := r.Context().Value("telemetry_claims").(*TelemetryClaims)
	if !ok {
		logger.Error("Could not retrieve claims from context. This should not happen after middleware.")
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return
	}
    
    // 这里的实现有一个简化:直接从客户端读取已经格式化好的line protocol。
    // 一个更健壮的实现是:网关读取JSON body,然后根据JWT claims中的`ctx`动态构建tags,确保tags的来源绝对可信。
    // 这可以防止客户端伪造`tenant_id`等关键tag。
    // 但为了演示,我们暂时采用当前方式。
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Error reading request body", http.StatusBadRequest)
		return
	}
	defer r.Body.Close()

	line := string(body)
    
    // 实际生产中,需要在这里对line protocol进行解析和校验,
    // 并用claims中的`ctx`覆盖或添加tags,确保安全。
	logger.Info("Received metric", "tenant_id", claims.Context["tenant_id"], "line_protocol_length", len(line))


	// 异步写入数据点
	influxAPI.WriteRecord(line)

	w.WriteHeader(http.StatusNoContent)
}

func main() {
	defer influxAPI.Flush() // 确保程序退出时,缓冲区数据被写入

	mux := http.NewServeMux()
	// 核心写入端点受JWT中间件保护
	mux.Handle("/v1/write", jwtAuthMiddleware(http.HandlerFunc(writeHandler)))

	// 健康检查端点
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})
	
	server := &http.Server{
		Addr:         ":8080",
		Handler:      mux,
		ReadTimeout:  5 * time.Second,
		WriteTimeout: 10 * time.Second,
		IdleTimeout:  15 * time.Second,
	}

	logger.Info("Starting Telemetry Gateway on port 8080")
	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		logger.Error("Server failed to start", "error", err)
	}
}

代码关键点剖析:

  • 依赖管理: 使用Go Modules管理依赖 (github.com/golang-jwt/jwt/v5, github.com/influxdata/influxdb-client-go/v2)。
  • 配置: 所有敏感信息(密钥、Token)和环境配置都通过环境变量注入,符合12-Factor App原则,便于在GKE中通过ConfigMapSecret进行管理。
  • 中间件模式: jwtAuthMiddleware清晰地将认证逻辑与业务逻辑分离。这是标准的Go HTTP服务设计模式。
  • 异步批量写入: InfluxDB Go客户端支持异步批量写入。这对于性能至关重要,它能显著减少网络往返和数据库写压力。我们配置了批处理大小和刷新间隔,并在程序退出时调用Flush()来确保所有缓冲数据都被发送。
  • 结构化日志: 使用slog进行结构化日志记录。这在GKE中非常有用,日志可以被Fluentd或类似工具收集,并轻松地在Google Cloud Logging中进行查询和分析。
  • 健壮性: 添加了健康检查端点 (/healthz),这对于GKE的存活探针(liveness probe)和就绪探针(readiness probe)至关重要。

3. GKE部署配置

# telemetry-gateway-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: telemetry-gateway
  labels:
    app: telemetry-gateway
spec:
  replicas: 3 # 启动3个副本以实现高可用
  selector:
    matchLabels:
      app: telemetry-gateway
  template:
    metadata:
      labels:
        app: telemetry-gateway
    spec:
      containers:
      - name: gateway
        image: your-gcr-repo/telemetry-gateway:v1.0.0
        ports:
        - containerPort: 8080
        env:
        - name: INFLUXDB_URL
          valueFrom:
            configMapKeyRef:
              name: influxdb-config
              key: url
        - name: INFLUXDB_ORG
          value: "my-org"
        - name: INFLUXDB_BUCKET
          value: "genai-metrics"
        - name: INFLUXDB_TOKEN
          valueFrom:
            secretKeyRef:
              name: influxdb-secret
              key: token
        - name: JWT_SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: jwt-secret
              key: secret-key
        resources:
          requests:
            cpu: "200m"
            memory: "256Mi"
          limits:
            cpu: "500m"
            memory: "512Mi"
        readinessProbe:
          httpGet:
            path: /healthz
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8080
          initialDelaySeconds: 15
          periodSeconds: 20
---
# telemetry-gateway-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: telemetry-gateway
spec:
  selector:
    app: telemetry-gateway
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080
---
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: telemetry-gateway-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: telemetry-gateway
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 75

部署考量:

  • 高可用性: 部署了3个副本,并使用Service进行负载均衡。
  • 弹性伸缩: 配置了HorizontalPodAutoscaler (HPA),当CPU利用率超过75%时自动扩容,最多可达10个副本,以应对流量高峰。
  • 配置与密钥管理: 使用Kubernetes原生的ConfigMapSecret来管理配置,避免将敏感信息硬编码在镜像中。

架构的局限性与未来迭代路径

当前这套架构有效地解决了我们面临的核心问题,但它并非完美无缺。

  • 网关的写入瓶颈: 尽管遥测网关本身可以水平扩展,但所有流量最终都汇聚到InfluxDB。如果写入量超过了InfluxDB单实例或集群的处理能力,这里依然会成为瓶颈。一个可行的优化路径是在网关和InfluxDB之间引入一个消息队列(如Google Pub/Sub或Kafka),利用其削峰填谷的能力,让网关可以更快地响应客户端,同时让下游的消费者可以根据数据库的负载情况来调整写入速率。
  • JWT密钥轮换: 当前方案使用了一个静态的共享密钥。在更严格的安全要求下,需要建立一套自动化的密钥轮换机制。这可以借助HashiCorp Vault或GCP Secret Manager等工具实现。
  • 数据聚合与降采样: 对于非常长期的历史数据分析,原始的细粒度指标可能会占用大量存储。可以引入InfluxDB的Tasks或独立的流处理作业(如Flink),对旧数据进行降采样(downsampling),例如将分钟级数据聚合成小时级或天级,以降低存储成本和提高长期趋势查询的性能。
  • Schema校验强化: 当前网关对客户端发来的Line Protocol信任度较高。一个更安全的做法是在网关层强制解析body,忽略客户端自带的tags,完全基于JWT claims中的ctx来重新构建tags,这样可以彻底杜绝客户端伪造关键维度信息的可能性。

  目录