Skip to content

Commit

Permalink
extracted classes from storage.py and first minimal Redis sync for ap…
Browse files Browse the repository at this point in the history
…p.storage.general
  • Loading branch information
rodja committed Dec 8, 2024
1 parent 3cfb14a commit 7aff68d
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 65 deletions.
2 changes: 2 additions & 0 deletions nicegui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .context import context
from .element_filter import ElementFilter
from .nicegui import app
from .persistence import storage
from .tailwind import Tailwind
from .version import __version__

Expand All @@ -20,5 +21,6 @@
'elements',
'html',
'run',
'storage',
'ui',
]
4 changes: 2 additions & 2 deletions nicegui/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
from ..logging import log
from ..native import NativeConfig
from ..observables import ObservableSet
from ..persistence.storage import Storage
from ..server import Server
from ..staticfiles import CacheControlledStaticFiles
from ..storage import Storage
from .app_config import AppConfig
from .range_response import get_range_response

Expand All @@ -39,7 +39,7 @@ def __init__(self, **kwargs) -> None:
self._state: State = State.STOPPED
self.config = AppConfig()

self._startup_handlers: List[Union[Callable[..., Any], Awaitable]] = []
self._startup_handlers: List[Union[Callable[..., Any], Awaitable]] = [self.storage.general.initialize,]
self._shutdown_handlers: List[Union[Callable[..., Any], Awaitable]] = []
self._connect_handlers: List[Union[Callable[..., Any], Awaitable]] = []
self._disconnect_handlers: List[Union[Callable[..., Any], Awaitable]] = []
Expand Down
3 changes: 2 additions & 1 deletion nicegui/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from fastapi.templating import Jinja2Templates
from typing_extensions import Self

from . import background_tasks, binding, core, helpers, json, storage
from . import background_tasks, binding, core, helpers, json
from .awaitable_response import AwaitableResponse
from .dependencies import generate_resources
from .element import Element
Expand All @@ -22,6 +22,7 @@
from .logging import log
from .observables import ObservableDict
from .outbox import Outbox
from .persistence import storage
from .version import __version__

if TYPE_CHECKING:
Expand Down
3 changes: 2 additions & 1 deletion nicegui/element.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@

from typing_extensions import Self

from . import core, events, helpers, json, storage
from . import core, events, helpers, json
from .awaitable_response import AwaitableResponse, NullResponse
from .classes import Classes
from .context import context
from .dependencies import Component, Library, register_library, register_resource, register_vue_component
from .elements.mixins.visibility import Visibility
from .event_listener import EventListener
from .persistence import storage
from .props import Props
from .slot import Slot
from .style import Style
Expand Down
36 changes: 36 additions & 0 deletions nicegui/persistence/persistent_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from pathlib import Path
from typing import Optional

import aiofiles

from nicegui import background_tasks, core, json, observables
from nicegui.logging import log


class PersistentDict(observables.ObservableDict):

def __init__(self, filepath: Path, encoding: Optional[str] = None, *, indent: bool = False) -> None:
self.filepath = filepath
self.encoding = encoding
self.indent = indent
try:
data = json.loads(filepath.read_text(encoding)) if filepath.exists() else {}
except Exception:
log.warning(f'Could not load storage file {filepath}')
data = {}
super().__init__(data, on_change=self.backup)

def backup(self) -> None:
"""Back up the data to the given file path."""
if not self.filepath.exists():
if not self:
return
self.filepath.parent.mkdir(exist_ok=True)

async def backup() -> None:
async with aiofiles.open(self.filepath, 'w', encoding=self.encoding) as f:
await f.write(json.dumps(self, indent=self.indent))
if core.loop:
background_tasks.create_lazy(backup(), name=self.filepath.stem)
else:
core.app.on_startup(backup())
24 changes: 24 additions & 0 deletions nicegui/persistence/read_only_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from collections.abc import MutableMapping
from typing import Any, Dict, Iterator


class ReadOnlyDict(MutableMapping):

def __init__(self, data: Dict[Any, Any], write_error_message: str = 'Read-only dict') -> None:
self._data: Dict[Any, Any] = data
self._write_error_message: str = write_error_message

def __getitem__(self, item: Any) -> Any:
return self._data[item]

def __setitem__(self, key: Any, value: Any) -> None:
raise TypeError(self._write_error_message)

def __delitem__(self, key: Any) -> None:
raise TypeError(self._write_error_message)

def __iter__(self) -> Iterator:
return iter(self._data)

def __len__(self) -> int:
return len(self._data)
56 changes: 56 additions & 0 deletions nicegui/persistence/redis_persistent_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import redis.asyncio as redis

from nicegui import background_tasks, core, json, observables
from nicegui.logging import log


class RedisDict(observables.ObservableDict):

def __init__(self, redis_url: str = 'redis://localhost:6379', key_prefix: str = 'nicegui:', encoding: str = 'utf-8') -> None:
self.redis_client = redis.from_url(redis_url)
self.pubsub = self.redis_client.pubsub()
self.key_prefix = key_prefix
self.encoding = encoding

