Skip to content

pyWikiCMS API Documentation

clickstream

Created on 2023-06-11

@author: wf

ClickStream dataclass

Represents a clickstream with associated page hits and user agent data.

Source code in frontend/clickstream.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@dataclass
class ClickStream:
    """Represents a clickstream with associated page hits and user agent data."""

    url: str
    ip: str
    domain: str
    timeStamp: datetime
    pageHits: List[PageHit]
    userAgent: UserAgent
    userAgentHeader: Optional[str] = None
    referrer: Optional[str] = None
    acceptLanguage: Optional[str] = None

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> "ClickStream":
        """
        Create a ClickStream from a plain dictionary.

        The given dictionary and the nested page hit dictionaries are not
        modified, so the same record may be converted more than once.

        Args:
            data (Dict[str, Any]): the raw record. `timeStamp` is parsed
                with DateParse.parse_date, `pageHits` is an optional list of
                page hit dictionaries (None entries are skipped) and a
                `userAgent` dictionary is converted to a UserAgent.

        Returns:
            ClickStream: the converted clickstream

        Raises:
            KeyError: if `timeStamp` is missing
            TypeError: if the record has keys that are not ClickStream
                fields or lacks required ones
        """
        # Work on a shallow copy - the original code overwrote the caller's
        # values in place, which broke a second conversion of the same dict.
        record = dict(data)
        record["timeStamp"] = DateParse.parse_date(record["timeStamp"])
        # `or []` covers both a missing key and an explicit null value.
        # PageHit.from_dict gets a copy of each hit so nested input stays intact.
        record["pageHits"] = [
            PageHit.from_dict(dict(hit))
            for hit in record.get("pageHits") or []
            if hit is not None
        ]
        # Unknown keys are deliberately not filtered out here; they surface
        # as a TypeError from the dataclass constructor.

        # Let the `_postprocess` handle the userAgent conversion
        record = ClickStream._postprocess(record)
        return ClickStream(**record)

    @staticmethod
    def _postprocess(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert nested dictionaries of a record to their dataclasses.

        Args:
            data (Dict[str, Any]): the record to convert; it is updated in place

        Returns:
            Dict[str, Any]: the same dictionary with `userAgent` and any
                remaining `pageHits` dictionaries converted
        """
        # Ensure `userAgent` is a dictionary before trying to convert
        if isinstance(data.get("userAgent"), dict):
            data["userAgent"] = UserAgent.from_dict(data["userAgent"])
        # Entries that are already converted (not dicts) are passed through
        if isinstance(data.get("pageHits"), list):
            data["pageHits"] = [
                PageHit.from_dict(hit) if isinstance(hit, dict) else hit
                for hit in data["pageHits"]
            ]
        return data

ClickstreamLog dataclass

single log of clickstreams

Source code in frontend/clickstream.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@dataclass
class ClickstreamLog:
    """
    single log of clickstreams
    """

    debug: bool
    MAX_CLICKSTREAMS: int
    LOGGING_TIME_PERIOD: int
    MAX_SESSION_TIME: int
    FLUSH_PERIOD: int
    startTime: datetime
    lastFlush: datetime
    lastLogRotate: datetime
    fileName: str
    clickStreams: List[ClickStream]

    @classmethod
    def from_json(cls, json_file: str):
        """
        Load a clickstream log from a JSON file.

        Args:
            json_file (str): path of the UTF-8 encoded JSON file

        Returns:
            an instance of the class this method is called on

        Raises:
            json.JSONDecodeError: if the file does not hold valid JSON
            KeyError: if one of the timestamp entries is missing
        """
        with open(json_file, "r", encoding="utf-8") as file:
            data = json.load(file)

        # Handle nested structures
        data = cls._postprocess(data)

        # use `cls` so that subclasses get instances of their own type
        return cls(**data)

    @classmethod
    def _postprocess(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert the timestamps and clickstreams of a raw log record.

        Args:
            data (Dict[str, Any]): the decoded JSON record; it is updated in place

        Returns:
            Dict[str, Any]: the same dictionary with datetime values and
                ClickStream instances
        """
        for key in ("startTime", "lastFlush", "lastLogRotate"):
            data[key] = DateParse.parse_date(data[key])
        # `or []` treats a null `clickStreams` entry like a missing one
        data["clickStreams"] = [
            ClickStream.from_dict(cs) for cs in data.get("clickStreams") or []
        ]
        return data

ClickstreamManager

Bases: object

logging of client clicks

Source code in frontend/clickstream.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
class ClickstreamManager(object):
    """
    logging of client clicks
    """

    def __init__(
        self,
        root_path: str,
        rdf_namespace: str = "http://cms.bitplan.com/clickstream#",
        show_progress: bool = True,
        verbose: bool = True,
    ):
        """
        Constructor

        Args:
            root_path (str): the root path
            rdf_namespace (str): The base namespace URI for the RDF export.
            show_progress (bool): If True, show progress.
            verbose (bool): If True, print the output message.
        """
        self.root_path = root_path
        self.rdf_namespace = rdf_namespace
        self.clickstream_logs: List[ClickstreamLog] = []
        self.show_progress = show_progress
        self.verbose = verbose

    def get_progress(self, iterable, desc="Processing"):
        """
        Wrap an iterable with a progress bar if show_progress is True

        Args:
            iterable: the iterable to wrap
            desc (str): the description passed on to tqdm

        Returns:
            the tqdm wrapper or the unchanged iterable
        """
        if self.show_progress:
            return tqdm(iterable, desc=desc)
        else:
            return iterable

    def load_clickstream_logs(self, limit: Optional[int] = None) -> None:
        """
        Load all clickstream logs from the directory

        Files that cannot be parsed are reported on stdout and skipped.

        Args:
            limit (Optional[int]): if given, only the first `limit` files
                (in sorted path order) are loaded
        """
        # glob returns files in arbitrary order - sort so that `limit`
        # always selects the same files
        json_files = sorted(glob.glob(os.path.join(self.root_path, "*.json")))
        # If a limit is set, truncate the file list
        if limit is not None:
            json_files = json_files[:limit]

        iterator = self.get_progress(json_files, desc="Loading Clickstream Logs")

        total_clickstreams = 0

        for json_file in iterator:
            try:
                # Parse the JSON file into ClickstreamLog
                clickstream_log = ClickstreamLog.from_json(json_file)
                self.clickstream_logs.append(clickstream_log)
                total_clickstreams += len(clickstream_log.clickStreams)
            except json.JSONDecodeError as jde:
                # Handle JSON-specific parsing errors
                print(f"JSON decode error in file {json_file}: {jde.msg}")
                print(f"Error at line {jde.lineno}, column {jde.colno}")
            except Exception as e:
                # best effort: one broken file must not abort the whole import
                tb = traceback.format_exc()
                print(f"Error loading {json_file}: {e}")
                print(tb)
        # After importing, show the total counts
        total_logs = len(self.clickstream_logs)
        print(
            f"Imported {total_logs} clickstream logs with a total of {total_clickstreams} clickstreams."
        )

    def serialize_batch(
        self, g: Graph, rdf_file: str, file_counter: int, rdf_format: str
    ) -> None:
        """
        Serializes a batch of RDF data to a file.

        Args:
            g (Graph): The RDF graph to serialize.
            rdf_file (str): The base name for the RDF file.
            file_counter (int): The current file count for naming.
            rdf_format (str): The format to serialize the RDF data.

        """
        # the counter is zero padded to three digits, e.g. `_part007`
        batch_file = f"{rdf_file}_part{file_counter:03}.{rdf_format}"
        g.serialize(destination=batch_file, format=rdf_format)
        if self.verbose:
            print(f"Exported RDF to {batch_file}")

    def add_stream_properties_to_graph(
        self, g: Graph, CS: Namespace, stream: Any, entity_counter: int
    ) -> int:
        """
        Adds the properties of a clickstream to the RDF graph.

        One entity is created for the stream, one for its user agent and
        one per page hit; each consumes one value of the entity counter.

        Args:
            g (Graph): The graph to which the properties will be added.
            CS (Namespace): The namespace for clickstream data.
            stream (Any): The clickstream object containing the data.
            entity_counter (int): A counter for creating unique entities.

        Returns:
            int: The updated entity counter after adding the properties.
        """
        stream_uri = URIRef(f"{CS}clickstream/{entity_counter}")
        entity_counter += 1

        # Add properties to the stream URI
        g.add((stream_uri, RDF.type, CS.ClickStream))
        g.add((stream_uri, CS.url, Literal(stream.url)))
        g.add((stream_uri, CS.ip, Literal(stream.ip)))
        g.add((stream_uri, CS.domain, Literal(stream.domain)))
        # userAgentHeader is optional - do not emit a literal for None
        if stream.userAgentHeader is not None:
            g.add((stream_uri, CS.userAgentHeader, Literal(stream.userAgentHeader)))
        g.add(
            (
                stream_uri,
                CS.timeStamp,
                Literal(stream.timeStamp.isoformat(), datatype=XSD.dateTime),
            )
        )

        # Optional referrer information
        if stream.referrer:
            g.add((stream_uri, CS.referrer, Literal(stream.referrer)))

        # User Agent details
        ua_uri = URIRef(f"{CS}useragent/{entity_counter}")
        entity_counter += 1
        g.add((ua_uri, RDF.type, CS.UserAgent))
        g.add((ua_uri, CS.hasSyntaxError, Literal(stream.userAgent.hasSyntaxError)))
        g.add((ua_uri, CS.hasAmbiguity, Literal(stream.userAgent.hasAmbiguity)))
        g.add((ua_uri, CS.ambiguityCount, Literal(stream.userAgent.ambiguityCount)))
        g.add((ua_uri, CS.userAgentString, Literal(stream.userAgent.userAgentString)))
        g.add((stream_uri, CS.userAgent, ua_uri))

        # Page Hits
        for hit in stream.pageHits:
            hit_uri = URIRef(f"{CS}pagehit/{entity_counter}")
            entity_counter += 1
            g.add((hit_uri, RDF.type, CS.PageHit))
            g.add((hit_uri, CS.path, Literal(hit.path)))
            g.add(
                (
                    hit_uri,
                    CS.timeStamp,
                    Literal(hit.timeStamp.isoformat(), datatype=XSD.dateTime),
                )
            )
            g.add((stream_uri, CS.pageHits, hit_uri))

        return entity_counter

    def export_to_rdf(
        self,
        rdf_file: str,
        batch_size: int,
        rdf_format: str = "nt",
    ) -> None:
        """
        Export clickstream logs to RDF files in batches.

        Args:
            rdf_file (str): The base file name to write the RDF data to.
            batch_size (int): The number of clickstream records per file.
            rdf_format (str): The RDF serialization format to use (default is "nt").

        Raises:
            ValueError: if batch_size is not a positive number
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        # Namespace definition
        CS = Namespace(self.rdf_namespace)

        # Initialize variables
        file_counter = 1
        # runs across all batches so that entity URIs stay unique
        entity_counter = 1
        streams_in_batch = 0
        g = Graph()
        g.bind("cs", CS)

        # Create the directory if it doesn't exist; a bare file name has an
        # empty dirname for which os.makedirs would raise
        target_dir = os.path.dirname(rdf_file)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        iterator = self.get_progress(self.clickstream_logs, desc="Export Progress")

        for log in iterator:
            for stream in log.clickStreams:
                entity_counter = self.add_stream_properties_to_graph(
                    g, CS, stream, entity_counter
                )
                streams_in_batch += 1

                # Batches are counted in clickstreams. The entity counter
                # advances by a varying amount per stream, so testing it
                # with a modulo could step over the batch boundary.
                if streams_in_batch >= batch_size:
                    self.serialize_batch(g, rdf_file, file_counter, rdf_format)
                    file_counter += 1
                    streams_in_batch = 0
                    g = Graph()  # Reset the graph for the next batch
                    g.bind("cs", CS)

        # Serialize and save any remaining triples that didn't fill up the last batch
        if len(g):
            self.serialize_batch(g, rdf_file, file_counter, rdf_format)

    def reload_graph(self, rdf_file_pattern: str, rdf_format: str = "nt") -> Graph:
        """
        Reloads the RDF data from a batch of files into a single graph.

        Args:
            rdf_file_pattern (str): The file pattern to search for RDF files.
                                    A wildcard '*' will be appended if not present.
            rdf_format (str): The RDF serialization format of the files (default is "nt").

        Returns:
            Graph: The RDF graph populated with data from the files.
        """
        # Ensure the pattern ends with a wildcard, append if necessary
        if not rdf_file_pattern.endswith("*"):
            rdf_file_pattern += "*"

        # Find all files matching the pattern
        rdf_files = glob.glob(rdf_file_pattern)

        # Initialize a new RDF graph
        g = Graph()

        try:
            iterator = self.get_progress(rdf_files, desc="Loading graph")
        except AttributeError:
            # fall back to simple iteration if the progress wrapper fails
            iterator = rdf_files

        for rdf_file in iterator:
            # Parse each RDF file and add it to the graph
            g.parse(rdf_file, format=rdf_format)

        # After loading all files, return the populated graph
        return g

__init__(root_path, rdf_namespace='http://cms.bitplan.com/clickstream#', show_progress=True, verbose=True)

Constructor

Parameters:

Name Type Description Default
root_path str

the root path

required
rdf_namespace str

The base namespace URI for the RDF export.

'http://cms.bitplan.com/clickstream#'
show_progress bool

If True, show progress.

True
verbose bool

If True, print the output message.

True
Source code in frontend/clickstream.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def __init__(
    self,
    root_path: str,
    rdf_namespace: str = "http://cms.bitplan.com/clickstream#",
    show_progress: bool = True,
    verbose: bool = True,
):
    """
    Set up a manager with an empty list of clickstream logs.

    Args:
        root_path (str): the root path
        rdf_namespace (str): the base namespace URI for the RDF export
        show_progress (bool): if True, show progress
        verbose (bool): if True, print the output message
    """
    # nothing is loaded yet - the list starts out empty
    self.clickstream_logs: List[ClickstreamLog] = []
    # keep the configuration as given
    self.root_path, self.rdf_namespace = root_path, rdf_namespace
    self.show_progress, self.verbose = show_progress, verbose

add_stream_properties_to_graph(g, CS, stream, entity_counter)

Adds the properties of a clickstream to the RDF graph.

Parameters:

Name Type Description Default
g Graph

The graph to which the properties will be added.

required
CS Namespace

The namespace for clickstream data.

required
stream Any

The clickstream object containing the data.

required
entity_counter int

A counter for creating unique entities.

required

Returns:

Name Type Description
int int

The updated entity counter after adding the properties.

Source code in frontend/clickstream.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def add_stream_properties_to_graph(
    self, g: Graph, CS: Namespace, stream: Any, entity_counter: int
) -> int:
    """
    Adds the properties of a clickstream to the RDF graph.

    One entity is created for the stream, one for its user agent and
    one per page hit; each consumes one value of the entity counter.

    Args:
        g (Graph): The graph to which the properties will be added.
        CS (Namespace): The namespace for clickstream data.
        stream (Any): The clickstream object containing the data.
        entity_counter (int): A counter for creating unique entities.

    Returns:
        int: The updated entity counter after adding the properties.
    """
    stream_uri = URIRef(f"{CS}clickstream/{entity_counter}")
    entity_counter += 1

    # Add properties to the stream URI
    g.add((stream_uri, RDF.type, CS.ClickStream))
    g.add((stream_uri, CS.url, Literal(stream.url)))
    g.add((stream_uri, CS.ip, Literal(stream.ip)))
    g.add((stream_uri, CS.domain, Literal(stream.domain)))
    # userAgentHeader is optional - do not emit a literal for None
    if stream.userAgentHeader is not None:
        g.add((stream_uri, CS.userAgentHeader, Literal(stream.userAgentHeader)))
    g.add(
        (
            stream_uri,
            CS.timeStamp,
            Literal(stream.timeStamp.isoformat(), datatype=XSD.dateTime),
        )
    )

    # Optional referrer information
    if stream.referrer:
        g.add((stream_uri, CS.referrer, Literal(stream.referrer)))

    # User Agent details
    ua = stream.userAgent
    ua_uri = URIRef(f"{CS}useragent/{entity_counter}")
    entity_counter += 1
    g.add((ua_uri, RDF.type, CS.UserAgent))
    g.add((ua_uri, CS.hasSyntaxError, Literal(ua.hasSyntaxError)))
    g.add((ua_uri, CS.hasAmbiguity, Literal(ua.hasAmbiguity)))
    g.add((ua_uri, CS.ambiguityCount, Literal(ua.ambiguityCount)))
    g.add((ua_uri, CS.userAgentString, Literal(ua.userAgentString)))
    g.add((stream_uri, CS.userAgent, ua_uri))

    # Page Hits - each hit is linked back to its stream
    for hit in stream.pageHits:
        hit_uri = URIRef(f"{CS}pagehit/{entity_counter}")
        entity_counter += 1
        g.add((hit_uri, RDF.type, CS.PageHit))
        g.add((hit_uri, CS.path, Literal(hit.path)))
        g.add(
            (
                hit_uri,
                CS.timeStamp,
                Literal(hit.timeStamp.isoformat(), datatype=XSD.dateTime),
            )
        )
        g.add((stream_uri, CS.pageHits, hit_uri))

    return entity_counter

export_to_rdf(rdf_file, batch_size, rdf_format='nt')

Export clickstream logs to RDF files in batches. Parameters: `rdf_file` is the base file name to write the RDF data to; `batch_size` is the number of clickstream records per file; `rdf_format` is the RDF serialization format to use (default is "nt").

Source code in frontend/clickstream.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def export_to_rdf(
    self,
    rdf_file: str,
    batch_size: int,
    rdf_format: str = "nt",
) -> None:
    """
    Export clickstream logs to RDF files in batches.

    Args:
        rdf_file (str): The base file name to write the RDF data to.
        batch_size (int): The number of clickstream records per file.
        rdf_format (str): The RDF serialization format to use (default is "nt").

    Raises:
        ValueError: if batch_size is not a positive number
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    # Namespace definition
    CS = Namespace(self.rdf_namespace)

    # Initialize variables
    file_counter = 1
    # runs across all batches so that entity URIs stay unique
    entity_counter = 1
    streams_in_batch = 0
    g = Graph()
    g.bind("cs", CS)

    # Create the directory if it doesn't exist; a bare file name has an
    # empty dirname for which os.makedirs would raise
    target_dir = os.path.dirname(rdf_file)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    iterator = self.get_progress(self.clickstream_logs, desc="Export Progress")

    for log in iterator:
        for stream in log.clickStreams:
            entity_counter = self.add_stream_properties_to_graph(
                g, CS, stream, entity_counter
            )
            streams_in_batch += 1

            # Batches are counted in clickstreams. The entity counter
            # advances by a varying amount per stream, so testing it
            # with a modulo could step over the batch boundary.
            if streams_in_batch >= batch_size:
                self.serialize_batch(g, rdf_file, file_counter, rdf_format)
                file_counter += 1
                streams_in_batch = 0
                g = Graph()  # Reset the graph for the next batch
                g.bind("cs", CS)

    # Serialize and save any remaining triples that didn't fill up the last batch
    if len(g):
        self.serialize_batch(g, rdf_file, file_counter, rdf_format)

get_progress(iterable, desc='Processing')

Wrap an iterable with a progress bar if show_progress is True

Source code in frontend/clickstream.py
189
190
191
192
193
194
195
196
def get_progress(self, iterable, desc="Processing"):
    """
    Optionally decorate an iterable with a tqdm progress bar.

    Args:
        iterable: the iterable to wrap
        desc (str): the description passed on to tqdm

    Returns:
        the unchanged iterable if show_progress is off, else the tqdm wrapper
    """
    if not self.show_progress:
        return iterable
    return tqdm(iterable, desc=desc)

load_clickstream_logs(limit=None)

Load all clickstream logs from the directory

Source code in frontend/clickstream.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def load_clickstream_logs(self, limit: Optional[int] = None) -> None:
    """
    Load all clickstream logs from the directory

    Files that cannot be parsed are reported on stdout and skipped.

    Args:
        limit (Optional[int]): if given, only the first `limit` files
            (in sorted path order) are loaded
    """
    # glob returns files in arbitrary order - sort so that `limit`
    # always selects the same files
    json_files = sorted(glob.glob(os.path.join(self.root_path, "*.json")))
    # If a limit is set, truncate the file list
    if limit is not None:
        json_files = json_files[:limit]

    iterator = self.get_progress(json_files, desc="Loading Clickstream Logs")

    total_clickstreams = 0

    for json_file in iterator:
        try:
            # Parse the JSON file into ClickstreamLog
            clickstream_log = ClickstreamLog.from_json(json_file)
            self.clickstream_logs.append(clickstream_log)
            total_clickstreams += len(clickstream_log.clickStreams)
        except json.JSONDecodeError as jde:
            # Handle JSON-specific parsing errors
            print(f"JSON decode error in file {json_file}: {jde.msg}")
            print(f"Error at line {jde.lineno}, column {jde.colno}")
        except Exception as e:
            # best effort: one broken file must not abort the whole import
            tb = traceback.format_exc()
            print(f"Error loading {json_file}: {e}")
            print(tb)
    # After importing, show the total counts
    total_logs = len(self.clickstream_logs)
    print(
        f"Imported {total_logs} clickstream logs with a total of {total_clickstreams} clickstreams."
    )

reload_graph(rdf_file_pattern, rdf_format='nt')

Reloads the RDF data from a batch of files into a single RDF graph, which is returned.

Parameters:

Name Type Description Default
rdf_file_pattern str

The file pattern to search for RDF files. A wildcard '*' will be appended if not present.

required
rdf_format str

The RDF serialization format of the files (default is "nt").

'nt'

Returns:

Name Type Description
Graph Graph

The RDF graph populated with data from the files.

Source code in frontend/clickstream.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def reload_graph(self, rdf_file_pattern: str, rdf_format: str = "nt") -> Graph:
    """
    Parse all RDF files matching a pattern into one new graph.

    Args:
        rdf_file_pattern (str): The file pattern to search for RDF files.
                                A wildcard '*' will be appended if not present.
        rdf_format (str): The RDF serialization format of the files (default is "nt").

    Returns:
        Graph: The RDF graph populated with data from the files.
    """
    # make sure the pattern is open ended
    pattern = rdf_file_pattern if rdf_file_pattern.endswith("*") else rdf_file_pattern + "*"
    matches = glob.glob(pattern)

    graph = Graph()

    try:
        files = self.get_progress(matches, desc="Loading graph")
    except AttributeError:
        # plain iteration is the fallback if the progress wrapper fails
        files = matches

    # every file contributes its triples to the same graph
    for path in files:
        graph.parse(path, format=rdf_format)

    return graph

serialize_batch(g, rdf_file, file_counter, rdf_format)

Serializes a batch of RDF data to a file.

Parameters:

Name Type Description Default
g Graph

The RDF graph to serialize.

required
rdf_file str

The base name for the RDF file.

required
file_counter int

The current file count for naming.

required
rdf_format str

The format to serialize the RDF data.

required
Source code in frontend/clickstream.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def serialize_batch(
    self, g: Graph, rdf_file: str, file_counter: int, rdf_format: str
) -> None:
    """
    Write one batch graph to its own numbered file.

    The target name is built from the base name, the zero padded
    three digit part number and the format as file extension.

    Args:
        g (Graph): The RDF graph to serialize.
        rdf_file (str): The base name for the RDF file.
        file_counter (int): The current file count for naming.
        rdf_format (str): The format to serialize the RDF data.

    """
    target = f"{rdf_file}_part{file_counter:03}.{rdf_format}"
    g.serialize(destination=target, format=rdf_format)
    if not self.verbose:
        return
    print(f"Exported RDF to {target}")

DateParse

Source code in frontend/clickstream.py
19
20
21
22
23
24
25
26
27
28
29
30
class DateParse:
    """Helper for parsing timestamp strings."""

    @staticmethod
    def parse_date(date_str: str) -> datetime:
        """Convert a timestamp string to a datetime.

        Args:
            date_str (str): The date string to parse,
                e.g. "Jun 11, 2023 10:15:30 PM".

        Returns:
            datetime: The parsed datetime object.

        Raises:
            ValueError: if the string does not match the expected format
        """
        # abbreviated month, day, year and 12-hour clock time with AM/PM
        timestamp_format = "%b %d, %Y %I:%M:%S %p"
        parsed = datetime.strptime(date_str, timestamp_format)
        return parsed

parse_date(date_str) staticmethod

Parse a string to a datetime object.

Parameters:

Name Type Description Default
date_str str

The date string to parse.

required

Returns:

Name Type Description
datetime datetime

The parsed datetime object.

Source code in frontend/clickstream.py
20
21
22
23
24
25
26
27
28
29
30
@staticmethod
def parse_date(date_str: str) -> datetime:
    """Convert a timestamp string to a datetime.

    Args:
        date_str (str): The date string to parse,
            e.g. "Jun 11, 2023 10:15:30 PM".

    Returns:
        datetime: The parsed datetime object.

    Raises:
        ValueError: if the string does not match the expected format
    """
    # abbreviated month, day, year and 12-hour clock time with AM/PM
    timestamp_format = "%b %d, %Y %I:%M:%S %p"
    parsed = datetime.strptime(date_str, timestamp_format)
    return parsed

PageHit dataclass

Represents a single page hit with path and timestamp.

Source code in frontend/clickstream.py
33
34
35
36
37
38
39
40
41
42
43
@dataclass
class PageHit:
    """Represents a single page hit with path and timestamp."""

    path: str
    timeStamp: datetime

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> "PageHit":
        """
        Create a PageHit from a plain dictionary.

        The given dictionary is not modified, so the same record may be
        converted more than once.

        Args:
            data (Dict[str, Any]): the raw record with a `path` and a
                `timeStamp` string that DateParse.parse_date can parse

        Returns:
            PageHit: the converted page hit

        Raises:
            KeyError: if `timeStamp` is missing
            TypeError: if the record has keys that are not PageHit fields
        """
        # copy first - overwriting the caller's timeStamp with a datetime
        # would make a second conversion of the same dict fail
        record = dict(data)
        record["timeStamp"] = DateParse.parse_date(record["timeStamp"])
        return PageHit(**record)

UserAgent dataclass

Represents a user agent with syntax errors, ambiguity and other attributes.

Source code in frontend/clickstream.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@dataclass
class UserAgent:
    """Represents a user agent with syntax errors, ambiguity and other attributes."""

    hasSyntaxError: bool
    hasAmbiguity: bool
    ambiguityCount: int
    userAgentString: str
    debug: bool
    allFields: Dict[str, Dict[str, Any]]

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> "UserAgent":
        """
        Create a UserAgent from a plain dictionary.

        Missing keys fall back to a default, keys that are not
        UserAgent fields are ignored.

        Args:
            data (Dict[str, Any]): the raw user agent record

        Returns:
            UserAgent: the converted user agent
        """
        # field name -> value to use when the record lacks the key
        fallbacks = {
            "hasSyntaxError": False,
            "hasAmbiguity": False,
            "ambiguityCount": 0,
            "userAgentString": "",
            "debug": False,
            "allFields": {},
        }
        field_values = {
            name: data.get(name, fallback) for name, fallback in fallbacks.items()
        }
        return UserAgent(**field_values)

cmsmain

Created on 2022-11-24

@author: wf

CmsMain

Bases: WebserverCmd

ContentManagement System Main Program

Source code in frontend/cmsmain.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class CmsMain(WebserverCmd):
    """
    Content Management System main program
    """

    def getArgParser(self, description: str, version_msg) -> ArgumentParser:
        """
        Extend the argument parser of the base class with a `--sites` option.

        Args:
            description (str): passed on to the base class
            version_msg: passed on to the base class

        Returns:
            ArgumentParser: the parser of the base class plus `--sites`
        """
        arg_parser = super().getArgParser(description, version_msg)
        # `--sites` is optional and takes one or more values
        sites_option = {"nargs": "+", "required": False, "help": "the sites to enable"}
        arg_parser.add_argument("--sites", **sites_option)
        return arg_parser

getArgParser(description, version_msg)

override the default argparser call

Source code in frontend/cmsmain.py
19
20
21
22
23
24
25
26
27
def getArgParser(self, description: str, version_msg) -> ArgumentParser:
    """
    Extend the argument parser of the base class with a `--sites` option.

    Args:
        description (str): passed on to the base class
        version_msg: passed on to the base class

    Returns:
        ArgumentParser: the parser of the base class plus `--sites`
    """
    arg_parser = super().getArgParser(description, version_msg)
    # `--sites` is optional and takes one or more values
    sites_option = {"nargs": "+", "required": False, "help": "the sites to enable"}
    arg_parser.add_argument("--sites", **sites_option)
    return arg_parser

main(argv=None)

main call

Source code in frontend/cmsmain.py
30
31
32
33
34
35
36
def main(argv: list = None):
    """
    Run the CmsMain command.

    Args:
        argv (list): the command line arguments, handed to cmd_main unchanged

    Returns:
        the exit code returned by cmd_main
    """
    server_config = CmsWebServer.get_config()
    cmd = CmsMain(config=server_config, webserver_cls=CmsWebServer)
    return cmd.cmd_main(argv)

family

Created on 2021-01-01

@author: wf

LocalWiki

Bases: object

a local Wiki

Source code in frontend/family.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class LocalWiki(object):
    """
    a local Wiki
    """

    def __init__(self, siteName: str, family=None, localSettings: str = None):
        """
        Constructor

        Args:
            siteName(str): the name of the site
            family: the family this wiki belongs to (if any)
            localSettings(str): path to the LocalSettings.php (if any)
        """
        self.siteName = siteName
        try:
            self.ip = socket.gethostbyname(self.siteName)
        except Exception:
            # best effort: an unresolvable site name is marked with "?"
            self.ip = "?"
        self.siteId = siteName.split(".")[0]
        self.family = family
        self.localSettings = localSettings
        # Defaults for a wiki without LocalSettings.php, so that every
        # attribute exists no matter which branch below is taken.
        self.settingLines = []
        self.logo = None
        self.database = None
        self.url = None
        self.dbUser = None
        self.dbPassword = None
        self.scriptPath = ""
        self.statusCode = -1
        if self.localSettings is not None:
            with open(localSettings) as f:
                self.settingLines = f.readlines()
            self.logo = self.getSetting("wgLogo")
            self.database = self.getSetting("wgDBname")
            self.url = self.getSetting("wgServer")
            self.dbUser = self.getSetting("wgDBuser")
            self.dbPassword = self.getSetting("wgDBpassword")
            self.scriptPath = self.getSetting("wgScriptPath")
            if self.scriptPath is None:
                self.scriptPath = ""
            self.url = "%s%s/" % (self.url, self.scriptPath)
            self.statusCode = self.getStatusCode()

    def getStatusCode(self, timeout=0.5):
        """
        get the status Code for my url

        Args:
            timeout(float): the maximum time to wait for a response

        Returns:
            int: html statusCode or -1 if the request failed for any reason
        """
        statusCode = -1
        try:
            # NOTE(review): verify=False skips TLS certificate checks - confirm
            # this is intended for the sites being probed
            page = requests.get(self.url, verify=False, timeout=timeout)
            statusCode = page.status_code
        except Exception:
            # deliberate best effort: any failure is reported as -1
            pass
        return statusCode

    def getSetting(self, varName: str) -> str:
        """
        get the setting of the given variableName from the LocalSettings.php

        Only double quoted values on lines where no `#` precedes the
        variable are recognized; the first matching line wins.

        Args:
            varName(str): the name of the variable to return
        Returns:
            str: the value of the variable or None if it is not set
        """
        pattern = r'[^#]*\$%s\s*=\s*"(.*)"' % varName
        for line in self.settingLines:
            m = re.match(pattern, line)
            if m:
                return m.group(1)
        return None

    def getLogo(self) -> str:
        """
        get the local path to the logo file of this wiki

        Returns:
            str: the logo path if logo is defined as file else None
        """
        logoPath = getattr(self, "logo", None)
        if logoPath is None:
            # no wgLogo setting available - there is no logo file
            return None
        # work around wgResourceBasePath
        logoPath = logoPath.replace("$wgResourceBasePath", "")
        logoPath = logoPath.replace("/images/%s/" % self.siteId, "/images/")
        if logoPath.startswith("/") and self.family:
            logoFile = "%s/%s%s" % (self.family.sitedir, self.siteName, logoPath)
        else:
            logoFile = None
        return logoFile

__init__(siteName, family=None, localSettings=None)

Constructor

Parameters:

Name Type Description Default
siteName(str)

the name of the site

required
localSettings(str)

path to the LocalSettings.php (if any)

required
Source code in frontend/family.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(self, siteName: str, family=None, localSettings: str = None):
    """
    Constructor

    Resolves the ip address of the site and - if a LocalSettings.php is
    given - reads logo, database, credentials, url and script path from it
    and checks the status code of the resulting url.

    Args:
        siteName(str): the name of the site
        family: the family this wiki is part of (if any)
        localSettings(str): path to the LocalSettings.php (if any)
    """
    self.siteName = siteName
    try:
        self.ip = socket.gethostbyname(self.siteName)
    except Exception:
        # best effort: keep a placeholder if the name can not be resolved
        self.ip = "?"
    self.siteId = siteName.split(".")[0]
    self.family = family
    self.localSettings = localSettings
    # make sure the settings attributes always exist - without a
    # LocalSettings.php they used to be missing altogether
    self.logo = None
    self.database = None
    self.url = None
    self.dbUser = None
    self.dbPassword = None
    self.scriptPath = ""
    self.statusCode = -1
    if self.localSettings is None:
        self.settingLines = []
    else:
        with open(localSettings) as f:
            self.settingLines = f.readlines()
        self.logo = self.getSetting("wgLogo")
        self.database = self.getSetting("wgDBname")
        self.url = self.getSetting("wgServer")
        self.dbUser = self.getSetting("wgDBuser")
        self.dbPassword = self.getSetting("wgDBpassword")
        self.scriptPath = self.getSetting("wgScriptPath")
        if self.scriptPath is None:
            self.scriptPath = ""
        self.url = "%s%s/" % (self.url, self.scriptPath)
        self.statusCode = self.getStatusCode()

get the local path to the logo file of this wiki

Returns:

Name Type Description
str str

the logo path if logo is defined as file else None

Source code in frontend/family.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def getLogo(self) -> str:
    """
    get the local path to the logo file of this wiki

    Returns:
        str: the logo path if logo is defined as file else None
    """
    # the logo attribute may be missing or None when no wgLogo setting
    # was read - report "no logo file" instead of failing
    logoPath = getattr(self, "logo", None)
    if not logoPath or not self.family:
        return None
    # work around wgResourceBasePath
    logoPath = logoPath.replace("$wgResourceBasePath", "")
    logoPath = logoPath.replace("/images/%s/" % self.siteId, "/images/")
    if not logoPath.startswith("/"):
        # e.g. a full url - not a file below the site directory
        return None
    return "%s/%s%s" % (self.family.sitedir, self.siteName, logoPath)

getSetting(varName)

get the setting of the given variableName from the LocalSettings.php

Parameters:

Name Type Description Default
varName(str)

the name of the variable to return

required

Returns: str: the value of the variable

Source code in frontend/family.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def getSetting(self, varName: str) -> str:
    """
    get the setting of the given variableName from the LocalSettings.php

    Only assignments of a double quoted string such as
    ``$wgLogo = "/images/logo.png";`` are recognized and there must be no
    ``#`` in front of the assignment on its line. The first matching
    line wins.

    Args:
        varName(str): the name of the variable to return (without the leading $)
    Returns:
        str: the value of the variable or None if there is no matching assignment
    """
    # The name is escaped so that it is always matched literally. The value
    # ends at the first unescaped closing quote - matching up to the last
    # quote of the line pulled trailing comments into the value.
    pattern = re.compile(
        r'[^#]*\$%s\s*=\s*"((?:[^"\\]|\\.)*)"' % re.escape(varName)
    )
    for line in self.settingLines:
        m = pattern.match(line)
        if m:
            return m.group(1)
    return None

getStatusCode(timeout=0.5)

get the status Code for my url

Parameters:

Name Type Description Default
timeout(float)

the maximum time to wait for a response

required

Returns:

Name Type Description
int

the HTTP status code of the response, or -1 if the request failed (e.g. because of a timeout)

Source code in frontend/family.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def getStatusCode(self, timeout=0.5):
    """
    Request my url and report the HTTP status code of the response.

    The request is done with certificate verification switched off
    (verify=False).

    Args:
        timeout(float): the maximum time to wait for a response

    Returns:
        int: the status code of the response or -1 if the request
            raised any exception (e.g. a timeout)
    """
    try:
        response = requests.get(self.url, verify=False, timeout=timeout)
        return response.status_code
    except Exception:
        # best effort: every failure is reported as -1
        return -1

WikiBackup

Bases: object

find out details about a WikiBackup

potentially this class needs to move upstream to py-3rdparty-MediaWiki

Source code in frontend/family.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class WikiBackup:
    """
    find out details about a WikiBackup

    potentially this class needs to move upstream to py-3rdparty-MediaWiki
    """

    def __init__(self, wikiuser):
        """
        constructor

        Derives the backup directory <home>/wikibackup/<wikiId> and the
        path of its .git directory from the given wikiuser.

        Arguments:
            wikiuser(WikiUser): the wikiuser to access this backup
        """
        self.wikiuser = wikiuser
        self.backupPath = "{}/wikibackup/{}".format(
            str(Path.home()), wikiuser.wikiId
        )
        self.gitPath = "{}/.git".format(self.backupPath)

    def exists(self) -> bool:
        """
        check if this Backup exists

        Returns:
            bool: True if self.backupPath is an existing directory
        """
        backupFound = os.path.isdir(self.backupPath)
        return backupFound

    def hasGit(self) -> bool:
        """
        check if this Backup has a local git repository

        Returns:
            bool: True if self.gitPath is an existing directory
        """
        gitFound = os.path.isdir(self.gitPath)
        return gitFound

__init__(wikiuser)

constructor

Parameters:

Name Type Description Default
wikiuser(WikiUser)

the wikiuser to access this backup

required
Source code in frontend/family.py
112
113
114
115
116
117
118
119
120
121
122
123
def __init__(self, wikiuser):
    """
    constructor

    Derives the backup directory <home>/wikibackup/<wikiId> and the
    path of its .git directory from the given wikiuser.

    Arguments:
        wikiuser(WikiUser): the wikiuser to access this backup
    """
    self.wikiuser = wikiuser
    self.backupPath = "{}/wikibackup/{}".format(str(Path.home()), wikiuser.wikiId)
    self.gitPath = "{}/.git".format(self.backupPath)

exists()

check if this Backup exists

Returns:

Name Type Description
bool bool

True if the self.backupPath directory exists

Source code in frontend/family.py
125
126
127
128
129
130
131
132
def exists(self) -> bool:
    """
    check if this Backup exists

    Returns:
        bool: True if self.backupPath is an existing directory
    """
    backupFound = os.path.isdir(self.backupPath)
    return backupFound

hasGit()

check if this Backup has a local git repository

Returns:

Name Type Description
bool bool

True if the self.gitPath directory exists

Source code in frontend/family.py
134
135
136
137
138
139
140
141
def hasGit(self) -> bool:
    """
    check if this Backup has a local git repository

    Returns:
        bool: True if self.gitPath is an existing directory
    """
    gitFound = os.path.isdir(self.gitPath)
    return gitFound

WikiFamily

Bases: object

the wiki family found in the given site dir

Source code in frontend/family.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class WikiFamily:
    """
    the wiki family found in the given site dir
    """

    def __init__(self, sitedir: str = "/var/www/mediawiki/sites"):
        """
        constructor

        Registers a LocalWiki in self.family for every entry of sitedir
        that contains a LocalSettings.php file. A sitedir that is not a
        directory leaves the family empty.

        Args:
            sitedir(str): the path to the site definitions
            see http://wiki.bitplan.com/index.php/Wiki_Family
        """
        self.family = {}
        self.sitedir = sitedir
        if not os.path.isdir(sitedir):
            return
        for siteName in os.listdir(sitedir):
            lsettings = f"{sitedir}/{siteName}/LocalSettings.php"
            if not os.path.isfile(lsettings):
                continue
            self.family[siteName] = LocalWiki(siteName, self, lsettings)

__init__(sitedir='/var/www/mediawiki/sites')

Constructor. Args: sitedir(str): the path to the site definitions; see http://wiki.bitplan.com/index.php/Wiki_Family

Source code in frontend/family.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def __init__(self, sitedir: str = "/var/www/mediawiki/sites"):
    """
    constructor

    Registers a LocalWiki in self.family for every entry of sitedir
    that contains a LocalSettings.php file. A sitedir that is not a
    directory leaves the family empty.

    Args:
        sitedir(str): the path to the site definitions
        see http://wiki.bitplan.com/index.php/Wiki_Family
    """
    self.family = {}
    self.sitedir = sitedir
    if not os.path.isdir(sitedir):
        return
    for siteName in os.listdir(sitedir):
        lsettings = f"{sitedir}/{siteName}/LocalSettings.php"
        if not os.path.isfile(lsettings):
            continue
        self.family[siteName] = LocalWiki(siteName, self, lsettings)

frame

HtmlFrame

A class to frame html content with a basic HTML document structure.

Attributes:

Name Type Description
lang str

Language of the HTML document.

title str

Title of the HTML document.

Source code in frontend/frame.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class HtmlFrame:
    """
    Wraps html content into a basic HTML document structure.

    Style, header and footer snippets are looked up in the cms_pages of
    the frontend.

    Attributes:
        frontend: the frontend whose cms_pages supply the snippets.
        lang (str): Language of the HTML document.
        title (str): Title of the HTML document.
    """

    def __init__(self, frontend, title: str, lang: str = "en") -> None:
        """
        Create a frame for the given frontend, title and language.

        Args:
            frontend: the frontend whose cms_pages are used for lookups.
            title (str): Title for the HTML document.
            lang (str, optional): Language of the HTML document. Defaults to "en".
        """
        self.frontend, self.lang, self.title = frontend, lang, title

    def hamburger_menu(self) -> str:
        """
        Provide the static markup of a hamburger menu.

        Returns:
            str: Hamburger menu HTML, CSS, and JavaScript.
        """
        return """
<!-- Hamburger Menu Start -->
<style>
  /* Basic styling */
  .menu { display: none; }
  .hamburger { cursor: pointer; }
  .hamburger:hover { opacity: 0.7; }

  /* Menu items layout */
  .menu ul { list-style-type: none; padding: 0; }
  .menu li { padding: 8px; background-color: #f0f0f0; margin-bottom: 5px; }

  /* Show the menu when .show class is added via JavaScript */
  .show { display: block; }
</style>

<!-- Hamburger Icon -->
<div class="hamburger" onclick="toggleMenu()">☰</div>

<!-- Menu Items -->
<div class="menu" id="mainMenu">
  <ul>
    <li><a href="#home">Home</a></li>
    <li><a href="#about">About</a></li>
    <li><a href="#services">Services</a></li>
    <li><a href="#contact">Contact</a></li>
  </ul>
</div>

<script>
  function toggleMenu() {
    var menu = document.getElementById("mainMenu");
    if (menu.classList.contains("show")) {
      menu.classList.remove("show");
    } else {
      menu.classList.add("show");
    }
  }
</script>
<!-- Hamburger Menu End -->
"""

    def header(self) -> str:
        """
        Build the document start up to and including the opening body tag.

        The "CMS/style" page (if any) is placed into the head.

        Returns:
            str: Header part of an HTML document as a string.
        """
        style_html = self.frontend.cms_pages.get("CMS/style", "")
        return (
            "<!doctype html>\n"
            f'<html lang="{self.lang}">\n'
            "<head>\n"
            '  <meta charset="utf-8"/>\n'
            '  <meta name="viewport" content="width=device-width, initial-scale=1.0">\n'
            f"  <title>{self.title}</title>\n"
            f"  {style_html}\n"
            "</head>\n"
            "<body>  \n"
        )

    def footer(self) -> str:
        """
        Build the document end: the "CMS/footer/<lang>" page (if any)
        followed by the closing body and html tags.

        Returns:
            str: Footer part of an HTML document as a string.
        """
        footer_html = self.frontend.cms_pages.get(f"CMS/footer/{self.lang}", "")
        return f"{footer_html}\n  </body>\n</html>\n"

    def frame(self, content: str) -> str:
        """
        Embed the given HTML content between header and footer.

        The result consists of header(), hamburger_menu(), the
        "CMS/header/<lang>" page (if any), the content in a container div
        and footer().

        Args:
            content (str): HTML content to be framed within the HTML structure.

        Returns:
            str: Complete HTML document as a string with the provided content framed.
        """
        header_html = self.frontend.cms_pages.get(f"CMS/header/{self.lang}", "")
        return (
            f"{self.header()}\n"
            f"{self.hamburger_menu()}  \n"
            f"{header_html}\n"
            '      <div class="container">\n'
            f"{content}\n"
            "      </div><!-- /.container -->\n"
            f"{self.footer()}"
        )

__init__(frontend, title, lang='en')

Initialize HtmlFrame with a specified language and title.

Parameters:

Name Type Description Default
title str

Title for the HTML document.

required
lang str

Language of the HTML document. Defaults to "en".

'en'
Source code in frontend/frame.py
10
11
12
13
14
15
16
17
18
19
20
def __init__(self, frontend, title: str, lang: str = "en") -> None:
    """
    Create a frame for the given frontend, title and language.

    Args:
        frontend: the frontend object to keep in self.frontend.
        title (str): Title for the HTML document.
        lang (str, optional): Language of the HTML document. Defaults to "en".
    """
    self.frontend, self.lang, self.title = frontend, lang, title

footer()

Generate the footer part of the HTML document.

Returns:

Name Type Description
str str

Footer part of an HTML document as a string.

Source code in frontend/frame.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
    def footer(self) -> str:
        """
        Build the document end: the "CMS/footer/<lang>" page (if any)
        followed by the closing body and html tags.

        Returns:
            str: Footer part of an HTML document as a string.
        """
        footer_html = self.frontend.cms_pages.get(f"CMS/footer/{self.lang}", "")
        return f"{footer_html}\n  </body>\n</html>\n"

frame(content)

Frame the given HTML content with the header and footer of the document.

Parameters:

Name Type Description Default
content str

HTML content to be framed within the HTML structure.

required

Returns:

Name Type Description
str str

Complete HTML document as a string with the provided content framed.

Source code in frontend/frame.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
    def frame(self, content: str) -> str:
        """
        Embed the given HTML content between header and footer.

        The result consists of header(), hamburger_menu(), the
        "CMS/header/<lang>" page (if any), the content in a container div
        and footer().

        Args:
            content (str): HTML content to be framed within the HTML structure.

        Returns:
            str: Complete HTML document as a string with the provided content framed.
        """
        header_html = self.frontend.cms_pages.get(f"CMS/header/{self.lang}", "")
        return (
            f"{self.header()}\n"
            f"{self.hamburger_menu()}  \n"
            f"{header_html}\n"
            '      <div class="container">\n'
            f"{content}\n"
            "      </div><!-- /.container -->\n"
            f"{self.footer()}"
        )

hamburger_menu()

Generate the HTML, CSS, and JavaScript for a hamburger menu.

Returns:

Name Type Description
str str

Hamburger menu HTML, CSS, and JavaScript.

Source code in frontend/frame.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
    def hamburger_menu(self) -> str:
        """
        Generate the HTML, CSS, and JavaScript for a hamburger menu.

        Returns:
            str: Hamburger menu HTML, CSS, and JavaScript.
        """
        menu_html = """
<!-- Hamburger Menu Start -->
<style>
  /* Basic styling */
  .menu { display: none; }
  .hamburger { cursor: pointer; }
  .hamburger:hover { opacity: 0.7; }

  /* Menu items layout */
  .menu ul { list-style-type: none; padding: 0; }
  .menu li { padding: 8px; background-color: #f0f0f0; margin-bottom: 5px; }

  /* Show the menu when .show class is added via JavaScript */
  .show { display: block; }
</style>

<!-- Hamburger Icon -->
<div class="hamburger" onclick="toggleMenu()">☰</div>

<!-- Menu Items -->
<div class="menu" id="mainMenu">
  <ul>
    <li><a href="#home">Home</a></li>
    <li><a href="#about">About</a></li>
    <li><a href="#services">Services</a></li>
    <li><a href="#contact">Contact</a></li>
  </ul>
</div>

<script>
  function toggleMenu() {
    var menu = document.getElementById("mainMenu");
    if (menu.classList.contains("show")) {
      menu.classList.remove("show");
    } else {
      menu.classList.add("show");
    }
  }
</script>
<!-- Hamburger Menu End -->
"""
        return menu_html

header()

Generate the header part of the HTML document.

Returns:

Name Type Description
str str

Header part of an HTML document as a string.

Source code in frontend/frame.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
    def header(self) -> str:
        """
        Build the document start up to and including the opening body tag.

        The "CMS/style" page (if any) is placed into the head.

        Returns:
            str: Header part of an HTML document as a string.
        """
        style_html = self.frontend.cms_pages.get("CMS/style", "")
        return (
            "<!doctype html>\n"
            f'<html lang="{self.lang}">\n'
            "<head>\n"
            '  <meta charset="utf-8"/>\n'
            '  <meta name="viewport" content="width=device-width, initial-scale=1.0">\n'
            f"  <title>{self.title}</title>\n"
            f"  {style_html}\n"
            "</head>\n"
            "<body>  \n"
        )

html_table

Created on 2022-10-25

@author: wf

HtmlTables

Bases: WebScrape

HtmlTables extractor

Source code in frontend/html_table.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class HtmlTables(WebScrape):
    """
    HtmlTables extractor
    """

    def __init__(self, url: str, debug=False, showHtml=False):
        """
        Constructor

        Retrieves the soup for the given url via the getSoup method of my
        super class and keeps it in self.soup.

        Args:
            url(str): the url to read the tables from
            debug(bool): if True switch on debugging
            showHtml(bool): if True show the HTML retrieved
        """
        super().__init__(debug, showHtml)
        self.soup = super().getSoup(url, showHtml)

    def get_tables(self, header_tag: str = None) -> dict:
        """
        get all tables from my soup as a dict of lists of dicts

        The th cells of a table supply the field names; a th cell with a
        colspan attribute is taken as the category of the table instead.
        Each row with td cells becomes one record of field name -> cell text.

        Args:
            header_tag(str): if set search the table name from the given header tag

        Return:
            dict: maps each table name to the list of record dicts of that
                table; the name is the text of the element found by
                find_previous_sibling(header_tag) or "table<index>" if
                header_tag is not set or no such element is found

        """
        tables = {}
        for table_index, table in enumerate(self.soup.find_all("table")):
            fields = []
            table_data = []
            category = None
            rows = table.find_all("tr", recursive=True)
            for tr in rows:
                for th in tr.find_all("th", recursive=True):
                    if "colspan" in th.attrs:
                        category = th.text
                    else:
                        fields.append(th.text)
            for tr in rows:
                record = {}
                # a separate index name is essential here: reusing the table
                # index made the generated table names collide
                for col_index, td in enumerate(tr.find_all("td", recursive=True)):
                    if col_index < len(fields):
                        field = fields[col_index]
                    else:
                        # more cells than headers: use a positional name
                        field = f"column{col_index}"
                    record[field] = td.text
                if record:
                    if category:
                        record["category"] = category
                    table_data.append(record)
            table_name = f"table{table_index}"
            if header_tag is not None:
                header = table.find_previous_sibling(header_tag)
                # keep the positional name if there is no such header
                if header is not None:
                    table_name = header.text
            tables[table_name] = table_data
        return tables

__init__(url, debug=False, showHtml=False)

Constructor

url(str): the url to read the tables from debug(bool): if True switch on debugging showHtml(bool): if True show the HTML retrieved

Source code in frontend/html_table.py
14
15
16
17
18
19
20
21
22
23
def __init__(self, url: str, debug=False, showHtml=False):
    """
    Constructor

    Retrieves the soup for the given url via the getSoup method of my
    super class and keeps it in self.soup.

    Args:
        url(str): the url to read the tables from
        debug(bool): if True switch on debugging
        showHtml(bool): if True show the HTML retrieved
    """
    super().__init__(debug, showHtml)
    self.soup = super().getSoup(url, showHtml)

get_tables(header_tag=None)

get all tables from my soup as a list of list of dicts

Parameters:

Name Type Description Default
header_tag(str)

if set search the table name from the given header tag

required
Return

dict: the list of list of dicts for all tables

Source code in frontend/html_table.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def get_tables(self, header_tag: str = None) -> dict:
    """
    get all tables from my soup as a dict of lists of dicts

    The th cells of a table supply the field names; a th cell with a
    colspan attribute is taken as the category of the table instead.
    Each row with td cells becomes one record of field name -> cell text.

    Args:
        header_tag(str): if set search the table name from the given header tag

    Return:
        dict: maps each table name to the list of record dicts of that
            table; the name is the text of the element found by
            find_previous_sibling(header_tag) or "table<index>" if
            header_tag is not set or no such element is found

    """
    tables = {}
    for table_index, table in enumerate(self.soup.find_all("table")):
        fields = []
        table_data = []
        category = None
        rows = table.find_all("tr", recursive=True)
        for tr in rows:
            for th in tr.find_all("th", recursive=True):
                if "colspan" in th.attrs:
                    category = th.text
                else:
                    fields.append(th.text)
        for tr in rows:
            record = {}
            # a separate index name is essential here: reusing the table
            # index made the generated table names collide
            for col_index, td in enumerate(tr.find_all("td", recursive=True)):
                if col_index < len(fields):
                    field = fields[col_index]
                else:
                    # more cells than headers: use a positional name
                    field = f"column{col_index}"
                record[field] = td.text
            if record:
                if category:
                    record["category"] = category
                table_data.append(record)
        table_name = f"table{table_index}"
        if header_tag is not None:
            header = table.find_previous_sibling(header_tag)
            # keep the positional name if there is no such header
            if header is not None:
                table_name = header.text
        tables[table_name] = table_data
    return tables

server

Created on 2021-01-06

@author: wf

Server

Bases: JSONAble

a server that might serve multiple wikis for a wikiFarm

Source code in frontend/server.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
class Server(JSONAble):
    """
    a server that might serve multiple wikis for a wikiFarm
    """

    # optional override for the home directory (class wide)
    homePath = None

    def __init__(self, debug=False):
        """
        Constructor

        Args:
            debug(bool): if True print the error of a failed hostname lookup
        """
        self.storage_secret = None
        self.frontendConfigs = None
        self.logo = "https://wiki.bitplan.com/images/wiki/6/63/Profiwikiicon.png"
        self.purpose = ""
        self.reinit(debug)

    def reinit(self, debug=False):
        """
        reinitialize me

        Resets hostname, ip, frontends and siteLookup and makes sure the
        default attributes (e.g. sqlBackupPath) and the homePath are set.

        Args:
            debug(bool): if True print the error of a failed hostname lookup
        """
        self.debug = debug
        # NOTE(review): platform is whatever name the module imports -
        # getPlatformLogo compares it to keys like "linux" and "win32",
        # presumably it is sys.platform - TODO confirm
        self.platform = platform
        self.uname = os.uname()
        self.name = self.uname[1]
        self.hostname = "?"
        self.ip = "127.0.0.1"
        try:
            self.hostname = socket.getfqdn()
            self.ip = socket.gethostbyname(self.hostname)
        except Exception as ex:
            # best effort: keep the placeholder values
            if self.debug:
                print(str(ex))
        self.frontends = {}
        self.siteLookup = {}
        defaults = {"sqlBackupPath": "/var/backup/sqlbackup"}
        for key, value in defaults.items():
            # do not override values that are already set
            if not hasattr(self, key):
                setattr(self, key, value)
        if Server.homePath is None:
            self.homePath = str(Path.home())
        else:
            self.homePath = Server.homePath

    def sqlGetDatabaseUrl(
        self, dbname: str, username: str, password: str, hostname: str = None
    ) -> str:
        """
        get the DatabaseUrl for the given database Name

        Username and password are inserted as given (no url quoting).

        Args:
            dbname(str): the name of the database
            username(str): the username
            password(str): the password
            hostname(str): the host of the database - defaults to my hostname

        Returns:
            str: the url for sqlAlchemy in rfc1738 format e.g. mysql://dt_admin:dt2016@localhost:3308/dreamteam_db
        """
        # http://docs.sqlalchemy.org/en/latest/dialects/mysql.html
        if hostname is None:
            hostname = self.hostname
        url = "mysql+pymysql://%s:%s@%s/%s" % (username, password, hostname, dbname)
        return url

    def sqlDatabaseExist(
        self,
        dburl: str,
    ) -> bool:
        """
        check if the database with the given url exists


        Args:
            dburl(str): rfc 1738 formatted database url e.g. mysql://dt_admin:dt2016@localhost:3308/dreamteam_db

        Returns:
            True if the database exists, else False (also if the check fails)
        """
        dbExists = False
        try:
            dbExists = database_exists(dburl)
        except Exception:
            # bad luck
            pass
        return dbExists

    def sqlBackupStateAsHtml(self, dbName):
        """
        get the backup state of the given sql backup

        Args:
           dbName(str): the name of the database to check

        Returns:
            html: backup State html representation
        """
        backupState = self.sqlBackupState(dbName)
        mbSize = backupState["size"] / 1024 / 1024
        mdate = backupState["mdate"]
        isoDate = mdate.strftime("%Y-%m-%d %H:%M:%S") if mdate else ""
        html = "%s %s - %4d MB" % (
            self.stateSymbol(backupState["exists"]),
            isoDate,
            mbSize,
        )
        return html

    def sqlBackupState(self, dbName):
        """
        get the backup state of the given sql backup

        Args:
           dbName(str): the name of the database to check

        Returns:
            dict: backup State with the keys size (bytes), exists and
                mdate (modification datetime or None)

        """
        fullBackup = "%s/today/%s_full.sql" % (self.sqlBackupPath, dbName)
        size = 0
        mdate = None
        exists = os.path.isfile(fullBackup)
        if exists:
            stat = os.stat(fullBackup)
            size = stat.st_size
            mtime = stat.st_mtime
            mdate = datetime.datetime.fromtimestamp(mtime)
        result = {"size": size, "exists": exists, "mdate": mdate}
        return result

    def enableFrontend(self, siteName: str, appWrap=None, debug: bool = False):
        """
        enable the given frontend

        Args:
            siteName(str): the siteName of the frontend to enable
            appWrap(appWrap): optional fb4 Application Wrapper
            debug(bool): the debug flag to set for the site of the frontend
        Returns:
            Frontend: the configured frontend
        Raises:
            Exception: if no frontend configurations are loaded or the
                site is not configured
        """
        if self.frontendConfigs is None:
            raise Exception("No frontend configurations loaded yet")
        if siteName not in self.siteLookup:
            raise Exception(f"frontend for site {siteName} not configured yet")
        frontend = Frontend(siteName)
        self.frontends[siteName] = frontend
        config = self.siteLookup[siteName]
        frontend.site.configure(config)
        frontend.site.debug = debug
        frontend.open(appWrap)
        return frontend

    def getFrontend(self, wikiId):
        """
        get the frontend for the given wikiid

        Args:
            wikiId(str): the wikiId to get the frontend for

        Returns:
            Frontend: the frontend for this wikiId
        """
        return self.frontends[wikiId]

    def load(self):
        """
        load my front end configurations

        Nothing happens if there is no json file at my store path.
        """
        storePath = self.getStorePath()
        if os.path.isfile(storePath + ".json"):
            self.restoreFromJsonFile(storePath)
            self.reinit()
            # the restored file might not contain any frontend configurations
            for config in self.frontendConfigs or []:
                siteName = config["site"]
                self.siteLookup[siteName] = config

    def getStorePath(self, prefix: str = "serverConfig") -> str:
        """
        get the path where my store files are located

        The .wikicms directory is created if it does not exist yet.

        Args:
            prefix(str): the name of the store file without extension
        Returns:
            path to the prefix in .wikicms in the homedirectory of the current user
        """
        iniPath = self.homePath + "/.wikicms"
        # exist_ok avoids the race between checking and creating
        os.makedirs(iniPath, exist_ok=True)
        storePath = f"{iniPath}/{prefix}"
        return storePath

    def store(self):
        """
        store me as a json file at my store path if frontends is not None
        """
        if self.frontends is not None:
            storePath = self.getStorePath()
            self.storeToJsonFile(storePath)

    def getPlatformLogo(self) -> str:
        """
        get the logo url for the platform this server runs on

        Returns:
            str: the url of the logo for the current operating system platform
        """
        logos = {
            "aix": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a0/IBM_AIX_logo.svg/200px-IBM_AIX_logo.svg.png",
            "cygwin": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/29/Cygwin_logo.svg/200px-Cygwin_logo.svg.png",
            "darwin": "https://upload.wikimedia.org/wikipedia/de/thumb/b/b1/MacOS-Logo.svg/200px-MacOS-Logo.svg.png",
            "linux": "https://upload.wikimedia.org/wikipedia/commons/a/af/Tux.png",
            "win32": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5f/Windows_logo_-_2012.svg/200px-Windows_logo_-_2012.svg.png",
            "unknown": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/d7/Blue_question_mark.jpg/240px-Blue_question_mark.jpg",
        }
        if self.platform in logos:
            logo = logos[self.platform]
        else:
            logo = logos["unknown"]
        return logo

    def stateSymbol(self, b: bool) -> str:
        """
        return the symbol for the given boolean state b

        Args:
            b(bool): the state to return a symbol for

        Returns:
            ✅ for True and ❌ for false
        """
        symbol = "✅" if b else "❌"
        return symbol

    def checkApacheConfiguration(self, conf, status="enabled") -> str:
        """
        check the given apache configuration and return an indicator symbol

        Args:
            conf(str): the name of the apache configuration
            status(str): the sites-<status> directory to look into

        Returns:
            a state symbol
        """
        path = f"/etc/apache2/sites-{status}/{conf}.conf"
        confExists = os.path.isfile(path)
        stateSymbol = self.stateSymbol(confExists)
        return stateSymbol

    def asHtml(self, logo_size: int = 64) -> str:
        """
        render me as HTML code

        Args:
            logo_size(int): the logo_size to apply

        Returns:
            str: a table with the platform logo, my logo (if any) and a
                welcome message
        """
        server = self
        logo_html = ""
        if server.logo is not None:
            logo_html = f"""<td><img src='{server.logo }' alt='{server.name} logo' height='{logo_size}' width='{logo_size}'></td>"""
        # the welcome cell has to be closed with </td> - it used to open
        # another cell instead
        html = f"""<table>
<tr>
    <td><img src='{server.getPlatformLogo()}' alt='{server.platform} logo' height='{logo_size}' width='{logo_size}'></td>
    {logo_html}
    <td><span>Welcome to {server.name } ({ server.ip }) { server.purpose }</span></td>
</tr>
</table>
"""
        return html

__init__(debug=False)

Constructor

Parameters:

Name Type Description Default
storePath(str)

the path to load my configuration from (if any)

required
Source code in frontend/server.py
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, debug=False):
    """
    Constructor

    Sets the default values for storage_secret, frontendConfigs, logo
    and purpose and then delegates to reinit.

    Args:
        debug(bool): the debug flag handed over to reinit
    """
    self.storage_secret, self.frontendConfigs = None, None
    self.logo = "https://wiki.bitplan.com/images/wiki/6/63/Profiwikiicon.png"
    self.purpose = ""
    self.reinit(debug)

