
This picks up a bug fix that will be helpful when delve starts to test telemetry counters. Related: https://go-review.googlesource.com/c/telemetry/+/625496
402 lines
11 KiB
Go
402 lines
11 KiB
Go
// Copyright 2023 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// Package internal/counter implements the internals of the public counter package.
|
|
// In addition to the public API, this package also includes APIs to parse and
|
|
// manage the counter files, needed by the upload package.
|
|
package counter
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"sync/atomic"
|
|
)
|
|
|
|
var (
|
|
// Note: not using internal/godebug, so that internal/godebug can use
|
|
// internal/counter.
|
|
debugCounter = strings.Contains(os.Getenv("GODEBUG"), "countertrace=1")
|
|
CrashOnBugs = false // for testing; if set, exit on fatal log messages
|
|
)
|
|
|
|
// debugPrintf formats a debug message if GODEBUG=countertrace=1.
|
|
func debugPrintf(format string, args ...any) {
|
|
if debugCounter {
|
|
if len(format) == 0 || format[len(format)-1] != '\n' {
|
|
format += "\n"
|
|
}
|
|
fmt.Fprintf(os.Stderr, "counter: "+format, args...)
|
|
}
|
|
}
|
|
|
|
// debugFatalf logs a fatal error if GODEBUG=countertrace=1.
|
|
func debugFatalf(format string, args ...any) {
|
|
if debugCounter || CrashOnBugs {
|
|
if len(format) == 0 || format[len(format)-1] != '\n' {
|
|
format += "\n"
|
|
}
|
|
fmt.Fprintf(os.Stderr, "counter bug: "+format, args...)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// A Counter is a single named event counter.
|
|
// A Counter is safe for use by multiple goroutines simultaneously.
|
|
//
|
|
// Counters should typically be created using New
|
|
// and stored as global variables, like:
|
|
//
|
|
// package mypackage
|
|
// var errorCount = counter.New("mypackage/errors")
|
|
//
|
|
// (The initialization of errorCount in this example is handled
|
|
// entirely by the compiler and linker; this line executes no code
|
|
// at program startup.)
|
|
//
|
|
// Then code can call Add to increment the counter
|
|
// each time the corresponding event is observed.
|
|
//
|
|
// Although it is possible to use New to create
|
|
// a Counter each time a particular event needs to be recorded,
|
|
// that usage fails to amortize the construction cost over
|
|
// multiple calls to Add, so it is more expensive and not recommended.
|
|
type Counter struct {
|
|
name string
|
|
file *file
|
|
|
|
next atomic.Pointer[Counter]
|
|
state counterState
|
|
ptr counterPtr
|
|
}
|
|
|
|
func (c *Counter) Name() string {
|
|
return c.name
|
|
}
|
|
|
|
type counterPtr struct {
|
|
m *mappedFile
|
|
count *atomic.Uint64
|
|
}
|
|
|
|
type counterState struct {
|
|
bits atomic.Uint64
|
|
}
|
|
|
|
func (s *counterState) load() counterStateBits {
|
|
return counterStateBits(s.bits.Load())
|
|
}
|
|
|
|
func (s *counterState) update(old *counterStateBits, new counterStateBits) bool {
|
|
if s.bits.CompareAndSwap(uint64(*old), uint64(new)) {
|
|
*old = new
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
type counterStateBits uint64
|
|
|
|
const (
|
|
stateReaders counterStateBits = 1<<30 - 1
|
|
stateLocked counterStateBits = stateReaders
|
|
stateHavePtr counterStateBits = 1 << 30
|
|
stateExtraShift = 31
|
|
stateExtra counterStateBits = 1<<64 - 1<<stateExtraShift
|
|
)
|
|
|
|
func (b counterStateBits) readers() int { return int(b & stateReaders) }
|
|
func (b counterStateBits) locked() bool { return b&stateReaders == stateLocked }
|
|
func (b counterStateBits) havePtr() bool { return b&stateHavePtr != 0 }
|
|
func (b counterStateBits) extra() uint64 { return uint64(b&stateExtra) >> stateExtraShift }
|
|
|
|
func (b counterStateBits) incReader() counterStateBits { return b + 1 }
|
|
func (b counterStateBits) decReader() counterStateBits { return b - 1 }
|
|
func (b counterStateBits) setLocked() counterStateBits { return b | stateLocked }
|
|
func (b counterStateBits) clearLocked() counterStateBits { return b &^ stateLocked }
|
|
func (b counterStateBits) setHavePtr() counterStateBits { return b | stateHavePtr }
|
|
func (b counterStateBits) clearHavePtr() counterStateBits { return b &^ stateHavePtr }
|
|
func (b counterStateBits) clearExtra() counterStateBits { return b &^ stateExtra }
|
|
func (b counterStateBits) addExtra(n uint64) counterStateBits {
|
|
const maxExtra = uint64(stateExtra) >> stateExtraShift // 0x1ffffffff
|
|
x := b.extra()
|
|
if x+n < x || x+n > maxExtra {
|
|
x = maxExtra
|
|
} else {
|
|
x += n
|
|
}
|
|
return b.clearExtra() | counterStateBits(x)<<stateExtraShift
|
|
}
|
|
|
|
// New returns a counter with the given name.
|
|
// New can be called in global initializers and will be compiled down to
|
|
// linker-initialized data. That is, calling New to initialize a global
|
|
// has no cost at program startup.
|
|
func New(name string) *Counter {
|
|
// Note: not calling defaultFile.New in order to keep this
|
|
// function something the compiler can inline and convert
|
|
// into static data initializations, with no init-time footprint.
|
|
return &Counter{name: name, file: &defaultFile}
|
|
}
|
|
|
|
// Inc adds 1 to the counter.
|
|
func (c *Counter) Inc() {
|
|
c.Add(1)
|
|
}
|
|
|
|
// Add adds n to the counter. n cannot be negative, as counts cannot decrease.
|
|
func (c *Counter) Add(n int64) {
|
|
debugPrintf("Add %q += %d", c.name, n)
|
|
|
|
if n < 0 {
|
|
panic("Counter.Add negative")
|
|
}
|
|
if n == 0 {
|
|
return
|
|
}
|
|
c.file.register(c)
|
|
|
|
state := c.state.load()
|
|
for ; ; state = c.state.load() {
|
|
switch {
|
|
case !state.locked() && state.havePtr():
|
|
if !c.state.update(&state, state.incReader()) {
|
|
continue
|
|
}
|
|
// Counter unlocked or counter shared; has an initialized count pointer; acquired shared lock.
|
|
if c.ptr.count == nil {
|
|
for !c.state.update(&state, state.addExtra(uint64(n))) {
|
|
// keep trying - we already took the reader lock
|
|
state = c.state.load()
|
|
}
|
|
debugPrintf("Add %q += %d: nil extra=%d\n", c.name, n, state.extra())
|
|
} else {
|
|
sum := c.add(uint64(n))
|
|
debugPrintf("Add %q += %d: count=%d\n", c.name, n, sum)
|
|
}
|
|
c.releaseReader(state)
|
|
return
|
|
|
|
case state.locked():
|
|
if !c.state.update(&state, state.addExtra(uint64(n))) {
|
|
continue
|
|
}
|
|
debugPrintf("Add %q += %d: locked extra=%d\n", c.name, n, state.extra())
|
|
return
|
|
|
|
case !state.havePtr():
|
|
if !c.state.update(&state, state.addExtra(uint64(n)).setLocked()) {
|
|
continue
|
|
}
|
|
debugPrintf("Add %q += %d: noptr extra=%d\n", c.name, n, state.extra())
|
|
c.releaseLock(state)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Counter) releaseReader(state counterStateBits) {
|
|
for ; ; state = c.state.load() {
|
|
// If we are the last reader and havePtr was cleared
|
|
// while this batch of readers was using c.ptr,
|
|
// it's our job to update c.ptr by upgrading to a full lock
|
|
// and letting releaseLock do the work.
|
|
// Note: no new reader will attempt to add itself now that havePtr is clear,
|
|
// so we are only racing against possible additions to extra.
|
|
if state.readers() == 1 && !state.havePtr() {
|
|
if !c.state.update(&state, state.setLocked()) {
|
|
continue
|
|
}
|
|
debugPrintf("releaseReader %s: last reader, need ptr\n", c.name)
|
|
c.releaseLock(state)
|
|
return
|
|
}
|
|
|
|
// Release reader.
|
|
if !c.state.update(&state, state.decReader()) {
|
|
continue
|
|
}
|
|
debugPrintf("releaseReader %s: released (%d readers now)\n", c.name, state.readers())
|
|
return
|
|
}
|
|
}
|
|
|
|
func (c *Counter) releaseLock(state counterStateBits) {
|
|
for ; ; state = c.state.load() {
|
|
if !state.havePtr() {
|
|
// Set havePtr before updating ptr,
|
|
// to avoid race with the next clear of havePtr.
|
|
if !c.state.update(&state, state.setHavePtr()) {
|
|
continue
|
|
}
|
|
debugPrintf("releaseLock %s: reset havePtr (extra=%d)\n", c.name, state.extra())
|
|
|
|
// Optimization: only bother loading a new pointer
|
|
// if we have a value to add to it.
|
|
c.ptr = counterPtr{nil, nil}
|
|
if state.extra() != 0 {
|
|
c.ptr = c.file.lookup(c.name)
|
|
debugPrintf("releaseLock %s: ptr=%v\n", c.name, c.ptr)
|
|
}
|
|
}
|
|
|
|
if extra := state.extra(); extra != 0 && c.ptr.count != nil {
|
|
if !c.state.update(&state, state.clearExtra()) {
|
|
continue
|
|
}
|
|
sum := c.add(extra)
|
|
debugPrintf("releaseLock %s: flush extra=%d -> count=%d\n", c.name, extra, sum)
|
|
}
|
|
|
|
// Took care of refreshing ptr and flushing extra.
|
|
// Now we can release the lock, unless of course
|
|
// another goroutine cleared havePtr or added to extra,
|
|
// in which case we go around again.
|
|
if !c.state.update(&state, state.clearLocked()) {
|
|
continue
|
|
}
|
|
debugPrintf("releaseLock %s: unlocked\n", c.name)
|
|
return
|
|
}
|
|
}
|
|
|
|
// add wraps the atomic.Uint64.Add operation to handle integer overflow.
|
|
func (c *Counter) add(n uint64) uint64 {
|
|
count := c.ptr.count
|
|
for {
|
|
old := count.Load()
|
|
sum := old + n
|
|
if sum < old {
|
|
sum = ^uint64(0)
|
|
}
|
|
if count.CompareAndSwap(old, sum) {
|
|
runtime.KeepAlive(c.ptr.m)
|
|
return sum
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Counter) invalidate() {
|
|
for {
|
|
state := c.state.load()
|
|
if !state.havePtr() {
|
|
debugPrintf("invalidate %s: no ptr\n", c.name)
|
|
return
|
|
}
|
|
if c.state.update(&state, state.clearHavePtr()) {
|
|
debugPrintf("invalidate %s: cleared havePtr\n", c.name)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Counter) refresh() {
|
|
for {
|
|
state := c.state.load()
|
|
if state.havePtr() || state.readers() > 0 || state.extra() == 0 {
|
|
debugPrintf("refresh %s: havePtr=%v readers=%d extra=%d\n", c.name, state.havePtr(), state.readers(), state.extra())
|
|
return
|
|
}
|
|
if c.state.update(&state, state.setLocked()) {
|
|
debugPrintf("refresh %s: locked havePtr=%v readers=%d extra=%d\n", c.name, state.havePtr(), state.readers(), state.extra())
|
|
c.releaseLock(state)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Read reads the given counter.
|
|
// This is the implementation of x/telemetry/counter/countertest.ReadCounter.
|
|
func Read(c *Counter) (uint64, error) {
|
|
if c.file.current.Load() == nil {
|
|
return c.state.load().extra(), nil
|
|
}
|
|
pf, err := readFile(c.file)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
v, ok := pf.Count[DecodeStack(c.Name())]
|
|
if !ok {
|
|
return v, fmt.Errorf("not found:%q", DecodeStack(c.Name()))
|
|
}
|
|
return v, nil
|
|
}
|
|
|
|
func readFile(f *file) (*File, error) {
|
|
if f == nil {
|
|
debugPrintf("No file")
|
|
return nil, fmt.Errorf("counter is not initialized - was Open called?")
|
|
}
|
|
|
|
// Note: don't call f.rotate here as this will enqueue a follow-up rotation.
|
|
f.rotate1()
|
|
|
|
if f.err != nil {
|
|
return nil, fmt.Errorf("failed to rotate mapped file - %v", f.err)
|
|
}
|
|
current := f.current.Load()
|
|
if current == nil {
|
|
return nil, fmt.Errorf("counter has no mapped file")
|
|
}
|
|
name := current.f.Name()
|
|
data, err := ReadMapped(name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read from file: %v", err)
|
|
}
|
|
pf, err := Parse(name, data)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse: %v", err)
|
|
}
|
|
return pf, nil
|
|
}
|
|
|
|
// ReadFile reads the counters and stack counters from the given file.
|
|
// This is the implementation of x/telemetry/counter/countertest.ReadFile.
|
|
func ReadFile(name string) (counters, stackCounters map[string]uint64, _ error) {
|
|
// TODO: Document the format of the stackCounters names.
|
|
|
|
data, err := ReadMapped(name)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to read from file: %v", err)
|
|
}
|
|
pf, err := Parse(name, data)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to parse: %v", err)
|
|
}
|
|
counters = make(map[string]uint64)
|
|
stackCounters = make(map[string]uint64)
|
|
for k, v := range pf.Count {
|
|
if IsStackCounter(k) {
|
|
stackCounters[DecodeStack(k)] = v
|
|
} else {
|
|
counters[k] = v
|
|
}
|
|
}
|
|
return counters, stackCounters, nil
|
|
}
|
|
|
|
// ReadMapped reads the contents of the given file by memory mapping.
|
|
//
|
|
// This avoids file synchronization issues.
|
|
func ReadMapped(name string) ([]byte, error) {
|
|
f, err := os.OpenFile(name, os.O_RDWR, 0666)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer f.Close()
|
|
fi, err := f.Stat()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
mapping, err := memmap(f)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
data := make([]byte, fi.Size())
|
|
copy(data, mapping.Data)
|
|
munmap(mapping)
|
|
return data, nil
|
|
}
|