Apa cara terbaik untuk menjalankan fungsi berulang kali setiap x detik?

284

Saya ingin berulang kali menjalankan fungsi dalam Python setiap 60 detik selamanya (seperti NSTimer di Objective C). Kode ini akan berjalan sebagai daemon dan secara efektif seperti memanggil skrip python setiap menit menggunakan cron, tetapi tanpa mengharuskannya diatur oleh pengguna.

Dalam pertanyaan ini tentang cron diimplementasikan dalam Python , solusinya tampaknya secara efektif hanya tidur () selama x detik. Saya tidak membutuhkan fungsi canggih seperti itu jadi mungkin sesuatu seperti ini akan berfungsi

while True:
    # Code executed here
    time.sleep(60)

Apakah ada masalah yang dapat diperkirakan dengan kode ini?

davidmytton
sumber
84
Poin pedantic, tetapi mungkin kritis, kode Anda di atas kode tidak mengeksekusi setiap 60 detik itu menempatkan jarak 60 detik antara eksekusi. Ini hanya terjadi setiap 60 detik jika kode yang Anda eksekusi tidak membutuhkan waktu sama sekali.
Simon
4
juga time.sleep(60)dapat kembali lebih awal dan kemudian
jfs
5
Saya masih bertanya-tanya: Apakah ada masalah dengan kode ini?
Banana
1
"Masalah yang dapat diduga" adalah Anda tidak bisa mengharapkan 60 iterasi per jam hanya dengan menggunakan time.sleep (60). Jadi jika Anda menambahkan satu item per iterasi dan menyimpan daftar panjang yang ditetapkan ... rata-rata daftar itu tidak akan mewakili "periode" waktu yang konsisten; jadi fungsi seperti "moving average" bisa menjadi rujukan titik data yang terlalu tua, yang akan mengubah indikasi Anda.
litepresence
2
@ Banana Ya, Anda dapat mengharapkan masalah apa pun yang disebabkan karena skrip Anda tidak dieksekusi PERSIS setiap 60 detik. Misalnya. Saya mulai melakukan sesuatu seperti ini untuk memecah aliran video dan mengunggahnya, dan akhirnya saya mendapatkan untaian 5-10 ~ detik lebih lama karena antrian media buffering sementara saya memproses data di dalam loop. Itu tergantung pada data Anda. Jika fungsinya semacam pengawas sederhana yang memperingatkan Anda, misalnya, ketika disk Anda penuh, Anda seharusnya tidak memiliki masalah sama sekali dengan hal ini. Jika Anda memeriksa peringatan peringatan pembangkit listrik tenaga nuklir, Anda mungkin berakhir dengan kota. benar-benar meledak x
DGoiko

Jawaban:

229

Jika program Anda belum memiliki loop acara, gunakan modul sched , yang mengimplementasikan penjadwal acara tujuan umum.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

Jika Anda telah menggunakan perpustakaan event loop seperti asyncio, trio, tkinter, PyQt5, gobject, kivy, dan banyak lainnya - hanya jadwal tugas menggunakan metode loop acara yang ada perpustakaan, sebagai gantinya.

nosklo
sumber
16
Modul sched adalah fungsi penjadwalan untuk dijalankan setelah beberapa waktu, bagaimana Anda menggunakannya untuk mengulangi panggilan fungsi setiap x detik tanpa menggunakan time.sleep ()?
Baishampayan Ghose
2
@ Baishampayan: Cukup jadwalkan lari baru.
nosklo
3
Kemudian apscheduler di packages.python.org/APScheduler juga harus mendapatkan menyebutkan pada saat ini.
Daniel F
6
catatan: versi ini dapat melayang. Anda bisa menggunakannya enterabs()untuk menghindarinya. Berikut ini adalah versi non-drifting untuk perbandingan .
jfs
8
@JavaSa: karena "kerjakan barangmu " tidak instan dan kesalahan dari time.sleepdapat menumpuk di sini. "jalankan setiap X detik" dan "jalankan dengan penundaan ~ X detik berulang kali" tidak sama. Lihat juga komentar ini
jfs
180

Kunci putaran waktu Anda ke jam sistem seperti ini:

import time
starttime = time.time()
while True:
    print "tick"
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))
Dave Rove
sumber
22
+1. milik Anda dan twistedjawabannya adalah satu-satunya jawaban yang menjalankan fungsi setiap xdetik. Sisanya menjalankan fungsi dengan penundaan selama xdetik setelah setiap panggilan.
jfs
13
Jika Anda ingin menambahkan beberapa kode ke ini yang membutuhkan waktu lebih dari satu detik ... Ini akan membuang waktu dan mulai ketinggalan .. Jawaban yang diterima dalam kasus ini benar ... Siapa pun dapat mengulang perintah cetak sederhana dan jalankan setiap detik tanpa penundaan ...
Marah 84
5
Saya lebih suka from time import time, sleepkarena implikasi eksistensial;)
Will
14
Bekerja dengan luar biasa. Tidak perlu mengurangi Anda starttimejika Anda mulai dengan menyinkronkannya ke waktu tertentu: time.sleep(60 - time.time() % 60)telah bekerja dengan baik untuk saya. Saya telah menggunakannya time.sleep(1200 - time.time() % 1200)dan memberi saya log di :00 :20 :40, persis seperti yang saya inginkan.
TemporalWolf
2
@AntonSchigur untuk menghindari penyimpangan setelah beberapa iterasi. Iterasi individu mungkin mulai sedikit cepat atau lambat tergantung pada sleep(), timer()presisi dan berapa lama waktu yang dibutuhkan untuk mengeksekusi tubuh loop tetapi pada iterasi rata-rata selalu terjadi pada batas-batas selang (bahkan jika beberapa dilewati): while keep_doing_it(): sleep(interval - timer() % interval). Bandingkan dengan di while keep_doing_it(): sleep(interval)mana kesalahan dapat terakumulasi setelah beberapa iterasi.
jfs
72

Anda mungkin ingin mempertimbangkan Twisted yang merupakan pustaka jaringan Python yang mengimplementasikan Pola Reactor .

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

Sementara "while True: sleep (60)" mungkin akan berfungsi Twisted mungkin sudah mengimplementasikan banyak fitur yang pada akhirnya Anda perlukan (daemonisasi, logging atau penanganan pengecualian seperti yang ditunjukkan oleh bobince) dan mungkin akan menjadi solusi yang lebih kuat

Aaron Maenpaa
sumber
Jawaban yang bagus juga, sangat akurat tanpa penyimpangan. Saya bertanya-tanya apakah ini membuat CPU tidur juga sambil menunggu untuk menjalankan tugas (alias tidak sibuk-menunggu)?
smoothware
1
ini melayang di tingkat milidetik
Derek Eden
1
Apa yang dimaksud dengan "melayang di tingkat milidetik"?
Jean-Paul Calderone
Apakah ada cara untuk memutus loop, katakanlah setelah 10 menit? @Aaron Maenpaa
alper
67

Jika Anda ingin cara non-pemblokiran untuk menjalankan fungsi Anda secara berkala, alih-alih pemblokiran tak terbatas saya akan menggunakan timer berulir. Dengan cara ini kode Anda dapat terus berjalan dan melakukan tugas-tugas lain dan masih memiliki fungsi Anda dipanggil setiap n detik. Saya menggunakan teknik ini banyak untuk mencetak info kemajuan pada tugas-tugas panjang, CPU / Disk / Jaringan.

Berikut kode yang saya posting dalam pertanyaan yang sama, dengan kontrol start () dan stop ():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Pemakaian:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Fitur:

  • Hanya perpustakaan standar, tidak ada dependensi eksternal
  • start() dan stop() aman untuk dipanggil beberapa kali bahkan jika timer sudah mulai / berhenti
  • fungsi yang dipanggil dapat memiliki argumen posisi dan bernama
  • Anda dapat mengubah intervalkapan saja, itu akan efektif setelah dijalankan berikutnya. Sama untuk args, kwargsdan bahkan function!
MestreLion
sumber
Solusi ini tampaknya melayang seiring waktu; Saya membutuhkan versi yang bertujuan untuk memanggil fungsi setiap n detik tanpa penyimpangan. Saya akan memposting pembaruan dalam pertanyaan terpisah.
eraoul
Dalam def _run(self)saya mencoba membungkus kepala saya di sekitar mengapa Anda menelepon self.start()sebelumnya self.function(). Bisakah Anda menguraikan? Saya akan berpikir dengan menelepon start()dulu self.is_runningakan selalu Falsedemikian maka kami akan selalu memutar utas baru.
Rich Episcopo
1
Saya pikir saya sampai di dasarnya. Solusi @ MestreLion menjalankan fungsi setiap xdetik (yaitu t = 0, t = 1x, t = 2x, t = 3x, ...) di mana pada poster asli kode sampel menjalankan fungsi dengan interval x detik di antaranya. Juga, solusi ini saya percaya memiliki bug jika intervallebih pendek dari waktu yang dibutuhkan functionuntuk mengeksekusi. Dalam hal ini, self._timerakan ditimpa dalam startfungsi.
Rich Episcopo
Ya, @RichieEpiscopo, panggilan ke .function()setelah .start()adalah menjalankan fungsi pada t = 0. Dan saya tidak berpikir itu akan menjadi masalah jika functionmembutuhkan waktu lebih lama dari itu interval, tapi ya mungkin ada beberapa kondisi balap pada kode.
MestreLion
Ini adalah satu-satunya cara non-pemblokiran yang bisa saya dapatkan. Terima kasih.
backslashN
35