# Initialize with empty data first
super().__init__({}, on_change=self.backup)

async def initialize(self) -> None:
"""Load initial data from Redis and start listening for changes."""
try:
data = await self._load_data()
self.update(data)
except Exception:
log.warning(f'Could not load data from Redis with prefix {self.key_prefix}')

await self._listen_for_changes()

async def _load_data(self) -> dict:
data = await self.redis_client.get(self.key_prefix + 'data')
return json.loads(data) if data else {}

async def _listen_for_changes(self) -> None:
await self.pubsub.subscribe(self.key_prefix + 'changes')
async for message in self.pubsub.listen():
if message['type'] == 'message':
new_data = json.loads(message['data'])
if new_data != self:
self.update(new_data)

def backup(self) -> None:
"""Back up the data to Redis and notify other instances."""
async def backup() -> None:
pipeline = self.redis_client.pipeline()
pipeline.set(self.key_prefix + 'data', json.dumps(self))
pipeline.publish(self.key_prefix + 'changes', json.dumps(self))
await pipeline.execute()
if core.loop:
background_tasks.create_lazy(backup(), name=f'redis-{self.key_prefix}')
else:
core.app.on_startup(backup())

async def close(self) -> None:
"""Close Redis connection and subscription."""
await self.pubsub.unsubscribe()
await self.pubsub.close()
await self.redis_client.close()
67 changes: 8 additions & 59 deletions nicegui/storage.py → nicegui/persistence/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,79 +3,28 @@
import os
import time
import uuid
from collections.abc import MutableMapping
from datetime import timedelta
from pathlib import Path
from typing import Any, Dict, Iterator, Optional, Union
from typing import Dict, Optional, Union

import aiofiles
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request
from starlette.responses import Response

from . import background_tasks, core, json, observables
from .context import context
from .logging import log
from .observables import ObservableDict
from .. import core, observables
from ..context import context
from ..observables import ObservableDict
from .persistent_dict import PersistentDict
from .read_only_dict import ReadOnlyDict
from .redis_persistent_dict import RedisDict

request_contextvar: contextvars.ContextVar[Optional[Request]] = contextvars.ContextVar('request_var', default=None)

PURGE_INTERVAL = timedelta(minutes=5).total_seconds()


class ReadOnlyDict(MutableMapping):

def __init__(self, data: Dict[Any, Any], write_error_message: str = 'Read-only dict') -> None:
self._data: Dict[Any, Any] = data
self._write_error_message: str = write_error_message

def __getitem__(self, item: Any) -> Any:
return self._data[item]

def __setitem__(self, key: Any, value: Any) -> None:
raise TypeError(self._write_error_message)

def __delitem__(self, key: Any) -> None:
raise TypeError(self._write_error_message)

def __iter__(self) -> Iterator:
return iter(self._data)

def __len__(self) -> int:
return len(self._data)


class PersistentDict(observables.ObservableDict):

def __init__(self, filepath: Path, encoding: Optional[str] = None, *, indent: bool = False) -> None:
self.filepath = filepath
self.encoding = encoding
self.indent = indent
try:
data = json.loads(filepath.read_text(encoding)) if filepath.exists() else {}
except Exception:
log.warning(f'Could not load storage file {filepath}')
data = {}
super().__init__(data, on_change=self.backup)

def backup(self) -> None:
"""Back up the data to the given file path."""
if not self.filepath.exists():
if not self:
return
self.filepath.parent.mkdir(exist_ok=True)

async def backup() -> None:
async with aiofiles.open(self.filepath, 'w', encoding=self.encoding) as f:
await f.write(json.dumps(self, indent=self.indent))
if core.loop:
background_tasks.create_lazy(backup(), name=self.filepath.stem)
else:
core.app.on_startup(backup())


class RequestTrackingMiddleware(BaseHTTPMiddleware):

async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
Expand Down Expand Up @@ -105,7 +54,7 @@ class Storage:
def __init__(self) -> None:
self.path = Path(os.environ.get('NICEGUI_STORAGE_PATH', '.nicegui')).resolve()
self.max_tab_storage_age = timedelta(days=30).total_seconds()
self._general = PersistentDict(self.path / 'storage-general.json', encoding='utf-8')
self._general = RedisDict() # PersistentDict(self.path / 'storage-general.json', encoding='utf-8')
self._users: Dict[str, PersistentDict] = {}
self._tabs: Dict[str, observables.ObservableDict] = {}

Expand Down
3 changes: 2 additions & 1 deletion nicegui/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

import uvicorn

from . import core, storage
from . import core
from .native import native
from .persistence import storage


class CustomServerConfig(uvicorn.Config):
Expand Down
3 changes: 2 additions & 1 deletion nicegui/ui_run_with.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

from fastapi import FastAPI

from . import core, storage
from . import core
from .air import Air
from .language import Language
from .nicegui import _shutdown, _startup
from .persistence import storage


def run_with(
Expand Down

0 comments on commit 7aff68d

Please sign in to comment.