Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove share body logic #11987

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions ydb/core/base/appdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <ydb/core/control/immediate_control_board_impl.h>
#include <ydb/core/grpc_services/grpc_helper.h>
#include <ydb/core/tablet_flat/shared_cache_pages.h>
#include <ydb/core/protos/auth.pb.h>
#include <ydb/core/protos/bootstrap.pb.h>
#include <ydb/core/protos/blobstorage.pb.h>
Expand Down Expand Up @@ -93,6 +94,7 @@ TAppData::TAppData(
, Mon(nullptr)
, Icb(new TControlBoard())
, InFlightLimiterRegistry(new NGRpcService::TInFlightLimiterRegistry(Icb))
, SharedCachePages(new NSharedCache::TSharedCachePages())
, StreamingConfig(Impl->StreamingConfig)
, PQConfig(Impl->PQConfig)
, PQClusterDiscoveryConfig(Impl->PQClusterDiscoveryConfig)
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/base/appdata_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ namespace NKikimr {
namespace NGRpcService {
class TInFlightLimiterRegistry;
}
namespace NSharedCache {
class TSharedCachePages;
}
}

namespace NKikimrCms {
Expand Down Expand Up @@ -189,6 +192,7 @@ struct TAppData {
::NMonitoring::TDynamicCounterPtr Counters;
TIntrusivePtr<NKikimr::TControlBoard> Icb;
TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> InFlightLimiterRegistry;
TIntrusivePtr<NSharedCache::TSharedCachePages> SharedCachePages;

TIntrusivePtr<NInterconnect::TPollerThreads> PollerThreads;

Expand Down
17 changes: 7 additions & 10 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ void TExecutor::RecreatePageCollectionsCache() noexcept
for (const auto &it : Database->GetScheme().Tables) {
auto subset = Database->Subset(it.first, NTable::TEpoch::Max(), { }, { });

for (auto &partView : subset->Flatten) AddCachesOfBundle(partView);
for (auto &partView : subset->Flatten) {
AddCachesOfBundle(partView);
}
}

if (TransactionWaitPads) {
Expand Down Expand Up @@ -2360,7 +2362,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
auto *bySwitchAux = aux.AddBySwitchAux();

TPageCollectionProtoHelper::Snap(snap, loaned->PartComponents, partSwitch.TableId, CompactionLogic->BorrowedPartLevel());
TPageCollectionProtoHelper(true, false).Do(bySwitchAux->AddHotBundles(), loaned->PartComponents);
TPageCollectionProtoHelper(true).Do(bySwitchAux->AddHotBundles(), loaned->PartComponents);

auto body = proto.SerializeAsString();
auto glob = CommitManager->Turns.One(commit->Refs, std::move(body), true);
Expand Down Expand Up @@ -2821,9 +2823,6 @@ void TExecutor::Handle(NSharedCache::TEvUpdated::TPtr &ev) {

for (auto &kv : msg->Actions) {
if (auto *info = PrivatePageCache->Info(kv.first)) {
for (auto &kvCorrected : kv.second.Accepted) {
PrivatePageCache->UpdateSharedBody(info, kvCorrected.first, std::move(kvCorrected.second));
}
for (ui32 pageId : kv.second.Dropped) {
PrivatePageCache->DropSharedBody(info, pageId);
}
Expand Down Expand Up @@ -3472,7 +3471,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
const auto &newPart = result.Part;

TPageCollectionProtoHelper::Snap(snap, newPart, tableId, logicResult.Changes.NewPartsLevel);
TPageCollectionProtoHelper(true, false).Do(bySwitchAux->AddHotBundles(), newPart);
TPageCollectionProtoHelper(true).Do(bySwitchAux->AddHotBundles(), newPart);
}
}

Expand Down Expand Up @@ -3648,7 +3647,6 @@ void TExecutor::UpdateCounters(const TActorContext &ctx) {
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_SHARED_BODY].Set(stats.TotalSharedBody);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_PINNED_BODY].Set(stats.TotalPinnedBody);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_EXCLUSIVE].Set(stats.TotalExclusive);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_SHARED_PENDING].Set(stats.TotalSharedPending);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_STICKY].Set(stats.TotalSticky);
}