asHtml(logo_size=64)

render me as HTML code

Parameters:

Name Type Description Default
logo_size(int)

the logo_size to apply

required
Source code in frontend/server.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
    def asHtml(self, logo_size: int = 64) -> str:
        """
        render me as HTML code

        Args:
            logo_size(int): the logo_size to applyå
        """
        server = self
        logo_html = ""
        if server.logo is not None:
            logo_html = f"""<td><img src='{server.logo }' alt='{server.name} logo' height='{logo_size}' width='{logo_size}'></td>"""
        html = f"""<table>
<tr>
    <td><img src='{server.getPlatformLogo()}' alt='{server.platform} logo' height='{logo_size}' width='{logo_size}'></td>
    {logo_html}
    <td><span>Welcome to {server.name } ({ server.ip }) { server.purpose }</span><td>
</tr>
</table>
"""
        return html

checkApacheConfiguration(conf, status='enabled')

check the given apache configuration and return an indicator symbol

Parameters:

Name Type Description Default
conf(str)

the name of the apache configuration

required

Returns:

Type Description
str

a state symbol

Source code in frontend/server.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def checkApacheConfiguration(self, conf, status="enabled") -> str:
    """
    report whether an apache site configuration file is present

    Args:
        conf(str): the name of the apache configuration (without the .conf suffix)
        status(str): selects the /etc/apache2/sites-<status> directory to look in

    Returns:
        str: the state symbol for the existence of the configuration file
    """
    conf_file = f"/etc/apache2/sites-{status}/{conf}.conf"
    return self.stateSymbol(os.path.isfile(conf_file))

