Loading pkg/core/backend_resources.go +21 −6 Original line number Diff line number Diff line Loading @@ -16,6 +16,7 @@ const ( ResourceIsNew ResourceChanged ResourceIsGone ResourceError ) // BackendResources implements a collection of resources for our backend. The Loading Loading @@ -111,7 +112,10 @@ func (ctx *BackendResources) propagateUpdate(r Resource, event int) { return } distName := ctx.Collection[r.Type()].getPartitionName(r) distName, err := ctx.Collection[r.Type()].getPartitionName(r) if err != nil { return } eventRecipient, ok := ctx.EventRecipients[distName] if !ok { // no recipients for that resource Loading Loading @@ -197,31 +201,42 @@ func newPartitionedWithDistributors(rg ResourceGroup) *partitionedWithDistributo } func (p partitionedWithDistributors) Add(resource Resource) error { name := p.getPartitionName(resource) name, err := p.getPartitionName(resource) if err != nil { return err } p.addRelationIdentifiers(resource, name) hashring := p.partitions[name] return hashring.Add(resource) } func (p partitionedWithDistributors) AddOrUpdate(resource Resource) int { name := p.getPartitionName(resource) name, err := p.getPartitionName(resource) if err != nil { log.Println("Error updating partitioned distributor:", err) return ResourceError } p.addRelationIdentifiers(resource, name) hashring := p.partitions[name] return hashring.AddOrUpdate(resource) } func (p partitionedWithDistributors) Remove(resource Resource) error { hashring := p.partitions[p.getPartitionName(resource)] name, err := p.getPartitionName(resource) if err != nil { return err } hashring := p.partitions[name] return hashring.Remove(resource) } func (p partitionedWithDistributors) getPartitionName(resource Resource) string { func (p partitionedWithDistributors) getPartitionName(resource Resource) (string, error) { distName := resource.Distributor() if distName != "" { if _, ok := p.partitions[distName]; !ok { distName = "none" } return distName return distName, nil } return p.partitionedHashring.getPartitionName(resource) } Loading pkg/core/collection.go +1 −1 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ type ResourceGroup interface { Prune() []Resource getHashring(partitionName string) *Hashring getPartitionName(resource Resource) string getPartitionName(resource Resource) (string, error) save() error } Loading pkg/core/collection_test.go +21 −5 Original line number Diff line number Diff line Loading @@ -41,9 +41,18 @@ func TestRelations(t *testing.T) { c.Add(d2) c.Add(d1related) d1partition := c[d1.Type()].getPartitionName(d1) d2partition := c[d2.Type()].getPartitionName(d2) d1relatedpartition := c[d1related.Type()].getPartitionName(d1related) d1partition, err := c[d1.Type()].getPartitionName(d1) if err != nil { t.Fatal("Can't get partition name:", err) } d2partition, err := c[d2.Type()].getPartitionName(d2) if err != nil { t.Fatal("Can't get partition name:", err) } d1relatedpartition, err := c[d1related.Type()].getPartitionName(d1related) if err != nil { t.Fatal("Can't get partition name:", err) } if d1partition != d1relatedpartition { t.Fatal("related resources have different partitions") Loading Loading @@ -86,7 +95,10 @@ func TestStoreRelations(t *testing.T) { d.UniqueId = Hashkey(rand.Uint64()) c.Add(d) c.Save() partitionName := c[d.Type()].getPartitionName(d) partitionName, err := c[d.Type()].getPartitionName(d) if err != nil { t.Fatal("Can't get partition name:", err) } // Let's run it few times as there is some 50% chance to get the resource in the same partition by it's UID runs := 10 Loading @@ -99,7 +111,11 @@ func TestStoreRelations(t *testing.T) { }) d.UniqueId = Hashkey(rand.Uint64()) c.Add(d) if partitionName != c[d.Type()].getPartitionName(d) { name, err := c[d.Type()].getPartitionName(d) if err != nil { t.Fatal("Can't get partition name:", err) } if partitionName != name { t.Fatal("Loading collection from storage got the resource in the wrong partition") } } Loading pkg/core/hashring.go +2 −2 Original line number Diff line number Diff line Loading @@ -426,8 +426,8 @@ func (h *Hashring) Clear() { h.hashnodes = []*hashnode{} } func (h *Hashring) getPartitionName(resource Resource) string { return "" func (h *Hashring) getPartitionName(resource Resource) (string, error) { return "", nil } func (h *Hashring) initStore(name string, dir string, newResource func() Resource) { Loading pkg/core/partitioned_hashring.go +16 −5 Original line number Diff line number Diff line Loading @@ -39,21 +39,32 @@ func newPartitionedHashring(proportions map[string]int) *partitionedHashring { } func (p partitionedHashring) Add(resource Resource) error { name := p.getPartitionName(resource) name, err := p.getPartitionName(resource) if err != nil { return err } p.addRelationIdentifiers(resource, name) hashring := p.partitions[name] return hashring.Add(resource) } func (p partitionedHashring) AddOrUpdate(resource Resource) int { name := p.getPartitionName(resource) name, err := p.getPartitionName(resource) if err != nil { log.Println("Error updating partitioned distributor:", err) return ResourceError } p.addRelationIdentifiers(resource, name) hashring := p.partitions[name] return hashring.AddOrUpdate(resource) } func (p partitionedHashring) Remove(resource Resource) error { hashring := p.partitions[p.getPartitionName(resource)] name, err := p.getPartitionName(resource) if err != nil { return err } hashring := p.partitions[name] return hashring.Remove(resource) } Loading Loading @@ -95,7 +106,7 @@ func (p partitionedHashring) Clear() { } } func (p partitionedHashring) getPartitionName(resource Resource) (partitionName string) { func (p partitionedHashring) getPartitionName(resource Resource) (partitionName string, err error) { identifiers := resource.RelationIdentifiers() for _, id := range identifiers { name, ok := p.relations[id] Loading @@ -105,7 +116,7 @@ func (p partitionedHashring) getPartitionName(resource Resource) (partitionName } if partitionName == "" { partitionName = p.stencil.GetPartitionName(resource) return p.stencil.GetPartitionName(resource) } return } Loading Loading
pkg/core/backend_resources.go +21 −6 Original line number Diff line number Diff line Loading @@ -16,6 +16,7 @@ const ( ResourceIsNew ResourceChanged ResourceIsGone ResourceError ) // BackendResources implements a collection of resources for our backend. The Loading Loading @@ -111,7 +112,10 @@ func (ctx *BackendResources) propagateUpdate(r Resource, event int) { return } distName := ctx.Collection[r.Type()].getPartitionName(r) distName, err := ctx.Collection[r.Type()].getPartitionName(r) if err != nil { return } eventRecipient, ok := ctx.EventRecipients[distName] if !ok { // no recipients for that resource Loading Loading @@ -197,31 +201,42 @@ func newPartitionedWithDistributors(rg ResourceGroup) *partitionedWithDistributo } func (p partitionedWithDistributors) Add(resource Resource) error { name := p.getPartitionName(resource) name, err := p.getPartitionName(resource) if err != nil { return err } p.addRelationIdentifiers(resource, name) hashring := p.partitions[name] return hashring.Add(resource) } func (p partitionedWithDistributors) AddOrUpdate(resource Resource) int { name := p.getPartitionName(resource) name, err := p.getPartitionName(resource) if err != nil { log.Println("Error updating partitioned distributor:", err) return ResourceError } p.addRelationIdentifiers(resource, name) hashring := p.partitions[name] return hashring.AddOrUpdate(resource) } func (p partitionedWithDistributors) Remove(resource Resource) error { hashring := p.partitions[p.getPartitionName(resource)] name, err := p.getPartitionName(resource) if err != nil { return err } hashring := p.partitions[name] return hashring.Remove(resource) } func (p partitionedWithDistributors) getPartitionName(resource Resource) string { func (p partitionedWithDistributors) getPartitionName(resource Resource) (string, error) { distName := resource.Distributor() if distName != "" { if _, ok := p.partitions[distName]; !ok { distName = "none" } return distName return distName, nil } return p.partitionedHashring.getPartitionName(resource) } Loading
pkg/core/collection.go +1 −1 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ type ResourceGroup interface { Prune() []Resource getHashring(partitionName string) *Hashring getPartitionName(resource Resource) string getPartitionName(resource Resource) (string, error) save() error } Loading
pkg/core/collection_test.go +21 −5 Original line number Diff line number Diff line Loading @@ -41,9 +41,18 @@ func TestRelations(t *testing.T) { c.Add(d2) c.Add(d1related) d1partition := c[d1.Type()].getPartitionName(d1) d2partition := c[d2.Type()].getPartitionName(d2) d1relatedpartition := c[d1related.Type()].getPartitionName(d1related) d1partition, err := c[d1.Type()].getPartitionName(d1) if err != nil { t.Fatal("Can't get partition name:", err) } d2partition, err := c[d2.Type()].getPartitionName(d2) if err != nil { t.Fatal("Can't get partition name:", err) } d1relatedpartition, err := c[d1related.Type()].getPartitionName(d1related) if err != nil { t.Fatal("Can't get partition name:", err) } if d1partition != d1relatedpartition { t.Fatal("related resources have different partitions") Loading Loading @@ -86,7 +95,10 @@ func TestStoreRelations(t *testing.T) { d.UniqueId = Hashkey(rand.Uint64()) c.Add(d) c.Save() partitionName := c[d.Type()].getPartitionName(d) partitionName, err := c[d.Type()].getPartitionName(d) if err != nil { t.Fatal("Can't get partition name:", err) } // Let's run it few times as there is some 50% chance to get the resource in the same partition by it's UID runs := 10 Loading @@ -99,7 +111,11 @@ func TestStoreRelations(t *testing.T) { }) d.UniqueId = Hashkey(rand.Uint64()) c.Add(d) if partitionName != c[d.Type()].getPartitionName(d) { name, err := c[d.Type()].getPartitionName(d) if err != nil { t.Fatal("Can't get partition name:", err) } if partitionName != name { t.Fatal("Loading collection from storage got the resource in the wrong partition") } } Loading
pkg/core/hashring.go +2 −2 Original line number Diff line number Diff line Loading @@ -426,8 +426,8 @@ func (h *Hashring) Clear() { h.hashnodes = []*hashnode{} } func (h *Hashring) getPartitionName(resource Resource) string { return "" func (h *Hashring) getPartitionName(resource Resource) (string, error) { return "", nil } func (h *Hashring) initStore(name string, dir string, newResource func() Resource) { Loading
pkg/core/partitioned_hashring.go +16 −5 Original line number Diff line number Diff line Loading @@ -39,21 +39,32 @@ func newPartitionedHashring(proportions map[string]int) *partitionedHashring { } func (p partitionedHashring) Add(resource Resource) error { name := p.getPartitionName(resource) name, err := p.getPartitionName(resource) if err != nil { return err } p.addRelationIdentifiers(resource, name) hashring := p.partitions[name] return hashring.Add(resource) } func (p partitionedHashring) AddOrUpdate(resource Resource) int { name := p.getPartitionName(resource) name, err := p.getPartitionName(resource) if err != nil { log.Println("Error updating partitioned distributor:", err) return ResourceError } p.addRelationIdentifiers(resource, name) hashring := p.partitions[name] return hashring.AddOrUpdate(resource) } func (p partitionedHashring) Remove(resource Resource) error { hashring := p.partitions[p.getPartitionName(resource)] name, err := p.getPartitionName(resource) if err != nil { return err } hashring := p.partitions[name] return hashring.Remove(resource) } Loading Loading @@ -95,7 +106,7 @@ func (p partitionedHashring) Clear() { } } func (p partitionedHashring) getPartitionName(resource Resource) (partitionName string) { func (p partitionedHashring) getPartitionName(resource Resource) (partitionName string, err error) { identifiers := resource.RelationIdentifiers() for _, id := range identifiers { name, ok := p.relations[id] Loading @@ -105,7 +116,7 @@ func (p partitionedHashring) getPartitionName(resource Resource) (partitionName } if partitionName == "" { partitionName = p.stencil.GetPartitionName(resource) return p.stencil.GetPartitionName(resource) } return } Loading