Commit 03b2b56f authored by itchyonion's avatar itchyonion
Browse files

Fix broker race condition

parent c983c13a
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -154,8 +154,8 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType stri
		heap.Push(ctx.restrictedSnowflakes, snowflake)
	}
	ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc()
	ctx.snowflakeLock.Unlock()
	ctx.idToSnowflake[id] = snowflake
	ctx.snowflakeLock.Unlock()
	return snowflake
}

+22 −19
Original line number Diff line number Diff line
@@ -190,19 +190,10 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {

	offer.fingerprint = BridgeFingerprint.ToBytes()

	// Only hand out known restricted snowflakes to unrestricted clients
	var snowflakeHeap *SnowflakeHeap
	if offer.natType == NATUnrestricted {
		snowflakeHeap = i.ctx.restrictedSnowflakes
	snowflake := i.matchSnowflake(offer.natType)
	if snowflake != nil {
		snowflake.offerChannel <- offer
	} else {
		snowflakeHeap = i.ctx.snowflakes
	}

	// Immediately fail if there are no snowflakes available.
	i.ctx.snowflakeLock.Lock()
	numSnowflakes := snowflakeHeap.Len()
	i.ctx.snowflakeLock.Unlock()
	if numSnowflakes <= 0 {
		i.ctx.metrics.lock.Lock()
		i.ctx.metrics.clientDeniedCount++
		i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied"}).Inc()
@@ -216,13 +207,6 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
		return sendClientResponse(resp, response)
	}

	// Otherwise, find the most available snowflake proxy, and pass the offer to it.
	// Delete must be deferred in order to correctly process answer request later.
	i.ctx.snowflakeLock.Lock()
	snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
	i.ctx.snowflakeLock.Unlock()
	snowflake.offerChannel <- offer

	// Wait for the answer to be returned on the channel or timeout.
	select {
	case answer := <-snowflake.answerChannel:
@@ -248,6 +232,25 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
	return err
}

func (i *IPC) matchSnowflake(natType string) *Snowflake {
	// Only hand out known restricted snowflakes to unrestricted clients
	var snowflakeHeap *SnowflakeHeap
	if natType == NATUnrestricted {
		snowflakeHeap = i.ctx.restrictedSnowflakes
	} else {
		snowflakeHeap = i.ctx.snowflakes
	}

	i.ctx.snowflakeLock.Lock()
	defer i.ctx.snowflakeLock.Unlock()

	if snowflakeHeap.Len() > 0 {
		return heap.Pop(snowflakeHeap).(*Snowflake)
	} else {
		return nil
	}
}

func (i *IPC) ProxyAnswers(arg messages.Arg, response *[]byte) error {
	answer, id, err := messages.DecodeAnswerRequest(arg.Body)
	if err != nil || answer == "" {