Skip to content

API Reference

This section is generated from the codebase using mkdocstrings.

gkc

GKC - Global Knowledge Commons

A Python package for working with the Global Knowledge Commons including Wikidata, Wikipedia, Wikimedia Commons, and OpenStreetMap.

AuthenticationError

Bases: Exception

Raised when authentication fails.

Source code in gkc/auth.py
34
35
36
37
class AuthenticationError(Exception):
    """Raised when authentication fails."""

ClaimsMapBuilder

Builds claims mapping configurations from ShEx schemas.

This class combines ShEx schema analysis with live Wikidata property metadata to generate skeleton mapping configurations.

Source code in gkc/mapping_builder.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
class ClaimsMapBuilder:
    """
    Builds claims mapping configurations from ShEx schemas.

    This class combines ShEx schema analysis with live Wikidata property
    metadata to generate skeleton mapping configurations.
    """

    def __init__(
        self,
        eid: Optional[str] = None,
        schema_text: Optional[str] = None,
        schema_file: Optional[str] = None,
        user_agent: Optional[str] = None,
    ):
        """
        Initialize the claims map builder.

        Args:
            eid: EntitySchema ID (e.g., 'E502')
            schema_text: ShEx schema as text
            schema_file: Path to ShEx schema file
            user_agent: Custom user agent for API requests
        """
        self.validator = ShExValidator(
            eid=eid, schema_text=schema_text, schema_file=schema_file
        )
        self.user_agent = user_agent
        # Cached schema text; populated lazily by load_schema().
        self.schema_text: Optional[str] = None
        self.property_fetcher = WikidataPropertyFetcher(user_agent)

    def load_schema(self) -> "ClaimsMapBuilder":
        """Load the ShEx schema.

        Returns self so calls can be chained.
        """
        self.validator.load_schema()
        # NOTE(review): reaches into ShExValidator's private ``_schema``
        # attribute — a public accessor on the validator would be safer.
        self.schema_text = self.validator._schema
        return self

    def build_claims_map(
        self, include_qualifiers: bool = True, include_references: bool = True
    ) -> list[dict[str, Any]]:
        """
        Build claims mapping structure from the loaded ShEx schema.

        Args:
            include_qualifiers: Whether to include qualifier properties
            include_references: Whether to include reference properties

        Returns:
            List of claim mapping dictionaries
        """
        # Lazily load the schema on first use.
        if not self.schema_text:
            self.load_schema()

        # Extract properties from ShEx
        extractor = ShExPropertyExtractor(self.schema_text)
        shex_properties = extractor.extract()

        # Separate by context
        statement_props = {
            pid: info
            for pid, info in shex_properties.items()
            if info["context"] in ["direct", "statement", "statement_value"]
        }
        qualifier_props = {
            pid: info
            for pid, info in shex_properties.items()
            if info["context"] == "qualifier"
        }
        reference_props = {
            pid: info
            for pid, info in shex_properties.items()
            if info["context"] == "reference"
        }

        # Fetch property metadata from Wikidata
        all_prop_ids = list(shex_properties.keys())
        property_info = self.property_fetcher.fetch_properties(all_prop_ids)

        # Build claims mapping
        claims_map = []

        for prop_id, shex_info in statement_props.items():
            # prop_data may be None when Wikidata returned nothing for the ID.
            prop_data = property_info.get(prop_id)

            claim_mapping = {
                "property": prop_id,
                "comment": self._format_comment(shex_info, prop_data),
                "source_field": f"{prop_id.lower()}_value",
                "datatype": prop_data.datatype if prop_data else "unknown",
                "required": shex_info["cardinality"].get("required", False),
            }

            # Add transform hints based on datatype
            if prop_data:
                transform_hint = self._get_transform_hint(prop_data.datatype)
                if transform_hint:
                    claim_mapping["transform"] = transform_hint

            # Add qualifiers if requested
            if include_qualifiers and qualifier_props:
                claim_mapping["qualifiers"] = []
                # Note: In a real implementation, we'd need to parse which qualifiers
                # go with which statements from the ShEx structure

            # Add references if requested
            if include_references and reference_props:
                claim_mapping["references"] = []
                # Note: Similar to qualifiers, need ShEx structure parsing

            claims_map.append(claim_mapping)

        return claims_map

    def build_complete_mapping(
        self, entity_type: Optional[str] = None
    ) -> dict[str, Any]:
        """
        Build a complete mapping configuration skeleton.

        The result is intentionally a TODO-laden template: the notes at the
        bottom list the manual edits a user is expected to make.

        Args:
            entity_type: Wikidata QID of the entity type (e.g., 'Q7840353')

        Returns:
            Complete mapping configuration dictionary
        """
        if not self.schema_text:
            self.load_schema()

        claims_map = self.build_claims_map()

        mapping = {
            "$schema": "https://example.com/gkc/mapping-schema.json",
            "version": "1.0",
            "metadata": {
                "name": "Auto-generated mapping",
                "description": "Generated from ShEx schema",
                "entity_schema_id": self.validator.eid or "unknown",
                "target_entity_type": entity_type or "TODO",
                "generated_date": "TODO",
            },
            "reference_library": {
                "basic_reference": [
                    {
                        "property": "P248",
                        "value": "TODO_SOURCE_QID",
                        "datatype": "wikibase-item",
                        "comment": "Stated in: UPDATE with actual source QID",
                    },
                    {
                        "property": "P813",
                        "value": "current_date",
                        "datatype": "time",
                        "comment": "Retrieved date",
                    },
                ]
            },
            "qualifier_library": {
                "point_in_time": [
                    {
                        "property": "P585",
                        "source_field": "TODO_date_field",
                        "datatype": "time",
                        "comment": "Point in time qualifier - UPDATE source_field",
                    }
                ]
            },
            "mappings": {
                "labels": [
                    {
                        "source_field": "label",
                        "language": "en",
                        "required": True,
                        "comment": (
                            "Main label - UPDATE source_field to " "match your data"
                        ),
                    }
                ],
                "aliases": [
                    {
                        "source_field": "aliases",
                        "language": "en",
                        "separator": ";",
                        "required": False,
                        "comment": "Aliases - UPDATE source_field to match your data",
                    }
                ],
                "descriptions": [
                    {
                        "source_field": "description",
                        "language": "en",
                        "required": False,
                        "comment": (
                            "Description - UPDATE source_field to " "match your data"
                        ),
                    }
                ],
                "sitelinks": [
                    {
                        "site": "enwiki",
                        "source_field": "wikipedia_en",
                        "required": False,
                        "badges": [],
                        "comment": (
                            "English Wikipedia article - "
                            "UPDATE source_field to match your data"
                        ),
                    }
                ],
                "claims": claims_map,
            },
            "notes": [
                "This mapping was auto-generated from a ShEx schema",
                "UPDATE all 'source_field' values to match your data",
                "REVIEW all 'transform' configurations",
                (
                    "ADD appropriate references to claims "
                    "(use reference_library entries)"
                ),
                "UPDATE reference_library with actual source QIDs and URLs",
                (
                    "ADD fixed-value claims (instance of, continent, country) "
                    "with 'value' instead of 'source_field'"
                ),
                (
                    "For repeated references, use library entry names "
                    "(e.g., 'basic_reference') instead of inline dicts"
                ),
                (
                    "ADD sitelinks for Wikipedia and other Wikimedia projects "
                    "(enwiki, frwiki, commons, etc.)"
                ),
                (
                    "Sitelinks can use 'source_field' for data-driven titles "
                    "or 'title' for fixed values"
                ),
            ],
        }

        return mapping

    def _format_comment(
        self, shex_info: dict, prop_data: Optional[PropertyInfo]
    ) -> str:
        """Format a descriptive comment from ShEx and Wikidata info."""
        parts = []

        # Add property label
        if prop_data:
            label = prop_data.get_label()
            if label != prop_data.property_id:
                parts.append(label)

        # Add ShEx inline comment
        if shex_info.get("comment"):
            parts.append(shex_info["comment"])

        # Add property description if different from comment
        if prop_data:
            desc = prop_data.get_description()
            if desc and desc not in shex_info.get("comment", ""):
                parts.append(desc)

        # NOTE(review): assumes shex_info carries a 'property_id' key —
        # verify against ShExPropertyExtractor's output.
        return " - ".join(parts) if parts else f"Property {shex_info['property_id']}"

    def _get_transform_hint(self, datatype: str) -> Optional[dict]:
        """Get transform hint based on Wikidata datatype.

        Datatypes without an entry (e.g. string, wikibase-item) get no hint.
        """
        transform_hints = {
            "time": {"type": "iso_date_to_wikidata_time", "precision": 11},
            "quantity": {"type": "number_to_quantity", "unit": "1"},
            "globe-coordinate": {
                "type": "lat_lon_to_globe_coordinate",
                "latitude_field": "TODO_latitude",
                "longitude_field": "TODO_longitude",
            },
            "monolingualtext": {"type": "monolingualtext", "language": "en"},
        }

        return transform_hints.get(datatype)

    def print_summary(self):
        """Print a summary of the ShEx schema analysis."""
        if not self.schema_text:
            self.load_schema()

        extractor = ShExPropertyExtractor(self.schema_text)
        shex_properties = extractor.extract()

        print("=" * 60)
        print("ShEx Schema Analysis")
        print("=" * 60)

        # Fetch property metadata
        property_info = self.property_fetcher.fetch_properties(
            list(shex_properties.keys())
        )

        # Group by context
        by_context = {}
        for prop_id, info in shex_properties.items():
            context = info["context"]
            if context not in by_context:
                by_context[context] = []
            by_context[context].append((prop_id, info))

        for context, props in by_context.items():
            print(f"\n{context.upper()} Properties:")
            print("-" * 60)
            for prop_id, info in props:
                prop_data = property_info.get(prop_id)
                required = (
                    "REQUIRED" if info["cardinality"].get("required") else "optional"
                )
                label = prop_data.get_label() if prop_data else "Unknown"
                datatype = prop_data.datatype if prop_data else "unknown"
                comment = info.get("comment", "")

                print(f"  {prop_id} ({datatype}) - {required}")
                print(f"    Label: {label}")
                if comment:
                    print(f"    Comment: {comment}")
                if prop_data:
                    desc = prop_data.get_description()
                    if desc:
                        print(f"    Description: {desc}")
                print()

__init__(eid=None, schema_text=None, schema_file=None, user_agent=None)

Initialize the claims map builder.

Parameters:

Name Type Description Default
eid Optional[str]

EntitySchema ID (e.g., 'E502')

None
schema_text Optional[str]

ShEx schema as text

None
schema_file Optional[str]

Path to ShEx schema file

None
user_agent Optional[str]

Custom user agent for API requests

None
Source code in gkc/mapping_builder.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def __init__(
    self,
    eid: Optional[str] = None,
    schema_text: Optional[str] = None,
    schema_file: Optional[str] = None,
    user_agent: Optional[str] = None,
):
    """
    Set up the builder with a schema source and HTTP identity.

    Args:
        eid: EntitySchema ID (e.g., 'E502')
        schema_text: ShEx schema as text
        schema_file: Path to ShEx schema file
        user_agent: Custom user agent for API requests
    """
    # Schema text is cached lazily; load_schema() fills it in.
    self.schema_text: Optional[str] = None
    self.user_agent = user_agent
    # The validator owns the actual schema-loading logic.
    self.validator = ShExValidator(
        eid=eid, schema_text=schema_text, schema_file=schema_file
    )
    self.property_fetcher = WikidataPropertyFetcher(user_agent)

build_claims_map(include_qualifiers=True, include_references=True)

Build claims mapping structure from the loaded ShEx schema.

Parameters:

Name Type Description Default
include_qualifiers bool

Whether to include qualifier properties

True
include_references bool

Whether to include reference properties

True

Returns:

Type Description
list[dict[str, Any]]

List of claim mapping dictionaries

Source code in gkc/mapping_builder.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def build_claims_map(
    self, include_qualifiers: bool = True, include_references: bool = True
) -> list[dict[str, Any]]:
    """
    Derive a claims-mapping skeleton from the loaded ShEx schema.

    Args:
        include_qualifiers: Whether to include qualifier properties
        include_references: Whether to include reference properties

    Returns:
        List of claim mapping dictionaries
    """
    if not self.schema_text:
        self.load_schema()

    # Every property the ShEx schema mentions, keyed by property ID.
    props = ShExPropertyExtractor(self.schema_text).extract()

    # Partition by where each property appears within the shape.
    statement_contexts = ("direct", "statement", "statement_value")
    statements: dict[str, Any] = {}
    qualifiers: dict[str, Any] = {}
    references: dict[str, Any] = {}
    for pid, meta in props.items():
        ctx = meta["context"]
        if ctx in statement_contexts:
            statements[pid] = meta
        elif ctx == "qualifier":
            qualifiers[pid] = meta
        elif ctx == "reference":
            references[pid] = meta

    # Resolve labels/datatypes from live Wikidata metadata in one batch.
    property_info = self.property_fetcher.fetch_properties(list(props.keys()))

    result = []
    for pid, meta in statements.items():
        prop_data = property_info.get(pid)

        entry = {
            "property": pid,
            "comment": self._format_comment(meta, prop_data),
            "source_field": f"{pid.lower()}_value",
            "datatype": prop_data.datatype if prop_data else "unknown",
            "required": meta["cardinality"].get("required", False),
        }

        # Suggest a datatype-specific transform when one is known.
        if prop_data:
            hint = self._get_transform_hint(prop_data.datatype)
            if hint:
                entry["transform"] = hint

        # Placeholder lists only: pairing qualifiers/references with the
        # statements they belong to would require deeper ShEx parsing.
        if include_qualifiers and qualifiers:
            entry["qualifiers"] = []
        if include_references and references:
            entry["references"] = []

        result.append(entry)

    return result

