delve/vendor/github.com/cilium/ebpf/internal/epoll/poller.go
2023-08-30 10:22:08 -07:00

226 lines
5.0 KiB
Go

package epoll
import (
"fmt"
"math"
"os"
"runtime"
"sync"
"time"
"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/unix"
)
// Poller waits for readiness notifications from multiple file descriptors.
//
// The wait can be interrupted by calling Close.
type Poller struct {
// mutexes protect the fields declared below them. If you need to
// acquire both at once you must lock epollMu before eventMu.
//
// epollMu is held by Add and Wait for their whole duration, which is why
// the two block each other.
epollMu sync.Mutex
// epollFd is the epoll instance created by New. Close sets it to -1, which
// Add and Wait treat as "closed".
epollFd int
// eventMu guards event.
eventMu sync.Mutex
// event is the eventfd registered with epollFd. Close writes to it to wake
// a blocked Wait and sets it to nil afterwards.
event *eventFd
}
// New creates a Poller backed by a fresh epoll instance.
//
// An eventfd is registered with the epoll instance under id 0; Close writes
// to it to interrupt a blocked Wait. A finalizer is installed that calls
// Close when the Poller is garbage collected.
//
// Any file descriptor opened so far is closed again if setup fails part way
// through.
func New() (*Poller, error) {
	epollFd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
	if err != nil {
		// Wrap with %w, like the eventfd error below, so that callers can
		// match the underlying errno with errors.Is.
		return nil, fmt.Errorf("create epoll fd: %w", err)
	}

	p := &Poller{epollFd: epollFd}
	p.event, err = newEventFd()
	if err != nil {
		unix.Close(epollFd)
		return nil, err
	}

	if err := p.Add(p.event.raw, 0); err != nil {
		unix.Close(epollFd)
		p.event.close()
		return nil, fmt.Errorf("add eventfd: %w", err)
	}

	runtime.SetFinalizer(p, (*Poller).Close)
	return p, nil
}
// Close shuts the poller down and releases its file descriptors.
//
// A Wait that is currently blocked is interrupted. Calling Close more than
// once is allowed; every call after the first returns an error wrapping
// os.ErrClosed.
func (p *Poller) Close() error {
	// An explicit Close makes the finalizer installed by New unnecessary.
	runtime.SetFinalizer(p, nil)

	// Signal the eventfd so that a Wait blocked in epoll_wait returns and
	// gives up epollMu. This fails if the poller is already closed.
	err := p.wakeWait()
	if err != nil {
		return err
	}

	// Holding epollMu guarantees that neither Wait nor Add is running.
	p.epollMu.Lock()
	defer p.epollMu.Unlock()

	// Holding eventMu keeps concurrent Close calls out.
	p.eventMu.Lock()
	defer p.eventMu.Unlock()

	if fd := p.epollFd; fd != -1 {
		p.epollFd = -1
		unix.Close(fd)
	}

	if efd := p.event; efd != nil {
		p.event = nil
		efd.close()
	}

	return nil
}
// Add an fd to the poller.
//
// id is returned by Wait in the unix.EpollEvent.Pad field and may be zero.
// It must fit into an int32; ids outside that range are rejected with an
// error instead of being silently truncated.
//
// Add is blocked by Wait. It returns an error wrapping os.ErrClosed once the
// poller has been closed.
func (p *Poller) Add(fd int, id int) error {
	// Pad is an int32, so an id outside of its range cannot round-trip.
	if int64(id) > math.MaxInt32 || int64(id) < math.MinInt32 {
		return fmt.Errorf("unsupported id: %d", id)
	}

	p.epollMu.Lock()
	defer p.epollMu.Unlock()

	if p.epollFd == -1 {
		return fmt.Errorf("epoll add: %w", os.ErrClosed)
	}

	// The representation of EpollEvent isn't entirely accurate.
	// Pad is fully useable, not just padding. Hence we stuff the
	// id in there, which allows us to identify the event later (e.g.,
	// in case of perf events, which CPU sent it).
	event := unix.EpollEvent{
		Events: unix.EPOLLIN,
		Fd:     int32(fd),
		Pad:    int32(id),
	}

	if err := unix.EpollCtl(p.epollFd, unix.EPOLL_CTL_ADD, fd, &event); err != nil {
		// Wrap with %w so callers can inspect the errno (e.g. EEXIST).
		return fmt.Errorf("add fd to epoll: %w", err)
	}

	return nil
}
// Wait for events.
//
// events receives the ready events. A zero deadline means Wait blocks until
// an event arrives or the poller is closed.
//
// Returns the number of pending events, an error wrapping os.ErrClosed if
// Close is called, or an error wrapping os.ErrDeadlineExceeded if the
// deadline passes without any event. Other errors from EpollWait are
// returned unwrapped.
//
// Wait holds epollMu for its whole duration and therefore blocks Add.
func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error) {
	p.epollMu.Lock()
	defer p.epollMu.Unlock()

	if p.epollFd == -1 {
		return 0, fmt.Errorf("epoll wait: %w", os.ErrClosed)
	}

	for {
		// The timeout is recomputed on every iteration so that a retry
		// only waits for the time remaining until the deadline.
		timeout := -1
		clamped := false
		if !deadline.IsZero() {
			msec := time.Until(deadline).Milliseconds()
			switch {
			case msec < 0:
				// Deadline is in the past, don't block.
				msec = 0
			case msec > math.MaxInt32:
				// epoll_wait takes the timeout as a C int. Anything larger
				// would be truncated, possibly to a tiny or negative
				// (infinite) value, so wait in slices of at most MaxInt32
				// milliseconds.
				msec = math.MaxInt32
				clamped = true
			}
			timeout = int(msec)
		}

		n, err := unix.EpollWait(p.epollFd, events, timeout)
		if temp, ok := err.(temporaryError); ok && temp.Temporary() {
			// Retry the syscall if we were interrupted, see https://github.com/golang/go/issues/20400
			continue
		}

		if err != nil {
			return 0, err
		}

		if n == 0 {
			if clamped {
				// Only the clamped slice elapsed, the real deadline is
				// still in the future.
				continue
			}
			return 0, fmt.Errorf("epoll wait: %w", os.ErrDeadlineExceeded)
		}

		for _, event := range events[:n] {
			if int(event.Fd) == p.event.raw {
				// Since we don't read p.event the event is never cleared and
				// we'll keep getting this wakeup until Close() acquires the
				// lock and sets p.epollFd = -1.
				return 0, fmt.Errorf("epoll wait: %w", os.ErrClosed)
			}
		}

		return n, nil
	}
}
// temporaryError is the interface Wait uses to recognise errors that expose
// a Temporary method; EpollWait is retried when that method reports true.
type temporaryError interface {
Temporary() bool
}
// wakeWait unblocks Wait if it is currently blocked in epoll_wait.
//
// It does so by adding 1 to the eventfd registered with the epoll instance.
// Returns an error wrapping os.ErrClosed if the eventfd has already been
// released by Close.
func (p *Poller) wakeWait() error {
	p.eventMu.Lock()
	defer p.eventMu.Unlock()

	if efd := p.event; efd != nil {
		return efd.add(1)
	}

	return fmt.Errorf("epoll wake: %w", os.ErrClosed)
}
// eventFd wraps a Linux eventfd.
//
// An eventfd acts like a counter: writes add to the counter, reads retrieve
// the counter and reset it to zero. Reads also block if the counter is zero.
//
// See man 2 eventfd.
type eventFd struct {
// file wraps the eventfd descriptor; add, read and close go through it.
file *os.File
// prefer raw over file.Fd(), since the latter puts the file into blocking
// mode.
//
// raw is the descriptor number returned by unix.Eventfd. It is the value
// registered with epoll and compared against event.Fd in Wait.
raw int
}
// newEventFd creates a non-blocking, close-on-exec eventfd whose counter
// starts at zero and wraps it in an os.File named "event".
func newEventFd() (*eventFd, error) {
	rawFd, err := unix.Eventfd(0, unix.O_CLOEXEC|unix.O_NONBLOCK)
	if err != nil {
		return nil, err
	}

	efd := &eventFd{
		file: os.NewFile(uintptr(rawFd), "event"),
		raw:  rawFd,
	}
	return efd, nil
}
// close releases the eventfd by closing the os.File that wraps it and
// returns the error from that Close call.
func (efd *eventFd) close() error {
return efd.file.Close()
}
// add increments the eventfd counter by n.
//
// The value is written as an 8-byte integer in native byte order, which is
// the format eventfd expects. The error from the underlying write is
// returned as is.
func (efd *eventFd) add(n uint64) error {
	var buf [8]byte
	// Encode the requested increment; previously a literal 1 was written
	// regardless of n.
	internal.NativeEndian.PutUint64(buf[:], n)
	_, err := efd.file.Write(buf[:])
	return err
}
// read retrieves the current eventfd counter.
//
// The 8-byte buffer is decoded in native byte order and returned together
// with whatever error the underlying read produced.
func (efd *eventFd) read() (uint64, error) {
	var raw [8]byte
	_, err := efd.file.Read(raw[:])
	counter := internal.NativeEndian.Uint64(raw[:])
	return counter, err
}