[ Avaa Bypassed ]




Upload:

Command:

www-data@3.144.229.52: ~ $
# Copyright 2019 Canonical, Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
import concurrent.futures
import logging


log = logging.getLogger("subiquitycore.async_helpers")


def _done(fut):
    """Done-callback that surfaces a finished future's failure.

    Asking ``fut`` for its result re-raises whatever exception it ended
    with, so the error escapes from this callback instead of staying
    hidden inside the future.  A cancelled future is not treated as an
    error and is ignored.
    """
    if fut.cancelled():
        # Nothing to report for a cancelled future.
        return
    try:
        fut.result()
    except asyncio.CancelledError:
        return


def schedule_task(coro, propagate_errors=True):
    """Hand ``coro`` to the event loop and return the resulting task.

    :param coro: a coroutine object, which is wrapped in an
        ``asyncio.Task``, or any other object, which is used as the task
        unchanged (it must then support ``add_done_callback``).
    :param propagate_errors: when true, ``_done`` is attached as a done
        callback so that a failure of the task is re-raised rather than
        left stored on it.
    :returns: the task object (the new ``asyncio.Task`` or ``coro``
        itself).
    """
    event_loop = asyncio.get_event_loop()
    task = asyncio.Task(coro) if asyncio.iscoroutine(coro) else coro
    if propagate_errors:
        task.add_done_callback(_done)
    # Pass the task through ensure_future on the next loop iteration.
    event_loop.call_soon(asyncio.ensure_future, task)
    return task


async def run_in_thread(func, *args):
    """Call ``func(*args)`` in the loop's default executor and await it.

    :param func: callable to run via ``run_in_executor``.
    :param args: positional arguments passed to ``func``.
    :returns: whatever ``func`` returns.
    :raises asyncio.CancelledError: in place of a
        ``concurrent.futures.CancelledError`` coming out of the executor
        call, so callers only need to handle the asyncio flavour.
    """
    event_loop = asyncio.get_event_loop()
    try:
        # Passing None selects the loop's default executor.
        outcome = await event_loop.run_in_executor(None, func, *args)
    except concurrent.futures.CancelledError:
        raise asyncio.CancelledError
    else:
        return outcome


class SingleInstanceTask:
    """Track the latest task produced by ``func``, replacing it on restart.

    Every call to ``start``/``start_sync`` invokes ``func`` to obtain a
    new task, stores it in ``self.task`` and arranges for the previously
    stored task to be cancelled.
    """

    def __init__(self, func, propagate_errors=True):
        """Remember the task factory; nothing is started yet.

        :param func: callable invoked with the arguments given to
            ``start``/``start_sync``.  If it returns a coroutine, that is
            wrapped in an ``asyncio.Task``; any other return value is
            used as the task unchanged.
        :param propagate_errors: forwarded to ``schedule_task`` when the
            task is scheduled.
        """
        self.func = func
        self.propagate_errors = propagate_errors
        # Most recently created task, or None before the first start.
        self.task = None

    async def _start(self, old):
        """Cancel ``old`` (if any), wait for it to finish, then schedule
        the task currently stored in ``self.task``."""
        if old is not None:
            old.cancel()
            try:
                await old
            except BaseException:
                # Deliberately best-effort: however the superseded task
                # ends, the replacement must still be scheduled.
                pass
        schedule_task(self.task, self.propagate_errors)

    async def start(self, *args, **kw):
        """Restart the task and return it.

        Awaits the helper task returned by ``start_sync``, i.e. returns
        only after the previous task has been cancelled and the new one
        has been passed to ``schedule_task``.
        """
        await self.start_sync(*args, **kw)
        return self.task

    def start_sync(self, *args, **kw):
        """Create the new task from ``func(*args, **kw)`` without awaiting.

        :returns: the task running ``_start``, which cancels the previous
            task and schedules the new one.
        """
        old = self.task
        coro = self.func(*args, **kw)
        if asyncio.iscoroutine(coro):
            self.task = asyncio.Task(coro)
        else:
            self.task = coro
        return schedule_task(self._start(old))

    async def wait(self):
        """Wait for the current task and return its result.

        If the awaited task is cancelled because a restart replaced it,
        waiting continues on the replacement.

        :raises asyncio.CancelledError: if the awaited task was cancelled
            and is still the current task.  Retrying in that case would
            fail again immediately without yielding to the event loop,
            so the cancellation is propagated instead.
        """
        while True:
            awaited = self.task
            try:
                return await awaited
            except asyncio.CancelledError:
                if self.task is awaited:
                    # No replacement was started: nothing left to wait
                    # for, so let the cancellation reach the caller.
                    raise

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
controllers Folder 0755
models Folder 0755
testing Folder 0755
tests Folder 0755
ui Folder 0755
__init__.py File 710 B 0644
async_helpers.py File 2.27 KB 0644
context.py File 4.42 KB 0644
controller.py File 1.68 KB 0644
controllerset.py File 1.9 KB 0644
core.py File 4.5 KB 0644
file_util.py File 1.67 KB 0644
i18n.py File 1.7 KB 0644
log.py File 1.8 KB 0644
lsb_release.py File 947 B 0644
netplan.py File 5.34 KB 0644
palette.py File 4.53 KB 0644
prober.py File 1.96 KB 0644
pubsub.py File 1.22 KB 0644
screen.py File 4.9 KB 0644
snapd.py File 6.1 KB 0644
ssh.py File 3.54 KB 0644
tui.py File 12.98 KB 0644
tuicontroller.py File 3.38 KB 0644
utils.py File 5.71 KB 0644
view.py File 3.35 KB 0644