메인 콘텐츠로 건너뛰기
Dagster와 Weights & Biases (W&B)를 사용하여 MLOps 파이프라인을 오케스트레이션하고 ML 자산을 관리하세요. Dagster와 W&B의 인테그레이션을 통해 다음 작업을 쉽게 수행할 수 있습니다.
  • W&B Artifact 생성 및 사용.
  • W&B Registry에서 Registered Models 생성 및 사용.
  • W&B Launch를 사용하여 전용 컴퓨팅 자원에서 트레이닝 작업 실행.
  • ops 및 assets에서 wandb 클라이언트 사용.
W&B Dagster 인테그레이션은 W&B 전용 Dagster 리소스와 IO Manager를 제공합니다.
  • wandb_resource: W&B API와의 인증 및 통신에 사용되는 Dagster 리소스.
  • wandb_artifacts_io_manager: W&B Artifacts를 사용하기 위해 제공되는 Dagster IO Manager.
이 가이드에서는 Dagster에서 W&B를 사용하기 위한 사전 요구 사항, ops 및 assets에서 W&B Artifacts를 생성하고 사용하는 방법, W&B Launch 사용법 및 권장 모범 사례를 설명합니다.

시작하기 전 준비 사항

W&B 내에서 Dagster를 사용하려면 다음 리소스가 필요합니다.
  1. W&B API 키.
  2. W&B entity (사용자 또는 팀): entity는 W&B Runs 및 Artifacts를 전송할 사용자 이름 또는 팀 이름입니다. 로그를 기록하기 전에 W&B 앱 UI에서 계정이나 팀 entity를 생성했는지 확인하세요. entity를 지정하지 않으면 run은 보통 사용자 이름인 기본 entity로 전송됩니다. Project Defaults 아래의 설정에서 기본 entity를 변경할 수 있습니다.
  3. W&B 프로젝트: W&B Runs가 저장될 프로젝트 이름.
W&B 앱의 프로필 페이지에서 해당 사용자 또는 팀의 W&B entity를 확인할 수 있습니다. 기존 W&B 프로젝트를 사용하거나 새로 만들 수 있습니다. 새 프로젝트는 W&B 앱 홈페이지나 사용자/팀 프로필 페이지에서 생성할 수 있습니다. 프로젝트가 존재하지 않는 경우 처음 사용할 때 자동으로 생성됩니다.

API 키 설정

  1. W&B에 로그인합니다. 안내: W&B Server를 사용하는 경우 관리자에게 인스턴스 호스트 이름을 문의하세요.
  2. 사용자 설정에서 API 키를 생성합니다. 프로덕션 환경에서는 해당 키를 관리하기 위해 서비스 계정을 사용하는 것이 좋습니다.
  3. 해당 API 키에 대한 환경 변수를 설정합니다: export WANDB_API_KEY=YOUR_KEY.
다음 예제는 Dagster 코드에서 API 키를 지정하는 위치를 보여줍니다. wandb_config 중첩 사전 내에 entity와 프로젝트 이름을 지정해야 합니다. 다른 W&B 프로젝트를 사용하려는 경우 각 ops/assets에 다른 wandb_config 값을 전달할 수 있습니다. 전달 가능한 키에 대한 자세한 정보는 아래의 Configuration 섹션을 참조하세요.
예시: @job에 대한 설정
# config.yaml에 추가하거나
# Dagit의 Launchpad 또는 JobDefinition.execute_in_process에서 설정할 수 있습니다.
# 참조: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # 사용자 W&B entity로 교체
     project: my_project # 사용자 W&B 프로젝트로 교체


@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
       "io_manager": wandb_artifacts_io_manager,
   }
)
def simple_job_example():
   my_op()

Configuration

다음 설정 옵션들은 인테그레이션에서 제공하는 W&B 전용 Dagster 리소스 및 IO Manager의 설정값으로 사용됩니다.
  • wandb_resource: W&B API와 통신하는 데 사용되는 Dagster resource. 제공된 API 키를 사용하여 자동으로 인증합니다. 속성:
    • api_key: (str, 필수): W&B API와 통신하는 데 필요한 W&B API 키.
    • host: (str, 선택 사항): 사용할 API 호스트 서버. W&B Server를 사용하는 경우에만 필요합니다. 기본값은 퍼블릭 클라우드 호스트인 https://api.wandb.ai입니다.
  • wandb_artifacts_io_manager: W&B Artifacts를 사용하기 위한 Dagster IO Manager. 속성:
    • base_dir: (int, 선택 사항) 로컬 저장 및 캐싱에 사용되는 기본 디렉토리. W&B Artifacts 및 W&B Run 로그가 이 디렉토리에 기록되고 읽힙니다. 기본값은 DAGSTER_HOME 디렉토리입니다.
    • cache_duration_in_minutes: (int, 선택 사항) W&B Artifacts 및 W&B Run 로그를 로컬 저장소에 보관할 시간입니다. 해당 시간 동안 열리지 않은 파일과 디렉토리만 캐시에서 제거됩니다. 캐시 정리는 IO Manager 실행이 끝날 때 발생합니다. 캐싱을 완전히 끄려면 0으로 설정할 수 있습니다. 캐싱은 동일한 머신에서 실행되는 작업 간에 아티팩트를 재사용할 때 속도를 향상시킵니다. 기본값은 30일입니다.
    • run_id: (str, 선택 사항): 재개(resume)에 사용되는 run의 고유 ID. 프로젝트 내에서 고유해야 하며, run을 삭제해도 해당 ID를 재사용할 수 없습니다. 짧은 설명형 이름은 name 필드를 사용하고, 하이퍼파라미터를 저장하여 run 간 비교를 하려면 config를 사용하세요. ID에는 /\#?%:..와 같은 특수 문자를 포함할 수 없습니다. IO Manager가 run을 재개할 수 있도록 Dagster 내부에서 실험 트래킹을 수행할 때 Run ID를 설정해야 합니다. 기본적으로 Dagster Run ID(예: 7e4df022-1bf2-44b5-a383-bb852df4077e)로 설정됩니다.
    • run_name: (str, 선택 사항) UI에서 이 run을 식별하는 데 도움이 되는 짧은 표시 이름입니다. 기본적으로 dagster-run-[Dagster Run ID의 앞 8글자] 형식의 문자열입니다 (예: dagster-run-7e4df022).
    • run_tags: (list[str], 선택 사항): UI에서 이 run의 태그 목록을 채울 문자열 목록입니다. 태그는 run을 함께 구성하거나 baseline 또는 production과 같은 임시 레이블을 적용하는 데 유용합니다. UI에서 태그를 쉽게 추가 및 제거하거나 특정 태그가 있는 run만 필터링할 수 있습니다. 인테그레이션에서 사용하는 모든 W&B Run에는 dagster_wandb 태그가 붙습니다.

W&B Artifacts 사용하기

W&B Artifact와의 인테그레이션은 Dagster IO Manager를 기반으로 합니다. IO Managers는 asset 또는 op의 출력을 저장하고 이를 다운스트림 asset 또는 op의 입력으로 로드하는 역할을 하는 사용자 제공 오브젝트입니다. 예를 들어, IO Manager는 파일 시스템의 파일에서 오브젝트를 저장하고 로드할 수 있습니다. 인테그레이션은 W&B Artifacts를 위한 IO Manager를 제공합니다. 이를 통해 모든 Dagster @op 또는 @asset에서 기본적으로 W&B Artifacts를 생성하고 사용할 수 있습니다. 다음은 Python 리스트를 포함하는 dataset 타입의 W&B Artifact를 생성하는 @asset의 간단한 예입니다.
@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3] # Artifact에 저장될 내용
Artifact를 기록하기 위해 @op, @asset@multi_asset에 메타데이터 설정을 추가할 수 있습니다. 마찬가지로 Dagster 외부에서 생성된 W&B Artifacts도 사용할 수 있습니다.

W&B Artifacts 기록하기

계속하기 전에 W&B Artifacts 사용 방법을 잘 숙지하는 것이 좋습니다. Artifact 가이드를 참고하세요. W&B Artifact를 기록하려면 Python 함수에서 오브젝트를 반환하세요. W&B는 다음 오브젝트들을 지원합니다.
  • Python 오브젝트 (int, dict, list…)
  • W&B 오브젝트 (Table, Image, Graph…)
  • W&B Artifact 오브젝트