build_complete_mapping(entity_type=None)

Build a complete mapping configuration skeleton.

Parameters:

Name Type Description Default
entity_type Optional[str]

Wikidata QID of the entity type (e.g., 'Q7840353')

None

Returns:

Type Description
dict[str, Any]

Complete mapping configuration dictionary

Source code in gkc/mapping_builder.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
def build_complete_mapping(
    self, entity_type: Optional[str] = None
) -> dict[str, Any]:
    """
    Build a complete mapping configuration skeleton.

    The result is intentionally a TODO-laden template; the trailing
    'notes' list enumerates the manual edits a user must make.

    Args:
        entity_type: Wikidata QID of the entity type (e.g., 'Q7840353')

    Returns:
        Complete mapping configuration dictionary
    """
    if not self.schema_text:
        self.load_schema()

    claims_map = self.build_claims_map()

    mapping = {
        "$schema": "https://example.com/gkc/mapping-schema.json",
        "version": "1.0",
        "metadata": {
            "name": "Auto-generated mapping",
            "description": "Generated from ShEx schema",
            "entity_schema_id": self.validator.eid or "unknown",
            "target_entity_type": entity_type or "TODO",
            "generated_date": "TODO",
        },
        "reference_library": {
            "basic_reference": [
                {
                    "property": "P248",
                    "value": "TODO_SOURCE_QID",
                    "datatype": "wikibase-item",
                    "comment": "Stated in: UPDATE with actual source QID",
                },
                {
                    "property": "P813",
                    "value": "current_date",
                    "datatype": "time",
                    "comment": "Retrieved date",
                },
            ]
        },
        "qualifier_library": {
            "point_in_time": [
                {
                    "property": "P585",
                    "source_field": "TODO_date_field",
                    "datatype": "time",
                    "comment": "Point in time qualifier - UPDATE source_field",
                }
            ]
        },
        "mappings": {
            "labels": [
                {
                    "source_field": "label",
                    "language": "en",
                    "required": True,
                    "comment": (
                        "Main label - UPDATE source_field to " "match your data"
                    ),
                }
            ],
            "aliases": [
                {
                    "source_field": "aliases",
                    "language": "en",
                    "separator": ";",
                    "required": False,
                    "comment": "Aliases - UPDATE source_field to match your data",
                }
            ],
            "descriptions": [
                {
                    "source_field": "description",
                    "language": "en",
                    "required": False,
                    "comment": (
                        "Description - UPDATE source_field to " "match your data"
                    ),
                }
            ],
            "sitelinks": [
                {
                    "site": "enwiki",
                    "source_field": "wikipedia_en",
                    "required": False,
                    "badges": [],
                    "comment": (
                        "English Wikipedia article - "
                        "UPDATE source_field to match your data"
                    ),
                }
            ],
            "claims": claims_map,
        },
        "notes": [
            "This mapping was auto-generated from a ShEx schema",
            "UPDATE all 'source_field' values to match your data",
            "REVIEW all 'transform' configurations",
            (
                "ADD appropriate references to claims "
                "(use reference_library entries)"
            ),
            "UPDATE reference_library with actual source QIDs and URLs",
            (
                "ADD fixed-value claims (instance of, continent, country) "
                "with 'value' instead of 'source_field'"
            ),
            (
                "For repeated references, use library entry names "
                "(e.g., 'basic_reference') instead of inline dicts"
            ),
            (
                "ADD sitelinks for Wikipedia and other Wikimedia projects "
                "(enwiki, frwiki, commons, etc.)"
            ),
            (
                "Sitelinks can use 'source_field' for data-driven titles "
                "or 'title' for fixed values"
            ),
        ],
    }

    return mapping

load_schema()

Load the ShEx schema.

Source code in gkc/mapping_builder.py
240
241
242
243
244
def load_schema(self) -> "ClaimsMapBuilder":
    """Load the ShEx schema and cache its text; returns self for chaining."""
    self.validator.load_schema()
    # NOTE(review): reads the validator's private ``_schema`` attribute —
    # a public accessor on ShExValidator would be safer.
    self.schema_text = self.validator._schema
    return self

print_summary()

Print a summary of the ShEx schema analysis.

Source code in gkc/mapping_builder.py
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
def print_summary(self):
    """Print a human-readable summary of the ShEx schema analysis."""
    if not self.schema_text:
        self.load_schema()

    shex_properties = ShExPropertyExtractor(self.schema_text).extract()

    banner = "=" * 60
    print(banner)
    print("ShEx Schema Analysis")
    print(banner)

    # Resolve labels/datatypes for every referenced property at once.
    property_info = self.property_fetcher.fetch_properties(
        list(shex_properties.keys())
    )

    # Bucket properties by the context they appear in.
    by_context = {}
    for prop_id, info in shex_properties.items():
        by_context.setdefault(info["context"], []).append((prop_id, info))

    divider = "-" * 60
    for context, props in by_context.items():
        print(f"\n{context.upper()} Properties:")
        print(divider)
        for prop_id, info in props:
            prop_data = property_info.get(prop_id)
            if info["cardinality"].get("required"):
                required = "REQUIRED"
            else:
                required = "optional"
            label = prop_data.get_label() if prop_data else "Unknown"
            datatype = prop_data.datatype if prop_data else "unknown"
            comment = info.get("comment", "")

            print(f"  {prop_id} ({datatype}) - {required}")
            print(f"    Label: {label}")
            if comment:
                print(f"    Comment: {comment}")
            if prop_data:
                desc = prop_data.get_description()
                if desc:
                    print(f"    Description: {desc}")
            print()

OpenStreetMapAuth

Bases: AuthBase

Authentication for OpenStreetMap.

Credentials can be provided in three ways, in order of precedence: (1) direct parameters, (2) environment variables (OPENSTREETMAP_USERNAME and OPENSTREETMAP_PASSWORD), and (3) an interactive prompt.

Example

Using environment variables

auth = OpenStreetMapAuth()

Direct parameters

auth = OpenStreetMapAuth(username="myuser", password="mypass")

Interactive prompt

auth = OpenStreetMapAuth(interactive=True)
Enter OpenStreetMap username: myuser
Enter OpenStreetMap password: ****

Source code in gkc/auth.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
class OpenStreetMapAuth(AuthBase):
    """
    Authentication for OpenStreetMap.

    Credentials can be provided in three ways (in order of precedence):
    1. Direct parameters
    2. Environment variables (OPENSTREETMAP_USERNAME, OPENSTREETMAP_PASSWORD)
    3. Interactive prompt

    Example:
        >>> # Using environment variables
        >>> auth = OpenStreetMapAuth()

        >>> # Direct parameters
        >>> auth = OpenStreetMapAuth(username="myuser", password="mypass")

        >>> # Interactive prompt
        >>> auth = OpenStreetMapAuth(interactive=True)
        Enter OpenStreetMap username: myuser
        Enter OpenStreetMap password: ****
    """

    def __init__(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        interactive: bool = False,
    ):
        """
        Initialize OpenStreetMap authentication.

        Args:
            username: OpenStreetMap username; falls back to the
                     OPENSTREETMAP_USERNAME environment variable.
            password: OpenStreetMap password; falls back to the
                     OPENSTREETMAP_PASSWORD environment variable.
            interactive: If True and credentials are not found, prompt user for input.
        """
        # Explicit arguments win; otherwise consult the environment.
        if not username:
            username = os.environ.get("OPENSTREETMAP_USERNAME")
        if not password:
            password = os.environ.get("OPENSTREETMAP_PASSWORD")

        # Last resort: prompt the user when interactive mode is enabled.
        if interactive and not (username and password):
            print("OpenStreetMap credentials not found in environment.")
            username = input("Enter OpenStreetMap username: ").strip()
            password = getpass.getpass("Enter OpenStreetMap password: ").strip()

        super().__init__(username, password)

    def __repr__(self) -> str:
        if self.is_authenticated():
            status = "authenticated"
        else:
            status = "not authenticated"
        return f"OpenStreetMapAuth(username={self.username!r}, {status})"

__init__(username=None, password=None, interactive=False)

Initialize OpenStreetMap authentication.

Parameters:

Name Type Description Default
username Optional[str]

OpenStreetMap username. If not provided, reads from OPENSTREETMAP_USERNAME environment variable.

None
password Optional[str]

OpenStreetMap password. If not provided, reads from OPENSTREETMAP_PASSWORD environment variable.

None
interactive bool

If True and credentials are not found, prompt user for input.

False
Source code in gkc/auth.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def __init__(
    self,
    username: Optional[str] = None,
    password: Optional[str] = None,
    interactive: bool = False,
):
    """
    Initialize OpenStreetMap authentication.

    Args:
        username: OpenStreetMap username; falls back to the
                 OPENSTREETMAP_USERNAME environment variable.
        password: OpenStreetMap password; falls back to the
                 OPENSTREETMAP_PASSWORD environment variable.
        interactive: If True and credentials are not found, prompt user for input.
    """
    # Explicit arguments win; otherwise consult the environment.
    if not username:
        username = os.environ.get("OPENSTREETMAP_USERNAME")
    if not password:
        password = os.environ.get("OPENSTREETMAP_PASSWORD")

    # Last resort: prompt the user when interactive mode is enabled.
    if interactive and not (username and password):
        print("OpenStreetMap credentials not found in environment.")
        username = input("Enter OpenStreetMap username: ").strip()
        password = getpass.getpass("Enter OpenStreetMap password: ").strip()

    super().__init__(username, password)

PropertyInfo

Container for Wikidata property information.

Source code in gkc/mapping_builder.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class PropertyInfo:
    """Container for Wikidata property information."""

    def __init__(self, property_id: str, data: dict):
        """Capture the relevant sections of one property's entity payload."""
        self.property_id = property_id
        self.datatype = data.get("datatype", "unknown")
        # Missing term sections default to empty dicts rather than raising.
        for section in ("labels", "descriptions", "aliases"):
            setattr(self, section, data.get(section, {}))

    def get_label(self, language: str = "en") -> str:
        """Return the label for *language*, falling back to the property ID."""
        return self.labels.get(language, {}).get("value", self.property_id)

    def get_description(self, language: str = "en") -> str:
        """Return the description for *language*, or an empty string."""
        return self.descriptions.get(language, {}).get("value", "")

get_description(language='en')

Get property description in specified language.

Source code in gkc/mapping_builder.py
33
34
35
36
37
def get_description(self, language: str = "en") -> str:
    """Return the property description for *language*, or '' if absent."""
    return self.descriptions.get(language, {}).get("value", "")

get_label(language='en')

Get property label in specified language.

Source code in gkc/mapping_builder.py
27
28
29
30
31
def get_label(self, language: str = "en") -> str:
    """Return the property label for *language*, else the property ID."""
    return self.labels.get(language, {}).get("value", self.property_id)

SPARQLError

Bases: Exception

Raised when a SPARQL query fails.

Source code in gkc/sparql.py
26
27
28
29
class SPARQLError(Exception):
    """Raised when a SPARQL query fails."""

SPARQLQuery

Execute SPARQL queries against a SPARQL endpoint.

