1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
//! Sync state. This is a candidate file to be moved into Oatie.

use failure::Error;
use oatie::{
    doc::*,
    rtf::*,
    validate::validate_doc,
};
use std::collections::HashMap;

/// Server-side synchronization state for one shared document.
///
/// Holds the current document, its version counter, the version each known
/// client was last seen at, and the operations needed to bring a client's
/// operation up to the current version.
pub struct SyncState {
    /// Version of `doc`. `commit` sets this to the committed version plus one.
    pub version: usize,
    /// Maps client_id -> client_version. `commit` overwrites a known client's
    /// entry with the version its operation was committed at.
    pub clients: HashMap<String, usize>,
    /// Maps version -> op. `commit` stores each committed operation under the
    /// version it was applied to; entries older than every client are pruned.
    pub history: HashMap<usize, Op<RtfSchema>>,
    /// The current document, replaced by the result of applying each
    /// committed operation.
    pub doc: Doc<RtfSchema>,
}

impl SyncState {
    fn prune_history(&mut self) {
        if let Some(min_version) = self.clients.iter().map(|(_, &v)| v).min() {
            for k in self.history.keys().cloned().collect::<Vec<usize>>() {
                if k < min_version {
                    // eprintln!("(^) evicted document version {}", k);
                    self.history.remove(&k);
                }
            }
        }
    }

    /// Transform an operation incrementally against each interim document operation.
    pub fn update_operation_to_current(
        &self,
        mut op: Op<RtfSchema>,
        mut input_version: usize,
        target_version: usize,
    ) -> Result<Op<RtfSchema>, Error> {
        // Transform against all more recent operations.
        while input_version < target_version {
            // If the version exists (it should) transform against it.
            let version_op = self
                .history
                .get(&input_version)
                .ok_or(format_err!("Version missing from history"))?;
            let (updated_op, _) = Op::transform(version_op, &op);
            op = updated_op;

            input_version += 1;
        }
        Ok(op)
    }

    pub fn commit(
        &mut self,
        client_id: &str,
        op: Op<RtfSchema>,
        input_version: usize,
    ) -> Result<Op<RtfSchema>, Error> {
        let target_version = self.version;

        // Update the operation so we can apply it to the document.
        let op = self.update_operation_to_current(op, input_version, target_version)?;

        if let Some(version) = self.clients.get_mut(client_id) {
            *version = target_version;
        } else {
            // TODO what circumstances would it be missing? Client closed
            // and removed itself from list but operation used later?
        }

        // Prune history entries.
        self.prune_history();
        self.history.insert(target_version, op.clone());

        // Update the document with this operation.
        let new_doc = Op::apply(&self.doc, &op);

        // Gut check.
        validate_doc(&self.doc).map_err(|_| format_err!("Validation error"))?;

        // Commit chhanges.
        self.doc = new_doc;
        self.version = target_version + 1;

        Ok(op)
    }

    pub fn new(doc: Doc<RtfSchema>, version: usize) -> SyncState {
        SyncState {
            doc,
            version,
            clients: hashmap![],
            history: hashmap![],
        }
    }
}