Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

insert/update concurrrency with transaction: bad connection and unexpected messsage type 'T' #1092

Open
vothanhdo2602 opened this issue Dec 27, 2024 · 2 comments

Comments

@vothanhdo2602
Copy link

vothanhdo2602 commented Dec 27, 2024

	tx, err := pg.BeginTx(ctx)
	if err != nil {
		return
	}
	errChan := make(chan error)

	wg.Add(1)
	go commontil.BackgroundRun(ctx, &wg, func(ctx context.Context) {
		err = s.dao.UpdateWithTxById(ctx, tx, ws.Unid, ws)
		if err != nil {
			errChan <- err
			return
		}
	})

	wg.Add(1)
	go commontil.BackgroundRun(ctx, &wg, func(ctx context.Context) {
		err = s.credentialConfigurationDao.InsertWithTx(ctx, tx, credentialCfg)
		if err != nil {
			errChan <- err
			return
		}
	})
	wg.Add(1)
	go commontil.BackgroundRun(ctx, &wg, func(ctx context.Context) {
		err = s.storageConfigurationDao.InsertWithTx(ctx, tx, storageCfg)
		if err != nil {
			errChan <- err
			return
		}
	})

	go func() {
		wg.Wait()
		close(errChan)
	}()

	select {
	case err = <-errChan:
		pg.HandleTxErr(ctx, tx, err)
	}
func (s *baseImpl[T]) UpdateWithTxById(ctx context.Context, tx bun.IDB, id string, m *T) error {
	var (
		logger = log.WithCtx(ctx)
	)

	_, err := tx.NewUpdate().Model(m).Where(whereIDTmpl, id).Returning("*").Exec(ctx)
	if err != nil {
		logger.Error(err.Error(), zap.Any("model", m))
		return err
	}
	go rd.HSet(commontil.CopyContext(ctx), *m)
	return err
}

func (s *baseImpl[T]) InsertWithTx(ctx context.Context, tx bun.IDB, m *T) error {
	var (
		logger = log.WithCtx(ctx)
	)

	_, err := tx.NewInsert().Model(m).Exec(ctx)
	if err != nil {
		logger.Error(err.Error(), zap.Any("model", m))
		return err
	}
	go rd.HSet(commontil.CopyContext(ctx), *m)
	return err
}

I get error when insert/update concurrency in transaction. But no more error when i change to sequential.
Error message when use pgdriver: unexpected message 'T'
With pgxpool driver: bad connection

@Tiscs
Copy link
Collaborator

Tiscs commented Dec 27, 2024

Hi, can you provide complete reproducible code or error stack?
Also, which versions of dependencies are used in your project?
Thank you.

@vothanhdo2602
Copy link
Author

vothanhdo2602 commented Dec 27, 2024

Hi @Tiscs , I found the cause of this issue. I'm using newest version. The error sometimes will come when I not add method .Returning or add .Returning("*"), sometimes it works well, and no more error when I add .Returning("NULL") to disable returning, but I really need to return all columns for the common case.

_, err := tx.NewInsert().Model(m).Returning("*").Exec(ctx)             // error
_, err := tx.NewInsert().Model(m).Exec(ctx)                                       // error
_, err := tx.NewInsert().Model(m).Returning("NULL").Exec(ctx)    // no error

error message :
*errors.errorString: driver: bad connection

This is my connection config with pgxpool

var (
	db *bun.DB
)

func Init(ctx context.Context, wg *sync.WaitGroup) {
	var (
		logger = log.WithCtx(ctx)
	)

	if wg != nil {
		defer wg.Done()
	}

	c := config.GetENV().DB.PG
	addr := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=disable", c.Username, c.Password, c.Host, c.Port, c.Database)
	cfg, err := pgxpool.ParseConfig(addr)
	if err != nil {
		logger.Fatal(err.Error())
		return
	}

	// TODO:
	// TLS configuration
	cfg.MaxConns = 90
	cfg.MaxConnIdleTime = 20 * time.Minute
	cfg.MinConns = 10
	cfg.HealthCheckPeriod = 20 * time.Minute

	pool, err := pgxpool.NewWithConfig(ctx, cfg)
	if err != nil {
		logger.Fatal(err.Error())
		return
	}

	sqlDB := stdlib.OpenDBFromPool(pool)

	db = bun.NewDB(sqlDB, pgdialect.New(), bun.WithDiscardUnknownColumns())
}

func GetDB() bun.IDB {
	return db
}

func BeginTx(ctx context.Context) (bun.Tx, error) {
	tx, err := db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		log.WithCtx(ctx).Error(err.Error())
	}
	return tx, err
}

func HandleTxErr(ctx context.Context, tx bun.Tx, err error) {
	var (
		logger = log.WithCtx(ctx)
	)

	if err != nil {
		if rollbackErr := tx.Rollback(); rollbackErr != nil {
			logger.Error(rollbackErr.Error())
		}
		return
	}

	if commitErr := tx.Commit(); commitErr != nil {
		logger.Error(commitErr.Error())
	}
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants