使用 Tekton 与 Kotlin 自定义任务为 Android 开发团队实现 CockroachDB 数据库的自动化编排


我们团队面临一个日益棘手的瓶颈:移动端(主要是Android)的特性开发严重依赖后端服务的并行演进,而每个特性分支都需要一个隔离的、包含特定数据集的数据库环境进行联调测试。我们使用的数据库是 CockroachDB,它的多活和弹性伸缩能力很棒,但为每个特性分支手动创建、迁移和填充(seeding)数据库schema,已经成了DBA团队无法承受之重,也拖慢了整个研发流程。

最初的设想很简单,利用我们已有的 Tekton CI/CD 平台,让 Android 开发者能够通过某种方式自助完成这个过程。一个直接的想法是编写一堆 shell 脚本,用 Tekton 的 script 步骤来执行 cockroach sql 命令行工具。这个方案很快就被否决了。在真实项目中,数据库的变更管理远比执行几个SQL文件复杂:需要保证幂等性、处理事务、记录迁移版本、根据不同环境注入不同种子数据,以及在任务失败时进行有意义的回滚或状态报告。用 shell 脚本来编排这些逻辑,会迅速演变成一场难以维护的噩梦。

这里的坑在于,我们需要的是一个健壮、可测试、可重用的数据库操作单元,而不仅仅是命令的堆砌。因此,我们决定将核心的数据库编排逻辑封装到一个自定义的 Tekton 任务中,并选择 Kotlin/JVM 来实现这个任务。选择 Kotlin 的理由很充分:我们的 Android 团队和部分后端团队已经深度使用,语言本身表现力强且安全;更重要的是,我们可以直接利用成熟的 Java 生态,比如 HikariCP 连接池、PostgreSQL JDBC 驱动(CockroachDB 兼容)以及强大的日志和JSON处理库。这能让我们写出生产级的、可靠的代码,而不是脆弱的脚本。

整个系统的架构设计如下:

graph TD
    subgraph "开发者侧"
        A[Android Studio Client]
    end

    subgraph "平台层"
        B(API Gateway) --> C{Tekton Controller}
    end

    subgraph "Tekton Pipeline"
        C -- 创建 --> D[PipelineRun]
        D -- 执行 --> E[Task: run-db-orchestrator]
    end

    subgraph "自定义任务执行环境 (Pod)"
        E -- 运行 --> F[Container: Kotlin Orchestrator]
    end

    subgraph "目标数据库"
        F -- JDBC --> G[(CockroachDB Cluster)]
    end

    A -- 触发部署请求 (REST API) --> B
    B -- 鉴权 & 转发 --> C
    C -- 返回 PipelineRun 状态 --> B
    B -- 推送状态更新 --> A

第一步:构建 Kotlin 自定义任务镜像

我们的核心是一个独立的 Kotlin 项目,它将被打包成一个 Docker 镜像,供 Tekton 任务使用。这个程序需要能接收 Tekton 传入的参数(如数据库地址、凭证、要执行的迁移脚本目录、操作类型等),并能将执行结果以文件形式写回给 Tekton。

项目结构很简单:

.
├── build.gradle.kts
├── Dockerfile
└── src
    └── main
        └── kotlin
            └── com
                └── mycompany
                    └── tekton
                        └── db
                            ├── Main.kt
                            ├── DatabaseManager.kt
                            └── model
                                ├── Config.kt
                                └── Result.kt

build.gradle.kts 文件需要配置 Kotlin JVM 插件、序列化插件,并打包成一个包含所有依赖的 fat JAR。

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    kotlin("jvm") version "1.9.20"
    kotlin("plugin.serialization") version "1.9.20"
    application
}

group = "com.mycompany.tekton.db"
version = "1.0.0"

repositories {
    mavenCentral()
}

dependencies {
    // CockroachDB 兼容的 PostgreSQL JDBC 驱动
    implementation("org.postgresql:postgresql:42.6.0")
    // 强大的数据库连接池
    implementation("com.zaxxer:HikariCP:5.0.1")
    // Kotlin 官方序列化库,用于处理输入输出
    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")
    // 日志框架
    implementation("ch.qos.logback:logback-classic:1.4.11")
    implementation("org.slf4j:slf4j-api:2.0.9")
    // 用于解析命令行参数
    implementation("com.github.ajalt.clikt:clikt:4.2.0")
}

application {
    mainClass.set("com.mycompany.tekton.db.MainKt")
}

