Skip to content

cluster

manager

fedvision.framework.cluster.manager

ClusterManager

__init__(self, port, host=None) special

init cluster manager instance

Parameters:

Name Type Description Default
port int required
host str None
Source code in fedvision/framework/cluster/manager.py
def __init__(
    self,
    port: int,
    host: str = None,
):
    """
    Create a cluster manager bound to the given host and port.

    Args:
        port: port the gRPC server will listen on
        host: bind address; defaults to "[::]" (all interfaces) when omitted
    """
    # default to the wildcard bind address when no host is given
    self._host = host if host is not None else "[::]"
    self._port = port

    # workers currently enrolled, keyed by worker id
    self._alive_workers: MutableMapping[str, _WorkerDescription] = {}
    # task_id -> last reported task status
    self._tasks_status = {}
    # seconds between heartbeat checks before a worker counts as lost
    self._max_heartbeat_delay = 5

    # created lazily in `start`
    self._server: Optional[grpc.aio.Server] = None

add_worker(self, worker_id, worker_ip, max_tasks, port_start, port_end)

add worker to manager

Parameters:

Name Type Description Default
worker_id required
worker_ip required
max_tasks required
port_start required
port_end required
Source code in fedvision/framework/cluster/manager.py
def add_worker(self, worker_id, worker_ip, max_tasks, port_start, port_end):
    """
    Register a worker with this manager and start watching its heartbeat.

    Args:
        worker_id: unique id of the worker
        worker_ip: address reported by the worker
        max_tasks: maximum number of concurrent tasks the worker accepts
        port_start: first port of the worker's usable port range
        port_end: last port of the worker's usable port range

    Returns:
        the newly created worker description
    """
    description = _WorkerDescription(
        worker_id=worker_id,
        worker_ip=worker_ip,
        max_tasks=max_tasks,
        max_delay=self._max_heartbeat_delay,
        port_start=port_start,
        port_end=port_end,
    )
    self._alive_workers[worker_id] = description

    async def _watch_heartbeat():
        # Periodically verify the worker is still enrolled and beating;
        # once either check fails, the `finally` drops it from the alive set.
        try:
            while True:
                await asyncio.sleep(self._max_heartbeat_delay)
                if worker_id not in self._alive_workers:
                    self.error(f"worker:{worker_id} not found")
                    break
                if description.is_asystole():
                    self.error(f"heartbeat from worker:{worker_id} loss")
                    break
        finally:
            self.remove_worker(worker_id)

    # requires a running event loop (called from within the gRPC server)
    asyncio.create_task(_watch_heartbeat())
    return description

dispatch(self, resource=None) async

dispatch tasks to worker

Parameters:

Name Type Description Default
resource dict None

Returns:

Type Description
Tuple[Optional[_WorkerDescription], list]
Source code in fedvision/framework/cluster/manager.py
async def dispatch(
    self, resource: dict = None
) -> Tuple[Optional["_WorkerDescription"], list]:
    """
    Pick an alive worker able to serve the requested resource.

    Args:
        resource: optional resource requirement; only the "endpoints" key
            (number of ports needed) is currently understood.

    Returns:
        (worker, endpoints) on success, (None, []) when no worker fits
    """
    requirement = resource or {}
    if not requirement:
        # no specific resource requested: any worker with spare capacity
        for candidate in self._alive_workers.values():
            if candidate.has_task_capacity():
                # NOTE(review): `task_task_capacity` looks like a typo of
                # "take_task_capacity" — it is the project API name, kept as-is
                candidate.task_task_capacity()
                return candidate, []
    elif "endpoints" in requirement:
        wanted = requirement["endpoints"]
        for candidate in self._alive_workers.values():
            if candidate.has_num_valid_endpoints(wanted) and candidate.has_task_capacity():
                candidate.task_task_capacity()
                return candidate, candidate.take_endpoints(wanted)
    return None, []

Enroll(self, request, context)

rpc server impl: process tasker enroll request

Parameters:

Name Type Description Default
request REQ required
context ServicerContext required

Returns:

Type Description
AsyncGenerator[fedvision.framework.protobuf.cluster_pb2.REP, NoneType]
Source code in fedvision/framework/cluster/manager.py
async def Enroll(
    self,
    request: cluster_pb2.Enroll.REQ,
    context: grpc.aio.ServicerContext,
) -> AsyncGenerator[cluster_pb2.Enroll.REP, None]:
    """
    rpc server impl: process a worker enroll request.

    Server-streaming RPC: the first message yielded reports the enroll
    status (ALREADY_ENROLL or ENROLL_SUCCESS); after a successful enroll,
    tasks assigned to this worker are streamed back one by one as
    TASK_READY messages until the worker is removed from the alive set.

    Args:
        request: enroll request with worker id/ip, capacity and port range
        context: gRPC servicer context

    Returns:
        async generator of cluster_pb2.Enroll.REP messages
    """
    self.debug(f"cluster worker enroll request: {pretty_pb(request)}")
    # reject a second enrollment under an id that is already alive
    if self.has_worker(request.worker_id):
        yield cluster_pb2.Enroll.REP(status=cluster_pb2.Enroll.ALREADY_ENROLL)
        return

    # register the worker; add_worker also starts its heartbeat watcher
    worker = self.add_worker(
        request.worker_id,
        request.worker_ip,
        request.max_tasks,
        request.port_start,
        request.port_end,
    )
    self.debug(f"cluster worker enroll success: worker: {request.worker_id}")
    yield cluster_pb2.Enroll.REP(status=cluster_pb2.Enroll.ENROLL_SUCCESS)

    # stream tasks for as long as the worker stays enrolled; the 5 s
    # timeout lets the loop re-check liveness between tasks
    while self.has_worker(request.worker_id):
        try:
            task = await worker.wait_next_task(timeout=5)
        except asyncio.TimeoutError:
            continue

        self.debug(
            f"task ready: job_id={task.job_id}, task_id={task.task_id}, task_type={task.task_type}"
        )
        rep = cluster_pb2.Enroll.REP(
            status=cluster_pb2.Enroll.TASK_READY, task=task
        )
        self.debug(
            f"response task({task.task_id}, {task.task_type}) to worker {request.worker_id}"
        )
        yield rep

    # defensive cleanup once the streaming loop exits
    self.remove_worker(request.worker_id)

has_worker(self, worker_id)

check whether worker `worker_id` is alive (enrolled)

Parameters:

Name Type Description Default
worker_id required

Returns:

Type Description
bool
Source code in fedvision/framework/cluster/manager.py
def has_worker(self, worker_id) -> bool:
    """
    Tell whether worker `worker_id` is currently enrolled (alive).

    Args:
        worker_id: id of the worker to look up

    Returns:
        True when the worker is in the alive set, False otherwise
    """
    return worker_id in self._alive_workers

remove_worker(self, worker_id)

remove worker from manager

Parameters:

Name Type Description Default
worker_id required
Source code in fedvision/framework/cluster/manager.py
def remove_worker(self, worker_id):
    """
    Drop a worker from the alive set; a no-op when it is not enrolled.

    Args:
        worker_id: id of the worker to remove
    """
    # pop with a default avoids a KeyError for unknown/already-removed ids
    self._alive_workers.pop(worker_id, None)

start(self) async

start cluster manager service

Source code in fedvision/framework/cluster/manager.py
async def start(self):
    """
    Start the cluster manager gRPC service.

    Builds the aio server with generous message-size limits, registers
    this instance as the ClusterManager servicer and begins listening on
    the configured host/port (insecure channel).
    """
    self.info(f"starting cluster manager at port: {self._port}")
    channel_options = [
        ("grpc.max_send_message_length", 512 * 1024 * 1024),
        ("grpc.max_receive_message_length", 512 * 1024 * 1024),
    ]
    server = grpc.aio.server(options=channel_options)
    cluster_pb2_grpc.add_ClusterManagerServicer_to_server(self, server)
    server.add_insecure_port(f"{self._host}:{self._port}")
    self._server = server
    await self._server.start()
    self.info(f"cluster manager started at port: {self._port}")

stop(self) async

stop cluster manager service

Source code in fedvision/framework/cluster/manager.py
async def stop(self):
    """
    Stop the cluster manager service.

    Safe to call even when the service was never started: `self._server`
    stays None until `start` succeeds, so this is a no-op instead of
    raising AttributeError in that case.
    """
    if self._server is None:
        return
    # give in-flight RPCs a 1 second grace period
    await self._server.stop(1)