enableFrontend(siteName, appWrap=None, debug=False)

enable the given frontend

Parameters:

Name Type Description Default
siteName(str)

the siteName of the frontend to enable

required
appWrap(appWrap)

optional fb4 Application Wrapper

required

Returns: Frontend: the configured frontend

Source code in frontend/server.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def enableFrontend(self, siteName: str, appWrap=None, debug: bool = False):
    """
    enable the given frontend

    Creates a Frontend for the site, registers it in self.frontends under
    the siteName, configures its site from self.siteLookup and opens it.

    Args:
        siteName(str): the siteName of the frontend to enable
        appWrap(appWrap): optional fb4 Application Wrapper
        debug(bool): the debug flag to set on the site of the frontend

    Returns:
        Frontend: the configured frontend

    Raises:
        Exception: if no frontend configurations have been loaded yet or
            if the given site is not configured
    """
    if self.frontendConfigs is None:
        raise Exception("No frontend configurations loaded yet")
    if siteName not in self.siteLookup:
        raise Exception(f"frontend for site {siteName} not configured yet")
    frontend = Frontend(siteName)
    self.frontends[siteName] = frontend
    config = self.siteLookup[siteName]
    frontend.site.configure(config)
    frontend.site.debug = debug
    frontend.open(appWrap)
    return frontend