다음 예제는 Dagster assets (@asset)를 사용하여 W&B Artifacts를 기록하는 방법을 보여줍니다.
pickle 모듈로 직렬화할 수 있는 모든 것은 피클링되어 인테그레이션에 의해 생성된 Artifact에 추가됩니다. Dagster 내부에서 해당 Artifact를 읽을 때 내용이 역직렬화(unpickle)됩니다 (자세한 내용은 Artifact 읽기 참조).
@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3]
W&B는 여러 Pickle 기반 직렬화 모듈(pickle, dill, cloudpickle, joblib)을 지원합니다. ONNX 또는 PMML과 같은 더 고급 직렬화도 사용할 수 있습니다. 자세한 내용은 Serialization 섹션을 참조하세요.

Configuration

wandb_artifact_configuration이라는 설정 사전은 @op, @asset@multi_asset에 설정할 수 있습니다. 이 사전은 데코레이터 인수의 메타데이터로 전달되어야 합니다. 이 설정은 IO Manager가 W&B Artifacts를 읽고 쓰는 방식을 제어하는 데 필수적입니다. @op의 경우, Out 메타데이터 인수를 통한 출력 메타데이터에 위치합니다. @asset의 경우, asset의 metadata 인수에 위치합니다. @multi_asset의 경우, AssetOut 메타데이터 인수를 통한 각 출력 메타데이터에 위치합니다. 다음 코드 예제는 @op, @asset@multi_asset 연산에서 사전을 구성하는 방법을 보여줍니다.
@op 예시:
@op(
   out=Out(
       metadata={
           "wandb_artifact_configuration": {
               "name": "my_artifact",
               "type": "dataset",
           }
       }
   )
)
def create_dataset():
   return [1, 2, 3]
지원되는 속성:
  • name: (str) 이 아티팩트의 식별 이름으로, UI에서 확인하거나 use_artifact 호출 시 참조합니다. 이름에는 영문자, 숫자, 언더바, 하이픈, 점을 포함할 수 있습니다. 이름은 프로젝트 전체에서 고유해야 합니다. @op의 경우 필수입니다.
  • type: (str) 아티팩트를 정리하고 구분하는 데 사용되는 아티팩트 타입. 일반적인 타입에는 dataset 또는 model이 포함되지만, 영문자, 숫자, 언더바, 하이픈, 점을 포함한 모든 문자열을 사용할 수 있습니다. 출력이 이미 Artifact가 아닌 경우 필수입니다.
  • description: (str) 아티팩트에 대한 설명을 제공하는 텍스트. 설명은 UI에서 마크다운으로 렌더링되므로 표, 링크 등을 넣기에 좋습니다.
  • aliases: (list[str]) Artifact에 적용할 하나 이상의 에일리어스를 포함하는 배열. 인테그레이션은 설정 여부와 관계없이 “latest” 태그를 해당 목록에 자동으로 추가합니다. 이는 모델 및 데이터셋의 버전 관리를 효과적으로 수행하는 방법입니다.
  • add_dirs: (list[dict[str, Any]]): Artifact에 포함할 각 로컬 디렉토리에 대한 설정을 포함하는 배열.
  • add_files: (list[dict[str, Any]]): Artifact에 포함할 각 로컬 파일에 대한 설정을 포함하는 배열.
  • add_references: (list[dict[str, Any]]): Artifact에 포함할 각 외부 참조에 대한 설정을 포함하는 배열.
  • serialization_module: (dict) 사용할 직렬화 모듈의 설정. 자세한 내용은 Serialization 섹션을 참조하세요.
    • name: (str) 직렬화 모듈의 이름. 허용되는 값: pickle, dill, cloudpickle, joblib. 해당 모듈은 로컬에서 사용 가능해야 합니다.
    • parameters: (dict[str, Any]) 직렬화 함수에 전달되는 선택적 인수. 해당 모듈의 dump 메소드와 동일한 파라미터를 허용합니다 (예: {"compress": 3, "protocol": 4}).
고급 예제:
@asset(
   name="my_advanced_artifact",
   metadata={
       "wandb_artifact_configuration": {
           "type": "dataset",
           "description": "My *Markdown* description",
           "aliases": ["my_first_alias", "my_second_alias"],
           "add_dirs": [
               {
                   "name": "My directory",
                   "local_path": "path/to/directory",
               }
           ],
           "add_files": [
               {
                   "name": "validation_dataset",
                   "local_path": "path/to/data.json",
               },
               {
                   "is_tmp": True,
                   "local_path": "path/to/temp",
               },
           ],
           "add_references": [
               {
                   "uri": "https://picsum.photos/200/300",
                   "name": "External HTTP reference to an image",
               },
               {
                   "uri": "s3://my-bucket/datasets/mnist",
                   "name": "External S3 reference",
               },
           ],
       }
   },
   io_manager_key="wandb_artifacts_manager",
)
def create_advanced_artifact():
   return [1, 2, 3]