TaskResourceRequire(self, request, context) async

process task resource acquired request

Parameters:

Name Type Description Default
request required
context required
Source code in fedvision/framework/cluster/manager.py
async def TaskResourceRequire(self, request, context):
    """
    rpc server impl: process a task resource acquire request.

    Args:
        request: carries `num_endpoints`, the number of ports required
        context: gRPC servicer context

    Returns:
        TaskResourceRequire.REP with SUCCESS plus worker id and reserved
        endpoints, or FAILED when no worker can satisfy the request
    """
    worker, endpoints = await self.dispatch(
        resource={"endpoints": request.num_endpoints}
    )
    if worker is None:
        # no enrolled worker has both capacity and enough free ports
        return cluster_pb2.TaskResourceRequire.REP(
            status=cluster_pb2.TaskResourceRequire.FAILED
        )

    rep = cluster_pb2.TaskResourceRequire.REP(
        status=cluster_pb2.TaskResourceRequire.SUCCESS, worker_id=worker.worker_id
    )
    # repeated protobuf field: extend with the endpoints dispatch reserved
    rep.endpoints.extend(endpoints)
    return rep

TaskSubmit(self, request, context) async

process task submit request

Parameters:

Name Type Description Default
request REQ required
context ServicerContext required

Returns:

Type Description
REP
Source code in fedvision/framework/cluster/manager.py
async def TaskSubmit(
    self, request: cluster_pb2.TaskSubmit.REQ, context: grpc.aio.ServicerContext
) -> cluster_pb2.TaskSubmit.REP:
    """
    rpc server impl: process a task submit request.

    When the task carries no assignee, any worker with spare capacity is
    chosen via `dispatch`; otherwise the task is routed to the named worker.

    Args:
        request: submit request wrapping the task to schedule
        context: gRPC servicer context

    Returns:
        TaskSubmit.REP with SUCCESS or FAILED status
    """
    try:
        task = request.task
        if not task.assignee:
            worker, _ = await self.dispatch()
            if worker is None:
                # FIX: dispatch returns (None, []) when no worker has
                # capacity; previously this fell through to
                # `None.put_task(...)` and was reported as an exception.
                self.error("task submit failed: no worker with free capacity")
                return cluster_pb2.TaskSubmit.REP(
                    status=cluster_pb2.TaskSubmit.FAILED
                )
            await worker.put_task(task=task)
        else:
            # explicit assignee: a KeyError for an unknown worker is
            # handled by the generic except below
            await self._alive_workers[task.assignee].put_task(task=task)
        return cluster_pb2.TaskSubmit.REP(status=cluster_pb2.TaskSubmit.SUCCESS)
    except Exception as e:
        self.exception(f"handle task submit failed: {e}")
        return cluster_pb2.TaskSubmit.REP(status=cluster_pb2.TaskSubmit.FAILED)

UpdateTaskStatus(self, request, context) async

process task status update request

Parameters:

Name Type Description Default
request REQ required
context ServicerContext required

Returns:

Type Description
REP
Source code in fedvision/framework/cluster/manager.py
async def UpdateTaskStatus(
    self, request: cluster_pb2.UpdateStatus.REQ, context: grpc.aio.ServicerContext
) -> cluster_pb2.UpdateStatus.REP:
    """
    rpc server impl: process a task status update / heartbeat request.

    A request without a task_id is a pure heartbeat; with a task_id, the
    status of a tracked task is recorded.

    Args:
        request: carries worker_id plus optional task_id and task_status
        context: gRPC servicer context

    Returns:
        UpdateStatus.REP with SUCCESS or FAILED status
    """
    # requests from unknown workers are rejected outright
    if request.worker_id not in self._alive_workers:
        return cluster_pb2.UpdateStatus.REP(status=cluster_pb2.UpdateStatus.FAILED)
    # any valid request refreshes the worker's heartbeat
    await self._alive_workers[request.worker_id].update_heartbeat()

    if not request.task_id:
        # heartbeat only: nothing further to record
        return cluster_pb2.UpdateStatus.REP(status=cluster_pb2.UpdateStatus.SUCCESS)

    # FIX: original read `if not request.task_id not in self._tasks_status`,
    # a double negative that rejected *tracked* tasks and accepted unknown
    # ones. Reject unknown task ids instead.
    # NOTE(review): assumes task ids are registered in _tasks_status
    # elsewhere in the project — confirm against the submission path.
    if request.task_id not in self._tasks_status:
        return cluster_pb2.UpdateStatus.REP(status=cluster_pb2.UpdateStatus.FAILED)

    self.debug(f"update task status: {request.task_id} to {request.task_status}")
    self._tasks_status[request.task_id] = request.task_status
    return cluster_pb2.UpdateStatus.REP(status=cluster_pb2.UpdateStatus.SUCCESS)

