master
Coordinator¶
¶
ClusterManagerConnect
¶
cluster manager client
__init__(self, address, shared_status)
special
¶
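Initialize the cluster manager client.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `address` | | address of the cluster manager | required |
| `shared_status` | `_SharedStatus` | status shared with the other master components | required |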
Source code in fedvision/framework/master/master.py
def __init__(self, address, shared_status: _SharedStatus):
    """
    Prepare a client for the cluster manager without connecting yet.

    Args:
        address: address of the cluster manager; ``start_cluster_channel``
            later passes it to ``grpc.aio.insecure_channel``.
        shared_status: status object shared with the rest of the master;
            ``submit_tasks_to_cluster`` reads tasks from its
            ``cluster_task_queue``.
    """
    self.address = address
    self.shared_status = shared_status
    # Both are filled in by start_cluster_channel(); until then they are None.
    self._channel: Optional[grpc.aio.Channel] = None
    self._stub: Optional[cluster_pb2_grpc.ClusterManagerStub] = None
cluster_channel_ready(self)
async
¶
Wait until the channel to the cluster manager is ready.
Source code in fedvision/framework/master/master.py
async def cluster_channel_ready(self):
    """
    Block until the cluster-manager channel reports that it is ready.

    Returns:
        Whatever ``self._channel.channel_ready()`` resolves to.

    Must be called after ``start_cluster_channel``; before that
    ``self._channel`` is ``None``.
    """
    channel = self._channel
    return await channel.channel_ready()
start_cluster_channel(self)
async
¶
Start the channel to the cluster manager.
Source code in fedvision/framework/master/master.py
async def start_cluster_channel(self):
    """
    Open an insecure gRPC channel to the cluster manager and build its stub.

    The send and receive message limits are both raised to 512 MiB. Waiting
    for the connection is done separately via ``cluster_channel_ready``.
    """
    self.info(f"start cluster channel to {self.address}")
    message_limit = 512 * 1024 * 1024  # bytes (512 MiB), both directions
    channel_options = [
        ("grpc.max_send_message_length", message_limit),
        ("grpc.max_receive_message_length", message_limit),
    ]
    self._channel = grpc.aio.insecure_channel(self.address, options=channel_options)
    self._stub = cluster_pb2_grpc.ClusterManagerStub(self._channel)
    self.info(f"cluster channel started to {self.address}")
stop_cluster_channel(self, grace=None)
async
¶
Stop the channel to the cluster manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `grace` | `Optional[float]` | grace period in seconds, passed to the channel's `close` | `None` |
Source code in fedvision/framework/master/master.py
async def stop_cluster_channel(self, grace: Optional[float] = None):
    """
    Close the channel to the cluster manager.

    Args:
        grace: seconds to wait for a graceful stop; forwarded unchanged
            (including ``None``) to ``self._channel.close``.

    Does nothing if ``start_cluster_channel`` has not created a channel yet,
    instead of failing with ``AttributeError`` on ``None``.
    """
    if self._channel is None:
        return
    self.info("stopping cluster channel")
    await self._channel.close(grace)
    # Report the state we actually reached (the channel is now closed).
    self.info(f"cluster channel to {self.address} stopped")
submit_tasks_to_cluster(self)
async
¶
Infinite loop that takes tasks from the queue and submits them to the cluster.
Source code in fedvision/framework/master/master.py
async def submit_tasks_to_cluster(self):
    """
    Endlessly forward tasks from the shared queue to the cluster manager.

    Each task taken from ``shared_status.cluster_task_queue`` is wrapped in a
    ``TaskSubmit.REQ`` and sent through the cluster stub. The loop never
    returns on its own. An exception raised by ``TaskSubmit`` is not caught
    here, so it ends the loop.
    """
    while True:
        next_task = await self.shared_status.cluster_task_queue.get()
        self.debug(
            "task sending: "
            f"task_id={next_task.task_id} task_type={next_task.task_type} to cluster"
        )
        submit_request = cluster_pb2.TaskSubmit.REQ(task=next_task)
        await self._stub.TaskSubmit(submit_request)
        self.debug(
            "task sent: "
            f"task_id={next_task.task_id} task_type={next_task.task_type} to cluster"
        )
task_resource_require(self, request)
async
¶
Acquire resources (ports) from the cluster.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `request` | `cluster_pb2.TaskResourceRequire.REQ` | resource request forwarded to the cluster manager | required |
Returns:
| Type | Description |
|---|---|
| `cluster_pb2.TaskResourceRequire.REP` | reply from the cluster manager |
Source code in fedvision/framework/master/master.py
async def task_resource_require(
    self, request: cluster_pb2.TaskResourceRequire.REQ
) -> cluster_pb2.TaskResourceRequire.REP:
    """
    Ask the cluster manager for resources (ports) for a task.

    Args:
        request: the ``TaskResourceRequire`` request, forwarded unchanged.

    Returns:
        The cluster manager's ``TaskResourceRequire`` reply.
    """
    rpc = self._stub.TaskResourceRequire
    return await rpc(request)
CoordinatorConnect
¶
Client that connects to the coordinator.
__init__(self, address, shared_status)
special
¶
Initialize the coordinator client.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `address` | `str` | address of the coordinator | required |
| `shared_status` | `_SharedStatus` | status shared with the other master components | required |
Source code in fedvision/framework/master/master.py
def __init__(self, address: str, shared_status: _SharedStatus):
    """
    Prepare a client for the coordinator without connecting yet.

    Args:
        address: coordinator address; ``start_coordinator_channel`` later
            passes it to ``grpc.aio.insecure_channel``.
        shared_status: status object shared with the rest of the master; it
            is also handed to the ``ProposalAcceptRule`` used by ``subscribe``.
    """
    self.address = address
    self.shared_status = shared_status
    self.accept_rule = ProposalAcceptRule(shared_status)
    # Both are created by start_coordinator_channel().
    self._channel = self._stub = None
coordinator_channel_ready(self)
async
¶
Wait until the channel to the coordinator is ready.
Source code in fedvision/framework/master/master.py
async def coordinator_channel_ready(self):
    """
    Block until the coordinator channel reports that it is ready.

    Returns:
        Whatever ``self._channel.channel_ready()`` resolves to.

    Must be called after ``start_coordinator_channel``; before that
    ``self._channel`` is ``None``.
    """
    channel = self._channel
    return await channel.channel_ready()
leave(self)
async
¶
Disconnect from the coordinator.
Source code in fedvision/framework/master/master.py
async def leave(self):
    """
    Tell the coordinator that this party is leaving.

    Returns:
        The coordinator's reply to the ``Leave`` RPC.
    """
    leave_request = coordinator_pb2.Leave.REQ(party_id=self.shared_status.party_id)
    return await self._stub.Leave(leave_request)
make_proposal(self, request)
async
¶
Publish a job proposal to the coordinator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `request` | `coordinator_pb2.Proposal.REQ` | request | required |
Returns:
| Type | Description |
|---|---|
| `coordinator_pb2.Proposal.REP` | response |
Source code in fedvision/framework/master/master.py
async def make_proposal(
    self, request: coordinator_pb2.Proposal.REQ
) -> coordinator_pb2.Proposal.REP:
    """
    Publish a job proposal to the coordinator.

    Args:
        request: proposal request, forwarded unchanged to the ``Proposal`` RPC.

    Returns:
        The coordinator's ``Proposal`` reply.
    """
    propose = self._stub.Proposal
    reply = await propose(request)
    return reply
start_coordinator_channel(self)
async
¶
start channel to coordinator
Source code in fedvision/framework/master/master.py
async def start_coordinator_channel(self):
    """
    Open an insecure gRPC channel to the coordinator and build its stub.

    The send and receive message limits are both raised to 512 MiB. Waiting
    for the connection is done separately via ``coordinator_channel_ready``.
    """
    self.info(f"start coordinator channel to {self.address}")
    size_limit = 512 * 1024 * 1024  # bytes (512 MiB)
    grpc_options = [
        ("grpc.max_send_message_length", size_limit),
        ("grpc.max_receive_message_length", size_limit),
    ]
    self._channel = grpc.aio.insecure_channel(self.address, options=grpc_options)
    self._stub = coordinator_pb2_grpc.CoordinatorStub(self._channel)
    self.info(f"coordinator channel started to {self.address}")
stop_coordinator_channel(self, grace=None)
async
¶
Stop the channel to the coordinator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `grace` | `Optional[float]` | seconds to wait for a graceful stop | `None` |
Source code in fedvision/framework/master/master.py
async def stop_coordinator_channel(self, grace: Optional[float] = None):
    """
    Close the channel to the coordinator.

    Args:
        grace: seconds to wait for a graceful stop; forwarded unchanged
            (including ``None``) to ``self._channel.close``.

    Does nothing if ``start_coordinator_channel`` has not created a channel
    yet, instead of failing with ``AttributeError`` on ``None``.
    """
    if self._channel is None:
        return
    self.info("stopping coordinator channel")
    await self._channel.close(grace)
    # Report the state we actually reached (the channel is now closed).
    self.info(f"coordinator channel to {self.address} stopped")
subscribe(self)
async
¶
Subscribe to the coordinator and accept proposals.
Source code in fedvision/framework/master/master.py
async def subscribe(self):
    """
    Subscribe to the coordinator and accept incoming job proposals.

    Sends a ``Subscribe`` request carrying this party's id and its supported
    job types, then iterates the response stream. For every proposal the
    accept rule approves, a background task fetches the matching task from
    the coordinator. If the fetch status is ``READY``, that task puts the
    fetched task on ``shared_status.cluster_task_queue``.

    The coroutine returns (ending the subscription) when a response status
    is not ``SUCCESS``, or when the accept rule rejects a proposal.
    """
    request = coordinator_pb2.Subscribe.REQ(party_id=self.shared_status.party_id)
    for job_type in self.shared_status.job_types:
        request.job_types.append(job_type)

    # The event loop keeps only weak references to tasks, so hold strong
    # references to in-flight acceptors until each one finishes.
    acceptors = set()

    async def _acceptor(proposal_id):
        # proposal_id is bound per call. Reading `response` from the closure
        # is wrong: the `async for` below rebinds it while this task is
        # suspended, so the log (or even the request) could use another
        # proposal's id.
        fetch_response = await self._stub.FetchTask(
            coordinator_pb2.FetchTask.REQ(
                party_id=self.shared_status.party_id,
                proposal_id=proposal_id,
            )
        )
        if fetch_response.status != coordinator_pb2.FetchTask.READY:
            self.debug(f"proposal {proposal_id} not ready: {fetch_response.status}")
            return
        # Hand the task to ClusterManagerConnect through the shared queue.
        await self.shared_status.cluster_task_queue.put(fetch_response.task)

    async for response in self._stub.Subscribe(request):
        if response.status != coordinator_pb2.Subscribe.SUCCESS:
            return
        # NOTE(review): a single rejected proposal ends the whole
        # subscription; confirm this is intended rather than `continue`.
        if not await self.accept_rule.accept(response.job_type):
            return
        task = asyncio.create_task(_acceptor(response.proposal_id))
        acceptors.add(task)
        task.add_done_callback(acceptors.discard)
Master
¶
__init__(self, party_id, coordinator_address, cluster_address, rest_port, rest_host=None)
special
¶
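Initialize the master.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `party_id` | `str` | identifier of this party | required |
| `coordinator_address` | `str` | address of the coordinator | required |
| `cluster_address` | `str` | address of the cluster manager | required |
| `rest_port` | `int` | port of the REST service | required |
| `rest_host` | `str` | host of the REST service | `None` |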
Source code in fedvision/framework/master/master.py
def __init__(
    self,
    party_id: str,
    coordinator_address: str,
    cluster_address: str,
    rest_port: int,
    rest_host: str = None,
):
    """
    Wire up the master's three components around one shared status object.

    Args:
        party_id: identifier of this party, stored in the shared status.
        coordinator_address: address of the coordinator service.
        cluster_address: address of the cluster manager service.
        rest_port: port of the REST service.
        rest_host: host of the REST service; ``None`` is passed through to
            ``RESTService`` unchanged.
    """
    status = _SharedStatus(party_id=party_id)
    self.shared_status = status
    # All three components see the same status object.
    self._coordinator = CoordinatorConnect(address=coordinator_address, shared_status=status)
    self._rest_site = RESTService(port=rest_port, host=rest_host, shared_status=status)
    self._cluster = ClusterManagerConnect(address=cluster_address, shared_status=status)
start(self)
async
¶
Start the master:
1. connect to the cluster manager so it can process tasks;
2. start the RESTful service that handles requests from users;
3. connect to the coordinator to reach `the world`.
Source code in fedvision/framework/master/master.py
async def start(self):
    """
    Start the master:
        1. cluster manager connection, which processes tasks
        2. RESTful service, which handles requests from users
        3. coordinator connection, which links this party to `the world`

    Each gRPC channel is polled until ready, with up to 5 seconds per attempt
    and a warning after every timed-out attempt. The long-running loops
    (task submission, subscription, submitted-job handling) are started as
    background tasks, and this coroutine returns once they are scheduled.
    """

    async def _wait_until_ready(probe, not_ready_msg, ready_msg):
        # Retry immediately after each 5-second timeout until probe() succeeds.
        while True:
            try:
                await asyncio.wait_for(probe(), 5)
            except asyncio.TimeoutError:
                self.warning(not_ready_msg)
            else:
                self.info(ready_msg)
                return

    # 1. cluster manager: connect, wait, then start draining the task queue
    await self._cluster.start_cluster_channel()
    await _wait_until_ready(
        lambda: self._cluster.cluster_channel_ready(),
        "cluster channel not ready, retry in 5 seconds",
        "cluster channel ready!",
    )
    asyncio.create_task(self._cluster.submit_tasks_to_cluster())

    # 2. REST site for user requests
    await self._rest_site.start_rest_site()

    # 3. coordinator: connect, wait, then subscribe to proposals
    await self._coordinator.start_coordinator_channel()
    await _wait_until_ready(
        lambda: self._coordinator.coordinator_channel_ready(),
        "coordinator channel not ready, retry in 5 seconds",
        "coordinator channel ready!",
    )
    asyncio.create_task(self._coordinator.subscribe())

    # Job loop: take jobs from the REST site, propose them to the coordinator,
    # and queue tasks for the cluster.
    asyncio.create_task(self._submitted_job_handler())
stop(self)
async
¶
stop master
Source code in fedvision/framework/master/master.py
async def stop(self):
    """
    Stop the master.

    Shuts down, in this order: the coordinator channel (1 second grace), the
    REST site, then the cluster manager channel (1 second grace).
    """
    shutdown_steps = (
        lambda: self._coordinator.stop_coordinator_channel(grace=1),
        lambda: self._rest_site.stop_rest_site(),
        lambda: self._cluster.stop_cluster_channel(grace=1),
    )
    for step in shutdown_steps:
        await step()
ProposalAcceptRule
¶
__eq__(self, other)
special
¶
Method generated by attrs for class ProposalAcceptRule.
Source code in fedvision/framework/master/master.py
def __eq__(self, other):
    """
    Compare two accept rules by their ``shared_status`` attribute.

    Returns ``NotImplemented`` unless ``other`` is exactly the same class.
    The value is wrapped in a 1-tuple, as attrs generates it, so the
    comparison follows tuple semantics (an identity check before ``==``).
    """
    if other.__class__ is self.__class__:
        return (self.shared_status,) == (other.shared_status,)
    return NotImplemented
__ge__(self, other)
special
¶
Method generated by attrs for class ProposalAcceptRule.
Source code in fedvision/framework/master/master.py
def __ge__(self, other):
    """
    Automatically created by attrs: ``self >= other``.

    Compares ``attrs_to_tuple(self)`` with ``attrs_to_tuple(other)``
    (presumably the tuples of attrs field values). Returns ``NotImplemented``
    when ``other`` is not exactly the same class.
    """
    if other.__class__ is not self.__class__:
        return NotImplemented
    return attrs_to_tuple(self) >= attrs_to_tuple(other)
__gt__(self, other)
special
¶
Method generated by attrs for class ProposalAcceptRule.
Source code in fedvision/framework/master/master.py
def __gt__(self, other):
    """
    Automatically created by attrs: ``self > other``.

    Compares ``attrs_to_tuple(self)`` with ``attrs_to_tuple(other)``
    (presumably the tuples of attrs field values). Returns ``NotImplemented``
    when ``other`` is not exactly the same class.
    """
    if other.__class__ is not self.__class__:
        return NotImplemented
    return attrs_to_tuple(self) > attrs_to_tuple(other)
__init__(self, shared_status)
special
¶
Method generated by attrs for class ProposalAcceptRule.
Source code in fedvision/framework/master/master.py
def __init__(self, shared_status):
    """
    Create an accept rule bound to the master's shared status.

    Args:
        shared_status: status object passed in by ``CoordinatorConnect``;
            presumably consulted by ``accept()`` (not shown here).
    """
    self.shared_status = shared_status
__le__(self, other)
special
¶
Method generated by attrs for class ProposalAcceptRule.
Source code in fedvision/framework/master/master.py
def __le__(self, other):
    """
    Automatically created by attrs: ``self <= other``.

    Compares ``attrs_to_tuple(self)`` with ``attrs_to_tuple(other)``
    (presumably the tuples of attrs field values). Returns ``NotImplemented``
    when ``other`` is not exactly the same class.
    """
    if other.__class__ is not self.__class__:
        return NotImplemented
    return attrs_to_tuple(self) <= attrs_to_tuple(other)
__lt__(self, other)
special
¶
Method generated by attrs for class ProposalAcceptRule.
Source code in fedvision/framework/master/master.py
def __lt__(self, other):
    """
    Automatically created by attrs: ``self < other``.

    Compares ``attrs_to_tuple(self)`` with ``attrs_to_tuple(other)``
    (presumably the tuples of attrs field values). Returns ``NotImplemented``
    when ``other`` is not exactly the same class.
    """
    if other.__class__ is not self.__class__:
        return NotImplemented
    return attrs_to_tuple(self) < attrs_to_tuple(other)
__ne__(self, other)
special
¶
Method generated by attrs for class ProposalAcceptRule.
Source code in fedvision/framework/master/master.py
def __ne__(self, other):
    """
    Negate ``__eq__``, passing ``NotImplemented`` through untouched so Python
    can still try the reflected comparison.
    """
    equal = self.__eq__(other)
    return NotImplemented if equal is NotImplemented else not equal
__repr__(self)
special
¶
Method generated by attrs for class ProposalAcceptRule.
Source code in fedvision/framework/master/master.py
def __repr__(self):
    """
    Automatically created by attrs.

    Return ``ClassName(name=value, ...)`` for the attributes listed in
    ``attr_names_with_reprs``. If this instance's repr is already being built
    further up the call stack (tracked by ``id`` in ``working_set``), return
    ``"..."`` instead, so self-referencing objects do not recurse forever.
    """
    # `_already_repring`, `ns`, `attr_names_with_reprs` and `NOTHING` are not
    # defined in this snippet; presumably they come from the attrs code
    # generator's namespace. TODO confirm against the attrs sources.
    try:
        working_set = _already_repring.working_set
    except AttributeError:
        # First repr on this holder object: create the tracking set lazily.
        working_set = set()
        _already_repring.working_set = working_set
    # Re-entered for an object whose repr is already in progress.
    if id(self) in working_set:
        return "..."
    real_cls = self.__class__
    if ns is None:
        qualname = getattr(real_cls, "__qualname__", None)
        if qualname is not None:
            # Keep only the part after the last ">." (drops e.g. "f.<locals>.").
            class_name = qualname.rsplit(">.", 1)[-1]
        else:
            class_name = real_cls.__name__
    else:
        # An explicit namespace prefix was supplied.
        class_name = ns + "." + real_cls.__name__
    # Since 'self' remains on the stack (i.e.: strongly referenced) for the
    # duration of this call, it's safe to depend on id(...) stability, and
    # not need to track the instance and therefore worry about properties
    # like weakref- or hash-ability.
    working_set.add(id(self))
    try:
        result = [class_name, "("]
        first = True
        for name, attr_repr in attr_names_with_reprs:
            # Separate fields with ", " except before the first one.
            if first:
                first = False
            else:
                result.append(", ")
            result.extend(
                (name, "=", attr_repr(getattr(self, name, NOTHING)))
            )
        return "".join(result) + ")"
    finally:
        # Always unmark, even if an attribute repr raised.
        working_set.remove(id(self))
RESTService
¶
Service that accepts RESTful requests from users.
__init__(self, shared_status, port, host=None)
special
¶
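Initialize the REST service instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `shared_status` | `_SharedStatus` | status shared with the other master components | required |
| `port` | `int` | port to listen on | required |
| `host` | `str` | host to bind to | `None` |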
Source code in fedvision/framework/master/master.py
def __init__(self, shared_status: _SharedStatus, port: int, host: str = None):
    """
    Configure the REST service; nothing is started until ``start_rest_site``.

    Args:
        shared_status: status object shared with the other master components.
        port: TCP port for the aiohttp site.
        host: host to bind; ``None`` is handed to ``web.TCPSite`` unchanged.
    """
    self.shared_status, self.port, self.host = shared_status, port, host
    # Populated by start_rest_site().
    self._site: Optional[web.TCPSite] = None
start_rest_site(self)
async
¶
Start the web service without blocking.
Source code in fedvision/framework/master/master.py
async def start_rest_site(self):
    """
    Start the aiohttp REST service without blocking the caller.

    Builds a ``web.Application`` with the routes from ``self._register_routes()``.
    Sets up an ``AppRunner`` that logs access through ``self.get_logger()``,
    then starts a ``TCPSite`` on ``self.host``/``self.port``. The site is kept
    in ``self._site``.

    NOTE(review): the ``AppRunner`` is not kept, so ``stop_rest_site`` can
    only stop the site and never calls ``runner.cleanup()``.
    """
    bind_label = f"{':' if self.host is None else self.host}:{self.port}"
    self.info(f"starting restful services at {bind_label}")
    app = web.Application()
    app.add_routes(self._register_routes())
    runner = web.AppRunner(app, access_log=self.get_logger())
    await runner.setup()
    site = web.TCPSite(runner=runner, host=self.host, port=self.port)
    self._site = site
    await site.start()
    self.info(f"restful services started at {bind_label}")
stop_rest_site(self)
async
¶
Stop the web service.
Source code in fedvision/framework/master/master.py
async def stop_rest_site(self):
    """
    Stop the web service if ``start_rest_site`` has created a site.
    """
    site = self._site
    if site is None:
        return
    await site.stop()