11274 字
56 分钟
Golang生产向脚手架搭建(基于DDD、SQLC、Connect-Go)

本文思路为DDD驱动开发,项目结构为按模块划分的垂直结构。

技术栈速览#

领域技术选型
依赖注入Wire (编译期)
路由/RPCChi + Connect-Go
数据库SQLC + Pgx (Postgres)
权限OpenFGA (ReBAC)
认证JWT
日志/追踪Zap + OpenTelemetry
异步队列Asynq (Redis)
构建工具Taskfile
开发环境Devbox (Nix)
容器镜像Distroless
错误处理RFC 7807 + 自定义扩展

依赖注入:Wire#

Wire 是编译期依赖注入,相比于 Uber Dig 等运行时注入框架,它如果依赖缺失,代码是编译不过的。这是它最大的优势:Fail Fast

/cmd/server/internal/
├── user/ <-- 所有和"用户"相关的代码都在这里
│ ├── user.go <-- 领域实体 (Entity), 接口定义 (Repo Interface)
│ ├── service.go <-- 应用服务 (Application Service)
│ ├── pg_repo.go <-- 基础设施实现 (SQLC Repo)
│ ├── handler.go <-- 接口适配器 (Connect-Go Handler)
│ └── provider.go <-- 用户模块自己的 ProviderSet
├── order/ <-- 所有和"订单"相关的代码...
│ ├── order.go
│ ├── service.go
│ └── provider.go <-- 订单模块自己的 ProviderSet
└── pkg/ <-- 跨模块共享的公共包 (DB连接, Logger)
├── db/
└── logger/
/cmd/server/wire.go <-- 管理最终的依赖注入

实际上它可以称得上是整个应用除了main.go之外的第二个入口

以用户模块举例#

provider.go

package user
import "github.com/google/wire"
// ProviderSet 包含了 user 模块所有的依赖
var ProviderSet = wire.NewSet(
// 提供应用层(仅示例,未给出具体代码)
NewUserService,
// 提供数据层(仅示例,未给出具体代码)
NewUserRepo,
// 绑定接口与实现 (这是在模块内部完成的,外部不知道细节)
// 告诉 Wire: 当 NewUserService 需要 UserRepository 接口时,使用 UserPgRepo 这个 struct 来填充。
wire.Bind(new(UserRepository), new(*UserRepo)),
// 提供接口层 Handler
NewUserHandler,
)

wire.go中注册:

//go:build wireinject
// +build wireinject
// 上面这两行注释必不可少,用于wire识别。
package main
import (
"github.com/google/wire"
"my-project/internal/user" // 导入 user 模块
"my-project/internal/pkg/db" // 导入共享的 DB 连接
)
// initializeApp 注入器,拼积木
func initializeApp() (*App, func(), error) {
panic(wire.Build(
// 一些共享的基础设施
conf.ProviderSet,
logger.ProviderSet,
db.ProviderSet,
...
// 业务模块
user.ProviderSet,
...
// App 的构造函数
NewApp,
))
}

开发环境:Devbox#

Devbox 是基于 Nix 的开发环境管理工具,它能确保团队所有成员使用完全一致的开发环境,告别 “在我机器上能跑” 的问题。

初始化#

Terminal window
# 安装 Devbox
curl -fsSL https://get.jetify.com/devbox | bash
# 在项目根目录初始化
devbox init

定义开发环境 devbox.json#

{
"$schema": "https://raw.githubusercontent.com/jetify-com/devbox/main/.schema/devbox.schema.json",
"packages": [
"go@1.23",
"postgresql@16",
"redis@7",
"protobuf@28",
"buf@latest",
"sqlc@latest",
"go-task@latest",
"golangci-lint@latest"
],
"shell": {
"init_hook": [
"echo 'Welcome to my-project dev environment!'",
"export GOPATH=$HOME/go",
"export PATH=$PATH:$GOPATH/bin"
],
"scripts": {
"dev": "task dev",
"test": "task test",
"generate": "task generate"
}
}
}

使用#

Terminal window
# 进入开发环境
devbox shell
# 或者直接运行脚本
devbox run dev
devbox run test

配置管理:Viper#

定义配置文件: configs/config.yaml#

通常我们需要这么一份类似于SpringBootappilication.yml,做一个通用的配置:

server:
http:
addr: :8000
timeout: 1s
grpc:
addr: :9000
data:
database:
driver: postgres
source: postgres://user:pass@localhost:5432/mydb?sslmode=disable
redis:
addr: 127.0.0.1:6379
password: ""
log:
level: debug
filename: "app.log"

定义与加载: internal/conf/conf.go#

package conf
import (
"github.com/spf13/viper"
"strings"
)
// Bootstrap 是所有配置的根节点
type Bootstrap struct {
Server *Server `mapstructure:"server"`
Data *Data `mapstructure:"data"`
Log *Log `mapstructure:"log"`
63 collapsed lines
}
type Server struct {
Http *Http `mapstructure:"http"`
Grpc *Grpc `mapstructure:"grpc"`
}
type Http struct {
Addr string `mapstructure:"addr"`
Timeout string `mapstructure:"timeout"`
}
type Grpc struct {
Addr string `mapstructure:"addr"`
}
type Data struct {
Database *Database `mapstructure:"database"`
Redis *Redis `mapstructure:"redis"`
}
type Database struct {
Driver string `mapstructure:"driver"`
Source string `mapstructure:"source"`
}
type Redis struct {
Addr string `mapstructure:"addr"`
Password string `mapstructure:"password"`
}
type Log struct {
Level string `mapstructure:"level"`
Filename string `mapstructure:"filename"`
}
// Load 加载配置
// path: 配置文件路径,例如 "./configs/config.yaml"
func Load(path string) (*Bootstrap, error) {
v := viper.New()
// 设置配置文件路径
v.SetConfigFile(path)
// 允许环境变量覆盖 (对 Docker/K8s 等部署非常重要)
// 例如:export APP_DATA_DATABASE_SOURCE="xxx" 可以覆盖 yaml 里的配置
v.AutomaticEnv()
// 将 . 替换为 _ (环境变量通常不允许有点)
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
// 设置环境变量前缀,防止冲突,例如 APP_SERVER_HTTP_ADDR
v.SetEnvPrefix("APP")
// 读取配置
if err := v.ReadInConfig(); err != nil {
return nil, err
}
// 将配置映射到结构体
var c Bootstrap
if err := v.Unmarshal(&c, func(dc *mapstructure.DecoderConfig) {
dc.TagName = "json" // 或者 "yaml", "mapstructure"
}); err != nil {
return nil, err
}
return &c, nil
}

提供注入口internal/conf/provider.go#

package conf
import "github.com/google/wire"
var ProviderSet = wire.NewSet(
// 提供提取 Log 配置的方法
func(c *Bootstrap) *Log { return c.Log },
// 提供提取 Data 配置的方法
func(c *Bootstrap) *Data { return c.Data },
// 提供提取 Server 配置的方法
func(c *Bootstrap) *Server { return c.Server },
// ...
)

使用:#

假设启动配置在main.go这个入口文件里头,大概长这样:

func main() {
// 程序启动,先加载配置
// 这里的路径可以通过命令行参数传入,更灵活
cfg, err := conf.Load("./configs/config.yaml")
if err != nil {
panic(err)
}
// 传入配置,初始化 App
app, cleanup, err := initApp(cfg)
if err != nil {
panic(err)
}
defer cleanup()
// 启动
app.Run()
}

日志采集:OpenTelemetry#

日志、链路追踪(Trace)、指标(Metrics)三合一

初始化OTel:#

infrastructure/trace/otel.go

假设我们已经有这么份版本配置文件并按如下注入:

package main
// 这些变量默认为 "unknown" 或 "dev"
// 在 go build 时会被覆盖
var (
// Version 语义化版本号,例如 v1.2.3
Version = "dev"
// CommitHash Git 提交哈希,用于精确定位代码
CommitHash = "none"
// BuildTime 构建时间
BuildTime = "unknown"
)
Terminal window
# 获取当前 git tag 或者分支名
VERSION=$(git describe --tags --always || echo "dev")
# 获取 commit hash
COMMIT=$(git rev-parse --short HEAD)
# 获取当前时间
TIME=$(date +%FT%T%z)
# 注入变量
go build -ldflags \
"-X main.Version=${VERSION} -X main.CommitHash=${COMMIT} -X main.BuildTime=${TIME}" \
-o bin/server ./cmd/server
package trace
import (
"context"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
45 collapsed lines
)
// InitProvider 初始化 OpenTelemetry
// serviceName: 服务名称,如 "user-service"
// endpoint: 收集器地址,如 "localhost:4318" (Jaeger 的 OTLP http 端口)
func InitProvider(ctx context.Context, serviceName, endpoint string) (func(context.Context) error, error) {
// 创建 Exporter (数据发送到哪里)
// 这里使用 HTTP 协议发送到 Jaeger/Collector
exporter, err := otlptracehttp.New(ctx,
otlptracehttp.WithEndpoint(endpoint),
otlptracehttp.WithInsecure(), // 本地测试通常不用 TLS
)
if err != nil {
return nil, err
}
// 创建 Resource (标识当前服务身份)
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceName(serviceName),
semconv.ServiceVersion(Version),
attribute.String("git.commit", CommitHash),
attribute.String("build.time", BuildTime),
),
)
if err != nil {
return nil, err
}
// 创建 TraceProvider (核心管理器)
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter), // 批量发送,性能更好
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.AlwaysSample()), // 采样率:100% (生产环境建议调低)
)
// 设置全局 Provider
otel.SetTracerProvider(tp)
// 设置全局 Propagator (传播格式)
// W3C TraceContext 是现在的标准,能在不同语言/框架间传递 TraceID
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
// 返回关闭函数,用于在 main 中平滑退出
return tp.Shutdown, nil
}

封装通用logger#

两个核心库:

  1. gopkg.in/natefinch/lumberjack.v2: 负责日志文件切割(归档)。
  2. go.uber.org/zap: 高性能日志核心。
