Commit 3b15fe0e authored by Michael Droettboom's avatar Michael Droettboom
Browse files

Parameterize FunctionalDistribution

parent 081f94dc
Loading
Loading
Loading
Loading
+60 −48
Original line number Diff line number Diff line
@@ -28,57 +28,15 @@ import kotlin.math.log
 * @param sum the accumulated sum of all the samples in the histogram
 */
data class FunctionalHistogram(
    val logBase: Double,
    val bucketsPerMagnitude: Double,
    // map from bucket limits to accumulated values
    val values: MutableMap<Long, Long> = mutableMapOf(),
    var sum: Long = 0
) {
    companion object {
        // The base of the logarithm used to determine bucketing
        internal const val LOG_BASE = 2.0

        // The buckets per each order of magnitude of the logarithm.
        internal const val BUCKETS_PER_MAGNITUDE = 8.0

        // The combined log base and buckets per magnitude.
        internal val EXPONENT = pow(LOG_BASE, 1.0 / BUCKETS_PER_MAGNITUDE)

        /**
         * Maps a sample to a "bucket index" that it belongs in.
         * A "bucket index" is the consecutive integer index of each bucket, useful as a
         * mathematical concept, even though the internal representation is stored and
         * sent using the minimum value in each bucket.
         *
         * @param sample The data sample
         * @return The bucket index the sample belongs in
         */
        internal fun sampleToBucketIndex(sample: Long): Long {
            return log(sample.toDouble() + 1, EXPONENT).toLong()
        }

        /**
         * Determines the minimum value of a bucket, given a bucket index.
         *
         * @param bucketIndex The ordinal index of a bucket
         * @return The minimum value of the bucket
         */
        internal fun bucketIndexToBucketMinimum(bucketIndex: Long): Long {
            return pow(EXPONENT, bucketIndex.toDouble()).toLong()
        }

        /**
         * Maps a sample to the minimum value of the bucket it belongs in.
         *
         * @param sample The sample value
         @ @return the minimum value of the bucket the sample belongs in
         */
        internal fun sampleToBucketMinimum(sample: Long): Long {
            return if (sample == 0L) {
                0L
            } else {
                bucketIndexToBucketMinimum(sampleToBucketIndex(sample))
            }
        }
    private val exponent = pow(logBase, 1.0 / bucketsPerMagnitude)

    companion object {
        /**
         * Factory function that takes stringified JSON and converts it back into a
         * [FunctionalHistogram].
@@ -95,6 +53,17 @@ data class FunctionalHistogram(
                return null
            }

            val logBase = try {
                jsonObject.getDouble("log_base")
            } catch (e: org.json.JSONException) {
                return null
            }
            val bucketsPerMagnitude = try {
                jsonObject.getDouble("buckets_per_magnitude")
            } catch (e: org.json.JSONException) {
                return null
            }

            // Attempt to parse the values map, if it fails then something is wrong and we need to
            // return null.
            val values = try {
@@ -113,12 +82,51 @@ data class FunctionalHistogram(
            val sum = jsonObject.tryGetLong("sum") ?: return null

            return FunctionalHistogram(
                logBase = logBase,
                bucketsPerMagnitude = bucketsPerMagnitude,
                values = values,
                sum = sum
            )
        }
    }

    /**
      * Maps a sample to a "bucket index" that it belongs in.
      * A "bucket index" is the consecutive integer index of each bucket, useful as a
      * mathematical concept, even though the internal representation is stored and
      * sent using the minimum value in each bucket.
      *
      * @param sample The data sample
      * @return The bucket index the sample belongs in
      */
    internal fun sampleToBucketIndex(sample: Long): Long {
        return log(sample.toDouble() + 1, exponent).toLong()
    }

    /**
      * Determines the minimum value of a bucket, given a bucket index.
      *
      * @param bucketIndex The ordinal index of a bucket
      * @return The minimum value of the bucket
      */
    internal fun bucketIndexToBucketMinimum(bucketIndex: Long): Long {
        return pow(exponent, bucketIndex.toDouble()).toLong()
    }

    /**
      * Maps a sample to the minimum value of the bucket it belongs in.
      *
      * @param sample The sample value
      * @return the minimum value of the bucket the sample belongs in
      */
    internal fun sampleToBucketMinimum(sample: Long): Long {
        return if (sample == 0L) {
            0L
        } else {
            bucketIndexToBucketMinimum(sampleToBucketIndex(sample))
        }
    }

    // This is a calculated read-only property that returns the total count of accumulated values
    val count: Long
        get() = values.map { it.value }.sum()
@@ -139,10 +147,12 @@ data class FunctionalHistogram(
     * Helper function to build the [FunctionalHistogram] into a JSONObject for serialization
     * purposes.
     *
     * @return The histogram as JSON for persistence
     * @return The histogram as [JSONObject] for persistence
     */
    internal fun toJsonObject(): JSONObject {
        return JSONObject(mapOf(
            "log_base" to logBase,
            "buckets_per_magnitude" to bucketsPerMagnitude,
            "values" to values.mapKeys { "${it.key}" },
            "sum" to sum
        ))
@@ -152,7 +162,9 @@ data class FunctionalHistogram(
     * Helper function to build the [FunctionalHistogram] into a JSONObject for sending in the
     * ping payload.
     *
     * @return The histogram as JSON for a ping payload
     * All buckets [min, max + 1] are included in the histogram, even if the have zero values.
     *
     * @return The histogram as [JSONObject] for a ping payload
     */
    internal fun toJsonPayloadObject(): JSONObject {
        val completeValues = if (values.size != 0) {
+2 −2
Original line number Diff line number Diff line
@@ -167,12 +167,12 @@ data class PrecomputedHistogram(
     * ping payload. Compared to [toJsonObject] which is designed for lossless roundtripping:
     *
     *   - this does not include the bucketing parameters
     *   - all buckets [0, max) are inserted into values
     *   - all buckets [0, max + 1] are inserted into values
     *
     * @return The histogram as JSON to send in a ping payload
     */
    internal fun toJsonPayloadObject(): JSONObject {
        // Include all buckets [0, max), where max is the maximum bucket with
        // Include all buckets [0, max + 1], where max is the maximum bucket with
        // any value recorded.
        val contiguousValues = if (!values.isEmpty()) {
            val bucketMax = values.keys.max()!!
+8 −2
Original line number Diff line number Diff line
@@ -25,6 +25,12 @@ internal open class MemoryDistributionsStorageEngineImplementation(
) : GenericStorageEngine<FunctionalHistogram>() {

    companion object {
        // The base of the logarithm used to determine bucketing
        internal const val LOG_BASE = 2.0

        // The buckets per each order of magnitude of the logarithm.
        internal const val BUCKETS_PER_MAGNITUDE = 16.0

        // Set a maximum recordable value of 1 terabyte so the buckets aren't
        // completely unbounded.
        internal const val MAX_BYTES: Long = 1L shl 40
@@ -126,14 +132,14 @@ internal open class MemoryDistributionsStorageEngineImplementation(
            // Too large samples should just be truncated, but otherwise we record and handle them
        }

        val dummy = FunctionalHistogram()
        val dummy = FunctionalHistogram(LOG_BASE, BUCKETS_PER_MAGNITUDE)
        validSamples.forEach { sample ->
            super.recordMetric(metricData, dummy, null) { currentValue, _ ->
                currentValue?.let {
                    it.accumulate(sample)
                    it
                } ?: let {
                    val newMD = FunctionalHistogram()
                    val newMD = FunctionalHistogram(LOG_BASE, BUCKETS_PER_MAGNITUDE)
                    newMD.accumulate(sample)
                    return@let newMD
                }
+8 −2
Original line number Diff line number Diff line
@@ -25,6 +25,12 @@ internal open class TimingDistributionsStorageEngineImplementation(
) : GenericStorageEngine<FunctionalHistogram>() {

    companion object {
        // The base of the logarithm used to determine bucketing
        internal const val LOG_BASE = 2.0

        // The buckets per each order of magnitude of the logarithm.
        internal const val BUCKETS_PER_MAGNITUDE = 8.0

        // Maximum time of 10 minutes in nanoseconds. This maximum means we
        // retain a maximum of 313 buckets.
        internal const val MAX_SAMPLE_TIME: Long = 1000L * 1000L * 1000L * 60L * 10L
@@ -124,14 +130,14 @@ internal open class TimingDistributionsStorageEngineImplementation(
            // Too long samples should just be truncated, but otherwise we record and handle them
        }

        val dummy = FunctionalHistogram()
        val dummy = FunctionalHistogram(LOG_BASE, BUCKETS_PER_MAGNITUDE)
        validSamples.forEach { sample ->
            super.recordMetric(metricData, dummy, null) { currentValue, _ ->
                currentValue?.let {
                    it.accumulate(sample)
                    it
                } ?: let {
                    val newTD = FunctionalHistogram()
                    val newTD = FunctionalHistogram(LOG_BASE, BUCKETS_PER_MAGNITUDE)
                    newTD.accumulate(sample)
                    return@let newTD
                }
+7 −5
Original line number Diff line number Diff line
@@ -21,30 +21,32 @@ class FunctionalHistogramTest {

    @Test
    fun `sampleToBucketMinimum correctly rounds down`() {
        val hist = FunctionalHistogram(2.0, 8.0)

        // Check each of the first 100 integers, where numerical accuracy of the round-tripping
        // is most potentially problematic
        for (i in (0..100)) {
            val value = i.toLong()
            val bucketMinimum = FunctionalHistogram.sampleToBucketMinimum(value)
            val bucketMinimum = hist.sampleToBucketMinimum(value)
            assert(bucketMinimum <= value)

            assertEquals(bucketMinimum, FunctionalHistogram.sampleToBucketMinimum(bucketMinimum))
            assertEquals(bucketMinimum, hist.sampleToBucketMinimum(bucketMinimum))
        }

        // Do an exponential sampling of higher numbers
        for (i in (11..500)) {
            val value = pow(1.5, i.toDouble()).toLong()
            val bucketMinimum = FunctionalHistogram.sampleToBucketMinimum(value)
            val bucketMinimum = hist.sampleToBucketMinimum(value)
            assert(bucketMinimum <= value)

            assertEquals(bucketMinimum, FunctionalHistogram.sampleToBucketMinimum(bucketMinimum))
            assertEquals(bucketMinimum, hist.sampleToBucketMinimum(bucketMinimum))
        }
    }

    @Test
    fun `toJsonObject correctly converts a FunctionalHistogram object`() {
        // Define a FunctionalHistogram object
        val tdd = FunctionalHistogram()
        val tdd = FunctionalHistogram(2.0, 8.0)

        // Accumulate some samples to populate sum and values properties
        tdd.accumulate(1L)
Loading