package notifications

import (
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"

	"testing"
)

func TestBroadcaster(t *testing.T) {
	const nEvents = 1000
	var sinks []Sink

	for i := 0; i < 10; i++ {
		sinks = append(sinks, &testSink{})
	}

	b := NewBroadcaster(sinks...)

	var block []Event
	var wg sync.WaitGroup
	for i := 1; i <= nEvents; i++ {
		block = append(block, createTestEvent("push", "library/test", "blob"))

		if i%10 == 0 && i > 0 {
			wg.Add(1)
			go func(block ...Event) {
				if err := b.Write(block...); err != nil {
					t.Fatalf("error writing block of length %d: %v", len(block), err)
				}
				wg.Done()
			}(block...)

			block = nil
		}
	}

	wg.Wait() // Wait until writes complete
	checkClose(t, b)

	// Iterate through the sinks and check that they all have the expected length.
	for _, sink := range sinks {
		ts := sink.(*testSink)
		ts.mu.Lock()
		defer ts.mu.Unlock()

		if len(ts.events) != nEvents {
			t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
		}

		if !ts.closed {
			t.Fatalf("sink should have been closed")
		}
	}

}

func TestEventQueue(t *testing.T) {
	const nevents = 1000
	var ts testSink
	metrics := newSafeMetrics()
	eq := newEventQueue(
		// delayed sync simulates destination slower than channel comms
		&delayedSink{
			Sink:  &ts,
			delay: time.Millisecond * 1,
		}, metrics.eventQueueListener())

	var wg sync.WaitGroup
	var block []Event
	for i := 1; i <= nevents; i++ {
		block = append(block, createTestEvent("push", "library/test", "blob"))
		if i%10 == 0 && i > 0 {
			wg.Add(1)
			go func(block ...Event) {
				if err := eq.Write(block...); err != nil {
					t.Fatalf("error writing event block: %v", err)
				}
				wg.Done()
			}(block...)

			block = nil
		}
	}

	wg.Wait()
	checkClose(t, eq)

	ts.mu.Lock()
	defer ts.mu.Unlock()
	metrics.Lock()
	defer metrics.Unlock()

	if len(ts.events) != nevents {
		t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
	}

	if !ts.closed {
		t.Fatalf("sink should have been closed")
	}

	if metrics.Events != nevents {
		t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
	}

	if metrics.Pending != 0 {
		t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
	}
}

func TestRetryingSink(t *testing.T) {

	// Make a sync that fails most of the time, ensuring that all the events
	// make it through.
	var ts testSink
	flaky := &flakySink{
		rate: 1.0, // start out always failing.
		Sink: &ts,
	}
	s := newRetryingSink(flaky, 3, 10*time.Millisecond)

	var wg sync.WaitGroup
	var block []Event
	for i := 1; i <= 100; i++ {
		block = append(block, createTestEvent("push", "library/test", "blob"))

		// Above 50, set the failure rate lower
		if i > 50 {
			s.mu.Lock()
			flaky.rate = 0.90
			s.mu.Unlock()
		}

		if i%10 == 0 && i > 0 {
			wg.Add(1)
			go func(block ...Event) {
				defer wg.Done()
				if err := s.Write(block...); err != nil {
					t.Fatalf("error writing event block: %v", err)
				}
			}(block...)

			block = nil
		}
	}

	wg.Wait()
	checkClose(t, s)

	ts.mu.Lock()
	defer ts.mu.Unlock()

	if len(ts.events) != 100 {
		t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
	}
}

type testSink struct {
	events []Event
	mu     sync.Mutex
	closed bool
}

func (ts *testSink) Write(events ...Event) error {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	ts.events = append(ts.events, events...)
	return nil
}

func (ts *testSink) Close() error {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	ts.closed = true

	logrus.Infof("closing testSink")
	return nil
}

type delayedSink struct {
	Sink
	delay time.Duration
}

func (ds *delayedSink) Write(events ...Event) error {
	time.Sleep(ds.delay)
	return ds.Sink.Write(events...)
}

type flakySink struct {
	Sink
	rate float64
}

func (fs *flakySink) Write(events ...Event) error {
	if rand.Float64() < fs.rate {
		return fmt.Errorf("error writing %d events", len(events))
	}

	return fs.Sink.Write(events...)
}

func checkClose(t *testing.T, sink Sink) {
	if err := sink.Close(); err != nil {
		t.Fatalf("unexpected error closing: %v", err)
	}

	// second close should not crash but should return an error.
	if err := sink.Close(); err == nil {
		t.Fatalf("no error on double close")
	}

	// Write after closed should be an error
	if err := sink.Write([]Event{}...); err == nil {
		t.Fatalf("write after closed did not have an error")
	} else if err != ErrSinkClosed {
		t.Fatalf("error should be ErrSinkClosed")
	}
}