Cara mudah yang saya yakini:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

Dengan cara ini kode Anda dieksekusi, maka ia menunggu 60 detik kemudian dieksekusi lagi, menunggu, mengeksekusi, dll ... Tidak perlu menyulitkan hal-hal: D

Itxaka
sumber
Kata kunci True harus ditulis dalam huruf besar
Sean Cain
39
Sebenarnya ini bukan jawabannya: waktu tidur () hanya dapat digunakan untuk menunggu X detik setelah setiap eksekusi. Misalnya, jika fungsi Anda membutuhkan 0,5 detik untuk dieksekusi dan Anda menggunakan time.sleep (1), itu berarti fungsi Anda dieksekusi setiap 1,5 detik, bukan 1. Anda harus menggunakan modul dan / atau utas lainnya untuk memastikan sesuatu berfungsi selama Y kali dalam setiap X detik.
kommradHomer
1
@kommradHomer: Jawaban Dave Rove menunjukkan bahwa Anda dapat menggunakan time.sleep()menjalankan sesuatu setiap X detik
jfs
2
Menurut pendapat saya kode harus memanggil time.sleep()dalam while Truelingkaran seperti:def executeSomething(): print('10 sec left') ; while True: executeSomething(); time.sleep(10)
Leonard Lepadatu
22
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

Jika Anda ingin melakukan ini tanpa memblokir kode yang tersisa, Anda dapat menggunakan ini untuk menjalankannya di utasnya sendiri:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

Solusi ini menggabungkan beberapa fitur yang jarang ditemukan digabungkan dalam solusi lain:

  • Penanganan pengecualian: Sejauh mungkin pada tingkat ini, pengecualian ditangani dengan benar, yaitu mendapatkan login untuk tujuan debugging tanpa membatalkan program kami.
  • Tanpa rantai: Implementasi mirip rantai umum (untuk menjadwalkan acara berikutnya) yang Anda temukan dalam banyak jawaban rapuh dalam aspek bahwa jika ada yang salah dalam mekanisme penjadwalan ( threading.Timeratau apa pun), ini akan mengakhiri rantai. Tidak ada eksekusi lebih lanjut akan terjadi kemudian, bahkan jika alasan masalah sudah diperbaiki. Lingkaran sederhana dan menunggu dengan sederhana sleep()jauh lebih kuat dibandingkan.
  • Tidak ada penyimpangan: Solusi saya melacak waktu yang seharusnya dijalankan. Tidak ada penyimpangan tergantung pada waktu eksekusi (seperti dalam banyak solusi lainnya).
  • Melewati: Solusi saya akan melewati tugas jika satu eksekusi terlalu banyak waktu (mis. Lakukan X setiap lima detik, tetapi X butuh 6 detik). Ini adalah perilaku cron standar (dan untuk alasan yang baik). Banyak solusi lain maka cukup jalankan tugas beberapa kali berturut-turut tanpa penundaan. Untuk sebagian besar kasus (mis. Tugas pembersihan) ini tidak diinginkan. Jika ini berharap, cukup gunakan next_time += delaysebagai gantinya.
Alfe
sumber
2
jawaban terbaik untuk tidak hanyut.
Sebastian Stark
1
@ PirateApp Saya akan melakukan ini di utas yang berbeda. Anda bisa melakukannya di utas yang sama tetapi kemudian Anda akhirnya memprogram sistem penjadwalan Anda sendiri yang terlalu rumit untuk dikomentari.
Alfe
1
Dalam Python, berkat GIL, mengakses variabel dalam dua utas sangat aman. Dan hanya membaca dalam dua utas seharusnya tidak menjadi masalah (juga tidak di lingkungan utas lainnya). Hanya menulis dari dua utas berbeda dalam sistem tanpa GIL (mis. Di Java, C ++, dll.) Memerlukan sinkronisasi eksplisit.
Alfe
1
@ user50473 Tanpa informasi lebih lanjut, saya pertama-tama akan mendekati tugas dari sisi berulir. Satu utas membaca data sekarang dan kemudian dan kemudian tidur sampai tiba waktunya untuk melakukannya. Solusi di atas bisa digunakan untuk melakukan ini tentunya. Tapi saya bisa membayangkan banyak alasan untuk pergi ke arah yang berbeda.
Selamat
1
Tidur dapat diganti dengan threading. Jangan sampai menunggu dengan batas waktu agar lebih responsif saat keluar dari aplikasi. stackoverflow.com/questions/29082268/…
themadmax
20

