РЕШЕНО! asyncio: выполнение задач + HTTPServer

freebsdd

Новичок
Пользователь
Авг 28, 2020
13
0
1
1. OS : Windows && ArchLinux
2. Python version 3.7.2
3.
Package Version
----------------------- ----------
certifi 2018.11.29
cffi 1.12.3
chardet 3.0.4
click 7.1.2
gevent 1.4.0
greenlet 0.4.15
idna 2.8
lxml 4.5.1
mysql-connector-python 8.0.13
nest-asyncio 1.4.0
parse 1.16.0
pip 20.2.2
psutil 5.7.0
pycparser 2.19
PyMySQL 0.9.3
requests 2.21.0
ru-address 0.1
setuptools 40.6.2
simple-websocket-server 0.4.0
six 1.12.0
urllib3 1.24.1
var-dump 1.2
websocket 0.2.1
websocket-client 0.56.0
websockets 8.1

4. код укажу в сообщении



Доброго дня Всем! Бьюсь не первый день,

Задача: сделать управление программой через HttpServer и не мешать её работе/продолжать выполнение while и запускать задачи
Кратко о коде: Есть run_mi():
- он с базы берёт настройки на выполнение всякой лабуды, например запустить то то в такое то время и прочее (config.getOptions()),
- далее запускается HttpServer для управления программой через GET,
- затем идёт бесконечный цикл while (вот тут и запускается всякая лабуда по расписанию), запускаются задачи асинхронно благодаря asyncio, чтобы это было в while есть "await asyncio.sleep(1)"
Проблема: при запуске HttpServer, стопарится на команде await asyncio.sleep(1) в цикле while. У меня есть предположение, что "HTTPServer((host, port), HandleRequests).serve_forever()", выполняет прослушку порта и висит на нём, не давая выполняться больше ничего в программе

Так вот сам вопрос, как решить задачу? У меня уже глаз мылится на этой теме. Как повесить HttServer чтобы слушать get и чтоб не стопарить выполнение программы?
Может другие есть альтернативные способы, через WebSocket, Socket не хотелось бы - НО рассматриваю как альтернативу, через общий файл не серьёзно. Есть любые идеи или предложения?
 
Последнее редактирование:

freebsdd

Новичок
Пользователь
Авг 28, 2020
13
0
1
Python:
import _functions
import config
import asyncio
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process, Queue
import multiprocessing, logging

import nest_asyncio
nest_asyncio.apply()

class GlobVar:
    options = dict()

from urllib.parse import urlparse
import urllib.parse
#import myurl
from http.server import BaseHTTPRequestHandler, HTTPServer # python3
class HandleRequests(BaseHTTPRequestHandler):
    def _set_headers(self):
##        self.send_response(200)
##        self.send_header('Content-type', 'application/json')
##        self.end_headers()
        pass

    def do_GET__old(self):
        self._set_headers()
        self.wfile.write("received get request")
        
    def do_GET(self):
##         self._set_headers()
##         print(self.headers)
##        content_len = int(self.headers.getheader('content-length', 0))
##        post_body = self.rfile.read(content_len)
##        print(post_body)

         print(self.path)

         #query = urlparse(self.path).query
         #parsed_query = parse_qs(query)
            
         #print(query)
         #print(parsed_query)

         self.send_response(200)
         self.send_header('content-type','application/json')
         self.end_headers()

         if self.path == '/':
            self.wfile.write(b"{ 0:false, 1:'Enter command', 'type':'error'}")
         else:
            if urllib.parse.urlparse(self.path).path == '/':
               # Обработка здесь уже, что там пришло

               q = urlparse(self.path)
               print(q)
              
               query = urllib.parse.urlparse(self.path).query
               __arr = (dict(urllib.parse.parse_qsl(query)))

               print(__arr)

               GlobVar.options[4]['__running'] = 5

               # print(myurl.parse(self.path))

               self.wfile.write(b"{'hello world!'}")
               self.wfile.write(b"{'hello world 2!'}")
               self.wfile.write(bytes(self.path, 'utf-8') )
            else:
               self.wfile.write(b"{'hello...!'}")
            
    def do_POST(self):
        '''Reads post request body'''
        self._set_headers()
        content_len = int(self.headers.getheader('content-length', 0))
        post_body = self.rfile.read(content_len)
        self.wfile.write("received post request:<br>{}".format(post_body))


