Skip to content

Commit ded6bbd

Browse files
authored
Revert "Optimize: consistent hash algorithm" (#3926)
Reverts #3918 Turns out in production it was much worse in cpu usage Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
1 parent 8c68eac commit ded6bbd

File tree

4 files changed

+14
-186
lines changed

4 files changed

+14
-186
lines changed

loadbalancer/algorithm.go

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"errors"
55
"fmt"
66
"math/rand/v2"
7-
"net/netip"
87
"sort"
98
"sync"
109
"sync/atomic"
@@ -119,7 +118,7 @@ func newConsistentHashInternal(endpoints []string, hashesPerEndpoint int) routin
119118
}
120119
for i, ep := range endpoints {
121120
endpointStartIndex := hashesPerEndpoint * i
122-
for j := range hashesPerEndpoint {
121+
for j := 0; j < hashesPerEndpoint; j++ {
123122
ch.hashRing[endpointStartIndex+j] = endpointHash{i, hash(fmt.Sprintf("%s-%d", ep, j))}
124123
}
125124
}
@@ -128,8 +127,7 @@ func newConsistentHashInternal(endpoints []string, hashesPerEndpoint int) routin
128127
}
129128

130129
func newConsistentHash(endpoints []string) routing.LBAlgorithm {
131-
n := 10000 / len(endpoints)
132-
return newConsistentHashInternal(endpoints, n)
130+
return newConsistentHashInternal(endpoints, 100)
133131
}
134132

