Skip to content

master

Coordinator

ClusterManagerConnect

cluster manager client

__init__(self, address, shared_status) special

init cluster manager client

Parameters:

Name Type Description Default
address required
shared_status _SharedStatus required
Source code in fedvision/framework/master/master.py
def __init__(self, address, shared_status: _SharedStatus):
    """
    Create a cluster manager client without opening any connection.

    Args:
        address: target address of the cluster manager; used later by
            ``start_cluster_channel`` to open the gRPC channel.
        shared_status: status object shared with the rest of the master.
    """
    self.address, self.shared_status = address, shared_status
    # Both are created by start_cluster_channel(); None until then.
    self._channel: Optional[grpc.aio.Channel] = None
    self._stub: Optional[cluster_pb2_grpc.ClusterManagerStub] = None

cluster_channel_ready(self) async

Wait until the channel is ready.

Source code in fedvision/framework/master/master.py
async def cluster_channel_ready(self):
    """
    Block until the gRPC channel to the cluster manager reports ready.

    Returns:
        whatever the underlying channel's ``channel_ready()`` returns.
    """
    channel = self._channel
    return await channel.channel_ready()

start_cluster_channel(self) async

start channel to cluster manager

Source code in fedvision/framework/master/master.py
async def start_cluster_channel(self):
    """
    Open an insecure gRPC channel to the cluster manager and build its stub.

    The send and receive message size limits are raised to 512 MiB. This only
    creates the channel object; readiness is awaited separately through
    ``cluster_channel_ready``.
    """
    max_message_bytes = 512 * 1024 * 1024
    channel_options = [
        ("grpc.max_send_message_length", max_message_bytes),
        ("grpc.max_receive_message_length", max_message_bytes),
    ]
    self.info(f"start cluster channel to {self.address}")
    self._channel = grpc.aio.insecure_channel(self.address, options=channel_options)
    self._stub = cluster_pb2_grpc.ClusterManagerStub(self._channel)
    self.info(f"cluster channel started to {self.address}")

stop_cluster_channel(self, grace=None) async

stop channel to cluster manager

Parameters:

Name Type Description Default
grace Optional[float] None
Source code in fedvision/framework/master/master.py
async def stop_cluster_channel(self, grace: Optional[float] = None):
    """
    Close the gRPC channel to the cluster manager.

    If the channel was never started, this does nothing.

    Args:
        grace: seconds to wait before the channel is closed; forwarded
            unchanged to the channel's ``close``.
    """
    if self._channel is None:
        # start_cluster_channel() was never reached (e.g. startup failed
        # early), so there is nothing to close.
        return
    self.info("stopping cluster channel")
    await self._channel.close(grace)
    self.info(f"cluster channel to {self.address} stopped")

submit_tasks_to_cluster(self) async

Infinite loop that takes tasks from the queue and submits them to the cluster.

Source code in fedvision/framework/master/master.py
async def submit_tasks_to_cluster(self):
    """
    Loop forever: take tasks from ``shared_status.cluster_task_queue`` and
    submit each one to the cluster manager with the ``TaskSubmit`` RPC.

    If a submission fails with ``grpc.aio.AioRpcError``, the failure is logged,
    that task is dropped, and the loop keeps going. One failed RPC no longer
    ends every later submission.
    """
    while True:
        task = await self.shared_status.cluster_task_queue.get()
        self.debug(
            f"task sending: task_id={task.task_id} task_type={task.task_type} to cluster"
        )
        try:
            await self._stub.TaskSubmit(cluster_pb2.TaskSubmit.REQ(task=task))
        except grpc.aio.AioRpcError as e:
            # This coroutine runs as an unawaited background task, so an
            # escaping error would end the loop with nothing observing it.
            self.warning(
                f"task submit failed: task_id={task.task_id} task_type={task.task_type}: "
                f"{e.code()} {e.details()}"
            )
            continue
        self.debug(
            f"task sent: task_id={task.task_id} task_type={task.task_type} to cluster"
        )

task_resource_require(self, request) async

Acquire resources (ports) from the cluster.

Parameters:

Name Type Description Default
request cluster_pb2.TaskResourceRequire.REQ required

Returns:

Type Description
cluster_pb2.TaskResourceRequire.REP
Source code in fedvision/framework/master/master.py
async def task_resource_require(
    self, request: cluster_pb2.TaskResourceRequire.REQ
) -> cluster_pb2.TaskResourceRequire.REP:
    """
    Ask the cluster manager to allocate resources (ports) for a task.

    Args:
        request: the resource requirement, forwarded unchanged to the
            ``TaskResourceRequire`` RPC.

    Returns:
        the cluster manager's reply to that RPC.
    """
    return await self._stub.TaskResourceRequire(request)

CoordinatorConnect

client connects to coordinator

__init__(self, address, shared_status) special

init coordinator client

Parameters:

Name Type Description Default
address str str required
shared_status _SharedStatus required
Source code in fedvision/framework/master/master.py
def __init__(self, address: str, shared_status: _SharedStatus):
    """
    Create a coordinator client without opening any connection.

    Args:
        address: address of the coordinator service.
        shared_status: status object shared with the rest of the master; also
            handed to the ``ProposalAcceptRule`` built here.
    """
    self.address, self.shared_status = address, shared_status
    # Decides which subscribed proposals are accepted (see subscribe()).
    self.accept_rule = ProposalAcceptRule(self.shared_status)

    # Both are created by start_coordinator_channel(); None until then.
    self._channel = self._stub = None

coordinator_channel_ready(self) async

Wait until the channel is ready.

Source code in fedvision/framework/master/master.py
async def coordinator_channel_ready(self):
    """
    Block until the gRPC channel to the coordinator reports ready.

    Returns:
        whatever the underlying channel's ``channel_ready()`` returns.
    """
    channel = self._channel
    return await channel.channel_ready()

leave(self) async

Disconnect from the coordinator.

Source code in fedvision/framework/master/master.py
async def leave(self):
    """
    Tell the coordinator that this party is leaving.

    Returns:
        the coordinator's reply to the ``Leave`` RPC.
    """
    leave_request = coordinator_pb2.Leave.REQ(party_id=self.shared_status.party_id)
    return await self._stub.Leave(leave_request)

make_proposal(self, request) async

publish a job proposal to coordinator

Parameters:

Name Type Description Default
request coordinator_pb2.Proposal.REQ request required

Returns:

Type Description
coordinator_pb2.Proposal.REP response

Source code in fedvision/framework/master/master.py
async def make_proposal(
    self, request: coordinator_pb2.Proposal.REQ
) -> coordinator_pb2.Proposal.REP:
    """
    Publish a job proposal to the coordinator.

    Args:
        request: the proposal, forwarded unchanged to the ``Proposal`` RPC.

    Returns:
        the coordinator's reply.
    """
    proposal_rpc = self._stub.Proposal
    return await proposal_rpc(request)

start_coordinator_channel(self) async

start channel to coordinator

Source code in fedvision/framework/master/master.py
async def start_coordinator_channel(self):
    """
    Open an insecure gRPC channel to the coordinator and build its stub.

    The send and receive message size limits are raised to 512 MiB. This only
    creates the channel object; readiness is awaited separately through
    ``coordinator_channel_ready``.
    """
    max_message_bytes = 512 * 1024 * 1024
    channel_options = [
        ("grpc.max_send_message_length", max_message_bytes),
        ("grpc.max_receive_message_length", max_message_bytes),
    ]
    self.info(f"start coordinator channel to {self.address}")
    self._channel = grpc.aio.insecure_channel(self.address, options=channel_options)
    self._stub = coordinator_pb2_grpc.CoordinatorStub(self._channel)
    self.info(f"coordinator channel started to {self.address}")

stop_coordinator_channel(self, grace=None) async

stop channel

Parameters:

Name Type Description Default
grace Optional[float] wait seconds to gracefully stop None
Source code in fedvision/framework/master/master.py
async def stop_coordinator_channel(self, grace: Optional[float] = None):
    """
    Close the gRPC channel to the coordinator.

    If the channel was never started, this does nothing.

    Args:
        grace:
            wait seconds to gracefully stop; forwarded unchanged to the
            channel's ``close``.
    """
    if self._channel is None:
        # start_coordinator_channel() was never reached (e.g. startup failed
        # before it), so there is nothing to close.
        return
    self.info("stopping coordinator channel")
    await self._channel.close(grace)
    self.info(f"coordinator channel to {self.address} stopped")