worker

fedvision.framework.cluster.worker

ClusterWorker

__init__(self, worker_id, worker_ip, max_tasks, port_start, port_end, manager_address, data_dir=None) special

init cluster worker instance

Parameters:

Name Type Description Default
worker_id str required
worker_ip str required
max_tasks int required
port_start int required
port_end int required
manager_address str required
data_dir str None
Source code in fedvision/framework/cluster/worker.py
def __init__(
    self,
    worker_id: str,
    worker_ip: str,
    max_tasks: int,
    port_start: int,
    port_end: int,
    manager_address: str,
    data_dir: str = None,
):
    """
    Create a cluster worker instance.

    Args:
        worker_id: unique id this worker enrolls under
        worker_ip: address reported to the manager
        max_tasks: maximum number of tasks executed concurrently
        port_start: first port of the range this worker may hand out
        port_end: last port of the range this worker may hand out
        manager_address: address of the cluster manager gRPC service
        data_dir: optional directory for task data
    """
    # identity and configuration reported to the manager
    self._worker_id = worker_id
    self._worker_ip = worker_ip
    self._manager_address = manager_address
    self._max_tasks = max_tasks
    self._port_start = port_start
    self._port_end = port_end
    self._heartbeat_interval = 1
    self._data_dir = data_dir

    # pending tasks and the concurrency limit for execution
    self._task_queue: asyncio.Queue = asyncio.Queue()
    self._semaphore = asyncio.Semaphore(max_tasks)

    # gRPC plumbing, created lazily in `start`
    self._channel: Optional[grpc.Channel] = None
    self._stub: Optional[cluster_pb2_grpc.ClusterManagerStub] = None

    self._tasks: List[asyncio.Future] = []
    # status updates queued for delivery to the manager
    self._task_status: asyncio.Queue = asyncio.Queue()
    self._stop_event = asyncio.Event()

    # background coroutines started in `start`, canceled in `stop`
    self._asyncio_task_collection: Optional[List[asyncio.Task]] = None

start(self) async

start worker

  1. enroll to manager
  2. start heartbeat loop
  3. start task exec loop
  4. process tasks
