# cluster

## manager

`fedvision.framework.cluster.manager`

### ClusterManager
#### __init__(self, port, host=None) (special)

Init cluster manager instance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| port | int | port for the manager's gRPC server to listen on | required |
| host | str | host address to bind; defaults to `"[::]"` (all interfaces) | None |
Source code in `fedvision/framework/cluster/manager.py`:

```python
def __init__(
    self,
    port: int,
    host: str = None,
):
    """
    init cluster manager instance

    Args:
        port: port for the gRPC server to listen on
        host: host to bind; defaults to "[::]"
    """
    self._host = "[::]" if host is None else host
    self._port = port
    # workers currently enrolled, keyed by worker id
    self._alive_workers: MutableMapping[str, _WorkerDescription] = {}
    self._tasks_status = {}
    # seconds without a heartbeat before a worker is considered lost
    self._max_heartbeat_delay = 5
    self._server: Optional[grpc.aio.Server] = None
```
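The default host `"[::]"` is the IPv6 wildcard address, so the server accepts connections on all interfaces (including IPv4 on dual-stack systems). A minimal sketch, assuming you instead want to restrict the manager to loopback (the port value is illustrative):

```python
from fedvision.framework.cluster.manager import ClusterManager

# Bind only to loopback instead of all interfaces.
manager = ClusterManager(port=10000, host="127.0.0.1")
```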
#### add_worker(self, worker_id, worker_ip, max_tasks, port_start, port_end)

Add a worker to the manager and start a watchdog task that evicts it once its heartbeat stops.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| worker_id | | unique id of the worker | required |
| worker_ip | | ip address reported by the worker | required |
| max_tasks | | maximum number of tasks the worker runs concurrently | required |
| port_start | | start of the worker's allocatable port range | required |
| port_end | | end of the worker's allocatable port range | required |
Source code in `fedvision/framework/cluster/manager.py`:

```python
def add_worker(self, worker_id, worker_ip, max_tasks, port_start, port_end):
    """
    add worker to manager

    Args:
        worker_id: unique id of the worker
        worker_ip: ip address reported by the worker
        max_tasks: maximum number of concurrent tasks
        port_start: start of the allocatable port range
        port_end: end of the allocatable port range

    Returns:
        the registered `_WorkerDescription`
    """
    worker = _WorkerDescription(
        worker_id=worker_id,
        worker_ip=worker_ip,
        max_tasks=max_tasks,
        max_delay=self._max_heartbeat_delay,
        port_start=port_start,
        port_end=port_end,
    )
    self._alive_workers[worker_id] = worker

    async def _healthy_watcher():
        # evict the worker once it disappears or its heartbeat stops
        try:
            while True:
                await asyncio.sleep(self._max_heartbeat_delay)
                if worker_id not in self._alive_workers:
                    self.error(f"worker:{worker_id} not found")
                    break
                if worker.is_asystole():
                    self.error(f"heartbeat from worker:{worker_id} lost")
                    break
        finally:
            self.remove_worker(worker_id)

    asyncio.create_task(_healthy_watcher())
    return worker
```
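The watchdog relies on `_WorkerDescription.is_asystole()` to decide whether heartbeats have stopped. A minimal sketch of how such a check can be implemented with a monotonic-clock timestamp refreshed on each heartbeat (the `Heartbeat` class and its fields are illustrative, not fedvision API):

```python
import asyncio
import time


class Heartbeat:
    """Illustrative liveness bookkeeping, refreshed on every heartbeat."""

    def __init__(self, max_delay: float):
        self.max_delay = max_delay
        self._last_beat = time.monotonic()

    def beat(self):
        self._last_beat = time.monotonic()

    def is_asystole(self) -> bool:
        # True when no heartbeat arrived within the allowed delay.
        return time.monotonic() - self._last_beat > self.max_delay


async def demo():
    hb = Heartbeat(max_delay=0.5)
    await asyncio.sleep(0.6)  # simulate a silent worker
    print(hb.is_asystole())   # True: the watchdog would evict it


asyncio.run(demo())
```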
#### dispatch(self, resource=None) (async)

Dispatch tasks to a worker: return the first alive worker with spare task capacity and, when endpoints are requested, enough free ports.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| resource | dict | resource requirement, e.g. `{"endpoints": n}` | None |

Returns:

| Type | Description |
|---|---|
| Tuple[Optional[_WorkerDescription], list] | the chosen worker (or `None`) and any allocated endpoints |
Source code in fedvision/framework/cluster/manager.py
async def dispatch(
self, resource: dict = None
) -> Tuple[Optional["_WorkerDescription"], list]:
"""
dispatch tasks to worker
Args:
resource:
Returns:
"""
if resource is None:
resource = {}
if not resource:
for k, v in self._alive_workers.items():
if v.has_task_capacity():
v.task_task_capacity()
return v, []
elif "endpoints" in resource:
num_endpoints = resource["endpoints"]
for k, v in self._alive_workers.items():
if v.has_num_valid_endpoints(num_endpoints) and v.has_task_capacity():
v.task_task_capacity()
endpoints = v.take_endpoints(num_endpoints)
return v, endpoints
return None, []
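A hedged usage sketch of the two dispatch modes, to be run inside a coroutine that already holds a started `manager` instance (the printout is illustrative):

```python
async def pick_workers(manager):
    # Mode 1: any worker with spare task capacity.
    worker, _ = await manager.dispatch()

    # Mode 2: a worker that can also hand out two endpoint ports.
    worker, endpoints = await manager.dispatch(resource={"endpoints": 2})
    if worker is None:
        print("no worker with enough capacity")
    else:
        print(f"dispatched to {worker.worker_id}, endpoints: {endpoints}")
```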
#### Enroll(self, request, context) (async)

RPC server impl: process a worker enroll request, then keep streaming tasks assigned to that worker back over the same call.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| request | REQ | enroll request with worker id, ip, capacity, and port range | required |
| context | ServicerContext | | required |

Returns:

| Type | Description |
|---|---|
| AsyncGenerator[fedvision.framework.protobuf.cluster_pb2.REP, NoneType] | |
Source code in `fedvision/framework/cluster/manager.py`:

```python
async def Enroll(
    self,
    request: cluster_pb2.Enroll.REQ,
    context: grpc.aio.ServicerContext,
) -> AsyncGenerator[cluster_pb2.Enroll.REP, None]:
    """
    rpc server impl: process worker enroll request

    Args:
        request: enroll request
        context: gRPC servicer context
    """
    self.debug(f"cluster worker enroll request: {pretty_pb(request)}")
    if self.has_worker(request.worker_id):
        yield cluster_pb2.Enroll.REP(status=cluster_pb2.Enroll.ALREADY_ENROLL)
        return
    worker = self.add_worker(
        request.worker_id,
        request.worker_ip,
        request.max_tasks,
        request.port_start,
        request.port_end,
    )
    self.debug(f"cluster worker enroll success: worker: {request.worker_id}")
    yield cluster_pb2.Enroll.REP(status=cluster_pb2.Enroll.ENROLL_SUCCESS)
    # stream tasks to the enrolled worker until it drops out
    while self.has_worker(request.worker_id):
        try:
            task = await worker.wait_next_task(timeout=5)
        except asyncio.TimeoutError:
            continue
        self.debug(
            f"task ready: job_id={task.job_id}, task_id={task.task_id}, task_type={task.task_type}"
        )
        rep = cluster_pb2.Enroll.REP(status=cluster_pb2.Enroll.TASK_READY, task=task)
        self.debug(
            f"response task({task.task_id}, {task.task_type}) to worker {request.worker_id}"
        )
        yield rep
    self.remove_worker(request.worker_id)
```
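The streaming loop polls `worker.wait_next_task(timeout=5)` so it can re-check `has_worker` between tasks. A plausible sketch of such a method on top of `asyncio.Queue` (the `TaskMailbox` class is an illustrative stand-in; fedvision's actual `_WorkerDescription` internals may differ):

```python
import asyncio


class TaskMailbox:
    """Illustrative stand-in for the task-queue part of _WorkerDescription."""

    def __init__(self):
        self._queue: asyncio.Queue = asyncio.Queue()

    async def put_task(self, task):
        await self._queue.put(task)

    async def wait_next_task(self, timeout: float):
        # Raises asyncio.TimeoutError when nothing arrives in time,
        # which lets the caller loop and re-check worker liveness.
        return await asyncio.wait_for(self._queue.get(), timeout=timeout)
```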
#### has_worker(self, worker_id)

Check whether worker `worker_id` is alive (enrolled).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| worker_id | | id of the worker to check | required |

Returns:

| Type | Description |
|---|---|
| bool | |
Source code in `fedvision/framework/cluster/manager.py`:

```python
def has_worker(self, worker_id) -> bool:
    """
    check whether worker `worker_id` is alive (enrolled)

    Args:
        worker_id: id of the worker to check

    Returns:
        True if the worker is currently enrolled
    """
    return worker_id in self._alive_workers
```
#### remove_worker(self, worker_id)

Remove a worker from the manager.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| worker_id | | id of the worker to remove | required |
Source code in `fedvision/framework/cluster/manager.py`:

```python
def remove_worker(self, worker_id):
    """
    remove worker from manager

    Args:
        worker_id:
    """
    if worker_id not in self._alive_workers:
        return
    del self._alive_workers[worker_id]
```
#### start(self) (async)

Start the cluster manager service.

Source code in `fedvision/framework/cluster/manager.py`:

```python
async def start(self):
    """
    start cluster manager service
    """
    self.info(f"starting cluster manager at port: {self._port}")
    # raise the gRPC message-size limits to 512 MiB in both directions
    self._server = grpc.aio.server(
        options=[
            ("grpc.max_send_message_length", 512 * 1024 * 1024),
            ("grpc.max_receive_message_length", 512 * 1024 * 1024),
        ],
    )
    cluster_pb2_grpc.add_ClusterManagerServicer_to_server(self, self._server)
    self._server.add_insecure_port(f"{self._host}:{self._port}")
    await self._server.start()
    self.info(f"cluster manager started at port: {self._port}")
```
#### stop(self) (async)

Stop the cluster manager service.

Source code in `fedvision/framework/cluster/manager.py`:

```python
async def stop(self):
    """
    stop cluster manager service
    """
    # give in-flight RPCs a one-second grace period before shutdown
    await self._server.stop(1)
```
#### TaskResourceRequire(self, request, context) (async)

Process a task resource acquisition request: dispatch the required number of endpoints from a worker with capacity.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| request | | carries `num_endpoints`, the number of endpoints required | required |
| context | | | required |
Source code in `fedvision/framework/cluster/manager.py`:

```python
async def TaskResourceRequire(self, request, context):
    """
    process task resource acquisition request

    Args:
        request: carries the number of endpoints required
        context: gRPC servicer context
    """
    worker, endpoints = await self.dispatch(
        resource={"endpoints": request.num_endpoints}
    )
    if worker is None:
        return cluster_pb2.TaskResourceRequire.REP(
            status=cluster_pb2.TaskResourceRequire.FAILED
        )
    response = cluster_pb2.TaskResourceRequire.REP(
        status=cluster_pb2.TaskResourceRequire.SUCCESS, worker_id=worker.worker_id
    )
    for endpoint in endpoints:
        response.endpoints.append(endpoint)
    return response
```
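A hedged client-side sketch of requesting endpoints from the manager. The field names follow the servicer code above; the address and endpoint count are illustrative:

```python
import grpc.aio

from fedvision.framework.protobuf import cluster_pb2, cluster_pb2_grpc


async def require_endpoints(manager_address: str):
    async with grpc.aio.insecure_channel(manager_address) as channel:
        stub = cluster_pb2_grpc.ClusterManagerStub(channel)
        rep = await stub.TaskResourceRequire(
            cluster_pb2.TaskResourceRequire.REQ(num_endpoints=2)
        )
        if rep.status == cluster_pb2.TaskResourceRequire.SUCCESS:
            return rep.worker_id, list(rep.endpoints)
        return None, []
```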
#### TaskSubmit(self, request, context) (async)

Process a task submit request.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| request | REQ | submit request carrying the task to run | required |
| context | ServicerContext | | required |

Returns:

| Type | Description |
|---|---|
| REP | |
Source code in `fedvision/framework/cluster/manager.py`:

```python
async def TaskSubmit(
    self, request: cluster_pb2.TaskSubmit.REQ, context: grpc.aio.ServicerContext
) -> cluster_pb2.TaskSubmit.REP:
    """
    process task submit request

    Args:
        request: submit request carrying the task
        context: gRPC servicer context
    """
    try:
        task = request.task
        if not task.assignee:
            # no assignee given: let dispatch() pick any worker with
            # capacity (a None worker falls through to the except clause)
            worker, _ = await self.dispatch()
            await worker.put_task(task=task)
        else:
            # route to the explicitly named worker
            await self._alive_workers[task.assignee].put_task(task=task)
        return cluster_pb2.TaskSubmit.REP(status=cluster_pb2.TaskSubmit.SUCCESS)
    except Exception as e:
        self.exception(f"handle task submit failed: {e}")
        return cluster_pb2.TaskSubmit.REP(status=cluster_pb2.TaskSubmit.FAILED)
```
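A hedged client-side sketch of submitting a task. The `task` message construction is elided since its schema isn't shown in this section; leaving `task.assignee` empty lets the manager dispatch freely:

```python
import grpc.aio

from fedvision.framework.protobuf import cluster_pb2, cluster_pb2_grpc


async def submit(manager_address: str, task):
    async with grpc.aio.insecure_channel(manager_address) as channel:
        stub = cluster_pb2_grpc.ClusterManagerStub(channel)
        rep = await stub.TaskSubmit(cluster_pb2.TaskSubmit.REQ(task=task))
        return rep.status == cluster_pb2.TaskSubmit.SUCCESS
```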
#### UpdateTaskStatus(self, request, context) (async)

Process a task status update request; an update without a `task_id` serves as a bare heartbeat.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| request | REQ | | required |
| context | ServicerContext | | required |

Returns:

| Type | Description |
|---|---|
| REP | |
Source code in `fedvision/framework/cluster/manager.py`:

```python
async def UpdateTaskStatus(
    self, request: cluster_pb2.UpdateStatus.REQ, context: grpc.aio.ServicerContext
) -> cluster_pb2.UpdateStatus.REP:
    """
    process task status update request

    Args:
        request: status update (or bare heartbeat) from a worker
        context: gRPC servicer context
    """
    if request.worker_id not in self._alive_workers:
        return cluster_pb2.UpdateStatus.REP(status=cluster_pb2.UpdateStatus.FAILED)
    # every update doubles as a heartbeat for the sending worker
    await self._alive_workers[request.worker_id].update_heartbeat()
    if not request.task_id:
        # bare heartbeat: no task status to record
        return cluster_pb2.UpdateStatus.REP(status=cluster_pb2.UpdateStatus.SUCCESS)
    if request.task_id not in self._tasks_status:
        return cluster_pb2.UpdateStatus.REP(status=cluster_pb2.UpdateStatus.FAILED)
    self.debug(f"update task status: {request.task_id} to {request.task_status}")
    self._tasks_status[request.task_id] = request.task_status
    return cluster_pb2.UpdateStatus.REP(status=cluster_pb2.UpdateStatus.SUCCESS)
```
## worker

`fedvision.framework.cluster.worker`

### ClusterWorker
#### __init__(self, worker_id, worker_ip, max_tasks, port_start, port_end, manager_address, data_dir=None) (special)

Init cluster worker instance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| worker_id | str | unique id of this worker | required |
| worker_ip | str | ip address to report to the manager | required |
| max_tasks | int | maximum number of tasks to run concurrently | required |
| port_start | int | start of the port range offered for endpoints | required |
| port_end | int | end of the port range offered for endpoints | required |
| manager_address | str | address of the cluster manager, e.g. `host:port` | required |
| data_dir | str | directory for task data | None |
Source code in `fedvision/framework/cluster/worker.py`:

```python
def __init__(
    self,
    worker_id: str,
    worker_ip: str,
    max_tasks: int,
    port_start: int,
    port_end: int,
    manager_address: str,
    data_dir: str = None,
):
    """
    init cluster worker instance

    Args:
        worker_id: unique id of this worker
        worker_ip: ip address to report to the manager
        max_tasks: maximum number of concurrent tasks
        port_start: start of the port range offered for endpoints
        port_end: end of the port range offered for endpoints
        manager_address: address of the cluster manager
        data_dir: directory for task data
    """
    self._task_queue: asyncio.Queue = asyncio.Queue()
    # limits the number of concurrently executing tasks
    self._semaphore = asyncio.Semaphore(max_tasks)
    self._worker_id = worker_id
    self._worker_ip = worker_ip
    self._manager_address = manager_address
    self._max_tasks = max_tasks
    self._port_start = port_start
    self._port_end = port_end
    self._heartbeat_interval = 1
    self._data_dir = data_dir
    self._channel: Optional[grpc.Channel] = None
    self._stub: Optional[cluster_pb2_grpc.ClusterManagerStub] = None
    self._tasks: List[asyncio.Future] = []
    self._task_status: asyncio.Queue = asyncio.Queue()
    self._stop_event = asyncio.Event()
    self._asyncio_task_collection: Optional[List[asyncio.Task]] = None
```
#### start(self) (async)

Start the worker:

1. enroll to the manager
2. start the heartbeat loop
3. start the task execution loop
4. process tasks
Source code in `fedvision/framework/cluster/worker.py`:

```python
async def start(self):
    """
    start worker

    1. enroll to manager
    2. start heartbeat loop
    3. start task exec loop
    4. process tasks
    """
    self.info(f"starting worker {self._worker_id}")
    self.info(f"starting grpc channel to cluster manager")
    self._channel = grpc.aio.insecure_channel(
        self._manager_address,
        options=[
            ("grpc.max_send_message_length", 512 * 1024 * 1024),
            ("grpc.max_receive_message_length", 512 * 1024 * 1024),
        ],
    )
    self._stub = cluster_pb2_grpc.ClusterManagerStub(self._channel)
    self.info(f"sending enroll request to cluster manager")
    response_stream: AsyncIterable[cluster_pb2.Enroll.REP] = self._stub.Enroll(
        cluster_pb2.Enroll.REQ(
            worker_id=self._worker_id,
            worker_ip=self._worker_ip,
            max_tasks=self._max_tasks,
            port_start=self._port_start,
            port_end=self._port_end,
        )
    )
    first_response = True
    try:
        async for response in response_stream:
            if first_response:
                # the first reply acknowledges (or rejects) the enrollment
                if response.status == cluster_pb2.Enroll.ALREADY_ENROLL:
                    raise FedvisionWorkerException(
                        f"worker<{self._worker_id}> already enrolled, "
                        f"use new name or remove it from manager"
                    )
                if response.status != cluster_pb2.Enroll.ENROLL_SUCCESS:
                    raise FedvisionWorkerException(
                        f"worker<{self._worker_id}> enroll failed with unknown status: {response.status}"
                    )
                self.info(
                    f"worker<{self._worker_id}> successfully enrolled to cluster manager"
                )

                async def _co_update_status():
                    # forward task-status updates; on timeout, send a bare
                    # heartbeat instead so the manager keeps us alive
                    while True:
                        try:
                            request = await asyncio.wait_for(
                                self._task_status.get(), self._heartbeat_interval
                            )
                        except asyncio.TimeoutError:
                            self.trace(
                                "wait task status timeout, sending heartbeat request"
                            )
                            request = cluster_pb2.UpdateStatus.REQ(
                                worker_id=self._worker_id
                            )
                        try:
                            update_response = await self._stub.UpdateTaskStatus(request)
                        except grpc.aio.AioRpcError as _e:
                            self.error(f"can't send heartbeat to manager, {_e}")
                            self._stop_event.set()
                            return
                        if update_response.status != cluster_pb2.UpdateStatus.SUCCESS:
                            self.error(
                                f"update status failed, please check manager status"
                            )

                self.info("starting heartbeat loop")
                self._asyncio_task_collection = [
                    asyncio.create_task(_co_update_status()),
                ]
                self.info("heartbeat loop started")

                self.info(f"starting task execute loop")
                self._asyncio_task_collection.append(
                    asyncio.create_task(self._co_task_execute_loop())
                )
                self.info(f"task execute loop started")

                first_response = False
                continue

            # subsequent replies carry tasks ready for execution
            if response.status != cluster_pb2.Enroll.TASK_READY:
                raise FedvisionWorkerException(
                    f"expect status {cluster_pb2.Enroll.TASK_READY}, got {response.status}"
                )
            self.trace_lazy(
                f"response <{{response}}> got", response=lambda: pretty_pb(response)
            )
            try:
                task_id = response.task.task_id
                task_type = response.task.task_type
                task_class = extensions.get_task_class(task_type)
                if task_class is None:
                    self.error(f"task type {task_type} not found")
                    raise FedvisionExtensionException(
                        f"task type {task_type} not found"
                    )
                task = task_class.deserialize(response.task)
                await self._task_queue.put(task)
                self.trace(f"put task in queue: task_id={task_id}")
            except FedvisionException as e:
                self.error(f"preprocess fetched task failed: {e}")
            except Exception as e:
                self.exception(e)
    except grpc.aio.AioRpcError as e:
        self.error(f"gRPC error: can't connect with cluster manager, {e}")
        self._stop_event.set()
```
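The `_co_update_status` coroutine multiplexes real status updates and bare heartbeats over one queue: it waits up to `_heartbeat_interval` for a status message and falls back to an empty heartbeat on timeout. A minimal standalone sketch of that pattern (the names `pump` and `send` are illustrative, not fedvision API):

```python
import asyncio


async def pump(status_queue: asyncio.Queue, send, heartbeat_interval: float = 1.0):
    # Forward queued status updates; when none arrive within the interval,
    # send a bare heartbeat so the manager's watchdog sees the worker alive.
    while True:
        try:
            msg = await asyncio.wait_for(status_queue.get(), heartbeat_interval)
        except asyncio.TimeoutError:
            msg = {"heartbeat": True}
        await send(msg)
```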
#### stop(self) (async)

Stop the worker.

Source code in `fedvision/framework/cluster/worker.py`:

```python
async def stop(self):
    """
    stop worker
    """
    if self._channel is not None:
        await self._channel.close()
        self._channel = None
    self.info(f"canceling unfinished asyncio tasks")
    if self._asyncio_task_collection is not None:
        for task in self._asyncio_task_collection:
            if not task.done():
                task.cancel()
                self.trace(f"canceled task {task}")
        self.info(f"all unfinished asyncio tasks canceled")
```
#### wait_for_termination(self) (async)

Block until the stop event is set.

Source code in `fedvision/framework/cluster/worker.py`:

```python
async def wait_for_termination(self):
    """
    block until stop event was set
    """
    await self._stop_event.wait()
    self.info(f"stop event set, stopping worker {self._worker_id}")
```