getFrontend(wikiId)

get the frontend for the given wikiid

Parameters:

Name Type Description Default
wikiId(str)

the wikiId to get the frontend for

required

Returns:

Name Type Description
Frontend

the frontend for this wikiId

Source code in frontend/server.py
176
177
178
179
180
181
182
183
184
185
186
def getFrontend(self, wikiId: str):
    """
    get the frontend for the given wikiid

    The frontend is looked up in self.frontends; a missing key is not
    handled here, so the lookup error propagates to the caller.

    Args:
        wikiId(str): the wikiId to get the frontend for

    Returns:
        Frontend: the frontend for this wikiId
    """
    # NOTE(review): enableFrontend() registers frontends under their siteName -
    # presumably callers pass that same key here; confirm
    return self.frontends[wikiId]

getPlatformLogo()

get the logo url for the platform this server runs on

Returns:

Name Type Description
str str

the url of the logo for the current operating system platform

Source code in frontend/server.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def getPlatformLogo(self) -> str:
    """
    look up the logo url that matches self.platform

    Returns:
        str: the logo url for the current operating system platform, or the
            question mark logo if the platform has no entry
    """
    logo_by_platform = {
        "aix": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a0/IBM_AIX_logo.svg/200px-IBM_AIX_logo.svg.png",
        "cygwin": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/29/Cygwin_logo.svg/200px-Cygwin_logo.svg.png",
        "darwin": "https://upload.wikimedia.org/wikipedia/de/thumb/b/b1/MacOS-Logo.svg/200px-MacOS-Logo.svg.png",
        "linux": "https://upload.wikimedia.org/wikipedia/commons/a/af/Tux.png",
        "win32": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5f/Windows_logo_-_2012.svg/200px-Windows_logo_-_2012.svg.png",
        "unknown": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/d7/Blue_question_mark.jpg/240px-Blue_question_mark.jpg",
    }
    # platforms without an entry of their own share the "unknown" logo
    return logo_by_platform.get(self.platform, logo_by_platform["unknown"])

