package etcddb import ( "github.com/aptly-dev/aptly/database" "github.com/pborman/uuid" clientv3 "go.etcd.io/etcd/client/v3" "fmt" ) type EtcDStorage struct { url string db *clientv3.Client tmpPrefix string // prefix for temporary DBs } // CreateTemporary creates new DB of the same type in temp dir func (s *EtcDStorage) CreateTemporary() (database.Storage, error) { tmp := uuid.NewRandom().String() return &EtcDStorage{ url: s.url, db: s.db, tmpPrefix: tmp, }, nil } func (s *EtcDStorage) applyPrefix(key []byte) []byte { if len(s.tmpPrefix) != 0 { return append([]byte(s.tmpPrefix+"/"), key...) } return key } // Get key value from etcd func (s *EtcDStorage) Get(key []byte) (value []byte, err error) { realKey := s.applyPrefix(key) getResp, err := s.db.Get(Ctx, string(realKey)) if err != nil { return } for _, kv := range getResp.Kvs { value = kv.Value break } if len(value) == 0 { err = database.ErrNotFound return } return } // Put saves key to etcd, if key has the same value in DB already, it is not saved func (s *EtcDStorage) Put(key []byte, value []byte) (err error) { realKey := s.applyPrefix(key) _, err = s.db.Put(Ctx, string(realKey), string(value)) if err != nil { return } return } // Delete removes key from etcd func (s *EtcDStorage) Delete(key []byte) (err error) { realKey := s.applyPrefix(key) _, err = s.db.Delete(Ctx, string(realKey)) if err != nil { return } return } // KeysByPrefix returns all keys that start with prefix func (s *EtcDStorage) KeysByPrefix(prefix []byte) [][]byte { realPrefix := s.applyPrefix(prefix) result := make([][]byte, 0, 20) getResp, err := s.db.Get(Ctx, string(realPrefix), clientv3.WithPrefix()) if err != nil { return nil } for _, ev := range getResp.Kvs { key := ev.Key keyc := make([]byte, len(key)) copy(keyc, key) result = append(result, key) } return result } // FetchByPrefix returns all values with keys that start with prefix func (s *EtcDStorage) FetchByPrefix(prefix []byte) [][]byte { realPrefix := s.applyPrefix(prefix) result := make([][]byte, 0, 20) getResp, err := s.db.Get(Ctx, string(realPrefix), clientv3.WithPrefix()) if err != nil { return nil } for _, kv := range getResp.Kvs { valc := make([]byte, len(kv.Value)) copy(valc, kv.Value) result = append(result, kv.Value) } return result } // HasPrefix checks whether it can find any key with given prefix and returns true if one exists func (s *EtcDStorage) HasPrefix(prefix []byte) bool { realPrefix := s.applyPrefix(prefix) getResp, err := s.db.Get(Ctx, string(realPrefix), clientv3.WithPrefix()) if err != nil { return false } return getResp.Count > 0 } // ProcessByPrefix iterates through all entries where key starts with prefix and calls // StorageProcessor on key value pair func (s *EtcDStorage) ProcessByPrefix(prefix []byte, proc database.StorageProcessor) error { realPrefix := s.applyPrefix(prefix) getResp, err := s.db.Get(Ctx, string(realPrefix), clientv3.WithPrefix()) if err != nil { return err } for _, kv := range getResp.Kvs { err := proc(kv.Key, kv.Value) if err != nil { return err } } return nil } // Close finishes etcd connect func (s *EtcDStorage) Close() error { // do not close temporary db if len(s.tmpPrefix) != 0 { return nil } if s.db == nil { return nil } err := s.db.Close() s.db = nil return err } // Reopen tries to open (re-open) the database func (s *EtcDStorage) Open() error { if s.db != nil { return nil } var err error s.db, err = internalOpen(s.url) return err } // CreateBatch creates a Batch object func (s *EtcDStorage) CreateBatch() database.Batch { if s.db == nil { return nil } return &EtcDBatch{ s: s, } } // OpenTransaction creates new transaction. func (s *EtcDStorage) OpenTransaction() (database.Transaction, error) { tmpdb, err := s.CreateTemporary() if err != nil { return nil, err } return &transaction{s: s, tmpdb: tmpdb}, nil } // CompactDB does nothing for etcd func (s *EtcDStorage) CompactDB() error { return nil } // Drop removes only temporary DBs with etcd (i.e. remove all prefixed keys) func (s *EtcDStorage) Drop() error { if len(s.tmpPrefix) != 0 { getResp, err := s.db.Get(Ctx, s.tmpPrefix, clientv3.WithPrefix()) if err != nil { return nil } for _, kv := range getResp.Kvs { _, err = s.db.Delete(Ctx, string(kv.Key)) if err != nil { return fmt.Errorf("cannot delete tempdb entry: %s", kv.Key) } } } return nil } // Check interface var ( _ database.Storage = &EtcDStorage{} )