async def new_proc_wss(arr):
    v = 7
    if v == 1:
       pass
    elif v == 7:
      print('start http...') 
      # Параллельно не работает, тормозит и пиздец
      host = ''
      port = 8002
      await HTTPServer((host, port), HandleRequests).serve_forever()

def sub_loop(arr):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(new_proc_wss(arr))

# v 12 (down)
import asyncore
import socket
class EchoHandler(asyncore.dispatcher_with_send):
    def handle_read(self):
        data = self.recv(1024)
        if data == "close":self.close()
        self.send(data)
 
class EchoServer(asyncore.dispatcher):
     def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(11)
    
    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            sock, addr = pair
            print ('conn', addr)
            handler = EchoHandler(sock)
 

freebsdd

Новичок
Пользователь
Авг 28, 2020
13
0
1
Python:
# async def run():

async def run(executor):
    print(' run mi()');
    GlobVar.options = config.getOptions()
    print('options:', GlobVar.options)
    # Запуск автозапуска
    # a_dict.keys()

    for key in GlobVar.options:
        arr = GlobVar.options[key]
        if arr['mo_mode'] == 1:

            if arr['mo_str'] == 'websocket':

                #_websocket2.create_websocket()

                #print('..1')

                number = 5

                #__name = '_websocket2'

                __name = '_websocket2_async'

                #if arr['mo_str_2'] != False:

                #   __name = arr['mo_str_2']

                # default MODE run() v = 4

                 

                v = 3

                if v == 0:

                   pass

                elif v == 10:

                   # ERROR: Cannot run the event loop while another loop is running

                   ## medicine - nest_asyncio.apply()

                   # NEW FUCK: не доходит до next

                   #

                   await asyncio.get_event_loop().run_in_executor(executor, sub_loop(arr))

                   print('next...')

                elif v == 11:

                   # MODE run() v = 5

                   # Стопарит дальнейшее выполнение и не доходит до finished

                   task_4 = new_proc_wss(arr)

                   # tasks_list = [task_1, task_2, task_3, task_4]

                   tasks_list = [task_4]

                   finished, unfinished = await asyncio.wait(tasks_list, loop=executor, return_when=asyncio.ALL_COMPLETED)

                   print(finished, unfinished)

                   for unfinished_task in unfinished:

                       unfinished_task.cancel()

                   print('finished:: ', finished)

                elif v == 12:

                   # НЕ доходит до NEXT

                   host = ''

                   port = 2222



                   server = EchoServer(host, port)

                   await asyncore.loop()

                   print('next...')



                elif v == 13:

                   pass



                elif v == 1:

                   proc = multiprocessing.Process(target=_websocket2_async.create_websocket, name=__name, args=(number,))

                   #proc = Process(target=child, args=())

                   #print('..2')

                   procs.append(proc)

                   proc.daemon=True

                   print('..3')

                   #try:

                   proc.start()

                   #except OSError as err:

                   #    print(err)

                   # m = multiprocessing.Manager()

                 

                   print('..4')

                   #proc.join()

                   print('..5, proc = ', proc, ', p.pid = ', proc.pid, ', is_alive = ', proc.is_alive)

                 

                elif v == 3:

                   # Работает, но стопарит дальнейшее выполнение, на await asyncio.sleep(1)

                 

                   print('load wss 8000')

                   # async

                   future = asyncio.ensure_future(new_proc_wss(arr))



                   def callback_wss(fut):

                       result = fut.result()

                       print('finish callback_wss')

                     

                   #asyncio.ensure_future(new_proc(arr)).add_done_callback(callback)

                   print(111)

                   future.add_done_callback(callback_wss)

                   print(222)



                elif v == 4:

                   # Запускается и закрывается процесс, но это новый процесс, общаться через межпроц взаимодействие - на самый крайний случай

                   print('v4')

                   proc = multiprocessing.Process(target=new_proc_wss, name=__name, args=(arr,))

                   #proc = Process(target=child, args=())

                   #print('..2')

                   #procs.append(proc)

                   #### proc.daemon=True

                   print('..3')

                   #try:

                   proc.start()

                   print('starting')



                elif v == 5:

                   # Запускает хттп и тормозит на этом, до next не доходит...

                   await new_proc_wss(arr)

                   print('next...')



                elif v == 6:

                   # Блокирует дальнейшее выполнеие asyncio, до next не доходит...

                   task = asyncio.create_task(new_proc_wss(arr))

                   print('next...')



                elif v == 7:

                   # Блокирует дальнейшее выполнеие, до next не доходит...

                   results = await asyncio.gather(new_proc_wss(arr))

                   print(results)

                   print('next...')



                elif v == 8:

                  # ERROR - Cannot run the event loop while another loop is running

                  loop = asyncio.new_event_loop()

                  # Each client connection will create a new protocol instance

                  coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)

                  server = loop.run_until_complete(coro)

                  # Serve requests until CTRL+c is pressed

                  print('Serving on {}'.format(server.sockets[0].getsockname()))

                  try:

                      loop.run_forever()

                  except KeyboardInterrupt:

                      pass

                elif v == 9:

                   # До NEXT не доходит

                   # https://docs.python.org/3.7/library/asyncio-eventloop.html#asyncio.Server.serve_forever

                   async def client_connected(reader, writer):

                      # Communicate with the client with

                      # reader/writer streams.  For example:

                      await reader.readline()

   

                   srv = await asyncio.start_server(

                       client_connected, '127.0.0.1', '8003')

                   await srv.serve_forever()

                   print('next...')



    # Запуск цикла

    boom=1

    while boom >0:

        print('iteration1...')

       

        await asyncio.sleep(1)

        # time.sleep(1)

       

        print('iteration2...')



        # Запуск всякой лабуды в указанное время