getStorePath(prefix='serverConfig')

get the path where my store files are located. Returns: the path for the given prefix inside the .wikicms directory in the home directory of the current user

Source code in frontend/server.py
201
202
203
204
205
206
207
208
209
210
211
def getStorePath(self, prefix: str = "serverConfig") -> str:
    """
    get the path where my store files are located

    The .wikicms directory below self.homePath is created if it is missing.

    Args:
        prefix(str): the file name prefix of the store file

    Returns:
        str: the path of the prefix inside the .wikicms directory
    """
    iniPath = self.homePath + "/.wikicms"
    # exist_ok avoids the check-then-create race of isdir() followed by makedirs()
    os.makedirs(iniPath, exist_ok=True)
    storePath = f"{iniPath}/{prefix}"
    return storePath

load()

load my front end configurations

Source code in frontend/server.py
188
189
190
191
192
193
194
195
196
197
198
199
def load(self):
    """
    load my front end configurations

    Nothing happens if the JSON store file does not exist. Otherwise my
    state is restored from it, reinit() is called and siteLookup is
    rebuilt from the restored frontendConfigs keyed by their "site" entry.
    """
    storePath = self.getStorePath()
    if not os.path.isfile(storePath + ".json"):
        return
    self.restoreFromJsonFile(storePath)
    self.reinit()
    # frontendConfigs is initialised to None - if the restore did not set it,
    # treat it like an empty list instead of failing with a TypeError
    for config in self.frontendConfigs or []:
        siteName = config["site"]
        self.siteLookup[siteName] = config

reinit(debug=False)

reinitialize me

Source code in frontend/server.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def reinit(self, debug=False):
    """
    reinitialize me

    Resets the platform, host name, ip address and the frontend registries,
    fills in defaults for attributes that are not set yet and determines
    the home path.

    Args:
        debug(bool): if True an error during the hostname lookup is printed
    """
    self.debug = debug
    self.platform = platform
    self.uname = os.uname()
    # index 1 of the uname result is used as my name
    self.name = self.uname[1]
    # fallback values that stay in place if the lookup below fails
    self.hostname = "?"
    self.ip = "127.0.0.1"
    try:
        self.hostname = socket.getfqdn()
        self.ip = socket.gethostbyname(self.hostname)
    except Exception as lookup_error:
        # best effort only: keep whatever has been set so far
        if self.debug:
            print(str(lookup_error))
    self.frontends = {}
    self.siteLookup = {}
    # only attributes that do not exist yet get their default value
    for attr_name, default_value in (("sqlBackupPath", "/var/backup/sqlbackup"),):
        if not hasattr(self, attr_name):
            setattr(self, attr_name, default_value)
    # a home path set on the Server class takes precedence over the user's home
    self.homePath = str(Path.home()) if Server.homePath is None else Server.homePath

sqlBackupState(dbName)

get the backup state of the given sql backup

Parameters:

Name Type Description Default
dbName(str)

the name of the database to check

required

Returns:

Name Type Description
dict

backup State

Source code in frontend/server.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def sqlBackupState(self, dbName):
    """
    inspect today's full sql backup file of the given database

    Args:
       dbName(str): the name of the database to check

    Returns:
        dict: the backup state with the keys "size" (bytes, 0 if missing),
            "exists" (bool) and "mdate" (modification datetime or None)
    """
    fullBackup = "%s/today/%s_full.sql" % (self.sqlBackupPath, dbName)
    if not os.path.isfile(fullBackup):
        return {"size": 0, "exists": False, "mdate": None}
    file_info = os.stat(fullBackup)
    return {
        "size": file_info.st_size,
        "exists": True,
        "mdate": datetime.datetime.fromtimestamp(file_info.st_mtime),
    }

sqlBackupStateAsHtml(dbName)

get the backup state of the given sql backup

Parameters:

Name Type Description Default
dbName(str)

the name of the database to check

required

Returns:

Name Type Description
html

backup State html representation

Source code in frontend/server.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def sqlBackupStateAsHtml(self, dbName):
    """
    format the backup state of the given sql backup as a single text line

    Args:
       dbName(str): the name of the database to check

    Returns:
        html: state symbol, modification time (empty if unknown) and the
            size in whole megabytes
    """
    state = self.sqlBackupState(dbName)
    megabytes = state["size"] / 1024 / 1024
    modified = state["mdate"]
    isoDate = ""
    if modified:
        isoDate = modified.strftime("%Y-%m-%d %H:%M:%S")
    symbol = self.stateSymbol(state["exists"])
    # %4d truncates the megabyte value to a whole number
    return "%s %s - %4d MB" % (symbol, isoDate, megabytes)

sqlDatabaseExist(dburl)

check if the database with the given name exists

Parameters:

Name Type Description Default
dburl(str)

RFC 1738 formatted database url e.g. mysql://dt_admin:dt2016@localhost:3308/dreamteam_db

required

Returns:

Type Description
bool

True if the database exists, else False

Source code in frontend/server.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def sqlDatabaseExist(
    self,
    dburl: str,
) -> bool:
    """
    check if the database with the given url exists

    Args:
        dburl(str): rfc 1738 formatted database url e.g. mysql://dt_admin:dt2016@localhost:3308/dreamteam_db

    Returns:
        True if the database exists, else False - any error raised by
        the check also counts as False
    """
    try:
        return database_exists(dburl)
    except Exception:
        # best effort: a failing check is reported as "does not exist"
        return False

sqlGetDatabaseUrl(dbname, username, password, hostname=None)

get the DatabaseUrl for the given database Name

Parameters:

Name Type Description Default
dbname(str)

the name of the database

required
username(str)

the username

required
password(str)

the password

required

Returns:

Name Type Description
str str

the url for sqlAlchemy in rfc1738 format e.g. mysql://dt_admin:dt2016@localhost:3308/dreamteam_db

Source code in frontend/server.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def sqlGetDatabaseUrl(
    self, dbname: str, username: str, password: str, hostname: str = None
) -> str:
    """
    build the sqlAlchemy database url for the given database name

    Args:
        dbname(str): the name of the database
        username(str): the username
        password(str): the password
        hostname(str): the database host - self.hostname is used if None

    Returns:
        str: the url for sqlAlchemy in rfc1738 format e.g. mysql+pymysql://dt_admin:dt2016@localhost/dreamteam_db
    """
    # http://docs.sqlalchemy.org/en/latest/dialects/mysql.html
    host = self.hostname if hostname is None else hostname
    # NOTE(review): username and password are inserted verbatim - characters
    # such as "@" or "/" are not url-quoted here; confirm callers never need that
    return "mysql+pymysql://%s:%s@%s/%s" % (username, password, host, dbname)

stateSymbol(b)

return the symbol for the given boolean state b

Parameters:

Name Type Description Default
b(bool)

the state to return a symbol for

required

Returns:

Type Description
str

✅ for True and ❌ for false

Source code in frontend/server.py
239
240
241
242
243
244
245
246
247
248
249
250
def stateSymbol(self, b: bool) -> str:
    """
    map a boolean state to a check mark or cross symbol

    Args:
        b(bool): the state to return a symbol for

    Returns:
        ✅ for a true state and ❌ for a false state
    """
    if b:
        return "✅"
    return "❌"

site

Created on 2020-12-31

@author: wf

Site

Bases: object

migrated from: https://github.com/BITPlan/com.bitplan.wikifrontend/blob/master/src/main/java/com/bitplan/wikifrontend/Site.java

Source code in frontend/site.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Site:
    """
    A wiki site that has to be configured before it can be opened.

    migrated from:
    https://github.com/BITPlan/com.bitplan.wikifrontend/blob/master/src/main/java/com/bitplan/wikifrontend/Site.java
    """

    def __init__(
        self,
        name: str,
        defaultPage: str = "Main Page",
        lang: str = "en",
        debug=False,
    ):
        """
        create an unconfigured site

        Args:
            name(str): the name of this site
            defaultPage(str): the default Page of this site
            lang(str): the default language of this site
            debug(bool): True if debug info should be given
        """
        self.name = name
        self.defaultPage = defaultPage
        self.lang = lang
        self.debug = debug
        # open() refuses to work until configure() has flipped this flag
        self.configured = False

    def configure(self, config: dict):
        """
        take over wikiId and defaultPage from the given configuration

        Args:
            config(dict): the configuration to use; the keys "wikiId" and
                "defaultPage" are read
        """
        self.wikiId = config["wikiId"]
        self.defaultPage = config["defaultPage"]
        self.configured = True

    def open(self, ws=None):
        """
        open this site by remembering the given webserver

        Args:
             ws: Nicegui Webserver

        Raises:
            Exception: if the site has not been configured yet
        """
        if self.configured:
            self.ws = ws
            return
        raise Exception("need to configure site before opening it")

__init__(name, defaultPage='Main Page', lang='en', debug=False)

Constructor

Parameters:

Name Type Description Default
name(str)

the name of this site

required
defaultPage(str)

the default Page of this site

required
lang(str)

the default language of this site

required
debug(bool)

True if debug info should be given

required
Source code in frontend/site.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def __init__(
    self, name: str, defaultPage: str = "Main Page", lang: str = "en", debug: bool = False
):
    """
    Constructor

    The site starts out unconfigured.

    Args:
        name(str): the name of this site
        defaultPage(str): the default Page of this site
        lang(str): the default language of this site
        debug(bool): True if debug info should be given
    """
    self.name = name
    self.defaultPage = defaultPage
    self.lang = lang
    # set to True by configure()
    self.configured = False
    self.debug = debug

configure(config)

configure me from the given configuration. Args: config(dict): the configuration to use

Source code in frontend/site.py
32
33
34
35
36
37
38
39
40
def configure(self, config: dict):
    """
    configure me from the given configuration

    Copies the "wikiId" and "defaultPage" entries of the configuration to
    my attributes and marks me as configured.

    Args:
        config(dict): the configuration to use; it has to contain the
            keys "wikiId" and "defaultPage"
    """
    self.wikiId = config["wikiId"]
    self.defaultPage = config["defaultPage"]
    self.configured = True

open(ws=None)

open this site

Parameters:

Name Type Description Default
ws

Nicegui Webserver

None
Source code in frontend/site.py
42
43
44
45
46
47
48
49
50
51
def open(self, ws=None):
    """
    open this site by remembering the given webserver

    Args:
         ws: Nicegui Webserver

    Raises:
        Exception: if the site has not been configured yet
    """
    if self.configured:
        self.ws = ws
        return
    raise Exception("need to configure site before opening it")

version

Created on 2022-12-03

@author: wf

Version dataclass

Bases: object

Version handling for pyWikiCMS

Source code in frontend/version.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@dataclass
class Version(object):
    """
    Version handling for pyWikiCMS

    All values are plain class attributes describing the package.
    """

    name = "pyWikiCMS"
    description = "pyWikiCMS: python implementation of a Mediawiki based Content Management System"
    # the version number is taken from the frontend package
    version = frontend.__version__
    # creation date and date of the last update
    date = "2022-11-16"
    updated = "2024-04-17"
    authors = "Wolfgang Fahl"
    # documentation, discussion and source code locations
    doc_url = "http://wiki.bitplan.com/index.php/PyWikiCMS"
    chat_url = "https://github.com/BITPlan/pyWikiCMS/discussions"
    cm_url = "https://github.com/BITPlan/pyWikiCMS"
    license = f"""Copyright 2022-2024 contributors. All rights reserved.
  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0
  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied."""
    # composed from the attributes defined above
    longDescription = f"""{name} version {version}
{description}
  Created by {authors} on {date} last updated {updated}"""

webscrape

Created on 2020-08-20

@author: wf

WebScrape

Bases: object

WebScraper

Source code in frontend/webscrape.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class WebScrape(object):
    """
    WebScraper

    Fetches a web page and parses it with Beautiful Soup.
    """

    def __init__(self, debug=False, showHtml=False):
        """
        Constructor

        Args:
            debug(bool): True if debugging should be on
            showHtml(bool): stored as self.showHtml
        """
        self.err = None
        self.valid = False
        self.debug = debug
        self.showHtml = showHtml

    def getSoup(self, url, showHtml: bool):
        """
        get the beautiful Soup parser

        Args:
           url(str): the url of the page to fetch
           showHtml(bool): True if the html code should be pretty printed and shown

        Returns:
            BeautifulSoup: the parsed html of the page
        """
        req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
        # the context manager closes the response (and its connection)
        # even if reading fails
        with urlopen(req) as response:
            html = response.read()
        soup = BeautifulSoup(html, "html.parser", from_encoding="utf-8")
        if showHtml:
            self.printPrettyHtml(soup)

        return soup

    def printPrettyHtml(self, soup):
        """
        print the prettified html for the given soup

        Args:
            soup(BeautifulSoup): the parsed html to print
        """
        prettyHtml = soup.prettify()
        print(prettyHtml)

__init__(debug=False, showHtml=False)

Constructor

Source code in frontend/webscrape.py
16
17
18
19
20
21
22
23
def __init__(self, debug: bool = False, showHtml: bool = False):
    """
    Constructor

    Args:
        debug(bool): stored as self.debug
        showHtml(bool): stored as self.showHtml
    """
    # err and valid start out as "no error" and "not valid"
    self.err = None
    self.valid = False
    self.debug = debug
    self.showHtml = showHtml

getSoup(url, showHtml)

get the beautiful Soup parser

Parameters:

Name Type Description Default
showHtml(bool)

True if the html code should be pretty printed and shown

required
Source code in frontend/webscrape.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def getSoup(self, url, showHtml: bool):
    """
    get the beautiful Soup parser

    Args:
       url(str): the url of the page to fetch
       showHtml(bool): True if the html code should be pretty printed and shown

    Returns:
        BeautifulSoup: the parsed html of the page
    """
    req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
    # the context manager closes the response (and its connection)
    # even if reading fails
    with urlopen(req) as response:
        html = response.read()
    soup = BeautifulSoup(html, "html.parser", from_encoding="utf-8")
    if showHtml:
        self.printPrettyHtml(soup)

    return soup

printPrettyHtml(soup)

print the prettified html for the given soup

Parameters:

Name Type Description Default
soup(BeautifulSoup)

the parsed html to print

required
Source code in frontend/webscrape.py
40
41
42
43
44
45
46
47
48
def printPrettyHtml(self, soup):
    """
    write the prettified html of the given soup to standard output

    Args:
        soup(BeautifulSoup): the parsed html to print
    """
    print(soup.prettify())

webserver

Created on 2020-12-30

@author: wf

CmsSolution

Bases: InputWebSolution

Content management solution

Source code in frontend/webserver.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
class CmsSolution(InputWebSolution):
    """
    Content management solution
    """

    def __init__(self, webserver: CmsWebServer, client: Client):
        """
        Initialize the solution

        Calls the constructor of the base solution
        Args:
            webserver (CmsWebServer): The webserver instance associated with this context.
            client (Client): The client instance this context is associated with.
        """
        super().__init__(webserver, client)  # Call to the superclass constructor
        self.wiki_grid = WikiGrid(self)
        # keep a direct reference to the server of the webserver
        self.server = webserver.server

    def configure_menu(self):
        """
        configure specific menu entries

        Shows the username found in the user storage as a label;
        "?" is shown if no username is stored.
        """
        username = app.storage.user.get("username", "?")
        ui.label(username)

    async def home(self):
        """
        provide the main content page

        The page consists of the HTML rendering of the server
        followed by the wiki grid.
        """

        def show():
            # build the page inside of the content div
            with self.content_div:
                self.server_html = ui.html(self.server.asHtml())
                self.wiki_grid.setup()

        await self.setup_content_div(show)

__init__(webserver, client)

Initialize the solution

Calls the constructor of the base solution. Args: webserver (CmsWebServer): The webserver instance associated with this context. client (Client): The client instance this context is associated with.