asset은 인테그레이션 양측의 유용한 메타데이터와 함께 구체화(materialize)됩니다.
  • W&B 측: 소스 인테그레이션 이름 및 버전, 사용된 python 버전, pickle 프로토콜 버전 등.
  • Dagster 측:
    • Dagster Run ID
    • W&B Run: ID, 이름, 경로, URL
    • W&B Artifact: ID, 이름, 타입, 버전, 크기, URL
    • W&B Entity
    • W&B Project
다음 이미지는 Dagster asset에 추가된 W&B의 메타데이터를 보여줍니다. 이 정보는 인테그레이션 없이는 사용할 수 없습니다.
다음 이미지는 제공된 설정이 W&B Artifact의 유용한 메타데이터로 어떻게 풍부해졌는지 보여줍니다. 이 정보는 재현성 및 유지 관리에 도움이 됩니다. 인테그레이션 없이는 사용할 수 없습니다.
mypy와 같은 정적 타입 체커를 사용하는 경우, 다음을 사용하여 설정 타입 정의 오브젝트를 임포트하세요.
from dagster_wandb import WandbArtifactConfiguration

파티션 사용하기

인테그레이션은 기본적으로 Dagster partitions를 지원합니다. 다음은 DailyPartitionsDefinition을 사용한 파티션 예시입니다.
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
    name="my_daily_partitioned_asset",
    compute_kind="wandb",
    metadata={
        "wandb_artifact_configuration": {
            "type": "dataset",
        }
    },
)
def create_my_daily_partitioned_asset(context):
    partition_key = context.asset_partition_key_for_output()
    context.log.info(f"Creating partitioned asset for {partition_key}")
    return random.randint(0, 100)
이 코드는 각 파티션에 대해 하나의 W&B Artifact를 생성합니다. UI의 Artifact 패널에서 asset 이름 아래에 파티션 키가 추가된 아티팩트를 확인할 수 있습니다 (예: my_daily_partitioned_asset.2023-01-01, my_daily_partitioned_asset.2023-01-02, 또는 my_daily_partitioned_asset.2023-01-03). 여러 차원에 걸쳐 파티션된 asset은 각 차원을 점(dot)으로 구분하여 표시합니다 (예: my_asset.car.blue).
인테그레이션은 한 번의 run 내에서 여러 파티션을 구체화하는 것을 허용하지 않습니다. asset을 구체화하려면 여러 번의 run을 실행해야 합니다. 이는 asset을 구체화할 때 Dagit에서 실행할 수 있습니다.

고급 사용법

W&B Artifacts 읽기

W&B Artifacts를 읽는 방법은 쓰는 방법과 비슷합니다. wandb_artifact_configuration이라는 설정 사전을 @op 또는 @asset에 설정할 수 있습니다. 유일한 차이점은 출력이 아닌 입력에 설정을 적용해야 한다는 것입니다. @op의 경우, In 메타데이터 인수를 통한 입력 메타데이터에 위치합니다. Artifact의 이름을 명시적으로 전달해야 합니다. @asset의 경우, AssetIn 메타데이터 인수를 통한 입력 메타데이터에 위치합니다. 부모 asset의 이름과 일치해야 하므로 Artifact 이름을 전달할 필요가 없습니다. 인테그레이션 외부에서 생성된 Artifact에 대한 종속성을 가지려면 SourceAsset을 사용해야 합니다. 이는 항상 해당 asset의 최신 버전을 읽습니다. 다음 예제는 다양한 ops에서 Artifact를 읽는 방법을 보여줍니다.
@op에서 아티팩트 읽기
@op(
   ins={
       "artifact": In(
           metadata={
               "wandb_artifact_configuration": {
                   "name": "my_artifact",
               }
           }
       )
   },
   io_manager_key="wandb_artifacts_manager"
)
def read_artifact(context, artifact):
   context.log.info(artifact)

Configuration

다음 설정은 IO Manager가 데코레이팅된 함수의 입력으로 수집하고 제공해야 할 항목을 나타내는 데 사용됩니다. 다음 읽기 패턴이 지원됩니다.
  1. Artifact 내에 포함된 이름이 지정된 오브젝트를 가져오려면 get을 사용합니다.
