-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrepo.go
100 lines (82 loc) · 2.27 KB
/
repo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
outbox "github.com/nikolayk812/pgx-outbox"
"github.com/nikolayk812/pgx-outbox/examples/01_sns/clients/tracing"
"github.com/nikolayk812/pgx-outbox/types"
"go.opentelemetry.io/otel/attribute"
)
type Repo interface {
CreateUser(ctx context.Context, user User) (User, error)
}
type repo struct {
pool *pgxpool.Pool
writer outbox.Writer
mapper UserMessageMapper
}
func NewRepo(pool *pgxpool.Pool, writer outbox.Writer, mapper UserMessageMapper) (Repo, error) {
if pool == nil {
return nil, fmt.Errorf("pool is nil")
}
if writer == nil {
return nil, fmt.Errorf("writer is nil")
}
if mapper == nil {
return nil, fmt.Errorf("mapper is nil")
}
return &repo{
pool: pool,
writer: writer,
mapper: mapper,
}, nil
}
func (r *repo) CreateUser(ctx context.Context, user User) (u User, txErr error) {
ctx, span, finishSpan := tracing.StartSpan(ctx, tracerName, "user_created")
tx, commitFunc, err := r.beginTx(ctx)
if err != nil {
return u, fmt.Errorf("beginTx: %w", err)
}
defer func() {
if txErr = commitFunc(txErr); txErr != nil {
txErr = fmt.Errorf("commitFunc: %w", txErr)
}
finishSpan(txErr)
}()
user, err = r.createUser(ctx, tx, user)
if err != nil {
return u, fmt.Errorf("createUser: %w", err)
}
message, err := r.mapper(user)
if err != nil {
return u, fmt.Errorf("mapper: %w", err)
}
message.Metadata = map[string]string{
tracing.MetadataTraceID: span.SpanContext().TraceID().String(),
tracing.MetadataSpanID: span.SpanContext().SpanID().String(),
}
if _, err := r.writer.Write(ctx, tx, message); err != nil {
return u, fmt.Errorf("writer.Write: %w", err)
}
span.SetAttributes(attribute.String("user.id", user.ID.String()))
return user, nil
}
func (r *repo) createUser(ctx context.Context, tx pgx.Tx, user User) (u User, _ error) {
if tx == nil {
return u, fmt.Errorf("tx is nil")
}
var createdAt time.Time
err := tx.QueryRow(ctx,
"INSERT INTO users (id, name, age) VALUES ($1, $2, $3) RETURNING created_at",
user.ID, user.Name, user.Age).
Scan(&createdAt)
if err != nil {
return u, fmt.Errorf("tx.QueryRow: %w", err)
}
user.CreatedAt = createdAt
return user, nil
}
type UserMessageMapper types.ToMessageFunc[User]