subscribe(self) async

Subscribe to the coordinator and accept proposals.

Source code in fedvision/framework/master/master.py
async def subscribe(self):
    """
    Subscribe to the coordinator and fetch the tasks of accepted `proposals`.

    Sends a ``Subscribe`` request with this party's id and job types, then
    iterates over the streamed responses. For each proposal that
    ``accept_rule`` accepts, a background task calls ``FetchTask``. If the
    fetched task is ``READY``, that background task puts it into
    ``shared_status.cluster_task_queue``.

    Returns (ending the subscription) when a response status is not
    ``SUCCESS``, when ``accept_rule`` rejects a proposal, or when the stream
    ends.
    """
    request = coordinator_pb2.Subscribe.REQ(party_id=self.shared_status.party_id)
    for job_type in self.shared_status.job_types:
        request.job_types.append(job_type)

    # Strong references to in-flight acceptor tasks. The event loop keeps only
    # weak references to tasks, so an unreferenced task may be collected
    # before it finishes.
    acceptor_tasks = set()

    async def _acceptor(proposal_id):
        # proposal_id is passed in explicitly. Reading the loop variable
        # `response` here would pick up whatever response the loop has
        # advanced to by the time this coroutine resumes after the await.
        fetch_response = await self._stub.FetchTask(
            coordinator_pb2.FetchTask.REQ(
                party_id=self.shared_status.party_id,
                proposal_id=proposal_id,
            )
        )
        if fetch_response.status != coordinator_pb2.FetchTask.READY:
            self.debug(f"proposal {proposal_id} not ready: {fetch_response.status}")
            return

        # put task in cluster task queue
        await self.shared_status.cluster_task_queue.put(fetch_response.task)

    async for response in self._stub.Subscribe(request):
        if response.status != coordinator_pb2.Subscribe.SUCCESS:
            return
        if not await self.accept_rule.accept(response.job_type):
            # NOTE(review): rejecting one proposal ends the whole subscription;
            # `continue` may have been intended. Confirm before changing.
            return

        task = asyncio.create_task(_acceptor(response.proposal_id))
        acceptor_tasks.add(task)
        task.add_done_callback(acceptor_tasks.discard)

Master

__init__(self, party_id, coordinator_address, cluster_address, rest_port, rest_host=None) special

init master

Parameters:

Name Type Description Default
party_id str required
coordinator_address str required
cluster_address str required
rest_port int required
rest_host str None
Source code in fedvision/framework/master/master.py
def __init__(
    self,
    party_id: str,
    coordinator_address: str,
    cluster_address: str,
    rest_port: int,
    rest_host: str = None,
):
    """
    Build the master and its three components; nothing is started here.

    Args:
        party_id: id of this party, stored in the shared status.
        coordinator_address: address of the coordinator service.
        cluster_address: address of the cluster manager service.
        rest_port: port for the RESTful service.
        rest_host: host for the RESTful service; passed to RESTService as is.
    """
    status = _SharedStatus(party_id=party_id)
    self.shared_status = status
    # All components share one status object.
    self._coordinator = CoordinatorConnect(address=coordinator_address, shared_status=status)
    self._rest_site = RESTService(port=rest_port, host=rest_host, shared_status=status)
    self._cluster = ClusterManagerConnect(address=cluster_address, shared_status=status)

start(self) async

start master:

1. cluster manager to process tasks
2. RESTful service to handle requests from users
3. coordinator to connect to `the world`
Source code in fedvision/framework/master/master.py
async def start(self):
    """
    Start the master, in this order:

        1. cluster manager client: open the channel, wait until it is ready,
           then run ``submit_tasks_to_cluster`` in the background;
        2. RESTful service that handles requests from users;
        3. coordinator client: open the channel, wait until it is ready,
           then run ``subscribe`` in the background.

    Finally ``_submitted_job_handler`` is started in the background. That job
    loop gets jobs from the rest site, makes proposals to the coordinator, and
    queues tasks for the cluster. Background tasks are scheduled, not awaited.
    """

    async def _wait_until_ready(channel_name, ready):
        # Each attempt waits up to 5 seconds; on timeout, warn and try again.
        while True:
            try:
                await asyncio.wait_for(ready(), 5)
            except asyncio.TimeoutError:
                self.warning(f"{channel_name} channel not ready, retry in 5 seconds")
                continue
            self.info(f"{channel_name} channel ready!")
            return

    # 1. cluster
    await self._cluster.start_cluster_channel()
    await _wait_until_ready("cluster", self._cluster.cluster_channel_ready)
    asyncio.create_task(self._cluster.submit_tasks_to_cluster())

    # 2. rest site
    await self._rest_site.start_rest_site()

    # 3. coordinator
    await self._coordinator.start_coordinator_channel()
    await _wait_until_ready("coordinator", self._coordinator.coordinator_channel_ready)
    asyncio.create_task(self._coordinator.subscribe())

    # job process loop
    asyncio.create_task(self._submitted_job_handler())

stop(self) async

stop master

Source code in fedvision/framework/master/master.py
async def stop(self):
    """
    Stop the master: coordinator channel, then RESTful service, then cluster
    channel. Both channels are closed with a 1 second grace period.

    Every component is stopped even if an earlier one raises. If any step
    raises, the last raised exception propagates; earlier ones are attached
    as its ``__context__``.
    """
    try:
        await self._coordinator.stop_coordinator_channel(grace=1)
    finally:
        try:
            await self._rest_site.stop_rest_site()
        finally:
            await self._cluster.stop_cluster_channel(grace=1)

ProposalAcceptRule

__eq__(self, other) special

Method generated by attrs for class ProposalAcceptRule.

Source code in fedvision/framework/master/master.py
def __eq__(self, other):
    """
    Equality generated by attrs: instances of exactly the same class compare
    by their ``shared_status``; any other class gives NotImplemented.
    """
    if other.__class__ is not self.__class__:
        return NotImplemented
    mine = (self.shared_status,)
    theirs = (other.shared_status,)
    return mine == theirs

__ge__(self, other) special

Method generated by attrs for class ProposalAcceptRule.

Source code in fedvision/framework/master/master.py
def __ge__(self, other):
    """
    ``>=`` generated by attrs: compares attribute tuples of same-class
    instances, otherwise returns NotImplemented.
    """
    if other.__class__ is not self.__class__:
        return NotImplemented
    return attrs_to_tuple(self) >= attrs_to_tuple(other)

__gt__(self, other) special

Method generated by attrs for class ProposalAcceptRule.

Source code in fedvision/framework/master/master.py
def __gt__(self, other):
    """
    ``>`` generated by attrs: compares attribute tuples of same-class
    instances, otherwise returns NotImplemented.
    """
    if other.__class__ is not self.__class__:
        return NotImplemented
    return attrs_to_tuple(self) > attrs_to_tuple(other)

__init__(self, shared_status) special

Method generated by attrs for class ProposalAcceptRule.

Source code in fedvision/framework/master/master.py
def __init__(self, shared_status):
    """Initializer generated by attrs: store ``shared_status`` on the instance."""
    self.shared_status = shared_status

__le__(self, other) special

Method generated by attrs for class ProposalAcceptRule.

Source code in fedvision/framework/master/master.py
def __le__(self, other):
    """
    ``<=`` generated by attrs: compares attribute tuples of same-class
    instances, otherwise returns NotImplemented.
    """
    if other.__class__ is not self.__class__:
        return NotImplemented
    return attrs_to_tuple(self) <= attrs_to_tuple(other)

__lt__(self, other) special

Method generated by attrs for class ProposalAcceptRule.

Source code in fedvision/framework/master/master.py
def __lt__(self, other):
    """
    ``<`` generated by attrs: compares attribute tuples of same-class
    instances, otherwise returns NotImplemented.
    """
    if other.__class__ is not self.__class__:
        return NotImplemented
    return attrs_to_tuple(self) < attrs_to_tuple(other)

__ne__(self, other) special

Method generated by attrs for class ProposalAcceptRule.

