Skip to content

vllm.model_executor.model_loader.oci_loader

OCI Registry model loader for loading models from OCI registries.

logger module-attribute

logger = init_logger(__name__)

OciModelLoader

Bases: BaseModelLoader

Model loader that loads models from OCI registries.

This loader supports pulling models packaged as OCI artifacts with: - Safetensors layers (application/vnd.docker.ai.safetensors) - Config tar layer (application/vnd.docker.ai.vllm.config.tar)

The model reference format is: [registry/]repository[:tag|@digest]. If the registry is omitted, docker.io is used by default.

Example

model="namespace/model:tag" model="docker.io/user/model:v1" model="ghcr.io/org/model@sha256:abc123..."

Source code in vllm/model_executor/model_loader/oci_loader.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
class OciModelLoader(BaseModelLoader):
    """Model loader that loads models from OCI registries.

    This loader supports pulling models packaged as OCI artifacts with:
    - Safetensors layers (application/vnd.docker.ai.safetensors)
    - Config tar layer (application/vnd.docker.ai.vllm.config.tar)

    The model reference format is: [registry/]repository[:tag|@digest]
    If registry is omitted, docker.io is used by default.

    Example:
        model="namespace/model:tag"
        model="docker.io/user/model:v1"
        model="ghcr.io/org/model@sha256:abc123..."
    """

    # OCI layer media type carrying raw safetensors weight shards.
    SAFETENSORS_MEDIA_TYPE = "application/vnd.docker.ai.safetensors"
    # OCI layer media type carrying a tarball of config/tokenizer files.
    CONFIG_TAR_MEDIA_TYPE = "application/vnd.docker.ai.vllm.config.tar"
    # Registry assumed when the model reference does not name one.
    DEFAULT_REGISTRY = "docker.io"

    def __init__(self, load_config: LoadConfig):
        super().__init__(load_config)
        # Shared HTTP session so registry requests reuse connections.
        self.session = requests.Session()

    def _normalize_oci_reference(self, model_ref: str) -> str:
        """Normalize an OCI reference so it always includes a registry.

        Args:
            model_ref: Model reference (e.g., "user/model:tag")

        Returns:
            Normalized reference (e.g., "docker.io/user/model:tag")
        """
        if "/" not in model_ref:
            # Bare name (official-image style): prepend the default
            # registry and the implicit "library" namespace.
            return f"{self.DEFAULT_REGISTRY}/library/{model_ref}"

        # Per the Docker reference grammar, the first path component is a
        # registry hostname only if it contains "." (a domain), ":" (a
        # port), or is exactly "localhost". Otherwise it is a namespace on
        # the default registry. The previous check missed the "localhost"
        # case and would rewrite "localhost/model" to
        # "docker.io/localhost/model".
        first_part = model_ref.split("/", 1)[0]
        if "." in first_part or ":" in first_part or first_part == "localhost":
            return model_ref

        return f"{self.DEFAULT_REGISTRY}/{model_ref}"

    def _get_cache_dir(self, model_ref: str) -> str:
        """Return (and create if needed) the cache directory for a model.

        Args:
            model_ref: Normalized model reference

        Returns:
            Path to cache directory
        """
        base_dir = self.load_config.download_dir or envs.VLLM_CACHE_ROOT

        # Build a filesystem-safe directory name by mapping every
        # reference separator character to an underscore in one pass.
        safe_ref = model_ref.translate(str.maketrans(":/@", "___"))
        cache_dir = os.path.join(base_dir, "oci", safe_ref)
        os.makedirs(cache_dir, exist_ok=True)

        return cache_dir

    def _get_auth_token(
        self, registry: str, repository: str, www_authenticate: str
    ) -> Optional[str]:
        """Get authentication token using OCI-compliant auth discovery.

        This method parses the Www-Authenticate header to discover the
        authentication service and obtains a token dynamically, making it
        compatible with any OCI-compliant registry.

        Args:
            registry: Registry hostname
            repository: Repository name
            www_authenticate: Value of Www-Authenticate header from 401 response

        Returns:
            Authentication token, or None if no authentication is required
            or the token could not be obtained.
        """
        # Local import: stdlib, only needed for header parsing here.
        import re

        # Parse Www-Authenticate header
        # Format: Bearer realm="https://auth.example.com/token",service="registry.example.com",scope="repository:user/repo:pull"
        if not www_authenticate.startswith("Bearer "):
            logger.warning("Unsupported authentication scheme: %s", www_authenticate)
            return None

        # A naive split on "," breaks when a quoted value itself contains
        # a comma (e.g. scope="repository:user/repo:pull,push"), so parse
        # key="quoted" / key=token pairs with a regex instead.
        auth_params = {
            key: quoted or plain
            for key, quoted, plain in re.findall(
                r'(\w+)=(?:"([^"]*)"|([^",\s]+))',
                www_authenticate[len("Bearer "):],
            )
        }

        realm = auth_params.get("realm")
        if not realm:
            logger.warning("No realm found in Www-Authenticate header")
            return None

        # Build token request parameters
        token_params = {}
        if "service" in auth_params:
            token_params["service"] = auth_params["service"]
        if "scope" in auth_params:
            token_params["scope"] = auth_params["scope"]
        else:
            # If no scope provided, use repository pull scope
            token_params["scope"] = f"repository:{repository}:pull"

        # Request token from auth service
        try:
            # Bounded timeout: requests has no default, so a stalled auth
            # endpoint would otherwise hang the loader forever.
            response = self.session.get(realm, params=token_params, timeout=60)
            response.raise_for_status()
            token_data = response.json()
            # Some registries return "token", others "access_token".
            return token_data.get("token") or token_data.get("access_token")
        except Exception as e:
            logger.warning("Failed to obtain auth token: %s", e)
            return None

    def _parse_oci_reference(self, model_ref: str) -> tuple[str, str, str]:
        """Split a normalized OCI reference into its three components.

        Args:
            model_ref: Normalized OCI reference

        Returns:
            Tuple of (registry, repository, reference)
        """
        # Format: registry/repository:tag or registry/repository@digest
        registry, remainder = model_ref.split("/", 1)

        # A digest reference ("@sha256:...") takes precedence over a tag;
        # with neither present, fall back to the conventional "latest".
        if "@" in remainder:
            repository, _, reference = remainder.partition("@")
        elif ":" in remainder:
            repository, _, reference = remainder.rpartition(":")
        else:
            repository, reference = remainder, "latest"

        return registry, repository, reference

    def _normalize_registry(self, registry: str) -> str:
        """Normalize registry hostname for API calls.

        Docker Hub serves its registry API from registry-1.docker.io
        rather than docker.io; every other registry is used as-is.

        Args:
            registry: Registry hostname

        Returns:
            Normalized registry hostname
        """
        aliases = {"docker.io": "registry-1.docker.io"}
        return aliases.get(registry, registry)

    def _authenticated_request(
        self,
        url: str,
        registry: str,
        repository: str,
        headers: dict[str, str],
        stream: bool = False,
    ) -> requests.Response:
        """Make an authenticated request to OCI registry.

        Handles authentication by trying without auth first, then obtaining
        and using a token if a 401 response is received.

        Args:
            url: Request URL
            registry: Registry hostname
            repository: Repository name
            headers: Request headers (not modified by this call)
            stream: Whether to stream the response

        Returns:
            Response object
        """
        # Work on a copy so the caller's dict is not polluted with an
        # Authorization header that could leak into unrelated requests.
        headers = dict(headers)

        # Try without authentication first
        response = self.session.get(url, headers=headers, stream=stream)

        # If we get 401, parse Www-Authenticate and get token
        if response.status_code == 401:
            www_auth = response.headers.get("Www-Authenticate", "")
            if www_auth:
                token = self._get_auth_token(registry, repository, www_auth)
                if token:
                    headers["Authorization"] = f"Bearer {token}"
                    response = self.session.get(url, headers=headers, stream=stream)

        return response

    def _pull_oci_manifest(
        self, model_ref: str, cache_dir: str
    ) -> tuple[dict, list[dict], Optional[dict]]:
        """Pull OCI manifest and identify layers.

        Args:
            model_ref: Normalized OCI reference
            cache_dir: Cache directory (currently unused here; the caller
                persists the manifest itself)

        Returns:
            Tuple of (manifest, safetensors_layers, config_layer) where
            config_layer is None if the image has no config tar layer.

        Raises:
            ValueError: If the manifest cannot be fetched, is empty, or
                contains no safetensors layers.
        """
        logger.info("Pulling OCI manifest for %s", model_ref)

        # Parse reference
        registry, repository, reference = self._parse_oci_reference(model_ref)
        registry = self._normalize_registry(registry)

        # Use standard OCI registry URL format
        manifest_url = f"https://{registry}/v2/{repository}/manifests/{reference}"
        # Accept both OCI and Docker v2 manifest media types.
        headers = {
            "Accept": "application/vnd.oci.image.manifest.v1+json, "
            "application/vnd.docker.distribution.manifest.v2+json"
        }

        # Make authenticated request
        try:
            response = self._authenticated_request(
                manifest_url, registry, repository, headers
            )
            response.raise_for_status()
            manifest = response.json()
        except Exception as e:
            raise ValueError(
                f"Failed to pull manifest for {model_ref}. "
                f"Please ensure the image exists and is accessible. "
                f"Error: {e}"
            ) from e

        # Defensive: an empty JSON body parses fine but is unusable.
        if not manifest:
            raise ValueError(f"Failed to pull manifest for {model_ref}")

        # Parse layers: collect all weight shards, and at most one config
        # tar (a later one overwrites an earlier one).
        safetensors_layers = []
        config_layer = None

        for layer in manifest.get("layers", []):
            media_type = layer.get("mediaType", "")

            if media_type == self.SAFETENSORS_MEDIA_TYPE:
                safetensors_layers.append(layer)
            elif media_type == self.CONFIG_TAR_MEDIA_TYPE:
                config_layer = layer

        if not safetensors_layers:
            raise ValueError(f"No safetensors layers found in OCI image {model_ref}")

        logger.info(
            "Found %d safetensors layer(s) in manifest", len(safetensors_layers)
        )
        if config_layer:
            logger.info("Found config tar layer in manifest")

        return manifest, safetensors_layers, config_layer

    def _download_layer(self, model_ref: str, layer: dict, output_path: str) -> None:
        """Download a layer blob from an OCI registry to ``output_path``.

        The blob is streamed to a temporary file, its sha256 digest is
        verified against the manifest descriptor, and it is then atomically
        renamed into place. This guarantees a file at ``output_path`` is
        always a complete, verified download — previously an interrupted
        download left a partial file that the existence check (here and in
        the caller's cache probe) would mistake for a cached layer.

        Args:
            model_ref: Normalized OCI reference
            layer: Layer descriptor from manifest
            output_path: Path to save the layer

        Raises:
            ValueError: If the downloaded content does not match the
                descriptor's sha256 digest.
        """
        # Local import: stdlib, used only for content verification.
        import hashlib

        if os.path.exists(output_path):
            logger.info("Layer already cached at %s", output_path)
            return

        digest = layer.get("digest", "")
        size = layer.get("size", 0)

        logger.info("Downloading layer %s (%.2f MB)", digest, size / (1024 * 1024))

        # Parse reference
        registry, repository, _ = self._parse_oci_reference(model_ref)
        registry = self._normalize_registry(registry)

        # Use standard OCI registry URL format
        blob_url = f"https://{registry}/v2/{repository}/blobs/{digest}"
        headers: dict[str, str] = {}

        # Make authenticated request
        response = self._authenticated_request(
            blob_url, registry, repository, headers, stream=True
        )
        response.raise_for_status()

        # Stream to a temp file beside the target so os.replace is atomic
        # (same filesystem), hashing the content as it arrives.
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        tmp_path = output_path + ".tmp"
        hasher = hashlib.sha256()
        try:
            with open(tmp_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
                        hasher.update(chunk)

            # Verify the content digest when the descriptor provides one.
            if digest.startswith("sha256:"):
                actual = f"sha256:{hasher.hexdigest()}"
                if actual != digest:
                    raise ValueError(
                        f"Digest mismatch for layer: expected {digest}, "
                        f"got {actual}"
                    )

            os.replace(tmp_path, output_path)
        finally:
            # On any failure, remove the partial temp file; after a
            # successful os.replace it no longer exists.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

        logger.info("Downloaded layer to %s", output_path)

    def _extract_config_tar(self, tar_path: str, extract_dir: str) -> None:
        """Safely extract the config tar file.

        The archive comes from a remote registry and must be treated as
        untrusted input, so members whose resolved path (or link target)
        would escape ``extract_dir`` are rejected instead of extracted
        (path traversal, a.k.a. "tar slip" / CVE-2007-4559).

        Args:
            tar_path: Path to tar file
            extract_dir: Directory to extract to

        Raises:
            ValueError: If any archive member would be written outside
                extract_dir.
        """
        logger.info("Extracting config tar to %s", extract_dir)

        os.makedirs(extract_dir, exist_ok=True)

        base = os.path.realpath(extract_dir)

        def _is_within_base(path: str) -> bool:
            # True when `path` resolves to base itself or below it.
            resolved = os.path.realpath(path)
            return resolved == base or resolved.startswith(base + os.sep)

        with tarfile.open(tar_path, "r") as tar:
            members = tar.getmembers()
            for member in members:
                target = os.path.join(base, member.name)
                if not _is_within_base(target):
                    raise ValueError(f"Unsafe path in config tar: {member.name!r}")
                if member.issym() or member.islnk():
                    link = os.path.join(os.path.dirname(target), member.linkname)
                    if not _is_within_base(link):
                        raise ValueError(
                            f"Unsafe link in config tar: "
                            f"{member.name!r} -> {member.linkname!r}"
                        )
            tar.extractall(extract_dir, members=members)

        logger.info("Config extracted successfully")

    def _download_oci_model_if_needed(
        self, model_ref: str, download_weights: bool = True
    ) -> str:
        """Download OCI model and its components if not already cached.

        This is the shared logic for both download_model and
        download_oci_model_simple.

        Cache layout under the per-reference cache dir:
        ``manifest.json``, ``oci_metadata.json``, ``config/`` (extracted
        config tar), ``layers/{index:04d}_{hex_digest}.safetensors``.

        Args:
            model_ref: OCI model reference
            download_weights: If True, download safetensors weight layers.
                            If False, only download config layer.

        Returns:
            Path to the extracted config directory
        """
        normalized_ref = self._normalize_oci_reference(model_ref)
        cache_dir = self._get_cache_dir(normalized_ref)
        config_dir = os.path.join(cache_dir, "config")
        manifest_path = os.path.join(cache_dir, "manifest.json")

        # Check if config directory is already populated and manifest exists
        config_exists = (
            os.path.exists(config_dir)
            and os.listdir(config_dir)
            and os.path.exists(manifest_path)
        )

        if config_exists and not download_weights:
            logger.info("OCI model config already cached at %s", cache_dir)
            return config_dir

        # If weights are needed, check if all layers are downloaded
        if config_exists and download_weights:
            with open(manifest_path) as f:
                manifest = json.load(f)

            safetensors_layers = [
                layer
                for layer in manifest.get("layers", [])
                if layer.get("mediaType") == self.SAFETENSORS_MEDIA_TYPE
            ]

            layers_dir = os.path.join(cache_dir, "layers")
            all_weights_cached = True

            if safetensors_layers and os.path.exists(layers_dir):
                # Layer files are named {index:04d}_{hex_digest}.safetensors
                # (the "sha256:" prefix is stripped from the digest).
                for i, layer in enumerate(safetensors_layers):
                    digest = layer.get("digest", "").replace("sha256:", "")
                    layer_path = os.path.join(
                        layers_dir, f"{i:04d}_{digest}.safetensors"
                    )
                    if not os.path.exists(layer_path):
                        all_weights_cached = False
                        break
            else:
                all_weights_cached = False

            if all_weights_cached:
                logger.info("OCI model fully cached at %s", cache_dir)
                return config_dir

        logger.info(
            "Downloading OCI model: %s -> %s (weights=%s)",
            model_ref,
            normalized_ref,
            download_weights,
        )

        # Pull manifest (or reload if exists)
        if os.path.exists(manifest_path):
            with open(manifest_path) as f:
                manifest = json.load(f)

            safetensors_layers = [
                layer
                for layer in manifest.get("layers", [])
                if layer.get("mediaType") == self.SAFETENSORS_MEDIA_TYPE
            ]
            config_layer = next(
                (
                    layer
                    for layer in manifest.get("layers", [])
                    if layer.get("mediaType") == self.CONFIG_TAR_MEDIA_TYPE
                ),
                None,
            )
        else:
            manifest, safetensors_layers, config_layer = self._pull_oci_manifest(
                normalized_ref, cache_dir
            )

            # Save manifest
            with open(manifest_path, "w") as f:
                json.dump(manifest, f, indent=2)

            # Save original OCI reference for later retrieval
            # (download_model / _get_weights_iterator read this back when
            # they only have the local config path).
            metadata_path = os.path.join(cache_dir, "oci_metadata.json")
            with open(metadata_path, "w") as f:
                json.dump({"original_reference": model_ref}, f, indent=2)

        # Download safetensors layers only if requested
        if download_weights:
            layers_dir = os.path.join(cache_dir, "layers")
            os.makedirs(layers_dir, exist_ok=True)

            for i, layer in enumerate(safetensors_layers):
                digest = layer.get("digest", "").replace("sha256:", "")
                layer_path = os.path.join(layers_dir, f"{i:04d}_{digest}.safetensors")
                self._download_layer(normalized_ref, layer, layer_path)

        # Download and extract config layer if present
        if config_layer and not os.path.exists(config_dir):
            digest = config_layer.get("digest", "").replace("sha256:", "")
            tar_path = os.path.join(cache_dir, f"config_{digest}.tar")
            self._download_layer(normalized_ref, config_layer, tar_path)
            self._extract_config_tar(tar_path, config_dir)

        logger.info("Model download completed: %s", cache_dir)
        return config_dir

    def download_oci_model_simple(self, model_ref: str) -> str:
        """Download only the config layer of an OCI model.

        A lightweight variant for early config loading that does not need
        a ModelConfig: the (large) safetensors weight layers are skipped
        here and fetched later, during model initialization.

        Args:
            model_ref: OCI model reference

        Returns:
            Path to extracted config directory
        """
        config_dir = self._download_oci_model_if_needed(
            model_ref, download_weights=False
        )
        return config_dir

    def download_model(self, model_config: ModelConfig) -> None:
        """Download model from OCI registry.

        Args:
            model_config: Model configuration
        """
        model_ref = model_config.model

        # When vLLM has already rewritten the model path to the extracted
        # config directory, recover the original OCI reference from the
        # metadata file saved next to it.
        if model_ref.endswith("/config") and os.path.isdir(model_ref):
            metadata_path = os.path.join(
                os.path.dirname(model_ref), "oci_metadata.json"
            )
            if os.path.exists(metadata_path):
                with open(metadata_path) as f:
                    stored = json.load(f)
                model_ref = stored.get("original_reference", model_ref)
                logger.info("Retrieved original OCI reference: %s", model_ref)

        self._download_oci_model_if_needed(model_ref)

    def _get_weights_iterator(
        self, model_config: ModelConfig
    ) -> Generator[tuple[str, torch.Tensor], None, None]:
        """Get iterator over model weights from safetensors layers.

        Downloads weights if they haven't been downloaded yet.

        Args:
            model_config: Model configuration

        Yields:
            Tuples of (parameter_name, tensor)

        Raises:
            ValueError: If the manifest is missing, no safetensors files
                are found, or weights are missing and the original OCI
                reference cannot be recovered.
        """
        model_ref = model_config.model
        original_oci_ref = None

        # Check if model_ref is already a local config path
        # (this happens when loading in worker processes)
        if os.path.isdir(model_ref) and model_ref.endswith("/config"):
            cache_dir = os.path.dirname(model_ref)
            # Try to extract original OCI reference from attribute
            # (set by load_weights in the same process).
            original_oci_ref = getattr(model_config, "_original_model", None)

            # If not available, try reading from metadata file
            if not original_oci_ref:
                metadata_path = os.path.join(cache_dir, "oci_metadata.json")
                if os.path.exists(metadata_path):
                    with open(metadata_path) as f:
                        metadata = json.load(f)
                        original_oci_ref = metadata.get("original_reference")
                        if original_oci_ref:
                            logger.info(
                                "Retrieved original OCI reference from metadata: %s",
                                original_oci_ref,
                            )
        else:
            # It's an OCI reference, normalize and get cache dir
            normalized_ref = self._normalize_oci_reference(model_ref)
            cache_dir = self._get_cache_dir(normalized_ref)
            original_oci_ref = model_ref

        # Load manifest
        manifest_path = os.path.join(cache_dir, "manifest.json")
        if not os.path.exists(manifest_path):
            raise ValueError(
                f"Manifest not found at {manifest_path}. "
                f"Cache dir: {cache_dir}, Model ref: {model_ref}"
            )

        with open(manifest_path) as f:
            manifest = json.load(f)

        # Get safetensors layers in order
        layers_dir = os.path.join(cache_dir, "layers")
        safetensors_files = []
        safetensors_layers = [
            layer
            for layer in manifest.get("layers", [])
            if layer.get("mediaType") == self.SAFETENSORS_MEDIA_TYPE
        ]

        # Check if weights need to be downloaded: every layer digest must
        # have a matching .safetensors file on disk.
        weights_missing = False
        if not os.path.exists(layers_dir):
            weights_missing = True
        else:
            for layer in safetensors_layers:
                digest = layer.get("digest", "").replace("sha256:", "")
                # Check if any matching file exists
                found = False
                for filename in os.listdir(layers_dir):
                    if digest in filename and filename.endswith(".safetensors"):
                        found = True
                        break
                if not found:
                    weights_missing = True
                    break

        # Download weights if missing and we have a valid OCI reference
        if weights_missing:
            if not original_oci_ref:
                raise ValueError(
                    f"Weights not found in cache at {layers_dir}, but cannot "
                    f"download them because the original OCI reference is not "
                    f"available. Model ref: {model_ref}"
                )
            logger.info("Weights not found in cache, downloading now...")
            self._download_oci_model_if_needed(original_oci_ref, download_weights=True)

        # Now collect safetensors files, preserving manifest layer order.
        for layer in safetensors_layers:
            digest = layer.get("digest", "").replace("sha256:", "")
            # Find matching file
            for filename in sorted(os.listdir(layers_dir)):
                if digest in filename and filename.endswith(".safetensors"):
                    safetensors_files.append(os.path.join(layers_dir, filename))
                    break

        if not safetensors_files:
            raise ValueError(f"No safetensors files found in {layers_dir}")

        logger.info(
            "Loading weights from %d safetensors file(s)", len(safetensors_files)
        )

        # Use existing safetensors iterator
        yield from safetensors_weights_iterator(
            safetensors_files,
            use_tqdm_on_load=self.load_config.use_tqdm_on_load,
            safetensors_load_strategy=(self.load_config.safetensors_load_strategy),
        )

    def load_weights(self, model: nn.Module, model_config: ModelConfig) -> None:
        """Load weights into the model from OCI layers.

        Side effect: when the extracted config directory exists,
        ``model_config.model`` is rewritten to point at it (the original
        reference is kept in ``model_config._original_model``) so that
        downstream config/tokenizer loading sees a local path.

        Args:
            model: Model to load weights into
            model_config: Model configuration

        Raises:
            ValueError: If a non-quantized model reports parameters that
                were not initialized from the checkpoint.
        """
        # Get the config directory path - update model_config.model to point
        # to the extracted config for compatibility with other components
        # NOTE(review): if model_config.model is already a local path, this
        # normalization yields a cache dir without a config/ subdirectory,
        # so the rewrite below is simply skipped.
        normalized_ref = self._normalize_oci_reference(model_config.model)
        cache_dir = self._get_cache_dir(normalized_ref)
        config_dir = os.path.join(cache_dir, "config")

        # If config directory exists, temporarily update model path
        original_model = model_config.model
        if os.path.exists(config_dir):
            logger.info("Using config from %s", config_dir)
            # Store original and update for tokenizer/config loading
            model_config._original_model = original_model
            model_config.model = config_dir

        # Load weights using iterator
        weights_to_load = {name for name, _ in model.named_parameters()}
        loaded_weights = model.load_weights(self._get_weights_iterator(model_config))

        # Check if all weights were loaded (for non-quantized models;
        # load_weights may return None for models that don't report it).
        if model_config.quantization is None and loaded_weights is not None:
            weights_not_loaded = weights_to_load - loaded_weights
            if weights_not_loaded:
                raise ValueError(
                    "Following weights were not initialized from "
                    f"checkpoint: {weights_not_loaded}"
                )

        logger.info("Weights loaded successfully from OCI registry")

CONFIG_TAR_MEDIA_TYPE class-attribute instance-attribute

CONFIG_TAR_MEDIA_TYPE = (
    "application/vnd.docker.ai.vllm.config.tar"
)

DEFAULT_REGISTRY class-attribute instance-attribute

DEFAULT_REGISTRY = 'docker.io'

SAFETENSORS_MEDIA_TYPE class-attribute instance-attribute

SAFETENSORS_MEDIA_TYPE = (
    "application/vnd.docker.ai.safetensors"
)

session instance-attribute

session = Session()

__init__

__init__(load_config: LoadConfig)
Source code in vllm/model_executor/model_loader/oci_loader.py
def __init__(self, load_config: LoadConfig):
    super().__init__(load_config)
    self.session = requests.Session()

_authenticated_request

_authenticated_request(
    url: str,
    registry: str,
    repository: str,
    headers: dict[str, str],
    stream: bool = False,
) -> Response

Make an authenticated request to OCI registry.

Handles authentication by trying without auth first, then obtaining and using a token if a 401 response is received.

Parameters:

Name Type Description Default
url str

Request URL

required
registry str

Registry hostname

required
repository str

Repository name

required
headers dict[str, str]

Request headers

required
stream bool

Whether to stream the response

False

Returns:

Type Description
Response

Response object

Source code in vllm/model_executor/model_loader/oci_loader.py
def _authenticated_request(
    self,
    url: str,
    registry: str,
    repository: str,
    headers: dict[str, str],
    stream: bool = False,
) -> requests.Response:
    """Make an authenticated request to OCI registry.

    Handles authentication by trying without auth first, then obtaining
    and using a token if a 401 response is received.

    Args:
        url: Request URL
        registry: Registry hostname
        repository: Repository name
        headers: Request headers
        stream: Whether to stream the response

    Returns:
        Response object
    """
    # Try without authentication first
    response = self.session.get(url, headers=headers, stream=stream)

    # If we get 401, parse Www-Authenticate and get token
    if response.status_code == 401:
        www_auth = response.headers.get("Www-Authenticate", "")
        if www_auth:
            token = self._get_auth_token(registry, repository, www_auth)
            if token:
                headers["Authorization"] = f"Bearer {token}"
                response = self.session.get(url, headers=headers, stream=stream)

    return response

_download_layer

_download_layer(
    model_ref: str, layer: dict, output_path: str
) -> None

Download a layer from OCI registry.

Parameters:

Name Type Description Default
model_ref str

Normalized OCI reference

required
layer dict

Layer descriptor from manifest

required
output_path str

Path to save the layer

required
Source code in vllm/model_executor/model_loader/oci_loader.py
def _download_layer(self, model_ref: str, layer: dict, output_path: str) -> None:
    """Download a layer from OCI registry.

    Args:
        model_ref: Normalized OCI reference
        layer: Layer descriptor from manifest
        output_path: Path to save the layer
    """
    if os.path.exists(output_path):
        logger.info("Layer already cached at %s", output_path)
        return

    digest = layer.get("digest", "")
    size = layer.get("size", 0)

    logger.info("Downloading layer %s (%.2f MB)", digest, size / (1024 * 1024))

    # Parse reference
    registry, repository, _ = self._parse_oci_reference(model_ref)
    registry = self._normalize_registry(registry)

    # Use standard OCI registry URL format
    blob_url = f"https://{registry}/v2/{repository}/blobs/{digest}"
    headers: dict[str, str] = {}

    # Make authenticated request
    response = self._authenticated_request(
        blob_url, registry, repository, headers, stream=True
    )
    response.raise_for_status()

    # Write to file
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

    logger.info("Downloaded layer to %s", output_path)

_download_oci_model_if_needed

_download_oci_model_if_needed(
    model_ref: str, download_weights: bool = True
) -> str

Download OCI model and its components if not already cached.

This is the shared logic for both download_model and download_oci_model_simple.

Parameters:

Name Type Description Default
model_ref str

OCI model reference

required
download_weights bool

If True, download safetensors weight layers. If False, only download config layer.

True

Returns:

Type Description
str

Path to the extracted config directory

Source code in vllm/model_executor/model_loader/oci_loader.py
def _download_oci_model_if_needed(
    self, model_ref: str, download_weights: bool = True
) -> str:
    """Download OCI model and its components if not already cached.

    This is the shared logic for both download_model and
    download_oci_model_simple.

    Cache layout under the per-model cache directory:
      - ``manifest.json``      the OCI image manifest
      - ``oci_metadata.json``  the original (user-supplied) reference
      - ``config/``            extracted contents of the config tar layer
      - ``layers/``            safetensors weight layers

    Args:
        model_ref: OCI model reference
        download_weights: If True, download safetensors weight layers.
                        If False, only download config layer.

    Returns:
        Path to the extracted config directory
    """
    normalized_ref = self._normalize_oci_reference(model_ref)
    cache_dir = self._get_cache_dir(normalized_ref)
    config_dir = os.path.join(cache_dir, "config")
    manifest_path = os.path.join(cache_dir, "manifest.json")

    # Check if config directory is already populated and manifest exists
    config_exists = (
        os.path.exists(config_dir)
        and os.listdir(config_dir)
        and os.path.exists(manifest_path)
    )

    # Config-only request fully satisfied by the cache: nothing to do.
    if config_exists and not download_weights:
        logger.info("OCI model config already cached at %s", cache_dir)
        return config_dir

    # If weights are needed, check if all layers are downloaded
    if config_exists and download_weights:
        with open(manifest_path) as f:
            manifest = json.load(f)

        safetensors_layers = [
            layer
            for layer in manifest.get("layers", [])
            if layer.get("mediaType") == self.SAFETENSORS_MEDIA_TYPE
        ]

        layers_dir = os.path.join(cache_dir, "layers")
        all_weights_cached = True

        # Layer files are named "<index>_<digest>.safetensors" so they sort
        # in manifest order; any missing file forces a (re-)download below.
        if safetensors_layers and os.path.exists(layers_dir):
            for i, layer in enumerate(safetensors_layers):
                digest = layer.get("digest", "").replace("sha256:", "")
                layer_path = os.path.join(
                    layers_dir, f"{i:04d}_{digest}.safetensors"
                )
                if not os.path.exists(layer_path):
                    all_weights_cached = False
                    break
        else:
            all_weights_cached = False

        if all_weights_cached:
            logger.info("OCI model fully cached at %s", cache_dir)
            return config_dir

    logger.info(
        "Downloading OCI model: %s -> %s (weights=%s)",
        model_ref,
        normalized_ref,
        download_weights,
    )

    # Pull manifest (or reload if exists)
    if os.path.exists(manifest_path):
        with open(manifest_path) as f:
            manifest = json.load(f)

        safetensors_layers = [
            layer
            for layer in manifest.get("layers", [])
            if layer.get("mediaType") == self.SAFETENSORS_MEDIA_TYPE
        ]
        config_layer = next(
            (
                layer
                for layer in manifest.get("layers", [])
                if layer.get("mediaType") == self.CONFIG_TAR_MEDIA_TYPE
            ),
            None,
        )
    else:
        manifest, safetensors_layers, config_layer = self._pull_oci_manifest(
            normalized_ref, cache_dir
        )

        # Save manifest
        with open(manifest_path, "w") as f:
            json.dump(manifest, f, indent=2)

        # Save original OCI reference for later retrieval
        # (worker processes only see the local config path, not the ref).
        metadata_path = os.path.join(cache_dir, "oci_metadata.json")
        with open(metadata_path, "w") as f:
            json.dump({"original_reference": model_ref}, f, indent=2)

    # Download safetensors layers only if requested
    if download_weights:
        layers_dir = os.path.join(cache_dir, "layers")
        os.makedirs(layers_dir, exist_ok=True)

        for i, layer in enumerate(safetensors_layers):
            digest = layer.get("digest", "").replace("sha256:", "")
            layer_path = os.path.join(layers_dir, f"{i:04d}_{digest}.safetensors")
            self._download_layer(normalized_ref, layer, layer_path)

    # Download and extract config layer if present
    if config_layer and not os.path.exists(config_dir):
        digest = config_layer.get("digest", "").replace("sha256:", "")
        tar_path = os.path.join(cache_dir, f"config_{digest}.tar")
        self._download_layer(normalized_ref, config_layer, tar_path)
        self._extract_config_tar(tar_path, config_dir)

    logger.info("Model download completed: %s", cache_dir)
    return config_dir

_extract_config_tar

_extract_config_tar(
    tar_path: str, extract_dir: str
) -> None

Extract config tar file.

Parameters:

Name Type Description Default
tar_path str

Path to tar file

required
extract_dir str

Directory to extract to

required
Source code in vllm/model_executor/model_loader/oci_loader.py
def _extract_config_tar(self, tar_path: str, extract_dir: str) -> None:
    """Extract the downloaded config tar layer into a directory.

    Args:
        tar_path: Path to tar file
        extract_dir: Directory to extract to
    """
    logger.info("Extracting config tar to %s", extract_dir)

    os.makedirs(extract_dir, exist_ok=True)

    with tarfile.open(tar_path, "r") as tar:
        # The archive comes from a remote registry and is untrusted: use
        # the "data" extraction filter (PEP 706) to reject path traversal,
        # absolute paths, symlink escapes, and special files.
        if hasattr(tarfile, "data_filter"):
            tar.extractall(extract_dir, filter="data")
        else:
            # Interpreter predates the PEP 706 backport; extraction is
            # unfiltered here and relies on the registry being trusted.
            tar.extractall(extract_dir)

    logger.info("Config extracted successfully")

_get_auth_token

_get_auth_token(
    registry: str, repository: str, www_authenticate: str
) -> Optional[str]

Get authentication token using OCI-compliant auth discovery.

This method parses the Www-Authenticate header to discover the authentication service and obtains a token dynamically, making it compatible with any OCI-compliant registry.

Parameters:

Name Type Description Default
registry str

Registry hostname

required
repository str

Repository name

required
www_authenticate str

Value of Www-Authenticate header from 401 response

required

Returns:

Type Description
Optional[str]

Authentication token, or None if no authentication is required

Source code in vllm/model_executor/model_loader/oci_loader.py
def _get_auth_token(
    self, registry: str, repository: str, www_authenticate: str
) -> Optional[str]:
    """Get authentication token using OCI-compliant auth discovery.

    This method parses the Www-Authenticate header to discover the
    authentication service and obtains a token dynamically, making it
    compatible with any OCI-compliant registry.

    Args:
        registry: Registry hostname
        repository: Repository name
        www_authenticate: Value of Www-Authenticate header from 401 response

    Returns:
        Authentication token, or None if no authentication is required
    """
    # Local import: only this auth-discovery path needs regex support.
    import re

    # Parse Www-Authenticate header
    # Format: Bearer realm="https://auth.example.com/token",service="registry.example.com",scope="repository:user/repo:pull"
    if not www_authenticate.startswith("Bearer "):
        logger.warning("Unsupported authentication scheme: %s", www_authenticate)
        return None

    # Extract key=value parameters. A plain split(",") is incorrect here
    # because quoted values may themselves contain commas, e.g.
    # scope="repository:user/repo:pull,push" — the regex keeps quoted
    # values intact and also accepts unquoted values.
    auth_params = {
        key: value.strip('"')
        for key, value in re.findall(
            r'(\w+)=("[^"]*"|[^",]+)', www_authenticate[7:]  # Skip "Bearer "
        )
    }

    realm = auth_params.get("realm")
    if not realm:
        logger.warning("No realm found in Www-Authenticate header")
        return None

    # Build token request parameters
    token_params = {}
    if "service" in auth_params:
        token_params["service"] = auth_params["service"]
    if "scope" in auth_params:
        token_params["scope"] = auth_params["scope"]
    else:
        # If no scope provided, use repository pull scope
        token_params["scope"] = f"repository:{repository}:pull"

    # Request token from auth service
    try:
        response = self.session.get(realm, params=token_params)
        response.raise_for_status()
        token_data = response.json()
        # Some registries return "token", others "access_token".
        return token_data.get("token") or token_data.get("access_token")
    except Exception as e:
        logger.warning("Failed to obtain auth token: %s", e)
        return None

_get_cache_dir

_get_cache_dir(model_ref: str) -> str

Get cache directory for OCI model.

Parameters:

Name Type Description Default
model_ref str

Normalized model reference

required

Returns:

Type Description
str

Path to cache directory

Source code in vllm/model_executor/model_loader/oci_loader.py
def _get_cache_dir(self, model_ref: str) -> str:
    """Get cache directory for OCI model.

    Args:
        model_ref: Normalized model reference

    Returns:
        Path to cache directory
    """
    download_dir = self.load_config.download_dir or envs.VLLM_CACHE_ROOT

    # Create a safe directory name from the reference
    safe_ref = model_ref.replace(":", "_").replace("/", "_").replace("@", "_")
    cache_dir = os.path.join(download_dir, "oci", safe_ref)
    os.makedirs(cache_dir, exist_ok=True)

    return cache_dir

_get_weights_iterator

_get_weights_iterator(
    model_config: ModelConfig,
) -> Generator[tuple[str, Tensor], None, None]

Get iterator over model weights from safetensors layers.

Downloads weights if they haven't been downloaded yet.

Parameters:

Name Type Description Default
model_config ModelConfig

Model configuration

required

Yields:

Type Description
tuple[str, Tensor]

Tuples of (parameter_name, tensor)

Source code in vllm/model_executor/model_loader/oci_loader.py
def _get_weights_iterator(
    self, model_config: ModelConfig
) -> Generator[tuple[str, torch.Tensor], None, None]:
    """Get iterator over model weights from safetensors layers.

    Downloads weights if they haven't been downloaded yet.

    Resolution order for the cache directory: either ``model_config.model``
    is already the local extracted-config path (worker processes), in which
    case the cache dir is its parent, or it is an OCI reference that is
    normalized and mapped to a cache dir.

    Args:
        model_config: Model configuration

    Yields:
        Tuples of (parameter_name, tensor)

    Raises:
        ValueError: If the manifest is missing, if weights are missing and
            the original OCI reference cannot be recovered, or if no
            safetensors files are found after download.
    """
    model_ref = model_config.model
    original_oci_ref = None

    # Check if model_ref is already a local config path
    # (this happens when loading in worker processes)
    if os.path.isdir(model_ref) and model_ref.endswith("/config"):
        cache_dir = os.path.dirname(model_ref)
        # Try to extract original OCI reference from attribute
        # (set by load_weights before it rewrote model_config.model).
        original_oci_ref = getattr(model_config, "_original_model", None)

        # If not available, try reading from metadata file
        # (written next to the manifest at download time).
        if not original_oci_ref:
            metadata_path = os.path.join(cache_dir, "oci_metadata.json")
            if os.path.exists(metadata_path):
                with open(metadata_path) as f:
                    metadata = json.load(f)
                    original_oci_ref = metadata.get("original_reference")
                    if original_oci_ref:
                        logger.info(
                            "Retrieved original OCI reference from metadata: %s",
                            original_oci_ref,
                        )
    else:
        # It's an OCI reference, normalize and get cache dir
        normalized_ref = self._normalize_oci_reference(model_ref)
        cache_dir = self._get_cache_dir(normalized_ref)
        original_oci_ref = model_ref

    # Load manifest
    manifest_path = os.path.join(cache_dir, "manifest.json")
    if not os.path.exists(manifest_path):
        raise ValueError(
            f"Manifest not found at {manifest_path}. "
            f"Cache dir: {cache_dir}, Model ref: {model_ref}"
        )

    with open(manifest_path) as f:
        manifest = json.load(f)

    # Get safetensors layers in order
    layers_dir = os.path.join(cache_dir, "layers")
    safetensors_files = []
    safetensors_layers = [
        layer
        for layer in manifest.get("layers", [])
        if layer.get("mediaType") == self.SAFETENSORS_MEDIA_TYPE
    ]

    # Check if weights need to be downloaded: every manifest layer must
    # have a matching "<digest>...safetensors" file on disk.
    weights_missing = False
    if not os.path.exists(layers_dir):
        weights_missing = True
    else:
        for layer in safetensors_layers:
            digest = layer.get("digest", "").replace("sha256:", "")
            # Check if any matching file exists
            found = False
            for filename in os.listdir(layers_dir):
                if digest in filename and filename.endswith(".safetensors"):
                    found = True
                    break
            if not found:
                weights_missing = True
                break

    # Download weights if missing and we have a valid OCI reference
    if weights_missing:
        if not original_oci_ref:
            raise ValueError(
                f"Weights not found in cache at {layers_dir}, but cannot "
                f"download them because the original OCI reference is not "
                f"available. Model ref: {model_ref}"
            )
        logger.info("Weights not found in cache, downloading now...")
        self._download_oci_model_if_needed(original_oci_ref, download_weights=True)

    # Now collect safetensors files, preserving manifest layer order.
    for layer in safetensors_layers:
        digest = layer.get("digest", "").replace("sha256:", "")
        # Find matching file
        for filename in sorted(os.listdir(layers_dir)):
            if digest in filename and filename.endswith(".safetensors"):
                safetensors_files.append(os.path.join(layers_dir, filename))
                break

    if not safetensors_files:
        raise ValueError(f"No safetensors files found in {layers_dir}")

    logger.info(
        "Loading weights from %d safetensors file(s)", len(safetensors_files)
    )

    # Use existing safetensors iterator
    yield from safetensors_weights_iterator(
        safetensors_files,
        use_tqdm_on_load=self.load_config.use_tqdm_on_load,
        safetensors_load_strategy=(self.load_config.safetensors_load_strategy),
    )

_normalize_oci_reference

_normalize_oci_reference(model_ref: str) -> str

Normalize OCI reference to include registry.

Parameters:

Name Type Description Default
model_ref str

Model reference (e.g., "user/model:tag")

required

Returns:

Type Description
str

Normalized reference (e.g., "docker.io/user/model:tag")

Source code in vllm/model_executor/model_loader/oci_loader.py
def _normalize_oci_reference(self, model_ref: str) -> str:
    """Normalize OCI reference to include registry.

    Args:
        model_ref: Model reference (e.g., "user/model:tag")

    Returns:
        Normalized reference (e.g., "docker.io/user/model:tag")
    """
    # If no registry is specified (no dots before first slash),
    # prepend default registry
    if "/" in model_ref:
        first_part = model_ref.split("/")[0]
        if "." not in first_part and ":" not in first_part:
            # This is a user/repo format without registry
            return f"{self.DEFAULT_REGISTRY}/{model_ref}"
    else:
        # Single name without slash, prepend library/
        return f"{self.DEFAULT_REGISTRY}/library/{model_ref}"

    return model_ref

_normalize_registry

_normalize_registry(registry: str) -> str

Normalize registry hostname for API calls.

Docker Hub uses registry-1.docker.io for API calls instead of docker.io.

Parameters:

Name Type Description Default
registry str

Registry hostname

required

Returns:

Type Description
str

Normalized registry hostname

Source code in vllm/model_executor/model_loader/oci_loader.py
def _normalize_registry(self, registry: str) -> str:
    """Normalize registry hostname for API calls.

    Docker Hub uses registry-1.docker.io for API calls instead of docker.io.

    Args:
        registry: Registry hostname

    Returns:
        Normalized registry hostname
    """
    if registry == "docker.io":
        return "registry-1.docker.io"
    return registry

_parse_oci_reference

_parse_oci_reference(
    model_ref: str,
) -> tuple[str, str, str]

Parse OCI reference into registry, repository, and tag/digest.

Parameters:

Name Type Description Default
model_ref str

Normalized OCI reference

required

Returns:

Type Description
tuple[str, str, str]

Tuple of (registry, repository, reference)

Source code in vllm/model_executor/model_loader/oci_loader.py
def _parse_oci_reference(self, model_ref: str) -> tuple[str, str, str]:
    """Parse OCI reference into registry, repository, and tag/digest.

    Args:
        model_ref: Normalized OCI reference

    Returns:
        Tuple of (registry, repository, reference)
    """
    # Format: registry/repository:tag or registry/repository@digest
    parts = model_ref.split("/", 1)
    registry = parts[0]

    if "@" in parts[1]:
        repository, reference = parts[1].split("@", 1)
    elif ":" in parts[1]:
        repository, reference = parts[1].rsplit(":", 1)
    else:
        repository = parts[1]
        reference = "latest"

    return registry, repository, reference

_pull_oci_manifest

_pull_oci_manifest(
    model_ref: str, cache_dir: str
) -> tuple[dict, list[dict], Optional[dict]]

Pull OCI manifest and identify layers.

Parameters:

Name Type Description Default
model_ref str

Normalized OCI reference

required
cache_dir str

Cache directory

required

Returns:

Type Description
tuple[dict, list[dict], Optional[dict]]

Tuple of (manifest, safetensors_layers, config_layer)

Source code in vllm/model_executor/model_loader/oci_loader.py
def _pull_oci_manifest(
    self, model_ref: str, cache_dir: str
) -> tuple[dict, list[dict], Optional[dict]]:
    """Fetch the image manifest and partition its layers by media type.

    Args:
        model_ref: Normalized OCI reference
        cache_dir: Cache directory

    Returns:
        Tuple of (manifest, safetensors_layers, config_layer)
    """
    logger.info("Pulling OCI manifest for %s", model_ref)

    # Resolve the registry endpoint for this reference.
    registry, repository, reference = self._parse_oci_reference(model_ref)
    registry = self._normalize_registry(registry)

    manifest_url = f"https://{registry}/v2/{repository}/manifests/{reference}"
    # Advertise support for both OCI and Docker v2 manifest formats.
    accept_headers = {
        "Accept": "application/vnd.oci.image.manifest.v1+json, "
        "application/vnd.docker.distribution.manifest.v2+json"
    }

    try:
        response = self._authenticated_request(
            manifest_url, registry, repository, accept_headers
        )
        response.raise_for_status()
        manifest = response.json()
    except Exception as e:
        raise ValueError(
            f"Failed to pull manifest for {model_ref}. "
            f"Please ensure the image exists and is accessible. "
            f"Error: {e}"
        ) from e

    if not manifest:
        raise ValueError(f"Failed to pull manifest for {model_ref}")

    # Bucket layers: weight shards vs. the packaged config tar.
    safetensors_layers: list[dict] = []
    config_layer: Optional[dict] = None

    for entry in manifest.get("layers", []):
        entry_media_type = entry.get("mediaType", "")
        if entry_media_type == self.SAFETENSORS_MEDIA_TYPE:
            safetensors_layers.append(entry)
        elif entry_media_type == self.CONFIG_TAR_MEDIA_TYPE:
            config_layer = entry

    # Weights are mandatory; an image without them is unusable.
    if not safetensors_layers:
        raise ValueError(f"No safetensors layers found in OCI image {model_ref}")

    logger.info(
        "Found %d safetensors layer(s) in manifest", len(safetensors_layers)
    )
    if config_layer:
        logger.info("Found config tar layer in manifest")

    return manifest, safetensors_layers, config_layer

download_model

download_model(model_config: ModelConfig) -> None

Download model from OCI registry.

Parameters:

Name Type Description Default
model_config ModelConfig

Model configuration

required
Source code in vllm/model_executor/model_loader/oci_loader.py
def download_model(self, model_config: ModelConfig) -> None:
    """Download model from OCI registry.

    Args:
        model_config: Model configuration
    """
    model_ref = model_config.model

    # Worker processes may hand us the local extracted-config path rather
    # than the OCI reference; recover the reference from the metadata
    # file written next to the manifest at download time.
    if os.path.isdir(model_ref) and model_ref.endswith("/config"):
        metadata_path = os.path.join(
            os.path.dirname(model_ref), "oci_metadata.json"
        )
        if os.path.exists(metadata_path):
            with open(metadata_path) as f:
                stored = json.load(f)
            model_ref = stored.get("original_reference", model_ref)
            logger.info("Retrieved original OCI reference: %s", model_ref)

    self._download_oci_model_if_needed(model_ref)

download_oci_model_simple

download_oci_model_simple(model_ref: str) -> str

Download OCI model without requiring ModelConfig.

This is a simplified version for early config loading that only downloads the config layer, not the weight files. Weights are downloaded later during model initialization.

Parameters:

Name Type Description Default
model_ref str

OCI model reference

required

Returns:

Type Description
str

Path to extracted config directory

Source code in vllm/model_executor/model_loader/oci_loader.py
def download_oci_model_simple(self, model_ref: str) -> str:
    """Download only the config portion of an OCI model.

    Used during early config loading, before a ModelConfig exists: the
    safetensors weight layers are skipped here and fetched later during
    model initialization.

    Args:
        model_ref: OCI model reference

    Returns:
        Path to extracted config directory
    """
    config_dir = self._download_oci_model_if_needed(
        model_ref, download_weights=False
    )
    return config_dir

load_weights

load_weights(
    model: Module, model_config: ModelConfig
) -> None

Load weights into the model from OCI layers.

Parameters:

Name Type Description Default
model Module

Model to load weights into

required
model_config ModelConfig

Model configuration

required
Source code in vllm/model_executor/model_loader/oci_loader.py
def load_weights(self, model: nn.Module, model_config: ModelConfig) -> None:
    """Load weights into the model from OCI layers.

    Args:
        model: Model to load weights into
        model_config: Model configuration
    """
    # Locate the extracted config in the cache so other components
    # (tokenizer/config loading) can read local files.
    normalized_ref = self._normalize_oci_reference(model_config.model)
    cache_dir = self._get_cache_dir(normalized_ref)
    config_dir = os.path.join(cache_dir, "config")

    original_model = model_config.model
    if os.path.exists(config_dir):
        logger.info("Using config from %s", config_dir)
        # Keep the OCI reference reachable for the weights iterator,
        # then repoint model_config at the local config directory.
        model_config._original_model = original_model
        model_config.model = config_dir

    # Stream tensors from the cached safetensors layers into the model.
    expected_params = {name for name, _ in model.named_parameters()}
    loaded_params = model.load_weights(self._get_weights_iterator(model_config))

    # For non-quantized models, verify nothing was left uninitialized.
    if model_config.quantization is None and loaded_params is not None:
        missing = expected_params - loaded_params
        if missing:
            raise ValueError(
                "Following weights were not initialized from "
                f"checkpoint: {missing}"
            )

    logger.info("Weights loaded successfully from OCI registry")