Source code in gkc/sparql.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
class SPARQLQuery:
    """Execute SPARQL queries against a SPARQL endpoint."""

    def __init__(
        self,
        endpoint: str = DEFAULT_WIKIDATA_ENDPOINT,
        user_agent: str = DEFAULT_USER_AGENT,
        timeout: int = 30,
    ):
        """
        Initialize SPARQL query executor.

        Args:
            endpoint: SPARQL endpoint URL (default: Wikidata)
            user_agent: User agent string for HTTP requests
            timeout: Request timeout in seconds
        """
        self.endpoint = endpoint
        self.user_agent = user_agent
        self.timeout = timeout
        # One shared session gives connection pooling and a fixed UA header.
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": user_agent})

    @staticmethod
    def parse_wikidata_query_url(url: str) -> str:
        """
        Extract and decode SPARQL query from Wikidata Query Service URL.

        The Wikidata Query Service URL format is:
        https://query.wikidata.org/#<URL_ENCODED_QUERY>

        Args:
            url: Wikidata Query Service URL

        Returns:
            Decoded SPARQL query string

        Raises:
            SPARQLError: If URL is not a valid Wikidata Query Service URL

        Example:
            >>> url = "https://query.wikidata.org/#SELECT%20%3Fitem..."
            >>> query = SPARQLQuery.parse_wikidata_query_url(url)
        """
        try:
            parsed = urlparse(url)

            if "query.wikidata.org" not in parsed.netloc:
                raise SPARQLError(f"Not a Wikidata Query Service URL: {parsed.netloc}")

            # The encoded query lives in the fragment (everything after #).
            fragment = parsed.fragment
            if not fragment:
                raise SPARQLError("No query found in URL fragment (after #)")

            return unquote(fragment)
        except SPARQLError:
            raise
        except Exception as e:
            # Chain the original exception so the root cause stays visible.
            raise SPARQLError(f"Failed to parse Wikidata URL: {str(e)}") from e

    @staticmethod
    def normalize_query(query: str) -> str:
        """
        Normalize a SPARQL query string.

        If the query appears to be a Wikidata Query Service URL,
        extract and decode it. Otherwise, return as-is.

        Args:
            query: SPARQL query string or Wikidata Query Service URL

        Returns:
            Normalized SPARQL query string
        """
        query = query.strip()

        # A URL means the caller pasted a Wikidata Query Service link.
        if query.startswith(("http://", "https://")):
            return SPARQLQuery.parse_wikidata_query_url(query)

        return query

    def query(
        self,
        query: str,
        format: str = "json",
        raw: bool = False,
    ) -> Any:
        """
        Execute a SPARQL query.

        Args:
            query: SPARQL query string or Wikidata Query Service URL
            format: Response format ('json', 'xml', 'csv', 'tsv')
            raw: If False, parse JSON to Python dict; if True, return raw string

        Returns:
            Query results (dict if JSON and raw=False, else string)

        Raises:
            SPARQLError: If query fails

        Example:
            >>> executor = SPARQLQuery()
            >>> results = executor.query(
            ...     '''SELECT ?item ?itemLabel WHERE {
            ...         ?item wdt:P31 wd:Q7840353 .
            ...         SERVICE wikibase:label {
            ...             bd:serviceParam wikibase:language "en" .
            ...         }
            ...     }'''
            ... )
        """
        normalized_query = self.normalize_query(query)

        params = {
            "query": normalized_query,
            "format": format,
        }

        try:
            response = self.session.get(
                self.endpoint,
                params=params,
                timeout=self.timeout,
            )
            response.raise_for_status()

            # Only fully-parsed JSON is decoded; everything else is raw text.
            if format == "json" and not raw:
                return response.json()
            else:
                return response.text

        except requests.Timeout as e:
            raise SPARQLError(f"Query timeout after {self.timeout} seconds") from e
        except requests.RequestException as e:
            raise SPARQLError(f"Query failed: {str(e)}") from e
        except ValueError as e:
            raise SPARQLError(f"Failed to parse response: {str(e)}") from e

    @staticmethod
    def _bindings_to_rows(results: dict) -> list[dict[str, str]]:
        """Flatten SPARQL JSON result bindings into plain {var: value} rows.

        Shared by to_dataframe() and to_dict_list(); each value object has
        the structure {"value": "...", "type": "..."} and only "value" is kept.
        """
        bindings = results.get("results", {}).get("bindings", [])
        return [
            {var: value_obj.get("value") for var, value_obj in binding.items()}
            for binding in bindings
        ]

    def to_dataframe(self, query: str) -> "pd.DataFrame":
        """
        Execute a SPARQL query and return results as a pandas DataFrame.

        Args:
            query: SPARQL query string or Wikidata Query Service URL

        Returns:
            pandas DataFrame with query results

        Raises:
            SPARQLError: If pandas is not installed or query fails

        Example:
            >>> executor = SPARQLQuery()
            >>> df = executor.to_dataframe(
            ...     'SELECT ?item ?itemLabel WHERE { ... }'
            ... )
            >>> print(df.head())
        """
        if not HAS_PANDAS:
            raise SPARQLError(
                "pandas is required for to_dataframe(). "
                "Install with: pip install pandas"
            )

        results = self.query(query, format="json", raw=False)
        return pd.DataFrame(self._bindings_to_rows(results))

    def to_dict_list(self, query: str) -> list[dict[str, str]]:
        """
        Execute a SPARQL query and return results as a list of dicts.

        Each dict represents one result row, with variable names as keys
        and result values as values.

        Args:
            query: SPARQL query string or Wikidata Query Service URL

        Returns:
            List of dictionaries

        Example:
            >>> executor = SPARQLQuery()
            >>> results = executor.to_dict_list(
            ...     'SELECT ?item ?itemLabel WHERE { ... }'
            ... )
            >>> for row in results:
            ...     print(row)
        """
        results = self.query(query, format="json", raw=False)
        return self._bindings_to_rows(results)

    def to_csv(self, query: str, filepath: Optional[str] = None) -> str:
        """
        Execute a SPARQL query and return results as CSV.

        Args:
            query: SPARQL query string or Wikidata Query Service URL
            filepath: Optional file path to save CSV results

        Returns:
            CSV string

        Example:
            >>> executor = SPARQLQuery()
            >>> csv_data = executor.to_csv(
            ...     'SELECT ?item ?itemLabel WHERE { ... }',
            ...     filepath="results.csv"
            ... )
        """
        csv_data = self.query(query, format="csv", raw=True)

        if filepath:
            # utf-8 and newline="" keep the server's CSV bytes intact on
            # every platform (no locale encoding, no newline translation).
            with open(filepath, "w", encoding="utf-8", newline="") as f:
                f.write(csv_data)

        return csv_data

__init__(endpoint=DEFAULT_WIKIDATA_ENDPOINT, user_agent=DEFAULT_USER_AGENT, timeout=30)

Initialize SPARQL query executor.

Parameters:

Name Type Description Default
endpoint str

SPARQL endpoint URL (default: Wikidata)

DEFAULT_WIKIDATA_ENDPOINT
user_agent str

User agent string for HTTP requests

DEFAULT_USER_AGENT
timeout int

Request timeout in seconds

30
Source code in gkc/sparql.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    endpoint: str = DEFAULT_WIKIDATA_ENDPOINT,
    user_agent: str = DEFAULT_USER_AGENT,
    timeout: int = 30,
):
    """
    Initialize SPARQL query executor.

    Args:
        endpoint: SPARQL endpoint URL (default: Wikidata)
        user_agent: User agent string for HTTP requests
        timeout: Request timeout in seconds
    """
    self.endpoint = endpoint
    self.user_agent = user_agent
    self.timeout = timeout
    # A shared session provides connection pooling and a persistent UA header.
    session = requests.Session()
    session.headers.update({"User-Agent": user_agent})
    self.session = session

normalize_query(query) staticmethod

Normalize a SPARQL query string.

If the query appears to be a Wikidata Query Service URL, extract and decode it. Otherwise, return as-is.

Parameters:

Name Type Description Default
query str

SPARQL query string or Wikidata Query Service URL

required

Returns:

Type Description
str

Normalized SPARQL query string

Source code in gkc/sparql.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
@staticmethod
def normalize_query(query: str) -> str:
    """
    Normalize a SPARQL query string.

    A Wikidata Query Service URL is unwrapped into the decoded query
    it carries; any other input is returned stripped of surrounding
    whitespace.

    Args:
        query: SPARQL query string or Wikidata Query Service URL

    Returns:
        Normalized SPARQL query string
    """
    trimmed = query.strip()

    # Looks like a URL: pull the encoded query out of the fragment.
    if trimmed.startswith(("http://", "https://")):
        return SPARQLQuery.parse_wikidata_query_url(trimmed)

    return trimmed

parse_wikidata_query_url(url) staticmethod

Extract and decode SPARQL query from Wikidata Query Service URL.

The Wikidata Query Service URL format is: https://query.wikidata.org/#&lt;URL_ENCODED_QUERY&gt;

Parameters:

Name Type Description Default
url str

Wikidata Query Service URL

required

Returns:

Type Description
str

Decoded SPARQL query string

Raises:

Type Description
SPARQLError

If URL is not a valid Wikidata Query Service URL

Example

url = "https://query.wikidata.org/#SELECT%20%3Fitem..." query = SPARQLQuery.parse_wikidata_query_url(url)

Source code in gkc/sparql.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@staticmethod
def parse_wikidata_query_url(url: str) -> str:
    """
    Extract and decode SPARQL query from a Wikidata Query Service URL.

    Such URLs carry the query, URL-encoded, in the fragment:
    https://query.wikidata.org/#<URL_ENCODED_QUERY>

    Args:
        url: Wikidata Query Service URL

    Returns:
        Decoded SPARQL query string

    Raises:
        SPARQLError: If URL is not a valid Wikidata Query Service URL

    Example:
        >>> url = "https://query.wikidata.org/#SELECT%20%3Fitem..."
        >>> query = SPARQLQuery.parse_wikidata_query_url(url)
    """
    try:
        parts = urlparse(url)

        # Reject anything that is not the Wikidata Query Service host.
        if "query.wikidata.org" not in parts.netloc:
            raise SPARQLError(f"Not a Wikidata Query Service URL: {parts.netloc}")

        # The query is everything after the '#'.
        encoded = parts.fragment
        if not encoded:
            raise SPARQLError("No query found in URL fragment (after #)")

        return unquote(encoded)
    except SPARQLError:
        # Our own diagnostics pass through untouched.
        raise
    except Exception as e:
        # Anything unexpected is wrapped in the package's error type.
        raise SPARQLError(f"Failed to parse Wikidata URL: {str(e)}")

query(query, format='json', raw=False)

Execute a SPARQL query.

Parameters:

Name Type Description Default
query str

SPARQL query string or Wikidata Query Service URL

required
format str

Response format ('json', 'xml', 'csv', 'tsv')

'json'
raw bool

If False, parse JSON to Python dict; if True, return raw string

False

Returns:

Type Description
Any

Query results (dict if JSON and raw=False, else string)

Raises:

Type Description
SPARQLError

If query fails

Example

executor = SPARQLQuery() results = executor.query( ... '''SELECT ?item ?itemLabel WHERE { ... ?item wdt:P31 wd:Q7840353 . ... SERVICE wikibase:label { ... bd:serviceParam wikibase:language "en" . ... } ... }''' ... )

Source code in gkc/sparql.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def query(
    self,
    query: str,
    format: str = "json",
    raw: bool = False,
) -> Any:
    """
    Execute a SPARQL query.

    Args:
        query: SPARQL query string or Wikidata Query Service URL
        format: Response format ('json', 'xml', 'csv', 'tsv')
        raw: If False, parse JSON to Python dict; if True, return raw string

    Returns:
        Query results (dict if JSON and raw=False, else string)

    Raises:
        SPARQLError: If query fails

    Example:
        >>> executor = SPARQLQuery()
        >>> results = executor.query(
        ...     '''SELECT ?item ?itemLabel WHERE {
        ...         ?item wdt:P31 wd:Q7840353 .
        ...         SERVICE wikibase:label {
        ...             bd:serviceParam wikibase:language "en" .
        ...         }
        ...     }'''
        ... )
    """
    # Accept either a raw query or a Query Service URL.
    payload = {
        "query": self.normalize_query(query),
        "format": format,
    }

    try:
        response = self.session.get(
            self.endpoint,
            params=payload,
            timeout=self.timeout,
        )
        response.raise_for_status()

        # Only fully-parsed JSON is decoded; everything else is raw text.
        return (
            response.json()
            if format == "json" and not raw
            else response.text
        )

    except requests.Timeout:
        raise SPARQLError(f"Query timeout after {self.timeout} seconds")
    except requests.RequestException as e:
        raise SPARQLError(f"Query failed: {str(e)}")
    except ValueError as e:
        raise SPARQLError(f"Failed to parse response: {str(e)}")

to_csv(query, filepath=None)

Execute a SPARQL query and return results as CSV.

Parameters:

Name Type Description Default
query str

SPARQL query string or Wikidata Query Service URL

required
filepath Optional[str]

Optional file path to save CSV results

None

Returns:

Type Description
str

CSV string

Example

executor = SPARQLQuery() csv_data = executor.to_csv( ... 'SELECT ?item ?itemLabel WHERE { ... }', ... filepath="results.csv" ... )

Source code in gkc/sparql.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def to_csv(self, query: str, filepath: Optional[str] = None) -> str:
    """
    Execute a SPARQL query and return the results as CSV text.

    Args:
        query: SPARQL query string or Wikidata Query Service URL
        filepath: Optional file path to save CSV results

    Returns:
        CSV string

    Example:
        >>> executor = SPARQLQuery()
        >>> csv_data = executor.to_csv(
        ...     'SELECT ?item ?itemLabel WHERE { ... }',
        ...     filepath="results.csv"
        ... )
    """
    csv_text = self.query(query, format="csv", raw=True)

    # Persist a copy only when a destination was supplied.
    if filepath:
        with open(filepath, "w") as out:
            out.write(csv_text)

    return csv_text

to_dataframe(query)

Execute a SPARQL query and return results as a pandas DataFrame.

Parameters:

Name Type Description Default
query str

SPARQL query string or Wikidata Query Service URL

required

Returns:

Type Description
DataFrame

pandas DataFrame with query results

Raises:

Type Description
SPARQLError

If pandas is not installed or query fails

Example

executor = SPARQLQuery() df = executor.to_dataframe( ... 'SELECT ?item ?itemLabel WHERE { ... }' ... ) print(df.head())

Source code in gkc/sparql.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def to_dataframe(self, query: str) -> "pd.DataFrame":
    """
    Execute a SPARQL query and return the results as a pandas DataFrame.

    Args:
        query: SPARQL query string or Wikidata Query Service URL

    Returns:
        pandas DataFrame with query results

    Raises:
        SPARQLError: If pandas is not installed or query fails

    Example:
        >>> executor = SPARQLQuery()
        >>> df = executor.to_dataframe(
        ...     'SELECT ?item ?itemLabel WHERE { ... }'
        ... )
        >>> print(df.head())
    """
    # pandas is an optional dependency; fail early with install guidance.
    if not HAS_PANDAS:
        raise SPARQLError(
            "pandas is required for to_dataframe(). "
            "Install with: pip install pandas"
        )

    results = self.query(query, format="json", raw=False)
    bindings = results.get("results", {}).get("bindings", [])

    # Each value object is {"value": ..., "type": ...}; keep only "value".
    rows = [
        {var: value_obj.get("value") for var, value_obj in binding.items()}
        for binding in bindings
    ]
    return pd.DataFrame(rows)

to_dict_list(query)

Execute a SPARQL query and return results as a list of dicts.

Each dict represents one result row, with variable names as keys and result values as values.

Parameters:

Name Type Description Default
query str

SPARQL query string or Wikidata Query Service URL

required

Returns:

Type Description
list[dict[str, str]]

List of dictionaries

Example

executor = SPARQLQuery() results = executor.to_dict_list( ... 'SELECT ?item ?itemLabel WHERE { ... }' ... ) for row in results: ... print(row)

Source code in gkc/sparql.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def to_dict_list(self, query: str) -> list[dict[str, str]]:
    """
    Execute a SPARQL query and return results as a list of dicts.

    Each dict is one result row, mapping variable names to their
    string values.

    Args:
        query: SPARQL query string or Wikidata Query Service URL

    Returns:
        List of dictionaries

    Example:
        >>> executor = SPARQLQuery()
        >>> results = executor.to_dict_list(
        ...     'SELECT ?item ?itemLabel WHERE { ... }'
        ... )
        >>> for row in results:
        ...     print(row)
    """
    results = self.query(query, format="json", raw=False)
    bindings = results.get("results", {}).get("bindings", [])

    # Keep only the "value" field of each {"value": ..., "type": ...} object.
    return [
        {var: value_obj.get("value") for var, value_obj in binding.items()}
        for binding in bindings
    ]

