diff --git a/monitoring/monitoring.go b/monitoring/monitoring.go index 4e7deb5a..d9770348 100644 --- a/monitoring/monitoring.go +++ b/monitoring/monitoring.go @@ -14,21 +14,7 @@ package monitoring -import ( - "context" - "fmt" - "math" - "sync" - - "go.opencensus.io/metric" - "go.opencensus.io/metric/metricdata" - "go.opencensus.io/metric/metricproducer" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" - - "istio.io/pkg/log" -) +// OpenCensus independent metrics type ( // A Metric collects numerical observations. @@ -43,25 +29,31 @@ type ( // this is equivalent to subtracting -1 to the current value. For Gauges, // this is equivalent to setting the value to -1. For Distributions, // this is equivalent to making an observation of value -1. - Decrement() + // + // Not used in istio, removed pending new interface. + //Decrement() // Name returns the name value of a Metric. + // TODO: internal use only, make private Name() string // Record makes an observation of the provided value for the given measure. + // Majority of Istio is setting this to an int Record(value float64) // RecordInt makes an observation of the provided value for the measure. - RecordInt(value int64) + // Not actually used in Istio. + //RecordInt(value int64) // With creates a new Metric, with the LabelValues provided. This allows creating // a set of pre-dimensioned data for recording purposes. This is primarily used // for documentation and convenience. Metrics created with this method do not need // to be registered (they share the registration of their parent Metric). - With(labelValues ...LabelValue) Metric + With(labelValues ...Attr) Metric // Register configures the Metric for export. It MUST be called before collection // of values for the Metric. An error will be returned if registration fails. + // TODO: internal use only Register() error } @@ -82,7 +74,7 @@ type ( // ValueFrom may be called without any labelValues. Otherwise, the labelValues // supplied MUST match the label keys supplied at creation time both in number // and in order. - ValueFrom(valueFn func() float64, labelValues ...string) + ValueFrom(valueFn func() float64, labelValues ...Attr) } disabledMetric struct { @@ -95,80 +87,39 @@ type ( // DerivedOptions encode changes to the options passed to a DerivedMetric at creation time. DerivedOptions func(*derivedOptions) - // A Label provides a named dimension for a Metric. - Label tag.Key - - // A LabelValue represents a Label with a specific value. It is used to record - // values for a Metric. - LabelValue tag.Mutator - options struct { - unit Unit - labels []Label - useInt64 bool + unit Unit + labels []Label // Label } derivedOptions struct { labelKeys []string - valueFn func() float64 - } - - // RecordHook has a callback function which a measure is recorded. - RecordHook interface { - OnRecordFloat64Measure(f *stats.Float64Measure, tags []tag.Mutator, value float64) - OnRecordInt64Measure(i *stats.Int64Measure, tags []tag.Mutator, value int64) } ) -// Decrement implements Metric -func (dm *disabledMetric) Decrement() {} +func (dm *disabledMetric) ValueFrom(func() float64, ...Attr) { +} -// Increment implements Metric func (dm *disabledMetric) Increment() {} -// Name implements Metric func (dm *disabledMetric) Name() string { return dm.name } -// Record implements Metric func (dm *disabledMetric) Record(value float64) {} -// RecordInt implements Metric func (dm *disabledMetric) RecordInt(value int64) {} -// Register implements Metric func (dm *disabledMetric) Register() error { return nil } -// With implements Metric -func (dm *disabledMetric) With(labelValues ...LabelValue) Metric { +func (dm *disabledMetric) With(labelValues ...Attr) Metric { return dm } var _ Metric = &disabledMetric{} -var ( - recordHooks map[string]RecordHook - recordHookMutex sync.RWMutex - - derivedRegistry = metric.NewRegistry() -) - -func init() { - recordHooks = make(map[string]RecordHook) - // ensures exporters can see any derived metrics - metricproducer.GlobalManager().AddProducer(derivedRegistry) -} - -// RegisterRecordHook adds a RecordHook for a given measure. -func RegisterRecordHook(name string, h RecordHook) { - recordHookMutex.Lock() - defer recordHookMutex.Unlock() - recordHooks[name] = h -} - // WithLabels provides configuration options for a new Metric, providing the expected // dimensions for data collection for that Metric. func WithLabels(labels ...Label) Options { @@ -179,21 +130,14 @@ func WithLabels(labels ...Label) Options { // WithUnit provides configuration options for a new Metric, providing unit of measure // information for a new Metric. +// Used only 2x - once the type is part of the name ( as recommended), once is not. +// TODO: get rid of it or use consistently in ALL metrics. func WithUnit(unit Unit) Options { return func(opts *options) { opts.unit = unit } } -// WithInt64Values provides configuration options for a new Metric, indicating that -// recorded values will be saved as int64 values. Any float64 values recorded will -// converted to int64s via math.Floor-based conversion. -func WithInt64Values() Options { - return func(opts *options) { - opts.useInt64 = true - } -} - // WithLabelKeys is used to configure the label keys used by a DerivedMetric. This // option is mutually exclusive with the derived option `WithValueFrom` and will be ignored // if that option is provided. @@ -203,28 +147,26 @@ func WithLabelKeys(keys ...string) DerivedOptions { } } -// WithValueFrom is used to configure the derivation of a DerivedMetric. This option -// is mutually exclusive with the derived option `WithLabelKeys`. It acts as syntactic sugar -// that elides the need to create a DerivedMetric (with no labels) and then call `ValueFrom`. -func WithValueFrom(valueFn func() float64) DerivedOptions { - return func(opts *derivedOptions) { - opts.valueFn = valueFn - } -} +// Label is used to ease migration from opencensus. Will be eventually replaced +// with the otel attributes, but in a second stage. +type Label string -// Value creates a new LabelValue for the Label. -func (l Label) Value(value string) LabelValue { - return tag.Upsert(tag.Key(l), value) +// Attr is used to ease migration and minimize changes. Will be replaced by otel attributes. +type Attr struct { + Key string + Value string } -// MustCreateLabel will attempt to create a new Label. If -// creation fails, then this method will panic. +// MustCreateLabel is a temporary method to ease migration. func MustCreateLabel(key string) Label { - k, err := tag.NewKey(key) - if err != nil { - panic(fmt.Errorf("could not create label %q: %v", key, err)) + return Label(key) +} + +func (l Label) Value(v string) Attr { + return Attr{ + Key: string(l), + Value: v, } - return Label(k) } // MustRegister is a helper function that will ensure that the provided Metrics are @@ -254,95 +196,65 @@ func RegisterIf(metric Metric, enabled func() bool) Metric { // NewSum creates a new Metric with an aggregation type of Sum (the values will be cumulative). // That means that data collected by the new Metric will be summed before export. +// Per prom conventions, must have a name ending in _total. +// +// Istio doesn't do this for: +// +// num_outgoing_retries +// pilot_total_rejected_configs +// provider_lookup_cluster_failures +// xds_cache_reads +// xds_cache_evictions +// pilot_k8s_cfg_events +// pilot_k8s_reg_events +// pilot_k8s_endpoints_with_no_pods +// pilot_total_xds_rejects +// pilot_xds_expired_nonce +// pilot_xds_write_timeout +// pilot_xds_pushes +// pilot_push_triggers +// pilot_xds_push_context_errors +// pilot_total_xds_internal_errors +// pilot_inbound_updates +// wasm_cache_lookup_count +// wasm_remote_fetch_count +// wasm_config_conversion_count +// citadel_server_csr_count +// citadel_server_authentication_failure_count +// citadel_server_csr_parsing_err_count +// citadel_server_id_extraction_err_count +// citadel_server_csr_sign_err_count +// citadel_server_success_cert_issuance_count func NewSum(name, description string, opts ...Options) Metric { - return newMetric(name, description, view.Sum(), opts...) + return newSum(name, description, opts...) } // NewGauge creates a new Metric with an aggregation type of LastValue. That means that data collected // by the new Metric will export only the last recorded value. func NewGauge(name, description string, opts ...Options) Metric { - return newMetric(name, description, view.LastValue(), opts...) + return newGauge(name, description, opts...) } // NewDerivedGauge creates a new Metric with an aggregation type of LastValue that generates the value // dynamically according to the provided function. This can be used for values based on querying some // state within a system (when event-driven recording is not appropriate). +// +// Only 2 usages (uptime and cache expiry in node agent) in istio func NewDerivedGauge(name, description string, opts ...DerivedOptions) DerivedMetric { - options := createDerivedOptions(opts...) - m, err := derivedRegistry.AddFloat64DerivedGauge(name, - metric.WithDescription(description), - metric.WithLabelKeys(options.labelKeys...), - metric.WithUnit(metricdata.UnitDimensionless)) // TODO: allow unit in options - if err != nil { - log.Warnf("failed to add metric %q: %v", name, err) - } - derived := &derivedFloat64Metric{ - base: m, - name: name, - } - if options.valueFn != nil { - derived.ValueFrom(options.valueFn) - } - return derived + return newDerivedGauge(name, description, opts...) } // NewDistribution creates a new Metric with an aggregation type of Distribution. This means that the // data collected by the Metric will be collected and exported as a histogram, with the specified bounds. func NewDistribution(name, description string, bounds []float64, opts ...Options) Metric { - return newMetric(name, description, view.Distribution(bounds...), opts...) -} - -func newMetric(name, description string, aggregation *view.Aggregation, opts ...Options) Metric { - o := createOptions(opts...) - if o.useInt64 { - return newInt64Metric(name, description, aggregation, o) - } - return newFloat64Metric(name, description, aggregation, o) -} - -type derivedFloat64Metric struct { - base *metric.Float64DerivedGauge - - name string -} - -func (d *derivedFloat64Metric) Name() string { - return d.name -} - -// no-op -func (d *derivedFloat64Metric) Register() error { - return nil + return newDistribution(name, description, bounds, opts...) } -func (d *derivedFloat64Metric) ValueFrom(valueFn func() float64, labelValues ...string) { - if len(labelValues) == 0 { - if err := d.base.UpsertEntry(valueFn); err != nil { - log.Errorf("failed to add value for derived metric %q: %v", d.name, err) - } - return - } - lv := make([]metricdata.LabelValue, 0, len(labelValues)) - for _, l := range labelValues { - lv = append(lv, metricdata.NewLabelValue(l)) - } - if err := d.base.UpsertEntry(valueFn, lv...); err != nil { - log.Errorf("failed to add value for derived metric %q: %v", d.name, err) - } -} - -type float64Metric struct { - *stats.Float64Measure - - // tags stores all tags for the metrics - tags []tag.Mutator - // ctx is a precomputed context holding tags, as an optimization - ctx context.Context - view *view.View - - incrementMeasure []stats.Measurement - decrementMeasure []stats.Measurement -} +// Internal methods used to hook one of the conditionally compiled implementations. +var newSum func(name, description string, opts ...Options) Metric +var newGauge func(name, description string, opts ...Options) Metric +var newDistribution func(name, description string, bounds []float64, opts ...Options) Metric +var newDerivedGauge func(name, description string, opts ...DerivedOptions) DerivedMetric func createOptions(opts ...Options) *options { o := &options{unit: None, labels: make([]Label, 0)} @@ -357,174 +269,5 @@ func createDerivedOptions(opts ...DerivedOptions) *derivedOptions { for _, opt := range opts { opt(o) } - // if a valueFn is supplied, then no label values can be supplied. - // to prevent issues, drop the label keys - if o.valueFn != nil { - o.labelKeys = []string{} - } return o } - -func newFloat64Metric(name, description string, aggregation *view.Aggregation, opts *options) *float64Metric { - measure := stats.Float64(name, description, string(opts.unit)) - tagKeys := make([]tag.Key, 0, len(opts.labels)) - for _, l := range opts.labels { - tagKeys = append(tagKeys, tag.Key(l)) - } - ctx, _ := tag.New(context.Background()) //nolint:errcheck - return &float64Metric{ - Float64Measure: measure, - tags: make([]tag.Mutator, 0), - ctx: ctx, - view: &view.View{Measure: measure, TagKeys: tagKeys, Aggregation: aggregation}, - incrementMeasure: []stats.Measurement{measure.M(1)}, - decrementMeasure: []stats.Measurement{measure.M(-1)}, - } -} - -func (f *float64Metric) Increment() { - f.recordMeasurements(f.incrementMeasure) -} - -func (f *float64Metric) Decrement() { - f.recordMeasurements(f.decrementMeasure) -} - -func (f *float64Metric) Name() string { - return f.Float64Measure.Name() -} - -func (f *float64Metric) Record(value float64) { - recordHookMutex.RLock() - if rh, ok := recordHooks[f.Name()]; ok { - rh.OnRecordFloat64Measure(f.Float64Measure, f.tags, value) - } - recordHookMutex.RUnlock() - m := f.M(value) - stats.Record(f.ctx, m) //nolint:errcheck -} - -func (f *float64Metric) recordMeasurements(m []stats.Measurement) { - recordHookMutex.RLock() - if rh, ok := recordHooks[f.Name()]; ok { - for _, mv := range m { - rh.OnRecordFloat64Measure(f.Float64Measure, f.tags, mv.Value()) - } - } - recordHookMutex.RUnlock() - stats.Record(f.ctx, m...) -} - -func (f *float64Metric) RecordInt(value int64) { - f.Record(float64(value)) -} - -func (f *float64Metric) With(labelValues ...LabelValue) Metric { - t := make([]tag.Mutator, len(f.tags), len(f.tags)+len(labelValues)) - copy(t, f.tags) - for _, tagValue := range labelValues { - t = append(t, tag.Mutator(tagValue)) - } - ctx, _ := tag.New(context.Background(), t...) //nolint:errcheck - return &float64Metric{ - Float64Measure: f.Float64Measure, - tags: t, - ctx: ctx, - view: f.view, - incrementMeasure: f.incrementMeasure, - decrementMeasure: f.decrementMeasure, - } -} - -func (f *float64Metric) Register() error { - return view.Register(f.view) -} - -type int64Metric struct { - *stats.Int64Measure - - // tags stores all tags for the metrics - tags []tag.Mutator - // ctx is a precomputed context holding tags, as an optimization - ctx context.Context - view *view.View - - // incrementMeasure is a precomputed +1 measurement to avoid extra allocations in Increment() - incrementMeasure []stats.Measurement - // decrementMeasure is a precomputed -1 measurement to avoid extra allocations in Decrement() - decrementMeasure []stats.Measurement -} - -func newInt64Metric(name, description string, aggregation *view.Aggregation, opts *options) *int64Metric { - measure := stats.Int64(name, description, string(opts.unit)) - tagKeys := make([]tag.Key, 0, len(opts.labels)) - for _, l := range opts.labels { - tagKeys = append(tagKeys, tag.Key(l)) - } - ctx, _ := tag.New(context.Background()) //nolint:errcheck - return &int64Metric{ - Int64Measure: measure, - tags: make([]tag.Mutator, 0), - ctx: ctx, - view: &view.View{Measure: measure, TagKeys: tagKeys, Aggregation: aggregation}, - incrementMeasure: []stats.Measurement{measure.M(1)}, - decrementMeasure: []stats.Measurement{measure.M(-1)}, - } -} - -func (i *int64Metric) Increment() { - i.recordMeasurements(i.incrementMeasure) -} - -func (i *int64Metric) Decrement() { - i.recordMeasurements(i.decrementMeasure) -} - -func (i *int64Metric) Name() string { - return i.Int64Measure.Name() -} - -func (i *int64Metric) Record(value float64) { - i.RecordInt(int64(math.Floor(value))) -} - -func (i *int64Metric) recordMeasurements(m []stats.Measurement) { - recordHookMutex.RLock() - if rh, ok := recordHooks[i.Name()]; ok { - for _, mv := range m { - rh.OnRecordInt64Measure(i.Int64Measure, i.tags, int64(math.Floor(mv.Value()))) - } - } - recordHookMutex.RUnlock() - stats.Record(i.ctx, m...) //nolint:errcheck -} - -func (i *int64Metric) RecordInt(value int64) { - recordHookMutex.RLock() - if rh, ok := recordHooks[i.Name()]; ok { - rh.OnRecordInt64Measure(i.Int64Measure, i.tags, value) - } - recordHookMutex.RUnlock() - stats.Record(i.ctx, i.M(value)) //nolint:errcheck -} - -func (i *int64Metric) With(labelValues ...LabelValue) Metric { - t := make([]tag.Mutator, len(i.tags), len(i.tags)+len(labelValues)) - copy(t, i.tags) - for _, tagValue := range labelValues { - t = append(t, tag.Mutator(tagValue)) - } - ctx, _ := tag.New(context.Background(), t...) //nolint:errcheck - return &int64Metric{ - Int64Measure: i.Int64Measure, - tags: t, - ctx: ctx, - view: i.view, - incrementMeasure: i.incrementMeasure, - decrementMeasure: i.decrementMeasure, - } -} - -func (i *int64Metric) Register() error { - return view.Register(i.view) -} diff --git a/monitoring/monitoring_opencensus.go b/monitoring/monitoring_opencensus.go new file mode 100644 index 00000000..4e2e0701 --- /dev/null +++ b/monitoring/monitoring_opencensus.go @@ -0,0 +1,231 @@ +//go:build !skip_opencensus + +// Copyright 2019 Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitoring + +import ( + "context" + "sync" + + "go.opencensus.io/metric" + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricproducer" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "istio.io/pkg/log" +) + +var ( + recordHookMutex sync.RWMutex + recordHooks map[string]RecordHook + derivedRegistry = metric.NewRegistry() +) + +// RecordHook has a callback function which a measure is recorded. +type RecordHook interface { + OnRecordFloat64Measure(f *stats.Float64Measure, tags []tag.Mutator, value float64) + OnRecordInt64Measure(i *stats.Int64Measure, tags []tag.Mutator, value int64) +} + +func init() { + recordHooks = make(map[string]RecordHook) + // ensures exporters can see any derived metrics + metricproducer.GlobalManager().AddProducer(derivedRegistry) + newSum = newSumOC + newGauge = newGaugeOC + newDistribution = newDistributionOC + newDerivedGauge = newDerivedGaugeOpenCensus +} + +// RegisterRecordHook adds a RecordHook for a given measure. +func RegisterRecordHook(name string, h RecordHook) { + recordHookMutex.Lock() + defer recordHookMutex.Unlock() + recordHooks[name] = h +} + +// NewDistribution creates a new Metric with an aggregation type of Distribution. This means that the +// data collected by the Metric will be collected and exported as a histogram, with the specified bounds. +func newDistributionOC(name, description string, bounds []float64, opts ...Options) Metric { + return newMetricOC(name, description, view.Distribution(bounds...), opts...) +} + +func newSumOC(name, description string, opts ...Options) Metric { + return newMetricOC(name, description, view.Sum(), opts...) +} + +// NewGauge creates a new Metric with an aggregation type of LastValue. That means that data collected +// by the new Metric will export only the last recorded value. +func newGaugeOC(name, description string, opts ...Options) Metric { + return newMetricOC(name, description, view.LastValue(), opts...) +} + +func newMetricOC(name, description string, aggregation *view.Aggregation, opts ...Options) Metric { + o := createOptions(opts...) + return newFloat64Metric(name, description, aggregation, o) +} + +func newDerivedGaugeOpenCensus(name, description string, opts ...DerivedOptions) DerivedMetric { + options := createDerivedOptions(opts...) + m, err := derivedRegistry.AddFloat64DerivedGauge(name, + metric.WithDescription(description), + metric.WithLabelKeys(options.labelKeys...), + metric.WithUnit(metricdata.UnitDimensionless)) // TODO: allow unit in options + if err != nil { + log.Warnf("failed to add metric %q: %v", name, err) + } + derived := &derivedFloat64Metric{ + base: m, + name: name, + } + //if options.valueFn != nil { + // derived.ValueFrom(options.valueFn) + //} + return derived +} + +type derivedFloat64Metric struct { + base *metric.Float64DerivedGauge + + name string +} + +func (d *derivedFloat64Metric) Name() string { + return d.name +} + +// no-op +func (d *derivedFloat64Metric) Register() error { + return nil +} + +func (d *derivedFloat64Metric) ValueFrom(valueFn func() float64, labelValues ...Attr) { + if len(labelValues) == 0 { + if err := d.base.UpsertEntry(valueFn); err != nil { + log.Errorf("failed to add value for derived metric %q: %v", d.name, err) + } + return + } + lv := make([]metricdata.LabelValue, 0, len(labelValues)) + for _, l := range labelValues { + lv = append(lv, metricdata.NewLabelValue(l.Value)) + } + if err := d.base.UpsertEntry(valueFn, lv...); err != nil { + log.Errorf("failed to add value for derived metric %q: %v", d.name, err) + } +} + +type float64Metric struct { + *stats.Float64Measure + + // tags stores all tags for the metrics + tags []tag.Mutator + // ctx is a precomputed context holding tags, as an optimization + ctx context.Context + view *view.View + + incrementMeasure []stats.Measurement + decrementMeasure []stats.Measurement +} + +func newFloat64Metric(name, description string, aggregation *view.Aggregation, opts *options) *float64Metric { + measure := stats.Float64(name, description, string(opts.unit)) + tagKeys := make([]tag.Key, 0, len(opts.labels)) + for _, l := range opts.labels { + tagKeys = append(tagKeys, tag.MustNewKey(string(l))) + } + ctx, _ := tag.New(context.Background()) //nolint:errcheck + return &float64Metric{ + Float64Measure: measure, + tags: make([]tag.Mutator, 0), + ctx: ctx, + view: &view.View{Measure: measure, TagKeys: tagKeys, Aggregation: aggregation}, + incrementMeasure: []stats.Measurement{measure.M(1)}, + decrementMeasure: []stats.Measurement{measure.M(-1)}, + } +} + +func (f *float64Metric) Increment() { + f.recordMeasurements(f.incrementMeasure) +} + +func (f *float64Metric) Decrement() { + f.recordMeasurements(f.decrementMeasure) +} + +func (f *float64Metric) Name() string { + return f.Float64Measure.Name() +} + +func (f *float64Metric) Record(value float64) { + recordHookMutex.RLock() + if rh, ok := recordHooks[f.Name()]; ok { + rh.OnRecordFloat64Measure(f.Float64Measure, f.tags, value) + } + recordHookMutex.RUnlock() + m := f.M(value) + stats.Record(f.ctx, m) //nolint:errcheck +} + +func (f *float64Metric) recordMeasurements(m []stats.Measurement) { + recordHookMutex.RLock() + if rh, ok := recordHooks[f.Name()]; ok { + for _, mv := range m { + rh.OnRecordFloat64Measure(f.Float64Measure, f.tags, mv.Value()) + } + } + recordHookMutex.RUnlock() + stats.Record(f.ctx, m...) +} + +func (f *float64Metric) RecordInt(value int64) { + f.Record(float64(value)) +} + +// A LabelValue represents a Label with a specific value. It is used to record +// values for a Metric. +type LabelValue tag.Mutator + +func toLabelValues(args ...Attr) []tag.Mutator { + t := make([]tag.Mutator, len(args)) + for _, a := range args { + t = append(t, tag.Insert(tag.MustNewKey(a.Key), a.Value)) + } + return nil +} + +func (f *float64Metric) With(labelValues ...Attr) Metric { + t := make([]tag.Mutator, len(f.tags), len(f.tags)+len(labelValues)) + copy(t, f.tags) + lv := toLabelValues(labelValues...) + for _, tagValue := range lv { + t = append(t, tagValue) + } + ctx, _ := tag.New(context.Background(), t...) //nolint:errcheck + return &float64Metric{ + Float64Measure: f.Float64Measure, + tags: t, + ctx: ctx, + view: f.view, + incrementMeasure: f.incrementMeasure, + decrementMeasure: f.decrementMeasure, + } +} + +func (f *float64Metric) Register() error { + return view.Register(f.view) +}