2022-11-05 07:44:37 +00:00
|
|
|
// Package db is an middleware that manages multiple database pools and provides applications with an way to access the database
|
2021-02-26 23:38:06 +00:00
|
|
|
package db
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2022-11-18 13:10:59 +00:00
|
|
|
"errors"
|
2021-11-11 22:06:48 +00:00
|
|
|
"fmt"
|
2021-02-27 10:54:10 +00:00
|
|
|
"io/fs"
|
2021-02-26 23:38:06 +00:00
|
|
|
"sync"
|
|
|
|
|
|
|
|
"github.com/gin-gonic/gin"
|
2022-11-06 11:29:08 +00:00
|
|
|
"github.com/jackc/pgx/v5"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/jackc/tern/v2/migrate"
|
2022-11-05 07:29:17 +00:00
|
|
|
"github.com/rs/zerolog/log"
|
2021-02-26 23:38:06 +00:00
|
|
|
"go.sebtobie.de/httpserver/middleware"
|
2022-11-19 12:41:28 +00:00
|
|
|
|
|
|
|
uuid "github.com/jackc/pgx-gofrs-uuid"
|
2021-02-26 23:38:06 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// ContextKey is the key that is used in a gin.Context to get the Middleware
|
|
|
|
const ContextKey string = "db"
|
|
|
|
|
2021-02-28 13:01:31 +00:00
|
|
|
// ConnGet is an function that returns an connection instance.
|
|
|
|
type ConnGet func(string) *pgxpool.Conn
|
|
|
|
|
|
|
|
var _ ConnGet = NewMiddleware().GetConn
|
2021-02-26 23:38:06 +00:00
|
|
|
var _ middleware.Middleware = &Middleware{}
|
2022-11-07 17:45:04 +00:00
|
|
|
var _ middleware.PostSetupMiddleware = &Middleware{}
|
2021-02-26 23:38:06 +00:00
|
|
|
|
2022-11-18 13:10:59 +00:00
|
|
|
// GetConnection is an simple helper function that returns an connection to the db
|
|
|
|
func GetConnection(c *gin.Context, db string) (*pgxpool.Conn, error) {
|
|
|
|
if co, ok := c.Get(ContextKey); ok {
|
2022-11-19 10:28:27 +00:00
|
|
|
if cg, ok := co.(ConnGet); ok {
|
|
|
|
return cg(db), nil
|
|
|
|
}
|
|
|
|
return nil, fmt.Errorf("Failed to convert the method. %T != ConnGet", co)
|
2022-11-18 13:10:59 +00:00
|
|
|
}
|
|
|
|
return nil, errors.New("No db.Middleware set up. ")
|
|
|
|
}
|
|
|
|
|
2021-02-26 23:38:06 +00:00
|
|
|
// Middleware return a handler that sets the db into the context of every request.
|
|
|
|
// uri is an url in the form dbtype:connectargs
|
|
|
|
type Middleware struct {
|
|
|
|
databases map[string]*pgxpool.Pool
|
|
|
|
lock sync.Mutex
|
|
|
|
}
|
|
|
|
|
2022-11-05 07:44:37 +00:00
|
|
|
// NewMiddleware return an initialized Middleware Object.
|
2021-02-26 23:38:06 +00:00
|
|
|
func NewMiddleware() *Middleware {
|
|
|
|
return &Middleware{
|
|
|
|
databases: make(map[string]*pgxpool.Pool),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// AddDB adds an db connection to the middleware.
|
|
|
|
func (m *Middleware) AddDB(name, uri string) (err error) {
|
|
|
|
m.lock.Lock()
|
|
|
|
if m.databases == nil {
|
|
|
|
m.databases = map[string]*pgxpool.Pool{}
|
|
|
|
}
|
|
|
|
m.lock.Unlock()
|
|
|
|
var (
|
2022-11-06 11:29:08 +00:00
|
|
|
db *pgxpool.Pool
|
2021-02-26 23:38:06 +00:00
|
|
|
)
|
2022-11-06 11:29:08 +00:00
|
|
|
db, err = pgxpool.New(context.TODO(), uri)
|
2021-02-26 23:38:06 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Error().Err(err).Msg("Could not open the database")
|
|
|
|
return err
|
|
|
|
}
|
2022-11-19 12:41:28 +00:00
|
|
|
db.Config().AfterConnect = func(_ context.Context, c *pgx.Conn) error {
|
|
|
|
uuid.Register(c.TypeMap())
|
|
|
|
return nil
|
|
|
|
}
|
2021-02-26 23:38:06 +00:00
|
|
|
m.lock.Lock()
|
|
|
|
defer m.lock.Unlock()
|
|
|
|
if olddb, found := m.databases[name]; found {
|
|
|
|
olddb.Close()
|
|
|
|
}
|
|
|
|
m.databases[name] = db
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-02-28 13:01:31 +00:00
|
|
|
func (m *Middleware) getconn(name string) *pgxpool.Conn {
|
2021-02-26 23:38:06 +00:00
|
|
|
if db, found := m.databases[name]; found {
|
|
|
|
conn, err := db.Acquire(context.TODO())
|
|
|
|
if err != nil {
|
2021-02-28 13:01:31 +00:00
|
|
|
log.Error().Err(err).Msgf("Could not get the connection from the pool %s", name)
|
2021-02-26 23:38:06 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return conn
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-02-28 13:01:31 +00:00
|
|
|
// GetConn returns a connection from the specified database or if not found one of the default database.
|
|
|
|
func (m *Middleware) GetConn(name string) *pgxpool.Conn {
|
|
|
|
m.lock.Lock()
|
|
|
|
defer m.lock.Unlock()
|
|
|
|
if conn := m.getconn(name); conn != nil {
|
|
|
|
return conn
|
|
|
|
}
|
|
|
|
conn := m.getconn("default")
|
|
|
|
return conn
|
|
|
|
}
|
|
|
|
|
2021-02-26 23:38:06 +00:00
|
|
|
// Gin is the Entrypoint for Gin.
|
|
|
|
func (m *Middleware) Gin(c *gin.Context) {
|
2022-11-19 10:28:27 +00:00
|
|
|
c.Set(ContextKey, ConnGet(m.GetConn))
|
2021-02-26 23:38:06 +00:00
|
|
|
}
|
|
|
|
|
2021-11-11 22:06:48 +00:00
|
|
|
// Setup adds the connections from the configfile into the middleware
|
|
|
|
func (m *Middleware) Setup(mc middleware.Config) {
|
|
|
|
for key, value := range mc {
|
|
|
|
dsn := value.(string)
|
|
|
|
err := m.AddDB(key, dsn)
|
|
|
|
if err != nil {
|
|
|
|
log.Error().Err(err).Msg("Failed to parse the config")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Defaults returns an default config for connections
|
|
|
|
func (*Middleware) Defaults() middleware.Config {
|
2022-11-06 09:52:24 +00:00
|
|
|
return map[string]any{
|
2021-11-11 22:06:48 +00:00
|
|
|
"default": "host=/run/postgresql port=5432 dbname=httpserver",
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-26 23:38:06 +00:00
|
|
|
// Teardown closes the DBConnection
|
|
|
|
func (m *Middleware) Teardown() {
|
|
|
|
for name, pool := range m.databases {
|
|
|
|
log.Info().Msgf("Starting to close databasepool %s", name)
|
|
|
|
pool.Close()
|
|
|
|
log.Info().Msgf("Closed Databasepool %s", name)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-11-07 17:45:04 +00:00
|
|
|
// PostSetup is an function for getting the migrations of the site
|
|
|
|
func (m *Middleware) PostSetup(sites []any) (err error) {
|
2021-11-11 22:06:48 +00:00
|
|
|
m.lock.Lock()
|
|
|
|
defer m.lock.Unlock()
|
|
|
|
var (
|
|
|
|
conn *pgxpool.Conn
|
|
|
|
mig *migrate.Migrator
|
|
|
|
db *pgxpool.Pool
|
|
|
|
)
|
|
|
|
for _, s := range sites {
|
2022-11-14 09:37:23 +00:00
|
|
|
|
|
|
|
if site, ok := s.(MigrationSite); ok {
|
|
|
|
|
|
|
|
db, ok = m.databases[site.Database()]
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("Failed to get the database. The Databasepool %s does not exist", site.Database())
|
|
|
|
}
|
|
|
|
conn, err = db.Acquire(context.TODO())
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
defer conn.Release()
|
|
|
|
mig, err = site.Migrations(conn.Conn())
|
|
|
|
err = mig.Migrate(context.TODO())
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if poolsite, ok := s.(PoolSite); ok {
|
|
|
|
poolsite.Pool(m.databases[site.Database()])
|
|
|
|
}
|
2021-11-11 22:06:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func logmigrations(version int32, name, dir string, sql string) {
|
2021-10-25 20:28:39 +00:00
|
|
|
log.Info().Int32("version", version).Str("name", name).Str("dir", dir).Msgf("Running Migration %s with version %d in %sward direction", name, version, dir)
|
2021-02-26 23:38:06 +00:00
|
|
|
}
|
|
|
|
|
2021-11-11 22:06:48 +00:00
|
|
|
// MigrationSite is an interface for Sites that use an database.
|
|
|
|
// this enables sites to provide database migrations.
|
|
|
|
type MigrationSite interface {
|
|
|
|
Database() string
|
|
|
|
Migrations(*pgx.Conn) (*migrate.Migrator, error)
|
|
|
|
}
|
|
|
|
|
2022-11-14 09:37:23 +00:00
|
|
|
// PoolSite is an interface for site that need access to the pool outside of requests
|
|
|
|
type PoolSite interface {
|
|
|
|
MigrationSite
|
|
|
|
Pool(*pgxpool.Pool)
|
|
|
|
}
|
|
|
|
|
2021-02-26 23:38:06 +00:00
|
|
|
// SetupMigrator sets up the migrator to migrate the database.
|
2021-02-27 10:54:10 +00:00
|
|
|
func SetupMigrator(prefix string, connection *pgx.Conn, migrations fs.FS) (mig *migrate.Migrator, err error) {
|
2021-02-26 23:38:06 +00:00
|
|
|
mig, err = migrate.NewMigratorEx(
|
|
|
|
context.TODO(),
|
|
|
|
connection,
|
2022-11-19 10:05:06 +00:00
|
|
|
prefix+"version",
|
2021-02-26 23:38:06 +00:00
|
|
|
&migrate.MigratorOptions{
|
2022-11-06 11:29:08 +00:00
|
|
|
DisableTx: false,
|
2021-02-26 23:38:06 +00:00
|
|
|
},
|
|
|
|
)
|
|
|
|
if err != nil {
|
2021-02-27 13:03:20 +00:00
|
|
|
log.Error().Err(err).Msg("Error while creating the migrator")
|
2021-02-26 23:38:06 +00:00
|
|
|
return
|
|
|
|
}
|
2022-11-19 10:05:06 +00:00
|
|
|
mig.OnStart = logmigrations
|
|
|
|
mig.Data["prefix"] = prefix
|
2022-11-06 11:29:08 +00:00
|
|
|
err = mig.LoadMigrations(migrations)
|
|
|
|
if err != nil {
|
|
|
|
log.Error().Err(err).Msg("Error while loading migrations")
|
|
|
|
return
|
|
|
|
}
|
2022-11-19 10:05:06 +00:00
|
|
|
log.Trace().Interface("migrations", mig.Migrations).Interface("data", mig.Data).Err(err).Send()
|
2021-02-26 23:38:06 +00:00
|
|
|
return
|
|
|
|
}
|