ShExPropertyExtractor

Extracts property information from ShEx schema text.

Source code in gkc/mapping_builder.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class ShExPropertyExtractor:
    """Extracts property information from ShEx schema text."""

    # Pattern to match property references like wdt:P31, p:P571, etc.
    PROPERTY_PATTERN = re.compile(r"\b(wdt?|p|ps|pq|pr):P(\d+)\b")

    # Pattern to match shape definitions with comments
    SHAPE_PATTERN = re.compile(r"<(\w+)>\s*{([^}]+)}", re.MULTILINE | re.DOTALL)

    # Pattern to match inline comments
    COMMENT_PATTERN = re.compile(r"#\s*(.+?)(?:\n|$)")

    def __init__(self, schema_text: str):
        self.schema_text = schema_text
        # prop_id -> context info; populated by extract()
        self.properties = {}
        # shape name -> {prop_id: info}; populated by _extract_shapes()
        self.shape_comments = {}

    def extract(self) -> dict[str, dict]:
        """
        Extract all properties from ShEx schema with context.

        Returns:
            Dictionary mapping property IDs to their context information
        """
        self._extract_shapes()
        self._extract_properties()
        return self.properties

    def _extract_shapes(self):
        """Extract shape definitions and their per-property info."""
        for match in self.SHAPE_PATTERN.finditer(self.schema_text):
            shape_name = match.group(1)
            shape_body = match.group(2)
            self.shape_comments[shape_name] = self._extract_shape_properties(shape_body)

    def _extract_shape_properties(self, shape_body: str) -> dict:
        """Extract properties from a shape body with their comments."""
        properties = {}

        for line in shape_body.split("\n"):
            prop_match = self.PROPERTY_PATTERN.search(line)
            if not prop_match:
                continue

            prefix = prop_match.group(1)
            prop_id = f"P{prop_match.group(2)}"

            # First occurrence within a shape wins; skip duplicates before
            # doing any further per-line work.
            if prop_id in properties:
                continue

            comment_match = self.COMMENT_PATTERN.search(line)
            properties[prop_id] = {
                "property_id": prop_id,
                "comment": comment_match.group(1).strip() if comment_match else "",
                "cardinality": self._extract_cardinality(line),
                "context": self._determine_context(prefix),
                "prefix": prefix,
                "full_line": line.strip(),
            }

        return properties

    def _extract_properties(self):
        """Merge per-shape property info into self.properties."""
        for shape_name, shape_props in self.shape_comments.items():
            for prop_id, prop_info in shape_props.items():
                if prop_id in self.properties:
                    # Already seen in an earlier shape: record the extra
                    # shape name but keep the first shape's details.
                    self.properties[prop_id].setdefault("shapes", []).append(shape_name)
                else:
                    self.properties[prop_id] = prop_info
                    self.properties[prop_id]["shapes"] = [shape_name]

    def _extract_cardinality(self, line: str) -> dict:
        """Extract cardinality information from a property line."""
        # Hoist the strip: the suffix is checked against several markers.
        stripped = line.strip()

        if stripped.endswith(";"):
            # Exactly one (required)
            return {"min": 1, "max": 1, "required": True}
        if stripped.endswith("?"):
            # Zero or one (optional)
            return {"min": 0, "max": 1, "required": False}
        if stripped.endswith("*"):
            # Zero or more
            return {"min": 0, "max": None, "required": False}
        if stripped.endswith("+"):
            # One or more
            return {"min": 1, "max": None, "required": True}

        # Default: exactly one, with no explicit "required" flag.
        return {"min": 1, "max": 1}

    def _determine_context(self, prefix: str) -> str:
        """Determine the context of a property based on its prefix."""
        context_map = {
            "wdt": "direct",  # Direct property value
            "wd": "item",  # Item reference
            "p": "statement",  # Full statement
            "ps": "statement_value",  # Statement value
            "pq": "qualifier",  # Qualifier
            "pr": "reference",  # Reference
        }
        return context_map.get(prefix, "unknown")

extract()

Extract all properties from ShEx schema with context.

Returns:

Type Description
dict[str, dict]

Dictionary mapping property IDs to their context information

Source code in gkc/mapping_builder.py
57
58
59
60
61
62
63
64
65
66
def extract(self) -> dict[str, dict]:
    """
    Extract all properties from ShEx schema with context.

    Returns:
        Dictionary mapping property IDs to their context information
    """
    # Parse shape bodies first, then merge per-shape data into one map.
    self._extract_shapes()
    self._extract_properties()
    return self.properties

ShExValidationError

Bases: Exception

Raised when ShEx validation encounters an error.

Source code in gkc/shex.py
21
22
23
24
class ShExValidationError(Exception):
    """Raised when ShEx validation encounters an error."""

ShExValidator

Validate RDF data against Shape Expression (ShEx) schemas.

This class provides a flexible interface for validating RDF data against ShEx schemas, supporting multiple input sources for both RDF and schemas.

Example

Validate a Wikidata item against an EntitySchema

validator = ShExValidator(qid='Q42', eid='E502') result = validator.validate() print(result.results)

Use local schema file

validator = ShExValidator( ... qid='Q42', ... schema_file='schema.shex' ... ) validator.validate()

Use RDF text directly

validator = ShExValidator( ... rdf_text=my_rdf_data, ... schema_text=my_schema ... ) validator.validate()

Source code in gkc/shex.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
class ShExValidator:
    """
    Validate RDF data against Shape Expression (ShEx) schemas.

    This class provides a flexible interface for validating RDF data
    against ShEx schemas, supporting multiple input sources for both
    RDF and schemas.

    Example:
        >>> # Validate a Wikidata item against an EntitySchema
        >>> validator = ShExValidator(qid='Q42', eid='E502')
        >>> result = validator.validate()
        >>> print(result.results)

        >>> # Use local schema file
        >>> validator = ShExValidator(
        ...     qid='Q42',
        ...     schema_file='schema.shex'
        ... )
        >>> validator.validate()

        >>> # Use RDF text directly
        >>> validator = ShExValidator(
        ...     rdf_text=my_rdf_data,
        ...     schema_text=my_schema
        ... )
        >>> validator.validate()
    """

    def __init__(
        self,
        qid: Optional[str] = None,
        eid: Optional[str] = None,
        user_agent: Optional[str] = None,
        schema_text: Optional[str] = None,
        schema_file: Optional[str] = None,
        rdf_text: Optional[str] = None,
        rdf_file: Optional[str] = None,
    ):
        """
        Initialize the ShEx validator.

        Args:
            qid: Wikidata entity ID (e.g., 'Q42'). Optional if rdf_text or
                rdf_file provided.
            eid: EntitySchema ID (e.g., 'E502'). Optional if schema_text or
                schema_file provided.
            user_agent: Custom user agent for Wikidata requests.
            schema_text: ShExC schema as a string (alternative to eid).
            schema_file: Path to file containing ShExC schema (alternative to eid).
            rdf_text: RDF data as a string (alternative to qid).
            rdf_file: Path to file containing RDF data (alternative to qid).
        """
        self.qid = qid
        self.eid = eid
        self.user_agent = user_agent
        self.schema_text = schema_text
        self.schema_file = schema_file
        self.rdf_text = rdf_text
        self.rdf_file = rdf_file

        # Lazily-populated buffers, filled by load_schema() / load_rdf().
        self._schema: Optional[str] = None
        self._rdf: Optional[str] = None
        # Populated by evaluate(); PyShEx returns a list of evaluation
        # results (is_valid() also tolerates a plain bool for mocked tests).
        self.results = None

    def load_schema(self) -> "ShExValidator":
        """
        Load the ShEx schema from configured source.

        Tries sources in order: schema_text, schema_file, eid (from Wikidata).

        Returns:
            Self for method chaining

        Raises:
            ShExValidationError: If no valid schema source or loading fails
        """
        try:
            # Source precedence: inline text, then local file, then remote fetch.
            # NOTE(review): truthiness means an empty schema_text string falls
            # through to the next source — confirm that is intended.
            if self.schema_text:
                self._schema = self.schema_text
            elif self.schema_file:
                schema_path = Path(self.schema_file)
                if not schema_path.exists():
                    raise ShExValidationError(
                        f"Schema file not found: {self.schema_file}"
                    )
                self._schema = schema_path.read_text(encoding="utf-8")
            elif self.eid:
                self._schema = fetch_entity_schema(self.eid, self.user_agent)
            else:
                raise ShExValidationError(
                    "No schema source provided. "
                    "Specify eid, schema_text, or schema_file."
                )
        except WikidataFetchError as e:
            raise ShExValidationError(f"Failed to load schema: {str(e)}") from e
        except OSError as e:
            raise ShExValidationError(f"Failed to read schema file: {str(e)}") from e

        return self

    def load_rdf(self) -> "ShExValidator":
        """
        Load RDF data from configured source.

        Tries sources in order: rdf_text, rdf_file, qid (from Wikidata).

        Returns:
            Self for method chaining

        Raises:
            ShExValidationError: If no valid RDF source or loading fails
        """
        try:
            # Source precedence: inline text, then local file, then live
            # Turtle fetch by QID.
            if self.rdf_text:
                self._rdf = self.rdf_text
            elif self.rdf_file:
                rdf_path = Path(self.rdf_file)
                if not rdf_path.exists():
                    raise ShExValidationError(f"RDF file not found: {self.rdf_file}")
                self._rdf = rdf_path.read_text(encoding="utf-8")
            elif self.qid:
                self._rdf = fetch_entity_rdf(
                    self.qid, format="ttl", user_agent=self.user_agent
                )
            else:
                raise ShExValidationError(
                    "No RDF source provided. Specify qid, rdf_text, or rdf_file."
                )
        except WikidataFetchError as e:
            raise ShExValidationError(f"Failed to load RDF: {str(e)}") from e
        except OSError as e:
            raise ShExValidationError(f"Failed to read RDF file: {str(e)}") from e

        return self

    def evaluate(self) -> "ShExValidator":
        """
        Evaluate RDF data against the ShEx schema.

        Must call load_schema() and load_rdf() first, or use validate().

        Returns:
            Self with results populated

        Raises:
            ShExValidationError: If evaluation fails or data not loaded
        """
        if self._schema is None:
            raise ShExValidationError(
                "Schema not loaded. Call load_schema() first or use validate()."
            )
        if self._rdf is None:
            raise ShExValidationError(
                "RDF data not loaded. Call load_rdf() first or use validate()."
            )

        # Determine focus node: pin validation to the entity URI when a QID
        # is set; otherwise PyShEx evaluates every node in the graph.
        focus = None
        if self.qid:
            focus = get_entity_uri(self.qid)

        try:
            self.results = ShExEvaluator(
                rdf=self._rdf, schema=self._schema, focus=focus
            ).evaluate()
        except Exception as e:
            raise ShExValidationError(f"ShEx evaluation failed: {str(e)}") from e

        return self

    def validate(self) -> "ShExValidator":
        """
        Convenience method: load schema, load RDF, and evaluate in one call.

        Returns:
            Self with results populated

        Example:
            >>> validator = ShExValidator(qid='Q42', eid='E502')
            >>> validator.validate()
            >>> if validator.results:
            ...     print("Validation passed!")
        """
        self.load_schema()
        self.load_rdf()
        self.evaluate()
        return self

    def is_valid(self) -> bool:
        """
        Check if validation passed.

        Returns:
            True if validation passed, False otherwise

        Raises:
            ShExValidationError: If validate() hasn't been called yet
        """
        if self.results is None:
            raise ShExValidationError("No validation results. Call validate() first.")

        # Handle mocked results (for testing)
        if isinstance(self.results, bool):
            return self.results

        # PyShEx returns results as a list of EvaluationResult objects
        # When validation succeeds, reason contains matching triples
        # When validation fails, reason contains error messages like
        # "Node: ... not in value set"
        # If no focus is specified, PyShEx tests all nodes;
        # we need at least one success
        if not self.results:
            return False

        # Check if at least one result succeeded (no error indicators)
        for result in self.results:
            reason = result.reason or ""
            # Common failure indicators in PyShEx error messages
            # NOTE(review): substring matching on PyShEx messages is fragile
            # across PyShEx versions — confirm against the pinned version.
            has_error = any(
                indicator in reason
                for indicator in [
                    "not in value set",
                    "does not match",
                    "Constraint violation",
                    "No matching",
                    "Failed to",
                ]
            )
            if not has_error:
                return True

        return False

    def __repr__(self) -> str:
        """String representation of validator."""
        parts = []
        if self.qid:
            parts.append(f"qid={self.qid!r}")
        if self.eid:
            parts.append(f"eid={self.eid!r}")
        if self.rdf_file:
            parts.append(f"rdf_file={self.rdf_file!r}")
        if self.schema_file:
            parts.append(f"schema_file={self.schema_file!r}")

        params = ", ".join(parts) if parts else ""
        return f"ShExValidator({params})"

__init__(qid=None, eid=None, user_agent=None, schema_text=None, schema_file=None, rdf_text=None, rdf_file=None)

Initialize the ShEx validator.

Parameters:

Name Type Description Default
qid Optional[str]

Wikidata entity ID (e.g., 'Q42'). Optional if rdf_text or rdf_file provided.

None
eid Optional[str]

EntitySchema ID (e.g., 'E502'). Optional if schema_text or schema_file provided.

None
user_agent Optional[str]

Custom user agent for Wikidata requests.

None
schema_text Optional[str]

