Skip to content

Commit

Permalink
Add visualization tool to debug image sources
Browse files Browse the repository at this point in the history
  • Loading branch information
phillebaba committed May 21, 2024
1 parent 025ae72 commit f857e58
Show file tree
Hide file tree
Showing 7 changed files with 496 additions and 0 deletions.
50 changes: 50 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"net/http"
"net/http/pprof"
"net/netip"
"net/url"
"os"
"os/signal"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/spegel-org/spegel/pkg/routing"
"github.com/spegel-org/spegel/pkg/state"
"github.com/spegel-org/spegel/pkg/throttle"
"github.com/spegel-org/spegel/pkg/visualize"
)

type ConfigurationCmd struct {
Expand Down Expand Up @@ -65,7 +67,16 @@ type RegistryCmd struct {
ResolveLatestTag bool `arg:"--resolve-latest-tag,env:RESOLVE_LATEST_TAG" default:"true" help:"When true latest tags will be resolved to digests."`
}

type VisualizationCmd struct {
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path,env:CONTAINERD_REGISTRY_CONFIG_PATH" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
ContainerdSock string `arg:"--containerd-sock,env:CONTAINERD_SOCK" default:"/run/containerd/containerd.sock" help:"Endpoint of containerd service."`
ContainerdNamespace string `arg:"--containerd-namespace,env:CONTAINERD_NAMESPACE" default:"k8s.io" help:"Containerd namespace to fetch images from."`
ContainerdContentPath string `arg:"--containerd-content-path,env:CONTAINERD_CONTENT_PATH" default:"/var/lib/containerd/io.containerd.content.v1.content" help:"Path to Containerd content store"`
Registries []url.URL `arg:"--registries,env:REGISTRIES,required" help:"registries that are configured to be mirrored."`
}

type Arguments struct {
Visualization *VisualizationCmd `arg:"subcommand:visualization"`
Configuration *ConfigurationCmd `arg:"subcommand:configuration"`
Registry *RegistryCmd `arg:"subcommand:registry"`
LogLevel slog.Level `arg:"--log-level,env:LOG_LEVEL" default:"INFO" help:"Minimum log level to output. Value should be DEBUG, INFO, WARN, or ERROR."`
Expand Down Expand Up @@ -96,6 +107,8 @@ func run(ctx context.Context, args *Arguments) error {
ctx, cancel := signal.NotifyContext(ctx, syscall.SIGTERM)
defer cancel()
switch {
case args.Visualization != nil:
return visualizeCommand(ctx, args.Visualization)
case args.Configuration != nil:
return configurationCommand(ctx, args.Configuration)
case args.Registry != nil:
Expand All @@ -105,6 +118,25 @@ func run(ctx context.Context, args *Arguments) error {
}
}

func visualizeCommand(_ context.Context, args *VisualizationCmd) error {
ociClient, err := oci.NewContainerd(args.ContainerdSock, args.ContainerdNamespace, args.ContainerdRegistryConfigPath, args.Registries, oci.WithContentPath(args.ContainerdContentPath))
if err != nil {
return err
}
eventStore := visualize.NewMemoryStore()
eventStore.Record("foobar", netip.MustParseAddr("10.0.0.0"), 200, true)
eventStore.Record("sha256:1f1a2d56de1d604801a9671f301190704c25d604a416f59e03c04f5c6ffee0d6", netip.MustParseAddr("10.0.0.0"), 404, true)
eventStore.Record("serve", netip.MustParseAddr("10.0.0.0"), 404, false)
eventStore.Record("serve", netip.MustParseAddr("10.0.0.1"), 200, false)
vSvr := visualize.NewServer(eventStore, ociClient)
svr := vSvr.Server(":9091")
err = svr.ListenAndServe()
if err != nil {
return err
}
return nil
}

func configurationCommand(ctx context.Context, args *ConfigurationCmd) error {
fs := afero.NewOsFs()
err := oci.AddMirrorConfiguration(ctx, fs, args.ContainerdRegistryConfigPath, args.Registries, args.MirrorRegistries, args.ResolveTags, args.AppendMirrors)
Expand Down Expand Up @@ -188,13 +220,31 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
return nil
})

// Visualizer
eventStore := visualize.NewMemoryStore()
vis := visualize.NewServer(eventStore, ociClient)
visSrv := vis.Server(":9091")
g.Go(func() error {
if err := visSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
})
g.Go(func() error {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return visSrv.Shutdown(shutdownCtx)
})

// Registry
registryOpts := []registry.Option{
registry.WithResolveLatestTag(args.ResolveLatestTag),
registry.WithResolveRetries(args.MirrorResolveRetries),
registry.WithResolveTimeout(args.MirrorResolveTimeout),
registry.WithLocalAddress(args.LocalAddr),
registry.WithLogger(log),
registry.WithEventStore(eventStore),
}
if args.BlobSpeed != nil {
registryOpts = append(registryOpts, registry.WithBlobSpeed(*args.BlobSpeed))
Expand Down
39 changes: 39 additions & 0 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"net/http"
"net/http/httputil"
"net/netip"
"net/url"
"path"
"strconv"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/spegel-org/spegel/pkg/oci"
"github.com/spegel-org/spegel/pkg/routing"
"github.com/spegel-org/spegel/pkg/throttle"
"github.com/spegel-org/spegel/pkg/visualize"
)

const (
Expand All @@ -37,6 +39,7 @@ type Registry struct {
resolveRetries int
resolveTimeout time.Duration
resolveLatestTag bool
eventStore visualize.EventStore
}

type Option func(*Registry)
Expand Down Expand Up @@ -83,6 +86,12 @@ func WithLogger(log logr.Logger) Option {
}
}

func WithEventStore(eventStore visualize.EventStore) Option {
return func(r *Registry) {
r.eventStore = eventStore
}
}

func NewRegistry(ociClient oci.Client, router routing.Router, opts ...Option) *Registry {
r := &Registry{
ociClient: ociClient,
Expand Down Expand Up @@ -188,6 +197,23 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) str
return "mirror"
}

defer func() {
if req.Method != http.MethodGet {
return
}
id := ref.name
if id == "" {
// First 7 character plus sha256: prefix
id = ref.dgst.String()[:14]
}
ip := getClientIP(req)
addr, err := netip.ParseAddr(ip)
if err != nil {
return
}
r.eventStore.Record(id, addr, rw.Status(), false)
}()

// Serve registry endpoints.
switch ref.kind {
case referenceKindManifest:
Expand Down Expand Up @@ -288,6 +314,19 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re
return nil
}
proxy.ServeHTTP(rw, req)

// Track image events if enabled
if r.eventStore != nil {
if req.Method != http.MethodGet {
return
}
id := ref.name
if id == "" {
id = ref.dgst.String()
}
r.eventStore.Record(id, ipAddr.Addr(), rw.Status(), true)
}

if !succeeded {
break
}
Expand Down
138 changes: 138 additions & 0 deletions pkg/visualize/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package visualize

import (
"fmt"
"net/http"
"net/netip"
"strings"
"sync"
)

type EventStore interface {
Record(id string, peer netip.Addr, status int, mirror bool)
FilterById(include []string) EventStore
FilterByDirection(rootIsSource bool) EventStore
Dot() string
}

// TODO: Include blob or manifest
type edge struct {
node string
id string
status int
rootIsSource bool
}

var _ EventStore = &MemoryStore{}

type MemoryStore struct {
mx sync.RWMutex
edges []edge
edgeIndex map[string]int
}

func NewMemoryStore() *MemoryStore {
return &MemoryStore{
edges: []edge{},
edgeIndex: map[string]int{},
}
}

func (m *MemoryStore) Record(id string, peer netip.Addr, status int, mirror bool) {
m.mx.Lock()
defer m.mx.Unlock()

e := edge{
node: peer.String(),
id: id,
status: status,
rootIsSource: mirror,
}
m.edges = append(m.edges, e)
m.edgeIndex[id] = len(m.edges) - 1
}

func (m *MemoryStore) FilterById(include []string) EventStore {
m.mx.RLock()
defer m.mx.RUnlock()

f := NewMemoryStore()
for _, v := range include {
idx, ok := m.edgeIndex[v]
if !ok {
continue
}
edge := m.edges[idx]
f.edges = append(f.edges, edge)
f.edgeIndex[v] = len(f.edges) - 1
}
return f
}

func (m *MemoryStore) FilterByDirection(rootIsSource bool) EventStore {
m.mx.RLock()
defer m.mx.RUnlock()

f := NewMemoryStore()
for _, edge := range m.edges {
if edge.rootIsSource != rootIsSource {
continue
}
f.edges = append(f.edges, edge)
f.edgeIndex[edge.id] = len(f.edges) - 1
}
return f
}

func (m *MemoryStore) Dot() string {
m.mx.RLock()
defer m.mx.RUnlock()

nodeIndex := map[string]interface{}{}
dotNodes := []string{}
dotEdges := []string{}
for _, edge := range m.edges {
color := ""
switch edge.status {
case 0:
color = "yellow"
case http.StatusOK:
color = "green"
default:
color = "red"
}

src := "n0"
dest := edge.node
if !edge.rootIsSource {
src = edge.node
dest = "n0"
}

id := edge.id
if strings.HasPrefix(id, "sha256:") {
id = id[:14]
}
dotEdge := fmt.Sprintf(`%q -> %q[color=%s tooltip=%q]`, src, dest, color, id)
dotEdges = append(dotEdges, dotEdge)

if _, ok := nodeIndex[edge.node]; ok {
continue
}
nodeIndex[edge.node] = nil
dotNode := fmt.Sprintf(`%q[label="%s"];`, edge.node, edge.node)
dotNodes = append(dotNodes, dotNode)
}

dot := fmt.Sprintf(`digraph {
layout="circo";
root="n0";
n0[label="host"]
%s
%s
}`, strings.Join(dotNodes, "\n\t"), strings.Join(dotEdges, "\n\t"))
return strings.TrimSpace(dot)
}
30 changes: 30 additions & 0 deletions pkg/visualize/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package visualize

import (
"net/http"
"net/netip"
"os"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

func TestMemoryStore(t *testing.T) {
store := NewMemoryStore()
store.Record("one", netip.MustParseAddr("127.0.0.1"), http.StatusOK, true)
store.Record("two", netip.MustParseAddr("127.0.0.1"), http.StatusOK, true)
store.Record("three", netip.MustParseAddr("0.0.0.0"), http.StatusOK, true)

dot := store.Dot()
b, err := os.ReadFile("./testdata/all.dot")
require.NoError(t, err)
expected := strings.TrimSpace(string(b))
require.Equal(t, expected, dot)

dot = store.Filter([]string{"two", "three"}).Dot()

Check failure on line 25 in pkg/visualize/store_test.go

View workflow job for this annotation

GitHub Actions / unit

store.Filter undefined (type *MemoryStore has no field or method Filter)

Check failure on line 25 in pkg/visualize/store_test.go

View workflow job for this annotation

GitHub Actions / lint

store.Filter undefined (type *MemoryStore has no field or method Filter) (typecheck)
b, err = os.ReadFile("./testdata/filter.dot")
require.NoError(t, err)
expected = strings.TrimSpace(string(b))
require.Equal(t, expected, dot)
}
13 changes: 13 additions & 0 deletions pkg/visualize/testdata/all.dot
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
digraph {
layout="circo";
root="n0";

n0[label="host"]

"127.0.0.1"[label="127.0.0.1"];
"0.0.0.0"[label="0.0.0.0"];

"n0" -> "127.0.0.1"[color=green tooltip="one"]
"n0" -> "127.0.0.1"[color=green tooltip="two"]
"n0" -> "0.0.0.0"[color=green tooltip="three"]
}
12 changes: 12 additions & 0 deletions pkg/visualize/testdata/filter.dot
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
digraph {
layout="circo";
root="n0";

n0[label="host"]

"127.0.0.1"[label="127.0.0.1"];
"0.0.0.0"[label="0.0.0.0"];

"n0" -> "127.0.0.1"[color=green tooltip="two"]
"n0" -> "0.0.0.0"[color=green tooltip="three"]
}
Loading

0 comments on commit f857e58

Please sign in to comment.