Skip to content

Commit

Permalink
Avoid duplication/triplication/n-plication of traces for the same pro…
Browse files Browse the repository at this point in the history
…cess (#371)

* Failing tests for multi-process

* Avoid multi-instrumentation of the same process
  • Loading branch information
mariomac authored Oct 24, 2023
1 parent 6b46732 commit 4b7d6c8
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 28 deletions.
47 changes: 40 additions & 7 deletions pkg/internal/discover/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package discover
import (
"context"
"fmt"
"hash/fnv"
"log/slog"
"os"
"path"
Expand All @@ -21,21 +22,25 @@ import (
// for each received Instrumentable process and forwards an ebpf.ProcessTracer instance ready to run and start
// instrumenting the executable
type TraceAttacher struct {
	// Cfg provides the eBPF and discovery settings read when building a tracer.
	Cfg *pipe.Config
	Ctx context.Context
	// DiscoveredTracers is the channel where each newly built tracer is forwarded.
	DiscoveredTracers chan *ebpf.ProcessTracer
	Metrics           imetrics.Reporter

	// log is the component logger; it is set by TraceAttacherProvider.
	log *slog.Logger

	// existingTracers keeps a copy of all the tracers for a given executable
	// path, so the same executable is not instrumented more than once.
	// It is initialized by TraceAttacherProvider.
	existingTracers map[string]*ebpf.ProcessTracer
}

func TraceAttacherProvider(ta TraceAttacher) (node.TerminalFunc[[]Event[Instrumentable]], error) {
ta.log = slog.With("component", "discover.TraceAttacher")
ta.existingTracers = map[string]*ebpf.ProcessTracer{}

return func(in <-chan []Event[Instrumentable]) {
mainLoop:
for instrumentables := range in {
for _, instr := range instrumentables {
if pt, ok := ta.getTracer(instr.Obj); ok {
if pt, ok := ta.getTracer(&instr.Obj); ok {
// we can create multiple tracers for the same executable (ran from different processes)
// even if we just need to instrument the executable once. TODO: deduplicate
ta.DiscoveredTracers <- pt
Expand All @@ -51,8 +56,17 @@ func TraceAttacherProvider(ta TraceAttacher) (node.TerminalFunc[[]Event[Instrume
}, nil
}

func (ta *TraceAttacher) getTracer(ie Instrumentable) (*ebpf.ProcessTracer, bool) {
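// getTracer returns the ProcessTracer built for the given Instrumentable and true,
// or nil and false if its executable is already instrumented or no tracer can be built for it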
func (ta *TraceAttacher) getTracer(ie *Instrumentable) (*ebpf.ProcessTracer, bool) {
if tracer, ok := ta.existingTracers[ie.FileInfo.CmdExePath]; ok {
ta.log.Info("new process for already instrumented executable",
"pid", ie.FileInfo.Pid,
"exec", ie.FileInfo.CmdExePath)
_ = tracer
return nil, false
}
ta.log.Info("instrumenting process", "cmd", ie.FileInfo.CmdExePath, "pid", ie.FileInfo.Pid)

// builds a tracer for that executable
var programs []ebpf.Tracer
switch ie.Type {
case svc.InstrumentableGolang:
Expand Down Expand Up @@ -82,14 +96,33 @@ func (ta *TraceAttacher) getTracer(ie Instrumentable) (*ebpf.ProcessTracer, bool
return nil, false
}

return &ebpf.ProcessTracer{
tracer := &ebpf.ProcessTracer{
Programs: programs,
ELFInfo: ie.FileInfo,
Goffsets: ie.Offsets,
Exe: exe,
PinPath: path.Join(ta.Cfg.EBPF.BpfBaseDir, fmt.Sprintf("%d-%d", os.Getpid(), ie.FileInfo.Pid)),
PinPath: ta.buildPinPath(ie),
SystemWide: ta.Cfg.Discovery.SystemWide,
}, true
}
ta.existingTracers[ie.FileInfo.CmdExePath] = tracer
return tracer, true
}

// buildPinPath returns the BPF pin path for the executable group of the passed
// Instrumentable. The path must be unique for each executable group, so its
// last element is composed of:
//   - the PID of the current Beyla process
//   - the PID of the first process that matched that executable
//     (it doesn't matter if that process stops and other processes of the same
//     executable keep using this pin path)
//   - a hash of the executable path
//
// Combining the three values prevents both the improbable (almost impossible)
// collisions of the executable hash and the case where the first process
// stopped and a different process, running a different executable, started
// with the same PID.
func (ta *TraceAttacher) buildPinPath(ie *Instrumentable) string {
	hasher := fnv.New32()
	// writing into a hash.Hash never returns an error, so both results are discarded
	_, _ = hasher.Write([]byte(ie.FileInfo.CmdExePath))

	dirName := fmt.Sprintf("%d-%d-%x", os.Getpid(), ie.FileInfo.Pid, hasher.Sum32())
	return path.Join(ta.Cfg.EBPF.BpfBaseDir, dirName)
}

// filterNotFoundPrograms will filter these programs whose required functions (as
Expand Down
5 changes: 4 additions & 1 deletion pkg/internal/discover/typer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ func (t *typer) FilterClassify(evs []Event[ProcessMatch]) []Event[Instrumentable
// if we found a process and returned its parent, it might be already
// instrumented. We skip it in that case
if _, ok := t.instrumentedPids[inst.FileInfo.Pid]; !ok {
t.log.Info("instrumenting process", "cmd", inst.FileInfo.CmdExePath, "pid", inst.FileInfo.Pid)
t.log.Debug(
"found an instrumentable process",
"type", inst.Type.String(),
"exec", inst.FileInfo.CmdExePath, "pid", inst.FileInfo.Pid)
out = append(out, Event[Instrumentable]{Type: EventCreated, Obj: inst})
t.instrumentedPids[inst.FileInfo.Pid] = struct{}{}
}
Expand Down
28 changes: 28 additions & 0 deletions test/integration/components/testserver/Dockerfile_duplicate
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Build the testserver binary
# Docker command must be invoked from the project root directory
FROM golang:1.21 as builder

# Target architecture of the build, forwarded to the Go toolchain as GOARCH
ARG TARGETARCH

ENV GOARCH=$TARGETARCH

WORKDIR /src

# Copy the go manifests and source
COPY vendor/ vendor/
COPY test/ test/
COPY go.mod go.mod
COPY go.sum go.sum

# Build
RUN go build -o testserver ./test/integration/components/testserver/testserver.go

# Create final image from minimal + built binary
FROM debian:bookworm-slim

WORKDIR /
# Ship the launcher script together with the binary, which is renamed to dupe_testserver
COPY --from=builder /src/test/integration/components/testserver/duplicate_testserver.sh .
COPY --from=builder /src/testserver dupe_testserver
USER 0:0

# The launcher script is the container entry point
CMD [ "sh", "/duplicate_testserver.sh" ]
12 changes: 12 additions & 0 deletions test/integration/components/testserver/duplicate_testserver.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/usr/bin/env sh

# run_testserver starts one dupe_testserver instance in the foreground.
# $1: prefix of the ports assigned to the instance. A digit from 0 to 4 is
#     appended to it to build, respectively, STD_PORT, GIN_PORT, GORILLA_PORT,
#     GORILLA_MID_PORT and the value of the -port argument.
run_testserver()
{
    # prefix for start ports. E.g. 1808
    sp=$1
    STD_PORT="${sp}0" GIN_PORT="${sp}1" GORILLA_PORT="${sp}2" GORILLA_MID_PORT="${sp}3" ./dupe_testserver -port "${sp}4"
}

# runs testserver twice: the first instance is sent to the background and the
# second one runs in the foreground, so the script blocks while the latter is alive
run_testserver 1808 &
run_testserver 1809
22 changes: 22 additions & 0 deletions test/integration/docker-compose-multiexec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ services:
- "8080:8080"
environment:
LOG_LEVEL: DEBUG
# another instance of the above image. Used to test the deduplication
# of metrics when they come from the same executable file
testserver-unused:
image: hatest-testserver
ports:
- "38080:8080"
environment:
LOG_LEVEL: DEBUG

testserver1:
build:
Expand All @@ -20,6 +28,20 @@ services:
- "8900:8900"
environment:
LOG_LEVEL: DEBUG

# image that runs two instances of the 'testserver' executable
# Used to test the deduplication
# of metrics when they come from the same executable file
testserver-duplicate:
build:
context: ../..
dockerfile: test/integration/components/testserver/Dockerfile_duplicate
image: hatest-testserver-duplicate
ports:
- "18080:18080"
- "18090:18090"
environment:
LOG_LEVEL: DEBUG

autoinstrumenter:
build:
Expand Down
79 changes: 79 additions & 0 deletions test/integration/multiprocess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//go:build integration

package integration

import (
"net/http"
"path"
"testing"
"time"

"github.com/mariomac/guara/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/beyla/test/integration/components/docker"
"github.com/grafana/beyla/test/integration/components/prom"
)

// TestMultiProcess starts the docker-compose-multiexec.yml suite and verifies
// that each instrumented service reports its metrics once and only once, even
// when several processes run the same executable. It also checks the number of
// BPF pinning folders while the suite is up, and that they are unmounted after
// the suite is closed.
func TestMultiProcess(t *testing.T) {
	compose, err := docker.ComposeSuite("docker-compose-multiexec.yml", path.Join(pathOutput, "test-suite-multiexec.log"))
	// the error must be checked before compose is used: its value is
	// meaningless if ComposeSuite failed
	require.NoError(t, err)
	// we are going to setup discovery directly in the configuration file
	compose.Env = append(compose.Env, `EXECUTABLE_NAME=`, `OPEN_PORT=`)
	require.NoError(t, compose.Up())
	t.Run("Go RED metrics: usual service", func(t *testing.T) {
		waitForTestComponents(t, instrumentedServiceStdURL)
		testREDMetricsForHTTPLibrary(t, instrumentedServiceStdURL, "testserver", "initial-set")
		// checks that, instrumenting the process from this container,
		// it doesn't instrument too the process from the other container
		checkReportedOnlyOnce(t, instrumentedServiceStdURL, "testserver")
	})
	t.Run("Go RED metrics: service 1", func(t *testing.T) {
		waitForTestComponents(t, "http://localhost:8900")
		testREDMetricsForHTTPLibrary(t, "http://localhost:8900", "rename1", "initial-set")
		// checks that, instrumenting the process from this container,
		// it doesn't instrument too the process from the other container
		checkReportedOnlyOnce(t, "http://localhost:8900", "rename1")
	})
	t.Run("Processes in the same host are instrumented once and only once", func(t *testing.T) {
		waitForTestComponents(t, "http://localhost:18080")
		checkReportedOnlyOnce(t, "http://localhost:18080", "dupe_testserver")
	})

	t.Run("BPF pinning folders mounted", func(t *testing.T) {
		// 1 pinned map for testserver and testserver-unused containers
		// 1 pinned map for testserver1 container
		// 1 pinned map for all the processes in testserver-duplicate container
		testBPFPinningMountedWithCount(t, 3)
	})

	require.NoError(t, compose.Close())
	t.Run("BPF pinning folder unmounted", testBPFPinningUnmounted)
}

// checkReportedOnlyOnce addresses bug https://github.com/grafana/beyla/issues/370 for Go executables.
// It verifies that two instances of the same process do not report duplicate traces or metrics.
//
// It sends 3 GET requests to the "/check-only-once" path of baseURL, requiring
// a 200 status for each one. Then it polls Prometheus until the
// http_server_duration_seconds_count query for serviceName and that path
// returns exactly one result and totalPromCount reports 3 for it.
func checkReportedOnlyOnce(t *testing.T, baseURL, serviceName string) {
	// not named "path", to avoid shadowing the imported path package
	const urlPath = "/check-only-once"
	for i := 0; i < 3; i++ {
		resp, err := http.Get(baseURL + urlPath)
		require.NoError(t, err)
		// only the status code is inspected, so the body is closed right away
		// to avoid leaking the underlying connection
		require.NoError(t, resp.Body.Close())
		require.Equal(t, http.StatusOK, resp.StatusCode)
	}
	pq := prom.Client{HostPort: prometheusHostPort}
	var results []prom.Result
	test.Eventually(t, testTimeout, func(t require.TestingT) {
		var err error
		results, err = pq.Query(`http_server_duration_seconds_count{` +
			`http_method="GET",` +
			`http_status_code="200",` +
			`service_name="` + serviceName + `",` +
			`http_target="` + urlPath + `"}`)
		require.NoError(t, err)
		// check duration_count has 3 calls and all the arguments
		require.Len(t, results, 1)
		assert.Equal(t, 3, totalPromCount(t, results))
	}, test.Interval(1000*time.Millisecond))
}
20 changes: 0 additions & 20 deletions test/integration/suites_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,23 +377,3 @@ func TestSuiteNoRoutes(t *testing.T) {
require.NoError(t, compose.Close())
t.Run("BPF pinning folder unmounted", testBPFPinningUnmounted)
}

// TestSuite_MultiExec starts the docker-compose-multiexec.yml suite and runs
// the RED metrics checks against the two instrumented services. It also checks
// the number of BPF pinning folders while the suite is up, and that they are
// unmounted after the suite is closed.
func TestSuite_MultiExec(t *testing.T) {
	compose, err := docker.ComposeSuite("docker-compose-multiexec.yml", path.Join(pathOutput, "test-suite-multiexec.log"))
	// the error must be checked before compose is used: its value is
	// meaningless if ComposeSuite failed
	require.NoError(t, err)
	// we are going to setup discovery directly in the configuration file
	compose.Env = append(compose.Env, `EXECUTABLE_NAME=`, `OPEN_PORT=`)
	require.NoError(t, compose.Up())
	t.Run("Go RED metrics: usual service", func(t *testing.T) {
		waitForTestComponents(t, instrumentedServiceStdURL)
		testREDMetricsForHTTPLibrary(t, instrumentedServiceStdURL, "testserver", "initial-set")
	})
	t.Run("Go RED metrics: service 1", func(t *testing.T) {
		waitForTestComponents(t, "http://localhost:8900")
		testREDMetricsForHTTPLibrary(t, "http://localhost:8900", "rename1", "initial-set")
	})
	t.Run("BPF pinning folders mounted", func(t *testing.T) { testBPFPinningMountedWithCount(t, 2) })

	require.NoError(t, compose.Close())
	t.Run("BPF pinning folder unmounted", testBPFPinningUnmounted)
}

0 comments on commit 4b7d6c8

Please sign in to comment.