ShExC schema as a string (alternative to eid).

None
schema_file Optional[str]

Path to file containing ShExC schema (alternative to eid).

None
rdf_text Optional[str]

RDF data as a string (alternative to qid).

None
rdf_file Optional[str]

Path to file containing RDF data (alternative to qid).

None
Source code in gkc/shex.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def __init__(
    self,
    qid: Optional[str] = None,
    eid: Optional[str] = None,
    user_agent: Optional[str] = None,
    schema_text: Optional[str] = None,
    schema_file: Optional[str] = None,
    rdf_text: Optional[str] = None,
    rdf_file: Optional[str] = None,
):
    """
    Initialize the ShEx validator.

    Args:
        qid: Wikidata entity ID (e.g., 'Q42'). Optional if rdf_text or
            rdf_file provided.
        eid: EntitySchema ID (e.g., 'E502'). Optional if schema_text or
            schema_file provided.
        user_agent: Custom user agent for Wikidata requests.
        schema_text: ShExC schema as a string (alternative to eid).
        schema_file: Path to file containing ShExC schema (alternative to eid).
        rdf_text: RDF data as a string (alternative to qid).
        rdf_file: Path to file containing RDF data (alternative to qid).
    """
    self.qid = qid
    self.eid = eid
    self.user_agent = user_agent
    self.schema_text = schema_text
    self.schema_file = schema_file
    self.rdf_text = rdf_text
    self.rdf_file = rdf_file

    # Lazily-populated buffers, filled by load_schema() / load_rdf().
    self._schema: Optional[str] = None
    self._rdf: Optional[str] = None
    # Populated by evaluate(); PyShEx evaluation results (or bool in tests).
    self.results = None

__repr__()

String representation of validator.

Source code in gkc/shex.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def __repr__(self) -> str:
    """Return a concise summary of the configured input sources."""
    candidates = (
        ("qid", self.qid),
        ("eid", self.eid),
        ("rdf_file", self.rdf_file),
        ("schema_file", self.schema_file),
    )
    params = ", ".join(
        f"{label}={value!r}" for label, value in candidates if value
    )
    return f"ShExValidator({params})"

evaluate()

Evaluate RDF data against the ShEx schema.

Must call load_schema() and load_rdf() first, or use validate().

Returns:

Type Description
ShExValidator

Self with results populated

Raises:

Type Description
ShExValidationError

If evaluation fails or data not loaded

Source code in gkc/shex.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def evaluate(self) -> "ShExValidator":
    """
    Run the ShEx evaluation over the previously loaded schema and RDF.

    Both load_schema() and load_rdf() must have been called already;
    validate() performs all three steps in one call.

    Returns:
        Self, with ``results`` populated by PyShEx.

    Raises:
        ShExValidationError: If schema/RDF are not loaded or evaluation fails.
    """
    # Guard clauses: refuse to run before both inputs are in place.
    preconditions = (
        (self._schema, "Schema not loaded. Call load_schema() first or use validate()."),
        (self._rdf, "RDF data not loaded. Call load_rdf() first or use validate()."),
    )
    for loaded, message in preconditions:
        if loaded is None:
            raise ShExValidationError(message)

    # Pin evaluation to the entity URI when a QID is configured; otherwise
    # leave the focus unset.
    focus = get_entity_uri(self.qid) if self.qid else None

    try:
        evaluator = ShExEvaluator(rdf=self._rdf, schema=self._schema, focus=focus)
        self.results = evaluator.evaluate()
    except Exception as e:
        raise ShExValidationError(f"ShEx evaluation failed: {str(e)}") from e

    return self

is_valid()

Check if validation passed.

Returns:

Type Description
bool

True if validation passed, False otherwise

Raises:

Type Description
ShExValidationError

If validate() hasn't been called yet

Source code in gkc/shex.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def is_valid(self) -> bool:
    """
    Report whether the last validation run succeeded.

    Returns:
        True if at least one evaluation result shows no failure marker,
        False otherwise.

    Raises:
        ShExValidationError: If validate() hasn't been called yet.
    """
    if self.results is None:
        raise ShExValidationError("No validation results. Call validate() first.")

    # Mocked results in tests may simply be a bool.
    if isinstance(self.results, bool):
        return self.results

    # PyShEx yields a list of EvaluationResult objects. On success the
    # ``reason`` field holds matching triples; on failure it holds error
    # text such as "Node: ... not in value set". With no focus node,
    # every graph node is tested, so one clean result is enough.
    if not self.results:
        return False

    failure_markers = (
        "not in value set",
        "does not match",
        "Constraint violation",
        "No matching",
        "Failed to",
    )
    for outcome in self.results:
        text = outcome.reason or ""
        # A result with no known failure marker counts as a success.
        if not any(marker in text for marker in failure_markers):
            return True

    return False

load_rdf()

Load RDF data from configured source.

Tries sources in order: rdf_text, rdf_file, qid (from Wikidata).

Returns:

Type Description
ShExValidator

Self for method chaining

Raises:

Type Description
ShExValidationError

If no valid RDF source or loading fails

Source code in gkc/shex.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def load_rdf(self) -> "ShExValidator":
    """
    Populate the internal RDF buffer from the first available source.

    Source precedence: rdf_text, then rdf_file, then qid (fetched live
    from Wikidata as Turtle).

    Returns:
        Self, to allow method chaining.

    Raises:
        ShExValidationError: When no source is configured or loading fails.
    """
    def _read() -> str:
        # Resolve the RDF payload according to source precedence.
        if self.rdf_text:
            return self.rdf_text
        if self.rdf_file:
            source = Path(self.rdf_file)
            if not source.exists():
                raise ShExValidationError(f"RDF file not found: {self.rdf_file}")
            return source.read_text(encoding="utf-8")
        if self.qid:
            return fetch_entity_rdf(
                self.qid, format="ttl", user_agent=self.user_agent
            )
        raise ShExValidationError(
            "No RDF source provided. Specify qid, rdf_text, or rdf_file."
        )

    try:
        self._rdf = _read()
    except WikidataFetchError as e:
        raise ShExValidationError(f"Failed to load RDF: {str(e)}") from e
    except OSError as e:
        raise ShExValidationError(f"Failed to read RDF file: {str(e)}") from e

    return self

load_schema()

Load the ShEx schema from configured source.

Tries sources in order: schema_text, schema_file, eid (from Wikidata).

Returns:

Type Description
ShExValidator

Self for method chaining

Raises:

Type Description
ShExValidationError

If no valid schema source or loading fails

Source code in gkc/shex.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def load_schema(self) -> "ShExValidator":
    """
    Populate the internal schema buffer from the first available source.

    Source precedence: schema_text, then schema_file, then eid (fetched
    from Wikidata's EntitySchema namespace).

    Returns:
        Self, to allow method chaining.

    Raises:
        ShExValidationError: When no source is configured or loading fails.
    """
    try:
        if self.schema_text:
            loaded = self.schema_text
        elif self.schema_file:
            location = Path(self.schema_file)
            if not location.exists():
                raise ShExValidationError(
                    f"Schema file not found: {self.schema_file}"
                )
            loaded = location.read_text(encoding="utf-8")
        elif self.eid:
            loaded = fetch_entity_schema(self.eid, self.user_agent)
        else:
            raise ShExValidationError(
                "No schema source provided. "
                "Specify eid, schema_text, or schema_file."
            )
        self._schema = loaded
    except WikidataFetchError as e:
        raise ShExValidationError(f"Failed to load schema: {str(e)}") from e
    except OSError as e:
        raise ShExValidationError(f"Failed to read schema file: {str(e)}") from e

    return self

validate()

Convenience method: load schema, load RDF, and evaluate in one call.

Returns:

Type Description
ShExValidator

Self with results populated

Example

validator = ShExValidator(qid='Q42', eid='E502')
validator.validate()
if validator.results:
    print("Validation passed!")

Source code in gkc/shex.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def validate(self) -> "ShExValidator":
    """
    One-shot pipeline: load the schema, load the RDF, then evaluate.

    Returns:
        Self with results populated

    Example:
        >>> validator = ShExValidator(qid='Q42', eid='E502')
        >>> validator.validate()
        >>> if validator.results:
        ...     print("Validation passed!")
    """
    # Each step returns self, so the pipeline chains fluently.
    return self.load_schema().load_rdf().evaluate()

SitelinkValidator

Validates Wikipedia and Wikimedia project sitelinks.

