package centrifuge

import (
	"container/heap"
	"sync"
	"time"

	"github.com/centrifugal/centrifuge/internal/memstream"
	"github.com/centrifugal/centrifuge/internal/priority"
)

// MemoryEngine is the builtin default Engine which allows running a
// Centrifuge-based server without any external broker or storage. All data is
// managed inside process memory.
//
// With this engine you can only run a single Centrifuge node. If you need to
// scale you should consider using another engine implementation instead – for
// example the Redis engine.
//
// Running a single node can be sufficient for many use cases, especially when
// you need maximum performance and do not have too many online clients. Consider
// configuring your load balancer to have one backup Centrifuge node for HA in
// this case.
type MemoryEngine struct {
	node         *Node              // used by Channels to reach the node's Hub
	presenceHub  *presenceHub       // in-memory presence information
	historyHub   *historyHub        // in-memory history streams
	eventHandler BrokerEventHandler // assigned in Run; publish methods forward to it

	// pubLocks synchronize access to publishing. We have to sync publish
	// to handle publications in the order of offset to prevent InsufficientState
	// errors.
	// TODO: maybe replace with sharded pool of workers with buffered channels.
	pubLocks map[int]*sync.Mutex
}

// Compile-time check that *MemoryEngine implements the Engine interface.
var _ Engine = (*MemoryEngine)(nil)

// MemoryEngineConfig is a memory engine config.
type MemoryEngineConfig struct {
	// HistoryMetaTTL sets the expiration time of inactive stream meta information.
	// Must have a reasonable value for the application.
	// At the moment it works with seconds precision.
	// When it is not positive the history hub neither schedules stream removal
	// nor starts the removal loop.
	// TODO v1: maybe make this channel namespace option?
	// TODO v1: since we have epoch things should also properly work without meta
	// information at all (but we lose the possibility of long-term recovery in a
	// stream without new messages).
	HistoryMetaTTL time.Duration
}

// numPubLocks is the number of publish mutexes created by NewMemoryEngine
// and the modulus passed to index when picking a lock for a channel (4096).
const numPubLocks = 1 << 12

// NewMemoryEngine builds a MemoryEngine bound to node n.
//
// It pre-allocates numPubLocks publish mutexes and creates empty presence and
// history hubs; c.HistoryMetaTTL is handed to the history hub. The returned
// error is always nil.
func NewMemoryEngine(n *Node, c MemoryEngineConfig) (*MemoryEngine, error) {
	locks := make(map[int]*sync.Mutex, numPubLocks)
	for shard := 0; shard < numPubLocks; shard++ {
		locks[shard] = new(sync.Mutex)
	}
	return &MemoryEngine{
		node:        n,
		presenceHub: newPresenceHub(),
		historyHub:  newHistoryHub(c.HistoryMetaTTL),
		pubLocks:    locks,
	}, nil
}

// Run remembers h as the handler that publish methods forward to and starts
// the history hub cleanup goroutines. It always returns nil.
func (e *MemoryEngine) Run(h BrokerEventHandler) error {
	e.eventHandler = h
	hub := e.historyHub
	hub.runCleanups()
	return nil
}

// pubLock returns the publish mutex assigned to channel ch. The slot is chosen
// by index(ch, numPubLocks), so different channels may share a mutex.
func (e *MemoryEngine) pubLock(ch string) *sync.Mutex {
	slot := index(ch, numPubLocks)
	return e.pubLocks[slot]
}

// Publish wraps data into a Publication and hands it to the event handler.
// We don't have any PUB/SUB here as Memory Engine is single node only.
//
// The channel's publish lock is held for the whole call so publications of a
// channel are handled in offset order. When both opts.HistorySize and
// opts.HistoryTTL are positive the publication is first saved to the history
// hub and the resulting stream position is returned; otherwise an empty
// StreamPosition is returned. A history error aborts the publish before the
// handler is called.
func (e *MemoryEngine) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, error) {
	lock := e.pubLock(ch)
	lock.Lock()
	defer lock.Unlock()

	pub := &Publication{Data: data, Info: opts.ClientInfo}

	if opts.HistorySize <= 0 || opts.HistoryTTL <= 0 {
		// History is off for this publication: deliver only.
		return StreamPosition{}, e.eventHandler.HandlePublication(ch, pub)
	}

	position, err := e.historyHub.add(ch, pub, opts)
	if err != nil {
		return StreamPosition{}, err
	}
	pub.Offset = position.Offset
	return position, e.eventHandler.HandlePublication(ch, pub)
}

