1
0
Fork 0
httpserver/middleware/db/db.go

190 Zeilen
4.7 KiB
Go

2022-11-05 07:44:37 +00:00
// Package db is a middleware that manages multiple database pools and provides applications with a way to access the database.
2021-02-26 23:38:06 +00:00
package db
import (
"context"
"fmt"
"io/fs"
2021-02-26 23:38:06 +00:00
"sync"
"github.com/gin-gonic/gin"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/tern/v2/migrate"
2022-11-05 07:29:17 +00:00
"github.com/rs/zerolog/log"
2021-02-26 23:38:06 +00:00
"go.sebtobie.de/httpserver/middleware"
)
// ContextKey is the key under which Middleware.Gin stores the connection
// getter (the GetConn method value) in a gin.Context.
const ContextKey string = "db"
// ConnGet is the signature of a function that takes a string and returns a
// pooled connection. Middleware.GetConn has this signature and interprets the
// string as the name of a registered database pool.
type ConnGet func(string) *pgxpool.Conn
// Compile-time check that Middleware.GetConn is assignable to ConnGet.
var _ ConnGet = NewMiddleware().GetConn
2021-02-26 23:38:06 +00:00
// Compile-time check that *Middleware implements middleware.Middleware.
var _ middleware.Middleware = &Middleware{}
// Middleware keeps a set of named pgx connection pools and hands out
// connections from them. Its Gin method puts the connection getter into the
// context of every request.
type Middleware struct {
// databases maps a pool name to its connection pool.
databases map[string]*pgxpool.Pool
// lock is held by AddDB, GetConn and Sites while they access databases.
lock sync.Mutex
}
2022-11-05 07:44:37 +00:00
// NewMiddleware return an initialized Middleware Object.
2021-02-26 23:38:06 +00:00
func NewMiddleware() *Middleware {
return &Middleware{
databases: make(map[string]*pgxpool.Pool),
}
}
// AddDB adds an db connection to the middleware.
func (m *Middleware) AddDB(name, uri string) (err error) {
m.lock.Lock()
if m.databases == nil {
m.databases = map[string]*pgxpool.Pool{}
}
m.lock.Unlock()
var (
db *pgxpool.Pool
2021-02-26 23:38:06 +00:00
)
if err != nil {
log.Error().Err(err).Msg("Could not open the database")
return err
}
db, err = pgxpool.New(context.TODO(), uri)
2021-02-26 23:38:06 +00:00
if err != nil {
log.Error().Err(err).Msg("Could not open the database")
return err
}
m.lock.Lock()
defer m.lock.Unlock()
if olddb, found := m.databases[name]; found {
olddb.Close()
}
m.databases[name] = db
return
}
func (m *Middleware) getconn(name string) *pgxpool.Conn {
2021-02-26 23:38:06 +00:00
if db, found := m.databases[name]; found {
conn, err := db.Acquire(context.TODO())
if err != nil {
log.Error().Err(err).Msgf("Could not get the connection from the pool %s", name)
2021-02-26 23:38:06 +00:00
return nil
}
return conn
}
return nil
}
// GetConn returns a connection from the specified database or if not found one of the default database.
func (m *Middleware) GetConn(name string) *pgxpool.Conn {
m.lock.Lock()
defer m.lock.Unlock()
if conn := m.getconn(name); conn != nil {
return conn
}
conn := m.getconn("default")
return conn
}
2021-02-26 23:38:06 +00:00
// Gin is the Entrypoint for Gin.
func (m *Middleware) Gin(c *gin.Context) {
c.Set(ContextKey, m.GetConn)
2021-02-26 23:38:06 +00:00
}
// Setup adds the connections from the config into the middleware.
//
// Every key of mc is used as a pool name and its value as the connection
// string passed to AddDB. Entries whose value is not a string, or for which
// AddDB fails, are logged and skipped so that the remaining entries are still
// processed.
func (m *Middleware) Setup(mc middleware.Config) {
	for key, value := range mc {
		dsn, ok := value.(string)
		if !ok {
			// An unchecked type assertion would panic on a malformed config.
			log.Error().Str("database", key).Msgf("Failed to parse the config: expected a string but got %T", value)
			continue
		}
		if err := m.AddDB(key, dsn); err != nil {
			log.Error().Err(err).Msg("Failed to parse the config")
		}
	}
}
// Defaults returns the default config: a single connection named "default".
func (*Middleware) Defaults() middleware.Config {
	cfg := make(map[string]any, 1)
	cfg["default"] = "host=/run/postgresql port=5432 dbname=httpserver"
	return cfg
}
2021-02-26 23:38:06 +00:00
// Teardown closes every registered database pool, logging before and after
// each one is closed.
func (m *Middleware) Teardown() {
	// Copy the pools while holding the lock, like the other accessors of
	// m.databases do, but close them after releasing it: Close waits for
	// outstanding connections, and their holders must not be blocked on the lock.
	m.lock.Lock()
	pools := make(map[string]*pgxpool.Pool, len(m.databases))
	for name, pool := range m.databases {
		pools[name] = pool
	}
	m.lock.Unlock()

	for name, pool := range pools {
		log.Info().Msgf("Starting to close databasepool %s", name)
		pool.Close()
		log.Info().Msgf("Closed Databasepool %s", name)
	}
}
// Sites runs the database migrations of every element of sites that
// implements MigrationSite; other elements are ignored.
//
// It stops at the first failure and returns an error when the pool named by
// a site is not registered, when no connection can be acquired, when the site
// cannot build its migrator, or when the migration itself fails. The lock is
// held for the whole run.
func (m *Middleware) Sites(sites []any) (err error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	for _, s := range sites {
		site, ok := s.(MigrationSite)
		if !ok {
			continue
		}
		db, found := m.databases[site.Database()]
		if !found {
			return fmt.Errorf("Failed to get the database. The Databasepool %s does not exist", site.Database())
		}
		if err = migrateSite(db, site); err != nil {
			return err
		}
	}
	return nil
}