Berikut ini adalah pembaruan kode dari MestreLion yang menghindari drifiting dari waktu ke waktu.

Kelas RepeatTimer di sini memanggil fungsi yang diberikan setiap "interval" detik seperti yang diminta oleh OP; jadwal tidak tergantung pada berapa lama fungsi tersebut dijalankan. Saya suka solusi ini karena tidak memiliki dependensi perpustakaan eksternal; ini hanya python murni.

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

Penggunaan sampel (disalin dari jawaban MestreLion):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
eraoul
sumber
5

Saya menghadapi masalah yang sama beberapa waktu lalu. Mungkin http://cronus.readthedocs.org dapat membantu?

Untuk v0.2, cuplikan berikut berfungsi

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec
Anay
sumber
4

Perbedaan utama antara itu dan cron adalah bahwa pengecualian akan membunuh daemon untuk selamanya. Anda mungkin ingin membungkusnya dengan catcher catcher dan logger.

bobince
sumber
4

Satu jawaban yang mungkin:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()
sks
sumber
1
Ini sibuk menunggu karena itu sangat buruk.
Alfe
Solusi yang bagus untuk seseorang yang mencari penghitung waktu yang tidak menghalangi.
Noel
3

Saya akhirnya menggunakan modul jadwal . APInya bagus.

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
Belajar statistik dengan contoh
sumber
Saya mengalami kesulitan mencoba menggunakan modul ini khususnya, saya harus membuka blokir utas utama, saya telah memeriksa FAQ di situs web dokumentasi jadwal, tetapi saya tidak benar-benar memahami solusi yang disediakan. Adakah yang tahu di mana saya dapat menemukan contoh kerja yang tidak memblokir utas utama?
5Daydreams
1

Saya menggunakan metode Tkinter after (), yang tidak "mencuri permainan" (seperti modul sched yang disajikan sebelumnya), yaitu memungkinkan hal-hal lain berjalan secara paralel:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1()dan do_something2()dapat berjalan secara paralel dan dalam kecepatan interval apa pun. Di sini, yang ke-2 akan dieksekusi dua kali lebih cepat. Perhatikan juga bahwa saya telah menggunakan penghitung sederhana sebagai syarat untuk mengakhiri salah satu fungsi. Anda dapat menggunakan contition apa pun yang Anda suka atau tidak ada jika Anda menjalankan fungsi apa saja sampai program berakhir (misalnya jam).

Apostolos
sumber
Hati-hati dengan kata-kata Anda: afterjangan biarkan hal-hal berjalan paralel. Tkinter adalah single-threaded dan hanya dapat melakukan satu hal pada satu waktu. Jika sesuatu yang dijadwalkan afterberjalan, itu tidak berjalan secara paralel dengan sisa kode. Jika keduanya do_something1dan do_something2dijadwalkan untuk berjalan pada saat yang sama, mereka akan berjalan secara berurutan, bukan secara paralel.
Bryan Oakley
@Apostolos semua solusi Anda lakukan adalah untuk menggunakan Tkinter mainloop bukan sched mainloop, sehingga bekerja sama persis dengan cara yang sama tetapi memungkinkan antarmuka Tkinter untuk melanjutkan menanggapi. Jika Anda tidak menggunakan tkinter untuk hal-hal lain maka itu tidak mengubah apa pun berkenaan dengan solusi sched. Anda dapat menggunakan dua atau lebih fungsi terjadwal dengan interval yang berbeda dalam schedsolusi dan itu akan bekerja sama persis dengan Anda.
nosklo
Tidak, itu tidak bekerja dengan cara yang sama. Saya menjelaskan ini. Yang satu "mengunci" program (yaitu menghentikan aliran, Anda tidak dapat melakukan hal lain - bahkan tidak memulai pekerjaan scecduled seperti yang Anda sarankan) sampai selesai dan yang lain membiarkan tangan Anda / bebas gratis (yaitu Anda dapat melakukan hal-hal lain setelah itu dimulai. Anda tidak perlu menunggu sampai selesai. Ini adalah perbedaan besar. Jika Anda telah mencoba metode yang saya sajikan, Anda akan melihat sendiri. Saya telah mencoba milik Anda. Mengapa Anda tidak coba punyaku?
Apostolos
1

Berikut adalah versi yang disesuaikan dengan kode dari MestreLion. Selain fungsi asli, kode ini:

1) tambahkan first_interval yang digunakan untuk memecat timer pada waktu tertentu (penelepon perlu menghitung first_interval dan meneruskannya)