// PublishJoin passes the join event for channel ch directly to the event
// handler and returns its result. See the Engine interface description.
func (e *MemoryEngine) PublishJoin(ch string, info *ClientInfo) error {
	handler := e.eventHandler
	return handler.HandleJoin(ch, info)
}

// PublishLeave passes the leave event for channel ch directly to the event
// handler and returns its result. See the Engine interface description.
func (e *MemoryEngine) PublishLeave(ch string, info *ClientInfo) error {
	handler := e.eventHandler
	return handler.HandleLeave(ch, info)
}

// PublishControl passes the control payload directly to the event handler and
// returns its result. See the Engine interface description.
func (e *MemoryEngine) PublishControl(data []byte) error {
	handler := e.eventHandler
	return handler.HandleControl(data)
}

// Subscribe is a noop here: the channel argument is ignored and nil is
// always returned.
func (e *MemoryEngine) Subscribe(_ string) error {
	return nil
}

// Unsubscribe is a noop here: the channel argument is ignored and nil is
// always returned.
func (e *MemoryEngine) Unsubscribe(_ string) error {
	return nil
}

// AddPresence stores info for client uid in channel ch in the presence hub.
// The duration argument is ignored by this engine. See the Engine interface
// description.
func (e *MemoryEngine) AddPresence(ch string, uid string, info *ClientInfo, _ time.Duration) error {
	hub := e.presenceHub
	return hub.add(ch, uid, info)
}

// RemovePresence drops client uid from the presence information of channel
// ch. See the Engine interface description.
func (e *MemoryEngine) RemovePresence(ch string, uid string) error {
	hub := e.presenceHub
	return hub.remove(ch, uid)
}

// Presence returns the presence information the presence hub holds for
// channel ch, keyed by client uid. See the Engine interface description.
func (e *MemoryEngine) Presence(ch string) (map[string]*ClientInfo, error) {
	clients, err := e.presenceHub.get(ch)
	return clients, err
}

// PresenceStats returns the client and unique user counts the presence hub
// computes for channel ch. See the Engine interface description.
func (e *MemoryEngine) PresenceStats(ch string) (PresenceStats, error) {
	stats, err := e.presenceHub.getStats(ch)
	return stats, err
}

// History returns publications and the current stream position for channel
// ch as reported by the history hub. See the Engine interface description.
func (e *MemoryEngine) History(ch string, filter HistoryFilter) ([]*Publication, StreamPosition, error) {
	pubs, position, err := e.historyHub.get(ch, filter)
	return pubs, position, err
}

// RemoveHistory asks the history hub to clear the stream of channel ch.
// See the Engine interface description.
func (e *MemoryEngine) RemoveHistory(ch string) error {
	hub := e.historyHub
	return hub.remove(ch)
}

// Channels returns the channel list reported by the node's Hub. The error is
// always nil. See the Engine interface description.
func (e *MemoryEngine) Channels() ([]string, error) {
	channels := e.node.Hub().Channels()
	return channels, nil
}

// presenceHub keeps presence information in process memory. The embedded
// RWMutex guards the presence map; every method takes it before touching the map.
type presenceHub struct {
	sync.RWMutex
	presence map[string]map[string]*ClientInfo // channel -> client uid -> client info
}

// newPresenceHub returns a presenceHub with an empty, ready to use presence map.
func newPresenceHub() *presenceHub {
	hub := new(presenceHub)
	hub.presence = map[string]map[string]*ClientInfo{}
	return hub
}

// add records info for client uid in channel ch, creating the per-channel map
// on first use and overwriting any previous entry for the same uid.
// It always returns nil.
func (h *presenceHub) add(ch string, uid string, info *ClientInfo) error {
	h.Lock()
	defer h.Unlock()

	clients, found := h.presence[ch]
	if !found {
		clients = make(map[string]*ClientInfo)
		h.presence[ch] = clients
	}
	clients[uid] = info
	return nil
}