Source code in frontend/webserver.py
125
126
127
128
129
130
131
132
133
134
135
136
def __init__(self, webserver: CmsWebServer, client: Client):
    """
    Initialize the solution

    Calls the constructor of the base solution
    Args:
        webserver (CmsWebServer): The webserver instance associated with this context.
        client (Client): The client instance this context is associated with.
    """
    super().__init__(webserver, client)  # Call to the superclass constructor
    self.wiki_grid = WikiGrid(self)
    # keep a direct reference to the server of the webserver
    self.server = webserver.server

configure_menu()

configure specific menu entries

Source code in frontend/webserver.py
138
139
140
141
142
143
def configure_menu(self):
    """
    configure specific menu entries

    Shows the username found in the user storage as a label;
    "?" is shown if no username is stored.
    """
    username = app.storage.user.get("username", "?")
    ui.label(username)

home() async

provide the main content page

Source code in frontend/webserver.py
145
146
147
148
149
150
151
152
153
154
155
156
async def home(self):
    """
    provide the main content page

    The page consists of the HTML rendering of the server
    followed by the wiki grid.
    """

    def show():
        # build the page inside of the content div
        with self.content_div:
            self.server_html = ui.html(self.server.asHtml())
            self.wiki_grid.setup()

    await self.setup_content_div(show)

CmsWebServer

Bases: InputWebserver

WebServer class that manages the server

Source code in frontend/webserver.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class CmsWebServer(InputWebserver):
    """
    WebServer class that manages the server

    """

    @classmethod
    def get_config(cls) -> WebserverConfig:
        """
        get the webserver configuration

        Returns:
            WebserverConfig: the configuration with CmsSolution set as the solution class
        """
        copy_right = "(c)2023-2024 Wolfgang Fahl"
        config = WebserverConfig(
            copy_right=copy_right,
            version=Version(),
            default_port=8252,
            short_name="wikicms",
        )
        server_config = WebserverConfig.get(config)
        server_config.solution_class = CmsSolution
        return server_config

    def __init__(self):
        """
        constructor

        Sets up the login, loads the server configuration and registers
        the route that renders the pages of the frontends.
        """
        InputWebserver.__init__(self, config=CmsWebServer.get_config())
        users = Users("~/.wikicms/")
        self.login = Login(self, users)
        self.server = Server()
        self.server.load()
        self.enabledSites = ["admin"]

        # @ui.page("/login")
        # async def login(client: Client):
        #    return await self.page(
        #        client,CmsSolution.login
        #    )

        # @ui.page("/wikis")
        # async def wikis(client: Client):
        #    if not self.login.authenticated():
        #        return RedirectResponse("/login")
        #    return await self.wikis()

        @app.get("/{frontend_name}/{page_path:path}")
        def render_path(frontend_name: str, page_path: str) -> HTMLResponse:
            """
            Handles a GET request to render the path of the given frontend.

            Args:
                frontend_name: The name of the frontend to be rendered.
                page_path: The specific path within the frontend to be rendered.

            Returns:
                An HTMLResponse containing the rendered page content.

            """
            # delegate to the method of the same name on this webserver
            return self.render_path(frontend_name, page_path)

    def render_path(self, frontend_name: str, page_path: str):
        """
        Renders the content for a specific path of the given frontend.

        Args:
            frontend_name: The name of the frontend to be rendered.
            page_path: The specific path within the frontend to be rendered.

        Returns:
            The response that the frontend provides for the path "/" + page_path.

        Raises:
            HTTPException: with status code 404 if the frontend is not available.

        """
        frontend = self.server.frontends.get(frontend_name, None)
        if frontend is None:
            raise HTTPException(
                status_code=404, detail=f"frontend {frontend_name} is not available"
            )
        response = frontend.get_path_response(f"/{page_path}")
        return response

    def enableSites(self, siteNames):
        """
        enable the sites given in the sites list

        Each site is enabled as a frontend of my server and appended to
        self.enabledSites; nothing happens if siteNames is None.

        Args:
            siteNames(list): a list of strings with wikiIds to be enabled
        """
        if siteNames is None:
            return
        for siteName in siteNames:
            self.server.enableFrontend(siteName, self)
            self.enabledSites.append(siteName)

    def configure_run(self):
        """
        configure command line specific details

        Enables the sites given by the "sites" command line argument.
        """
        InputWebserver.configure_run(self)
        self.enableSites(self.args.sites)

__init__()

constructor

Source code in frontend/webserver.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def __init__(self):
    """
    constructor

    Sets up the login, loads the server configuration and registers
    the route that renders the pages of the frontends.
    """
    InputWebserver.__init__(self, config=CmsWebServer.get_config())
    users = Users("~/.wikicms/")
    self.login = Login(self, users)
    self.server = Server()
    self.server.load()
    self.enabledSites = ["admin"]

    # @ui.page("/login")
    # async def login(client: Client):
    #    return await self.page(
    #        client,CmsSolution.login
    #    )

    # @ui.page("/wikis")
    # async def wikis(client: Client):
    #    if not self.login.authenticated():
    #        return RedirectResponse("/login")
    #    return await self.wikis()

    @app.get("/{frontend_name}/{page_path:path}")
    def render_path(frontend_name: str, page_path: str) -> HTMLResponse:
        """
        Handles a GET request to render the path of the given frontend.

        Args:
            frontend_name: The name of the frontend to be rendered.
            page_path: The specific path within the frontend to be rendered.

        Returns:
            An HTMLResponse containing the rendered page content.

        """
        # delegate to the method of the same name on this webserver
        return self.render_path(frontend_name, page_path)

configure_run()

configure command line specific details

Source code in frontend/webserver.py
112
113
114
115
116
117
def configure_run(self):
    """
    configure command line specific details

    Runs the base class configuration first and then enables the sites
    given by the "sites" command line argument.
    """
    InputWebserver.configure_run(self)
    self.enableSites(self.args.sites)

enableSites(siteNames)

enable the sites given in the sites list. Args: siteNames(list): a list of strings with wikiIds to be enabled

Source code in frontend/webserver.py
100
101
102
103
104
105
106
107
108
109
110
def enableSites(self, siteNames):
    """
    enable every site of the given list as a frontend of my server

    Each enabled site is also appended to self.enabledSites.

    Args:
        siteNames(list): a list of strings with wikiIds to be enabled;
            None means there is nothing to enable
    """
    if siteNames is not None:
        for site_name in siteNames:
            self.server.enableFrontend(site_name, self)
            self.enabledSites.append(site_name)

render_path(frontend_name, page_path)

Renders the content for a specific path of the given frontend.

Parameters:

Name Type Description Default
frontend_name str

The name of the frontend to be rendered.

required
page_path str

The specific path within the frontend to be rendered.

required

Returns:

Type Description

An HTMLResponse containing the rendered page content or an error page if something goes wrong.

Raises:

Type Description
HTTPException

With status code 404 if the given frontend is not available.

Source code in frontend/webserver.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def render_path(self, frontend_name: str, page_path: str):
    """
    Renders the content for a specific path of the given frontend.

    Args:
        frontend_name: The name of the frontend to be rendered.
        page_path: The specific path within the frontend to be rendered.

    Returns:
        The response that the frontend provides for the path "/" + page_path.

    Raises:
        HTTPException: with status code 404 if the frontend is not available.

    """
    frontend = self.server.frontends.get(frontend_name)
    if frontend is not None:
        # the frontend expects the path with a leading slash
        return frontend.get_path_response(f"/{page_path}")
    raise HTTPException(
        status_code=404, detail=f"frontend {frontend_name} is not available"
    )

wikicms

Created on 2020-07-27

@author: wf

Frontend

Bases: object

Wiki Content Management System Frontend

Source code in frontend/wikicms.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
class Frontend(object):
    """
    Wiki Content Management System Frontend

    Fetches page HTML through the wiki client, filters it, re-roots
    site-absolute links below ``/<site name>`` and wraps the result in an
    HTML frame. Paths that start with one of the proxy prefixes are fetched
    from the wiki and passed through instead.
    """

    def __init__(
        self,
        site_name: str,
        parser: str = "lxml",
        proxy_prefixes=None,
        debug: bool = False,
        filterKeys=None,
    ):
        """
        Constructor
        Args:
            site_name(str): the name of the site this frontend is for
            parser(str): the beautiful soup parser to use e.g. html.parser
            proxy_prefixes(list): the list of prefixes that need direct proxy access;
                defaults to ["/images/", "/videos"]
            debug: (bool): True if debugging should be on
            filterKeys: (list): a list of keys for filters to be applied e.g. editsection;
                defaults to ["editsection", "parser-output"]
        """
        self.name = site_name
        self.parser = parser
        # build a fresh list per instance so that instances never share (and
        # accidentally mutate) one default list object
        if proxy_prefixes is None:
            self.proxy_prefixes = ["/images/", "/videos"]
        else:
            self.proxy_prefixes = proxy_prefixes
        self.site = Site(site_name)
        self.debug = debug
        self.wiki = None
        if filterKeys is None:
            self.filterKeys = ["editsection", "parser-output"]
        else:
            # honor the caller's keys (they used to be replaced by an empty list)
            self.filterKeys = list(filterKeys)

    def log(self, msg: str):
        """
        log the given message if debugging is true

        Args:
            msg (str): the message to log
        """
        if self.debug:
            print(msg, flush=True)

    @staticmethod
    def extract_site_and_path(path: str):
        """
        Splits the given path into the site component and the remaining path.

        The 'site' is everything before the first "/" and the 'path' is the
        rest of the string, prefixed with "/".

        Parameters:
        path (str): The complete path to split.

        Returns:
        tuple: A tuple where the first element is the site and the second
               element is the subsequent path. A path that is empty or has
               no "/" yields ("", path).
        """
        if not path or "/" not in path:
            return "", path
        site, _sep, remainder = path.partition("/")
        return site, "/" + remainder

    def open(self, ws=None):
        """
        open the frontend

        On the first call the wiki client is created and logged in, the SMW
        client and the site are opened and the CMS pages are loaded.

        Args:
             ws: optional Nicegui webserver
        """
        self.ws = ws
        if self.wiki is None:
            self.wiki = WikiClient.ofWikiId(self.site.wikiId)
            self.wiki.login()
            self.smwclient = SMWClient(self.wiki.getSite())
            self.site.open(ws)
            self.cms_pages = self.get_cms_pages()

    def get_cms_pages(self) -> dict:
        """
        get the Content Management elements for this site

        Returns:
            dict: page title -> rendered HTML for every page of
                [[Category:CMS]] that could be retrieved without error
        """
        cms_pages = {}
        ask_query = "[[Category:CMS]]"
        page_records = self.smwclient.query(ask_query, "cms pages")
        for record_title in list(page_records):
            page_title, html, error = self.getContent(record_title)
            if not error:
                cms_pages[page_title] = html
        return cms_pages

    def errMsg(self, ex):
        """
        get the error message for the given exception

        Args:
            ex (Exception): the exception to report

        Returns:
            str: repr of the exception, followed by the current traceback
                when debugging is on
        """
        if self.debug:
            return f"{ex!r}\n{traceback.format_exc()}"
        return repr(ex)

    def wikiPage(self, pagePath: str) -> str:
        """
        Get the wiki page for the given page path.

        Args:
            pagePath (str): The path of the page.

        Returns:
            str: The title of the page.
        """
        if "/index.php/" in pagePath:
            return pagePath.replace("/index.php/", "")
        if pagePath.startswith("/"):
            return pagePath[1:]
        return pagePath

    def checkPath(self, pagePath: str) -> str:
        """
        check the given pagePath

        Args:
            pagePath (str): the page Path to check

        Returns:
            str: None or an error message naming an illegal char being used
        """
        self.log(pagePath)
        illegalChars = ["{", "}", "<", ">", "[", "]", "|"]
        found = [char for char in illegalChars if char in pagePath]
        if not found:
            return None
        # as before, the last illegal char (in list order) is the one reported
        return "invalid char %s in given pagePath " % (found[-1])

    def needsProxy(self, path: str) -> bool:
        """
        check whether the given path starts with one of the proxy prefixes

        Args:
            path (str): the path to check

        Returns:
            bool: True if this path needs to be proxied
        """
        return any(path.startswith(prefix) for prefix in self.proxy_prefixes)

    def proxy(self, path: str, timeout: float = 30.0) -> "requests.Response":
        """
        Proxy a request.
        See https://stackoverflow.com/a/50231825/1497139

        Args:
            path (str): the path to proxy
            timeout (float): seconds to wait for the wiki server before
                giving up; without it a stalled server blocks forever

        Returns:
            the response object of the request to the wiki
        """
        wikiUser = self.wiki.wikiUser
        url = f"{wikiUser.url}{wikiUser.scriptPath}{path}"
        return requests.get(url, timeout=timeout)

    def filter(self, html: str) -> "BeautifulSoup":
        """
        filter the given html with the filter keys of this frontend

        Args:
            html (str): the html to filter

        Returns:
            the filtered soup
        """
        return self.doFilter(html, self.filterKeys)

    def fixNode(self, node, attribute, prefix, delim=None):
        """
        fix the given node by prepending the site name to site-absolute urls

        node (BeautifulSoup): the node
        attribute (str): the name of the attribute e.g. "href", "src"
        prefix (str): the prefix to replace e.g. "/", "/images", "/thumbs"
        delim (str): if not None the delimiter for multiple values
        """
        if attribute not in node.attrs:
            return
        siteprefix = f"/{self.site.name}{prefix}"
        attrval = node.attrs[attribute]
        vals = [attrval] if delim is None else attrval.split(delim)
        newvals = []
        for val in vals:
            # "//host/path" is protocol-relative, i.e. an external url that
            # must not be re-rooted below the site
            if val.startswith(prefix) and not val.startswith("//"):
                newvals.append(val.replace(prefix, siteprefix, 1))
            else:
                newvals.append(val)
        node.attrs[attribute] = (delim or "").join(newvals)

    def fix_images_and_videos(self, soup):
        """
        fix image and video entries in the source code

        Args:
            soup(BeautifulSoup): the html parser
        """
        for img in soup.find_all("img"):
            self.fixNode(img, "src", "/")
            self.fixNode(img, "srcset", "/", ", ")
        for video in soup.find_all("video"):
            for source in video.find_all("source"):
                self.fixNode(source, "src", "/")

    def fixHtml(self, soup):
        """
        fix the HTML in the given soup

        Args:
            soup(BeautifulSoup): the html parser

        Returns:
            the same soup with image, video and anchor urls fixed
        """
        self.fix_images_and_videos(soup)
        # fix absolute hrefs
        for a in soup.find_all("a"):
            self.fixNode(a, "href", "/")
        return soup

    def unwrap(self, soup) -> str:
        """
        unwrap the soup

        Args:
            soup: the soup (or html string) to serialize

        Returns:
            str: the html without the html/body wrapper, without empty
                MediaWiki paragraphs and without blank lines
        """
        html = str(soup)
        html = html.replace("<html><body>", "")
        html = html.replace("</body></html>", "")
        # Remove  empty paragraphs
        html = re.sub(r'<p class="mw-empty-elt">\s*</p>', "", html)

        # Replace multiple newline characters with a single newline character
        html = re.sub(r"\n\s*\n", "\n", html)
        return html

    def doFilter(self, html, filterKeys):
        """
        parse the given html and apply the filters selected by filterKeys

        Args:
            html (str): the html to filter
            filterKeys (list): "parser-output" keeps only the content of the
                mw-parser-output div, "editsection" removes the edit section
                spans; html comments are always removed

        Returns:
            the filtered soup
        """
        # https://stackoverflow.com/questions/5598524/can-i-remove-script-tags-with-beautifulsoup
        soup = BeautifulSoup(html, self.parser)
        if "parser-output" in filterKeys:
            parserdiv = soup.find("div", {"class": "mw-parser-output"})
            if parserdiv:
                # Parse the inner HTML string to create a new BeautifulSoup object
                soup = BeautifulSoup(parserdiv.decode_contents(), self.parser)
        # https://stackoverflow.com/questions/5041008/how-to-find-elements-by-class
        if "editsection" in filterKeys:
            for span in soup.select("span.mw-editsection"):
                span.extract()
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()
        return soup

    def getContent(self, pagePath: str):
        """get the content for the given pagePath
        Args:
            pagePath(str): the pagePath
        Returns:
            tuple: the page title, the HTML content for the given path
                (None if the path is rejected; the raw HTML may remain if
                processing fails) and an error message (None on success)
        """
        content = None
        error = None
        pageTitle = "?"
        try:
            if pagePath == "/":
                pageTitle = self.site.defaultPage
            else:
                error = self.checkPath(pagePath)
                pageTitle = self.wikiPage(pagePath)
            if error is None:
                if self.wiki is None:
                    raise Exception(
                        "getContent without wiki - you might want to call open first"
                    )
                content = self.wiki.getHtml(pageTitle)
                soup = self.filter(content)
                soup = self.fixHtml(soup)
                content = self.unwrap(soup)
        except Exception as e:
            # deliberate catch-all: failures are reported via the returned error
            error = self.errMsg(e)
        return pageTitle, content, error

    def toReveal(self, html):
        """
        convert the given html to reveal

        Every h2 whose leading span has an id starting with "⌘⌘" is moved
        into a new section element together with the elements following it.

        Args:
            html (str): the html to convert

        Returns:
            str: the unwrapped, converted html
        """
        soup = BeautifulSoup(html, "lxml")
        for h2 in soup.find_all("h2"):
            span = h2.next_element
            # the heading may be the very last element of the document
            if span is None or span.name != "span":
                continue
            tagid = span.get("id")
            # spans without an id attribute yield None
            if not tagid or not tagid.startswith("⌘⌘"):
                continue
            section = soup.new_tag("section")
            h2.parent.append(section)
            section.insert(0, h2)
            tag = h2.next_element
            while tag is not None and tag.name != "h2":
                if tag.parent != h2:
                    section.append(tag)
                tag = tag.next_element
        html = self.unwrap(soup)
        return html

    def get_path_response(self, path: str) -> "Response":
        """
        get the response for the given path

        Args:
            path(str): the path to render the content for

        Returns:
            Response: a FastAPI response
        """
        # function-scope import: "html" is also a common local name in this module
        from html import escape

        if self.needsProxy(path):
            proxied = self.proxy(path)
            # requests has already decoded the body, so headers describing
            # the upstream transfer would no longer match the content
            stale_headers = {
                "content-encoding",
                "content-length",
                "transfer-encoding",
                "connection",
            }
            headers = {
                key: value
                for key, value in proxied.headers.items()
                if key.lower() not in stale_headers
            }
            # Create a FastAPI response object
            return Response(
                content=proxied.content,
                status_code=proxied.status_code,
                headers=headers,
            )
        page_title, content, error = self.getContent(path)
        frame = HtmlFrame(self, title=page_title)
        if error:
            # title and error derive from the request path - escape them so
            # they cannot inject markup into the error page
            page_html = (
                f"error getting {escape(str(page_title))} for {self.name}:"
                f"<br>{escape(str(error))}"
            )
        elif "<slideshow" in content or "&lt;slideshow" in content:
            page_html = self.toReveal(content)
        else:
            page_html = content
        return HTMLResponse(frame.frame(page_html))