Source code in fedvision/framework/cluster/worker.py
async def start(self):
    """
    start worker

    1. enroll to manager
    2. start heartbeat loop
    3. start task exec loop
    4. process tasks streamed back by the manager
    """
    self.info(f"starting worker {self._worker_id}")

    # open the gRPC channel to the manager with generous message limits
    self.info(f"staring grpc channel to cluster manager")
    self._channel = grpc.aio.insecure_channel(
        self._manager_address,
        options=[
            ("grpc.max_send_message_length", 512 * 1024 * 1024),
            ("grpc.max_receive_message_length", 512 * 1024 * 1024),
        ],
    )
    self._stub = cluster_pb2_grpc.ClusterManagerStub(self._channel)

    # Enroll is a server-streaming RPC: the first reply carries the enroll
    # status, subsequent replies carry tasks assigned to this worker
    self.info(f"sending enroll request to cluster manager")
    response_stream: AsyncIterable[cluster_pb2.Enroll.REP] = self._stub.Enroll(
        cluster_pb2.Enroll.REQ(
            worker_id=self._worker_id,
            worker_ip=self._worker_ip,
            max_tasks=self._max_tasks,
            port_start=self._port_start,
            port_end=self._port_end,
        )
    )
    first_response = True

    try:
        async for response in response_stream:

            if first_response:
                # first reply: validate the enroll status before starting loops
                if response.status == cluster_pb2.Enroll.ALREADY_ENROLL:
                    raise FedvisionWorkerException(
                        f"worker<{self._worker_id}> already enrolled, use new name or remove it from manager"
                    )

                if response.status != cluster_pb2.Enroll.ENROLL_SUCCESS:
                    raise FedvisionWorkerException(
                        f"worker<{self._worker_id}>enroll failed with unknown status: {response.status}"
                    )
                self.info(
                    f"worker<{self._worker_id}>success enrolled to cluster manager"
                )

                async def _co_update_status():
                    # Forward queued task-status updates to the manager; when
                    # none arrive within the heartbeat interval, send a bare
                    # request (worker_id only) that doubles as a heartbeat.
                    while True:
                        try:
                            request = await asyncio.wait_for(
                                self._task_status.get(), self._heartbeat_interval
                            )
                        except asyncio.TimeoutError:
                            self.trace(
                                "wait task status timeout. sending heartbeat request"
                            )
                            request = cluster_pb2.UpdateStatus.REQ(
                                worker_id=self._worker_id
                            )

                        try:
                            update_response = await self._stub.UpdateTaskStatus(
                                request
                            )
                        except grpc.aio.AioRpcError as _e:
                            # manager unreachable: signal the worker to stop
                            self.error(f"can't send heartbeat to manager, {_e}")
                            self._stop_event.set()
                            return
                        if (
                            update_response.status
                            != cluster_pb2.UpdateStatus.SUCCESS
                        ):
                            self.error(
                                f"update status failed, please check manager status"
                            )

                self.info("starting heartbeat loop")
                self._asyncio_task_collection = [
                    asyncio.create_task(_co_update_status()),
                ]
                self.info("heartbeat loop started")

                self.info(f"starting task execute loop")
                self._asyncio_task_collection.append(
                    asyncio.create_task(self._co_task_execute_loop())
                )
                self.info(f"task execute loop started")
                first_response = False
                continue

            # fetch tasks
            if response.status != cluster_pb2.Enroll.TASK_READY:
                raise FedvisionWorkerException(
                    f"expect status {cluster_pb2.Enroll.TASK_READY}, got {response.status}"
                )

            self.trace_lazy(
                f"response <{{response}}> got", response=lambda: pretty_pb(response)
            )
            try:
                # resolve the task implementation class from its type name,
                # deserialize the payload and queue it for execution
                task_id = response.task.task_id
                task_type = response.task.task_type
                task_class = extensions.get_task_class(task_type)
                if task_class is None:
                    self.error(f"task type {task_type} not found")
                    raise FedvisionExtensionException(
                        f"task type {task_type} not found"
                    )
                task = task_class.deserialize(response.task)
                await self._task_queue.put(task)
                self.trace(f"put task in queue: task_id={task_id}")
            except FedvisionException as e:
                # a malformed task is logged and skipped, not fatal
                self.error(f"preprocess fetched task failed: {e}")
            except Exception as e:
                self.exception(e)
    except grpc.aio.AioRpcError as e:
        # losing the enroll stream means losing the manager: shut down
        self.error(f"gRPC error: can't connect with cluster manager, {e}")
        self._stop_event.set()

stop(self) async

stop worker

Source code in fedvision/framework/cluster/worker.py
async def stop(self):
    """
    Stop the worker: close the manager channel and cancel background tasks.
    """
    # close the gRPC channel first so no new RPCs can be issued
    if self._channel is not None:
        await self._channel.close()
        self._channel = None

    self.info(f"canceling unfinished asyncio tasks")
    pending = self._asyncio_task_collection
    if pending is not None:
        for task in pending:
            if task.done():
                continue
            task.cancel()
            self.trace(f"canceled task {task}")
        self.info(f"all unfinished asyncio tasks canceled")

wait_for_termination(self) async

block until the stop event is set

Source code in fedvision/framework/cluster/worker.py
async def wait_for_termination(self):
    """
    Block until the stop event is set, then log that the worker is stopping.
    """
    await self._stop_event.wait()
    self.info(f"stop event set, stopping worker {self._worker_id}")