From f857e58659cc37d7a9f55fe4c9dd1afa96a0cb51 Mon Sep 17 00:00:00 2001 From: Philip Laine Date: Sun, 19 May 2024 11:03:38 +0200 Subject: [PATCH] Add visualization tool to debug image sources --- main.go | 50 +++++++ pkg/registry/registry.go | 39 ++++++ pkg/visualize/store.go | 138 +++++++++++++++++++ pkg/visualize/store_test.go | 30 +++++ pkg/visualize/testdata/all.dot | 13 ++ pkg/visualize/testdata/filter.dot | 12 ++ pkg/visualize/visualize.go | 214 ++++++++++++++++++++++++++++++ 7 files changed, 496 insertions(+) create mode 100644 pkg/visualize/store.go create mode 100644 pkg/visualize/store_test.go create mode 100644 pkg/visualize/testdata/all.dot create mode 100644 pkg/visualize/testdata/filter.dot create mode 100644 pkg/visualize/visualize.go diff --git a/main.go b/main.go index 907e844d..4133a10c 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "net" "net/http" "net/http/pprof" + "net/netip" "net/url" "os" "os/signal" @@ -29,6 +30,7 @@ import ( "github.com/spegel-org/spegel/pkg/routing" "github.com/spegel-org/spegel/pkg/state" "github.com/spegel-org/spegel/pkg/throttle" + "github.com/spegel-org/spegel/pkg/visualize" ) type ConfigurationCmd struct { @@ -65,7 +67,16 @@ type RegistryCmd struct { ResolveLatestTag bool `arg:"--resolve-latest-tag,env:RESOLVE_LATEST_TAG" default:"true" help:"When true latest tags will be resolved to digests."` } +type VisualizationCmd struct { + ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path,env:CONTAINERD_REGISTRY_CONFIG_PATH" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."` + ContainerdSock string `arg:"--containerd-sock,env:CONTAINERD_SOCK" default:"/run/containerd/containerd.sock" help:"Endpoint of containerd service."` + ContainerdNamespace string `arg:"--containerd-namespace,env:CONTAINERD_NAMESPACE" default:"k8s.io" help:"Containerd namespace to fetch images from."` + ContainerdContentPath string `arg:"--containerd-content-path,env:CONTAINERD_CONTENT_PATH" default:"/var/lib/containerd/io.containerd.content.v1.content" help:"Path to Containerd content store"` + Registries []url.URL `arg:"--registries,env:REGISTRIES,required" help:"registries that are configured to be mirrored."` +} + type Arguments struct { + Visualization *VisualizationCmd `arg:"subcommand:visualization"` Configuration *ConfigurationCmd `arg:"subcommand:configuration"` Registry *RegistryCmd `arg:"subcommand:registry"` LogLevel slog.Level `arg:"--log-level,env:LOG_LEVEL" default:"INFO" help:"Minimum log level to output. Value should be DEBUG, INFO, WARN, or ERROR."` @@ -96,6 +107,8 @@ func run(ctx context.Context, args *Arguments) error { ctx, cancel := signal.NotifyContext(ctx, syscall.SIGTERM) defer cancel() switch { + case args.Visualization != nil: + return visualizeCommand(ctx, args.Visualization) case args.Configuration != nil: return configurationCommand(ctx, args.Configuration) case args.Registry != nil: @@ -105,6 +118,25 @@ func run(ctx context.Context, args *Arguments) error { } } +func visualizeCommand(_ context.Context, args *VisualizationCmd) error { + ociClient, err := oci.NewContainerd(args.ContainerdSock, args.ContainerdNamespace, args.ContainerdRegistryConfigPath, args.Registries, oci.WithContentPath(args.ContainerdContentPath)) + if err != nil { + return err + } + eventStore := visualize.NewMemoryStore() + eventStore.Record("foobar", netip.MustParseAddr("10.0.0.0"), 200, true) + eventStore.Record("sha256:1f1a2d56de1d604801a9671f301190704c25d604a416f59e03c04f5c6ffee0d6", netip.MustParseAddr("10.0.0.0"), 404, true) + eventStore.Record("serve", netip.MustParseAddr("10.0.0.0"), 404, false) + eventStore.Record("serve", netip.MustParseAddr("10.0.0.1"), 200, false) + vSvr := visualize.NewServer(eventStore, ociClient) + svr := vSvr.Server(":9091") + err = svr.ListenAndServe() + if err != nil { + return err + } + return nil +} + func configurationCommand(ctx context.Context, args *ConfigurationCmd) error { fs := afero.NewOsFs() err := oci.AddMirrorConfiguration(ctx, fs, args.ContainerdRegistryConfigPath, args.Registries, args.MirrorRegistries, args.ResolveTags, args.AppendMirrors) @@ -188,6 +220,23 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) { return nil }) + // Visualizer + eventStore := visualize.NewMemoryStore() + vis := visualize.NewServer(eventStore, ociClient) + visSrv := vis.Server(":9091") + g.Go(func() error { + if err := visSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil + }) + g.Go(func() error { + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + return visSrv.Shutdown(shutdownCtx) + }) + // Registry registryOpts := []registry.Option{ registry.WithResolveLatestTag(args.ResolveLatestTag), @@ -195,6 +244,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) { registry.WithResolveTimeout(args.MirrorResolveTimeout), registry.WithLocalAddress(args.LocalAddr), registry.WithLogger(log), + registry.WithEventStore(eventStore), } if args.BlobSpeed != nil { registryOpts = append(registryOpts, registry.WithBlobSpeed(*args.BlobSpeed)) diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go index d7c6e7f8..5489f83c 100644 --- a/pkg/registry/registry.go +++ b/pkg/registry/registry.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "net/http/httputil" + "net/netip" "net/url" "path" "strconv" @@ -21,6 +22,7 @@ import ( "github.com/spegel-org/spegel/pkg/oci" "github.com/spegel-org/spegel/pkg/routing" "github.com/spegel-org/spegel/pkg/throttle" + "github.com/spegel-org/spegel/pkg/visualize" ) const ( @@ -37,6 +39,7 @@ type Registry struct { resolveRetries int resolveTimeout time.Duration resolveLatestTag bool + eventStore visualize.EventStore } type Option func(*Registry) @@ -83,6 +86,12 @@ func WithLogger(log logr.Logger) Option { } } +func WithEventStore(eventStore visualize.EventStore) Option { + return func(r *Registry) { + r.eventStore = eventStore + } +} + func NewRegistry(ociClient oci.Client, router routing.Router, opts ...Option) *Registry { r := &Registry{ ociClient: ociClient, @@ -188,6 +197,23 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) str return "mirror" } + defer func() { + if req.Method != http.MethodGet { + return + } + id := ref.name + if id == "" { + // First 7 character plus sha256: prefix + id = ref.dgst.String()[:14] + } + ip := getClientIP(req) + addr, err := netip.ParseAddr(ip) + if err != nil { + return + } + r.eventStore.Record(id, addr, rw.Status(), false) + }() + // Serve registry endpoints. switch ref.kind { case referenceKindManifest: @@ -288,6 +314,19 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re return nil } proxy.ServeHTTP(rw, req) + + // Track image events if enabled + if r.eventStore != nil { + if req.Method != http.MethodGet { + return + } + id := ref.name + if id == "" { + id = ref.dgst.String() + } + r.eventStore.Record(id, ipAddr.Addr(), rw.Status(), true) + } + if !succeeded { break } diff --git a/pkg/visualize/store.go b/pkg/visualize/store.go new file mode 100644 index 00000000..1757c3d5 --- /dev/null +++ b/pkg/visualize/store.go @@ -0,0 +1,138 @@ +package visualize + +import ( + "fmt" + "net/http" + "net/netip" + "strings" + "sync" +) + +type EventStore interface { + Record(id string, peer netip.Addr, status int, mirror bool) + FilterById(include []string) EventStore + FilterByDirection(rootIsSource bool) EventStore + Dot() string +} + +// TODO: Include blob or manifest +type edge struct { + node string + id string + status int + rootIsSource bool +} + +var _ EventStore = &MemoryStore{} + +type MemoryStore struct { + mx sync.RWMutex + edges []edge + edgeIndex map[string]int +} + +func NewMemoryStore() *MemoryStore { + return &MemoryStore{ + edges: []edge{}, + edgeIndex: map[string]int{}, + } +} + +func (m *MemoryStore) Record(id string, peer netip.Addr, status int, mirror bool) { + m.mx.Lock() + defer m.mx.Unlock() + + e := edge{ + node: peer.String(), + id: id, + status: status, + rootIsSource: mirror, + } + m.edges = append(m.edges, e) + m.edgeIndex[id] = len(m.edges) - 1 +} + +func (m *MemoryStore) FilterById(include []string) EventStore { + m.mx.RLock() + defer m.mx.RUnlock() + + f := NewMemoryStore() + for _, v := range include { + idx, ok := m.edgeIndex[v] + if !ok { + continue + } + edge := m.edges[idx] + f.edges = append(f.edges, edge) + f.edgeIndex[v] = len(f.edges) - 1 + } + return f +} + +func (m *MemoryStore) FilterByDirection(rootIsSource bool) EventStore { + m.mx.RLock() + defer m.mx.RUnlock() + + f := NewMemoryStore() + for _, edge := range m.edges { + if edge.rootIsSource != rootIsSource { + continue + } + f.edges = append(f.edges, edge) + f.edgeIndex[edge.id] = len(f.edges) - 1 + } + return f +} + +func (m *MemoryStore) Dot() string { + m.mx.RLock() + defer m.mx.RUnlock() + + nodeIndex := map[string]interface{}{} + dotNodes := []string{} + dotEdges := []string{} + for _, edge := range m.edges { + color := "" + switch edge.status { + case 0: + color = "yellow" + case http.StatusOK: + color = "green" + default: + color = "red" + } + + src := "n0" + dest := edge.node + if !edge.rootIsSource { + src = edge.node + dest = "n0" + } + + id := edge.id + if strings.HasPrefix(id, "sha256:") { + id = id[:14] + } + dotEdge := fmt.Sprintf(`%q -> %q[color=%s tooltip=%q]`, src, dest, color, id) + dotEdges = append(dotEdges, dotEdge) + + if _, ok := nodeIndex[edge.node]; ok { + continue + } + nodeIndex[edge.node] = nil + dotNode := fmt.Sprintf(`%q[label="%s"];`, edge.node, edge.node) + dotNodes = append(dotNodes, dotNode) + } + + dot := fmt.Sprintf(`digraph { + layout="circo"; + root="n0"; + + n0[label="host"] + + %s + + %s +}`, strings.Join(dotNodes, "\n\t"), strings.Join(dotEdges, "\n\t")) + return strings.TrimSpace(dot) +} diff --git a/pkg/visualize/store_test.go b/pkg/visualize/store_test.go new file mode 100644 index 00000000..f7f675c5 --- /dev/null +++ b/pkg/visualize/store_test.go @@ -0,0 +1,30 @@ +package visualize + +import ( + "net/http" + "net/netip" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMemoryStore(t *testing.T) { + store := NewMemoryStore() + store.Record("one", netip.MustParseAddr("127.0.0.1"), http.StatusOK, true) + store.Record("two", netip.MustParseAddr("127.0.0.1"), http.StatusOK, true) + store.Record("three", netip.MustParseAddr("0.0.0.0"), http.StatusOK, true) + + dot := store.Dot() + b, err := os.ReadFile("./testdata/all.dot") + require.NoError(t, err) + expected := strings.TrimSpace(string(b)) + require.Equal(t, expected, dot) + + dot = store.Filter([]string{"two", "three"}).Dot() + b, err = os.ReadFile("./testdata/filter.dot") + require.NoError(t, err) + expected = strings.TrimSpace(string(b)) + require.Equal(t, expected, dot) +} diff --git a/pkg/visualize/testdata/all.dot b/pkg/visualize/testdata/all.dot new file mode 100644 index 00000000..6306ff03 --- /dev/null +++ b/pkg/visualize/testdata/all.dot @@ -0,0 +1,13 @@ +digraph { + layout="circo"; + root="n0"; + + n0[label="host"] + + "127.0.0.1"[label="127.0.0.1"]; + "0.0.0.0"[label="0.0.0.0"]; + + "n0" -> "127.0.0.1"[color=green tooltip="one"] + "n0" -> "127.0.0.1"[color=green tooltip="two"] + "n0" -> "0.0.0.0"[color=green tooltip="three"] +} diff --git a/pkg/visualize/testdata/filter.dot b/pkg/visualize/testdata/filter.dot new file mode 100644 index 00000000..f141d265 --- /dev/null +++ b/pkg/visualize/testdata/filter.dot @@ -0,0 +1,12 @@ +digraph { + layout="circo"; + root="n0"; + + n0[label="host"] + + "127.0.0.1"[label="127.0.0.1"]; + "0.0.0.0"[label="0.0.0.0"]; + + "n0" -> "127.0.0.1"[color=green tooltip="two"] + "n0" -> "0.0.0.0"[color=green tooltip="three"] +} diff --git a/pkg/visualize/visualize.go b/pkg/visualize/visualize.go new file mode 100644 index 00000000..7b2441d3 --- /dev/null +++ b/pkg/visualize/visualize.go @@ -0,0 +1,214 @@ +package visualize + +import ( + "html/template" + "net/http" + "strconv" + + "github.com/spegel-org/spegel/internal/mux" + "github.com/spegel-org/spegel/pkg/oci" +) + +// NOTE: Use metrics port and allow dynamic enabling through use of post request +// NOTE: image could be discoverd by peeking at the manifest content? + +type Server struct { + eventStore EventStore + ociClient oci.Client +} + +func NewServer(eventStore EventStore, ociClient oci.Client) *Server { + return &Server{ + eventStore: eventStore, + ociClient: ociClient, + } +} + +func (s *Server) Server(addr string) *http.Server { + srv := &http.Server{ + Addr: addr, + Handler: mux.NewServeMux(s.handle), + } + return srv +} + +func (s *Server) handle(rw mux.ResponseWriter, req *http.Request) { + switch req.URL.Path { + case "/": + s.indexHandler(rw, req) + case "/images": + s.imagesHandler(rw, req) + default: + rw.WriteHeader(http.StatusNotFound) + } +} + +func (s *Server) indexHandler(rw mux.ResponseWriter, _ *http.Request) { + index := ` + + + + Spegel + + + + + + + + +

Spegel

+
+
+ + + +` + rw.Write([]byte(index)) +} + +// TODO: Hash result to avoid generating graph over again +func (s *Server) imagesHandler(rw mux.ResponseWriter, req *http.Request) { + imgs, err := s.ociClient.ListImages(req.Context()) + if err != nil { + rw.WriteError(http.StatusInternalServerError, err) + return + } + include := []string{} + query := req.URL.Query() + if len(query) > 0 { + for _, img := range imgs { + if !query.Has(img.String()) { + continue + } + ids, err := s.ociClient.AllIdentifiers(req.Context(), img) + if err != nil { + rw.WriteError(http.StatusInternalServerError, err) + return + } + include = append(include, ids...) + } + } + store := s.eventStore + if len(include) > 0 { + store = store.FilterById(include) + } + direction := query.Get("direction") + if direction != "" { + isRootSource, err := strconv.ParseBool(direction) + if err != nil { + rw.WriteError(http.StatusBadRequest, err) + return + } + store = store.FilterByDirection(isRootSource) + } + + data := struct { + Images []oci.Image + Dot string + }{ + Images: imgs, + Dot: store.Dot(), + } + // TODO: When layer is not found it should default to subgraph for original registry + // TODO: Add a follow live toggle that changes selected image to the latest + tmpl, err := template.New("images").Parse(` + + +
+ +
+
+ + +
+ +
+ Request Direction + + + + + + + + + +
+ + +

Images

+ + + + + + + + + + {{ range $i, $element := .Images }} + + + + + + {{ end }} + +
NameCreated
{{ $element }}Today
+
+ +
+ `) + if err != nil { + rw.WriteError(http.StatusInternalServerError, err) + return + } + err = tmpl.Execute(rw, data) + if err != nil { + rw.WriteError(http.StatusInternalServerError, err) + return + } +}