Files
lz_db 0fab423a18 add
2025-11-16 12:31:03 +08:00

219 lines
8.2 KiB
Python

import collections
import logging
logger = logging.getLogger(__name__)
class Delegate:
    """Descriptor that forwards an attribute lookup to a member object.

    Reading the descriptor on an instance fetches the attribute called
    ``delegated`` from that instance and then returns the attribute called
    ``name`` from the fetched object (for bound methods, this yields a
    callable bound to the member object).

    :param name: attribute name to look up on the member object
    :param delegated: attribute name, on the owning instance, of the member
        object that actually provides ``name``
    """

    def __init__(self, name, delegated):
        self.name = name
        self.delegated = delegated

    def __get__(self, instance, owner=None):
        """Return ``getattr(getattr(instance, delegated), name)``.

        When accessed on the class (``instance is None``) there is no member
        object to forward to, so the descriptor itself is returned instead
        of failing with an AttributeError raised against ``None``.
        """
        if instance is None:
            return self
        target = getattr(instance, self.delegated)
        return getattr(target, self.name)
class BaseCache(list):
    """List-typed facade whose elements live in a bounded ``collections.deque``.

    The class subclasses ``list`` but never populates the inherited list
    storage: every element is kept in ``self._deque``, created with
    ``maxlen=max_size``. Any ``list`` method that is not overridden or
    delegated below therefore operates on the (empty) inherited storage.

    :param max_size: maximum number of elements kept by the deque, or
        ``None`` for an unbounded deque
    """
    # implicitly called magic methods don't invoke __getattribute__
    # https://docs.python.org/3/reference/datamodel.html#special-method-lookup
    # all method lookups obey the descriptor protocol
    # this is how the implicit api is defined in ccxt
    __iter__ = Delegate('__iter__', '_deque')
    __setitem__ = Delegate('__setitem__', '_deque')
    __delitem__ = Delegate('__delitem__', '_deque')
    __len__ = Delegate('__len__', '_deque')
    __contains__ = Delegate('__contains__', '_deque')
    __reversed__ = Delegate('__reversed__', '_deque')
    clear = Delegate('clear', '_deque')
    pop = Delegate('pop', '_deque')

    def __init__(self, max_size=None):
        super(BaseCache, self).__init__()
        self.max_size = max_size
        self._deque = collections.deque([], max_size)

    def __eq__(self, other):
        """Compare the cached elements (as a list) with ``other``."""
        return list(self) == other

    def __ne__(self, other):
        """Inverse of ``__eq__``.

        Must be defined explicitly: otherwise ``list.__ne__`` is inherited
        and compares the never-populated list storage instead of the deque,
        so ``cache == x`` and ``cache != x`` could both be true.
        """
        return list(self) != other

    def __repr__(self):
        return str(list(self))

    def __add__(self, other):
        """Return a new plain list: cached elements followed by ``other``."""
        if isinstance(other, BaseCache):
            # list.__add__ would read the other cache's empty list storage
            other = list(other)
        return list(self) + other

    def __getitem__(self, item):
        """Return one element for an index, or a new list for a slice."""
        # deque doesn't support slicing
        deque = self._deque
        if isinstance(item, slice):
            start, stop, step = item.indices(len(deque))
            return [deque[i] for i in range(start, stop, step)]
        return deque[item]

    # to be overridden
    def getLimit(self, symbol, limit):
        """Hook for subclasses; the base implementation returns ``None``."""
        pass

    # support transpiled snake_case calls
    def get_limit(self, symbol, limit):
        """snake_case alias that forwards to ``getLimit``."""
        return self.getLimit(symbol, limit)
