我们面临的挑战是为部署在GKE上的一个多租户生成式AI推理服务设计并实现一套遥测系统。该系统不仅要能处理由模型推理产生的高基数(high-cardinality)时序数据,还必须保证严格的租户隔离和安全性。具体来说,每个API请求都携带了租户ID、用户ID和模型版本等信息,我们需要基于这些维度来追踪token生成速率、请求延迟、GPU利用率等关键指标。
架构决策:为何放弃主流的Prometheus方案
在项目初期,我们首先评估了云原生领域最常见的监控方案:Prometheus + Grafana。
方案A: 基于Prometheus Exporter的传统方案
这个方案的思路是在每个AI推理服务的Pod中内置一个Prometheus Exporter,暴露带有标签(labels)的指标。例如:inference_latency_seconds{tenant_id="t-abc", model_id="llama3-8b-v1", api_key_id="key-xyz"} 0.75
优势:
- 生态成熟,与Kubernetes集成度高,服务发现配置简单。
- 社区庞大,有大量的现成文档和实践。
劣势:
- 高基数问题: 这是此方案的致命缺陷。Prometheus的设计哲学不适合处理标签基数爆炸的场景。在我们的多租户系统中,
tenant_id
,api_key_id
甚至user_id
的组合数量可能是数万甚至数百万。每一个独特的标签组合都会在Prometheus中创建一个新的时间序列。这会导致Prometheus内存消耗急剧上升、查询性能断崖式下跌,甚至频繁OOM。 - Push与Pull模型不匹配: AI推理服务通常是无状态的短生命周期任务,更适合主动推送(Push)指标,而不是等待Prometheus来拉取(Pull)。使用Prometheus Pushgateway可以解决部分问题,但它本身又会成为新的单点瓶颈和高基数问题聚合点。
- 安全与认证复杂: Exporter端点通常暴露在内部网络。要为每个拉取请求实现精细化的认证和授权,验证请求来源是否合法,并将认证信息与指标关联起来,需要复杂的网络策略和附加组件,实现起来非常笨拙。
方案B: 构建基于InfluxDB和JWT的遥测网关
考虑到方案A的根本性问题,我们设计了第二套架构。其核心思想是引入一个独立的遥tery Gateway)服务,并选用InfluxDB作为时序数据库。
graph TD subgraph GKE Cluster A[GenAI Inference Pod 1] -- Metrics Push (HTTP + JWT) --> B{Telemetry Gateway Service}; C[GenAI Inference Pod N] -- Metrics Push (HTTP + JWT) --> B; B -- InfluxDB Line Protocol --> D[(InfluxDB)]; end E[User/Client] -- API Request w/ JWT --> F[API Gateway]; F -- Forwards Request --> A; A -- Generates new JWT for Telemetry --> B; subgraph Monitoring D -- Flux Query --> G[Grafana / Chronograf]; end
架构组件解析:
- 生成式AI推理Pod: 负责模型推理。在处理完每个请求后,它会收集性能指标,并使用一个内部生成的、包含遥测元数据的JWT,将指标主动推送到遥测网关。
- 遥测网关 (Telemetry Gateway): 一个轻量级的Go服务。它的职责是:
- 接收指标推送的HTTP请求。
- 验证请求中的JWT签名和声明(claims)。
- 从JWT的claims中提取租户ID、模型ID等元数据。
- 将这些元数据作为InfluxDB的Tags,将性能指标作为Fields,组装成InfluxDB的Line Protocol格式。
- 批量、异步地将数据写入InfluxDB。
- InfluxDB: 专为处理高写入和高查询负载的时序数据而设计。其Tags和Fields分离的数据模型,天然适合解决高基数问题。Tags被索引,用于快速过滤和分组;Fields不被索引,存储原始数值。
- JWT (JSON Web Tokens): 作为安全凭证。它不仅用于认证,更关键的是它承载了不可篡改的遥测上下文信息。
最终选择与理由:
我们最终选择了方案B。尽管它引入了自定义的服务组件(遥测网关),增加了少量的开发和维护成本,但它从根本上解决了方案A的两个核心痛点:
- 高基数处理: InfluxDB将高基数的
tenant_id
等作为可索引的Tags,查询性能远超Prometheus。 - 安全与隔离: JWT提供了一种健壮的、分布式的认证和授权机制。遥测网关作为单一入口,可以集中实施安全策略,确保只有合法的服务才能写入指标,且指标的租户归属信息是可信的。
在真实项目中,系统的可预测性和稳定性远比初期的便捷性重要。一个在高负载下会崩溃的监控系统是毫无价值的。
核心实现概览
以下是遥测网关和客户端instrumentation的关键代码实现。我们选择Go语言来构建遥测网关,因为它在网络编程和并发处理方面表现出色。
1. JWT的生成与声明设计
客户端(AI推理Pod)在推送指标前,需要生成一个专用于遥测的JWT。这个JWT的生命周期很短(例如60秒),仅用于单次或短时间的指标推送。
# ai_pod/telemetry_client.py
# 这是一个在Python AI服务中生成JWT并推送指标的示例
import jwt
import time
import requests
import os
import logging
# 配置应从环境变量或配置中心获取
TELEMETRY_GATEWAY_URL = os.environ.get("TELEMETRY_GATEWAY_URL", "http://telemetry-gateway.default.svc.cluster.local/v1/write")
JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY") # 必须与网关的密钥一致
JWT_ISSUER = "genai-inference-service"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def generate_telemetry_jwt(tenant_id: str, model_id: str, api_key_id: str) -> str:
"""
为遥测数据生成一个短生命周期的JWT
"""
payload = {
'iss': JWT_ISSUER,
'iat': int(time.time()),
'exp': int(time.time()) + 60, # 60秒过期
'aud': 'telemetry-gateway',
# --- 自定义声明,用于遥测上下文 ---
'ctx': {
'tenant_id': tenant_id,
'model_id': model_id,
'api_key_id': api_key_id,
# 可以在这里添加更多维度, e.g., 'region', 'gpu_type'
}
}
if not JWT_SECRET_KEY:
raise ValueError("JWT_SECRET_KEY is not set.")
encoded_jwt = jwt.encode(payload, JWT_SECRET_KEY, algorithm="HS256")
return encoded_jwt
def push_inference_metrics(tenant_id: str, model_id: str, api_key_id: str, latency_ms: float, tokens_out: int):
"""
收集指标并推送到遥测网关
"""
try:
token = generate_telemetry_jwt(tenant_id, model_id, api_key_id)
# 使用 InfluxDB Line Protocol 格式
# measurement,tag_key=tag_value field_key=field_value timestamp
# 这里的 measurement 是 'inference_performance'
# tags 是从JWT中提取的,这里为了演示,在客户端也组装了,但实际上网关会用JWT中的信息
# fields 是具体的性能指标
# timestamp 是纳秒级时间戳
line_protocol_data = (
f"inference_performance,tenant_id={tenant_id},model_id={model_id},api_key_id={api_key_id} "
f"latency_ms={latency_ms},tokens_out={tokens_out}i {int(time.time() * 1e9)}"
)
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "text/plain; charset=utf-8",
}
response = requests.post(
TELEMETRY_GATEWAY_URL,
data=line_protocol_data.encode('utf-8'),
headers=headers,
timeout=5 # 设置超时
)
if response.status_code != 204: # InfluxDB写入成功通常返回204 No Content
logging.error(f"Failed to push metrics: {response.status_code} - {response.text}")
else:
logging.info("Successfully pushed metrics.")
except jwt.PyJWTError as e:
logging.error(f"JWT generation failed: {e}")
except requests.exceptions.RequestException as e:
logging.error(f"HTTP request to telemetry gateway failed: {e}")
except Exception as e:
logging.error(f"An unexpected error occurred during metric push: {e}")
# --- 模拟一次AI推理后的调用 ---
if __name__ == "__main__":
# 在真实应用中,这些值来自API请求的上下文
if not JWT_SECRET_KEY:
print("Error: JWT_SECRET_KEY environment variable must be set.")
else:
push_inference_metrics(
tenant_id="customer-a-123",
model_id="gemma-7b-it",
api_key_id="ak_f9e8a1c3",
latency_ms=850.5,
tokens_out=256
)
设计要点:
- JWT的
payload
中,我们使用了一个自定义的ctx
(context) 字段来封装所有遥测相关的维度。这使得JWT的结构清晰,易于扩展。 - JWT的生命周期必须很短,以降低token泄露的风险。
- 客户端代码包含了完整的错误处理和日志记录,这在生产环境中至关重要。
2. 遥测网关 (Go)
这是系统的核心。它是一个无状态服务,可以水平扩展以应对高流量。
// telemetry-gateway/main.go
package main
import (
"context"
"fmt"
"log/slog"
"net/http"
"os"
"strings"
"time"
"github.com/golang-jwt/jwt/v5"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
)
// TelemetryClaims 定义了我们期望从JWT中获取的遥测上下文
type TelemetryClaims struct {
Context map[string]string `json:"ctx"`
jwt.RegisteredClaims
}
var (
influxAPI api.WriteAPI
jwtSecret []byte
logger *slog.Logger
)
func init() {
// --- 日志初始化 ---
logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
// --- InfluxDB客户端初始化 ---
// 生产环境中,这些配置应来自环境变量或ConfigMap
influxURL := getEnv("INFLUXDB_URL", "http://localhost:8086")
influxToken := getEnv("INFLUXDB_TOKEN", "")
influxOrg := getEnv("INFLUXDB_ORG", "my-org")
influxBucket := getEnv("INFLUXDB_BUCKET", "genai-metrics")
if influxToken == "" {
logger.Error("INFLUXDB_TOKEN is not set. Exiting.")
os.Exit(1)
}
client := influxdb2.NewClient(influxURL, influxToken)
// 使用带批处理的异步写入API
// BatchSize: 达到500条数据时刷新
// FlushInterval: 或者每1000毫秒刷新一次
influxAPI = client.WriteAPI(influxOrg, influxBucket)
// 监听写入错误
go func() {
for err := range influxAPI.Errors() {
logger.Error("InfluxDB async write error", "error", err)
}
}()
// --- JWT密钥初始化 ---
secret := getEnv("JWT_SECRET_KEY", "")
if secret == "" {
logger.Error("JWT_SECRET_KEY is not set. Exiting.")
os.Exit(1)
}
jwtSecret = []byte(secret)
}
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
// jwtAuthMiddleware 验证JWT并将其声明附加到请求上下文中
func jwtAuthMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
http.Error(w, "Authorization header required", http.StatusUnauthorized)
return
}
tokenString := strings.TrimPrefix(authHeader, "Bearer ")
if tokenString == authHeader {
http.Error(w, "Bearer token required", http.StatusUnauthorized)
return
}
claims := &TelemetryClaims{}
token, err := jwt.ParseWithClaims(tokenString, claims, func(token *jwt.Token) (interface{}, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
}
return jwtSecret, nil
})
if err != nil || !token.Valid {
logger.Warn("Invalid JWT token received", "error", err)
http.Error(w, "Invalid token", http.StatusUnauthorized)
return
}
// 单元测试思路:可以模拟一个带有有效/无效/过期token的请求,断言中间件的行为是否正确。
// 将解析后的声明存入context,供下游handler使用
ctx := context.WithValue(r.Context(), "telemetry_claims", claims)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
// writeHandler 处理指标写入请求
func writeHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
return
}
// 从上下文中获取经过验证的claims
claims, ok := r.Context().Value("telemetry_claims").(*TelemetryClaims)
if !ok {
logger.Error("Could not retrieve claims from context. This should not happen after middleware.")
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
// 这里的实现有一个简化:直接从客户端读取已经格式化好的line protocol。
// 一个更健壮的实现是:网关读取JSON body,然后根据JWT claims中的`ctx`动态构建tags,确保tags的来源绝对可信。
// 这可以防止客户端伪造`tenant_id`等关键tag。
// 但为了演示,我们暂时采用当前方式。
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Error reading request body", http.StatusBadRequest)
return
}
defer r.Body.Close()
line := string(body)
// 实际生产中,需要在这里对line protocol进行解析和校验,
// 并用claims中的`ctx`覆盖或添加tags,确保安全。
logger.Info("Received metric", "tenant_id", claims.Context["tenant_id"], "line_protocol_length", len(line))
// 异步写入数据点
influxAPI.WriteRecord(line)
w.WriteHeader(http.StatusNoContent)
}
func main() {
defer influxAPI.Flush() // 确保程序退出时,缓冲区数据被写入
mux := http.NewServeMux()
// 核心写入端点受JWT中间件保护
mux.Handle("/v1/write", jwtAuthMiddleware(http.HandlerFunc(writeHandler)))
// 健康检查端点
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})
server := &http.Server{
Addr: ":8080",
Handler: mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 15 * time.Second,
}
logger.Info("Starting Telemetry Gateway on port 8080")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("Server failed to start", "error", err)
}
}
代码关键点剖析:
- 依赖管理: 使用Go Modules管理依赖 (
github.com/golang-jwt/jwt/v5
,github.com/influxdata/influxdb-client-go/v2
)。 - 配置: 所有敏感信息(密钥、Token)和环境配置都通过环境变量注入,符合12-Factor App原则,便于在GKE中通过
ConfigMap
和Secret
进行管理。 - 中间件模式:
jwtAuthMiddleware
清晰地将认证逻辑与业务逻辑分离。这是标准的Go HTTP服务设计模式。 - 异步批量写入: InfluxDB Go客户端支持异步批量写入。这对于性能至关重要,它能显著减少网络往返和数据库写压力。我们配置了批处理大小和刷新间隔,并在程序退出时调用
Flush()
来确保所有缓冲数据都被发送。 - 结构化日志: 使用
slog
进行结构化日志记录。这在GKE中非常有用,日志可以被Fluentd或类似工具收集,并轻松地在Google Cloud Logging中进行查询和分析。 - 健壮性: 添加了健康检查端点 (
/healthz
),这对于GKE的存活探针(liveness probe)和就绪探针(readiness probe)至关重要。
3. GKE部署配置
# telemetry-gateway-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: telemetry-gateway
labels:
app: telemetry-gateway
spec:
replicas: 3 # 启动3个副本以实现高可用
selector:
matchLabels:
app: telemetry-gateway
template:
metadata:
labels:
app: telemetry-gateway
spec:
containers:
- name: gateway
image: your-gcr-repo/telemetry-gateway:v1.0.0
ports:
- containerPort: 8080
env:
- name: INFLUXDB_URL
valueFrom:
configMapKeyRef:
name: influxdb-config
key: url
- name: INFLUXDB_ORG
value: "my-org"
- name: INFLUXDB_BUCKET
value: "genai-metrics"
- name: INFLUXDB_TOKEN
valueFrom:
secretKeyRef:
name: influxdb-secret
key: token
- name: JWT_SECRET_KEY
valueFrom:
secretKeyRef:
name: jwt-secret
key: secret-key
resources:
requests:
cpu: "200m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "512Mi"
readinessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 15
periodSeconds: 20
---
# telemetry-gateway-service.yaml
apiVersion: v1
kind: Service
metadata:
name: telemetry-gateway
spec:
selector:
app: telemetry-gateway
ports:
- protocol: TCP
port: 80
targetPort: 8080
---
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: telemetry-gateway-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: telemetry-gateway
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 75
部署考量:
- 高可用性: 部署了3个副本,并使用
Service
进行负载均衡。 - 弹性伸缩: 配置了
HorizontalPodAutoscaler
(HPA),当CPU利用率超过75%时自动扩容,最多可达10个副本,以应对流量高峰。 - 配置与密钥管理: 使用Kubernetes原生的
ConfigMap
和Secret
来管理配置,避免将敏感信息硬编码在镜像中。
架构的局限性与未来迭代路径
当前这套架构有效地解决了我们面临的核心问题,但它并非完美无缺。
- 网关的写入瓶颈: 尽管遥测网关本身可以水平扩展,但所有流量最终都汇聚到InfluxDB。如果写入量超过了InfluxDB单实例或集群的处理能力,这里依然会成为瓶颈。一个可行的优化路径是在网关和InfluxDB之间引入一个消息队列(如Google Pub/Sub或Kafka),利用其削峰填谷的能力,让网关可以更快地响应客户端,同时让下游的消费者可以根据数据库的负载情况来调整写入速率。
- JWT密钥轮换: 当前方案使用了一个静态的共享密钥。在更严格的安全要求下,需要建立一套自动化的密钥轮换机制。这可以借助HashiCorp Vault或GCP Secret Manager等工具实现。
- 数据聚合与降采样: 对于非常长期的历史数据分析,原始的细粒度指标可能会占用大量存储。可以引入InfluxDB的Tasks或独立的流处理作业(如Flink),对旧数据进行降采样(downsampling),例如将分钟级数据聚合成小时级或天级,以降低存储成本和提高长期趋势查询的性能。
- Schema校验强化: 当前网关对客户端发来的Line Protocol信任度较高。一个更安全的做法是在网关层强制解析body,忽略客户端自带的tags,完全基于JWT claims中的
ctx
来重新构建tags,这样可以彻底杜绝客户端伪造关键维度信息的可能性。