package threading;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
public class SynchronizedSignalOrdererTest {
private CountDownLatch readyLatch;
private CountDownLatch startLatch;
private CountDownLatch doneLatch;
class SignalProducer extends Thread {
private final long start;
private final long end;
private final SignalReciever reciever;
public SignalProducer(long start, long end, SignalReciever reciever) {
this.start = start;
this.end = end;
this.reciever = reciever;
}
@Override
public void run() {
readyLatch.countDown();
try {
startLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
for (long i=start;i<=end;i++) {
reciever.accept(new Signal<String>(i, null));
}
doneLatch.countDown();
}
}
public static class SignalRecorder implements SignalReciever {
private List<Signal<?>> signals = new ArrayList<Signal<?>>();
@Override
public synchronized void accept(Signal<?> signal) {
signals.add(signal);
}
public List<Signal<?>> getSignals() {
return signals;
}
}
@Test
public void testOrdering() {
SignalRecorder recorder = new SignalRecorder();
ConcurrentSignalOrderer orderer = new ConcurrentSignalOrderer(recorder,0);
long[] order = new long[]{0,5,2,1,3,7,4,6,9,8};
for (long next : order) {
orderer.accept(new Signal<Object>(next, null));
}
Long[] recordedOrder = new Long[recorder.getSignals().size()];
int i = 0;
for (Signal<?> signal : recorder.getSignals()) {
recordedOrder[i++] = signal.sequenceNumber();
}
assertArrayEquals(new Long[]{0L,1L,2L,3L,4L,5L,6L,7L,8L,9L},recordedOrder);
}
@Test
public void testSynchronizationTestCase() throws InterruptedException {
SignalRecorder recorder = new SignalRecorder();
int producerCount = 20;
int dataCount = 1000;
runProducers(recorder, producerCount, dataCount);
List<Signal<?>> recorded = recorder.getSignals();
assertEquals(producerCount*dataCount,recorded.size());
boolean failed = false;
for (int i=1;i<recorded.size();i++) {
if ((recorded.get(i-1).sequenceNumber()+1 != recorded.get(i).sequenceNumber())) {
failed = true;
}
}
assertTrue(failed);
}
@Test
public void testConcurrentOrdered() throws InterruptedException {
SignalRecorder recorder = new SignalRecorder();
int producerCount = 50;
int dataCount = 2;
runProducers(new ConcurrentSignalOrderer(recorder,0), producerCount, dataCount);
List<Signal<?>> recorded = recorder.getSignals();
assertEquals(producerCount*dataCount,recorded.size());
for (int i=1;i<recorded.size();i++) {
assertEquals(recorded.get(i-1).sequenceNumber()+1,recorded.get(i).sequenceNumber());
}
}
@Test
public void testSynchronizedOrdered() throws InterruptedException {
SignalRecorder recorder = new SignalRecorder();
int producerCount = 50;
int dataCount = 2;
runProducers(new SynchronizedSignalOrderer(recorder,0), producerCount, dataCount);
List<Signal<?>> recorded = recorder.getSignals();
assertEquals(producerCount*dataCount,recorded.size());
for (int i=1;i<recorded.size();i++) {
assertEquals(recorded.get(i-1).sequenceNumber()+1,recorded.get(i).sequenceNumber());
}
}
private void runProducers(SignalReciever reciever, int producerCount, int dataCount) throws InterruptedException {
readyLatch = new CountDownLatch(producerCount);
startLatch = new CountDownLatch(1);
doneLatch = new CountDownLatch(producerCount);
for(int i=0;i<producerCount;i++) {
SignalProducer producer = new SignalProducer(i*dataCount,i*dataCount+dataCount-1,reciever);
producer.start();
}
readyLatch.await();
startLatch.countDown();
doneLatch.await();
}
}