decoder_multi.py 5.9 KB


  1. __all__= ['decode']
  2. import concurrent.futures as cf
  3. import threading
  4. import mllp_grpc.asr_pb2 as asr_pb2
  5. import mllp_grpc.asr_common_pb2 as asr_common_pb2
  6. from server_lib.decoder_mng import DecoderException
  7. def ee_format(txt):
  8. code=asr_common_pb2.DecodeResponse.Status.Code.ERR_WRONG_FORMAT
  9. raise DecoderException(code=code,msg=txt)
  10. class Demultiplexor:
  11. def __init__(self,request_iterator):
  12. self._qs= []
  13. self.pos2sysid= []
  14. # Els primers paquets són els identificadors
  15. for o in request_iterator:
  16. if not o.HasField('system_id'):
  17. break
  18. new_q= [o]
  19. self._qs.append(new_q)
  20. self.pos2sysid.append(o.system_id)
  21. if self._qs==[]: ee_format('no system_id were provided')
  22. if not o.HasField('data'):
  23. ee_format('no data packages were provided')
  24. # Insert in all qs
  25. for q in self._qs:
  26. q.append(o)
  27. # Init state
  28. self._it= request_iterator
  29. self._stop= False
  30. self._counter= self.num_streams
  31. self._cv= threading.Condition()
  32. # end __init__
  33. def __refill_nonsecure(self):
  34. # Dec and check
  35. if self._stop: return
  36. self._counter-= 1
  37. if self._counter>0: return # L'últim replena
  38. # Fill
  39. o= next(self._it,None)
  40. if o is None: self._stop= True
  41. else:
  42. self._counter= self.num_streams
  43. for q in self._qs:
  44. q.append(o)
  45. self._cv.notify_all()
  46. def __refill(self):
  47. try:
  48. self.__refill_nonsecure()
  49. except BaseException as e:
  50. self._stop= True
  51. self._cv.notify_all()
  52. raise e
  53. # Se li passa el número de cola
  54. def __call__(self,q_id):
  55. while True:
  56. # Intenta obtindre el següent element
  57. with self._cv:
  58. q= self._qs[q_id]
  59. self._cv.wait_for(lambda: len(q)>0 or self._stop)
  60. if self._stop and len(q)==0: return # Finalitzem
  61. o= q.pop(0)
  62. if q==[]: self.__refill()
  63. yield o
  64. # end __call__
  65. @property
  66. def num_streams(self):
  67. return len(self._qs)
  68. # end Demultiplexor
  69. class SegmentLangId:
  70. def __init__(self,num_langs,seg_id=0):
  71. NL= num_langs
  72. self._seg_id= seg_id
  73. self._next= None
  74. self._max= [float('-inf')]*NL
  75. self._NL= NL
  76. def __get_best(self):
  77. lang,score= 0,self._max[0]
  78. for l in range(1,self._NL):
  79. val= self._max[l]
  80. if val>score: lang,score= l,val
  81. return lang
  82. # Torna el millor (seg_id,lang_id) del segment actual
  83. def __call__(self,lang_id,score,nframes):
  84. if nframes==0: return self._seg_id,self.__get_best()
  85. score/= nframes
  86. self._max[lang_id]= score
  87. return self._seg_id,self.__get_best()
  88. @property
  89. def next(self):
  90. if self._next==None:
  91. self._next= SegmentLangId(self._NL,self._seg_id+1)
  92. return self._next
  93. # end SegmentLangId
  94. class LangId:
  95. def __init__(self,num_langs):
  96. lid= SegmentLangId(num_langs)
  97. self._current_lang= []
  98. for i in range(num_langs):
  99. self._current_lang.append(lid)
  100. # Torna el millor (seg_id,lang_id) del segment actual
  101. def __call__(self,lang_id,score,nframes,is_final):
  102. current= self._current_lang[lang_id]
  103. ret= current(lang_id,score,nframes)
  104. if is_final:
  105. self._current_lang[lang_id]= current.next
  106. return ret
  107. # end HypMng
  108. class ExceptionContainer:
  109. def __init__(self):
  110. self._e= None
  111. def __bool__(self):
  112. return self._e is not None
  113. def set(self,e):
  114. self._e= e
  115. @property
  116. def exception(self):
  117. return self._e
  118. # end ExceptionContainer
  119. class Counter:
  120. def __init__(self,val):
  121. self.val= val
  122. def dec(self):
  123. self.val-= 1
  124. # end Counter
  125. # Pot generar excepcions de tot tipus. Incloent DecoderException.
  126. def decode(mng,request_iterator,ii):
  127. # Launch threads
  128. demux= Demultiplexor(request_iterator)
  129. output_buffer= []
  130. error= ExceptionContainer()
  131. cv= threading.Condition()
  132. lid= LangId(demux.num_streams)
  133. stop_counter= Counter(demux.num_streams)
  134. def transcribe(lang_id):
  135. def my_ii(txt):
  136. ii('[sys_id:%d] %s'%(demux.pos2sysid[lang_id],txt))
  137. stop= False
  138. for o in mng.decode(demux(lang_id),my_ii):
  139. with cv:
  140. seg_id,best_lid= lid(lang_id,o.score,o.nframes,o.eos)
  141. best_sys_id= demux.pos2sysid[best_lid]
  142. curr_sys_id= demux.pos2sysid[lang_id]
  143. res= asr_pb2.DecodeMultiResponse(res=o,segment_id=seg_id,
  144. best_system=best_sys_id,
  145. current_system=curr_sys_id)
  146. output_buffer.append(res)
  147. cv.notify_all()
  148. with cv:
  149. stop_counter.dec()
  150. cv.notify_all()
  151. def transcribe_err(lang_id):
  152. try:
  153. transcribe(lang_id)
  154. except BaseException as e:
  155. with cv:
  156. if not error:
  157. error.set(e)
  158. cv.notify_all()
  159. pool= cf.ThreadPoolExecutor(max_workers=demux.num_streams)
  160. for i in range(demux.num_streams):
  161. pool.submit(transcribe_err,i)
  162. # Processa l'exida
  163. stop= False
  164. while True:
  165. with cv:
  166. cv.wait_for(lambda: len(output_buffer)>0 or stop_counter.val==0 or error)
  167. if error: stop= True
  168. elif len(output_buffer)>0:
  169. ret= output_buffer.pop(0)
  170. else:
  171. assert stop_counter.val==0
  172. stop= True
  173. if stop: break
  174. yield ret
  175. # Acaba
  176. pool.shutdown()
  177. if error:
  178. raise error.exception
  179. # end decode