// 创建一个包含所有依赖的 fat JAR
tasks.jar {
    manifest {
        attributes["Main-Class"] = "com.mycompany.tekton.db.MainKt"
    }
    configurations["runtimeClasspath"].forEach { file ->
        from(zipTree(file.absoluteFile))
    }
}

接下来是 Dockerfile,它将编译好的 fat JAR 打包进一个轻量级的基础镜像中。

# 使用一个包含 glibc 的轻量级基础镜像,因为 JVM 需要它
FROM eclipse-temurin:17-jre-focal

# 定义工作目录
WORKDIR /app

# 将构建产出的 fat JAR 复制到镜像中
COPY build/libs/tekton-db-orchestrator-1.0.0.jar /app/orchestrator.jar

# 定义容器启动时执行的命令
ENTRYPOINT ["java", "-jar", "/app/orchestrator.jar"]

第二步:实现核心数据库编排逻辑

DatabaseManager.kt 是所有魔法发生的地方。它封装了连接、事务、迁移和数据填充的逻辑。一个常见的错误是直接在代码中拼接SQL,或者对事务处理掉以轻心。在分布式SQL数据库如CockroachDB上,正确处理事务重试尤为重要。

package com.mycompany.tekton.db

import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import org.slf4j.LoggerFactory
import java.io.File
import java.sql.Connection
import java.sql.SQLException

class DatabaseManager(private val config: Config) {

    private val logger = LoggerFactory.getLogger(javaClass)
    private val dataSource: HikariDataSource

    init {
        val hikariConfig = HikariConfig().apply {
            jdbcUrl = "jdbc:postgresql://${config.dbHost}:${config.dbPort}/${config.dbName}?sslmode=require"
            username = config.dbUser
            password = config.dbPassword
            driverClassName = "org.postgresql.Driver"
            maximumPoolSize = 3 // 对于一次性任务,不需要大连接池
            connectionTimeout = 30000 // 30秒
        }
        dataSource = HikariDataSource(hikariConfig)
    }

    /**
     * CockroachDB 推荐的事务执行包装器,可以自动处理 SERIALIZABLE 隔离级别下的事务重试。
     * 这是一个非常关键的实践,能极大提升在有争用情况下的稳定性。
     */
    private fun <T> runInTransaction(block: (Connection) -> T): T {
        dataSource.connection.use { conn ->
            conn.autoCommit = false
            // CockroachDB 推荐使用 SERIALIZABLE 隔离级别以获得最强的一致性保证
            conn.transactionIsolation = Connection.TRANSACTION_SERIALIZABLE
            
            var retries = 0
            while (retries < 5) { // 最多重试5次
                try {
                    val result = block(conn)
                    conn.commit()
                    return result
                } catch (e: SQLException) {
                    // 错误码 40001 表示可重试的事务冲突
                    if ("40001" == e.sqlState) {
                        logger.warn("Transaction retryable error occurred, retrying... (attempt ${retries + 1})", e)
                        conn.rollback()
                        retries++
                        Thread.sleep(200L * (retries)) // 增加退避等待时间
                    } else {
                        logger.error("Non-retryable SQL exception occurred.", e)
                        conn.rollback()
                        throw e
                    }
                }
            }
            throw SQLException("Transaction failed after multiple retries.")
        }
    }

    /**
     * 确保用于版本控制的表存在。
     * 这个操作本身也应该在事务中执行,保证原子性。
     */
    private fun ensureSchemaVersionTableExists() {
        runInTransaction { conn ->
            val sql = """
                CREATE TABLE IF NOT EXISTS schema_migrations (
                    version VARCHAR(255) PRIMARY KEY,
                    applied_at TIMESTAMPTZ DEFAULT now()
                );
            """.trimIndent()
            conn.createStatement().use { it.execute(sql) }
            logger.info("Table 'schema_migrations' is ready.")
        }
    }
    