// remove deletes client uid from channel ch. Unknown channels and unknown
// clients are ignored. When the last client of a channel is removed the
// channel entry itself is dropped. It always returns nil.
func (h *presenceHub) remove(ch string, uid string) error {
	h.Lock()
	defer h.Unlock()

	clients, found := h.presence[ch]
	if !found {
		return nil
	}
	if _, present := clients[uid]; !present {
		return nil
	}

	delete(clients, uid)
	if len(clients) == 0 {
		// Do not keep empty per-channel maps around.
		delete(h.presence, ch)
	}
	return nil
}

// get returns a copy of the presence map of channel ch, keyed by client uid.
// The copy is shallow: the *ClientInfo values are shared with the hub.
// For a channel without presence information a nil map is returned.
// The error is always nil.
func (h *presenceHub) get(ch string) (map[string]*ClientInfo, error) {
	h.RLock()
	defer h.RUnlock()

	if clients, found := h.presence[ch]; found {
		snapshot := make(map[string]*ClientInfo, len(clients))
		for uid, info := range clients {
			snapshot[uid] = info
		}
		return snapshot, nil
	}
	// Nothing known about this channel.
	return nil, nil
}

// getStats reports how many clients are present in channel ch and how many
// distinct UserID values they carry. A channel without presence information
// yields zero-valued stats. The error is always nil.
func (h *presenceHub) getStats(ch string) (PresenceStats, error) {
	h.RLock()
	defer h.RUnlock()

	clients, found := h.presence[ch]
	if !found {
		return PresenceStats{}, nil
	}

	// Collect user IDs into a set; its size is the number of unique users.
	users := make(map[string]struct{})
	for _, client := range clients {
		users[client.UserID] = struct{}{}
	}

	return PresenceStats{
		NumClients: len(clients),
		NumUsers:   len(users),
	}, nil
}

// historyHub keeps channel history streams in process memory together with
// the bookkeeping for two background cleanups: clearing stream contents once
// the history TTL passed (expire*) and deleting whole streams once the meta
// TTL passed (remove*). The embedded RWMutex guards all fields. Timestamps are
// Unix seconds.
type historyHub struct {
	sync.RWMutex
	streams         map[string]*memstream.Stream // channel -> stream
	nextExpireCheck int64                        // earliest time expireStreams should scan; 0 = nothing scheduled
	expireQueue     priority.Queue               // items {Value: channel, Priority: expire time}
	expires         map[string]int64             // channel -> latest expire time
	historyMetaTTL  time.Duration                // stream removal is enabled only when > 0
	nextRemoveCheck int64                        // earliest time removeStreams should scan; 0 = nothing scheduled
	removeQueue     priority.Queue               // items {Value: channel, Priority: remove time}
	removes         map[string]int64             // channel -> latest remove time
}

// newHistoryHub returns a historyHub with empty stream, expire and remove
// bookkeeping. historyMetaTTL is stored as is; cleanup goroutines are not
// started here (see runCleanups).
func newHistoryHub(historyMetaTTL time.Duration) *historyHub {
	hub := &historyHub{historyMetaTTL: historyMetaTTL}
	hub.streams = make(map[string]*memstream.Stream)
	hub.expireQueue = priority.MakeQueue()
	hub.expires = make(map[string]int64)
	hub.removeQueue = priority.MakeQueue()
	hub.removes = make(map[string]int64)
	return hub
}

// runCleanups starts the background cleanup goroutines: the expiration loop
// always, the removal loop only when historyMetaTTL is positive. Neither
// goroutine is ever stopped.
func (h *historyHub) runCleanups() {
	go h.expireStreams()
	if h.historyMetaTTL <= 0 {
		return
	}
	go h.removeStreams()
}