package logger
import (
"context"
"log/slog"
"net/http"
"os"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/exp/zapslog"
"go.uber.org/zap/zapcore"
107 collapsed lines
"gopkg.in/natefinch/lumberjack.v2"
)
// Config 日志配置
type Config struct {
Level string // debug, info, warn, error
Filename string // 日志文件路径,为空则只输出到控制台
MaxSize int // 单个文件最大尺寸 (MB)
MaxBackups int // 保留旧文件最大个数
MaxAge int // 保留旧文件最大天数
Compress bool // 是否压缩
}
// Init 初始化全局 slog
func Init(cfg *Config) {
// 设置日志级别
atomicLevel := zap.NewAtomicLevel()
switch cfg.Level {
case "debug":
atomicLevel.SetLevel(zap.DebugLevel)
case "warn":
atomicLevel.SetLevel(zap.WarnLevel)
case "error":
atomicLevel.SetLevel(zap.ErrorLevel)
default:
atomicLevel.SetLevel(zap.InfoLevel)
}
// Encoder (格式化)
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder // 时间格式: 2026-01-13T12:00:00.000Z
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder // 级别格式: INFO, ERROR
var encoder zapcore.Encoder
if cfg.Filename == "" {
encoder = zapcore.NewConsoleEncoder(encoderConfig) // 本地开发: Console
} else {
encoder = zapcore.NewJSONEncoder(encoderConfig) // 生产环境: JSON
}
// 配置 WriteSyncer (输出位置)
var writer zapcore.WriteSyncer
if cfg.Filename == "" {
writer = zapcore.AddSync(os.Stdout)
} else {
lumberJackLogger := &lumberjack.Logger{
Filename: cfg.Filename,
MaxSize: cfg.MaxSize,
MaxBackups: cfg.MaxBackups,
MaxAge: cfg.MaxAge,
Compress: cfg.Compress,
}
// 双写:文件 + 控制台 (方便 Docker/K8s 采集)
writer = zapcore.NewMultiWriteSyncer(
zapcore.AddSync(lumberJackLogger),
zapcore.AddSync(os.Stdout),
)
}
// 创建 Zap Core
core := zapcore.NewCore(encoder, writer, atomicLevel)
zapLogger := zap.New(core, zap.AddCaller())
// 将 Zap 转换为 Slog Handler
// zapslog.NewHandler 会把 slog 的请求转给 zap 处理
zapHandler := zapslog.NewHandler(zapLogger.Core(), nil)
// 包装 TraceHandler (实现自动提取 TraceID)
finalHandler := &TraceHandler{Handler: zapHandler}
// 设置为全局默认 Logger
// 之后在任何地方调用 slog.InfoContext() 都会经过这里
slog.SetDefault(slog.New(finalHandler))
}
// ---------------------------------------------------------
// TraceHandler: Slog 中间件
// 用于从 Context 中提取 OTel TraceID 并注入到日志属性中
// ---------------------------------------------------------
type TraceHandler struct {
slog.Handler
}
func (h *TraceHandler) Handle(ctx context.Context, r slog.Record) error {
// 从 Context 提取 OpenTelemetry Span
span := trace.SpanFromContext(ctx)
if span.SpanContext().IsValid() {
traceID := span.SpanContext().TraceID().String()
spanID := span.SpanContext().SpanID().String()
// 将 trace_id 和 span_id 添加到本次日志记录的属性中
r.AddAttrs(
slog.String("trace_id", traceID),
slog.String("span_id", spanID),
)
}
// 调用下一层 Handler (即 Zap)
return h.Handler.Handle(ctx, r)
}
// WithAttrs 和 WithGroup 是 slog.Handler 接口必须实现的方法
// 透传给底层 handler
func (h *TraceHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
return &TraceHandler{Handler: h.Handler.WithAttrs(attrs)}
}
func (h *TraceHandler) WithGroup(name string) slog.Handler {
return &TraceHandler{Handler: h.Handler.WithGroup(name)}
}

注入Controller层(Chi + Connect-Go)

import (
"github.com/riandyrn/otelchi"
)
func NewRouter() *chi.Mux {
r := chi.NewRouter()
// service-name 会显示在 Jaeger 的 Span 名称中
r.Use(otelchi.Middleware("my-project"))
// ... 其他路由
return r
}

注入Repo层(使用 pgx 原生支持)

import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/exaring/otelpgx"
)
func NewDB(cfg *conf.Data) *pgxpool.Pool {
poolConfig, _ := pgxpool.ParseConfig(cfg.Database.Source)
// 安装 OTel 追踪器
poolConfig.ConnConfig.Tracer = otelpgx.NewTracer()
pool, _ := pgxpool.NewWithConfig(context.Background(), poolConfig)
return pool
}

注册#

func main() {
// 加载配置
cfg, err := conf.Load("./configs/config.yaml")
if err != nil {
panic(err)
}
logger.Init(&logger.Config{
Level: cfg.Log.Level,
Filename: cfg.Log.Filename,
// ...
})
ctx, cancel := context.WithCancel(context.Background())
33 collapsed lines
defer cancel()
otelShutdown, err := trace.InitProvider(ctx, "my-server-name", cfg.Trace.Endpoint)
if err != nil {
// 启动时如果连不上监控系统,panic
slog.Error("Failed to init OTel provider", "err", err)
panic(err)
}
// 注册平滑退出的清理函数 (Defer LIFO 原则)
defer func() {
// 创建一个独立的 context 用于关闭操作,给予 5秒 超时
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if err := otelShutdown(shutdownCtx); err != nil {
slog.Error("Failed to shutdown TracerProvider", "err", err)
} else {
slog.Info("TracerProvider shutdown success")
}
}()
// 初始化 Chi Router
r := chi.NewRouter()
// 注册核心中间件 (顺序非常重要)
// A. OTel: 生成 TraceID (必须第一)
r.Use(otelchi.Middleware("my-server-name"))
// B. AccessLogger: 记录 HTTP 请求日志 (依赖 TraceID)
r.Use(middleware.AccessLogger)
// C. Recovery: 兜底 Panic (依赖 Logger 和 TraceID),后面会在报错统一管理中提到
r.Use(middleware.Recoverer)
// .....后面省略
// ... 启动 App ...
}

使用#

对于上下文的传递,一般建议使用r.Context()

Controller#
import "log/slog"
func (h *UserHandler) Login(ctx context.Context, req *connect.Request[userv1.LoginRequest]) (*connect.Response[userv1.LoginResponse], error) {
// Connect-Go 自带 context
// 使用 InfoContext 显式传递上下文,TraceID 会自动挂载
slog.InfoContext(ctx, "用户登录", "username", req.Msg.Username)
// 如果出错
if err != nil {
slog.ErrorContext(ctx, "登录失败", "err", err)
}
// ....
}
Service#

Service 层的方法签名,必须第一个参数是 context.Context

internal/service/user.go
import "log/slog"
type UserService struct{}
func (s *UserService) Get(ctx context.Context, id string) (*User, error) {
// 模拟业务逻辑
if id == "" {
slog.WarnContext(ctx, "参数校验失败: id为空", "id", id)
return nil, errors.New("id required")
}
// ... 数据库操作 ...
slog.DebugContext(ctx, "查询数据库成功", "id", id)
return &User{ID: id, Name: "Go"}, nil
}
Repo#

一样,必须传递ctx

internal/user/pg_repo.go
import "log/slog"
func (r *UserPgRepo) FindByID(ctx context.Context, id int) (*User, error) {
// SQLC 生成的 Queries 自带 context 支持
row, err := r.queries.GetUserByID(ctx, int64(id))
if err != nil {
return nil, err
}
return r.toDomain(row), nil
}
细化监控业务逻辑耗时:#
internal/user/service.go
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"log/slog"
)
// 定义一个 Tracer,通常包名作为 name
var tracer = otel.Tracer("internal/user")
func (s *UserService) CreateUser(ctx context.Context, req DTO) error {
// 开启一个新的 Span,命名为 "hash_password"
21 collapsed lines
// 这会在原来的 Trace 树上长出一个子分支
ctx, span := tracer.Start(ctx, "hash_password")
defer span.End()
// ... 模拟耗时操作 ...
time.Sleep(20 * time.Millisecond)
// 你还可以给 Span 加属性(Tag)
// 这样你可以在 Jaeger 搜索栏通过 user.id=123 或 user.name=abc 搜到这条请求
span.SetAttributes(attribute.String("user.id", id))
span.SetAttributes(attribute.String("user.name", req.Name))
// 日志中也会自动带有 trace_id
slog.WarnContext(ctx, "密码处理完毕")
// 业务检查
if id == "" {
// 记录 Error 到 Span
span.RecordError(errors.New("id required"))
span.SetStatus(codes.Error, "id required")
return nil, errors.New("id required")
}
return nil
}

验证:Docker + Jaeger#

version: "3"
services:
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # 前端 UI 端口
- "4318:4318" # OTLP HTTP 接收端口 (我们在 Go 代码里填的这个)
environment:
- LOG_LEVEL=debug

路由:Chi + Connect-Go#

Chi 是一个轻量级、符合标准库 net/http 的路由器。Connect-Go 是 Buf 团队开发的现代 RPC 框架,同时支持 gRPC、gRPC-Web 和 HTTP/JSON。

internal/
├── user/ <-- 用户模块(自治)
│ ├── handler.go <-- 定义 Connect-Go Handler
│ ├── service.go
│ ├── repo.go
│ └── provider.go <-- Wire 的 ProviderSet
├── order/ <-- 订单模块
│ ├── handler.go
│ └── ...
└── server/ <-- 基础设施层(负责把大家组装起来)
├── http.go <-- Chi + Connect-Go 入口
└── grpc.go

Proto 定义#

首先定义 Protobuf 文件:proto/user/v1/user.proto

syntax = "proto3";
package user.v1;
option go_package = "my-project/gen/user/v1;userv1";
message GetUserRequest {
int64 id = 1;
}
message GetUserResponse {
int64 id = 1;
14 collapsed lines
string username = 2;
string email = 3;
}
message CreateUserRequest {
string username = 1;
string email = 2;
string password = 3;
}
message CreateUserResponse {
int64 id = 1;
}
service UserService {
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
}

