
pyWikiCMS API Documentation

clickstream

Created on 2023-06-11

@author: wf

ClickStream dataclass

Represents a clickstream with associated page hits and user agent data.

Source code in frontend/clickstream.py
@dataclass
class ClickStream:
    """Represents a clickstream with associated page hits and user agent data."""

    url: str
    ip: str
    domain: str
    timeStamp: datetime
    pageHits: List[PageHit]
    userAgent: UserAgent
    userAgentHeader: Optional[str] = None
    referrer: Optional[str] = None
    acceptLanguage: Optional[str] = None

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> "ClickStream":
        data["timeStamp"] = DateParse.parse_date(data["timeStamp"])
        # Ensure `pageHits` are processed into PageHit instances
        # Initialize an empty list to store PageHit instances.
        page_hits = []

        # Iterate through each item in the list obtained from the 'pageHits' key.
        # Using .get() with a default empty list to handle the absence of 'pageHits'.
        for hit in data.get("pageHits", []):
            # Check if the current hit is not None before processing.
            if hit is not None:
                # Convert the hit dictionary to a PageHit instance and add it to the list.
                page_hits.append(PageHit.from_dict(hit))

        # 'data' dictionary is updated to hold the list of PageHit instances.
        data["pageHits"] = page_hits
        # Remove any keys from `data` that are not fields of the `ClickStream` dataclass
        # data = {key: value for key, value in data.items() if key in ClickStream.__annotations__}

        # Let the `_postprocess` handle the userAgent conversion
        data = ClickStream._postprocess(data)
        return ClickStream(**data)

    @staticmethod
    def _postprocess(data: Dict[str, Any]) -> Dict[str, Any]:
        # Ensure `userAgent` is a dictionary before trying to convert
        if isinstance(data.get("userAgent"), dict):
            data["userAgent"] = UserAgent.from_dict(data["userAgent"])
        # If `pageHits` needs to be processed again (not typically necessary if handled in `from_dict`)
        if isinstance(data.get("pageHits"), list):
            data["pageHits"] = [
                PageHit.from_dict(hit) if isinstance(hit, dict) else hit
                for hit in data["pageHits"]
            ]
        return data
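
A minimal usage sketch (not part of the module source): the record below is an invented example of a raw clickstream entry in the expected JSON shape; from_dict parses the timestamps and the nested pageHits and userAgent structures.

from frontend.clickstream import ClickStream

# invented raw record in the JSON shape produced by the clickstream logger
raw = {
    "url": "https://wiki.bitplan.com/index.php/Main_Page",
    "ip": "203.0.113.7",
    "domain": "wiki.bitplan.com",
    "timeStamp": "Jun 11, 2023 02:15:30 PM",
    "pageHits": [
        {"path": "/index.php/Main_Page", "timeStamp": "Jun 11, 2023 02:15:31 PM"}
    ],
    "userAgent": {"userAgentString": "Mozilla/5.0"},
}
click_stream = ClickStream.from_dict(raw)
print(click_stream.pageHits[0].path)  # /index.php/Main_Page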

ClickstreamLog dataclass

single log of clickstreams

Source code in frontend/clickstream.py
@dataclass
class ClickstreamLog:
    """
    single log of clickstreams
    """

    debug: bool
    MAX_CLICKSTREAMS: int
    LOGGING_TIME_PERIOD: int
    MAX_SESSION_TIME: int
    FLUSH_PERIOD: int
    startTime: datetime
    lastFlush: datetime
    lastLogRotate: datetime
    fileName: str
    clickStreams: List[ClickStream]

    @classmethod
    def from_json(cls, json_file: str):
        with open(json_file, "r", encoding="utf-8") as file:
            data = json.load(file)

        # Handle nested structures
        data = ClickstreamLog._postprocess(data)

        return ClickstreamLog(**data)

    @classmethod
    def _postprocess(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        data["startTime"] = DateParse.parse_date(data["startTime"])
        data["lastFlush"] = DateParse.parse_date(data["lastFlush"])
        data["lastLogRotate"] = DateParse.parse_date(data["lastLogRotate"])
        data["clickStreams"] = [
            ClickStream.from_dict(cs) for cs in data.get("clickStreams", [])
        ]
        return data
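
A minimal usage sketch (the file path is an assumption): from_json reads one clickstream log file and converts its timestamps and nested clickstreams.

from frontend.clickstream import ClickstreamLog

# assumed location of a single clickstream log file
log = ClickstreamLog.from_json("/var/log/clickstream/clickstream-2023-06-11.json")
print(len(log.clickStreams), "clickstreams since", log.startTime)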

ClickstreamManager

Bases: object

logging of client clicks

Source code in frontend/clickstream.py
class ClickstreamManager(object):
    """
    logging of client clicks
    """

    def __init__(
        self,
        root_path: str,
        rdf_namespace: str = "http://cms.bitplan.com/clickstream#",
        show_progress: bool = True,
        verbose: bool = True,
    ):
        """
        Constructor

        Args:
            root_path (str): the root path
            rdf_namespace (str): The base namespace URI for the RDF export.
            show_progress (bool): If True, show progress.
            verbose (bool): If True, print the output message.
        """
        self.root_path = root_path
        self.rdf_namespace = rdf_namespace
        self.clickstream_logs: List[ClickstreamLog] = []
        self.show_progress = show_progress
        self.verbose = verbose

    def get_progress(self, iterable, desc="Processing"):
        """
        Wrap an iterable with a progress bar if show_progress is True
        """
        if self.show_progress:
            return tqdm(iterable, desc=desc)
        else:
            return iterable

    def load_clickstream_logs(self, limit: Optional[int] = None) -> None:
        """
        Load all clickstream logs from the directory
        """
        # Find all json files in the directory
        json_files = glob.glob(os.path.join(self.root_path, "*.json"))
        # If a limit is set, truncate the file list
        if limit is not None:
            json_files = json_files[:limit]

        # Prepare tqdm iterator if required and tqdm is available
        iterator = self.get_progress(json_files, desc="Loading Clickstream Logs")

        total_clickstreams = 0

        # Load each file

        for json_file in iterator:
            try:
                # Parse the JSON file into ClickstreamLog
                clickstream_log = ClickstreamLog.from_json(json_file)
                self.clickstream_logs.append(clickstream_log)
                total_clickstreams += len(
                    clickstream_log.clickStreams
                )  # Count the clickstreams
            except json.JSONDecodeError as jde:
                # Handle JSON-specific parsing errors
                print(f"JSON decode error in file {json_file}: {jde.msg}")
                print(f"Error at line {jde.lineno}, column {jde.colno}")
            except Exception as e:
                tb = traceback.format_exc()  # This will give you the stack trace
                print(f"Error loading {json_file}: {e}")
                print(tb)  # Print stack trace to get more details about the exception
        # After importing, show the total counts
        total_logs = len(self.clickstream_logs)
        print(
            f"Imported {total_logs} clickstream logs with a total of {total_clickstreams} clickstreams."
        )

    def serialize_batch(
        self, g: Graph, rdf_file: str, file_counter: int, rdf_format: str
    ) -> None:
        """
        Serializes a batch of RDF data to a file.

        Args:
            g (Graph): The RDF graph to serialize.
            rdf_file (str): The base name for the RDF file.
            file_counter (int): The current file count for naming.
            rdf_format (str): The format to serialize the RDF data.

        """
        batch_file = f"{rdf_file}_part{file_counter:03}.{rdf_format}"
        g.serialize(destination=batch_file, format=rdf_format)
        if self.verbose:
            print(f"Exported RDF to {batch_file}")

    def add_stream_properties_to_graph(
        self, g: Graph, CS: Namespace, stream: Any, entity_counter: int
    ) -> int:
        """
        Adds the properties of a clickstream to the RDF graph.

        Args:
            g (Graph): The graph to which the properties will be added.
            CS (Namespace): The namespace for clickstream data.
            stream (Any): The clickstream object containing the data.
            entity_counter (int): A counter for creating unique entities.

        Returns:
            int: The updated entity counter after adding the properties.
        """
        stream_uri = URIRef(f"{CS}clickstream/{entity_counter}")
        entity_counter += 1

        # Add properties to the stream URI
        g.add((stream_uri, RDF.type, CS.ClickStream))
        g.add((stream_uri, CS.url, Literal(stream.url)))
        g.add((stream_uri, CS.ip, Literal(stream.ip)))
        g.add((stream_uri, CS.domain, Literal(stream.domain)))
        g.add((stream_uri, CS.userAgentHeader, Literal(stream.userAgentHeader)))
        g.add(
            (
                stream_uri,
                CS.timeStamp,
                Literal(stream.timeStamp.isoformat(), datatype=XSD.dateTime),
            )
        )

        # Optional referrer information
        if stream.referrer:
            g.add((stream_uri, CS.referrer, Literal(stream.referrer)))

        # User Agent details
        ua_uri = URIRef(f"{CS}useragent/{entity_counter}")
        entity_counter += 1
        g.add((ua_uri, RDF.type, CS.UserAgent))
        g.add((ua_uri, CS.hasSyntaxError, Literal(stream.userAgent.hasSyntaxError)))
        g.add((ua_uri, CS.hasAmbiguity, Literal(stream.userAgent.hasAmbiguity)))
        g.add((ua_uri, CS.ambiguityCount, Literal(stream.userAgent.ambiguityCount)))
        g.add((ua_uri, CS.userAgentString, Literal(stream.userAgent.userAgentString)))
        g.add((stream_uri, CS.userAgent, ua_uri))

        # Page Hits
        for hit in stream.pageHits:
            hit_uri = URIRef(f"{CS}pagehit/{entity_counter}")
            entity_counter += 1
            g.add((hit_uri, RDF.type, CS.PageHit))
            g.add((hit_uri, CS.path, Literal(hit.path)))
            g.add(
                (
                    hit_uri,
                    CS.timeStamp,
                    Literal(hit.timeStamp.isoformat(), datatype=XSD.dateTime),
                )
            )
            g.add((stream_uri, CS.pageHits, hit_uri))

        return entity_counter

    def export_to_rdf(
        self,
        rdf_file: str,
        batch_size: int,
        rdf_format: str = "nt",
    ) -> None:
        """
        Export clickstream logs to RDF files in batches.
        :param rdf_file: The base file name to write the RDF data to.
        :param batch_size: The number of clickstream records per file.
        :param rdf_format: The RDF serialization format to use (default is "nt").
        """
        # Namespace definition
        CS = Namespace(self.rdf_namespace)

        # Initialize variables
        file_counter = 1
        entity_counter = 1
        g = Graph()
        g.bind("cs", CS)

        # Create the directory if it doesn't exist
        os.makedirs(os.path.dirname(rdf_file), exist_ok=True)
        iterator = self.get_progress(self.clickstream_logs, desc="Export Progress")

        for log in iterator:
            for stream in log.clickStreams:
                entity_counter = self.add_stream_properties_to_graph(
                    g, CS, stream, entity_counter
                )

                # If batch size is reached, serialize and save to file
                if entity_counter % batch_size == 0:
                    self.serialize_batch(g, rdf_file, file_counter, rdf_format)
                    file_counter += 1
                    g = Graph()  # Reset the graph for the next batch
                    g.bind("cs", CS)

        # Serialize and save any remaining triples that didn't fill up the last batch
        if len(g):
            self.serialize_batch(g, rdf_file, file_counter, rdf_format)

    def reload_graph(self, rdf_file_pattern: str, rdf_format: str = "nt") -> Graph:
        """
        Reloads the RDF data from a batch of files into the clickstream logs.

        Args:
            rdf_file_pattern (str): The file pattern to search for RDF files.
                                    A wildcard '*' will be appended if not present.
            rdf_format (str): The RDF serialization format of the files (default is "nt").

        Returns:
            Graph: The RDF graph populated with data from the files.
        """
        # Ensure the pattern ends with a wildcard, append if necessary
        if not rdf_file_pattern.endswith("*"):
            rdf_file_pattern += "*"

        # Find all files matching the pattern
        rdf_files = glob.glob(rdf_file_pattern)

        # Initialize a new RDF graph
        g = Graph()

        # Use a progress bar if available or simply iterate over files
        try:
            iterator = self.get_progress(rdf_files, desc="Loading graph")
        except AttributeError:
            # If get_progress is not defined, fall back to simple iteration
            iterator = rdf_files

        for rdf_file in iterator:
            # Parse each RDF file and add it to the graph
            g.parse(rdf_file, format=rdf_format)

        # After loading all files, return the populated graph
        return g
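
A typical end-to-end sketch (directory and base file names are assumptions): load the JSON logs, export them as batched RDF files and read the result back.

from frontend.clickstream import ClickstreamManager

manager = ClickstreamManager("/var/log/clickstream")  # assumed log directory
manager.load_clickstream_logs(limit=100)  # read at most 100 *.json log files
# writes /tmp/clickstream/cs_part001.nt, cs_part002.nt, ...
manager.export_to_rdf("/tmp/clickstream/cs", batch_size=1000)
graph = manager.reload_graph("/tmp/clickstream/cs")
print(len(graph), "triples reloaded")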

__init__(root_path, rdf_namespace='http://cms.bitplan.com/clickstream#', show_progress=True, verbose=True)

Constructor

Parameters:

Name Type Description Default
root_path str

the root path

required
rdf_namespace str

The base namespace URI for the RDF export.

'http://cms.bitplan.com/clickstream#'
show_progress bool

If True, show progress.

True
verbose bool

If True, print the output message.

True
Source code in frontend/clickstream.py
def __init__(
    self,
    root_path: str,
    rdf_namespace: str = "http://cms.bitplan.com/clickstream#",
    show_progress: bool = True,
    verbose: bool = True,
):
    """
    Constructor

    Args:
        root_path (str): the root path
        rdf_namespace (str): The base namespace URI for the RDF export.
        show_progress (bool): If True, show progress.
        verbose (bool): If True, print the output message.
    """
    self.root_path = root_path
    self.rdf_namespace = rdf_namespace
    self.clickstream_logs: List[ClickstreamLog] = []
    self.show_progress = show_progress
    self.verbose = verbose

add_stream_properties_to_graph(g, CS, stream, entity_counter)

Adds the properties of a clickstream to the RDF graph.

Parameters:

Name Type Description Default
g Graph

The graph to which the properties will be added.

required
CS Namespace

The namespace for clickstream data.

required
stream Any

The clickstream object containing the data.

required
entity_counter int

A counter for creating unique entities.

required

Returns:

Name Type Description
int int

The updated entity counter after adding the properties.

Source code in frontend/clickstream.py
def add_stream_properties_to_graph(
    self, g: Graph, CS: Namespace, stream: Any, entity_counter: int
) -> int:
    """
    Adds the properties of a clickstream to the RDF graph.

    Args:
        g (Graph): The graph to which the properties will be added.
        CS (Namespace): The namespace for clickstream data.
        stream (Any): The clickstream object containing the data.
        entity_counter (int): A counter for creating unique entities.

    Returns:
        int: The updated entity counter after adding the properties.
    """
    stream_uri = URIRef(f"{CS}clickstream/{entity_counter}")
    entity_counter += 1

    # Add properties to the stream URI
    g.add((stream_uri, RDF.type, CS.ClickStream))
    g.add((stream_uri, CS.url, Literal(stream.url)))
    g.add((stream_uri, CS.ip, Literal(stream.ip)))
    g.add((stream_uri, CS.domain, Literal(stream.domain)))
    g.add((stream_uri, CS.userAgentHeader, Literal(stream.userAgentHeader)))
    g.add(
        (
            stream_uri,
            CS.timeStamp,
            Literal(stream.timeStamp.isoformat(), datatype=XSD.dateTime),
        )
    )

    # Optional referrer information
    if stream.referrer:
        g.add((stream_uri, CS.referrer, Literal(stream.referrer)))

    # User Agent details
    ua_uri = URIRef(f"{CS}useragent/{entity_counter}")
    entity_counter += 1
    g.add((ua_uri, RDF.type, CS.UserAgent))
    g.add((ua_uri, CS.hasSyntaxError, Literal(stream.userAgent.hasSyntaxError)))
    g.add((ua_uri, CS.hasAmbiguity, Literal(stream.userAgent.hasAmbiguity)))
    g.add((ua_uri, CS.ambiguityCount, Literal(stream.userAgent.ambiguityCount)))
    g.add((ua_uri, CS.userAgentString, Literal(stream.userAgent.userAgentString)))
    g.add((stream_uri, CS.userAgent, ua_uri))

    # Page Hits
    for hit in stream.pageHits:
        hit_uri = URIRef(f"{CS}pagehit/{entity_counter}")
        entity_counter += 1
        g.add((hit_uri, RDF.type, CS.PageHit))
        g.add((hit_uri, CS.path, Literal(hit.path)))
        g.add(
            (
                hit_uri,
                CS.timeStamp,
                Literal(hit.timeStamp.isoformat(), datatype=XSD.dateTime),
            )
        )
        g.add((stream_uri, CS.pageHits, hit_uri))

    return entity_counter

export_to_rdf(rdf_file, batch_size, rdf_format='nt')

Export clickstream logs to RDF files in batches.

Parameters:

rdf_file: The base file name to write the RDF data to.
batch_size: The number of clickstream records per file.
rdf_format: The RDF serialization format to use (default is "nt").

Source code in frontend/clickstream.py
def export_to_rdf(
    self,
    rdf_file: str,
    batch_size: int,
    rdf_format: str = "nt",
) -> None:
    """
    Export clickstream logs to RDF files in batches.
    :param rdf_file: The base file name to write the RDF data to.
    :param batch_size: The number of clickstream records per file.
    :param rdf_format: The RDF serialization format to use (default is "nt").
    """
    # Namespace definition
    CS = Namespace(self.rdf_namespace)

    # Initialize variables
    file_counter = 1
    entity_counter = 1
    g = Graph()
    g.bind("cs", CS)

    # Create the directory if it doesn't exist
    os.makedirs(os.path.dirname(rdf_file), exist_ok=True)
    iterator = self.get_progress(self.clickstream_logs, desc="Export Progress")

    for log in iterator:
        for stream in log.clickStreams:
            entity_counter = self.add_stream_properties_to_graph(
                g, CS, stream, entity_counter
            )

            # If batch size is reached, serialize and save to file
            if entity_counter % batch_size == 0:
                self.serialize_batch(g, rdf_file, file_counter, rdf_format)
                file_counter += 1
                g = Graph()  # Reset the graph for the next batch
                g.bind("cs", CS)

    # Serialize and save any remaining triples that didn't fill up the last batch
    if len(g):
        self.serialize_batch(g, rdf_file, file_counter, rdf_format)

get_progress(iterable, desc='Processing')

Wrap an iterable with a progress bar if show_progress is True

Source code in frontend/clickstream.py
def get_progress(self, iterable, desc="Processing"):
    """
    Wrap an iterable with a progress bar if show_progress is True
    """
    if self.show_progress:
        return tqdm(iterable, desc=desc)
    else:
        return iterable

load_clickstream_logs(limit=None)

Load all clickstream logs from the directory

Source code in frontend/clickstream.py
def load_clickstream_logs(self, limit: Optional[int] = None) -> None:
    """
    Load all clickstream logs from the directory
    """
    # Find all json files in the directory
    json_files = glob.glob(os.path.join(self.root_path, "*.json"))
    # If a limit is set, truncate the file list
    if limit is not None:
        json_files = json_files[:limit]

    # Prepare tqdm iterator if required and tqdm is available
    iterator = self.get_progress(json_files, desc="Loading Clickstream Logs")

    total_clickstreams = 0

    # Load each file

    for json_file in iterator:
        try:
            # Parse the JSON file into ClickstreamLog
            clickstream_log = ClickstreamLog.from_json(json_file)
            self.clickstream_logs.append(clickstream_log)
            total_clickstreams += len(
                clickstream_log.clickStreams
            )  # Count the clickstreams
        except json.JSONDecodeError as jde:
            # Handle JSON-specific parsing errors
            print(f"JSON decode error in file {json_file}: {jde.msg}")
            print(f"Error at line {jde.lineno}, column {jde.colno}")
        except Exception as e:
            tb = traceback.format_exc()  # This will give you the stack trace
            print(f"Error loading {json_file}: {e}")
            print(tb)  # Print stack trace to get more details about the exception
    # After importing, show the total counts
    total_logs = len(self.clickstream_logs)
    print(
        f"Imported {total_logs} clickstream logs with a total of {total_clickstreams} clickstreams."
    )

reload_graph(rdf_file_pattern, rdf_format='nt')

Reloads the RDF data from a batch of files into the clickstream logs.

Parameters:

Name Type Description Default
rdf_file_pattern str

The file pattern to search for RDF files. A wildcard '*' will be appended if not present.

required
rdf_format str

The RDF serialization format of the files (default is "nt").

'nt'

Returns:

Name Type Description
Graph Graph

The RDF graph populated with data from the files.

Source code in frontend/clickstream.py
def reload_graph(self, rdf_file_pattern: str, rdf_format: str = "nt") -> Graph:
    """
    Reloads the RDF data from a batch of files into the clickstream logs.

    Args:
        rdf_file_pattern (str): The file pattern to search for RDF files.
                                A wildcard '*' will be appended if not present.
        rdf_format (str): The RDF serialization format of the files (default is "nt").

    Returns:
        Graph: The RDF graph populated with data from the files.
    """
    # Ensure the pattern ends with a wildcard, append if necessary
    if not rdf_file_pattern.endswith("*"):
        rdf_file_pattern += "*"

    # Find all files matching the pattern
    rdf_files = glob.glob(rdf_file_pattern)

    # Initialize a new RDF graph
    g = Graph()

    # Use a progress bar if available or simply iterate over files
    try:
        iterator = self.get_progress(rdf_files, desc="Loading graph")
    except AttributeError:
        # If get_progress is not defined, fall back to simple iteration
        iterator = rdf_files

    for rdf_file in iterator:
        # Parse each RDF file and add it to the graph
        g.parse(rdf_file, format=rdf_format)

    # After loading all files, return the populated graph
    return g
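
The returned graph is a regular rdflib Graph, so it can be queried with SPARQL. A sketch assuming a previous export under /tmp/clickstream/cs and the default namespace:

from rdflib import Namespace

from frontend.clickstream import ClickstreamManager

CS = Namespace("http://cms.bitplan.com/clickstream#")  # must match the export namespace
manager = ClickstreamManager("/var/log/clickstream")  # assumed root path
g = manager.reload_graph("/tmp/clickstream/cs")  # assumed export base name
query = "SELECT (COUNT(?s) AS ?n) WHERE { ?s a cs:ClickStream }"
for row in g.query(query, initNs={"cs": CS}):
    print(row.n, "clickstreams in the graph")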

serialize_batch(g, rdf_file, file_counter, rdf_format)

Serializes a batch of RDF data to a file.

Parameters:

Name Type Description Default
g Graph

The RDF graph to serialize.

required
rdf_file str

The base name for the RDF file.

required
file_counter int

The current file count for naming.

required
rdf_format str

The format to serialize the RDF data.

required
Source code in frontend/clickstream.py
def serialize_batch(
    self, g: Graph, rdf_file: str, file_counter: int, rdf_format: str
) -> None:
    """
    Serializes a batch of RDF data to a file.

    Args:
        g (Graph): The RDF graph to serialize.
        rdf_file (str): The base name for the RDF file.
        file_counter (int): The current file count for naming.
        rdf_format (str): The format to serialize the RDF data.

    """
    batch_file = f"{rdf_file}_part{file_counter:03}.{rdf_format}"
    g.serialize(destination=batch_file, format=rdf_format)
    if self.verbose:
        print(f"Exported RDF to {batch_file}")
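
The batch file name combines the base name, a zero-padded counter and the format suffix; a quick naming sketch (the base name is an assumption):

rdf_file = "/tmp/clickstream/cs"  # assumed base name
file_counter = 1
rdf_format = "nt"
print(f"{rdf_file}_part{file_counter:03}.{rdf_format}")  # /tmp/clickstream/cs_part001.nt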

DateParse

Source code in frontend/clickstream.py
class DateParse:
    @staticmethod
    def parse_date(date_str: str) -> datetime:
        """Parse a string to a datetime object.

        Args:
            date_str (str): The date string to parse.

        Returns:
            datetime: The parsed datetime object.
        """
        return datetime.strptime(date_str, "%b %d, %Y %I:%M:%S %p")

parse_date(date_str) staticmethod

Parse a string to a datetime object.

Parameters:

Name Type Description Default
date_str str

The date string to parse.

required

Returns:

Name Type Description
datetime datetime

The parsed datetime object.

Source code in frontend/clickstream.py
@staticmethod
def parse_date(date_str: str) -> datetime:
    """Parse a string to a datetime object.

    Args:
        date_str (str): The date string to parse.

    Returns:
        datetime: The parsed datetime object.
    """
    return datetime.strptime(date_str, "%b %d, %Y %I:%M:%S %p")
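
The format string expects dates such as "Jun 11, 2023 02:15:30 PM"; a quick sketch (the date value is invented):

from frontend.clickstream import DateParse

ts = DateParse.parse_date("Jun 11, 2023 02:15:30 PM")
print(ts.isoformat())  # 2023-06-11T14:15:30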

PageHit dataclass

Represents a single page hit with path and timestamp.

Source code in frontend/clickstream.py
@dataclass
class PageHit:
    """Represents a single page hit with path and timestamp."""

    path: str
    timeStamp: datetime

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> "PageHit":
        data["timeStamp"] = DateParse.parse_date(data["timeStamp"])
        return PageHit(**data)

UserAgent dataclass

Represents a user agent with syntax errors, ambiguity and other attributes.

Source code in frontend/clickstream.py
@dataclass
class UserAgent:
    """Represents a user agent with syntax errors, ambiguity and other attributes."""

    hasSyntaxError: bool
    hasAmbiguity: bool
    ambiguityCount: int
    userAgentString: str
    debug: bool
    allFields: Dict[str, Dict[str, Any]]

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> "UserAgent":
        allFields = data.get("allFields", {})

        # Use `.get()` with defaults to prevent KeyError
        return UserAgent(
            hasSyntaxError=data.get("hasSyntaxError", False),
            hasAmbiguity=data.get("hasAmbiguity", False),
            ambiguityCount=data.get("ambiguityCount", 0),
            userAgentString=data.get("userAgentString", ""),
            debug=data.get("debug", False),
            allFields=allFields,
        )
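
Because from_dict falls back to defaults for missing keys, even a sparse record parses; a sketch with an invented record:

from frontend.clickstream import UserAgent

ua = UserAgent.from_dict({"userAgentString": "Mozilla/5.0 (compatible; ExampleBot/1.0)"})
print(ua.hasSyntaxError, ua.ambiguityCount)  # False 0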

cmsmain

Created on 2022-11-24

@author: wf

CmsMain

Bases: WebserverCmd

Content Management System Main Program

Source code in frontend/cmsmain.py
class CmsMain(WebserverCmd):
    """
    Content Management System Main Program
    """

    def getArgParser(self, description: str, version_msg) -> ArgumentParser:
        """
        override the default argparser call
        """
        parser = super().getArgParser(description, version_msg)
        parser.add_argument(
            "--sites", nargs="+", required=False, help="the sites to enable"
        )
        return parser

getArgParser(description, version_msg)

override the default argparser call

Source code in frontend/cmsmain.py
def getArgParser(self, description: str, version_msg) -> ArgumentParser:
    """
    override the default argparser call
    """
    parser = super().getArgParser(description, version_msg)
    parser.add_argument(
        "--sites", nargs="+", required=False, help="the sites to enable"
    )
    return parser

main(argv=None)

main call

Source code in frontend/cmsmain.py
def main(argv: list = None):
    """
    main call
    """
    cmd = CmsMain(config=CmsWebServer.get_config(), webserver_cls=CmsWebServer)
    exit_code = cmd.cmd_main(argv)
    return exit_code
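
A programmatic invocation sketch (the site names are assumptions; note that calling main starts the web server):

from frontend.cmsmain import main

exit_code = main(["--sites", "wiki.example.org", "cms.example.org"])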

frame

HtmlFrame

A class to frame html content with a basic HTML document structure.

Attributes:

Name Type Description
lang str

Language of the HTML document.

title str

Title of the HTML document.

Source code in frontend/frame.py
class HtmlFrame:
    """
    A class to frame html content with a basic HTML document structure.

    Attributes:
        lang (str): Language of the HTML document.
        title (str): Title of the HTML document.
    """

    def __init__(self, frontend, title: str, lang: str = "en") -> None:
        """
        Initialize HtmlFrame with a specified language and title.

        Args:
            title (str): Title for the HTML document.
            lang (str, optional): Language of the HTML document. Defaults to "en".
        """
        self.frontend = frontend
        self.lang = lang
        self.title = title

    def hamburger_menu(self) -> str:
        """
        Generate the HTML, CSS, and JavaScript for a hamburger menu.

        Returns:
            str: Hamburger menu HTML, CSS, and JavaScript.
        """
        menu_html = """
<!-- Hamburger Menu Start -->
<style>
  /* Basic styling */
  .menu { display: none; }
  .hamburger { cursor: pointer; }
  .hamburger:hover { opacity: 0.7; }

  /* Menu items layout */
  .menu ul { list-style-type: none; padding: 0; }
  .menu li { padding: 8px; background-color: #f0f0f0; margin-bottom: 5px; }

  /* Show the menu when .show class is added via JavaScript */
  .show { display: block; }
</style>

<!-- Hamburger Icon -->
<div class="hamburger" onclick="toggleMenu()">☰</div>

<!-- Menu Items -->
<div class="menu" id="mainMenu">
  <ul>
    <li><a href="#home">Home</a></li>
    <li><a href="#about">About</a></li>
    <li><a href="#services">Services</a></li>
    <li><a href="#contact">Contact</a></li>
  </ul>
</div>

<script>
  function toggleMenu() {
    var menu = document.getElementById("mainMenu");
    if (menu.classList.contains("show")) {
      menu.classList.remove("show");
    } else {
      menu.classList.add("show");
    }
  }
</script>
<!-- Hamburger Menu End -->
"""
        return menu_html

    def header(self) -> str:
        """
        Generate the header part of the HTML document.

        Returns:
            str: Header part of an HTML document as a string.
        """
        style_key = f"CMS/style"
        style_html = self.frontend.cms_pages.get(style_key, "")
        html = f"""<!doctype html>
<html lang="{self.lang}">
<head>
  <meta charset="utf-8"/>
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>{self.title}</title>
  {style_html}
</head>
<body>  
"""
        return html

    def footer(self) -> str:
        """
        Generate the footer part of the HTML document.

        Returns:
            str: Footer part of an HTML document as a string.
        """
        footer_key = f"CMS/footer/{self.lang}"
        footer_html = self.frontend.cms_pages.get(footer_key, "")
        html = f"""{footer_html}
  </body>
</html>
"""
        return html

    def frame(self, content: str) -> str:
        """
        Frame the given HTML content with the header and footer of the document.

        Args:
            content (str): HTML content to be framed within the HTML structure.

        Returns:
            str: Complete HTML document as a string with the provided content framed.
        """
        header_key = f"CMS/header/{self.lang}"
        header_html = self.frontend.cms_pages.get(header_key, "")
        html = f"""{self.header()}
{self.hamburger_menu()}  
{header_html}
      <div class="container">
{content}
      </div><!-- /.container -->
{self.footer()}"""
        return html
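
HtmlFrame looks up style, header and footer snippets in frontend.cms_pages; a minimal sketch with a stub frontend (the stub and its page content are assumptions):

from frontend.frame import HtmlFrame

class StubFrontend:
    """stand-in providing the cms_pages lookup used by HtmlFrame"""
    cms_pages = {
        "CMS/style": "<style>body { font-family: sans-serif; }</style>",
        "CMS/header/en": "<h1>Example Site</h1>",
        "CMS/footer/en": "<footer>© Example</footer>",
    }

frame = HtmlFrame(StubFrontend(), title="Example Page")
print(frame.frame("<p>Hello world</p>"))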

__init__(frontend, title, lang='en')

Initialize HtmlFrame with a specified language and title.

Parameters:

Name Type Description Default
title str

Title for the HTML document.

required
lang str

Language of the HTML document. Defaults to "en".

'en'
Source code in frontend/frame.py
def __init__(self, frontend, title: str, lang: str = "en") -> None:
    """
    Initialize HtmlFrame with a specified language and title.

    Args:
        title (str): Title for the HTML document.
        lang (str, optional): Language of the HTML document. Defaults to "en".
    """
    self.frontend = frontend
    self.lang = lang
    self.title = title

footer()

Generate the footer part of the HTML document.

Returns:

Name Type Description
str str

Footer part of an HTML document as a string.

Source code in frontend/frame.py
    def footer(self) -> str:
        """
        Generate the footer part of the HTML document.

        Returns:
            str: Footer part of an HTML document as a string.
        """
        footer_key = f"CMS/footer/{self.lang}"
        footer_html = self.frontend.cms_pages.get(footer_key, "")
        html = f"""{footer_html}
  </body>
</html>
"""
        return html

frame(content)

Frame the given HTML content with the header and footer of the document.

Parameters:

Name Type Description Default
content str

HTML content to be framed within the HTML structure.

required

Returns:

Name Type Description
str str

Complete HTML document as a string with the provided content framed.

Source code in frontend/frame.py
    def frame(self, content: str) -> str:
        """
        Frame the given HTML content with the header and footer of the document.

        Args:
            content (str): HTML content to be framed within the HTML structure.

        Returns:
            str: Complete HTML document as a string with the provided content framed.
        """
        header_key = f"CMS/header/{self.lang}"
        header_html = self.frontend.cms_pages.get(header_key, "")
        html = f"""{self.header()}
{self.hamburger_menu()}  
{header_html}
      <div class="container">
{content}
      </div><!-- /.container -->
{self.footer()}"""
        return html

hamburger_menu()

Generate the HTML, CSS, and JavaScript for a hamburger menu.

Returns:

Name Type Description
str str

Hamburger menu HTML, CSS, and JavaScript.

Source code in frontend/frame.py
    def hamburger_menu(self) -> str:
        """
        Generate the HTML, CSS, and JavaScript for a hamburger menu.

        Returns:
            str: Hamburger menu HTML, CSS, and JavaScript.
        """
        menu_html = """
<!-- Hamburger Menu Start -->
<style>
  /* Basic styling */
  .menu { display: none; }
  .hamburger { cursor: pointer; }
  .hamburger:hover { opacity: 0.7; }

  /* Menu items layout */
  .menu ul { list-style-type: none; padding: 0; }
  .menu li { padding: 8px; background-color: #f0f0f0; margin-bottom: 5px; }

  /* Show the menu when .show class is added via JavaScript */
  .show { display: block; }
</style>

<!-- Hamburger Icon -->
<div class="hamburger" onclick="toggleMenu()">☰</div>

<!-- Menu Items -->
<div class="menu" id="mainMenu">
  <ul>
    <li><a href="#home">Home</a></li>
    <li><a href="#about">About</a></li>
    <li><a href="#services">Services</a></li>
    <li><a href="#contact">Contact</a></li>
  </ul>
</div>

<script>
  function toggleMenu() {
    var menu = document.getElementById("mainMenu");
    if (menu.classList.contains("show")) {
      menu.classList.remove("show");
    } else {
      menu.classList.add("show");
    }
  }
</script>
<!-- Hamburger Menu End -->
"""
        return menu_html

header()

Generate the header part of the HTML document.

Returns:

Name Type Description
str str

Header part of an HTML document as a string.

Source code in frontend/frame.py
    def header(self) -> str:
        """
        Generate the header part of the HTML document.

        Returns:
            str: Header part of an HTML document as a string.
        """
        style_key = f"CMS/style"
        style_html = self.frontend.cms_pages.get(style_key, "")
        html = f"""<!doctype html>
<html lang="{self.lang}">
<head>
  <meta charset="utf-8"/>
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>{self.title}</title>
  {style_html}
</head>
<body>  
"""
        return html

html_table

Created on 2022-10-25

@author: wf

HtmlTables

Bases: WebScrape

HtmlTables extractor

Source code in frontend/html_table.py
class HtmlTables(WebScrape):
    """
    HtmlTables extractor
    """

    def __init__(self, url: str, debug=False, showHtml=False):
        """
        Constructor

        url(str): the url to read the tables from
        debug(bool): if True switch on debugging
        showHtml(bool): if True show the HTML retrieved
        """
        super().__init__(debug, showHtml)
        self.soup = super().getSoup(url, showHtml)

    def get_tables(self, header_tag: str = None) -> dict:
        """
        get all tables from my soup as a list of list of dicts

        Args:
            header_tag(str): if set search the table name from the given header tag

        Return:
            dict: the list of list of dicts for all tables

        """
        tables = {}
        for i, table in enumerate(self.soup.find_all("table")):
            fields = []
            table_data = []
            category = None
            for tr in table.find_all("tr", recursive=True):
                for th in tr.find_all("th", recursive=True):
                    if "colspan" in th.attrs:
                        category = th.text
                    else:
                        fields.append(th.text)
            for tr in table.find_all("tr", recursive=True):
                record = {}
                for i, td in enumerate(tr.find_all("td", recursive=True)):
                    record[fields[i]] = td.text
                if record:
                    if category:
                        record["category"] = category
                    table_data.append(record)
            if header_tag is not None:
                header = table.find_previous_sibling(header_tag)
                table_name = header.text
            else:
                table_name = f"table{i}"
            tables[table_name] = table_data
        return tables
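
A usage sketch (the URL is an example; any page containing HTML tables works, and a header tag such as "h2" can be passed to name the tables when the page layout supports it):

from frontend.html_table import HtmlTables

html_tables = HtmlTables("https://en.wikipedia.org/wiki/Special:Version")
tables = html_tables.get_tables()
for name, rows in tables.items():
    print(name, len(rows), "rows")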

__init__(url, debug=False, showHtml=False)

Constructor

url(str): the url to read the tables from
debug(bool): if True switch on debugging
showHtml(bool): if True show the HTML retrieved

Source code in frontend/html_table.py
def __init__(self, url: str, debug=False, showHtml=False):
    """
    Constructor

    url(str): the url to read the tables from
    debug(bool): if True switch on debugging
    showHtml(bool): if True show the HTML retrieved
    """
    super().__init__(debug, showHtml)
    self.soup = super().getSoup(url, showHtml)

get_tables(header_tag=None)

get all tables from my soup as a list of list of dicts

Parameters:

Name Type Description Default
header_tag(str)

if set search the table name from the given header tag

None
Return

dict: the list of list of dicts for all tables

Source code in frontend/html_table.py
def get_tables(self, header_tag: str = None) -> dict:
    """
    get all tables from my soup as a list of list of dicts

    Args:
        header_tag(str): if set search the table name from the given header tag

    Return:
        dict: the list of list of dicts for all tables

    """
    tables = {}
    for i, table in enumerate(self.soup.find_all("table")):
        fields = []
        table_data = []
        category = None
        for tr in table.find_all("tr", recursive=True):
            for th in tr.find_all("th", recursive=True):
                if "colspan" in th.attrs:
                    category = th.text
                else:
                    fields.append(th.text)
        for tr in table.find_all("tr", recursive=True):
            record = {}
            for i, td in enumerate(tr.find_all("td", recursive=True)):
                record[fields[i]] = td.text
            if record:
                if category:
                    record["category"] = category
                table_data.append(record)
        if header_tag is not None:
            header = table.find_previous_sibling(header_tag)
            table_name = header.text
        else:
            table_name = f"table{i}"
        tables[table_name] = table_data
    return tables

mediawiki_site

Created on 09.03.2025

@author: wf

MediaWikiSite

a MediaWikiSite and its current state

Source code in frontend/mediawiki_site.py
class MediaWikiSite:
    """
    a MediaWikiSite and its current state
    """

    def __init__(
        self,
        wiki_user: WikiUser,
        row_index: int = 0,
        debug: bool = False,
        show_html: bool = False,
    ):
        """
        constructor
        """
        self.row_no = row_index + 1
        self.wiki_user = wiki_user
        self.wiki_url = self.wiki_user.url
        self.debug = debug
        self.show_html = show_html
        self.wiki_backup = WikiBackup(wiki_user)
        self._wiki_client = None
        self.task_runner = TaskRunner()

    @property
    def wiki_client(self) -> WikiClient:
        if not self._wiki_client:
            client = WikiClient.ofWikiUser(self.wiki_user)
            if client.needs_login:
                client.login()
            self._wiki_client = client
        return self._wiki_client

    def as_dict(self):
        url = f"{self.wiki_user.url}{self.wiki_user.scriptPath}"
        link = Link.create(url=url, text=self.wiki_user.wikiId, target="_blank")

        record = {
            "#": self.row_no,
            "wiki": link,
            "version": self.wiki_user.version,
            "pages": "",
            "backup": "✅" if self.wiki_backup.exists() else "❌",
            "git": "✅" if self.wiki_backup.hasGit() else "❌",
            "age": "",
            "login": "",
        }
        return record

    def get_software_version_map(
        self, tables: Dict[str, List[Dict[str, Any]]]
    ) -> Optional[Dict[str, Dict[str, Any]]]:
        """
        Extract software map from the Special:Version tables.

        Args:
            tables (Dict[str, List[Dict[str, Any]]]): Dictionary of tables with their headers as keys.

        Returns:
            Optional[Dict[str, Dict[str, Any]]]: A dictionary mapping software names to their details,
                                                or None if the "Installed software" table is not found.
        """
        if "Installed software" in tables:
            software = tables["Installed software"]
            software_map, _dup = LOD.getLookup(
                software, "Product", withDuplicates=False
            )
            return software_map
        return None

    def check_version(self) -> str:
        """
        Check the MediaWiki version of the site.

        Returns:
            str: The MediaWiki version string, or an error message if the check fails.
        """
        client = self.wiki_client
        site_info = client.get_site_info()
        generator = site_info.get("generator")
        version = generator.replace("MediaWiki ", "")
        return version

    def check_version_via_url(self) -> str:
        """
        Check the MediaWiki version of the site.

        Returns:
            str: The MediaWiki version string, or an error message if the check fails.
        """
        url = self.wiki_url
        if not "index.php" in self.wiki_url:
            url = f"{url}/index.php"
        version_url = f"{self.wiki_url}?title=Special:Version"
        mw_version = "?"
        try:
            html_tables = HtmlTables(
                version_url, debug=self.debug, showHtml=self.show_html
            )
            tables = html_tables.get_tables("h2")
            software_map = self.get_software_version_map(tables)
            if software_map and "MediaWiki" in software_map:
                mw_version = software_map["MediaWiki"]["Version"]
        except Exception as ex:
            mw_version = f"error: {str(ex)}"
        return mw_version
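
get_software_version_map only inspects the tables dictionary it receives, so its behaviour can be sketched with a hand-built input (the values are invented and site is assumed to be an already constructed MediaWikiSite instance):

tables = {
    "Installed software": [
        {"Product": "MediaWiki", "Version": "1.39.1"},
        {"Product": "PHP", "Version": "8.1.2 (fpm-fcgi)"},
    ]
}
software_map = site.get_software_version_map(tables)
print(software_map["MediaWiki"]["Version"])  # 1.39.1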

__init__(wiki_user, row_index=0, debug=False, show_html=False)

constructor

Source code in frontend/mediawiki_site.py
def __init__(
    self,
    wiki_user: WikiUser,
    row_index: int = 0,
    debug: bool = False,
    show_html: bool = False,
):
    """
    constructor
    """
    self.row_no = row_index + 1
    self.wiki_user = wiki_user
    self.wiki_url = self.wiki_user.url
    self.debug = debug
    self.show_html = show_html
    self.wiki_backup = WikiBackup(wiki_user)
    self._wiki_client = None
    self.task_runner = TaskRunner()

check_version()

Check the MediaWiki version of the site.

Returns:

Name Type Description
str str

The MediaWiki version string, or an error message if the check fails.

Source code in frontend/mediawiki_site.py
def check_version(self) -> str:
    """
    Check the MediaWiki version of the site.

    Returns:
        str: The MediaWiki version string, or an error message if the check fails.
    """
    client = self.wiki_client
    site_info = client.get_site_info()
    generator = site_info.get("generator")
    version = generator.replace("MediaWiki ", "")
    return version

check_version_via_url()

Check the MediaWiki version of the site.

Returns:

Name Type Description
str str

The MediaWiki version string, or an error message if the check fails.

Source code in frontend/mediawiki_site.py
def check_version_via_url(self) -> str:
    """
    Check the MediaWiki version of the site.

    Returns:
        str: The MediaWiki version string, or an error message if the check fails.
    """
    url = self.wiki_url
    if not "index.php" in self.wiki_url:
        url = f"{url}/index.php"
    version_url = f"{self.wiki_url}?title=Special:Version"
    mw_version = "?"
    try:
        html_tables = HtmlTables(
            version_url, debug=self.debug, showHtml=self.show_html
        )
        tables = html_tables.get_tables("h2")
        software_map = self.get_software_version_map(tables)
        if software_map and "MediaWiki" in software_map:
            mw_version = software_map["MediaWiki"]["Version"]
    except Exception as ex:
        mw_version = f"error: {str(ex)}"
    return mw_version

get_software_version_map(tables)

Extract software map from the Special:Version tables.

Parameters:

Name Type Description Default
tables Dict[str, List[Dict[str, Any]]]

Dictionary of tables with their headers as keys.

required

Returns:

Type Description
Optional[Dict[str, Dict[str, Any]]]

Optional[Dict[str, Dict[str, Any]]]: A dictionary mapping software names to their details, or None if the "Installed software" table is not found.

Source code in frontend/mediawiki_site.py
def get_software_version_map(
    self, tables: Dict[str, List[Dict[str, Any]]]
) -> Optional[Dict[str, Dict[str, Any]]]:
    """
    Extract software map from the Special:Version tables.

    Args:
        tables (Dict[str, List[Dict[str, Any]]]): Dictionary of tables with their headers as keys.

    Returns:
        Optional[Dict[str, Dict[str, Any]]]: A dictionary mapping software names to their details,
                                            or None if the "Installed software" table is not found.
    """
    if "Installed software" in tables:
        software = tables["Installed software"]
        software_map, _dup = LOD.getLookup(
            software, "Product", withDuplicates=False
        )
        return software_map
    return None

servers_view

Created on 2025-07-23

@author: wf

ServerView

Bases: NodeView

A class responsible for displaying details of a Server

Source code in frontend/servers_view.py
class ServerView(NodeView):
    """
    A class responsible for displaying details of a Server
    """

    def setup_ui(self):
        """Setup UI with code display."""
        try:
            if self.node_data:
                self.server = self.node_data.get("_instance")
                html_markup = self.server.as_html()
                self.html = ui.html(html_markup)
                pass
                # Add probe button
                ui.button("Probe Remote", on_click=self.probe)
            super().setup_ui()
        except Exception as ex:
            self.solution.handle_exception(ex)

    async def probe(self):
        """Probe remote server and update display."""
        try:
            self.server.probe_remote()
            html_markup = self.server.as_html()
            self.html.content = html_markup
        except Exception as ex:
            self.solution.handle_exception(ex)

probe() async

Probe remote server and update display.

Source code in frontend/servers_view.py
async def probe(self):
    """Probe remote server and update display."""
    try:
        self.server.probe_remote()
        html_markup = self.server.as_html()
        self.html.content = html_markup
    except Exception as ex:
        self.solution.handle_exception(ex)

setup_ui()

Setup UI with code display.

Source code in frontend/servers_view.py
def setup_ui(self):
    """Setup UI with code display."""
    try:
        if self.node_data:
            self.server = self.node_data.get("_instance")
            html_markup = self.server.as_html()
            self.html = ui.html(html_markup)
            pass
            # Add probe button
            ui.button("Probe Remote", on_click=self.probe)
        super().setup_ui()
    except Exception as ex:
        self.solution.handle_exception(ex)

ServersView

Display servers

Source code in frontend/servers_view.py
class ServersView:
    """
    Display servers
    """

    def __init__(self, solution, servers: Servers):
        self.solution = solution
        self.servers = servers

    @classmethod
    def add_to_graph(cls, servers, graph: MogwaiGraph, with_progress: bool):
        """
        add my servers to the graph
        """
        items = servers.servers.items()
        iterator = (
            tqdm(items, desc="Adding servers to graph") if with_progress else items
        )
        for name, server in iterator:
            props = {
                "hostname": server.hostname,
                "platform": server.platform,
                "_instance": server,
            }
            _node = graph.add_labeled_node("Server", name=name, properties=props)
        return graph

add_to_graph(servers, graph, with_progress) classmethod

add my servers to the graph

Source code in frontend/servers_view.py
@classmethod
def add_to_graph(cls, servers, graph: MogwaiGraph, with_progress: bool):
    """
    add my servers to the graph
    """
    items = servers.servers.items()
    iterator = (
        tqdm(items, desc="Adding servers to graph") if with_progress else items
    )
    for name, server in iterator:
        props = {
            "hostname": server.hostname,
            "platform": server.platform,
            "_instance": server,
        }
        _node = graph.add_labeled_node("Server", name=name, properties=props)
    return graph

version

Created on 2022-12-03

@author: wf

Version dataclass

Bases: object

Version handling for pyWikiCMS

Source code in frontend/version.py
@dataclass
class Version(object):
    """
    Version handling for pyWikiCMS
    """

    name = "pyWikiCMS"
    description = "pyWikiCMS: python implementation of a Mediawiki based Content Management System"
    version = frontend.__version__
    date = "2022-11-16"
    updated = "2025-06-16"
    authors = "Wolfgang Fahl"
    doc_url = "http://wiki.bitplan.com/index.php/PyWikiCMS"
    chat_url = "https://github.com/BITPlan/pyWikiCMS/discussions"
    cm_url = "https://github.com/BITPlan/pyWikiCMS"
    license = f"""Copyright 2022-2025 contributors. All rights reserved.
  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0
  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied."""
    longDescription = f"""{name} version {version}
{description}
  Created by {authors} on {date} last updated {updated}"""

webscrape

Created on 2020-08-20

@author: wf

WebScrape

Bases: object

WebScraper

Source code in frontend/webscrape.py
class WebScrape(object):
    """
    WebScraper
    """

    def __init__(self, debug=False, showHtml=False):
        """
        Constructor
        """
        self.err = None
        self.valid = False
        self.debug = debug
        self.showHtml = showHtml

    def getSoup(self, url, showHtml: bool):
        """
        get the beautiful Soup parser

        Args:
           showHtml(bool): True if the html code should be pretty printed and shown
        """
        req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
        html = urlopen(req).read()
        soup = BeautifulSoup(html, "html.parser", from_encoding="utf-8")
        if showHtml:
            self.printPrettyHtml(soup)

        return soup

    def printPrettyHtml(self, soup):
        """
        print the prettified html for the given soup

        Args:
            soup(BeautifulSoup): the parsed html to print
        """
        prettyHtml = soup.prettify()
        print(prettyHtml)
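
A quick sketch fetching and inspecting a page (the URL is an example):

from frontend.webscrape import WebScrape

scraper = WebScrape()
soup = scraper.getSoup("https://example.com", showHtml=False)
print(soup.title.text if soup.title else "no title")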

__init__(debug=False, showHtml=False)

Constructor

Source code in frontend/webscrape.py
def __init__(self, debug=False, showHtml=False):
    """
    Constructor
    """
    self.err = None
    self.valid = False
    self.debug = debug
    self.showHtml = showHtml

getSoup(url, showHtml)

get the beautiful Soup parser

Parameters:

Name Type Description Default
showHtml(bool)

True if the html code should be pretty printed and shown

required
Source code in frontend/webscrape.py
def getSoup(self, url, showHtml: bool):
    """
    get the beautiful Soup parser

    Args:
       showHtml(bool): True if the html code should be pretty printed and shown
    """
    req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
    html = urlopen(req).read()
    soup = BeautifulSoup(html, "html.parser", from_encoding="utf-8")
    if showHtml:
        self.printPrettyHtml(soup)

    return soup

printPrettyHtml(soup)

print the prettified html for the given soup

Parameters:

soup (BeautifulSoup): the parsed html to print (required)
Source code in frontend/webscrape.py, lines 41-49
def printPrettyHtml(self, soup):
    """
    print the prettified html for the given soup

    Args:
        soup(BeautifulSoup): the parsed html to print
    """
    prettyHtml = soup.prettify()
    print(prettyHtml)
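
A minimal usage sketch for WebScrape; it needs network access and assumes the frontend package is importable. The URL is only an example taken from the project documentation link:

from frontend.webscrape import WebScrape

scraper = WebScrape(debug=True)
# fetch and parse the page; set showHtml=True to pretty-print the parsed HTML
soup = scraper.getSoup("http://wiki.bitplan.com/index.php/PyWikiCMS", showHtml=False)
print(soup.title)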

webserver

Created on 2020-12-30

@author: wf

CmsSolution

Bases: GraphNavigatorSolution

Content management solution

Source code in frontend/webserver.py, lines 139-222
class CmsSolution(GraphNavigatorSolution):
    """
    Content management solution
    """

    def __init__(self, webserver: CmsWebServer, client: Client):
        """
        Initialize the solution

        Calls the constructor of the base solution
        Args:
            webserver (CmsWebServer): The webserver instance associated with this context.
            client (Client): The client instance this context is associated with.
        """
        super().__init__(webserver, client)  # Call to the superclass constructor
        self.wiki_grid = WikiGrid(self)
        self.servers = webserver.servers
        self.server = webserver.server
        self.hostname = webserver.hostname
        self.servers_view = ServersView(self, self.servers)

    def configure_menu(self):
        """
        configure my menu
        """
        InputWebSolution.configure_menu(self)
        self.login = self.webserver.login
        self.sso_solution = SsoSolution(webserver=self.webserver)
        self.sso_solution.configure_menu()
        # icons from https://fonts.google.com/icons
        if self.webserver.authenticated():
            self.link_button(name="wikis", icon_name="menu_book", target="/wikis")
            self.link_button(name="servers", icon_name="cloud", target="/servers")

    async def home(self):
        """
        provide the main content page
        """

        def show():
            with self.content_div:
                ui.label(f"Welcome to {self.hostname}")
                if self.server:
                    html_markup = self.server.as_html()
                    ui.html(html_markup)
                pass

        await self.setup_content_div(show)

    async def show_wikis(self):
        def show():
            with self.content_div:
                self.wiki_grid.setup()

        await self.setup_content_div(show)

    async def show_nodes(self, node_type: str):
        """
        show nodes of the given type

        Args:
            node_type(str): the type of nodes to show
        """

        def show():
            try:
                config = NodeViewConfig(
                    solution=self,
                    graph=self.graph,
                    schema=self.schema,
                    node_type=node_type,
                )
                if not config.node_type_config:
                    ui.label(f"invalid_node_type: {node_type}")
                    return
                node_table_view = NodeTableView(config=config)
                node_table_view.setup_ui()
            except Exception as ex:
                self.handle_exception(ex)

        await self.setup_content_div(show)

    async def show_servers(self):
        await self.show_nodes("Server")

__init__(webserver, client)

Initialize the solution

Calls the constructor of the base solution.

Parameters:

webserver (CmsWebServer): The webserver instance associated with this context.
client (Client): The client instance this context is associated with.

Source code in frontend/webserver.py, lines 144-158
def __init__(self, webserver: CmsWebServer, client: Client):
    """
    Initialize the solution

    Calls the constructor of the base solution
    Args:
        webserver (CmsWebServer): The webserver instance associated with this context.
        client (Client): The client instance this context is associated with.
    """
    super().__init__(webserver, client)  # Call to the superclass constructor
    self.wiki_grid = WikiGrid(self)
    self.servers = webserver.servers
    self.server = webserver.server
    self.hostname = webserver.hostname
    self.servers_view = ServersView(self, self.servers)

configure_menu()

configure my menu

Source code in frontend/webserver.py, lines 160-171
def configure_menu(self):
    """
    configure my menu
    """
    InputWebSolution.configure_menu(self)
    self.login = self.webserver.login
    self.sso_solution = SsoSolution(webserver=self.webserver)
    self.sso_solution.configure_menu()
    # icons from https://fonts.google.com/icons
    if self.webserver.authenticated():
        self.link_button(name="wikis", icon_name="menu_book", target="/wikis")
        self.link_button(name="servers", icon_name="cloud", target="/servers")

home() async

provide the main content page

Source code in frontend/webserver.py, lines 173-186
async def home(self):
    """
    provide the main content page
    """

    def show():
        with self.content_div:
            ui.label(f"Welcome to {self.hostname}")
            if self.server:
                html_markup = self.server.as_html()
                ui.html(html_markup)
            pass

    await self.setup_content_div(show)

show_nodes(node_type) async

show nodes of the given type

Parameters:

node_type (str): the type of nodes to show (required)
Source code in frontend/webserver.py, lines 195-219
async def show_nodes(self, node_type: str):
    """
    show nodes of the given type

    Args:
        node_type(str): the type of nodes to show
    """

    def show():
        try:
            config = NodeViewConfig(
                solution=self,
                graph=self.graph,
                schema=self.schema,
                node_type=node_type,
            )
            if not config.node_type_config:
                ui.label(f"invalid_node_type: {node_type}")
                return
            node_table_view = NodeTableView(config=config)
            node_table_view.setup_ui()
        except Exception as ex:
            self.handle_exception(ex)

    await self.setup_content_div(show)

CmsWebServer

Bases: GraphNavigatorWebserver

WebServer class that manages the servers

Source code in frontend/webserver.py, lines 29-136
class CmsWebServer(GraphNavigatorWebserver):
    """
    WebServer class that manages the servers

    """

    @classmethod
    def get_config(cls) -> WebserverConfig:
        copy_right = "(c)2023-2025 Wolfgang Fahl"
        config = WebserverConfig(
            copy_right=copy_right,
            version=Version(),
            default_port=8252,
            timeout=10.0,
            short_name="wikicms",
        )
        server_config = WebserverConfig.get(config)
        server_config.solution_class = CmsSolution
        return server_config

    def authenticated(self) -> bool:
        """
        check authentication
        """
        allow = self.login.authenticated()
        if self.server:
            allow = allow or self.server.auto_login
        return allow

    def __init__(self):
        """
        constructor
        """
        GraphNavigatorWebserver.__init__(self, config=CmsWebServer.get_config())
        self.servers = Servers.of_config_path()
        self.wiki_frontends = WikiFrontends(self.servers)
        self.users = Sso_Users(self.config.short_name)
        self.login = Login(self, self.users)
        self.hostname = socket.gethostname()
        self.server = self.servers.servers.get(self.hostname)
        if self.server:
            self.server.probe_local()

        @ui.page("/servers")
        async def show_servers(client: Client):
            if not self.authenticated():
                return RedirectResponse("/login")
            return await self.page(client, CmsSolution.show_servers)

        @ui.page("/wikis")
        async def show_wikis(client: Client):
            if not self.authenticated():
                return RedirectResponse("/login")
            return await self.page(client, CmsSolution.show_wikis)

        @ui.page("/login")
        async def login(client: Client) -> None:
            return await self.page(client, CmsSolution.show_login)

        @app.get("/{frontend_name}/{page_path:path}")
        def render_path(frontend_name: str, page_path: str) -> HTMLResponse:
            """
            Handles a GET request to render the path of the given frontend.

            Args:
                frontend_name: The name of the frontend to be rendered.
                page_path: The specific path within the frontend to be rendered.

            Returns:
                An HTMLResponse containing the rendered page content.

            """
            return self.render_path(frontend_name, page_path)

    def render_path(self, frontend_name: str, page_path: str):
        """
        Renders the content for a specific path of the given frontend.

        Args:
            frontend_name: The name of the frontend to be rendered.
            page_path: The specific path within the frontend to be rendered.

        Returns:
            An HTMLResponse containing the rendered page content or an error page if something goes wrong.

        Raises:
            SomeException: If an error occurs during page content retrieval or rendering.

        """
        wiki_frontend = self.wiki_frontends.wiki_frontends.get(frontend_name, None)
        if wiki_frontend is None:
            raise HTTPException(
                status_code=404, detail=f"frontend {frontend_name} is not available"
            )
        response = wiki_frontend.get_path_response(f"/{page_path}")
        return response

    def configure_run(self):
        """
        configure command line specific details
        """
        super().configure_run()
        self.wiki_frontends.enableSites(self.args.sites)
        module_path = os.path.dirname(os.path.abspath(__file__))
        yaml_path = os.path.join(module_path, "resources", "schema.yaml")
        self.load_schema(yaml_path)
        ServersView.add_to_graph(self.servers, self.graph, with_progress=True)
        pass

__init__()

constructor

Source code in frontend/webserver.py, lines 58-101
def __init__(self):
    """
    constructor
    """
    GraphNavigatorWebserver.__init__(self, config=CmsWebServer.get_config())
    self.servers = Servers.of_config_path()
    self.wiki_frontends = WikiFrontends(self.servers)
    self.users = Sso_Users(self.config.short_name)
    self.login = Login(self, self.users)
    self.hostname = socket.gethostname()
    self.server = self.servers.servers.get(self.hostname)
    if self.server:
        self.server.probe_local()

    @ui.page("/servers")
    async def show_servers(client: Client):
        if not self.authenticated():
            return RedirectResponse("/login")
        return await self.page(client, CmsSolution.show_servers)

    @ui.page("/wikis")
    async def show_wikis(client: Client):
        if not self.authenticated():
            return RedirectResponse("/login")
        return await self.page(client, CmsSolution.show_wikis)

    @ui.page("/login")
    async def login(client: Client) -> None:
        return await self.page(client, CmsSolution.show_login)

    @app.get("/{frontend_name}/{page_path:path}")
    def render_path(frontend_name: str, page_path: str) -> HTMLResponse:
        """
        Handles a GET request to render the path of the given frontend.

        Args:
            frontend_name: The name of the frontend to be rendered.
            page_path: The specific path within the frontend to be rendered.

        Returns:
            An HTMLResponse containing the rendered page content.

        """
        return self.render_path(frontend_name, page_path)

authenticated()

check authentication

Source code in frontend/webserver.py, lines 49-56
def authenticated(self) -> bool:
    """
    check authentication
    """
    allow = self.login.authenticated()
    if self.server:
        allow = allow or self.server.auto_login
    return allow

configure_run()

configure command line specific details

Source code in frontend/webserver.py, lines 126-136
def configure_run(self):
    """
    configure command line specific details
    """
    super().configure_run()
    self.wiki_frontends.enableSites(self.args.sites)
    module_path = os.path.dirname(os.path.abspath(__file__))
    yaml_path = os.path.join(module_path, "resources", "schema.yaml")
    self.load_schema(yaml_path)
    ServersView.add_to_graph(self.servers, self.graph, with_progress=True)
    pass

render_path(frontend_name, page_path)

Renders the content for a specific path of the given frontend.

Parameters:

frontend_name (str): The name of the frontend to be rendered. (required)
page_path (str): The specific path within the frontend to be rendered. (required)

Returns:

An HTMLResponse containing the rendered page content or an error page if something goes wrong.

Raises:

SomeException: If an error occurs during page content retrieval or rendering.

Source code in frontend/webserver.py, lines 103-124
def render_path(self, frontend_name: str, page_path: str):
    """
    Renders the content for a specific path of the given frontend.

    Args:
        frontend_name: The name of the frontend to be rendered.
        page_path: The specific path within the frontend to be rendered.

    Returns:
        An HTMLResponse containing the rendered page content or an error page if something goes wrong.

    Raises:
        SomeException: If an error occurs during page content retrieval or rendering.

    """
    wiki_frontend = self.wiki_frontends.wiki_frontends.get(frontend_name, None)
    if wiki_frontend is None:
        raise HTTPException(
            status_code=404, detail=f"frontend {frontend_name} is not available"
        )
    response = wiki_frontend.get_path_response(f"/{page_path}")
    return response
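
The /{frontend_name}/{page_path:path} route registered in the CmsWebServer constructor relies on FastAPI's :path converter, which lets page_path capture the remaining path segments including slashes. A standalone sketch of the same routing idea (not the pyWikiCMS code itself):

from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()

@app.get("/{frontend_name}/{page_path:path}")
def render_path(frontend_name: str, page_path: str) -> HTMLResponse:
    # page_path may contain slashes, e.g. "index.php/Main_Page"
    return HTMLResponse(f"<p>frontend={frontend_name} path=/{page_path}</p>")

# run with e.g.: uvicorn demo:app --reload (module name "demo" is just an example)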

wikicms

Created on 2020-07-27

@author: wf

WikiFrontend

Bases: object

Wiki Content Management System Frontend

Source code in frontend/wikicms.py, lines 23-420
class WikiFrontend(object):
    """
    Wiki Content Management System Frontend
    """

    with_login: bool = True

    def __init__(
        self,
        frontend: FrontendSite,
        parser: str = "lxml",
        proxy_prefixes=["/images/", "/videos"],
        debug: bool = False,
        filterKeys=None,
    ):
        """
        Constructor
        Args:
            frontend(FrontendSite): the frontend
            parser(str): the beautiful soup parser to use e.g. html.parser
            proxy_prefixes(list): the list of prefixes that need direct proxy access
            debug: (bool): True if debugging should be on
            filterKeys: (list): a list of keys for filters to be applied e.g. editsection
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.parser = parser
        self.proxy_prefixes = proxy_prefixes
        self.frontend = frontend
        self.name = self.frontend.name
        self.debug = debug
        self.wiki = None
        if filterKeys is None:
            self.filterKeys = ["editsection", "parser-output", "parser-output"]
        else:
            self.filterKeys = []

    def log(self, msg: str):
        """
        log the given message if debugging is true

        Args:
            msg (str): the message to log
        """
        if self.debug:
            print(msg, flush=True)

    @staticmethod
    def extract_site_and_path(path: str):
        """
        Splits the given path into the site component and the remaining path.

        This static method assumes that the 'site' is the first element of the
        path when split by "/", and the 'path' is the rest of the string after
        the site.

        Parameters:
        path (str): The complete path to split.

        Returns:
        tuple: A tuple where the first element is the site and the second
               element is the subsequent path.
        """
        # Check if the path is empty or does not contain a "/"
        if not path or "/" not in path:
            return "", path

        # Split the path into parts using the "/" as a separator
        parts = path.split("/")

        # The first part is the site, the rest is joined back into a path
        site = parts[0]
        remaining_path = "/" + "/".join(parts[1:])

        return site, remaining_path

    def open(self):
        """
        open the frontend

        """
        if self.wiki is None:
            self.wiki = WikiClient.ofWikiId(self.frontend.wikiId)
            if WikiFrontend.with_login:
                self.wiki.login()
            self.smwclient = SMWClient(self.wiki.getSite())
            self.cms_pages = self.get_cms_pages()
            self.frontend.enabled = True

    def get_cms_pages(self) -> dict:
        """
        get the Content Management elements for this site
        """
        cms_pages = {}
        ask_query = "[[Category:CMS]]"
        page_records = self.smwclient.query(ask_query, "cms pages")
        for page_title in list(page_records):
            page_title, html, error = self.getContent(page_title)
            if not error:
                cms_pages[page_title] = html
            else:
                self.logger.warn(error)
        return cms_pages

    def errMsg(self, ex):
        if self.debug:
            msg = "%s\n%s" % (repr(ex), traceback.format_exc())
        else:
            msg = repr(ex)
        return msg

    def wikiPage(self, pagePath: str) -> str:
        """
        Get the wiki page for the given page path.

        Args:
            pagePath (str): The path of the page.

        Returns:
            str: The title of the page.
        """
        if "/index.php/" in pagePath:
            wikipage = pagePath.replace("/index.php/", "")
        elif pagePath.startswith("/"):
            wikipage = pagePath[1:]
        else:
            wikipage = pagePath
        return wikipage

    def checkPath(self, pagePath: str) -> str:
        """
        check the given pagePath

        Args:
            pagePath (str): the page Path to check

        Returns:
            str: None or an error message with the illegal chars being used
        """
        error = None
        self.log(pagePath)
        illegalChars = ["{", "}", "<", ">", "[", "]", "|"]
        for illegalChar in illegalChars:
            if illegalChar in pagePath:
                error = "invalid char %s in given pagePath " % (illegalChar)
        return error

    def needsProxy(self, path: str) -> bool:
        """
        Args:
            path (str): the path to check

        Returns:
            bool: True if this path needs to be proxied
        """
        needs_proxy = False
        for prefix in self.proxy_prefixes:
            needs_proxy = needs_proxy or path.startswith(prefix)
        return needs_proxy

    def proxy(self, path: str) -> str:
        """
        Proxy a request.
        See https://stackoverflow.com/a/50231825/1497139

        Args:
            path (str): the path to proxy

        Returns:
            the proxied result as a string
        """
        wikiUser = self.wiki.wikiUser
        url = f"{wikiUser.url}{wikiUser.scriptPath}{path}"

        # Get the response
        response = requests.get(url)

        return response

    def filter(self, html: str) -> str:
        """
        filter the given html
        """
        return self.doFilter(html, self.filterKeys)

    def fixNode(self, node, attribute, prefix, delim=None):
        """
        fix the given node

        node (BeautifulSoup): the node
        attribute (str): the name of the attribute e.g. "href", "src"
        prefix (str): the prefix to replace e.g. "/", "/images", "/thumbs"
        delim (str): if not None the delimiter for multiple values
        """
        siteprefix = f"/{self.frontend.name}{prefix}"
        if attribute in node.attrs:
            attrval = node.attrs[attribute]
            if delim is not None:
                vals = attrval.split(delim)
            else:
                vals = [attrval]
                delim = ""
            newvals = []
            for val in vals:
                if val.startswith(prefix):
                    newvals.append(val.replace(prefix, siteprefix, 1))
                else:
                    newvals.append(val)
            if delim is not None:
                node.attrs[attribute] = delim.join(newvals)

    def fix_images_and_videos(self, soup):
        """
        fix image and video entries in the source code
        """
        for img in soup.findAll("img"):
            self.fixNode(img, "src", "/")
            self.fixNode(img, "srcset", "/", ", ")
        for video in soup.findAll("video"):
            for source in video.findAll("source"):
                self.fixNode(source, "src", "/")

    def fixHtml(self, soup):
        """
        fix the HTML in the given soup

        Args:
            soup(BeautifulSoup): the html parser
        """
        self.fix_images_and_videos(soup)
        # fix absolute hrefs
        for a in soup.findAll("a"):
            self.fixNode(a, "href", "/")
        return soup

    def unwrap(self, soup) -> str:
        """
        unwrap the soup
        """
        html = str(soup)
        html = html.replace("<html><body>", "")
        html = html.replace("</body></html>", "")
        # Remove  empty paragraphs
        html = re.sub(r'<p class="mw-empty-elt">\s*</p>', "", html)

        # Replace multiple newline characters with a single newline character
        html = re.sub(r"\n\s*\n", "\n", html)
        return html

    def doFilter(self, html, filterKeys):
        # https://stackoverflow.com/questions/5598524/can-i-remove-script-tags-with-beautifulsoup
        soup = BeautifulSoup(html, self.parser)
        if "parser-output" in filterKeys:
            parserdiv = soup.find("div", {"class": "mw-parser-output"})
            if parserdiv:
                soup = parserdiv
                inner_html = parserdiv.decode_contents()
                # Parse the inner HTML string to create a new BeautifulSoup object
                soup = BeautifulSoup(inner_html, self.parser)
                pass
        # https://stackoverflow.com/questions/5041008/how-to-find-elements-by-class
        if "editsection" in filterKeys:
            for s in soup.select("span.mw-editsection"):
                s.extract()
        for comments in soup.findAll(text=lambda text: isinstance(text, Comment)):
            comments.extract()
        return soup

    def getContent(self, pagePath: str):
        """get the content for the given pagePath
        Args:
            pagePath(str): the pagePath
            whatToFilter(list): list of filter keys
        Returns:
            str: the HTML content for the given path
        """
        content = None
        error = None
        pageTitle = "?"
        try:
            if pagePath == "/":
                pageTitle = self.frontend.defaultPage
            else:
                error = self.checkPath(pagePath)
                pageTitle = self.wikiPage(pagePath)
            if error is None:
                if self.wiki is None:
                    raise Exception(
                        "getContent without wiki - you might want to call open first"
                    )
                content = self.wiki.getHtml(pageTitle)
                soup = self.filter(content)
                soup = self.fixHtml(soup)
                content = self.unwrap(soup)
        except Exception as e:
            error = self.errMsg(e)
        return pageTitle, content, error

    def wrapWithReveal(self, html: str):
        """
        wrap html content with reveal.js structure and dependencies
        """
        wrapped_html = f"""<!DOCTYPE html>
<html>
<head>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/reveal.js@4.3.1/dist/reveal.min.css">
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/reveal.js@4.3.1/dist/theme/white.css">
</head>
<body>
    <div class="reveal">
        <div class="slides">
{html}
        </div>
    </div>
    <script src="https://cdn.jsdelivr.net/npm/reveal.js@4.3.1/dist/reveal.js"></script>
    <script>Reveal.initialize({{
    }});</script>
</body>
</html>"""
        return wrapped_html

    def toReveal(self, html: str):
        """
        convert the given html to reveal
        see https://revealjs.com/
        """
        soup = BeautifulSoup(html, "lxml")
        for h2 in soup.findChildren(recursive=True):
            if h2.name == "h2":
                span = h2.next_element
                if span.name == "span":
                    tagid = span.get("id")
                    if tagid.startswith("⌘⌘"):
                        section = soup.new_tag("section")
                        h2.parent.append(section)
                        section.insert(0, h2)
                        tag = h2.next_element
                        while tag is not None and tag.name != "h2":
                            if tag.parent != h2:
                                section.append(tag)
                            tag = tag.next_element
        html = self.unwrap(soup)
        return html

    def get_frame(self, page_title: str) -> str:
        """
        get the frame property for the given page_title
        """
        frame = None
        markup = self.wiki.get_wiki_markup(page_title)
        # {{#set:frame=reveal}}
        # {{UseFrame|Contact.rythm|
        patterns = [
            r"{{#set:frame=([^}]+)}}",  # {{#set:frame=reveal}}
            r"{{UseFrame\|([^.]+)",  # {{UseFrame|Contact.rythm|
        ]

        for pattern in patterns:
            match = re.search(pattern, markup)
            if match:
                frame = match.group(1)
        return frame

    def get_path_response(self, path: str) -> str:
        """
        get the response for the given path

        Args:
            path(str): the path to render the content for

        Returns:
            Response: a FastAPI response
        """
        if self.needsProxy(path):
            html_response = self.proxy(path)
            # Create a FastAPI response object
            response = Response(
                content=html_response.content,
                status_code=html_response.status_code,
                headers=dict(html_response.headers),
            )
        else:
            page_title, content, error = self.getContent(path)
            html_frame = HtmlFrame(self, title=page_title)
            html = content
            framed_html = None
            if error:
                html = f"error getting {page_title} for {self.name}:<br>{error}"
            else:
                if "<slideshow" in html or "&lt;slideshow" in html:
                    content = self.toReveal(content)
                    # Complete reveal.js webpage
                    framed_html = self.wrapWithReveal(html)
                    html = content

            if not framed_html:
                framed_html = html_frame.frame(html)
            response = HTMLResponse(framed_html)
        return response

__init__(frontend, parser='lxml', proxy_prefixes=['/images/', '/videos'], debug=False, filterKeys=None)

Constructor.

Parameters:

frontend (FrontendSite): the frontend
parser (str): the BeautifulSoup parser to use, e.g. html.parser
proxy_prefixes (list): the list of prefixes that need direct proxy access
debug (bool): True if debugging should be on
filterKeys (list): a list of keys for filters to be applied, e.g. editsection

Source code in frontend/wikicms.py, lines 30-57
def __init__(
    self,
    frontend: FrontendSite,
    parser: str = "lxml",
    proxy_prefixes=["/images/", "/videos"],
    debug: bool = False,
    filterKeys=None,
):
    """
    Constructor
    Args:
        frontend(FrontendSite): the frontend
        parser(str): the beautiful soup parser to use e.g. html.parser
        proxy_prefixes(list): the list of prefixes that need direct proxy access
        debug: (bool): True if debugging should be on
        filterKeys: (list): a list of keys for filters to be applied e.g. editsection
    """
    self.logger = logging.getLogger(self.__class__.__name__)
    self.parser = parser
    self.proxy_prefixes = proxy_prefixes
    self.frontend = frontend
    self.name = self.frontend.name
    self.debug = debug
    self.wiki = None
    if filterKeys is None:
        self.filterKeys = ["editsection", "parser-output", "parser-output"]
    else:
        self.filterKeys = []

checkPath(pagePath)

check the given pagePath

Parameters:

pagePath (str): the page path to check (required)

Returns:

str: None or an error message with the illegal chars being used

Source code in frontend/wikicms.py, lines 151-167
def checkPath(self, pagePath: str) -> str:
    """
    check the given pagePath

    Args:
        pagePath (str): the page Path to check

    Returns:
        str: None or an error message with the illegal chars being used
    """
    error = None
    self.log(pagePath)
    illegalChars = ["{", "}", "<", ">", "[", "]", "|"]
    for illegalChar in illegalChars:
        if illegalChar in pagePath:
            error = "invalid char %s in given pagePath " % (illegalChar)
    return error
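
The check boils down to scanning the path for characters that are unsafe in a MediaWiki title; an illustrative standalone re-implementation of the same idea (not the method itself):

def check_path(page_path: str) -> str:
    """return None or an error message naming the illegal character"""
    error = None
    for illegal_char in ["{", "}", "<", ">", "[", "]", "|"]:
        if illegal_char in page_path:
            error = f"invalid char {illegal_char} in given pagePath"
    return error

assert check_path("/Main_Page") is None
assert check_path("/{{template}}") is not None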

extract_site_and_path(path) staticmethod

Splits the given path into the site component and the remaining path.

This static method assumes that the 'site' is the first element of the path when split by "/", and the 'path' is the rest of the string after the site.

Parameters:

path (str): The complete path to split.

Returns:

tuple: A tuple where the first element is the site and the second element is the subsequent path.

Source code in frontend/wikicms.py, lines 69-96
@staticmethod
def extract_site_and_path(path: str):
    """
    Splits the given path into the site component and the remaining path.

    This static method assumes that the 'site' is the first element of the
    path when split by "/", and the 'path' is the rest of the string after
    the site.

    Parameters:
    path (str): The complete path to split.

    Returns:
    tuple: A tuple where the first element is the site and the second
           element is the subsequent path.
    """
    # Check if the path is empty or does not contain a "/"
    if not path or "/" not in path:
        return "", path

    # Split the path into parts using the "/" as a separator
    parts = path.split("/")

    # The first part is the site, the rest is joined back into a path
    site = parts[0]
    remaining_path = "/" + "/".join(parts[1:])

    return site, remaining_path
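
Since extract_site_and_path is a staticmethod it can be called without constructing a WikiFrontend. A usage sketch, assuming the frontend package is importable; the site name "cms" is only an example:

from frontend.wikicms import WikiFrontend

site, path = WikiFrontend.extract_site_and_path("cms/index.php/Main_Page")
print(site)  # cms
print(path)  # /index.php/Main_Page

# a path without a "/" is returned unchanged with an empty site
print(WikiFrontend.extract_site_and_path("Main_Page"))  # ('', 'Main_Page')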

filter(html)

filter the given html

Source code in frontend/wikicms.py, lines 201-205
def filter(self, html: str) -> str:
    """
    filter the given html
    """
    return self.doFilter(html, self.filterKeys)

fixHtml(soup)

fix the HTML in the given soup

Parameters:

soup (BeautifulSoup): the html parser (required)
Source code in frontend/wikicms.py, lines 244-255
def fixHtml(self, soup):
    """
    fix the HTML in the given soup

    Args:
        soup(BeautifulSoup): the html parser
    """
    self.fix_images_and_videos(soup)
    # fix absolute hrefs
    for a in soup.findAll("a"):
        self.fixNode(a, "href", "/")
    return soup

fixNode(node, attribute, prefix, delim=None)

fix the given node

Parameters:

node (BeautifulSoup): the node
attribute (str): the name of the attribute, e.g. "href", "src"
prefix (str): the prefix to replace, e.g. "/", "/images", "/thumbs"
delim (str): if not None, the delimiter for multiple values

Source code in frontend/wikicms.py, lines 207-231
def fixNode(self, node, attribute, prefix, delim=None):
    """
    fix the given node

    node (BeautifulSoup): the node
    attribute (str): the name of the attribute e.g. "href", "src"
    prefix (str): the prefix to replace e.g. "/", "/images", "/thumbs"
    delim (str): if not None the delimiter for multiple values
    """
    siteprefix = f"/{self.frontend.name}{prefix}"
    if attribute in node.attrs:
        attrval = node.attrs[attribute]
        if delim is not None:
            vals = attrval.split(delim)
        else:
            vals = [attrval]
            delim = ""
        newvals = []
        for val in vals:
            if val.startswith(prefix):
                newvals.append(val.replace(prefix, siteprefix, 1))
            else:
                newvals.append(val)
        if delim is not None:
            node.attrs[attribute] = delim.join(newvals)

fix_images_and_videos(soup)

fix image and video entries in the source code

Source code in frontend/wikicms.py, lines 233-242
def fix_images_and_videos(self, soup):
    """
    fix image and video entries in the source code
    """
    for img in soup.findAll("img"):
        self.fixNode(img, "src", "/")
        self.fixNode(img, "srcset", "/", ", ")
    for video in soup.findAll("video"):
        for source in video.findAll("source"):
            self.fixNode(source, "src", "/")
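
The effect of fixNode and fix_images_and_videos is to prepend the frontend name to absolute media URLs. A standalone BeautifulSoup sketch of the same rewrite, using a hypothetical frontend named "cms":

from bs4 import BeautifulSoup

html = '<img src="/images/logo.png" srcset="/images/logo.png 1x, /thumbs/logo.png 2x">'
soup = BeautifulSoup(html, "html.parser")
prefix, siteprefix = "/", "/cms/"  # hypothetical frontend name "cms"
for img in soup.find_all("img"):
    # rewrite the src attribute
    if img["src"].startswith(prefix):
        img["src"] = img["src"].replace(prefix, siteprefix, 1)
    # rewrite every entry of the srcset list
    vals = img["srcset"].split(", ")
    img["srcset"] = ", ".join(
        v.replace(prefix, siteprefix, 1) if v.startswith(prefix) else v for v in vals
    )
print(soup)
# <img src="/cms/images/logo.png" srcset="/cms/images/logo.png 1x, /cms/thumbs/logo.png 2x"/>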

getContent(pagePath)

get the content for the given pagePath.

Parameters:

pagePath (str): the pagePath

Returns:

a (pageTitle, content, error) tuple with the HTML content for the given path

Source code in frontend/wikicms.py, lines 290-318
def getContent(self, pagePath: str):
    """get the content for the given pagePath
    Args:
        pagePath(str): the pagePath
        whatToFilter(list): list of filter keys
    Returns:
        str: the HTML content for the given path
    """
    content = None
    error = None
    pageTitle = "?"
    try:
        if pagePath == "/":
            pageTitle = self.frontend.defaultPage
        else:
            error = self.checkPath(pagePath)
            pageTitle = self.wikiPage(pagePath)
        if error is None:
            if self.wiki is None:
                raise Exception(
                    "getContent without wiki - you might want to call open first"
                )
            content = self.wiki.getHtml(pageTitle)
            soup = self.filter(content)
            soup = self.fixHtml(soup)
            content = self.unwrap(soup)
    except Exception as e:
        error = self.errMsg(e)
    return pageTitle, content, error

get_cms_pages()

get the Content Management elements for this site

Source code in frontend/wikicms.py, lines 111-124
def get_cms_pages(self) -> dict:
    """
    get the Content Management elements for this site
    """
    cms_pages = {}
    ask_query = "[[Category:CMS]]"
    page_records = self.smwclient.query(ask_query, "cms pages")
    for page_title in list(page_records):
        page_title, html, error = self.getContent(page_title)
        if not error:
            cms_pages[page_title] = html
        else:
            self.logger.warn(error)
    return cms_pages

get_frame(page_title)

get the frame property for the given page_title

Source code in frontend/wikicms.py, lines 366-383
def get_frame(self, page_title: str) -> str:
    """
    get the frame property for the given page_title
    """
    frame = None
    markup = self.wiki.get_wiki_markup(page_title)
    # {{#set:frame=reveal}}
    # {{UseFrame|Contact.rythm|
    patterns = [
        r"{{#set:frame=([^}]+)}}",  # {{#set:frame=reveal}}
        r"{{UseFrame\|([^.]+)",  # {{UseFrame|Contact.rythm|
    ]

    for pattern in patterns:
        match = re.search(pattern, markup)
        if match:
            frame = match.group(1)
    return frame
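
The frame property is extracted with plain regular expressions from the wiki markup. A standalone sketch of the same matching, using example markup:

import re

markup = "Some page text {{#set:frame=reveal}} more text"
patterns = [
    r"{{#set:frame=([^}]+)}}",  # {{#set:frame=reveal}}
    r"{{UseFrame\|([^.]+)",     # {{UseFrame|Contact.rythm|
]
frame = None
for pattern in patterns:
    match = re.search(pattern, markup)
    if match:
        frame = match.group(1)
print(frame)  # reveal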

get_path_response(path)

get the response for the given path

Parameters:

path (str): the path to render the content for (required)

Returns:

Response: a FastAPI response

Source code in frontend/wikicms.py, lines 385-420
def get_path_response(self, path: str) -> str:
    """
    get the response for the given path

    Args:
        path(str): the path to render the content for

    Returns:
        Response: a FastAPI response
    """
    if self.needsProxy(path):
        html_response = self.proxy(path)
        # Create a FastAPI response object
        response = Response(
            content=html_response.content,
            status_code=html_response.status_code,
            headers=dict(html_response.headers),
        )
    else:
        page_title, content, error = self.getContent(path)
        html_frame = HtmlFrame(self, title=page_title)
        html = content
        framed_html = None
        if error:
            html = f"error getting {page_title} for {self.name}:<br>{error}"
        else:
            if "<slideshow" in html or "&lt;slideshow" in html:
                content = self.toReveal(content)
                # Complete reveal.js webpage
                framed_html = self.wrapWithReveal(html)
                html = content

        if not framed_html:
            framed_html = html_frame.frame(html)
        response = HTMLResponse(framed_html)
    return response

log(msg)

log the given message if debugging is true

Parameters:

msg (str): the message to log (required)
Source code in frontend/wikicms.py, lines 59-67
def log(self, msg: str):
    """
    log the given message if debugging is true

    Args:
        msg (str): the message to log
    """
    if self.debug:
        print(msg, flush=True)

needsProxy(path)

Parameters:

path (str): the path to check (required)

Returns:

bool: True if this path needs to be proxied

Source code in frontend/wikicms.py, lines 169-180
def needsProxy(self, path: str) -> bool:
    """
    Args:
        path (str): the path to check

    Returns:
        bool: True if this path needs to be proxied
    """
    needs_proxy = False
    for prefix in self.proxy_prefixes:
        needs_proxy = needs_proxy or path.startswith(prefix)
    return needs_proxy
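
The check is a simple prefix test against the configured proxy_prefixes. A standalone sketch using the default prefixes:

proxy_prefixes = ["/images/", "/videos"]

def needs_proxy(path: str) -> bool:
    # True if the path starts with any of the proxy prefixes
    return any(path.startswith(prefix) for prefix in proxy_prefixes)

print(needs_proxy("/images/logo.png"))  # True
print(needs_proxy("/index.php/Main"))   # False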

open()

open the frontend

Source code in frontend/wikicms.py, lines 98-109
def open(self):
    """
    open the frontend

    """
    if self.wiki is None:
        self.wiki = WikiClient.ofWikiId(self.frontend.wikiId)
        if WikiFrontend.with_login:
            self.wiki.login()
        self.smwclient = SMWClient(self.wiki.getSite())
        self.cms_pages = self.get_cms_pages()
        self.frontend.enabled = True

proxy(path)

Proxy a request. See https://stackoverflow.com/a/50231825/1497139

Parameters:

path (str): the path to proxy (required)

Returns:

str: the proxied result as a string

Source code in frontend/wikicms.py, lines 182-199
def proxy(self, path: str) -> str:
    """
    Proxy a request.
    See https://stackoverflow.com/a/50231825/1497139

    Args:
        path (str): the path to proxy

    Returns:
        the proxied result as a string
    """
    wikiUser = self.wiki.wikiUser
    url = f"{wikiUser.url}{wikiUser.scriptPath}{path}"

    # Get the response
    response = requests.get(url)

    return response

toReveal(html)

convert the given html to reveal see https://revealjs.com/

Source code in frontend/wikicms.py, lines 343-364
def toReveal(self, html: str):
    """
    convert the given html to reveal
    see https://revealjs.com/
    """
    soup = BeautifulSoup(html, "lxml")
    for h2 in soup.findChildren(recursive=True):
        if h2.name == "h2":
            span = h2.next_element
            if span.name == "span":
                tagid = span.get("id")
                if tagid.startswith("⌘⌘"):
                    section = soup.new_tag("section")
                    h2.parent.append(section)
                    section.insert(0, h2)
                    tag = h2.next_element
                    while tag is not None and tag.name != "h2":
                        if tag.parent != h2:
                            section.append(tag)
                        tag = tag.next_element
    html = self.unwrap(soup)
    return html

unwrap(soup)

unwrap the soup

Source code in frontend/wikicms.py, lines 257-269
def unwrap(self, soup) -> str:
    """
    unwrap the soup
    """
    html = str(soup)
    html = html.replace("<html><body>", "")
    html = html.replace("</body></html>", "")
    # Remove  empty paragraphs
    html = re.sub(r'<p class="mw-empty-elt">\s*</p>', "", html)

    # Replace multiple newline characters with a single newline character
    html = re.sub(r"\n\s*\n", "\n", html)
    return html
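
unwrap strips the html/body wrapper that BeautifulSoup adds and compacts empty MediaWiki paragraphs and blank lines. A standalone sketch of the same substitutions:

import re

html = '<html><body><p class="mw-empty-elt">  </p>\n\n\n<p>content</p></body></html>'
html = html.replace("<html><body>", "").replace("</body></html>", "")
# remove empty MediaWiki paragraphs
html = re.sub(r'<p class="mw-empty-elt">\s*</p>', "", html)
# collapse runs of blank lines into a single newline
html = re.sub(r"\n\s*\n", "\n", html)
print(html)  # '\n<p>content</p>'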

wikiPage(pagePath)

Get the wiki page for the given page path.

Parameters:

pagePath (str): The path of the page. (required)

Returns:

str: The title of the page.

Source code in frontend/wikicms.py, lines 133-149
def wikiPage(self, pagePath: str) -> str:
    """
    Get the wiki page for the given page path.

    Args:
        pagePath (str): The path of the page.

    Returns:
        str: The title of the page.
    """
    if "/index.php/" in pagePath:
        wikipage = pagePath.replace("/index.php/", "")
    elif pagePath.startswith("/"):
        wikipage = pagePath[1:]
    else:
        wikipage = pagePath
    return wikipage
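
The mapping from a request path to a wiki page title strips an optional /index.php/ prefix or a single leading slash. An illustrative standalone version of the same logic:

def wiki_page(page_path: str) -> str:
    if "/index.php/" in page_path:
        return page_path.replace("/index.php/", "")
    if page_path.startswith("/"):
        return page_path[1:]
    return page_path

print(wiki_page("/index.php/Main_Page"))  # Main_Page
print(wiki_page("/Main_Page"))            # Main_Page
print(wiki_page("Main_Page"))             # Main_Page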

wrapWithReveal(html)

wrap html content with reveal.js structure and dependencies

Source code in frontend/wikicms.py, lines 320-341
    def wrapWithReveal(self, html: str):
        """
        wrap html content with reveal.js structure and dependencies
        """
        wrapped_html = f"""<!DOCTYPE html>
<html>
<head>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/reveal.js@4.3.1/dist/reveal.min.css">
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/reveal.js@4.3.1/dist/theme/white.css">
</head>
<body>
    <div class="reveal">
        <div class="slides">
{html}
        </div>
    </div>
    <script src="https://cdn.jsdelivr.net/npm/reveal.js@4.3.1/dist/reveal.js"></script>
    <script>Reveal.initialize({{
    }});</script>
</body>
</html>"""
        return wrapped_html

WikiFrontends

wiki frontends

Source code in frontend/wikicms.py, lines 423-462
class WikiFrontends:
    """
    wiki frontends
    """

    def __init__(self, servers):
        """
        constructor
        """
        self.servers = servers
        self.wiki_frontends = {}

    def enableSites(self, siteNames):
        """
        enable the sites given in the sites list
        Args:
            siteNames(list): a list of strings with wikiIds to be enabled
        """
        if siteNames is None:
            return
        for siteName in siteNames:
            self.get_frontend(siteName)

    def get_frontend(self, name: str) -> WikiFrontend:
        """
        Get WikiFrontend from cache or create new one
        """
        # Check cache first
        if name in self.wiki_frontends:
            cached_frontend = self.wiki_frontends[name]
            return cached_frontend

        # Create new frontend if not cached
        frontend = self.servers.frontends_by_name.get(name)
        if frontend:
            wiki_frontend = WikiFrontend(frontend)
            wiki_frontend.open()
            # Cache it
            self.wiki_frontends[name] = wiki_frontend
            return wiki_frontend

__init__(servers)

constructor

Source code in frontend/wikicms.py, lines 428-433
def __init__(self, servers):
    """
    constructor
    """
    self.servers = servers
    self.wiki_frontends = {}

enableSites(siteNames)

enable the sites given in the sites list.

Parameters:

siteNames (list): a list of strings with wikiIds to be enabled

Source code in frontend/wikicms.py, lines 435-444
def enableSites(self, siteNames):
    """
    enable the sites given in the sites list
    Args:
        siteNames(list): a list of strings with wikiIds to be enabled
    """
    if siteNames is None:
        return
    for siteName in siteNames:
        self.get_frontend(siteName)

get_frontend(name)

Get WikiFrontend from cache or create new one

Source code in frontend/wikicms.py, lines 446-462
def get_frontend(self, name: str) -> WikiFrontend:
    """
    Get WikiFrontend from cache or create new one
    """
    # Check cache first
    if name in self.wiki_frontends:
        cached_frontend = self.wiki_frontends[name]
        return cached_frontend

    # Create new frontend if not cached
    frontend = self.servers.frontends_by_name.get(name)
    if frontend:
        wiki_frontend = WikiFrontend(frontend)
        wiki_frontend.open()
        # Cache it
        self.wiki_frontends[name] = wiki_frontend
        return wiki_frontend
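
A usage sketch for WikiFrontends that follows the same calls the CmsWebServer constructor makes. It requires a local server configuration and access to the configured wiki; the import path for Servers and the frontend name "cms" are assumptions:

from frontend.server import Servers  # assumed module path, adjust to your installation
from frontend.wikicms import WikiFrontends

servers = Servers.of_config_path()   # load the local server configuration
wiki_frontends = WikiFrontends(servers)
wiki_frontends.enableSites(["cms"])  # placeholder frontend name
frontend = wiki_frontends.get_frontend("cms")
if frontend:
    title, html, error = frontend.getContent("/")
    print(title, error)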

wikigrid

Created on 2022-12-03

@author: wf

WikiCheck

Check to be performed on a MediaWiki.

Source code in frontend/wikigrid.py, lines 21-37
class WikiCheck:
    """
    Check to be performed on a MediaWiki.
    """

    def __init__(self, name, func, checked=True):
        self.name = name
        self.func = func  # the check function to be performed on a WikiState
        self.checked = checked
        self.checkbox = None

    def as_checkbox(self):
        """
        Return a checkbox representation of the instance.
        """
        self.checkbox = ui.checkbox(self.name).bind_value(self, "checked")
        return self.checkbox

as_checkbox()

Return a checkbox representation of the instance.

Source code in frontend/wikigrid.py, lines 32-37
def as_checkbox(self):
    """
    Return a checkbox representation of the instance.
    """
    self.checkbox = ui.checkbox(self.name).bind_value(self, "checked")
    return self.checkbox
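
as_checkbox uses NiceGUI's two-way binding so that toggling the checkbox updates the checked attribute of the WikiCheck instance. A minimal standalone NiceGUI sketch of the same binding idea:

from nicegui import ui

class Check:
    def __init__(self, name: str, checked: bool = True):
        self.name = name
        self.checked = checked

check = Check("version")
# bind the checkbox value to the 'checked' attribute of the Check instance
ui.checkbox(check.name).bind_value(check, "checked")
ui.button("show", on_click=lambda: ui.notify(f"checked={check.checked}"))

ui.run()  # starts the NiceGUI server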

WikiGrid

A grid of Wikis.

Source code in frontend/wikigrid.py, lines 40-228
class WikiGrid:
    """
    A grid of Wikis.
    """

    def __init__(self, solution):
        # back reference to nicegui solution
        self.solution = solution

        self.wiki_users = WikiUser.getWikiUsers()
        self.wiki_clients = {}
        self.smw_clients = {}
        self.sorted_wiki_users = sorted(
            self.wiki_users.values(), key=lambda w: w.wikiId
        )
        self.lod = []
        self.task_runner = TaskRunner(timeout=40)
        self.wikistates_by_row_no = {}
        for index, wiki_user in enumerate(self.sorted_wiki_users):
            wiki_state = MediaWikiSite(wiki_user=wiki_user, row_index=index)
            record = wiki_state.as_dict()
            self.lod.append(record)
            self.wikistates_by_row_no[wiki_state.row_no] = wiki_state

    def setup(self):
        """
        setup the ui
        """
        self.add_checkboxes()
        self.progressbar = NiceguiProgressbar(
            len(self.wikistates_by_row_no), "work on wikis", "steps"
        )
        self.task_runner.progress = self.progressbar
        self.as_grid()
        self.lod_grid.update()

    def as_grid(self):
        # Configure grid with checkbox selection
        grid_config = GridConfig(
            key_col="#",
            editable=False,
            multiselect=True,
            with_buttons=True,
            button_names=["all", "fit"],
            debug=False,
        )
        self.lod_grid = ListOfDictsGrid(lod=self.lod, config=grid_config)
        self.lod_grid.ag_grid._props["html_columns"] = [0, 1, 2]
        return self.lod_grid

    def add_checkboxes(self):
        """
        Add check boxes.
        """
        self.button_row = ui.row()
        with self.button_row:
            self.wiki_checks = [
                WikiCheck("version", self.check_wiki_version),
                WikiCheck("backup", self.check_backup),
                WikiCheck("pages", self.check_pages),
            ]
            for wiki_check in self.wiki_checks:
                wiki_check.as_checkbox()
            ui.button(text="Checks", on_click=self.perform_wiki_checks)

    async def get_selected_lod(self):
        lod_index = self.lod_grid.get_index(
            lenient=self.lod_grid.config.lenient, lod=self.lod
        )
        lod = await self.lod_grid.get_selected_lod(lod_index=lod_index)
        if len(lod) == 0:
            with self.button_row:
                ui.notify("Please select at least one row")
        return lod

    async def perform_wiki_checks(self, _msg):
        """
        react on the button for check having been clicked
        """
        self.select_lod = await self.get_selected_lod()
        if self.select_lod:
            with self.solution.content_div:
                total = len(self.select_lod)
                ui.notify(f"Checking {total} wikis ...")
                # Use single task_runner
                self.task_runner.run_blocking(self.run_all_wiki_checks)

    def run_all_wiki_checks(self):
        """
        Process all selected wikis sequentially
        """
        try:
            # Calculate total steps
            steps = 0
            for wiki_check in self.wiki_checks:
                if wiki_check.checked:
                    steps += len(self.select_lod)
            self.progressbar.total = steps
            self.progressbar.reset()

            # Process each wiki sequentially
            for row in self.select_lod:
                row_no = row["#"]
                wiki_state = self.wikistates_by_row_no.get(row_no)
                self.run_wiki_check(wiki_state)

        except BaseException as ex:
            self.solution.handle_exception(ex)

    def run_wiki_check(self, wiki_state):
        """
        perform the selected wiki checks for a single wiki
        """
        try:
            for wiki_check in self.wiki_checks:
                if wiki_check.checked:
                    wiki_check.func(wiki_state)
                with self.solution.content_div:
                    self.lod_grid.update()
                    # Update the progress bar
                    self.progressbar.update(1)
        except BaseException as ex:
            self.solution.handle_exception(ex)

    def check_pages(self, wiki_state: MediaWikiSite):
        """
        Try login for wiki user and report success or failure.
        """
        try:
            try:
                client = wiki_state.wiki_client
                stats = client.get_site_statistics()
                pages = stats["pages"]
                self.lod_grid.update_cell(wiki_state.row_no, "login", f"✅")
                self.lod_grid.update_cell(wiki_state.row_no, "pages", f"✅{pages}")
            except Exception as ex:
                self.lod_grid.update_cell(wiki_state.row_no, "login", f"❌ {str(ex)}")
                self.lod_grid.update_cell(wiki_state.row_no, "pages", "❌")
                return
        except BaseException as ex:
            self.solution.handle_exception(ex)

    def check_wiki_version(self, wiki_state: MediaWikiSite):
        """
        Check the MediaWiki version for a specific WikiState.
        """
        try:
            mw_version = wiki_state.check_version()
            if not mw_version.startswith("MediaWiki"):
                mw_version = f"MediaWiki {mw_version}"
            row = self.lod_grid.get_row_for_key(wiki_state.row_no)
            if row:
                ex_version = wiki_state.wiki_user.version
                if ex_version == mw_version:
                    self.lod_grid.update_cell(
                        wiki_state.row_no, "version", f"{mw_version}✅"
                    )
                else:
                    self.lod_grid.update_cell(
                        wiki_state.row_no, "version", f"{ex_version}!={mw_version}❌"
                    )
        except BaseException as ex:
            self.solution.handle_exception(ex)

    def check_backup(self, wiki_state):
        """
        Check the backup status for a specific WikiUser.
        """
        try:
            row = self.lod_grid.get_row_for_key(wiki_state.row_no)
            if row:
                backup_path = f"{Path.home()}/wikibackup/{wiki_state.wiki_user.wikiId}"
                if os.path.isdir(backup_path):
                    wiki_files = glob.glob(f"{backup_path}/*.wiki")
                    msg = f"{len(wiki_files):6} ✅"
                    self.lod_grid.update_cell(wiki_state.row_no, "backup", msg)
                    # https://stackoverflow.com/a/39327156/1497139
                    if wiki_files:
                        latest_file = max(wiki_files, key=os.path.getctime)
                        st = os.stat(latest_file)
                        age_days = round((time.time() - st.st_mtime) / 86400)
                        self.lod_grid.update_cell(
                            wiki_state.row_no, "age", f"{age_days}"
                        )
                else:
                    msg = "❌"
                    self.lod_grid.update_cell(wiki_state.row_no, "backup", msg)
        except BaseException as ex:
            self.solution.handle_exception(ex)

add_checkboxes()

Add check boxes.

Source code in frontend/wikigrid.py
def add_checkboxes(self):
    """
    Add check boxes.
    """
    self.button_row = ui.row()
    with self.button_row:
        self.wiki_checks = [
            WikiCheck("version", self.check_wiki_version),
            WikiCheck("backup", self.check_backup),
            WikiCheck("pages", self.check_pages),
        ]
        for wiki_check in self.wiki_checks:
            wiki_check.as_checkbox()
        ui.button(text="Checks", on_click=self.perform_wiki_checks)
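
The WikiCheck helper used above is defined elsewhere in frontend/wikigrid.py and is not rendered on this page. Below is a minimal sketch of what such a helper might look like, assuming NiceGUI's ui.checkbox and bind_value; only the attribute names name, func and checked are taken from the calls above, everything else is an assumption.

from dataclasses import dataclass
from typing import Callable

from nicegui import ui


@dataclass
class WikiCheck:
    """sketch of a selectable wiki check (assumed structure, not the actual implementation)"""

    name: str  # label shown next to the checkbox, e.g. "version"
    func: Callable  # check function taking a wiki_state argument
    checked: bool = True  # whether the check is currently selected

    def as_checkbox(self):
        # bind the checkbox value to self.checked so perform_wiki_checks
        # can later test wiki_check.checked
        return ui.checkbox(self.name, value=self.checked).bind_value(self, "checked")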

check_backup(wiki_state)

Check the backup status for a specific WikiUser.

Source code in frontend/wikigrid.py
def check_backup(self, wiki_state):
    """
    Check the backup status for a specific WikiUser.
    """
    try:
        row = self.lod_grid.get_row_for_key(wiki_state.row_no)
        if row:
            backup_path = f"{Path.home()}/wikibackup/{wiki_state.wiki_user.wikiId}"
            if os.path.isdir(backup_path):
                wiki_files = glob.glob(f"{backup_path}/*.wiki")
                msg = f"{len(wiki_files):6} ✅"
                self.lod_grid.update_cell(wiki_state.row_no, "backup", msg)
                # https://stackoverflow.com/a/39327156/1497139
                if wiki_files:
                    latest_file = max(wiki_files, key=os.path.getctime)
                    st = os.stat(latest_file)
                    age_days = round((time.time() - st.st_mtime) / 86400)
                    self.lod_grid.update_cell(
                        wiki_state.row_no, "age", f"{age_days}"
                    )
            else:
                msg = "❌"
                self.lod_grid.update_cell(wiki_state.row_no, "backup", msg)
    except BaseException as ex:
        self.solution.handle_exception(ex)
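
The age column is derived from the modification time of the newest backup file. A standalone sketch of that calculation, assuming the same ~/wikibackup/<wikiId> layout as above; the wiki id "somewiki" is purely illustrative.

import glob
import os
import time
from pathlib import Path

backup_path = f"{Path.home()}/wikibackup/somewiki"  # hypothetical wiki id
wiki_files = glob.glob(f"{backup_path}/*.wiki")
if wiki_files:
    # newest file by creation time, age in days from its modification time
    latest_file = max(wiki_files, key=os.path.getctime)
    age_days = round((time.time() - os.stat(latest_file).st_mtime) / 86400)
    print(f"{len(wiki_files)} backup files, newest is {age_days} days old")
else:
    print("no backup found")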

check_pages(wiki_state)

Try to log in as the wiki user and report login success or failure together with the page count.

Source code in frontend/wikigrid.py
def check_pages(self, wiki_state: MediaWikiSite):
    """
    Try to log in as the wiki user and report login success or failure together with the page count.
    """
    try:
        try:
            client = wiki_state.wiki_client
            stats = client.get_site_statistics()
            pages = stats["pages"]
            self.lod_grid.update_cell(wiki_state.row_no, "login", f"✅")
            self.lod_grid.update_cell(wiki_state.row_no, "pages", f"✅{pages}")
        except Exception as ex:
            self.lod_grid.update_cell(wiki_state.row_no, "login", f"❌ {str(ex)}")
            self.lod_grid.update_cell(wiki_state.row_no, "pages", "❌")
            return
    except BaseException as ex:
        self.solution.handle_exception(ex)
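
get_site_statistics presumably wraps MediaWiki's meta=siteinfo statistics query. A minimal sketch of the underlying API call using requests against a hypothetical wiki endpoint; the real wiki_client and its login handling are not shown here.

import requests

api_url = "https://wiki.example.org/w/api.php"  # hypothetical endpoint
params = {
    "action": "query",
    "meta": "siteinfo",
    "siprop": "statistics",
    "format": "json",
}
response = requests.get(api_url, params=params, timeout=10)
stats = response.json()["query"]["statistics"]
print(stats["pages"])  # total page count, as shown in the "pages" cell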

check_wiki_version(wiki_state)

Check the MediaWiki version for a specific MediaWikiSite.

Source code in frontend/wikigrid.py
def check_wiki_version(self, wiki_state: MediaWikiSite):
    """
    Check the MediaWiki version for a specific MediaWikiSite.
    """
    try:
        mw_version = wiki_state.check_version()
        if not mw_version.startswith("MediaWiki"):
            mw_version = f"MediaWiki {mw_version}"
        row = self.lod_grid.get_row_for_key(wiki_state.row_no)
        if row:
            ex_version = wiki_state.wiki_user.version
            if ex_version == mw_version:
                self.lod_grid.update_cell(
                    wiki_state.row_no, "version", f"{mw_version}✅"
                )
            else:
                self.lod_grid.update_cell(
                    wiki_state.row_no, "version", f"{ex_version}!={mw_version}❌"
                )
    except BaseException as ex:
        self.solution.handle_exception(ex)
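
The comparison above only reports a match when the stored version string equals the normalized live version exactly. A small sketch of that normalization with hypothetical version strings:

def normalize_version(mw_version: str) -> str:
    # prefix bare version numbers the same way check_wiki_version does
    if not mw_version.startswith("MediaWiki"):
        mw_version = f"MediaWiki {mw_version}"
    return mw_version

expected = "MediaWiki 1.39.1"        # hypothetical value from the wiki user settings
live = normalize_version("1.39.4")   # hypothetical value reported by the wiki
marker = "✅" if expected == live else "❌"
print(f"{expected} vs {live} {marker}")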

perform_wiki_checks(_msg) async

React to the Checks button having been clicked.

Source code in frontend/wikigrid.py
async def perform_wiki_checks(self, _msg):
    """
    React to the Checks button having been clicked.
    """
    self.select_lod = await self.get_selected_lod()
    if self.select_lod:
        with self.solution.content_div:
            total = len(self.select_lod)
            ui.notify(f"Checking {total} wikis ...")
            # Use single task_runner
            self.task_runner.run_blocking(self.run_all_wiki_checks)
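
task_runner.run_blocking hands the synchronous check loop to a background worker so the NiceGUI event loop stays responsive. A rough, generic equivalent using asyncio.to_thread; the actual TaskRunner implementation is not shown on this page.

import asyncio
import time


def run_all_wiki_checks():
    # stand-in for the blocking loop over the selected wikis
    time.sleep(2)


async def perform_wiki_checks():
    # offload the blocking work to a worker thread instead of blocking the UI
    await asyncio.to_thread(run_all_wiki_checks)


asyncio.run(perform_wiki_checks())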

run_all_wiki_checks()

Process all selected wikis sequentially

Source code in frontend/wikigrid.py
def run_all_wiki_checks(self):
    """
    Process all selected wikis sequentially
    """
    try:
        # Calculate total steps
        steps = 0
        for wiki_check in self.wiki_checks:
            if wiki_check.checked:
                steps += len(self.select_lod)
        self.progressbar.total = steps
        self.progressbar.reset()

        # Process each wiki sequentially
        for row in self.select_lod:
            row_no = row["#"]
            wiki_state = self.wikistates_by_row_no.get(row_no)
            self.run_wiki_check(wiki_state)

    except BaseException as ex:
        self.solution.handle_exception(ex)
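
The progress bar total is the number of checked wiki checks multiplied by the number of selected wikis, since run_wiki_check advances the bar once per executed check. A tiny illustration with hypothetical numbers:

checked_checks = 2   # e.g. "version" and "backup" ticked, "pages" not
selected_wikis = 5   # rows selected in the grid
steps = checked_checks * selected_wikis  # value assigned to progressbar.total
assert steps == 10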

run_wiki_check(wiki_state)

perform the selected wiki checks for a single wiki

Source code in frontend/wikigrid.py
def run_wiki_check(self, wiki_state):
    """
    perform the selected wiki checks for a single wiki
    """
    try:
        for wiki_check in self.wiki_checks:
            if wiki_check.checked:
                wiki_check.func(wiki_state)
                with self.solution.content_div:
                    self.lod_grid.update()
                    # update the grid and progress bar only for checks that actually ran
                    self.progressbar.update(1)
    except BaseException as ex:
        self.solution.handle_exception(ex)

setup()

Set up the UI.

Source code in frontend/wikigrid.py
def setup(self):
    """
    Set up the UI.
    """
    self.add_checkboxes()
    self.progressbar = NiceguiProgressbar(
        len(self.wikistates_by_row_no), "work on wikis", "steps"
    )
    self.task_runner.progress = self.progressbar
    self.as_grid()
    self.lod_grid.update()
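
NiceguiProgressbar is provided by the ngwidgets companion library (an assumption based on the project's dependencies) and is driven by the total/reset/update calls shown above. For orientation, a self-contained sketch of the same total/steps idea using only core NiceGUI:

from nicegui import ui

total = 5  # hypothetical number of steps, e.g. selected wikis
done = 0
progress = ui.linear_progress(value=0.0, show_value=True)


def step():
    global done
    done = min(done + 1, total)
    progress.value = done / total  # fraction of completed steps


ui.button("step", on_click=step)
ui.run()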