我们团队面临一个日益棘手的瓶颈:移动端(主要是Android)的特性开发严重依赖后端服务的并行演进,而每个特性分支都需要一个隔离的、包含特定数据集的数据库环境进行联调测试。我们使用的数据库是 CockroachDB,它的多活和弹性伸缩能力很棒,但为每个特性分支手动创建、迁移和填充(seeding)数据库schema,已经成了DBA团队无法承受之重,也拖慢了整个研发流程。
最初的设想很简单,利用我们已有的 Tekton CI/CD 平台,让 Android 开发者能够通过某种方式自助完成这个过程。一个直接的想法是编写一堆 shell 脚本,用 Tekton 的 script
步骤来执行 cockroach sql
命令行工具。这个方案很快就被否决了。在真实项目中,数据库的变更管理远比执行几个SQL文件复杂:需要保证幂等性、处理事务、记录迁移版本、根据不同环境注入不同种子数据,以及在任务失败时进行有意义的回滚或状态报告。用 shell 脚本来编排这些逻辑,会迅速演变成一场难以维护的噩梦。
这里的坑在于,我们需要的是一个健壮、可测试、可重用的数据库操作单元,而不仅仅是命令的堆砌。因此,我们决定将核心的数据库编排逻辑封装到一个自定义的 Tekton 任务中,并选择 Kotlin/JVM 来实现这个任务。选择 Kotlin 的理由很充分:我们的 Android 团队和部分后端团队已经深度使用,语言本身表现力强且安全;更重要的是,我们可以直接利用成熟的 Java 生态,比如 HikariCP 连接池、PostgreSQL JDBC 驱动(CockroachDB 兼容)以及强大的日志和JSON处理库。这能让我们写出生产级的、可靠的代码,而不是脆弱的脚本。
整个系统的架构设计如下:
graph TD subgraph "开发者侧" A[Android Studio Client] end subgraph "平台层" B(API Gateway) --> C{Tekton Controller} end subgraph "Tekton Pipeline" C -- 创建 --> D[PipelineRun] D -- 执行 --> E[Task: run-db-orchestrator] end subgraph "自定义任务执行环境 (Pod)" E -- 运行 --> F[Container: Kotlin Orchestrator] end subgraph "目标数据库" F -- JDBC --> G[(CockroachDB Cluster)] end A -- 触发部署请求 (REST API) --> B B -- 鉴权 & 转发 --> C C -- 返回 PipelineRun 状态 --> B B -- 推送状态更新 --> A
第一步:构建 Kotlin 自定义任务镜像
我们的核心是一个独立的 Kotlin 项目,它将被打包成一个 Docker 镜像,供 Tekton 任务使用。这个程序需要能接收 Tekton 传入的参数(如数据库地址、凭证、要执行的迁移脚本目录、操作类型等),并能将执行结果以文件形式写回给 Tekton。
项目结构很简单:
.
├── build.gradle.kts
├── Dockerfile
└── src
└── main
└── kotlin
└── com
└── mycompany
└── tekton
└── db
├── Main.kt
├── DatabaseManager.kt
└── model
├── Config.kt
└── Result.kt
build.gradle.kts
文件需要配置 Kotlin JVM 插件、序列化插件,并打包成一个包含所有依赖的 fat JAR。
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
kotlin("jvm") version "1.9.20"
kotlin("plugin.serialization") version "1.9.20"
application
}
group = "com.mycompany.tekton.db"
version = "1.0.0"
repositories {
mavenCentral()
}
dependencies {
// CockroachDB 兼容的 PostgreSQL JDBC 驱动
implementation("org.postgresql:postgresql:42.6.0")
// 强大的数据库连接池
implementation("com.zaxxer:HikariCP:5.0.1")
// Kotlin 官方序列化库,用于处理输入输出
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")
// 日志框架
implementation("ch.qos.logback:logback-classic:1.4.11")
implementation("org.slf4j:slf4j-api:2.0.9")
// 用于解析命令行参数
implementation("com.github.ajalt.clikt:clikt:4.2.0")
}
application {
mainClass.set("com.mycompany.tekton.db.MainKt")
}
// 创建一个包含所有依赖的 fat JAR
tasks.jar {
manifest {
attributes["Main-Class"] = "com.mycompany.tekton.db.MainKt"
}
configurations["runtimeClasspath"].forEach { file ->
from(zipTree(file.absoluteFile))
}
}
接下来是 Dockerfile
,它将编译好的 fat JAR 打包进一个轻量级的基础镜像中。
# 使用一个包含 glibc 的轻量级基础镜像,因为 JVM 需要它
FROM eclipse-temurin:17-jre-focal
# 定义工作目录
WORKDIR /app
# 将构建产出的 fat JAR 复制到镜像中
COPY build/libs/tekton-db-orchestrator-1.0.0.jar /app/orchestrator.jar
# 定义容器启动时执行的命令
ENTRYPOINT ["java", "-jar", "/app/orchestrator.jar"]
第二步:实现核心数据库编排逻辑
DatabaseManager.kt
是所有魔法发生的地方。它封装了连接、事务、迁移和数据填充的逻辑。一个常见的错误是直接在代码中拼接SQL,或者对事务处理掉以轻心。在分布式SQL数据库如CockroachDB上,正确处理事务重试尤为重要。
package com.mycompany.tekton.db
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import org.slf4j.LoggerFactory
import java.io.File
import java.sql.Connection
import java.sql.SQLException
class DatabaseManager(private val config: Config) {
private val logger = LoggerFactory.getLogger(javaClass)
private val dataSource: HikariDataSource
init {
val hikariConfig = HikariConfig().apply {
jdbcUrl = "jdbc:postgresql://${config.dbHost}:${config.dbPort}/${config.dbName}?sslmode=require"
username = config.dbUser
password = config.dbPassword
driverClassName = "org.postgresql.Driver"
maximumPoolSize = 3 // 对于一次性任务,不需要大连接池
connectionTimeout = 30000 // 30秒
}
dataSource = HikariDataSource(hikariConfig)
}
/**
* CockroachDB 推荐的事务执行包装器,可以自动处理 SERIALIZABLE 隔离级别下的事务重试。
* 这是一个非常关键的实践,能极大提升在有争用情况下的稳定性。
*/
private fun <T> runInTransaction(block: (Connection) -> T): T {
dataSource.connection.use { conn ->
conn.autoCommit = false
// CockroachDB 推荐使用 SERIALIZABLE 隔离级别以获得最强的一致性保证
conn.transactionIsolation = Connection.TRANSACTION_SERIALIZABLE
var retries = 0
while (retries < 5) { // 最多重试5次
try {
val result = block(conn)
conn.commit()
return result
} catch (e: SQLException) {
// 错误码 40001 表示可重试的事务冲突
if ("40001" == e.sqlState) {
logger.warn("Transaction retryable error occurred, retrying... (attempt ${retries + 1})", e)
conn.rollback()
retries++
Thread.sleep(200L * (retries)) // 增加退避等待时间
} else {
logger.error("Non-retryable SQL exception occurred.", e)
conn.rollback()
throw e
}
}
}
throw SQLException("Transaction failed after multiple retries.")
}
}
/**
* 确保用于版本控制的表存在。
* 这个操作本身也应该在事务中执行,保证原子性。
*/
private fun ensureSchemaVersionTableExists() {
runInTransaction { conn ->
val sql = """
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(255) PRIMARY KEY,
applied_at TIMESTAMPTZ DEFAULT now()
);
""".trimIndent()
conn.createStatement().use { it.execute(sql) }
logger.info("Table 'schema_migrations' is ready.")
}
}
/**
* 应用所有尚未应用的数据库迁移脚本。
* 脚本按文件名排序执行。
*/
fun applyMigrations() {
ensureSchemaVersionTableExists()
val appliedVersions = runInTransaction { conn ->
conn.prepareStatement("SELECT version FROM schema_migrations").use { stmt ->
stmt.executeQuery().use { rs ->
generateSequence {
if (rs.next()) rs.getString("version") else null
}.toSet()
}
}
}
logger.info("Found ${appliedVersions.size} applied migrations in database.")
val migrationDir = File(config.migrationsPath)
if (!migrationDir.exists() || !migrationDir.isDirectory) {
logger.warn("Migrations path '${config.migrationsPath}' does not exist or is not a directory. Skipping.")
return
}
migrationDir.listFiles { _, name -> name.endsWith(".sql") }
?.sorted()
?.forEach { file ->
val version = file.name
if (version !in appliedVersions) {
logger.info("Applying migration: $version")
val sqlScript = file.readText()
runInTransaction { conn ->
// 执行迁移脚本
conn.createStatement().use { it.execute(sqlScript) }
// 记录版本
conn.prepareStatement("INSERT INTO schema_migrations (version) VALUES (?)").use { stmt ->
stmt.setString(1, version)
stmt.executeUpdate()
}
}
logger.info("Successfully applied and recorded migration: $version")
} else {
logger.info("Skipping already applied migration: $version")
}
}
}
fun close() {
dataSource.close()
}
}
Main.kt
负责解析命令行参数,并把它们转换成一个强类型的 Config
对象,然后调用 DatabaseManager
。Tekton 通过文件系统传递参数和接收结果,所以我们需要读写约定的路径,例如 /tekton/results/
。
package com.mycompany.tekton.db
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.default
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.options.required
import com.mycompany.tekton.db.model.Config
import com.mycompany.tekton.db.model.Result
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.slf4j.LoggerFactory
import java.io.File
import kotlin.system.exitProcess
fun main(args: Array<String>) = DbOrchestratorCommand().main(args)
class DbOrchestratorCommand : CliktCommand(name = "db-orchestrator") {
private val dbHost by option("--db-host", help = "Database host").required()
// ... 其他参数定义
private val migrationsPath by option("--migrations-path", help = "Path to SQL migration files").default("/workspace/migrations")
private val operation by option("--operation", help = "Operation to perform (e.g., 'migrate')").required()
private val logger = LoggerFactory.getLogger(javaClass)
override fun run() {
val config = Config(dbHost, /*...其他参数...*/, migrationsPath)
try {
val manager = DatabaseManager(config)
when (operation) {
"migrate" -> {
logger.info("Starting database migration operation.")
manager.applyMigrations()
writeResult(Result(success = true, message = "Migration completed successfully."))
}
// 可以扩展其他操作,如 'seed', 'cleanup'
else -> throw IllegalArgumentException("Unsupported operation: $operation")
}
manager.close()
} catch (e: Exception) {
logger.error("Orchestration failed!", e)
writeResult(Result(success = false, message = e.message ?: "An unknown error occurred."))
exitProcess(1)
}
}
private fun writeResult(result: Result) {
try {
// Tekton 约定将结果写入 /tekton/results/<result-name>
val resultFile = File("/tekton/results/status")
resultFile.parentFile.mkdirs()
val jsonResult = Json.encodeToString(result)
resultFile.writeText(jsonResult)
logger.info("Wrote result to ${resultFile.absolutePath}: $jsonResult")
} catch (e: Exception) {
logger.error("Failed to write result file.", e)
}
}
}
第三步:定义 Tekton Task 和 Pipeline
现在,我们可以定义一个 Tekton Task
来使用我们构建的 Docker 镜像。这个 Task
定义了输入参数和输出结果。
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: cockroachdb-orchestrator
spec:
description: >-
Performs database orchestration tasks like migration or seeding
on a CockroachDB cluster using a custom Kotlin-based tool.
params:
- name: image
description: The orchestrator container image to run.
default: "my-registry/tekton-db-orchestrator:1.0.0"
- name: db-host
description: CockroachDB host address.
- name: db-port
description: CockroachDB port.
default: "26257"
- name: db-user
description: Database user.
- name: db-name
description: Database name to connect to.
- name: operation
description: The operation to perform (e.g., migrate, seed).
# 数据库密码通过 Kubernetes Secret 传入,而不是明文参数
workspaces:
- name: source
description: A workspace containing migration scripts.
- name: secrets
description: A workspace mounting the database credentials secret.
mountPath: /etc/secrets
results:
- name: status
description: JSON string representing the execution result.
steps:
- name: run-orchestration
image: $(params.image)
script: |
#!/bin/sh
set -e
# 从挂载的 Secret 文件中读取密码
DB_PASSWORD=$(cat /etc/secrets/password)
# 调用我们的 Kotlin 程序
/usr/bin/java -jar /app/orchestrator.jar \
--db-host $(params.db-host) \
--db-port $(params.db-port) \
--db-user $(params.db-user) \
--db-name $(params.db-name) \
--db-password "$DB_PASSWORD" \
--operation $(params.operation) \
--migrations-path $(workspaces.source.path)/sql/migrations
# 定义错误处理和日志记录
onError: continue
这个 Task
可以被编排进一个更复杂的 Pipeline
中,比如先克隆代码库(获取迁移脚本),然后执行数据库编排,成功后再部署后端服务。
第四步:Android 客户端作为触发器
为了让 Android 开发者自助使用,我们不希望他们接触 Tekton YAML。我们在内部开发者门户上提供了一个简单的API,背后调用 Tekton 的 API Server 来创建一个 PipelineRun
。Android Studio 插件或者一个简单的内部 Android 应用可以调用这个 API。
以下是一个 Android ViewModel
中使用 Ktor Client
触发并轮询流水线状态的示例代码:
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import io.ktor.client.*
import io.ktor.client.call.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.contentnegotiation.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.serialization.kotlinx.json.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch
// 数据类,对应 API 的响应
@kotlinx.serialization.Serializable
data class PipelineTriggerResponse(val pipelineRunName: String, val status: String)
@kotlinx.serialization.Serializable
data class PipelineStatusResponse(val status: String, val message: String?, val logs: String?)
class DbProvisionViewModel : ViewModel() {
private val _uiState = MutableStateFlow<String>("Idle")
val uiState = _uiState.asStateFlow()
private val client = HttpClient(CIO) {
install(ContentNegotiation) {
json()
}
// 在真实项目中,这里会有认证插件
}
fun provisionDatabaseForFeature(featureBranch: String) {
viewModelScope.launch(Dispatchers.IO) {
try {
_uiState.value = "Triggering pipeline for branch: $featureBranch..."
// 1. 触发流水线
val triggerResponse: PipelineTriggerResponse = client.post("https://internal-api/v1/db-provision") {
contentType(ContentType.Application.Json)
setBody(mapOf("branch" to featureBranch))
}.body()
val pipelineRunName = triggerResponse.pipelineRunName
_uiState.value = "Pipeline '$pipelineRunName' started. Polling for status..."
// 2. 轮询状态,直到完成或失败
var isDone = false
while (!isDone) {
delay(5000) // 每5秒轮询一次
val statusResponse: PipelineStatusResponse = client.get("https://internal-api/v1/db-provision/status/$pipelineRunName").body()
when (statusResponse.status.lowercase()) {
"succeeded" -> {
_uiState.value = "✅ Success: ${statusResponse.message}"
isDone = true
}
"failed" -> {
_uiState.value = "❌ Failed: ${statusResponse.message}\nLogs:\n${statusResponse.logs}"
isDone = true
}
else -> {
_uiState.value = "⏳ In Progress: Current status is '${statusResponse.status}'..."
}
}
}
} catch (e: Exception) {
_uiState.value = "Error: ${e.message}"
}
}
}
}
这套方案上线后,Android 团队的开发效率得到了显著提升。他们不再需要等待DBA的排期,只需在自己的开发环境中点击一个按钮,就能获得一个与特性分支代码完全匹配的、干净的数据库环境。
当前这套系统的实现依然有其局限性。首先,Android 客户端的状态更新依赖于轮询,这在长时间运行的流水线中效率不高,未来可以引入 WebSocket 或 Server-Sent Events 来实现状态的实时推送。其次,我们的 Kotlin 任务镜像是基于 JRE 的,体积较大(约300MB),启动存在一定的冷启动开销。一个有潜力的优化方向是使用 GraalVM 将 Kotlin 应用编译成本地可执行文件,可以极大地缩小镜像体积并加快启动速度。最后,权限控制目前较为粗放,后续需要与内部的身份系统集成,实现更精细化的 RBAC,确保开发者只能为自己负责的项目创建和管理数据库环境。