@asset(
   ins={
       "table": AssetIn(
           key="my_artifact_with_table",
           metadata={
               "wandb_artifact_configuration": {
                   "get": "my_table",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_table(context, table):
   context.log.info(table.get_column("a"))
  1. Artifact 내에 포함된 다운로드된 파일의 로컬 경로를 가져오려면 get_path를 사용합니다.
@asset(
   ins={
       "path": AssetIn(
           key="my_artifact_with_file",
           metadata={
               "wandb_artifact_configuration": {
                   "get_path": "name_of_file",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_path(context, path):
   context.log.info(path)
  1. 전체 Artifact 오브젝트를 가져오려면 (내용이 로컬에 다운로드된 상태):
@asset(
   ins={
       "artifact": AssetIn(
           key="my_artifact",
           input_manager_key="wandb_artifacts_manager",
       )
   },
)
def get_artifact(context, artifact):
   context.log.info(artifact.name)
지원되는 속성
  • get: (str) 아티팩트 상대 경로에 위치한 W&B 오브젝트를 가져옵니다.
  • get_path: (str) 아티팩트 상대 경로에 위치한 파일의 경로를 가져옵니다.

Serialization configuration

기본적으로 인테그레이션은 표준 pickle 모듈을 사용하지만, 일부 오브젝트는 이와 호환되지 않습니다. 예를 들어, yield가 있는 함수는 피클링하려고 하면 오류가 발생합니다. 더 많은 Pickle 기반 직렬화 모듈(dill, cloudpickle, joblib)을 지원합니다. 또한 직렬화된 문자열을 반환하거나 Artifact를 직접 생성하여 ONNX 또는 PMML과 같은 고급 직렬화를 사용할 수도 있습니다. 적절한 선택은 유스 케이스에 따라 달라지므로 관련 자료를 참고하세요.

Pickle 기반 직렬화 모듈

피클링은 보안상 취약한 것으로 알려져 있습니다. 보안이 우려되는 경우 W&B 오브젝트만 사용하세요. 데이터에 서명하고 해시 키를 자체 시스템에 저장하는 것을 권장합니다. 더 복잡한 유스 케이스의 경우 언제든지 문의해 주세요.
wandb_artifact_configurationserialization_module 사전을 통해 사용되는 직렬화를 설정할 수 있습니다. Dagster를 실행하는 머신에서 해당 모듈을 사용할 수 있는지 확인하세요. 인테그레이션은 해당 Artifact를 읽을 때 어떤 직렬화 모듈을 사용해야 하는지 자동으로 파악합니다. 현재 지원되는 모듈은 pickle, dill, cloudpickle, joblib입니다. 다음은 joblib으로 직렬화된 “model”을 생성한 다음 이를 추론에 사용하는 단순화된 예제입니다.
@asset(
    name="my_joblib_serialized_model",
    compute_kind="Python",
    metadata={
        "wandb_artifact_configuration": {
            "type": "model",
            "serialization_module": {
                "name": "joblib"
            },
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_model_serialized_with_joblib():
    # 실제 ML 모델은 아니지만 pickle 모듈로는 불가능한 예시입니다
    return lambda x, y: x + y

@asset(
    name="inference_result_from_joblib_serialized_model",
    compute_kind="Python",
    ins={
        "my_joblib_serialized_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        )
    },
    metadata={
        "wandb_artifact_configuration": {
            "type": "results",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def use_model_serialized_with_joblib(
    context: OpExecutionContext, my_joblib_serialized_model
):
    inference_result = my_joblib_serialized_model(1, 2)
    context.log.info(inference_result)  # 출력: 3
    return inference_result

고급 직렬화 형식 (ONNX, PMML)

ONNX 및 PMML과 같은 상호 교환 파일 형식을 사용하는 것이 일반적입니다. 인테그레이션은 이러한 형식을 지원하지만 Pickle 기반 직렬화보다 약간의 추가 작업이 필요합니다. 이러한 형식을 사용하는 데는 두 가지 방법이 있습니다.
  1. 모델을 선택한 형식으로 변환한 다음, 일반 Python 오브젝트인 것처럼 해당 형식의 문자열 표현을 반환합니다. 인테그레이션은 해당 문자열을 피클링합니다. 나중에 해당 문자열을 사용하여 모델을 재구성할 수 있습니다.
  2. 직렬화된 모델로 새 로컬 파일을 생성한 다음, add_file 설정을 사용하여 해당 파일로 커스텀 Artifact를 구축합니다.
다음은 Scikit-learn 모델이 ONNX를 사용하여 직렬화되는 예제입니다.
import numpy
import onnxruntime as rt
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

from dagster import AssetIn, AssetOut, asset, multi_asset

@multi_asset(
    compute_kind="Python",
    outs={
        "my_onnx_model": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "model",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "test_set",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def create_onnx_model():
    # 출처: https://onnx.ai/sklearn-onnx/

    # 모델 트레이닝
    iris = load_iris()
    X, y = iris.data, iris.target
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    clr = RandomForestClassifier()
    clr.fit(X_train, y_train)

    # ONNX 형식으로 변환
    initial_type = [("float_input", FloatTensorType([None, 4]))]
    onx = convert_sklearn(clr, initial_types=initial_type)

    # artifacts 기록 (model + test_set)
    return onx.SerializeToString(), {"X_test": X_test, "y_test": y_test}

@asset(
    name="experiment_results",
    compute_kind="Python",
    ins={
        "my_onnx_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def use_onnx_model(context, my_onnx_model, my_test_set):
    # 출처: https://onnx.ai/sklearn-onnx/

    # ONNX Runtime으로 예측 수행
    sess = rt.InferenceSession(my_onnx_model)
    input_name = sess.get_inputs()[0].name
    label_name = sess.get_outputs()[0].name
    pred_onx = sess.run(
        [label_name], {input_name: my_test_set["X_test"].astype(numpy.float32)}
    )[0]
    context.log.info(pred_onx)
    return pred_onx

파티션 사용하기

인테그레이션은 기본적으로 Dagster partitions를 지원합니다. asset의 하나, 여러 개 또는 모든 파티션을 선택적으로 읽을 수 있습니다. 모든 파티션은 사전에 제공되며, 키와 값은 각각 파티션 키와 Artifact 내용을 나타냅니다.
업스트림 @asset의 모든 파티션을 읽어 사전 형태로 제공합니다. 이 사전에서 키와 값은 각각 파티션 키와 Artifact 내용에 대응됩니다.
@asset(
    compute_kind="wandb",
    ins={"my_daily_partitioned_asset": AssetIn()},
    output_required=False,
)
def read_all_partitions(context, my_daily_partitioned_asset):
    for partition, content in my_daily_partitioned_asset.items():
        context.log.info(f"partition={partition}, content={content}")
설정 오브젝트인 metadata는 W&B가 프로젝트의 다른 아티팩트 파티션과 상호 작용하는 방식을 구성합니다. metadata 오브젝트는 wandb_artifact_configuration이라는 키를 포함하며, 그 안에 partitions라는 중첩 오브젝트가 포함됩니다. partitions 오브젝트는 각 파티션의 이름을 해당 설정에 매핑합니다. 각 파티션에 대한 설정은 데이터를 가져오는 방법을 지정할 수 있습니다. 이러한 설정은 각 파티션의 요구 사항에 따라 get, version, alias와 같은 다양한 키를 포함할 수 있습니다. Configuration keys
  1. get: get 키는 데이터를 가져올 W&B 오브젝트 (Table, Image…)의 이름을 지정합니다.
  2. version: version 키는 Artifact의 특정 버전을 가져오고 싶을 때 사용합니다.
  3. alias: alias 키를 사용하면 에일리어스로 Artifact를 가져올 수 있습니다.
와일드카드 설정 와일드카드 "*"는 명시적으로 설정되지 않은 모든 파티션을 의미합니다. 이는 partitions 오브젝트에 명시적으로 언급되지 않은 파티션에 대한 기본 설정을 제공합니다. 예를 들어,
"*": {
    "get": "default_table_name",
},
이 설정은 명시적으로 구성되지 않은 모든 파티션에 대해 default_table_name이라는 이름의 테이블에서 데이터를 가져온다는 것을 의미합니다. 특정 파티션 설정 파티션별 설정을 제공하여 특정 파티션에 대한 와일드카드 설정을 오버라이드할 수 있습니다. 예를 들어,
"yellow": {
    "get": "custom_table_name",
},
이 설정은 yellow라는 이름의 파티션에 대해 custom_table_name 테이블에서 데이터를 가져오며, 와일드카드 설정을 오버라이드한다는 것을 의미합니다. 버전 관리 및 에일리어싱 버전 관리 및 에일리어싱을 위해 설정에 특정 versionalias 키를 제공할 수 있습니다. 버전의 경우,
"orange": {
    "version": "v0",
},
이 설정은 orange Artifact 파티션의 v0 버전에서 데이터를 가져옵니다. 에일리어스의 경우,
"blue": {
    "alias": "special_alias",
},
이 설정은 에일리어스가 special_alias인 Artifact 파티션(설정에서 blue로 참조됨)의 default_table_name 테이블에서 데이터를 가져옵니다.

고급 사용법

인테그레이션의 고급 사용법을 보려면 다음의 전체 코드 예제를 참조하세요.

W&B Launch 사용하기

활발히 개발 중인 베타 제품입니다. Launch에 관심이 있으신가요? W&B Launch 고객 파일럿 프로그램 참여에 대해 논의하려면 계정 담당 팀에 문의하세요. 파일럿 고객은 베타 프로그램 자격을 갖추기 위해 AWS EKS 또는 SageMaker를 사용해야 합니다. 궁극적으로 추가 플랫폼을 지원할 계획입니다.
계속하기 전에 W&B Launch 사용 방법을 잘 숙지하는 것이 좋습니다. Launch 가이드를 참고하세요. Dagster 인테그레이션은 다음을 지원합니다.
  • Dagster 인스턴스에서 하나 또는 여러 개의 Launch 에이전트 실행.
  • Dagster 인스턴스 내에서 로컬 Launch 작업 실행.
  • 온프레미스 또는 클라우드에서 원격 Launch 작업 실행.

Launch 에이전트

인테그레이션은 run_launch_agent라는 임포트 가능한 @op를 제공합니다. 이는 Launch Agent를 시작하고 수동으로 중지할 때까지 장기 실행 프로세스로 실행합니다. 에이전트는 launch queues를 폴링하고 작업 순서대로 실행하거나(또는 실행을 위해 외부 서비스로 디스패치) 하는 프로세스입니다. Launch 페이지를 참조하세요. 또한 Launchpad에서 모든 속성에 대한 유용한 설명을 볼 수 있습니다.
간단한 예시
# config.yaml에 추가하거나
# Dagit의 Launchpad 또는 JobDefinition.execute_in_process에서 설정할 수 있습니다.
# 참조: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # 사용자 W&B entity로 교체
     project: my_project # 사용자 W&B 프로젝트로 교체
ops:
 run_launch_agent:
   config:
     max_jobs: -1
     queues: 
       - my_dagster_queue

from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource

@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_agent_example():
   run_launch_agent()

Launch 작업

인테그레이션은 run_launch_job이라는 임포트 가능한 @op를 제공합니다. 이는 Launch 작업을 실행합니다. Launch 작업은 실행을 위해 큐에 할당됩니다. 큐를 새로 만들거나 기본 큐를 사용할 수 있습니다. 해당 큐를 리스닝하는 활성 에이전트가 있는지 확인하세요. Dagster 인스턴스 내부에서 에이전트를 실행할 수도 있지만, Kubernetes에서 배포 가능한 에이전트를 사용하는 것도 고려해 볼 수 있습니다. Launch 페이지를 참조하세요. 또한 Launchpad에서 모든 속성에 대한 유용한 설명을 볼 수 있습니다.
간단한 예시
# config.yaml에 추가하거나
# Dagit의 Launchpad 또는 JobDefinition.execute_in_process에서 설정할 수 있습니다.
# 참조: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # 사용자 W&B entity로 교체
     project: my_project # 사용자 W&B 프로젝트로 교체
ops:
 my_launched_job:
   config:
     entry_point:
       - python
       - train.py
     queue: my_dagster_queue
     uri: https://github.com/wandb/example-dagster-integration-with-launch


from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource


@job(resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_job_example():
   run_launch_job.alias("my_launched_job")() # 에일리어스로 작업 이름 변경

모범 사례

  1. Artifact를 읽고 쓸 때 IO Manager를 사용하세요. Artifact.download() 또는 Run.log_artifact()를 직접 사용하지 마세요. 이러한 메소드는 인테그레이션에 의해 처리됩니다. 대신 Artifact에 저장하려는 데이터를 반환하면 인테그레이션이 나머지를 처리합니다. 이 방식은 Artifact에 대해 더 나은 계보(lineage)를 제공합니다.
  2. 복잡한 유스 케이스인 경우에만 Artifact 오브젝트를 직접 구축하세요. Python 오브젝트와 W&B 오브젝트는 ops/assets에서 반환되어야 합니다. 인테그레이션이 Artifact 번들링을 처리합니다. 복잡한 유스 케이스의 경우 Dagster 작업에서 Artifact를 직접 구축할 수 있습니다. 소스 인테그레이션 이름 및 버전, 사용된 python 버전, pickle 프로토콜 버전 등과 같은 메타데이터 보강을 위해 Artifact 오브젝트를 인테그레이션에 전달하는 것이 좋습니다.
  3. 메타데이터를 통해 Artifact에 파일, 디렉토리 및 외부 참조를 추가하세요. 인테그레이션의 wandb_artifact_configuration 오브젝트를 사용하여 파일, 디렉토리 또는 외부 참조(Amazon S3, GCS, HTTP…)를 추가하세요. 자세한 내용은 Artifact configuration 섹션의 고급 예제를 참조하세요.
  4. Artifact가 생성될 때는 @op 대신 @asset을 사용하세요. Artifacts는 assets입니다. Dagster가 해당 자산을 관리하는 경우 asset을 사용하는 것이 권장됩니다. 이는 Dagit Asset Catalog에서 더 나은 가시성을 제공합니다.
  5. Dagster 외부에서 생성된 Artifact를 사용하려면 SourceAsset을 사용하세요. 이를 통해 인테그레이션을 활용하여 외부에서 생성된 Artifact를 읽을 수 있습니다. 그렇지 않으면 인테그레이션에 의해 생성된 Artifact만 사용할 수 있습니다.
  6. 대규모 모델의 전용 컴퓨팅 트레이닝을 오케스트레이션하려면 W&B Launch를 사용하세요. 소규모 모델은 Dagster 클러스터 내부에서 트레이닝할 수 있으며, GPU 노드가 있는 Kubernetes 클러스터에서 Dagster를 실행할 수 있습니다. 하지만 대규모 모델 트레이닝에는 W&B Launch를 사용하는 것이 좋습니다. 이를 통해 인스턴스 과부하를 방지하고 더 적합한 컴퓨팅 자원에 액세스할 수 있습니다.
  7. Dagster 내에서 실험 트래킹을 할 때 W&B Run ID를 Dagster Run ID 값으로 설정하세요. Run을 재개 가능하게 만들고, W&B Run ID를 Dagster Run ID 또는 선택한 문자열로 설정하는 것을 권장합니다. 이 권장 사항을 따르면 Dagster 내부에서 모델을 트레이닝할 때 W&B 메트릭과 W&B Artifacts가 동일한 W&B Run에 저장되도록 보장할 수 있습니다.
W&B Run ID를 Dagster Run ID로 설정하는 방법:
wandb.init(
    id=context.run_id,
    resume="allow",
    ...
)
또는 고유한 W&B Run ID를 선택하여 IO Manager 설정에 전달하는 방법:
wandb.init(
    id="my_resumable_run_id",
    resume="allow",
    ...
)

@job(
   resource_defs={
       "io_manager": wandb_artifacts_io_manager.configured(
           {"wandb_run_id": "my_resumable_run_id"}
       ),
   }
)
  1. 대용량 W&B Artifact의 경우 get 또는 get_path로 필요한 데이터만 수집하세요. 기본적으로 인테그레이션은 전체 Artifact를 다운로드합니다. 매우 큰 아티팩트를 사용하는 경우 필요한 특정 파일이나 오브젝트만 수집하고 싶을 수 있습니다. 이는 속도와 리소스 활용도를 향상시킵니다.
  2. Python 오브젝트의 경우 유스 케이스에 맞춰 피클링 모듈을 조정하세요. 기본적으로 W&B 인테그레이션은 표준 pickle 모듈을 사용합니다. 하지만 일부 오브젝트는 이와 호환되지 않습니다. 예를 들어, yield가 있는 함수는 피클링하려고 하면 오류가 발생합니다. W&B는 다른 Pickle 기반 직렬화 모듈(dill, cloudpickle, joblib)을 지원합니다.
또한 직렬화된 문자열을 반환하거나 Artifact를 직접 생성하여 ONNX 또는 PMML과 같은 고급 직렬화를 사용할 수도 있습니다. 적절한 선택은 유스 케이스에 따라 달라지므로 관련 자료를 참조하세요.