135133
func hash(s string) uint64 {
@@ -138,10 +136,6 @@ func hash(s string) uint64 {
138136

139137
func skipEndpoint(c *routing.LBContext, index int) bool {
140138
host := c.Route.LBEndpoints[index].Host
141-
if len(c.LBEndpoints) > 300 { // 300 see https://github.com/zalando/skipper/pull/3918/
142-
return !existsBinarySearch(host, c.LBEndpoints)
143-
}
144-
// linear scan for low numbers
145139
for i := range c.LBEndpoints {
146140
if c.LBEndpoints[i].Host == host {
147141
return false
@@ -150,30 +144,6 @@ func skipEndpoint(c *routing.LBContext, index int) bool {
150144
return true
151145
}
152146

153-
func existsBinarySearch(target string, list []routing.LBEndpoint) bool {
154-
targetAP, err := netip.ParseAddrPort(target)
155-
if err != nil {
156-
return false
157-
}
158-
159-
idx := sort.Search(len(list), func(i int) bool {
160-
itemAP, err := netip.ParseAddrPort(list[i].Host)
161-
if err != nil {
162-
return false
163-
}
164-
165-
if itemAP.Addr() != targetAP.Addr() {
166-
return !itemAP.Addr().Less(targetAP.Addr())
167-
}
168-
return itemAP.Port() >= targetAP.Port()
169-
})
170-
171-
if idx < len(list) && list[idx].Host == target {
172-
return true
173-
}
174-
return false
175-
}
176-
177147
// Returns index in hash ring with the closest hash to key's hash
178148
func (ch *consistentHash) searchRing(key string, ctx *routing.LBContext) int {
179149
h := hash(key)

loadbalancer/algorithm_test.go

Lines changed: 12 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"fmt"
55
"math"
66
"net/http"
7-
"net/netip"
8-
"sort"
97
"testing"
108

119
"github.com/stretchr/testify/assert"
@@ -231,11 +229,10 @@ func TestApply(t *testing.T) {
231229
const R = 1000
232230
const N = 10
233231
eps := make([]string, 0, N)
234-
for i := range N {
232+
for i := 0; i < N; i++ {
235233
ep := fmt.Sprintf("http://127.0.0.1:123%d/foo", i)
236234
eps = append(eps, ep)
237235
}
238-
sortLBEndpoints(eps)
239236

240237
for _, tt := range []struct {
241238
name string
@@ -286,7 +283,7 @@ func TestApply(t *testing.T) {
286283
}
287284

288285
h := make(map[string]int)
289-
for range R {
286+
for i := 0; i < R; i++ {
290287
lbe := tt.algorithm.Apply(lbctx)
291288
h[lbe.Host] += 1
292289
}
@@ -319,7 +316,6 @@ func TestConsistentHashSearch(t *testing.T) {
319316
}
320317

321318
endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"}
322-
sortLBEndpoints(endpoints)
323319
const key = "192.168.0.1"
324320

325321
ep := apply(key, endpoints)
@@ -343,7 +339,6 @@ func TestConsistentHashSearch(t *testing.T) {
343339

344340
func TestConsistentHashBoundedLoadSearch(t *testing.T) {
345341
endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"}
346-
sortLBEndpoints(endpoints)
347342
r, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
348343
route := NewAlgorithmProvider().Do([]*routing.Route{{
349344
Route: eskip.Route{
@@ -364,7 +359,7 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
364359
defer endpointRegistry.Close()
365360
endpointRegistry.Do([]*routing.Route{route})
366361
noLoad := ch.Apply(ctx)
367-
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]any{}})
362+
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{}})
368363

369364
if noLoad != nonBounded {
370365
t.Error("When no endpoints are overloaded, the chosen endpoint should be the same as standard consistentHash")
@@ -393,7 +388,6 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
393388

394389
func TestConsistentHashKey(t *testing.T) {
395390
endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"}
396-
sortLBEndpoints(endpoints)
397391
ch := newConsistentHash(endpoints)
398392

399393
r, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
@@ -407,16 +401,16 @@ func TestConsistentHashKey(t *testing.T) {
407401
},
408402
}})[0]
409403

410-
defaultEndpoint := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: make(map[string]any)})
411-
remoteHostEndpoint := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: map[string]any{ConsistentHashKey: net.RemoteHost(r).String()}})
404+
defaultEndpoint := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: make(map[string]interface{})})
405+
remoteHostEndpoint := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: net.RemoteHost(r).String()}})
412406

413407
if defaultEndpoint != remoteHostEndpoint {
414408
t.Error("remote host should be used as a default key")
415409
}
416410

417411
for i, ep := range endpoints {
418412
key := fmt.Sprintf("%s-%d", ep, 1) // "ep-0" to "ep-99" is the range of keys for this endpoint. If we use this as the hash key it should select endpoint ep.
419-
selected := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: map[string]any{ConsistentHashKey: key}})
413+
selected := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: key}})
420414
if selected != rt.LBEndpoints[i] {
421415
t.Errorf("expected: %v, got %v", rt.LBEndpoints[i], selected)
422416
}
@@ -425,7 +419,6 @@ func TestConsistentHashKey(t *testing.T) {
425419

426420
func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
427421
endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"}
428-
sortLBEndpoints(endpoints)
429422
r, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
430423
route := NewAlgorithmProvider().Do([]*routing.Route{{
431424
Route: eskip.Route{
@@ -441,13 +434,13 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
441434
Request: r,
442435
Route: route,
443436
LBEndpoints: route.LBEndpoints,
444-
Params: map[string]any{ConsistentHashBalanceFactor: balanceFactor},
437+
Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor},
445438
}
446439
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
447440
defer endpointRegistry.Close()
448441
endpointRegistry.Do([]*routing.Route{route})
449442

450-
for range 100 {
443+
for i := 0; i < 100; i++ {
451444
ep := ch.Apply(ctx)
452445
ifr0 := route.LBEndpoints[0].Metrics.InflightRequests()
453446
ifr1 := route.LBEndpoints[1].Metrics.InflightRequests()
@@ -482,7 +475,7 @@ func TestConsistentHashKeyDistribution(t *testing.T) {
482475
}
483476

484477
func addInflightRequests(registry *routing.EndpointRegistry, endpoint routing.LBEndpoint, count int) {
485-
for range count {
478+
for i := 0; i < count; i++ {
486479
endpoint.Metrics.IncInflightRequest()
487480
registry.GetMetrics(endpoint.Host).IncInflightRequest()
488481
}
@@ -552,88 +545,9 @@ func BenchmarkRandomAlgorithm(b *testing.B) {
552545
LBEndpoints: lbeps,
553546
}
554547

555-
for b.Loop() {
556-
alg.Apply(lbc)
557-
}
558-
}
559-
560-
// similar to datasource processRouteDef() use of sort.SliceStable
561-
func sortLBEndpoints(lbEndpoints []string) {
562-
sort.SliceStable(lbEndpoints, func(i, j int) bool {
563-
apI, errI := netip.ParseAddrPort(lbEndpoints[i])
564-
apJ, errJ := netip.ParseAddrPort(lbEndpoints[j])
565-
566-
if errI != nil || errJ != nil {
567-
return errI == nil
568-
}
569-
570-
ipI := apI.Addr()
571-
ipJ := apJ.Addr()
572-
573-
if ipI != ipJ {
574-
return ipI.Less(ipJ)
575-
}
576-
577-
return apI.Port() < apJ.Port()
578-
})
579-
580-
}
581-
582-
func BenchmarkConsistentHash(b *testing.B) {
583-
for _, N := range []int{10, 100, 1000} {
584-
eps := make([]string, 0, N)
585-
var j int
586-
for i := range N {
587-
j = i / 255
588-
ep := fmt.Sprintf("http://10.0.%d.%d:8080/", j, i%255)
589-
eps = append(eps, ep)
590-
}
591-
sortLBEndpoints(eps)
592-
593-
req, err := http.NewRequest("GET", "http://consistent.bench.test", nil)
594-
if err != nil {
595-
b.Fatalf("Failed to create request: %v", err)
596-
}
548+
b.ResetTimer()
597549

598-
route := NewAlgorithmProvider().Do([]*routing.Route{{
599-
Route: eskip.Route{
600-
BackendType: eskip.LBBackend,
601-
LBAlgorithm: ConsistentHash.String(),
602-
LBEndpoints: eps,
603-
},
604-
}})[0]
605-
606-
alg := newConsistentHash(eps)
607-
lbCtx := &routing.LBContext{
608-
Request: req,
609-
Route: route,
610-
LBEndpoints: route.LBEndpoints,
611-
Params: map[string]any{
612-
ConsistentHashKey: "Foo",
613-
ConsistentHashBalanceFactor: 0.2,
614-
},
615-
}
616-
617-
// populate metrics
618-
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
619-
defer endpointRegistry.Close()
620-
endpointRegistry.Do([]*routing.Route{route})
621-
622-
// set Foo header value
623-
headerValues := [10000]string{}
624-
for i := range len(headerValues) {
625-
headerValues[i] = fmt.Sprintf("foo-%d", i)
626-
}
627-
628-
b.Run(fmt.Sprintf("%d endpoints", N), func(b *testing.B) {
629-
var iter int64
630-
b.ResetTimer()
631-
b.ReportAllocs()
632-
for b.Loop() {
633-
iter++
634-
req.Header.Set("Foo", headerValues[iter%10000])
635-
alg.Apply(lbCtx)
636-
}
637-
})
550+
for n := 0; n < b.N; n++ {
551+
alg.Apply(lbc)
638552
}
639553
}

proxy/proxy_test.go

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2607,38 +2607,3 @@ func benchmarkCopyStream(b *testing.B, size int64, cpStream func(to flushWriter,
26072607
}
26082608
}
26092609
}
2610-
2611-
func BenchmarkConsistentHashSelectEndpoint(b *testing.B) {
2612-
for _, N := range []int{10, 100, 250, 300, 400, 500, 1000, 5000} {
2613-
eps := make([]string, 0, N)
2614-
var j int
2615-
for i := range N {
2616-
j = i / 255
2617-
ep := fmt.Sprintf("http://10.0.%d.%d:8080", j, i%255)
2618-
eps = append(eps, ep)
2619-
}
2620-
req, err := http.NewRequest("GET", "http://consistent.bench.test", nil)
2621-
if err != nil {
2622-
b.Fatalf("Failed to create request: %v", err)
2623-
}
2624-
doc := fmt.Sprintf(`r: * -> <consistentHash, "%s">`, strings.Join(eps, `", "`))
2625-
tp, err := newTestProxyWithParams(doc, Params{
2626-
AccessLogDisabled: false,
2627-
})
2628-
if err != nil {
2629-
b.Error(err)
2630-
return
2631-
}
2632-
defer tp.close()
2633-
route, _ := tp.routing.Get().Do(req)
2634-
ctx := newContext(nil, req, tp.proxy)
2635-
ctx.route = route
2636-
b.Run(fmt.Sprintf("%d endpoints", N), func(b *testing.B) {
2637-
b.ResetTimer()
2638-
b.ReportAllocs()
2639-
for b.Loop() {
2640-
tp.proxy.selectEndpoint(ctx)
2641-
}
2642-
})
2643-
}
2644-
}

routing/datasource.go

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package routing
33
import (
44
"errors"
55
"fmt"
6-
"net/netip"
76
"sort"
87
"sync"
98
"time"
@@ -535,26 +534,6 @@ func processRouteDef(o *Options, cpm map[string]PredicateSpec, def *eskip.Route)
535534
}
536535

537536
r := &Route{Route: *def, Scheme: scheme, Host: host, Predicates: cps, Filters: fs, weight: weight}
538-
if def.LBAlgorithm == "consistentHash" {
539-
sort.SliceStable(def.LBEndpoints, func(i, j int) bool {
540-
apI, errI := netip.ParseAddrPort(def.LBEndpoints[i])
541-
apJ, errJ := netip.ParseAddrPort(def.LBEndpoints[j])
542-
543-
if errI != nil || errJ != nil {
544-
return errI == nil
545-
}
546-
547-
ipI := apI.Addr()
548-
ipJ := apJ.Addr()
549-
550-
if ipI != ipJ {
551-
return ipI.Less(ipJ)
552-
}
553-
554-
return apI.Port() < apJ.Port()
555-
})
556-
}
557-
558537
if err := processTreePredicates(r, def.Predicates); err != nil {
559538
return nil, err
560539
}

0 commit comments

Comments
 (0)