线程和进程的通信
同一进程,线程之间怎么通信
互斥量 + 条件变量(生产者-消费者)
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <iostream> using namespace std;
mutex mtx; condition_variable cv; queue<int> q; bool done = false;
void producer() { for (int i=1;i<=5;++i){ { lock_guard<mutex> lk(mtx); q.push(i); cout<<"[prod] "<<i<<"\n"; } cv.notify_one(); this_thread::sleep_for(chrono::milliseconds(100)); } { lock_guard<mutex> lk(mtx); done = true; } cv.notify_all(); } void consumer() { while (true) { unique_lock<mutex> lk(mtx); cv.wait(lk,[]{return !q.empty()||done;}); if (!q.empty()) { int x=q.front(); q.pop(); lk.unlock(); cout<<" [cons] "<<x<<"\n"; } else if (done) break; } } int main(){ thread t1(producer),t2(consumer); t1.join(); t2.join(); }
|
使用谓词防虚假唤醒。
尽量把耗时工作放在解锁后做来保证线程锁粒度。
atomic
(无锁计数/标志位)
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| #include <atomic> #include <thread> #include <iostream> using namespace std; atomic<int> counter{0}; int main(){ thread t1([]{ for(int i=0;i<100000;i++) counter.fetch_add(1, memory_order_relaxed); }); thread t2([]{ for(int i=0;i<100000;i++) counter++; }); t1.join(); t2.join(); cout << "counter=" << counter.load() << "\n"; }
|
读写锁
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| #include <shared_mutex> #include <thread> #include <vector> #include <iostream> using namespace std; shared_mutex rw; int dataV = 0; int main(){ auto reader = []{ shared_lock<shared_mutex> lk(rw); cout<<"R:"<<dataV<<"\n"; }; auto writer = []{ unique_lock<shared_mutex> lk(rw); dataV++; }; thread w1(writer), w2(writer); vector<thread> rs; for(int i=0;i<4;i++) rs.emplace_back(reader); w1.join(); w2.join(); for(auto& t:rs) t.join(); }
|
匿名管道(进程间的通信)
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| #include <unistd.h> #include <sys/wait.h> #include <cstring> #include <iostream>
int main() { int fd[2]; if (pipe(fd) == -1) { perror("pipe"); return 1; }
pid_t pid = fork(); if (pid < 0) { perror("fork"); return 1; }
if (pid == 0) { close(fd[1]); char buf[256]; ssize_t n = read(fd[0], buf, sizeof(buf)); if (n >= 0) std::cout << "[child] recv: " << std::string(buf, buf+n) << "\n"; else perror("read"); close(fd[0]); _exit(0); } else { close(fd[0]); const char* msg = "hello via pipe"; if (write(fd[1], msg, strlen(msg)) == -1) perror("write"); close(fd[1]); waitpid(pid, nullptr, 0); } }
|
由于 fork 其实都从父进程复制了描述符副本——如果这些子进程之间都来自同一个父进程(即父在 fork 两次),那它们间接共享了同一管道的 fd,因此它们也能通信,但前提是它们继承自同一个父进程的那组 fd。
注:fork 之后父子进程的“相同/不同”:
一样的
- 打开的文件描述符(指向同一 open file,共享文件偏移与状态标志)
- 虚拟内存内容的快照(代码/数据/堆/栈,写时复制 COW)
- 内存映射、
argv/environ、当前工作目录、umask、rlimit
- 信号处置与屏蔽集(忽略/默认/处理器设置、mask)
- 会话/进程组/控制终端、调度策略/优先级(nice)
不一样的
- PID/PPID(进程号不同,子进程父 PID=父进程 PID)
- 资源/时间统计起点(开始时间、CPU 统计)
- 挂起信号队列(pending signals 不继承)
- 线程(多线程程序中:只保留调用
fork() 的那个线程)
- 物理页:最初共享,写入时分离(COW 后各自独立)
- 定时器/闹钟等实现相关状态(不要指望保持一致)
exit和_exit的区别:
exit(0)
├─ 调用用户注册的 atexit() 回调
├─ 刷新 stdio 缓冲区
├─ 调用全局/静态对象析构
└─ 调用 _exit(0) → 系统调用 → 释放进程资源 → 通知父进程