class DockerWorker(BaseWorker):
    """Prefect worker that executes flow runs within Docker containers."""

    type = "docker"
    job_configuration = DockerWorkerJobConfiguration
    _description = (
        "Execute flow runs within Docker containers. Works well for managing flow "
        "execution environments via Docker images. Requires access to a running "
        "Docker daemon."
    )
    _display_name = "Docker"
    _documentation_url = "https://prefecthq.github.io/prefect-docker/worker/"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/2IfXXfMq66mrzJBDFFCHTp/6d8f320d9e4fc4393f045673d61ab612/Moby-logo.png?h=250"  # noqa

    def __init__(self, *args: Any, test_mode: bool = None, **kwargs: Any) -> None:
        if test_mode is None:
            self.test_mode = bool(os.getenv("PREFECT_DOCKER_TEST_MODE", False))
        else:
            self.test_mode = test_mode
        super().__init__(*args, **kwargs)

    async def setup(self):
        if not self.test_mode:
            self._client = get_client()
            if self._client.server_type == ServerType.EPHEMERAL:
                raise RuntimeError(
                    "Docker worker cannot be used with an ephemeral server. Please set"
                    " PREFECT_API_URL to the URL for your Prefect API instance. You"
                    " can use a local Prefect API instance by running `prefect server"
                    " start`."
                )

        return await super().setup()

    async def run(
        self,
        flow_run: "FlowRun",
        configuration: BaseJobConfiguration,
        task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> BaseWorkerResult:
        """
        Executes a flow run within a Docker container and waits for the flow run
        to complete.
        """
        # The `docker` library uses requests instead of an async http library so it
        # must be run in a thread to avoid blocking the event loop.
        container, created_event = await run_sync_in_worker_thread(
            self._create_and_start_container, configuration
        )
        container_pid = self._get_infrastructure_pid(container_id=container.id)

        # Mark as started and return the infrastructure id
        if task_status:
            task_status.started(container_pid)

        # Monitor the container
        container = await run_sync_in_worker_thread(
            self._watch_container_safe, container, configuration, created_event
        )

        exit_code = container.attrs["State"].get("ExitCode")
        return DockerWorkerResult(
            status_code=exit_code if exit_code is not None else -1,
            identifier=container_pid,
        )

    async def kill_infrastructure(
        self,
        infrastructure_pid: str,
        configuration: DockerWorkerJobConfiguration,
        grace_seconds: int = 30,
    ):
        """
        Stops a container for a cancelled flow run based on the provided
        infrastructure PID.
        """
        docker_client = self._get_client()

        base_url, container_id = self._parse_infrastructure_pid(infrastructure_pid)

        if docker_client.api.base_url != base_url:
            raise InfrastructureNotAvailable(
                "".join(
                    [
                        (
                            f"Unable to stop container {container_id!r}: the current"
                            " Docker API "
                        ),
                        (
                            f"URL {docker_client.api.base_url!r} does not match the"
                            " expected "
                        ),
                        f"API base URL {base_url}.",
                    ]
                )
            )
        await run_sync_in_worker_thread(
            self._stop_container, container_id, docker_client, grace_seconds
        )

    def _stop_container(
        self,
        container_id: str,
        client: "DockerClient",
        grace_seconds: int = 30,
    ):
        try:
            container = client.containers.get(container_id=container_id)
        except docker.errors.NotFound:
            raise InfrastructureNotFound(
                f"Unable to stop container {container_id!r}: The container was not"
                " found."
            )

        container.stop(timeout=grace_seconds)

    def _get_client(self):
        """Returns a docker client."""
        try:
            with warnings.catch_warnings():
                # Silence warnings due to use of deprecated methods within dockerpy
                # See https://github.com/docker/docker-py/pull/2931
                warnings.filterwarnings(
                    "ignore",
                    message="distutils Version classes are deprecated.*",
                    category=DeprecationWarning,
                )

                docker_client = docker.from_env()

        except docker.errors.DockerException as exc:
            raise RuntimeError("Could not connect to Docker.") from exc

        return docker_client

    def _get_infrastructure_pid(self, container_id: str) -> str:
        """Generates a Docker infrastructure_pid string in the form of
        `<docker_host_base_url>:<container_id>`.
        """
        docker_client = self._get_client()
        base_url = docker_client.api.base_url
        docker_client.close()
        return f"{base_url}:{container_id}"

    def _parse_infrastructure_pid(self, infrastructure_pid: str) -> Tuple[str, str]:
        """Splits a Docker infrastructure_pid into its component parts"""
        # base_url can contain `:` so we only want the last item of the split
        base_url, container_id = infrastructure_pid.rsplit(":", 1)
        return base_url, str(container_id)

    def _build_container_settings(
        self,
        docker_client: "DockerClient",
        configuration: DockerWorkerJobConfiguration,
    ) -> Dict:
        """Builds a dictionary of container settings to pass to the Docker API."""
        network_mode = configuration.get_network_mode()
        return dict(
            image=configuration.image,
            network=configuration.networks[0] if configuration.networks else None,
            network_mode=network_mode,
            command=configuration.command,
            environment=configuration.env,
            auto_remove=configuration.auto_remove,
            labels=configuration.labels,
            extra_hosts=configuration.get_extra_hosts(docker_client),
            name=configuration.name,
            volumes=configuration.volumes,
            mem_limit=configuration.mem_limit,
            memswap_limit=configuration.memswap_limit,
            privileged=configuration.privileged,
        )

    def _create_and_start_container(
        self, configuration: DockerWorkerJobConfiguration
    ) -> Tuple["Container", Event]:
        """Creates and starts a Docker container."""
        docker_client = self._get_client()
        if configuration.registry_credentials:
            self._logger.info("Logging into Docker registry...")
            docker_client.login(
                username=configuration.registry_credentials.username,
                password=configuration.registry_credentials.password.get_secret_value(),
                registry=configuration.registry_credentials.registry_url,
                reauth=configuration.registry_credentials.reauth,
            )
        container_settings = self._build_container_settings(
            docker_client, configuration
        )

        if self._should_pull_image(docker_client, configuration=configuration):
            self._logger.info(f"Pulling image {configuration.image!r}...")
            self._pull_image(docker_client, configuration)

        try:
            container = self._create_container(docker_client, **container_settings)
        except Exception as exc:
            self._emit_container_creation_failed_event(configuration)
            raise exc

        created_event = self._emit_container_status_change_event(
            container, configuration
        )

        # Add additional networks after the container is created; only one network can
        # be attached at creation time
        if len(configuration.networks) > 1:
            for network_name in configuration.networks[1:]:
                network = docker_client.networks.get(network_name)
                network.connect(container)

        # Start the container
        container.start()

        docker_client.close()

        return container, created_event

    def _watch_container_safe(
        self,
        container: "Container",
        configuration: DockerWorkerJobConfiguration,
        created_event: Event,
    ) -> "Container":
        """Watches a container for completion, handling any errors that may occur."""
        # Monitor the container capturing the latest snapshot while capturing
        # not found errors
        docker_client = self._get_client()

        try:
            seen_statuses = {container.status}
            last_event = created_event
            for latest_container in self._watch_container(
                docker_client, container.id, configuration
            ):
                container = latest_container
                if container.status not in seen_statuses:
                    seen_statuses.add(container.status)
                    last_event = self._emit_container_status_change_event(
                        container, configuration, last_event=last_event
                    )

        except docker.errors.NotFound:
            # The container was removed during watching
            self._logger.warning(
                f"Docker container {container.name} was removed before we could wait "
                "for its completion."
            )
        finally:
            docker_client.close()

        return container

    def _watch_container(
        self,
        docker_client: "DockerClient",
        container_id: str,
        configuration: DockerWorkerJobConfiguration,
    ) -> Generator[None, None, "Container"]:
        """
        Watches a container for completion, yielding the latest container snapshot
        on each iteration.
        """
        container: "Container" = docker_client.containers.get(container_id)

        status = container.status
        self._logger.info(
            f"Docker container {container.name!r} has status {container.status!r}"
        )
        yield container

        if configuration.stream_output:
            try:
                for log in container.logs(stream=True):
                    log: bytes
                    print(log.decode().rstrip())
            except docker.errors.APIError as exc:
                if "marked for removal" in str(exc):
                    self._logger.warning(
                        f"Docker container {container.name} was marked for removal"
                        " before logs could be retrieved. Output will not be"
                        " streamed. "
                    )
                else:
                    self._logger.exception(
                        "An unexpected Docker API error occurred while streaming output "
                        f"from container {container.name}."
                    )

            container.reload()
            if container.status != status:
                self._logger.info(
                    f"Docker container {container.name!r} has status"
                    f" {container.status!r}"
                )
            yield container

        container.wait()
        self._logger.info(
            f"Docker container {container.name!r} has status {container.status!r}"
        )
        yield container

    def _should_pull_image(
        self,
        docker_client: "DockerClient",
        configuration: DockerWorkerJobConfiguration,
    ) -> bool:
        """
        Decide whether we need to pull the Docker image.
        """
        image_pull_policy = configuration._determine_image_pull_policy()

        if image_pull_policy is ImagePullPolicy.ALWAYS:
            return True
        elif image_pull_policy is ImagePullPolicy.NEVER:
            return False
        elif image_pull_policy is ImagePullPolicy.IF_NOT_PRESENT:
            try:
                # NOTE: images.get() wants the tag included with the image
                # name, while images.pull() wants them split.
                docker_client.images.get(configuration.image)
            except docker.errors.ImageNotFound:
                self._logger.debug(
                    f"Could not find Docker image locally: {configuration.image}"
                )
                return True
        return False

    def _pull_image(
        self,
        docker_client: "DockerClient",
        configuration: DockerWorkerJobConfiguration,
    ):
        """
        Pull the image we're going to use to create the container.
        """
        image, tag = parse_image_tag(configuration.image)

        return docker_client.images.pull(image, tag)

    def _create_container(self, docker_client: "DockerClient", **kwargs) -> "Container":
        """
        Create a docker container with retries on name conflicts.

        If the container already exists with the given name, an incremented index is
        added.
        """
        # Create the container with retries on name conflicts (with an incremented idx)
        index = 0
        container = None
        name = original_name = kwargs.pop("name")

        while not container:
            try:
                display_name = repr(name) if name else "with auto-generated name"
                self._logger.info(f"Creating Docker container {display_name}...")
                container = docker_client.containers.create(name=name, **kwargs)
            except docker.errors.APIError as exc:
                if "Conflict" in str(exc) and "container name" in str(exc):
                    self._logger.info(
                        f"Docker container name {display_name} already exists; "
                        "retrying..."
                    )
                    index += 1
                    name = f"{original_name}-{index}"
                else:
                    raise

        self._logger.info(
            f"Docker container {container.name!r} has status {container.status!r}"
        )
        return container

    def _container_as_resource(self, container: "Container") -> Dict[str, str]:
        """Convert a container to a resource dictionary"""
        return {
            "prefect.resource.id": f"prefect.docker.container.{container.id}",
            "prefect.resource.name": container.name,
        }

    def _emit_container_creation_failed_event(
        self, configuration: DockerWorkerJobConfiguration
    ) -> Event:
        """Emit a Prefect event when a docker container fails to be created."""
        return emit_event(
            event="prefect.docker.container.creation-failed",
            resource=self._event_resource(),
            related=self._event_related_resources(configuration=configuration),
        )

    def _emit_container_status_change_event(
        self,
        container: "Container",
        configuration: DockerWorkerJobConfiguration,
        last_event: Optional[Event] = None,
    ) -> Event:
        """Emit a Prefect event for a Docker container event."""
        related = self._event_related_resources(configuration=configuration)

        worker_resource = self._event_resource()
        worker_resource["prefect.resource.role"] = "worker"
        worker_related_resource = RelatedResource(__root__=worker_resource)

        return emit_event(
            event=f"prefect.docker.container.{container.status.lower()}",
            resource=self._container_as_resource(container),
            related=related + [worker_related_resource],
            follows=last_event,
        )
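Not part of the source above: a minimal, hedged sketch of running this worker programmatically. It assumes the standard `BaseWorker` lifecycle (`async with` handles `setup()`/`teardown()` and `start(run_once=True)` polls the pool once), that a work pool named "my-docker-pool" already exists, and that `PREFECT_API_URL` points at a non-ephemeral API (the `setup()` method above rejects ephemeral servers).

    # Hedged sketch: running a Docker worker programmatically.
    # Assumes PREFECT_API_URL is set and the work pool "my-docker-pool" exists.
    import anyio

    from prefect_docker.worker import DockerWorker


    async def main():
        async with DockerWorker(work_pool_name="my-docker-pool") as worker:
            # Poll the work pool once, executing any scheduled flow runs
            # in Docker containers, then exit.
            await worker.start(run_once=True)


    if __name__ == "__main__":
        anyio.run(main)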
DockerWorkerJobConfiguration is the configuration class used by the Docker worker. An instance of this class is passed to the Docker worker's `run` method for each flow run; it contains all the information necessary to execute the flow run as a Docker container.
Attributes:

    name: The name to give to created Docker containers.
    command: The command executed in created Docker containers to kick off
        flow run execution.
    env: The environment variables to set in created Docker containers.
    labels: The labels to set on created Docker containers.
    image (str): The image reference of a container image to use for created jobs.
        If not set, the latest Prefect image will be used.
    image_pull_policy (Optional[str]): The image pull policy to use when pulling
        images.
    networks (List[str]): Docker networks that created containers should be
        connected to.
    network_mode (Optional[str]): The network mode for the created containers
        (e.g. host, bridge). If 'networks' is set, this cannot be set.
    auto_remove (bool): If set, containers will be deleted on completion.
    volumes (List[str]): Docker volumes that should be mounted in created containers.
    stream_output (bool): If set, the output from created containers will be streamed
        to local standard output.
    mem_limit (Optional[str]): Memory limit of created containers. Accepts a value
        with a unit identifier (e.g. 100000b, 1000k, 128m, 1g). If a value is given
        without a unit, bytes are assumed.
    memswap_limit (Optional[str]): Total memory (memory + swap), -1 to disable swap.
        Should only be set if mem_limit is also set. If mem_limit is set, this
        defaults to allowing the container to use as much swap as memory. For
        example, if mem_limit is 300m and memswap_limit is not set, containers can
        use 600m in total of memory and swap.
    privileged (bool): Give extended privileges to created containers.
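For illustration only (not part of the reference itself), a hedged example of constructing the configuration directly with a few of these attributes. In normal use these values come from a work pool's base job template rather than manual construction; the values below are made up:

    # Hedged example -- values are illustrative.
    from prefect_docker.worker import DockerWorkerJobConfiguration

    config = DockerWorkerJobConfiguration(
        image="docker.io/prefecthq/prefect:2-latest",
        networks=["backend"],            # attached at container creation time
        auto_remove=True,                # delete the container on completion
        volumes=["/my/local/path:/path/in/container"],
        mem_limit="512m",                # unit suffix; bare numbers mean bytes
    )
    # A network mode cannot be inferred when `networks` is set:
    print(config.get_network_mode())     # None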
class DockerWorkerJobConfiguration(BaseJobConfiguration):
    """
    Configuration class used by the Docker worker.

    An instance of this class is passed to the Docker worker's `run` method
    for each flow run. It contains all the information necessary to execute
    the flow run as a Docker container.

    Attributes:
        name: The name to give to created Docker containers.
        command: The command executed in created Docker containers to kick off
            flow run execution.
        env: The environment variables to set in created Docker containers.
        labels: The labels to set on created Docker containers.
        image: The image reference of a container image to use for created jobs.
            If not set, the latest Prefect image will be used.
        image_pull_policy: The image pull policy to use when pulling images.
        networks: Docker networks that created containers should be connected to.
        network_mode: The network mode for the created containers (e.g. host, bridge).
            If 'networks' is set, this cannot be set.
        auto_remove: If set, containers will be deleted on completion.
        volumes: Docker volumes that should be mounted in created containers.
        stream_output: If set, the output from created containers will be streamed
            to local standard output.
        mem_limit: Memory limit of created containers. Accepts a value
            with a unit identifier (e.g. 100000b, 1000k, 128m, 1g.) If a value is
            given without a unit, bytes are assumed.
        memswap_limit: Total memory (memory + swap), -1 to disable swap. Should only be
            set if `mem_limit` is also set. If `mem_limit` is set, this defaults to
            allowing the container to use as much swap as memory. For example, if
            `mem_limit` is 300m and `memswap_limit` is not set, containers can use
            600m in total of memory and swap.
        privileged: Give extended privileges to created containers.
    """

    image: str = Field(
        default_factory=get_prefect_image_name,
        description=(
            "The image reference of a container image to use for created jobs. "
            "If not set, the latest Prefect image will be used."
        ),
        example="docker.io/prefecthq/prefect:2-latest",
    )
    registry_credentials: Optional[DockerRegistryCredentials] = Field(
        default=None,
        description=(
            "Credentials for logging into a Docker registry to pull images from."
        ),
    )
    image_pull_policy: Optional[Literal["IfNotPresent", "Always", "Never"]] = Field(
        default=None,
        description="The image pull policy to use when pulling images.",
    )
    networks: List[str] = Field(
        default_factory=list,
        description="Docker networks that created containers should be connected to.",
    )
    network_mode: Optional[str] = Field(
        default=None,
        description=(
            "The network mode for the created containers (e.g. host, bridge). If"
            " 'networks' is set, this cannot be set."
        ),
    )
    auto_remove: bool = Field(
        default=False,
        description="If set, containers will be deleted on completion.",
    )
    volumes: List[str] = Field(
        default_factory=list,
        description="A list of volumes to mount into created containers.",
        example=["/my/local/path:/path/in/container"],
    )
    stream_output: bool = Field(
        default=True,
        description=(
            "If set, the output from created containers will be streamed to local "
            "standard output."
        ),
    )
    mem_limit: Optional[str] = Field(
        default=None,
        title="Memory Limit",
        description=(
            "Memory limit of created containers. Accepts a value "
            "with a unit identifier (e.g. 100000b, 1000k, 128m, 1g.) "
            "If a value is given without a unit, bytes are assumed."
        ),
    )
    memswap_limit: Optional[str] = Field(
        default=None,
        title="Memory Swap Limit",
        description=(
            "Total memory (memory + swap), -1 to disable swap. Should only be "
            "set if `mem_limit` is also set. If `mem_limit` is set, this defaults to "
            "allowing the container to use as much swap as memory. For example, if "
            "`mem_limit` is 300m and `memswap_limit` is not set, containers can use "
            "600m in total of memory and swap."
        ),
    )
    privileged: bool = Field(
        default=False,
        description="Give extended privileges to created container.",
    )

    @validator("volumes")
    def _validate_volume_format(cls, volumes):
        """Validates that provided volume strings are in the correct format."""
        for volume in volumes:
            if ":" not in volume:
                raise ValueError(
                    "Invalid volume specification. "
                    f"Expected format 'path:container_path', but got {volume!r}"
                )

        return volumes

    def _convert_labels_to_docker_format(self, labels: Dict[str, str]):
        """Converts labels to the format expected by Docker."""
        labels = labels or {}
        new_labels = {}
        for name, value in labels.items():
            if "/" in name:
                namespace, key = name.split("/", maxsplit=1)
                new_namespace = ".".join(reversed(namespace.split(".")))
                new_labels[f"{new_namespace}.{key}"] = value
            else:
                new_labels[name] = value
        return new_labels

    def _slugify_container_name(self) -> Optional[str]:
        """
        Generates a container name to match the configured name, ensuring it is
        Docker compatible.
        """
        # Must match `/?[a-zA-Z0-9][a-zA-Z0-9_.-]+` in the end
        if not self.name:
            return None

        return (
            slugify(
                self.name,
                lowercase=False,
                # Docker does not limit length but URL limits apply eventually so
                # limit the length for safety
                max_length=250,
                # Docker allows these characters for container names
                regex_pattern=r"[^a-zA-Z0-9_.-]+",
            ).lstrip(
                # Docker does not allow leading underscore, dash, or period
                "_-."
            )
            # Docker does not allow 0 character names so cast to null if the name is
            # empty after slugification
            or None
        )

    def _base_environment(self):
        """
        If the API URL has been set update the value to ensure connectivity
        when using a bridge network by updating local connections to use the
        docker internal host unless the network mode is "host" where localhost
        is available already.
        """
        base_env = super()._base_environment()
        network_mode = self.get_network_mode()
        if (
            "PREFECT_API_URL" in base_env
            and base_env["PREFECT_API_URL"] is not None
            and network_mode != "host"
        ):
            base_env["PREFECT_API_URL"] = (
                base_env["PREFECT_API_URL"]
                .replace("localhost", "host.docker.internal")
                .replace("127.0.0.1", "host.docker.internal")
            )
        return base_env

    def prepare_for_flow_run(
        self,
        flow_run: "FlowRun",
        deployment: Optional["DeploymentResponse"] = None,
        flow: Optional["Flow"] = None,
    ):
        """
        Prepares the agent for a flow run by setting the image, labels, and name
        attributes.
        """
        super().prepare_for_flow_run(flow_run, deployment, flow)

        self.image = self.image or get_prefect_image_name()
        self.labels = self._convert_labels_to_docker_format(
            {**self.labels, **CONTAINER_LABELS}
        )
        self.name = self._slugify_container_name()

    def get_network_mode(self) -> Optional[str]:
        """
        Returns the network mode to use for the container based on the configured
        options and the platform.
        """
        # User's value takes precedence; this may collide with the incompatible
        # options mentioned below.
        if self.network_mode:
            if sys.platform != "linux" and self.network_mode == "host":
                warnings.warn(
                    f"{self.network_mode!r} network mode is not supported on platform "
                    f"{sys.platform!r} and may not work as intended."
                )
            return self.network_mode

        # Network mode is not compatible with networks or ports (we do not support
        # ports yet though)
        if self.networks:
            return None

        # Check for a local API connection
        api_url = self.env.get("PREFECT_API_URL", PREFECT_API_URL.value())

        if api_url:
            try:
                _, netloc, _, _, _, _ = urllib.parse.urlparse(api_url)
            except Exception as exc:
                warnings.warn(
                    f"Failed to parse host from API URL {api_url!r} with exception: "
                    f"{exc}\nThe network mode will not be inferred."
                )
                return None

            host = netloc.split(":")[0]

            # If using a locally hosted API, use a host network on linux
            if sys.platform == "linux" and (
                host == "127.0.0.1" or host == "localhost"
            ):
                return "host"

        # Default to unset
        return None

    def get_extra_hosts(self, docker_client) -> Optional[Dict[str, str]]:
        """
        A host.docker.internal -> host-gateway mapping is necessary for communicating
        with the API on Linux machines. Docker Desktop on macOS will automatically
        already have this mapping.
        """
        if sys.platform == "linux" and (
            # Do not warn if the user has specified a host manually that does not use
            # a local address
            "PREFECT_API_URL" not in self.env
            or re.search(
                ".*(localhost)|(127.0.0.1)|(host.docker.internal).*",
                self.env["PREFECT_API_URL"],
            )
        ):
            user_version = packaging.version.parse(
                format_outlier_version_name(docker_client.version()["Version"])
            )
            required_version = packaging.version.parse("20.10.0")

            if user_version < required_version:
                warnings.warn(
                    "`host.docker.internal` could not be automatically resolved to"
                    " your local ip address. This feature is not supported on Docker"
                    f" Engine v{user_version}, upgrade to v{required_version}+ if you"
                    " encounter issues."
                )
                return {}
            else:
                # Compatibility for linux -- https://github.com/docker/cli/issues/2290
                # Only supported by Docker v20.10.0+ which is our minimum recommend
                # version
                return {"host.docker.internal": "host-gateway"}

    def _determine_image_pull_policy(self) -> ImagePullPolicy:
        """
        Determine the appropriate image pull policy.

        1. If they specified an image pull policy, use that.

        2. If they did not specify an image pull policy and gave us
           the "latest" tag, use ImagePullPolicy.always.

        3. If they did not specify an image pull policy and did not
           specify a tag, use ImagePullPolicy.always.

        4. If they did not specify an image pull policy and gave us
           a tag other than "latest", use ImagePullPolicy.if_not_present.

        This logic matches the behavior of Kubernetes. See:
        https://kubernetes.io/docs/concepts/containers/images/#imagepullpolicy-defaulting
        """
        if not self.image_pull_policy:
            _, tag = parse_image_tag(self.image)
            if tag == "latest" or not tag:
                return ImagePullPolicy.ALWAYS
            return ImagePullPolicy.IF_NOT_PRESENT
        return ImagePullPolicy(self.image_pull_policy)
get_extra_hosts: a host.docker.internal -> host-gateway mapping is necessary for communicating with the API on Linux machines; Docker Desktop on macOS already provides this mapping automatically. See the method in the configuration class source above.
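As a hedged sketch of what this amounts to on the docker-py side: on a Linux host running Docker Engine 20.10+ with a locally hosted API, the mapping returned by `get_extra_hosts` is passed to container creation as `extra_hosts`. The image, command, and API URL below are illustrative, not taken from the worker source:

    # Sketch only -- illustrative values mirroring what the worker passes to docker-py.
    import docker

    client = docker.from_env()
    container = client.containers.create(
        "prefecthq/prefect:2-latest",
        command=["prefect", "version"],
        environment={"PREFECT_API_URL": "http://host.docker.internal:4200/api"},
        extra_hosts={"host.docker.internal": "host-gateway"},  # from get_extra_hosts()
    )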
Enum representing the image pull policy options for a Docker container.
Source code in prefect_docker/worker.py
class ImagePullPolicy(enum.Enum):
    """Enum representing the image pull policy options for a Docker container."""

    IF_NOT_PRESENT = "IfNotPresent"
    ALWAYS = "Always"
    NEVER = "Never"
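A hedged illustration of the defaulting logic in `_determine_image_pull_policy` (see the configuration class source above): "latest" or untagged images default to ALWAYS, other tags default to IF_NOT_PRESENT, and an explicitly configured policy always wins. The image references are made up:

    # Illustration only -- image references are hypothetical.
    from prefect_docker.worker import DockerWorkerJobConfiguration, ImagePullPolicy

    cfg = DockerWorkerJobConfiguration(image="registry.example.com/flows:v1")
    assert cfg._determine_image_pull_policy() is ImagePullPolicy.IF_NOT_PRESENT

    cfg = DockerWorkerJobConfiguration(image="registry.example.com/flows:latest")
    assert cfg._determine_image_pull_policy() is ImagePullPolicy.ALWAYS

    cfg = DockerWorkerJobConfiguration(
        image="registry.example.com/flows", image_pull_policy="Never"
    )
    assert cfg._determine_image_pull_policy() is ImagePullPolicy.NEVER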