Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Bootstrap optimisation lld flow #2943

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.github.ambry.clustermap;

public class FileStoreException extends RuntimeException{
public enum FileStoreErrorCode{
FileStore,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public interface ReplicaSyncUpManager {
* @throws InterruptedException
*/
void waitBootstrapCompleted(String partitionName) throws InterruptedException;
void initiateFileCopy(ReplicaId replicaId);
void waitForFileCopyCompleted(String partitionName) throws InterruptedException;

/**
* Update replica lag (in byte) between two replicas (local and peer replica) and check sync-up status.
Expand Down Expand Up @@ -64,6 +66,8 @@ boolean updateReplicaLagAndCheckSyncStatus(ReplicaId localReplica, ReplicaId pee
*/
void onBootstrapComplete(ReplicaId replicaId);

void onFileCopyComplete(ReplicaId replicaId);

/**
* Deactivation on given replica is complete.
* @param replicaId the replica which completes deactivation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,13 @@ public enum StateModelListenerType {
* leadership hand-off occurs. For example, if any replica becomes LEADER from STANDBY, it is supposed to replicate
* data from VCR nodes. This is part of two-way replication between Ambry and cloud.
*/
CloudToStoreReplicationManagerListener
CloudToStoreReplicationManagerListener,

/**
* The partition state change listener owned by Helix participant. It takes actions when partition state transition
* occurs.
*/
FileCopyManagerListener


}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public enum TransitionErrorCode {
/**
* If the resource name is not a numeric number.
*/
InvalidResourceName
InvalidResourceName,

FileCopyFailure
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.github.ambry.config;

public class FileCopyConfig {

public static final String PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK = "parallel.partition.hydration.count.per.disk";
@Config(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK)
public final int parallelPartitionHydrationCountPerDisk;

public static final String NUMBER_OF_FILE_COPY_THREADS = "number.of.file.copy.threads";
@Config(NUMBER_OF_FILE_COPY_THREADS)
public final int numberOfFileCopyThreads;

public static final String FILE_CHUNK_TIMEOUT_IN_MINUTES = "file.chunk.timeout.in.minutes";
@Config(FILE_CHUNK_TIMEOUT_IN_MINUTES)
public final long fileChunkTimeoutInMins;

/**
* The frequency at which the data gets flushed to disk
*/
public static final String STORE_DATA_FLUSH_INTERVAL_IN_MBS = "store.data.flush.size.In";
@Config("store.data.flush.size.In")
@Default("60")
public final long storeDataFlushIntervalSeconds;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change it to MBs


public FileCopyConfig(VerifiableProperties verifiableProperties) {
parallelPartitionHydrationCountPerDisk = verifiableProperties.getInt(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1);
numberOfFileCopyThreads = verifiableProperties.getInt(NUMBER_OF_FILE_COPY_THREADS, 4);
fileChunkTimeoutInMins = verifiableProperties.getInt(FILE_CHUNK_TIMEOUT_IN_MINUTES, 5);
storeDataFlushIntervalSeconds = verifiableProperties.getLong("store.data.flush.size", 60);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.github.ambry.config;

public enum ServerReplicationMode {
BLOB_BASED,
FILE_BASED;
}
10 changes: 10 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,14 @@ public class StoreConfig {
public final boolean storeBlockStaleBlobStoreToStart;
public final static String storeBlockStaleBlobStoreToStartName = "store.block.stale.blob.store.to.start";

/**
* Config to Decide Replication Protocol For Hydration Of Newly Added Replicas
*/
public static final String SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION = "server.replication.protocol.for.hydration";
@Config(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION)
public final ServerReplicationMode serverReplicationProtocolForHydration;


/**
* Whether to attempt reshuffling of reordered disks and subsequent process termination.
*/
Expand All @@ -683,6 +691,8 @@ public class StoreConfig {
public final static String storeReshuffleDisksOnReorderName = "store.reshuffle.disks.on.reorder";

public StoreConfig(VerifiableProperties verifiableProperties) {
serverReplicationProtocolForHydration = verifiableProperties.getEnum(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION,
ServerReplicationMode.class, ServerReplicationMode.BLOB_BASED);
storeKeyFactory = verifiableProperties.getString("store.key.factory", "com.github.ambry.commons.BlobIdFactory");
storeDataFlushIntervalSeconds = verifiableProperties.getLong("store.data.flush.interval.seconds", 60);
storeIndexMaxMemorySizeBytes = verifiableProperties.getInt("store.index.max.memory.size.bytes", 20 * 1024 * 1024);
Expand Down
15 changes: 15 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ public interface RequestAPI {
*/
void handleReplicaMetadataRequest(NetworkRequest request) throws IOException, InterruptedException;

/**
*
* @param request
* @throws IOException
* @throws InterruptedException
*/

/**
* Replicate one specific Blob from a remote host to the local store.
* @param request The request that contains the remote host information and the blob id to be replicated.
Expand Down Expand Up @@ -116,4 +123,12 @@ default void handleAdminRequest(NetworkRequest request) throws InterruptedExcept
default void handleUndeleteRequest(NetworkRequest request) throws InterruptedException, IOException {
throw new UnsupportedOperationException("Undelete request not supported on this node");
}

default void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException{
throw new UnsupportedOperationException("File Meta Data request not supported on this node");
}

default void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException{
throw new UnsupportedOperationException("File Chunk request not supported on this node");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
public class AmbryReplicaSyncUpManager implements ReplicaSyncUpManager {
private static final Logger logger = LoggerFactory.getLogger(AmbryReplicaSyncUpManager.class);
private final ConcurrentHashMap<String, CountDownLatch> partitionToBootstrapLatch = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, CountDownLatch> partitionToFileCopyLatch = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Boolean> partitionToFileCopySuccessLatch = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CountDownLatch> partitionToDeactivationLatch = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CountDownLatch> partitionToDisconnectionLatch = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Boolean> partitionToBootstrapSuccess = new ConcurrentHashMap<>();
Expand All @@ -63,6 +66,12 @@ public void initiateBootstrap(ReplicaId replicaId) {
ReplicaState.BOOTSTRAP));
}

@Override
public void initiateFileCopy(ReplicaId replicaId){
partitionToFileCopyLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1));
partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), false);
}

@Override
public void initiateDeactivation(ReplicaId replicaId) {
partitionToDeactivationLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1));
Expand Down Expand Up @@ -101,6 +110,22 @@ public void waitBootstrapCompleted(String partitionName) throws InterruptedExcep
}
}

@Override
public void waitForFileCopyCompleted(String partitionName) throws InterruptedException {
CountDownLatch latch = partitionToFileCopyLatch.get(partitionName);
if(latch == null) {
logger.info("Skipping file copy for existing partition {}", partitionName);
} else{
logger.info("Waiting for new partition to {} to comeplete FileCopy", partitionName);
latch.await();
partitionToFileCopyLatch.remove(partitionName);
if(!partitionToFileCopySuccessLatch.remove(partitionName)){
throw new StateTransitionException("Partition " + partitionName + " failed to copy files.", FileCopyFailure);
}
logger.info("File Copy is complete on partition {}", partitionName);
}
}

@Override
public void waitDeactivationCompleted(String partitionName) throws InterruptedException {
CountDownLatch latch = partitionToDeactivationLatch.get(partitionName);
Expand Down Expand Up @@ -192,6 +217,12 @@ public void onBootstrapComplete(ReplicaId replicaId) {
countDownLatch(partitionToBootstrapLatch, replicaId.getPartitionId().toPathString());
}

@Override
public void onFileCopyComplete(ReplicaId replicaId){
partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), true);
countDownLatch(partitionToFileCopyLatch, replicaId.getPartitionId().toPathString());
}

@Override
public void onDeactivationComplete(ReplicaId replicaId) {
partitionToDeactivationSuccess.put(replicaId.getPartitionId().toPathString(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
if (storageManagerListener != null) {
storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName);
}

// 2. take actions in replication manager (add new replica if necessary)
PartitionStateChangeListener replicationManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener);
Expand Down
19 changes: 19 additions & 0 deletions ambry-file-transfer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
plugins {
id 'java'
}

group = 'com.github.ambry'
version = '0.4.512'

repositories {
mavenCentral()
}

dependencies {
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.github.ambry;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterParticipant;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.PartitionStateChangeListener;
import com.github.ambry.clustermap.StateModelListenerType;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.FileCopyConfig;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.network.NetworkClientFactory;
import com.github.ambry.server.StoreManager;
import com.github.ambry.store.StoreKeyFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileCopyManager {

protected final Logger logger = LoggerFactory.getLogger(getClass());

public FileCopyManager(PrioritisationManager prioritisationManager, FileCopyConfig fileCopyConfig, ClusterMapConfig clusterMapConfig,
StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap,
ScheduledExecutorService scheduler, DataNodeId dataNode, NetworkClientFactory networkClientFactory,
MetricRegistry metricRegistry, ClusterParticipant clusterParticipant) {
if (clusterParticipant != null) {
clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.FileCopyManagerListener,
new PartitionStateChangeListenerImpl());
logger.info("File Copy Manager's state change listener registered!");
}
if(!prioritisationManager.isRunning()) {
prioritisationManager.start();
}
}
public void start() throws InterruptedException, IOException {

}
class PartitionStateChangeListenerImpl implements PartitionStateChangeListener {

@Override
public void onPartitionBecomeBootstrapFromOffline(String partitionName) {

}

@Override
public void onPartitionBecomeStandbyFromBootstrap(String partitionName) {

}

@Override
public void onPartitionBecomeLeaderFromStandby(String partitionName) {

}

@Override
public void onPartitionBecomeStandbyFromLeader(String partitionName) {

}

@Override
public void onPartitionBecomeInactiveFromStandby(String partitionName) {

}

@Override
public void onPartitionBecomeOfflineFromInactive(String partitionName) {

}

@Override
public void onPartitionBecomeDroppedFromOffline(String partitionName) {

}
}
}

}
19 changes: 19 additions & 0 deletions ambry-prioritisation/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
plugins {
id 'java'
}

group = 'com.github.ambry'
version = '0.4.514'

repositories {
mavenCentral()
}

dependencies {
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.github.ambry;

import com.github.ambry.clustermap.AmbryPartition;
import com.github.ambry.clustermap.DiskId;
import com.github.ambry.clustermap.ReplicaId;
import java.util.HashMap;
import java.util.Map;


public class PrioritisationManager {
private Map<String, String> diskToReplicaQueue;

private boolean running;
public PrioritisationManager() {
diskToReplicaQueue = new HashMap<>();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename queue to map.

running = false;
// Constructor
}

public void start() {
running = true;
}

public boolean isRunning(){
return running;
// Start the PrioritisationManager
}

public void shutdown() {
// Shutdown the PrioritisationManager
}

public void addReplica(String partitionName) {
// Add a replica to the PrioritisationManager
}

public void removeReplica(String partitionName) {
// Remove a task from the PrioritisationManager
}

public void updatePartitionState(String partitionName) {
// Update the state of a task in the PrioritisationManager
}

public void updatePartitionProgress(String partitionName) {
// Update the progress of a task in the PrioritisationManager
}

public void updatePartitionResult() {
// Update the result of a task in the PrioritisationManager
}

public String getPartitionForDisk(DiskId diskId){
// Get a partition from the PrioritisationManager
return null;
}

public String getReplica(String partitionName) {
// Get a replica from the PrioritisationManager
return null;
}
}
Loading
Loading