使用 Buf 生成代码:

buf.gen.yaml

version: v2
plugins:
- remote: buf.build/protocolbuffers/go
out: gen
opt: paths=source_relative
- remote: buf.build/connectrpc/go
out: gen
opt: paths=source_relative

模块内部:#

以User为例定义Controllerinternal/user/handler.go#
package user
import (
"context"
"connectrpc.com/connect"
userv1 "my-project/gen/user/v1"
"my-project/gen/user/v1/userv1connect"
)
// Handler 用户模块的 Connect-Go 接口适配器
type Handler struct {
33 collapsed lines
svc *UserService // 依赖 Service
}
// 确保实现接口
var _ userv1connect.UserServiceHandler = (*Handler)(nil)
// GetUser 具体处理逻辑
func (h *Handler) GetUser(
ctx context.Context,
req *connect.Request[userv1.GetUserRequest],
) (*connect.Response[userv1.GetUserResponse], error) {
user, err := h.svc.Get(ctx, int(req.Msg.Id))
if err != nil {
return nil, err // 错误会被 Connect 拦截器统一处理
}
return connect.NewResponse(&userv1.GetUserResponse{
Id: int64(user.ID),
Username: user.Username,
Email: user.Email,
}), nil
}
func (h *Handler) CreateUser(
ctx context.Context,
req *connect.Request[userv1.CreateUserRequest],
) (*connect.Response[userv1.CreateUserResponse], error) {
// 调用 Service
id, err := h.svc.Create(ctx, req.Msg)
if err != nil {
return nil, err
}
return connect.NewResponse(&userv1.CreateUserResponse{
Id: int64(id),
}), nil
}

模块内部暴露依赖:internal/user/provider.go#

package user
import "github.com/google/wire"
var ProviderSet = wire.NewSet(
NewUserRepo,
NewUserService,
// 对于handler,推荐直接使用,wire会自动注入。
wire.Struct(new(Handler), "*"),
)

集中组装:internal/server/http.go#

package server
import (
"net/http"
"connectrpc.com/connect"
"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/riandyrn/otelchi"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
56 collapsed lines
"my-project/gen/user/v1/userv1connect"
"my-project/gen/order/v1/orderv1connect"
"my-project/internal/conf"
"my-project/internal/middleware"
"my-project/internal/user"
"my-project/internal/order"
)
// HTTPServer 包装 http.Server
type HTTPServer struct {
Server *http.Server
}
// 做一个集中入口函数
// Wire 会自动把 user.Handler, order.Handler 注入进来
func NewHTTPServer(
cfg *conf.Server, // 引入配置
userHandler *user.Handler, // 用户模块 Handler
orderHandler *order.Handler, // 订单模块 Handler
) *HTTPServer {
r := chi.NewRouter()
// 中间件
r.Use(otelchi.Middleware("my-project"))
r.Use(chimiddleware.RequestID)
r.Use(middleware.AccessLogger)
r.Use(middleware.Recoverer)
// Connect-Go 拦截器 (用于统一错误处理、日志等)
interceptors := connect.WithInterceptors(
middleware.NewErrorInterceptor(),
middleware.NewLoggingInterceptor(),
)
// 注册 Connect-Go 服务
// 每个模块的 Handler 都在这里挂载
userPath, userHandler := userv1connect.NewUserServiceHandler(userHandler, interceptors)
r.Mount(userPath, userHandler)
orderPath, orderHandler := orderv1connect.NewOrderServiceHandler(orderHandler, interceptors)
r.Mount(orderPath, orderHandler)
// 支持 HTTP/2 (Connect-Go 需要)
srv := &http.Server{
Addr: cfg.Http.Addr,
Handler: h2c.NewHandler(r, &http2.Server{}),
}
return &HTTPServer{Server: srv}
}
// Run 方法
func (s *HTTPServer) Run() error {
return s.Server.ListenAndServe()
}
// Shutdown 方法
func (s *HTTPServer) Shutdown(ctx context.Context) error {
return s.Server.Shutdown(ctx)
}

wire.go中注册#

func initApp(cfg *conf.Bootstrap) (*App, func(), error) {
panic(wire.Build(
// 基础设施
conf.ProviderSet,
logger.ProviderSet,
// 业务模块 (它们会提供 xxx.Handler)
user.ProviderSet,
order.ProviderSet,
// Server 层 (消费 xxx.Handler)
server.NewHTTPServer, // 这里会接收上面的 Handler 并生成 *HTTPServer
// App 启动入口
newApp,
))
}

权限校验:OpenFGA#

OpenFGA 是 Auth0 开源的细粒度授权系统,基于 Google Zanzibar 论文实现。相比 Casbin,它支持更复杂的关系型权限模型(如 Google Drive 的分享权限),并且提供独立的授权服务,支持水平扩展。

核心概念#

  • Type: 对象类型,如 user, document, folder
  • Relation: 关系,如 owner, editor, viewer
  • Tuple: 权限元组,如 user:alice is editor of document:readme

定义授权模型#

configs/fga-model.dsl

model
schema 1.1
type user
type document
relations
define owner: [user]
define editor: [user] or owner
define viewer: [user] or editor
type folder
relations
define owner: [user]
define editor: [user] or owner
define viewer: [user] or editor
define parent: [folder]
# 继承父文件夹的权限
define can_view: viewer or can_view from parent
define can_edit: editor or can_edit from parent
type organization
relations
define admin: [user]
define member: [user] or admin

初始化 OpenFGA Client#

internal/infrastructure/authz/openfga.go

package authz
import (
"context"
"log/slog"
openfga "github.com/openfga/go-sdk"
"github.com/openfga/go-sdk/client"
"my-project/internal/conf"
)
// Authorizer 授权检查器
85 collapsed lines
type Authorizer struct {
client *client.OpenFgaClient
storeID string
}
// NewAuthorizer 构造函数
func NewAuthorizer(cfg *conf.Auth) (*Authorizer, error) {
fgaClient, err := client.NewSdkClient(&client.ClientConfiguration{
ApiUrl: cfg.OpenFGA.ApiURL, // 如 http://localhost:8080
StoreId: cfg.OpenFGA.StoreID, // Store ID
})
if err != nil {
return nil, err
}
return &Authorizer{
client: fgaClient,
storeID: cfg.OpenFGA.StoreID,
}, nil
}
// Check 检查权限
// user: "user:alice"
// relation: "viewer"
// object: "document:readme"
func (a *Authorizer) Check(ctx context.Context, user, relation, object string) (bool, error) {
resp, err := a.client.Check(ctx).Body(client.ClientCheckRequest{
User: user,
Relation: relation,
Object: object,
}).Execute()
if err != nil {
slog.ErrorContext(ctx, "OpenFGA check failed", "err", err, "user", user, "relation", relation, "object", object)
return false, err
}
return resp.GetAllowed(), nil
}
// WriteTuple 写入权限元组
func (a *Authorizer) WriteTuple(ctx context.Context, user, relation, object string) error {
_, err := a.client.Write(ctx).Body(client.ClientWriteRequest{
Writes: []client.ClientTupleKey{
{
User: user,
Relation: relation,
Object: object,
},
},
}).Execute()
if err != nil {
slog.ErrorContext(ctx, "OpenFGA write failed", "err", err)
return err
}
return nil
}
// DeleteTuple 删除权限元组
func (a *Authorizer) DeleteTuple(ctx context.Context, user, relation, object string) error {
_, err := a.client.Write(ctx).Body(client.ClientWriteRequest{
Deletes: []client.ClientTupleKeyWithoutCondition{
{
User: user,
Relation: relation,
Object: object,
},
},
}).Execute()
return err
}
// ListObjects 列出用户有权限的对象
func (a *Authorizer) ListObjects(ctx context.Context, user, relation, objectType string) ([]string, error) {
resp, err := a.client.ListObjects(ctx).Body(client.ClientListObjectsRequest{
User: user,
Relation: relation,
Type: objectType,
}).Execute()
if err != nil {
return nil, err
}
return resp.GetObjects(), nil
}

Provider#

internal/infrastructure/authz/provider.go

package authz
import "github.com/google/wire"
var ProviderSet = wire.NewSet(NewAuthorizer)

中间件#

internal/middleware/authz.go

package middleware
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"my-project/internal/infrastructure/authz"
"my-project/pkg/apierror"
)
79 collapsed lines
// OpenFGAMiddleware 权限校验中间件
// resourceExtractor: 从请求中提取资源标识,如 "/documents/123" -> "document:123"
func OpenFGAMiddleware(
authorizer *authz.Authorizer,
resourceExtractor func(r *http.Request) (objectType, objectID, relation string),
) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// 从 Context 获取用户信息 (由 JWT 中间件注入)
userID, ok := ctx.Value(CtxKeyUID).(uint)
if !ok {
apierror.WriteError(w, apierror.NewUnauthorized("未登录"))
return
}
// 提取资源信息
objectType, objectID, relation := resourceExtractor(r)
if objectType == "" || objectID == "" {
// 无需权限校验的路由,直接放行
next.ServeHTTP(w, r)
return
}
// 构建 OpenFGA 格式
user := fmt.Sprintf("user:%d", userID)
object := fmt.Sprintf("%s:%s", objectType, objectID)
// 检查权限
allowed, err := authorizer.Check(ctx, user, relation, object)
if err != nil {
apierror.WriteError(w, apierror.NewInternal(err))
return
}
if !allowed {
apierror.WriteError(w, apierror.NewForbidden("没有权限访问该资源"))
return
}
next.ServeHTTP(w, r)
})
}
}
// DefaultResourceExtractor 默认资源提取器
// 根据 HTTP Method 映射到 OpenFGA relation
func DefaultResourceExtractor(r *http.Request) (objectType, objectID, relation string) {
path := r.URL.Path
// 示例: /api/v1/documents/123
parts := strings.Split(strings.Trim(path, "/"), "/")
if len(parts) < 3 {
return "", "", ""
}
// 提取资源类型和 ID
// /api/v1/documents/123 -> objectType="document", objectID="123"
resourcePart := parts[2] // "documents"
objectType = strings.TrimSuffix(resourcePart, "s") // 去掉复数 s
if len(parts) >= 4 {
objectID = parts[3]
} else {
return "", "", ""
}
// HTTP Method -> Relation 映射
switch r.Method {
case http.MethodGet:
relation = "viewer"
case http.MethodPut, http.MethodPatch:
relation = "editor"
case http.MethodDelete:
relation = "owner"
default:
relation = "viewer"
}
return objectType, objectID, relation
}