    /**
     * 应用所有尚未应用的数据库迁移脚本。
     * 脚本按文件名排序执行。
     */
    fun applyMigrations() {
        ensureSchemaVersionTableExists()
        
        val appliedVersions = runInTransaction { conn ->
            conn.prepareStatement("SELECT version FROM schema_migrations").use { stmt ->
                stmt.executeQuery().use { rs ->
                    generateSequence {
                        if (rs.next()) rs.getString("version") else null
                    }.toSet()
                }
            }
        }
        
        logger.info("Found ${appliedVersions.size} applied migrations in database.")
        
        val migrationDir = File(config.migrationsPath)
        if (!migrationDir.exists() || !migrationDir.isDirectory) {
            logger.warn("Migrations path '${config.migrationsPath}' does not exist or is not a directory. Skipping.")
            return
        }
        
        migrationDir.listFiles { _, name -> name.endsWith(".sql") }
            ?.sorted()
            ?.forEach { file ->
                val version = file.name
                if (version !in appliedVersions) {
                    logger.info("Applying migration: $version")
                    val sqlScript = file.readText()
                    runInTransaction { conn ->
                        // 执行迁移脚本
                        conn.createStatement().use { it.execute(sqlScript) }
                        // 记录版本
                        conn.prepareStatement("INSERT INTO schema_migrations (version) VALUES (?)").use { stmt ->
                            stmt.setString(1, version)
                            stmt.executeUpdate()
                        }
                    }
                    logger.info("Successfully applied and recorded migration: $version")
                } else {
                    logger.info("Skipping already applied migration: $version")
                }
            }
    }

    fun close() {
        dataSource.close()
    }
}

Main.kt 负责解析命令行参数,并把它们转换成一个强类型的 Config 对象,然后调用 DatabaseManager。Tekton 通过文件系统传递参数和接收结果,所以我们需要读写约定的路径,例如 /tekton/results/

package com.mycompany.tekton.db

import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.default
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.options.required
import com.mycompany.tekton.db.model.Config
import com.mycompany.tekton.db.model.Result
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.slf4j.LoggerFactory
import java.io.File
import kotlin.system.exitProcess

fun main(args: Array<String>) = DbOrchestratorCommand().main(args)

class DbOrchestratorCommand : CliktCommand(name = "db-orchestrator") {
    private val dbHost by option("--db-host", help = "Database host").required()
    // ... 其他参数定义
    private val migrationsPath by option("--migrations-path", help = "Path to SQL migration files").default("/workspace/migrations")
    private val operation by option("--operation", help = "Operation to perform (e.g., 'migrate')").required()

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun run() {
        val config = Config(dbHost, /*...其他参数...*/, migrationsPath)
        
        try {
            val manager = DatabaseManager(config)
            when (operation) {
                "migrate" -> {
                    logger.info("Starting database migration operation.")
                    manager.applyMigrations()
                    writeResult(Result(success = true, message = "Migration completed successfully."))
                }
                // 可以扩展其他操作,如 'seed', 'cleanup'
                else -> throw IllegalArgumentException("Unsupported operation: $operation")
            }
            manager.close()
        } catch (e: Exception) {
            logger.error("Orchestration failed!", e)
            writeResult(Result(success = false, message = e.message ?: "An unknown error occurred."))
            exitProcess(1)
        }
    }

    private fun writeResult(result: Result) {
        try {
            // Tekton 约定将结果写入 /tekton/results/<result-name>
            val resultFile = File("/tekton/results/status")
            resultFile.parentFile.mkdirs()
            val jsonResult = Json.encodeToString(result)
            resultFile.writeText(jsonResult)
            logger.info("Wrote result to ${resultFile.absolutePath}: $jsonResult")
        } catch (e: Exception) {
            logger.error("Failed to write result file.", e)
        }
    }
}

第三步:定义 Tekton Task 和 Pipeline

现在,我们可以定义一个 Tekton Task 来使用我们构建的 Docker 镜像。这个 Task 定义了输入参数和输出结果。

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: cockroachdb-orchestrator
spec:
  description: >-
    Performs database orchestration tasks like migration or seeding
    on a CockroachDB cluster using a custom Kotlin-based tool.
  params:
    - name: image
      description: The orchestrator container image to run.
      default: "my-registry/tekton-db-orchestrator:1.0.0"
    - name: db-host
      description: CockroachDB host address.
    - name: db-port
      description: CockroachDB port.
      default: "26257"
    - name: db-user
      description: Database user.
    - name: db-name
      description: Database name to connect to.
    - name: operation
      description: The operation to perform (e.g., migrate, seed).
  # 数据库密码通过 Kubernetes Secret 传入,而不是明文参数
  workspaces:
    - name: source
      description: A workspace containing migration scripts.
    - name: secrets
      description: A workspace mounting the database credentials secret.
      mountPath: /etc/secrets
  results:
    - name: status
      description: JSON string representing the execution result.
  steps:
    - name: run-orchestration
      image: $(params.image)
      script: |
        #!/bin/sh
        set -e
        
        # 从挂载的 Secret 文件中读取密码
        DB_PASSWORD=$(cat /etc/secrets/password)

        # 调用我们的 Kotlin 程序
        /usr/bin/java -jar /app/orchestrator.jar \
          --db-host $(params.db-host) \
          --db-port $(params.db-port) \
          --db-user $(params.db-user) \
          --db-name $(params.db-name) \
          --db-password "$DB_PASSWORD" \
          --operation $(params.operation) \
          --migrations-path $(workspaces.source.path)/sql/migrations

      # 定义错误处理和日志记录
      onError: continue

