在写代码的时候碰见了一个问题。 首先, 我需要维护一个队列, 有多个方法可以在队列里面插入对象。比如:
let fifo = [];
function productor1(){
fifo.push[obj1]
}
function productor2(){
fifo.push[obj2]
}
然后, 有一个方法来消耗这个数组, 这个是同步的, 第一个数组元素消耗完才能消耗第二个数组元素。
function consumer(){
let obj = fifo.shift()
//处理
}
我应该怎样做到生产者跟消费者同时进行, 并且当数组为空的时候消费者休息, 当生产者往里面加入数据的时候, 消费者又马上开始消费。 尽量用emit来实现, 不要用定时器来判断, 感觉太占资源了
@zy445566 这个数组里面的东西是同步执行的, 就是数组第一个索引位置执行完才能执行第二个, 你上面这么应该会出现两个同时执行吧?
刚刚点错了,还没写完,楼主真是秒回啊😓。加个是否在运行的判断就好了 消费者再调一下消费者,这个刚刚加的,这样消费者就不断运行了,队列空了就结束消费就好了 还有自己调自己要取消isRun判断
const Events= require('events');
const emitter = new Events();
let fifo = [];
function productor1(){
fifo.push(1)
emitMsg()
}
function productor2(){
fifo.push(2)
emitMsg()
}
function emitMsg() {
if (fifo.length==1) {
emitter.emit('msg');
}
}
let isRun = false;
function consumer(self=false) {
if (isRun && !self){return;}
isRun=true;
// 模拟处理耗时
setTimeout(()=>{
let obj = fifo.shift();
console.log(obj)
if (fifo.length==0) {isRun=false;return;}
consumer(true);
},1000)
}
emitter.on('msg', () => {
consumer()
});
(()=>{
productor1();
productor2();
})();
@zy445566 好的 谢谢。 理解意思
@atian25 自己生写算了
@zy445566 把isRun=false;写在上面是不是就不需要那个self了呀
@heguangda 写在上面只是一个默认值
试试 rxjs6。以下为 typescript 代码,可抹掉类型相关代码换成 js
import { Subject } from 'rxjs';
// 定义类型
export interface FifoItemType {
ip: string
}
// 生产者流。 FifoItemType 为入口参数类型,省事可用 any 类型
const fifo$ = new Subject<FifoItemType>();
// 消费者订阅
const consumerSub = fifo$.subscribe(consumer)
// 生产数据
fifo$.next({ ip: '192.168.1.1' })
fifo$.next({ ip: '192.168.1.2' })
// 具体消费实现
function consumer(data: FifoItemType){
//处理
console.log(data.ip)
}
可以 export fifo$
这样,其他地方都可以调用 .next() 方法生产数据.
上面的 consumerSub
表示 消费者订阅(consumer subscription),可执行 consumerSub.unsubscribe()
来取消订阅。
ps: rxjs 是(异步/同步)流控制的利器。用熟了对于开发效率提升很多(当然调试堆栈不那么友好……)
混合同步及异步消费且耦合的升级版本(需考虑生产速度高于消费速度的背压情况):
import { defer, of, Observable, Subject } from 'rxjs'
import { concatMap, map, tap } from 'rxjs/operators'
// 定义类型
export interface FifoItemType {
ip: string
}
// 生产者流。 FifoItemType 为入口参数类型,省事可用 any 类型
const fifo$ = new Subject<FifoItemType>().pipe(
concatMap(consumerAsync),
concatMap(consumerSync),
tap(consumer),
tap(data => consumer(data)),
)
// 消费者订阅
const consumerSub = fifo$.subscribe()
// 生产数据
fifo$.next({ ip: '192.168.1.1' })
fifo$.next({ ip: '192.168.1.2' })
// 异步消费实现
function consumerAsync(data: FifoItemType): Observable<FifoItemType> {
const ret$ = defer(() => fetch('saveip/' + data.ip)).pipe(
map(() => data),
)
return ret$
}
// 同步消费实现
function consumerSync(data: FifoItemType): Observable<FifoItemType> {
//处理
console.log(data.ip)
return of(data)
}
// 有副作用消费实现
let outerIp = ''
function consumer(data: FifoItemType): void {
outerIp = data.ip
}
用js的协程, yield切换,next恢复
来自酷炫的 CNodeMD