package dissolve

import (
	"errors"
	"runtime"
)

// Dissolver allows to put function to in-memory queue and process
// it with workers until success. The order of execution is not maintained.
// Jobs will be lost after closing. Jobs not saved to persistent store so
// do not survive process restart.
// Centrifuge uses this for asynchronously unsubscribing node from channels
// in broker. As soon as process restarts all connections to broker get
// closed automatically so it's ok to lose jobs inside Dissolver queue.
type Dissolver struct {
	queue      queue
	numWorkers int
}

// New creates new Dissolver.
func New(numWorkers int) *Dissolver {
	return &Dissolver{
		queue:      newQueue(),
		numWorkers: numWorkers,
	}
}

// Run launches workers to process Jobs from queue concurrently.
func (d *Dissolver) Run() error {
	for i := 0; i < d.numWorkers; i++ {
		go d.runWorker()
	}
	return nil
}

// Close stops processing Jobs, no more Jobs can be submitted after closing.
func (d *Dissolver) Close() error {
	d.queue.Close()
	return nil
}

// Submit Job to be reliably processed.
func (d *Dissolver) Submit(job Job) error {
	if !d.queue.Add(job) {
		return errors.New("can not submit job to closed dissolver")
	}
	return nil
}

func (d *Dissolver) runWorker() {
	for {
		job, ok := d.queue.Wait()
		if !ok {
			if d.queue.Closed() {
				break
			}
			continue
		}
		err := job()
		if err != nil {
			// Put to the end of queue.
			runtime.Gosched()
			d.queue.Add(job)
		}
	}
}