__init__(site_name, parser='lxml', proxy_prefixes=['/images/', '/videos'], debug=False, filterKeys=None)

Constructor Args: site_name(str): the name of the site this frontend is for parser(str): the beautiful soup parser to use e.g. html.parser proxy_prefixes(list): the list of prefixes that need direct proxy access debug: (bool): True if debugging should be on filterKeys: (list): a list of keys for filters to be applied e.g. editsection

Source code in frontend/wikicms.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(
    self,
    site_name: str,
    parser: str = "lxml",
    proxy_prefixes=None,
    debug: bool = False,
    filterKeys=None,
):
    """
    Constructor
    Args:
        site_name(str): the name of the site this frontend is for
        parser(str): the beautiful soup parser to use e.g. html.parser
        proxy_prefixes(list): the list of prefixes that need direct proxy access;
            defaults to ["/images/", "/videos"]
        debug: (bool): True if debugging should be on
        filterKeys: (list): a list of keys for filters to be applied e.g. editsection;
            defaults to ["editsection", "parser-output"]
    """
    self.name = site_name
    self.parser = parser
    # build a fresh list per instance so that instances never share (and
    # accidentally mutate) one default list object
    if proxy_prefixes is None:
        self.proxy_prefixes = ["/images/", "/videos"]
    else:
        self.proxy_prefixes = proxy_prefixes
    self.site = Site(site_name)
    self.debug = debug
    self.wiki = None
    if filterKeys is None:
        self.filterKeys = ["editsection", "parser-output"]
    else:
        # honor the caller's keys (they used to be replaced by an empty list)
        self.filterKeys = list(filterKeys)

checkPath(pagePath)

check the given pagePath

Parameters:

Name Type Description Default
pagePath str

the page Path to check

required

Returns:

Name Type Description
str str

None or an error message with the illegal chars being used

Source code in frontend/wikicms.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def checkPath(self, pagePath:str)->str:
    """
    validate the given page path against the characters that are not allowed

    Args:
        pagePath (str): the page path to validate

    Returns:
        str: None if the path is clean, otherwise an error message naming
            the last of the illegal chars (in checking order) that was found
    """
    self.log(pagePath)
    offending = [
        char
        for char in ("{", "}", "<", ">", "[", "]", "|")
        if char in pagePath
    ]
    if not offending:
        return None
    # later matches override earlier ones, so the last offender is reported
    return "invalid char %s in given pagePath " % (offending[-1])

extract_site_and_path(path) staticmethod

Splits the given path into the site component and the remaining path.

This static method assumes that the 'site' is the first element of the path when split by "/", and the 'path' is the rest of the string after the site.

Parameters: path (str): The complete path to split.

tuple: A tuple where the first element is the site and the second element is the subsequent path.

Source code in frontend/wikicms.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@staticmethod
def extract_site_and_path(path:str):
    """
    Splits the given path into the site component and the remaining path.

    This static method assumes that the 'site' is the first element of the
    path when split by "/", and the 'path' is the rest of the string after
    the site.

    Parameters:
    path (str): The complete path to split.

    Returns:
    tuple: A tuple where the first element is the site and the second
           element is the subsequent path.
    """
    # Check if the path is empty or does not contain a "/"
    if not path or "/" not in path:
        return "", path

    # Split the path into parts using the "/" as a separator
    parts = path.split("/")

    # The first part is the site, the rest is joined back into a path
    site = parts[0]
    remaining_path = "/" + "/".join(parts[1:])

    return site, remaining_path

filter(html)

filter the given html

Source code in frontend/wikicms.py
195
196
197
198
199
def filter(self, html: str) -> str:
    """
    apply the filter keys configured for this frontend to the given html

    Args:
        html (str): the html to filter

    Returns:
        whatever doFilter returns for the html and this frontend's filterKeys
    """
    active_keys = self.filterKeys
    filtered = self.doFilter(html, active_keys)
    return filtered

fixHtml(soup)

fix the HTML in the given soup

Parameters:

Name Type Description Default
soup(BeautifulSoup)

the html parser

required
Source code in frontend/wikicms.py
238
239
240
241
242
243
244
245
246
247
248
249
def fixHtml(self, soup):
    """
    fix the HTML in the given soup

    Image and video sources are fixed first, then the href of every anchor.

    Args:
        soup(BeautifulSoup): the html parser

    Returns:
        the same soup object that was passed in
    """
    self.fix_images_and_videos(soup)
    # fix absolute hrefs
    # find_all replaces the deprecated BeautifulSoup 3 alias findAll
    for a in soup.find_all("a"):
        self.fixNode(a, "href", "/")
    return soup

fixNode(node, attribute, prefix, delim=None)

fix the given node

node (BeautifulSoup): the node attribute (str): the name of the attribute e.g. "href", "src" prefix (str): the prefix to replace e.g. "/", "/images", "/thumbs" delim (str): if not None the delimiter for multiple values

Source code in frontend/wikicms.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def fixNode(self, node, attribute, prefix, delim=None):
    """
    fix the given node by prepending the site name to site-absolute urls

    Nodes that do not have the attribute are left untouched.

    node (BeautifulSoup): the node
    attribute (str): the name of the attribute e.g. "href", "src"
    prefix (str): the prefix to replace e.g. "/", "/images", "/thumbs"
    delim (str): if not None the delimiter for multiple values
    """
    if attribute not in node.attrs:
        return
    siteprefix = f"/{self.site.name}{prefix}"
    attrval = node.attrs[attribute]
    vals = [attrval] if delim is None else attrval.split(delim)
    newvals = []
    for val in vals:
        # "//host/path" is protocol-relative, i.e. an external url that
        # must not be re-rooted below the site
        if val.startswith(prefix) and not val.startswith("//"):
            newvals.append(val.replace(prefix, siteprefix, 1))
        else:
            newvals.append(val)
    # a single value is joined with the empty string, i.e. stored as is
    node.attrs[attribute] = (delim or "").join(newvals)

fix_images_and_videos(soup)

fix image and video entries in the source code

Source code in frontend/wikicms.py
227
228
229
230
231
232
233
234
235
236
def fix_images_and_videos(self, soup):
    """
    fix image and video entries in the source code

    The src and srcset of every img and the src of every source element
    below a video are passed to fixNode with the prefix "/".

    Args:
        soup(BeautifulSoup): the html parser
    """
    # find_all replaces the deprecated BeautifulSoup 3 alias findAll
    for img in soup.find_all("img"):
        self.fixNode(img, "src", "/")
        # srcset holds several ", "-separated candidates
        self.fixNode(img, "srcset", "/", ", ")
    for video in soup.find_all("video"):
        for source in video.find_all("source"):
            self.fixNode(source, "src", "/")

getContent(pagePath)

get the content for the given pagePath Args: pagePath(str): the pagePath whatToFilter(list): list of filter keys Returns: str: the HTML content for the given path

Source code in frontend/wikicms.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def getContent(self, pagePath: str):
    """retrieve and post-process the wiki content for the given pagePath
    Args:
        pagePath(str): the pagePath; "/" selects the default page of the site
    Returns:
        tuple: the page title, the HTML content for the given path and an
            error message - the error is None on success; the content is
            None when the path is rejected or the wiki is not open
    """
    pageTitle = "?"
    content = None
    error = None
    try:
        wants_default_page = pagePath == "/"
        if wants_default_page:
            pageTitle = self.site.defaultPage
        else:
            error = self.checkPath(pagePath)
            pageTitle = self.wikiPage(pagePath)
        path_is_valid = error is None
        if path_is_valid:
            if self.wiki is None:
                raise Exception(
                    "getContent without wiki - you might want to call open first"
                )
            # raw page html first, then filter -> fix links -> serialize
            content = self.wiki.getHtml(pageTitle)
            content = self.unwrap(self.fixHtml(self.filter(content)))
    except Exception as problem:
        # any failure is reported to the caller via the error element
        error = self.errMsg(problem)
    return pageTitle, content, error

get_cms_pages()

get the Content Management elements for this site

Source code in frontend/wikicms.py
107
108
109
110
111
112
113
114
115
116
117
118
def get_cms_pages(self) -> dict:
    """
    collect the Content Management pages of this site

    Returns:
        dict: page title -> HTML for every page found by the
            [[Category:CMS]] query whose content came back without error
    """
    records = self.smwclient.query("[[Category:CMS]]", "cms pages")
    pages_by_title = {}
    for record_key in list(records):
        title, html, error = self.getContent(record_key)
        if error:
            # pages that could not be retrieved are left out
            continue
        pages_by_title[title] = html
    return pages_by_title

get_path_response(path)

get the response for the given path

Parameters:

Name Type Description Default
path(str)

the path to render the content for

required

Returns:

Name Type Description
Response str

a FastAPI response

Source code in frontend/wikicms.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def get_path_response(self, path: str) -> "Response":
    """
    get the response for the given path

    Paths that need proxying are fetched from the wiki and passed through;
    all other paths are rendered as a framed wiki page, or as a framed
    error message if the content could not be retrieved.

    Args:
        path(str): the path to render the content for

    Returns:
        Response: a FastAPI response
    """
    # function-scope import: "html" is also a common local name in this module
    from html import escape

    if self.needsProxy(path):
        proxied = self.proxy(path)
        # requests has already decoded the body, so headers describing
        # the upstream transfer would no longer match the content
        stale_headers = {
            "content-encoding",
            "content-length",
            "transfer-encoding",
            "connection",
        }
        headers = {
            key: value
            for key, value in proxied.headers.items()
            if key.lower() not in stale_headers
        }
        # Create a FastAPI response object
        return Response(
            content=proxied.content,
            status_code=proxied.status_code,
            headers=headers,
        )
    page_title, content, error = self.getContent(path)
    frame = HtmlFrame(self, title=page_title)
    if error:
        # title and error derive from the request path - escape them so
        # they cannot inject markup into the error page
        page_html = (
            f"error getting {escape(str(page_title))} for {self.name}:"
            f"<br>{escape(str(error))}"
        )
    elif "<slideshow" in content or "&lt;slideshow" in content:
        page_html = self.toReveal(content)
    else:
        page_html = content
    return HTMLResponse(frame.frame(page_html))

log(msg)

log the given message if debugging is true

Parameters:

Name Type Description Default
msg str

the message to log

required
Source code in frontend/wikicms.py
53
54
55
56
57
58
59
60
61
def log(self, msg:str):
    """
    print the given message (flushed immediately) when debugging is on

    Args:
        msg (str): the message to log
    """
    if not self.debug:
        # silent unless debugging
        return
    print(msg, flush=True)

needsProxy(path)

Parameters:

Name Type Description Default
path str

the path to check

required

Returns:

Name Type Description
bool bool

True if this path needs to be proxied

Source code in frontend/wikicms.py
163
164
165
166
167
168
169
170
171
172
173
174
def needsProxy(self, path:str) -> bool:
    """
    check whether the given path starts with one of the proxy prefixes

    Args:
        path (str): the path to check

    Returns:
        bool: True if this path needs to be proxied
    """
    return any(path.startswith(candidate) for candidate in self.proxy_prefixes)

open(ws=None)

open the frontend

Parameters:

Name Type Description Default
ws

optional Nicegui webserver

None
Source code in frontend/wikicms.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def open(self, ws=None):
    """
    open the frontend

    The webserver reference is always stored; the wiki connection is only
    set up as long as no wiki client has been created yet.

    Args:
         ws: optional Nicegui webserver
    """
    self.ws = ws
    already_connected = self.wiki is not None
    if already_connected:
        return
    wiki_client = WikiClient.ofWikiId(self.site.wikiId)
    self.wiki = wiki_client
    wiki_client.login()
    self.smwclient = SMWClient(wiki_client.getSite())
    self.site.open(ws)
    # the CMS pages are loaded once, right after the connection is set up
    self.cms_pages = self.get_cms_pages()

proxy(path)

Proxy a request. See https://stackoverflow.com/a/50231825/1497139

Parameters:

Name Type Description Default
path str

the path to proxy

required

Returns:

Type Description
str

the proxied result as a string

Source code in frontend/wikicms.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def proxy(self, path: str, timeout: float = 30.0) -> "requests.Response":
    """
    Proxy a request.
    See https://stackoverflow.com/a/50231825/1497139

    The url is built from the url and script path of the wiki user
    followed by the given path.

    Args:
        path (str): the path to proxy
        timeout (float): seconds to wait for the wiki server before giving
            up; without it a stalled server blocks the request forever

    Returns:
        the response object of the request to the wiki
    """
    wikiUser = self.wiki.wikiUser
    url = f"{wikiUser.url}{wikiUser.scriptPath}{path}"

    # Get the response
    response = requests.get(url, timeout=timeout)

    return response

toReveal(html)

convert the given html to reveal

Source code in frontend/wikicms.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def toReveal(self, html):
    """
    convert the given html to reveal

    Every h2 whose leading span has an id starting with "⌘⌘" is moved into
    a new section element together with the elements following it.

    Args:
        html (str): the html to convert

    Returns:
        str: the unwrapped, converted html
    """
    soup = BeautifulSoup(html, "lxml")
    # find_all snapshots the headings before the tree is rearranged below
    for h2 in soup.find_all("h2"):
        span = h2.next_element
        # the heading may be the very last element of the document
        if span is None or span.name != "span":
            continue
        tagid = span.get("id")
        # spans without an id attribute yield None
        if not tagid or not tagid.startswith("⌘⌘"):
            continue
        section = soup.new_tag("section")
        h2.parent.append(section)
        section.insert(0, h2)
        tag = h2.next_element
        while tag is not None and tag.name != "h2":
            if tag.parent != h2:
                section.append(tag)
            tag = tag.next_element
    html = self.unwrap(soup)
    return html

unwrap(soup)

unwrap the soup

Source code in frontend/wikicms.py
251
252
253
254
255
256
257
258
259
260
261
262
263
def unwrap(self, soup) -> str:
    """
    serialize the soup and strip the wrapper markup

    Args:
        soup: the soup to serialize via str()

    Returns:
        str: the html without the html/body wrapper, without empty
            "mw-empty-elt" paragraphs and with blank lines collapsed
    """
    markup = str(soup)
    for wrapper in ("<html><body>", "</body></html>"):
        markup = markup.replace(wrapper, "")
    # drop the empty paragraphs
    without_empty = re.sub(r'<p class="mw-empty-elt">\s*</p>', "", markup)
    # collapse runs of blank lines into a single newline
    return re.sub(r"\n\s*\n", "\n", without_empty)

wikiPage(pagePath)

Get the wiki page for the given page path.

Parameters:

Name Type Description Default
pagePath str

The path of the page.

required

Returns:

Name Type Description
str str

The title of the page.

Source code in frontend/wikicms.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def wikiPage(self, pagePath: str) -> str:
    """
    Derive the wiki page title from the given page path.

    Args:
        pagePath (str): The path of the page.

    Returns:
        str: The path with every "/index.php/" removed, or else without its
            leading "/", or else unchanged.
    """
    index_marker = "/index.php/"
    if index_marker in pagePath:
        return pagePath.replace(index_marker, "")
    return pagePath[1:] if pagePath.startswith("/") else pagePath

