1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
//! An aggressively simple wrapper for `ws`.

#![allow(deprecated)]

use failure::Error;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::{
    Arc,
    Mutex,
};
use ws;
use ws::util::{
    Timeout,
    Token,
};
use ws::{
    CloseCode,
    Frame,
};

/// Delay, in milliseconds, between keep-alive pings (5 seconds).
const PING_INTERVAL: u64 = 5_000;
/// Delay, in milliseconds, after which an inactive connection is closed (30 seconds).
const TIMEOUT_INTERVAL: u64 = 30_000;

/// Process-wide source of `Token` values for timeout events.
///
/// Every `SocketHandler::new` call takes two consecutive values from it: one
/// for its ping event and one for its expiration event.
// NOTE(review): the counter starts at 1; why 0 is skipped is not visible in this file.
static TOKEN_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Shared, mutex-guarded handle to a connection's `ws::Sender`.
pub type Sender = Arc<Mutex<ws::Sender>>;

/// Per-connection state that adapts a [`SimpleSocket`] implementation to `ws`.
pub struct SocketHandler<S>
where
    S: SimpleSocket,
{
    /// Arguments for `S::initialize`; taken out when the connection opens.
    args: Option<S::Args>,

    /// Shared handle to this connection's sender, also handed to the socket object.
    out: Arc<Mutex<ws::Sender>>,
    /// The socket object: `None` until the connection opens and again after cleanup.
    obj: Option<S>,

    /// Most recently scheduled expiration timeout, kept so it can be cancelled
    /// when a newer one replaces it.
    timeout: Option<Timeout>,
    /// Token identifying this handler's ping timeout events.
    ping_event: Token,
    /// Token identifying this handler's expiration timeout events.
    expire_event: Token,
}

impl<S: SimpleSocket> SocketHandler<S> {
    /// Creates the handler for one connection.
    ///
    /// `args` is stored until the connection opens, and `out` is wrapped so it
    /// can be shared with the socket object. No socket object exists yet and no
    /// timeout is scheduled at this point.
    pub fn new(args: S::Args, out: ws::Sender) -> SocketHandler<S> {
        // Reserve two fresh token values: the ping token first, then the expiration token.
        let ping_event = Token(TOKEN_COUNTER.fetch_add(1, Ordering::SeqCst));
        let expire_event = Token(TOKEN_COUNTER.fetch_add(1, Ordering::SeqCst));
        let out = Arc::new(Mutex::new(out));

        Self {
            args: Some(args),
            out,
            obj: None,
            timeout: None,
            ping_event,
            expire_event,
        }
    }
}

pub trait SimpleSocket: Sized {
    type Args;
    fn initialize(args: Self::Args, url: &str, out: Arc<Mutex<ws::Sender>>) -> Result<Self, Error>;
    fn handle_message(&mut self, data: &[u8]) -> Result<(), Error>;
    fn cleanup(&mut self) -> Result<(), Error>;
}

impl<S: SimpleSocket> ws::Handler for SocketHandler<S> {
    fn on_open(&mut self, shake: ws::Handshake) -> Result<(), ws::Error> {
        self.obj = Some(
            S::initialize(
                self.args.take().unwrap(),
                shake.request.resource(),
                self.out.clone(),
            )
            .expect("Failed to start socket handler due to error"),
        );

        {
            let out = self.out.lock().unwrap();
            // schedule a timeout to send a ping every 5 seconds
            out.timeout(PING_INTERVAL, self.ping_event)?;
            // schedule a timeout to close the connection if there is no activity for 30 seconds
            out.timeout(TIMEOUT_INTERVAL, self.expire_event)?;
        }

        Ok(())
    }

    fn on_message(&mut self, msg: ws::Message) -> Result<(), ws::Error> {
        self.obj.as_mut().map(|obj| {
            obj.handle_message(&msg.into_data())
                .expect("Could not handle native command.");
        });

        Ok(())
    }

    fn on_error(&mut self, _err: ws::Error) {
        eprintln!("[ws] killing after error");
        self.obj
            .take()
            .map(|mut x| x.cleanup().expect("Failed to clean up socket"));
    }

    fn on_close(&mut self, _code: ws::CloseCode, _reason: &str) {
        println!("[ws] killing after close");
        self.obj
            .take()
            .map(|mut x| x.cleanup().expect("Failed to clean up socket"));
    }

    fn on_shutdown(&mut self) {
        eprintln!("[ws] killing after shutdown");
        self.obj
            .take()
            .map(|mut x| x.cleanup().expect("Failed to clean up socket"));
    }

    fn on_timeout(&mut self, event: Token) -> ws::Result<()> {
        if event == self.ping_event {
            let out = self.out.lock().unwrap();
            out.ping(vec![])?;
            out.timeout(PING_INTERVAL, self.ping_event)
        } else if event == self.expire_event {
            eprintln!("[ws] socket Expired {:?}", event);
            self.out.lock().unwrap().close(CloseCode::Away)
        } else {
            Ok(())
        }
    }

    fn on_new_timeout(&mut self, event: Token, timeout: Timeout) -> ws::Result<()> {
        if event == self.expire_event {
            if let Some(t) = self.timeout.take() {
                self.out.lock().unwrap().cancel(t)?;
            }
            self.timeout = Some(timeout)
        }
        Ok(())
    }

    fn on_frame(&mut self, frame: Frame) -> ws::Result<Option<Frame>> {
        // some activity has occurred, let's reset the expiration
        self.out
            .lock()
            .unwrap()
            .timeout(TIMEOUT_INTERVAL, self.expire_event)?;
        Ok(Some(frame))
    }
}