Source code in gkc/sitelinks.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
class SitelinkValidator:
    """Validates Wikipedia and Wikimedia project sitelinks."""

    # Map site codes to API endpoints
    SITE_API_ENDPOINTS = {
        # Wikipedia sites
        "enwiki": "https://en.wikipedia.org/w/api.php",
        "frwiki": "https://fr.wikipedia.org/w/api.php",
        "dewiki": "https://de.wikipedia.org/w/api.php",
        "eswiki": "https://es.wikipedia.org/w/api.php",
        "jawiki": "https://ja.wikipedia.org/w/api.php",
        "itwiki": "https://it.wikipedia.org/w/api.php",
        "nlwiki": "https://nl.wikipedia.org/w/api.php",
        "plwiki": "https://pl.wikipedia.org/w/api.php",
        "ptwiki": "https://pt.wikipedia.org/w/api.php",
        "ruwiki": "https://ru.wikipedia.org/w/api.php",
        "zhwiki": "https://zh.wikipedia.org/w/api.php",
        # Wikimedia Commons
        "commonswiki": "https://commons.wikimedia.org/w/api.php",
        # Wikispecies
        "specieswiki": "https://species.wikimedia.org/w/api.php",
        # Add more as needed - pattern: {lang}wiki, {lang}wikisource, etc.
    }

    def __init__(self, user_agent: str = DEFAULT_USER_AGENT, timeout: int = 10):
        """
        Initialize the sitelink validator.

        Args:
            user_agent: User agent string for API requests
            timeout: Timeout in seconds for API requests
        """
        self.user_agent = user_agent
        self.timeout = timeout
        # One Session shared across checks; it carries the UA header for
        # every request made by this validator.
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": user_agent})

    def _get_api_endpoint(self, site_code: str) -> Optional[str]:
        """
        Get the MediaWiki API endpoint for a site code.

        Args:
            site_code: Site code like 'enwiki', 'frwiki', 'commonswiki'

        Returns:
            API endpoint URL or None if not found
        """
        # Check known sites
        if site_code in self.SITE_API_ENDPOINTS:
            return self.SITE_API_ENDPOINTS[site_code]

        # Try to construct URL for Wikipedia sites
        # NOTE(review): this assumes every unknown '*wiki' code is a Wikipedia
        # language edition; codes like 'metawiki' would map to a wrong host —
        # confirm against the set of site codes actually used.
        if site_code.endswith("wiki") and len(site_code) > 4:
            lang_code = site_code[:-4]
            return f"https://{lang_code}.wikipedia.org/w/api.php"

        # Try for other Wikimedia projects
        if site_code.endswith("wikisource"):
            lang_code = site_code[:-10]
            return f"https://{lang_code}.wikisource.org/w/api.php"
        elif site_code.endswith("wikivoyage"):
            lang_code = site_code[:-10]
            return f"https://{lang_code}.wikivoyage.org/w/api.php"
        elif site_code.endswith("wiktionary"):
            lang_code = site_code[:-10]
            return f"https://{lang_code}.wiktionary.org/w/api.php"

        return None

    def check_page_exists(
        self, title: str, site_code: str, allow_redirects: bool = False
    ) -> tuple[bool, Optional[str]]:
        """
        Check if a Wikipedia/Wikimedia page exists and optionally check for redirects.

        Args:
            title: Page title to check
            site_code: Site code (e.g., 'enwiki', 'commonswiki')
            allow_redirects: If False, return False for redirect pages

        Returns:
            Tuple of (exists: bool, message: Optional[str])
            - (True, None): Page exists and is valid
            - (False, reason): Page doesn't exist or is invalid, with reason
        """
        if not title or not title.strip():
            return (False, "Empty title")

        # Get API endpoint
        api_url = self._get_api_endpoint(site_code)
        if not api_url:
            return (False, f"Unknown site code: {site_code}")

        # Query the MediaWiki API
        # requests drops None-valued params, so redirect resolution is only
        # requested when we need to detect redirects.
        params = {
            "action": "query",
            "titles": title.strip(),
            "format": "json",
            "redirects": "" if not allow_redirects else None,
        }

        try:
            response = self.session.get(api_url, params=params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            # Check for redirects
            if not allow_redirects and "redirects" in data.get("query", {}):
                redirect_to = data["query"]["redirects"][0].get("to", "")
                return (False, f"Page is a redirect to: {redirect_to}")

            # Check if page exists
            pages = data.get("query", {}).get("pages", {})
            # Only the first page's ID matters; page_info itself is unused.
            for page_id, page_info in pages.items():
                if int(page_id) > 0:
                    # Page exists (positive page ID)
                    return (True, None)
                else:
                    # Page doesn't exist (negative page ID)
                    return (False, "Page does not exist")

            return (False, "No pages returned from API")

        except requests.Timeout:
            return (False, f"Timeout checking {site_code}")
        except requests.RequestException as e:
            return (False, f"Request error: {str(e)}")
        except (KeyError, ValueError, TypeError) as e:
            return (False, f"Error parsing response: {str(e)}")

    def validate_sitelinks(
        self, sitelinks: dict[str, dict], delay_between_checks: float = 0.1
    ) -> dict[str, tuple[bool, Optional[str]]]:
        """
        Validate multiple sitelinks at once.

        Args:
            sitelinks: Dictionary of sitelinks from transform_to_wikidata()
                Format: {"enwiki": {"site": "enwiki", "title": "...",
                         "badges": []}}
            delay_between_checks: Delay in seconds between API requests
                (rate limiting)

        Returns:
            Dictionary mapping site codes to (valid: bool, message: Optional[str])

        Example:
            >>> validator = SitelinkValidator()
            >>> sitelinks = {
            ...     "enwiki": {"site": "enwiki", "title": "Example", "badges": []},
            ...     "frwiki": {"site": "frwiki", "title": "Exemple", "badges": []}
            ... }
            >>> results = validator.validate_sitelinks(sitelinks)
            >>> results
            {
                "enwiki": (True, None),
                "frwiki": (False, "Page does not exist")
            }
        """
        results = {}

        for site_code, sitelink_data in sitelinks.items():
            title = sitelink_data.get("title")
            if not title:
                results[site_code] = (False, "No title provided")
                continue

            # Check if page exists
            exists, message = self.check_page_exists(title, site_code)
            results[site_code] = (exists, message)

            # Rate limiting: sleep between live API calls to be polite.
            if delay_between_checks > 0:
                sleep(delay_between_checks)

        return results

    def filter_valid_sitelinks(
        self, sitelinks: dict[str, dict], verbose: bool = False
    ) -> dict[str, dict]:
        """
        Filter out invalid sitelinks, returning only valid ones.

        Args:
            sitelinks: Dictionary of sitelinks to validate
            verbose: If True, print validation results

        Returns:
            Filtered dictionary containing only valid sitelinks
        """
        validation_results = self.validate_sitelinks(sitelinks)
        valid_sitelinks = {}

        for site_code, sitelink_data in sitelinks.items():
            is_valid, message = validation_results.get(
                site_code, (False, "Not checked")
            )

            if verbose:
                status = "✓" if is_valid else "✗"
                title = sitelink_data.get("title", "")
                print(
                    f"{status} {site_code}: {title} - {message if message else 'valid'}"
                )

            if is_valid:
                valid_sitelinks[site_code] = sitelink_data

        return valid_sitelinks

__init__(user_agent=DEFAULT_USER_AGENT, timeout=10)

Initialize the sitelink validator.

Parameters:

Name Type Description Default
user_agent str

User agent string for API requests

DEFAULT_USER_AGENT
timeout int

Timeout in seconds for API requests

10
Source code in gkc/sitelinks.py
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(self, user_agent: str = DEFAULT_USER_AGENT, timeout: int = 10):
    """
    Initialize the sitelink validator.

    Args:
        user_agent: User agent string for API requests
        timeout: Timeout in seconds for API requests
    """
    self.user_agent = user_agent
    self.timeout = timeout
    # One Session shared across checks; it carries the UA header for
    # every request made by this validator.
    self.session = requests.Session()
    self.session.headers.update({"User-Agent": user_agent})

check_page_exists(title, site_code, allow_redirects=False)

Check if a Wikipedia/Wikimedia page exists and optionally check for redirects.

Parameters:

Name Type Description Default
title str

Page title to check

required
site_code str

Site code (e.g., 'enwiki', 'commonswiki')

required
allow_redirects bool

If False, return False for redirect pages

False

Returns:

Type Description
bool

Tuple of (exists: bool, message: Optional[str])

Optional[str]
  • (True, None): Page exists and is valid
tuple[bool, Optional[str]]
  • (False, reason): Page doesn't exist or is invalid, with reason
Source code in gkc/sitelinks.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def check_page_exists(
    self, title: str, site_code: str, allow_redirects: bool = False
) -> tuple[bool, Optional[str]]:
    """
    Probe a MediaWiki site for a page title, optionally rejecting redirects.

    Args:
        title: Page title to look up.
        site_code: Wikidata-style site code (e.g., 'enwiki', 'commonswiki').
        allow_redirects: When False (default), a redirect page is rejected.

    Returns:
        (True, None) when the page exists and is acceptable, otherwise
        (False, reason) describing why it was rejected.
    """
    cleaned = (title or "").strip()
    if not cleaned:
        return (False, "Empty title")

    api_url = self._get_api_endpoint(site_code)
    if api_url is None:
        return (False, f"Unknown site code: {site_code}")

    # requests drops None-valued params, so the 'redirects' flag is only
    # sent when we need the API to report redirect resolution.
    query = {
        "action": "query",
        "titles": cleaned,
        "format": "json",
        "redirects": None if allow_redirects else "",
    }

    try:
        response = self.session.get(api_url, params=query, timeout=self.timeout)
        response.raise_for_status()
        payload = response.json()

        query_block = payload.get("query", {})
        if not allow_redirects and "redirects" in query_block:
            target = query_block["redirects"][0].get("to", "")
            return (False, f"Page is a redirect to: {target}")

        pages = query_block.get("pages", {})
        for page_id in pages:
            # MediaWiki reports missing pages with a negative page ID.
            if int(page_id) > 0:
                return (True, None)
            return (False, "Page does not exist")

        return (False, "No pages returned from API")

    except requests.Timeout:
        return (False, f"Timeout checking {site_code}")
    except requests.RequestException as e:
        return (False, f"Request error: {str(e)}")
    except (KeyError, ValueError, TypeError) as e:
        return (False, f"Error parsing response: {str(e)}")

Filter out invalid sitelinks, returning only valid ones.

Parameters:

Name Type Description Default
sitelinks dict[str, dict]

Dictionary of sitelinks to validate

required
verbose bool

If True, print validation results

False

Returns:

Type Description
dict[str, dict]

Filtered dictionary containing only valid sitelinks

Source code in gkc/sitelinks.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def filter_valid_sitelinks(
    self, sitelinks: dict[str, dict], verbose: bool = False
) -> dict[str, dict]:
    """
    Return only the sitelinks that pass validation.

    Args:
        sitelinks: Dictionary of sitelinks to validate
        verbose: If True, print one result line per sitelink

    Returns:
        New dictionary containing only the sitelinks that validated
    """
    outcomes = self.validate_sitelinks(sitelinks)
    kept: dict[str, dict] = {}

    for code, link in sitelinks.items():
        # Sitelinks missing from the results dict are treated as invalid.
        ok, message = outcomes.get(code, (False, "Not checked"))

        if verbose:
            marker = "✓" if ok else "✗"
            page_title = link.get("title", "")
            print(f"{marker} {code}: {page_title} - {message if message else 'valid'}")

        if ok:
            kept[code] = link

    return kept

Validate multiple sitelinks at once.

Parameters:

Name Type Description Default
sitelinks dict[str, dict]

Dictionary of sitelinks from transform_to_wikidata() Format: {"enwiki": {"site": "enwiki", "title": "...", "badges": []}}

required
delay_between_checks float

Delay in seconds between API requests (rate limiting)

0.1

Returns:

Type Description
dict[str, tuple[bool, Optional[str]]]

Dictionary mapping site codes to (valid: bool, message: Optional[str])

Example

>>> validator = SitelinkValidator()
>>> sitelinks = {
...     "enwiki": {"site": "enwiki", "title": "Example", "badges": []},
...     "frwiki": {"site": "frwiki", "title": "Exemple", "badges": []}
... }
>>> results = validator.validate_sitelinks(sitelinks)
>>> results
{"enwiki": (True, None), "frwiki": (False, "Page does not exist")}

Source code in gkc/sitelinks.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def validate_sitelinks(
    self, sitelinks: dict[str, dict], delay_between_checks: float = 0.1
) -> dict[str, tuple[bool, Optional[str]]]:
    """
    Check every sitelink in a batch against its target wiki.

    Args:
        sitelinks: Dictionary of sitelinks from transform_to_wikidata()
            Format: {"enwiki": {"site": "enwiki", "title": "...",
                     "badges": []}}
        delay_between_checks: Seconds to pause between API requests
            (rate limiting)

    Returns:
        Dictionary mapping site codes to (valid: bool, message: Optional[str])
    """
    outcome: dict[str, tuple[bool, Optional[str]]] = {}

    for code, link in sitelinks.items():
        page_title = link.get("title")
        if page_title:
            # Ask the target wiki whether the page actually exists.
            outcome[code] = self.check_page_exists(page_title, code)
            # Be polite to the API between consecutive network checks.
            if delay_between_checks > 0:
                sleep(delay_between_checks)
        else:
            outcome[code] = (False, "No title provided")

    return outcome

WikidataFetchError

Bases: Exception

Raised when fetching data from Wikidata fails.

Source code in gkc/wd.py
15
16
17
18
class WikidataFetchError(Exception):
    """Error signalling that fetching data from Wikidata did not succeed."""

WikidataPropertyFetcher

Fetches property metadata from Wikidata API.

Source code in gkc/mapping_builder.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class WikidataPropertyFetcher:
    """Fetches property metadata from Wikidata API.

    Results are cached per instance, so repeated requests for the same
    property IDs do not trigger additional network calls.
    """

    # wbgetentities accepts at most 50 entity IDs per request.
    BATCH_SIZE = 50

    def __init__(self, user_agent: Optional[str] = None, timeout: float = 30.0):
        """
        Args:
            user_agent: User-Agent header value; falls back to
                DEFAULT_USER_AGENT when not provided.
            timeout: Per-request timeout in seconds, so a stalled
                connection cannot hang the fetch forever.
        """
        self.user_agent = user_agent or DEFAULT_USER_AGENT
        self.api_url = "https://www.wikidata.org/w/api.php"
        self.timeout = timeout
        self._cache = {}

    def fetch_properties(self, property_ids: list[str]) -> dict[str, PropertyInfo]:
        """
        Fetch metadata for multiple properties from Wikidata.

        Args:
            property_ids: List of property IDs (e.g., ['P31', 'P571'])

        Returns:
            Dictionary mapping property IDs to PropertyInfo objects.
            Properties that could not be fetched are omitted.
        """
        # Only request properties that are not already cached.
        uncached_ids = [pid for pid in property_ids if pid not in self._cache]

        # Fetch in batches (API limit on IDs per request).
        for i in range(0, len(uncached_ids), self.BATCH_SIZE):
            self._fetch_batch(uncached_ids[i : i + self.BATCH_SIZE])

        return {pid: self._cache[pid] for pid in property_ids if pid in self._cache}

    def _fetch_batch(self, property_ids: list[str]):
        """Fetch one batch of properties and add them to the cache.

        Network failures are reported as a warning rather than raised, so a
        partial fetch still yields whatever succeeded (best effort).
        """
        params = {
            "action": "wbgetentities",
            "ids": "|".join(property_ids),
            "props": "labels|descriptions|aliases|datatype",
            "format": "json",
        }

        headers = {"User-Agent": self.user_agent}

        try:
            # Explicit timeout: without it, requests waits indefinitely.
            response = requests.get(
                self.api_url, params=params, headers=headers, timeout=self.timeout
            )
            response.raise_for_status()
            data = response.json()

            if "entities" in data:
                for prop_id, prop_data in data["entities"].items():
                    # "missing" marks IDs that do not exist on Wikidata.
                    if "missing" not in prop_data:
                        self._cache[prop_id] = PropertyInfo(prop_id, prop_data)
        except requests.RequestException as e:
            print(f"Warning: Failed to fetch properties: {e}")

fetch_properties(property_ids)

Fetch metadata for multiple properties from Wikidata.

Parameters:

Name Type Description Default
property_ids list[str]

List of property IDs (e.g., ['P31', 'P571'])

required

Returns:

Type Description
dict[str, PropertyInfo]

Dictionary mapping property IDs to PropertyInfo objects

Source code in gkc/mapping_builder.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def fetch_properties(self, property_ids: list[str]) -> dict[str, PropertyInfo]:
    """
    Fetch metadata for multiple properties from Wikidata.

    Args:
        property_ids: List of property IDs (e.g., ['P31', 'P571'])

    Returns:
        Dictionary mapping property IDs to PropertyInfo objects
    """
    # Determine which IDs still need a network fetch.
    pending = [pid for pid in property_ids if pid not in self._cache]

    # The API caps each request at 50 IDs, so walk the list in chunks.
    while pending:
        self._fetch_batch(pending[:50])
        pending = pending[50:]

    return {pid: self._cache[pid] for pid in property_ids if pid in self._cache}

WikiverseAuth

Bases: AuthBase

Authentication for Wikimedia projects (Wikidata, Wikipedia, Wikimedia Commons).

Designed for bot accounts using bot passwords. The same credentials work across all Wikimedia projects due to Single User Login (SUL).

Supports both default Wikimedia instances and custom MediaWiki installations.

Credentials can be provided in three ways (in order of precedence): 1. Direct parameters 2. Environment variables (WIKIVERSE_USERNAME, WIKIVERSE_PASSWORD, WIKIVERSE_API_URL) 3. Interactive prompt

Example

Authenticate to Wikidata (default)

auth = WikiverseAuth() auth.login()

Direct parameters (bot password format)

auth = WikiverseAuth( ... username="MyUsername@MyBot", ... password="abc123def456ghi789", ... api_url="https://www.wikidata.org/w/api.php" ... ) auth.login()

Custom MediaWiki instance

auth = WikiverseAuth( ... username="MyUsername@MyBot", ... password="abc123def456ghi789", ... api_url="https://my-wiki.example.com/w/api.php" ... ) auth.login()

Use authenticated session for API requests

response = auth.session.get(auth.api_url, params={ ... "action": "query", ... "format": "json" ... })

Source code in gkc/auth.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
class WikiverseAuth(AuthBase):
    """
    Authentication for Wikimedia projects (Wikidata, Wikipedia, Wikimedia Commons).

    Designed for bot accounts using bot passwords. The same credentials work
    across all Wikimedia projects due to Single User Login (SUL).

    Supports both default Wikimedia instances and custom MediaWiki installations.

    Credentials can be provided in three ways (in order of precedence):
    1. Direct parameters
    2. Environment variables (WIKIVERSE_USERNAME, WIKIVERSE_PASSWORD, WIKIVERSE_API_URL)
    3. Interactive prompt

    Example:
        >>> # Authenticate to Wikidata (default)
        >>> auth = WikiverseAuth()
        >>> auth.login()

        >>> # Direct parameters (bot password format)
        >>> auth = WikiverseAuth(
        ...     username="MyUsername@MyBot",
        ...     password="abc123def456ghi789",
        ...     api_url="https://www.wikidata.org/w/api.php"
        ... )
        >>> auth.login()

        >>> # Custom MediaWiki instance
        >>> auth = WikiverseAuth(
        ...     username="MyUsername@MyBot",
        ...     password="abc123def456ghi789",
        ...     api_url="https://my-wiki.example.com/w/api.php"
        ... )
        >>> auth.login()

        >>> # Use authenticated session for API requests
        >>> response = auth.session.get(auth.api_url, params={
        ...     "action": "query",
        ...     "format": "json"
        ... })
    """

    def __init__(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        api_url: Optional[str] = None,
        interactive: bool = False,
        timeout: float = 30.0,
    ):
        """
        Initialize Wikiverse authentication for bot accounts.

        Args:
            username: Bot password username in format "Username@BotName".
                If not provided, reads from WIKIVERSE_USERNAME
                environment variable.
            password: Bot password. If not provided, reads from
                WIKIVERSE_PASSWORD environment variable.
            api_url: MediaWiki API endpoint URL. If not provided, reads from
                    WIKIVERSE_API_URL environment variable, or defaults to Wikidata.
                    Can also use shortcuts: "wikidata", "wikipedia", "commons"
            interactive: If True and credentials are not found, prompt user for input.
            timeout: Per-request timeout in seconds for all API calls made by
                this object, so a stalled connection cannot hang forever.
        """
        # Try provided parameters first, then fall back to the environment.
        username = username or os.environ.get("WIKIVERSE_USERNAME")
        password = password or os.environ.get("WIKIVERSE_PASSWORD")
        api_url = api_url or os.environ.get("WIKIVERSE_API_URL")

        # If credentials still not available and interactive mode is requested
        if interactive and not (username and password):
            print("Bot password credentials not found in environment.")
            username = input(
                "Enter Wikiverse username (format: Username@BotName): "
            ).strip()
            password = getpass.getpass("Enter Wikiverse password: ").strip()
            if not api_url:
                api_url_input = input(
                    "Enter API URL (or 'wikidata', 'wikipedia', 'commons') "
                    "[default: wikidata]: "
                ).strip()
                api_url = api_url_input if api_url_input else "wikidata"

        super().__init__(username, password)

        # Resolve API URL shortcuts ("wikidata", etc.) to full endpoint URLs
        if api_url and api_url.lower() in DEFAULT_WIKIMEDIA_APIS:
            self.api_url = DEFAULT_WIKIMEDIA_APIS[api_url.lower()]
        elif api_url:
            self.api_url = api_url
        else:
            # Default to Wikidata
            self.api_url = DEFAULT_WIKIMEDIA_APIS["wikidata"]

        self.timeout = timeout

        # The session carries login cookies and a descriptive User-Agent.
        self.session = requests.Session()
        self.session.headers.update(
            {"User-Agent": "GKC-Python-Client/0.1 (https://github.com/skybristol/gkc)"}
        )
        self._logged_in = False

    def login(self) -> bool:
        """
        Perform login to MediaWiki API using bot password credentials.

        Returns:
            True if login successful, False otherwise.

        Raises:
            AuthenticationError: If login fails with detailed error message.

        Example:
            >>> auth = WikiverseAuth(username="User@Bot", password="secret")
            >>> if auth.login():
            ...     print("Successfully logged in!")
        """
        if not self.is_authenticated():
            raise AuthenticationError(
                "Cannot login: credentials not provided. "
                "Please provide username and password."
            )

        try:
            # Step 1: Get login token
            token_params = {
                "action": "query",
                "meta": "tokens",
                "type": "login",
                "format": "json",
            }
            token_response = self.session.get(
                self.api_url, params=token_params, timeout=self.timeout
            )
            token_response.raise_for_status()
            token_data = token_response.json()

            if "query" not in token_data or "tokens" not in token_data["query"]:
                raise AuthenticationError(
                    f"Failed to get login token from {self.api_url}. "
                    f"Response: {token_data}"
                )

            login_token = token_data["query"]["tokens"]["logintoken"]

            # Step 2: Perform login with credentials and token
            login_params = {
                "action": "login",
                "lgname": self.username,
                "lgpassword": self.password,
                "lgtoken": login_token,
                "format": "json",
            }
            login_response = self.session.post(
                self.api_url, data=login_params, timeout=self.timeout
            )
            login_response.raise_for_status()
            login_data = login_response.json()

            # Check login result
            if "login" not in login_data:
                raise AuthenticationError(
                    f"Unexpected login response from {self.api_url}. "
                    f"Response: {login_data}"
                )

            result = login_data["login"]["result"]

            if result == "Success":
                self._logged_in = True
                return True
            else:
                # Provide detailed error message
                reason = login_data["login"].get("reason", "Unknown reason")
                raise AuthenticationError(
                    f"Login failed with result '{result}'. Reason: {reason}. "
                    f"Check your bot password credentials and permissions."
                )

        except requests.RequestException as e:
            # Chain the cause so the underlying network failure is preserved.
            raise AuthenticationError(
                f"Network error during login to {self.api_url}: {str(e)}"
            ) from e

    def is_logged_in(self) -> bool:
        """
        Check if currently logged in to MediaWiki API.

        Returns:
            True if logged in, False otherwise.
        """
        return self._logged_in

    def logout(self) -> None:
        """
        Logout from MediaWiki API and clear session.

        Remote logout is best effort: any error during the API call is
        ignored, but local login state and cookies are always cleared.

        Example:
            >>> auth = WikiverseAuth(username="User@Bot", password="secret")
            >>> auth.login()
            >>> # ... do some work ...
            >>> auth.logout()
        """
        if self._logged_in:
            try:
                # Get CSRF token for logout
                token_params = {
                    "action": "query",
                    "meta": "tokens",
                    "type": "csrf",
                    "format": "json",
                }
                token_response = self.session.get(
                    self.api_url, params=token_params, timeout=self.timeout
                )
                token_data = token_response.json()
                csrf_token = token_data["query"]["tokens"]["csrftoken"]

                # Perform logout
                logout_params = {
                    "action": "logout",
                    "token": csrf_token,
                    "format": "json",
                }
                self.session.post(
                    self.api_url, data=logout_params, timeout=self.timeout
                )
            except Exception:
                # Ignore logout errors, just clear session
                pass
            finally:
                self._logged_in = False
                self.session.cookies.clear()

    def get_csrf_token(self) -> str:
        """
        Get a CSRF token for making edits.

        Returns:
            CSRF token string.

        Raises:
            AuthenticationError: If not logged in or token retrieval fails.

        Example:
            >>> auth = WikiverseAuth(username="User@Bot", password="secret")
            >>> auth.login()
            >>> token = auth.get_csrf_token()
            >>> # Use token for edits
        """
        if not self.is_logged_in():
            raise AuthenticationError(
                "Not logged in. Call login() first before getting CSRF token."
            )

        try:
            token_params = {
                "action": "query",
                "meta": "tokens",
                "type": "csrf",
                "format": "json",
            }
            response = self.session.get(
                self.api_url, params=token_params, timeout=self.timeout
            )
            response.raise_for_status()
            data = response.json()

            if "query" in data and "tokens" in data["query"]:
                csrf_token: str = data["query"]["tokens"]["csrftoken"]
                return csrf_token
            else:
                raise AuthenticationError(f"Failed to get CSRF token. Response: {data}")

        except requests.RequestException as e:
            # Chain the cause so the underlying network failure is preserved.
            raise AuthenticationError(
                f"Network error getting CSRF token: {str(e)}"
            ) from e

    def __repr__(self) -> str:
        status = (
            "logged in"
            if self._logged_in
            else ("authenticated" if self.is_authenticated() else "not authenticated")
        )
        return (
            f"WikiverseAuth(username={self.username!r}, "
            f"api_url={self.api_url!r}, {status})"
        )

    def get_bot_name(self) -> Optional[str]:
        """
        Extract bot name from username.

        Returns:
            Bot name if username is in bot password format, None otherwise.

        Example:
            >>> auth = WikiverseAuth(username="Alice@MyBot")
            >>> auth.get_bot_name()
            'MyBot'
        """
        if self.username and "@" in self.username:
            return self.username.split("@", 1)[1]
        return None

    def get_account_name(self) -> Optional[str]:
        """
        Extract account name from username.

        Returns:
            Account name if username is in bot password format, None otherwise.

        Example:
            >>> auth = WikiverseAuth(username="Alice@MyBot")
            >>> auth.get_account_name()
            'Alice'
        """
        if self.username and "@" in self.username:
            return self.username.split("@", 1)[0]
        return None

__init__(username=None, password=None, api_url=None, interactive=False)

Initialize Wikiverse authentication for bot accounts.

Parameters:

Name Type Description Default
username Optional[str]

Bot password username in format "Username@BotName". If not provided, reads from WIKIVERSE_USERNAME environment variable.

None
password Optional[str]

Bot password. If not provided, reads from WIKIVERSE_PASSWORD environment variable.

None
api_url Optional[str]

MediaWiki API endpoint URL. If not provided, reads from WIKIVERSE_API_URL environment variable, or defaults to Wikidata. Can also use shortcuts: "wikidata", "wikipedia", "commons"

None
interactive bool

If True and credentials are not found, prompt user for input.

False
Source code in gkc/auth.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def __init__(
    self,
    username: Optional[str] = None,
    password: Optional[str] = None,
    api_url: Optional[str] = None,
    interactive: bool = False,
):
    """
    Set up bot-account authentication for a Wikiverse (MediaWiki) API.

    Args:
        username: Bot password username ("Username@BotName"); falls back
            to the WIKIVERSE_USERNAME environment variable.
        password: Bot password; falls back to the WIKIVERSE_PASSWORD
            environment variable.
        api_url: MediaWiki API endpoint URL or a shortcut ("wikidata",
            "wikipedia", "commons"); falls back to the WIKIVERSE_API_URL
            environment variable, then defaults to Wikidata.
        interactive: When True, prompt for any credentials that were not
            found via parameters or the environment.
    """
    # Explicit parameters win; the environment is the fallback.
    username = username or os.environ.get("WIKIVERSE_USERNAME")
    password = password or os.environ.get("WIKIVERSE_PASSWORD")
    api_url = api_url or os.environ.get("WIKIVERSE_API_URL")

    # Interactive prompting only kicks in when credentials are incomplete.
    if interactive and not (username and password):
        print("Bot password credentials not found in environment.")
        username = input(
            "Enter Wikiverse username (format: Username@BotName): "
        ).strip()
        password = getpass.getpass("Enter Wikiverse password: ").strip()
        if not api_url:
            entered = input(
                "Enter API URL (or 'wikidata', 'wikipedia', 'commons') "
                "[default: wikidata]: "
            ).strip()
            api_url = entered if entered else "wikidata"

    super().__init__(username, password)

    # Translate shortcut names to full endpoint URLs; no URL means Wikidata.
    if api_url:
        self.api_url = DEFAULT_WIKIMEDIA_APIS.get(api_url.lower(), api_url)
    else:
        self.api_url = DEFAULT_WIKIMEDIA_APIS["wikidata"]

    # The session carries login cookies and a descriptive User-Agent.
    self.session = requests.Session()
    self.session.headers.update(
        {"User-Agent": "GKC-Python-Client/0.1 (https://github.com/skybristol/gkc)"}
    )
    self._logged_in = False

get_account_name()

Extract account name from username.

Returns:

Type Description
Optional[str]

Account name if username is in bot password format, None otherwise.

Example

auth = WikiverseAuth(username="Alice@MyBot") auth.get_account_name() 'Alice'

Source code in gkc/auth.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def get_account_name(self) -> Optional[str]:
    """
    Return the account portion of a bot-password username.

    Returns:
        The text before the first "@" when the username follows the
        "Account@BotName" convention, otherwise None.

    Example:
        >>> auth = WikiverseAuth(username="Alice@MyBot")
        >>> auth.get_account_name()
        'Alice'
    """
    account, separator, _ = (self.username or "").partition("@")
    return account if separator else None

get_bot_name()

Extract bot name from username.

Returns:

Type Description
Optional[str]

Bot name if username is in bot password format, None otherwise.

Example

auth = WikiverseAuth(username="Alice@MyBot") auth.get_bot_name() 'MyBot'

Source code in gkc/auth.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def get_bot_name(self) -> Optional[str]:
    """
    Return the bot portion of a bot-password username.

    Returns:
        The text after the first "@" when the username follows the
        "Account@BotName" convention, otherwise None.

    Example:
        >>> auth = WikiverseAuth(username="Alice@MyBot")
        >>> auth.get_bot_name()
        'MyBot'
    """
    _, separator, bot = (self.username or "").partition("@")
    return bot if separator else None

get_csrf_token()

Get a CSRF token for making edits.

Returns:

Type Description
str

CSRF token string.

Raises:

Type Description
AuthenticationError

If not logged in or token retrieval fails.

Example

auth = WikiverseAuth(username="User@Bot", password="secret") auth.login() token = auth.get_csrf_token()

Use token for edits
Source code in gkc/auth.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def get_csrf_token(self) -> str:
    """
    Retrieve a CSRF token for edit operations.

    Returns:
        CSRF token string.

    Raises:
        AuthenticationError: When not logged in, when the network request
            fails, or when the response has an unexpected shape.
    """
    # Guard clause: a CSRF token only makes sense for a live session.
    if not self.is_logged_in():
        raise AuthenticationError(
            "Not logged in. Call login() first before getting CSRF token."
        )

    query = {
        "action": "query",
        "meta": "tokens",
        "type": "csrf",
        "format": "json",
    }
    try:
        reply = self.session.get(self.api_url, params=query)
        reply.raise_for_status()
        data = reply.json()

        if "query" in data and "tokens" in data["query"]:
            token: str = data["query"]["tokens"]["csrftoken"]
            return token
        raise AuthenticationError(f"Failed to get CSRF token. Response: {data}")

    except requests.RequestException as e:
        raise AuthenticationError(f"Network error getting CSRF token: {str(e)}")

is_logged_in()

Check if currently logged in to MediaWiki API.

Returns:

Type Description
bool

True if logged in, False otherwise.

Source code in gkc/auth.py
239
240
241
242
243
244
245
246
def is_logged_in(self) -> bool:
    """
    Report whether a successful login() has been performed on this session.

    Returns:
        True if logged in, False otherwise.
    """
    # Flag is set by login() and cleared by logout().
    return self._logged_in

login()

Perform login to MediaWiki API using bot password credentials.

Returns:

Type Description
bool

True if login successful, False otherwise.

Raises:

Type Description
AuthenticationError

If login fails with detailed error message.

Example

auth = WikiverseAuth(username="User@Bot", password="secret") if auth.login(): ... print("Successfully logged in!")

Source code in gkc/auth.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def login(self) -> bool:
    """
    Log in to the MediaWiki API with the configured bot-password credentials.

    Returns:
        True if login successful, False otherwise.

    Raises:
        AuthenticationError: If credentials are missing, the network
            request fails, or the API rejects the login.
    """
    # Guard clause: refuse to attempt a login without credentials.
    if not self.is_authenticated():
        raise AuthenticationError(
            "Cannot login: credentials not provided. "
            "Please provide username and password."
        )

    try:
        # Step 1: obtain a login token from the API.
        token_reply = self.session.get(
            self.api_url,
            params={
                "action": "query",
                "meta": "tokens",
                "type": "login",
                "format": "json",
            },
        )
        token_reply.raise_for_status()
        token_payload = token_reply.json()

        if "query" not in token_payload or "tokens" not in token_payload["query"]:
            raise AuthenticationError(
                f"Failed to get login token from {self.api_url}. "
                f"Response: {token_payload}"
            )

        login_token = token_payload["query"]["tokens"]["logintoken"]

        # Step 2: post the credentials together with the token.
        login_reply = self.session.post(
            self.api_url,
            data={
                "action": "login",
                "lgname": self.username,
                "lgpassword": self.password,
                "lgtoken": login_token,
                "format": "json",
            },
        )
        login_reply.raise_for_status()
        login_payload = login_reply.json()

        if "login" not in login_payload:
            raise AuthenticationError(
                f"Unexpected login response from {self.api_url}. "
                f"Response: {login_payload}"
            )

        result = login_payload["login"]["result"]
        if result == "Success":
            self._logged_in = True
            return True

        # The API refused the login; surface its reason verbatim.
        reason = login_payload["login"].get("reason", "Unknown reason")
        raise AuthenticationError(
            f"Login failed with result '{result}'. Reason: {reason}. "
            f"Check your bot password credentials and permissions."
        )

    except requests.RequestException as e:
        raise AuthenticationError(
            f"Network error during login to {self.api_url}: {str(e)}"
        )

logout()

Logout from MediaWiki API and clear session.

Example

auth = WikiverseAuth(username="User@Bot", password="secret") auth.login()

... do some work ...

auth.logout()

Source code in gkc/auth.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def logout(self) -> None:
    """
    End the MediaWiki API session and reset local authentication state.

    A CSRF token is fetched first because the ``logout`` action requires
    one. Any error during the remote logout is ignored: the local session
    flag and cookies are always cleared.

    Example:
        >>> auth = WikiverseAuth(username="User@Bot", password="secret")
        >>> auth.login()
        >>> # ... do some work ...
        >>> auth.logout()
    """
    if not self._logged_in:
        return
    try:
        # The logout action demands a CSRF token; fetch one first.
        response = self.session.get(
            self.api_url,
            params={
                "action": "query",
                "meta": "tokens",
                "type": "csrf",
                "format": "json",
            },
        )
        csrf = response.json()["query"]["tokens"]["csrftoken"]
        # Invalidate the server-side session.
        self.session.post(
            self.api_url,
            data={"action": "logout", "token": csrf, "format": "json"},
        )
    except Exception:
        # Best-effort logout: local state is still cleared below.
        pass
    finally:
        self._logged_in = False
        self.session.cookies.clear()

check_wikipedia_page(title, site_code='enwiki', allow_redirects=False)

Convenience function to check if a Wikipedia page exists.

Parameters:

Name Type Description Default
title str

Page title to check

required
site_code str

Wikipedia site code (default: "enwiki" for English Wikipedia)

'enwiki'
allow_redirects bool

If False, reject redirect pages

False

Returns:

Type Description
Optional[str]

The title if page exists and is valid, None otherwise

Example

check_wikipedia_page("Python (programming language)") returns 'Python (programming language)';
check_wikipedia_page("NonexistentPage123") returns None.

Source code in gkc/sitelinks.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def check_wikipedia_page(
    title: str, site_code: str = "enwiki", allow_redirects: bool = False
) -> Optional[str]:
    """
    Convenience function to check if a Wikipedia page exists.

    Args:
        title: Page title to check
        site_code: Wikipedia site code (default: "enwiki" for English Wikipedia)
        allow_redirects: If False, reject redirect pages

    Returns:
        The title if page exists and is valid, None otherwise

    Example:
        >>> check_wikipedia_page("Python (programming language)")
        'Python (programming language)'
        >>> check_wikipedia_page("NonexistentPage123")
        None
    """
    # An empty/None title can never resolve to a page.
    if not title:
        return None

    # The human-readable message from the validator is not needed here.
    page_exists, _message = SitelinkValidator().check_page_exists(
        title, site_code, allow_redirects
    )
    return title if page_exists else None

execute_sparql(query, endpoint=DEFAULT_WIKIDATA_ENDPOINT, format='json')

Convenience function to execute a single SPARQL query.

Parameters:

Name Type Description Default
query str

SPARQL query string or Wikidata Query Service URL

required
endpoint str

SPARQL endpoint (default: Wikidata)

DEFAULT_WIKIDATA_ENDPOINT
format str

Response format ('json', 'xml', 'csv', 'tsv')

'json'

Returns:

Type Description
Any

Query results

Example

results = execute_sparql( ... 'SELECT ?item ?itemLabel WHERE { ... }' ... )

Source code in gkc/sparql.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def execute_sparql(
    query: str,
    endpoint: str = DEFAULT_WIKIDATA_ENDPOINT,
    format: str = "json",
) -> Any:
    """
    Convenience function to execute a single SPARQL query.

    Thin wrapper that builds a throwaway :class:`SPARQLQuery` bound to
    *endpoint* and runs *query* through it.

    Args:
        query: SPARQL query string or Wikidata Query Service URL
        endpoint: SPARQL endpoint (default: Wikidata)
        format: Response format ('json', 'xml', 'csv', 'tsv')

    Returns:
        Query results

    Example:
        >>> results = execute_sparql(
        ...     'SELECT ?item ?itemLabel WHERE { ... }'
        ... )
    """
    return SPARQLQuery(endpoint=endpoint).query(query, format=format)

execute_sparql_to_dataframe(query, endpoint=DEFAULT_WIKIDATA_ENDPOINT)

Convenience function to execute a SPARQL query and return DataFrame.

Parameters:

Name Type Description Default
query str

SPARQL query string or Wikidata Query Service URL

required
endpoint str

SPARQL endpoint (default: Wikidata)

DEFAULT_WIKIDATA_ENDPOINT

Returns:

Type Description
DataFrame

pandas DataFrame with query results

Example

df = execute_sparql_to_dataframe( ... 'SELECT ?item ?itemLabel WHERE { ... }' ... )

Source code in gkc/sparql.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def execute_sparql_to_dataframe(
    query: str,
    endpoint: str = DEFAULT_WIKIDATA_ENDPOINT,
) -> "pd.DataFrame":
    """
    Convenience function to execute a SPARQL query and return DataFrame.

    Thin wrapper over :class:`SPARQLQuery` that delegates to its
    ``to_dataframe`` method.

    Args:
        query: SPARQL query string or Wikidata Query Service URL
        endpoint: SPARQL endpoint (default: Wikidata)

    Returns:
        pandas DataFrame with query results

    Example:
        >>> df = execute_sparql_to_dataframe(
        ...     'SELECT ?item ?itemLabel WHERE { ... }'
        ... )
    """
    return SPARQLQuery(endpoint=endpoint).to_dataframe(query)

fetch_entity_rdf(qid, format='ttl', user_agent=None)

Fetch RDF data for a Wikidata entity.

Parameters:

Name Type Description Default
qid str

Wikidata entity ID (e.g., 'Q42', 'P31')

required
format str

RDF format - 'ttl' (Turtle), 'rdf' (RDF/XML), 'nt' (N-Triples)

'ttl'
user_agent Optional[str]

Custom user agent string

None

Returns:

Type Description
str

RDF data as string

Raises:

Type Description
WikidataFetchError

If fetch fails

Example

rdf = fetch_entity_rdf('Q42')  # Get Douglas Adams RDF
rdf = fetch_entity_rdf('P31', format='nt')  # Get property in N-Triples

Source code in gkc/wd.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def fetch_entity_rdf(
    qid: str, format: str = "ttl", user_agent: Optional[str] = None
) -> str:
    """
    Fetch RDF data for a Wikidata entity.

    Downloads the entity dump from Wikidata's ``Special:EntityData``
    endpoint in the requested serialization.

    Args:
        qid: Wikidata entity ID (e.g., 'Q42', 'P31')
        format: RDF format - 'ttl' (Turtle), 'rdf' (RDF/XML), 'nt' (N-Triples)
        user_agent: Custom user agent string

    Returns:
        RDF data as string

    Raises:
        WikidataFetchError: If fetch fails

    Example:
        >>> rdf = fetch_entity_rdf('Q42')  # Get Douglas Adams RDF
        >>> rdf = fetch_entity_rdf('P31', format='nt')  # Get property in N-Triples
    """
    if not qid:
        raise ValueError("Entity ID (qid) is required")

    # Only the serializations Special:EntityData supports are accepted.
    valid_formats = {"ttl", "rdf", "nt"}
    if format not in valid_formats:
        raise ValueError(f"Invalid format '{format}'. Must be one of: {valid_formats}")

    url = f"https://www.wikidata.org/wiki/Special:EntityData/{qid}.{format}"

    try:
        response = requests.get(
            url,
            headers={"User-Agent": user_agent or DEFAULT_USER_AGENT},
            timeout=30,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        # Wrap network/HTTP failures in the package's fetch error.
        raise WikidataFetchError(
            f"Failed to fetch RDF for {qid} from {url}: {str(e)}"
        ) from e
    return response.text

fetch_entity_schema(eid, user_agent=None)

Fetch ShExC schema text for a Wikidata EntitySchema.

Parameters:

Name Type Description Default
eid str

EntitySchema ID (e.g., 'E502')

required
user_agent Optional[str]

Custom user agent string

None

Returns:

Type Description
str

ShExC schema text as string

Raises:

Type Description
WikidataFetchError

If fetch fails

Example

schema = fetch_entity_schema('E502') # Schema for organisms

Source code in gkc/wd.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def fetch_entity_schema(eid: str, user_agent: Optional[str] = None) -> str:
    """
    Fetch ShExC schema text for a Wikidata EntitySchema.

    Downloads the raw schema text from Wikidata's
    ``Special:EntitySchemaText`` endpoint.

    Args:
        eid: EntitySchema ID (e.g., 'E502')
        user_agent: Custom user agent string

    Returns:
        ShExC schema text as string

    Raises:
        WikidataFetchError: If fetch fails

    Example:
        >>> schema = fetch_entity_schema('E502')  # Schema for organisms
    """
    if not eid:
        raise ValueError("EntitySchema ID (eid) is required")

    url = f"https://www.wikidata.org/wiki/Special:EntitySchemaText/{eid}"

    try:
        response = requests.get(
            url,
            headers={"User-Agent": user_agent or DEFAULT_USER_AGENT},
            timeout=30,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        # Wrap network/HTTP failures in the package's fetch error.
        raise WikidataFetchError(
            f"Failed to fetch EntitySchema {eid} from {url}: {str(e)}"
        ) from e
    return response.text

validate_sitelink_dict(sitelinks)

Convenience function to validate and filter sitelinks.

Parameters:

Name Type Description Default
sitelinks dict[str, dict]

Dictionary of sitelinks from transform_to_wikidata()

required

Returns:

Type Description
dict[str, dict]

Filtered dictionary containing only valid sitelinks

Example

sitelinks = { ... "enwiki": {"site": "enwiki", "title": "Example", "badges": []}, ... "frwiki": {"site": "frwiki", "title": "BadPage", "badges": []} ... } valid = validate_sitelink_dict(sitelinks)

Source code in gkc/sitelinks.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def validate_sitelink_dict(sitelinks: dict[str, dict]) -> dict[str, dict]:
    """
    Convenience function to validate and filter sitelinks.

    Thin wrapper that delegates to
    :meth:`SitelinkValidator.filter_valid_sitelinks` with verbose output
    disabled.

    Args:
        sitelinks: Dictionary of sitelinks from transform_to_wikidata()

    Returns:
        Filtered dictionary containing only valid sitelinks

    Example:
        >>> sitelinks = {
        ...     "enwiki": {"site": "enwiki", "title": "Example", "badges": []},
        ...     "frwiki": {"site": "frwiki", "title": "BadPage", "badges": []}
        ... }
        >>> valid = validate_sitelink_dict(sitelinks)
        >>> # Returns only valid sitelinks
    """
    return SitelinkValidator().filter_valid_sitelinks(sitelinks, verbose=False)