// removeStreams is an endless loop that deletes streams whose removal time
// has passed. It wakes up once a second, never returns and is started in its
// own goroutine by runCleanups.
//
// removeQueue may hold stale priorities: add and get only update h.removes
// for a channel that is already queued. The authoritative removal time is
// therefore the value in h.removes, and a popped item with an outdated
// priority is pushed back with the current one instead of being acted on.
func (h *historyHub) removeStreams() {
	var nextRemoveCheck int64
	for {
		time.Sleep(time.Second)
		h.Lock()
		// Skip the scan when nothing is scheduled (0) or the earliest
		// scheduled check is still in the future.
		if h.nextRemoveCheck == 0 || h.nextRemoveCheck > time.Now().Unix() {
			h.Unlock()
			continue
		}
		// Stays 0 (nothing scheduled) if the queue gets fully drained below.
		nextRemoveCheck = 0
		for h.removeQueue.Len() > 0 {
			// NOTE(review): presumably the queue is a min-heap on Priority,
			// so this is the earliest queued removal time — confirm in priority.
			item := heap.Pop(&h.removeQueue).(*priority.Item)
			expireAt := item.Priority
			if expireAt > time.Now().Unix() {
				// Not due yet: put it back and remember when to look again.
				heap.Push(&h.removeQueue, item)
				nextRemoveCheck = expireAt
				break
			}
			ch := item.Value
			exp, ok := h.removes[ch]
			if !ok {
				// No removal registered for this channel anymore; drop the item.
				continue
			}
			if exp <= expireAt {
				// The registered time was not moved past the queued one:
				// forget the channel and delete its stream.
				delete(h.removes, ch)
				delete(h.streams, ch)
			} else {
				// The removal time was extended after queuing: requeue with
				// the up-to-date time.
				heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: exp})
			}
		}
		h.nextRemoveCheck = nextRemoveCheck
		h.Unlock()
	}
}

// expireStreams is an endless loop that clears the contents of streams whose
// history expiration time has passed. The stream itself stays in h.streams;
// only Clear is called on it. It wakes up once a second, never returns and is
// started in its own goroutine by runCleanups.
//
// expireQueue may hold stale priorities: add only updates h.expires for a
// channel that is already queued. The authoritative expiration time is
// therefore the value in h.expires, and a popped item with an outdated
// priority is pushed back with the current one instead of being acted on.
func (h *historyHub) expireStreams() {
	var nextExpireCheck int64
	for {
		time.Sleep(time.Second)
		h.Lock()
		// Skip the scan when nothing is scheduled (0) or the earliest
		// scheduled check is still in the future.
		if h.nextExpireCheck == 0 || h.nextExpireCheck > time.Now().Unix() {
			h.Unlock()
			continue
		}
		// Stays 0 (nothing scheduled) if the queue gets fully drained below.
		nextExpireCheck = 0
		for h.expireQueue.Len() > 0 {
			// NOTE(review): presumably the queue is a min-heap on Priority,
			// so this is the earliest queued expiration — confirm in priority.
			item := heap.Pop(&h.expireQueue).(*priority.Item)
			expireAt := item.Priority
			if expireAt > time.Now().Unix() {
				// Not due yet: put it back and remember when to look again.
				heap.Push(&h.expireQueue, item)
				nextExpireCheck = expireAt
				break
			}
			ch := item.Value
			exp, ok := h.expires[ch]
			if !ok {
				// No expiration registered for this channel anymore; drop the item.
				continue
			}
			if exp <= expireAt {
				// The registered time was not moved past the queued one:
				// forget the expiration and clear the stream if it still exists.
				delete(h.expires, ch)
				if stream, ok := h.streams[ch]; ok {
					stream.Clear()
				}
			} else {
				// The expiration was extended after queuing: requeue with the
				// up-to-date time.
				heap.Push(&h.expireQueue, &priority.Item{Value: ch, Priority: exp})
			}
		}
		h.nextExpireCheck = nextExpireCheck
		h.Unlock()
	}
}

// add appends pub to the history stream of channel ch, creating the stream on
// first use, and returns the resulting stream position (offset of the new
// publication and stream epoch). pub.Offset is set to the new offset.
//
// On success the channel's expiration time is set to now + opts.HistoryTTL
// and, when historyMetaTTL is positive, its removal time to
// now + historyMetaTTL (both truncated to whole seconds). A channel is pushed
// to a cleanup queue only the first time it is seen; later calls just update
// the time in the map and the cleanup loops reconcile stale queue entries.
//
// An error reported by the stream's Add is returned to the caller. In that
// case a newly created stream is not registered and no cleanup is scheduled.
// NOTE(review): assumes the second result of memstream's Add is an error —
// confirm against the memstream package.
func (h *historyHub) add(ch string, pub *Publication, opts PublishOptions) (StreamPosition, error) {
	h.Lock()
	defer h.Unlock()

	stream, exists := h.streams[ch]
	if !exists {
		stream = memstream.New()
	}
	// Named offset rather than index to avoid shadowing the package-level
	// index function.
	offset, err := stream.Add(pub, opts.HistorySize)
	if err != nil {
		return StreamPosition{}, err
	}
	if !exists {
		h.streams[ch] = stream
	}
	pub.Offset = offset

	now := time.Now().Unix()

	expireAt := now + int64(opts.HistoryTTL.Seconds())
	if _, queued := h.expires[ch]; !queued {
		heap.Push(&h.expireQueue, &priority.Item{Value: ch, Priority: expireAt})
	}
	h.expires[ch] = expireAt
	// Make sure the expiration loop wakes up no later than this deadline.
	if h.nextExpireCheck == 0 || h.nextExpireCheck > expireAt {
		h.nextExpireCheck = expireAt
	}

	if h.historyMetaTTL > 0 {
		removeAt := now + int64(h.historyMetaTTL.Seconds())
		if _, queued := h.removes[ch]; !queued {
			heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: removeAt})
		}
		h.removes[ch] = removeAt
		// Make sure the removal loop wakes up no later than this deadline.
		if h.nextRemoveCheck == 0 || h.nextRemoveCheck > removeAt {
			h.nextRemoveCheck = removeAt
		}
	}

	return StreamPosition{Offset: offset, Epoch: stream.Epoch()}, nil
}

