'use strict';
var assert = require('assert')
var from = require('from2')
var merge = require('./')
function test (name, fn, c) {
var combined = c || merge();
var to = after(1000, process.emit.bind(process), 'error', new Error('Timed out: ' + name))
combined.on('end', function () {
clearTimeout(to)
});
fn(combined);
}
test('smoke', function (combined) {
function addSource (i) {
if (i === 38) return;
combined.add(range(i))
after(6, addSource, i + 1)
}
var total = 0
combined.on('data', function (i) { total += i })
combined.on('end', function () { assert.equal(666, total) })
addSource(-36)
})
test('pause/resume', function (combined) {
combined.add(range(100))
combined.add(range(-100))
var counter = 0;
combined.on('data', function () { counter++ })
after(20, function () {
combined.pause()
assert(counter < 200)
var pauseCount = counter;
after(50, function () {
assert.equal(pauseCount, counter);
combined.resume();
});
});
combined.on('end', function () { assert.equal(counter, 200) })
})
test('array', function (combined) {
var counter = 0;
combined.on('data', function () { counter++ })
combined.on('end', function () { assert.equal(counter, 200) })
}, merge([
range(100),
range(-100)
]))
test('isEmpty', function (combined) {
assert(combined.isEmpty());
combined.on('data', function (n) { assert.equal(0, n) });
combined.add(range(1));
assert(!combined.isEmpty());
})
function range (n) {
var k = n > 0 ? -1 : 1
return from.obj(function (_, next) {
setTimeout(function () {
next(null, n === 0 ? null : n += k)
}, Math.round(6 + Math.round(Math.random() * 6)));
})
}
function after (ms, fn, a, b, c) { return setTimeout(fn, ms, a, b, c) }