1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/*
 * SPDX-License-Identifier: MIT
 */
use std::fmt::Debug;
use std::ops::DerefMut;
use std::rc::Rc;
use std::sync::{mpsc, Mutex};

use crate::core_api::loop_::LoopRef;
use crate::spa::loop_::EventSource;

#[derive(Debug)]
pub enum SendError<T> {
    SendError(mpsc::SendError<T>),
    CannotSignalEvent(crate::Error),
}

impl<T> From<SendError<T>> for crate::Error {
    fn from(value: SendError<T>) -> Self {
        match value {
            SendError::SendError(e) => {
                crate::Error::ErrorMessage("Receiver is disconnected, unable to send message")
            }
            SendError::CannotSignalEvent(e) => e,
        }
    }
}

pub struct LoopChannel<'a> {
    loop_: Option<&'a LoopRef>,
    event: Option<EventSource<'a>>,
}

impl<'a> LoopChannel<'a> {
    pub fn channel<T>() -> (Sender<'a, T>, Receiver<'a, T>) {
        Self::from_channel(mpsc::channel())
    }

    pub fn from_channel<T>(
        (sender, receiver): (mpsc::Sender<T>, mpsc::Receiver<T>),
    ) -> (Sender<'a, T>, Receiver<'a, T>) {
        let channel = Rc::new(Mutex::new(LoopChannel {
            loop_: None,
            event: None,
        }));
        (
            Sender {
                channel: channel.clone(),
                sender,
            },
            Receiver { channel, receiver },
        )
    }
}

#[derive(Clone)]
pub struct Sender<'a, T> {
    sender: mpsc::Sender<T>,
    channel: Rc<Mutex<LoopChannel<'a>>>,
}

impl<'a, T> Sender<'a, T> {
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        let mut channel = self.channel.lock().unwrap();
        self.sender
            .send(value)
            .map_err(|e| SendError::SendError(e))?;
        if let LoopChannel {
            loop_: Some(loop_),
            event: Some(event),
        } = channel.deref_mut()
        {
            loop_
                .utils()
                .signal_event(event)
                .map_err(|e| SendError::CannotSignalEvent(e))?;
        }
        Ok(())
    }

    pub fn into_sender(self) -> mpsc::Sender<T> {
        self.sender
    }

    pub fn detach(&self) {
        let mut channel = self.channel.lock().unwrap();
        channel.event = None;
        channel.loop_ = None;
    }
}

pub struct Receiver<'a, T: 'a> {
    receiver: mpsc::Receiver<T>,
    channel: Rc<Mutex<LoopChannel<'a>>>,
}

pub type ReceiverCallback<'a, T> = Box<dyn FnMut(&mpsc::Receiver<T>) + 'a>;

impl<'a, T: 'a> Receiver<'a, T> {
    pub fn attach(
        self,
        loop_: &'a LoopRef,
        mut callback: ReceiverCallback<'a, T>,
    ) -> crate::Result<()> {
        let channel = self.channel.clone();
        let event = loop_.utils().add_event(
            loop_,
            Box::new({
                move |_count| {
                    callback(&self.receiver);
                }
            }),
        )?;
        let mut channel = channel.lock().unwrap();
        channel.event = Some(event);
        channel.loop_ = Some(loop_);
        Ok(())
    }

    pub fn into_receiver(self) -> mpsc::Receiver<T> {
        self.receiver
    }
}