coordinator
Coordinator¶
¶
coordinator
¶
Coordinator
¶
__init__(self, port)
special
¶
init coordinator
Parameters:
Name | Type | Description | Default |
---|---|---|---|
port | int | coordinator serving port | required |
Source code in fedvision/framework/coordinator/coordinator.py
def __init__(self, port: int):
    """
    Create a coordinator that will serve gRPC on ``port``.

    Only in-memory bookkeeping is set up here; the gRPC server handle is
    initialised to ``None``.

    Args:
        port: coordinator serving port
    """
    # gRPC endpoint configuration; no server object exists yet.
    self._grpc_port = port
    self._grpc_server = None

    # Registries, all empty at start-up:
    #   party id    -> task provider of the enrolled party
    #   proposal id -> proposal
    #   job type    -> ids of the parties subscribed to that job type
    self._enrolled: MutableMapping[str, _TaskProviderForEnrolledParty] = {}
    self._proposals: MutableMapping[str, _Proposal] = {}
    self._job_type_to_subscribes: MutableMapping[str, MutableSet[str]] = {}

    # Subscribe refuses new parties while this flag is false.
    self._serving = True
    # Timeout (seconds) Subscribe uses for each wait on a party's queue.
    self._check_interval = 0.5
    # NOTE(review): presumably consumed by _generate_proposal_id -- confirm.
    self._count_id = 0
FetchTask(self, request, context)
async
¶
handle task fetch gRPC request
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request | coordinator_pb2.FetchTask.REQ | | required |
context | grpc.aio.ServicerContext | | required |
Returns:
Type | Description |
---|---|
coordinator_pb2.FetchTask.REP |
Source code in fedvision/framework/coordinator/coordinator.py
async def FetchTask(
    self, request: coordinator_pb2.FetchTask.REQ, context: grpc.aio.ServicerContext
) -> coordinator_pb2.FetchTask.REP:
    """
    handle task fetch gRPC request

    The calling party is registered as a responder of the proposal, then the
    call blocks until the proposal's open period is finished and reports
    whether this party has been given a task.

    Args:
        request: carries the ``proposal_id`` to fetch for and the caller's
            ``party_id``
        context: gRPC servicer context (not used here)

    Returns:
        a ``FetchTask.REP`` whose status is one of:
            NOT_FOUND:  unknown proposal id
            NOT_ALLOW:  the party is not enrolled
            TIMEOUT:    the open period had already finished on arrival
            CANCELED:   the open period finished without reaching the goal
            RANDOM_OUT: the goal was reached but this party was not chosen
            READY:      the party was chosen; ``task`` holds its task
    """
    proposal = self._proposals.get(request.proposal_id)
    if proposal is None:
        return coordinator_pb2.FetchTask.REP(
            status=coordinator_pb2.FetchTask.NOT_FOUND
        )

    if request.party_id not in self._enrolled:
        return coordinator_pb2.FetchTask.REP(
            status=coordinator_pb2.FetchTask.NOT_ALLOW
        )

    # too late: responders are no longer accepted for this proposal
    if proposal.open_period_finished.is_set():
        return coordinator_pb2.FetchTask.REP(
            status=coordinator_pb2.FetchTask.TIMEOUT
        )

    # register as responder, then wait until the Proposal handler closes
    # the open period
    proposal.add_responders(request.party_id)
    await proposal.open_period_finished.wait()

    if not proposal.goal_reached:
        return coordinator_pb2.FetchTask.REP(
            status=coordinator_pb2.FetchTask.CANCELED
        )

    self.info(f"chosen: {proposal.chosen.keys()}")
    if request.party_id not in proposal.chosen:
        return coordinator_pb2.FetchTask.REP(
            status=coordinator_pb2.FetchTask.RANDOM_OUT
        )

    # accepted, finally!
    return coordinator_pb2.FetchTask.REP(
        status=coordinator_pb2.FetchTask.READY,
        task=proposal.chosen[request.party_id],
    )
Leave(self, request, context)
async
¶
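Handle a leave gRPC request: mark the party's task provider as closed, wait until its proposal queue has been fully processed, then remove the party's enrollment. (There is no associated documentation comment in the .proto file.)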
Source code in fedvision/framework/coordinator/coordinator.py
async def Leave(self, request, context):
    """
    handle leave gRPC request

    Marks the party's task provider as closed, waits until every proposal
    queued for it has been marked done, and then forgets the party: both its
    enrollment and its job type subscriptions are removed, so later
    proposals neither count nor try to reach a party that has left.

    Args:
        request: carries the ``party_id`` of the leaving party
        context: gRPC servicer context (not used here)

    Returns:
        a ``Leave.REP`` with status NOT_FOUND when the party is not enrolled,
        SUCCESS otherwise
    """
    party_id = request.party_id
    provider = self._enrolled.get(party_id)
    if provider is None:
        return coordinator_pb2.Leave.REP(status=coordinator_pb2.Leave.NOT_FOUND)

    # tell the Subscribe stream of this party to stop, then wait for the
    # queue to be drained
    provider.closed = True
    await provider.queue.join()

    # While waiting, another Leave call may already have removed this
    # enrollment (and the party may even have enrolled again); only clean up
    # the enrollment this call actually closed.
    if self._enrolled.get(party_id) is provider:
        # Emptied sets are kept on purpose: the job type stays known and
        # proposals for it are answered with "not enough subscribers".
        for subscribers in self._job_type_to_subscribes.values():
            subscribers.discard(party_id)
        del self._enrolled[party_id]

    return coordinator_pb2.Leave.REP(status=coordinator_pb2.Leave.SUCCESS)
Proposal(self, request, context)
async
¶
handle job proposal gRPC request
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request | coordinator_pb2.Proposal.REQ | | required |
context | grpc.aio.ServicerContext | | required |
Returns:
Type | Description |
---|---|
coordinator_pb2.Proposal.REP |
Source code in fedvision/framework/coordinator/coordinator.py
async def Proposal(
    self, request: coordinator_pb2.Proposal.REQ, context: grpc.aio.ServicerContext
) -> coordinator_pb2.Proposal.REP:
    """
    handle job proposal gRPC request

    The proposal is validated against the current subscriptions, pushed to
    the queue of every enrolled subscriber of its job type, and kept open
    for ``request.proposal_wait_time`` seconds. Afterwards the open period
    is closed and the outcome is reported.

    Args:
        request: the proposal; ``job_id`` and ``proposal_wait_time`` are
            read here, the rest is parsed by ``_Proposal.from_pb``
        context: gRPC servicer context (not used here)

    Returns:
        a ``Proposal.REP`` whose status is one of:
            REJECT:                 nobody ever subscribed to the job type
            NOT_ENOUGH_SUBSCRIBERS: fewer enrolled subscribers than
                                    ``minimum_acceptance``
            NOT_ENOUGH_RESPONDERS:  too few parties responded in time
            SUCCESS:                the goal was reached
    """
    uid = self._generate_proposal_id(request.job_id)
    proposal = _Proposal.from_pb(uid=uid, pb=request)

    subscribers = self._job_type_to_subscribes.get(proposal.job_type)
    if subscribers is None:
        self.info(
            f"job type {proposal.job_type} not in {self._job_type_to_subscribes}, reject"
        )
        return coordinator_pb2.Proposal.REP(status=coordinator_pb2.Proposal.REJECT)

    # Resolve the task providers up front and ignore subscribers that are
    # no longer enrolled: this avoids a KeyError for parties that left, and
    # the snapshot stays valid even if subscriptions change while the
    # dispatch loop below awaits.
    providers = [
        self._enrolled[party_id]
        for party_id in subscribers
        if party_id in self._enrolled
    ]
    if len(providers) < proposal.minimum_acceptance:
        self.info(
            f"not enough parties alive accept job type {proposal.job_type},"
            f" required {proposal.minimum_acceptance}, "
            f"{len(providers)} alive"
        )
        return coordinator_pb2.Proposal.REP(
            status=coordinator_pb2.Proposal.NOT_ENOUGH_SUBSCRIBERS
        )

    # Register only proposals that are really dispatched; rejected ones
    # would otherwise stay in the registry without ever being announced.
    self._proposals[uid] = proposal

    # dispatch proposal
    for provider in providers:
        await provider.queue.put(proposal)

    # sleep until timeout and then check if there are enough responders
    await asyncio.sleep(request.proposal_wait_time)
    if not proposal.has_enough_responders():
        proposal.set_open_period_finished(goal_reached=False)
        return coordinator_pb2.Proposal.REP(
            status=coordinator_pb2.Proposal.NOT_ENOUGH_RESPONDERS
        )

    proposal.set_open_period_finished(goal_reached=True)
    return coordinator_pb2.Proposal.REP(status=coordinator_pb2.Proposal.SUCCESS)
Subscribe(self, request, context)
¶
Handle a subscribe gRPC request and respond with job proposals as a stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request | coordinator_pb2.Subscribe.REQ | | required |
context | grpc.aio.ServicerContext | | required |
Returns:
Type | Description |
---|---|
AsyncGenerator[coordinator_pb2.Subscribe.REP, None] |
Source code in fedvision/framework/coordinator/coordinator.py
async def Subscribe(
    self, request: coordinator_pb2.Subscribe.REQ, context: grpc.aio.ServicerContext
) -> AsyncGenerator[coordinator_pb2.Subscribe.REP, None]:
    """
    Handle a subscribe gRPC request and stream job proposals back.

    The party is enrolled, subscribed to every job type it lists, and then
    served from its own proposal queue until its task provider is flagged
    as closed.

    Args:
        request: carries the subscriber's ``party_id`` and ``job_types``
        context: gRPC servicer context (not used here)

    Returns:
        an async stream of ``Subscribe.REP``: a single DUPLICATE_ENROLL or
        NOT_SERVING reply when the subscription is refused, otherwise one
        SUCCESS reply (with ``proposal_id`` and ``job_type``) per proposal
    """
    party_id = request.party_id

    # refuse a second enrollment of the same party
    if party_id in self._enrolled:
        yield coordinator_pb2.Subscribe.REP(
            status=coordinator_pb2.Subscribe.DUPLICATE_ENROLL
        )
        return

    # refuse everybody while the coordinator is not serving
    if not self._serving:
        yield coordinator_pb2.Subscribe.REP(
            status=coordinator_pb2.Subscribe.NOT_SERVING
        )
        return

    # enroll the party and record its job type subscriptions
    task_provider = _TaskProviderForEnrolledParty()
    self._enrolled[party_id] = task_provider
    for job_type in request.job_types:
        subscribers = self._job_type_to_subscribes.setdefault(job_type, set())
        subscribers.add(party_id)

    while not task_provider.closed:
        try:
            # Wait with a timeout rather than forever, so that the closed
            # flag is re-checked regularly and the stream can be stopped.
            proposal = await asyncio.wait_for(
                task_provider.queue.get(), timeout=self._check_interval
            )
        except asyncio.TimeoutError:
            # Nothing arrived for a while: either there is simply no new
            # proposal, or the provider has been closed and nothing more
            # will be queued. The loop condition tells the two apart.
            continue

        reply = coordinator_pb2.Subscribe.REP(
            status=coordinator_pb2.Subscribe.SUCCESS,
            proposal_id=proposal.uid,
            job_type=proposal.job_type,
        )
        yield reply
        task_provider.queue.task_done()

    # Drop the proposals that are still queued, marking each one as done.
    while not task_provider.queue.empty():
        await task_provider.queue.get()
        task_provider.queue.task_done()