注册#

func NewHTTPServer(
cfg *conf.Server,
tokenizer *auth.Tokenizer,
authorizer *authz.Authorizer, // Wire 注入 OpenFGA
// ...
) *HTTPServer {
r := chi.NewRouter()
// 先认证
r.Use(middleware.JWTAuth(tokenizer))
// 后鉴权 (需要资源级别权限的路由)
r.Route("/api/v1/documents", func(r chi.Router) {
r.Use(middleware.OpenFGAMiddleware(authorizer, middleware.DefaultResourceExtractor))
r.Get("/{id}", documentHandler.Get)
r.Put("/{id}", documentHandler.Update)
r.Delete("/{id}", documentHandler.Delete)
})
// ...
}

在 Service 层使用#

假设你有这么个document的Service,仅举例。

type DocumentService struct {
repo document.Repository
authorizer *authz.Authorizer
}
// Create 创建文档时自动授予 owner 权限
func (s *DocumentService) Create(ctx context.Context, userID uint, req CreateReq) (*Document, error) {
// 创建文档
doc, err := s.repo.Create(ctx, req.ToDomain())
if err != nil {
return nil, err
}
42 collapsed lines
// 写入权限元组: user:123 is owner of document:456
user := fmt.Sprintf("user:%d", userID)
object := fmt.Sprintf("document:%d", doc.ID)
if err := s.authorizer.WriteTuple(ctx, user, "owner", object); err != nil {
// 权限写入失败,回滚文档创建
_ = s.repo.Delete(ctx, doc.ID)
return nil, err
}
return doc, nil
}
// Share 分享文档
func (s *DocumentService) Share(ctx context.Context, docID uint, targetUserID uint, role string) error {
user := fmt.Sprintf("user:%d", targetUserID)
object := fmt.Sprintf("document:%d", docID)
// role: "viewer", "editor"
return s.authorizer.WriteTuple(ctx, user, role, object)
}
// ListMyDocuments 列出用户有权限查看的所有文档
func (s *DocumentService) ListMyDocuments(ctx context.Context, userID uint) ([]Document, error) {
user := fmt.Sprintf("user:%d", userID)
// 获取用户可以查看的所有文档 ID
objects, err := s.authorizer.ListObjects(ctx, user, "viewer", "document")
if err != nil {
return nil, err
}
// objects: ["document:1", "document:2", ...]
ids := make([]int, 0, len(objects))
for _, obj := range objects {
parts := strings.Split(obj, ":")
if len(parts) == 2 {
if id, err := strconv.Atoi(parts[1]); err == nil {
ids = append(ids, id)
}
}
}
return s.repo.FindByIDs(ctx, ids)
}

配置#

configs/config.yaml

auth:
jwt:
secret: "your-secret-key"
issuer: "my-project"
expire: 86400
openfga:
api_url: "http://localhost:8080" # OpenFGA 服务地址
store_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV"

Docker Compose 启动 OpenFGA#

services:
openfga:
image: openfga/openfga:latest
command: run
environment:
- OPENFGA_DATASTORE_ENGINE=postgres
- OPENFGA_DATASTORE_URI=postgres://user:pass@postgres:5432/openfga?sslmode=disable
ports:
- "8080:8080" # HTTP API
- "8081:8081" # gRPC API
- "3000:3000" # Playground UI
depends_on:
- postgres

与 Casbin 的对比#

特性CasbinOpenFGA
部署模式嵌入式库独立服务
权限模型RBAC/ABAC/ACLReBAC (关系型)
扩展性单机水平扩展
权限继承有限支持原生支持复杂继承
适用场景简单权限控制复杂资源共享场景
学习曲线中等(主要是语法)中等

Casbincsv配置的语法学习曲线比较麻烦的同时难以调试,OpenFGA算是Cerbos的进化版,虽然是自己作为独立的服务,增加了请求的步骤以及时间,但在代码上算是彻底解耦,自带Playground调试的同时适合后续分布式扩展。

同时,解决了要维护permissionsrolesrole_permissionsuser_roles这四张经典RBAC表的问题。

当然,users表还是得有的。

认证:JWT#

定义工具组件#

internal/infrastructure/auth/jwt.go

package auth
import (
"errors"
"time"
"github.com/golang-jwt/jwt/v5"
"my-project/internal/conf"
)
// MyClaims 自定义载荷
type MyClaims struct {
52 collapsed lines
UserID uint `json:"uid"`
Role string `json:"role"` // 放入角色,给 OpenFGA 用
jwt.RegisteredClaims
}
// Tokenizer 负责签发和解析
type Tokenizer struct {
secret []byte
issuer string
expire time.Duration
}
// NewTokenizer 构造函数 (从配置中读取密钥)
func NewTokenizer(cfg *conf.Auth) *Tokenizer {
return &Tokenizer{
secret: []byte(cfg.Jwt.Secret),
issuer: cfg.Jwt.Issuer,
expire: time.Duration(cfg.Jwt.Expire) * time.Second,
}
}
// Sign 签发 Token
func (t *Tokenizer) Sign(userID uint, role string) (string, error) {
claims := MyClaims{
UserID: userID,
Role: role,
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(t.expire)), // 过期时间
IssuedAt: jwt.NewNumericDate(time.Now()), // 签发时间
Issuer: t.issuer, // 签发人
},
}
// 使用 HS256 算法进行签名
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
// 生成签名字符串
return token.SignedString(t.secret)
}
// Parse 解析 Token (给中间件用)
func (t *Tokenizer) Parse(tokenString string) (*MyClaims, error) {
token, err := jwt.ParseWithClaims(tokenString, &MyClaims{}, func(token *jwt.Token) (interface{}, error) {
return t.secret, nil
})
if err != nil {
return nil, err
}
if claims, ok := token.Claims.(*MyClaims); ok && token.Valid {
return claims, nil
}
return nil, errors.New("invalid token")
}

提供Provider:#

internal/infrastructure/auth/provider.go

package auth
import "github.com/google/wire"
var ProviderSet = wire.NewSet(NewTokenizer)

中间件#

internal/middleware/jwt.go

package middleware
import (
"context"
"net/http"
"strings"
"my-project/internal/infrastructure/auth"
"my-project/pkg/apierror"
)
type ctxKey string
34 collapsed lines
const (
CtxKeyUID ctxKey = "uid"
CtxKeyRole ctxKey = "role"
)
// JWTAuth 构造函数需要传入 Tokenizer
func JWTAuth(tokenizer *auth.Tokenizer) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
apierror.WriteError(w, apierror.NewUnauthorized("未携带Token"))
return
}
// 提取 Bearer 后面的部分
parts := strings.SplitN(authHeader, " ", 2)
if len(parts) != 2 || parts[0] != "Bearer" {
apierror.WriteError(w, apierror.NewUnauthorized("Token格式错误"))
return
}
// 调用工具解析
claims, err := tokenizer.Parse(parts[1])
if err != nil {
apierror.WriteError(w, apierror.NewUnauthorized("Token无效或已过期"))
return
}
// 注入 Context
ctx := context.WithValue(r.Context(), CtxKeyUID, claims.UserID)
ctx = context.WithValue(ctx, CtxKeyRole, claims.Role)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}

注册:#

func NewHTTPServer(
cfg *conf.Server,
tokenizer *auth.Tokenizer, // Wire 注入
authorizer *authz.Authorizer, // Wire 注入 OpenFGA
// ...
) *HTTPServer {
r := chi.NewRouter()
r.Use(middleware.JWTAuth(tokenizer))
r.Use(middleware.OpenFGAMiddleware(authorizer, middleware.DefaultResourceExtractor))
// ...
}

使用:#

假设你有一个UserService

type UserService struct {
// ... 其他依赖
tokenizer *auth.Tokenizer // 注入 Token 工具
}
func (s *UserService) Login(ctx context.Context, req LoginReq) (*LoginResp, error) {
// 校验用户和密码 (省略)...
user, _ := s.repo.FindByUsername(ctx, req.Username)
// 签发 Token
token, err := s.tokenizer.Sign(user.ID, user.Role)
if err != nil {
return nil, err
}
return &LoginResp{AccessToken: token}, nil
}

表单验证:#

Connect-Go 配合 Protobuf 使用时,推荐使用 protovalidate 进行验证。

Proto 定义验证规则#

syntax = "proto3";
package user.v1;
import "buf/validate/validate.proto";
message CreateUserRequest {
string username = 1 [(buf.validate.field).string = {
min_len: 3,
max_len: 20
}];
string email = 2 [(buf.validate.field).string.email = true];
string password = 3 [(buf.validate.field).string = {
min_len: 6,
max_len: 128
}];
string role = 4 [(buf.validate.field).string = {
in: ["user", "admin"]
}];
}

Connect-Go 拦截器#

package middleware
import (
"context"
"buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go/buf/validate"
"connectrpc.com/connect"
"github.com/bufbuild/protovalidate-go"
"google.golang.org/protobuf/proto"
"my-project/pkg/apierror"
)
func NewValidationInterceptor() connect.UnaryInterceptorFunc {
validator, _ := protovalidate.New()
return func(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
// 验证请求
if msg, ok := req.Any().(proto.Message); ok {
if err := validator.Validate(msg); err != nil {
return nil, apierror.NewValidationError(err)
}
}
return next(ctx, req)
}
}
}

对于复杂验证逻辑#

Service 层使用 Ozzo-Validation