class ArrayCache(BaseCache):
    """Cache that counts appended items overall and per ``item['symbol']``.

    Counters are reset lazily: ``getLimit`` only raises a "clear" flag, and
    the next ``append`` performs the reset before counting the new item.
    """

    def __init__(self, max_size=None):
        super(ArrayCache, self).__init__(max_size)
        self._nested_new_updates_by_symbol = False
        self._new_updates_by_symbol = {}
        self._clear_updates_by_symbol = {}
        self._all_new_updates = 0
        self._clear_all_updates = False

    def getLimit(self, symbol, limit):
        """Return the pending update count, capped by ``limit`` when given.

        With ``symbol is None`` the global counter is used; otherwise the
        per-symbol entry is used (its ``len`` when entries are nested
        collections). If no per-symbol entry exists, ``limit`` is returned
        unchanged. The matching "clear" flag is set in either case.
        """
        if symbol is None:
            pending = self._all_new_updates
            self._clear_all_updates = True
        else:
            pending = self._new_updates_by_symbol.get(symbol)
            if self._nested_new_updates_by_symbol and pending is not None:
                pending = len(pending)
            self._clear_updates_by_symbol[symbol] = True
        if pending is None:
            return limit
        if limit is None:
            return pending
        return min(pending, limit)

    def append(self, item):
        """Store ``item`` and bump the global and per-symbol counters."""
        self._deque.append(item)
        if self._clear_all_updates:
            # a global getLimit() happened since the last append: start over
            self._clear_all_updates = False
            self._clear_updates_by_symbol.clear()
            self._all_new_updates = 0
            self._new_updates_by_symbol.clear()
        symbol = item['symbol']
        counts = self._new_updates_by_symbol
        if self._clear_updates_by_symbol.get(symbol):
            # a per-symbol getLimit() happened: restart this symbol's count
            self._clear_updates_by_symbol[symbol] = False
            counts[symbol] = 0
        counts[symbol] = counts.get(symbol, 0) + 1
        self._all_new_updates = (self._all_new_updates or 0) + 1
class ArrayCacheByTimestamp(BaseCache):
    """Cache of sequence items de-duplicated by their first element.

    ``item[0]`` is the key (presumably a timestamp, per the class name).
    An item whose key is already cached overwrites the stored item in place
    instead of adding a new entry.
    """

    def __init__(self, max_size=None):
        super(ArrayCacheByTimestamp, self).__init__(max_size)
        self.hashmap = {}
        self._size_tracker = set()
        self._new_updates = 0
        self._clear_updates = False

    def getLimit(self, symbol, limit):
        """Return the number of distinct pending keys, capped by ``limit``.

        ``symbol`` is not used. Calling this flags the tracker to be reset
        on the next ``append``.
        """
        self._clear_updates = True
        pending = self._new_updates
        return pending if limit is None else min(pending, limit)

    def append(self, item):
        """Insert ``item`` or overwrite the stored item with the same key."""
        key = item[0]
        if key not in self.hashmap:
            self.hashmap[key] = item
            if len(self._deque) == self._deque.maxlen:
                # deque is full: drop the oldest entry and its hashmap slot
                evicted = self._deque.popleft()
                del self.hashmap[evicted[0]]
            self._deque.append(item)
        else:
            stored = self.hashmap[key]
            if stored != item:
                # mutate in place so the object held by the deque is updated
                stored[0:len(item)] = item
        if self._clear_updates:
            self._clear_updates = False
            self._size_tracker.clear()
        self._size_tracker.add(key)
        self._new_updates = len(self._size_tracker)
