lyquor/
lyquor.rs

1use std::path::PathBuf;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_primitives::{LyquidNumber, debug_struct_name};
9use serde_json::{Map, Value};
10use tokio::signal::unix::{SignalKind, signal};
11use tokio::sync::Notify;
12
13use lyquor_api::{
14    actor::Stop,
15    anyhow::{self, Context as _Context},
16    config::{DB, NodeConfig},
17    kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
18    profile::{Devnet, LyquorProfile, NetworkType},
19    subkey_builder,
20};
21use lyquor_db::{MemDB, RocksDB};
22use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
23use lyquor_lib::{api, lyquid, utils};
24use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, RegisterEvent};
25use lyquor_seq::{SequenceBackend, eth, fco::FCO, repo};
26use lyquor_vm::log::Log;
27
28subkey_builder!(RootSubKey(
29    ([0x01])-lyquid() => Key,
30    ([0x02])-fco() => Key,
31    ([0x03])-log() => Key,
32    ([0x04])-event_store() => Key,
33));
34
35fn subkey() -> &'static RootSubKey {
36    static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
37    KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
38}
39
40const ANVIL_STATE_FILENAME: &str = "anvil.state";
41
42static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
43
44#[derive(Debug, Message)]
45#[rtype(result = "()")]
46struct LoadLyquid {
47    id: LyquidID,
48    deps: Vec<LyquidID>,
49}
50
51#[derive(Debug, Message)]
52#[rtype(result = "()")]
53struct SetPool {
54    pool: Addr<lyquid::LyquidPool>,
55}
56
57struct LyquidDiscovery {
58    pool: Option<Addr<lyquid::LyquidPool>>,
59    register: Arc<Notify>,
60    bartender_id: LyquidID,
61    bartender_addr: Address,
62}
63
64lyquor_primitives::debug_struct_name!(LyquidDiscovery);
65
66impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
67    type Result = ResponseFuture<Address>;
68    #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
69    fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
70        let lyquor_seq::eth::GetContract { id, idx } = msg;
71        if id == self.bartender_id && idx == Some(0) {
72            return Box::pin(std::future::ready(self.bartender_addr));
73        }
74        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
75        let reg = self.register.clone();
76        Box::pin(async move {
77            loop {
78                // FIXME: improve the efficiency of this by filtering the reigster event for
79                // the specific lyquid in bartender
80                // idx=None means latest; else nth deployment
81                if let Some(n) = idx {
82                    match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx: n }).await {
83                        Ok(Some(info)) => return info.contract,
84                        Ok(None) => reg.notified().await,
85                        Err(e) => {
86                            tracing::error!("Failed to get lyquid deployment info: {}", e);
87                            // Stall forever for safety
88                            std::future::pending::<()>().await;
89                        }
90                    }
91                } else {
92                    match pool.send(crate::lyquid::GetLatestLyquidInfo { id }).await {
93                        Ok(Some(info)) => return info.contract,
94                        Ok(None) => reg.notified().await,
95                        Err(e) => {
96                            tracing::error!("Failed to get latest lyquid info: {}", e);
97                            std::future::pending::<()>().await;
98                        }
99                    }
100                }
101            }
102        })
103    }
104}
105
106// latest variant handled in GetContract via idx=None
107
108impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
109    type Result = ();
110
111    #[tracing::instrument(level = "trace", skip(msg))]
112    fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
113        let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
114        if let Some(event) = event {
115            ctx.spawn(self.load(event.id, event.deps).into_actor(self));
116        }
117    }
118}
119
120impl Handler<LoadLyquid> for LyquidDiscovery {
121    type Result = ResponseFuture<()>;
122
123    #[tracing::instrument(level = "trace")]
124    fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
125        Box::pin(self.load(msg.id, msg.deps))
126    }
127}
128
129impl Handler<SetPool> for LyquidDiscovery {
130    type Result = ();
131
132    #[tracing::instrument(level = "trace")]
133    fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
134        if self.pool.is_some() {
135            return;
136        }
137        let pool = msg.pool;
138        let me = ctx.address();
139        self.pool = Some(pool.clone());
140        ctx.spawn(
141            async move {
142                // TODO: right now we just subscribe to every lyquid known in history, need to change this
143                // to user-configured set (or groups)
144                let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
145                    None => {
146                        // FIXME: this could be triggered when bartender has only executed one slot
147                        // upon recovery. There is a race that before bartender repeat the
148                        // execution of the first slot, its state is without image so this call
149                        // will fail.
150                        tracing::error!("Failed to obtain the registered Lyquid list.");
151                        return;
152                    }
153                    Some(lyquids_with_deps) => lyquids_with_deps,
154                };
155                // Now we have the actual dependency list from the bartender
156                for (id, deps) in lyquids_with_deps.into_iter() {
157                    if let Err(e) = me.send(LoadLyquid { id, deps }).await {
158                        tracing::error!("Failed to send LoadLyquid message: {}", e);
159                    }
160                }
161            }
162            .into_actor(self),
163        );
164    }
165}
166
167impl LyquidDiscovery {
168    fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
169        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
170        let register = self.register.clone();
171        async move {
172            match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
173                Ok(result) => match result {
174                    Ok(already_exists) => {
175                        if !already_exists {
176                            tracing::info!("Discovered {}", id);
177                        }
178                    }
179                    Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
180                },
181                Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
182            }
183            register.notify_waiters();
184        }
185    }
186}
187
188impl Handler<Stop> for LyquidDiscovery {
189    type Result = ();
190
191    #[tracing::instrument(level = "trace")]
192    fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
193        ctx.stop();
194    }
195}
196
197impl Actor for LyquidDiscovery {
198    type Context = actix::Context<Self>;
199
200    #[tracing::instrument(level = "trace", skip(_ctx))]
201    fn started(&mut self, _ctx: &mut Self::Context) {}
202
203    #[tracing::instrument(level = "trace", skip(_ctx))]
204    fn stopped(&mut self, _ctx: &mut Self::Context) {}
205}
206
207struct DBFlusher {
208    last_bn: u64,
209    db_dir: PathBuf,
210    store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
211    fco: Addr<FCO>,
212    anvil_chain: Option<ClientHandle>,
213    flush_interval_secs: u64,
214}
215
216lyquor_primitives::debug_struct_name!(DBFlusher);
217
218impl DBFlusher {
219    fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
220        let anvil_chain = self.anvil_chain.clone();
221        let fco = self.fco.clone();
222        let db_dir = self.db_dir.clone();
223        async move {
224            fco.send(lyquor_seq::fco::Commit)
225                .await
226                .map_err(anyhow::Error::from)
227                .and_then(|e| e.map_err(anyhow::Error::from))?;
228
229            if let Some(client) = &anvil_chain {
230                // If running devnet (anvil), dump chain state.
231                //
232                // We dump the anvil chain state *after* the commit of FCO, so FCO's progress will
233                // not outpace the dumped chain state (i.e., includes execution caused by the
234                // blocks later). Of course, this somehow assume Anvil's API handling has an
235                // ordering (the dump chain state is more recent than that handled previous API
236                // responses).
237                use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
238                let dump = client
239                    .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
240                    .await
241                    .map(|r| r.0);
242
243                match dump {
244                    Ok(state) => {
245                        // Write the anvil state to file
246                        let devnet_path = db_dir.join("devnet");
247                        std::fs::create_dir_all(&devnet_path)?;
248                        std::fs::write(devnet_path.join(ANVIL_STATE_FILENAME), &state)?;
249                    }
250                    Err(_) => {
251                        tracing::error!(
252                            "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
253                        );
254                    }
255                }
256            }
257
258            Ok(())
259        }
260    }
261
262    fn flush(&mut self, res: anyhow::Result<()>) {
263        if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
264            tracing::error!("Failed to write: {e}");
265        } else {
266            tracing::info!("Written to the storage backend (block={}).", self.last_bn);
267        }
268    }
269}
270
271#[derive(Message, Debug)]
272#[rtype(result = "()")]
273struct FlushDB;
274
275impl Handler<FlushDB> for DBFlusher {
276    type Result = ResponseActFuture<Self, ()>;
277
278    #[tracing::instrument(level = "trace", skip(self, _ctx))]
279    fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
280        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
281            actor.flush(res);
282        }))
283    }
284}
285
286impl Handler<Stop> for DBFlusher {
287    type Result = ResponseActFuture<Self, ()>;
288
289    #[tracing::instrument(level = "trace")]
290    fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
291        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
292            actor.flush(res);
293        }))
294    }
295}
296
297impl Actor for DBFlusher {
298    type Context = actix::Context<Self>;
299
300    #[tracing::instrument(level = "trace", skip_all)]
301    fn started(&mut self, ctx: &mut Self::Context) {
302        if self.flush_interval_secs > 0 {
303            let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
304            ctx.run_interval(periodic_write, |_act, ctx| {
305                ctx.address().do_send(FlushDB);
306            });
307        }
308    }
309
310    #[tracing::instrument(level = "trace", skip_all)]
311    fn stopped(&mut self, _ctx: &mut Self::Context) {}
312}
313
314async fn setup_eth_backend(
315    profile: &LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
316) -> anyhow::Result<eth::Backend> {
317    let finality_tag: lyquor_jsonrpc::types::BlockNumber =
318        serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
319
320    Ok(eth::Backend::new(
321        eth::Config::builder()
322            .finality_tag(finality_tag)
323            .registry(reg)
324            .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
325            .build(),
326        jsonrpc_client,
327    ))
328}
329
330#[derive(Default)]
331struct Subsystems {
332    fco: Option<Addr<FCO>>,
333    log: Option<Addr<Log>>,
334    pool: Option<Addr<lyquid::LyquidPool>>,
335    api: Option<Addr<api::EthAPIServer>>,
336    db: Option<Addr<DBFlusher>>,
337    dis: Option<Addr<LyquidDiscovery>>,
338    client: Option<ClientHandle>,
339    eth_backend: Option<lyquor_seq::eth::Backend>,
340}
341debug_struct_name!(Subsystems);
342
343impl Subsystems {
344    #[tracing::instrument(level = "trace")]
345    async fn stop(&mut self) {
346        if let Some(api) = self.api.take() {
347            api.send(Stop).await.ok();
348        }
349        if let Some(eth_backend) = self.eth_backend.take() {
350            eth_backend.stop().await;
351        }
352        if let Some(log) = self.log.take() {
353            log.send(Stop).await.ok();
354        }
355        if let Some(dis) = self.dis.take() {
356            dis.send(Stop).await.ok();
357        }
358        if let Some(pool) = self.pool.take() {
359            pool.send(Stop).await.ok();
360        }
361        if let Some(db) = self.db.take() {
362            db.send(Stop).await.ok();
363        }
364        if let Some(dis) = self.dis.take() {
365            dis.send(Stop).await.ok();
366        }
367        if let Some(fco) = self.fco.take() {
368            fco.send(Stop).await.ok();
369        }
370    }
371}
372
373/// TODO: refactor this with actix's Actor model.
374struct Node {
375    sys: Subsystems,
376    config: Arc<NodeConfig>,
377    profile: Arc<LyquorProfile>,
378    seq_eth_url: String,
379    started_anvil: bool,
380    network: Option<lyquor_net::hub::Hub>,
381}
382
383lyquor_primitives::debug_struct_name!(Node);
384
385impl Handler<Stop> for Node {
386    type Result = AtomicResponse<Self, ()>;
387
388    #[tracing::instrument(level = "trace")]
389    fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
390        let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
391        AtomicResponse::new(Box::pin(
392            async move {
393                sys.stop().await;
394            }
395            .into_actor(self)
396            .map(|_, _act, ctx| {
397                ctx.stop();
398            }),
399        ))
400    }
401}
402
403impl Actor for Node {
404    type Context = Context<Self>;
405
406    #[tracing::instrument(level = "trace", skip(ctx))]
407    fn started(&mut self, ctx: &mut Self::Context) {
408        // NOTE: use spawn here instead of wait so Stop can be handled
409        ctx.spawn(
410            Self::init(
411                self.config.clone(),
412                self.profile.clone(),
413                self.seq_eth_url.clone(),
414                self.started_anvil,
415                self.network.take().expect("Node: Network not found."),
416            )
417            .into_actor(self)
418            .map(|sys, act, ctx| {
419                match sys {
420                    Ok(sys) => {
421                        act.sys = sys;
422                    }
423                    Err(e) => {
424                        tracing::error!("Failed to initialize node: {e:?}");
425                        ctx.stop();
426                    }
427                };
428            }),
429        );
430    }
431
432    #[tracing::instrument(level = "trace", skip(_ctx))]
433    fn stopped(&mut self, _ctx: &mut Self::Context) {}
434}
435
436impl Node {
437    #[tracing::instrument(level = "trace", skip_all)]
438    async fn init(
439        config: Arc<NodeConfig>, profile: Arc<LyquorProfile>, seq_eth_url: String, started_anvil: bool,
440        network: lyquor_net::hub::Hub,
441    ) -> anyhow::Result<Subsystems> {
442        let mut sys = Subsystems::default();
443
444        // Set up Ethereum sequence backend.
445        let jsonrpc_client = ClientConfig::builder().url(seq_eth_url.parse()?).build().into_client();
446        sys.client = Some(jsonrpc_client.clone());
447
448        //// 1. DB & Storage
449        let (store, lyq_store, anvil_chain): (
450            Arc<ShadowKVStore<Box<dyn KVStore>>>,
451            Arc<dyn utils::LVMStoreFactory>,
452            Option<ClientHandle>,
453        ) = match &config.storage.db {
454            DB::MemDb => {
455                tracing::warn!("using MemDB, all data will be lost upon exit");
456                let store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new()) as Box<dyn KVStore>));
457                (store, Arc::new(utils::LVMMemStore), None)
458            }
459            DB::RocksDb { db_dir } => {
460                let base_dir = db_dir.clone();
461                let store = Arc::new(ShadowKVStore::new(
462                    Box::new(RocksDB::new(&base_dir.join(profile.id()))?) as Box<dyn KVStore>,
463                ));
464                let lyq_store = Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
465                    store.clone(),
466                    subkey().lyquid(),
467                ))));
468
469                // Load anvil state if we started our own anvil and state file exists
470                let anvil_chain = if started_anvil {
471                    let state_file = db_dir.join(profile.id()).join(ANVIL_STATE_FILENAME);
472                    if state_file.exists() {
473                        use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
474                        let state_data = std::fs::read(&state_file)?;
475                        let load_result = jsonrpc_client
476                            .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
477                            .await?;
478                        tracing::info!("Loaded anvil state from {}: {}.", state_file.display(), load_result.0);
479                    } else {
480                        tracing::info!(
481                            "No existing anvil state file found at {}, starting with fresh state.",
482                            state_file.display()
483                        );
484                    }
485                    Some(jsonrpc_client.clone())
486                } else {
487                    None
488                };
489
490                (store, lyq_store, anvil_chain)
491            }
492        };
493        ////
494
495        //// 2. Fate-Constrainted Ordering Subsystem.
496        // Start discovering Lyquid and sequencing contracts.
497        let dis = LyquidDiscovery {
498            pool: None,
499            register: Arc::new(Notify::new()),
500            bartender_id: *profile.bartender_id(),
501            bartender_addr: Address::from_str(profile.bartender_addr())
502                .context("Invalid bartender address in profile.")?
503                .into(),
504        }
505        .start();
506        sys.dis = Some(dis.clone());
507
508        let eth_backend = setup_eth_backend(&profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
509
510        let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
511        let fco = FCO::new(
512            eth_backend.clone(),
513            Arc::new(fco_store),
514            Some(profile.init_chain_position()),
515        )?
516        .start();
517        sys.fco = Some(fco.clone());
518        // Keep a separate reference for shutdown management
519        sys.eth_backend = Some(eth_backend);
520        ////
521
522        //// 3. Lyquor Virtual Machine Subsystem.
523        // Configure Log subsystem.
524        let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
525        let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
526        sys.log = Some(log.clone());
527
528        let mut lvm = lyquid::LVM::new(
529            log.clone(),
530            network,
531            10,
532            Some(lyquor_vm::InterOps {
533                inter: fco.clone().recipient(),
534                submit: fco.clone().recipient(),
535            }),
536        );
537        let api_subs = api::Subscriptions::new().start();
538        let event_store = Arc::new(lyquor_seq::event_store::EventStore::new(
539            Arc::new(PrefixedKVStore::new(store.clone(), subkey().event_store())),
540            1000,
541        ));
542        ////
543
544        //// 4. UPC peers
545        for peer in &config.peers {
546            lvm.upc_mut()
547                .add_remote(peer.node_id, Some(peer.upc_addr.clone()))
548                .await
549                .with_context(|| format!("Failed to add UPC peer {}", peer.node_id))?;
550        }
551        ////
552
553        //// 5. Pool of all Lyquids
554        let image_repo_store = RocksDB::new(&config.storage.image_repo_dir)?;
555        let image_repo: Arc<dyn repo::LyquidImageRepo> = Arc::new(repo::KVStoreRepo::new(image_repo_store));
556        let bartender_id = *profile.bartender_id();
557        let pool = lyquid::LyquidPool::new(
558            bartender_id,
559            lvm,
560            lyquid::LyquidPoolSetup {
561                fco: fco.clone(),
562                store_factory: lyq_store,
563                image_repo,
564                event_store,
565                api_subs: api_subs.clone(),
566                config: lyquid::LyquidConfig::builder().build(),
567            },
568        )
569        .await?
570        .start();
571        sys.pool = Some(pool.clone());
572        ////
573
574        // Configure bartender.
575        let bartender_setup = {
576            let register_topic = LyteLog::tagged_value_topic("Register");
577            match log
578                .send(lyquor_vm::log::GetNumberOfRecords {
579                    id: bartender_id,
580                    topic: register_topic,
581                })
582                .await
583            {
584                Ok(count) if count > 0 => None,
585                Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
586                    id: bartender_id,
587                    topic: register_topic,
588                })),
589                Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
590            }
591        };
592
593        // Start the API server - use the same jsonrpc client as sequencer
594        let api = lyquor_lib::api::EthAPIServer::new(
595            &config.network.api_addr,
596            jsonrpc_client.clone(),
597            api_subs,
598            fco.clone(),
599            pool.clone(),
600        )
601        .await?;
602        sys.api = Some(api.start());
603
604        // Wait for bartender to be registered.
605        if let Some(reg) = bartender_setup {
606            tracing::info!("Waiting for bartender to be registered.");
607            reg.await.context("Failed to wait for bartender registration.")?;
608        }
609        // Bartender, as a normal Lyquid, may have some re-execution, let's make sure it is done.
610        let bartender_ins = pool
611            .send(lyquid::GetBartender)
612            .await
613            .expect("Failed to obtain bartender.");
614        let bartender_ln = bartender_ins
615            .instance()
616            .read()
617            .max_number()
618            .unwrap_or(LyquidNumber::ZERO);
619        tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
620        bartender_ins
621            .wait_until(bartender_ln)
622            .await
623            .context("Failed to wait for the initial re-execution of bartender.")?;
624
625        // Finish up LyquidDiscovery.
626        log.send(lyquor_vm::log::Subscribe {
627            id: bartender_id,
628            topic: LyteLog::tagged_value_topic("Register"),
629            subscriber: dis.clone().recipient(),
630        })
631        .await
632        .context("Failed to subscribe to log system for LyquidDiscovery.")?;
633        dis.send(SetPool { pool: pool.clone() })
634            .await
635            .context("Failed to set pool in LyquidDiscovery.")?;
636
637        tracing::info!("INIT COMPLETE (bartender={bartender_id})");
638
639        // Start a DB writer that persist when node is idle.
640        let db = DBFlusher {
641            last_bn: 0,
642            db_dir: config.resolved_db_dir(),
643            store,
644            fco,
645            anvil_chain,
646            flush_interval_secs: config.flush_interval_secs,
647        }
648        .start();
649        sys.db = Some(db);
650
651        Ok(sys)
652    }
653}
654
655fn generate_unused_port() -> anyhow::Result<u16> {
656    let listener = std::net::TcpListener::bind("0.0.0.0:0").context("Failed to bind to local address.")?;
657    let port = listener.local_addr().context("Failed to get local address.")?.port();
658    Ok(port)
659}
660
661fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
662    let anvil_port = generate_unused_port()?;
663    // Mask the SIGINT signal so anvil process won't terminate on SIGINT (it'll only
664    // terminate when dropped)
665    use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
666    let mut sigset = SigSet::empty();
667    sigset.add(Signal::SIGINT);
668    let mut old_mask = SigSet::empty();
669    signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
670        .context("Failed to mask SIGINT for anvil.")?;
671
672    let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
673
674    let stdout_reader = std::io::BufReader::new(
675        anvil
676            .child_mut()
677            .stdout
678            .take()
679            .context("Failed to read from anvil stdout.")?,
680    );
681
682    tokio::task::spawn_blocking(move || {
683        use std::io::BufRead;
684        let mut stdout_lines = stdout_reader.lines();
685        while let Some(line) = stdout_lines.next() {
686            tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
687        }
688    });
689
690    tracing::info!("Anvil started at port {}.", anvil_port);
691    signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
692        .context("Failed to restore old signal mask for anvil.")?;
693
694    Ok(anvil)
695}
696
697fn parse_override_value(raw: &str) -> Value {
698    if let Ok(v) = raw.parse::<bool>() {
699        return Value::from(v);
700    }
701    if let Ok(v) = raw.parse::<u64>() {
702        return Value::from(v);
703    }
704    if let Ok(v) = raw.parse::<i64>() {
705        return Value::from(v);
706    }
707    if let Ok(v) = raw.parse::<f64>() {
708        return Value::from(v);
709    }
710    Value::from(raw.to_string())
711}
712
713fn insert_override_path(overrides: &mut Map<String, Value>, key: &str, value: Value) -> anyhow::Result<()> {
714    let mut segments = key.split('.').peekable();
715    let mut current = overrides;
716
717    while let Some(segment) = segments.next() {
718        if segments.peek().is_none() {
719            current.insert(segment.to_string(), value);
720            return Ok(());
721        }
722
723        let entry = current
724            .entry(segment.to_string())
725            .or_insert_with(|| Value::Object(Map::<String, Value>::new()));
726
727        match entry {
728            Value::Object(dict) => {
729                current = dict;
730            }
731            _ => anyhow::bail!("Override path '{key}' conflicts with non-table value."),
732        }
733    }
734
735    Ok(())
736}
737
738fn build_config_overrides(matches: &clap::ArgMatches) -> anyhow::Result<Map<String, Value>> {
739    let mut overrides: Map<String, Value> = Map::new();
740
741    if let Some(entries) = matches.get_many::<String>("config-override") {
742        for entry in entries {
743            let (key, value) = entry
744                .split_once('=')
745                .ok_or_else(|| anyhow::anyhow!("Invalid --config-override '{entry}', expected key=value"))?;
746            insert_override_path(&mut overrides, key, parse_override_value(value))?;
747        }
748    }
749
750    Ok(overrides)
751}
752
753#[actix::main]
754async fn main() -> anyhow::Result<()> {
755    // Install panic hook to cleanup anvil on panic
756    std::panic::set_hook(Box::new(|_| {
757        if let Some(&pid) = ANVIL_PID.get() {
758            tracing::warn!("Panic detected, killing anvil process {}", pid);
759            use nix::sys::signal::{Signal, kill};
760            let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
761        }
762    }));
763
764    lyquor_cli::setup_tracing()?;
765
766    println!("{}", lyquor_cli::format_logo_banner());
767
768    let matches = clap::command!()
769        .propagate_version(true)
770        .arg(clap::arg!(--config <PATH> "Path to the Lyquor config file (toml, yaml, json).").required(false))
771        .arg(
772            clap::arg!(--"config-override" <KEY_VALUE> "Override config with key=value (repeatable).")
773                .required(false)
774                .action(clap::ArgAction::Append)
775                .value_parser(clap::builder::NonEmptyStringValueParser::new()),
776        )
777        .get_matches();
778
779    let config_path = matches.get_one::<String>("config").map(PathBuf::from);
780    let overrides = build_config_overrides(&matches)?;
781    let config = NodeConfig::load(config_path, overrides)?;
782
783    let node_seed = config.node_key.seed_bytes()?;
784    let (_, tls_config) = lyquor_tls::generator::single_node_config_with_seed(&node_seed);
785    let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
786        .listen_addr(config.network.upc_addr.clone())
787        .build()
788        .context("Failed to setup network.")?;
789    network.start().await.context("Failed to start network.")?;
790
791    let mut _anvil: Option<AnvilInstance> = None;
792    let profile: Arc<LyquorProfile> = Arc::new(match config.profile {
793        NetworkType::Devnet => {
794            let ws_url = match &config.force_seq_eth {
795                None => {
796                    let anvil = start_devnet_anvil()?;
797                    // Store anvil PID for panic hook
798                    ANVIL_PID.set(anvil.child().id()).ok();
799                    let ep = anvil.ws_endpoint();
800                    _anvil = Some(anvil);
801                    ep
802                }
803                Some(url) => url.to_string(),
804            };
805
806            Devnet::new(ws_url).into()
807        }
808        _ => return Err(anyhow::anyhow!("Network is not supported.")),
809    });
810    let seq_eth_url = config
811        .force_seq_eth
812        .clone()
813        .unwrap_or_else(|| profile.sequencer().to_string());
814
815    // Signal handlers should be registered before starting any actors, so they'll be stopped
816    // gracefully.
817    let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
818    let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
819
820    let node = Node {
821        sys: Subsystems::default(),
822        config: Arc::new(config),
823        profile: profile.clone(),
824        seq_eth_url,
825        started_anvil: _anvil.is_some(),
826        network: Some(network),
827    }
828    .start();
829
830    tokio::select! {
831        _ = sigint.recv() => {
832            tracing::info!("received SIGINT");
833        }
834        _ = sigterm.recv() => {
835            tracing::info!("received SIGTERM");
836        }
837    }
838
839    Ok(node.send(Stop).await?)
840}