2) menyelesaikan kondisi lomba dalam kode asli. Dalam kode asli, jika utas kontrol gagal membatalkan timer yang berjalan ("Hentikan timer, dan batalkan eksekusi tindakan timer. Ini hanya akan berfungsi jika timer masih dalam tahap menunggu." Dikutip dari https: // docs.python.org/2/library/threading.html ), timer akan berjalan tanpa henti.

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False
dproc
sumber
1

Ini tampaknya jauh lebih sederhana daripada solusi yang diterima - apakah ada kekurangan yang tidak saya pertimbangkan? Datang ke sini mencari beberapa copy pasta mati-sederhana dan kecewa.

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)

thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

Yang tidak sinkron.

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

Memang ada pergeseran dalam arti bahwa jika tugas yang sedang berjalan membutuhkan waktu yang cukup lama, maka intervalnya menjadi 2 detik + waktu tugas, jadi jika Anda perlu penjadwalan yang tepat maka ini bukan untuk Anda.

Perhatikan daemon=Truebenderanya berarti utas ini tidak akan memblokir aplikasi untuk dimatikan. Sebagai contoh, ada masalah di mana pytestakan menggantung tanpa batas setelah menjalankan tes menunggu untuk ini berhenti.

Adam Hughes
sumber
Tidak, hanya mencetak datetime pertama dan kemudian berhenti ...
Alex Poca
Anda yakin - Saya baru saja menyalin dan menempel di terminal. Ini segera kembali tetapi hasil cetak berlanjut di latar belakang untuk saya.
Adam Hughes
Sepertinya saya kehilangan sesuatu di sini. Saya menyalin / disisipkan kode di test.py , dan menjalankan dengan python test.py . Dengan Python2.7 saya harus menghapus daemon = True yang tidak dikenali dan saya membaca banyak cetakan. Dengan Python3.8 berhenti setelah cetak pertama dan tidak ada proses yang aktif setelah akhirnya. Menghapus daemon = Benar Saya membaca banyak cetakan ...
Alex Poca
hmm aneh - Saya menggunakan python 3.6.10 tetapi tidak tahu mengapa itu penting
Adam Hughes
Sekali lagi: Python3.4.2 (Debian GNU / Linux 8 (jessie)), harus menghapus daemon = True sehingga dapat dicetak ganda. Dengan daemon saya mendapatkan kesalahan sintaksis. Tes sebelumnya dengan Python2.7 dan 3.8 ada di Ubuntu 19.10 Mungkinkah daemon diperlakukan berbeda sesuai dengan OS?
Alex Poca
0

Saya menggunakan ini untuk menyebabkan 60 peristiwa per jam dengan sebagian besar peristiwa terjadi pada jumlah detik yang sama setelah satu menit:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

Tergantung pada kondisi aktual Anda mungkin mendapatkan kutu panjang:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

tetapi pada akhir 60 menit Anda akan memiliki 60 ticks; dan kebanyakan dari mereka akan terjadi pada offset yang benar ke menit yang Anda inginkan.

Pada sistem saya, saya mendapatkan penyimpangan khas <1/20 detik hingga kebutuhan untuk koreksi muncul.

Keuntungan dari metode ini adalah resolusi clock drift; yang dapat menyebabkan masalah jika Anda melakukan hal-hal seperti menambahkan satu item per centang dan Anda mengharapkan 60 item ditambahkan per jam. Kegagalan untuk memperhitungkan penyimpangan dapat menyebabkan indikasi sekunder seperti rata-rata bergerak untuk mempertimbangkan data terlalu jauh ke masa lalu yang mengakibatkan output yang salah.

kehadiran kecil
sumber
0

misalnya, Tampilkan waktu setempat saat ini

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)
bangkit.riyo
sumber
0
    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()
raviGupta
sumber
Atas dasar input pengguna itu akan mengulangi metode itu pada setiap interval waktu.
raviGupta
0

Ini adalah solusi lain tanpa menggunakan perpustakaan tambahan.

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()
Dat Nguyen
sumber