import (
validation "github.com/go-ozzo/ozzo-validation/v4"
"github.com/go-ozzo/ozzo-validation/v4/is"
)
type ComplexForm struct {
Name string
Age int
Emails []string
Type string // "A" or "B"
ExtraData string
}
// 实现 Validatable 接口
func (f ComplexForm) Validate() error {
return validation.ValidateStruct(&f,
// 简单字段校验
validation.Field(&f.Name, validation.Required, validation.Length(5, 20)),
// 嵌套数组校验:每个 Email 必须是 email 格式
validation.Field(&f.Emails, validation.Each(is.Email)),
// 条件校验:当 Type="A" 时,ExtraData 必填
validation.Field(&f.ExtraData, validation.When(f.Type == "A", validation.Required).Else(validation.Nil)),
)
}

ORM:SQLC (Postgres + Pgx)#

SQLC编译期 SQL 代码生成器,它根据你写的 SQL 语句生成类型安全的 Go 代码。相比于 ORM,它更接近原生 SQL,性能更好。

为什么选SQLC,从我的角度出发:

  1. 回归到写SQL这种最本质的东西上,无需应付各种ORM框架的语法,后续想怎么迁移就怎么迁移。
  2. SQLORM代替本质上是因为麻烦,但现在LLM最不怕的就是麻烦,文档反正都是要写的,文档写好了它也懂怎么生成。
  3. LLM写的SQL人类可以Review,退一万步来说,初级开发者自己写的东西还不如$20一个月的Claude Code会员(

目录结构大致如下

/cmd/server/internal/
├── data/ <-- 【基础数据层】数据库模型
│ ├── db/ <-- SQLC 生成的代码
│ │ ├── db.go <-- DBTX 接口
│ │ ├── models.go <-- 生成的模型 (PO)
│ │ ├── querier.go <-- 查询接口
│ │ └── user.sql.go <-- 生成的查询实现
│ ├── migrations/ <-- SQL 迁移文件
│ │ ├── 001_init.up.sql
│ │ └── 001_init.down.sql
│ ├── queries/ <-- 原始 SQL 文件
│ │ └── user.sql
│ └── client.go <-- pgxpool 初始化
├── user/ <-- 【用户模块】
│ ├── domain.go <-- 领域实体 (DO) & 接口定义 (核心)
│ ├── service.go <-- 应用服务 (只操作 DO)
│ └── infrastructure/ <-- 基础设施实现
│ └── persistence/
│ └── user_repo.go <-- 实现 Repo 接口,负责 DO <-> PO 转换
└── order/ <-- 【其它你可能有的模块】
└── ...

SQLC 配置#

sqlc.yaml

version: "2"
sql:
- engine: "postgresql"
queries: "internal/data/queries"
schema: "internal/data/migrations"
gen:
go:
package: "db"
out: "internal/data/db"
sql_package: "pgx/v5"
emit_json_tags: true
emit_interface: true
emit_empty_slices: true

定义 SQL 查询#

internal/data/queries/user.sql

-- name: GetUserByID :one
SELECT id, username, email, password_hash, role, created_at, updated_at
FROM users
WHERE id = $1;
-- name: GetUserByUsername :one
SELECT id, username, email, password_hash, role, created_at, updated_at
FROM users
WHERE username = $1;
-- name: CreateUser :one
INSERT INTO users (username, email, password_hash, role)
18 collapsed lines
VALUES ($1, $2, $3, $4)
RETURNING id, created_at;
-- name: UpdateUser :exec
UPDATE users
SET username = $2, email = $3, updated_at = NOW()
WHERE id = $1;
-- name: DeleteUser :exec
DELETE FROM users WHERE id = $1;
-- name: ListUsers :many
SELECT id, username, email, role, created_at
FROM users
ORDER BY created_at DESC
LIMIT $1 OFFSET $2;
-- name: GetUserForUpdate :one
SELECT id, username, email, version
FROM users
WHERE id = $1
FOR UPDATE;

执行 sqlc generate 生成代码。

领域层定义#

这里定义的是业务眼中的用户,完全不知道数据库是什么。

简称为DO

package user
import "context"
// User 领域实体 (Domain Object)
// 这是一个纯粹的结构体,没有 tag (或者只有 json tag),不依赖 sqlc
type User struct {
ID int
Username string
Email string
Role string
// 这里可以定义领域行为,例如:
// func (u *User) ChangePassword(newPwd string) { ... }
}
// Repository 接口定义
// Service 层只依赖这个接口
type Repository interface {
FindByID(ctx context.Context, id int) (*User, error)
Save(ctx context.Context, user *User) (int, error)
// ... 等方法,仅作为示范
}

基建层:#

internal/user/infrastructure/persistence/user_repo.go

负责将DO转为PO,以及PO转为DO

package persistence
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"my-project/internal/data/db"
"my-project/internal/user"
"my-project/pkg/apierror"
46 collapsed lines
)
type UserRepo struct {
pool *pgxpool.Pool
queries *db.Queries
}
func NewUserRepo(pool *pgxpool.Pool) user.Repository {
return &UserRepo{
pool: pool,
queries: db.New(pool),
}
}
// toDomain 将 SQLC 的 PO 转换为 业务的 DO
func (r *UserRepo) toDomain(row db.User) *user.User {
return &user.User{
ID: int(row.ID),
Username: row.Username,
Email: row.Email,
Role: row.Role,
}
}
// FindByID 实现接口
func (r *UserRepo) FindByID(ctx context.Context, id int) (*user.User, error) {
row, err := r.queries.GetUserByID(ctx, int64(id))
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, apierror.NewNotFound("user", id)
}
return nil, err
}
return r.toDomain(row), nil
}
// Save 实现接口
func (r *UserRepo) Save(ctx context.Context, u *user.User) (int, error) {
result, err := r.queries.CreateUser(ctx, db.CreateUserParams{
Username: u.Username,
Email: u.Email,
PasswordHash: u.PasswordHash,
Role: u.Role,
})
if err != nil {
return 0, err
}
return int(result.ID), nil
}

使用:#

仅提供给Service调用user_repo.go

package user
type Service struct {
repo Repository
}
func (s *Service) GetUserInfo(ctx context.Context, id int) (*User, error) {
// s.repo.FindByID 返回的是 *User (DO),不是 sqlc 生成的类型
return s.repo.FindByID(ctx, id)
}

为什么多一层中间层:#

虽然 SQLC 生成的代码很干净,但过度依赖其生成的 Struct 穿透到 Service 甚至 Controller 层,会导致整个项目与数据库表结构强绑定。通过 Repository 返回纯净的 DO,我们可以在 Repo 内部灵活切换实现(加缓存、换数据库),而 Service 层完全无感知。

对于DTO<->DO,推荐在DTO内部完成:

在Go里面,Function做不了重载,但是对于Method来说,因为挂载的对象不同,自然可以使用同名。

虽然有 struct copy 的开销,但在 Go 中这种内存操作极快(纳秒级),相比于 DB I/O(毫秒级)可以忽略不计。

换来的是业务逻辑与底层实现的彻底解耦,这在长期维护中是非常划算的。

package dto
import "my-project/internal/user" // 引用DO实体所在的包
type RegisterReq struct {
Username string
Password string
}
// ToDomain 将 RegisterReq 转为 User 实体
func (r *RegisterReq) ToDomain() *user.User {
return &user.User{
Username: r.Username,
// 这里还可以做一些简单的非业务逻辑处理
Password: r.Password,
}
}

事务:#

只是这样的话,并不足以在生产环境中使用,没有事务没有锁的数据库操作只能算个玩具。

定义:#

我们来做一个通用的事务管理器internal/data/tm.go

package data
import (
"context"
"errors"
"log/slog"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
68 collapsed lines
"my-project/internal/data/db"
)
// 定义 Context Key,私有类型防止冲突
type txKey struct{}
// GetQueriesFromContext 供 Repo 层提取 Queries
// 如果 Context 里有事务,返回事务内的 Queries;否则返回 nil。
func GetQueriesFromContext(ctx context.Context) *db.Queries {
if q, ok := ctx.Value(txKey{}).(*db.Queries); ok {
return q
}
return nil
}
// TransactionManager 接口
type TransactionManager interface {
Exec(ctx context.Context, fn func(ctx context.Context) error) error
}
// PgxTransaction 具体实现
type PgxTransaction struct {
pool *pgxpool.Pool
}
func NewTransactionManager(pool *pgxpool.Pool) TransactionManager {
return &PgxTransaction{pool: pool}
}
// Exec 执行事务
func (tm *PgxTransaction) Exec(ctx context.Context, fn func(ctx context.Context) error) error {
txID := uuid.New().String()
log := slog.With("tx_id", txID, "component", "tx_manager")
start := time.Now()
// 开启事务
tx, err := tm.pool.Begin(ctx)
if err != nil {
log.ErrorContext(ctx, "Tx: Begin Failed", "err", err)
return err
}
log.DebugContext(ctx, "Tx: Begin")
// 创建事务内的 Queries
queries := db.New(tx)
ctxWithTx := context.WithValue(ctx, txKey{}, queries)
defer func() {
if p := recover(); p != nil {
_ = tx.Rollback(ctx)
panic(p)
}
}()
// 执行业务逻辑 (传入带有 tx 的 context)
if err := fn(ctxWithTx); err != nil {
if rerr := tx.Rollback(ctx); rerr != nil {
log.ErrorContext(ctx, "Tx: Rollback Failed", "rollback_err", rerr, "origin_err", err)
}
return err
}
// 提交
if err := tx.Commit(ctx); err != nil {
return err
}
log.DebugContext(ctx, "Tx: Committed", "duration", time.Since(start))
return nil
}

这个时候,刚刚的user_repo.go就要多实现一个方法getQueries,来判断到底是普通queries还是事务queries

