diff --git a/pkg/kv/option.go b/pkg/kv/option.go index 75c89f77acf89..0312a11d708b3 100644 --- a/pkg/kv/option.go +++ b/pkg/kv/option.go @@ -240,6 +240,8 @@ const ( LossyDDLColumnReorgSource = 1 lossyDDLReorgSourceMax = (1 << lossyDDLReorgSourceBits) - 1 lossyDDLReorgSourceShift = cdcWriteSourceBits + // LightningPhysicalImportTxnSource the 17th bit is set as the txn source for Lightning physical import. + LightningPhysicalImportTxnSource = 1 << 16 ) // SetCDCWriteSource sets the TiCDC write source in the txnSource. diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go index 891a8cdffb261..1e538c061af81 100644 --- a/pkg/lightning/backend/local/region_job.go +++ b/pkg/lightning/backend/local/region_job.go @@ -314,6 +314,21 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { return err } +func newWriteRequest(meta *sst.SSTMeta, resourceGroupName, taskType string) *sst.WriteRequest { + return &sst.WriteRequest{ + Chunk: &sst.WriteRequest_Meta{ + Meta: meta, + }, + Context: &kvrpcpb.Context{ + ResourceControlContext: &kvrpcpb.ResourceControlContext{ + ResourceGroupName: resourceGroupName, + }, + RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, taskType), + TxnSource: kv.LightningPhysicalImportTxnSource, + }, + } +} + func (local *Backend) doWrite(ctx context.Context, j *regionJob) error { if j.stage != regionScanned { return nil @@ -396,17 +411,7 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error { leaderID := j.region.Leader.GetId() clients := make([]sst.ImportSST_WriteClient, 0, len(region.GetPeers())) allPeers := make([]*metapb.Peer, 0, len(region.GetPeers())) - req := &sst.WriteRequest{ - Chunk: &sst.WriteRequest_Meta{ - Meta: meta, - }, - Context: &kvrpcpb.Context{ - ResourceControlContext: &kvrpcpb.ResourceControlContext{ - ResourceGroupName: local.ResourceGroupName, - }, - RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType), - }, - } + req := newWriteRequest(meta, local.ResourceGroupName, local.TaskType) for _, peer := range region.GetPeers() { cli, err := clientFactory.create(ctx, peer.StoreId) if err != nil { diff --git a/pkg/lightning/backend/local/region_job_test.go b/pkg/lightning/backend/local/region_job_test.go index c22e223554630..e0989384de8a2 100644 --- a/pkg/lightning/backend/local/region_job_test.go +++ b/pkg/lightning/backend/local/region_job_test.go @@ -16,6 +16,7 @@ package local import ( "context" + "github.com/pingcap/tidb/pkg/kv" "math/rand" "sync" "testing" @@ -536,6 +537,11 @@ func TestCancelBalancer(t *testing.T) { jobWg.Wait() } +func TestNewWriteRequest(T *testing.T) { + req := newWriteRequest(&sst.SSTMeta{}, "", "") + require.Equal(T, req.Context.TxnSource, uint64(kv.LightningPhysicalImportTxnSource)) +} + func TestStoreBalancerNoRace(t *testing.T) { jobToWorkerCh := make(chan *regionJob) jobFromWorkerCh := make(chan *regionJob)