Source code in fedvision/framework/master/master.py
def __ne__(self, other):
    """
    Inequality generated by attrs: the negation of ``__eq__``, with
    NotImplemented passed through unchanged.
    """
    equal = self.__eq__(other)
    return NotImplemented if equal is NotImplemented else not equal

__repr__(self) special

Method generated by attrs for class ProposalAcceptRule.

Source code in fedvision/framework/master/master.py
def __repr__(self):
    """
    Automatically created by attrs.

    Builds ``ClassName(name=value, ...)`` from ``attr_names_with_reprs``.
    If this instance is already being repr'd further up the call stack,
    returns ``"..."`` instead, so self-referencing structures do not recurse
    forever.

    NOTE(review): ``_already_repring``, ``ns``, ``attr_names_with_reprs`` and
    ``NOTHING`` are free names supplied by attrs' code generation, not
    defined in this file; this is a rendering of generated code.
    """
    try:
        working_set = _already_repring.working_set
    except AttributeError:
        # First repr through this holder (presumably thread-local - confirm
        # against attrs): create the set of ids currently being repr'd.
        working_set = set()
        _already_repring.working_set = working_set

    if id(self) in working_set:
        # Re-entered while this same object is already being repr'd.
        return "..."
    real_cls = self.__class__
    if ns is None:
        qualname = getattr(real_cls, "__qualname__", None)
        if qualname is not None:
            # Keep only the part after the last ">." (drops e.g. "<locals>.").
            class_name = qualname.rsplit(">.", 1)[-1]
        else:
            class_name = real_cls.__name__
    else:
        class_name = ns + "." + real_cls.__name__

    # Since 'self' remains on the stack (i.e.: strongly referenced) for the
    # duration of this call, it's safe to depend on id(...) stability, and
    # not need to track the instance and therefore worry about properties
    # like weakref- or hash-ability.
    working_set.add(id(self))
    try:
        result = [class_name, "("]
        first = True
        for name, attr_repr in attr_names_with_reprs:
            # Separator goes before every field except the first.
            if first:
                first = False
            else:
                result.append(", ")
            # Missing attributes are rendered via the NOTHING sentinel.
            result.extend(
                (name, "=", attr_repr(getattr(self, name, NOTHING)))
            )
        return "".join(result) + ")"
    finally:
        # Always unmark, even if an attribute's repr raised.
        working_set.remove(id(self))

RESTService

Service that accepts RESTful requests from users.

__init__(self, shared_status, port, host=None) special

init rest services instance

Parameters:

Name Type Description Default
shared_status _SharedStatus required
port int required
host str None
Source code in fedvision/framework/master/master.py
def __init__(self, shared_status: _SharedStatus, port: int, host: str = None):
    """
    Configure the RESTful service without starting it.

    Args:
        shared_status: status object shared with the rest of the master.
        port: TCP port to listen on.
        host: host to bind; ``None`` is passed straight to ``web.TCPSite``.
    """
    self.shared_status, self.port, self.host = shared_status, port, host

    # Set by start_rest_site(); stop_rest_site() does nothing while it is None.
    self._site: Optional[web.TCPSite] = None

start_rest_site(self) async

Start the web service without blocking.

Source code in fedvision/framework/master/master.py
async def start_rest_site(self):
    """
    Start the aiohttp web service and return without blocking.

    Registers the routes from ``_register_routes``, uses this object's logger
    as the access log, and starts a ``web.TCPSite`` on ``host``/``port``. The
    site is kept in ``self._site`` so ``stop_rest_site`` can stop it.
    """
    location = f"{':' if self.host is None else self.host}:{self.port}"
    self.info(f"starting restful services at {location}")

    application = web.Application()
    application.add_routes(self._register_routes())
    app_runner = web.AppRunner(application, access_log=self.get_logger())
    await app_runner.setup()

    self._site = web.TCPSite(runner=app_runner, host=self.host, port=self.port)
    await self._site.start()
    self.info(f"restful services started at  {location}")

stop_rest_site(self) async

stop web service

Source code in fedvision/framework/master/master.py
async def stop_rest_site(self):
    """
    Stop the web service if ``start_rest_site`` created a site; otherwise
    do nothing.
    """
    site = self._site
    if site is None:
        return
    await site.stop()