本文思路为DDD驱动开发,项目结构为按模块划分的垂直结构。
技术栈速览
| 领域 | 技术选型 |
|---|---|
| 依赖注入 | Wire (编译期) |
| 路由/RPC | Chi + Connect-Go |
| 数据库 | SQLC + Pgx (Postgres) |
| 权限 | OpenFGA (ReBAC) |
| 认证 | JWT |
| 日志/追踪 | Zap + OpenTelemetry |
| 异步队列 | Asynq (Redis) |
| 构建工具 | Taskfile |
| 开发环境 | Devbox (Nix) |
| 容器镜像 | Distroless |
| 错误处理 | RFC 7807 + 自定义扩展 |
依赖注入:Wire
Wire 是编译期依赖注入,相比于 Uber Dig 等运行时注入框架,它如果依赖缺失,代码是编译不过的。这是它最大的优势:Fail Fast。
/cmd/server/internal/├── user/ <-- 所有和"用户"相关的代码都在这里│ ├── user.go <-- 领域实体 (Entity), 接口定义 (Repo Interface)│ ├── service.go <-- 应用服务 (Application Service)│ ├── pg_repo.go <-- 基础设施实现 (SQLC Repo)│ ├── handler.go <-- 接口适配器 (Connect-Go Handler)│ └── provider.go <-- 用户模块自己的 ProviderSet│├── order/ <-- 所有和"订单"相关的代码...│ ├── order.go│ ├── service.go│ └── provider.go <-- 订单模块自己的 ProviderSet│└── pkg/ <-- 跨模块共享的公共包 (DB连接, Logger) ├── db/ └── logger/
/cmd/server/wire.go <-- 管理最终的依赖注入实际上它可以称得上是整个应用除了main.go之外的第二个入口。
以用户模块举例
provider.go:
package user
import "github.com/google/wire"
// ProviderSet 包含了 user 模块所有的依赖var ProviderSet = wire.NewSet( // 提供应用层(仅示例,未给出具体代码) NewUserService,
// 提供数据层(仅示例,未给出具体代码) NewUserRepo,
// 绑定接口与实现 (这是在模块内部完成的,外部不知道细节) // 告诉 Wire: 当 NewUserService 需要 UserRepository 接口时,使用 UserPgRepo 这个 struct 来填充。 wire.Bind(new(UserRepository), new(*UserRepo)),
// 提供接口层 Handler NewUserHandler,)在wire.go中注册:
//go:build wireinject// +build wireinject// 上面这两行注释必不可少,用于wire识别。
package main
import ( "github.com/google/wire" "my-project/internal/user" // 导入 user 模块 "my-project/internal/pkg/db" // 导入共享的 DB 连接)
// initializeApp 注入器,拼积木func initializeApp() (*App, func(), error) { panic(wire.Build( // 一些共享的基础设施 conf.ProviderSet, logger.ProviderSet, db.ProviderSet, ... // 业务模块 user.ProviderSet, ... // App 的构造函数 NewApp, ))}开发环境:Devbox
Devbox 是基于 Nix 的开发环境管理工具,它能确保团队所有成员使用完全一致的开发环境,告别 “在我机器上能跑” 的问题。
初始化
# 安装 Devboxcurl -fsSL https://get.jetify.com/devbox | bash
# 在项目根目录初始化devbox init定义开发环境 devbox.json
{ "$schema": "https://raw.githubusercontent.com/jetify-com/devbox/main/.schema/devbox.schema.json", "packages": [ "go@1.23", "postgresql@16", "redis@7", "protobuf@28", "buf@latest", "sqlc@latest", "go-task@latest", "golangci-lint@latest" ], "shell": { "init_hook": [ "echo 'Welcome to my-project dev environment!'", "export GOPATH=$HOME/go", "export PATH=$PATH:$GOPATH/bin" ], "scripts": { "dev": "task dev", "test": "task test", "generate": "task generate" } }}使用
# 进入开发环境devbox shell
# 或者直接运行脚本devbox run devdevbox run test配置管理:Viper
定义配置文件: configs/config.yaml
通常我们需要这么一份类似于SpringBoot的appilication.yml,做一个通用的配置:
server: http: addr: :8000 timeout: 1s grpc: addr: :9000
data: database: driver: postgres source: postgres://user:pass@localhost:5432/mydb?sslmode=disable redis: addr: 127.0.0.1:6379 password: ""
log: level: debug filename: "app.log"定义与加载: internal/conf/conf.go
package conf
import ( "github.com/spf13/viper" "strings")
// Bootstrap 是所有配置的根节点type Bootstrap struct { Server *Server `mapstructure:"server"` Data *Data `mapstructure:"data"` Log *Log `mapstructure:"log"`63 collapsed lines
}
type Server struct { Http *Http `mapstructure:"http"` Grpc *Grpc `mapstructure:"grpc"`}
type Http struct { Addr string `mapstructure:"addr"` Timeout string `mapstructure:"timeout"`}
type Grpc struct { Addr string `mapstructure:"addr"`}
type Data struct { Database *Database `mapstructure:"database"` Redis *Redis `mapstructure:"redis"`}
type Database struct { Driver string `mapstructure:"driver"` Source string `mapstructure:"source"`}
type Redis struct { Addr string `mapstructure:"addr"` Password string `mapstructure:"password"`}
type Log struct { Level string `mapstructure:"level"` Filename string `mapstructure:"filename"`}
// Load 加载配置// path: 配置文件路径,例如 "./configs/config.yaml"func Load(path string) (*Bootstrap, error) { v := viper.New()
// 设置配置文件路径 v.SetConfigFile(path)
// 允许环境变量覆盖 (对 Docker/K8s 等部署非常重要) // 例如:export APP_DATA_DATABASE_SOURCE="xxx" 可以覆盖 yaml 里的配置 v.AutomaticEnv() // 将 . 替换为 _ (环境变量通常不允许有点) v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) // 设置环境变量前缀,防止冲突,例如 APP_SERVER_HTTP_ADDR v.SetEnvPrefix("APP")
// 读取配置 if err := v.ReadInConfig(); err != nil { return nil, err }
// 将配置映射到结构体 var c Bootstrap if err := v.Unmarshal(&c, func(dc *mapstructure.DecoderConfig) { dc.TagName = "json" // 或者 "yaml", "mapstructure" }); err != nil { return nil, err }
return &c, nil}提供注入口internal/conf/provider.go:
package conf
import "github.com/google/wire"
var ProviderSet = wire.NewSet( // 提供提取 Log 配置的方法 func(c *Bootstrap) *Log { return c.Log }, // 提供提取 Data 配置的方法 func(c *Bootstrap) *Data { return c.Data }, // 提供提取 Server 配置的方法 func(c *Bootstrap) *Server { return c.Server }, // ...)使用:
假设启动配置在main.go这个入口文件里头,大概长这样:
func main() { // 程序启动,先加载配置 // 这里的路径可以通过命令行参数传入,更灵活 cfg, err := conf.Load("./configs/config.yaml") if err != nil { panic(err) }
// 传入配置,初始化 App app, cleanup, err := initApp(cfg) if err != nil { panic(err) } defer cleanup()
// 启动 app.Run()}日志采集:OpenTelemetry
日志、链路追踪(Trace)、指标(Metrics)三合一
初始化OTel:
infrastructure/trace/otel.go
假设我们已经有这么份版本配置文件并按如下注入:
package main// 这些变量默认为 "unknown" 或 "dev"// 在 go build 时会被覆盖var (// Version 语义化版本号,例如 v1.2.3Version = "dev"// CommitHash Git 提交哈希,用于精确定位代码CommitHash = "none"// BuildTime 构建时间BuildTime = "unknown")Terminal window # 获取当前 git tag 或者分支名VERSION=$(git describe --tags --always || echo "dev")# 获取 commit hashCOMMIT=$(git rev-parse --short HEAD)# 获取当前时间TIME=$(date +%FT%T%z)# 注入变量go build -ldflags \"-X main.Version=${VERSION} -X main.CommitHash=${COMMIT} -X main.BuildTime=${TIME}" \-o bin/server ./cmd/server
package trace
import ( "context" "time"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.17.0"45 collapsed lines
)
// InitProvider 初始化 OpenTelemetry// serviceName: 服务名称,如 "user-service"// endpoint: 收集器地址,如 "localhost:4318" (Jaeger 的 OTLP http 端口)func InitProvider(ctx context.Context, serviceName, endpoint string) (func(context.Context) error, error) { // 创建 Exporter (数据发送到哪里) // 这里使用 HTTP 协议发送到 Jaeger/Collector exporter, err := otlptracehttp.New(ctx, otlptracehttp.WithEndpoint(endpoint), otlptracehttp.WithInsecure(), // 本地测试通常不用 TLS ) if err != nil { return nil, err }
// 创建 Resource (标识当前服务身份) res, err := resource.New(ctx, resource.WithAttributes( semconv.ServiceName(serviceName), semconv.ServiceVersion(Version), attribute.String("git.commit", CommitHash), attribute.String("build.time", BuildTime), ), ) if err != nil { return nil, err }
// 创建 TraceProvider (核心管理器) tp := sdktrace.NewTracerProvider( sdktrace.WithBatcher(exporter), // 批量发送,性能更好 sdktrace.WithResource(res), sdktrace.WithSampler(sdktrace.AlwaysSample()), // 采样率:100% (生产环境建议调低) )
// 设置全局 Provider otel.SetTracerProvider(tp)
// 设置全局 Propagator (传播格式) // W3C TraceContext 是现在的标准,能在不同语言/框架间传递 TraceID otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, propagation.Baggage{}, ))
// 返回关闭函数,用于在 main 中平滑退出 return tp.Shutdown, nil}封装通用logger
两个核心库:
gopkg.in/natefinch/lumberjack.v2: 负责日志文件切割(归档)。go.uber.org/zap: 高性能日志核心。
package logger
import ( "context" "log/slog" "net/http" "os"
"go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/exp/zapslog" "go.uber.org/zap/zapcore"107 collapsed lines
"gopkg.in/natefinch/lumberjack.v2")
// Config 日志配置type Config struct { Level string // debug, info, warn, error Filename string // 日志文件路径,为空则只输出到控制台 MaxSize int // 单个文件最大尺寸 (MB) MaxBackups int // 保留旧文件最大个数 MaxAge int // 保留旧文件最大天数 Compress bool // 是否压缩}
// Init 初始化全局 slogfunc Init(cfg *Config) { // 设置日志级别 atomicLevel := zap.NewAtomicLevel() switch cfg.Level { case "debug": atomicLevel.SetLevel(zap.DebugLevel) case "warn": atomicLevel.SetLevel(zap.WarnLevel) case "error": atomicLevel.SetLevel(zap.ErrorLevel) default: atomicLevel.SetLevel(zap.InfoLevel) }
// Encoder (格式化) encoderConfig := zap.NewProductionEncoderConfig() encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder // 时间格式: 2026-01-13T12:00:00.000Z encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder // 级别格式: INFO, ERROR
var encoder zapcore.Encoder if cfg.Filename == "" { encoder = zapcore.NewConsoleEncoder(encoderConfig) // 本地开发: Console } else { encoder = zapcore.NewJSONEncoder(encoderConfig) // 生产环境: JSON }
// 配置 WriteSyncer (输出位置) var writer zapcore.WriteSyncer if cfg.Filename == "" { writer = zapcore.AddSync(os.Stdout) } else { lumberJackLogger := &lumberjack.Logger{ Filename: cfg.Filename, MaxSize: cfg.MaxSize, MaxBackups: cfg.MaxBackups, MaxAge: cfg.MaxAge, Compress: cfg.Compress, } // 双写:文件 + 控制台 (方便 Docker/K8s 采集) writer = zapcore.NewMultiWriteSyncer( zapcore.AddSync(lumberJackLogger), zapcore.AddSync(os.Stdout), ) }
// 创建 Zap Core core := zapcore.NewCore(encoder, writer, atomicLevel) zapLogger := zap.New(core, zap.AddCaller())
// 将 Zap 转换为 Slog Handler // zapslog.NewHandler 会把 slog 的请求转给 zap 处理 zapHandler := zapslog.NewHandler(zapLogger.Core(), nil)
// 包装 TraceHandler (实现自动提取 TraceID) finalHandler := &TraceHandler{Handler: zapHandler}
// 设置为全局默认 Logger // 之后在任何地方调用 slog.InfoContext() 都会经过这里 slog.SetDefault(slog.New(finalHandler))}
// ---------------------------------------------------------// TraceHandler: Slog 中间件// 用于从 Context 中提取 OTel TraceID 并注入到日志属性中// ---------------------------------------------------------
type TraceHandler struct { slog.Handler}
func (h *TraceHandler) Handle(ctx context.Context, r slog.Record) error { // 从 Context 提取 OpenTelemetry Span span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { traceID := span.SpanContext().TraceID().String() spanID := span.SpanContext().SpanID().String()
// 将 trace_id 和 span_id 添加到本次日志记录的属性中 r.AddAttrs( slog.String("trace_id", traceID), slog.String("span_id", spanID), ) }
// 调用下一层 Handler (即 Zap) return h.Handler.Handle(ctx, r)}
// WithAttrs 和 WithGroup 是 slog.Handler 接口必须实现的方法// 透传给底层 handlerfunc (h *TraceHandler) WithAttrs(attrs []slog.Attr) slog.Handler { return &TraceHandler{Handler: h.Handler.WithAttrs(attrs)}}
func (h *TraceHandler) WithGroup(name string) slog.Handler { return &TraceHandler{Handler: h.Handler.WithGroup(name)}}注入Controller层(Chi + Connect-Go)
import ( "github.com/riandyrn/otelchi")
func NewRouter() *chi.Mux { r := chi.NewRouter()
// service-name 会显示在 Jaeger 的 Span 名称中 r.Use(otelchi.Middleware("my-project"))
// ... 其他路由 return r}注入Repo层(使用 pgx 原生支持)
import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/exaring/otelpgx")
func NewDB(cfg *conf.Data) *pgxpool.Pool { poolConfig, _ := pgxpool.ParseConfig(cfg.Database.Source)
// 安装 OTel 追踪器 poolConfig.ConnConfig.Tracer = otelpgx.NewTracer()
pool, _ := pgxpool.NewWithConfig(context.Background(), poolConfig) return pool}注册
func main() { // 加载配置 cfg, err := conf.Load("./configs/config.yaml") if err != nil { panic(err) } logger.Init(&logger.Config{ Level: cfg.Log.Level, Filename: cfg.Log.Filename, // ... }) ctx, cancel := context.WithCancel(context.Background())33 collapsed lines
defer cancel() otelShutdown, err := trace.InitProvider(ctx, "my-server-name", cfg.Trace.Endpoint) if err != nil { // 启动时如果连不上监控系统,panic slog.Error("Failed to init OTel provider", "err", err) panic(err) } // 注册平滑退出的清理函数 (Defer LIFO 原则) defer func() { // 创建一个独立的 context 用于关闭操作,给予 5秒 超时 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) defer shutdownCancel()
if err := otelShutdown(shutdownCtx); err != nil { slog.Error("Failed to shutdown TracerProvider", "err", err) } else { slog.Info("TracerProvider shutdown success") } }()
// 初始化 Chi Router r := chi.NewRouter()
// 注册核心中间件 (顺序非常重要) // A. OTel: 生成 TraceID (必须第一) r.Use(otelchi.Middleware("my-server-name"))
// B. AccessLogger: 记录 HTTP 请求日志 (依赖 TraceID) r.Use(middleware.AccessLogger)
// C. Recovery: 兜底 Panic (依赖 Logger 和 TraceID),后面会在报错统一管理中提到 r.Use(middleware.Recoverer)
// .....后面省略
// ... 启动 App ...}使用
对于上下文的传递,一般建议使用
r.Context()
Controller
import "log/slog"
func (h *UserHandler) Login(ctx context.Context, req *connect.Request[userv1.LoginRequest]) (*connect.Response[userv1.LoginResponse], error) { // Connect-Go 自带 context // 使用 InfoContext 显式传递上下文,TraceID 会自动挂载 slog.InfoContext(ctx, "用户登录", "username", req.Msg.Username)
// 如果出错 if err != nil { slog.ErrorContext(ctx, "登录失败", "err", err) } // ....}Service
Service 层的方法签名,必须第一个参数是 context.Context。
import "log/slog"
type UserService struct{}
func (s *UserService) Get(ctx context.Context, id string) (*User, error) { // 模拟业务逻辑 if id == "" { slog.WarnContext(ctx, "参数校验失败: id为空", "id", id) return nil, errors.New("id required") }
// ... 数据库操作 ... slog.DebugContext(ctx, "查询数据库成功", "id", id)
return &User{ID: id, Name: "Go"}, nil}Repo
一样,必须传递ctx
import "log/slog"
func (r *UserPgRepo) FindByID(ctx context.Context, id int) (*User, error) { // SQLC 生成的 Queries 自带 context 支持 row, err := r.queries.GetUserByID(ctx, int64(id)) if err != nil { return nil, err } return r.toDomain(row), nil}细化监控业务逻辑耗时:
import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "log/slog")
// 定义一个 Tracer,通常包名作为 namevar tracer = otel.Tracer("internal/user")
func (s *UserService) CreateUser(ctx context.Context, req DTO) error { // 开启一个新的 Span,命名为 "hash_password"21 collapsed lines
// 这会在原来的 Trace 树上长出一个子分支 ctx, span := tracer.Start(ctx, "hash_password") defer span.End()
// ... 模拟耗时操作 ... time.Sleep(20 * time.Millisecond)
// 你还可以给 Span 加属性(Tag) // 这样你可以在 Jaeger 搜索栏通过 user.id=123 或 user.name=abc 搜到这条请求 span.SetAttributes(attribute.String("user.id", id)) span.SetAttributes(attribute.String("user.name", req.Name))
// 日志中也会自动带有 trace_id slog.WarnContext(ctx, "密码处理完毕")
// 业务检查 if id == "" { // 记录 Error 到 Span span.RecordError(errors.New("id required")) span.SetStatus(codes.Error, "id required") return nil, errors.New("id required") }
return nil}验证:Docker + Jaeger
version: "3"services: jaeger: image: jaegertracing/all-in-one:latest ports: - "16686:16686" # 前端 UI 端口 - "4318:4318" # OTLP HTTP 接收端口 (我们在 Go 代码里填的这个) environment: - LOG_LEVEL=debug路由:Chi + Connect-Go
Chi 是一个轻量级、符合标准库 net/http 的路由器。Connect-Go 是 Buf 团队开发的现代 RPC 框架,同时支持 gRPC、gRPC-Web 和 HTTP/JSON。
internal/├── user/ <-- 用户模块(自治)│ ├── handler.go <-- 定义 Connect-Go Handler│ ├── service.go│ ├── repo.go│ └── provider.go <-- Wire 的 ProviderSet├── order/ <-- 订单模块│ ├── handler.go│ └── ...└── server/ <-- 基础设施层(负责把大家组装起来) ├── http.go <-- Chi + Connect-Go 入口 └── grpc.goProto 定义
首先定义 Protobuf 文件:proto/user/v1/user.proto
syntax = "proto3";
package user.v1;
option go_package = "my-project/gen/user/v1;userv1";
message GetUserRequest { int64 id = 1;}
message GetUserResponse { int64 id = 1;14 collapsed lines
string username = 2; string email = 3;}
message CreateUserRequest { string username = 1; string email = 2; string password = 3;}
message CreateUserResponse { int64 id = 1;}
service UserService { rpc GetUser(GetUserRequest) returns (GetUserResponse); rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);}使用 Buf 生成代码:
buf.gen.yaml
version: v2plugins: - remote: buf.build/protocolbuffers/go out: gen opt: paths=source_relative - remote: buf.build/connectrpc/go out: gen opt: paths=source_relative模块内部:
以User为例定义Controller:internal/user/handler.go
package user
import ( "context"
"connectrpc.com/connect" userv1 "my-project/gen/user/v1" "my-project/gen/user/v1/userv1connect")
// Handler 用户模块的 Connect-Go 接口适配器type Handler struct {33 collapsed lines
svc *UserService // 依赖 Service}
// 确保实现接口var _ userv1connect.UserServiceHandler = (*Handler)(nil)
// GetUser 具体处理逻辑func (h *Handler) GetUser( ctx context.Context, req *connect.Request[userv1.GetUserRequest],) (*connect.Response[userv1.GetUserResponse], error) { user, err := h.svc.Get(ctx, int(req.Msg.Id)) if err != nil { return nil, err // 错误会被 Connect 拦截器统一处理 }
return connect.NewResponse(&userv1.GetUserResponse{ Id: int64(user.ID), Username: user.Username, Email: user.Email, }), nil}
func (h *Handler) CreateUser( ctx context.Context, req *connect.Request[userv1.CreateUserRequest],) (*connect.Response[userv1.CreateUserResponse], error) { // 调用 Service id, err := h.svc.Create(ctx, req.Msg) if err != nil { return nil, err }
return connect.NewResponse(&userv1.CreateUserResponse{ Id: int64(id), }), nil}模块内部暴露依赖:internal/user/provider.go
package user
import "github.com/google/wire"
var ProviderSet = wire.NewSet( NewUserRepo, NewUserService, // 对于handler,推荐直接使用,wire会自动注入。 wire.Struct(new(Handler), "*"),)集中组装:internal/server/http.go
package server
import ( "net/http"
"connectrpc.com/connect" "github.com/go-chi/chi/v5" chimiddleware "github.com/go-chi/chi/v5/middleware" "github.com/riandyrn/otelchi" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c"
56 collapsed lines
"my-project/gen/user/v1/userv1connect" "my-project/gen/order/v1/orderv1connect" "my-project/internal/conf" "my-project/internal/middleware" "my-project/internal/user" "my-project/internal/order")
// HTTPServer 包装 http.Servertype HTTPServer struct { Server *http.Server}
// 做一个集中入口函数// Wire 会自动把 user.Handler, order.Handler 注入进来func NewHTTPServer( cfg *conf.Server, // 引入配置 userHandler *user.Handler, // 用户模块 Handler orderHandler *order.Handler, // 订单模块 Handler) *HTTPServer {
r := chi.NewRouter()
// 中间件 r.Use(otelchi.Middleware("my-project")) r.Use(chimiddleware.RequestID) r.Use(middleware.AccessLogger) r.Use(middleware.Recoverer)
// Connect-Go 拦截器 (用于统一错误处理、日志等) interceptors := connect.WithInterceptors( middleware.NewErrorInterceptor(), middleware.NewLoggingInterceptor(), )
// 注册 Connect-Go 服务 // 每个模块的 Handler 都在这里挂载 userPath, userHandler := userv1connect.NewUserServiceHandler(userHandler, interceptors) r.Mount(userPath, userHandler)
orderPath, orderHandler := orderv1connect.NewOrderServiceHandler(orderHandler, interceptors) r.Mount(orderPath, orderHandler)
// 支持 HTTP/2 (Connect-Go 需要) srv := &http.Server{ Addr: cfg.Http.Addr, Handler: h2c.NewHandler(r, &http2.Server{}), }
return &HTTPServer{Server: srv}}
// Run 方法func (s *HTTPServer) Run() error { return s.Server.ListenAndServe()}// Shutdown 方法func (s *HTTPServer) Shutdown(ctx context.Context) error { return s.Server.Shutdown(ctx)}在wire.go中注册
func initApp(cfg *conf.Bootstrap) (*App, func(), error) { panic(wire.Build( // 基础设施 conf.ProviderSet, logger.ProviderSet,
// 业务模块 (它们会提供 xxx.Handler) user.ProviderSet, order.ProviderSet,
// Server 层 (消费 xxx.Handler) server.NewHTTPServer, // 这里会接收上面的 Handler 并生成 *HTTPServer
// App 启动入口 newApp, ))}权限校验:OpenFGA
OpenFGA 是 Auth0 开源的细粒度授权系统,基于 Google Zanzibar 论文实现。相比 Casbin,它支持更复杂的关系型权限模型(如 Google Drive 的分享权限),并且提供独立的授权服务,支持水平扩展。
核心概念
- Type: 对象类型,如
user,document,folder - Relation: 关系,如
owner,editor,viewer - Tuple: 权限元组,如
user:aliceiseditorofdocument:readme
定义授权模型
configs/fga-model.dsl
model schema 1.1
type user
type document relations define owner: [user] define editor: [user] or owner define viewer: [user] or editor
type folder relations define owner: [user] define editor: [user] or owner define viewer: [user] or editor define parent: [folder]
# 继承父文件夹的权限 define can_view: viewer or can_view from parent define can_edit: editor or can_edit from parent
type organization relations define admin: [user] define member: [user] or admin初始化 OpenFGA Client
internal/infrastructure/authz/openfga.go
package authz
import ( "context" "log/slog"
openfga "github.com/openfga/go-sdk" "github.com/openfga/go-sdk/client" "my-project/internal/conf")
// Authorizer 授权检查器85 collapsed lines
type Authorizer struct { client *client.OpenFgaClient storeID string}
// NewAuthorizer 构造函数func NewAuthorizer(cfg *conf.Auth) (*Authorizer, error) { fgaClient, err := client.NewSdkClient(&client.ClientConfiguration{ ApiUrl: cfg.OpenFGA.ApiURL, // 如 http://localhost:8080 StoreId: cfg.OpenFGA.StoreID, // Store ID }) if err != nil { return nil, err }
return &Authorizer{ client: fgaClient, storeID: cfg.OpenFGA.StoreID, }, nil}
// Check 检查权限// user: "user:alice"// relation: "viewer"// object: "document:readme"func (a *Authorizer) Check(ctx context.Context, user, relation, object string) (bool, error) { resp, err := a.client.Check(ctx).Body(client.ClientCheckRequest{ User: user, Relation: relation, Object: object, }).Execute()
if err != nil { slog.ErrorContext(ctx, "OpenFGA check failed", "err", err, "user", user, "relation", relation, "object", object) return false, err }
return resp.GetAllowed(), nil}
// WriteTuple 写入权限元组func (a *Authorizer) WriteTuple(ctx context.Context, user, relation, object string) error { _, err := a.client.Write(ctx).Body(client.ClientWriteRequest{ Writes: []client.ClientTupleKey{ { User: user, Relation: relation, Object: object, }, }, }).Execute()
if err != nil { slog.ErrorContext(ctx, "OpenFGA write failed", "err", err) return err }
return nil}
// DeleteTuple 删除权限元组func (a *Authorizer) DeleteTuple(ctx context.Context, user, relation, object string) error { _, err := a.client.Write(ctx).Body(client.ClientWriteRequest{ Deletes: []client.ClientTupleKeyWithoutCondition{ { User: user, Relation: relation, Object: object, }, }, }).Execute()
return err}
// ListObjects 列出用户有权限的对象func (a *Authorizer) ListObjects(ctx context.Context, user, relation, objectType string) ([]string, error) { resp, err := a.client.ListObjects(ctx).Body(client.ClientListObjectsRequest{ User: user, Relation: relation, Type: objectType, }).Execute()
if err != nil { return nil, err }
return resp.GetObjects(), nil}Provider
internal/infrastructure/authz/provider.go
package authz
import "github.com/google/wire"
var ProviderSet = wire.NewSet(NewAuthorizer)中间件
internal/middleware/authz.go
package middleware
import ( "context" "encoding/json" "fmt" "net/http" "strings"
"my-project/internal/infrastructure/authz" "my-project/pkg/apierror")79 collapsed lines
// OpenFGAMiddleware 权限校验中间件// resourceExtractor: 从请求中提取资源标识,如 "/documents/123" -> "document:123"func OpenFGAMiddleware( authorizer *authz.Authorizer, resourceExtractor func(r *http.Request) (objectType, objectID, relation string),) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context()
// 从 Context 获取用户信息 (由 JWT 中间件注入) userID, ok := ctx.Value(CtxKeyUID).(uint) if !ok { apierror.WriteError(w, apierror.NewUnauthorized("未登录")) return }
// 提取资源信息 objectType, objectID, relation := resourceExtractor(r) if objectType == "" || objectID == "" { // 无需权限校验的路由,直接放行 next.ServeHTTP(w, r) return }
// 构建 OpenFGA 格式 user := fmt.Sprintf("user:%d", userID) object := fmt.Sprintf("%s:%s", objectType, objectID)
// 检查权限 allowed, err := authorizer.Check(ctx, user, relation, object) if err != nil { apierror.WriteError(w, apierror.NewInternal(err)) return }
if !allowed { apierror.WriteError(w, apierror.NewForbidden("没有权限访问该资源")) return }
next.ServeHTTP(w, r) }) }}
// DefaultResourceExtractor 默认资源提取器// 根据 HTTP Method 映射到 OpenFGA relationfunc DefaultResourceExtractor(r *http.Request) (objectType, objectID, relation string) { path := r.URL.Path
// 示例: /api/v1/documents/123 parts := strings.Split(strings.Trim(path, "/"), "/") if len(parts) < 3 { return "", "", "" }
// 提取资源类型和 ID // /api/v1/documents/123 -> objectType="document", objectID="123" resourcePart := parts[2] // "documents" objectType = strings.TrimSuffix(resourcePart, "s") // 去掉复数 s
if len(parts) >= 4 { objectID = parts[3] } else { return "", "", "" }
// HTTP Method -> Relation 映射 switch r.Method { case http.MethodGet: relation = "viewer" case http.MethodPut, http.MethodPatch: relation = "editor" case http.MethodDelete: relation = "owner" default: relation = "viewer" }
return objectType, objectID, relation}注册
func NewHTTPServer( cfg *conf.Server, tokenizer *auth.Tokenizer, authorizer *authz.Authorizer, // Wire 注入 OpenFGA // ...) *HTTPServer { r := chi.NewRouter()
// 先认证 r.Use(middleware.JWTAuth(tokenizer))
// 后鉴权 (需要资源级别权限的路由) r.Route("/api/v1/documents", func(r chi.Router) { r.Use(middleware.OpenFGAMiddleware(authorizer, middleware.DefaultResourceExtractor)) r.Get("/{id}", documentHandler.Get) r.Put("/{id}", documentHandler.Update) r.Delete("/{id}", documentHandler.Delete) })
// ...}在 Service 层使用
假设你有这么个document的Service,仅举例。
type DocumentService struct { repo document.Repository authorizer *authz.Authorizer}
// Create 创建文档时自动授予 owner 权限func (s *DocumentService) Create(ctx context.Context, userID uint, req CreateReq) (*Document, error) { // 创建文档 doc, err := s.repo.Create(ctx, req.ToDomain()) if err != nil { return nil, err }42 collapsed lines
// 写入权限元组: user:123 is owner of document:456 user := fmt.Sprintf("user:%d", userID) object := fmt.Sprintf("document:%d", doc.ID)
if err := s.authorizer.WriteTuple(ctx, user, "owner", object); err != nil { // 权限写入失败,回滚文档创建 _ = s.repo.Delete(ctx, doc.ID) return nil, err }
return doc, nil}
// Share 分享文档func (s *DocumentService) Share(ctx context.Context, docID uint, targetUserID uint, role string) error { user := fmt.Sprintf("user:%d", targetUserID) object := fmt.Sprintf("document:%d", docID)
// role: "viewer", "editor" return s.authorizer.WriteTuple(ctx, user, role, object)}
// ListMyDocuments 列出用户有权限查看的所有文档func (s *DocumentService) ListMyDocuments(ctx context.Context, userID uint) ([]Document, error) { user := fmt.Sprintf("user:%d", userID)
// 获取用户可以查看的所有文档 ID objects, err := s.authorizer.ListObjects(ctx, user, "viewer", "document") if err != nil { return nil, err }
// objects: ["document:1", "document:2", ...] ids := make([]int, 0, len(objects)) for _, obj := range objects { parts := strings.Split(obj, ":") if len(parts) == 2 { if id, err := strconv.Atoi(parts[1]); err == nil { ids = append(ids, id) } } }
return s.repo.FindByIDs(ctx, ids)}配置
configs/config.yaml
auth: jwt: secret: "your-secret-key" issuer: "my-project" expire: 86400
openfga: api_url: "http://localhost:8080" # OpenFGA 服务地址 store_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV"Docker Compose 启动 OpenFGA
services: openfga: image: openfga/openfga:latest command: run environment: - OPENFGA_DATASTORE_ENGINE=postgres - OPENFGA_DATASTORE_URI=postgres://user:pass@postgres:5432/openfga?sslmode=disable ports: - "8080:8080" # HTTP API - "8081:8081" # gRPC API - "3000:3000" # Playground UI depends_on: - postgres与 Casbin 的对比
| 特性 | Casbin | OpenFGA |
|---|---|---|
| 部署模式 | 嵌入式库 | 独立服务 |
| 权限模型 | RBAC/ABAC/ACL | ReBAC (关系型) |
| 扩展性 | 单机 | 水平扩展 |
| 权限继承 | 有限支持 | 原生支持复杂继承 |
| 适用场景 | 简单权限控制 | 复杂资源共享场景 |
| 学习曲线 | 中等(主要是语法) | 中等 |
Casbin的csv配置的语法学习曲线比较麻烦的同时难以调试,OpenFGA算是Cerbos的进化版,虽然是自己作为独立的服务,增加了请求的步骤以及时间,但在代码上算是彻底解耦,自带Playground调试的同时适合后续分布式扩展。
同时,解决了要维护permissions、roles、role_permissions、user_roles这四张经典RBAC表的问题。
当然,users表还是得有的。
认证:JWT
定义工具组件
internal/infrastructure/auth/jwt.go
package auth
import ( "errors" "time"
"github.com/golang-jwt/jwt/v5" "my-project/internal/conf")
// MyClaims 自定义载荷type MyClaims struct {52 collapsed lines
UserID uint `json:"uid"` Role string `json:"role"` // 放入角色,给 OpenFGA 用 jwt.RegisteredClaims}
// Tokenizer 负责签发和解析type Tokenizer struct { secret []byte issuer string expire time.Duration}
// NewTokenizer 构造函数 (从配置中读取密钥)func NewTokenizer(cfg *conf.Auth) *Tokenizer { return &Tokenizer{ secret: []byte(cfg.Jwt.Secret), issuer: cfg.Jwt.Issuer, expire: time.Duration(cfg.Jwt.Expire) * time.Second, }}
// Sign 签发 Tokenfunc (t *Tokenizer) Sign(userID uint, role string) (string, error) { claims := MyClaims{ UserID: userID, Role: role, RegisteredClaims: jwt.RegisteredClaims{ ExpiresAt: jwt.NewNumericDate(time.Now().Add(t.expire)), // 过期时间 IssuedAt: jwt.NewNumericDate(time.Now()), // 签发时间 Issuer: t.issuer, // 签发人 }, }
// 使用 HS256 算法进行签名 token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
// 生成签名字符串 return token.SignedString(t.secret)}
// Parse 解析 Token (给中间件用)func (t *Tokenizer) Parse(tokenString string) (*MyClaims, error) { token, err := jwt.ParseWithClaims(tokenString, &MyClaims{}, func(token *jwt.Token) (interface{}, error) { return t.secret, nil })
if err != nil { return nil, err }
if claims, ok := token.Claims.(*MyClaims); ok && token.Valid { return claims, nil }
return nil, errors.New("invalid token")}提供Provider:
internal/infrastructure/auth/provider.go
package auth
import "github.com/google/wire"
var ProviderSet = wire.NewSet(NewTokenizer)中间件
internal/middleware/jwt.go
package middleware
import ( "context" "net/http" "strings"
"my-project/internal/infrastructure/auth" "my-project/pkg/apierror")
type ctxKey string34 collapsed lines
const ( CtxKeyUID ctxKey = "uid" CtxKeyRole ctxKey = "role")
// JWTAuth 构造函数需要传入 Tokenizerfunc JWTAuth(tokenizer *auth.Tokenizer) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { authHeader := r.Header.Get("Authorization") if authHeader == "" { apierror.WriteError(w, apierror.NewUnauthorized("未携带Token")) return }
// 提取 Bearer 后面的部分 parts := strings.SplitN(authHeader, " ", 2) if len(parts) != 2 || parts[0] != "Bearer" { apierror.WriteError(w, apierror.NewUnauthorized("Token格式错误")) return }
// 调用工具解析 claims, err := tokenizer.Parse(parts[1]) if err != nil { apierror.WriteError(w, apierror.NewUnauthorized("Token无效或已过期")) return }
// 注入 Context ctx := context.WithValue(r.Context(), CtxKeyUID, claims.UserID) ctx = context.WithValue(ctx, CtxKeyRole, claims.Role)
next.ServeHTTP(w, r.WithContext(ctx)) }) }}注册:
func NewHTTPServer( cfg *conf.Server, tokenizer *auth.Tokenizer, // Wire 注入 authorizer *authz.Authorizer, // Wire 注入 OpenFGA // ...) *HTTPServer { r := chi.NewRouter()
r.Use(middleware.JWTAuth(tokenizer)) r.Use(middleware.OpenFGAMiddleware(authorizer, middleware.DefaultResourceExtractor))
// ...}使用:
假设你有一个UserService
type UserService struct { // ... 其他依赖 tokenizer *auth.Tokenizer // 注入 Token 工具}
func (s *UserService) Login(ctx context.Context, req LoginReq) (*LoginResp, error) { // 校验用户和密码 (省略)... user, _ := s.repo.FindByUsername(ctx, req.Username)
// 签发 Token token, err := s.tokenizer.Sign(user.ID, user.Role) if err != nil { return nil, err }
return &LoginResp{AccessToken: token}, nil}表单验证:
Connect-Go 配合 Protobuf 使用时,推荐使用 protovalidate 进行验证。
Proto 定义验证规则
syntax = "proto3";
package user.v1;
import "buf/validate/validate.proto";
message CreateUserRequest { string username = 1 [(buf.validate.field).string = { min_len: 3, max_len: 20 }];
string email = 2 [(buf.validate.field).string.email = true];
string password = 3 [(buf.validate.field).string = { min_len: 6, max_len: 128 }];
string role = 4 [(buf.validate.field).string = { in: ["user", "admin"] }];}Connect-Go 拦截器
package middleware
import ( "context"
"buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go/buf/validate" "connectrpc.com/connect" "github.com/bufbuild/protovalidate-go" "google.golang.org/protobuf/proto"
"my-project/pkg/apierror")
func NewValidationInterceptor() connect.UnaryInterceptorFunc { validator, _ := protovalidate.New()
return func(next connect.UnaryFunc) connect.UnaryFunc { return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { // 验证请求 if msg, ok := req.Any().(proto.Message); ok { if err := validator.Validate(msg); err != nil { return nil, apierror.NewValidationError(err) } } return next(ctx, req) } }}对于复杂验证逻辑
在 Service 层使用 Ozzo-Validation:
import ( validation "github.com/go-ozzo/ozzo-validation/v4" "github.com/go-ozzo/ozzo-validation/v4/is")
type ComplexForm struct { Name string Age int Emails []string Type string // "A" or "B" ExtraData string}
// 实现 Validatable 接口func (f ComplexForm) Validate() error { return validation.ValidateStruct(&f, // 简单字段校验 validation.Field(&f.Name, validation.Required, validation.Length(5, 20)),
// 嵌套数组校验:每个 Email 必须是 email 格式 validation.Field(&f.Emails, validation.Each(is.Email)),
// 条件校验:当 Type="A" 时,ExtraData 必填 validation.Field(&f.ExtraData, validation.When(f.Type == "A", validation.Required).Else(validation.Nil)), )}ORM:SQLC (Postgres + Pgx)
SQLC 是编译期 SQL 代码生成器,它根据你写的 SQL 语句生成类型安全的 Go 代码。相比于 ORM,它更接近原生 SQL,性能更好。
为什么选SQLC,从我的角度出发:
- 回归到写
SQL这种最本质的东西上,无需应付各种ORM框架的语法,后续想怎么迁移就怎么迁移。 - 写
SQL被ORM代替本质上是因为麻烦,但现在LLM最不怕的就是麻烦,文档反正都是要写的,文档写好了它也懂怎么生成。 LLM写的SQL人类可以Review,退一万步来说,初级开发者自己写的东西还不如$20一个月的Claude Code会员(
目录结构大致如下
/cmd/server/internal/├── data/ <-- 【基础数据层】数据库模型│ ├── db/ <-- SQLC 生成的代码│ │ ├── db.go <-- DBTX 接口│ │ ├── models.go <-- 生成的模型 (PO)│ │ ├── querier.go <-- 查询接口│ │ └── user.sql.go <-- 生成的查询实现│ ├── migrations/ <-- SQL 迁移文件│ │ ├── 001_init.up.sql│ │ └── 001_init.down.sql│ ├── queries/ <-- 原始 SQL 文件│ │ └── user.sql│ └── client.go <-- pgxpool 初始化│├── user/ <-- 【用户模块】│ ├── domain.go <-- 领域实体 (DO) & 接口定义 (核心)│ ├── service.go <-- 应用服务 (只操作 DO)│ └── infrastructure/ <-- 基础设施实现│ └── persistence/│ └── user_repo.go <-- 实现 Repo 接口,负责 DO <-> PO 转换│└── order/ <-- 【其它你可能有的模块】 └── ...SQLC 配置
sqlc.yaml
version: "2"sql: - engine: "postgresql" queries: "internal/data/queries" schema: "internal/data/migrations" gen: go: package: "db" out: "internal/data/db" sql_package: "pgx/v5" emit_json_tags: true emit_interface: true emit_empty_slices: true定义 SQL 查询
internal/data/queries/user.sql
-- name: GetUserByID :oneSELECT id, username, email, password_hash, role, created_at, updated_atFROM usersWHERE id = $1;
-- name: GetUserByUsername :oneSELECT id, username, email, password_hash, role, created_at, updated_atFROM usersWHERE username = $1;
-- name: CreateUser :oneINSERT INTO users (username, email, password_hash, role)18 collapsed lines
VALUES ($1, $2, $3, $4)RETURNING id, created_at;
-- name: UpdateUser :execUPDATE usersSET username = $2, email = $3, updated_at = NOW()WHERE id = $1;
-- name: DeleteUser :execDELETE FROM users WHERE id = $1;
-- name: ListUsers :manySELECT id, username, email, role, created_atFROM usersORDER BY created_at DESCLIMIT $1 OFFSET $2;
-- name: GetUserForUpdate :oneSELECT id, username, email, versionFROM usersWHERE id = $1FOR UPDATE;执行 sqlc generate 生成代码。
领域层定义
这里定义的是业务眼中的用户,完全不知道数据库是什么。
简称为DO。
package user
import "context"
// User 领域实体 (Domain Object)// 这是一个纯粹的结构体,没有 tag (或者只有 json tag),不依赖 sqlctype User struct { ID int Username string Email string Role string // 这里可以定义领域行为,例如: // func (u *User) ChangePassword(newPwd string) { ... }}
// Repository 接口定义// Service 层只依赖这个接口type Repository interface { FindByID(ctx context.Context, id int) (*User, error) Save(ctx context.Context, user *User) (int, error) // ... 等方法,仅作为示范}基建层:
internal/user/infrastructure/persistence/user_repo.go
负责将DO转为PO,以及PO转为DO
package persistence
import ( "context" "errors"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool"
"my-project/internal/data/db" "my-project/internal/user" "my-project/pkg/apierror"46 collapsed lines
)
type UserRepo struct { pool *pgxpool.Pool queries *db.Queries}
func NewUserRepo(pool *pgxpool.Pool) user.Repository { return &UserRepo{ pool: pool, queries: db.New(pool), }}
// toDomain 将 SQLC 的 PO 转换为 业务的 DOfunc (r *UserRepo) toDomain(row db.User) *user.User { return &user.User{ ID: int(row.ID), Username: row.Username, Email: row.Email, Role: row.Role, }}
// FindByID 实现接口func (r *UserRepo) FindByID(ctx context.Context, id int) (*user.User, error) { row, err := r.queries.GetUserByID(ctx, int64(id)) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return nil, apierror.NewNotFound("user", id) } return nil, err }
return r.toDomain(row), nil}
// Save 实现接口func (r *UserRepo) Save(ctx context.Context, u *user.User) (int, error) { result, err := r.queries.CreateUser(ctx, db.CreateUserParams{ Username: u.Username, Email: u.Email, PasswordHash: u.PasswordHash, Role: u.Role, }) if err != nil { return 0, err } return int(result.ID), nil}使用:
仅提供给Service调用user_repo.go
package user
type Service struct { repo Repository}
func (s *Service) GetUserInfo(ctx context.Context, id int) (*User, error) { // s.repo.FindByID 返回的是 *User (DO),不是 sqlc 生成的类型 return s.repo.FindByID(ctx, id)}为什么多一层中间层:
虽然 SQLC 生成的代码很干净,但过度依赖其生成的 Struct 穿透到 Service 甚至 Controller 层,会导致整个项目与数据库表结构强绑定。通过 Repository 返回纯净的 DO,我们可以在 Repo 内部灵活切换实现(加缓存、换数据库),而 Service 层完全无感知。
对于DTO<->DO,推荐在DTO内部完成:
在Go里面,Function做不了重载,但是对于Method来说,因为挂载的对象不同,自然可以使用同名。
虽然有 struct copy 的开销,但在 Go 中这种内存操作极快(纳秒级),相比于 DB I/O(毫秒级)可以忽略不计。
换来的是业务逻辑与底层实现的彻底解耦,这在长期维护中是非常划算的。
package dto
import "my-project/internal/user" // 引用DO实体所在的包
type RegisterReq struct { Username string Password string}
// ToDomain 将 RegisterReq 转为 User 实体func (r *RegisterReq) ToDomain() *user.User { return &user.User{ Username: r.Username, // 这里还可以做一些简单的非业务逻辑处理 Password: r.Password, }}事务:
只是这样的话,并不足以在生产环境中使用,没有事务没有锁的数据库操作只能算个玩具。
定义:
我们来做一个通用的事务管理器internal/data/tm.go
package data
import ( "context" "errors" "log/slog" "time"
"github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool"
68 collapsed lines
"my-project/internal/data/db")
// 定义 Context Key,私有类型防止冲突type txKey struct{}
// GetQueriesFromContext 供 Repo 层提取 Queries// 如果 Context 里有事务,返回事务内的 Queries;否则返回 nil。func GetQueriesFromContext(ctx context.Context) *db.Queries { if q, ok := ctx.Value(txKey{}).(*db.Queries); ok { return q } return nil}
// TransactionManager 接口type TransactionManager interface { Exec(ctx context.Context, fn func(ctx context.Context) error) error}
// PgxTransaction 具体实现type PgxTransaction struct { pool *pgxpool.Pool}
func NewTransactionManager(pool *pgxpool.Pool) TransactionManager { return &PgxTransaction{pool: pool}}
// Exec 执行事务func (tm *PgxTransaction) Exec(ctx context.Context, fn func(ctx context.Context) error) error { txID := uuid.New().String() log := slog.With("tx_id", txID, "component", "tx_manager")
start := time.Now()
// 开启事务 tx, err := tm.pool.Begin(ctx) if err != nil { log.ErrorContext(ctx, "Tx: Begin Failed", "err", err) return err }
log.DebugContext(ctx, "Tx: Begin")
// 创建事务内的 Queries queries := db.New(tx) ctxWithTx := context.WithValue(ctx, txKey{}, queries)
defer func() { if p := recover(); p != nil { _ = tx.Rollback(ctx) panic(p) } }()
// 执行业务逻辑 (传入带有 tx 的 context) if err := fn(ctxWithTx); err != nil { if rerr := tx.Rollback(ctx); rerr != nil { log.ErrorContext(ctx, "Tx: Rollback Failed", "rollback_err", rerr, "origin_err", err) } return err }
// 提交 if err := tx.Commit(ctx); err != nil { return err }
log.DebugContext(ctx, "Tx: Committed", "duration", time.Since(start)) return nil}这个时候,刚刚的user_repo.go就要多实现一个方法getQueries,来判断到底是普通queries还是事务queries:
package persistence
import ( "my-project/internal/data" "my-project/internal/data/db")
type UserRepo struct { pool *pgxpool.Pool queries *db.Queries}
57 collapsed lines
func NewUserRepo(pool *pgxpool.Pool) user.Repository { return &UserRepo{ pool: pool, queries: db.New(pool), }}
func (r *UserRepo) getQueries(ctx context.Context) *db.Queries { // 尝试从 Context 中获取事务 Queries if q := data.GetQueriesFromContext(ctx); q != nil { return q } // 如果没有事务,返回基础 Queries return r.queries}
// 使用示例func (r *UserRepo) Save(ctx context.Context, u *user.User) (int, error) { // 使用 getQueries(ctx) 替代 r.queries // 如果 Service 层开启了事务,这里的 Create 就是在事务中执行的 result, err := r.getQueries(ctx).CreateUser(ctx, db.CreateUserParams{ Username: u.Username, Email: u.Email, PasswordHash: u.PasswordHash, Role: u.Role, }) if err != nil { return 0, err } return int(result.ID), nil}
// 悲观锁例子func (r *UserRepo) FindByIDForUpdate(ctx context.Context, id int) (*user.User, error) { // 使用 FOR UPDATE 查询(需要在 SQL 中定义) row, err := r.getQueries(ctx).GetUserForUpdate(ctx, int64(id)) if err != nil { return nil, err } return r.toDomain(row), nil}
// 乐观锁CAS例子 (需要表有 version 字段)func (r *UserRepo) UpdateWithVersion(ctx context.Context, u *user.User) error { // SQL: UPDATE users SET name=$2, version=version+1 WHERE id=$1 AND version=$3 affected, err := r.getQueries(ctx).UpdateUserWithVersion(ctx, db.UpdateUserWithVersionParams{ ID: int64(u.ID), Name: u.Username, Version: u.Version, })
if err != nil { return err }
if affected == 0 { return apierror.NewConflict("数据已被修改,请刷新重试") }
return nil}对于service来说则是完全不关心user_repo.go里面做了什么,只需要调用事务即可:
type UserService struct { tm data.TransactionManager // 注入事务管理器 userRepo user.Repository logRepo log.Repository}
func (s *UserService) Register(ctx context.Context, req Req) error { // 开启事务 return s.tm.Exec(ctx, func(ctx context.Context) error { // 创建用户 (传递 ctx) if err := s.userRepo.Save(ctx, user); err != nil { return err // 返回 error 会触发 Rollback }
// 记录操作日志 (传递 ctx) // 因为 ctx 里带了 tx,所以 userRepo 和 logRepo 实际上是在同一个 db 事务里 if err := s.logRepo.Add(ctx, "user created"); err != nil { return err }
return nil // 返回 nil 触发 Commit })}报错统一管理:RFC 7807 + 自定义扩展
RFC 7807 是 IETF 定义的 Problem Details for HTTP APIs 标准,它规定了一个统一的错误响应格式。我们在此基础上扩展自定义的 code 和 trace_id。
定义错误结构体
pkg/apierror/error.go
package apierror
import ( "context" "encoding/json" "errors" "fmt" "net/http"
"go.opentelemetry.io/otel/trace")
59 collapsed lines
// ProblemDetail RFC 7807 + 自定义扩展type ProblemDetail struct { // RFC 7807 标准字段 Type string `json:"type"` // 错误类型 URI Title string `json:"title"` // 简短标题 Status int `json:"status"` // HTTP 状态码 Detail string `json:"detail,omitempty"` // 详细描述 Instance string `json:"instance,omitempty"` // 请求路径
// 自定义扩展字段 Code string `json:"code"` // 业务错误码 TraceID string `json:"trace_id,omitempty"` // 链路追踪 ID
// 验证错误详情 Errors []FieldError `json:"errors,omitempty"`
// 额外字段 (用于传递给前端的参数) Extensions map[string]any `json:"extensions,omitempty"`
// 内部字段 (不序列化) raw error `json:"-"`}
// FieldError 字段级错误type FieldError struct { Field string `json:"field"` Code string `json:"code"` Message string `json:"message"`}
// Error 实现 error 接口func (p *ProblemDetail) Error() string { return p.Detail}
// Unwrap 支持 errors.Is/Asfunc (p *ProblemDetail) Unwrap() error { return p.raw}
// WithRaw 携带原始错误func (p *ProblemDetail) WithRaw(err error) *ProblemDetail { p.raw = err return p}
// WithExtension 添加扩展字段func (p *ProblemDetail) WithExtension(key string, value any) *ProblemDetail { if p.Extensions == nil { p.Extensions = make(map[string]any) } p.Extensions[key] = value return p}
// WithTraceID 从 context 注入 TraceIDfunc (p *ProblemDetail) WithTraceID(ctx context.Context) *ProblemDetail { span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { p.TraceID = span.SpanContext().TraceID().String() } return p}错误码定义
pkg/apierror/codes.go
package apierror
// 通用错误码const ( CodeSuccess = "SUCCESS" CodeInternalError = "INTERNAL_ERROR" CodeInvalidParams = "INVALID_PARAMS" CodeUnauthorized = "UNAUTHORIZED" CodeForbidden = "FORBIDDEN" CodeNotFound = "NOT_FOUND" CodeConflict = "CONFLICT" CodeTooManyRequests = "TOO_MANY_REQUESTS")
// 用户模块错误码const ( CodeUserNotFound = "USER_NOT_FOUND" CodeInvalidCredentials = "INVALID_CREDENTIALS" CodeUsernameExists = "USERNAME_EXISTS" CodeUserBanned = "USER_BANNED")
// 数据库错误码const ( CodeDBRecordNotFound = "DB_RECORD_NOT_FOUND" CodeDBDuplicateEntry = "DB_DUPLICATE_ENTRY" CodeDBDeadlock = "DB_DEADLOCK" CodeDBLockTimeout = "DB_LOCK_TIMEOUT")构造函数
pkg/apierror/factory.go
package apierror
import "fmt"
const typeBase = "https://api.myproject.com/errors"
// NewNotFound 404 错误func NewNotFound(entity string, id any) *ProblemDetail { return &ProblemDetail{ Type: typeBase + "/not-found", Title: "Resource Not Found", Status: 404,68 collapsed lines
Detail: fmt.Sprintf("%s with id '%v' not found", entity, id), Code: CodeNotFound, }}
// NewUnauthorized 401 错误func NewUnauthorized(detail string) *ProblemDetail { return &ProblemDetail{ Type: typeBase + "/unauthorized", Title: "Unauthorized", Status: 401, Detail: detail, Code: CodeUnauthorized, }}
// NewForbidden 403 错误func NewForbidden(detail string) *ProblemDetail { return &ProblemDetail{ Type: typeBase + "/forbidden", Title: "Forbidden", Status: 403, Detail: detail, Code: CodeForbidden, }}
// NewBadRequest 400 错误func NewBadRequest(code, detail string) *ProblemDetail { return &ProblemDetail{ Type: typeBase + "/bad-request", Title: "Bad Request", Status: 400, Detail: detail, Code: code, }}
// NewConflict 409 错误func NewConflict(detail string) *ProblemDetail { return &ProblemDetail{ Type: typeBase + "/conflict", Title: "Conflict", Status: 409, Detail: detail, Code: CodeConflict, }}
// NewValidationError 验证错误func NewValidationError(errs ...FieldError) *ProblemDetail { return &ProblemDetail{ Type: typeBase + "/validation-error", Title: "Validation Error", Status: 400, Detail: "一个或多个参数校验失败", Code: CodeInvalidParams, Errors: errs, }}
// NewInternal 500 错误 (隐藏内部细节)func NewInternal(raw error) *ProblemDetail { return &ProblemDetail{ Type: typeBase + "/internal-error", Title: "Internal Server Error", Status: 500, Detail: "服务器开小差了,请稍后重试", Code: CodeInternalError, raw: raw, }}错误翻译器(Postgres)
pkg/apierror/translator.go
package apierror
import ( "errors"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn")
// TranslatePgError 将 Postgres 错误翻译为 ProblemDetailfunc TranslatePgError(err error) *ProblemDetail { if err == nil {57 collapsed lines
return nil }
// 记录不存在 if errors.Is(err, pgx.ErrNoRows) { return NewNotFound("record", "unknown").WithRaw(err) }
// Postgres 驱动错误 var pgErr *pgconn.PgError if errors.As(err, &pgErr) { switch pgErr.Code { case "23505": // unique_violation return &ProblemDetail{ Type: typeBase + "/duplicate-entry", Title: "Duplicate Entry", Status: 409, Detail: "数据已存在", Code: CodeDBDuplicateEntry, }.WithExtension("constraint", pgErr.ConstraintName).WithRaw(err)
case "23503": // foreign_key_violation return NewBadRequest(CodeInvalidParams, "关联数据不存在或被占用"). WithExtension("detail", pgErr.Detail). WithRaw(err)
case "40001": // serialization_failure return NewConflict("数据冲突,请重试"). WithExtension("reason", "serialization_failure"). WithRaw(err)
case "40P01": // deadlock_detected return &ProblemDetail{ Type: typeBase + "/deadlock", Title: "Database Deadlock", Status: 503, Detail: "系统繁忙,请稍后重试", Code: CodeDBDeadlock, }.WithExtension("reason", "deadlock").WithRaw(err) } }
// 未知错误 return NewInternal(err)}
// IsLockError 判断是否是锁相关错误func IsLockError(err error) (bool, string) { var pgErr *pgconn.PgError if errors.As(err, &pgErr) { switch pgErr.Code { case "40001": return true, "SERIALIZATION_FAILURE" case "40P01": return true, "DEADLOCK" case "55P03": return true, "LOCK_NOT_AVAILABLE" } } return false, ""}HTTP 响应工具
pkg/apierror/response.go
package apierror
import ( "encoding/json" "log/slog" "net/http")
// WriteError 写入错误响应func WriteError(w http.ResponseWriter, p *ProblemDetail) { w.Header().Set("Content-Type", "application/problem+json") w.WriteHeader(p.Status) json.NewEncoder(w).Encode(p)}
// WriteJSON 写入成功响应func WriteJSON(w http.ResponseWriter, status int, data any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(data)}Connect-Go 错误拦截器
internal/middleware/error_interceptor.go
package middleware
import ( "context" "errors" "log/slog"
"connectrpc.com/connect" "google.golang.org/protobuf/encoding/protojson"
"my-project/pkg/apierror")72 collapsed lines
func NewErrorInterceptor() connect.UnaryInterceptorFunc { return func(next connect.UnaryFunc) connect.UnaryFunc { return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { resp, err := next(ctx, req) if err == nil { return resp, nil }
// 尝试断言为 ProblemDetail var problem *apierror.ProblemDetail if errors.As(err, &problem) { // 注入 TraceID problem.WithTraceID(ctx)
// 记录日志 logError(ctx, problem)
// 转换为 Connect 错误 return nil, toConnectError(problem) }
// 未知错误,包装为内部错误 internal := apierror.NewInternal(err).WithTraceID(ctx) logError(ctx, internal) return nil, toConnectError(internal) } }}
func logError(ctx context.Context, p *apierror.ProblemDetail) { attrs := []any{ slog.String("code", p.Code), slog.Int("status", p.Status), slog.String("trace_id", p.TraceID), }
if p.raw != nil { attrs = append(attrs, slog.Any("err", p.raw)) slog.ErrorContext(ctx, p.Detail, attrs...) } else { slog.WarnContext(ctx, p.Detail, attrs...) }}
func toConnectError(p *apierror.ProblemDetail) *connect.Error { code := httpToConnectCode(p.Status) cerr := connect.NewError(code, errors.New(p.Detail))
// 将 ProblemDetail 作为详情附加 detail, _ := connect.NewErrorDetail(p) cerr.AddDetail(detail)
return cerr}
func httpToConnectCode(status int) connect.Code { switch status { case 400: return connect.CodeInvalidArgument case 401: return connect.CodeUnauthenticated case 403: return connect.CodePermissionDenied case 404: return connect.CodeNotFound case 409: return connect.CodeAlreadyExists case 429: return connect.CodeResourceExhausted case 503: return connect.CodeUnavailable default: return connect.CodeInternal }}Recovery 中间件
internal/middleware/recovery.go
package middleware
import ( "encoding/json" "log/slog" "net" "net/http" "os" "runtime/debug" "strings"
"my-project/pkg/apierror"38 collapsed lines
)
func Recoverer(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { defer func() { if err := recover(); err != nil { ctx := r.Context()
// 判断是否是网络连接中断 var brokenPipe bool if ne, ok := err.(*net.OpError); ok { if se, ok := ne.Err.(*os.SyscallError); ok { errStr := strings.ToLower(se.Error()) if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") { brokenPipe = true } } }
// 记录日志 slog.ErrorContext(ctx, "Panic!", slog.Any("error", err), slog.String("stack", string(debug.Stack())), slog.Bool("broken_pipe", brokenPipe), )
if brokenPipe { // 连接已断开,无法写入响应 return }
// 返回 RFC 7807 格式错误 problem := apierror.NewInternal(nil).WithTraceID(ctx) w.Header().Set("Content-Type", "application/problem+json") w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(problem) } }() next.ServeHTTP(w, r) })}使用示例
Repo 层
func (r *UserRepo) FindByID(ctx context.Context, id int) (*user.User, error) { row, err := r.getQueries(ctx).GetUserByID(ctx, int64(id)) if err != nil { // 翻译数据库错误 return nil, apierror.TranslatePgError(err). WithExtension("entity", "user"). WithExtension("id", id) } return r.toDomain(row), nil}Service 层
func (s *UserService) Get(ctx context.Context, id int) (*User, error) { user, err := s.repo.FindByID(ctx, id) if err != nil { return nil, err // 直接返回已翻译的错误 }
if user.Banned { return nil, apierror.NewForbidden("用户已被封禁"). WithExtension("user_id", id) }
return user, nil}响应示例
{ "type": "https://api.myproject.com/errors/not-found", "title": "Resource Not Found", "status": 404, "detail": "user with id '123' not found", "code": "NOT_FOUND", "trace_id": "a1b2c3d4e5f6789", "extensions": { "entity": "user", "id": 123 }}测试
核心库: testing(标准库)
不需要引入任何依赖,只需要创建一个以 _test.go 结尾的文件,并写一个 Test 开头的函数即可。
核心思想:表格驱动测试
package calc
import "testing"
func TestAdd(t *testing.T) { // 定义表格 (测试用例集合) tests := []struct { name string // 用例名称 a, b int // 输入 want int // 期望输出 }{ {"正数相加", 1, 2, 3}, {"负数相加", -1, -2, -3}, {"零值测试", 0, 0, 0}, }
// 循环执行 for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if got := Add(tt.a, tt.b); got != tt.want { t.Errorf("Add() = %v, want %v", got, tt.want) } }) }}断言库:Testify
类似于AssertJ与Hamcrest,它提供了 assert 和 require
import ( "testing" "github.com/stretchr/testify/assert")
func TestSomething(t *testing.T) { // 类似 Java 的 Assertions assert.Equal(t, 123, 123, "它们应该相等") assert.NotNil(t, object)
// require 会在失败时立即停止测试(类似 JUnit 的 assert) // 而 assert 失败后还会继续执行后续代码 // require.NoError(t, err)}Mock框架:stretchr/testify + vektra/mockery
定义一个interface
interface UserRepo { Get(id int) User}执行:mockery --name UserRepo
测试:
func TestService(t *testing.T) { // 创建 Mock 对象 mockRepo := mocks.NewUserRepo(t)
// 像 Mockito 一样打桩 mockRepo.On("Get", 1).Return(User{Name: "Go"}, nil)
// 调用 service := NewService(mockRepo) // ...
// 验证调用 mockRepo.AssertExpectations(t)}数据库测试
不要Mock,使用Testcontainers起一个docker容器跑真实数据库。
func TestUserRepo_Create(t *testing.T) { // 1. 使用 Testcontainers 启动一个 Docker Postgres // 2. 连接该 DB // 3. 运行迁移脚本建表 // 4. repo.Create(user) // 5. check db result}任务与构建:Taskfile
Taskfile 是 Makefile 的现代替代品,使用 YAML 语法,更加清晰易读。
Taskfile.yaml
version: "3"
vars: VERSION: sh: git describe --tags --always || echo "dev" COMMIT: sh: git rev-parse --short HEAD BUILD_TIME: sh: date +%FT%T%z
tasks: default:113 collapsed lines
desc: "显示帮助信息" cmds: - task --list
# 开发相关 dev: desc: "启动开发服务器" cmds: - go run ./cmd/server
dev:worker: desc: "启动 Worker" cmds: - go run ./cmd/worker
# 代码生成 generate: desc: "生成所有代码 (Wire, SQLC, Buf)" cmds: - task: generate:wire - task: generate:sqlc - task: generate:buf
generate:wire: desc: "生成 Wire 依赖注入代码" cmds: - wire ./cmd/server - wire ./cmd/worker
generate:sqlc: desc: "生成 SQLC 代码" cmds: - sqlc generate
generate:buf: desc: "生成 Protobuf 代码" cmds: - buf generate
# 构建相关 build: desc: "构建所有二进制" cmds: - task: build:server - task: build:worker
build:server: desc: "构建 Server" cmds: - | go build -ldflags "-X main.Version={{.VERSION}} -X main.CommitHash={{.COMMIT}} -X main.BuildTime={{.BUILD_TIME}}" \ -o bin/server ./cmd/server
build:worker: desc: "构建 Worker" cmds: - | go build -ldflags "-X main.Version={{.VERSION}} -X main.CommitHash={{.COMMIT}} -X main.BuildTime={{.BUILD_TIME}}" \ -o bin/worker ./cmd/worker
# 测试相关 test: desc: "运行所有测试" cmds: - go test -v -race -cover ./...
test:coverage: desc: "生成测试覆盖率报告" cmds: - go test -coverprofile=coverage.out ./... - go tool cover -html=coverage.out -o coverage.html
# 代码质量 lint: desc: "运行代码检查" cmds: - golangci-lint run ./...
fmt: desc: "格式化代码" cmds: - go fmt ./... - goimports -w .
# 数据库迁移 migrate:up: desc: "执行数据库迁移" cmds: - go run ./cmd/migration up
migrate:down: desc: "回滚数据库迁移" cmds: - go run ./cmd/migration down
migrate:create: desc: "创建新的迁移文件" cmds: - migrate create -ext sql -dir internal/data/migrations -seq {{.CLI_ARGS}}
# Docker 相关 docker:build: desc: "构建 Docker 镜像" cmds: - docker build -t my-project:{{.VERSION}} .
docker:push: desc: "推送 Docker 镜像" cmds: - docker push my-project:{{.VERSION}}
# 清理 clean: desc: "清理构建产物" cmds: - rm -rf bin/ - rm -rf coverage.*使用:
# 查看所有任务task
# 运行开发服务器task dev
# 生成代码task generate
# 构建task build
# 测试task testDocker 镜像:Distroless
Distroless 镜像只包含应用运行时必需的最小依赖,没有 shell、包管理器等,大大减少了攻击面。
Dockerfile
# ============================================# Stage 1: 构建阶段# ============================================FROM golang:1.23-alpine AS builder
# 安装构建依赖RUN apk add --no-cache git ca-certificates tzdata
WORKDIR /app
# 先复制依赖文件,利用 Docker 缓存COPY go.mod go.sum ./47 collapsed lines
RUN go mod download
# 复制源代码COPY . .
# 构建参数ARG VERSION=devARG COMMIT=unknownARG BUILD_TIME=unknown
# 编译 (CGO_ENABLED=0 确保静态编译)RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ -ldflags "-w -s -X main.Version=${VERSION} -X main.CommitHash=${COMMIT} -X main.BuildTime=${BUILD_TIME}" \ -o /app/server ./cmd/server
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ -ldflags "-w -s -X main.Version=${VERSION} -X main.CommitHash=${COMMIT} -X main.BuildTime=${BUILD_TIME}" \ -o /app/worker ./cmd/worker
# ============================================# Stage 2: 运行阶段 (Distroless)# ============================================FROM gcr.io/distroless/static-debian12:nonroot
# 复制时区数据COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo
# 复制 CA 证书 (HTTPS 请求需要)COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# 复制二进制文件COPY --from=builder /app/server /serverCOPY --from=builder /app/worker /worker
# 复制配置文件目录COPY --from=builder /app/configs /configs
# 设置时区ENV TZ=Asia/Shanghai
# 非 root 用户运行 (Distroless 默认使用 nonroot 用户)USER nonroot:nonroot
# 暴露端口EXPOSE 8000 9000
# 健康检查 (Distroless 没有 curl,使用自定义健康检查端点)# HEALTHCHECK 需要在 docker-compose 或 k8s 中配置
# 默认启动 serverENTRYPOINT ["/server"]docker-compose.yml
version: "3.8"
services: server: build: context: . args: VERSION: ${VERSION:-dev} COMMIT: ${COMMIT:-unknown} BUILD_TIME: ${BUILD_TIME:-unknown} image: my-project:${VERSION:-latest} ports:43 collapsed lines
- "8000:8000" - "9000:9000" environment: - APP_DATA_DATABASE_SOURCE=postgres://user:pass@postgres:5432/mydb?sslmode=disable - APP_DATA_REDIS_ADDR=redis:6379 depends_on: - postgres - redis restart: unless-stopped
worker: image: my-project:${VERSION:-latest} entrypoint: ["/worker"] environment: - APP_DATA_DATABASE_SOURCE=postgres://user:pass@postgres:5432/mydb?sslmode=disable - APP_DATA_REDIS_ADDR=redis:6379 depends_on: - postgres - redis restart: unless-stopped
postgres: image: postgres:16-alpine environment: POSTGRES_USER: user POSTGRES_PASSWORD: pass POSTGRES_DB: mydb volumes: - postgres_data:/var/lib/postgresql/data ports: - "5432:5432"
redis: image: redis:7-alpine ports: - "6379:6379"
jaeger: image: jaegertracing/all-in-one:latest ports: - "16686:16686" - "4318:4318" environment: - LOG_LEVEL=debug
volumes: postgres_data:异步任务队列:Asynq
github.com/hibiken/asynq
简单以user模块举例
/internal/├── user/│ ├── job/ <-- 这里放 Consumer (消费者逻辑)│ │ ├── handler.go <-- 类似于 HTTP 的 handler.go│ │ └── payload.go <-- 定义任务参数结构体、Task Type 常量│ ├── service.go│ └── provider.go <-- Wire Provider│└── server/ ├── http.go <-- Chi + Connect-Go Server └── worker.go <-- Asynq Server (集中组装)(异步任务部分与原文类似,此处省略重复内容)
代码质量:GolangCI-Lint
.golangci.yaml
示例如下,按照实际需求调整:
run: timeout: 5m skip-dirs: - internal/data/db # 忽略 SQLC 生成代码 - gen # 忽略 Protobuf 生成代码
linters: enable: - errcheck - gosimple - govet - ineffassign - staticcheck - typecheck - unused - goconst # 检查重复字符串 - gocyclo # 检查圈复杂度 (防止函数过长)
linters-settings: gocyclo: min-complexity: 15目录总结
参考:golang-standards/project-layout
/my-project├── cmd/ <-- 【入口层】所有可执行程序的入口│ ├── server/ <-- HTTP/gRPC API 服务入口│ │ ├── main.go <-- 负责加载配置、初始化 Log、启动 App│ │ ├── wire.go <-- Wire 依赖定义 (HTTP)│ │ └── wire_gen.go <-- Wire 自动生成代码│ ││ ├── worker/ <-- 异步任务 Worker 入口│ │ ├── main.go <-- 负责启动 Asynq Server│ │ ├── wire.go <-- Wire 依赖定义 (Worker)│ │ └── wire_gen.go <-- Wire 自动生成代码│ │86 collapsed lines
│ └── migration/ <-- 数据库迁移工具入口│ └── main.go│├── configs/ <-- 【配置层】│ ├── config.yaml <-- 本地开发配置文件│ └── fga-model.dsl <-- OpenFGA 授权模型│├── deploy/ <-- 【部署层】│ ├── Dockerfile <-- 多阶段 Distroless 构建│ └── docker-compose.yml <-- 本地环境编排 (Postgres, Redis, Jaeger)│├── gen/ <-- 【生成代码层】(Buf 生成的 Protobuf)│ └── user/v1/│ ├── user.pb.go│ └── userv1connect/│├── proto/ <-- 【Protobuf 定义】│ └── user/v1/│ └── user.proto│├── internal/ <-- 业务层│ ││ ├── conf/ <-- 配置定义│ │ ├── conf.go <-- Viper 加载逻辑│ │ └── provider.go <-- Wire Provider│ ││ ├── data/ <-- 数据层│ │ ├── db/ <-- SQLC 自动生成的代码│ │ │ ├── db.go│ │ │ ├── models.go│ │ │ └── user.sql.go│ │ ├── migrations/ <-- SQL 迁移文件│ │ ├── queries/ <-- 原始 SQL 查询│ │ ├── client.go <-- pgxpool 初始化│ │ ├── tm.go <-- Transaction Manager│ │ └── provider.go <-- Wire Provider│ ││ ├── infrastructure/ <-- 【基建】(非业务逻辑)│ │ └── auth/ <-- 认证工具│ │ ├── jwt.go│ │ └── provider.go│ ││ ├── middleware/ <-- 【中间件】│ │ ├── jwt.go <-- 认证│ │ ├── authz.go <-- 鉴权 (OpenFGA)│ │ ├── logger.go <-- HTTP 请求日志│ │ ├── recovery.go <-- 全局 Panic 兜底│ │ ├── error_interceptor.go <-- Connect-Go 错误拦截器│ │ └── validation.go <-- 参数校验│ ││ ├── server/ <-- 【Server 组装层】│ │ ├── http.go <-- Chi + Connect-Go│ │ └── worker.go <-- Asynq Server│ ││ └── user/ <-- 【业务模块:用户(举例)】(垂直切片)│ ├── domain.go <-- 领域实体 (DO) & Repo 接口定义│ ├── service.go <-- 应用服务│ ├── handler.go <-- Connect-Go Handler│ ├── provider.go <-- Wire ProviderSet│ ││ ├── job/ <-- 异步任务相关│ │ ├── handler.go│ │ └── payload.go│ ││ └── infrastructure/ <-- 模块内的基础设施实现│ └── persistence/│ └── user_repo.go <-- 实现 Repo 接口 (调用 SQLC)│├── pkg/ <-- 【公共库层】(可被外部项目引用)│ ├── apierror/ <-- RFC 7807 错误处理│ │ ├── error.go <-- ProblemDetail 结构体│ │ ├── codes.go <-- 错误码定义│ │ ├── factory.go <-- 构造函数│ │ ├── translator.go <-- 数据库错误翻译│ │ └── response.go <-- HTTP 响应工具│ ││ └── logger/ <-- 日志库封装│ ├── zap.go│ └── trace_handler.go│├── devbox.json <-- Devbox 开发环境配置├── devbox.lock├── buf.yaml <-- Buf 配置├── buf.gen.yaml <-- Buf 代码生成配置├── sqlc.yaml <-- SQLC 配置├── Taskfile.yaml <-- Taskfile (替代 Makefile)├── .golangci.yaml <-- GolangCI-Lint 配置├── go.mod├── go.sum└── README.md入口文件优雅中断服务
package main
import ( "context" "errors" "log/slog" "net/http" "os" "os/signal" "syscall" "time"
43 collapsed lines
"my-project/internal/conf" // ... 其他导入)
func main() { // 加载配置 cfg := conf.Load()
// 初始化 App (Wire注入) // 注意:这里的 cleanup 通常包含关闭 DB、Redis、Trace 的逻辑 app, cleanup, err := initApp(cfg) if err != nil { panic(err) } // 这里的 cleanup 必须最后执行(LIFO原则): // 先停 HTTP 服务 -> 再断开 DB 连接 defer cleanup()
// 启动 HTTP 服务 (在 Goroutine 中运行) go func() { slog.Info("Server is running", "addr", cfg.Server.Http.Addr) if err := app.HTTPServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { slog.Error("HTTP Server listen failed", "err", err) panic(err) } }()
// 监听系统信号 quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
// 阻塞在这里,直到收到信号 <-quit slog.Info("Shutting down server...")
// 执行优雅停机 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel()
if err := app.HTTPServer.Shutdown(ctx); err != nil { slog.Error("Server forced to shutdown", "err", err) } else { slog.Info("Server exited properly") }
// 函数结束,触发最上面的 defer cleanup()}