Pipeline.cpp
#include "Pipeline.h"
#include "Executor.h"
#include "Task.h"
using namespace std;
Pipeline::Pipeline() :
empty( true ) {
}
void Pipeline::write( double value ) {
unique_lock<mutex> lock(mtx);
while (!empty) {
cv.wait(lock);
}
empty = false;
this->value = value;
cv.notify_all();
}
double Pipeline::read() {
unique_lock<mutex> lock(mtx);
while (empty) {
cv.wait(lock);
}
empty = true;
cv.notify_all();
return value;
}
class WriteTask : public Task {
public:
Pipeline& pipeline;
void execute() {
for (int i=0; i<100; i++) {
pipeline.write(i);
}
}
WriteTask( Pipeline& pipeline ) :
pipeline( pipeline ) {
}
};
class ReadTask : public Task {
public:
Pipeline& pipeline;
double total;
void execute() {
for (int i=0; i<100; i++) {
total+=pipeline.read();
}
}
ReadTask( Pipeline& pipeline ) :
pipeline( pipeline ),
total(0.0 ) {
}
};
static void testTwoThreads() {
Pipeline pipeline;
auto w = make_shared<WriteTask>( pipeline );
auto r = make_shared<ReadTask>( pipeline );
SPExecutor executor = Executor::newInstance(2);
executor->addTask( r );
executor->addTask( w );
executor->join();
ASSERT_APPROX_EQUAL( r->total, 99.0*50.0, 0.1);
}
void testPipeline() {
TEST( testTwoThreads );
}