Skip to content

coordinator

Coordinator

coordinator

Coordinator

__init__(self, port) special

init coordinator

Parameters:

Name Type Description Default
port int

coordinator serving port

required
Source code in fedvision/framework/coordinator/coordinator.py
def __init__(self, port: int):
    """
    init coordinator

    Args:
        port: coordinator serving port
    """
    self._serving = True
    self._enrolled: MutableMapping[str, _TaskProviderForEnrolledParty] = {}
    self._proposals: MutableMapping[str, _Proposal] = {}
    self._job_type_to_subscribes: MutableMapping[str, MutableSet[str]] = {}
    self._check_interval = 0.5
    self._count_id = 0

    self._grpc_port = port
    self._grpc_server = None

FetchTask(self, request, context) async

handle task fetch gRPC request

Parameters:

Name Type Description Default
request coordinator_pb2.FetchTask.REQ required
context grpc.aio.ServicerContext required

Returns:

Type Description
coordinator_pb2.Proposal.REP
Source code in fedvision/framework/coordinator/coordinator.py
async def FetchTask(
    self, request: coordinator_pb2.FetchTask.REQ, context: grpc.aio.ServicerContext
) -> coordinator_pb2.Proposal.REP:
    """
    handle task fetch gRPC request

    Args:
        request:
        context:

    Returns:

    """

    if request.proposal_id not in self._proposals:
        return coordinator_pb2.FetchTask.REP(
            status=coordinator_pb2.FetchTask.NOT_FOUND
        )

    if request.party_id not in self._enrolled:
        return coordinator_pb2.FetchTask.REP(
            status=coordinator_pb2.FetchTask.NOT_ALLOW
        )

    proposal = self._proposals[request.proposal_id]

    if proposal.open_period_finished.is_set():
        return coordinator_pb2.FetchTask.REP(
            status=coordinator_pb2.FetchTask.TIMEOUT
        )

    proposal.add_responders(request.party_id)
    await proposal.open_period_finished.wait()

    if not proposal.goal_reached:
        return coordinator_pb2.FetchTask.REP(
            status=coordinator_pb2.FetchTask.CANCELED
        )

    self.info(f"chosen: {proposal.chosen.keys()}")
    if request.party_id not in proposal.chosen:
        return coordinator_pb2.FetchTask.REP(
            status=coordinator_pb2.FetchTask.RANDOM_OUT
        )

    # accepted, finally!
    success_rep = coordinator_pb2.FetchTask.REP(
        status=coordinator_pb2.FetchTask.READY,
        task=proposal.chosen[request.party_id],
    )
    return success_rep

Leave(self, request, context) async

Missing associated documentation comment in .proto file.

Source code in fedvision/framework/coordinator/coordinator.py
async def Leave(self, request, context):
    if request.party_id not in self._enrolled:
        return coordinator_pb2.Leave.REP(status=coordinator_pb2.Leave.NOT_FOUND)
    self._enrolled[request.party_id].closed = True
    await self._enrolled[request.party_id].queue.join()
    self._enrolled.__delitem__(request.party_id)
    return coordinator_pb2.Leave.REP(status=coordinator_pb2.Leave.SUCCESS)

Proposal(self, request, context) async

handle job proposal gRPC request

Parameters:

Name Type Description Default
request coordinator_pb2.Proposal.REQ required
context grpc.aio.ServicerContext required

Returns:

Type Description
coordinator_pb2.Proposal.REP
Source code in fedvision/framework/coordinator/coordinator.py
async def Proposal(
    self, request: coordinator_pb2.Proposal.REQ, context: grpc.aio.ServicerContext
) -> coordinator_pb2.Proposal.REP:
    """
    handle job proposal gRPC request

    Args:
        request:
        context:

    Returns:

    """

    uid = self._generate_proposal_id(request.job_id)
    proposal = _Proposal.from_pb(uid=uid, pb=request)
    self._proposals[uid] = proposal

    if proposal.job_type not in self._job_type_to_subscribes:
        self.info(
            f"job type {proposal.job_type} not in {self._job_type_to_subscribes}, reject"
        )
        return coordinator_pb2.Proposal.REP(status=coordinator_pb2.Proposal.REJECT)

    if (
        len(self._job_type_to_subscribes[proposal.job_type])
        < proposal.minimum_acceptance
    ):
        self.info(
            f"not enough parties alive accept job type {proposal.job_type},"
            f" required {proposal.minimum_acceptance}, "
            f"{len(self._job_type_to_subscribes[proposal.job_type])} alive"
        )
        return coordinator_pb2.Proposal.REP(
            status=coordinator_pb2.Proposal.NOT_ENOUGH_SUBSCRIBERS
        )

    # dispatch proposal
    for party_id in self._job_type_to_subscribes[proposal.job_type]:
        await self._enrolled[party_id].queue.put(proposal)

    # sleep until timeout and then check if there are enough responders
    await asyncio.sleep(request.proposal_wait_time)
    if not proposal.has_enough_responders():
        proposal.set_open_period_finished(goal_reached=False)
        return coordinator_pb2.Proposal.REP(
            status=coordinator_pb2.Proposal.NOT_ENOUGH_RESPONDERS
        )

    proposal.set_open_period_finished(goal_reached=True)
    return coordinator_pb2.Proposal.REP(status=coordinator_pb2.Proposal.SUCCESS)

Subscribe(self, request, context)

handle subscribe gRPC request, response job proposals in stream

Parameters:

Name Type Description Default
request coordinator_pb2.Subscribe.REQ required
context grpc.aio.ServicerContext required

Returns:

Type Description
AsyncGenerator[coordinator_pb2.Subscribe.REP, None]
Source code in fedvision/framework/coordinator/coordinator.py
async def Subscribe(
    self, request: coordinator_pb2.Subscribe.REQ, context: grpc.aio.ServicerContext
) -> AsyncGenerator[coordinator_pb2.Subscribe.REP, None]:
    """
    handle subscribe gRPC request, response job proposals in stream

    Args:
        request:
        context:

    Returns:

    """
    if request.party_id in self._enrolled:
        yield coordinator_pb2.Subscribe.REP(
            status=coordinator_pb2.Subscribe.DUPLICATE_ENROLL
        )
        return
    if not self._serving:
        yield coordinator_pb2.Subscribe.REP(
            status=coordinator_pb2.Subscribe.NOT_SERVING
        )
        return

    task_provider = _TaskProviderForEnrolledParty()
    self._enrolled[request.party_id] = task_provider

    for job_type in request.job_types:
        self._job_type_to_subscribes.setdefault(job_type, set()).add(
            request.party_id
        )

    while True:
        if task_provider.closed:
            break

        try:
            # make stop subscribe passable, check status regularly
            proposal = await asyncio.wait_for(
                task_provider.queue.get(), timeout=self._check_interval
            )
        except asyncio.TimeoutError:
            # not receive proposal task for a while, maybe:
            # 1. just no new proposal
            # 2. flag has changed to false and no new proposal will in-queue
            continue
        else:
            yield coordinator_pb2.Subscribe.REP(
                status=coordinator_pb2.Subscribe.SUCCESS,
                proposal_id=proposal.uid,
                job_type=proposal.job_type,
            )
            task_provider.queue.task_done()

    # clean proposals
    while not task_provider.queue.empty():
        await task_provider.queue.get()
        task_provider.queue.task_done()