package persistence
import (
"my-project/internal/data"
"my-project/internal/data/db"
)
type UserRepo struct {
pool *pgxpool.Pool
queries *db.Queries
}
57 collapsed lines
func NewUserRepo(pool *pgxpool.Pool) user.Repository {
return &UserRepo{
pool: pool,
queries: db.New(pool),
}
}
func (r *UserRepo) getQueries(ctx context.Context) *db.Queries {
// 尝试从 Context 中获取事务 Queries
if q := data.GetQueriesFromContext(ctx); q != nil {
return q
}
// 如果没有事务,返回基础 Queries
return r.queries
}
// 使用示例
func (r *UserRepo) Save(ctx context.Context, u *user.User) (int, error) {
// 使用 getQueries(ctx) 替代 r.queries
// 如果 Service 层开启了事务,这里的 Create 就是在事务中执行的
result, err := r.getQueries(ctx).CreateUser(ctx, db.CreateUserParams{
Username: u.Username,
Email: u.Email,
PasswordHash: u.PasswordHash,
Role: u.Role,
})
if err != nil {
return 0, err
}
return int(result.ID), nil
}
// 悲观锁例子
func (r *UserRepo) FindByIDForUpdate(ctx context.Context, id int) (*user.User, error) {
// 使用 FOR UPDATE 查询(需要在 SQL 中定义)
row, err := r.getQueries(ctx).GetUserForUpdate(ctx, int64(id))
if err != nil {
return nil, err
}
return r.toDomain(row), nil
}
// 乐观锁CAS例子 (需要表有 version 字段)
func (r *UserRepo) UpdateWithVersion(ctx context.Context, u *user.User) error {
// SQL: UPDATE users SET name=$2, version=version+1 WHERE id=$1 AND version=$3
affected, err := r.getQueries(ctx).UpdateUserWithVersion(ctx, db.UpdateUserWithVersionParams{
ID: int64(u.ID),
Name: u.Username,
Version: u.Version,
})
if err != nil {
return err
}
if affected == 0 {
return apierror.NewConflict("数据已被修改,请刷新重试")
}
return nil
}

对于service来说则是完全不关心user_repo.go里面做了什么,只需要调用事务即可:

type UserService struct {
tm data.TransactionManager // 注入事务管理器
userRepo user.Repository
logRepo log.Repository
}
func (s *UserService) Register(ctx context.Context, req Req) error {
// 开启事务
return s.tm.Exec(ctx, func(ctx context.Context) error {
// 创建用户 (传递 ctx)
if err := s.userRepo.Save(ctx, user); err != nil {
return err // 返回 error 会触发 Rollback
}
// 记录操作日志 (传递 ctx)
// 因为 ctx 里带了 tx,所以 userRepo 和 logRepo 实际上是在同一个 db 事务里
if err := s.logRepo.Add(ctx, "user created"); err != nil {
return err
}
return nil // 返回 nil 触发 Commit
})
}

报错统一管理:RFC 7807 + 自定义扩展#

RFC 7807 是 IETF 定义的 Problem Details for HTTP APIs 标准,它规定了一个统一的错误响应格式。我们在此基础上扩展自定义的 codetrace_id

定义错误结构体#

pkg/apierror/error.go

package apierror
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"go.opentelemetry.io/otel/trace"
)
59 collapsed lines
// ProblemDetail RFC 7807 + 自定义扩展
type ProblemDetail struct {
// RFC 7807 标准字段
Type string `json:"type"` // 错误类型 URI
Title string `json:"title"` // 简短标题
Status int `json:"status"` // HTTP 状态码
Detail string `json:"detail,omitempty"` // 详细描述
Instance string `json:"instance,omitempty"` // 请求路径
// 自定义扩展字段
Code string `json:"code"` // 业务错误码
TraceID string `json:"trace_id,omitempty"` // 链路追踪 ID
// 验证错误详情
Errors []FieldError `json:"errors,omitempty"`
// 额外字段 (用于传递给前端的参数)
Extensions map[string]any `json:"extensions,omitempty"`
// 内部字段 (不序列化)
raw error `json:"-"`
}
// FieldError 字段级错误
type FieldError struct {
Field string `json:"field"`
Code string `json:"code"`
Message string `json:"message"`
}
// Error 实现 error 接口
func (p *ProblemDetail) Error() string {
return p.Detail
}
// Unwrap 支持 errors.Is/As
func (p *ProblemDetail) Unwrap() error {
return p.raw
}
// WithRaw 携带原始错误
func (p *ProblemDetail) WithRaw(err error) *ProblemDetail {
p.raw = err
return p
}
// WithExtension 添加扩展字段
func (p *ProblemDetail) WithExtension(key string, value any) *ProblemDetail {
if p.Extensions == nil {
p.Extensions = make(map[string]any)
}
p.Extensions[key] = value
return p
}
// WithTraceID 从 context 注入 TraceID
func (p *ProblemDetail) WithTraceID(ctx context.Context) *ProblemDetail {
span := trace.SpanFromContext(ctx)
if span.SpanContext().IsValid() {
p.TraceID = span.SpanContext().TraceID().String()
}
return p
}

错误码定义#

pkg/apierror/codes.go

package apierror
// 通用错误码
const (
CodeSuccess = "SUCCESS"
CodeInternalError = "INTERNAL_ERROR"
CodeInvalidParams = "INVALID_PARAMS"
CodeUnauthorized = "UNAUTHORIZED"
CodeForbidden = "FORBIDDEN"
CodeNotFound = "NOT_FOUND"
CodeConflict = "CONFLICT"
CodeTooManyRequests = "TOO_MANY_REQUESTS"
)
// 用户模块错误码
const (
CodeUserNotFound = "USER_NOT_FOUND"
CodeInvalidCredentials = "INVALID_CREDENTIALS"
CodeUsernameExists = "USERNAME_EXISTS"
CodeUserBanned = "USER_BANNED"
)
// 数据库错误码
const (
CodeDBRecordNotFound = "DB_RECORD_NOT_FOUND"
CodeDBDuplicateEntry = "DB_DUPLICATE_ENTRY"
CodeDBDeadlock = "DB_DEADLOCK"
CodeDBLockTimeout = "DB_LOCK_TIMEOUT"
)

构造函数#

pkg/apierror/factory.go

package apierror
import "fmt"
const typeBase = "https://api.myproject.com/errors"
// NewNotFound 404 错误
func NewNotFound(entity string, id any) *ProblemDetail {
return &ProblemDetail{
Type: typeBase + "/not-found",
Title: "Resource Not Found",
Status: 404,
68 collapsed lines
Detail: fmt.Sprintf("%s with id '%v' not found", entity, id),
Code: CodeNotFound,
}
}
// NewUnauthorized 401 错误
func NewUnauthorized(detail string) *ProblemDetail {
return &ProblemDetail{
Type: typeBase + "/unauthorized",
Title: "Unauthorized",
Status: 401,
Detail: detail,
Code: CodeUnauthorized,
}
}
// NewForbidden 403 错误
func NewForbidden(detail string) *ProblemDetail {
return &ProblemDetail{
Type: typeBase + "/forbidden",
Title: "Forbidden",
Status: 403,
Detail: detail,
Code: CodeForbidden,
}
}
// NewBadRequest 400 错误
func NewBadRequest(code, detail string) *ProblemDetail {
return &ProblemDetail{
Type: typeBase + "/bad-request",
Title: "Bad Request",
Status: 400,
Detail: detail,
Code: code,
}
}
// NewConflict 409 错误
func NewConflict(detail string) *ProblemDetail {
return &ProblemDetail{
Type: typeBase + "/conflict",
Title: "Conflict",
Status: 409,
Detail: detail,
Code: CodeConflict,
}
}
// NewValidationError 验证错误
func NewValidationError(errs ...FieldError) *ProblemDetail {
return &ProblemDetail{
Type: typeBase + "/validation-error",
Title: "Validation Error",
Status: 400,
Detail: "一个或多个参数校验失败",
Code: CodeInvalidParams,
Errors: errs,
}
}
// NewInternal 500 错误 (隐藏内部细节)
func NewInternal(raw error) *ProblemDetail {
return &ProblemDetail{
Type: typeBase + "/internal-error",
Title: "Internal Server Error",
Status: 500,
Detail: "服务器开小差了,请稍后重试",
Code: CodeInternalError,
raw: raw,
}
}

错误翻译器(Postgres)#

pkg/apierror/translator.go

package apierror
import (
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)
// TranslatePgError 将 Postgres 错误翻译为 ProblemDetail
func TranslatePgError(err error) *ProblemDetail {
if err == nil {
57 collapsed lines
return nil
}
// 记录不存在
if errors.Is(err, pgx.ErrNoRows) {
return NewNotFound("record", "unknown").WithRaw(err)
}
// Postgres 驱动错误
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
switch pgErr.Code {
case "23505": // unique_violation
return &ProblemDetail{
Type: typeBase + "/duplicate-entry",
Title: "Duplicate Entry",
Status: 409,
Detail: "数据已存在",
Code: CodeDBDuplicateEntry,
}.WithExtension("constraint", pgErr.ConstraintName).WithRaw(err)
case "23503": // foreign_key_violation
return NewBadRequest(CodeInvalidParams, "关联数据不存在或被占用").
WithExtension("detail", pgErr.Detail).
WithRaw(err)
case "40001": // serialization_failure
return NewConflict("数据冲突,请重试").
WithExtension("reason", "serialization_failure").
WithRaw(err)
case "40P01": // deadlock_detected
return &ProblemDetail{
Type: typeBase + "/deadlock",
Title: "Database Deadlock",
Status: 503,
Detail: "系统繁忙,请稍后重试",
Code: CodeDBDeadlock,
}.WithExtension("reason", "deadlock").WithRaw(err)
}
}
// 未知错误
return NewInternal(err)
}
// IsLockError 判断是否是锁相关错误
func IsLockError(err error) (bool, string) {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
switch pgErr.Code {
case "40001":
return true, "SERIALIZATION_FAILURE"
case "40P01":
return true, "DEADLOCK"
case "55P03":
return true, "LOCK_NOT_AVAILABLE"
}
}
return false, ""
}

HTTP 响应工具#

pkg/apierror/response.go

