Post

🧀 对std::sync::mpsc::Receiver使用for循环

这是一段 从多个producer 发送多个消息的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use std::sync::mpsc;
use std::thread;
use std::time::Duration;


pub fn test_thread(){
    let (tx,rx) =  mpsc::channel();
    let tx1 = tx.clone();

    thread::spawn(move ||{
        let val = vec![
            String::from("1:hi"),
            String::from("1:from"),
            String::from("1:spawn1"),
        ];
        for v in val{
            tx1.send(v).unwrap();
        }
    });
    thread::spawn(move ||{
        let val = vec![
            String::from("2:hello"),
            String::from("2:from"),
            String::from("2:spawn2"),
        ];
        for v in val{
            tx.send(v).unwrap();
        }

    });

    for receiver in rx{
        println!("get {}",receiver);
    }
}

为啥可以 for receiver in rx

在这段代码里面,为啥可以

1
2
for receiver in rx{
}

这样去做

因为一般不都是 let receiver = rx.recv().unwrap();

因为 for in 其实背后的原理是迭代器

The for in construct is able to interact with an Iterator in several ways. As discussed in the section on the Iterator trait, by default the for loop will apply the into_iter function to the collection.

所以只要看一个结构体有没有into_iter 函数就可以了

而这个into_iter 是 定义在 std::iter::IntoIterator 特性中方法

1
2
3
4
5
6
7
pub trait IntoIterator {
    type Item;
    type IntoIter: Iterator<Item = Self::Item>;

    // Required method
    fn into_iter(self) -> Self::IntoIter;
}

作用是将自身转化成一个 迭代器,这个是可以自定义的行为,需要返回的类型是 Iterator<T>

而一般的做法可以是

1
2
3
4
5
6
7
8
9
impl<I: Iterator> IntoIterator for I {
    type Item = I::Item;
    type IntoIter = I;

    #[inline]
    fn into_iter(self) -> I {
        self
    }
}

然后这时候需要自身实现 std::iter::Iterator 特性

主要是实现 fn next(&mut self) -> Option<Self::Item>; 这个方法

这样就知道迭代的下一个元素是什么了

那其实 上面的 rx 是一个 std::sync::mpsc::Receiver 结构体类型

而它实现了 IntoIterator 特性

2024-05-02-11-39-02-screenshoot.png

1
2
3
4
5
6
7
8
9
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
impl<T> IntoIterator for Receiver<T> {
    type Item = T;
    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> IntoIter<T> {
        IntoIter { rx: self }
    }
}

只不过这里它内部自定义了了一个IntoIter 结构体,

1
2
3
4
5
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
#[derive(Debug)]
pub struct IntoIter<T> {
    rx: Receiver<T>,
}

不应该是迭代器吗? type IntoIter: Iterator<Item = Self::Item>; 因为std::iter::IntoIterator这里有一个类型约束

所以他这里自定义的 IntoIter 实现了 Iterator 特性

1
2
3
4
5
6
7
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
impl<T> Iterator for IntoIter<T> {
    type Item = T;
    fn next(&mut self) -> Option<T> {
        self.rx.recv().ok()
    }
}

自此实现了闭环

可以看到在这个地方调用了 self.rx.recv().ok()

那这个for循环什么时候结束??

这个问题我问了一下chatgpt

循环在何时结束取决于发送者和接收者的行为:

当发送者发送完所有消息后,并关闭通道 (tx 在循环结束后被丢弃),接收者 (rx) 将会接收到所有消息,循环将随之结束。 如果发送者提前关闭了通道,或者发送者 panic,接收者将收到一个特殊的信号,表明通道已经关闭,循环也会随之结束。

大概的意思就是发送者 tx tx1啥时候scope结束后,那么此时接受者就会知道

因为本来 tx,rx 合起来才是一个整体

A channel has two halves: a transmitter and a receiver.

体会一下这里halves的意思

测试一下

tx1发送间隔久一点,tx完成早一点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
use std::sync::mpsc;
use std::thread;
use std::time::Duration;


pub fn test_thread(){
    let (tx,rx) =  mpsc::channel();
    let tx1 = tx.clone();

    thread::spawn(move ||{
        let val = vec![
            String::from("1:hi"),
            String::from("1:from"),
            String::from("1:spawn1"),
        ];
        for v in val{
            tx1.send(v).unwrap();
            thread::sleep(Duration::from_secs(2));//加了这句
        }
    });
    thread::spawn(move ||{
        let val = vec![
            String::from("2:hello"),
            String::from("2:from"),
            String::from("2:spawn2"),
        ];
        for v in val{
            tx.send(v).unwrap();
        }

    });

    for receiver in rx{
        println!("get {}",receiver);
    }
}

在第一个spawn thread中的for循环发送中增加了 thread::sleep(Duration::from_secs(2))

为的是能让第二个 spawn thread早点结束,那这时候看rx会不会提早退出

2024-05-02-12-15-10-screenshoot.png

不会,后面两个各等了2秒打印出来

让其中一个先发送完,另一个再开始发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
use std::sync::mpsc;
use std::thread;
use std::time::Duration;


pub fn test_thread(){
    let (tx,rx) =  mpsc::channel();
    let tx1 = tx.clone();

    thread::spawn(move ||{
        let val = vec![
            String::from("1:hi"),
            String::from("1:from"),
            String::from("1:spawn1"),
        ];
        thread::sleep(Duration::from_secs(5));//移到这里
        for v in val{
            tx1.send(v).unwrap();
            //thread::sleep(Duration::from_secs(2));//注释掉
        }
    });
    thread::spawn(move ||{
        let val = vec![
            String::from("2:hello"),
            String::from("2:from"),
            String::from("2:spawn2"),
        ];
        for v in val{
            tx.send(v).unwrap();
        }

    });

    for receiver in rx{
        println!("get {}",receiver);
    }
}

在第一个 spawn thread 开始for循环发送前,进行休眠,感觉5秒足够了

为的就是让第二个 spawn thread 早点完成,看看rx会不会结束

2024-05-02-12-18-10-screenshoot.png

也不会

我猜测这里可以是 tx.clone() 这里起作用,rx从一开始就知道有两个发送者,只有所有的发送者都 嗝屁了,它才安心的走

This post is licensed under CC BY 4.0 by the author.