Skip to content

Commit

Permalink
wip uplog encoding changes
Browse files Browse the repository at this point in the history
  • Loading branch information
josephg committed Oct 29, 2024
1 parent 5a9cc34 commit 75b852f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub(crate) enum ChunkType {
CausalGraph = 21,
Operations = 20,
// OpTypeAndPosition = 22,

CGClientIDs = 22,
CGEntries = 23,

Expand Down
69 changes: 65 additions & 4 deletions src/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,14 +526,43 @@ impl OpLog {
//
// Better to write the chunk header first, and use the "raw output" as the ids, and so on.
// - which would all avoid some allocations & copies.
let mut cg_changes = Vec::new();
push_chunk(&mut cg_changes, ChunkType::CGClientIDs, &ids).unwrap();
push_chunk(&mut cg_changes, ChunkType::CGEntries, &cg_entries).unwrap();




// Serialize map operations
let mut map_ops = Vec::new();

let mut last_crdt = LVKey::MAX;
let mut last_key: Option<&str> = None;

for (crdt, key) in map_crdts_to_send {
// We need to write 4 fields:
// - The ID of the map CRDT being edited
// - The CRDT's key
// - The version of the edit
// - The new value

let write_crdt = crdt != last_crdt;
last_crdt = crdt;

let write_key = last_key != Some(&key);
last_key = Some(key.as_str());

// There are 3 cases for the CRDT:
// 1. We're editing the same CRDT as the previous operation. This is the most common.
// 2. We're editing the root CRDT. Write mapped agent 0.
// 3. We're editing some other CRDT. Write mapped agent a + 1.
if write_crdt {
if crdt == ROOT_CRDT_ID {

} else {
let av = self.cg.agent_assignment.local_to_agent_version(crdt);
let mapped_agent = write_map.map_and_store(av.0, &self.cg.agent_assignment.client_data, &mut ids);

}
}


let crdt_name = self.crdt_name_to_remote(crdt);
let entry = self.map_keys.get(&(crdt, key.clone()))
.unwrap();
Expand All @@ -553,6 +582,33 @@ impl OpLog {
}
}
}





// // Serialize map operations
// let mut map_ops = Vec::new();
// for (crdt, key) in map_crdts_to_send {
// let crdt_name = self.crdt_name_to_remote(crdt);
// let entry = self.map_keys.get(&(crdt, key.clone()))
// .unwrap();
// for r in diff_rev.iter().rev() {
// // Find all the unknown ops.
// // TODO: Add a flag to trim this to only the most recent ops.
// let start_idx = entry.ops
// .binary_search_by_key(&r.start, |e| e.0)
// .unwrap_or_else(|idx| idx);
//
// for pair in &entry.ops[start_idx..] {
// if pair.0 >= r.end { break; }
//
// // dbg!(pair);
// let rv = self.cg.agent_assignment.local_to_remote_version(pair.0);
// map_ops.push((crdt_name, rv, key.as_str(), pair.1.clone()));
// }
// }
// }

// Serialize text operations
let mut text_context = ListOperationCtx::new();
Expand All @@ -578,6 +634,11 @@ impl OpLog {
}
}

let mut cg_changes = Vec::new();
push_chunk(&mut cg_changes, ChunkType::CGClientIDs, &ids).unwrap();
push_chunk(&mut cg_changes, ChunkType::CGEntries, &cg_entries).unwrap();


SerializedOps {
cg_changes,
map_ops,
Expand Down

0 comments on commit 75b852f

Please sign in to comment.