Saya menulis aplikasi yang memiliki tugas cron yang dijalankan setiap 60 detik. Aplikasi dikonfigurasi untuk menskalakan saat diperlukan ke beberapa contoh. Saya hanya ingin menjalankan tugas pada 1 contoh setiap 60 detik (Pada node mana pun). Di luar kotak saya tidak dapat menemukan solusi untuk ini dan saya terkejut itu belum pernah ditanyakan sebelumnya. Saya menggunakan Spring 4.1.6.
<task:scheduled-tasks>
<task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
</task:scheduled-tasks>
spring
spring-scheduled
pengguna3131879
sumber
sumber
CronJob
dikubernetes
?Jawaban:
Ada proyek ShedLock yang melayani tujuan ini dengan tepat. Anda hanya menambahkan catatan tugas yang harus dikunci saat dijalankan
@Scheduled( ... ) @SchedulerLock(name = "scheduledTaskName") public void scheduledTask() { // do something }
Konfigurasikan Spring dan LockProvider
@Configuration @EnableScheduling @EnableSchedulerLock(defaultLockAtMostFor = "10m") class MySpringConfiguration { ... @Bean public LockProvider lockProvider(DataSource dataSource) { return new JdbcTemplateLockProvider(dataSource); } ... }
sumber
Saya pikir Anda harus menggunakan Quartz Clustering dengan JDBC-JobStore untuk tujuan ini
sumber
Ini adalah cara sederhana dan kuat lainnya untuk menjalankan pekerjaan dengan aman dalam sebuah cluster. Anda dapat didasarkan pada database dan menjalankan tugas hanya jika node adalah "pemimpin" di cluster.
Juga ketika sebuah node gagal atau shutdown di cluster node lain menjadi pemimpin.
Yang Anda miliki hanyalah membuat mekanisme "pemilihan pemimpin" dan setiap saat untuk memeriksa apakah Anda adalah pemimpinnya:
@Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }
Ikuti langkah-langkah tersebut:
1. Tentukan objek dan tabel yang menampung satu entri per node di cluster:
@Entity(name = "SYS_NODE") public class SystemNode { /** The id. */ @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; /** The name. */ @Column(name = "TIMESTAMP") private String timestamp; /** The ip. */ @Column(name = "IP") private String ip; /** The last ping. */ @Column(name = "LAST_PING") private Date lastPing; /** The last ping. */ @Column(name = "CREATED_AT") private Date createdAt = new Date(); /** The last ping. */ @Column(name = "IS_LEADER") private Boolean isLeader = Boolean.FALSE; public Long getId() { return id; } public void setId(final Long id) { this.id = id; } public String getTimestamp() { return timestamp; } public void setTimestamp(final String timestamp) { this.timestamp = timestamp; } public String getIp() { return ip; } public void setIp(final String ip) { this.ip = ip; } public Date getLastPing() { return lastPing; } public void setLastPing(final Date lastPing) { this.lastPing = lastPing; } public Date getCreatedAt() { return createdAt; } public void setCreatedAt(final Date createdAt) { this.createdAt = createdAt; } public Boolean getIsLeader() { return isLeader; } public void setIsLeader(final Boolean isLeader) { this.isLeader = isLeader; } @Override public String toString() { return "SystemNode{" + "id=" + id + ", timestamp='" + timestamp + '\'' + ", ip='" + ip + '\'' + ", lastPing=" + lastPing + ", createdAt=" + createdAt + ", isLeader=" + isLeader + '}'; }
}
2. Buat layanan yang a) masukkan node dalam database, b) periksa pemimpin
@Service @Transactional public class SystemNodeServiceImpl implements SystemNodeService, ApplicationListener { /** The logger. */ private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class); /** The constant NO_ALIVE_NODES. */ private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}"; /** The ip. */ private String ip; /** The system service. */ private SystemService systemService; /** The system node repository. */ private SystemNodeRepository systemNodeRepository; @Autowired public void setSystemService(final SystemService systemService) { this.systemService = systemService; } @Autowired public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) { this.systemNodeRepository = systemNodeRepository; } @Override public void pingNode() { final SystemNode node = systemNodeRepository.findByIp(ip); if (node == null) { createNode(); } else { updateNode(node); } } @Override public void checkLeaderShip() { final List<SystemNode> allList = systemNodeRepository.findAll(); final List<SystemNode> aliveList = filterAliveNodes(allList); SystemNode leader = findLeader(allList); if (leader != null && aliveList.contains(leader)) { setLeaderFlag(allList, Boolean.FALSE); leader.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } else { final SystemNode node = findMinNode(aliveList); setLeaderFlag(allList, Boolean.FALSE); node.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } } /** * Returns the leaded * @param list * the list * @return the leader */ private SystemNode findLeader(final List<SystemNode> list) { for (SystemNode systemNode : list) { if (systemNode.getIsLeader()) { return systemNode; } } return null; } @Override public boolean isLeader() { final SystemNode node = systemNodeRepository.findByIp(ip); return node != null && node.getIsLeader(); } @Override public void onApplicationEvent(final ApplicationEvent applicationEvent) { try { ip = InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { throw new RuntimeException(e); } if (applicationEvent instanceof ContextRefreshedEvent) { pingNode(); } } /** * Creates the node */ private void createNode() { final SystemNode node = new SystemNode(); node.setIp(ip); node.setTimestamp(String.valueOf(System.currentTimeMillis())); node.setCreatedAt(new Date()); node.setLastPing(new Date()); node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll())); systemNodeRepository.save(node); } /** * Updates the node */ private void updateNode(final SystemNode node) { node.setLastPing(new Date()); systemNodeRepository.save(node); } /** * Returns the alive nodes. * * @param list * the list * @return the alive nodes */ private List<SystemNode> filterAliveNodes(final List<SystemNode> list) { int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class); final List<SystemNode> finalList = new LinkedList<>(); for (SystemNode systemNode : list) { if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) { finalList.add(systemNode); } } if (CollectionUtils.isEmpty(finalList)) { LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list)); throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list)); } return finalList; } /** * Finds the min name node. * * @param list * the list * @return the min node */ private SystemNode findMinNode(final List<SystemNode> list) { SystemNode min = list.get(0); for (SystemNode systemNode : list) { if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) { min = systemNode; } } return min; } /** * Sets the leader flag. * * @param list * the list * @param value * the value */ private void setLeaderFlag(final List<SystemNode> list, final Boolean value) { for (SystemNode systemNode : list) { systemNode.setIsLeader(value); } }
}
3.ping database untuk mengirimkan bahwa Anda masih hidup
@Override @Scheduled(cron = "0 0/5 * * * ?") public void executeSystemNodePing() { systemNodeService.pingNode(); } @Override @Scheduled(cron = "0 0/10 * * * ?") public void executeLeaderResolution() { systemNodeService.checkLeaderShip(); }
4. Anda siap! Periksa saja apakah Anda pemimpin sebelum menjalankan tugas:
@Override @Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }
sumber
Tugas batch dan terjadwal biasanya dijalankan di server mandiri mereka sendiri, jauh dari aplikasi yang berhadapan dengan pelanggan, jadi bukan persyaratan umum untuk menyertakan tugas dalam aplikasi yang diharapkan berjalan di cluster. Selain itu, pekerjaan dalam lingkungan berkerumun biasanya tidak perlu khawatir tentang contoh lain dari pekerjaan yang sama yang berjalan secara paralel, jadi alasan lain mengapa isolasi contoh pekerjaan bukanlah persyaratan besar.
Solusi sederhana adalah mengonfigurasi pekerjaan Anda di dalam Profil Musim Semi. Misalnya, jika konfigurasi Anda saat ini adalah:
<beans> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans>
ubah menjadi:
<beans> <beans profile="scheduled"> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans> </beans>
Kemudian, luncurkan aplikasi Anda hanya di satu mesin dengan
scheduled
profil diaktifkan (-Dspring.profiles.active=scheduled
).Jika server utama menjadi tidak tersedia karena alasan tertentu, cukup luncurkan server lain dengan profil diaktifkan dan semuanya akan terus berfungsi dengan baik.
Hal-hal berubah jika Anda juga menginginkan failover otomatis untuk pekerjaan tersebut. Kemudian, Anda harus tetap menjalankan pekerjaan di semua server dan memeriksa sinkronisasi melalui sumber daya umum seperti tabel database, cache berkerumun, variabel JMX, dll.
sumber
get
danset
operasi untuk mencapainya.dlock dirancang untuk menjalankan tugas hanya sekali dengan menggunakan indeks database dan batasan. Anda cukup melakukan sesuatu seperti di bawah ini.
@Scheduled(cron = "30 30 3 * * *") @TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES) public void execute() { }
Lihat artikel tentang menggunakannya.
sumber
Saya menggunakan tabel database untuk melakukan penguncian. Hanya satu tugas dalam satu waktu yang dapat melakukan penyisipan ke tabel. Yang lainnya akan mendapatkan DuplicateKeyException. Logika sisipkan dan hapus ditangani oleh aspek di sekitar anotasi @Scheduled. Saya menggunakan Spring Boot 2.0
@Component @Aspect public class SchedulerLock { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class); @Autowired private JdbcTemplate jdbcTemplate; @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))") public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable { String jobSignature = joinPoint.getSignature().toString(); try { jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()}); Object proceed = joinPoint.proceed(); jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature}); return proceed; }catch (DuplicateKeyException e) { LOGGER.warn("Job is currently locked: "+jobSignature); return null; } } }
@Component public class EveryTenSecondJob { @Scheduled(cron = "0/10 * * * * *") public void taskExecution() { System.out.println("Hello World"); } }
CREATE TABLE scheduler_lock( signature varchar(255) NOT NULL, date datetime DEFAULT NULL, PRIMARY KEY(signature) );
sumber
Anda dapat menggunakan penjadwal yang dapat disematkan seperti db-scheduler untuk melakukannya. Ini memiliki eksekusi yang persisten dan menggunakan mekanisme penguncian optimis sederhana untuk menjamin eksekusi oleh satu node.
Contoh kode bagaimana use case dapat dicapai:
RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60))) .execute((taskInstance, executionContext) -> { System.out.println("Executing " + taskInstance.getTaskAndInstance()); }); final Scheduler scheduler = Scheduler .create(dataSource) .startTasks(recurring1) .build(); scheduler.start();
sumber
Konteks pegas tidak berkerumun sehingga mengelola tugas dalam aplikasi terdistribusi agak sulit dan Anda perlu menggunakan sistem yang mendukung jgroup untuk menyinkronkan status dan membiarkan tugas Anda diprioritaskan untuk menjalankan tindakan. Atau Anda dapat menggunakan konteks ejb untuk mengelola layanan ha tunggal berkerumun seperti lingkungan jboss ha https://developers.redhat.com/quickstarts/eap/cluster-ha-singleton/?referrer=jbd Atau Anda dapat menggunakan cache berkerumun dan mengakses sumber daya kunci antara layanan dan layanan pertama mengambil kunci akan membentuk tindakan atau mengimplementasikan Anda sendiri jgroup untuk mengkomunikasikan layanan Anda dan melakukan tindakan satu node
sumber