package apierror
import (
"encoding/json"
"log/slog"
"net/http"
)
// WriteError 写入错误响应
func WriteError(w http.ResponseWriter, p *ProblemDetail) {
w.Header().Set("Content-Type", "application/problem+json")
w.WriteHeader(p.Status)
json.NewEncoder(w).Encode(p)
}
// WriteJSON 写入成功响应
func WriteJSON(w http.ResponseWriter, status int, data any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(data)
}

Connect-Go 错误拦截器#

internal/middleware/error_interceptor.go

package middleware
import (
"context"
"errors"
"log/slog"
"connectrpc.com/connect"
"google.golang.org/protobuf/encoding/protojson"
"my-project/pkg/apierror"
)
72 collapsed lines
func NewErrorInterceptor() connect.UnaryInterceptorFunc {
return func(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
resp, err := next(ctx, req)
if err == nil {
return resp, nil
}
// 尝试断言为 ProblemDetail
var problem *apierror.ProblemDetail
if errors.As(err, &problem) {
// 注入 TraceID
problem.WithTraceID(ctx)
// 记录日志
logError(ctx, problem)
// 转换为 Connect 错误
return nil, toConnectError(problem)
}
// 未知错误,包装为内部错误
internal := apierror.NewInternal(err).WithTraceID(ctx)
logError(ctx, internal)
return nil, toConnectError(internal)
}
}
}
func logError(ctx context.Context, p *apierror.ProblemDetail) {
attrs := []any{
slog.String("code", p.Code),
slog.Int("status", p.Status),
slog.String("trace_id", p.TraceID),
}
if p.raw != nil {
attrs = append(attrs, slog.Any("err", p.raw))
slog.ErrorContext(ctx, p.Detail, attrs...)
} else {
slog.WarnContext(ctx, p.Detail, attrs...)
}
}
func toConnectError(p *apierror.ProblemDetail) *connect.Error {
code := httpToConnectCode(p.Status)
cerr := connect.NewError(code, errors.New(p.Detail))
// 将 ProblemDetail 作为详情附加
detail, _ := connect.NewErrorDetail(p)
cerr.AddDetail(detail)
return cerr
}
func httpToConnectCode(status int) connect.Code {
switch status {
case 400:
return connect.CodeInvalidArgument
case 401:
return connect.CodeUnauthenticated
case 403:
return connect.CodePermissionDenied
case 404:
return connect.CodeNotFound
case 409:
return connect.CodeAlreadyExists
case 429:
return connect.CodeResourceExhausted
case 503:
return connect.CodeUnavailable
default:
return connect.CodeInternal
}
}

Recovery 中间件#

internal/middleware/recovery.go

package middleware
import (
"encoding/json"
"log/slog"
"net"
"net/http"
"os"
"runtime/debug"
"strings"
"my-project/pkg/apierror"
38 collapsed lines
)
func Recoverer(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err != nil {
ctx := r.Context()
// 判断是否是网络连接中断
var brokenPipe bool
if ne, ok := err.(*net.OpError); ok {
if se, ok := ne.Err.(*os.SyscallError); ok {
errStr := strings.ToLower(se.Error())
if strings.Contains(errStr, "broken pipe") ||
strings.Contains(errStr, "connection reset by peer") {
brokenPipe = true
}
}
}
// 记录日志
slog.ErrorContext(ctx, "Panic!",
slog.Any("error", err),
slog.String("stack", string(debug.Stack())),
slog.Bool("broken_pipe", brokenPipe),
)
if brokenPipe {
// 连接已断开,无法写入响应
return
}
// 返回 RFC 7807 格式错误
problem := apierror.NewInternal(nil).WithTraceID(ctx)
w.Header().Set("Content-Type", "application/problem+json")
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(problem)
}
}()
next.ServeHTTP(w, r)
})
}

使用示例#

Repo 层#
func (r *UserRepo) FindByID(ctx context.Context, id int) (*user.User, error) {
row, err := r.getQueries(ctx).GetUserByID(ctx, int64(id))
if err != nil {
// 翻译数据库错误
return nil, apierror.TranslatePgError(err).
WithExtension("entity", "user").
WithExtension("id", id)
}
return r.toDomain(row), nil
}
Service 层#
func (s *UserService) Get(ctx context.Context, id int) (*User, error) {
user, err := s.repo.FindByID(ctx, id)
if err != nil {
return nil, err // 直接返回已翻译的错误
}
if user.Banned {
return nil, apierror.NewForbidden("用户已被封禁").
WithExtension("user_id", id)
}
return user, nil
}
响应示例#
{
"type": "https://api.myproject.com/errors/not-found",
"title": "Resource Not Found",
"status": 404,
"detail": "user with id '123' not found",
"code": "NOT_FOUND",
"trace_id": "a1b2c3d4e5f6789",
"extensions": {
"entity": "user",
"id": 123
}
}

测试#

核心库: testing(标准库)

不需要引入任何依赖,只需要创建一个以 _test.go 结尾的文件,并写一个 Test 开头的函数即可。

核心思想:表格驱动测试

calc_test.go
package calc
import "testing"
func TestAdd(t *testing.T) {
// 定义表格 (测试用例集合)
tests := []struct {
name string // 用例名称
a, b int // 输入
want int // 期望输出
}{
{"正数相加", 1, 2, 3},
{"负数相加", -1, -2, -3},
{"零值测试", 0, 0, 0},
}
// 循环执行
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := Add(tt.a, tt.b); got != tt.want {
t.Errorf("Add() = %v, want %v", got, tt.want)
}
})
}
}

断言库:Testify#

类似于AssertJHamcrest,它提供了 assert 和 require

import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSomething(t *testing.T) {
// 类似 Java 的 Assertions
assert.Equal(t, 123, 123, "它们应该相等")
assert.NotNil(t, object)
// require 会在失败时立即停止测试(类似 JUnit 的 assert)
// 而 assert 失败后还会继续执行后续代码
// require.NoError(t, err)
}

Mock框架:stretchr/testify + vektra/mockery#

stretchr/testify

vektra/mockery

定义一个interface

interface UserRepo {
Get(id int)
User
}

执行:mockery --name UserRepo

测试:

func TestService(t *testing.T) {
// 创建 Mock 对象
mockRepo := mocks.NewUserRepo(t)
// 像 Mockito 一样打桩
mockRepo.On("Get", 1).Return(User{Name: "Go"}, nil)
// 调用
service := NewService(mockRepo)
// ...
// 验证调用
mockRepo.AssertExpectations(t)
}

数据库测试#

不要Mock,使用Testcontainers起一个docker容器跑真实数据库。

func TestUserRepo_Create(t *testing.T) {
// 1. 使用 Testcontainers 启动一个 Docker Postgres
// 2. 连接该 DB
// 3. 运行迁移脚本建表
// 4. repo.Create(user)
// 5. check db result
}

任务与构建:Taskfile#

TaskfileMakefile 的现代替代品,使用 YAML 语法,更加清晰易读。

Taskfile.yaml

version: "3"
vars:
VERSION:
sh: git describe --tags --always || echo "dev"
COMMIT:
sh: git rev-parse --short HEAD
BUILD_TIME:
sh: date +%FT%T%z
tasks:
default:
113 collapsed lines
desc: "显示帮助信息"
cmds:
- task --list
# 开发相关
dev:
desc: "启动开发服务器"
cmds:
- go run ./cmd/server
dev:worker:
desc: "启动 Worker"
cmds:
- go run ./cmd/worker
# 代码生成
generate:
desc: "生成所有代码 (Wire, SQLC, Buf)"
cmds:
- task: generate:wire
- task: generate:sqlc
- task: generate:buf
generate:wire:
desc: "生成 Wire 依赖注入代码"
cmds:
- wire ./cmd/server
- wire ./cmd/worker
generate:sqlc:
desc: "生成 SQLC 代码"
cmds:
- sqlc generate
generate:buf:
desc: "生成 Protobuf 代码"
cmds:
- buf generate
# 构建相关
build:
desc: "构建所有二进制"
cmds:
- task: build:server
- task: build:worker
build:server:
desc: "构建 Server"
cmds:
- |
go build -ldflags "-X main.Version={{.VERSION}} -X main.CommitHash={{.COMMIT}} -X main.BuildTime={{.BUILD_TIME}}" \
-o bin/server ./cmd/server
build:worker:
desc: "构建 Worker"
cmds:
- |
go build -ldflags "-X main.Version={{.VERSION}} -X main.CommitHash={{.COMMIT}} -X main.BuildTime={{.BUILD_TIME}}" \
-o bin/worker ./cmd/worker
# 测试相关
test:
desc: "运行所有测试"
cmds:
- go test -v -race -cover ./...
test:coverage:
desc: "生成测试覆盖率报告"
cmds:
- go test -coverprofile=coverage.out ./...
- go tool cover -html=coverage.out -o coverage.html
# 代码质量
lint:
desc: "运行代码检查"
cmds:
- golangci-lint run ./...
fmt:
desc: "格式化代码"
cmds:
- go fmt ./...
- goimports -w .
# 数据库迁移
migrate:up:
desc: "执行数据库迁移"
cmds:
- go run ./cmd/migration up
migrate:down:
desc: "回滚数据库迁移"
cmds:
- go run ./cmd/migration down
migrate:create:
desc: "创建新的迁移文件"
cmds:
- migrate create -ext sql -dir internal/data/migrations -seq {{.CLI_ARGS}}
# Docker 相关
docker:build:
desc: "构建 Docker 镜像"
cmds:
- docker build -t my-project:{{.VERSION}} .
docker:push:
desc: "推送 Docker 镜像"
cmds:
- docker push my-project:{{.VERSION}}
# 清理
clean:
desc: "清理构建产物"
cmds:
- rm -rf bin/
- rm -rf coverage.*

使用:

Terminal window
# 查看所有任务
task
# 运行开发服务器
task dev
# 生成代码
task generate
# 构建
task build
# 测试
task test

Docker 镜像:Distroless#

Distroless 镜像只包含应用运行时必需的最小依赖,没有 shell、包管理器等,大大减少了攻击面。