Expand Down Expand Up @@ -3770,14 +3768,14 @@ TString TExecutor::BorrowSnapshot(ui32 table, const TTableSnapshotContext &snap,
for (const auto &partView : subset->Flatten) {
auto *x = proto.AddParts();

TPageCollectionProtoHelper(false, false).Do(x->MutableBundle(), partView);
TPageCollectionProtoHelper(false).Do(x->MutableBundle(), partView);
snap.Impl->Borrowed(Step(), table, partView->Label, loaner);
}

for (const auto &part : subset->ColdParts) {
auto *x = proto.AddParts();

TPageCollectionProtoHelper(false, false).Do(x->MutableBundle(), part);
TPageCollectionProtoHelper(false).Do(x->MutableBundle(), part);
snap.Impl->Borrowed(Step(), table, part->Label, loaner);
}

Expand Down Expand Up @@ -4170,7 +4168,6 @@ void TExecutor::RenderHtmlPage(NMon::TEvRemoteHttpInfo::TPtr &ev) const {
DIV_CLASS("row") {str << "Total bytes in shared cache: " << PrivatePageCache->GetStats().TotalSharedBody; }
DIV_CLASS("row") {str << "Total bytes in local cache: " << PrivatePageCache->GetStats().TotalPinnedBody; }
DIV_CLASS("row") {str << "Total bytes exclusive to local cache: " << PrivatePageCache->GetStats().TotalExclusive; }
DIV_CLASS("row") {str << "Total bytes in transit to shared cache: " << PrivatePageCache->GetStats().TotalSharedPending; }
DIV_CLASS("row") {str << "Total bytes marked as sticky: " << PrivatePageCache->GetStats().TotalSticky; }

if (GcLogic) {
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/tablet_flat/flat_executor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ message TLargeGlobId {
}

message TPageCollection {
message TPage {
required uint32 Id = 1;
required bytes Body = 2;
}

repeated NKikimrProto.TLogoBlobID MetaId = 1; // Replaced by TLargeGlobId

optional TLargeGlobId LargeGlobId = 6;
Expand All @@ -30,7 +25,7 @@ message TPageCollection {
// packed page collection is called HotBundle.

optional bytes Meta = 2;
repeated TPage Pages = 5;
reserved 5; // Pages
}

message TBundle {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tablet_flat/flat_executor_tx_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ namespace NTabletFlatExecutor {
return { !ReadMissingReferences, page };
}

const TSharedData* TryGetPage(const TPart* part, TPageId page, TGroupId groupId) override
const TSharedData* TryGetPage(const TPart* part, TPageId pageId, TGroupId groupId) override
{
auto *partStore = CheckedCast<const NTable::TPartStore*>(part);

return Lookup(partStore->PageCollections.at(groupId.Index).Get(), page);
return Lookup(partStore->PageCollections.at(groupId.Index).Get(), pageId);
}

void EnableReadMissingReferences() noexcept {
Expand Down
36 changes: 34 additions & 2 deletions ydb/core/tablet_flat/flat_ops_compact.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "flat_comp.h"
#include "flat_executor_misc.h"
#include "flat_bio_stats.h"
#include "shared_cache_pages.h"
#include "shared_sausagecache.h"
#include "util_channel.h"

#include <ydb/core/base/blobstorage.h>
Expand Down Expand Up @@ -102,6 +104,7 @@ namespace NTabletFlatExecutor {

Spent = new TSpent(TAppData::TimeProvider.Get());
Registry = AppData()->TypeRegistry;
SharedCachePages = AppData()->SharedCachePages;
Scheme = std::move(scheme);
Driver = driver;

Expand Down Expand Up @@ -308,11 +311,39 @@ namespace NTabletFlatExecutor {
auto *prod = new TProdCompact(!fail, Mask.Step(), std::move(Conf->Params),
std::move(YellowMoveChannels), std::move(YellowStopChannels));

if (fail) {
Results.clear(); /* shouldn't sent w/o fixation in bs */
}

for (auto &result : Results) {
Y_ABORT_UNLESS(result.PageCollections, "Compaction produced a part without page collections");
TVector<TIntrusivePtr<NTable::TLoader::TCache>> pageCollections;
for (auto& pageCollection : result.PageCollections) {
auto cache = MakeIntrusive<NTable::TLoader::TCache>(pageCollection.PageCollection);
auto saveCompactedPages = MakeHolder<NSharedCache::TEvSaveCompactedPages>(pageCollection.PageCollection);
auto gcList = SharedCachePages->GCList;
auto addPage = [&saveCompactedPages, &pageCollection, &cache, &gcList](NPageCollection::TLoadedPage& loadedPage, bool sticky) {
auto pageId = loadedPage.PageId;
auto pageSize = pageCollection.PageCollection->Page(pageId).Size;
auto sharedPage = MakeIntrusive<TPage>(pageId, pageSize, nullptr);
sharedPage->Initialize(std::move(loadedPage.Data));
saveCompactedPages->Pages.push_back(sharedPage);
cache->Fill(pageId, TSharedPageRef::MakeUsed(std::move(sharedPage), gcList), sticky);
};
for (auto &page : pageCollection.StickyPages) {
addPage(page, true);
}
for (auto &page : pageCollection.RegularPages) {
addPage(page, false);
}

Send(MakeSharedPageCacheId(), saveCompactedPages.Release());

pageCollections.push_back(std::move(cache));
}

NTable::TLoader loader(
std::move(result.PageCollections),
std::move(pageCollections),
{ },
std::move(result.Overlay));

Expand Down Expand Up @@ -363,7 +394,7 @@ namespace NTabletFlatExecutor {
}

if (fail) {
prod->Results.clear(); /* shouldn't sent w/o fixation in bs */
Y_ABORT_IF(prod->Results); /* shouldn't sent w/o fixation in bs */
} else if (bool(prod->Results) != bool(WriteStats.Rows > 0)) {
Y_ABORT("Unexpected rows production result after compaction");
} else if ((bool(prod->Results) || bool(prod->TxStatus)) != bool(Blobs > 0)) {
Expand Down Expand Up @@ -525,6 +556,7 @@ namespace NTabletFlatExecutor {
TVector<TBundle::TResult> Results;
TVector<TIntrusiveConstPtr<NTable::TTxStatusPart>> TxStatus;
const NScheme::TTypeRegistry * Registry = nullptr;
TIntrusivePtr<NSharedCache::TSharedCachePages> SharedCachePages;

bool Finished = false;
bool Failed = false;/* Failed to write blobs */
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_part_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ namespace NTable {
if (type != EPage::FlatIndex) {
// hack: saving flat index to private cache will break sticky logic
// keep it in shared cache only for now
Cache->Fill(std::move(loaded), NeedIn(type));
Cache->Fill(loaded.PageId, std::move(loaded.Page), NeedIn(type));
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/tablet_flat/flat_part_outset.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include "defs.h"
#include "flat_sausage_packet.h"
#include "flat_sausage_fetch.h"

namespace NKikimr {
namespace NTable {
Expand All @@ -12,7 +11,6 @@ namespace NTable {
NPageCollection::TLargeGlobId LargeGlobId;
// loaded meta page
TIntrusiveConstPtr<NPageCollection::TPageCollection> Packet;
TVector<NPageCollection::TLoadedPage> Sticky;

void ParsePacket(TSharedData meta);
};
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/tablet_flat/flat_part_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,6 @@ class TPartStore : public TPart, public IBundle {

for (auto &one: components) {
caches.emplace_back(new TCache(std::move(one.Packet)));

for (auto &page: one.Sticky)
caches.back()->Fill(page, true);
}

return caches;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tablet_flat/flat_sausage_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ namespace NPageCollection {
Buffer.append(chunk.data(), chunk.size());
offset += piece;

if (Buffer.size() >= MaxBlobSize) Flush();
if (Buffer.size() >= MaxBlobSize) {
Flush();
}
}

return Record.Push(type, body);
Expand Down
Loading
Loading