// createStream registers a fresh stream for channel ch, replacing any stream
// already stored under that key, and returns its position: offset 0 and the
// new stream's epoch.
//
// Lock must be held outside.
func (h *historyHub) createStream(ch string) StreamPosition {
	created := memstream.New()
	h.streams[ch] = created
	return StreamPosition{Offset: 0, Epoch: created.Epoch()}
}

// getPosition returns the current position of stream: its Top as the offset
// together with its epoch.
func getPosition(stream *memstream.Stream) StreamPosition {
	return StreamPosition{
		Offset: stream.Top(),
		Epoch:  stream.Epoch(),
	}
}

// get returns publications of channel ch selected by filter together with the
// current stream position.
//
// When historyMetaTTL is positive every call pushes the channel's removal
// time forward to now + historyMetaTTL. If the channel has no stream yet, an
// empty one is created and its position is returned with no publications.
//
// Without filter.Since: a zero filter.Limit returns only the position,
// otherwise the stream is read from offset 0 with that limit. With
// filter.Since: nothing is returned when the stream top and epoch match the
// given position, otherwise the stream is read from since.Offset + 1.
// An error from the stream read is returned with an empty StreamPosition.
func (h *historyHub) get(ch string, filter HistoryFilter) ([]*Publication, StreamPosition, error) {
	h.Lock()
	defer h.Unlock()

	if ttl := h.historyMetaTTL; ttl > 0 {
		removeAt := time.Now().Unix() + int64(ttl.Seconds())
		if _, queued := h.removes[ch]; !queued {
			heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: removeAt})
		}
		h.removes[ch] = removeAt
		if next := h.nextRemoveCheck; next == 0 || removeAt < next {
			h.nextRemoveCheck = removeAt
		}
	}

	stream, found := h.streams[ch]
	if !found {
		return nil, h.createStream(ch), nil
	}

	// fetch reads the stream starting at offset, honoring filter.Limit, and
	// unwraps the stored values into publications.
	fetch := func(offset uint64) ([]*Publication, error) {
		items, _, err := stream.Get(offset, filter.Limit)
		if err != nil {
			return nil, err
		}
		pubs := make([]*Publication, 0, len(items))
		for _, item := range items {
			pubs = append(pubs, item.Value.(*Publication))
		}
		return pubs, nil
	}

	since := filter.Since
	if since == nil {
		if filter.Limit == 0 {
			// Caller is only interested in the stream position.
			return nil, getPosition(stream), nil
		}
		pubs, err := fetch(0)
		if err != nil {
			return nil, StreamPosition{}, err
		}
		return pubs, getPosition(stream), nil
	}

	position := getPosition(stream)
	if since.Offset == position.Offset && since.Epoch == stream.Epoch() {
		// Caller is already at the top of the stream.
		return nil, position, nil
	}

	pubs, err := fetch(since.Offset + 1)
	if err != nil {
		return nil, StreamPosition{}, err
	}
	return pubs, position, nil
}

// remove clears the stream of channel ch if one exists. The stream stays
// registered in the hub; only Clear is called on it. It always returns nil.
func (h *historyHub) remove(ch string) error {
	h.Lock()
	defer h.Unlock()

	stream, found := h.streams[ch]
	if !found {
		return nil
	}
	stream.Clear()
	return nil
}