Dockerfile

# ============================================
# Stage 1: 构建阶段
# ============================================
FROM golang:1.23-alpine AS builder
# 安装构建依赖
RUN apk add --no-cache git ca-certificates tzdata
WORKDIR /app
# 先复制依赖文件,利用 Docker 缓存
COPY go.mod go.sum ./
47 collapsed lines
RUN go mod download
# 复制源代码
COPY . .
# 构建参数
ARG VERSION=dev
ARG COMMIT=unknown
ARG BUILD_TIME=unknown
# 编译 (CGO_ENABLED=0 确保静态编译)
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
-ldflags "-w -s -X main.Version=${VERSION} -X main.CommitHash=${COMMIT} -X main.BuildTime=${BUILD_TIME}" \
-o /app/server ./cmd/server
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
-ldflags "-w -s -X main.Version=${VERSION} -X main.CommitHash=${COMMIT} -X main.BuildTime=${BUILD_TIME}" \
-o /app/worker ./cmd/worker
# ============================================
# Stage 2: 运行阶段 (Distroless)
# ============================================
FROM gcr.io/distroless/static-debian12:nonroot
# 复制时区数据
COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo
# 复制 CA 证书 (HTTPS 请求需要)
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# 复制二进制文件
COPY --from=builder /app/server /server
COPY --from=builder /app/worker /worker
# 复制配置文件目录
COPY --from=builder /app/configs /configs
# 设置时区
ENV TZ=Asia/Shanghai
# 非 root 用户运行 (Distroless 默认使用 nonroot 用户)
USER nonroot:nonroot
# 暴露端口
EXPOSE 8000 9000
# 健康检查 (Distroless 没有 curl,使用自定义健康检查端点)
# HEALTHCHECK 需要在 docker-compose 或 k8s 中配置
# 默认启动 server
ENTRYPOINT ["/server"]

docker-compose.yml

version: "3.8"
services:
server:
build:
context: .
args:
VERSION: ${VERSION:-dev}
COMMIT: ${COMMIT:-unknown}
BUILD_TIME: ${BUILD_TIME:-unknown}
image: my-project:${VERSION:-latest}
ports:
43 collapsed lines
- "8000:8000"
- "9000:9000"
environment:
- APP_DATA_DATABASE_SOURCE=postgres://user:pass@postgres:5432/mydb?sslmode=disable
- APP_DATA_REDIS_ADDR=redis:6379
depends_on:
- postgres
- redis
restart: unless-stopped
worker:
image: my-project:${VERSION:-latest}
entrypoint: ["/worker"]
environment:
- APP_DATA_DATABASE_SOURCE=postgres://user:pass@postgres:5432/mydb?sslmode=disable
- APP_DATA_REDIS_ADDR=redis:6379
depends_on:
- postgres
- redis
restart: unless-stopped
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: mydb
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
redis:
image: redis:7-alpine
ports:
- "6379:6379"
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
- "4318:4318"
environment:
- LOG_LEVEL=debug
volumes:
postgres_data:

异步任务队列:Asynq#

github.com/hibiken/asynq

简单以user模块举例

/internal/
├── user/
│ ├── job/ <-- 这里放 Consumer (消费者逻辑)
│ │ ├── handler.go <-- 类似于 HTTP 的 handler.go
│ │ └── payload.go <-- 定义任务参数结构体、Task Type 常量
│ ├── service.go
│ └── provider.go <-- Wire Provider
└── server/
├── http.go <-- Chi + Connect-Go Server
└── worker.go <-- Asynq Server (集中组装)

(异步任务部分与原文类似,此处省略重复内容)

代码质量:GolangCI-Lint#

.golangci.yaml

示例如下,按照实际需求调整:

run:
timeout: 5m
skip-dirs:
- internal/data/db # 忽略 SQLC 生成代码
- gen # 忽略 Protobuf 生成代码
linters:
enable:
- errcheck
- gosimple
- govet
- ineffassign
- staticcheck
- typecheck
- unused
- goconst # 检查重复字符串
- gocyclo # 检查圈复杂度 (防止函数过长)
linters-settings:
gocyclo:
min-complexity: 15

目录总结#

参考:golang-standards/project-layout

/my-project
├── cmd/ <-- 【入口层】所有可执行程序的入口
│ ├── server/ <-- HTTP/gRPC API 服务入口
│ │ ├── main.go <-- 负责加载配置、初始化 Log、启动 App
│ │ ├── wire.go <-- Wire 依赖定义 (HTTP)
│ │ └── wire_gen.go <-- Wire 自动生成代码
│ │
│ ├── worker/ <-- 异步任务 Worker 入口
│ │ ├── main.go <-- 负责启动 Asynq Server
│ │ ├── wire.go <-- Wire 依赖定义 (Worker)
│ │ └── wire_gen.go <-- Wire 自动生成代码
│ │
86 collapsed lines
│ └── migration/ <-- 数据库迁移工具入口
│ └── main.go
├── configs/ <-- 【配置层】
│ ├── config.yaml <-- 本地开发配置文件
│ └── fga-model.dsl <-- OpenFGA 授权模型
├── deploy/ <-- 【部署层】
│ ├── Dockerfile <-- 多阶段 Distroless 构建
│ └── docker-compose.yml <-- 本地环境编排 (Postgres, Redis, Jaeger)
├── gen/ <-- 【生成代码层】(Buf 生成的 Protobuf)
│ └── user/v1/
│ ├── user.pb.go
│ └── userv1connect/
├── proto/ <-- 【Protobuf 定义】
│ └── user/v1/
│ └── user.proto
├── internal/ <-- 业务层
│ │
│ ├── conf/ <-- 配置定义
│ │ ├── conf.go <-- Viper 加载逻辑
│ │ └── provider.go <-- Wire Provider
│ │
│ ├── data/ <-- 数据层
│ │ ├── db/ <-- SQLC 自动生成的代码
│ │ │ ├── db.go
│ │ │ ├── models.go
│ │ │ └── user.sql.go
│ │ ├── migrations/ <-- SQL 迁移文件
│ │ ├── queries/ <-- 原始 SQL 查询
│ │ ├── client.go <-- pgxpool 初始化
│ │ ├── tm.go <-- Transaction Manager
│ │ └── provider.go <-- Wire Provider
│ │
│ ├── infrastructure/ <-- 【基建】(非业务逻辑)
│ │ └── auth/ <-- 认证工具
│ │ ├── jwt.go
│ │ └── provider.go
│ │
│ ├── middleware/ <-- 【中间件】
│ │ ├── jwt.go <-- 认证
│ │ ├── authz.go <-- 鉴权 (OpenFGA)
│ │ ├── logger.go <-- HTTP 请求日志
│ │ ├── recovery.go <-- 全局 Panic 兜底
│ │ ├── error_interceptor.go <-- Connect-Go 错误拦截器
│ │ └── validation.go <-- 参数校验
│ │
│ ├── server/ <-- 【Server 组装层】
│ │ ├── http.go <-- Chi + Connect-Go
│ │ └── worker.go <-- Asynq Server
│ │
│ └── user/ <-- 【业务模块:用户(举例)】(垂直切片)
│ ├── domain.go <-- 领域实体 (DO) & Repo 接口定义
│ ├── service.go <-- 应用服务
│ ├── handler.go <-- Connect-Go Handler
│ ├── provider.go <-- Wire ProviderSet
│ │
│ ├── job/ <-- 异步任务相关
│ │ ├── handler.go
│ │ └── payload.go
│ │
│ └── infrastructure/ <-- 模块内的基础设施实现
│ └── persistence/
│ └── user_repo.go <-- 实现 Repo 接口 (调用 SQLC)
├── pkg/ <-- 【公共库层】(可被外部项目引用)
│ ├── apierror/ <-- RFC 7807 错误处理
│ │ ├── error.go <-- ProblemDetail 结构体
│ │ ├── codes.go <-- 错误码定义
│ │ ├── factory.go <-- 构造函数
│ │ ├── translator.go <-- 数据库错误翻译
│ │ └── response.go <-- HTTP 响应工具
│ │
│ └── logger/ <-- 日志库封装
│ ├── zap.go
│ └── trace_handler.go
├── devbox.json <-- Devbox 开发环境配置
├── devbox.lock
├── buf.yaml <-- Buf 配置
├── buf.gen.yaml <-- Buf 代码生成配置
├── sqlc.yaml <-- SQLC 配置
├── Taskfile.yaml <-- Taskfile (替代 Makefile)
├── .golangci.yaml <-- GolangCI-Lint 配置
├── go.mod
├── go.sum
└── README.md

入口文件优雅中断服务#

package main
import (
"context"
"errors"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
43 collapsed lines
"my-project/internal/conf"
// ... 其他导入
)
func main() {
// 加载配置
cfg := conf.Load()
// 初始化 App (Wire注入)
// 注意:这里的 cleanup 通常包含关闭 DB、Redis、Trace 的逻辑
app, cleanup, err := initApp(cfg)
if err != nil {
panic(err)
}
// 这里的 cleanup 必须最后执行(LIFO原则):
// 先停 HTTP 服务 -> 再断开 DB 连接
defer cleanup()
// 启动 HTTP 服务 (在 Goroutine 中运行)
go func() {
slog.Info("Server is running", "addr", cfg.Server.Http.Addr)
if err := app.HTTPServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
slog.Error("HTTP Server listen failed", "err", err)
panic(err)
}
}()
// 监听系统信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
// 阻塞在这里,直到收到信号
<-quit
slog.Info("Shutting down server...")
// 执行优雅停机
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := app.HTTPServer.Shutdown(ctx); err != nil {
slog.Error("Server forced to shutdown", "err", err)
} else {
slog.Info("Server exited properly")
}
// 函数结束,触发最上面的 defer cleanup()
}
Golang生产向脚手架搭建(基于DDD、SQLC、Connect-Go)
https://blog.astro777.cfd/posts/guide/go-project-infrastructure/
作者
ASTRO
发布于
2026-02-04
许可协议
CC BY-NC-SA 4.0