这个 Task 可以被编排进一个更复杂的 Pipeline 中,比如先克隆代码库(获取迁移脚本),然后执行数据库编排,成功后再部署后端服务。

第四步:Android 客户端作为触发器

为了让 Android 开发者自助使用,我们不希望他们接触 Tekton YAML。我们在内部开发者门户上提供了一个简单的API,背后调用 Tekton 的 API Server 来创建一个 PipelineRun。Android Studio 插件或者一个简单的内部 Android 应用可以调用这个 API。

以下是一个 Android ViewModel 中使用 Ktor Client 触发并轮询流水线状态的示例代码:

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import io.ktor.client.*
import io.ktor.client.call.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.contentnegotiation.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.serialization.kotlinx.json.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch

// 数据类,对应 API 的响应
@kotlinx.serialization.Serializable
data class PipelineTriggerResponse(val pipelineRunName: String, val status: String)

@kotlinx.serialization.Serializable
data class PipelineStatusResponse(val status: String, val message: String?, val logs: String?)

class DbProvisionViewModel : ViewModel() {

    private val _uiState = MutableStateFlow<String>("Idle")
    val uiState = _uiState.asStateFlow()

    private val client = HttpClient(CIO) {
        install(ContentNegotiation) {
            json()
        }
        // 在真实项目中,这里会有认证插件
    }

    fun provisionDatabaseForFeature(featureBranch: String) {
        viewModelScope.launch(Dispatchers.IO) {
            try {
                _uiState.value = "Triggering pipeline for branch: $featureBranch..."
                
                // 1. 触发流水线
                val triggerResponse: PipelineTriggerResponse = client.post("https://internal-api/v1/db-provision") {
                    contentType(ContentType.Application.Json)
                    setBody(mapOf("branch" to featureBranch))
                }.body()

                val pipelineRunName = triggerResponse.pipelineRunName
                _uiState.value = "Pipeline '$pipelineRunName' started. Polling for status..."

                // 2. 轮询状态,直到完成或失败
                var isDone = false
                while (!isDone) {
                    delay(5000) // 每5秒轮询一次
                    val statusResponse: PipelineStatusResponse = client.get("https://internal-api/v1/db-provision/status/$pipelineRunName").body()

                    when (statusResponse.status.lowercase()) {
                        "succeeded" -> {
                            _uiState.value = "✅ Success: ${statusResponse.message}"
                            isDone = true
                        }
                        "failed" -> {
                            _uiState.value = "❌ Failed: ${statusResponse.message}\nLogs:\n${statusResponse.logs}"
                            isDone = true
                        }
                        else -> {
                            _uiState.value = "⏳ In Progress: Current status is '${statusResponse.status}'..."
                        }
                    }
                }

            } catch (e: Exception) {
                _uiState.value = "Error: ${e.message}"
            }
        }
    }
}

这套方案上线后,Android 团队的开发效率得到了显著提升。他们不再需要等待DBA的排期,只需在自己的开发环境中点击一个按钮,就能获得一个与特性分支代码完全匹配的、干净的数据库环境。

当前这套系统的实现依然有其局限性。首先,Android 客户端的状态更新依赖于轮询,这在长时间运行的流水线中效率不高,未来可以引入 WebSocket 或 Server-Sent Events 来实现状态的实时推送。其次,我们的 Kotlin 任务镜像是基于 JRE 的,体积较大(约300MB),启动存在一定的冷启动开销。一个有潜力的优化方向是使用 GraalVM 将 Kotlin 应用编译成本地可执行文件,可以极大地缩小镜像体积并加快启动速度。最后,权限控制目前较为粗放,后续需要与内部的身份系统集成,实现更精细化的 RBAC,确保开发者只能为自己负责的项目创建和管理数据库环境。


  目录