Saya ingin mengaktifkan suatu fungsi setiap 0,5 detik dan dapat memulai dan menghentikan serta mengatur ulang pengatur waktu. Saya tidak terlalu paham tentang cara kerja utas Python dan saya mengalami kesulitan dengan pengatur waktu python.
Namun, saya terus mendapatkan RuntimeError: threads can only be started once
ketika saya mengeksekusi threading.timer.start()
dua kali. Apakah ada solusi untuk ini? Saya mencoba melamar threading.timer.cancel()
sebelum memulai.
Kode semu:
t=threading.timer(0.5,function)
while True:
t.cancel()
t.start()
python
python-3.x
python-2.7
pengguna1431282
sumber
sumber
threading.Event
danwait
sebagai gantinyasleep
. Kemudian, untuk membangunkannya, cukup atur acaranya. Anda bahkan tidak memerlukannyaself.stopped
karena Anda hanya memeriksa bendera acara.event.wait
hanya akan batas waktu dan bertindak seperti tidur, tetapi jika Anda ingin menghentikan (atau mengganggu utas), Anda akan mengatur acara utas dan itu akan segera bangun.thread.start()
memberi sayathreads can only be started once
Dari Ekuivalen setInterval dengan python :
import threading def setInterval(interval): def decorator(function): def wrapper(*args, **kwargs): stopped = threading.Event() def loop(): # executed in another thread while not stopped.wait(interval): # until stopped function(*args, **kwargs) t = threading.Thread(target=loop) t.daemon = True # stop if the program exits t.start() return stopped return wrapper return decorator
Pemakaian:
@setInterval(.5) def function(): "..." stop = function() # start timer, the first call is in .5 seconds stop.set() # stop the loop stop = function() # start new timer # ... stop.set()
Atau inilah fungsionalitas yang sama tetapi sebagai fungsi mandiri, bukan sebagai dekorator :
cancel_future_calls = call_repeatedly(60, print, "Hello, World") # ... cancel_future_calls()
Berikut cara melakukannya tanpa menggunakan utas .
sumber
@setInterval(1)
.stop = repeat(every=second, call=your_function); ...; stop()
.stop = call_repeatedly(interval, your_function); ...; stop()
implementasiMenggunakan utas pengatur waktu-
from threading import Timer,Thread,Event class perpetualTimer(): def __init__(self,t,hFunction): self.t=t self.hFunction = hFunction self.thread = Timer(self.t,self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t,self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() def printer(): print 'ipsem lorem' t = perpetualTimer(5,printer) t.start()
ini bisa dihentikan
t.cancel()
sumber
cancel
metode ini. Saat ini dipanggil, utasnya adalah 1) tidak berjalan atau 2) berjalan. Dalam 1) kami menunggu untuk menjalankan fungsi, jadi pembatalan akan berfungsi dengan baik. di 2) kami sedang berjalan, jadi pembatalan tidak akan berpengaruh pada eksekusi saat ini. Selain itu, eksekusi saat ini menjadwal ulang sendiri sehingga tidak akan berpengaruh di masa mendatang.Sedikit memperbaiki jawaban Hans Then , kita bisa membuat subkelas fungsi Timer. Yang berikut ini menjadi seluruh kode "pengatur waktu berulang", dan dapat digunakan sebagai pengganti drop-in untuk threading.Timer dengan semua argumen yang sama:
from threading import Timer class RepeatTimer(Timer): def run(self): while not self.finished.wait(self.interval): self.function(*self.args, **self.kwargs)
Contoh penggunaan:
def dummyfn(msg="foo"): print(msg) timer = RepeatTimer(1, dummyfn) timer.start() time.sleep(5) timer.cancel()
menghasilkan keluaran sebagai berikut:
dan
timer = RepeatTimer(1, dummyfn, args=("bar",)) timer.start() time.sleep(5) timer.cancel()
menghasilkan
sumber
RuntimeError: threads can only be started once
,.threading.py
modul itu sendiri.Untuk kepentingan memberikan jawaban yang benar menggunakan Timer sesuai OP yang diminta, saya akan memperbaiki jawaban swapnil jariwala :
from threading import Timer class InfiniteTimer(): """A Timer class that does not stop, unless you want it to.""" def __init__(self, seconds, target): self._should_continue = False self.is_running = False self.seconds = seconds self.target = target self.thread = None def _handle_target(self): self.is_running = True self.target() self.is_running = False self._start_timer() def _start_timer(self): if self._should_continue: # Code could have been running when cancel was called. self.thread = Timer(self.seconds, self._handle_target) self.thread.start() def start(self): if not self._should_continue and not self.is_running: self._should_continue = True self._start_timer() else: print("Timer already started or running, please wait if you're restarting.") def cancel(self): if self.thread is not None: self._should_continue = False # Just in case thread is running and cancel fails. self.thread.cancel() else: print("Timer never started or failed to initialize.") def tick(): print('ipsem lorem') # Example Usage t = InfiniteTimer(0.5, tick) t.start()
sumber
Saya telah mengubah beberapa kode di kode swapnil-jariwala untuk membuat jam konsol kecil.
from threading import Timer, Thread, Event from datetime import datetime class PT(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def printer(): tempo = datetime.today() h,m,s = tempo.hour, tempo.minute, tempo.second print(f"{h}:{m}:{s}") t = PT(1, printer) t.start()
>>> 11:39:11 11:39:12 11:39:13 11:39:14 11:39:15 11:39:16 ...
Timer dengan antarmuka Grafis yang lebih rapi
Kode ini menempatkan pengatur waktu jam di jendela kecil dengan tkinter
from threading import Timer, Thread, Event from datetime import datetime import tkinter as tk app = tk.Tk() lab = tk.Label(app, text="Timer will start in a sec") lab.pack() class perpetualTimer(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() def printer(): tempo = datetime.today() clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second) try: lab['text'] = clock except RuntimeError: exit() t = perpetualTimer(1, printer) t.start() app.mainloop()
Contoh permainan kartu flash (semacam)
from threading import Timer, Thread, Event from datetime import datetime class perpetualTimer(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() x = datetime.today() start = x.second def printer(): global questions, counter, start x = datetime.today() tempo = x.second if tempo - 3 > start: show_ans() #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="") print() print("-" + questions[counter]) counter += 1 if counter == len(answers): counter = 0 def show_ans(): global answers, c2 print("It is {}".format(answers[c2])) c2 += 1 if c2 == len(answers): c2 = 0 questions = ["What is the capital of Italy?", "What is the capital of France?", "What is the capital of England?", "What is the capital of Spain?"] answers = "Rome", "Paris", "London", "Madrid" counter = 0 c2 = 0 print("Get ready to answer") t = perpetualTimer(3, printer) t.start()
keluaran:
Get ready to answer >>> -What is the capital of Italy? It is Rome -What is the capital of France? It is Paris -What is the capital of England? ...
sumber
Saya harus melakukan ini untuk sebuah proyek. Apa yang akhirnya saya lakukan adalah memulai utas terpisah untuk fungsi tersebut
**** detak jantung adalah fungsi saya, pekerja adalah salah satu argumen saya ****
di dalam fungsi detak jantung saya:
def heartbeat(worker): while True: time.sleep(5) #all of my code
Jadi ketika saya memulai utas, fungsinya akan berulang kali menunggu 5 detik, menjalankan semua kode saya, dan melakukannya tanpa batas. Jika Anda ingin menghentikan proses, cukup bunuh utasnya.
sumber
Saya telah menerapkan kelas yang berfungsi sebagai pengatur waktu.
Saya meninggalkan tautan di sini jika ada yang membutuhkannya: https://github.com/ivanhalencp/python/tree/master/xTimer
sumber
from threading import Timer def TaskManager(): #do stuff t = Timer( 1, TaskManager ) t.start() TaskManager()
Berikut adalah contoh kecil, ini akan membantu lebih baik memahami bagaimana itu berjalan. function taskManager () pada akhirnya membuat pemanggilan fungsi tertunda untuk dirinya sendiri.
Cobalah untuk mengubah variabel "dalay" dan Anda akan dapat melihat perbedaannya
from threading import Timer, _sleep # ------------------------------------------ DATA = [] dalay = 0.25 # sec counter = 0 allow_run = True FIFO = True def taskManager(): global counter, DATA, delay, allow_run counter += 1 if len(DATA) > 0: if FIFO: print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]") else: print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]") else: print("["+str(counter)+"] no data") if allow_run: #delayed method/function call to it self t = Timer( dalay, taskManager ) t.start() else: print(" END task-manager: disabled") # ------------------------------------------ def main(): DATA.append("data from main(): 0") _sleep(2) DATA.append("data from main(): 1") _sleep(2) # ------------------------------------------ print(" START task-manager:") taskManager() _sleep(2) DATA.append("first data") _sleep(2) DATA.append("second data") print(" START main():") main() print(" END main():") _sleep(2) DATA.append("last data") allow_run = False
sumber
Saya suka jawaban right2clicky, terutama karena tidak memerlukan Thread untuk dirobohkan dan yang baru dibuat setiap kali Timer berdetak. Selain itu, ini adalah penggantian yang mudah untuk membuat kelas dengan callback pengatur waktu yang dipanggil secara berkala. Itu kasus penggunaan normal saya:
class MyClass(RepeatTimer): def __init__(self, period): super().__init__(period, self.on_timer) def on_timer(self): print("Tick") if __name__ == "__main__": mc = MyClass(1) mc.start() time.sleep(5) mc.cancel()
sumber
Ini adalah implementasi alternatif yang menggunakan fungsi alih-alih kelas. Terinspirasi oleh @Andrew Wilkins di atas.
Karena menunggu lebih akurat daripada tidur (ini memperhitungkan runtime fungsi):
import threading PING_ON = threading.Event() def ping(): while not PING_ON.wait(1): print("my thread %s" % str(threading.current_thread().ident)) t = threading.Thread(target=ping) t.start() sleep(5) PING_ON.set()
sumber
Saya telah menemukan solusi lain dengan kelas SingleTon. Tolong beritahu saya jika ada kebocoran memori di sini.
import time,threading class Singleton: __instance = None sleepTime = 1 executeThread = False def __init__(self): if Singleton.__instance != None: raise Exception("This class is a singleton!") else: Singleton.__instance = self @staticmethod def getInstance(): if Singleton.__instance == None: Singleton() return Singleton.__instance def startThread(self): self.executeThread = True self.threadNew = threading.Thread(target=self.foo_target) self.threadNew.start() print('doing other things...') def stopThread(self): print("Killing Thread ") self.executeThread = False self.threadNew.join() print(self.threadNew) def foo(self): print("Hello in " + str(self.sleepTime) + " seconds") def foo_target(self): while self.executeThread: self.foo() print(self.threadNew) time.sleep(self.sleepTime) if not self.executeThread: break sClass = Singleton() sClass.startThread() time.sleep(5) sClass.getInstance().stopThread() sClass.getInstance().sleepTime = 2 sClass.startThread()
sumber
Selain jawaban bagus di atas menggunakan Threads, jika Anda harus menggunakan utas utama atau lebih suka pendekatan asinkron - Saya membungkus kelas pendek di sekitar kelas aio_timers Timer (untuk mengaktifkan pengulangan)
import asyncio from aio_timers import Timer class RepeatingAsyncTimer(): def __init__(self, interval, cb, *args, **kwargs): self.interval = interval self.cb = cb self.args = args self.kwargs = kwargs self.aio_timer = None self.start_timer() def start_timer(self): self.aio_timer = Timer(delay=self.interval, callback=self.cb_wrapper, callback_args=self.args, callback_kwargs=self.kwargs ) def cb_wrapper(self, *args, **kwargs): self.cb(*args, **kwargs) self.start_timer() from time import time def cb(timer_name): print(timer_name, time()) print(f'clock starts at: {time()}') timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1') timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')
jam dimulai pada: 16024388 40 .9690785
timer_ 1 16024388 45 0,980087
timer_ 2 16024388 50 .9806316
timer_ 1 16024388 50 .9808934
timer_ 1 16024388 55 .9863033
timer_ 2 16024388 60 .9868324
timer_ 1 16024388 60 .9876585
sumber