class ArrayCacheBySymbolById(ArrayCache):
    """Cache of dict items de-duplicated by ``(item['symbol'], item['id'])``.

    ``hashmap[symbol][id]`` holds the stored item. ``_index`` is a deque
    kept parallel to ``_deque`` that records each stored item's key so its
    position can be found when the item is updated.
    """

    def __init__(self, max_size=None):
        super(ArrayCacheBySymbolById, self).__init__(max_size)
        self._nested_new_updates_by_symbol = True
        self.hashmap = {}
        self._index = collections.deque([], max_size)

    def append(self, item):
        """Insert ``item``, or merge it into the stored item with the same key.

        An updated item is moved to the newest end of the deque. When the
        deque is full, the oldest item is evicted. The per-symbol set of
        updated ids and the global counter are then refreshed.
        """
        symbol = item['symbol']
        by_id = self.hashmap.setdefault(symbol, {})
        item_id = item['id']
        # The hashmap is scoped per symbol, so the positional index must be
        # too: a bare id shared by two symbols would locate (and remove) the
        # wrong element and leave hashmap and deque out of sync.
        index_key = (symbol, item_id)
        if item_id in by_id:
            reference = by_id[item_id]
            if reference != item:
                reference.update(item)
            item = reference
            index = self._index.index(index_key)
            del self._deque[index]
            del self._index[index]
        else:
            by_id[item_id] = item
        if len(self._deque) == self._deque.maxlen:
            delete_item = self._deque.popleft()
            self._index.popleft()
            try:
                del self.hashmap[delete_item['symbol']][delete_item['id']]
            except KeyError as e:
                # best effort: the hashmap entry was already gone
                logger.error(f"Error deleting item from hashmap: {delete_item}. Error:{e}")
        self._deque.append(item)
        self._index.append(index_key)
        if self._clear_all_updates:
            self._clear_all_updates = False
            self._clear_updates_by_symbol.clear()
            self._all_new_updates = 0
            self._new_updates_by_symbol.clear()
        if symbol not in self._new_updates_by_symbol:
            self._new_updates_by_symbol[symbol] = set()
        if self._clear_updates_by_symbol.get(symbol):
            self._clear_updates_by_symbol[symbol] = False
            self._new_updates_by_symbol[symbol].clear()
        id_set = self._new_updates_by_symbol[symbol]
        before_length = len(id_set)
        id_set.add(item_id)
        after_length = len(id_set)
        # only ids not already pending for this symbol raise the global count
        self._all_new_updates = (self._all_new_updates or 0) + (after_length - before_length)
class ArrayCacheBySymbolBySide(ArrayCache):
    """Cache of dict items de-duplicated by ``(item['symbol'], item['side'])``.

    ``hashmap[symbol][side]`` holds the stored item. ``_index`` is a deque
    kept parallel to ``_deque`` that records each stored item's key so its
    position can be found when the item is updated.
    """

    def __init__(self, max_size=None):
        super(ArrayCacheBySymbolBySide, self).__init__(max_size)
        self._nested_new_updates_by_symbol = True
        self.hashmap = {}
        self._index = collections.deque([], max_size)

    def append(self, item):
        """Insert ``item``, or merge it into the stored item with the same key.

        An updated item is moved to the newest end of the deque. When the
        deque is full, the oldest item is evicted. The per-symbol set of
        updated sides and the global counter are then refreshed.
        """
        symbol = item['symbol']
        by_side = self.hashmap.setdefault(symbol, {})
        side = item['side']
        # A tuple key works for any hashable side (string concatenation
        # raised TypeError for e.g. None, after the item was already stored)
        # and cannot collide the way 'AB' + 'C' and 'A' + 'BC' do.
        index_key = (symbol, side)
        if side in by_side:
            reference = by_side[side]
            if reference != item:
                reference.update(item)
            item = reference
            index = self._index.index(index_key)
            del self._deque[index]
            del self._index[index]
        else:
            by_side[side] = item
        if len(self._deque) == self._deque.maxlen:
            delete_item = self._deque.popleft()
            self._index.popleft()
            del self.hashmap[delete_item['symbol']][delete_item['side']]
        self._deque.append(item)
        self._index.append(index_key)
        if self._clear_all_updates:
            self._clear_all_updates = False
            self._clear_updates_by_symbol.clear()
            self._all_new_updates = 0
            self._new_updates_by_symbol.clear()
        if symbol not in self._new_updates_by_symbol:
            self._new_updates_by_symbol[symbol] = set()
        if self._clear_updates_by_symbol.get(symbol):
            self._clear_updates_by_symbol[symbol] = False
            self._new_updates_by_symbol[symbol].clear()
        side_set = self._new_updates_by_symbol[symbol]
        before_length = len(side_set)
        side_set.add(side)
        after_length = len(side_set)
        # only sides not already pending for this symbol raise the global count
        self._all_new_updates = (self._all_new_updates or 0) + (after_length - before_length)