mirror of https://github.com/aptly-dev/aptly
203 lines
4.5 KiB
Go
203 lines
4.5 KiB
Go
package etcddb
|
|
|
|
import (
|
|
"github.com/aptly-dev/aptly/database"
|
|
"github.com/pborman/uuid"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
|
|
"fmt"
|
|
)
|
|
|
|
type EtcDStorage struct {
|
|
url string
|
|
db *clientv3.Client
|
|
tmpPrefix string // prefix for temporary DBs
|
|
}
|
|
|
|
// CreateTemporary creates new DB of the same type in temp dir
|
|
func (s *EtcDStorage) CreateTemporary() (database.Storage, error) {
|
|
tmp := uuid.NewRandom().String()
|
|
return &EtcDStorage{
|
|
url: s.url,
|
|
db: s.db,
|
|
tmpPrefix: tmp,
|
|
}, nil
|
|
}
|
|
|
|
func (s *EtcDStorage) applyPrefix(key []byte) []byte {
|
|
if len(s.tmpPrefix) != 0 {
|
|
return append([]byte(s.tmpPrefix+"/"), key...)
|
|
}
|
|
return key
|
|
}
|
|
|
|
// Get key value from etcd
|
|
func (s *EtcDStorage) Get(key []byte) (value []byte, err error) {
|
|
realKey := s.applyPrefix(key)
|
|
getResp, err := s.db.Get(Ctx, string(realKey))
|
|
if err != nil {
|
|
return
|
|
}
|
|
for _, kv := range getResp.Kvs {
|
|
value = kv.Value
|
|
break
|
|
}
|
|
if len(value) == 0 {
|
|
err = database.ErrNotFound
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// Put saves key to etcd, if key has the same value in DB already, it is not saved
|
|
func (s *EtcDStorage) Put(key []byte, value []byte) (err error) {
|
|
realKey := s.applyPrefix(key)
|
|
_, err = s.db.Put(Ctx, string(realKey), string(value))
|
|
if err != nil {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// Delete removes key from etcd
|
|
func (s *EtcDStorage) Delete(key []byte) (err error) {
|
|
realKey := s.applyPrefix(key)
|
|
_, err = s.db.Delete(Ctx, string(realKey))
|
|
if err != nil {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// KeysByPrefix returns all keys that start with prefix
|
|
func (s *EtcDStorage) KeysByPrefix(prefix []byte) [][]byte {
|
|
realPrefix := s.applyPrefix(prefix)
|
|
result := make([][]byte, 0, 20)
|
|
getResp, err := s.db.Get(Ctx, string(realPrefix), clientv3.WithPrefix())
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
for _, ev := range getResp.Kvs {
|
|
key := ev.Key
|
|
keyc := make([]byte, len(key))
|
|
copy(keyc, key)
|
|
result = append(result, key)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// FetchByPrefix returns all values with keys that start with prefix
|
|
func (s *EtcDStorage) FetchByPrefix(prefix []byte) [][]byte {
|
|
realPrefix := s.applyPrefix(prefix)
|
|
result := make([][]byte, 0, 20)
|
|
getResp, err := s.db.Get(Ctx, string(realPrefix), clientv3.WithPrefix())
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
for _, kv := range getResp.Kvs {
|
|
valc := make([]byte, len(kv.Value))
|
|
copy(valc, kv.Value)
|
|
result = append(result, kv.Value)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// HasPrefix checks whether it can find any key with given prefix and returns true if one exists
|
|
func (s *EtcDStorage) HasPrefix(prefix []byte) bool {
|
|
realPrefix := s.applyPrefix(prefix)
|
|
getResp, err := s.db.Get(Ctx, string(realPrefix), clientv3.WithPrefix())
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return getResp.Count > 0
|
|
}
|
|
|
|
// ProcessByPrefix iterates through all entries where key starts with prefix and calls
|
|
// StorageProcessor on key value pair
|
|
func (s *EtcDStorage) ProcessByPrefix(prefix []byte, proc database.StorageProcessor) error {
|
|
realPrefix := s.applyPrefix(prefix)
|
|
getResp, err := s.db.Get(Ctx, string(realPrefix), clientv3.WithPrefix())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, kv := range getResp.Kvs {
|
|
err := proc(kv.Key, kv.Value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close finishes etcd connect
|
|
func (s *EtcDStorage) Close() error {
|
|
// do not close temporary db
|
|
if len(s.tmpPrefix) != 0 {
|
|
return nil
|
|
}
|
|
if s.db == nil {
|
|
return nil
|
|
}
|
|
err := s.db.Close()
|
|
s.db = nil
|
|
return err
|
|
}
|
|
|
|
// Reopen tries to open (re-open) the database
|
|
func (s *EtcDStorage) Open() error {
|
|
if s.db != nil {
|
|
return nil
|
|
}
|
|
var err error
|
|
s.db, err = internalOpen(s.url)
|
|
return err
|
|
}
|
|
|
|
// CreateBatch creates a Batch object
|
|
func (s *EtcDStorage) CreateBatch() database.Batch {
|
|
if s.db == nil {
|
|
return nil
|
|
}
|
|
return &EtcDBatch{
|
|
s: s,
|
|
}
|
|
}
|
|
|
|
// OpenTransaction creates new transaction.
|
|
func (s *EtcDStorage) OpenTransaction() (database.Transaction, error) {
|
|
tmpdb, err := s.CreateTemporary()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &transaction{s: s, tmpdb: tmpdb}, nil
|
|
}
|
|
|
|
// CompactDB does nothing for etcd
|
|
func (s *EtcDStorage) CompactDB() error {
|
|
return nil
|
|
}
|
|
|
|
// Drop removes only temporary DBs with etcd (i.e. remove all prefixed keys)
|
|
func (s *EtcDStorage) Drop() error {
|
|
if len(s.tmpPrefix) != 0 {
|
|
getResp, err := s.db.Get(Ctx, s.tmpPrefix, clientv3.WithPrefix())
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
for _, kv := range getResp.Kvs {
|
|
_, err = s.db.Delete(Ctx, string(kv.Key))
|
|
if err != nil {
|
|
return fmt.Errorf("cannot delete tempdb entry: %s", kv.Key)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Check interface
|
|
var (
|
|
_ database.Storage = &EtcDStorage{}
|
|
)
|