// migrateSite runs the migrations of one site on a connection from db and
// releases that connection before returning.
func migrateSite(db *pgxpool.Pool, site MigrationSite) error {
	conn, err := db.Acquire(context.TODO())
	if err != nil {
		return err
	}
	// Released per site rather than at the end of Sites, so that a long list
	// of sites does not keep one connection per site checked out.
	defer conn.Release()

	mig, err := site.Migrations(conn.Conn())
	if err != nil {
		// mig must not be used when the site failed to build it.
		return err
	}
	return mig.Migrate(context.TODO())
}
func logmigrations(version int32, name, dir string, sql string) {
2021-10-25 20:28:39 +00:00
log.Info().Int32("version", version).Str("name", name).Str("dir", dir).Msgf("Running Migration %s with version %d in %sward direction", name, version, dir)
2021-02-26 23:38:06 +00:00
}
// MigrationSite is an interface for sites that use a database.
// It enables sites to provide database migrations, which Middleware.Sites runs.
type MigrationSite interface {
// Database returns the name of the database pool the site uses;
// Middleware.Sites looks this name up among the registered pools.
Database() string
// Migrations returns the migrator for the site, given the connection
// the migrations are to run on.
Migrations(*pgx.Conn) (*migrate.Migrator, error)
}
2021-02-26 23:38:06 +00:00
// SetupMigrator sets up the migrator to migrate the database.
func SetupMigrator(prefix string, connection *pgx.Conn, migrations fs.FS) (mig *migrate.Migrator, err error) {
2021-02-26 23:38:06 +00:00
mig, err = migrate.NewMigratorEx(
context.TODO(),
connection,
"version",
&migrate.MigratorOptions{
DisableTx: false,
2021-02-26 23:38:06 +00:00
},
)
if err != nil {
2021-02-27 13:03:20 +00:00
log.Error().Err(err).Msg("Error while creating the migrator")
2021-02-26 23:38:06 +00:00
return
}
err = mig.LoadMigrations(migrations)
if err != nil {
log.Error().Err(err).Msg("Error while loading migrations")
return
}
2021-02-26 23:38:06 +00:00
mig.OnStart = logmigrations
mig.Data["prefix"] = prefix
return
}