def run_mi():

   v = 4

   if v == 1:

       pass

   elif v == 4:

      loop = asyncio.ProactorEventLoop()

      asyncio.set_event_loop(loop)

      loop.run_until_complete(run(loop))

      loop.run_forever()

   elif v == 5:

      # https://stackoverflow.com/questions/38193596/asyncio-multiprocessing-unix

      executor = ProcessPoolExecutor()

      # executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)

      asyncio.get_event_loop().run_until_complete(run(executor))


if __name__ == '__main__':

   run_mi()
 
Последнее редактирование:

freebsdd

Новичок
Пользователь
Авг 28, 2020
13
0
1
Пишу Тебе о страждущий путник, что ищет ответы на свои вопросы потеряв свой сон в поисках решения на просторах инета:

Решил таки сделать многопоточностью, запускаю отдельным потоком ХттпСервер


Python:
def _create_httpserver(p):
    host = ''
    port = p
    HTTPServer((host, port), HandleRequests).serve_forever()

...........

t = threading.Thread(target=_create_httpserver, args=[8030)
t.start()

Три плюса:
0. Если навернулась основная программа (закрылась, вылетела, ушла на свидание и пр.) - то завершается и поток, а при многопроцессорности - процесс висит себе и в ус не дует, плюс межпроцессорное взаимодействие намного громозкее и нафик оно не нужно здесь
1. Проще
2. Да к тому же херня какая то твориться начала в отдельном процессе, заголовки не передавались, если ошибка не видна, в винде в журнале событий - ничего, в Линуксе не пробовал, и на странице ошибка, дескать данные не переданы


Т.ч. вопрос снят
 
Последнее редактирование:

freebsdd

Новичок
Пользователь
Авг 28, 2020
13
0
1
Кто нить знает как тему изменить, просто добавить в начале темы слово "РЕШЕНО", это для индексации в поисковиках для форума будет лучше, м?
 

freebsdd

Новичок
Пользователь
Авг 28, 2020
13
0
1
Изменил тему, вопрос снят. Если кто нить хочет порассуждать на эту тему, есть мысли какие, есть что добавить или поделиться опытом, ну или критика какая есть, рад буду продолжить диалог
 

Форум IT Специалистов