wikigrid

Created on 2022-12-03

@author: wf

WikiCheck

A check for a Mediawiki.

Source code in frontend/wikigrid.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class WikiCheck:
    """
    A named, selectable check for a Mediawiki.
    """

    def __init__(self, name, func, checked=True):
        """
        Constructor

        Args:
            name: the name of the check, used as the checkbox label
            func: the check function to be performed on a WikiState
            checked: whether the check is initially selected
        """
        self.checkbox = None  # created on demand by as_checkbox
        self.checked = checked
        self.func = func
        self.name = name

    def as_checkbox(self):
        """
        Create a checkbox labeled with the name of this check.

        The checkbox value is bound to the "checked" attribute; the checkbox
        is stored on the instance and returned.
        """
        bound_checkbox = ui.checkbox(self.name).bind_value(self, "checked")
        self.checkbox = bound_checkbox
        return bound_checkbox

as_checkbox()

Return a checkbox representation of the instance.

Source code in frontend/wikigrid.py
65
66
67
68
69
70
def as_checkbox(self):
    """
    Create a checkbox labeled with the name of this check.

    The checkbox value is bound to the "checked" attribute; the checkbox is
    stored on the instance and returned.
    """
    bound_checkbox = ui.checkbox(self.name).bind_value(self, "checked")
    self.checkbox = bound_checkbox
    return bound_checkbox

WikiGrid

A grid of Wikis.

Source code in frontend/wikigrid.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
class WikiGrid:
    """
    A grid of Wikis.

    Holds one WikiState per configured wiki user and shows their records in
    a ListOfDictsGrid; selectable checks update the cells of the grid.
    """

    def __init__(self, solution):
        """
        Constructor

        Args:
            solution: the nicegui solution this grid belongs to
        """
        # back reference to nicegui solution
        self.solution = solution

        self.wiki_users = WikiUser.getWikiUsers()
        self.wiki_clients = {}
        self.smw_clients = {}
        self.sorted_wiki_users = sorted(
            self.wiki_users.values(), key=lambda w: w.wikiId
        )
        self.lod = []
        self.wikistates_by_row_no = {}
        for index, wiki_user in enumerate(self.sorted_wiki_users):
            wiki_state = WikiState(index, wiki_user)
            self.lod.append(wiki_state.as_dict())
            self.wikistates_by_row_no[wiki_state.row_no] = wiki_state

    def setup(self):
        """
        Create the check boxes, the progress bar and the grid.
        """
        self.add_checkboxes()
        self.progressbar = NiceguiProgressbar(
            len(self.wikistates_by_row_no), "work on wikis", "steps"
        )
        self.as_grid()
        self.lod_grid.update()

    def as_grid(self):
        """
        Create the grid for the list of wiki records and return it.
        """
        self.lod_grid = ListOfDictsGrid(lod=self.lod)
        self.lod_grid.ag_grid._props["html_columns"] = [0, 1, 2]
        return self.lod_grid

    def add_checkboxes(self):
        """
        Add check boxes.
        """
        self.wiki_checks = [
            WikiCheck("version", self.check_wiki_version),
            WikiCheck("backup", self.check_backup),
            WikiCheck("pages", self.check_pages),
        ]
        for wiki_check in self.wiki_checks:
            wiki_check.as_checkbox()
        ui.button(text="Checks", on_click=self.perform_wiki_checks)

    def check_version(self, wiki_url):
        """
        Check the MediaWiki version.

        Args:
            wiki_url: the base url of the wiki

        Returns:
            str: the version from the "Installed software" table of
                Special:Version, "?" if that table is missing or
                "error: ..." if reading the page failed
        """
        version_url = f"{wiki_url}/index.php/Special:Version"
        mw_version = "?"
        try:
            html_tables = HtmlTables(version_url)
            tables = html_tables.get_tables("h2")
            if "Installed software" in tables:
                software = tables["Installed software"]
                software_map, _dup = LOD.getLookup(
                    software, "Product", withDuplicates=False
                )
                mw_version = software_map["MediaWiki"]["Version"]
        except Exception as ex:
            mw_version = f"error: {str(ex)}"
        return mw_version

    async def perform_wiki_checks(self, _msg):
        """
        Run the selected wiki checks via run.io_bound.
        """
        await run.io_bound(self.run_wiki_checks)

    def run_wiki_checks(self):
        """
        perform the selected wiki checks
        """
        with self.solution.content_div:
            ui.notify(f"Checking {len(self.wikistates_by_row_no)} wikis ...")
        progress_bar = self.progressbar
        try:
            with self.solution.content_div:
                progress_bar.reset()
            for wiki_state in self.wikistates_by_row_no.values():
                for wiki_check in self.wiki_checks:
                    if wiki_check.checked:
                        wiki_check.func(wiki_state)
                    with self.solution.content_div:
                        self.lod_grid.update()
                # the progress bar is sized by the number of wikis, so it
                # advances once per wiki - not once per check
                with self.solution.content_div:
                    progress_bar.update(1)
        except Exception as ex:
            self.solution.handle_exception(ex)

    def check_pages(self, wiki_state):
        """
        Try login for wiki user and report success or failure.

        On success the "login" cell is marked and the "pages" cell shows the
        page count from the site statistics.
        """
        try:
            wiki_state.wiki_client = WikiClient.ofWikiUser(wiki_state.wiki_user)
            try:
                wiki_state.wiki_client.login()
                stats = wiki_state.wiki_client.get_site_statistics()
                pages = stats["pages"]
                self.lod_grid.update_cell(wiki_state.row_no, "login", "✅")
                self.lod_grid.update_cell(wiki_state.row_no, "pages", f"✅{pages}")
            except Exception as ex:
                self.lod_grid.update_cell(wiki_state.row_no, "login", f"❌ {str(ex)}")
                self.lod_grid.update_cell(wiki_state.row_no, "pages", "❌")
        except Exception as ex:
            self.solution.handle_exception(ex)

    def check_wiki_version(self, wiki_state):
        """
        Check the MediaWiki version for a specific WikiState.

        The version found on the wiki is compared with the one in the
        "version" cell of the row and the cell is marked accordingly.
        """
        try:
            wiki_url = wiki_state.wiki_user.getWikiUrl()
            mw_version = self.check_version(wiki_url)
            if not mw_version.startswith("MediaWiki"):
                mw_version = f"MediaWiki {mw_version}"
            row = self.lod_grid.get_row_for_key(wiki_state.row_no)
            if row:
                ex_version = row["version"]
                if ex_version == mw_version:
                    self.lod_grid.update_cell(
                        wiki_state.row_no, "version", f"{mw_version}✅"
                    )
                else:
                    self.lod_grid.update_cell(
                        wiki_state.row_no, "version", f"{ex_version}!={mw_version}❌"
                    )
        except Exception as ex:
            self.solution.handle_exception(ex)

    def check_backup(self, wiki_state):
        """
        Check the backup status for a specific WikiUser.

        The "backup" cell shows the number of *.wiki files below
        ~/wikibackup/<wikiId> (or a failure mark if that directory is
        missing); the "age" cell shows the age in days of the most recently
        modified one.
        """
        try:
            row = self.lod_grid.get_row_for_key(wiki_state.row_no)
            if row:
                backup_path = f"{Path.home()}/wikibackup/{wiki_state.wiki_user.wikiId}"
                if os.path.isdir(backup_path):
                    wiki_files = glob.glob(f"{backup_path}/*.wiki")
                    msg = f"{len(wiki_files):6} ✅"
                    self.lod_grid.update_cell(wiki_state.row_no, "backup", msg)
                    # https://stackoverflow.com/a/39327156/1497139
                    if wiki_files:
                        # select and measure by the same timestamp (mtime)
                        latest_mtime = max(
                            os.path.getmtime(wiki_file) for wiki_file in wiki_files
                        )
                        age_days = round((time.time() - latest_mtime) / 86400)
                        self.lod_grid.update_cell(
                            wiki_state.row_no, "age", f"{age_days}"
                        )
                else:
                    msg = "❌"
                    self.lod_grid.update_cell(wiki_state.row_no, "backup", msg)
        except Exception as ex:
            self.solution.handle_exception(ex)

add_checkboxes()

Add check boxes.

Source code in frontend/wikigrid.py
109
110
111
112
113
114
115
116
117
118
119
120
def add_checkboxes(self):
    """
    Set up the available wiki checks and their UI controls.

    Stores one WikiCheck per check ("version", "backup", "pages") in
    ``self.wiki_checks``, calls ``as_checkbox()`` on each of them and adds a
    "Checks" button that invokes ``self.perform_wiki_checks`` when clicked.
    """
    check_specs = (
        ("version", self.check_wiki_version),
        ("backup", self.check_backup),
        ("pages", self.check_pages),
    )
    self.wiki_checks = [WikiCheck(name, func) for name, func in check_specs]
    for check in self.wiki_checks:
        check.as_checkbox()
    ui.button(text="Checks", on_click=self.perform_wiki_checks)

check_backup(wiki_state)

Check the backup status for a specific WikiUser.

Source code in frontend/wikigrid.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def check_backup(self, wiki_state):
    """
    Check the backup status for a specific WikiUser.

    Looks for the directory ``~/wikibackup/<wikiId>``. When it exists the
    number of ``*.wiki`` files is written to the "backup" cell and the age
    in days of the most recently modified file to the "age" cell. When it
    does not exist the "backup" cell is set to a cross mark. Nothing is
    done when the grid has no row for the given state.

    Args:
        wiki_state: object providing ``row_no`` (the grid row key) and
            ``wiki_user.wikiId`` (the name of the backup directory).
    """
    try:
        row_no = wiki_state.row_no
        if not self.lod_grid.get_row_for_key(row_no):
            return
        backup_path = f"{Path.home()}/wikibackup/{wiki_state.wiki_user.wikiId}"
        if not os.path.isdir(backup_path):
            self.lod_grid.update_cell(row_no, "backup", "❌")
            return
        wiki_files = glob.glob(f"{backup_path}/*.wiki")
        self.lod_grid.update_cell(row_no, "backup", f"{len(wiki_files):6} ✅")
        if wiki_files:
            # Select the newest file by modification time so that the
            # selection and the reported age use the same timestamp;
            # ctime is the inode change time on POSIX and can point at a
            # file whose content is not the most recent.
            latest_mtime = max(os.path.getmtime(path) for path in wiki_files)
            age_days = round((time.time() - latest_mtime) / 86400)
            self.lod_grid.update_cell(row_no, "age", f"{age_days}")
    except BaseException as ex:
        # errors are reported via the solution instead of being raised
        self.solution.handle_exception(ex)

check_pages(wiki_state)

Try to log in as the wiki user and report success or failure.

Source code in frontend/wikigrid.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def check_pages(self, wiki_state):
    """
    Log in as the wiki user and report the outcome in the grid.

    A WikiClient is created for ``wiki_state.wiki_user`` and stored on
    ``wiki_state.wiki_client``. After a successful login the site statistics
    are fetched; the "login" cell gets a check mark and the "pages" cell the
    page count. If login or the statistics lookup raises, the "login" cell
    shows a cross mark with the error text and the "pages" cell a cross mark.

    Args:
        wiki_state: state object providing ``wiki_user`` and ``row_no``.
    """
    try:
        wiki_state.wiki_client = WikiClient.ofWikiUser(wiki_state.wiki_user)
        try:
            wiki_state.wiki_client.login()
            page_count = wiki_state.wiki_client.get_site_statistics()["pages"]
            self.lod_grid.update_cell(wiki_state.row_no, "login", "✅")
            self.lod_grid.update_cell(wiki_state.row_no, "pages", f"✅{page_count}")
        except Exception as login_error:
            # an expected failure is shown in the grid rather than escalated
            self.lod_grid.update_cell(
                wiki_state.row_no, "login", f"❌ {str(login_error)}"
            )
            self.lod_grid.update_cell(wiki_state.row_no, "pages", "❌")
    except BaseException as ex:
        self.solution.handle_exception(ex)

check_version(wiki_url)

Check the MediaWiki version.

Source code in frontend/wikigrid.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def check_version(self, wiki_url):
    """
    Determine the MediaWiki version from the wiki's Special:Version page.

    Args:
        wiki_url: base url of the wiki; "/index.php/Special:Version" is
            appended to it.

    Returns:
        the "Version" entry of the "MediaWiki" product in the
        "Installed software" table, "?" if the page has no such table, or
        "error: <message>" if retrieving or evaluating the page raised.
    """
    version_url = f"{wiki_url}/index.php/Special:Version"
    try:
        tables = HtmlTables(version_url).get_tables("h2")
        if "Installed software" not in tables:
            return "?"
        # index the software rows by their "Product" column
        software_map, _dup = LOD.getLookup(
            tables["Installed software"], "Product", withDuplicates=False
        )
        return software_map["MediaWiki"]["Version"]
    except Exception as ex:
        return f"error: {str(ex)}"

check_wiki_version(wiki_state)

Check the MediaWiki version for a specific WikiState.

Source code in frontend/wikigrid.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def check_wiki_version(self, wiki_state):
    """
    Compare the live MediaWiki version with the version shown in the grid.

    The version reported by ``check_version`` is prefixed with "MediaWiki "
    unless it already starts with "MediaWiki". When the grid has a row for
    the state, its "version" cell is rewritten: the version plus a check mark
    on a match, otherwise "<grid version>!=<live version>" plus a cross mark.

    Args:
        wiki_state: state object providing ``wiki_user`` and ``row_no``.
    """
    try:
        found = self.check_version(wiki_state.wiki_user.getWikiUrl())
        actual = found if found.startswith("MediaWiki") else f"MediaWiki {found}"
        grid_row = self.lod_grid.get_row_for_key(wiki_state.row_no)
        if not grid_row:
            return
        expected = grid_row["version"]
        if expected == actual:
            cell_text = f"{actual}✅"
        else:
            cell_text = f"{expected}!={actual}❌"
        self.lod_grid.update_cell(wiki_state.row_no, "version", cell_text)
    except BaseException as ex:
        self.solution.handle_exception(ex)

run_wiki_checks()

Perform the selected wiki checks.

Source code in frontend/wikigrid.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def run_wiki_checks(self):
    """
    Run every selected wiki check against every known wiki.

    A notification with the number of wikis is shown, the progress bar is
    reset and then each check in ``self.wiki_checks`` whose ``checked`` flag
    is set is called with each wiki state. Exceptions raised after the
    notification are passed to ``self.solution.handle_exception``.
    """
    with self.solution.content_div:
        ui.notify(f"Checking {len(self.wikistates_by_row_no)} wikis ...")
    bar = self.progressbar
    try:
        with self.solution.content_div:
            bar.reset()
        # lazily pair each wiki state with each configured check
        pending = (
            (state, check)
            for state in self.wikistates_by_row_no.values()
            for check in self.wiki_checks
        )
        for state, check in pending:
            if check.checked:
                check.func(state)
            # the grid refresh and progress tick happen once per
            # (wiki, check) pair, whether or not the check was selected
            with self.solution.content_div:
                self.lod_grid.update()
                bar.update(1)
    except BaseException as ex:
        self.solution.handle_exception(ex)

WikiState

the state of a wiki

Source code in frontend/wikigrid.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class WikiState:
    """
    State of a single wiki as shown in one grid row.
    """

    def __init__(self, row_index, wiki_user):
        """
        Create the state for the given wiki user.

        Args:
            row_index: zero-based index; the row number stored is one higher.
            wiki_user: the wiki user this state and its WikiBackup refer to.
        """
        self.row_no = row_index + 1
        self.wiki_backup = WikiBackup(wiki_user)
        self.wiki_user = wiki_user

    def as_dict(self):
        """
        Return the grid record for this wiki.

        Returns:
            dict with the keys "#", "wiki", "version", "pages", "backup",
            "git", "age" and "login"; "pages", "age" and "login" start empty.
        """
        user = self.wiki_user
        wiki_link = Link.create(
            url=f"{user.url}{user.scriptPath}", text=user.wikiId, target="_blank"
        )
        has_backup = self.wiki_backup.exists()
        has_git = self.wiki_backup.hasGit()
        return {
            "#": self.row_no,
            "wiki": wiki_link,
            "version": user.version,
            "pages": "",
            "backup": "✅" if has_backup else "❌",
            "git": "✅" if has_git else "❌",
            "age": "",
            "login": "",
        }

__init__(row_index, wiki_user)

constructor

Source code in frontend/wikigrid.py
29
30
31
32
33
34
35
def __init__(self, row_index, wiki_user):
    """
    Create the state for the given wiki user.

    Args:
        row_index: zero-based index; the row number stored is one higher.
        wiki_user: the wiki user this state and its WikiBackup refer to.
    """
    self.row_no = row_index + 1
    self.wiki_backup = WikiBackup(wiki_user)
    self.wiki_user = wiki_user