Featured image of post Working with PostgreSQL in Go using pgx

Working with PostgreSQL in Go using pgx

Introduction

Having had the chance recently to work with SQL again after years of NoSQL (DynamoDB, MongoDB, Redis, CosmosDB), I was surprised to see how much I missed it.

The Project I am working on uses Go and even tough Go has a couple of good ORMs (ent) and bad ones (gorm), I decided to go with the native PostgreSQL driver pgx. Why? Because I needed all the performance I could get and pgx provides some nice features like:

  • prepared statements
  • connection pooling
  • binary decoding - pgx can decode query results directly into Go structs, which can be faster and more convenient than manually parsing rows.
  • feature rich - pgx supports a wide range of PostgreSQL features, including notifications, large objects, and COPY.

The only downside is the somewhat lack of documentation. I had to go through GitHub issues and read the source code to understand the API. Having invested the time to learn it, I thought it would be nice to share my experience.

In this post I’ll write down some of the things I learned while working with pgx. I will try to keep it as simple as possible, but I will also try to cover some of the more advanced features.

Specifically, I will cover the following topics:

  • Connection pool setup
  • Inserts
  • Bulk inserts
  • Querying and parsing results into Go structs

Connection pool setup

Setting up a connection pool is the recommended way when we are working with DBs. Instead of creating a new connection for each query a connection pool will reuse connections. To set up a connection pool I need to call pgxpool.New and pass it a connection string:

package pg

import (
	"context"
	"fmt"
	"sync"

	"github.com/jackc/pgx/v5/pgxpool"
)

type postgres struct {
	db *pgxpool.Pool
}

var (
	pgInstance *postgres
	pgOnce     sync.Once
)

func NewPG(ctx context.Context, connString string) (*postgres, error) {
	pgOnce.Do(func() {
		db, err := pgxpool.New(ctx, connString)
		if err != nil {
			return fmt.Errorf("unable to create connection pool: %w", err)
		}

		pgInstance = &postgres{db}
	})

	return pgInstance, nil
}

func (pg *postgres) Ping(ctx context.Context) error {
	return pg.db.Ping(ctx)
}

func (pg *postgres) Close() {
	pg.db.Close()
}

Note that I am using a singleton pattern to make sure that I only have one connection pool. This is not strictly necessary, but it is a good practice to avoid creating multiple connection pools.

Also note that I like to put my database connection code in a separate package. This way I can reuse it in other projects.

Inserting data

We will cover three ways of inserting data. Single row inserts, bulk inserts and COPY inserts.

Single row inserts

Inserting a single row is pretty straightforward. Having initialized a connection pool, we can call Exec on it and pass it a SQL query and the values we want to insert:


func (pg *postgres) InsertUser(ctx context.Context) error {
  query := `INSERT INTO users (name, email) VALUES (@userName, @userEmail)`
  args := pgx.NamedArgs{
    "userName": "Bobby",
    "userEmail": "[email protected]",
  }
  _, err := pg.db.Exec(ctx, query, args)
  if err != nil {
    return fmt.Errorf("unable to insert row: %w", err)
  }

  return nil
}

Note that I am using named arguments instead of positional arguments (e.g. $1, $2). This is a good practice because it makes the code more readable and it is also more secure. If you use positional arguments, you can easily make a mistake and pass the arguments in the wrong order. With named arguments, you can’t make this mistake.

Bulk inserts

Bulk inserts are a bit more complicated. You need to create a pgx.Batch and add all the queries you want to execute to it. Then you need to call SendBatch on the connection pool and pass it the batch. This will return a pgx.BatchResults object. You can then iterate over the results and check for errors:

func (pg *postgres) BulkInsertUsers(ctx context.Context, users []model.User) error {
  query := `INSERT INTO users (name, email) VALUES (@userName, @userEmail)`

  batch := &pgx.Batch{}
  for _, user := range users {
    args := pgx.NamedArgs{
      "userName": user.Name,
      "userEmail": user.Email,
    }
    batch.Queue(query, args)
  }
  
  results := pg.db.SendBatch(ctx, batch)
  defer results.Close()

  for _, user := range users {
    _, err := results.Exec()
    if err != nil {
      var pgErr *pgconn.PgError
      if errors.As(err, &pgErr) && pgErr.Code == pgerrcode.UniqueViolation {
          log.Printf("user %s already exists", user.Name)
          continue
      }

      return fmt.Errorf("unable to insert row: %w", err)
    }
  }

  return results.Close()
}

This code is a bit more involved but I think it shows a couple nice to knows like how to check for a specific error code and how to use the pgx.Batch struct.

The SendBatch method returns a pgx.BatchResults struct. This struct has a Exec method that is somewhat misleading. It doesn’t actually execute the insert, it just returns the result. I can call Exec multiple times to get the results of all the inserts in the batch. The Close method on the pgx.BatchResults struct is what actually executes the batch. Its idempotent, so I can call it multiple times.

COPY inserts

Bulk inserts a performant way to insert data, but they are not the fastest. If you want to insert a lot of data, you should use the COPY method.

We can do a COPY insert with the CopyFrom method:

func (pg *postgres) CopyInsertUsers(ctx context.Context, users []model.User) error {
  entries := [][]any{}
  columns := []string{"name", "email"}
  tableName := "user"

  for _, user := range users {
    entries = append(entries, []any{user.Name, user.Email})
  }

  _, err := pg.db.CopyFrom(
    ctx,
    pgx.Identifier{tableName},
    columns,
    pgx.CopyFromRows(entries),
  )

  if err != nil {
    return fmt.Errorf("error copying into %s table: %w", tableName, err)
  }

  return nil
}

That isn’t too bad =)

When to use COPY and when to use bulk inserts really for me boils down to wether I need to know if a particular insert failed or not. If I don’t need to know, I will use COPY. If I do need to know, I will use bulk inserts.

Querying data

pgx has a couple of methods for querying data. The most basic one is QueryRow. This method will return a pgx.Row struct. This struct has a Scan method that will scan the result into a struct:

func (pg *postgres) GetUsers(ctx context.Context) ([]model.User, error) {
  query := `SELECT name, email FROM user LIMIT 10`
  
  rows, err := pg.db.Query(ctx, query)
  if err != nil {
    return nil, fmt.Errorf("unable to query users: %w", err)
  }
  defer rows.Close()

  users := []model.User{}
  for rows.Next() {
    user := model.User{}
    err := rows.Scan(&user.Name, &user.Email)
    if err != nil {
      return nil, fmt.Errorf("unable to scan row: %w", err)
    }
    users = append(users, user)
  }

  return users, nil
}

However, what I found out is that since v5 we have generics and can do this instead:

func (pg *postgres) GetUser(ctx context.Context, name string) ([]model.User, error) {
  query := `SELECT name, email FROM user LIMIT 10`

  rows, err := pg.db.Query(ctx, query)
  if err != nil {
    return nil, fmt.Errorf("unable to query users: %w", err)
  }
  defer rows.Close()

  return pgx.CollectRows(rows, pgx.RowToStructByName(model.User))
}

This is a lot simpler and it also has the advantage of not having to know the column names in advance. It will just scan the result into the struct.

Doing SQL in Go got a lot of hate in the past because of interface{} and manually scanning the result into a struct. But with pgx v5, this is no longer the case. I think that libraries like sqlx and scany are great but not necessary anymore.

Conclusion

We covered a lot of ground in this post. We looked at how to connect to Postgres using pgx.Pool, three different ways how to insert data, how to query and parse data. We also looked at how to use the new pgx v5 features like named arguments and the pgx.CollectRows method.

I hope you enjoyed this post and that you learned something new. If you have any questions or remarks, feel free to ping me on twitter @bobby_donchev.

Thanks.